IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 采用python开发sparkstreming全流程 -> 正文阅读

[大数据]采用python开发sparkstreming全流程

最近开发了sparkstreaming的程序,且开发语言是采用python的,下述记录了开发的具体代码和过程,方便今后重复使用;

使用场景

需要从kafka的topic上消费数据,最终写入到hadoop集群中,这里面有几个方案;
(1)采用kudu作为存储系统,直接将消费到的数据写入到kudu存储中,之后利用该数据;
(2)消费写入到文件中,放在hdfs上,采用hive-load的方式写入到hive表中,之后利用该数据;
(3)sparkstreaming直接写入到hive的分区表中,后续利用该数据;

注:方案(2)如果是流式处理的话,只能写到一个文件或文件夹当中,不好截取处理,我们最终采用了(3)方案,关于(3)方案会有小文件的问题,后续也会介绍该问题的解决方法;

代码开发

我们采用python的pyspark开发了相应的代码,这里介绍一下具体的环境情况;
spark版本:2.4.0.7
pyspark的版本:2.4.6

代码如下:

#coding:utf-8

from pyspark.sql import SparkSession
 
# basic info
app_name = 'xxx'
 
# kafka info
kafka_broker_list = 'xxxx'
kafka_topic = 'xxx'
kafka_groupid = 'xxx'
kafka_username = 'xxx'
kafka_password = 'xxx'
 
def create_spark_session(app_name,log_level):
    '''
    create sparkSession,setting log level
    :param app_name:
    :return:
    '''
    spark_session = SparkSession \
        .builder \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()
    spark_session.sparkContext.setLogLevel('ERROR')
    return spark_session
          
def foreach_batch_function(batch_df, batch_id):
    '''
    '''
    # write no partition table
    #batch_df.write.format('hive').mode('append').saveAsTable('xxxx')
    # write partition table
    batch_df.write.format('hive').mode('append').partitionBy('partitionxxx').saveAsTable('xxx')
     
sqlstr1='''
create table xxx(xxx) partitioned by 'xxx' stored as parquet location 'hdfs://tmp/xxx'
'''       
def main():   
    # 1.create sparkSession
   spark_session = create_spark_session(app_name, 'ERROR')
 
    # 2.create table,this can not created by hive user,privilege permission
    #spark_session.sql(sqlstr1)
 
    # 3.read kafka data, using structstreaming
    df = spark_session.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker_list) \
        .option("subscribe", kafka_topic) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .option("kafka.sasl.jaas.config",
                'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";') \
        .option("startingOffsets", "earliest") \
        .load()
     
    # 4. write partition table, you must set this config   
    spark_session.sql("set hive.exec.dynamic.partition.mode=nonstrict")
     
    # 5.get the data you want to,using sql
    words = df.selectExpr("cast(value as STRING) AS value")
    job_info_df = words.where("get_json_object(value,'$.xxx')='xxx'") \
                       .selectExpr("get_json_object(value,'$.xxx1') as xxx1",
                                   "get_json_object(value,'$.xxx2') as xxx2"
                       )
 
    # 5.using streaming, write table, split streaming to batch, write batch to table
    query1 = job_info_df. \
        writeStream. \
        foreachBatch(foreach_batch_function). \
        trigger(processingTime='1 minute'). \
        start()
    query1.awaitTermination()
    query1.stop()
 
     
if __name__ == '__main__':
    main()

注:
(1)我们在spark集群上运行python代码时,发现如果存在中文字符,会导致执行不通过,故建议不要使用中文注释,防止执行时报错;
(2)本代码是使用structstreaming的方式消费kafka的数据,经过sql处理,选取想要的数据,最终将数据写入到hive分区表中,在写入hive分区表时,我们通过foreachBatch的函数,将读取的流分成多个dataFrame,然后用对应的foreach_batch_function这个函数分别对dataFrame进行处理,这里使用了saveAsTable的方式写入到最终表中,表可以是分区表和非分区表;
(3)不知道是何原因,我们用hive直接建立最终表时,总是会报写入权限的问题,因此我们在sparksql中建立了最终表,且建立的是外表,目录是当前执行用户可以访问的目录;

代码执行

代码执行采用spark-submit的方式执行,具体如下:

spark-submit --master yarn --keytab /home/xxx/xxx.keytab --principal xxx xxx.py

注:我们的集群采用kerberos认证的方式,因此提交时需要使用配置–kertab和–principal这两个参数;此外,我们在用cluster模式运行时,会导致报错,因此我们暂时采用client的模式运行python代码;

spark写入hive小文件的问题

在用spark写入hive时,会出现很多的小文件,我们提供如下的解决方案:
(1)写入的表最好是分区表;
(2)对历史分区(非当前写入的分区)采用如下的命令解决小文件的问题;

INSERT OVERWRITE TABLE tablename PARTITION (partition)
SELECT * FROM tablename
DISTRIBUTE BY partition,cast(rand() * N as int)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-21 21:28:50  更:2022-06-21 21:28:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:32:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码