Table of Contents
1. Ingesting into the lake with the Flink SQL client
1.1 Starting the Flink SQL client
1.2 Creating a hive_catalog
1.3 Creating an Iceberg database
1.4 Creating an Iceberg table
1.5 Creating a Kafka streaming table
1.6 Inserting from the Kafka streaming table into the Iceberg table
1.6.1 Generating test data
1.7 Querying the data
2. Ingesting into the lake with API code
1. Ingesting into the lake with the Flink SQL client
1.1 Starting the Flink SQL client
Change to the Flink home directory, e.g. xxx/flink/flink-1.11.3, and start the client in embedded mode:
./bin/sql-client.sh embedded shell
[root@xxxx flink-1.11.3]# ./bin/sql-client.sh embedded shell
No default environment specified.
Searching for '/data/bd/flink/flink-1.11.3/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/data/bd/flink/flink-1.11.3/conf/sql-client-defaults.yaml
No session environment specified.
2021-07-06 18:39:15,856 INFO org.apache.hadoop.hive.conf.HiveConf [] - Found configuration file null
Command history file path: /root/.flink-sql-history
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
1.2 Creating a hive_catalog
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:9000/user/hive/warehouse/'
);
Use show catalogs to list all registered catalogs:
Flink SQL> show catalogs;
default_catalog
hadoop_catalog
hive_catalog
myhive
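The listing above also shows a hadoop_catalog. As a sketch, an Iceberg catalog of this kind can be registered against a plain HDFS warehouse path instead of the Hive Metastore (the warehouse path below is an assumption; adjust it to your cluster):

```sql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://localhost:9000/warehouse/iceberg',
  'property-version'='1'
);
```

A hadoop catalog needs no Metastore service, but table metadata is then tracked purely through files under the warehouse path.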
1.3 Creating an Iceberg database
USE CATALOG hive_catalog;
CREATE DATABASE iceberg_db;
USE iceberg_db;
1.4 Creating an Iceberg table
CREATE TABLE hive_catalog.iceberg_db.iceberg_003 (
  uid STRING COMMENT 'user uid',
  qipu_id STRING COMMENT 'video id',
  stime STRING COMMENT 'client-side timestamp'
) WITH (
  'connector'='iceberg',
  'write.format.default'='ORC'
);
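For larger tables, Iceberg's Flink DDL also accepts a PARTITIONED BY clause. A hedged variant of the table above, partitioned by stime (the table name iceberg_003_p is hypothetical):

```sql
CREATE TABLE hive_catalog.iceberg_db.iceberg_003_p (
  uid STRING COMMENT 'user uid',
  qipu_id STRING COMMENT 'video id',
  stime STRING COMMENT 'client-side timestamp'
) PARTITIONED BY (stime)
WITH ('write.format.default'='ORC');
```

Since this table is addressed through the Iceberg catalog, the 'connector' option should not be required here; verify against your Iceberg/Flink versions.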
1.5 Creating a Kafka streaming table
CREATE TABLE kafka_dwd_rr_fact_kafka_all_interact_collect_anti (
  uid STRING COMMENT 'user uid',
  qipu_id STRING COMMENT 'video id',
  stime STRING COMMENT 'client-side timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_rr_fact_kafka_all_interact_collect_anti',
  'properties.bootstrap.servers' = 'xxx:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
1.6 Inserting from the Kafka streaming table into the Iceberg table
INSERT INTO hive_catalog.iceberg_db.iceberg_003
SELECT uid, qipu_id, stime
FROM kafka_dwd_rr_fact_kafka_all_interact_collect_anti;
Note that in streaming mode the Iceberg sink only commits data on Flink checkpoints, so checkpointing must be enabled for inserted rows to become visible to readers.
1.6.1 Generating test data
Create a table backed by the datagen connector:
CREATE TABLE sourceTable (
  userid INT,
  f_random_str STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='100',
  'fields.userid.kind'='random',
  'fields.userid.min'='1',
  'fields.userid.max'='100',
  'fields.f_random_str.length'='10'
);
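Since the Kafka table declares 'format' = 'json', each message is expected to look like {"uid": "...", "qipu_id": "...", "stime": "..."}. One hypothetical way to wire the generator into the pipeline is to cast the datagen columns onto the Kafka table's schema (the column mapping below is an assumption, since sourceTable's schema differs from the Kafka table's):

```sql
INSERT INTO kafka_dwd_rr_fact_kafka_all_interact_collect_anti
SELECT
  CAST(userid AS STRING)            AS uid,  -- datagen INT mapped onto uid
  f_random_str                      AS qipu_id,
  CAST(CURRENT_TIMESTAMP AS STRING) AS stime
FROM sourceTable;
```

This writes generated records into the Kafka topic, which the INSERT job in section 1.6 then picks up and lands in Iceberg.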
1.7 Querying the data
Flink SQL> select * from hive_catalog.iceberg_db.iceberg_003;
Query results:
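In streaming mode the query above runs continuously. As a sketch (option names taken from the Iceberg Flink documentation; verify them against your Flink and Iceberg versions), a one-off snapshot read can be done in batch execution mode, and a continuous incremental read can be requested through a table hint:

```sql
-- one-off snapshot (batch) read
SET execution.type = batch;
SELECT count(*) FROM hive_catalog.iceberg_db.iceberg_003;

-- continuous incremental read via dynamic table options
SET execution.type = streaming;
SET table.dynamic-table-options.enabled = true;
SELECT * FROM hive_catalog.iceberg_db.iceberg_003
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
```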

2. Ingesting into the lake with API code
To be continued.