1. 基础知识
1.抽象出来一个动态表,并未进行存储,是Flink支持流数据的table API 和sql的核心概念,随时间变化的,查询动态表会生成一个连续的查询,结果是一个动态表 2.hive进入命令行需要先启动元数据服务,在查数据的时候数据是不变的 3.除非是有界流,否则连续的查询是不会停止的 4.将流转化(定义)成动态表,在动态表上计算一个连续的查询,生成一个新的动态表,最后转换成流,连续查询从不停止,会根据输入表上的更新对结果表进行更新 5.更新查询和追加查询,两者最后转化成的流是不一样的,滚动的事件时间窗口,每个窗口不更新,只追加 6.产生更新更改的查询需要维护更多的状态 7.流:Append-only Retract(撤回) Upsert 8.创建tabkleEnvironment
2.flink流到表的演示(仅仅演示)
构建流,再转换成表
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Demo2DAtaStreamToTable {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//创建flink-sql的流处理的环境,构建一个流
val table: StreamTableEnvironment = StreamTableEnvironment.create(env)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
//在流上定义表,是一个仅追加的动态表
val wordsTable: Table =table.fromDataStream(
wordsDS, //指定流
Schema
.newBuilder()
.column("f0",DataTypes.STRING()) //指定表结构
.build()
)
flink sql有两种写法,SQL(重点)和DSL(基于表或者视图,但是基本不用)
1.在表上进行连续查询:
val countTable: Table =wordsTable
.groupBy($"f0")
.select($"f0",$"f0".count())
2.或者创建临时视图来写SQL:
table.createTemporaryView("word",wordsTable)
val countTable: Table = table.sqlQuery(
"""
|select f0,count(1) as con
|from
|word
|group by f0
|
|""".stripMargin)
val countDS: DataStream[Row] = countTable.toChangelogStream
countDS.print()
env.execute()
}
}
3. 连接器(要读数据,处理数据,存数据)
3.1 DataGen 随机生成数据的工具
创建flink-sql表环境
val settings: EnvironmentSettings =EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val table: TableEnvironment =TableEnvironment.create(settings)
只能作为source表:
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
)
-- ts AS localtimestamp, : localtimestamp获取当前时间戳,
table.executeSql(
""" 一次只能写一个sql,另开一个代码写
|""".stripMargin)
3.2 print(用于打印连续查询的结果的表)
基于已有的表结构创建print表 print仅用于sink表
-- LIKE: 基于已有的表创建表
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE datagen (EXCLUDING ALL)
采用插入建表
insert into print_table select * from datagen
进行操作后需要,手动设置字段
CREATE TABLE print_table (
age INT,
num BIGINT
)
WITH ('connector' = 'print')
3.3 BlackHole(用于flink性能测试)
CREATE TABLE blackhole_table
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL)
4. Flink-kafka
kafka和csv的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.15.0</version>
</dependency>
4.1 kafka_source
CREATE TABLE student_kafka (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'student',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.field-delimiter'=',', -- csv格式数据的分隔符
'csv.ignore-parse-errors'='true', -- 如果出现脏数据,默认补null
'csv.allow-comments'='true' --跳过#注释行(并非跳过脏数据行)
)
参数介绍:
scan.startup.mode:
earliest-offset: 读取所有的数据
latest-offset:读取最新的数据,只能读取到任务启动之后生产的数据
group-offsets(默认值): 基于以消费者组读取数据,一个组内只被读取一次,如果消费者组不存在读取最新的数据
timestamp :指定时间戳读取数据
specific-offsets:指定偏移量读取数据
format:
csv: 文本格式,指定字段时需要按照顺序映射,flink sql会自动解析
4.2 kafka_sink
SQL的结果是更新流或者追加流,然后使用flink向kafka中写数据,此时根据流的情况分为两种情况
4.2.1 追加流写入kafka
读取kafka、插入表、存入kafka
kafka sink表创建:(追加流写入kafka)
CREATE TABLE student_kafka_sink (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',-- 只支持追加的流
'topic' = 'student_nan', //存到新的topic,可以自动生成
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'format' = 'csv',
'csv.field-delimiter'='\t' -- csv格式数据的分隔符
)
--------------------------------------------------------------------------
注意:非聚合类的连续查询返回的动态表是一个追加表,可以直接写入Kafka中,进行了类似count的操作的表返回的是一个更新表,不断有新的数据流出进行分组
//取出性别为男的学生,将数据写入到kafka中
insert into student_kafka_sink
select * from
student_kafka_source
where gender ='男'
4.2.2 更新流写入kafka
创建sink表:
CREATE TABLE gender_num_sink (
gender STRING,
num BIGINT,
PRIMARY KEY (gender) NOT ENFORCED -- 更新的流要加上设置唯一主键
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'gender_num',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'key.format' = 'csv',
'value.format' = 'csv'
)
--------------------------------------------------------------------------
--执行sql
-- 将更新的流写入kafka
-- 已唯一的主键作为kafka中key
-- 已数据作为kafkavalue
insert into gender_num_sink
select gender,count(1) as num
from student_kafka (kafka source表)----
where gender is not null
group by gender
--------------------------------------------------------------------------
--使用控制台查看数据(消费者)
kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic gender_num
--也可以使用Java代码进行消费(能取出key,每次基于最新的key作为基础进行统计)
4.2.3 打jar包到服务器运行
所报错误看看缺失哪个jar包,提交到集群运行需要先将kafka依赖包上传到flink lib目录下
flink-sql-connector-kafka-1.15.0.jar
5.Flink-JDBC
5.1 jdbc-source
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.0</version>
</dependency>
jdbc有界流 字段按照名称和类型进行映射的,flink sql中表的字段和类型必须和数据库中保持一致比如(varchar--String),官网中有数据类型的映射关系
------------------------------------------------------------------------
jdbc source:
CREATE TABLE student_mysql (
id BIGINT,
name STRING,
age BIGINT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata',
'table-name' = 'students',
'username' = 'root',
'password' = '123456'
)
------------------------------------------------------------------------
-- 创建print sink 表
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE student_mysql (EXCLUDING ALL)
-- 执行sql
insert into print_table
select * from student_mysql
5.2 jdbc-sink(存数据两处需要指定主键)
示例:读取kafka中的数据(消费者),实时统计每个班级学生的人数,将结果保存到mysql中
1.两个地方需要加主键,sink表中需要添加主键,去数据库中建表也需要指定主键,否则会出现数据重复的现象
2.Kafka-source是无界流,得到的结果是更新表,新数据会经过分组后进行统计
----------------------------------------------------
-- flink sql kafka source表 学生表
CREATE TABLE student_kafka (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'student',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.field-delimiter'=',', -- csv格式数据的分隔符
'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
'csv.allow-comments'='true'--跳过#注释行
)
jdbc-sink:
CREATE TABLE clazz_num_mysql (
clazz STRING,
num BIGINT,
PRIMARY KEY (clazz) NOT ENFORCED -- 按照主键更新数据
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'clazz_num', -- 需要手动到数据库中创建表
'username' = 'root',
'password' = '123456'
)
-- 以下查询返回的是一个更新流,flinksql会自动按照主键更新数据
insert into clazz_num_mysql
select clazz,count(1) as num from
student_kafka
where clazz is not null
group by clazz
6. FileSystem
本地文件,hdfs 其它的文件系统
6.1 hdfs_source
流处理模式(连续的结果)和批处理模式(最终的结果)
文件source:
CREATE TABLE student_file (
id STRINg,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'filesystem', -- 必选:指定连接器类型
'path' = 'data/students.txt', -- 必选:指定路径
'format' = 'csv' -- 必选:文件系统连接器指定format
)
-- 读取csv格式字段需要按照顺序映射
----------------------------------------------------
--print sink
CREATE TABLE print_table
(
clazz STRING,
num BIGINT
)
WITH ('connector' = 'print')
----------------------------------------------------
--执行sql
insert into print_table
select clazz,count(1) as num from
student_file
group by clazz
6.2 hdfs_sink
source表:随机生成数
CREATE TABLE datagen (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
ts AS localtimestamp
) WITH (
'connector' = 'datagen',
'rows-per-second' = '500', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.gender.length'='1',
'fields.clazz.length'='5',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
)
----------------------------------------------------
-- 创建file sink表
CREATE TABLE file_sink (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
`day` STRING,
`hour` STRING
) PARTITIONED BY (`day`,`hour`) WITH (
'connector'='filesystem',
'path'='data/flink_sink',
'format'='csv',
'sink.rolling-policy.file-size' ='100kb'--滚动生成新的文件的大小,默认128M
)
----------------------------------------------------
-- 执行sql
insert into file_sink
select
id,name,age,gender,clazz,
DATE_FORMAT(ts, 'yyyy-MM-dd') as `day`,
DATE_FORMAT(ts, 'HH') as `hour`
from
datagen
7.Flink-Hbase
7.1 hbase回顾
hbase(16010页面)回顾:
列示存储数据库,创建表时不需要指定列但是需要指定列簇,插入数据时再指定列
通过rowKey对海量数据进行毫秒级别的延迟
数据保存到内存和hdfs中
再写入数据时会根据rowKey进行排序,建立索引,提高查询效率
hbase会将一个表拆分为多个region,实现分布式存储
7.2 依赖准备,与官网有差别
1.引入依赖:
idea中maven下载依赖找包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-1.4</artifactId>
<version>1.15.0</version>
</dependency>
2.加一个插件,将依赖加入jar包中
将flink-hbase-1.0-jar-with-dependencies.jar依赖包上传到flink的lib目录下
3.是我们自己通过mavrn打的以包,官网提供的依赖包有问题
yarn application -list
# 关闭
yarn application -kill application_1658546198162_0009
# 启动
yarn-session.sh -d
4.先启动zookeeper
start-hbase.sh
7.3 Hbase_sink
既可以用于写数据,也可以用于读数据,flink读取habse也是一个有界流
-- source表
CREATE TABLE student_kafka (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'student',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.field-delimiter'=',', -- csv格式数据的分隔符
'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
'csv.allow-comments'='true'--跳过#注释行
)
-----------------------------------------------------------------------
-- sink 表
CREATE TABLE student_hbase (
id STRING,
info ROW<name STRING,age INT,gender STRING,clazz STRING>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'student',
'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'
);
------------------------------------------------------------------------
-- 需要先再hbase中创建表 hbase shell
create 'student','info'
------------------------------------------------------------------------
-- 将数据写入habse
insert into student_hbase
select id,ROW(name,age,gender,clazz) as info from student_kafka;
8. 格式
8.1 读取json格式
增加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.15.0</version>
</dependency>
-- source 表
CREATE TABLE student_file_json (
id STRINg,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'filesystem', -- 必选:指定连接器类型
'path' = 'data/students.json', -- 必选:指定路径
'format' = 'json' , -- 必选:文件系统连接器指定 format
'json.ignore-parse-errors' = 'true' --跳过脏数据
)
----------------------------------------------------
-- sink 表
CREATE TABLE print_table
WITH ('connector' = 'print')
LIKE student_file_json (EXCLUDING ALL)
----------------------------------------------------
--执行sql
insert into print_table
select * from student_file_json
8.2 保存为json格式数据
-- source 表
CREATE TABLE student_file_json (
id STRINg,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'filesystem', -- 必选:指定连接器类型
'path' = 'data/students.json', -- 必选:指定路径
'format' = 'json' , -- 必选:文件系统连接器指定 format
'json.ignore-parse-errors' = 'true'
)
----------------------------------------------------
-- kafka sink
CREATE TABLE student_kafka_sink (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',-- 只支持追加的流
'topic' = 'student_flink_json',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'format' = 'json'
)
----------------------------------------------------
-- 执行sql
insert into student_kafka_sink
select * from student_file_json
|