IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 一文带你入门flink sql -> 正文阅读

[大数据]一文带你入门flink sql

一文带你入门flink sql

写在前面

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计。

环境准备

具体的环境安装过程就不在这里写了,网上很多资料,大家自己查阅按照就好了。我说下我本地的环境:

  • flink 1.12.4
  • mysql 8.0.25
  • kafka 2.8.0

另外就是,本次示例需要用到以下几个jar包:

flink-sql-connector-kafka_2.11-1.12.4.jar
flink-connector-jdbc_2.11-1.12.4.jar
mysql-connector-java-5.1.48.jar

把他们拷贝到flink安装目录lib目录下。

flink输出的结果,会落到一张mysql的表,也就是我们的sink表,这个表要提前建好。

CREATE TABLE `pvuv_sink` (
  `dt` varchar(100) DEFAULT NULL,
  `pv` bigint DEFAULT NULL,
  `uv` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

三个字段分别表示时间,pv值和uv值。

正文

先启动flink以及flink sql的客户端。

$ ./bin/start-cluster.sh
$ .bin/sql-client.sh embedded

这样就开启了一个sql client的客户端。

接着在客户端执行下面这段sql,这相当于启动了一个source table进行监听我们的输入数据流。

CREATE TABLE user_log (
     user_id VARCHAR,
     item_id VARCHAR,
     category_id VARCHAR,
     behavior VARCHAR,
     ts TIMESTAMP(3)
 ) WITH (
     'connector.type' = 'kafka', -- 使用 kafka connector
     'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
     'connector.topic' = 'user',  -- kafka topic
     'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
     'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
     'connector.properties.0.value' = 'localhost:2181',
     'connector.properties.1.key' = 'bootstrap.servers',
     'connector.properties.1.value' = 'localhost:9092',
     'update-mode' = 'append',
     'format.type' = 'json',  -- 数据源格式为 json
     'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
 );

执行成功的话,会返回:

[INFO] Table has been created

解释下这段sql,flink会帮我们创建一张表,这个表的数据来源于kafka的消息,对应的topic是user,数据的格式是json。其它的信息都好理解,不做过多解释了。执行成功后,就开启监听了。

我们可以select下,看看表的情况:

在这里插入图片描述

因为还没有输入数据,所以表是空的。

然后执行sink sql,也就是输出数据的表,这个表前面我们提前建好了,在flink sql这里配置下:

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '11111111',
    'connector.write.flush.max-rows' = '1'
);

然后编写计算逻辑,逻辑比较简单,统计每个小时的pv和uv。

INSERT INTO pvuv_sink(dt, pv, uv)
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

执行后,flink就会启动一个job在后台执行。
在这里插入图片描述

我们可以通过

http://localhost:8081/#/overview

这个地址看到任务的详细情况。

在这里插入图片描述

然后我们在本地启动一个kafka的服务,然后再启动一个producer模拟发送数据。

kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper

/usr/local/Cellar/kafka/2.8.0/bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka

/usr/local/Cellar/kafka/2.8.0/bin/kafka-server-start /usr/local/etc/kafka/server.properties &

查看启动是否成功

创建topic,注意和上面source table的配置保持一致。

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user

启动一个控制台的生产者,

kafka-console-producer --broker-list localhost:9092 --topic user

发送两条消息试试:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

去mysql看下pvuv_sink表,发现已经有数据了。

在这里插入图片描述

遇到的一些问题

在运行flink sql的时候踩过一些坑,这里列举下帮大家避坑。

错误一

java.lang.NoSuchMethodError: 'boolean org.apache.flink.table.api.TableColumn.isGenerated()'

这个是因为flink-jdbc的版本搞错了导致的。

错误二

Flink SQL> INSERT INTO pvuv_sink(dt, pv, uv)
> SELECT
>   DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
>   COUNT(*) AS pv,
>   COUNT(DISTINCT user_id) AS uv
> FROM user_log
> GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

这个是因为我一开始用错了lib,应该是

flink-sql-connector-kafka_2.11-1.12.4.jar

而不是

flink-connector-kafka_2.12-1.12.4.jar

错误三

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode and org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode are in unnamed module of loader 'app')

参考

  • https://blog.csdn.net/boling_cavalry/article/details/106038219
  • https://issues.apache.org/jira/browse/FLINK-19995
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-01 14:34:51  更:2021-08-01 14:37:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/2 23:59:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码