Flink 1.14 Learning Test: Receiving Kafka Messages and Saving Structured Data to a Database via JDBC
Prerequisites
Key dependency versions
- Flink: 1.14.4
- Scala: 2.12.10
- Java: 1.8
References
For the Flink dependencies themselves, consult the official documentation and add them as described there.
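A minimal sketch of the Flink-side Maven dependencies this setup needs is shown below. The exact artifact list is my own assumption based on the official 1.14 documentation (note the Scala-suffixed artifact IDs; 1.14 artifacts are built per Scala version), so adjust it to your build:
<properties>
    <flink.version>1.14.4</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <!-- DataStream API (Scala) and local execution -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Table API bridges (the examples below use both Java and Scala) plus the planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka source and JDBC sink connectors -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>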
Driver packages (for testing)
MySQL driver dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
    <scope>runtime</scope>
</dependency>
PostgreSQL driver dependency
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.4.0</version>
    <scope>runtime</scope>
</dependency>
Other dependencies (optional; purely personal preference)
Hutool utility library
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.3</version>
</dependency>
Lombok, a tool that simplifies Java POJOs
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>
1. Receiving Kafka Messages (Java)
In Flink 1.14 the FlinkKafkaConsumer class is deprecated, so KafkaSource is used here to receive the Kafka message stream.
Test: receive, parse, and print the message content
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class KafkaReceiveTest {

    @Data
    private static class Person {
        private String name;
        private int age;
        private char gender;
    }

    @SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second with exactly-once guarantees
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE);

        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
                .setTopics("ly_test")
                .setGroupId("KafkaReceiveTest")
                // Start from committed offsets, falling back to the earliest offset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Deserialize each message value from a JSON string into a Person
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<Person>() {
                    @Override
                    public Person deserialize(byte[] message) {
                        return JSONUtil.toBean(StrUtil.str(message, StandardCharsets.UTF_8), Person.class);
                    }
                })
                .build();

        DataStreamSource<Person> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person");
        kafkaSource.print();
        env.execute();
    }
}
Run results
Once the program is running, send some messages to the topic (five test messages were sent here); the console prints the following:
5> KafkaReceiveTest.Person(name=Bmtjom, age=15, gender=男)
5> KafkaReceiveTest.Person(name=72r3gp, age=15, gender=女)
5> KafkaReceiveTest.Person(name=Cf4qa3, age=16, gender=男)
5> KafkaReceiveTest.Person(name=Wwqbpw, age=15, gender=男)
5> KafkaReceiveTest.Person(name=So5skv, age=15, gender=男)
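For reference, each test message is a plain JSON object matching the Person fields. A minimal sketch of producing one with Kafka's console producer (assuming the Kafka scripts are on the PATH, and reusing the broker and topic from the code above):

kafka-console-producer.sh --bootstrap-server 192.168.11.160:9092 --topic ly_test
>{"name":"Bmtjom","age":15,"gender":"男"}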
2. Writing Data to MySQL (Scala)
Prepare the test table
create table if not exists mysql_person
(
    name   varchar(6),
    age    tinyint,
    gender char
);
Write test data to the MySQL database
import cn.hutool.core.text.CharSequenceUtil
import cn.hutool.core.util.RandomUtil
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, FieldExpression, Schema, TableDescriptor, TableEnvironment}
import org.apache.flink.types.Row

import scala.collection.JavaConverters.seqAsJavaListConverter

object MysqlWriteTest {

  case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    val environmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tableEnv = TableEnvironment.create(environmentSettings)

    // Build five rows of random test data (gender is randomly 男 or 女)
    val rows = (1 to 5)
      .map(_ =>
        Person(
          CharSequenceUtil.upperFirst(RandomUtil.randomString(6)),
          RandomUtil.randomInt(15, 17),
          RandomUtil.randomString("男女", 1)))
      .map(p => Row.of(p.name, Int.box(p.age), p.gender))
      .asJava
    val dataTable = tableEnv.fromValues(rows).as("name", "age", "gender")

    // Register a temporary JDBC table pointing at mysql_person
    val targetTableName = "person"
    tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.CHAR(1))
        .build())
      .option("url", "jdbc:mysql://127.0.0.1:3306/copy")
      .option("table-name", "mysql_person")
      .option("username", "root")
      .option("password", "123456")
      .build())

    // Wait for the insert job to finish before reading the table back
    dataTable.select($"*").executeInsert(targetTableName).await()
    tableEnv.from(targetTableName).select($"*").execute().print()
  }
}
Run results
After the program runs, five rows of test data are generated and written to the target table; the console prints the following:
+--------------------------------+-------------+--------------------------------+
| name | age | gender |
+--------------------------------+-------------+--------------------------------+
| Jku9ch | 15 | 女 |
| 1z4a3m | 16 | 女 |
| 7k5yf5 | 15 | 女 |
| 2taztn | 15 | 女 |
| 75a2y4 | 15 | 男 |
+--------------------------------+-------------+--------------------------------+
5 rows in set
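Two details worth noting: the 'driver' option is omitted here, which is fine because the JDBC connector derives the driver class from the URL when the option is not set (the MySQL driver must still be on the classpath); and the printed rows can equally be verified in MySQL directly:

select name, age, gender from mysql_person;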
3. Writing Data to PostgreSQL (Java)
Prepare the test table
create table if not exists postgresql_person
(
    name   varchar(6),
    age    int,
    gender char
);
Write test data to the PostgreSQL database
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.api.Expressions.$;

public class PostgresqlWriteTest {

    @Data
    @AllArgsConstructor
    private static class Person {
        private String name;
        private int age;
        private String gender;
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Build five rows of random test data (gender is randomly 男 or 女)
        List<Row> rows = IntStream.rangeClosed(1, 5)
                .mapToObj(v -> new Person(
                        StrUtil.upperFirst(RandomUtil.randomString(6)),
                        RandomUtil.randomInt(15, 17),
                        RandomUtil.randomString("男女", 1)))
                .map(p -> Row.of(p.name, p.age, p.gender))
                .collect(Collectors.toList());
        Table dataTable = tableEnv.fromValues(rows).as("name", "age", "gender");

        // Register a temporary JDBC table pointing at postgresql_person
        String targetTableName = "person";
        tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
                .schema(Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .column("gender", DataTypes.CHAR(1))
                        .build())
                .option("url", "jdbc:postgresql://192.168.3.190:5432/y")
                .option("table-name", "postgresql_person")
                .option("username", "postgres")
                .option("password", "123456")
                .build());

        // Wait for the insert job to finish before reading the table back
        dataTable.select($("*")).executeInsert(targetTableName).await();
        tableEnv.from(targetTableName).select($("*")).execute().print();
    }
}
Run results
After the program runs, five rows of test data are generated and written to the target table; the console prints the following:
+--------------------------------+-------------+--------------------------------+
| name | age | gender |
+--------------------------------+-------------+--------------------------------+
| Xfsg1q | 16 | 女 |
| 0z5ilr | 16 | 男 |
| Eutala | 15 | 女 |
| F75phz | 15 | 男 |
| 2xhqd8 | 15 | 男 |
+--------------------------------+-------------+--------------------------------+
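As with the MySQL example, the written rows can be double-checked in PostgreSQL directly:

select name, age, gender from postgresql_person;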
4. Receiving a Kafka Stream and Writing to MySQL (Scala)
Test code
The sink's target table has the same structure as the test table from Section 2.
import cn.hutool.core.util.StrUtil
import cn.hutool.json.JSONUtil
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, FieldExpression, Schema, Table, TableDescriptor}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

object KafkaToMysqlTest {

  private case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every second with exactly-once guarantees
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
    val tableEnv = StreamTableEnvironment.create(env)

    // The explicit [Person] type parameter is required; without it Scala infers Nothing
    val source = KafkaSource.builder[Person]()
      .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
      .setTopics("ly_test")
      .setGroupId("KafkaToMysqlTest")
      // Start from committed offsets, falling back to the latest offset
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      // Deserialize each message value from a JSON string into a Person
      .setValueOnlyDeserializer(new AbstractDeserializationSchema[Person]() {
        override def deserialize(message: Array[Byte]): Person = {
          val json = JSONUtil.parse(StrUtil.str(message, StandardCharsets.UTF_8))
          Person(
            json.getByPath("name", classOf[String]),
            json.getByPath("age", classOf[java.lang.Integer]),
            json.getByPath("gender", classOf[String]))
        }
      })
      .build()

    val kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks[Person], "person")
    val dataTable: Table = tableEnv.fromDataStream(kafkaSource)

    // Register a temporary JDBC table pointing at mysql_person
    val targetTableName = "person"
    tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.STRING())
        .build())
      .option("url", "jdbc:mysql://127.0.0.1:3306/copy")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("table-name", "mysql_person")
      .option("username", "root")
      .option("password", "123456")
      .build())

    // executeInsert() submits the streaming job on its own; no env.execute() call is needed
    dataTable.select($"name", $"age", $"gender").executeInsert(targetTableName)
  }
}
Run results
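Unlike the batch examples above, this is a continuously running streaming job, and executeInsert() submits it by itself, so the program prints nothing. After sending test messages to ly_test, the sink can be verified with a query such as:

select name, age, gender from mysql_person;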
5. Receiving a Kafka Stream and Writing to PostgreSQL (Java)
Test code
The sink's target table has the same structure as the test table from Section 3.
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

public class KafkaToPostgresqlTest {

    @Data
    @AllArgsConstructor
    private static class Person {
        private String name;
        private int age;
        private String gender;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second with exactly-once guarantees
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
                .setTopics("ly_test")
                .setGroupId("KafkaToPostgresqlTest")
                // Start from committed offsets, falling back to the latest offset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // Deserialize each message value from a JSON string into a Person
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<Person>() {
                    @Override
                    public Person deserialize(byte[] message) {
                        return JSONUtil.toBean(StrUtil.str(message, StandardCharsets.UTF_8), Person.class);
                    }
                })
                .build();

        DataStreamSource<Person> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person");
        // Map the POJO to a Tuple3 with explicit type information so the table schema can be derived
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> dataSource =
                kafkaSource.map(p -> Tuple3.of(p.getName(), p.getAge(), p.getGender()),
                        Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
        Table dataTable = tableEnv.fromDataStream(dataSource).as("name", "age", "gender");

        // Register a temporary JDBC table pointing at postgresql_person
        String targetTableName = "person";
        tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
                .schema(Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .column("gender", DataTypes.STRING())
                        .build())
                .option("url", "jdbc:postgresql://192.168.3.190:5432/y")
                .option("driver", "org.postgresql.Driver")
                .option("table-name", "postgresql_person")
                .option("username", "postgres")
                .option("password", "123456")
                .build());

        // executeInsert() submits the streaming job on its own; no env.execute() call is needed
        dataTable.select($("name"), $("age"), $("gender")).executeInsert(targetTableName);
    }
}
Run results
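As in Section 4, executeInsert() submits the streaming job and the program itself prints nothing; rows arriving from ly_test can be verified with a query such as:

select name, age, gender from postgresql_person;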