• OkHttp原理解析


    前言

    OKHttp是当前Android使用最广泛的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp,同时现在流行的Retrofit框架底层同样是使用OKHttp的。github链接

    本文以最新版本4.10.0为例进行代码分析

    优点:

    • 支持Http1、Http2、Quic以及WebSocket;
    • 连接池复用底层TCP(Socket),减少请求延时
    • 无缝的支持GZIP减少数据流量
    • 缓存响应数据减少重复的网络请求
    • 请求失败自动重试主机的其他ip,自动重定向

    请求执行流程

    请求执行流程图
    在使用OKHttp发起一次请求时,对于使用者最少存在OKHttpClientRequestCall三个角色。其中OKHttpClientRequest的创建可以使用Builder(建造者模式)。而Call则是把Request交给OKHttpClient之后返回的一个已准备好执行的请求。

    建造者模式:将一个复杂的构建与其表示相分离,使用同样的构建过程可以创建不同的表示。实例化OKHttpClientRequest的时候,因为有太多的属性需要设置,而且开发者的需求组合千变万化,使用建造者模式可以让用户不需要关心这个类的内部细节,配置好后,建造者会帮助我们按部就班的初始化表示对象。
    
    • 1

    同时OKHttp在设计时采用的外观模式,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端OKHttpClient统一暴露出来。

    OKHttpClient中全是一些配置,比如代理的配置、ssl证书的配置等。而Call本身是一个接口,我们获得的实现为RealCall

    class RealCall(
      val client: OkHttpClient,
      /** The application's original request unadulterated by redirects or auth headers. */
      val originalRequest: Request,
      val forWebSocket: Boolean
    ) : Call
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Callexecute代表了同步请求,而enqueue则代表异步请求。两者唯一区别在于一个会直接发送网络请求,而另一个使用OKHttp内置的线程池来进行。

    	### 同步请求
      override fun execute(): Response {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    
        timeout.enter()
        callStart()
        try {
          client.dispatcher.executed(this)
          return getResponseWithInterceptorChain()
        } finally {
          client.dispatcher.finished(this)
        }
      }
    
    	### 异步请求
      override fun enqueue(responseCallback: Callback) {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    
        callStart()
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    我们可以看出,最终请求都是交给dispatcher分发器进行进一步的执行。

    分发器

    Dispatcher,分发器就是来调配请求任务的,内部会包含一个线程池,可以在创建OKHttpClient时,传递我们自己定义的线程来创建分发器。

    我们先了解下Dispatcher中的一些涉及到请求的成员变量;

      //异步请求同时存在的最大请求
      var maxRequests = 64
      
      //异步请求同一域名同时存在的最大请求
      var maxRequestsPerHost = 5
      
      //闲置任务(没有请求时可执行一些任务,由使用者设置)
      var idleCallback: Runnable? = null
      
      //异步请求使用的线程池
    	val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
          	//核心线程数 == 0
          	//最大线程数 == Int.MAX_VALUE
          	//非核心线程存活时间 60s
          	//任务队列使用 SynchronousQueue【不存储元素的阻塞队列】
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
          }
          return executorServiceOrNull!!
        }
    
    	//异步请求等待执行队列
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
    	
    	//异步请求正在执行队列
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
    	
    	//同步请求正在执行队列
      private val runningSyncCalls = ArrayDeque<RealCall>()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    同步请求

      @Synchronized internal fun executed(call: RealCall) {
        runningSyncCalls.add(call)
      }
    
    • 1
    • 2
    • 3

    同步请求不需要线程池,也不存在任何限制。所以分发器仅做一下记录。

    异步请求

      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
        	// 先将任务添加到异步请求任务等待队列中;
          readyAsyncCalls.add(call)
          if (!call.call.forWebSocket) {
          	//根据当前请求host查找当前请求中是否已存在相同host请求
            val existingCall = findExistingCallWithHost(call.host)
            //如果存在,获取当前请求任务中相同host数量
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        promoteAndExecute()
      }
    
      private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()
    
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
        	//遍历请求等待任务队列
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    		//如果当前任务执行数量大于或等于64个,跳出方法执行
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //如果当前任务对应的host数量大于或等于5个,continue掉,查找下一个满足条件的任务
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
    		//从等待队列中移除
            i.remove()
            //当前请求host对应数量+1
            asyncCall.callsPerHost.incrementAndGet()
            //丢入异步正在执行队列中
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
    	//调用线程池进行任务执行
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    当正在执行的任务没有超过最大限制64,同时asyncCall.callsPerHost.get() >= this.maxRequestsPerHost同一Host请求不超过5个,则会添加到正在执行队列,同时提交给线程池,否则先加入等待队列。

    加入线程池就直接执行,但是如果加入等待队列后,就需要等待有空闲名额才开始执行,因此每次执行完一个请求后,都会调用分发器的finished方法;

       //异步请求调用
      internal fun finished(call: AsyncCall) {
      	//host请求个数减少1
        call.callsPerHost.decrementAndGet()
        finished(runningAsyncCalls, call)
      }
    
    	//同步请求调用
      internal fun finished(call: RealCall) {
        finished(runningSyncCalls, call)
      }
    
      private fun <T> finished(calls: Deque<T>, call: T) {
        val idleCallback: Runnable?
        synchronized(this) {
          //不管异步还是同步,执行完后都要从队列移除任务
          if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
          idleCallback = this.idleCallback
        }
        //轮询执行下一个任务
        val isRunning = promoteAndExecute()
    	 //没有任务执行时,执行闲置任务
        if (!isRunning && idleCallback != null) {
          idleCallback.run()
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    执行完了移除正在执行队列中的元素,结束后会再次调用promoteAndExecute(),查找满足条件的异步任务进行执行。

    分发器线程池

    分发器是用来调配请求任务的,内部包含了一个自定义线程池executorService,如下:

      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
          	//核心线程数 == 0
          	//最大线程数 == Int.MAX_VALUE
          	//非核心线程存活时间 60s
          	//任务队列使用 SynchronousQueue【不存储元素的阻塞队列】
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
          }
          return executorServiceOrNull!!
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    那为什么要这么定义呢?

    我们先回顾下线程池调度策略和常用等待队列:

    • 线程池调度策略
      1.如果线程池中的线程数量未达到核心线程的数量,那么直接启动一个核心线程来执行任务;
      2.如果线程池中的线程数量已经达到或者超过核心线程的数量,那么任务会被插入到任务队列中排队等待执行;
      3.如果在步骤2中无法将任务插入到任务队列中,这往往是由于任务队列已满,这个时候如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务;
      4.如果步骤3中线程数量已经达到线程池规定的最大数量,那么就会拒绝执行此任务,ThreadPoolExecutor会调用RejectedExecutionHandler的rejectedExecution方法来通知调用者;
    • 常用等待队列
      ArrayBlockingQueueLinkedBlockQueueSynchronousQueue

    假设向线程池提交任务时,核心线程都被占用的情况下:

    ArrayBlockingQueue:基于数组的阻塞队列,初始化需要指定固定大小;
    当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了以后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交任务。所以最终可能出现后提交的任务先执行,而先提交的任务会一直在等待。

    LinkedBlockQueue:基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定;
    当指定大小后,行为就和ArrayBlockingQueue一致。而如果未指定大小,则会默认使用Int.MAX_VALUE作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功,最终意味着所有任务都是在核心线程执行。如何核心线程一直被占,那就一直等待。

    SynchronousQueue:无容量队列。
    使用此队列意味着希望获取最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败,而失败后,如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务,完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Int.MAX_VALUE就实现了真正的无等待。

    但是需要注意的是,进程的内存是存在限制的,线程并不能无限个数,那么当设置最大线程为Int.MAX_VALUE时,OkHttp同时还有最大请求任务个数:64的限制,这样既解决了这个问题同时也能获取最大吞吐。

    因此结合分发器定义的线程池分析如下: 首先核心线程数为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收,而最大线程数Int.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量。当需要线程执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务。

    请求流程

    用户是不需要直接操作分发器的,获取到RealCall后就分别调用executeenqueue来进行同步或异步请求;

    	//同步任务执行;
      override fun execute(): Response {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    
        timeout.enter()
        callStart()
        try {
        	//调用分发器
          client.dispatcher.executed(this)
          //执行请求
          return getResponseWithInterceptorChain()
        } finally {
          client.dispatcher.finished(this)
        }
      }
    
    	//异步任务执行;
      override fun enqueue(responseCallback: Callback) {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
        callStart()
          //执行请求
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    
    	//异步任务交给线程池执行,最终调用RealCall.run()方法
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            timeout.enter()
            try {
            	//执行请求
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
              client.dispatcher.finished(this)
            }
          }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    可以看到同步异步请求最终都会通过getResponseWithInterceptorChain方法来执行请求,下一篇我们继续学习OKHttp关于getResponseWithInterceptorChain涉及的各个拦截器;

    结语

    OKHttp源码中有许多值得我们学习的地方,比如使用时涉及的建造者设计模式,再如分发器中自定义的线程池等等,对于我们开发中很有帮助;

    如果以上文章对您有一点点帮助,希望您不要吝啬的点个赞加个关注,您每一次小小的举动都是我坚持写作的不懈动力!ღ( ´・ᴗ・` )

  • 相关阅读:
    讲讲如何用IDEA开发java项目——本文来自AI创作助手
    Dockerfile(3) - WORKDIR 指令详解
    Docker快速安装Mysql
    golang学习笔记系列之一些标准库的学习(log,bytes,errors等)
    Grating period and grating constant(光栅周期与光栅常数)
    【尚硅谷】第05章:随堂复习与企业真题(数组)
    企业可以自己建立大数据平台吗?有哪些好处?
    Java毕设项目——大学生社团管理系统(java+SSM+Maven+Mysql+Jsp)
    翻译软件哪个准确度高
    费用分析怎么做?如何解决财务数据分散、多表分析难问题?
  • 原文地址:https://blog.csdn.net/a734474820/article/details/126286861