IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Doris FE启动流程源码详细解析 -> 正文阅读

[大数据]Doris FE启动流程源码详细解析

Doris FE启动流程源码详细解析

一、简介

Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。

Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!


二、名词解释

  • FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
  • BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
  • bdbje:Oracle Berkeley DB Java Edition (opens new window)。在 Doris 中,我们使用 bdbje 完成元数据操作日志的持久化、FE 高可用等功能。

三、流程图

在这里插入图片描述


四、源码分析

下载Doris源码详细步骤:https://doris.apache.org/zh-CN/developer-guide/fe-idea-dev.html#_1-%E7%8E%AF%E5%A2%83%E5%87%86%E5%A4%87
  1. Doris FE启动步骤

    我们先看看 FE启动类代码:

    if (Strings.isNullOrEmpty(dorisHomeDir)) {
            System.err.println("env DORIS_HOME is not set.");
            return;
        }
    
        if (Strings.isNullOrEmpty(pidDir)) {
            System.err.println("env PID_DIR is not set.");
            return;
        }
    
        CommandLineOptions cmdLineOpts = parseArgs(args);
    
        try {
            // 创建 pid 文件
            if (!createAndLockPidFile(pidDir + "/fe.pid")) {
                throw new IOException("pid file is already locked.");
            }
    
            // 初始化 config文件
            Config config = new Config();
            config.init(dorisHomeDir + "/conf/fe.conf");
            // Must init custom config after init config, separately.
            // Because the path of custom config file is defined in fe.conf
            config.initCustom(Config.custom_config_dir + "/fe_custom.conf");
    
            LdapConfig ldapConfig = new LdapConfig();
            if (new File(dorisHomeDir + "/conf/ldap.conf").exists()) {
                ldapConfig.init(dorisHomeDir + "/conf/ldap.conf");
            }
    
            // check it after Config is initialized, otherwise the config 'check_java_version' won't work.
            if (!JdkUtils.checkJavaVersion()) {
                throw new IllegalArgumentException("Java version doesn't match");
            }
    
            Log4jConfig.initLogging(dorisHomeDir + "/conf/");
    
            // set dns cache ttl
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");
    
            // check command line options
            checkCommandLineOptions(cmdLineOpts);
    
            LOG.info("Palo FE starting...");
    
            //FE Address 初始化
            FrontendOptions.init();
    
            // 检查端口是否正常
            checkAllPorts();
    
            if (Config.enable_bdbje_debug_mode) {
                // Start in BDB Debug mode
                BDBDebugger.get().startDebugMode(dorisHomeDir);
                return;
            }
    
            // 初始化 Catelog 并且等待加载完成
            Catalog.getCurrentCatalog().initialize(args);
            Catalog.getCurrentCatalog().waitForReady();
    
    
            // 第一步 启动 HttpServer 类
            // 第二步 启动 FeServer 类
            // 第三步 启动 QeService
            QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, ExecuteEnv.getInstance().getScheduler());
            FeServer feServer = new FeServer(Config.rpc_port);
    
            feServer.start();
    
            HttpServer httpServer = new HttpServer();
            httpServer.setPort(Config.http_port);
            httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
            httpServer.setAcceptors(Config.jetty_server_acceptors);
            httpServer.setSelectors(Config.jetty_server_selectors);
            httpServer.setWorkers(Config.jetty_server_workers);
            httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);
            httpServer.setMaxThreads(Config.jetty_threadPool_minThreads);
            httpServer.start();
            
            qeService.start();
    
            ThreadPoolManager.registerAllThreadPoolMetric();
    
            while (true) {
                Thread.sleep(2000);
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    

    通过上面代码,我们可以清楚了解FE启动时主要执行以下过程:

    • 初始化 Catalog ,并且等待Catelog加载完成
    • 创建 QeServer ,负责与mysql client 通信
    • 创建 FeServer ,由Thrift Server组成,负责 FE 和 BE 通信
    • 创建 HttpServer ,负责提供Rest API以及Doris FE前端页面接口
  2. CataLog 源码解析

    CataLog 主要职责是维护FE 元数据,接下来我们看看FE启动时,CataLog初始化时,做什么处理:

    // 获取本地节点和helper节点信息
         getSelfHostPort();
         getHelperNodes(args);
    
         // 检查meta文件目录是否创建
         File meta = new File(metaDir);
         if (!meta.exists()) {
             LOG.warn("Doris' meta dir {} does not exist. You need to create it before starting FE", meta.getAbsolutePath());
             throw new Exception(meta.getAbsolutePath() + " does not exist, will exit");
         }
    
         //检查 BDB和Image目录是否创建
         if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
             File bdbDir = new File(this.bdbDir);
             if (!bdbDir.exists()) {
                 bdbDir.mkdirs();
             }
    
             File imageDir = new File(this.imageDir);
             if (!imageDir.exists()) {
                 imageDir.mkdirs();
             }
         } else {
             throw new Exception("Invalid edit log type: " + Config.edit_log_type);
         }
    
         // 初始化插件管理
         pluginMgr.init();
         auditEventProcessor.start();
    
         // 2.获取集群ID和角色(Observer or Follower)
         getClusterIdAndRole();
    
         // 3. 首次加载image文件和回放Elog日志
         this.editLog = new EditLog(nodeName);
         loadImage(this.imageDir); // 加载image文件
         editLog.open(); // 夹杂bdb环境配置
         this.globalTransactionMgr.setEditLog(editLog);
         this.idGenerator.setEditLog(editLog);
    
         // 4. 创建加载和导出作业标签清理Daemon线程
         createLabelCleaner();
    
         // 5. 创建事务清理Daemon线程
         createTxnCleaner();
    
         // 6. 开始监听线程状态(MASTER/FOLLOWER/OBSERVER状态转换,以及leader选举工作和元数据同步工作)
         createStateListener();
         listener.start();
    

    通过上面源码,我们可以发现,CateLog初始化时,执行以下操作:

    • 首先对Image镜像文件读取数据,和对Elog进行回放操作。
    • 创建加载和导出作业标签清理Daemon线程
    • 创建事务清理Daemon线程
    • 开始监听线程状态(MASTER/FOLLOWER/OBSERVER状态转换,以及leader选举工作和元数据同步工作)(后续更新一篇文章,专门说元数据同步和Leader选举流程源码解析)
  3. QeServer 源码解析

    QeServer职责是与Mysql Client进行通讯,支持Socket和Nio连接,具体源码:

    try {
             HelpModule.getInstance().setUpModule();
         } catch (Exception e) {
             LOG.error("Help module failed, because:", e);
         }
         this.port = port;
         if (nioEnabled) {
             mysqlServer = new NMysqlServer(port, scheduler);
         } else {
             mysqlServer = new MysqlServer(port, scheduler);
         }
    

    当nioEnabled(可配置) 为true时,使用Nio进行通讯,采用这种方式通信的好处是:

    • 同步非阻塞IO
    • IO是面向流的,NIO是面向缓冲区的
    • NIO引入了选择器的概念,选择器用于监听多个通道的事件
  4. FeServer 源码解析

FeServer职责是负责FE和BE之间通信。

try {
            switch (type) {
                case SIMPLE:
                    createSimpleServer();
                    break;
                case THREADED_SELECTOR:
                    createThreadedServer();
                    break;
                default:
                    createThreadPoolServer();
            }
        } catch (TTransportException ex) {
            LOG.warn("create thrift server failed.", ex);
            throw new IOException("create thrift server failed.", ex);
        }

        ThriftServerEventProcessor eventProcessor = new ThriftServerEventProcessor(this);
        server.setServerEventHandler(eventProcessor);

        serverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                server.serve();
            }
        });
        serverThread.setDaemon(true);
        serverThread.start();

FE的Thrift使用的服务模型分为三种:

  • SIMPLE:一般不适用于生产环境,仅限于测试使用。
  • THREADED_SELECTOR:非阻塞式I/O模型,即主从 Reactor 模型,该模型能及时响应大量的并发连接请求,在多数场景下有较好的表现。
  • THREAD_POOL:阻塞式I/O模型,使用线程池处理用户连接,并发连接数受限于线程池的数量,如果能提前预估并发请求的数量,并且能容忍足够多的线程资源开销,该模型会有较好的性能表现,默认使用该服务模型
  1. HttpServer 源码解析
    HttpServer职责主要是为Rest API和doris Web页面提供接口服务,源码如下:
    Map<String, Object> properties = new HashMap<>();
        properties.put("server.port", port);
        properties.put("server.servlet.context-path", "/");
        properties.put("spring.resources.static-locations", "classpath:/static");
        properties.put("spring.http.encoding.charset", "UTF-8");
        properties.put("spring.http.encoding.enabled", true);
        properties.put("spring.http.encoding.force", true);
        //enable jetty config
        properties.put("server.jetty.acceptors", this.acceptors);
        properties.put("server.jetty.max-http-post-size", this.maxHttpPostSize);
        properties.put("server.jetty.selectors", this.selectors);
        //Worker thread pool is not set by default, set according to your needs
        if(this.workers > 0) {
            properties.put("server.jetty.workers", this.workers);
        }
        // This is to disable the spring-boot-devtools restart feature.
        // To avoid some unexpected behavior.
        System.setProperty("spring.devtools.restart.enabled", "false");
        // Value of `DORIS_HOME_DIR` is null in unit test.
        if (PaloFe.DORIS_HOME_DIR != null) {
            System.setProperty("spring.http.multipart.location", PaloFe.DORIS_HOME_DIR);
        }
        System.setProperty("spring.banner.image.location", "doris-logo.png");
        if (FeConstants.runningUnitTest) {
            // this is currently only used for unit test
            properties.put("logging.config", getClass().getClassLoader().getResource("log4j2.xml").getPath());
        } else {
            properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE);
        }
        new SpringApplicationBuilder()
                .sources(HttpServer.class)
                .properties(properties)
                .run(new String[]{});
    

HttpServer继承了SpringBootServletInitializer,同时使用了SpringApplicationBuilder类,那么我们就可以很清楚知道,使用Springboot框架提供Rest Api服务。


五、问题思考

  1. CataLog 如何对Elog进行回放?
  2. ELog 日志如何实现数据同步?
  3. BDB 如何存储元数据的?
  4. 说说Doris FE Leader选举流程
  5. Doris FE Leader节点如何判断non-Leader节点是否在线?leader和non-leader长时间失联会导致non-leader提供过期元数据,此问题如何解决?
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:49:30  更:2022-04-18 17:51:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:43:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码