OkHttp内部关键在于拦截器的处理来实现,把网络请求封装到各个拦截器来实现,实现了各层的解耦。
我们首先发起一个请求:
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(6, TimeUnit.SECONDS)
.readTimeout(6, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder().url(strUrl).build();
try {
Response response = client.newCall(request).execute();
return response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
内部请求流程
最终实现的代码是OkHttp会调用newCall,返回一个RealCall对象,并调用execute同步方法。其中RealCall 是管理网络请求的类。
## RealCall
override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.timeoutEnter()
transmitter.callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
知道了这个三个方法后。我们一个个来看~
先看这里调用了dispatcher 的excuted 方法,那Dispatcher 在里面起到什么作用?可以看到有三个数组队列进行处理
### Dispatcher
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
private val runningSyncCalls = ArrayDeque<RealCall>()
在我们上一步过程中,调用了
### Dispatcher
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
这个方法promoteAndExecute 会在入队列和上一个任务执行结束后调用。我们再来看一下里面的executorService 线程池的用法。
### Dispatcher
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue。
分析好后,再看第二个方法getResponseWithInterceptorChain()`
### RealCall
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors】
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
.....
return response
}
......
}
可以看到addInterceptor(interceptor) 所设置的拦截器会在所有其他Intercept 处理之前运行。后面就是默认的五个拦截器。
拦截器 | 作用 |
---|
应用拦截器 | 处理header头信息, | RetryAndFollowUpInterceptor | 负责出错重试,重定向 | BridgeInterceptor | 填充http请求协议中的head头信息 | CacheInterceptor | 缓存拦截器,如果命中缓存则不会发起网络请求。 | ConnectInterceptor | 连接池拦截器,Okhttp中的核心 | networkInterceptors | 自定义网络拦截器,用于监控网络传输数据 | CallServerInterceptor | 负责和网络的收发 |
拦截器待会分析,把流程先走完。最终会执行chain.proceed(originalRequest) ,看下内部实现
### RealInterceptorChain
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
calls++
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
责任链在不同拦截器对于proceed的调用次数有不同限制,ConnectInterceptor 拦截器及其之后的拦截器能且只能调用一次。因为网络握手、连接、发送请求的工作发生在这些拦截器内,表示正式发出了一次网络请求。而在这之前的拦截器可以执行多次proceed。
其中主要实现了创建下一级责任链,然后取出当前拦截器,调用intercept 方法并传入创建的责任链,经过拦截链一级一级的调用,最终执行到CallServerInterceptor的 intercept返回 Response`对象。
缓存拦截器
要了解缓存拦截器的实现原理,首先就要先知道Http缓存的知识点。Http缓存分为两类(强制缓存和对比缓存)。
彻底弄懂HTTP缓存机制及原理
强制缓存
强制缓存是指网络请求响应header标识了Expires或Cache-Control带了max-age信息,而此时客户端计算缓存并未过期,则可以直接使用本地缓存内容,而不用真正的发起一次网络请求。
对比缓存
浏览器第一次请求数据时,服务器会将缓存标识与数据一起返回给客户端,客户端将二者备份至缓存数据库中。 再次请求数据时,客户端将备份的缓存标识发送给服务器,服务器根据缓存标识进行判断,判断成功后,返回304状态码,通知客户端比较成功,可以使用缓存数据。
在对比缓存中,缓存标识的传递需要重点理解下。
- Last-Madified: 服务器在响应请求时,告诉浏览器资源的最后修改时间。
- If-Modified-Since:再次请求服务器时,通过此字段通知服务器上次请求时,服务器返回的资源最后修改时间。
- Etag:(优先级高于Last-Modified / If-Modified-Since)服务器响应请求时,告诉浏览器当前资源在服务器的唯一标识。
- If-None-Match:再次请求时,通过此字段通知服务器客户端缓存数据的唯一标识。
对于强制缓存,服务器通知浏览器一个缓存时间,在缓存时间内,下次请求,直接用缓存,不在时间内,执行比较缓存策略。 对于比较缓存,将缓存信息中的Etag和Last-Modified通过请求发送给服务器,由服务器校验,返回304状态码时,浏览器直接使用缓存。
在CacheInterceptor 会调用intercept 方法
###CacheInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
}
}
}
return response
}
用到了使用缓存策略CacheStrategy 来确定是否使用缓存。
可以理解为:
- 网络和缓存都不能用 ,返回504
- 网络networkResponse为null, cacheResponse肯定就不为null,那么就使用缓存
- networkResponse不为null,不管cacheResponse是否为null,直接去请求网络
- cacheResponse 不为null,如果网络请求返回304 标识服务端资源没有修改
- 如果不是304,说明服务端资源有更新,将网络数据和缓存传入
- 如果网络响应可缓存,返回
cacheWritingResponse - 最后一步,不是get请求就移除缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
再来看下CacheStrategy 中处理了什么?
###CacheStrategy
class Factory(
private val nowMillis: Long,
internal val request: Request,
private val cacheResponse: Response?
) {
private var servedDate: Date? = null
private var servedDateString: String? = null
private var lastModified: Date? = null
private var lastModifiedString: String? = null
private var expires: Date? = null
private var sentRequestMillis = 0L
private var receivedResponseMillis = 0L
private var etag: String? = null
private var ageSeconds = -1
private fun isFreshnessLifetimeHeuristic(): Boolean {
return cacheResponse!!.cacheControl.maxAgeSeconds == -1 && expires == null
}
init {
if (cacheResponse != null) {
this.sentRequestMillis = cacheResponse.sentRequestAtMillis
this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis
val headers = cacheResponse.headers
for (i in 0 until headers.size) {
val fieldName = headers.name(i)
val value = headers.value(i)
when {
fieldName.equals("Date", ignoreCase = true) -> {
servedDate = value.toHttpDateOrNull()
servedDateString = value
}
fieldName.equals("Expires", ignoreCase = true) -> {
expires = value.toHttpDateOrNull()
}
fieldName.equals("Last-Modified", ignoreCase = true) -> {
lastModified = value.toHttpDateOrNull()
lastModifiedString = value
}
fieldName.equals("ETag", ignoreCase = true) -> {
etag = value
}
fieldName.equals("Age", ignoreCase = true) -> {
ageSeconds = value.toNonNegativeInt(-1)
}
}
}
}
}
.....
}
再看下它怎么返回CacheStrategy
### CacheStrategy
fun compute(): CacheStrategy {
val candidate = computeCandidate()
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
val requestCaching = request.cacheControl
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null)
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
CacheStrategy 处理了缓存请求过程:
-
没有缓存、是https但没有握手、要求不可缓存、忽略缓存或手动配置缓存过期,都是 直接进行 网络请求。 -
以上都不满足时,如果缓存没过期,那么就是用缓存(可能要添加警告)。 -
如果缓存过期了,但响应头有Etag,Last-Modified,Date,就添加这些header 进行条件网络请求。 -
如果缓存过期了,且响应头没有设置Etag,Last-Modified,Date,就进行网络请求。
还有个问题,缓存响应保存在哪里?
不废话了直接看,放在了DiskLruCache 当中。
### CacheInterceptor
val cacheCandidate = cache?.get(chain.request())
### Cahce
internal fun get(request: Request): Response? {
val key = key(request.url)
val snapshot: DiskLruCache.Snapshot = try {
cache[key] ?: return null
} catch (_: IOException) {
return null
}
......
return response
}
现在应该清楚Okhttp 缓存的实现了吧。
连接池拦截器
终于到核心的拦截器了~
先来看看ConnectInterceptor 做了什么
### ConnectInterceptor
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
}
下面来看下transmitter.newExchange 做了什么操作
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
ExchangeFinder
exchangeFinder 是主要负责真正的IO操作 写请求、读响应,本质是为请求寻找一个TCP连接。
### ExchangeFinder
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
}
return candidate
}
}
循环寻找连接。如果是不健康的连接,标记未不可用然后继续查找。
### ExchangeFinder
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
hasStreamFailure = false
releasedConnection = transmitter.connection
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
if (transmitter.connection != null) {
result = transmitter.connection
releasedConnection = null
}
if (result == null) {
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection!!.route()
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
return result!!
}
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
nextRouteToTry = selectedRoute
} else {
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
寻找连接,经历了三次从连接池中寻找。
ConnectionPool
相同Address的http请求共享一个连接,ConnectioonPool 就是实现了连接的复用。
### ConnectionPool
class ConnectionPool(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) {
internal val delegate = RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit)
constructor() : this(5, 5, TimeUnit.MINUTES)
fun idleConnectionCount(): Int = delegate.idleConnectionCount()
fun connectionCount(): Int = delegate.connectionCount()
fun evictAll() {
delegate.evictAll()
}
}
目前,该池最多可容纳 5 个空闲连接,这些连接将在 5 分钟不活动后被逐出。可以看到真正实现它的类是RealConnectionPool 。这样就知道了它是okhttp内部真实管理连接的地方。
先看下连接放进去连接池做了什么操作:
### RealConnectionPool
fun put(connection: RealConnection) {
assert(Thread.holdsLock(this))
if (!cleanupRunning) {
cleanupRunning = true
executor.execute(cleanupRunnable)
}
connections.add(connection)
}
private val cleanupRunnable = object : Runnable {
override fun run() {
while (true) {
val waitNanos = cleanup(System.nanoTime())
if (waitNanos == -1L) return
try {
this@RealConnectionPool.lockAndWaitNanos(waitNanos)
} catch (ie: InterruptedException) {
evictAll()
}
}
}
}
这里为什么要开启一个清理线程,因为连接池有最大空闲连接数和最大空闲时间的限制,所以不满足的时候就要进行清理。接下来再看下cleanup() 方法
### RealConnectionPool
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
synchronized(this) {
for (connection in connections) {
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++
continue
}
idleConnectionCount++
val idleDurationNs = now - connection.idleAtNanos
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
}
}
when {
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
connections.remove(longestIdleConnection)
}
idleConnectionCount > 0 -> {
return keepAliveDurationNs - longestIdleDurationNs
}
inUseConnectionCount > 0 -> {
return keepAliveDurationNs
}
else -> {
cleanupRunning = false
return -1
}
}
}
longestIdleConnection!!.socket().closeQuietly()
return 0
}
可以总结下
- 如果最长的空闲时间大于5分钟或空闲连接大于5,就移除关闭这个最长的空闲时间。
- 如果有空闲连接,就返回到5分钟的剩余时间
- 没有空闲连接就等5分钟再次清理
- 没有连接不清理
### RealConnectionPool
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
val references = connection.transmitters
var i = 0
while (i < references.size) {
val reference = references[i]
if (reference.get() != null) {
i++
continue
}
val transmitterRef = reference as TransmitterReference
val message = "A connection to ${connection.route().address.url} was leaked. " +
"Did you forget to close a response body?"
Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace)
references.removeAt(i)
connection.noNewExchanges = true
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs
return 0
}
}
return references.size
}
transmitters size 大于1即表示多个请求复用此连接。
将连接从连接池中取出来怎么操做?
### RealConnectionPool
fun transmitterAcquirePooledConnection(
address: Address,
transmitter: Transmitter,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
assert(Thread.holdsLock(this))
for (connection in connections) {
if (requireMultiplexed && !connection.isMultiplexed) continue
if (!connection.isEligible(address, routes)) continue
transmitter.acquireConnectionNoEvents(connection)
return true
}
return false
}
这里面有个方法要关注下 isEligible
### RealConnection
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
if (transmitters.size >= allocationLimit || noNewExchanges) return false
if (!this.route.address.equalsNonHost(address)) return false
if (address.url.host == this.route().address.url.host) {
return true
}
if (http2Connection == null) return false
if (routes == null || !routeMatchesAny(routes)) return false
if (address.hostnameVerifier !== OkHostnameVerifier) return false
if (!supportsUrl(address.url)) return false
try {
address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
} catch (_: SSLPeerUnverifiedException) {
return false
}
return true
}
取的过程就是 遍历连接池,进行地址等一系列匹配。
到这里,okHttp大部分关键的都看完了。
带着问题看源码
- Okhttp源码流程,线程池
- Okhttp拦截器,addInterceptor 和 addNetworkdInterceptor区别
- Okhttp责任链模式
- Okhttp缓存怎么处理
- Okhttp连接池和socket复用
参考:
听说你熟悉OkHttp原理?
拦截器详解1:重试重定向、桥、缓存
|