// Consuming Kafka with Flink
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Demo job that reads string records from Kafka with Flink and prints them to stdout.
 *
 * <p>Two variants are provided: one built on the builder-based {@link KafkaSource} and one built
 * on the properties-based {@link FlinkKafkaConsumer}. Each variant wires up its own pipeline and
 * submits it, so {@link #main(String[])} only chooses which variant to run.
 */
public class KafkaDemo01 {

    /**
     * Reads topic {@code test01} through {@link KafkaSource} and prints every record.
     *
     * <p>{@code KafkaSource} together with {@code env.fromSource(...)} is the newer, unified
     * source API of the Flink Kafka connector and is the recommended way to consume Kafka in
     * current Flink releases. (This variant was previously labelled as the "before 1.13" style,
     * which was the wrong way round.)
     *
     * @param env execution environment the pipeline is registered on and executed with
     * @throws Exception if the job fails to be submitted or fails while running
     */
    private static void consumeWithKafkaSource(StreamExecutionEnvironment env) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("vm01:9092")
                .setTopics(Arrays.asList("test01"))
                // Only the record value is deserialized, as a plain string.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Start reading from the earliest available offset.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        // No event-time processing is needed for printing, so no watermarks are generated.
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
        stream.print();

        // Job name kept unchanged so the job is still listed under the same name.
        env.execute("beforeAndEqualVersion12");
    }

    /**
     * Reads topic {@code test} through {@link FlinkKafkaConsumer} and prints every record.
     *
     * <p>{@code FlinkKafkaConsumer} together with {@code env.addSource(...)} is the legacy
     * consumer API; the Flink documentation marks it as deprecated in favour of
     * {@link KafkaSource}. (This variant was previously labelled as the "1.13 and later" style,
     * which was the wrong way round.)
     *
     * @param env execution environment the pipeline is registered on and executed with
     * @throws Exception if the job fails to be submitted or fails while running
     */
    private static void consumeWithFlinkKafkaConsumer(StreamExecutionEnvironment env)
            throws Exception {
        String topic = "test";

        // Plain Kafka consumer configuration handed to the connector.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "vm01:9092");
        props.setProperty("group.id", "test");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "3000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        // Job name kept unchanged so the job is still listed under the same name.
        env.execute("afterVersion12");
    }

    /**
     * Entry point: creates the execution environment and runs one of the two demo variants.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the selected job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Switch the call below to consumeWithKafkaSource(env) to try the KafkaSource variant.
        consumeWithFlinkKafkaConsumer(env);

        // The selected variant already calls env.execute(). Do not call it again here: a second
        // execute() on the same environment has no operators left to run and fails.
    }
}
|