本地集群flinksql客户端
介绍
这里和sparksql、hivesql一样,都可以创建表,执行sql语句,这里尝试创建一张关联了Kafka的表,从官网直接拿模板 可以看到这边正在运行,并且已经出来了数据 由于这是一个动态表,在原表直接修改数据,这里经过了SQL变换的数据也会发生相应的改变
写入到print表
CREATE TABLE print_table (
clazz STRING,
c BIGINT
) WITH (
'connector' = 'print'
)
insert into print_table
select clazz,count(1) as c from student group by clazz
在task manager中都可以看到任务运行情况
写入到MySQL表
CREATE TABLE students (
clazz STRING,
c BIGINT,
PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.5.201:3306/student?useUnicode=true&characterEncoding=utf-8',
'table-name' = 'student1',
'username'='root',
'password'='123456'
)
insert into students
select clazz,count(1) as c from student group by clazz
这里也可以设置跳过null值
这些都只是测试用的,这里都没法加checkpoint,只是测试用一下
问题
但是这里都有一些致命的问题,就是在我们每一次重新打开flinksql客户端的时候,这里所有的表都会消失,因为我们每一次打开,数据表都是存在内存中的(默认是保存在当前会话),没有地方存储它的元数据,导致我们的flink找不到数据的位置,这里我们就尝试着使用flink整合hive,将元数据放在hive中,本质上还是存储在MySQL,但是flink不能整合MySQL,所以要这样过度一下。
整合
三种存储位置
1、默认是当前会话 2、存储在关系型数据库,但是当前只支持postgres数据库 3、hive
将元数据放到hive,整合hive
我们其实完全可以直接查看官网找到所需的一切东西: https://nightlies.apache.org/flink/flink-docs-release-1.11/zh/dev/table/hive/index.html
里面有我们所需要的各种jar包的依赖,maven的依赖,以及基础的语法如何写,这里我们直接拿来:
1、将整合需要的jar上传到flink的lib目录
flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar
hive-exec-1.2.1.jar
hive-metastore-1.2.1.jar
上传jar 之后需要重新启动yarn-session.sh
yarn application -kill appid
yarn-session.sh -jm 1024m -tm 1096
2、启动hive元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
3、如果在sql-client中使用hive的catalog
修改sql-client-defaults.yaml
catalogs:
- name: myhive
type: hive
hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
default-database: default
4、在sql-client中使用hive的catalog
sql-client.sh embedded
这里要首先切换元数据的位置
show catalogs;
USE CATALOG myhive;
use databases;
show tables;
就可以直接通过flink操作hive了
在flink中创建的表在hive中就可以查看,不能查询数据
hive中的表在flink中可以查询
在这里就可以直接通过命令行来通过flinksql操作hive了,这些也可以在idea中操作,首先也是先导入依赖
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.11.2</version>
</dependency>
官网这里也有直接连接到hive的模板
package com.shujia.SQL
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
object Demo7FlinkOnHive {
def main(args: Array[String]): Unit = {
//如果主键为null,自动删除
val configuration: Configuration = new Configuration()
configuration.setString("table.exec.sink.not-null-enforcer","drop")
configuration.setString("table.dynamic-table-options.enabled", "true")
//flinksql的环境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings: EnvironmentSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner() //使用blink的计划器
.inStreamingMode() //使用流处理模型
.build()
//创建table的环境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv.getConfig.addConfiguration(configuration)
/**
* 注册hive的元数据
* 可以直接读取hive中的表
* 这没法在本地运行,因为找不到本地的配置文件,除非把配置文件放到idea,或者上传到集群运行
*/
val name = "myhive"
val defaultDatabase = "flink"
val hiveConfDir = "/usr/local/soft/hive-1.2.1/conf"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
//注册catalog
bsTableEnv.registerCatalog("myhive", hive)
// 切换catalog
bsTableEnv.useCatalog("myhive")
/**
* 编写sql,使用hive中的表
*/
bsTableEnv.executeSql(
"""
|insert into mysql_clazz_num
|select clazz,count(1) from student group by clazz;
""".stripMargin)
}
}
这里的代码没法在本地运行,因为本地没法使用hive的配置文件,除非将配置文件放到本地,或者可以提交到集群上运行
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
|