IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink-SQL -> 正文阅读

[大数据]Flink-SQL

1. 基础知识

1.抽象出来一个动态表,并未进行存储,是Flink支持流数据的table API 和sql的核心概念,随时间变化的,查询动态表会生成一个连续的查询,结果是一个动态表
2.hive进入命令行需要先启动元数据服务,在查数据的时候数据是不变的
3.除非是有界流,否则连续的查询是不会停止的
4.将流转化(定义)成动态表,在动态表上计算一个连续的查询,生成一个新的动态表,最后转换成流,连续查询从不停止,会根据输入表上的更新对结果表进行更新
5.更新查询和追加查询,两者最后转化成的流是不一样的,滚动的事件时间窗口,每个窗口不更新,只追加
6.产生更新更改的查询需要维护更多的状态
7.流:Append-only Retract(撤回) Upsert
8.创建tabkleEnvironment

2.flink流到表的演示(仅仅演示)

构建流,再转换成表

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Demo2DAtaStreamToTable {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  //创建flink-sql的流处理的环境,构建一个流
 val table: StreamTableEnvironment = StreamTableEnvironment.create(env)
 val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
 val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))
 //在流上定义表,是一个仅追加的动态表
 val wordsTable: Table =table.fromDataStream(
      wordsDS,     //指定流
      Schema
        .newBuilder()
        .column("f0",DataTypes.STRING())   //指定表结构
        .build()
    )

flink sql有两种写法,SQL(重点)和DSL(基于表或者视图,但是基本不用)

1.在表上进行连续查询:
val countTable: Table =wordsTable
     .groupBy($"f0")
    .select($"f0",$"f0".count())
2.或者创建临时视图来写SQL:
table.createTemporaryView("word",wordsTable)
    val countTable: Table = table.sqlQuery(
      """
        |select f0,count(1) as con
        |from
        |word
        |group by f0
        |
        |""".stripMargin)
    val countDS: DataStream[Row] = countTable.toChangelogStream
    countDS.print()
    env.execute()
  }
}

3. 连接器(要读数据,处理数据,存数据)

3.1 DataGen 随机生成数据的工具

创建flink-sql表环境
val settings: EnvironmentSettings =EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val table: TableEnvironment =TableEnvironment.create(settings)
只能作为source表:
CREATE TABLE datagen (
 id STRING,
 name STRING,
 age INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '5', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
)

-- ts AS localtimestamp, : localtimestamp获取当前时间戳,
 table.executeSql(
      """                          一次只能写一个sql,另开一个代码写
        |""".stripMargin)

3.2 print(用于打印连续查询的结果的表)

基于已有的表结构创建print表       print仅用于sink表
-- LIKE: 基于已有的表创建表
CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE datagen (EXCLUDING ALL)
采用插入建表
insert into print_table select * from datagen
进行操作后需要,手动设置字段
CREATE TABLE print_table (
 age INT,
 num BIGINT
)
WITH ('connector' = 'print')

3.3 BlackHole(用于flink性能测试)

CREATE TABLE blackhole_table 
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL)

4. Flink-kafka

kafka和csv的依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.15.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>1.15.0</version>
</dependency>

4.1 kafka_source

CREATE TABLE student_kafka (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter'=',', -- csv格式数据的分隔符
  'csv.ignore-parse-errors'='true',  -- 如果出现脏数据,默认补null
  'csv.allow-comments'='true'  --跳过#注释行(并非跳过脏数据行)
)

参数介绍:

scan.startup.mode:
	earliest-offset: 读取所有的数据
	latest-offset:读取最新的数据,只能读取到任务启动之后生产的数据
	group-offsets(默认值): 基于以消费者组读取数据,一个组内只被读取一次,如果消费者组不存在读取最新的数据
	timestamp :指定时间戳读取数据
	specific-offsets:指定偏移量读取数据
	
format:
	csv: 文本格式,指定字段时需要按照顺序映射,flink sql会自动解析

4.2 kafka_sink

SQL的结果是更新流或者追加流,然后使用flink向kafka中写数据,此时根据流的情况分为两种情况

4.2.1 追加流写入kafka

读取kafka、插入表、存入kafka
kafka sink表创建:(追加流写入kafka)
CREATE TABLE student_kafka_sink (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',-- 只支持追加的流
  'topic' = 'student_nan',      //存到新的topic,可以自动生成
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'format' = 'csv',
  'csv.field-delimiter'='\t' -- csv格式数据的分隔符
)

--------------------------------------------------------------------------
注意:非聚合类的连续查询返回的动态表是一个追加表,可以直接写入Kafka中,进行了类似count的操作的表返回的是一个更新表,不断有新的数据流出进行分组

//取出性别为男的学生,将数据写入到kafka中
insert into student_kafka_sink
select * from
student_kafka_source
where gender ='男'

4.2.2 更新流写入kafka

创建sink表:
CREATE TABLE gender_num_sink (
    gender STRING,
	num BIGINT,
    PRIMARY KEY (gender) NOT ENFORCED   -- 更新的流要加上设置唯一主键
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'gender_num',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
)
--------------------------------------------------------------------------
--执行sql
-- 将更新的流写入kafka
-- 已唯一的主键作为kafka中key
-- 已数据作为kafkavalue
insert into gender_num_sink
select gender,count(1) as num
from student_kafka     (kafka  source表)----
where gender is not null
group by gender
--------------------------------------------------------------------------
--使用控制台查看数据(消费者)
kafka-console-consumer.sh --bootstrap-server  master:9092,node2:9092,node2:9092 --from-beginning --topic gender_num

--也可以使用Java代码进行消费(能取出key,每次基于最新的key作为基础进行统计)

4.2.3 打jar包到服务器运行

所报错误看看缺失哪个jar包,提交到集群运行需要先将kafka依赖包上传到flink  lib目录下
flink-sql-connector-kafka-1.15.0.jar

5.Flink-JDBC

5.1 jdbc-source

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>1.15.0</version>
</dependency>
jdbc有界流     字段按照名称和类型进行映射的,flink sql中表的字段和类型必须和数据库中保持一致比如(varchar--String),官网中有数据类型的映射关系
------------------------------------------------------------------------
jdbc source:
CREATE TABLE student_mysql (
  id BIGINT,
  name STRING,
  age BIGINT,
  gender STRING,
  clazz STRING
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata',
   'table-name' = 'students',
   'username' = 'root',
   'password' = '123456'
)
------------------------------------------------------------------------
-- 创建print sink 表
CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE student_mysql (EXCLUDING ALL)

-- 执行sql
insert into print_table
select * from student_mysql

5.2 jdbc-sink(存数据两处需要指定主键)

示例:读取kafka中的数据(消费者),实时统计每个班级学生的人数,将结果保存到mysql中
1.两个地方需要加主键,sink表中需要添加主键,去数据库中建表也需要指定主键,否则会出现数据重复的现象
2.Kafka-source是无界流,得到的结果是更新表,新数据会经过分组后进行统计
----------------------------------------------------
-- flink sql kafka source表  学生表
CREATE TABLE student_kafka (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter'=',', -- csv格式数据的分隔符
  'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
  'csv.allow-comments'='true'--跳过#注释行
)

jdbc-sink:  
CREATE TABLE clazz_num_mysql (
  clazz STRING,
  num BIGINT,
  PRIMARY KEY (clazz) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'clazz_num',  -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
)

-- 以下查询返回的是一个更新流,flinksql会自动按照主键更新数据
insert into clazz_num_mysql
select clazz,count(1) as num from 
student_kafka
where clazz is not null
group by clazz

6. FileSystem

本地文件,hdfs 其它的文件系统

6.1 hdfs_source

流处理模式(连续的结果)和批处理模式(最终的结果)
文件source:
CREATE TABLE student_file (
    id STRINg,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)  WITH (
  'connector' = 'filesystem',   -- 必选:指定连接器类型
  'path' = 'data/students.txt',  -- 必选:指定路径
  'format' = 'csv'   -- 必选:文件系统连接器指定format
)
-- 读取csv格式字段需要按照顺序映射
----------------------------------------------------
--print sink 
CREATE TABLE print_table 
(
    clazz STRING,
    num BIGINT
)
WITH ('connector' = 'print')
----------------------------------------------------
--执行sql
insert into print_table
select clazz,count(1) as num from
student_file
group by clazz

6.2 hdfs_sink

source表:随机生成数
CREATE TABLE datagen (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING,
    ts AS localtimestamp
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '500', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.gender.length'='1',   
 'fields.clazz.length'='5',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
)
----------------------------------------------------
-- 创建file sink表
CREATE TABLE file_sink (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING,
    `day` STRING,
    `hour` STRING
) PARTITIONED BY (`day`,`hour`) WITH (
  'connector'='filesystem',
  'path'='data/flink_sink',
  'format'='csv',
  'sink.rolling-policy.file-size' ='100kb'--滚动生成新的文件的大小,默认128M
)
----------------------------------------------------
-- 执行sql
insert into file_sink
select 
id,name,age,gender,clazz,
DATE_FORMAT(ts, 'yyyy-MM-dd') as `day`,
DATE_FORMAT(ts, 'HH')  as `hour`
from 
datagen

7.Flink-Hbase

7.1 hbase回顾

hbase(16010页面)回顾:
列示存储数据库,创建表时不需要指定列但是需要指定列簇,插入数据时再指定列
通过rowKey对海量数据进行毫秒级别的延迟
数据保存到内存和hdfs中
再写入数据时会根据rowKey进行排序,建立索引,提高查询效率
hbase会将一个表拆分为多个region,实现分布式存储

7.2 依赖准备,与官网有差别

1.引入依赖:
idea中maven下载依赖找包:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4</artifactId>
    <version>1.15.0</version>
</dependency>
2.加一个插件,将依赖加入jar包中
将flink-hbase-1.0-jar-with-dependencies.jar依赖包上传到flink的lib目录下
3.是我们自己通过mavrn打的以包,官网提供的依赖包有问题
yarn application -list
# 关闭
yarn application -kill application_1658546198162_0009
# 启动
yarn-session.sh -d
4.先启动zookeeper
  start-hbase.sh

7.3 Hbase_sink

既可以用于写数据,也可以用于读数据,flink读取habse也是一个有界流

-- source表
CREATE TABLE student_kafka (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'csv.field-delimiter'=',', -- csv格式数据的分隔符
  'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
  'csv.allow-comments'='true'--跳过#注释行
)
-----------------------------------------------------------------------
-- sink 表
CREATE TABLE student_hbase (
 id STRING,
 info ROW<name STRING,age INT,gender STRING,clazz STRING>,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'student',
 'zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'
);
------------------------------------------------------------------------
-- 需要先再hbase中创建表    hbase shell
create  'student','info'
------------------------------------------------------------------------
-- 将数据写入habse
insert into student_hbase
select id,ROW(name,age,gender,clazz) as info from student_kafka;

8. 格式

8.1 读取json格式

增加依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.15.0</version>
</dependency>
-- source 表 
CREATE TABLE student_file_json (
    id STRINg,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)  WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'data/students.json',  -- 必选:指定路径
  'format' = 'json' ,                    -- 必选:文件系统连接器指定 format
  'json.ignore-parse-errors' = 'true' --跳过脏数据
)
----------------------------------------------------
-- sink 表
CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE student_file_json (EXCLUDING ALL)
----------------------------------------------------
--执行sql
insert into print_table
select * from student_file_json

8.2 保存为json格式数据

-- source 表 
CREATE TABLE student_file_json (
    id STRINg,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)  WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'data/students.json',  -- 必选:指定路径
  'format' = 'json' ,                    -- 必选:文件系统连接器指定 format
  'json.ignore-parse-errors' = 'true'
)
----------------------------------------------------
-- kafka sink 
CREATE TABLE student_kafka_sink (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',-- 只支持追加的流
  'topic' = 'student_flink_json',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'format' = 'json'
)
----------------------------------------------------
-- 执行sql
insert into student_kafka_sink
select * from student_file_json
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-08-06 10:50:33  更:2022-08-06 10:52:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 23:39:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码