IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【ES实战】ES-Hadoop之Spark ES支持 -> 正文阅读

[大数据]【ES实战】ES-Hadoop之Spark ES支持

Spark写入ES支持

本示例采用Spark2.3.3版本


其他软件版本

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>localsparkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.major.minor.version>2.11</scala.major.minor.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.3</spark.version>
        <fastjson.version>1.2.76</fastjson.version>
        <httpclient.version>4.5.13</httpclient.version>
        <lombok.version>1.18.6</lombok.version>
    </properties>


    <dependencies>
        <!-- spark hive hadoop的相关包在生产环境都有,除了这些一般打包就要打到自己jar里面-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.7</version>
        </dependency>

        <!-- 阿里JSON解析器 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- http工具包-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Elasticsearch提供了官方的支持包,本文采用的是5.4.2版本的es

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>5.4.2</version>
        </dependency>

Spark 连接器框架对版本不兼容最敏感。 为方便起见,下面提供了版本兼容性矩阵:

Spark VersionScala VersionES-Hadoop Artifact ID
1.0 - 1.22.10<unsupported>
1.0 - 1.22.11<unsupported>
1.3 - 1.62.10elasticsearch-spark-13_2.10
1.3 - 1.62.11elasticsearch-spark-13_2.11
2.0+2.10elasticsearch-spark-20_2.10
2.0+2.11elasticsearch-spark-20_2.11

version中填写的是ES的版本号

写入数据类型

Native RDD support

Map

    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")

cass class

	  // define a case class called Trip
	  case class Trip(departure: String, arrival: String)
	  
	...

    // 2.cass class of type map
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(cassClassRdd, "gudong20220715001/doc")

JSON

	// 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""
    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")

dynamic/multi-resources

动态支持需要将action.auto_create_index配置成true

PUT _cluster/settings
{
    "persistent": {
        "action.auto_create_index": "true" 
    }
}
    // 4.dynamic/multi-resources need config [action.auto_create_index] to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
    // 会以各个Map中参数media_type中的值,创建索引,写入数据
    EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")

handling document metadata

支持的元数据在枚举类org.elasticsearch.spark.rdd.Metadata

public enum Metadata {
    ID,
    PARENT,
    ROUTING,
    TTL,
    TIMESTAMP,
    VERSION,
    VERSION_TYPE;

    private Metadata() {
    }
}
 // 5.handling document metadata
    import org.elasticsearch.spark.rdd.Metadata._
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2)
    val sfoMeta = Map(ID -> 3)
    val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
    EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)

    // ID type for handling document metadata, also set ”Map("es.mapping.id" -> "iata")“ for upset data
    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")

    val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
    EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)

完整代码

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object Main {

  // define a case class called Trip
  case class Trip(departure: String, arrival: String)

  def main(args: Array[String]): Unit = {
    // config for support es
    // es.index.auto.create 当索引不存在的时候,进行自动创建
    // es.nodes es节点的IP,可以使用协调节点和数据节点 es.port是集群的http端口
    val conf = new SparkConf().setAppName("测试ES").setMaster("local")
    conf.set("es.nodes", "192.168.1.1,192.168.1.2")
      .set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // write date to es
    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")

    // 2.cass class of type map
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(cassClassRdd, "gudong20220715001/doc")

    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""
    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")

    // 4.dynamic/multi-resources need config [action.auto_create_index] to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
    // 会以各个Map中参数media_type中的值,创建索引,写入数据
    EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")

    // 5.handling document metadata
    import org.elasticsearch.spark.rdd.Metadata._
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2)
    val sfoMeta = Map(ID -> 3)
    val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
    EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)

    // ID type for handling document metadata, also set ”Map("es.mapping.id" -> "iata")“ for upset data
    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")

    val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
    EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)

    spark.stop()
  }
}

Spark Streaming support

在Spark Steaming中,由于与 RDD 不同,由于 DStream 的连续性,您无法使用 DStream 从 Elasticsearch 中读取数据。

Map

	// 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    val mapMicroBatches = mutable.Queue(mapRdd)
    // 另一种写法 ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
    val mapDStream = ssc.queueStream(mapMicroBatches)
    EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")

cass class

    // 2.cass class of type map
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    val cassClassMicroBatches = mutable.Queue(cassClassRdd)
    val cassClassDStream = ssc.queueStream(cassClassMicroBatches)
    EsSparkStreaming.saveToEs(cassClassDStream, "gudong20220718001/doc")

JSON

    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""

    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    val jsonMicroBatch = mutable.Queue(jsonRdd)
    val jsonDStream = ssc.queueStream(jsonMicroBatch)
    EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")

dynamic/multi-resources

    // 4.Writing to dynamic/multi-resources 4.dynamic/multi-resources need config [action.auto_create_index] to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
    val microbatches = mutable.Queue(batch)
    val dstream3 = ssc.queueStream(microbatches)
    EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")

handling document metadata

    // 5.Handling document metadata
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches2 = mutable.Queue(airportsRDD)
    val dstream4 = ssc.queueStream(microbatches2)
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
    EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)

    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
    val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))

    val microbatches3 = mutable.Queue(airportsRDD2)
    val dstream5 = ssc.queueStream(microbatches3)
    EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)

完整代码

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming

import scala.collection.mutable

object Main {

  // define a case class called Trip
  case class Trip(departure: String, arrival: String)

  def main(args: Array[String]): Unit = {
    // config for support es
    // es.index.auto.create 当索引不存在的时候,进行自动创建
    // es.nodes es节点的IP,可以使用协调节点和数据节点 es.port端口
    val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
    conf.set("es.nodes", "192.168.1.1,192.168.1.2").set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Writing DStream to Elasticsearch
    // Though, unlike RDDs, you are unable to read data out of Elasticsearch using a DStream due to the continuous nature of it.

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    val mapMicroBatches = mutable.Queue(mapRdd)
    // 另一种写法 ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
    val mapDStream = ssc.queueStream(mapMicroBatches)
    EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")

    // 2.cass class of type map
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val cassClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    val cassClassMicroBatches = mutable.Queue(cassClassRdd)
    val cassClassDStream = ssc.queueStream(cassClassMicroBatches)
    EsSparkStreaming.saveToEs(cassClassDStream, "gudong20220718001/doc")

    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""

    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    val jsonMicroBatch = mutable.Queue(jsonRdd)
    val jsonDStream = ssc.queueStream(jsonMicroBatch)
    EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")

    // 4.Writing to dynamic/multi-resources 4.dynamic/multi-resources need config [action.auto_create_index] to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
    val microbatches = mutable.Queue(batch)
    val dstream3 = ssc.queueStream(microbatches)
    EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")

    // 5 .Handling document metadata
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches2 = mutable.Queue(airportsRDD)
    val dstream4 = ssc.queueStream(microbatches2)
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
    EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)

    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
    val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))

    val microbatches3 = mutable.Queue(airportsRDD2)
    val dstream5 = ssc.queueStream(microbatches3)
    EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)

    // 流的处理应该是实时的,这边只是为了本地测试,所以设置了等待超时关闭
    ssc.start()
    ssc.awaitTerminationOrTimeout(6000)

  }
}

Spark SQL Support

spark sql 1.3+之后的版本向ES写入数据可以采用以下方式

Writing DataFrame (Spark SQL 1.3+) to Elasticsearch

// Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
val people = spark.sparkContext.textFile("F:\\people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), null, p(1).trim.toInt))
  .toDF()
EsSparkSQL.saveToEs(people, "gudong20220718001/doc")

数据源

在使用 Spark SQL 时,elasticsearch-hadoop 允许通过 SQLContext 加载方法访问 Elasticsearch。 换句话说,以声明方式创建由 Elasticsearch 支持的 DataFrame/Dataset:

val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 
  • 数据源的SQLContextload方法
  • 要加载的路径或资源 - 在本例中为 Elasticsearch 中的spark/index
  • 数据源提供者 - org.elasticsearch.spark.sql

In Spark 1.4, one would use the following similar API calls:

// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 
  • 数据源的SQLContextload方法
  • 数据源提供者 - org.elasticsearch.spark.sql
  • 要加载的路径或资源 - 在本例中为 Elasticsearch 中的spark/index

In Spark 1.5, this can be further simplified to:

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")
  • 使用 es 作为别名而不是DataSource 提供程序的完整包名

无论使用什么 API,一旦创建,DataFrame 就可以自由访问以操作数据。

源声明还允许传入特定选项,即:

NameDefault valueDescription
pathrequiredElasticsearch index/type
pushdowntrue是否将Spark SQL 转换 (push-down) 为 Elasticsearch Query DSL
strictfalse是否使用精确(未分析)匹配或不(已分析)
Usable in Spark 1.6 or higher
double.filteringtrue是否告诉 Spark 在pushed down的过滤器上应用自己的过滤

下一节将解释这两个选项。 要指定选项(包括通用的 elasticsearch-hadoop 选项),只需将 Map 传递给上述方法:

For example:

val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")
sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) 

请注意,由于 SQL 解析器的原因,不允许使用 .(以及用于分隔的其他常用字符); 连接器尝试通过自动附加 es. 前缀来解决它,但这仅适用于仅使用一个 . 指定配置选项(如上面的 es.nodes)。 因此,如果需要具有多个 . 的属性,应该使用上面的 SQLContext.loadSQLContext.read 方法,并将属性作为 Map 传递。

Push-Down operations

使用 elasticsearch-hadoop 作为 Spark 源的一个重要隐藏特性是连接器了解在 DataFrame/SQL 中执行的操作,并且默认情况下会将它们转换为适当的 QueryDSL。 换句话说,连接器直接在源头pushes down操作,在那里数据被有效地过滤掉,以便只有所需的数据流回 Spark。 这显着提高了查询性能,并最大限度地减少了 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O,因为只返回所需的数据(与批量返回数据仅由 Spark 处理和丢弃相反)。 请注意,即使指定查询,下推操作也适用 - 连接器将根据指定的 SQL 对其进行增强。

附带说明一下,elasticsearch-hadoop 支持 Spark(1.3.0 及更高版本)中可用的所有“Filter”,同时保持与 Spark 1.3.0 的向后二进制兼容性,无需任何用户即可将 SQL 操作完全推送到 Elasticsearch 干涉。

已优化为下推过滤器的运算符:

SQL 语法ES 1.x/2.x 语法ES 5.x 语法
= null , is_nullmissingmust_not.exists
= (strict)termterm
= (not strict)matchmatch
> , < , >= , <=rangerange
is_not_nullexistsexists
in (strict)termsterms
in (not strict)or.filtersbool.should
andand.filtersbool.filter
oror.filtersbool.should [bool.filter]
notnot.filterbool.must_not
StringStartsWithwildcard(arg*)wildcard(arg*)
StringEndsWithwildcard(*arg)wildcard(*arg)
StringContainswildcard(arg)wildcard(arg)
EqualNullSafe (strict)termterm
EqualNullSafe (not strict)matchmatch

也就是说,考虑以下 Spark SQL:

// as a DataFrame
val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))

或在纯 SQL 中:

CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3

连接器将查询转换为:

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}

此外,下推过滤器可以处理已分析的术语(默认),或者可以配置为严格并提供完全匹配(仅适用于未分析的字段)。 除非手动指定映射,否则强烈建议保留默认值。 Elasticsearch 参考文档中详细讨论了这个和其他主题。

请注意,自 elasticsearch-hadoop 2.2 起适用于 Spark 1.6 或更高版本的double.filtering允许已经pushes down到 Elasticsearch 的过滤器也由 Spark 处理/评估(默认)或不处理。 关闭此功能,尤其是在处理大数据时会加快速度。 但是,应该注意语义,因为关闭此功能可能会返回不同的结果(取决于数据的索引方式、analyzednot_analyzed)。 一般来说,当开启 strict 时,也可以禁用double.filtering

Data Sources as tables

从 Spark SQL 1.2 开始可用,还可以通过将数据源声明为 Spark 临时表(由 elasticsearch-hadoop 支持)来访问数据源:

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 
  • myIndex is Spark’s temporary table name

  • USING` clause identifying the data source provider, in this case `org.elasticsearch.spark.sql
    
  • elasticsearch-hadoop 配置选项,强制性的是resource。 为方便起见,可以使用 es 前缀或跳过它。

  • 由于使用 . 会导致语法异常,因此应将其替换为 _ 样式。 因此,在这个例子中,es.scroll.size 变成了 scroll_size(因为可以删除前导的 es)。 请注意,这仅适用于 Spark 1.3,因为 Spark 1.4 具有更严格的解析器。 有关详细信息,请参阅上面的章节。

定义后,模式会自动拾取。 因此,可以立即发出查询:

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")

由于 elasticsearch-hadoop 知道正在进行的查询,它可以优化对 Elasticsearch 的请求。 例如,给定以下查询:

val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")

它知道只需要 name 和 id 字段(第一个返回给用户,第二个用于 Spark 的内部过滤),因此只会询问这些数据,从而使查询非常有效。

Reading DataFrames (Spark SQL 1.3) from Elasticsearch

您可能已经猜到了,可以定义一个由 Elasticsearch 文档支持的 DataFrame。 或者更好的是,让它们得到查询结果的支持,从而有效地创建对数据的动态、实时视图。

	// 可以使用查询条件
	// val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc","?q=age:39")
	val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc")
    println(peopleEs.printSchema)
    //	root
    // 	|-- age: long (nullable = true)
    // 	|-- name: string (nullable = true)
    peopleEs.show()
    //	+---+-------+
    //	|age|   name|
    //	+---+-------+
    //	| 30|   Andy|
    //	| 29|Michael|
    //	| 19| Justin|
    //	+---+-------+

控制 DataFrame 架构在某些情况下,特别是当 Elasticsearch 中的索引包含很多字段时,最好创建一个仅包含其中一部分的 DataFrame。 虽然可以通过官方 Spark API 或通过专用查询修改 DataFrame (通过处理其支持的 RDD ),但 elasticsearch-hadoop 允许用户在创建 DataFrame 时指定从 Elasticsearch 中包含和排除哪些字段。

通过 es.read.field.include es.read.field.exclude 属性,可以指示从索引映射中包含或排除哪些字段。 语法类似于 Elasticsearch 包含/排除的语法。 可以使用逗号指定多个值。 默认情况下,未指定任何值,这意味着包含所有属性/字段,并且不排除任何属性/字段。

# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created

Spark SQL Type conversion

elasticsearch-hadoop 自动将 Spark 内置类型转换为 Elasticsearch 类型(并返回),如下表所示:

虽然 Spark SQL 数据类型在 Scala 和 Java 中都具有等价类型,因此可以应用 RDD 转换,但语义略有不同 - 特别是 java.sql 类型,因为 Spark SQL 处理它们的方式:

Spark SQL DataTypeElasticsearch type
nullnull
ByteTypebyte
ShortTypeshort
IntegerTypeint
LongTypelong
FloatTypefloat
DoubleTypedouble
StringTypestring
BinaryTypestring (BASE64)
BooleanTypeboolean
DateTypedate (string format)
TimestampTypelong (unix time)
ArrayTypearray
MapTypeobject
StructTypeobject

Geo Types Conversion Table 除了上表之外,对于 Spark SQL 1.3 或更高版本,elasticsearch-hadoop 对 geo 类型进行自动模式检测,即 Elasticsearch geo_pointgeo_shape。 由于每种类型都允许多种格式(geo_point 接受以 4 种不同方式指定纬度和经度,而 geo_shape 允许多种类型(目前为 9 种))并且映射不提供此类信息,elasticsearch-hadoop 将对确定的地理字段进行采样 在启动时检索包含所有相关字段的任意文档; 它将解析它,从而确定必要的模式(例如,它可以判断geo_point是指定为 StringType 还是指定为 ArrayType)。

完整代码

people.txt文件内容

Michael, 29
Andy, 30
Justin, 19
package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.{EsSparkSQL, sqlContextFunctions}

object Main {

  // case class used to define the DataFrame
  case class Person(name: String, surname: String, age: Int)

  def main(args: Array[String]): Unit = {
    // config for support es
    // es.index.auto.create 当索引不存在的时候,进行自动创建
    // es.nodes es节点的IP,可以使用协调节点和数据节点 es.port端口
    // 默认情况下,elasticsearch-hadoop 将忽略空值,而根本不写入任何字段。
    // 由于 DataFrame 旨在被视为结构化表格数据,因此您只能通过将 es.spark.dataframe.write.null 设置切换为 true 来启用将空值写入 DataFrame 对象的空值字段。
    val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
    conf.set("es.nodes", "192.168.1.1,192.167.1.2")
      .set("es.port", "9200")
      .set("es.spark.dataframe.write.null", "true")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
    val people = spark.sparkContext.textFile("F:\\people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), null, p(1).trim.toInt))
      .toDF()
    EsSparkSQL.saveToEs(people, "gudong20220718001/doc")

    // Using pure SQL to read from Elasticsearch
    // val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc")
    val peopleEs = spark.sqlContext.esDF("gudong20220718001/doc", "?q=age:30")
    // create temp table
    peopleEs.createTempView("PeopleTempTable")
    // use spark sql syntax
    val names = spark.sql("select name from PeopleTempTable where age>=20")
    names.show()
  }
}

Spark Structured Streaming support

使用前提:

  1. spark 2.2.0+
  2. elasticsearch 6.0+

作为 Spark 2.0 中的一项实验性功能,Spark Structured Streaming 提供了一个内置于 Spark SQL 集成中的统一流和批处理接口。 从 elasticsearch-hadoop 6.0 开始,我们提供了将流数据索引到 Elasticsearch 的原生功能。

与 Spark SQL 一样,结构化流处理也适用于结构化数据。 所有entries都应具有相同的结构(相同数量的字段、相同类型和名称)。 不支持使用非结构化数据(具有不同结构的文档),这会导致问题。 对于这种情况,请使用 DStreams。

Supported Spark Structured Streaming versions

从 Spark v2.2.0 开始,Spark Structured Streaming 被认为普遍可用。 因此,对结构化流的 elasticsearch-hadoop 支持(在 elasticsearch-hadoop 6.0+ 中可用)仅与 Spark 2.2.0 及更高版本兼容。 与之前的 Spark SQL 类似,Structured Streaming 在其接口被认为稳定之前可能会在版本之间发生重大变化。

Spark Structured Streaming 支持在org.elasticsearch.spark.sqlorg.elasticsearch.spark.sql.streaming包下提供。 它以Dataset[_] api的形式与 Spark SQL 共享一个统一的接口。 客户端可以以与常规**批处理Dataset几乎完全相同的方式与Dataset**交互,只有少数例外。

Writing Streaming Datasets (Spark SQL 2.0+) to Elasticsearch

    // Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
    val people = spark.readStream.textFile("F:\\people\\*")
      .map(_.split(","))
      .map(p => Person(p(0), null, p(1).trim.toInt))
    people.writeStream
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", "F:\\save\\location")
      .option("es.mapping.id", "age")
      .option("es.nodes", "192.168.1.1")
      .option("es.port", "9200")
      .format("es")
      .option("es.resource", "gudong20220718001/doc")
      .start()
      .awaitTermination()

Writing existing JSON to Elasticsearch

使用 Spark SQL 时,如果输入数据是 JSON 格式,只需通过 DataStreamReader 的 json 格式将其转换为 Dataset(适用于 Spark SQL 2.0)(如 Spark 文档中所述)。

Spark Structured Streaming Type conversion

Spark SQL 1.3+ Conversion Table

Spark SQL DataTypeElasticsearch type
nullnull
ByteTypebyte
ShortTypeshort
IntegerTypeint
LongTypelong
FloatTypefloat
DoubleTypedouble
StringTypestring
BinaryTypestring (BASE64)
BooleanTypeboolean
DateTypedate (string format)
TimestampTypelong (unix time)
ArrayTypearray
MapTypeobject
StructTypeobject

Sink commit log in Spark Structured Streaming

Spark Structured Streaming 宣传了一种端到端容错的一次性处理模型,该模型通过使用偏移检查点和维护每个流查询的提交日志来实现。 执行流式查询时,大多数源和接收器都要求您指定“检查点位置”以保持作业的状态。 如果发生中断,启动具有相同检查点位置的新流式查询将恢复作业的状态并从中断处继续。 我们在配置的检查点位置下的特殊目录中维护 elasticsearch-hadoop 的 Elasticsearch sink 实现的提交日志:

$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact  13  14  15 16  17  18

提交日志目录中的每个文件都对应一个已提交的批处理 id。 日志实现会定期压缩日志以避免混乱。 您可以通过多种方式设置日志目录的位置:

  1. 使用 es.spark.sql.streaming.sink.log.path 设置显式日志位置(见下文)。
  2. 如果未设置,则将使用 checkpointLocation 指定的路径。
  3. 如果未设置,则将通过将 SparkSession 中的 spark.sql.streaming.checkpointLocation 的值与数据集的给定查询名称相结合来构造路径。
  4. 如果不存在查询名称,则在上述情况下将使用随机 UUID 而不是查询名称
  5. 如果上述设置均未提供,则 start 调用将引发异常

以下是影响 Elasticsearch 提交日志行为的配置列表:

es.spark.sql.streaming.sink.log.enabled (default true)

启用或禁用流作业的提交日志。 默认情况下启用日志,并且将跳过具有相同批次 id 的输出批次,以避免重复写入。 当它设置为 false 时,提交日志被禁用,所有输出都将发送到 Elasticsearch,无论它们是否在之前的执行中发送。

es.spark.sql.streaming.sink.log.path

设置存储此流式查询的日志数据的位置。 如果未设置此值,则 Elasticsearch 接收器会将其提交日志存储在 checkpointLocation 中给出的路径下。 任何与 HDFS 客户端兼容的 URI 都是可接受的。

es.spark.sql.streaming.sink.log.cleanupDelay (default 10m)

提交日志通过 Spark 的 HDFS 客户端进行管理。 一些与 HDFS 兼容的文件系统(如 Amazon 的 S3)以异步方式传播文件更改。 为了解决这个问题,在一组日志文件被压缩后,客户端将在清理旧文件之前等待这段时间。

es.spark.sql.streaming.sink.log.deletion (default true)

确定日志是否应删除不再需要的旧日志。 在每批提交后,客户端将检查是否有任何提交日志已被压缩并且可以安全地删除。 如果设置为false,则日志将跳过此清理步骤,为每个批次留下一个提交文件。

es.spark.sql.streaming.sink.log.compactInterval (default 10)

设置在压缩日志文件之前要处理的批次数。 默认情况下,每 10 个批次,提交日志将被压缩到一个包含所有先前提交的批次 ID 的文件中。

完整代码

F:/people目录中方法存放了一个people.txt文件,与上例相同。

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.elasticsearch.spark.sql.{sqlContextFunctions}

object Main {

  // case class used to define the DataFrame
  case class Person(name: String, surname: String, age: Int)

  def main(args: Array[String]): Unit = {
    // config for support es
    // es.index.auto.create 当索引不存在的时候,进行自动创建
    // es.nodes es节点的IP,可以使用协调节点和数据节点 es.port端口
    // 默认情况下,elasticsearch-hadoop 将忽略空值,而根本不写入任何字段。
    // 由于 DataFrame 旨在被视为结构化表格数据,因此您只能通过将 es.spark.dataframe.write.null 设置切换为 true 来启用将空值写入 DataFrame 对象的空值字段。
    val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
    val people = spark.readStream.textFile("F:\\people\\*")
      .map(_.split(","))
      .map(p => Person(p(0), null, p(1).trim.toInt))
    people.writeStream
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", "F:\\save\\location")
      .option("es.mapping.id", "age")
      .option("es.nodes", "192.168.1.1")
      .option("es.port", "9200")
      .format("es")
      .option("es.resource", "gudong20220718001/doc")
      .start()
      .awaitTermination()

  }
}

其他配置说明

ES-Hadoop中的配置项说明

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-20 18:56:33  更:2022-07-20 18:59:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/20 1:06:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码