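Preface:
From our earlier study of the provider startup process, we know that by default a provider uses Netty to open the protocol port and serve requests.
In its standard startup mode, Netty has two thread groups: the boss group and the worker group.
When a request arrives, if the provider can process it quickly, handling it directly on the worker thread is fine;
if processing takes a long time and still runs on the worker thread, it blocks the other requests queued on that thread and lowers the throughput of the whole provider.
How can this be solved?
One option is to define a separate thread pool and hand the requests received by the worker threads over to it, so that requests no longer pile up on the worker threads.
Building on this approach, Dubbo offers several thread models. Let's go through them.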
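The hand-off idea described above can be sketched with plain JDK executors. This is only an illustration and uses no Dubbo or Netty API: the class name `OffloadSketch` and the thread names `io-worker` and `biz-pool` are made up for the example, and a single-threaded executor stands in for a Netty worker thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OffloadSketch {
    /** Returns the name of the thread that ran the (pretend) business logic. */
    static String handle(ExecutorService ioWorker, ExecutorService businessPool) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        // The I/O worker only hands the task over, then is free for the next request.
        ioWorker.execute(() -> businessPool.execute(
                () -> ranOn.complete(Thread.currentThread().getName())));
        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        // Hypothetical stand-ins: one "worker" thread and a small business pool.
        ExecutorService ioWorker = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService businessPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "biz-pool"));
        try {
            return handle(ioWorker, businessPool);
        } finally {
            ioWorker.shutdown();
            businessPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("business logic ran on: " + demo());
    }
}
```

The worker thread spends almost no time per request here; the slow part would live in the task given to the business pool.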
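1. Entry point for creating the thread model
Let's first look at where the thread model is created.
Once again we start from the creation of NettyServer (the Netty4 one):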
public class NettyServer extends AbstractServer implements RemotingServer {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // The actual wrapping happens in ChannelHandlers.wrap()
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }
}
1.1 ChannelHandlers.wrap()
public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // Every call ultimately ends up here
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
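As the NettyServer constructor shows, the handler passed in is wrapped layer by layer, so requests end up being processed in the order MultiMessageHandler --> HeartbeatHandler --> (the handler produced by the Dispatcher).
So the thread model is already in place by the time the NettyServer has been created.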
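To make the wrapping order concrete, here is a minimal decorator sketch in plain Java. It is not Dubbo code: `SketchHandler`, `TracingWrapper` and `WrapOrderSketch` are hypothetical names, and each wrapper merely records its label before delegating, which is enough to show that the outermost wrapper sees the message first.

```java
import java.util.ArrayList;
import java.util.List;

interface SketchHandler {
    void received(String message, List<String> trace);
}

/** Hypothetical wrapper: records its own name, then delegates to the wrapped handler. */
class TracingWrapper implements SketchHandler {
    private final String name;
    private final SketchHandler next; // null for the innermost (business) handler

    TracingWrapper(String name, SketchHandler next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void received(String message, List<String> trace) {
        trace.add(name);
        if (next != null) {
            next.received(message, trace);
        }
    }
}

class WrapOrderSketch {
    static List<String> demo() {
        SketchHandler business = new TracingWrapper("business", null);
        // Same nesting shape as wrapInternal(): the outermost wrapper is constructed last.
        SketchHandler chain = new TracingWrapper("MultiMessageHandler",
                new TracingWrapper("HeartbeatHandler",
                        new TracingWrapper("dispatcher-handler", business)));
        List<String> trace = new ArrayList<>();
        chain.received("hello", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```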
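2. Thread models for task execution
The thread model is chosen by loading an implementation of the Dispatcher interface through SPI. The implementations covered here are AllDispatcher, DirectDispatcher, MessageOnlyDispatcher, ExecutionDispatcher and ConnectionOrderedDispatcher.
Let's look at what sets each Dispatcher apart.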
2.1 AllDispatcher (default)
With this Dispatcher, every event received (requests, responses, connection events, heartbeats and so on) is handed to the custom thread pool.
public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // A received message is handed straight to the executor thread pool
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
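The source shows that all of the overridden event types (connected, disconnected, received, caught) are handed to the custom thread pool.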
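The received() method above also has a branch for RejectedExecutionException, which appears to let the handler answer the caller when the pool is saturated instead of leaving it to time out. The following plain-JDK sketch reproduces only that situation; it is not Dubbo's implementation, and `RejectionSketch`, `dispatch` and the returned strings are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionSketch {
    /** Tries to dispatch a task; reports rejection instead of propagating it. */
    static String dispatch(ExecutorService pool, Runnable task) {
        try {
            pool.execute(task);
            return "accepted";
        } catch (RejectedExecutionException e) {
            // Stand-in for "send an error response back to the caller".
            return "rejected: thread pool exhausted";
        }
    }

    static String[] demo() {
        // One thread and no queue capacity: a second task is rejected while the first runs.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            String first = dispatch(pool, () -> {
                try {
                    release.await(); // keep the only thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            String second = dispatch(pool, () -> { });
            return new String[] {first, second};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println(results[0] + " / " + results[1]);
    }
}
```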
2.2 DirectDispatcher
All requests are executed directly on the worker thread and are not handed to the custom thread pool.
public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DirectChannelHandler(handler, url);
    }
}

public class DirectChannelHandler extends WrappedChannelHandler {
    public DirectChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            // Otherwise run the handler directly on the current thread
            handler.received(channel, message);
        }
    }
}
2.3 MessageOnlyDispatcher
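Only request and response messages are dispatched to the custom thread pool; connection events, heartbeats and the rest run directly on the worker thread.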
public class MessageOnlyDispatcher implements Dispatcher {
    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}
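Only the received method is overridden; connected and the other methods are inherited, so they invoke the corresponding handler methods directly on the current worker thread.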
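The difference between an inherited (inline) method and an overridden (dispatched) one can be sketched as follows. This is a simplified model rather than Dubbo code: `MessageOnlySketch` and the `biz-pool` thread name are hypothetical, and each method just reports the thread it ran on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MessageOnlySketch {
    private final ExecutorService pool;

    MessageOnlySketch(ExecutorService pool) {
        this.pool = pool;
    }

    /** Not dispatched: runs inline on whichever thread calls it. */
    String connected() {
        return Thread.currentThread().getName();
    }

    /** Dispatched: the body runs on a pool thread. */
    String received(String message) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {connected ran on caller thread, received ran on pool thread}. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool"));
        try {
            MessageOnlySketch handler = new MessageOnlySketch(pool);
            String caller = Thread.currentThread().getName();
            return new boolean[] {
                handler.connected().equals(caller),
                handler.received("ping").equals("biz-pool")
            };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("connected inline: " + result[0] + ", received on pool: " + result[1]);
    }
}
```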
2.4 ExecutionDispatcher
Only request messages are handed to the custom thread pool; all other message types run on the worker thread.
public class ExecutionDispatcher implements Dispatcher {
    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}

public class ExecutionChannelHandler extends WrappedChannelHandler {
    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        // Filter by message type: only a Request is handed to the custom thread pool
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if (t instanceof RejectedExecutionException) {
                    sendFeedback(channel, (Request) message, t);
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else if (executor instanceof ThreadlessExecutor) {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } else {
            handler.received(channel, message);
        }
    }
}
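The type-based routing in received() can be illustrated with a small stand-alone sketch. The `Request` and `Response` classes below are empty placeholders defined for the example, not Dubbo's classes, and `ExecutionFilterSketch.route` is a hypothetical helper that returns the name of the thread the message was handled on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutionFilterSketch {
    // Placeholder message types for the sketch only.
    static class Request { }
    static class Response { }

    /** Requests go to the pool; everything else is handled on the calling thread. */
    static String route(Object message, ExecutorService pool) {
        Callable<String> handle = () -> Thread.currentThread().getName();
        try {
            if (message instanceof Request) {
                return pool.submit(handle).get(5, TimeUnit.SECONDS);
            }
            return handle.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {thread that handled a Request, thread that handled a Response}. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool"));
        try {
            return new String[] {route(new Request(), pool), route(new Response(), pool)};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("request on " + threads[0] + ", response on " + threads[1]);
    }
}
```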
2.5 ConnectionOrderedDispatcher
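Connect and disconnect events are placed in a queue and executed one at a time, in order, by a dedicated single-threaded pool; other messages (requests and responses) are handed to the custom business thread pool.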
public class ConnectionOrderedDispatcher implements Dispatcher {
    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // A single-threaded pool reserved for running connect and disconnect events in order
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            // Connect events run on the single-threaded pool
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            // Disconnect events run on the same single-threaded pool
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // Requests and responses are handed to the custom thread pool
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
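Why a pool with exactly one thread and a FIFO queue preserves event order can be checked with a short JDK-only sketch. The names `ConnectionOrderSketch` and the event strings are invented for the example, and the queue capacity of 16 is an arbitrary assumption; the constructor arguments mirror the shape used for connectionExecutor above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConnectionOrderSketch {
    static List<String> demo() {
        // One thread plus a FIFO queue: tasks run strictly in submission order.
        ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(16));
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        String[] events = {"connected:A", "connected:B", "disconnected:A", "disconnected:B"};
        for (String event : events) {
            connectionExecutor.execute(() -> seen.add(event));
        }
        connectionExecutor.shutdown();
        try {
            if (!connectionExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("events were not processed in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With more than one thread in the pool, a disconnect event could overtake the connect event for the same channel, which is presumably what the single-threaded design is meant to rule out.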
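Summary:
Dubbo's layering is very fine-grained, and the layers are strongly decoupled from one another. Users can plug in a custom Dispatcher at any time to choose a different request-execution strategy.
In the next post we will look at Dubbo's thread pool model.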