1.Okhttp请求流程:
Okhttp内部的大致请求流程图如下所示:
?使用方式:
public class OkHttpUtils {
private static String TAG = "OkHttpUtils";
private static String url = "http://www.baidu.com";
//OkHttp 异步get请求
public static void OkHttpEnqueue() {
OkHttpClient okHttpClient = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.get() //默认就是get请求 可以不写
.build();
final Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
}
//OkHttp 同步get请求
public static void okHttpSync() {
OkHttpClient okHttpClient = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
Call call = okHttpClient.newCall(request);
new Thread(new Runnable() {
@Override
public void run() {
try {
Response response = call.execute();
Log.d(TAG, "run: " + response.body().string());
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//OkHttp 异步post方式提交String
public static void OkHttpPostEnqueue() {
MediaType mediaType = MediaType.parse("text/x-markdown; charset=utf-8");
String requestBody = "I am Jeled";
OkHttpClient okHttpClient = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.post(RequestBody.create(mediaType, requestBody))
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.protocol() + " " + response.code() + " " + response.message());
Headers header = response.headers();
for (int i = 0; i < header.size(); i++) {
Log.d(TAG, header.name(i) + ":" + header.value(i));
}
Log.d(TAG, "onResponse:" + response.body().string());
}
});
}
//OkHttp 异步post方式提交流
public static void OkHttpPost() {
RequestBody requestBody = new RequestBody() {
@Nullable
@Override
public MediaType contentType() {
return MediaType.parse("text/x-markdown; charset=utf-8");
}
@Override
public void writeTo(BufferedSink sink) throws IOException {
sink.writeUtf8("I am Jeled");
}
};
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
OkHttpClient okHttpClient = new OkHttpClient();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.protocol() + " " + response.code() + " " + response.message());
Headers header = response.headers();
for (int i = 0; i < header.size(); i++) {
Log.d(TAG, header.name(i) + ":" + header.value(i));
}
Log.d(TAG, "onResponse:" + response.body().string());
}
});
}
//OkHttp 异步post方式提交文件
public static void OkHttpPostFile() {
MediaType mediaType = MediaType.parse("text/x-markdown; charset=utf-8");
File file = new File("test.md");
Request request = new Request.Builder()
.url(url)
.post(RequestBody.create(mediaType,file))
.build();
OkHttpClient okHttpClient = new OkHttpClient();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.protocol() + " " + response.code() + " " + response.message());
Headers header = response.headers();
for (int i = 0; i < header.size(); i++) {
Log.d(TAG, header.name(i) + ":" + header.value(i));
}
Log.d(TAG, "onResponse:" + response.body().string());
}
});
}
//OkHttp 异步post方式提交表单
public static void OkHttpPostForm() {
RequestBody requestBody = new FormBody.Builder()
.add("search","Jurassic Park")
.build();
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
OkHttpClient okHttpClient = new OkHttpClient();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.protocol() + " " + response.code() + " " + response.message());
Headers header = response.headers();
for (int i = 0; i < header.size(); i++) {
Log.d(TAG, header.name(i) + ":" + header.value(i));
}
Log.d(TAG, "onResponse:" + response.body().string());
}
});
}
}
备注:以上就是Okhttp的get或是post的同步或异步的请求的简单使用
源码梳理:
1、拦截器:
-
用户自定义拦截器:(继承Interceptor接口,实现intercept方法) -
RetryAndFollowUpInterceptor:失败重试以及重定向拦截器 -
BridgeInterceterptor:桥接拦截器 (请求时,对必要的Header进行一些添加,接收响应时,移除必要的Header) -
CacheInterceptor:缓存拦截
1.根据request得到cache中缓存的response
2.确认 request判断缓存的策略,是否要使用了网络,缓存或两者都使用
3.调用下一个拦截器,决定从网络上来得到response
4.如果本地已经存在cacheResponse,那么让它和网络得到的networkResponse做比较,决定是否来更新缓存的cacheResponse
5.缓存未缓存过的response
缓存拦截器会根据请求的信息和缓存的响应的信息来判断是否存在缓存可用,如果有可以使用的缓存,那么就返回该缓存给用户,否则就继续使用责任链模式来从服务器中获取响应。当获取到响应的时候,又会把响应缓存到磁盘上面 -
ConnectionInterceptor:连接拦截器
1.判断当前的连接是否可以使用:流是否已经被关闭,并且已经被限制创建新的流;
2.如果当前的连接无法使用,就从连接池中获取一个连接;
3.连接池中也没有发现可用的连接,创建一个新的连接,并进行握手,然后将其放到连接池中
-
NetworkInterceptors:网络拦截器(配置OkHttpClient时设置的 NetworkInterceptors) -
CallServerInterceptor:请求拦截器(负责向服务器发送请求数据、从服务器读取响应数据)
核心
// Call the next interceptor in the chain.
//调用链的下一个拦截器
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request); //(1)
Interceptor interceptor = interceptors.get(index); //(2)
Response response = interceptor.intercept(next); //(3)
复制代码
1.实例化下一个拦截器对应的RealIterceptorChain 对象,这个对象会在传递给当前的拦截器
-
得到当前的拦截器:interceptors 是存放拦截器的ArryList -
调用当前拦截器的intercept() 方法,并将下一个拦截器的RealIterceptorChain 对象传递下去 -
除了client中用户设置的interceptor ,第一个调用的就是retryAndFollowUpInterceptor
小结
1.拦截器用了责任链设计模式 ,它将请求一层一层向下传,直到有一层能够得到Response就停止向下传递
2.然后将response 向上面的拦截器传递,然后各个拦截器会对respone 进行一些处理,最后会传到RealCall 类中通过execute 来得到response
简而言之:每一个拦截器都对应一个 RealInterceptorChain ,然后每一个interceptor 再产生下一个RealInterceptorChain,直到 List 迭代完成,如下图所示
关于调度Dispatch
在开始下面的内容前,我们先简单的对Dispatcher有个认识 👇👇👇
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//正在准备中的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//运行中的异步请求
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//同步请求
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
......
}
复制代码
1.有一个最大请求次数:64
2.有一个最大请求主机数:5
3.有一个懒加载的线程池,当执行 executorService() 方式时才创建
4.有三个队列(准备中的异步请求 | 运行中的异步请求 | 同步请求)
1.创建OkhttpClient
OkHttpClient client = new OkHttpClient()
复制代码
这部分中Java和kotlin中没有什么区别,都用了 建造者模式 ,Builder里面的可配置参数也是一样的
public OkHttpClient() {
this(new Builder());
}
OkHttpClient(Builder builder) {
....
}
public static final class Builder {
Dispatcher dispatcher;// 分发器
@Nullable Proxy proxy;
List<Protocol> protocols;
List<ConnectionSpec> connectionSpecs;// 传输层版本和连接协议
final List<Interceptor> interceptors = new ArrayList<>();// 拦截器
final List<Interceptor> networkInterceptors = new ArrayList<>();
EventListener.Factory eventListenerFactory;
ProxySelector proxySelector;
CookieJar cookieJar;
@Nullable Cache cache;
@Nullable InternalCache internalCache;// 内部缓存
SocketFactory socketFactory;
@Nullable SSLSocketFactory sslSocketFactory;// 安全套接层socket 工厂,用于HTTPS
@Nullable CertificateChainCleaner certificateChainCleaner;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。
HostnameVerifier hostnameVerifier;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。
CertificatePinner certificatePinner;// 证书锁定,使用CertificatePinner来约束哪些认证机构被信任。
Authenticator proxyAuthenticator;// 代理身份验证
Authenticator authenticator;// 身份验证
ConnectionPool connectionPool;// 连接池
Dns dns;
boolean followSslRedirects; // 安全套接层重定向
boolean followRedirects;// 本地重定向
boolean retryOnConnectionFailure;// 重试连接失败
int callTimeout;
int connectTimeout;
int readTimeout;
int writeTimeout;
int pingInterval;
// 这里是默认配置的构建参数
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
...
}
// 这里传入自己配置的构建参数
Builder(OkHttpClient okHttpClient) {
this.dispatcher = okHttpClient.dispatcher;
this.proxy = okHttpClient.proxy;
this.protocols = okHttpClient.protocols;
this.connectionSpecs = okHttpClient.connectionSpecs;
this.interceptors.addAll(okHttpClient.interceptors);
this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
...
}
复制代码
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
//...
constructor() : this(Builder())
//...
internal constructor(okHttpClient: OkHttpClient) : this() {
this.dispatcher = okHttpClient.dispatcher
this.connectionPool = okHttpClient.connectionPool
this.interceptors += okHttpClient.interceptors
this.networkInterceptors += okHttpClient.networkInterceptors
this.eventListenerFactory = okHttpClient.eventListenerFactory
this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
this.authenticator = okHttpClient.authenticator
this.followRedirects = okHttpClient.followRedirects
this.followSslRedirects = okHttpClient.followSslRedirects
this.cookieJar = okHttpClient.cookieJar
this.cache = okHttpClient.cache
this.dns = okHttpClient.dns
this.proxy = okHttpClient.proxy
this.proxySelector = okHttpClient.proxySelector
this.proxyAuthenticator = okHttpClient.proxyAuthenticator
this.socketFactory = okHttpClient.socketFactory
this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
this.x509TrustManagerOrNull = okHttpClient.x509TrustManager
this.connectionSpecs = okHttpClient.connectionSpecs
this.protocols = okHttpClient.protocols
this.hostnameVerifier = okHttpClient.hostnameVerifier
this.certificatePinner = okHttpClient.certificatePinner
this.certificateChainCleaner = okHttpClient.certificateChainCleaner
this.callTimeout = okHttpClient.callTimeoutMillis
this.connectTimeout = okHttpClient.connectTimeoutMillis
this.readTimeout = okHttpClient.readTimeoutMillis
this.writeTimeout = okHttpClient.writeTimeoutMillis
this.pingInterval = okHttpClient.pingIntervalMillis
this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress
this.routeDatabase = okHttpClient.routeDatabase
}
}
复制代码
2.执行请求
同步请求
OkHttpClient client = new OkHttpClient();
//同步请求
Response response = client.newCall(request).execute();
复制代码
整体流程
通过创建完OkHttpClient对象,调用内部的 newCall() 方法,将最终的请求交给RealCall的 execute() 方法,在该方法内部处理
1.确保Call方法只执行一次(有版本区别,请看下文)
2.通知dispatcher进入执行状态
3.通过一系列的拦截器的请求处理和响应处理得到最终的结果
4告诉dispatcher已经执行完毕
java版本
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
// RealCall为真正的请求执行者
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
复制代码
private boolean executed;
@Override public Response execute() throws IOException {
synchronized (this) {
// 每个Call只能执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
// 通知dispatcher已经进入执行状态
client.dispatcher().executed(this);
// 通过一系列的拦截器请求处理和响应处理得到最终的返回结果
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
// 通知 dispatcher 自己已经执行完毕
client.dispatcher().finished(this);
}
}
复制代码
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// 在配置 OkHttpClient 时设置的 interceptors;
interceptors.addAll(client.interceptors());
// 负责失败重试以及重定向
interceptors.add(retryAndFollowUpInterceptor);
// 请求时,对必要的Header进行一些添加,接收响应时,移除必要的Header
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 负责读取缓存直接返回、更新缓存
interceptors.add(new CacheInterceptor(client.internalCache()));
// 负责和服务器建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 配置 OkHttpClient 时设置的 networkInterceptors
interceptors.addAll(client.networkInterceptors());
}
// 负责向服务器发送请求数据、从服务器读取响应数据
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 使用责任链模式开启链式调用
return chain.proceed(originalRequest);
}
// StreamAllocation 对象,它相当于一个管理类,维护了服务器连接、并发流
// 和请求之间的关系,该类还会初始化一个 Socket 连接对象,获取输入/输出流对象。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...
// Call the next interceptor in the chain.
// 实例化下一个拦截器对应的RealIterceptorChain对象
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 得到当前的拦截器
Interceptor interceptor = interceptors.get(index);
// 调用当前拦截器的intercept()方法,并将下一个拦截器的RealIterceptorChain对象传递下去,最后得到响应
Response response = interceptor.intercept(next);
...
return response;
复制代码
Kotlin版本
//newcall真正的执行在RealCall类
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
复制代码
RealCall.kt
private val executed = AtomicBoolean()
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
复制代码
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
复制代码
在Java版本中通过使用 synchronized 关键字来保证保证线程安全,并且确保executed只会被执行一次
kotlin版本中直接移除了 synchronized 关键字,并且将executed字段设置为具有原子性特征的boolean值,且通过CAS操作去确保是否已经执行了
异步请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
复制代码
整体流程
- 通过Request的构建者模式创建一个Request对象,然后调用OkHttpClient内部的 newCall() 方法,将最终的请求交给RealCall的 enqueue() 方法,在方法内部进行逻辑处理
? 直接进入RealCall代码
java版本
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
复制代码
从上面看到,通过加锁后确保只会执行一次后,将当前的请求交给dispatcher拦截器中的enqueue()方法执行,那么我们来看看这个代码
Dispatcher.java
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
复制代码
在拦截器中的方法中,发现它首先将这个请求添加到了readyAsyncCalls这个队列中,你问我怎么就知道它是队列了?你还记得上面我说的吗?👆👆👆
添加到队列后接着执行**promoteAndExecute()**方法
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; //判断是否大于最大的请求数
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // 判断主机数是否已经达到最大.
// 如果其中的runningAsynCalls不满,且call占用的host小于最大数量,则将call加入到runningAsyncCalls中执行,
//利用线程池执行call否者将call加入到readyAsyncCalls中。
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
复制代码
call 加入到线程池中执行了。现在再看AsynCall的 代码,它是RealCall 中的内部类
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
// 跟同步执行一样,最后都会调用到这里
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
复制代码
kotlin版本
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
复制代码
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
复制代码
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
复制代码
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
复制代码
AysncCall 中的execute() 中的方法,同样是通过Response response = getResponseWithInterceptorChain(); 来获得response,这样异步任务也同样通过了
总结
首先,请求的时候初始化一个Call的实例,然后执行它的**execute()方法或enqueue()方法,内部最后都会执行到getResponseWithInterceptorChain()**方法,
这个方法通过拦截器组成的责任链模式,依次经过用户自定义普通拦截器、重试拦截器(RetryAndFollowUpInterceptor)、桥接拦截器(BridgeInterceptor)、缓存拦截器(BridgeInterceptor)、连接拦截器(CallServerInterceptor)和用户自定义网络拦截器以及访问服务器拦截器等拦截处理过程,最终将获取到的响应结果交给用户
最后上图
连接拦截器
在Okhttp整个过程中,比较重要的两个拦截器,缓存拦截器和连接拦截器,关于缓存拦截器在文一开始的时候就简单的说了下👆👆👆
现在说下另一个比较重要的拦截器
Java版本
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// HttpCodec是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。在这个方法的内部实现连接池的复用处理
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
复制代码
通过调用了 streamAllocation 的 newStream() 方法的时候,经过一系列判断到达 StreamAllocation类 中的 findConnection() 方法
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
...
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
// 尝试使用已分配的连接,已经分配的连接可能已经被限制创建新的流
releasedConnection = this.connection;
// 释放当前连接的资源,如果该连接已经被限制创建新的流,就返回一个Socket以关闭连接
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
// 如果该连接从未被标记为获得,不要标记为发布状态,reportedAcquired 通过 acquire() 方法修改
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
// 尝试供连接池中获取一个连接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
// 关闭连接
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
// 如果已经从连接池中获取到了一个连接,就将其返回
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
// 根据一系列的 IP地址从连接池中获取一个链接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size;i++) {
Route route = routes.get(i);
// 从连接池中获取一个连接
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
// 在连接池中如果没有该连接,则创建一个新的连接,并将其分配,这样我们就可以在握手之前进行终端
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
// 如果我们在第二次的时候发现了一个池连接,那么我们就将其返回
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 进行 TCP 和 TLS 握手
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
// 将该连接放进连接池中
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 如果同时创建了另一个到同一地址的多路复用连接,释放这个连接并获取那个连接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
复制代码
通过上面的源码也可以证明我们开头的结论是正确的,添加连接池里面去了,简单来说,连接复用省去了TCP和TLS握手的过程,因为建立连接本身也是需要消耗时间的,连接复用后就可以提升网络访问效率
最后说下ConnectionPool的作用
public final class ConnectionPool {
private final Deque<RealConnection> connections = new ArrayDeque<>();
//......
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
}
复制代码
创建一个新的连接的时候,一方面需要它放进缓存里面另一边,另一方面对缓存进行清理。在 ConnectionPool 中,当我们向连接池中缓存一个连接的时候,只要调用双端队列的 add() 方法,将其加入到双端队列即可,而清理连接缓存的操作则交给线程池来定时执行
kotlin版本
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
Okhttp面试推荐:
面试突击:OkHttp 原理 8 连问?
|