1. Create the source Kafka topic and the destination Kafka topic (a kafka-python sketch for this step is shown right after the code).
2. Write the code that reads from and writes to Kafka, as follows:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
from pyspark.sql.functions import col, from_json, to_json, instr, lit, struct
from pyspark.sql.types import StructType, StringType
appname = "test"
master = "local[*]"
'''
local: everything runs in a single thread with no parallelism; typically used for small tests or experiments on the local machine.
local[K]: run the computation with K worker threads, e.g. local[4] starts 4 worker threads. Usually K is set to the number of CPU cores to make full use of the CPU.
local[*]: sets the number of threads to the number of CPU cores automatically.
'''
try:
    # Kafka connector for Spark 2.3 / Scala 2.11, resolved from the Aliyun Maven mirror
    conf = SparkConf().setAppName(appname) \
        .set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
        .set('spark.jars.repositories', 'http://maven.aliyun.com/nexus/content/groups/public/') \
        .setMaster(master)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # Subscribe to the source topic as a streaming DataFrame
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mykafkatest") \
        .load()

    # Kafka keys and values arrive as binary, so cast them to strings
    words = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset", "timestamp")

    # Expected JSON layout of the message value
    schema = StructType() \
        .add("topic", StringType()) \
        .add("age", StringType())

    # Keep only values containing the substring 'topic', parse the JSON and flatten it into columns
    streamSource = words.where(instr("value", 'topic') > 0) \
        .select(from_json("value", schema).alias("data")) \
        .select("data.*")
    res = streamSource.withColumn('constfield', lit('1'))

    # Re-serialize the columns to JSON and write the stream to the destination topic
    query = res.select(col('age').alias('key'), to_json(struct('topic', 'age', 'constfield')).alias('value')) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "mykafkatestdest") \
        .option("checkpointLocation", r'D:\spark\spark\spark-2.3.0-bin-hadoop2.7\checkpoint') \
        .start()
    query.awaitTermination()
    print('Job finished successfully!')
except Exception:
    traceback.print_exc()
    print('Connection error!')
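The post does not show how the two topics were created. One way to create them from Python, and to push a test record into the source topic, is the kafka-python package; that package is an assumption of this sketch, not something the original code depends on (the Kafka command-line tools work just as well):

# Assumption: kafka-python is installed (pip install kafka-python)
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([
    NewTopic(name='mykafkatest', num_partitions=1, replication_factor=1),
    NewTopic(name='mykafkatestdest', num_partitions=1, replication_factor=1),
])

# Send one JSON record that matches the schema above (and contains the substring
# 'topic', so it passes the instr() filter in the streaming job)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('mykafkatest', value=b'{"topic": "demo", "age": "18"}')
producer.flush()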
Once the streaming query is running, every message consumed from the source Kafka topic is transformed and written into the destination topic mykafkatestdest.
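To confirm that records actually land in mykafkatestdest, the destination topic can be read back with Spark's batch Kafka source, for example from a separate PySpark shell while the streaming query is running (a minimal sketch reusing the same broker address):

# Read the current contents of the destination topic as a static DataFrame
check = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mykafkatestdest") \
    .option("startingOffsets", "earliest") \
    .load()
check.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)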
Things to watch out for: the spark.jars.packages dependency must be specified, and the JSON message values have to be converted to and from DataFrame columns (from_json / to_json).
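The JSON-to-columns conversion is easiest to get right by trying it on a static DataFrame first, where a schema mistake shows up immediately instead of silently producing null rows in the stream (an illustrative snippet, not part of the original post):

from pyspark.sql.functions import from_json, to_json, struct, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("topic", StringType()).add("age", StringType())
static = spark.createDataFrame([('{"topic": "demo", "age": "18"}',)], ["value"])

# JSON string -> columns (same pattern as in the streaming job)
parsed = static.select(from_json(col("value"), schema).alias("data")).select("data.*")
parsed.show()

# columns -> JSON string, ready to be used as a Kafka message value
parsed.select(to_json(struct("topic", "age")).alias("value")).show(truncate=False)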
Reference: https://blog.csdn.net/qq_33689414/article/details/86469267