Integrating Flink with Elasticsearch
In production, Flink is often integrated with Elasticsearch (ES) so that data processed by Flink is stored in ES. Flink is a real-time processing framework whose output sinks can be supplied by the user, so the processed stream can be written to ES through a custom sink.
-
First, add the required dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
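The Scala binary suffix in the artifact ids must be consistent across all Flink artifacts (mixing _2.11 and _2.12 leads to classpath conflicts). A common way to keep the version and suffix aligned is a Maven properties block; this is a sketch, and the property names are arbitrary:

```xml
<properties>
    <flink.version>1.12.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<!-- Each dependency then references the shared properties: -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

With this in place, upgrading Flink or Scala means changing a single property instead of every dependency entry.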
-
Create the Flink execution environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Use a socket as the data source (for local testing, running nc -lk 9999 in a terminal provides the sender side):
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
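If nc is not available, a tiny stand-in sender can be written with the JDK alone. The class below is a hypothetical test helper, not part of Flink: it listens on a port and pushes a few newline-terminated lines to the first client that connects, which is exactly the wire format socketTextStream expects.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical test helper: serves the given lines to the first client
// that connects (e.g. Flink's socketTextStream source), then closes.
public class LineServer {
    public static void serveOnce(int port, String... lines) throws IOException {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line); // one event per line, newline-terminated
            }
        }
    }
}
```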
-
Create the ES connection and the sink builder:
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                        .index("my-index")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }
);
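The indexing logic above is just a pure function from an input line to a one-field document, so it can be checked without any cluster. A minimal sketch, with an illustrative class and method name that are not part of the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class DocMapper {
    // Mirrors the body of createIndexRequest above: wrap the raw input
    // line in a single-field document keyed by "data".
    static Map<String, String> toDocument(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return json;
    }
}
```

When smoke-testing against a local cluster, calling esSinkBuilder.setBulkFlushMaxActions(1) before building makes the sink flush after every record, so documents appear in ES immediately instead of waiting for a batch to fill.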
-
Add ES as the sink:
source.addSink(esSinkBuilder.build());
-
Execute the job (env.execute declares a checked Exception, so the enclosing main method must declare throws Exception):
env.execute("flink-es");