KafkaProducer data format
The format of the data sent to Kafka:
{"u_id":"11629939865","timestamp":1622955136,"channel_id":"","detail":{"order_id":"6b3a5eb82a1a","goods_id":1004,"goods_name":"coin","amount":819}}
The schema used to parse this data:
schema = T.StructType() \
    .add("u_id", T.StringType()) \
    .add("timestamp", T.IntegerType()) \
    .add("channel_id", T.StringType()) \
    .add("detail", T.StructType()
         .add("order_id", T.StringType())
         .add("goods_id", T.IntegerType())
         .add("goods_name", T.StringType())
         .add("amount", T.IntegerType()))
Python code
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession

# Project-specific helpers; the ip and topic values used below are expected
# to come from this configuration (not shown here).
from common.config import config
from common.environment import SparkEnv

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .master("local[6]") \
        .appName("Python Spark basic example") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Read the Kafka topic as a streaming source.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", f"{ip}:9092") \
        .option("subscribe", f"{topic}") \
        .load()

    # Kafka values arrive as binary; cast them to strings before parsing.
    words = df.selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)")

    # Parse the JSON payload with the schema defined above, flatten the nested
    # detail struct, and sum the payment amount per user.
    res = words.withColumn("data", F.from_json("value", schema)).select("data.*") \
        .select("u_id", "timestamp", "channel_id", "detail.*") \
        .groupBy("u_id").agg(F.sum("amount").alias("pay_amount"))

    # Query 1: print every updated aggregate to the console every 5 seconds.
    console_query = res.writeStream \
        .format("console") \
        .trigger(processingTime='5 seconds') \
        .option("truncate", "false") \
        .outputMode("update") \
        .start()

    # Query 2: write the complete aggregate result to MongoDB once per minute.
    def write(df, batch_id):
        df.write.format("com.mongodb.spark.sql") \
            .mode("append") \
            .option("spark.mongodb.output.uri", f"mongodb://{ip}:27017") \
            .option("database", "databaseName") \
            .option("collection", "StructuredStreaming") \
            .save()

    query = res.writeStream \
        .option("checkpointLocation", "cp") \
        .outputMode("complete") \
        .foreachBatch(write) \
        .trigger(processingTime='1 minute') \
        .start()

    # Block until either of the two running queries terminates.
    spark.streams.awaitAnyTermination()
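Both the Kafka source and the MongoDB sink rely on external connector packages that are not bundled with Spark. One way to pull them in, shown here as a sketch (the version numbers are examples and must match your Spark/Scala and MongoDB versions), is through spark.jars.packages when building the session:

spark = SparkSession \
    .builder \
    .master("local[6]") \
    .appName("Python Spark basic example") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
            "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()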