适用于flink1.13+版本
一、准备lib与环境变量
1、把flink-connector-hive_2.11-1.14.4.jar、flink-connector-kafka_2.11-1.14.4.jar、hive-exec-2.3.4.jar、antlr-runtime-3.5.3.jar下载到flink的lib目录 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/#dependencies
2、把hadoop依赖包下载到flink的lib目录(HADOOP_CLASSPATH环境变量指定的依赖包,不会自动包含到flink on yarn实例里) https://flink.apache.org/downloads.html#additional-components
3、提供yarn连接信息,yarn-site.xml文件里要准备好resource manager地址
export HADOOP_CONF_DIR=/apps/svr/hadoop-2.9.2/etc/hadoop
二、启动一个flink on yarn实例
用于为sql提供实际执行环境
cd /apps/svr/flink-1.14.4/bin
./yarn-session.sh -jm 2048 -tm 3072 -s 1 -nm flink -d
执行yarn-session.sh后,会在本地残余一个/tmp/.yarn-properties-<username>文件,如果立即继续启动sql-client,则已经可以正常连接到刚刚启动的flink实例。
三、启动sql-client
(一)提供flink on yarn实例的地址
修改flink目录下的conf/flink-conf.yaml文件,指定已有的flink实例:
execution.target: yarn-session
yarn.application.id: application_XXXXXXXXX_0001
sql-client在需要执行sql时,即可连接到yarn上的flink实例来提交job
(二)连接到hive metastore
新版sql-client不再支持使用默认配置文件,只能使用初始化命令文件:
CREATE CATALOG myhive
WITH (
'type' = 'hive',
'hive-conf-dir' = '/apps/svr/apache-hive-2.3.9-bin/conf'
);
USE CATALOG myhive;
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'sql-client.execution.max-table-result.rows' = '10000';
SET 'table.exec.state.ttl' = '1000';
启动sql-client:
cd /apps/svr/flink-1.14.4/bin
./sql-client.sh -i sql-cli-init.sql
上述初始化执行环境的sql命令,既可以放在文件里通过-i启动参数执行,也可在启动sql-client后再手动执行
四、执行SQL
flink专用的数据表字段定义信息,放在metastore的table_params表里; hive原生的数据表字段定义信息,放在metastore的columns_v2表里
(一)操作flink专用的数据表
CREATE TABLE mykafka (
`partition` INT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`timestamp` TIMESTAMP_LTZ(3) METADATA VIRTUAL,
`name` STRING,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'wzp_test_flink',
'properties.bootstrap.servers' = '10.10.1.1:9092',
'properties.group.id' = 'wzp',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
select * from mykafka;
metastore的table_params表记录了mykafka的字段定义:
(二)操作hive原生的数据表
SET 'execution.runtime-mode' = 'batch';
load module hive;
use modules hive,core;
set table.sql-dialect=hive;
create table tbl_hive (key int,value string);
insert overwrite table tbl_hive (`key`,`value`) values (5,'e'),(1,'a'),(1,'a'),(3,'c'),(2,'b'),(3,'c'),(3,'c'),(4,'d');
select * from tbl_hive cluster by key;
在beeline里也可以查询到tbl_hive表里已存在这些数据
|