IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Clickhouse 从S3/Hive导入数据 -> 正文阅读

[大数据]Clickhouse 从S3/Hive导入数据

背景

????????我们的埋点数据上传到S3,大概是每天10亿条的数据量级别。最近花了一些时间思考和学习如何将每天如此大量的数据从S3导入到Clickhouse,为后续的实时查询做准备。

方案一

? ? ? ? 1. 先将S3的数据导入到hive,这一步操作比较简单,创建一个外部表即可,按日期字段进行分区。

CREATE TABLE `s3_to_hive_test`(
 	id             ,
	aaa            ,
	bbb            ,
	ccc            ,
	ddd            ,
	……              )
PARTITIONED BY ( 
  `ingestion_date` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://host/bucket/path'
TBLPROPERTIES (
  'last_modified_by'='hadoop', 
  'last_modified_time'='1625729438', 
  'parquet.column.index.access'='true', 
  'spark.sql.create.version'='2.2 or prior', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{……}', 
  'spark.sql.sources.schema.partCol.0'='ingestion_date')

????????2. 每天定时从hive把前一天的数据导入到Clickhouse,这里可以借助waterdrop(现改名为seatunnel)工具进行导入,定时调度可以自己写一个shell脚本,如果导入后还需要进行数据清洗、聚合等,推荐DolphinScheduler

#waterdrop的config文件

spark {
 spark.app.name = "Waterdrop"
 spark.executor.cores = 1
 spark.executor.memory = "2g" // 这个配置必需填写,否则会使用 sparksql 内置元数据库
 spark.default.parallelism = 12
 spark.driver.memory = "64g"
 spark.sql.catalogImplementation = "hive"
}
input {
 hive {
 pre_sql = "select `id`,`aaa`,`bbb`,`ccc`,`ddd`,…… from default.s3_to_hive_test where ……"
 table_name = "waterdrop_tmp"
 }
}
filter {
}
output {
 clickhouse {
 host = "ch_host:ch_port"
 database = "default"
 username = "***"
 password = "******"
 table = "ch_mergetree_test"
 bulk_size = 200000
 retry = 3
 fields = 
[ 	id             ,
	aaa            ,
	bbb            ,
	ccc            ,
	ddd            ,
	……              ]
 }
}

????????3. 导入后的清洗、聚合等操作。

适用场景:

? ? ? ? 绝大多数的场景都适用,之前我们项目也是采用此方案。目前由于引入K8S进行管理,hive、Spark、waterdrop、DolphinScheduler分属不同容器,要处理太多网络通信的问题,故改用更为简单的方案三。

方案二

? ? ? ? 1. 如方案一,先将S3的数据以外部表的方式导入到hive中并按日期分区。

CREATE TABLE `s3_to_hive_test`(
 	id             ,
	aaa            ,
	bbb            ,
	ccc            ,
	ddd            ,
	……              )
PARTITIONED BY ( 
  `ingestion_date` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://host/bucket/path'
TBLPROPERTIES (
  'last_modified_by'='hadoop', 
  'last_modified_time'='1625729438', 
  'parquet.column.index.access'='true', 
  'spark.sql.create.version'='2.2 or prior', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{……}', 
  'spark.sql.sources.schema.partCol.0'='ingestion_date')

? ? ? ? 2. Clickhouse建立hive集成表

CREATE TABLE IF NOT EXISTS hive_to_ch_test
(
    `id` String NULL,
    `aaa` String NULL,
    `bbb` String  NULL,
    `ccc` String NULL,
    `ddd` String NULL,
	……              
) ENGINE = Hive('thrift://host:port', 'database', 'table');
PARTITION BY ingestion_date

??????????3. Clickhouse内部将hive集成表的数据导入到MergeTree表

insert into ch_mergetree_test(
	id             ,
	aaa            ,
	bbb            ,
	ccc            ,
	ddd            ,
	……                   )
SELECT
    id,
    ifNull(aaa, ''),
    ifNull(bbb, ''),
    ifNull(ccc, ''),
    ifNull(ddd, ''),
    ……
FROM hive_to_ch_test
WHERE ……

? ? ? ? 4. 后续的数据清洗、聚合等操作

????????在实测中,Clickhouse到22.4版本为止似乎还不支持hive集成表的底层存储为S3这种形式。具体表现为能建立hive集成表,但查询的时候报以下错误:

Query id: bfeb2774-eb2b-4b2c-9230-64bd6d35acfe


0 rows in set. Elapsed: 0.013 sec. 

Received exception from server (version 22.4.5):
Code: 210. DB::Exception: Received from localhost:9000. DB::Exception: Unable to connect to HDFS: InvalidParameter: Cannot parse URI: hdfs://****, missing port or invalid HA configuration     Caused by: HdfsConfigNotFound: Config key: dfs.ha.namenodes.**** not found. (NETWORK_ERROR)

但支持底层为HDFS,如下图所示。(注意:如HDFS是HA模式,需要参考官方文档进行一些额外的配置,否则也会报以上错误)?适用场景:

? ? ? ? Clickhouse版本必须高于22.1,因为hive集成表引擎是在22.1版本才发布的。详见Clickhouse ChangeLog

方案三

?大道至简,直接省略S3导入到Hive这个步骤。

? ? ? ? 1. 建立S3集成表

DROP TABLE IF EXISTS s3_to_ch_test ;

CREATE TABLE s3_to_ch_test ( 
    `id` String NULL,
    `aaa` String NULL,
    `bbb` String  NULL,
    `ccc` String NULL,
    `ddd` String NULL,
     ……             ) 
    ENGINE=S3(concat('https://s3_host/bucket/path/*' ),'accessKey','secretKey', 'Parquet')
SETTINGS input_format_parquet_allow_missing_columns=true ;

? ? ? ? 2. Clickhouse内部将数据从S3集成表导入MergeTree表

insert into ch_mergetree_test(
	id             ,
	aaa            ,
	bbb            ,
	ccc            ,
	ddd            ,
	……                   )
SELECT
    id,
    ifNull(aaa, ''),
    ifNull(bbb, ''),
    ifNull(ccc, ''),
    ifNull(ddd, ''),
    ……
FROM s3_to_ch_test
WHERE ……

? ? ? ? 3. 后续的数据清洗、集成等操作

????????当然,在具体实施过程,还是会遇到一些坑。

坑一、超出内存限制

????????由于我们在S3中存储的文件格式是Parquet类型,Parquet是面向分析型业务的列式存储格式,Clickhouse在处理Parquet文件是内存密集型的。我尝试将output_format_parquet_row_group_size?参数调小,但没有任何作用,仍报上述异常。猜测Clickhouse在查询数据时并不是按文件一个个读取并插入的,而是将所有文件的列为单位装到内存中。因为我单个文件最大也才10G,分配了50G内存仍超出内存。

解决方案:调大内存限制的值。

????????方式1. 临时设置(仅对当前session有效):

set max_memory_usage=150000000000;

select * from system.settings where name='max_memory_usage';

????????方式2. 修改/etc/clickhouse-server/users.xml文件(长期有效)

<max_memory_usage>150000000000</max_memory_usage>

坑二、效率问题

解决方案1:加大内存

????????内存100G的情况下,实测10亿条数据导入到MergeTree表消耗时间4300+ Sec,加到150G消耗时间3800+ Sec。

解决方案2:增加线程数

????????当内存150G,max_insert_threads=1时,耗时3837s

?????????当内存150G,max_insert_threads=2时,耗时1939s

????????当内存150G,max_insert_threads=3时,耗时1407s

????????当内存150G,max_insert_threads=4时,超出内存限制!!

????????虽然增加线程数对导入效率会有明显提升,但不意味着线程数越多越好,因为高线程数是以高内存为代价的,需要根据服务器内存和导入数据量的情况,平衡好max_memory_usage和max_insert_threads的关系。?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-29 19:08:56  更:2022-06-29 19:10:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:58:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码