Real-Time Statistics with Structured Streaming and Kafka
I. Real-Time ETL

Start ZooKeeper and Kafka
bin/zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties
Create the topics
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic stationTopic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic etlTopic
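To confirm the topics were created, you can rerun kafka-topics.sh with --list, or query the broker programmatically. Below is a minimal Scala sketch using the Kafka AdminClient from kafka-clients; the object name ListTopics is just a placeholder, and the broker address is an assumption that should match your own setup.

import java.util.Properties

import org.apache.kafka.clients.admin.AdminClient

import scala.collection.JavaConverters._

// Minimal sketch: list the topics known to the broker, to confirm that
// stationTopic and etlTopic exist. Broker address is an assumption.
object ListTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.89.10:9092")
    val admin = AdminClient.create(props)
    try {
      admin.listTopics().names().get().asScala.foreach(println)
    } finally {
      admin.close()
    }
  }
}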
Mock data generator code (writes to the stationTopic topic in Kafka):
package com.jiang.structed_streaming_kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
 * Continuously generates mock base-station call logs and sends them to the
 * stationTopic topic in Kafka.
 */
object MockStationLog {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.89.10:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    // "success" is weighted so that most simulated calls succeed
    val allStatus = Array(
      "fail", "busy", "barring", "success", "success", "success",
      "success", "success", "success", "success", "success", "success"
    )

    while (true) {
      val callOut: String = "1860000%04d".format(random.nextInt(10000))
      val callIn: String = "1890000%04d".format(random.nextInt(10000))
      val callStatus: String = allStatus(random.nextInt(allStatus.length))
      // Only successful calls get a non-zero duration (1-10 seconds, in milliseconds)
      val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L
      val stationLog: StationLog = StationLog(
        "station_" + random.nextInt(10),
        callOut,
        callIn,
        callStatus,
        System.currentTimeMillis(),
        callDuration
      )
      println(stationLog.toString)
      Thread.sleep(100 + random.nextInt(100))

      val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
      producer.send(record)
    }
    producer.close() // unreachable while the loop runs forever; kept for completeness
  }

  /** One base-station call log, serialized as a comma-separated string. */
  case class StationLog(
    stationId: String,
    callOut: String,
    callIn: String,
    callStatus: String,
    callTime: Long,
    duration: Long
  ) {
    override def toString: String = {
      s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
    }
  }
}
Consume data from stationTopic --> run ETL with Structured Streaming --> write the ETL result to the etlTopic topic
Keep only the "success" records from stationTopic
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Reads raw call logs from stationTopic, keeps only the "success" records,
 * and writes the result to etlTopic.
 */
object Kafka_ETL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    // Source: subscribe to stationTopic
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.89.10:9092")
      .option("subscribe", "stationTopic")
      .load()

    // Kafka values arrive as binary; cast them to String
    val ds: Dataset[String] = kafkaDF.selectExpr("CAST(value AS STRING)").as[String]

    // ETL: keep only the records of successful calls
    val etlResult: Dataset[String] = ds.filter(_.contains("success"))

    // Sink: write the filtered records to etlTopic.
    // The Kafka sink expects a "value" column and requires a checkpoint location.
    etlResult.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.89.10:9092")
      .option("topic", "etlTopic")
      .option("checkpointLocation", "./ckp")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
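Once Kafka_ETL is running, it helps to check what actually lands in etlTopic. The following is a small verification sketch, not part of the original pipeline: the object name Kafka_ETL_Check is a placeholder, and the broker address is assumed to be the same one used above.

package com.jiang.StructedStreaming

import org.apache.spark.sql.{DataFrame, SparkSession}

// Verification sketch: subscribe to etlTopic and print the filtered records.
object Kafka_ETL_Check {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("etl-check").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val etlDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.89.10:9092")
      .option("subscribe", "etlTopic")
      .load()

    etlDF.selectExpr("CAST(value AS STRING)").as[String]
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}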
II. Real-Time Analysis of IoT Device Data
1. Requirements
For the device monitoring data pushed to Kafka, compute in real time the record count and the average signal strength per device type, considering only records whose signal strength is greater than 30.
2. Create the topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic iotTopic
3. Code examples
(1) Mock data generator (sends data to the iotTopic topic in Kafka)
package com.jiang.structed_streaming_kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Continuously generates mock IoT device readings, serializes them as JSON,
 * and sends them to the iotTopic topic in Kafka.
 */
object MockIotDatas {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.89.10:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    // Device types are weighted so that "bigdata" devices appear most often
    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )

    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90) // signal strength in [10, 99]
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // Serialize the reading to JSON with json4s before sending
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(1000 + random.nextInt(500))

      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    producer.close() // unreachable while the loop runs forever; kept for completeness
  }

  /** One IoT device reading. */
  case class DeviceData(
    device: String,
    deviceType: String,
    signal: Double,
    time: Long
  )
}
(2) Processing code (for records with signal strength > 30, count the devices and compute the average signal strength per device type)
Consume data from the iotTopic topic in Kafka --> analyze it in real time with Structured Streaming --> write the results to the console
package com.jiang.structed_streaming_kafka

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Consumes JSON device readings from iotTopic and computes, for each device
 * type, the count and average signal strength of records with signal > 30.
 * The same aggregation is expressed twice: once in SQL, once with the DSL.
 */
object Kafka_IOT {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Source: subscribe to iotTopic
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.89.10:9092")
      .option("subscribe", "iotTopic")
      .load()
    val valueDS: Dataset[String] = kafkaDF.selectExpr("CAST(value AS STRING)").as[String]

    // Parse the JSON payload into typed columns
    val schemaDF: DataFrame = valueDS.filter(StringUtils.isNotBlank(_))
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("deviceType"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal")
      )

    // Variant 1: SQL over a temporary view
    schemaDF.createOrReplaceTempView("t_iot")
    val sql: String =
      """
        |select deviceType, count(*) as counts, avg(signal) as avgsignal
        |from t_iot
        |where signal > 30
        |group by deviceType
        |""".stripMargin
    val result1: DataFrame = spark.sql(sql)

    // Variant 2: DataFrame DSL
    val result2: DataFrame = schemaDF.filter('signal > 30)
      .groupBy('deviceType)
      .agg(
        count('device_id) as "counts",
        avg('signal) as "avgsignal"
      )

    // Sink: print both result tables to the console on every trigger
    result1.writeStream
      .format("console")
      .outputMode("complete")
      .start()
    result2.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
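Note that the job above starts two streaming queries but calls awaitTermination() only on the second one, so the driver blocks until that particular query stops. If you prefer to block until any running query terminates, you can wait on the StreamingQueryManager instead. A sketch of that variant, reusing result1 and result2 from the code above:

// Assumes result1 and result2 are the aggregated DataFrames built above.
result1.writeStream.format("console").outputMode("complete").start()
result2.writeStream.format("console").outputMode("complete").start()
// Block until any of the active streaming queries terminates
spark.streams.awaitAnyTermination()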