IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark 写入 es -> 正文阅读

[大数据]spark 写入 es

spark 2.4

es 7.10.2

Scala2.11??

<dependency>
? ? ? ? ? ? <groupId>org.elasticsearch</groupId>
? ? ? ? ? ? <artifactId>elasticsearch-hadoop</artifactId>
? ? ? ? ? ? <version>7.10.2</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.elasticsearch</groupId>
? ? ? ? ? ? <artifactId>elasticsearch-spark-20_2.11</artifactId>
? ? ? ? ? ? <version>7.10.0</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>io.searchbox</groupId>
? ? ? ? ? ? <artifactId>jest</artifactId>
? ? ? ? ? ? <version>6.3.1</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.projectlombok</groupId>
? ? ? ? ? ? <artifactId>lombok</artifactId>
? ? ? ? ? ? <version>1.16.10</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>commons-logging</groupId>
? ? ? ? ? ? <artifactId>commons-logging</artifactId>
? ? ? ? ? ? <version>1.1.1</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>commons-codec</groupId>
? ? ? ? ? ? <artifactId>commons-codec</artifactId>
? ? ? ? ? ? <version>1.4</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>commons-httpclient</groupId>
? ? ? ? ? ? <artifactId>commons-httpclient</artifactId>
? ? ? ? ? ? <version>3.0.1</version>
? ? ? ? </dependency>

================

package com

import com.constant.PropConstants
import com.javaUtil.PropertieUtil
import io.searchbox.client.JestClient
import io.searchbox.core.{Bulk, BulkResult, Index}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Properties}

/**

?*/
object center_materiel_hotspot2es {

? def main(args: Array[String]): Unit = {
? ? val log: Logger = Logger.getRootLogger
? ? //读取集群配置文件
? ? val prop: Properties = PropertieUtil.load("config.properties")
? ? //本地测试读文件
// ? ?val prop: Properties = PropertieUtil.getProperties("/config.properties")

? ? //读hive 的Kerberos认证
? ? System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH))
? ? System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME))
? ? System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME))
? ? UserGroupInformation.loginUserFromKeytab(
? ? ? prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH)
? ? )

? ? val session: SparkSession = SparkSession.builder()
// ? ? ?.master("local[9]")
// ? ? ?.config("spark.testing.memory","4718592000")
// ? ? ?.appName("spark to es")
// ? ? ?.config("spark.yarn.am.waitTime", "1000")
? ? ? .config("spark.hadoop.hive.exec.dynamic.partition", "true")//开启动态分区
? ? ? .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")//开启动态分区
? ? ? .enableHiveSupport() //支持hive
? ? ? .getOrCreate()

? ? import session.implicits._


? ? //首先处理
? ? val dataFrame: DataFrame = session.sql(
? ? ? """
? ? ? ? |select id,create_by,create_at,update_by,update_at,position_top,position_left,materiel_group_child_id,
? ? ? ? |drawing_no,width,height,drawing_type from
? ? ? ? | dws.center_materiel_hotspot
? ? ? ? |""".stripMargin)

? ? val rdd = dataFrame.map(x => {
? ? ? val id: String = x.getAs(0)
? ? ? val create_by: String = x.getAs(1)
? ? ? val create_at: String = x.getAs(2).toString
? ? ? val update_by: String = x.getAs(3)
? ? ? val update_at: String = x.getAs(4).toString
? ? ? val position_top: String = if(x.getAs(5)!=null) x.getAs(5).toString else x.getAs(5)
? ? ? val position_left: String = if(x.getAs(6)!=null) x.getAs(6).toString else x.getAs(6)
? ? ? val materiel_group_child_id: String = x.getAs(7)
? ? ? val drawing_no: String = x.getAs(8)
? ? ? val width: String = if(x.getAs(9)!=null) x.getAs(9).toString else x.getAs(9)
? ? ? val height: String = if(x.getAs(10)!=null) x.getAs(10).toString else x.getAs(10)
? ? ? val drawing_type: String = x.getAs(11)
? ? ? val createTime: Long = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(create_at).getTime
? ? ? val updateTime: Long = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(update_at).getTime
? ? ? center_materiel_hotspot_local(id, create_by, createTime, update_by, updateTime, position_top,position_left,
? ? ? ? materiel_group_child_id,drawing_no,width,height,drawing_type)
? ? })
// ? ?rdd.show(2)

? ? rdd.foreachPartition(x=>{
? ? ? val tuples: List[(String, center_materiel_hotspot_local)] = x.toList.map { data => (data.id, data) }
? ? ? //tuples 数据,参二:索引名称 ?,参三:类型
? ? ? val dt: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
? ? ? savaBulk(tuples,"center_materiel_hotspot_local","_doc")
? ? })

? ? session.stop()
? }

? case class center_materiel_hotspot_local(id:String, createBy:String, createTime:Long, updateBy:String,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?updateTime:Long,positionTop:String, positionLeft:String,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?materielGroupChildId:String,drawingNo:String, width:String,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?height:String, drawingType:String )

? def savaBulk(dataList:List[(String,AnyRef)],indexName:String,typeName:String):Unit={
? ? if (dataList!=null && dataList.nonEmpty) {
? ? ? val client: JestClient = jestClient

? ? ? val builder = new Bulk.Builder
? ? ? builder.defaultIndex(indexName).defaultType(typeName)
? ? ? for ((id, data) <- dataList) {
? ? ? ? val index: Index = new Index.Builder(data).id(id).build()
? ? ? ? builder.addAction(index)
? ? ? }
? ? ? val bulk: Bulk = builder.build()
? ? ? //获取执行的返回值
? ? ? val items: util.List[BulkResult#BulkResultItem] = client.execute(bulk).getItems
? ? ? println("以保存:" + items.size() + "条数据!")
? ? ? client.close()
? ? }
? }

? import io.searchbox.client.JestClient
? import io.searchbox.client.JestClientFactory
? import io.searchbox.client.config.HttpClientConfig

? def jestClient: JestClient = {
? ? val factory = new JestClientFactory
? ? factory.setHttpClientConfig(new HttpClientConfig.Builder(
? ? ? "http://es-cn-tl32b5q0t003a6n6g.public.elasticsearch.aliyuncs.com:9200")
? ? ? .multiThreaded(true)
? ? ? .defaultMaxTotalConnectionPerRoute(2)
? ? ? .maxTotalConnection(10)
? ? ? .build)
? ? factory.getObject
? }

}
?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:12:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:01:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码