Spark Structured Streaming
一、Shortcomings of Spark Streaming
1、Micro-batch based, so latency is relatively high; it cannot do true record-level real-time processing
2、DStream is built on RDDs and does not directly support SQL
3、The stream and batch APIs are not unified at the application layer (streaming uses DStream, whose underlying abstraction is the RDD, while batch uses DataFrame/Dataset/RDD)
4、No support for EventTime (event time)
- Note
  - EventTime (event time): the time at which the event actually occurred
  - ProcessingTime (processing time): the time at which the event is processed by the streaming system
  - IngestionTime (ingestion time): the time at which the event arrives at the streaming system
  - Example: an error log is generated on Oct 1 at 23:59:00 (event time); because of network delay it does not reach the log-processing system until Oct 2 at 00:00:10 (ingestion time), and it is processed by the streaming system at Oct 2 00:00:20 (processing time)
  - If you want to count the bugs that occurred on Oct 1, Spark Streaming cannot do this correctly because it does not support event time; Structured Streaming can (see the sketch after this list)
5、Exactly-once semantics have to be implemented by hand
- Note: delivery semantics for data
  - At-most-once
  - Exactly-once ----- this is the goal; to achieve it with Spark Streaming you have to maintain offsets manually
  - At-least-once
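By contrast, Structured Streaming supports event time natively through withWatermark and window. A minimal sketch, assuming each socket line carries its own event time in the form "error,2021-10-01 23:59:00" (the host and port are the same assumed values used in the examples below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EventTime_Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("eventTimeSketch").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Each line is assumed to look like: error,2021-10-01 23:59:00
    val lines = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()
      .as[String]

    val events = lines
      .map(_.split(","))
      .map(arr => (arr(0), arr(1)))
      .toDF("word", "ts")
      .withColumn("eventTime", to_timestamp($"ts"))

    // Group by the event time carried in the data; records up to 10 minutes late are still
    // assigned to the correct window thanks to the watermark
    val counts = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "1 day"), $"word")
      .count()

    counts.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}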
二、Structured Streaming
1、Where Structured Streaming sits
Structured Streaming is a stream-processing engine built on top of the Spark SQL engine, so it sits alongside Spark SQL/DataFrames rather than on the old DStream API.

2、Introduction to Structured Streaming
Programming model: the unbounded (dynamic) table — the input stream is treated as a table that keeps growing, and a streaming query is a standing query over that table
Data abstraction: DataFrame/Dataset, the same abstraction Spark SQL uses for batch
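Because the abstraction is the same DataFrame/Dataset, the only difference from a batch job is that the source is read with readStream. A minimal sketch, using the built-in rate source only because it needs no external service:

import org.apache.spark.sql.SparkSession

object UnboundedTable_Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unboundedTable").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Batch and streaming share the same DataFrame API; readStream simply produces a
    // DataFrame backed by an unbounded, continuously growing table
    val streamDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    println(streamDF.isStreaming)   // true -> backed by an unbounded table
    streamDF.printSchema()          // rate source columns: timestamp, value

    spark.stop()
  }
}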

3、Source ----- data source code examples
(1)Structured Streaming ----- Socket source
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Word count over a socket source; feed it data with `nc -lk 9999` on the source host
object Source_Socket {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    // The socket source produces an unbounded table with a single string column named "value"
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()

    // Split each line into words and count occurrences
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split("\\s+"))
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    // Complete mode: the whole updated result table is printed on every trigger
    result.writeStream.format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
(2)Structured Streaming ----- Rate source
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Rate source: generates rows of (timestamp, value) at a fixed rate, handy for testing
object Source_Rate {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")   // rows generated per second
      .option("rampUpTime", "0s")      // time to ramp up to the full rate
      .option("numPartitions", "2")    // partitions for the generated data
      .load()

    // Append mode: only the rows added since the last trigger are printed
    df.writeStream.format("console")
      .outputMode("append")
      .option("truncate", false)
      .start()
      .awaitTermination()

    spark.stop()
  }
}
(3)Structured Streaming ----- File source
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

// File source: watches a path for new CSV files; a schema must be supplied explicitly
object Source_File {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // Schema of the incoming CSV data: id,name,age
    val csvSchema: StructType = new StructType()
      .add("id", IntegerType, nullable = true)
      .add("name", StringType, nullable = true)
      .add("age", IntegerType, nullable = true)

    // The path is usually a directory that is monitored for newly arriving files
    val df: DataFrame = spark.readStream
      .option("sep", ",")
      .option("header", "false")
      .schema(csvSchema)
      .format("csv")
      .load("data/input/person.csv")

    df.writeStream.format("console")
      .outputMode("append")
      .option("truncate", false)
      .start()
      .awaitTermination()

    spark.stop()
  }
}
(4)Structured Streaming ----- Operations (DSL and SQL)
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// The same word count expressed twice: once with the DataFrame/Dataset DSL, once with SQL
object Operation_Socket {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()

    val ds: Dataset[String] = df.as[String]
    val wordsDS: Dataset[String] = ds.flatMap(_.split("\\s+"))

    // 1) DSL style
    val result: Dataset[Row] = wordsDS
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    // 2) SQL style: register the words as a temporary view and query it
    wordsDS.createOrReplaceTempView("t_words")
    val sql: String =
      """
        |select value, count(*) as counts
        |from t_words
        |group by value
        |order by counts desc
      """.stripMargin
    val result2: DataFrame = spark.sql(sql)

    // Both queries run concurrently; awaitTermination on the second keeps the driver alive
    result.writeStream.format("console")
      .outputMode("complete")
      .start()
    result2.writeStream.format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
4、Sink
(1)Structured Streaming ----- Output Modes
Structured Streaming supports three output modes:
- append (default): only rows appended to the result table since the last trigger are written; aggregations are only allowed together with a watermark
- complete: the entire updated result table is written on every trigger; requires an aggregation
- update: only the rows of the result table that changed since the last trigger are written

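The examples in these notes use complete and append; below is a minimal sketch of update mode, reusing the socket word count from above (same assumed host/port) and dropping the orderBy, which update mode does not allow:

import org.apache.spark.sql.{Dataset, SparkSession}

object Sink_UpdateMode_Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("updateMode").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val lines: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()
      .as[String]

    // Same word count as above, but without orderBy: sorting is only allowed in complete mode
    val counts = lines.flatMap(_.split("\\s+")).groupBy('value).count()

    // Update mode: only the counts that changed in this trigger are printed
    counts.writeStream
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}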
(2)Sink ----- ForeachBatch
Scala code:
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql._

// foreachBatch: treat every micro-batch as a normal DataFrame and write it with the batch API,
// here to the console and to MySQL via JDBC (the MySQL JDBC driver must be on the classpath)
object Sink_ForeachBatch {
  val url = "jdbc:mysql://localhost:3306/bigdata_test"
  val username = "root"
  val passwd = "123456"

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()

    val ds: Dataset[String] = df.as[String]
    val wordsDS: Dataset[String] = ds.flatMap(_.split("\\s+"))
    val result: Dataset[Row] = wordsDS
      .groupBy('value)
      .count()
      .orderBy('count.desc)

    result.writeStream
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        println("==========custom console sink==========")
        println("batchId------>" + batchId)
        println("==========custom console sink==========")
        batchDF.show()
        // Write the full result table of this batch to MySQL, overwriting the previous batch
        batchDF.coalesce(1)
          .write
          .mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("url", url)
          .option("user", username)
          .option("password", passwd)
          .option("dbtable", "bigdata_test.struct_words")
          .save()
      })
      .outputMode("complete")
      .start()
      .awaitTermination()

    spark.stop()
  }
}
(3)Trigger interval and Checkpoint
package com.jiang.StructedStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Sink_Trigger_Checkpoint {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "192.168.89.15")
      .option("port", "9999")
      .load()

    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.coalesce(1).flatMap(_.split("\\s+"))
      .groupBy('value)
      .count()

    result.writeStream
      .format("console")
      .outputMode("complete")
      // Micro-batch trigger fired every second. Trigger.Continuous is not used here because
      // continuous processing does not support aggregations such as groupBy().count()
      .trigger(Trigger.ProcessingTime("1 seconds"))
      // Checkpoint directory for fault tolerance; in production use a fixed, reliable path
      // (e.g. on HDFS) instead of a fresh local directory per run
      .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
      .start()
      .awaitTermination()

    spark.stop()
  }
}