前言
OKHttp是当前Android使用最广泛的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp,同时现在流行的Retrofit框架底层同样是使用OKHttp的。github链接
本文以最新版本4.10.0为例进行代码分析 ;
优点:
支持Http1、Http2、Quic以及WebSocket; 连接池复用底层TCP(Socket),减少请求延时 无缝的支持GZIP减少数据流量 缓存响应数据减少重复的网络请求 请求失败自动重试主机的其他ip,自动重定向 - …
请求执行流程
在使用OKHttp发起一次请求时,对于使用者最少存在OKHttpClient 、Request 、Call 三个角色。其中OKHttpClient 和Request 的创建可以使用Builder(建造者模式)。而Call 则是把Request 交给OKHttpClient 之后返回的一个已准备好执行的请求。
建造者模式:将一个复杂的构建与其表示相分离,使用同样的构建过程可以创建不同的表示。实例化OKHttpClient和Request的时候,因为有太多的属性需要设置,而且开发者的需求组合千变万化,使用建造者模式可以让用户不需要关心这个类的内部细节,配置好后,建造者会帮助我们按部就班的初始化表示对象。
同时OKHttp在设计时采用的外观模式 ,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端OKHttpClient 统一暴露出来。
OKHttpClient 中全是一些配置,比如代理的配置、ssl证书的配置等。而Call 本身是一个接口,我们获得的实现为RealCall 。
class RealCall(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call
Call 的execute 代表了同步请求,而enqueue 则代表异步请求。两者唯一区别在于一个会直接发送网络请求,而另一个使用OKHttp 内置的线程池来进行。
### 同步请求
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
### 异步请求
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
我们可以看出,最终请求都是交给dispatcher 分发器进行进一步的执行。
分发器
Dispatcher ,分发器就是来调配请求任务的,内部会包含一个线程池,可以在创建OKHttpClient 时,传递我们自己定义的线程来创建分发器。
我们先了解下Dispatcher 中的一些涉及到请求的成员变量;
var maxRequests = 64
var maxRequestsPerHost = 5
var idleCallback: Runnable? = null
val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
private val runningSyncCalls = ArrayDeque<RealCall>()
同步请求
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
同步请求不需要线程池,也不存在任何限制。所以分发器仅做一下记录。
异步请求
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
当正在执行的任务没有超过最大限制64,同时asyncCall.callsPerHost.get() >= this.maxRequestsPerHost 同一Host请求不超过5个,则会添加到正在执行队列,同时提交给线程池,否则先加入等待队列。
加入线程池就直接执行,但是如果加入等待队列后,就需要等待有空闲名额才开始执行,因此每次执行完一个请求后,都会调用分发器的finished 方法;
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
执行完了移除正在执行队列中的元素,结束后会再次调用promoteAndExecute() ,查找满足条件的异步任务进行执行。
分发器线程池
分发器是用来调配请求任务的,内部包含了一个自定义线程池executorService ,如下:
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
那为什么要这么定义呢?
我们先回顾下线程池调度策略和常用等待队列:
- 线程池调度策略
1.如果线程池中的线程数量未达到核心线程的数量,那么直接启动一个核心线程来执行任务; 2.如果线程池中的线程数量已经达到或者超过核心线程的数量,那么任务会被插入到任务队列中排队等待执行; 3.如果在步骤2中无法将任务插入到任务队列中,这往往是由于任务队列已满,这个时候如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务; 4.如果步骤3中线程数量已经达到线程池规定的最大数量,那么就会拒绝执行此任务,ThreadPoolExecutor会调用RejectedExecutionHandler的rejectedExecution方法来通知调用者; - 常用等待队列
ArrayBlockingQueue 、LinkedBlockQueue 与SynchronousQueue 。
假设向线程池提交任务时,核心线程都被占用的情况下:
ArrayBlockingQueue :基于数组的阻塞队列,初始化需要指定固定大小; 当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了以后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交任务。所以最终可能出现后提交的任务先执行,而先提交的任务会一直在等待。
LinkedBlockQueue :基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定; 当指定大小后,行为就和ArrayBlockingQueue 一致。而如果未指定大小,则会默认使用Int.MAX_VALUE 作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功,最终意味着所有任务都是在核心线程执行。如何核心线程一直被占,那就一直等待。
SynchronousQueue :无容量队列。 使用此队列意味着希望获取最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败,而失败后,如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务,完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Int.MAX_VALUE 就实现了真正的无等待。
但是需要注意的是,进程的内存是存在限制的,线程并不能无限个数,那么当设置最大线程为Int.MAX_VALUE 时,OkHttp同时还有最大请求任务个数:64的限制,这样既解决了这个问题同时也能获取最大吞吐。
因此结合分发器定义的线程池分析如下: 首先核心线程数为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收,而最大线程数Int.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量。当需要线程执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务。
请求流程
用户是不需要直接操作分发器的,获取到RealCall 后就分别调用execute 和enqueue 来进行同步或异步请求;
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
可以看到同步异步请求最终都会通过getResponseWithInterceptorChain 方法来执行请求,下一篇我们继续学习OKHttp关于getResponseWithInterceptorChain 涉及的各个拦截器;
结语
OKHttp 源码中有许多值得我们学习的地方,比如使用时涉及的建造者设计模式 ,再如分发器中自定义的线程池 等等,对于我们开发中很有帮助;
如果以上文章对您有一点点帮助,希望您不要吝啬的点个赞加个关注,您每一次小小的举动都是我坚持写作的不懈动力!?( ′・?・` )
|