IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> doris计算分析 -> 正文阅读

[网络协议]doris计算分析

exec_plan_fragment

fe:

1.fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

public void exec() throws Exception {

?service FrontendService {
718 ? ? TGetDbsResult getDbNames(1:TGetDbsParams params)
719 ? ? TGetTablesResult getTableNames(1:TGetTablesParams params)
720 ? ? TDescribeTableResult describeTable(1:TDescribeTableParams params)
721 ? ? TShowVariableResult showVariables(1:TShowVariableRequest params)

? ? ? ? ? ?// be在执行plan_fragement_exexutor.cpp执行计划时候,上报执行状态
722 ? ? TReportExecStatusResult reportExecStatus(1:TReportExecStatusParams params)
723? ? ?// be在完成比如create table语句后,发送
724 ? ? MasterService.TMasterResult finishTask(1:MasterService.TFinishTaskRequest request)

? ? ? ? ? ?//
725 ? ? MasterService.TMasterResult report(1:MasterService.TReportRequest request)

be:

1.exec_plan_fragment rpc接口,_exec_env->fragment_mgr()->exec_plan_fragment( backend_service.cpp
2.runtime/fragment_mgr.cpp执行exec_plan_fragment
?std::shared_ptr<FragmentExecState> exec_state;
exec_state->prepare(params));
FragmentExecState::prepare(
_executor.prepare(params);
PlanFragmentExecutor _executor;
3.runtime/plan_fragment_executor.cpp
ExecNode::create_tree(

finish_task

fe: 将任务从AgentTaskQueue中移除

文件名master/MasterImpl.java

public TMasterResult finishTask(TFinishTaskRequest request) {

finishCreateReplica

AgentTaskQueue.removeTask(? ? 将任务从?AgentTaskQueue移除

be:完成任务发送finishTask rpc请求

文件名be/src/agent/task_worker_pool.cpp?

void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) {? ? ?

submit_tasks

service BackendService {

AgentService.TAgentResult submit_tasks(1:list<AgentService.TAgentTaskRequest> tasks);

be:? 将task放入threadpool中出来

be/src/agent/agent_server.cpp? task_worker_pool.cpp? ? ?

void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {? ?

if (_register_task_info(task_type, signature)) { 添加当前be正在处理的任务

_finish_task(finish_task_request); 发送finishTask rpc给fe
_remove_task_info(agent_task_req.task_type, agent_task_req.signature); 将任务从be中移除

fe:

report

be:每隔interval时间上报be当前正在处理的任务

be/src/agent/agent_server.cpp? ?task_worker_pool.cpp

void TaskWorkerPool::_report_task_worker_thread_callback() {? ?

_handle_report(request, ReportType::TASK); ? ? ? ?

} while (!_stop_background_threads_latch.wait_for(
? ? ? ? ? ? MonoDelta::FromSeconds(config::report_task_interval_seconds)));? ?

fe:

rpc 接口:src/main/java/org/apache/doris/master/MasterImpl.java:? ?reportHandler.handleReport(request);

ReportHandler.java: ? ?public TMasterResult handleReport(TReportRequest request) throws TException {

?ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion);
? ? ? ? try {

? ? ? ? ? ? ?增加任务
? ? ? ? ? ? putToQueue(reportTask);

取出任务:

? ?@Override
? ? protected void runOneCycle() {
? ? ? ? while (true) {
? ? ? ? ? ? ReportTask task = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? task = reportQueue.take();
? ? ? ? ? ? ? ? task.exec();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? LOG.warn("got interupted exception when executing report", e);
? ? ? ? ? ? }
? ? ? ? }

执行任务

protected void exec() {
? ? ? ? ? ? if (tasks != null) {
? ? ? ? ? ? ? ? ReportHandler.taskReport(beId, tasks);
? ? ? ? ? ? }

List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks);

diffTask是fe中存在的任务,而be节点中不存在的任务

?if (task.shouldResend(taskReportTime)) { // 重做任务
? ? ? ? ? ? ? ? batchTask.addTask(task);
? ? ? ? ? ? }

reportExecStatus:

runtime/fragment_mgr.cpp 上报执行心跳

void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
bool done) {中上报

coord->reportExecStatus(res, params);

初始化FragmentExecState对象会设置callback

FragmentExecState::FragmentExecState(const TUniqueId& query_id,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?const TUniqueId& fragment_instance_id, int backend_num,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?ExecEnv* exec_env, const TNetworkAddress& coord_addr)
? ? ? ? : _query_id(query_id),
? ? ? ? ? _fragment_instance_id(fragment_instance_id),
? ? ? ? ? _backend_num(backend_num),
? ? ? ? ? _exec_env(exec_env),
? ? ? ? ? _coord_addr(coord_addr),
? ? ? ? ? _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? this, std::placeholders::_1, std::placeholders::_2,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? std::placeholders::_3)),

plan_fragment_executor执行的时候会上报???????

void PlanFragmentExecutor::report_profile() { 上报exec正在执行心跳

send_report(false);

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:53:50  更:2022-04-09 18:56:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/31 4:19:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码