public class ElasticSearchTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(5000L);
DataStreamSource<String> streamSource = environment.addSource(new SocketTextStreamFunction("hadoop01", 4401, ",", 3));
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("hadoop01", 9200, "http"));
ElasticsearchSink.Builder<String> elasticsearchBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
});
elasticsearchBuilder.setBulkFlushMaxActions(1);
streamSource.addSink(elasticsearchBuilder.build());
environment.execute();
}
}
执行过程中报错 java.lang.NoClassDefFoundError: org/apache/http/client/config/RequestConfig
其中pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.11.2</version>
</dependency>
elasticsearch 6.3.1 依赖 httpclient 4.2.5 pom.xml
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
</dependency>
|