IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink table store 配置 hive catalog matestore -> 正文阅读

[大数据]Flink table store 配置 hive catalog matestore

目录

前言

本次食材

step1?准备环境:

step2?体验Flink table store?hive?catalog:

step3?Flink?mysql CDC?到?hiveFts

step4?每次启动加载?catalog :???????

前言???????

? ? ? ? Flink Table Store介绍:Flink 也搞存储了,Flink Table Store 是一个统一的存储,用于在 Flink 中为流式处理和批处理构建动态表,支持高速数据摄取和及时的数据查询。表格存储提供以下核心能力:

  • 支持大型数据集的存储,并允许在批处理和流模式下进行读/写。
  • 支持将延迟降至毫秒的流式查询。
  • 支持 Batch/OLAP 查询,将延迟降至秒级。
  • 默认支持增量快照流消费。所以用户不需要自己去组合不同的管道。

? ? ? ? 小白此次使用flink 1.14,个人尝试只支持java 11的flink 1.15 好难搭配,于是换回了支持java 1.8的flink 1.14。因为Flink Table Store Hive Connector 目前只支持到hive2.3,尝试重新编译hive 3.1.2失败,于是hive降到2.2.0。附:Flink Table Store 传送门

? ? ? ?文章存在不足的地方敬请指正。

本次食材:

Flink:1.14.5

Hive:2.2.0

java:1.8

flink-table-store-dist-0.2.0_1.14.jar? ? ? ? ? ? ? ? ? ??>>>

flink-table-store-hive-catalog-0.2.0_2.2.jar? ? ? ??>>>

flink-table-store-hive-connector-0.2.0_2.2.jar? ?>>>

温馨提示:flink-table-store-dist-0.2.0的1.14版本flink官网没有提供,只提供了编译教程>>>。但是我在华为镜像找到了,以上三个插件的目前已支持的其他版本均可在华为镜像找到。

请移步??华为镜像传送门>>>

Hive 2.1 CDH 6.3?官网重新编译教程>>>

step1?准备环境:

? ? ? ? 1、配置好Flink?集群环境。

? ? ? ? 2、启动hadoop、hive。

? ? ? ? 3、然后就复制以下两个插件到?${FLINK_HOME}/lib?下面 。

cp flink-table-store-dist-0.2.0_1.14.jar?${FLINK_HOME}/lib

cp flink-table-store-hive-catalog-0.2.0_2.2.jar ${FLINK_HOME}/lib

? ? ? ?(?千万不要把?flink-table-store-hive-connector-0.2.0_2.2.jar?放到 ${FLINK_HOME}/lib )

????????4、然后分发到其他机器上 。

step2?体验Flink table store?hive?catalog:

? ? ? ? 1、启动 FLink?集群模式?

 cd ${FLINK_HOME};./bin/start-cluster.sh 

? ? ? ? 2、启动?Flink-sql客户端

 ./bin/sql-client.sh

? ? ? ? 3、创建 Flink Table Store 的 Hive catalog ,(?uri和warehouse根据自己的来填,不要抄作业!!!,检查hive的hive-site.yaml是否已经配置有hive.metastore.uris。没有配置的配置重启hive)

<property>  
  <name>hive.metastore.uris</name>
  <value>thrift://h01:9083</value>  
</property>
-- 开启 Flink sql 客户端
-- 定义一个 table store catalog
-- 说明 catalog 类型为 table-store
-- 使用的 hive 作为 metastore
-- uri 为 hive metastore 的连接 
-- warehouse 定义 table store 存储路径 支持hdfs、本地文件系统

-- Fts catalog_hive
CREATE CATALOG hiveFts_catalog WITH (
  'type'='table-store',
  'metastore' = 'hive',
  'uri' = 'thrift://h01:9083',
  'warehouse'='hdfs://h01:9000/user/hiveFts'
);
-- !!! uri和warehouse根据自己的来填!!!

? ? ? ? 4、创建一个表、查询、插入

-- 使用该 fts hive catalog
show catalogs;
/*
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
| hiveFts_catalog |
+-----------------+
2 rows in set
*/


USE CATALOG hiveFts_catalog ;

create database if not exists testdb;

show databases;
/*
+---------------+
| database name |
+---------------+
|       default |
|        testdb |
+---------------+
2 rows in set
*/

-- 在test库建一个表
CREATE TABLE if not exists testdb.fts_test_table (
  a int,
  b string
);

show tables;


-- 插入test表
INSERT INTO testdb.fts_test_table VALUES (1, 'Table'), (2, 'Store');

-- 查表
SELECT * FROM testdb.fts_test_table;

? ? ? ?可在flink?web看到我们的两个程序

????????5、在 hive sql客户端查询 Flink Table Store Hive 表

? ? ? ? 前提工作:先在 ${HIVE_HOME}?目录下建一个auxlib文件夹,然后将?flink-table-store-hive-connector-0.2.0_2.2.jar?放进 auxlib 文件夹。

? ? ? ? 启动?hive?sql?客户端

cd ${HIVE_HOME};./bin/hive
-- 查 fts 新建的db
show databases;

-- 使用 testdb
use testdb;

-- 查 testdb 的表
show tables;

-- 在 hive sql 客户端查我们在 flink 那边建的表
select * from testdb.fts_test_table ;

/*
OK
1       Table
2       Store
Time taken: 0.192 seconds, Fetched: 2 row(s)
*/

step3?Flink?mysql CDC?到?hiveFts

? ? ? ? 1、flink已配置好mysql?cdc?教程请看>>>

? ? ? ? 2、创建 mysql?表;

CREATE TABLE test.mysql_cdc_hiveFts_test (
  a int  PRIMARY KEY not null,
  b varchar(50) 
);

? ? ? ? 3、在flink?sql?client?中 创建?hiveFts 表;

use catalog hiveFts_catalog;
use testdb;
CREATE TABLE if not exists testdb.mysql_cdc_hiveFts_test (
  a int  PRIMARY KEY,
  b string 
);

? ? ? ? 4、在?default_catalog 的 default_database?中创建我们的cdc虚拟表 ,with语句的配置不要直接抄作业哈,要提前已配置好mysql cdc,没有的请移步>>>???????。

CREATE TABLE if not exists default_catalog.default_database.mysql_cdc_hiveFts_test (
  a int ,
  b string ,
  PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'h01',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc_hiveFts_test',
	'scan.startup.mode'='initial',
    'debezium.snapshot.mode' = 'initial'
);


-- 可以使用select 语句检查一下 cdc 是否成功
select * from default_catalog.default_database.mysql_cdc_hiveFts_test;

? ? ? ? 5、执行同步语句,并查询hiveFts表

SET 'execution.checkpointing.interval' = '3s';
-- 设置 3s 一个checkpoint,不设置mysql cdc 不会同步数据

-- cdc临时表中查取,然后插入hiveFts表
INSERT INTO hiveFts_catalog.testdb.mysql_cdc_hiveFts_test SELECT * FROM default_catalog.default_database.mysql_cdc_hiveFts_test;

-- 进行流式实时查询
select * from hiveFts_catalog.testdb.mysql_cdc_hiveFts_test;


? ? ? ? 对mysql表进行?插入、删除、更新操作,再在flink进行实时查询,以及在hive提交查询。

insert into test.mysql_cdc_hiveFts_test values (1,'huawei'),(2,'uiio');

insert into test.mysql_cdc_hiveFts_test select 3,'apple';
 
update from test.mysql_cdc_hiveFts_test set b='xiaomi' where a=2;

delete from  test.mysql_cdc_hiveFts_test where a=3;

? ? ? ??

step4?每次启动加载?catalog :

? ? ? ? 让 flink sql?客户端每一次启动都默认加载这个?catalog ,根据官方的 -i?参数教程建议,建一个sql-cli-init.sql文件作为sql客户端的初始化文件。把注册Fts hive?catalog的语句写进去,当然还可以包括其他的初始化设置,内容如下 ,(?uri和warehouse根据自己的来填,不要抄作业!!!):

cd ${FLINK_HOME};vim ./bin/sql-cli-init.sql
-- Fts catalog_hive
CREATE CATALOG hiveFts_catalog WITH (
  'type'='table-store',
  'metastore' = 'hive',
  'uri' = 'thrift://h01:9083',
  'warehouse'='hdfs://h01:9000/user/hiveFts'
);

-- !!! uri和warehouse根据自己的来填!!!

重新编写一个sql-cli启动脚本

vim ./bin/sql-client-server.sh

内容如下(自己更改自己的内容):

echo '=================== sql-cil-init info ========================'
initSqlFp='/opt/module/flink/bin/sql-cli-init.sql'
echo $initSqlFp

# jars='/opt/module/flink/lib/flink-table-store-hive-catalog-0.2.0_2.2.jar'
# echo $jars
# addjars="--jar ${jars}" #有需要添加其他jar的可以取消注释

com="/opt/module/flink/bin/sql-client.sh -i ${initSqlFp} " ${addjars}
echo $com
echo '============================================================='
sleep 5s
$com

直接启动./bin/sql-client-server.sh

sh ./bin/sql-client-server.sh
show catalogs;
/*
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
| hiveFts_catalog |
+-----------------+
2 rows in set
*/

至此,持久化 Fts?matestore 到 hive 完成了。

到此完结!

码字不容易,转载请说明出处!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-30 00:59:31  更:2022-09-30 01:00:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年5日历 -2025/5/11 2:02:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码