用于处理es内部节点通信及http请求通信的。类为NetworkModule,其主要管理三个主要的类Transport,HttpServerTransport以及TransportInterceptor。
1、初始化
node启动时,会创建NetworkModule,并且将网络插件传给NetworkModule。
final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController, clusterService.getClusterSettings());
在构造函数中,会遍历网络插件中的HttpServerTransport,Transport,TransportInterceptor注册到NetworkModule中的transportHttpFactories,transportFactories和transportInterceptors中。
public NetworkModule(Settings settings, List<NetworkPlugin> plugins, ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
this.settings = settings;
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
registerHttpTransport(entry.getKey(), entry.getValue());
}
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
circuitBreakerService, namedWriteableRegistry, networkService);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
threadPool.getThreadContext());
for (TransportInterceptor interceptor : transportInterceptors) {
registerTransportInterceptor(interceptor);
}
}
}
1.1 NetworkPlugin
NetworkPlugin是接口,Netty4Plugin是其的一个实现类,其结构图为
- getTransportInterceptors:获取网络插件提供的网络拦截器
- getTransports:获取网络插件提供的RPC,即请求处理器
- getHttpTransports:获取网络插件提供的Rest请求处理器
?Netty4Plugin会创建Netty4Transport和Netty4HttpServerTransport
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
}
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings, getSharedGroupFactory(settings)));
}
1.2?Transport
Transport接口,Netty4Transport是其中的一个实现。
Tramsport接口中包含了内部类,RequestHandlers,ResponseHandlers。registerRequestHandler通过TransportService#registerRequestHandler来调用。
RequestHandlers使用requestHandlers来管理acttion与RequestHandlerRegistry映射关系,registerHandler方法来向requestHandlers添加映射。
RequestHandlerRegistry结构如下图
action表示行为名称
handler为对应action的请求处理器
executor为线程池的名称
requestReader为请求的读取
Netty4Transport启动时会初始化clientBootstrap和serverBootstrap。
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getTransportGroup();
clientBootstrap = createClientBootstrap(sharedGroup);
if (NetworkService.NETWORK_SERVER.get(settings)) {
for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings, sharedGroup);
bindServer(profileSettings);
}
}
super.doStart();
success = true;
} finally {
if (success == false) {
doStop();
}
}
}
?Netty4Transport什么时候会调用doStart呢?
在node#start时,会调用TransportService#start,TransportService#doStart会调用transport#start。
1.3 HttpServerTransport
Netty4HttpServerTransport是其的一种实现
?HttpServerTransport包含内部接口Dispacher,分发请求
?Netty4HttpServerTransport#doStart会创建ServerBootstrap,设置childHandler为HttpChannelHandler,创建监听端口并且绑定,等待请求。
public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings);
}
?当收到请求时,会调用dipatchRequest来处理对应的请求。
?dipatcher是在创建NetworkModule时,从ActionModule#getRestController得到的Dispathcer传递给Networkmodule。
1.4 TransportInterceptor
请求的拦截器接口,对请求做功能增强,分为请求发送,请求接收处理
2、TransportService
2.1?集群内连接到其他结点
默认配置下,es中的每个结点与其它结点会保持13个长连接,可以通过配置调整连接数。
结点与其它结点连接通过TransportService#connectToNode,底层是通过ConnectionManager#connectToNode,最终是调用Transport#openConnection来开启连接。在完成连接后,内部维护一个NodeChannels,表示结点与结点之间的连接,包含多个tcp连接,并且记录连接类型。连接类型定义在TransportRequestOptions.Type
?连接由ConnectionProfile管理?
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportSettings.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportSettings.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportSettings.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportSettings.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportSettings.CONNECTIONS_PER_NODE_PING.get(settings);
Builder builder = new Builder();
builder.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings));
builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
连接类型对应的配置项
连接类型 | 配置项 | 默认值 | RECOVERY | transport.connections_per_node.recovery | 2 | BULK | transport.connections_per_node.bulk | 3 | REG | transport.connections_per_node.reg | 6 | STATE | transport.connections_per_node.state | 1 | PING | transport.connections_per_node.ping | 1 |
建立连接过程中,如果其中有一个连接失败,则全部失败,关闭所有连接。
?ConnectionManager管理负责处理连接。
2.2?发送请求
通过sendRequest发送请求,会检查目标结点是否是本地结点,如果是本地结点,在sendLocalRequest中通过action获取handler,调用对应的处理函数。
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
try {
onRequestSent(localNode, requestId, action, request, options);
onRequestReceived(requestId, action);
final RequestHandlerRegistry reg = getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked
reg.processMessageReceived(request, channel);
} else {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
//noinspection unchecked
reg.processMessageReceived(request, channel);
}
@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
@Override
public String toString() {
return "processing of [" + requestId + "][" + action + "]: " + request;
}
});
}
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
}
当需要发到网络上时,调用asyncSender.sendRequest发送,最终是调用TcpTransport.NodeChannels的sendRequest来发送。
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (isClosing.get()) {
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
}
2.3?对响应的处理
当通过TransportService#sendRequest发送请求时,作为客户端,?需要定义对响应的处理。通过TransportResponseHandler类负责定义对响应的处理。
?
|