<dependency>
? ? <groupId>org.apache.flink</groupId>
? ? <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
? ? <version>1.14.4</version>
</dependency>
SinkFunction实现类:
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
class ElasticIndexSinkFunction(indexType:String) extends ElasticsearchSinkFunction[JSONObject]{
override def process(element: JSONObject, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
try{
val uniqueKey=element.getString("uniqueKey")
val indexName=EsIndexName.getWeekMondayIndexName(indexType,element.getLongValue("startTime"))
val request: IndexRequest = Requests.indexRequest
.index(indexName)
.`type`("_doc")
.id(uniqueKey)
.source(element)
requestIndexer.add(request)
}catch {
case e:Exception=>e.printStackTrace()
}
}
}
数据写入es:
import org.apache.commons.configuration2.builder.fluent.Configurations
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import java.util
import java.util.UUID
object EsSlinkUtil {
def sendData(indexType: String, data: DataStream[JSONObject], jobType: String) = {
val configs = new Configurations()
val configuration = configs.properties("SysConfig.properties")
val esIP = configuration.getString("eSPort_export")
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost(esIP, 9200, "http"))
try {
val elasticSink: ElasticsearchSink.Builder[JSONObject] = new ElasticsearchSink.Builder[JSONObject](httpHosts, new ElasticIndexSinkFunction(indexType))
data.addSink(elasticSink.build())
.name(indexType + "-" + UUID.randomUUID().toString.replaceAll("-", ""))
.uid(indexType + "-" + jobType)
} catch {
case e: Exception => e.printStackTrace()
}
}
}
|