spark 2.4
es 7.10.2
Scala2.11??
<dependency> ? ? ? ? ? ? <groupId>org.elasticsearch</groupId> ? ? ? ? ? ? <artifactId>elasticsearch-hadoop</artifactId> ? ? ? ? ? ? <version>7.10.2</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>org.elasticsearch</groupId> ? ? ? ? ? ? <artifactId>elasticsearch-spark-20_2.11</artifactId> ? ? ? ? ? ? <version>7.10.0</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>io.searchbox</groupId> ? ? ? ? ? ? <artifactId>jest</artifactId> ? ? ? ? ? ? <version>6.3.1</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>org.projectlombok</groupId> ? ? ? ? ? ? <artifactId>lombok</artifactId> ? ? ? ? ? ? <version>1.16.10</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>commons-logging</groupId> ? ? ? ? ? ? <artifactId>commons-logging</artifactId> ? ? ? ? ? ? <version>1.1.1</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>commons-codec</groupId> ? ? ? ? ? ? <artifactId>commons-codec</artifactId> ? ? ? ? ? ? <version>1.4</version> ? ? ? ? </dependency>
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>commons-httpclient</groupId> ? ? ? ? ? ? <artifactId>commons-httpclient</artifactId> ? ? ? ? ? ? <version>3.0.1</version> ? ? ? ? </dependency>
================
package com
import com.constant.PropConstants import com.javaUtil.PropertieUtil import io.searchbox.client.JestClient import io.searchbox.core.{Bulk, BulkResult, Index} import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions.lit import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession} import org.elasticsearch.spark.sql.EsSparkSQL
import java.sql.Timestamp import java.text.SimpleDateFormat import java.util import java.util.{Date, Properties}
/**
?*/ object center_materiel_hotspot2es {
? def main(args: Array[String]): Unit = { ? ? val log: Logger = Logger.getRootLogger ? ? //读取集群配置文件 ? ? val prop: Properties = PropertieUtil.load("config.properties") ? ? //本地测试读文件 // ? ?val prop: Properties = PropertieUtil.getProperties("/config.properties")
? ? //读hive 的Kerberos认证 ? ? System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH)) ? ? System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME)) ? ? System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME)) ? ? UserGroupInformation.loginUserFromKeytab( ? ? ? prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH) ? ? )
? ? val session: SparkSession = SparkSession.builder() // ? ? ?.master("local[9]") // ? ? ?.config("spark.testing.memory","4718592000") // ? ? ?.appName("spark to es") // ? ? ?.config("spark.yarn.am.waitTime", "1000") ? ? ? .config("spark.hadoop.hive.exec.dynamic.partition", "true")//开启动态分区 ? ? ? .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")//开启动态分区 ? ? ? .enableHiveSupport() //支持hive ? ? ? .getOrCreate()
? ? import session.implicits._
? ? //首先处理 ? ? val dataFrame: DataFrame = session.sql( ? ? ? """ ? ? ? ? |select id,create_by,create_at,update_by,update_at,position_top,position_left,materiel_group_child_id, ? ? ? ? |drawing_no,width,height,drawing_type from ? ? ? ? | dws.center_materiel_hotspot ? ? ? ? |""".stripMargin)
? ? val rdd = dataFrame.map(x => { ? ? ? val id: String = x.getAs(0) ? ? ? val create_by: String = x.getAs(1) ? ? ? val create_at: String = x.getAs(2).toString ? ? ? val update_by: String = x.getAs(3) ? ? ? val update_at: String = x.getAs(4).toString ? ? ? val position_top: String = if(x.getAs(5)!=null) x.getAs(5).toString else x.getAs(5) ? ? ? val position_left: String = if(x.getAs(6)!=null) x.getAs(6).toString else x.getAs(6) ? ? ? val materiel_group_child_id: String = x.getAs(7) ? ? ? val drawing_no: String = x.getAs(8) ? ? ? val width: String = if(x.getAs(9)!=null) x.getAs(9).toString else x.getAs(9) ? ? ? val height: String = if(x.getAs(10)!=null) x.getAs(10).toString else x.getAs(10) ? ? ? val drawing_type: String = x.getAs(11) ? ? ? val createTime: Long = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(create_at).getTime ? ? ? val updateTime: Long = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(update_at).getTime ? ? ? center_materiel_hotspot_local(id, create_by, createTime, update_by, updateTime, position_top,position_left, ? ? ? ? materiel_group_child_id,drawing_no,width,height,drawing_type) ? ? }) // ? ?rdd.show(2)
? ? rdd.foreachPartition(x=>{ ? ? ? val tuples: List[(String, center_materiel_hotspot_local)] = x.toList.map { data => (data.id, data) } ? ? ? //tuples 数据,参二:索引名称 ?,参三:类型 ? ? ? val dt: String = new SimpleDateFormat("yyyyMMdd").format(new Date()) ? ? ? savaBulk(tuples,"center_materiel_hotspot_local","_doc") ? ? })
? ? session.stop() ? }
? case class center_materiel_hotspot_local(id:String, createBy:String, createTime:Long, updateBy:String, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?updateTime:Long,positionTop:String, positionLeft:String, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?materielGroupChildId:String,drawingNo:String, width:String, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?height:String, drawingType:String )
? def savaBulk(dataList:List[(String,AnyRef)],indexName:String,typeName:String):Unit={ ? ? if (dataList!=null && dataList.nonEmpty) { ? ? ? val client: JestClient = jestClient
? ? ? val builder = new Bulk.Builder ? ? ? builder.defaultIndex(indexName).defaultType(typeName) ? ? ? for ((id, data) <- dataList) { ? ? ? ? val index: Index = new Index.Builder(data).id(id).build() ? ? ? ? builder.addAction(index) ? ? ? } ? ? ? val bulk: Bulk = builder.build() ? ? ? //获取执行的返回值 ? ? ? val items: util.List[BulkResult#BulkResultItem] = client.execute(bulk).getItems ? ? ? println("以保存:" + items.size() + "条数据!") ? ? ? client.close() ? ? } ? }
? import io.searchbox.client.JestClient ? import io.searchbox.client.JestClientFactory ? import io.searchbox.client.config.HttpClientConfig
? def jestClient: JestClient = { ? ? val factory = new JestClientFactory ? ? factory.setHttpClientConfig(new HttpClientConfig.Builder( ? ? ? "http://es-cn-tl32b5q0t003a6n6g.public.elasticsearch.aliyuncs.com:9200") ? ? ? .multiThreaded(true) ? ? ? .defaultMaxTotalConnectionPerRoute(2) ? ? ? .maxTotalConnection(10) ? ? ? .build) ? ? factory.getObject ? }
} ?
|