IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> canal 同步mysql的数据到 clickhouse -> 正文阅读

[大数据]canal 同步mysql的数据到 clickhouse

mysql数据迁移

基于库同步

CREATE DATABASE saic_iov_user ON CLUSTER default
ENGINE = MySQL('10.188.49.10:3306', 'saic_iov_user','iov_user', 'ENC(Mzg2MTE4ODgAYO6BQozrsqT8Gc+MwKN3)');
 
 
CREATE DATABASE saic_iov_merge ON CLUSTER default
ENGINE = MySQL('10.188.49.10:3306', 'saic_iov_merge','iov_merge_rw', 'ENC(ODYzMDQxMDmV516U8QSxtUcrPsdFRcMf)');

1、创建mysql的同步表

CREATE TABLE iov_sit.sync_Team ON CLUSTER default
(
`id` Int64 comment '主键ID',
`mis_id` String default '' comment 'mis公司id',
`team_name` String comment '车队名称',
`mis_team_name` String default '' comment 'mis公司名称',
`formal_id` Int64 comment '司管车队标记',
`area_id` Int64 default 0 comment '所属区域ID',
`create_by` String comment '创建者',
`create_time` DateTime comment '创建时间',
`update_by` String comment '更新者',
`update_time` DateTime comment '更新时间',
`is_deleted` Int8 comment '是否删除',
`team_category` Int8 default 0 comment '公司类别:1正式、2测试、3空装、4维修',
`is_manage_vehicle` Int8 default 0 comment '是否车管',
`is_manage_driver` Int8 default 0 comment '是否司管',
`vehicle_sync_default_team` Int8 comment '默认同步车队 0否 1是',
`aj_id` Int64 default 0 comment '安吉车司管同步的车队id'
)ENGINE = MySQL('10.188.136.118:3306', 'saic_iov_user', 't_team', 'saic_test', 'Sit_saic_test181030');

注意这里没有创建 ts Int64 comment ‘处理时间’ 这一列

2、创建clickhouse的基础表

CREATE TABLE iov_sit.t_team ON CLUSTER cluster_2s_2r
(

`id` Int64 comment '主键ID',
`mis_id` Nullable(String) default '' comment 'mis公司id',
`team_name` Nullable(String) comment '车队名称',
`mis_team_name` Nullable(String) default '' comment 'mis公司名称',
`formal_id` Nullable(Int64) comment '司管车队标记',
`area_id` Nullable(Int64) default 0 comment '所属区域ID',
`create_by` Nullable(String) comment '创建者',
`create_time` Nullable(DateTime) comment '创建时间',
`update_by` Nullable(String) comment '更新者',
`update_time` Nullable(DateTime) comment '更新时间',
`is_deleted` Nullable(Int8) comment '是否删除',
`team_category` Nullable(Int8) default 0 comment '公司类别:1正式、2测试、3空装、4维修',
`is_manage_vehicle` Nullable(Int8) default 0 comment '是否车管',
`is_manage_driver` Nullable(Int8) default 0 comment '是否司管',
`vehicle_sync_default_team` Nullable(Int8) comment '默认同步车队 0否 1是',
`aj_id` Nullable(Int64) default 0 comment '安吉车司管同步的车队id',
`ts` Int64 comment '处理时间'
)ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{database}/{table}/{shard}', '{replica}', ts)
ORDER BY (id)
primary key(id)

3、创建clickhouse队列表

CREATE TABLE iov_sit.q_team ON CLUSTER cluster_2s_2r (
message String
) ENGINE = Kafka('10.188.139.21:9092', 'saic_iov_user_t_team_sit', 'saic_iov_user_t_team_sit_group2', 'JSONAsString');

4、创建clickhouse分布式表

CREATE TABLE iov_sit.dt_team ON CLUSTER cluster_2s_2r as iov_sit.t_team ENGINE = Distributed(cluster_2s_2r, iov_sit, t_team, id);

5、同步插入sql

INSERT INTO iov_sit.dt_team
(id, mis_id, team_name, mis_team_name, formal_id, area_id, create_by, create_time, update_by, update_time, is_deleted, team_category, is_manage_vehicle, is_manage_driver, vehicle_sync_default_team, aj_id)
select id, mis_id, team_name, mis_team_name, formal_id, area_id, create_by, create_time, update_by, update_time, is_deleted, team_category, is_manage_vehicle, is_manage_driver, vehicle_sync_default_team, aj_id
from saic_iov_user.t_team

6、创建clickhouse物化视图

CREATE MATERIALIZED VIEW iov_sit.mv_team ON CLUSTER cluster_2s_2r TO iov_sit.dt_team AS
SELECT visitParamExtractInt(JSONExtractRaw(message, 'data'), 'id') as id,
visitParamExtractString(JSONExtractRaw(message, 'data'), 'mis_id') as mis_id,
visitParamExtractString(JSONExtractRaw(message, 'data'), 'team_name') as team_name,
visitParamExtractString(JSONExtractRaw(message, 'data'), 'mis_team_name') as mis_team_name,
toInt64(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'formal_id')) as formal_id,
toInt64(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'area_id')) as area_id,
visitParamExtractString(JSONExtractRaw(message, 'data'), 'create_by') as create_by,
toDateTimeOrZero(visitParamExtractString(JSONExtractRaw(message, 'data'), 'create_time')) as create_time,
visitParamExtractString(JSONExtractRaw(message, 'data'), 'update_by') as update_by,
toDateTimeOrZero(visitParamExtractString(JSONExtractRaw(message, 'data'), 'update_time')) as update_time,
toInt8(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'is_deleted')) as is_deleted,
toInt8(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'team_category')) as team_category,
toInt8(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'is_manage_vehicle')) as is_manage_vehicle,
toInt8(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'is_manage_driver')) as is_manage_driver,
toInt8(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'vehicle_sync_default_team')) as vehicle_sync_default_team,
toInt64(visitParamExtractInt(JSONExtractRaw(message, 'data'), 'aj_id')) as aj_id,
visitParamExtractInt(JSONExtractRaw(message, 'data'), 'ts') as ts from iov_sit.q_team;
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:47:11  更:2022-04-26 11:51:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 10:48:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码