1 输出模式
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下: 文档: http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#starting-streaming-queries
"Output"是用来定义写入外部存储器的内容,输出可以被定义为不同模式:
- 追加模式(Append mode),默认模式,其中只有自从上一次触发以来,添加到 Result Table的新行将会是outputted to the sink。只有添加到Result Table的行将永远不会改变那些查询才支持这一点。这种模式保证每行只能输出一次(假设 fault-tolerant sink )。例如,只有select,where, map, flatMap, filter, join等查询支持 Append mode 。只输出那些将来永远不可能再更新的数据,然后数据从内存移除 。没有聚合的时候,append和update一致;有聚合的时候,一定要有水印,才能使用。
- 完全模式(Complete mode),每次触发后,整个Result Table将被输出到sink,aggregationqueries(聚合查询)支持。全部输出,必须有聚合。
- 更新模式(Update mode),只有 Result Table rows 自上次触发后更新将被输出到 sink。与Complete模式不同,因为该模式只输出自上次触发器以来已经改变的行。如果查询不包含聚合,那么等同于Append模式。只输出更新数据(更新和新增)。
- 注意,不同查询Query,支持对应的输出模式,如下表所示:
2 查询名称
可以给每个查询Query设置名称Name,必须是唯一的,直接调用DataFrameWriter中queryName方法即可,实际生产开发建议设置名称,API说明如下:
3 触发间隔
触发器Trigger决定了多久执行一次查询并输出结果,当不设置时,默认只要有新数据,就立即执行查询Query,再进行输出。目前来说,支持三种触发间隔设置: 其中Trigger.Processing表示每隔多少时间触发执行一次,此时流式处理依然属于微批处理;从Spark 2.3以后,支持Continue Processing流式处理,设置触发间隔为Trigger.Continuous。设置 代码示例如下:
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.format("console")
.start()
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
4 检查点位置
在Structured Streaming中使用Checkpoint 检查点进行故障恢复。如果实时应用发生故障或 关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行,使用Checkpoint和预写日志 WAL完成。使用检查点位置配置查询,那么查询将所有进度信息(即每个触发器中处理的偏移范围) 和运行聚合(例如词频统计wordcount)保存到检查点位置。此检查点位置必须是HDFS兼容文件 系统中的路径,两种方式设置Checkpoint Location位置:
- streamDF.writeStream.option(“checkpointLocation”, “xxx”)
- sparkConf.set(“spark.sql.streaming.checkpointLocation”, “xxx”)
修改上述词频统计案例程序,设置输出模式、查询名称、触发间隔及检查点位置,演示代码如下:
package cn.itcast.spark.output
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery,Trigger}
import org.apache.spark.sql.{DataFrame,SparkSession}
object StructuredQueryOutput{
def main(args:Array[String]):Unit={
val spark:SparkSession=SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions","2")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val inputStreamDF:DataFrame=spark.readStream
.format("socket")
.option("host","node1.itcast.cn")
.option("port",9999)
.load()
val resultStreamDF:DataFrame=inputStreamDF
.as[String]
.filter(line=>null!=line&&line.trim.length>0)
.flatMap(line=>line.trim.split("\\s+"))
.groupBy($"value").count()
val query:StreamingQuery=resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.queryName("query-socket-wc")
.trigger(Trigger.ProcessingTime("5 seconds"))
.format("console")
.option("numRows","10")
.option("truncate","false")
.option("checkpointLocation","datas/structured/ckpt-1001")
.start()
query.awaitTermination()
query.stop()
}
}
运行流式应用,查看Checkpoint Location,包含以下几个目录:
各个子目录及文件含义说明:
- 第一、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的
数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。 - 第二、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批
次记录比对,确定接下来运行的批次; - 第三、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id
- 第四、数据源目录【sources】:sources 目录为数据源(Source)时各个批次读取详情
- 第五、数据接收端目录【sinks】:sinks 目录为数据接收端(Sink)时批次的写出详情
- 第六、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个
目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。
5 输出终端(Sinks)
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的end-to-end exactly-once guarantees。目前Structured Streaming内置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中测试最为方便的是Console Sink。
5.1 文件接收器
将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下: 相关注意事项如下:
- 支持OutputMode为:Append追加模式;
- 必须指定输出目录参数【path】,必选参数,其中格式有parquet、orc、json、csv等等;
- 容灾恢复支持精确一次性语义exactly-once;
- 此外支持写入分区表,实际项目中常常按时间划分;
5.2 Memory Sink
此种接收器作为调试使用,输出作为内存表存储在内存中, 支持Append和Complete输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中,因此,请谨慎使用,示例如下:
5.3 Foreach Sink
Structured Streaming提供接口foreach和foreachBatch,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。其中foreach允许每行自定义写入逻辑,foreachBatch允许在每个微批量的输出上进行任意操作和自定义逻辑,建议使用foreachBatch操作。 foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个 方法来表达数据写入逻辑:打开,处理和关闭。
streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
}
def process(record: String): Unit = {
}
def close(errorOrNull: Throwable): Unit = {
}
}
).start()
演示案例:将前面词频统计结果输出到MySQL表【tb_word_count】中。
CREATE TABLE `db_spark`.`tb_word_count` (
`id` int NOT NULL AUTO_INCREMENT,
`word` varchar(255) NOT NULL,
`count` int NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
REPLACE INTO ` tb_word_count` (`id`, `word`, `count`) VALUES (NULL, ?, ?);
- 第二步、编写MySQLForeachWriter,继承ForeachWriter,其中DataFrame中数据类型为Row
import java.sql.{Connection,DriverManager,PreparedStatement}
import org.apache.spark.sql.{ForeachWriter,Row}
class MySQLForeachWriter extends ForeachWriter[Row]{
var conn:Connection=_
var pstmt:PreparedStatement=_
val insertSQL="REPLACE INTO `tb_word_count` (`id`, `word`, `count`) VALUES (NULL, ?, ?)"
override def open(partitionId:Long,version:Long):Boolean={
Class.forName("com.mysql.cj.jdbc.Driver")
conn=DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/db_spark?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
"root",
"123456"
)
pstmt=conn.prepareStatement(insertSQL)
true
}
override def process(row:Row):Unit={
pstmt.setString(1,row.getAs[String]("value"))
pstmt.setLong(2,row.getAs[Long]("count"))
pstmt.executeUpdate()
}
override def close(errorOrNull:Throwable):Unit={
if(null!=pstmt)pstmt.close()
if(null!=conn)conn.close()
}
}
- 第三步、修改词频统计程序,使用foreach设置Sink为自定义MySQLForeachWriter,代码如下:
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SparkSession}
object StructuredMySQLSink{
def main(args:Array[String]):Unit={
val spark:SparkSession=SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions","2")
.getOrCreate()
import spark.implicits._
val inputStreamDF:DataFrame=spark.readStream
.format("socket")
.option("host","node1.itcast.cn")
.option("port",9999)
.load()
val resultStreamDF:DataFrame=inputStreamDF
.as[String]
.filter(line=>null!=line&&line.trim.length>0)
.flatMap(line=>line.trim.split("\\s+"))
.groupBy($"value").count()
val query:StreamingQuery=resultStreamDF.writeStream
.outputMode(OutputMode.Update())
.foreach(new MySQLForeachWriter())
.start()
query.awaitTermination()
query.stop()
}
}
运行应用,模式数据,查看MySQL表的结果数据如下:
5.4 ForeachBatch Sink
方法foreachBatch允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。
使用foreachBatch函数输出时,以下几个注意事项:
- 第一、重用现有的批处理数据源,可以在每个微批次的输出上使用批处理数据输出Output;
- 第二、写入多个位置,如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出
DataFrame/Dataset 。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。要避免重新计算,您应该缓存cache输出 DataFrame/Dataset,将其写入多个位置,然后 uncache 。 - 第三、应用其他DataFrame操作,流式DataFrame中不支持许多DataFrame和Dataset操作,使用foreachBatch可以在每个微批输出上应用其中一些操作,但是,必须自己解释执行该操作的端到端语义。
- 第四、默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。
- 第五、foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。
- 范例演示:使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:
package cn.itcast.spark.sink.batch
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
object StructuredForeachBatch{
def main(args:Array[String]):Unit={
val spark:SparkSession=SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions","2")
.getOrCreate()
import spark.implicits._
val inputStreamDF:DataFrame=spark.readStream
.format("socket")
.option("host","node1.itcast.cn")
.option("port",9999)
.load()
val resultStreamDF:DataFrame=inputStreamDF
.as[String]
.filter(line=>null!=line&&line.trim.length>0)
.flatMap(line=>line.trim.split("\\s+"))
.groupBy($"value").count()
val query:StreamingQuery=resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch{(batchDF:DataFrame,batchId:Long)=>
println(s"BatchId = ${batchId}")
if(!batchDF.isEmpty){
batchDF
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url","jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&charact
erEncoding=utf8&useUnicode=true")
.option("user","root")
.option("password","123456")
.option("dbtable","db_spark.tb_word_count2")
.save()
}
}
.start()
query.awaitTermination()
query.stop()
}
}
6 容错语义
针对任何流式应用处理框架(Storm、SparkStreaming、StructuredStreaming和Flink等)处理数据时,都要考虑语义,任意流式系统处理流式数据三个步骤:
- 1)、Receiving the data:接收数据源端的数据
- 采用接收器或其他方式从数据源接收数据(The data is received from sources usingReceivers or otherwise)。
- 2)、Transforming the data:转换数据,进行处理分析
- 针对StructuredStreaming来说就是Stream DataFrame(The received data is
transformed using DStream and RDD transformations)。
- 3)、Pushing out the data:将结果数据输出
- 最终分析结果数据推送到外部存储系统,比如文件系统HDFS、数据库等(The finaltransformed data is pushed out to external systems like file systems, databases,dashboards, etc)。
在处理数据时,往往需要保证数据处理一致性语义:从数据源端接收数据,经过数据处理分析,到最终数据输出仅被处理一次,是最理想最好的状态。在Streaming数据处理分析中,需要考虑数据是否被处理及被处理次数,称为消费语义,主要有三种:
- At most once:最多一次,可能出现不消费,数据丢失;
- At least once:至少一次,数据至少消费一次,可能出现多次消费数据;
- Exactly once:精确一次,数据当且仅当消费一次,不多不少;
Structured Streaming的核心设计理念和目标之一:支持一次且仅一次Extracly-Once的语义。 为了实现这个目标,Structured Streaming设计source、sink和execution engine来追踪计算处理的进度,这样就可以在任何一个步骤出现失败时自动重试。 - 1、每个Streaming source都被设计成支持offset,进而可以让Spark来追踪读取的位置;
- 2、Spark基于checkpoint和wal来持久化保存每个trigger interval内处理的offset的范围;
- 3、sink被设计成可以支持在多次计算处理时保持幂等性,就是说,用同样的一批数据,无论多少次去更新sink,都会保持一致和相同的状态。综合利用基于offset的source,基于checkpoint和wal的execution engine,以及基于幂等性的sink,可以支持完整的一次且仅一次的语义。
7 Kafka 数据消费
Apache Kafka 是目前最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景,Kafka基本是标配。StructuredStreaming很好的集成Kafka,可以从Kafka拉取消息,然后就可以把流数据看做一个DataFrame, 一张无限增长的大表,在这个大表上做查询,Structured Streaming保证了端到端的 exactly-once,用户只需要关心业务即可,不用费心去关心底层是怎么做的。 StructuredStreaming集成Kafka,官方文档如下:
http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
目前仅支持Kafka 0.10.+版本及以上,底层使用Kafka New Consumer API拉取数据,如果公司 Kafka版本为0.8.0版本,StructuredStreaming集成Kafka参考文档: StructuredStreaming既可以从Kafka读取数据,又可以向Kafka 写入数据,添加Maven依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
Maven Project工程中目录结构如下: Kafka把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟structured commit log类似,生产者和消费者使用Kafka 进行解耦,消费者不管你生产者发送的速率如何,只要按照一定的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号offset(偏移量),Kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。涉及一个问题:如果开始消费,就要定一下从什么位置开始。
- 第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;
- 第二、latest:从最末位置开始消费;
- 第三、per-partition assignment:对每个分区都指定一个offset,然后从offset位置开始消费;当第一次开始消费一个Kafka 流的时候,上述策略任选其一,如果之前已经消费了,而且做了checkpoint ,比如消费程序升级了,这时候就会从上次结束的位置开始继续消费。目前StructuredStreaming和Flink框架从Kafka消费数据时,采用的就是上述的策略。
8 Kafka 数据源
Structured Streaming消费Kafka数据,采用的是poll方式拉取数据,与Spark Streaming中New Consumer API集成方式一致。从Kafka Topics中读取消息,需要指定数据源(kafka)、Kafka集群 的连接地址(kafka.bootstrap.servers)、消费的topic(subscribe或subscribePattern), 指定topic的时候,可以使用正则来指定,也可以指定一个 topic 的集合。官方提供三种方式从Kafka topic中消费数据,主要区别在于每次消费Topic名称指定,
- 方式一:消费一个Topic数据
- 方式二:消费多个Topic数据
- 方式三:消费通配符匹配Topic数据
从Kafka 获取数据后Schema字段信息如下,既包含数据信息有包含元数据信息:
在实际开发时,往往需要获取每条数据的消息,存储在value字段中,由于是binary类型,需要 转换为字符串String类型;此外了方便数据操作,通常将获取的key和value的DataFrame转换为 Dataset强类型,伪代码如下: 从Kafka数据源读取数据时,可以设置相关参数,包含必须参数和可选参数:
- 必须参数:kafka.bootstrap.servers和subscribe,可以指定开始消费偏移量assign。
- 可选参数:
范例演示:从Kafka消费数据,进行词频统计,Topic为wordsTopic。
/export/server/zookeeper/bin/zkServer.sh start
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replicatio
n-factor 1 --partitions 3 --topic wordsTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic wordsTopic
package cn.itcast.spark.kafka.source
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SparkSession}
object StructuredKafkaSource{
def main(args:Array[String]):Unit={
val spark:SparkSession=SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.config("spark.sql.shuffle.partitions","3")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val kafkaStreamDF:DataFrame=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","node1.itcast.cn:9092")
.option("subscribe","wordsTopic")
.option("maxOffsetsPerTrigger","100000")
.load()
val resultStreamDF:DataFrame=kafkaStreamDF
.selectExpr("CAST(value AS STRING)")
.as[String]
.filter(line=>null!=line&&line.trim.length>0)
.flatMap(line=>line.trim.split("\\s+"))
.groupBy($"value").count()
val query:StreamingQuery=resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console").option("numRows","10").option("truncate","false")
.start()
query.awaitTermination()
query.stop()
}
}
9 Kafka 接收器
往Kafka里面写数据类似读取数据,可以在DataFrame上调用writeStream来写入Kafka,设置参 数指定value,其中key是可选的,如果不指定就是null。如果key为null,有时候可能导致分区数据 不均匀。
9.1 配置说明
将DataFrame写入Kafka时,Schema信息中所需的字段: 需要写入哪个topic,可以像上述所示在操作DataFrame 的时候在每条record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置。 写入数据至Kafka,需要设置Kafka Brokers地址信息及可选配置:
- 必选参数:kafka.bootstrap.servers,使用逗号隔开【host:port】字符;
- 可选参数:topic,如果DataFrame中没有topic列,此处指定topic表示写入Kafka Topic。
官方提供示例代码如下:
9.2 实时数据ETL架构
在实际实时流式项目中,无论使用Storm、SparkStreaming、Flink及Structured Streaming处理流式数据时,往往先从Kafka 消费原始的流式数据,经过ETL后将其存储到Kafka Topic中,以便其他业务相关应用消费数据,实时处理分析,技术架构流程图如下所示: 接下来模拟产生运营商基站数据,实时发送到Kafka 中,使用StructuredStreaming消费,经过ETL(获取通话状态为success数据)后,写入Kafka中,便于其他实时应用消费处理分析。
9.3 模拟基站日志数据
模拟产生运营商基站通话日志数据,封装到样例类中,字段信息如下:
package cn.itcast.spark.kafka.mock
case
class StationLog(
stationId:String,
callOut:String,
callIn:String,
callStatus:String,
callTime:Long,
duration:Long
) {
override def
toString:String =s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}
创建Topic,相关命令如下:
/export/server/zookeeper/bin/zkServer.sh start
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic station
Topic --from-beginning
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic statio
nTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic etlTopic
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic etlTopic
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic etlTopi
c --from-beginning
编写代码,实时产生日志数据,发送Kafka Topic:
package cn.itcast.spark.kafka.mock
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
object MockStationLog{
def main(args:Array[String]):Unit={
val props=new Properties()
props.put("bootstrap.servers","node1.itcast.cn:9092")
props.put("acks","1")
props.put("retries","3")
props.put("key.serializer",classOf[StringSerializer].getName)
props.put("value.serializer",classOf[StringSerializer].getName)
val producer=new KafkaProducer[String,String](props)
val random=new Random()
val allStatus=Array(
"fail","busy","barring","success","success","success",
"success","success","success","success","success","success"
)
while(true){
val callOut:String="1860000%04d".format(random.nextInt(10000))
val callIn:String="1890000%04d".format(random.nextInt(10000))
val callStatus:String=allStatus(random.nextInt(allStatus.length))
val callDuration=if("success".equals(callStatus))(1+random.nextInt(10))*1000Lelse 0L
val stationLog:StationLog=StationLog(
"station_"+random.nextInt(10),
callOut,
callIn,
callStatus,
System.currentTimeMillis(),
callDuration
)
println(stationLog.toString)
Thread.sleep(100+random.nextInt(100))
val record=new ProducerRecord[String,String]("stationTopic",stationLog.toString)
producer.send(record)
}
producer.close()
}
}
运行程序,基站通话日志数据格式如下:
station_7,18600009710,18900000269,success,1590709965144,4000
station_6,18600003894,18900000028,success,1590709965333,8000
station_7,18600007207,18900001057,busy,1590709965680,0
9.4 实时增量ETL
编写应用实时从Kafka的【stationTopic】消费数据,经过处理分析后,存储至Kafka的 【etlTopic】,其中需要设置检查点目录,保证应用一次且仅一次的语义。
package cn.itcast.spark.kafka.sink
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,Dataset,SparkSession}
object StructuredEtlSink{
def main(args:Array[String]):Unit={
val spark:SparkSession=SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.config("spark.sql.shuffle.partitions","3")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val kafkaStreamDF:DataFrame=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","node1.itcast.cn:9092")
.option("subscribe","stationTopic")
.load()
val etlStreamDF:Dataset[String]=kafkaStreamDF
.selectExpr("CAST(value AS STRING)")
.as[String]
.filter{log=>
null!=log&&log.trim.split(",").length==6&&"success".equals(log.trim.split(",")(3))
}
etlStreamDF.printSchema()
val query:StreamingQuery=etlStreamDF.writeStream
.outputMode(OutputMode.Append())
.format("kafka")
.option("kafka.bootstrap.servers","node1.itcast.cn:9092")
.option("topic","etlTopic")
.option("checkpointLocation",s"datas/structured/etl-100001")
.start()
query.awaitTermination()
query.stop()
}
}
10 Kafka 特定配置
从Kafka消费数据时,相关配置属性可以通过带有kafka.prefix的DataStreamReader.option进行设置,例如前面设置Kafka Brokers地址属性:stream.option(“kafka.bootstrap.servers”, “host:port”),更多关于Kafka 生产者Producer Config配置属和消费者Consumer Config配置属性,参考文档:
- http://kafka.apache.org/20/documentation.html#producerconfigs
- 消费者配置(New Consumer Configs):
- http://kafka.apache.org/20/documentation.html#newconsumerconfigs
注意以下Kafka参数属性可以不设置,如果设置的话,Kafka source或者sink可能会抛出错误:
- 1)、group.id:Kafka source将会自动为每次查询创建唯一的分组ID;
- 2)、auto.offset.reset:在将source选项startingOffsets设置为指定从哪里开始。结构化流管理
内部消费的偏移量,而不是依赖Kafka消费者来完成。这将确保在topic/partitons动态订阅时不 会遗漏任何数据。注意,只有在启动新的流式查询时才会应用startingOffsets,并且恢复操作 始终会从查询停止的位置启动; - 3)、key.deserializer/value.deserializer:Keys/Values总是被反序列化为ByteArrayDeserializer
的字节数组,使用DataFrame操作显式反序列化keys/values; - 4)、key.serializer/value.serializer:keys/values总是使用ByteArraySerializer或StringSerializer
进行序列化,使用DataFrame操作将keysvalues/显示序列化为字符串或字节数组; - 5)、enable.auto.commit:Kafka source不提交任何offset;
- 6)、interceptor.classes:Kafka source总是以字节数组的形式读取key和value。使用
ConsumerInterceptor是不安全的,因为它可能会打断查询;
|