最近开发了sparkstreaming的程序,且开发语言是采用python的,下述记录了开发的具体代码和过程,方便今后重复使用;
使用场景
需要从kafka的topic上消费数据,最终写入到hadoop集群中,这里面有几个方案; (1)采用kudu作为存储系统,直接将消费到的数据写入到kudu存储中,之后利用该数据; (2)消费写入到文件中,放在hdfs上,采用hive-load的方式写入到hive表中,之后利用该数据; (3)sparkstreaming直接写入到hive的分区表中,后续利用该数据;
注:方案(2)如果是流式处理的话,只能写到一个文件或文件夹当中,不好截取处理,我们最终采用了(3)方案,关于(3)方案会有小文件的问题,后续也会介绍该问题的解决方法;
代码开发
我们采用python的pyspark开发了相应的代码,这里介绍一下具体的环境情况; spark版本:2.4.0.7 pyspark的版本:2.4.6
代码如下:
#coding:utf-8
from pyspark.sql import SparkSession
# basic info
app_name = 'xxx'
# kafka info
kafka_broker_list = 'xxxx'
kafka_topic = 'xxx'
kafka_groupid = 'xxx'
kafka_username = 'xxx'
kafka_password = 'xxx'
def create_spark_session(app_name,log_level):
'''
create sparkSession,setting log level
:param app_name:
:return:
'''
spark_session = SparkSession \
.builder \
.appName(app_name) \
.enableHiveSupport() \
.getOrCreate()
spark_session.sparkContext.setLogLevel('ERROR')
return spark_session
def foreach_batch_function(batch_df, batch_id):
'''
'''
# write no partition table
#batch_df.write.format('hive').mode('append').saveAsTable('xxxx')
# write partition table
batch_df.write.format('hive').mode('append').partitionBy('partitionxxx').saveAsTable('xxx')
sqlstr1='''
create table xxx(xxx) partitioned by 'xxx' stored as parquet location 'hdfs://tmp/xxx'
'''
def main():
# 1.create sparkSession
spark_session = create_spark_session(app_name, 'ERROR')
# 2.create table,this can not created by hive user,privilege permission
#spark_session.sql(sqlstr1)
# 3.read kafka data, using structstreaming
df = spark_session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker_list) \
.option("subscribe", kafka_topic) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_PLAINTEXT") \
.option("kafka.sasl.jaas.config",
'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";') \
.option("startingOffsets", "earliest") \
.load()
# 4. write partition table, you must set this config
spark_session.sql("set hive.exec.dynamic.partition.mode=nonstrict")
# 5.get the data you want to,using sql
words = df.selectExpr("cast(value as STRING) AS value")
job_info_df = words.where("get_json_object(value,'$.xxx')='xxx'") \
.selectExpr("get_json_object(value,'$.xxx1') as xxx1",
"get_json_object(value,'$.xxx2') as xxx2"
)
# 5.using streaming, write table, split streaming to batch, write batch to table
query1 = job_info_df. \
writeStream. \
foreachBatch(foreach_batch_function). \
trigger(processingTime='1 minute'). \
start()
query1.awaitTermination()
query1.stop()
if __name__ == '__main__':
main()
注: (1)我们在spark集群上运行python代码时,发现如果存在中文字符,会导致执行不通过,故建议不要使用中文注释,防止执行时报错; (2)本代码是使用structstreaming的方式消费kafka的数据,经过sql处理,选取想要的数据,最终将数据写入到hive分区表中,在写入hive分区表时,我们通过foreachBatch的函数,将读取的流分成多个dataFrame,然后用对应的foreach_batch_function这个函数分别对dataFrame进行处理,这里使用了saveAsTable的方式写入到最终表中,表可以是分区表和非分区表; (3)不知道是何原因,我们用hive直接建立最终表时,总是会报写入权限的问题,因此我们在sparksql中建立了最终表,且建立的是外表,目录是当前执行用户可以访问的目录;
代码执行
代码执行采用spark-submit的方式执行,具体如下:
spark-submit --master yarn --keytab /home/xxx/xxx.keytab --principal xxx xxx.py
注:我们的集群采用kerberos认证的方式,因此提交时需要使用配置–kertab和–principal这两个参数;此外,我们在用cluster模式运行时,会导致报错,因此我们暂时采用client的模式运行python代码;
spark写入hive小文件的问题
在用spark写入hive时,会出现很多的小文件,我们提供如下的解决方案: (1)写入的表最好是分区表; (2)对历史分区(非当前写入的分区)采用如下的命令解决小文件的问题;
INSERT OVERWRITE TABLE tablename PARTITION (partition)
SELECT * FROM tablename
DISTRIBUTE BY partition,cast(rand() * N as int)
|