一、Bulk-loading data into ES with Flink
// Imports assume the flink-connector-elasticsearch7 connector and the ES 7.x client
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Lines read from a local socket serve as the demo source
    DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

    List<HttpHost> hosts = new ArrayList<>();
    hosts.add(new HttpHost("127.0.0.1", 9200, "http"));

    ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(hosts,
            new ElasticsearchSinkFunction<String>() {
                @Override
                public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                    Map<String, String> jsonMap = new HashMap<>();
                    jsonMap.put("data", s);
                    IndexRequest indexRequest = Requests.indexRequest();
                    indexRequest.index("flink-index");
                    // No fixed id here: hard-coding id("9001") would make every record
                    // overwrite the same document, so let ES generate one id per record
                    indexRequest.source(jsonMap);
                    requestIndexer.add(indexRequest);
                }
            });
    // Flush after every single action so results are visible immediately
    // (demo setting; see the batching note below for real bulk loads)
    esBuilder.setBulkFlushMaxActions(1);

    source.addSink(esBuilder.build());
    env.execute("flink-es");
}
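setBulkFlushMaxActions(1) flushes after every record, which makes results easy to see but defeats the point of a bulk load. For real volumes the builder exposes batching knobs; a minimal sketch with illustrative values (the numbers are examples, not tuned recommendations):

// Batch up to 1000 actions, 5 MB, or 5 seconds, whichever limit is reached first
esBuilder.setBulkFlushMaxActions(1000);
esBuilder.setBulkFlushMaxSizeMb(5);
esBuilder.setBulkFlushInterval(5000L);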
二、Bulk-loading data into ES with Spark Streaming
// Imports assume spark-streaming and the ES 7.x high-level REST client
import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  // Each socket line is expected to look like "<id> <data>"
  val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

  ds.foreachRDD(
    rdd => {
      rdd.foreach(
        data => {
          // Note: this opens and closes one client per record, which is simple but
          // expensive; for a real bulk load open the client once per partition
          // (foreachPartition) or batch requests, as in the sketch after this section
          val client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http"))
          )
          val ss = data.split(" ")
          val request = new IndexRequest()
          request.index("product").id(ss(0))
          val json =
            s"""
               | { "data" : "${ss(1)}" }
               |""".stripMargin
          request.source(json, XContentType.JSON)
          val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
          println(response.getResult)
          client.close()
        }
      )
    }
  )

  ssc.start()
  ssc.awaitTermination()
}
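The loop above issues one HTTP round trip per record. When the goal really is a bulk load, the same high-level client can group records into a single BulkRequest. A minimal sketch in Java, reusing the product index and the "<id> <data>" line format from above; the helper itself is illustrative and not part of the original code:

import java.util.List;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

// Index a whole batch of "<id> <data>" lines in one round trip
static void bulkIndex(RestHighLevelClient client, List<String> lines) throws Exception {
    BulkRequest bulk = new BulkRequest();
    for (String line : lines) {
        String[] ss = line.split(" ");
        bulk.add(new IndexRequest()
                .index("product")
                .id(ss[0])
                .source("{ \"data\" : \"" + ss[1] + "\" }", XContentType.JSON));
    }
    BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
    System.out.println("bulk had failures: " + response.hasFailures());
}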
三、Loading data into ES with Kafka (to be continued)
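The original stops here. As a placeholder until this part is written, here is a minimal sketch along the same lines as the sections above: a plain KafkaConsumer polls a topic and each poll is indexed into ES as one BulkRequest. The broker address localhost:9092, topic es-demo, group id es-loader and index name kafka-index are all made-up assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");       // assumed broker address
    props.put("group.id", "es-loader");                     // assumed group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("es-demo")); // assumed topic name

    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) continue;
        // One bulk request per poll; assumes record values contain no quote
        // characters, like the earlier examples
        BulkRequest bulk = new BulkRequest();
        for (ConsumerRecord<String, String> record : records) {
            bulk.add(new IndexRequest()
                    .index("kafka-index")                    // assumed index name
                    .source("{ \"data\" : \"" + record.value() + "\" }", XContentType.JSON));
        }
        client.bulk(bulk, RequestOptions.DEFAULT);
    }
}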