IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> es中的网络模块 -> 正文阅读

[大数据]es中的网络模块

用于处理es内部节点通信及http请求通信的。类为NetworkModule,其主要管理三个主要的类Transport,HttpServerTransport以及TransportInterceptor。

1、初始化

node启动时,会创建NetworkModule,并且将网络插件传给NetworkModule。

final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController, clusterService.getClusterSettings());

在构造函数中,会遍历网络插件中的HttpServerTransport,Transport,TransportInterceptor注册到NetworkModule中的transportHttpFactories,transportFactories和transportInterceptors中。

public NetworkModule(Settings settings, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                         ClusterSettings clusterSettings) {
        this.settings = settings;
        for (NetworkPlugin plugin : plugins) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                registerHttpTransport(entry.getKey(), entry.getValue());
            }
            Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                registerTransport(entry.getKey(), entry.getValue());
            }
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) {
                registerTransportInterceptor(interceptor);
            }
        }
    }

1.1 NetworkPlugin

NetworkPlugin是接口,Netty4Plugin是其的一个实现类,其结构图为

  • getTransportInterceptors:获取网络插件提供的网络拦截器
  • getTransports:获取网络插件提供的RPC,即请求处理器
  • getHttpTransports:获取网络插件提供的Rest请求处理器

?Netty4Plugin会创建Netty4Transport和Netty4HttpServerTransport

public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                          CircuitBreakerService circuitBreakerService,
                                                          NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
            networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
    }

public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                        PageCacheRecycler pageCacheRecycler,
                                                                        CircuitBreakerService circuitBreakerService,
                                                                        NamedXContentRegistry xContentRegistry,
                                                                        NetworkService networkService,
                                                                        HttpServerTransport.Dispatcher dispatcher,
                                                                        ClusterSettings clusterSettings) {
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                clusterSettings, getSharedGroupFactory(settings)));
    }

1.2?Transport

Transport接口,Netty4Transport是其中的一个实现。

Tramsport接口中包含了内部类,RequestHandlers,ResponseHandlers。registerRequestHandler通过TransportService#registerRequestHandler来调用。

RequestHandlers使用requestHandlers来管理acttion与RequestHandlerRegistry映射关系,registerHandler方法来向requestHandlers添加映射。

RequestHandlerRegistry结构如下图

action表示行为名称

handler为对应action的请求处理器

executor为线程池的名称

requestReader为请求的读取

Netty4Transport启动时会初始化clientBootstrap和serverBootstrap。

protected void doStart() {
        boolean success = false;
        try {
            sharedGroup = sharedGroupFactory.getTransportGroup();
            clientBootstrap = createClientBootstrap(sharedGroup);
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                for (ProfileSettings profileSettings : profileSettings) {
                    createServerBootstrap(profileSettings, sharedGroup);
                    bindServer(profileSettings);
                }
            }
            super.doStart();
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }

?Netty4Transport什么时候会调用doStart呢?

在node#start时,会调用TransportService#start,TransportService#doStart会调用transport#start。

1.3 HttpServerTransport

Netty4HttpServerTransport是其的一种实现

?HttpServerTransport包含内部接口Dispacher,分发请求

?Netty4HttpServerTransport#doStart会创建ServerBootstrap,设置childHandler为HttpChannelHandler,创建监听端口并且绑定,等待请求。

public ChannelHandler configureServerChannelHandler() {
        return new HttpChannelHandler(this, handlingSettings);
    }

?当收到请求时,会调用dipatchRequest来处理对应的请求。

?dipatcher是在创建NetworkModule时,从ActionModule#getRestController得到的Dispathcer传递给Networkmodule。

1.4 TransportInterceptor

请求的拦截器接口,对请求做功能增强,分为请求发送,请求接收处理

2、TransportService

2.1?集群内连接到其他结点

默认配置下,es中的每个结点与其它结点会保持13个长连接,可以通过配置调整连接数。

结点与其它结点连接通过TransportService#connectToNode,底层是通过ConnectionManager#connectToNode,最终是调用Transport#openConnection来开启连接。在完成连接后,内部维护一个NodeChannels,表示结点与结点之间的连接,包含多个tcp连接,并且记录连接类型。连接类型定义在TransportRequestOptions.Type

?连接由ConnectionProfile管理?

public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
        int connectionsPerNodeRecovery = TransportSettings.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
        int connectionsPerNodeBulk = TransportSettings.CONNECTIONS_PER_NODE_BULK.get(settings);
        int connectionsPerNodeReg = TransportSettings.CONNECTIONS_PER_NODE_REG.get(settings);
        int connectionsPerNodeState = TransportSettings.CONNECTIONS_PER_NODE_STATE.get(settings);
        int connectionsPerNodePing = TransportSettings.CONNECTIONS_PER_NODE_PING.get(settings);
        Builder builder = new Builder();
        builder.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings));
        builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings));
        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
        // if we are not master eligible we don't need a dedicated channel to publish the state
        builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
        // if we are not a data-node we don't need any dedicated channels for recovery
        builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
        return builder.build();
    }

连接类型对应的配置项

连接类型配置项默认值
RECOVERYtransport.connections_per_node.recovery2
BULKtransport.connections_per_node.bulk3
REGtransport.connections_per_node.reg6
STATEtransport.connections_per_node.state1
PINGtransport.connections_per_node.ping1

建立连接过程中,如果其中有一个连接失败,则全部失败,关闭所有连接。

?ConnectionManager管理负责处理连接。

2.2?发送请求

通过sendRequest发送请求,会检查目标结点是否是本地结点,如果是本地结点,在sendLocalRequest中通过action获取handler,调用对应的处理函数。

private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
        final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
        try {
            onRequestSent(localNode, requestId, action, request, options);
            onRequestReceived(requestId, action);
            final RequestHandlerRegistry reg = getRequestHandler(action);
            if (reg == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final String executor = reg.getExecutor();
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked
                reg.processMessageReceived(request, channel);
            } else {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        //noinspection unchecked
                        reg.processMessageReceived(request, channel);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn(() -> new ParameterizedMessage(
                                    "failed to notify channel of error message for action [{}]", action), inner);
                        }
                    }

                    @Override
                    public String toString() {
                        return "processing of [" + requestId + "][" + action + "]: " + request;
                    }
                });
            }

        } catch (Exception e) {
            try {
                channel.sendResponse(e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.warn(
                    () -> new ParameterizedMessage(
                        "failed to notify channel of error message for action [{}]", action), inner);
            }
        }
    }

当需要发到网络上时,调用asyncSender.sendRequest发送,最终是调用TcpTransport.NodeChannels的sendRequest来发送。

public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws IOException, TransportException {
            if (isClosing.get()) {
                throw new NodeNotConnectedException(node, "connection already closed");
            }
            TcpChannel channel = channel(options.type());
            outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
        }

2.3?对响应的处理

当通过TransportService#sendRequest发送请求时,作为客户端,?需要定义对响应的处理。通过TransportResponseHandler类负责定义对响应的处理。

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 13:08:12  更:2022-03-06 13:11:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:12:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码