Scenario
Flink jobs usually read from Kafka upstream, because the Kafka connector can take full advantage of Flink's checkpoint mechanism to persist state and achieve high availability. And since the data volume on the output side can also be large, some scenarios still sink to Kafka as well, which reduces the load on the database or other downstream components.
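The checkpoint interval is the knob that matters here: FlinkKafkaConsumer stores its consumed offsets as part of Flink's checkpointed state, so after a failure the job resumes from the offsets of the last completed checkpoint. Below is a minimal sketch of a slightly more explicit checkpoint setup; the EXACTLY_ONCE mode and the pause/timeout values are illustrative assumptions, not settings taken from the example that follows.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 10 s; EXACTLY_ONCE guards Flink's internal state
        env.enableCheckpointing(10 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        // Illustrative tuning: leave at least 5 s between checkpoints,
        // and abort any checkpoint that takes longer than 60 s
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 1000L);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
    }
}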
Code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class ConsumeKafkaTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 seconds so the Kafka offsets are saved in Flink state
        env.enableCheckpointing(10 * 1000L);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "consumer_flink");

        // Source: consume strings from the flink-source topic
        String consumeTopic = "flink-source";
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                consumeTopic, new SimpleStringSchema(), prop);
        // Start from the latest offsets when there is no checkpointed state to restore
        kafkaConsumer.setStartFromLatest();

        // Sink: write the transformed strings to the flink-sink topic
        String produceTopic = "flink-sink";
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                produceTopic, new SimpleStringSchema(), prop);

        env.addSource(kafkaConsumer)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        System.out.println("Flink msg: " + s);
                        // Tag each message before forwarding it to the sink topic
                        collector.collect(s + "_sink");
                    }
                })
                .addSink(kafkaProducer)
                .setParallelism(2); // parallelism of the sink operator

        env.execute("My Flink Test");
    }
}
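One caveat: the constructor used above gives FlinkKafkaProducer its default at-least-once semantic, so duplicates can appear in flink-sink after a failure recovery. If the pipeline needs end-to-end exactly-once, the connector also supports Kafka transactions via the Semantic enum. Here is a sketch of how the producer construction above could be swapped out; the extra imports, the anonymous KafkaSerializationSchema, and the 15-minute transaction timeout are my assumptions, not part of the original example.

// Additional imports for this variant:
// import java.nio.charset.StandardCharsets;
// import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
// import org.apache.kafka.clients.producer.ProducerRecord;

// Brokers reject transactions longer than transaction.max.timeout.ms (15 min by default),
// while the connector defaults to 1 hour, so lower the producer-side transaction timeout.
prop.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        produceTopic,
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                // Write the element as the record value; no key is set
                return new ProducerRecord<>(produceTopic, element.getBytes(StandardCharsets.UTF_8));
            }
        },
        prop,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

Note that downstream consumers of flink-sink only benefit from this if they read with isolation.level=read_committed; otherwise they will also see records from aborted transactions.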