Add flink-doris-connector and the required Flink Maven dependencies
The configuration below follows the official docs. For Flink 1.13.* and earlier:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
For Flink 1.14.* (the blink planner became the default in 1.14, so flink-table-planner-blink is replaced by flink-table-planner):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.14_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
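Both snippets reference ${flink.version} and ${scala.version} without defining them. A minimal <properties> block for the 1.14 setup might look like this (1.14.4 is an assumed patch version, any 1.14.* should do; scala.version must be the Scala binary version 2.12 to match the connector's _2.12 suffix):
<properties>
    <!-- assumed: any Flink 1.14.* should work with flink-doris-connector-1.14 -->
    <flink.version>1.14.4</flink.version>
    <!-- Scala binary version; must match the _2.12 suffix of the connector artifact -->
    <scala.version>2.12</scala.version>
</properties>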
This example uses Flink 1.14. Flink 1.15 came out just today; they release fast, and I can barely keep up.
Create the table
CREATE TABLE dbname.`worker` (
    `startTime` datetime NOT NULL,
    `id` int NOT NULL,
    `name` varchar(255) DEFAULT NULL,
    `age` int DEFAULT NULL,
    `city` varchar(255) NOT NULL,
    `salary` int NOT NULL
) ENGINE=olap
DUPLICATE KEY(startTime, id, name)
PARTITION BY RANGE(startTime)()
DISTRIBUTED BY HASH(name)
PROPERTIES (
    .......
);
Remember to actually create the partitions: the partition list above is empty, and loading into a table with no matching partition will fail.
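As a sketch, a partition covering the demo date 2022-05-06 could be added like this (the partition name and upper bound are my own choices, not from the original post):
ALTER TABLE dbname.worker
ADD PARTITION p20220506 VALUES LESS THAN ("2022-05-07 00:00:00");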
Mock data source
Since Flink is almost always fed streaming data and I didn't want to set up Kafka just for this, I built my own data source.
import java.util
import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits one fake worker record per second as a JSON string.
class MyDataSource extends SourceFunction[String] {
  @volatile private var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random: Random = new Random()
    var id: Int = 0
    val nameList: util.ArrayList[String] = new util.ArrayList[String]()
    nameList.addAll(util.Arrays.asList("aa", "bb", "cc", "dd"))
    val cityList: util.ArrayList[String] = new util.ArrayList[String]()
    cityList.addAll(util.Arrays.asList("苏州", "无锡", "常州", "南京"))
    while (running) {
      id += 1
      // pick a name/city pair at random
      val r: Int = random.nextInt(nameList.size())
      val name: String = nameList.get(r)
      val city: String = cityList.get(r)
      val age: Int = random.nextInt(20)
      val salary: Int = random.nextInt(5000) + 10000
      val str: String = "{\"startTime\":\"2022-05-06\",\"id\":" + id +
        ",\"name\":\"" + name + "\",\"age\":" + age +
        ",\"city\":\"" + city + "\",\"salary\":" + salary + "}"
      sourceContext.collect(str)
      Thread.sleep(1000L)
    }
  }

  // The original left this as ???, which throws NotImplementedError when the
  // job is cancelled; stopping the loop is what SourceFunction expects.
  override def cancel(): Unit = running = false
}
Sink to Doris
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stream Load properties: load as JSON; strip_outer_array lets Doris
    // parse each batch as a JSON array of rows.
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("strip_outer_array", "true");

    DataStreamSource<String> stream = env.addSource(new MyDataSource());
    stream.print();
    stream.addSink(
            DorisSink.sink(
                    DorisReadOptions.builder().build(),
                    DorisExecutionOptions.builder()
                            .setBatchSize(3)          // tiny batch size, just for the demo
                            .setBatchIntervalMs(1L)
                            .setMaxRetries(3)
                            .setStreamLoadProp(pro)
                            .build(),
                    DorisOptions.builder()
                            .setFenodes("xxx.xxx.xxx.xxx.xxx:8030") // FE host:http_port
                            .setTableIdentifier("dbname.worker")
                            .setUsername("username")
                            .setPassword("password")
                            .build()
            ));
    try {
        env.execute("Flink2Doris");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Wrap it up
Given how I use this, I wrapped the sink in a small helper class to make it more convenient.
public class MySinkF {
    // Builds a Doris sink for the given table in database dbname.
    public SinkFunction<String> MySinkDoris(String tablename) {
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");
        return DorisSink.sink(
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                        .setBatchSize(3)
                        .setBatchIntervalMs(1L)
                        .setMaxRetries(3)
                        .setStreamLoadProp(pro)
                        .build(),
                DorisOptions.builder()
                        .setFenodes("xxx.xxx.xxx.xxx.xxx:8030")
                        // use the parameter here; the original hard-coded
                        // "dbname.worker" and silently ignored tablename
                        .setTableIdentifier("dbname." + tablename)
                        .setUsername("username")
                        .setPassword("password")
                        .build()
        );
    }
}
The original code then becomes simpler:
stream.addSink(new MySinkF().MySinkDoris("worker"));
A small upgrade
My actual business tables use the AGGREGATE model, where some columns need REPLACE, so I tested whether that works as well.
Create a new table
CREATE TABLE dbname.`worker_replace` (
    `startTime` datetime NOT NULL,
    `id` int NOT NULL,
    `name` varchar(255) DEFAULT NULL,
    `age` int DEFAULT NULL,
    `city` varchar(255) NOT NULL,
    `salary` int REPLACE NOT NULL
) ENGINE=olap
AGGREGATE KEY(startTime, id, name, age, city)
PARTITION BY RANGE(startTime)()
DISTRIBUTED BY HASH(name)
PROPERTIES (
    ......
);
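As before, the partition list is empty, so a partition covering the demo date has to exist before loading. A sketch, with an assumed partition name:
ALTER TABLE dbname.worker_replace
ADD PARTITION p20220506 VALUES LESS THAN ("2022-05-07 00:00:00");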
Data source
This time the records come straight from a file:
{"startTime" : "2022-05-06 00:00:00","id" : 1,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 2,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 3,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 4,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 5,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 6,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 7,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 8,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 9,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 10,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 1,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 2,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 3,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 4,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
Final code
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // read the JSON lines above; parallelism 1 preserves file order,
    // so the salary=999 rows arrive after the salary=888 rows
    DataStreamSource<String> data = env.readTextFile("your_path").setParallelism(1);
    data.print();
    data.addSink(new MySinkF().MySinkDoris("worker_replace"));
    try {
        env.execute("doris replace");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Finally, query the table: the result matches expectations. The REPLACE column keeps the last loaded value, so ids 1 through 4 end up with salary 999 while the rest keep 888.
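A quick check, assuming the load succeeded (the query and expected output are mine, not from the original post):
SELECT id, name, salary FROM dbname.worker_replace ORDER BY id;
-- expected: salary = 999 for id 1-4 (replaced), salary = 888 for id 5-10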