IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink1.14学习测试:接收kafka消息将结构化数据通过JDBC保存到数据库中 -> 正文阅读

[大数据]Flink1.14学习测试:接收kafka消息将结构化数据通过JDBC保存到数据库中

Flink1.14学习测试:接收kafka消息将结构化数据通过JDBC保存到数据库中


准备事项

关键依赖的版本

  • Flink : 1.14.4
  • Scala:2.12.10
  • Java : 1.8

参考资料

??Flink相关依赖查看官方文档后依赖即可。

驱动包(测试用)

??Mysql驱动依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
    <scope>runtime</scope>
</dependency>

??Postgresql驱动依赖

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.4.0</version>
    <scope>runtime</scope>
</dependency>

其它(非必要,纯属个人习惯)

??Hutool工具类库

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.3</version>
</dependency>

??简化 Java POJO 对象的工具

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>

一、接收Kafka消息内容(Java)

??在1.14中FlinkKafkaConsumer类已经过时,此处使用KafkaSource用于就收Kafka消息流。

测试接收消息内容并解析打印

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class KafkaReceiveTest {
    //测试类(消息内容)
    @Data
    private static class Person {
        private String name;
        private int age;
        private char gender;
    }
    @SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")//Kafka服务
                .setTopics("ly_test")//消息主题
                .setGroupId("KafkaReceiveTest")//消费组
                //偏移量 当没有提交偏移量则从最开始开始消费
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                //自定义解析消息内容
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<Person>() {
                    @Override
                    public Person deserialize(byte[] message) {
                        return JSONUtil.toBean(StrUtil.str(message, StandardCharsets.UTF_8), Person.class);
                    }
                })
                .build();
        DataStreamSource<Person> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person");
        kafkaSource.print();
        env.execute();
    }
}

运行结果

??程序正常运行后,发送消息内容(此处测试发送了五条消息),可在控制台中打印如下内容:

5> KafkaReceiveTest.Person(name=Bmtjom, age=15, gender=男)
5> KafkaReceiveTest.Person(name=72r3gp, age=15, gender=女)
5> KafkaReceiveTest.Person(name=Cf4qa3, age=16, gender=男)
5> KafkaReceiveTest.Person(name=Wwqbpw, age=15, gender=男)
5> KafkaReceiveTest.Person(name=So5skv, age=15, gender=男)

二、写入数据到Mysql中(Scala)

准备测试表

create table if not exists mysql_person
(
    name   varchar(6),
    age    tinyint,
    gender char
);

向Mysql数据库中写入测试数据

import cn.hutool.core.text.CharSequenceUtil
import cn.hutool.core.util.RandomUtil
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, FieldExpression, Schema, TableDescriptor, TableEnvironment}
import org.apache.flink.types.Row

import scala.collection.JavaConverters.seqAsJavaListConverter

object MysqlWriteTest {

  //测试数据结构
  case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    //设置为批处理
    val environmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
    //使用TableApi
    val tableEnv = TableEnvironment.create(environmentSettings)
    //创建测试数据
    val rows = (1 to 5)
      //创建5条数据对象
      .map(_ =>
        //随机创建数据内容
        Person(
          CharSequenceUtil.upperFirst(RandomUtil.randomString(6)),
          RandomUtil.randomInt(15, 17),
          RandomUtil.randomString("男女", 1)))
      //转换成Row
      .map(p => Row.of(p.name, Int.box(p.age), p.gender))
      //转换成Java List
      .asJava
    //将测试数据转换成数据表
    val dataTable = tableEnv.fromValues(rows).as("name", "age", "gender")
    //创建JDBC连接
    val targetTableName = "person"
    tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.CHAR(1))
        .build())
      .option("url", "jdbc:mysql://127.0.0.1:3306/copy")
      .option("table-name", "mysql_person") //目标表名称
      .option("username", "root")
      .option("password", "123456")
      .build())
    //将数据插入目标表
    dataTable.select($"*").executeInsert(targetTableName)
    //查看目标数据表内容
    tableEnv.from(targetTableName).select($"*").execute().print()
  }

}

运行结果

??程序正常运行后,创建五条测试数据并写入到目标表中,可在控制台中打印如下内容:

+--------------------------------+-------------+--------------------------------+
|                           name |         age |                         gender |
+--------------------------------+-------------+--------------------------------+
|                         Jku9ch |          15 |                             女 |
|                         1z4a3m |          16 |                             女 |
|                         7k5yf5 |          15 |                             女 |
|                         2taztn |          15 |                             女 |
|                         75a2y4 |          15 |                             男 |
+--------------------------------+-------------+--------------------------------+
5 rows in set

三、写入数据到PostgreSql中(Java)

准备测试表

create table if not exists postgresql_person
(
    name   varchar(6),
    age    int,
    gender char
);

向PostgreSql数据库中写入测试数据

import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.api.Expressions.$;

public class PostgresqlWriteTest {

    //测试数据结构
    @Data
    @AllArgsConstructor
    private static class Person {
        private String name;
        private int age;
        private String gender;
    }

    public static void main(String[] args) {
        //设置为批处理
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        //使用TableApi
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        //创建测试数据
        List<Row> rows = IntStream.rangeClosed(1, 5)
                .mapToObj(v -> {
                    //创建5条数据对象,随机创建数据内容
                    return new Person(
                            StrUtil.upperFirst(RandomUtil.randomString(6)),
                            RandomUtil.randomInt(15, 17),
                            RandomUtil.randomString("男女", 1));
                })
                .map(p -> Row.of(p.name, p.age, p.gender))
                .collect(Collectors.toList());
        //将测试数据转换成数据表
        Table dataTable = tableEnv.fromValues(rows).as("name", "age", "gender");
        //创建JDBC连接
        String targetTableName = "person";
        tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
                .schema(Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .column("gender", DataTypes.CHAR(1))
                        .build())
                .option("url", "jdbc:postgresql://192.168.3.190:5432/y")
                .option("table-name", "postgresql_person") //目标表名称
                .option("username", "postgres")
                .option("password", "123456")
                .build());
        //将数据插入目标表
        dataTable.select($("*")).executeInsert(targetTableName);
        //查看目标数据表内容
        tableEnv.from(targetTableName).select($("*")).execute().print();
    }

}

运行结果

??程序正常运行后,创建五条测试数据并写入到目标表中,可在控制台中打印如下内容:

+--------------------------------+-------------+--------------------------------+
|                           name |         age |                         gender |
+--------------------------------+-------------+--------------------------------+
|                         Xfsg1q |          16 |                             女 |
|                         0z5ilr |          16 |                             男 |
|                         Eutala |          15 |                             女 |
|                         F75phz |          15 |                             男 |
|                         2xhqd8 |          15 |                             男 |
+--------------------------------+-------------+--------------------------------+

四、接收Kafka消息流写入数据到Mysql中(Scala)

测试代码

??sink的目标表与第二步的测试表结构一致。

import cn.hutool.core.util.StrUtil
import cn.hutool.json.JSONUtil
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, FieldExpression, Schema, Table, TableDescriptor}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

object KafkaToMysqlTest {

  //测试数据结构(自带schema不需要再设置)
  private case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    //环境配置
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //每个1s检查一次,精确一次
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
    //table api
    val tableEnv = StreamTableEnvironment.create(env)
    //配置kafka
    val source = KafkaSource.builder()
      .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
      .setTopics("ly_test")
      .setGroupId("KafkaToMysqlTest")
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      //自定义解析消息内容
      .setValueOnlyDeserializer(new AbstractDeserializationSchema[Person]() {
        override def deserialize(message: Array[Byte]): Person = {
          val json = JSONUtil.parse(StrUtil.str(message, StandardCharsets.UTF_8))
          Person(
            json.getByPath("name", classOf[String]),
            json.getByPath("age", classOf[java.lang.Integer]),
            json.getByPath("gender", classOf[String]),
          )
        }
      })
      .build()
    //数据源
    val kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks[Person], "person")
    //将kafka转成数据表
    val dataTable: Table = tableEnv.fromDataStream(kafkaSource)
    //创建 sink 目标 (mysql数据库)
    val targetTableName = "person"
    tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.STRING)
        .build())
      .option("url", "jdbc:mysql://127.0.0.1:3306/copy")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("table-name", "mysql_person") //目标表名称
      .option("username", "root")
      .option("password", "123456")
      .build())
    //将数据插入目标表
    dataTable.select($"name", $"age", $"gender").executeInsert(targetTableName)
  }

}

运行结果

Mysql插入结果

五、接收Kafka消息流写入数据到PostgreSql中(Java)

测试代码

??sink的目标表与第三步的测试表结构一致。

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

public class KafkaToPostgresqlTest {

    //测试数据结构
    @Data
    @AllArgsConstructor
    private static class Person {
        private String name;
        private int age;
        private String gender;
    }

    public static void main(String[] args) {
        //环境配置
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //每个1s检查一次,精确一次
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        //table api
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //配置kafka
        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
                .setTopics("ly_test")
                .setGroupId("KafkaToPostgresqlTest")
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                //自定义解析消息内容
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<Person>() {
                    @Override
                    public Person deserialize(byte[] message) {
                        return JSONUtil.toBean(StrUtil.str(message, StandardCharsets.UTF_8), Person.class);
                    }
                })
                .build();
        //数据源
        DataStreamSource<Person> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person");
        //数据结构转换
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> dataSource =
                kafkaSource.map(p -> Tuple3.of(p.getName(), p.getAge(), p.getGender()),
                        Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
        //将kafka转成数据表
        Table dataTable = tableEnv.fromDataStream(dataSource).as("name", "age", "gender");
        //创建JDBC连接
        String targetTableName = "person";
        tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
                .schema(Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .column("gender", DataTypes.STRING())
                        .build())
                .option("url", "jdbc:postgresql://192.168.3.190:5432/y")
                .option("driver", "org.postgresql.Driver")
                .option("table-name", "postgresql_person") //目标表名称
                .option("username", "postgres")
                .option("password", "123456")
                .build());
        //将数据插入目标表
        dataTable.select($("name"), $("age"), $("gender")).executeInsert(targetTableName);
    }

}

运行结果

Postgresql插入结果

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-17 16:30:00  更:2022-07-17 16:33:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:29:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码