IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Dubbo线程模型设计解析 -> 正文阅读

[网络协议]Dubbo线程模型设计解析

前言?

Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。

在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。

Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式,如下图:

在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。?

<span style="color:#333333"><span style="background-color:rgba(0, 0, 0, 0.03)"><code>    <span style="color:#afafaf">@Override</span></code><code>    <span style="color:#ca7d37">protected</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">doOpen</span>() <span style="color:#ca7d37">throws</span> Throwable {</code><code>        bootstrap = <span style="color:#ca7d37">new</span> ServerBootstrap();</code><code>????????<span style="color:#afafaf"><em>//处理连接事件的线程</em></span></code><code>        bossGroup = <span style="color:#ca7d37">new</span> NioEventLoopGroup(<span style="color:#0e9ce5">1</span>, <span style="color:#ca7d37">new</span> DefaultThreadFactory(<span style="color:#dd1144">"NettyServerBoss"</span>, <span style="color:#ca7d37">true</span>));</code><code>        <span style="color:#afafaf"><em>//处理读写数据事件的线程池</em></span></code><code>        workerGroup = <span style="color:#ca7d37">new</span> NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),</code><code>                <span style="color:#ca7d37">new</span> DefaultThreadFactory(<span style="color:#dd1144">"NettyServerWorker"</span>, <span style="color:#ca7d37">true</span>));</code><code>?</code><code>        <span style="color:#ca7d37">final</span> NettyServerHandler nettyServerHandler = <span style="color:#ca7d37">new</span> NettyServerHandler(getUrl(), <span style="color:#ca7d37">this</span>);</code><code>        channels = nettyServerHandler.getChannels();</code><code>?</code><code>        bootstrap.group(bossGroup, workerGroup)</code><code>                .channel(NioServerSocketChannel.class)</code><code>                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)</code><code>                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)</code><code>                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)</code><code>                .childHandler(<span style="color:#ca7d37">new</span> ChannelInitializer<NioSocketChannel>() {</code><code>                    <span style="color:#afafaf">@Override</span></code><code>                    <span style="color:#ca7d37">protected</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">initChannel</span>(NioSocketChannel ch) <span style="color:#ca7d37">throws</span> Exception {</code><code>                        <span style="color:#afafaf"><em>// <span style="color:#dd1144">FIXME:</span> should we use getTimeout()?</em></span></code><code>                        <span style="color:#ca7d37">int</span> idleTimeout = UrlUtils.getIdleTimeout(getUrl());</code><code>                        NettyCodecAdapter adapter = <span style="color:#ca7d37">new</span> NettyCodecAdapter(getCodec(), getUrl(), NettyServer.<span style="color:#ca7d37">this</span>);</code><code>                        ch.pipeline()<span style="color:#afafaf"><em>//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug</em></span></code><code>                                .addLast(<span style="color:#dd1144">"decoder"</span>, adapter.getDecoder())</code><code>                                .addLast(<span style="color:#dd1144">"encoder"</span>, adapter.getEncoder())</code><code>                                .addLast(<span style="color:#dd1144">"server-idle-handler"</span>, <span style="color:#ca7d37">new</span> IdleStateHandler(<span style="color:#0e9ce5">0</span>, <span style="color:#0e9ce5">0</span>, idleTimeout, MILLISECONDS))</code><code>                                .addLast(<span style="color:#dd1144">"handler"</span>, nettyServerHandler);</code><code>                    }</code><code>                });</code><code>        <span style="color:#afafaf"><em>// bind</em></span></code><code>        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());</code><code>        channelFuture.syncUninterruptibly();</code><code>        channel = channelFuture.channel();</code><code>?</code><code>    }</code></span></span>

上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。

上图是Dubbo的线程派发模型,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。


Dubbo中的线程派发策略

dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。

  • ?`all` 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的

  • ?`direct` 所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • `message` 只有请求响应消息派发到线程池,其它连接断开事,心跳等消息,直接在 IO 线程上执行。

  • ?`execution` 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

  • `connection` 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。?

比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,所有的和客户端交互的事件。源码如下:

<span style="color:#333333"><span style="background-color:rgba(0, 0, 0, 0.03)"><code><span style="color:#ca7d37">public</span> <span style="color:#ca7d37">class</span> <span style="color:#0e9ce5">AllChannelHandler</span> <span style="color:#ca7d37">extends</span> <span style="color:#0e9ce5">WrappedChannelHandler</span> {</code><code>?</code><code>    <span style="color:#ca7d37">public</span> <span style="color:#dd1144">AllChannelHandler</span>(ChannelHandler handler, URL url) {</code><code>        <span style="color:#ca7d37">super</span>(handler, url);</code><code>    }</code><code>    <span style="color:#afafaf"><em>//连接事件</em></span></code><code>    <span style="color:#afafaf">@Override</span></code><code>    <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">connected</span>(Channel channel) <span style="color:#ca7d37">throws</span> RemotingException {</code><code>        ExecutorService executor = getExecutorService();</code><code>        <span style="color:#ca7d37">try</span> {</code><code>            executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));</code><code>        } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code>            <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"connect event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process connected event ."</span>, t);</code><code>        }</code><code>    }</code><code>    <span style="color:#afafaf"><em>//断开事件</em></span></code><code>    <span style="color:#afafaf">@Override</span></code><code>    <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">disconnected</span>(Channel channel) <span style="color:#ca7d37">throws</span> RemotingException {</code><code>        ExecutorService executor = getExecutorService();</code><code>        <span style="color:#ca7d37">try</span> {</code><code>            executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));</code><code>        } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code>            <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"disconnect event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process disconnected event ."</span>, t);</code><code>        }</code><code>    }</code><code>    <span style="color:#afafaf"><em>//接收读事件</em></span></code><code>    <span style="color:#afafaf">@Override</span></code><code>    <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">received</span>(Channel channel, Object message) <span style="color:#ca7d37">throws</span> RemotingException {</code><code>        ExecutorService executor = getExecutorService();</code><code>        <span style="color:#ca7d37">try</span> {</code><code>            executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));</code><code>        } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code>            <span style="color:#afafaf"><em>//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring</em></span></code><code>            <span style="color:#afafaf"><em>//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out</em></span></code><code>          <span style="color:#ca7d37">if</span>(message <span style="color:#ca7d37">instanceof</span> Request && t <span style="color:#ca7d37">instanceof</span> RejectedExecutionException){</code><code>            Request request = (Request)message;</code><code>            <span style="color:#ca7d37">if</span>(request.isTwoWay()){</code><code>              String msg = <span style="color:#dd1144">"Server side("</span> + url.getIp() + <span style="color:#dd1144">","</span> + url.getPort() + <span style="color:#dd1144">") threadpool is exhausted ,detail msg:"</span> + t.getMessage();</code><code>              Response response = <span style="color:#ca7d37">new</span> Response(request.getId(), request.getVersion());</code><code>              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);</code><code>              response.setErrorMessage(msg);</code><code>              channel.send(response);</code><code>              <span style="color:#ca7d37">return</span>;</code><code>            }</code><code>          }</code><code>            <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(message, channel, getClass() + <span style="color:#dd1144">" error when process received event ."</span>, t);</code><code>        }</code><code>    }</code><code>    <span style="color:#afafaf"><em>//异常事件</em></span></code><code>    <span style="color:#afafaf">@Override</span></code><code>    <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">caught</span>(Channel channel, Throwable exception) <span style="color:#ca7d37">throws</span> RemotingException {</code><code>        ExecutorService executor = getExecutorService();</code><code>        <span style="color:#ca7d37">try</span> {</code><code>            executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));</code><code>        } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code>            <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"caught event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process caught event ."</span>, t);</code><code>        }</code><code>    }</code><code>}</code></span></span>

dubbo线程池分类

总共4种线程池,默认使用固定大小的线程池。

  • ?`fixed` 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)

  • ?`cached` 缓存线程池,空闲一分钟自动删除,需要时重建。

  • `limited` 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来大流量引起的性能问题。

  • `eager` 优先创建`Worker`线程池。在任务数量大于`corePoolSize`但是小于`maximumPoolSize`时,优先创建`Worker`来处理任务。当任务数量大于`maximumPoolSize`时,将任务放入阻塞队列中。阻塞队列充满时抛出`RejectedExecutionException`。(相比`cached`:`cached`在任务数量超过`maximumPoolSize`时直接抛出异常而不是将任务放入阻塞队列)?

如果线程如果满了,?dubbo会抛异常,按如下提示打印日志。

?

总结

本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分的策略,最后对dubbo线程池分类进行了分析总结。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:40:35  更:2022-04-15 00:45:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 4:34:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码