1. Create the SparkSession and configure the Elasticsearch connection
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("test")
  .config("spark.port.maxRetries", "128")
  .config("spark.sql.parquet.writeLegacyFormat", true)
  // Elasticsearch connection settings
  .config("es.index.auto.create", "true")
  .config("es.nodes.wan.only", "true")
  .config("es.nodes", "0.0.0.0:9200")
  .master("local")
  // Hive metastore and warehouse settings
  .config("hive.metastore.uris", "thrift://0.0.0.0:9083")
  .config("hive.execution.engine", "mr")
  .config("spark.sql.warehouse.dir", "hdfs://0.0.0.0:8020/apps/hive/warehouse")
  .config("spark.yarn.am.waitTime", "1000s")
  .config("spark.default.parallelism", "300")
  .config("spark.ui.retainedStages", 500)
  .config("spark.hadoop.mapred.output.compress", "true")
  .config("spark.hadoop.mapred.output.compression.codec", "snappy")
  .config("spark.sql.hive.convertMetastoreParquet", "false")
  .config("spark.sql.hive.convertMetastoreOrc", "false")
  .enableHiveSupport()
  .getOrCreate()
// Connection options shared by the read and write examples below
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200,0.0.0.0:9200",
  "es.port" -> "9200"
)
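Depending on the cluster, a few more elasticsearch-hadoop options are often added to the same map, for example HTTP basic-auth credentials or bulk-write behaviour. The values below are placeholders, a sketch of commonly used settings rather than required ones:

// Optional extras for the connection map (placeholder values)
val securedOptions = options ++ Map(
  "es.net.http.auth.user" -> "elastic",   // basic-auth user, if security is enabled
  "es.net.http.auth.pass" -> "changeme",  // basic-auth password
  "es.batch.size.entries" -> "1000",      // documents per bulk request
  "es.write.operation"    -> "upsert"     // index / create / update / upsert (upsert needs es.mapping.id)
)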
2. Read data from Elasticsearch
import org.elasticsearch.spark.sql._

spark.read.format("es").options(options).load("index_name/_doc")  // index (and type) to read
  .filter("<some filter condition>")                               // optional filters
  .createOrReplaceTempView("es_tmp_view")                          // register as a temp view for Spark SQL
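Once the temp view is registered, the Elasticsearch index can be queried (and joined against Hive tables) with plain Spark SQL. The view, table, and column names below are hypothetical:

// Hypothetical query over the registered temp view
val esDF = spark.sql(
  """
    |SELECT id, name
    |FROM es_tmp_view
    |WHERE name IS NOT NULL
  """.stripMargin)
esDF.show(10, truncate = false)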
3. Write data to Elasticsearch
import org.apache.spark.sql.SaveMode
import org.elasticsearch.spark.sql._

val sourceDF = spark.sql("<your Hive SQL>")  // build the DataFrame to export, e.g. from a Hive query

sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("index_name/_doc")                   // target index (and type)
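If each row carries a natural document id, setting the elasticsearch-hadoop option es.mapping.id makes the write idempotent: re-running the job updates existing documents instead of appending duplicates. The column name "id" below is a hypothetical example:

// Sketch: write with an explicit document id (column name "id" is hypothetical)
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options + ("es.mapping.id" -> "id"))
  .mode(SaveMode.Append)
  .save("index_name/_doc")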