IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Clickhouse外部储存表引擎(HDFS、MySQL、Kafka) -> 正文阅读

[大数据]Clickhouse外部储存表引擎(HDFS、MySQL、Kafka)

外部储存表引擎只负责元数据管理,和数据查询

1. HDFS表引擎

HDFS的安装请参考Centos7上Hadoop 3.3.1的全分布式安装过程

1.1 准备工作

  1. HDFS表引擎不支持Kerberos认证,如果开启,请关闭HDFS的Kerberos认证
  2. HDFS上新建clickhouse目录
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -mkdir /clickhouse
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2021-07-21 08:15 /clickhouse
[root@clickhouse1 ~]#
  1. 添加新的Clickhouse集群
    创建分布式HDFS表时,不需要副本功能,所以我们添加一个新的Clickhouse集群,在/etc/clickhouse-server/config.d/metrika.xmlclickhouse_remote_servers标签里,添加以下内容:
        <sharding_cluster> 
            <shard>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default123</password>
                    <weight>1</weight>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default123</password>
                    <weight>1</weight>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default123</password>
                    <weight>1</weight>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>default123</password>
                    <weight>1</weight>
                </replica>
            </shard>
        </sharding_cluster>

然后重启Clickhouse服务器

1.2 HDFS表负责读写

  1. 分布式表的创建
clickhouse1 :)
clickhouse1 :) create table hdfs_table1_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String
:-] ) engine = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table1', 'CSV');

CREATE TABLE hdfs_table1_local ON CLUSTER sharding_cluster
(
    `id` UInt32,
    `name` String
)
ENGINE = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table1', 'CSV')

Query id: 46414107-0399-48dd-99bc-6839b16a8fdd

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                1 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.327 sec. 

clickhouse1 :)  

其中数据的文件格式,常用的有CSV、TSV、JSON

这里我们只分布式的创建本地表,因为Distributed表引擎insert数据,需要Zookeeper的协同,而HDFS表引擎不支持设置Zookeeper路径;但是分布式的本地表可以进行读写分离

  1. 插入数据
clickhouse1 :) 
clickhouse1 :) insert into hdfs_table1_local select number, concat('code', toString(number)) code from numbers(5);

INSERT INTO hdfs_table1_local SELECT
    number,
    concat('code', toString(number)) AS code
FROM numbers(5)

Query id: 0fd035df-7ab1-424e-9efc-bffbe42c32cc

Ok.

0 rows in set. Elapsed: 0.043 sec. 

clickhouse1 :)

只能插入一次数据,如果再次执行,会抱异常DB::Exception: File: /clickhouse/hdfs_table1 is already exists.

  1. 查询数据
clickhouse1 :)
clickhouse1 :) select * from hdfs_table1_local;

SELECT *
FROM hdfs_table1_local

Query id: 23a1b6b2-7f8e-4508-ba62-f30b09706981

┌─id─┬─name──┐
│  0 │ code0 │
│  1 │ code1 │
│  2 │ code2 │
│  3 │ code3 │
│  4 │ code4 │
└────┴───────┘

5 rows in set. Elapsed: 0.064 sec. 

clickhouse1 :)

  1. HDFS文件查看
[root@clickhouse1 ~]# 
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table1
0,"code0"
1,"code1"
2,"code2"
3,"code3"
4,"code4"
[root@clickhouse1 ~]#

insert数据才会生成hdfs_table1文件,drop表时,hdfs_table1文件不会删除

1.3 HDFS表负责读, 不负责写

  1. HDFS路径的匹配规则
规则含义示例示例说明
绝对路径指定单个文件/clickhouse/hdfs_table2指定hdfs_table2单个文件
*通配符指定目录下的所有文件/clickhouse/*指定clickhouse目录下的所有文件
?通配符匹配文件名的一个字符/clickhouse/hdfs_table?可以匹配hdfs_table2、hdfs_tableA等
{M…N}数字区间/clickhouse/hdfs_table{2…3}可以匹配hdfs_table2、hdfs_table3
  1. 数据准备
[root@clickhouse1 ~]# 
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table2_1
0,"code0"
1,"code1"
2,"code2"
3,"code3"
4,"code4"
[root@clickhouse1 ~]# 
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table2_2
5,code5
6,code6
7,code7
8,code8
9,code9
[root@clickhouse1 ~]# 

数据不能有空行,String类型的字段有无双引号都可以

  1. 创建分布式本地表
clickhouse1 :) 
clickhouse1 :) create table hdfs_table2_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String
:-] ) engine = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table2_{1..2}', 'CSV');

CREATE TABLE hdfs_table2_local ON CLUSTER sharding_cluster
(
    `id` UInt32,
    `name` String
)
ENGINE = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table2_{1..2}', 'CSV')

Query id: 94cdfdf1-f00d-4d21-bbc9-7f451aa656a5

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   1 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.188 sec. 

clickhouse1 :) 
  1. 查询数据
clickhouse1 :)
clickhouse1 :) select * from hdfs_table2_local;

SELECT *
FROM hdfs_table2_local

Query id: f61d2831-0a00-475d-a205-31d64cde6986

┌─id─┬─name──┐
│  0 │ code0 │
│  1 │ code1 │
│  2 │ code2 │
│  3 │ code3 │
│  4 │ code4 │
└────┴───────┘
┌─id─┬─name──┐
│  5 │ code5 │
│  6 │ code6 │
│  7 │ code7 │
│  8 │ code8 │
│  9 │ code9 │
└────┴───────┘

10 rows in set. Elapsed: 0.091 sec. 

clickhouse1 :)

一个文件形成一个分区

2. MySQL表引擎

Mysql的安装请参考centos7安装mysql8.0.25版本

2.1 表引擎的基本操作

  1. Mysql数据库的数据
mysql> 
mysql> select * from test_db.clickhouse_mysql_test;
+----+-------+-----------+
| id | name  | city      |
+----+-------+-----------+
|  1 | name1 | Beijing   |
|  2 | name2 | Shanghai  |
|  3 | name3 | Guangzhou |
+----+-------+-----------+
3 rows in set (0.01 sec)

mysql> 
  1. 创建分布式本地表
clickhouse1 :) 
clickhouse1 :) create table clickhouse_mysql_test_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = MySQL('clickhouse1:3306', 'test_db', 'clickhouse_mysql_test', 'root', 'Root_123', 0, "update id = id + 1, name = concat('update_', name)");

CREATE TABLE clickhouse_mysql_test_local ON CLUSTER sharding_cluster
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = MySQL('clickhouse1:3306', 'test_db', 'clickhouse_mysql_test', 'root', 'Root_123', 0, `update id = id + 1, name = concat('update_', name)`)

Query id: 7d3f6fed-c3f3-4482-892a-bd37dc9d7721

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.192 sec. 

clickhouse1 :) 

第6个参数默认为0;当为1时,clickhouse的insert into有replace into的功能(replace into当主键或唯一索引相同,删除原数据,插入新数据),此时第7个参数必须为0

第7个参数默认为0;当为update字符串时,且Clickhouse insert的数据的主键或唯一索引与原数据相同时,则原数据应用update字符串,update没有的字段保留原字段数据

  1. 插入数据
clickhouse1 :) 
clickhouse1 :) insert into clickhouse_mysql_test_local(id, name, city) values(3, 'name4', 'Shenzhen');

INSERT INTO clickhouse_mysql_test_local (id, name, city) VALUES

Query id: ebfb1435-aa19-4c38-9226-98775006d47c

Ok.

1 rows in set. Elapsed: 0.233 sec. 

clickhouse1 :) 
  1. 查询数据
clickhouse1 :) 
clickhouse1 :) select * from clickhouse_mysql_test_local;

SELECT *
FROM clickhouse_mysql_test_local

Query id: 1b8f71e0-86d1-448d-9cf7-0a8614419665

┌─id─┬─name─────────┬─city──────┐
│  1 │ name1        │ Beijing   │
│  2 │ name2        │ Shanghai  │
│  4 │ update_name3 │ Guangzhou │
└────┴──────────────┴───────────┘

3 rows in set. Elapsed: 0.009 sec. 

clickhouse1 :)

2.2 表引擎配合物化视图

  1. 创建表
clickhouse1 :) 
clickhouse1 :) create table table_mysql_local on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/table_mysql/{shard}', '{replica}')
:-] order by id
:-] primary key id
:-] partition by city;

CREATE TABLE table_mysql_local ON CLUSTER sharding_ha
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/table_mysql/{shard}', '{replica}')
PARTITION BY city
PRIMARY KEY id
ORDER BY id

Query id: 8aaf2bdd-d8c1-4e3c-a58d-e5adae2552c1

ysql_all on cluster sharding_ha(
id UInt32,
name String,
city String
) engine = Distributed(sharding_ha, default, table_mysql_local, id)
populate 
as select * from clickhouse_mysql_test_local;
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.191 sec. 

clickhouse1 :) 
clickhouse1 :) create materialized view view_mysql_all on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Distributed(sharding_ha, default, table_mysql_local, id)
:-] populate 
:-] as select * from clickhouse_mysql_test_local;

CREATE MATERIALIZED VIEW view_mysql_all ON CLUSTER sharding_ha
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = Distributed(sharding_ha, default, table_mysql_local, id) POPULATE AS
SELECT *
FROM clickhouse_mysql_test_local

Query id: e9d07014-3bde-4026-8743-6e7ade47cbb7

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.282 sec. 

clickhouse1 :) 
  1. 查询数据
clickhouse1 :) 
clickhouse1 :) select * from view_mysql_all;

SELECT *
FROM view_mysql_all

Query id: 8b523a07-8c1d-48ab-8d51-544e88f623c0

┌─id─┬─name──┬─city─────┐
│  2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name─────────┬─city──────┐
│  4 │ update_name3 │ Guangzhou │
└────┴──────────────┴───────────┘
┌─id─┬─name──┬─city────┐
│  1 │ name1 │ Beijing │
└────┴───────┴─────────┘

3 rows in set. Elapsed: 0.014 sec. 

clickhouse1 :)

表引擎不支持update和delete,所以table_mysql_local表可以使用CollapsingMergeTree引擎来解决

3. Kafka表引擎

Kafka的安装可以参考在Centos7上全分布式安装kafka2.8.0

Kafka表引擎能实时接收Kafka的数据,但不支持Exactly Once语义

3.1 准备Kafka测试数据

  1. 创建topic
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-topics.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --create --topic clickhouse_kafka_test --replication-factor 1 --partitions 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic clickhouse_kafka_test.
[root@clickhouse1 ~]#
  1. 发送测试数据
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-console-producer.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --topic clickhouse_kafka_test
>{"id":1,"name":"name1","city":"Beijing"}
>{"id":2,"name":"name2","city":"Shanghai"}
>{"id":3,"name":"name3","city":"Guangzhou"}
>

3.2 Kafka表引擎操作(第二次select查询不到数据)

  1. 设置auto.offset.reset参数

clickhouse的kafka表引擎消费时,auto.offset.reset默认是latest,编辑/etc/clickhouse-server/config.xml修改, 内容如下,然后重启clickhouse

    <kafka>
        <auto_offset_reset>earliest</auto_offset_reset>
    </kafka>
  1. 创建表
clickhouse1 :) 
clickhouse1 :) create table clickhouse_kafka_test_local(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092',
:-] kafka_topic_list = 'clickhouse_kafka_test',
:-] kafka_group_name = 'clickhouse_kafka_test_group',
:-] kafka_format = 'JSONEachRow',
:-] kafka_skip_broken_messages = 100;

CREATE TABLE clickhouse_kafka_test_local
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092', kafka_topic_list = 'clickhouse_kafka_test', kafka_group_name = 'clickhouse_kafka_test_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100

Query id: 20d1cb23-e3f7-40be-b227-d9a204fd7451

Ok.

0 rows in set. Elapsed: 0.035 sec. 

clickhouse1 :)
  1. 查询数据
clickhouse1 :) select * from clickhouse_kafka_test_local;

SELECT *
FROM clickhouse_kafka_test_local

Query id: 40ab902f-54b5-4cd2-98c6-d20c9985b9e8

┌─id─┬─name──┬─city──────┐
│  2 │ name2 │ Shanghai  │
│  1 │ name1 │ Beijing   │
│  3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘

3 rows in set. Elapsed: 0.566 sec. 

clickhouse1 :) select * from clickhouse_kafka_test_local;

SELECT *
FROM clickhouse_kafka_test_local

Query id: 19479a67-9734-4f5e-99e8-6b708e0f5fb3

Ok.

0 rows in set. Elapsed: 5.013 sec. 

clickhouse1 :)

由上面可见,select一次数据就代表消费完了,再次select查询不到数据了,因为从kafka新的consumer offset消费数据了

3.2 Kafka表引擎配合物化视图(多次select查询到数据)

  1. 准备工作

删除clickhouse_kafka_test_local表

clickhouse1 :) drop table clickhouse_kafka_test_local;

删除kafka的topic clickhouse_kafka_test

[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-topics.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --delete --topic clickhouse_kafka_test

再次执行步骤3.1

  1. 创建表

创建对接Kafka的拥有Kafka表引擎的分布式数据渠道表

clickhouse1 :) 
clickhouse1 :) create table clickhouse_kafka_test_channelLocal on cluster sharding_cluster(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092',
:-] kafka_topic_list = 'clickhouse_kafka_test',
:-] kafka_group_name = 'clickhouse_kafka_test_group',
:-] kafka_format = 'JSONEachRow',
:-] kafka_skip_broken_messages = 100;

CREATE TABLE clickhouse_kafka_test_channelLocal ON CLUSTER sharding_cluster
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092', kafka_topic_list = 'clickhouse_kafka_test', kafka_group_name = 'clickhouse_kafka_test_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100

Query id: a719f376-838f-4494-bcee-e4d470b34ae2

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.141 sec. 

clickhouse1 :) 

创建一个分布式的物化视图,用于同步kafka数据渠道表的数据,到分布式的副本合并树引擎表

clickhouse1 :) 
clickhouse1 :) create materialized view clickhouse_kafka_test_viewAll on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Distributed(sharding_ha, default, clickhouse_kafka_test_storageLocal, id)
:-] populate
:-] as select * from clickhouse_kafka_test_channelLocal;

CREATE MATERIALIZED VIEW clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = Distributed(sharding_ha, default, clickhouse_kafka_test_storageLocal, id) POPULATE AS
SELECT *
FROM clickhouse_kafka_test_channelLocal

Query id: e4084a86-f777-45e7-bef7-e2f568d93db2

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │      0 │       │                   3 │                3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   2 │                1 │
│ clickhouse3 │ 9000 │      0 │       │                   1 │                1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 15.574 sec. 

clickhouse1 :) 

创建一个分布式的副本合并树引擎表,用于储存物化视图的数据

clickhouse1 :) 
clickhouse1 :) create table clickhouse_kafka_test_storageLocal on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/clickhouse_kafka_test/{shard}', '{replica}')
:-] order by id
:-] primary key id
:-] partition by city;

CREATE TABLE clickhouse_kafka_test_storageLocal ON CLUSTER sharding_ha
(
    `id` UInt32,
    `name` String,
    `city` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/clickhouse_kafka_test/{shard}', '{replica}')
PARTITION BY city
PRIMARY KEY id
ORDER BY id

Query id: 9506837c-f1ca-4c0f-aa9a-2370cc423546

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │      0 │       │                   3 │                3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.194 sec. 

clickhouse1 :)
  1. 查询数据
clickhouse1 :) 
clickhouse1 :) select * from clickhouse_kafka_test_viewAll;

SELECT *
FROM clickhouse_kafka_test_viewAll

Query id: 26073fb1-8011-465a-b52e-bec15174e451

┌─id─┬─name──┬─city─────┐
│  2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name──┬─city────┐
│  1 │ name1 │ Beijing │
└────┴───────┴─────────┘
┌─id─┬─name──┬─city──────┐
│  3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘

3 rows in set. Elapsed: 0.017 sec. 

clickhouse1 :) select * from clickhouse_kafka_test_viewAll;

SELECT *
FROM clickhouse_kafka_test_viewAll

Query id: a7059da9-40b7-4143-b91c-708d807071eb

┌─id─┬─name──┬─city─────┐
│  2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name──┬─city────┐
│  1 │ name1 │ Beijing │
└────┴───────┴─────────┘
┌─id─┬─name──┬─city──────┐
│  3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘

3 rows in set. Elapsed: 0.009 sec. 

clickhouse1 :)

两次select都是能查询到数据的,因为此时select查询的数据是clickhouse_kafka_test_storageLocal表的数据

  1. 装载和卸载物化视图

如果想停止同步,可以卸载物化视图

clickhouse1 :)
clickhouse1 :) detach table clickhouse_kafka_test_viewAll on cluster sharding_ha;

DETACH TABLE clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha

Query id: dc221033-fc04-4e95-95ad-fbe9a052d27f

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.135 sec. 

clickhouse1 :)

想再次开启同步,可以装载物化视图

clickhouse1 :)
clickhouse1 :) ATTACH TABLE clickhouse_kafka_test_viewAll on cluster sharding_ha;

ATTACH TABLE clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha

Query id: 7940821b-8d55-452a-840a-9da97082e1de

┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │      0 │       │                   3 │                0 │
│ clickhouse3 │ 9000 │      0 │       │                   2 │                0 │
│ clickhouse1 │ 9000 │      0 │       │                   1 │                0 │
│ clickhouse4 │ 9000 │      0 │       │                   0 │                0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘

4 rows in set. Elapsed: 0.131 sec. 

clickhouse1 :)

3.3 kafka表引擎的参数说明

  • kafka_format(必填):kafka消息的格式,必须是clickhouse支持的格式,如TSV、JSONEachRow、CSV等
  • kafka_row_delimiter:两行之间的分隔符, 默认值是’\0’
  • kafka_num_consumers:消费组中实际消费数据的消费者数量,默认是1;一个partition只能被一个消费者消费
  • kafka_skip_broken_messages:默认是0,表示出现一条解析错误的消息,则停止接收数据;如果设置为10,则表示解析错误的消息总数达到10条时,停止接收数据,前9条错误消息自动忽略
  • stream_poll_timeout_ms:默认500ms,kafka表引擎拉取数据的时间间隔,数据拉取放入缓存,刷新到数据表由两个参数控制,只要一个满足即可:
    • kafka_max_block_size:默认等于max_block_size=65536, 表示一个block写入的数据达到这个值时,刷新到数据表
    • stream_flush_interval_ms:缓存数据刷新到数据表的时间间隔
  • kafka_commit_every_batch:一个block由多个消息batch组成,参数默认为0,表示一个block写入数据表后,才提交kafka consumer offset;如果为1,则表示一个batch写入数据后,再提交kafka consumer offset
  • 自定义参数:kafka表引擎底层与kafka通信的部分是基于librdkafka实现的,自定义参数看这里,自定义参数需要在/etc/clickhouse-server/config.xml的kafka标签定义
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-27 16:17:12  更:2021-07-27 16:18:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/9 4:27:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码