IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> Android网络框架OkHttp源码分析(2)——拦截器分析 -> 正文阅读

[移动开发]Android网络框架OkHttp源码分析(2)——拦截器分析

Android网络框架——OkHttp源码分析

本文基于Kotlin语言讲解

上一篇文章:
Android网络框架OkHttp源码分析(1)——请求流程
讲解了OkHttp的请求流程,这篇文章进入第二部分——OkHttp的拦截器分析。

OkHttp源码分析之拦截器分析

上一章节中发送请求的过程中有一个重要的方法getResponseWithInterceptorChain(),里面会创建许多拦截器,它们会在请求发送到服务器之前对请求做一系列的处理,在服务器返回响应之后,同样这些拦截器对响应做了一系列处理后才返回。拦截器是okhttp的一个重要的核心功能,在各个拦截器功能实现的同时也会牵扯出OkHttp的缓存和连接机制:

// RealCall的getResponseWithInterceptorChain方法
Response getResponseWithInterceptorChain() throws IOException {
    // 创建一个拦截器链
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//添加自定义应用拦截器
    interceptors.add(retryAndFollowUpInterceptor);.//添加负责重试重定向的拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//添加负责转换请求响应的拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));//添加负责缓存的拦截器
    interceptors.add(new ConnectInterceptor(client));//添加负责管理连接的拦截器
    if (!forWebSocket) {
        //没有特殊要求,一般不添加该拦截器
        interceptors.addAll(client.networkInterceptors());//添加我们自定义的网络拦截器
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//添加负责发起请求获取响应的拦截器
    //创建链条
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

	//调用Chain的proceed(Request)方法处理请求(该方法返回一个response)
    return chain.proceed(originalRequest);
}

上述代码我们能够看到拦截器可以分为以下几个类别:

  • 自定义拦截器
  • RetryAndFollowUpInterceptor(重定向拦截器)
  • BridgeInterceptor(桥拦截器)
  • CacheInterceptor(缓存拦截器)
  • ConnectInterceptor(连接拦截器)
  • CallServerInterceptor(请求服务器拦截器)
  • networkInterceptors()(网络拦截器,不常用,本文不予讲解)

一、自定义拦截器

我们先看一下OkHttp中拦截器接口:

fun interface Interceptor {
  @Throws(IOException::class)
  //拦截器逻辑
  fun intercept(chain: Chain): Response

  companion object {

    inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
      Interceptor { block(it) }
  }

  interface Chain {
  	//返回Request
    fun request(): Request

    @Throws(IOException::class)
    //调用Chain的proceed方法处理Request返回Response
    fun proceed(request: Request): Response
    //自定义应用拦截器一般为null
    fun connection(): Connection?
	//返回对应的Call对象
    fun call(): Call
	//连接超时
    fun connectTimeoutMillis(): Int

    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
	//读入超时
    fun readTimeoutMillis(): Int

    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
	//写入超时
    fun writeTimeoutMillis(): Int

    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

可以看出拦截器接口主要由intercept(Chain)方法内部接口Chain两个大部分组成,下面是我们自定义一个拦截器:

class MyInterceptor : Interceptor {
    @Throws(IOException::class)
    fun intercept(chain: Chain): Response {

        //1、获取Request
        val request: Request = chain.request()

        //2、 处理Request,逻辑部分自行定义
        //...

        //3、调用Chain的proceed(Request)方法处理请求,得到Response
		response = chain.proceed(request)
        //4、 处理Response,逻辑部分自行定义
        //...

        //5、返回Response
        return response
    }
}

自定义好我们的自定义拦截器后,该怎么将自定义拦截器添加至我们的OkHttp配置中呢?请看如下java代码:

OkHttpClient client = new OkHttpClient.Builder()
     .addInterceptor(new MyInterceptor())//通过addInterceptor()方法添加自定义拦截器
     .build();

二、RetryAndFollowUpInterceptor

//RetryAndFollowUpInterceptor类的intercept方法
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //获取request对象
    var request = chain.request
    //获取RealCall对象
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      //注释1,内部创建ExchangeFinder
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          //调用proceed方法,里面调用下一个拦截器BridgeInterceptor的intercept方法(上一篇文章含有讲解)
          response = realChain.proceed(request)
          //成功请求,说明该ExchangeFinder被占用,newExchangeFinder设置true表示下次创建新的ExchangeFinder
          newExchangeFinder = true
        } catch (e: RouteException) {
          //调用recover方法检测连接是否可以继续使用
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          //请求失败,说明该ExchangeFinder被闲置,newExchangeFinder设置false表示下次不创建新的ExchangeFinder而是用该ExchangeFinder
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          //调用recover方法检测连接是否可以继续使用
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          //请求失败,说明该ExchangeFinder被闲置,newExchangeFinder设置false表示下次不创建新的ExchangeFinder而是用该ExchangeFinder
          newExchangeFinder = false
          continue
        }
		//如果之前有重定向的请求,直接返回该重定向请求的响应
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        val followUp = followUpRequest(response, exchange)

        //followUp为空,不需要重定向,直接返回Response
        if (followUp == null) {
		//...
          return response
        }

        //followUp不为空,需要重定向
        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()
		//重定向次数过多,抛出异常
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
		//以重定向后的Request再次重试
        request = followUp
        priorResponse = response
      } finally {
      //异常情况下,释放连接资源
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }

注释1:

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)
	//...
    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

上述代码可以清楚看到,这里根据newExchangeFinder值是否返回一个ExchangeFinder对象,这个对象的作用是请求寻找一个可用的TCP连接,其中有一个参数将request.url封装进Address对象中,看看其实现:

  private fun createAddress(url: HttpUrl): Address {
    var sslSocketFactory: SSLSocketFactory? = null
    var hostnameVerifier: HostnameVerifier? = null
    var certificatePinner: CertificatePinner? = null
    if (url.isHttps) {
      sslSocketFactory = client.sslSocketFactory
      hostnameVerifier = client.hostnameVerifier
      certificatePinner = client.certificatePinner
    }

    return Address(
        uriHost = url.host,
        uriPort = url.port,
        dns = client.dns,
        socketFactory = client.socketFactory,
        sslSocketFactory = sslSocketFactory,
        hostnameVerifier = hostnameVerifier,
        certificatePinner = certificatePinner,
        proxyAuthenticator = client.proxyAuthenticator,
        proxy = client.proxy,
        protocols = client.protocols,
        connectionSpecs = client.connectionSpecs,
        proxySelector = client.proxySelector
    )
  }

可以看到返回结果使用urlclient配置 创建一个Address实例。Address是指向服务的连接的地址,可以理解为请求地址及其配置。

Address有一个重要作用:相同Address的HTTP请求 共享 相同的连接,这其实就是对 HTTP1.1和HTTP2.0 复用连接 的请求的判断。

这里我们先记住在RetryAndFollowUpInterceptor类的intercept方法中首先创建了一个ExchangeFinder对象,后面会用到!

三、BridgeInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      //进行Header的封装
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    //如果我们添加一个“Accept-Encoding:gzip”头字段,我们也负责解压缩传输流。
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }
	//创建OkhttpClient配置的cookieJar
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
	//调用proceed方法,里面调用下一个拦截器ChcheInterceptor的intercept方法(上一篇文章含有讲解)
    val networkResponse = chain.proceed(requestBuilder.build())

	//以下步骤根据网络响应构建用户响应
    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

同它的名字,它是一个网络连接的桥梁,负责把用户构造的请求转换为发送给服务器的请求,把服务器返回的响应转换为对用户友好的响应。

  • 在Request阶段配置用户信息,并添加一些请求头。
  • 在Response阶段,取消部分请求头,进行gzip解压。

四、CacheInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    //1. 读取候选缓存
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
	//2. 创建缓存策略,强制缓存、对比缓存等
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

	//根据缓存策略,更新统计指标:请求次数、使用网络请求次数、使用缓存次数
    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    if (cacheCandidate != null && cacheResponse == null) {
      //若缓存不可用,关闭
      cacheCandidate.body?.closeQuietly()
    }

    //3. 根据策略,不使用网络,又没有缓存的直接报错,并返回错误码504
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    //4.缓存可用,则返回缓存中数据
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      //5.调用proceed方法,里面调用下一个拦截器ConnectInterceptor的intercept方法(上一篇文章含有讲解)
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    //6. 接收到网络结果,如果响应code式304,则使用缓存,返回缓存结果
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        //更新cacheResponse
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }
	//7. 读取网络结果
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()
	//8. 对数据进行缓存
    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            // This will log a conditional cache miss only.
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

CacheStrategy是okhttp缓存策略的实现,okhttp缓存策略遵循了HTTP缓存策略,下面推荐一篇文章,了解了HTTP缓存策略后,我们再来看CacheStrategy:
浏览器 HTTP 协议缓存机制详解

注释2处,调用CacheStrategy类的compute方法获得缓存策略:

    fun compute(): CacheStrategy {
      val candidate = computeCandidate()

      // We're forbidden from using the network and the cache is insufficient.
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)
      }

      return candidate
    }

其内部调用computeCandidate()方法:

   private fun computeCandidate(): CacheStrategy {
      //没有缓存响应
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      //如果缺少所需的握手,则删除缓存的响应
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      //如果不可以缓存
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

      val requestCaching = request.cacheControl
      请求头nocache或者请求头包含If-Modified-Since或者If-None-Match(意味着本地缓存过期,需要服务器验证本地缓存是不是还能继续使用)
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }
	  //走到这里表示Response缓存可用

	  //获得Response的缓存控制字段CacheControl
      val responseCaching = cacheResponse.cacheControl
	  //获得该Response已经缓存的时长
      val ageMillis = cacheResponseAge()
      //获得该Response可以缓存的时长
      var freshMillis = computeFreshnessLifetime()

      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }
	  //判断缓存是否过期,决定是否使用Response缓存:Response已经缓存的时长 < max-stale + max-age
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        //缓存没有过期,直接使用该Response缓存
        return CacheStrategy(null, builder.build())
      }

      //缓存过期了,判断是否设置了Etag或Last-Modified等标记
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }
		//缓存没有设置Etag或Last-Modified等标记,所以直接进行网络请求
        else -> return CacheStrategy(request, null)
      }
	  //缓存设置了Etag或Last-Modified等标记,所以添加If-None-Match或If-Modified-Since请求头,构造请求,交给服务器判断缓存是否可用
      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
      return CacheStrategy(conditionalRequest, cacheResponse)
    }

可见OkHttp缓存是根据HTTP的缓存策略决定networkRequest和cacheResponse的组合:

  • 1、强制缓存:客户端参与决策决定是否继续使用缓存,客户端第一次请求数据时,服务端返回了缓存的过期时间:Expires或Cache-Control,当客户端再次请求时,就判断缓存的过期时间,没有过期就可以继续使用缓存,否则就不使用,重新请求服务端。
  • 2、对比缓存:服务端参与决策决定是否继续使用缓存,客户端第一次请求数据时,服务端会将缓存标识:Last-Modified/If-Modified-Since、Etag/If-None-Match和数据一起返回给客户端 ,当客户端再次请求时,客户端将缓存标识发送给服务端,服务端根据缓存标识进行判断,如果缓存还没有更新,可以使用,则返回304,表示客户端可以继续使用缓存,否则客户端不能继续使用缓存,只能使用服务器返回的新的响应。

五、ConnectInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //返回一个Exchange实例,实际就是创建可用连接
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    //调用proceed方法,里面调用下一个拦截器CallServer的intercept方法(上一篇文章含有讲解)
    return connectedChain.proceed(realChain.request)
  }

看一下initExchange是如何实现的:

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }
	//1.获取ExchangFinder,重定向拦截器中定义过
    val exchangeFinder = this.exchangeFinder!!
    //2.获取编码解码器
    val codec = exchangeFinder.find(client, chain)
    //3.创建连接
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

注释2:

  fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      //1.根据clien的配置获取一个健康的可用连接
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //2.返回这个连接的编码解码器
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

先进入findHealthyConnection方法看返回了一个怎样的连接:

  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      //寻找连接
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // 可用连接即返回
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
		
	//如果异常进入下个循环继续获得连接...

继续跟进到findConnection,这个方法就比较难理解,看注释能方便加强理解:

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    //尝试使用已经创建过的连接,已经创建过的连接可能已经被限制创建新的流
    val callConnection = call.connection 
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        //如果已经创建过的连接已经被限制创建新的流,就释放该连接(releaseConnectionNoEvents中会把该连接置空),并返回该连接的Socket以关闭
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      //1.已经创建过的连接还能使用,就直接使用它当作结果
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    //走到这里说明我们需要一个新的连接,给它新的统计数据。
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    //2.第一次尝试从连接池中找可用的连接
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    //配置路由
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      routes = null
      route = routeSelection!!.next()
    } else {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      //第二次尝试从连接池中找可用连接(比第一次多一个路由参数)
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    //还是没拿到连接,所以这里新创建一个连接,后面会进行Socket连接
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    //如果我们之前创建了同一地址的多路复用连接,释放新创建的连接并获取之前的连接
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      //获取池中可用的多路复用连接
      val result = call.connection!!
      nextRouteToTry = route
      //释放新生成的那个连接
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      //如果新生成的连接,将其放入连接池
      connectionPool.put(newConnection)
      //将新建的连接替换call.connection
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

先进入callAcquirePooledConnection方法看其如何拿到可用连接:

  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
  //遍历连接池
    for (connection in connections) {
      synchronized(connection) {
        //如果需要多路复用但该连接不能多路复用(适用于HTTP2)直接返回false
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        //如果连接有资格使用,就取出连接赋值给call.connection
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }

我们看下什么才算是有资格使用?

  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    assertThreadHoldsLock()

    //如果不接受新请求或请求数超出限制返回false
    if (calls.size >= allocationLimit || noNewExchanges) return false

    //如果url非主机段地址不同,返回fasle
    if (!this.route.address.equalsNonHost(address)) return false

    //如果host相同就返回true
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }
    
    //HTTP2的情况下
    //HTTP2连接为空,返回false
    if (http2Connection == null) return false

    //路由必须共享一个IP地址,否则返回false
    if (routes == null || !routeMatchesAny(routes)) return false

    //此连接服务证书必须涵盖主机
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    if (!supportsUrl(address.url)) return false

    //固定证书也必须和主机相匹配
    try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
      return false
    }

    return true
  }

经过这样一些步骤我们就会返回一个RealConnection实例,接下来就回到本拦截器最上方的注释2里, 看看语句 return resultConnection.newCodec(client, chain)做了什么:

  internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection
	//如果是Http2连接,返回一个Http2的解码器
    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
    //否则返回一个Http1的解码器
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

这些编码解码器,内部就是将OkHttp请求转换为HTTP请求报文,返回结果时将响应报文转换为对用户友好的报文

拿到专属的编码解码器以后,就将解码器和exchangeFinder对象封装进Exchange对象并且返回。

Exchange 与 Request 一一对应,新建一个请求时就会创建一个 Exchange,该 Exchange 负责将这个请求发送出去并读取到响应数据,而发送与接收数据使用的是 ExchangeCodec。

六、CallServerInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    
	//1.通过上一个拦截器创建的Exchange对象,写入请求的Header
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
	//...
	  //通过okio写入请求的Body
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        //...
    } else {
      exchange.noRequestBody()
    }
	//...
	
    //通过Exchange的readResponseHeaders(boolean)方法读取响应的header
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    //获取响应后,通过Builder模式构造Response
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      //即使我们没有请求,服务器也发送了 100-continue。再次尝试读取实际响应状态。
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }
	//读取请求头结束
    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
     // 请求者已要求服务器切换协议,服务器已确认并准备切换,但我们需要确保拦截器看到非空响应主体。
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
      	  //返回具有响应体的response
          .body(exchange.openResponseBody(response))
          .build()
    }
	//...
    }
    return response
  }

在ConnectInterceptor中我们已经建立了连接,连接到了服务器,获取了输入输出流,所以CallServerInterceptor的intercept(Chain)方法逻辑就是把请求发送到服务器,然后获取服务器的响应

这个拦截器的整个流程就是:

  1. 写入请求头
  2. 写入请求体
  3. 读取响应头
  4. 读取响应体

关于OkHttp两篇文章参考文献:
okhttp3源码分析之请求流程
okhttp3源码分析之拦截器
OKhttp源码解析详解系列
Okhttp3源码分析

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2021-07-24 11:36:36  更:2021-07-24 11:38:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 23:16:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码