Spark 连接 Kafka,将数据 sink 到 Tablestore 中
直接上代码,有不明白的可以留言。先看 pom 配置:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build for the Kafka -> Spark Structured Streaming -> Tablestore demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun.tablestore</groupId>
    <artifactId>tablestore-spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.5</spark.version>
        <scala.version>2.11.7</scala.version>
        <scala.version.sp>2.11</scala.version.sp>
        <emr.version>2.2.0</emr.version>
        <kafka.version>2.3.0</kafka.version>
        <tablestore.sdk.version>5.10.3</tablestore.sdk.version>
    </properties>

    <dependencies>
        <!-- Mark the Spark artifacts <scope>provided</scope> when submitting to a
             cluster; they are left at compile scope here for local[*] runs. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version.sp}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version.sp}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming Kafka source/sink.
             Uses the scala-suffix property instead of a hard-coded _2.11. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version.sp}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version.sp}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- FIX: this connector is versioned with Spark, not with the Kafka
             broker; it was previously pinned to ${kafka.version}. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version.sp}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Removed: spark-streaming-kafka_2.11:1.5.2 (legacy Kafka 0.8
             connector). It conflicts with the 0-10 connector above and is not
             used by the structured-streaming code. -->
        <!-- SECURITY: fastjson < 1.2.83 has known deserialization RCEs
             (e.g. CVE-2022-25845). Upgrade before any production use. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.19</version>
        </dependency>
        <!-- Single declaration (a duplicate without the jackson exclusions used
             to silently win Maven's last-declaration-wins resolution). -->
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-tablestore</artifactId>
            <version>${emr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Tablestore SDK; exclude classes Spark already ships. -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>tablestore</artifactId>
            <version>${tablestore.sdk.version}</version>
            <classifier>jar-with-dependencies</classifier>
            <exclusions>
                <exclusion>
                    <groupId>com.google.protobuf</groupId>
                    <artifactId>protobuf-java</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.gson</groupId>
                    <artifactId>gson</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Single declaration (previously declared twice, once provided). -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_${scala.version.sp}</artifactId>
            <version>2.10.1</version>
        </dependency>
    </dependencies>
</project>
执行代码:
package com.spark
import com.alibaba.fastjson.JSON
import org.apache.spark.sql._
/** Record parsed from each Kafka message's JSON payload: an `id` and its `id_md5` field. */
case class User(
  id_md5: String,
  id: String
)
object StructuredKafkaToOTS {

  /**
   * Streams JSON user records from a Kafka topic into Tablestore and, in
   * parallel, mirrors the raw key/value pairs to a second Kafka topic.
   *
   * Fixes over the original:
   *  - `brokers`/`topics`/`groupId` were declared but never used (literals
   *    were repeated — and inconsistent — in the reader options); the vals
   *    are now the single source of truth,
   *  - the Kafka mirror query was only started after `awaitTermination()` on
   *    the Tablestore query, so it never ran; both queries are now started
   *    up front and we block on either terminating,
   *  - the two queries shared one `checkpointLocation`, which Structured
   *    Streaming forbids; each query now gets its own subdirectory,
   *  - `yy.show(false)` is not supported on a streaming Dataset (it throws
   *    AnalysisException) and was unreachable anyway; removed.
   */
  def main(args: Array[String]): Unit = {
    val brokers        = "localhost:9092"
    val sourceTopic    = "test12" // topic the JSON user records arrive on
    val mirrorTopic    = "test0"  // topic the raw key/value pairs are copied to
    val groupId        = "test11"
    val checkpointRoot = "D:\\spark_data\\spark_scala\\logs"

    val spark = SparkSession.builder
      .appName("123")
      .master("local[*]")
      .getOrCreate()

    // Column schema the Tablestore sink writes; must match the User fields.
    val dataCatalog: String =
      s"""
         |{"columns": {
         |  "id_md5": {"type":"string"},
         |  "id": {"type":"string"}
         | }
         |}""".stripMargin

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", sourceTopic)
      .option("group.id", groupId)
      .option("startingOffsets", "earliest")
      .load()

    import spark.implicits._

    val raw = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Parse each record's value as a JSON-encoded User.
    // NOTE(review): JSON.parseObject throws on malformed input, which fails
    // the whole micro-batch — confirm upstream guarantees valid JSON.
    val users = raw.map { case (_, value) =>
      JSON.parseObject(value, classOf[User])
    }

    val otsQuery = users
      .writeStream
      .format("tablestore")
      .option("endpoint", "https://xxx.aliyuncs.com")
      .option("instance.name", "test-xxx")
      .option("access.key.id", "xxx")
      .option("access.key.secret", "xxx")
      .option("table.name", "xxx")
      .option("catalog", dataCatalog)
      .option("checkpointLocation", checkpointRoot + "\\ots")
      .option("triggerInterval", 20)
      .start()

    val mirrorQuery = raw
      .writeStream
      .format("kafka")
      .option("checkpointLocation", checkpointRoot + "\\kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", mirrorTopic)
      .start()

    // Block until either query terminates (or fails with an exception).
    spark.streams.awaitAnyTermination()
  }
}
|