Spark写入ES支持
本示例采用Spark2.3.3版本
其他软件版本
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>localsparkdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.major.minor.version>2.11</scala.major.minor.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.3</spark.version>
<fastjson.version>1.2.76</fastjson.version>
<httpclient.version>4.5.13</httpclient.version>
<lombok.version>1.18.6</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.major.minor.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.major.minor.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.major.minor.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.major.minor.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Elasticsearch提供了官方的支持包,本文采用的是5.4.2版本的es
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.4.2</version>
</dependency>
Spark 连接器框架对版本不兼容最敏感。 为方便起见,下面提供了版本兼容性矩阵:
Spark Version | Scala Version | ES-Hadoop Artifact ID |
---|
1.0 - 1.2 | 2.10 | <unsupported> | 1.0 - 1.2 | 2.11 | <unsupported> | 1.3 - 1.6 | 2.10 | elasticsearch-spark-13_2.10 | 1.3 - 1.6 | 2.11 | elasticsearch-spark-13_2.11 | 2.0+ | 2.10 | elasticsearch-spark-20_2.10 | 2.0+ | 2.11 | elasticsearch-spark-20_2.11 |
version中填写的是ES的版本号
写入数据类型
Native RDD support
Map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")
cass class
case class Trip(departure: String, arrival: String)
...
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
EsSpark.saveToEs(cassClassRdd, "gudong20220715001/doc")
JSON
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")
dynamic/multi-resources
动态支持需要将action.auto_create_index配置成true
PUT _cluster/settings
{
"persistent": {
"action.auto_create_index": "true"
}
}
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")
handling document metadata
支持的元数据在枚举类org.elasticsearch.spark.rdd.Metadata 中
public enum Metadata {
ID,
PARENT,
ROUTING,
TTL,
TIMESTAMP,
VERSION,
VERSION_TYPE;
private Metadata() {
}
}
import org.elasticsearch.spark.rdd.Metadata._
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val otpMeta = Map(ID -> 1, TTL -> "3h")
val mucMeta = Map(ID -> 2)
val sfoMeta = Map(ID -> 3)
val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)
完整代码
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
object Main {
case class Trip(departure: String, arrival: String)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("测试ES").setMaster("local")
conf.set("es.nodes", "192.168.1.1,192.168.1.2")
.set("es.port", "9200")
val spark = SparkSession.builder().config(conf).getOrCreate()
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
EsSpark.saveToEs(cassClassRdd, "gudong20220715001/doc")
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")
import org.elasticsearch.spark.rdd.Metadata._
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val otpMeta = Map(ID -> 1, TTL -> "3h")
val mucMeta = Map(ID -> 2)
val sfoMeta = Map(ID -> 3)
val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)
spark.stop()
}
}
Spark Streaming support
在Spark Steaming中,由于与 RDD 不同,由于 DStream 的连续性,您无法使用 DStream 从 Elasticsearch 中读取数据。
Map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
val mapMicroBatches = mutable.Queue(mapRdd)
val mapDStream = ssc.queueStream(mapMicroBatches)
EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")
cass class
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val cassClassMicroBatches = mutable.Queue(cassClassRdd)
val cassClassDStream = ssc.queueStream(cassClassMicroBatches)
EsSparkStreaming.saveToEs(cassClassDStream, "gudong20220718001/doc")
JSON
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
val jsonMicroBatch = mutable.Queue(jsonRdd)
val jsonDStream = ssc.queueStream(jsonMicroBatch)
EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")
dynamic/multi-resources
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
val dstream3 = ssc.queueStream(microbatches)
EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")
handling document metadata
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
val microbatches2 = mutable.Queue(airportsRDD)
val dstream4 = ssc.queueStream(microbatches2)
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))
val microbatches3 = mutable.Queue(airportsRDD2)
val dstream5 = ssc.queueStream(microbatches3)
EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)
完整代码
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming
import scala.collection.mutable
object Main {
case class Trip(departure: String, arrival: String)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
conf.set("es.nodes", "192.168.1.1,192.168.1.2").set("es.port", "9200")
val spark = SparkSession.builder().config(conf).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
val mapMicroBatches = mutable.Queue(mapRdd)
val mapDStream = ssc.queueStream(mapMicroBatches)
EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val cassClassMicroBatches = mutable.Queue(cassClassRdd)
val cassClassDStream = ssc.queueStream(cassClassMicroBatches)
EsSparkStreaming.saveToEs(cassClassDStream, "gudong20220718001/doc")
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
val jsonMicroBatch = mutable.Queue(jsonRdd)
val jsonDStream = ssc.queueStream(jsonMicroBatch)
EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
val dstream3 = ssc.queueStream(microbatches)
EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
val microbatches2 = mutable.Queue(airportsRDD)
val dstream4 = ssc.queueStream(microbatches2)
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))
val microbatches3 = mutable.Queue(airportsRDD2)
val dstream5 = ssc.queueStream(microbatches3)
EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)
ssc.start()
ssc.awaitTerminationOrTimeout(6000)
}
}
Spark SQL Support
在spark sql 1.3+ 之后的版本向ES写入数据可以采用以下方式
Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
val people = spark.sparkContext.textFile("F:\\people.txt")
.map(_.split(","))
.map(p => Person(p(0), null, p(1).trim.toInt))
.toDF()
EsSparkSQL.saveToEs(people, "gudong20220718001/doc")
数据源
在使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext 加载方法访问 Elasticsearch。 换句话说,以声明方式创建由 Elasticsearch 支持的 DataFrame/Dataset:
val sql = new SQLContext...
val df = sql.load(
"spark/index",
"org.elasticsearch.spark.sql")
- 数据源的
SQLContext 的load 方法 - 要加载的路径或资源 - 在本例中为 Elasticsearch 中的spark/index
- 数据源提供者 -
org.elasticsearch.spark.sql
In Spark 1.4, one would use the following similar API calls:
val df = sql.read
.format("org.elasticsearch.spark.sql")
.load("spark/index")
- 数据源的
SQLContext 的load 方法 - 数据源提供者 -
org.elasticsearch.spark.sql - 要加载的路径或资源 - 在本例中为 Elasticsearch 中的spark/index
In Spark 1.5, this can be further simplified to:
val df = sql.read.format("es")
.load("spark/index")
- 使用
es 作为别名而不是DataSource 提供程序的完整包名
无论使用什么 API,一旦创建,DataFrame 就可以自由访问以操作数据。
源声明还允许传入特定选项,即:
Name | Default value | Description |
---|
path | required | Elasticsearch index/type | pushdown | true | 是否将Spark SQL 转换 (push-down) 为 Elasticsearch Query DSL | strict | false | 是否使用精确(未分析)匹配或不(已分析) | Usable in Spark 1.6 or higher | | | double.filtering | true | 是否告诉 Spark 在pushed down的过滤器上应用自己的过滤 |
下一节将解释这两个选项。 要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map 传递给上述方法:
For example:
val sql = new SQLContext...
val options13 = Map("path" -> "spark/index",
"pushdown" -> "true",
"es.nodes" -> "someNode",
"es.port" -> "9200")
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)
val options = Map("pushdown" -> "true",
"es.nodes" -> "someNode",
"es.port" -> "9200")
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
.options(options)
.load("spark/index")
sqlContext.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'spark/index', nodes 'someNode')" )
请注意,由于 SQL 解析器的原因,不允许使用 . (以及用于分隔的其他常用字符); 连接器尝试通过自动附加 es. 前缀来解决它,但这仅适用于仅使用一个 . 指定配置选项(如上面的 es.nodes )。 因此,如果需要具有多个 . 的属性,应该使用上面的 SQLContext.load 或 SQLContext.read 方法,并将属性作为 Map 传递。
Push-Down operations
使用 elasticsearch-hadoop 作为 Spark 源的一个重要隐藏特性是连接器了解在 DataFrame/SQL 中执行的操作,并且默认情况下会将它们转换为适当的 QueryDSL。 换句话说,连接器直接在源头pushes down操作,在那里数据被有效地过滤掉,以便只有所需的数据流回 Spark。 这显着提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回所需的数据(与批量返回数据仅由 Spark 处理和丢弃相反)。 请注意,即使指定查询,下推操作也适用 - 连接器将根据指定的 SQL 对其进行增强。
附带说明一下,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的所有“Filter”,同时保持与 Spark 1.3.0 的向后二进制兼容性,无需任何用户即可将 SQL 操作完全推送到 Elasticsearch 干涉。
已优化为下推过滤器的运算符:
SQL 语法 | ES 1.x/2.x 语法 | ES 5.x 语法 |
---|
= null , is_null | missing | must_not.exists | = (strict) | term | term | = (not strict) | match | match | > , < , >= , <= | range | range | is_not_null | exists | exists | in (strict) | terms | terms | in (not strict) | or.filters | bool.should | and | and.filters | bool.filter | or | or.filters | bool.should [bool.filter] | not | not.filter | bool.must_not | StringStartsWith | wildcard(arg*) | wildcard(arg*) | StringEndsWith | wildcard(*arg) | wildcard(*arg) | StringContains | wildcard(arg) | wildcard(arg) | EqualNullSafe (strict) | term | term | EqualNullSafe (not strict) | match | match |
也就是说,考虑以下 Spark SQL:
val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips")
df.printSchema()
val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))
或在纯 SQL 中:
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
连接器将查询转换为:
{
"query" : {
"filtered" : {
"query" : {
"match_all" : {}
},
"filter" : {
"and" : [{
"query" : {
"match" : {
"arrival" : "OTP"
}
}
}, {
"days" : {
"gt" : 3
}
}
]
}
}
}
}
此外,下推过滤器可以处理已分析的术语(默认),或者可以配置为严格并提供完全匹配(仅适用于未分析的字段)。 除非手动指定映射,否则强烈建议保留默认值。 Elasticsearch 参考文档中详细讨论了这个和其他主题。
请注意,自 elasticsearch-hadoop 2.2 起适用于 Spark 1.6 或更高版本的double.filtering 允许已经pushes down到 Elasticsearch 的过滤器也由 Spark 处理/评估(默认)或不处理。 关闭此功能,尤其是在处理大数据时会加快速度。 但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式、analyzed 与not_analyzed )。 一般来说,当开启 strict 时,也可以禁用double.filtering 。
Data Sources as tables
从 Spark SQL 1.2 开始可用,还可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源:
sqlContext.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'spark/index', " +
"scroll_size '20')" )
-
myIndex is Spark’s temporary table name -
USING` clause identifying the data source provider, in this case `org.elasticsearch.spark.sql
-
elasticsearch-hadoop 配置选项,强制性的是resource 。 为方便起见,可以使用 es 前缀或跳过它。 -
由于使用 . 会导致语法异常,因此应将其替换为 _ 样式。 因此,在这个例子中,es.scroll.size 变成了 scroll_size (因为可以删除前导的 es )。 请注意,这仅适用于 Spark 1.3,因为 Spark 1.4 具有更严格的解析器。 有关详细信息,请参阅上面的章节。
定义后,模式会自动拾取。 因此,可以立即发出查询:
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
由于 elasticsearch-hadoop 知道正在进行的查询,它可以优化对 Elasticsearch 的请求。 例如,给定以下查询:
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
它知道只需要 name 和 id 字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此只会询问这些数据,从而使查询非常有效。
Reading DataFrame s (Spark SQL 1.3) from Elasticsearch
您可能已经猜到了,可以定义一个由 Elasticsearch 文档支持的 DataFrame。 或者更好的是,让它们得到查询结果的支持,从而有效地创建对数据的动态、实时视图。
val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc")
println(peopleEs.printSchema)
peopleEs.show()
控制 DataFrame 架构在某些情况下,特别是当 Elasticsearch 中的索引包含很多字段时,最好创建一个仅包含其中一部分的 DataFrame 。 虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame (通过处理其支持的 RDD ),但 elasticsearch-hadoop 允许用户在创建 DataFrame 时指定从 Elasticsearch 中包含和排除哪些字段。
通过 es.read.field.include 和 es.read.field.exclude 属性,可以指示从索引映射中包含或排除哪些字段。 语法类似于 Elasticsearch 包含/排除的语法。 可以使用逗号指定多个值。 默认情况下,未指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。
es.read.field.include = *name, address.*
es.read.field.exclude = *.created
Spark SQL Type conversion
elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(并返回),如下表所示:
虽然 Spark SQL 数据类型在 Scala 和 Java 中都具有等价类型,因此可以应用 RDD 转换,但语义略有不同 - 特别是 java.sql 类型,因为 Spark SQL 处理它们的方式:
Spark SQL DataType | Elasticsearch type |
---|
null | null | ByteType | byte | ShortType | short | IntegerType | int | LongType | long | FloatType | float | DoubleType | double | StringType | string | BinaryType | string (BASE64) | BooleanType | boolean | DateType | date (string format) | TimestampType | long (unix time) | ArrayType | array | MapType | object | StructType | object |
Geo Types Conversion Table 除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 对 geo 类型进行自动模式检测,即 Elasticsearch geo_point 和 geo_shape 。 由于每种类型都允许多种格式(geo_point 接受以 4 种不同方式指定纬度和经度,而 geo_shape 允许多种类型(目前为 9 种))并且映射不提供此类信息,elasticsearch-hadoop 将对确定的地理字段进行采样 在启动时检索包含所有相关字段的任意文档; 它将解析它,从而确定必要的模式(例如,它可以判断geo_point 是指定为 StringType 还是指定为 ArrayType )。
完整代码
people.txt 文件内容
Michael, 29
Andy, 30
Justin, 19
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.{EsSparkSQL, sqlContextFunctions}
object Main {
case class Person(name: String, surname: String, age: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
conf.set("es.nodes", "192.168.1.1,192.167.1.2")
.set("es.port", "9200")
.set("es.spark.dataframe.write.null", "true")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val people = spark.sparkContext.textFile("F:\\people.txt")
.map(_.split(","))
.map(p => Person(p(0), null, p(1).trim.toInt))
.toDF()
EsSparkSQL.saveToEs(people, "gudong20220718001/doc")
val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc", "?q=age:30")
peopleEs.createTempView("PeopleTempTable")
val names = spark.sql("select name from PeopleTempTable where age>=20")
names.show()
}
}
Spark Structured Streaming support
使用前提:
- spark 2.2.0+
- elasticsearch 6.0+
作为 Spark 2.0 中的一项实验性功能,Spark Structured Streaming 提供了一个内置于 Spark SQL 集成中的统一流和批处理接口。 从 elasticsearch-hadoop 6.0 开始,我们提供了将流数据索引到 Elasticsearch 的原生功能。
与 Spark SQL 一样,结构化流处理也适用于结构化数据。 所有entries都应具有相同的结构(相同数量的字段、相同类型和名称)。 不支持使用非结构化数据(具有不同结构的文档),这会导致问题。 对于这种情况,请使用 DStreams。
Supported Spark Structured Streaming versions
从 Spark v2.2.0 开始,Spark Structured Streaming 被认为普遍可用。 因此,对结构化流的 elasticsearch-hadoop 支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。 与之前的 Spark SQL 类似,Structured Streaming 在其接口被认为稳定之前可能会在版本之间发生重大变化。
Spark Structured Streaming 支持在org.elasticsearch.spark.sql 和org.elasticsearch.spark.sql.streaming 包下提供。 它以Dataset[_] api 的形式与 Spark SQL 共享一个统一的接口。 客户端可以以与常规**批处理Dataset 几乎完全相同的方式与流Dataset **交互,只有少数例外。
Writing Streaming Datasets (Spark SQL 2.0+) to Elasticsearch
val people = spark.readStream.textFile("F:\\people\\*")
.map(_.split(","))
.map(p => Person(p(0), null, p(1).trim.toInt))
people.writeStream
.outputMode(OutputMode.Append())
.option("checkpointLocation", "F:\\save\\location")
.option("es.mapping.id", "age")
.option("es.nodes", "192.168.1.1")
.option("es.port", "9200")
.format("es")
.option("es.resource", "gudong20220718001/doc")
.start()
.awaitTermination()
Writing existing JSON to Elasticsearch
使用 Spark SQL 时,如果输入数据是 JSON 格式,只需通过 DataStreamReader 的 json 格式将其转换为 Dataset(适用于 Spark SQL 2.0)(如 Spark 文档中所述)。
Spark Structured Streaming Type conversion
Spark SQL 1.3+ Conversion Table
Spark SQL DataType | Elasticsearch type |
---|
null | null | ByteType | byte | ShortType | short | IntegerType | int | LongType | long | FloatType | float | DoubleType | double | StringType | string | BinaryType | string (BASE64) | BooleanType | boolean | DateType | date (string format) | TimestampType | long (unix time) | ArrayType | array | MapType | object | StructType | object |
Sink commit log in Spark Structured Streaming
Spark Structured Streaming 宣传了一种端到端容错的一次性处理模型,该模型通过使用偏移检查点和维护每个流查询的提交日志来实现。 执行流式查询时,大多数源和接收器都要求您指定“检查点位置”以保持作业的状态。 如果发生中断,启动具有相同检查点位置的新流式查询将恢复作业的状态并从中断处继续。 我们在配置的检查点位置下的特殊目录中维护 elasticsearch-hadoop 的 Elasticsearch sink 实现的提交日志:
$> ls /path/to/checkpoint/location
metadata offsets/ sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact 13 14 15 16 17 18
提交日志目录中的每个文件都对应一个已提交的批处理 id。 日志实现会定期压缩日志以避免混乱。 您可以通过多种方式设置日志目录的位置:
- 使用
es.spark.sql.streaming.sink.log.path 设置显式日志位置(见下文)。 - 如果未设置,则将使用
checkpointLocation 指定的路径。 - 如果未设置,则将通过将 SparkSession 中的
spark.sql.streaming.checkpointLocation 的值与数据集的给定查询名称相结合来构造路径。 - 如果不存在查询名称,则在上述情况下将使用随机 UUID 而不是查询名称
- 如果上述设置均未提供,则
start 调用将引发异常
以下是影响 Elasticsearch 提交日志行为的配置列表:
es.spark.sql.streaming.sink.log.enabled (default true )
启用或禁用流作业的提交日志。 默认情况下启用日志,并且将跳过具有相同批次 id 的输出批次,以避免重复写入。 当它设置为 false 时,提交日志被禁用,所有输出都将发送到 Elasticsearch,无论它们是否在之前的执行中发送。
es.spark.sql.streaming.sink.log.path
设置存储此流式查询的日志数据的位置。 如果未设置此值,则 Elasticsearch 接收器会将其提交日志存储在 checkpointLocation 中给出的路径下。 任何与 HDFS 客户端兼容的 URI 都是可接受的。
es.spark.sql.streaming.sink.log.cleanupDelay (default 10m )
提交日志通过 Spark 的 HDFS 客户端进行管理。 一些与 HDFS 兼容的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。 为了解决这个问题,在一组日志文件被压缩后,客户端将在清理旧文件之前等待这段时间。
es.spark.sql.streaming.sink.log.deletion (default true )
确定日志是否应删除不再需要的旧日志。 在每批提交后,客户端将检查是否有任何提交日志已被压缩并且可以安全地删除。 如果设置为false ,则日志将跳过此清理步骤,为每个批次留下一个提交文件。
es.spark.sql.streaming.sink.log.compactInterval (default 10 )
设置在压缩日志文件之前要处理的批次数。 默认情况下,每 10 个批次,提交日志将被压缩到一个包含所有先前提交的批次 ID 的文件中。
完整代码
在F:/people 目录中方法存放了一个people.txt文件,与上例相同。
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.elasticsearch.spark.sql.{sqlContextFunctions}
object Main {
case class Person(name: String, surname: String, age: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val people = spark.readStream.textFile("F:\\people\\*")
.map(_.split(","))
.map(p => Person(p(0), null, p(1).trim.toInt))
people.writeStream
.outputMode(OutputMode.Append())
.option("checkpointLocation", "F:\\save\\location")
.option("es.mapping.id", "age")
.option("es.nodes", "192.168.1.1")
.option("es.port", "9200")
.format("es")
.option("es.resource", "gudong20220718001/doc")
.start()
.awaitTermination()
}
}
其他配置说明
ES-Hadoop中的配置项说明
|