Flink SQL: reading data from Kafka and writing it back to Kafka. The job below registers a Kafka-backed source table and sink table via DDL, then moves filtered rows between them with a single INSERT statement.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink05_SQL_Kafka2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: reads JSON records from topic_source_sensor,
        // starting from the latest offsets
        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
                + "'properties.group.id' = 'atguigu',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");

        // Sink table: writes JSON records to topic_sink_sensor
        tableEnv.executeSql("create table sink_sensor (id string, ts bigint, vc int) with ("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
                + "'format' = 'json'"
                + ")");

        // executeSql on an INSERT submits the streaming job itself,
        // so no explicit env.execute() call is needed
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id = 'sensor_1'");
    }
}
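To smoke-test the pipeline, you can publish a few JSON records to the source topic. Below is a minimal sketch using the plain Kafka Java producer client; it assumes kafka-clients is on the classpath, the class name SensorTestProducer is made up for illustration, and the broker addresses, topic name, and JSON field names are taken from the DDL above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SensorTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // JSON fields must match the source table schema: id, ts, vc
            producer.send(new ProducerRecord<>("topic_source_sensor",
                    "{\"id\":\"sensor_1\",\"ts\":1609459200000,\"vc\":30}"));
            producer.send(new ProducerRecord<>("topic_source_sensor",
                    "{\"id\":\"sensor_2\",\"ts\":1609459201000,\"vc\":20}"));
        }
    }
}

Only the sensor_1 record should pass the WHERE filter and appear in topic_sink_sensor, which can be verified with any Kafka consumer subscribed to that topic.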