IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark 结构化stream读写kafka示例 -> 正文阅读

[大数据]spark 结构化stream读写kafka示例

1.创建源kafka主题和目标kafka主题:
在这里插入图片描述
在这里插入图片描述
2.编写读写kafka的处理代码如下:

from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
# import builtins as py_builtin
from pyspark.sql.functions import max
from pyspark.sql.functions import desc
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.types import *
from pyspark.sql.functions import col, column, expr
from pyspark.sql.functions import *
from pyspark.sql import Row

appname = "test"  # 任务名称
master = "local[*]"  # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"

try:
    # conf = SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
    conf = SparkConf().setAppName(appname). \
        set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
        .set("spark.jars.repositories", 'http://maven.aliyun.com/nexus/content/groups/public/') \
        .setMaster(master)  # 本地
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mykafkatest") \
        .load()
    words = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset", "timestamp")
    schema = StructType() \
        .add("topic", StringType()) \
        .add("age", StringType())

    # 通过from_json,定义schema来解析json
    # res = words.where(instr("value", '{') ==0).select(from_json("value", schema).alias("data")).select("data.*")
    streamSource = words.where(instr("value", 'topic') > 0).select(from_json("value", schema).alias("data")).select(
        "data.*")

    # query = res.writeStream \
    #     .format("console") \
    #     .outputMode("append") \
    #     .start()
    res = streamSource.withColumn('constfield', lit('1'))
    query = res.select(col('age').alias('key'), to_json(struct('topic', 'age','constfield')).alias('value')) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "mykafkatestdest") \
        .option("checkpointLocation", '''D:\spark\spark\spark-2.3.0-bin-hadoop2.7\checkpoint''')\
        .start()
    query.awaitTermination()

    print('计算成功!')
except:
    traceback.print_exc()  # 返回出错信息
    print('连接出错!')

这样从源kafka主题消费到的消息就会被写入到目标mykafkatestdest主题中

其中注意事项包括: 指定spark.jars.packages包以及json和dataframe字段的转换.

参考文献:https://blog.csdn.net/qq_33689414/article/details/86469267

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:34:18  更:2022-03-08 22:37:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:16:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码