前言?
Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。
在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。
Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式,如下图:
在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。?
<span style="color:#333333"><span style="background-color:rgba(0, 0, 0, 0.03)"><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">protected</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">doOpen</span>() <span style="color:#ca7d37">throws</span> Throwable {</code><code> bootstrap = <span style="color:#ca7d37">new</span> ServerBootstrap();</code><code>????????<span style="color:#afafaf"><em>//处理连接事件的线程</em></span></code><code> bossGroup = <span style="color:#ca7d37">new</span> NioEventLoopGroup(<span style="color:#0e9ce5">1</span>, <span style="color:#ca7d37">new</span> DefaultThreadFactory(<span style="color:#dd1144">"NettyServerBoss"</span>, <span style="color:#ca7d37">true</span>));</code><code> <span style="color:#afafaf"><em>//处理读写数据事件的线程池</em></span></code><code> workerGroup = <span style="color:#ca7d37">new</span> NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),</code><code> <span style="color:#ca7d37">new</span> DefaultThreadFactory(<span style="color:#dd1144">"NettyServerWorker"</span>, <span style="color:#ca7d37">true</span>));</code><code>?</code><code> <span style="color:#ca7d37">final</span> NettyServerHandler nettyServerHandler = <span style="color:#ca7d37">new</span> NettyServerHandler(getUrl(), <span style="color:#ca7d37">this</span>);</code><code> channels = nettyServerHandler.getChannels();</code><code>?</code><code> bootstrap.group(bossGroup, workerGroup)</code><code> .channel(NioServerSocketChannel.class)</code><code> .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)</code><code> .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)</code><code> .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)</code><code> .childHandler(<span style="color:#ca7d37">new</span> ChannelInitializer<NioSocketChannel>() {</code><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">protected</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">initChannel</span>(NioSocketChannel ch) <span style="color:#ca7d37">throws</span> Exception {</code><code> <span style="color:#afafaf"><em>// <span style="color:#dd1144">FIXME:</span> should we use getTimeout()?</em></span></code><code> <span style="color:#ca7d37">int</span> idleTimeout = UrlUtils.getIdleTimeout(getUrl());</code><code> NettyCodecAdapter adapter = <span style="color:#ca7d37">new</span> NettyCodecAdapter(getCodec(), getUrl(), NettyServer.<span style="color:#ca7d37">this</span>);</code><code> ch.pipeline()<span style="color:#afafaf"><em>//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug</em></span></code><code> .addLast(<span style="color:#dd1144">"decoder"</span>, adapter.getDecoder())</code><code> .addLast(<span style="color:#dd1144">"encoder"</span>, adapter.getEncoder())</code><code> .addLast(<span style="color:#dd1144">"server-idle-handler"</span>, <span style="color:#ca7d37">new</span> IdleStateHandler(<span style="color:#0e9ce5">0</span>, <span style="color:#0e9ce5">0</span>, idleTimeout, MILLISECONDS))</code><code> .addLast(<span style="color:#dd1144">"handler"</span>, nettyServerHandler);</code><code> }</code><code> });</code><code> <span style="color:#afafaf"><em>// bind</em></span></code><code> ChannelFuture channelFuture = bootstrap.bind(getBindAddress());</code><code> channelFuture.syncUninterruptibly();</code><code> channel = channelFuture.channel();</code><code>?</code><code> }</code></span></span>
上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。
上图是Dubbo的线程派发模型,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。
Dubbo中的线程派发策略
dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。
-
?`all` 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的 -
?`direct` 所有消息都不派发到线程池,全部在 IO 线程上直接执行。 -
`message` 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。 -
?`execution` 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。 -
`connection` 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。?
比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,心态所有的和客户端交互的事件。源码如下:
<span style="color:#333333"><span style="background-color:rgba(0, 0, 0, 0.03)"><code><span style="color:#ca7d37">public</span> <span style="color:#ca7d37">class</span> <span style="color:#0e9ce5">AllChannelHandler</span> <span style="color:#ca7d37">extends</span> <span style="color:#0e9ce5">WrappedChannelHandler</span> {</code><code>?</code><code> <span style="color:#ca7d37">public</span> <span style="color:#dd1144">AllChannelHandler</span>(ChannelHandler handler, URL url) {</code><code> <span style="color:#ca7d37">super</span>(handler, url);</code><code> }</code><code> <span style="color:#afafaf"><em>//连接事件</em></span></code><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">connected</span>(Channel channel) <span style="color:#ca7d37">throws</span> RemotingException {</code><code> ExecutorService executor = getExecutorService();</code><code> <span style="color:#ca7d37">try</span> {</code><code> executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));</code><code> } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code> <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"connect event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process connected event ."</span>, t);</code><code> }</code><code> }</code><code> <span style="color:#afafaf"><em>//断开事件</em></span></code><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">disconnected</span>(Channel channel) <span style="color:#ca7d37">throws</span> RemotingException {</code><code> ExecutorService executor = getExecutorService();</code><code> <span style="color:#ca7d37">try</span> {</code><code> executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));</code><code> } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code> <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"disconnect event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process disconnected event ."</span>, t);</code><code> }</code><code> }</code><code> <span style="color:#afafaf"><em>//接收读事件</em></span></code><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">received</span>(Channel channel, Object message) <span style="color:#ca7d37">throws</span> RemotingException {</code><code> ExecutorService executor = getExecutorService();</code><code> <span style="color:#ca7d37">try</span> {</code><code> executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));</code><code> } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code> <span style="color:#afafaf"><em>//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring</em></span></code><code> <span style="color:#afafaf"><em>//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out</em></span></code><code> <span style="color:#ca7d37">if</span>(message <span style="color:#ca7d37">instanceof</span> Request && t <span style="color:#ca7d37">instanceof</span> RejectedExecutionException){</code><code> Request request = (Request)message;</code><code> <span style="color:#ca7d37">if</span>(request.isTwoWay()){</code><code> String msg = <span style="color:#dd1144">"Server side("</span> + url.getIp() + <span style="color:#dd1144">","</span> + url.getPort() + <span style="color:#dd1144">") threadpool is exhausted ,detail msg:"</span> + t.getMessage();</code><code> Response response = <span style="color:#ca7d37">new</span> Response(request.getId(), request.getVersion());</code><code> response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);</code><code> response.setErrorMessage(msg);</code><code> channel.send(response);</code><code> <span style="color:#ca7d37">return</span>;</code><code> }</code><code> }</code><code> <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(message, channel, getClass() + <span style="color:#dd1144">" error when process received event ."</span>, t);</code><code> }</code><code> }</code><code> <span style="color:#afafaf"><em>//异常事件</em></span></code><code> <span style="color:#afafaf">@Override</span></code><code> <span style="color:#ca7d37">public</span> <span style="color:#ca7d37">void</span> <span style="color:#dd1144">caught</span>(Channel channel, Throwable exception) <span style="color:#ca7d37">throws</span> RemotingException {</code><code> ExecutorService executor = getExecutorService();</code><code> <span style="color:#ca7d37">try</span> {</code><code> executor.execute(<span style="color:#ca7d37">new</span> ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));</code><code> } <span style="color:#ca7d37">catch</span> (Throwable t) {</code><code> <span style="color:#ca7d37">throw</span> <span style="color:#ca7d37">new</span> ExecutionException(<span style="color:#dd1144">"caught event"</span>, channel, getClass() + <span style="color:#dd1144">" error when process caught event ."</span>, t);</code><code> }</code><code> }</code><code>}</code></span></span>
dubbo线程池分类
总共4种线程池,默认使用固定大小的线程池。
-
?`fixed` 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认) -
?`cached` 缓存线程池,空闲一分钟自动删除,需要时重建。 -
`limited` 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。 -
`eager` 优先创建`Worker`线程池。在任务数量大于`corePoolSize`但是小于`maximumPoolSize`时,优先创建`Worker`来处理任务。当任务数量大于`maximumPoolSize`时,将任务放入阻塞队列中。阻塞队列充满时抛出`RejectedExecutionException`。(相比`cached`:`cached`在任务数量超过`maximumPoolSize`时直接抛出异常而不是将任务放入阻塞队列)?
如果线程如果满了,?dubbo会抛异常,按如下提示打印日志。
?
总结
本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分发的策略,最后对dubbo线程池分类进行了分析总结。
|