1、基于DataStream api的Table api&Flink SQL
1、使用Table api必须要添加Flink或者Blink的计划器。
2、Flink1.10(含)之前默认使用flink计划器,flink1.11(含)之后默认使用blink计划器。
<!--flink 1.11及之后使用的都是blink的计划器,这儿引入的也是blink的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink 1.10(含)之前默认的计划器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
package com.atguigu.GTable_api_Flink_sql;
import com.atguigu.Zbeans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* Table api&Flink sql第一课:基于dataStream的Table api&Flink sql
* 1、需要引入flink的计划器的依赖
* flink1.10(含)之前默认使用flink计划器,flink1.11之后使用blink的计划器
* 2、需要创建表的执行环境。
* 3、Table api实际上是基于DSL语法来处理数据的,每次操作都返回一个Table对象。
* 4、tableEnv.fromDataStream得到的表,必须注册成视图才能使用SQL api.
* SQL api的操作后同样返回一张表。
* 5、输出表对象必须转换成DataStream对象才行,Row对象导包导的是flink.types.Row包,别导错了。
*
*/
public class AFirstExample {
public static void main(String[] args) throws Exception{
//加载环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink计划器 useOldPlanner()使用的就是flink的计划器,也需要依赖
.inStreamingMode() //流模式
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
//读取数据包装成pojo类型
DataStreamSource<String> inputStream = env.readTextFile("G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(new String(fields[0]), new Long(fields[1]), new Double(fields[2]));
});
//基于流创建一张表
Table inputTable = tableEnv.fromDataStream(dataStream);
//table api处理数据
Table resultTable1 = inputTable.select("id,temperature")
.where("id = 'sensor_1'");
//flink sql 处理数据
//fromDataStream得到的表,必须注册成视图才能使用SQL api
tableEnv.createTemporaryView("sensor",inputTable);
Table resultTable2 = tableEnv.sqlQuery("select id,temperature from sensor where id='sensor_1'");
//转成流输出,Row导包导的是flink.types.Row,别导错了
//如果Table api或Flink sql有聚合操作,则需要使用toRetractStream
//toRetractStream会将一次更新转换成一次删除和一次新增,删除在数据头部添加false,新增添加true.
tableEnv.toAppendStream(resultTable1,Row.class).print("table api处理数据");
tableEnv.toAppendStream(resultTable2,Row.class).print("flink sql处理数据");
//执行
env.execute("基于DataStream的table api和flink sql");
}
}
2、纯粹的Table api&Flink SQL
读取文件、kafka计算后输出到文件、kafka
package com.atguigu.GTable_api_Flink_sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
/**
* 使用纯粹的Table api&Flink sql来完成一个Test
* 从文件、kafka读取数据,输出到文件或kafka。
* 直接读取数据源再通过createTemporaryTable得到的表:
* Flink sql中直接使用,table api则还需要先tableEnv.from("inputTable")才可以使用DSL语法进行操作。
* 写出到文件和kafka都不支持聚合操作,不能用聚合结果表调用insertInto进行输出。
* 若要支持聚合输出,则可以输出到ES或者MySQL。
* 从kafka读取数据输入到kafka时的注意事项:
* kafka连接器依赖中artifactId如果是flink-connector-kafka_2.12,则为通用连接器,
* 无论是读取kafka还是写入kafka,version方法中版本应该写“universal”。
* kafka连接器依赖中artifactId如果是flink-connector-kafka-0.11_2.12,则为0.11版的连接器。
* 无论是读取kafka还是写入kafka,version方法中版本应该写“0.11”,当然也可以写0.10,高版本兼容低版本。
* 启动kafka:
* cd /opt/apps/kafka_2.11-0.11.0.3/bin/ && zkServer.sh start && kafka-server-start.sh ../config/server.properties
* 启动生产者:
* kafka-console-producer.sh -broker-list Linux001:9092 --topic topic_producer
* 启动消费者:
* kafka-console-consumer.sh --bootstrap-server Linux001:9092 --topic topic_consumer
*/
public class BPureTableApi {
public static void main(String[] args) throws Exception{
//加载环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
// /**
// * 读取文件注册成表(flink1.12已经废弃了这种方式)
// * 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
// */
// String inpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt";
// tableEnv.connect(new FileSystem().path(inpath))
// .withFormat(new Csv()) //是需要加入csv依赖的
// .withSchema(new Schema() //字段名可以更改,顺序不能改变
// .field("id", DataTypes.STRING())
// .field("timestamp",DataTypes.BIGINT())
// .field("temperature",DataTypes.DOUBLE())
// ).createTemporaryTable("inputTable");
/**
* 读取kafka数据
* 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
* flink table api连接kafka的属性配置中zookeeper.connect和bootstrap.servers都要配置,简直神奇
*/
tableEnv.connect(new Kafka()
.version("universal") //kafka通用连接器版本
.topic("topic_producer")
.property("zookeeper.connect","192.168.149.131:2181")
.property("bootstrap.servers","192.168.149.131:9092")
)
.withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
.withSchema(new Schema() //字段名可以更改,顺序不能改变
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
).createTemporaryTable("inputTable");
//使用Table api操作
Table inputTable=tableEnv.from("inputTable");
Table Ttable=inputTable.select("id,temperature")
.where("id = 'sensor_1'");
//使用Flink sql操作
Table Stable=tableEnv.sqlQuery("select id,avg(temperature) as avg_temp from inputTable group by id");
// /**
// * 将结果数据注册成表,然后输出到文件中
// * withschema的字段要和Table api&Flink sql查询字段一致
// */
// String outpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor_out.txt";
// tableEnv.connect(new FileSystem().path(outpath))
// .withFormat(new Csv()) //是需要加入csv依赖的
// .withSchema(new Schema() //字段名可以更改,顺序不能改变
// .field("id", DataTypes.STRING())
// .field("temp",DataTypes.DOUBLE()))
// .createTemporaryTable("outputTable");
/**
* 将结果数据注册成表,然后写入到kafka中
* withschema的字段要和Table api&Flink sql查询字段一致
*/
tableEnv.connect(new Kafka()
.version("universal") //kafka通用连接器版本
.topic("topic_consumer")
.property("zookeeper.connect","192.168.149.131:2181")
.property("bootstrap.servers","192.168.149.131:9092")
)
.withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
.withSchema(new Schema() //字段名可以更改,顺序不能改变
.field("id", DataTypes.STRING())
.field("temp",DataTypes.DOUBLE())
).createTemporaryTable("outputTable");
/**
* 写出到文件,不支持聚合操作,不能用聚合结果表调用insertInto
* 写出到Kafka,不支持聚合操作,不能用聚合结果表调用insertInto
*/
Ttable.insertInto("outputTable");
//执行
env.execute("测试纯粹的Table api & Flink sql");
}
}
3、输出模式
1、Flink支持三种输出模式
追加(append)模式:
只支持插入。
对应方法toAppendStream。
撤回(retract)模式:
支持插入、删除、更新。插入和删除都很单纯。
更新则会转换成一次撤回和一次插入,撤回的消息添加false前缀,插入的消息添加true前缀。
对应方法toRetractStream。
更新插入(upsert)模式:
支持插入、删除、更新。删除很单纯。
插入和更新都是upsert,需要指定key来判断当前写入操作是插入还是更新。
只有外部系统支持retract或upsert模式,才可以将聚合操作写出。
这样的系统有ES\MySQL\Oracle等。
4、输出到ES
依赖
<!--Elasticsearch-connector连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
tableEnv.connect(new Elasticsearch()
.version("6") //ES版本
.host("localhost",9200,"http")
.index("id")
.documentType("temp")
)
.inUpsertMode()
.withFormat(new Json()) //解析格式,有些格式是需要导依赖的
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("temp",DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
aggtable.insertInto("outputTable");
?5、输出到MySQL
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
</dependency>
String sinkDDL = "create table outputTable("+
"id varchar(20) not null,"+
"temp double(10,2) not null"+
") with ("+
"'connector.type'='jdbc',"+
"'connector.url'='jdbc:mysql://localhost:3306/test',"+
"'connector.table'='id_count',"+
"'connector.driver'='com.mysql.jdbc.Driver',"+
"'connector.username'='root',"+
"'connector.password'='123456')";
tableEnv.sqlUpdate(sinkDDL);
aggTable.insertInto("outputTable");
|