Using HBase from PySpark: a detailed walkthrough
1. Test code:
def write2hbase():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("nlp").getOrCreate()
    print('SparkSession created')

    # ZooKeeper quorum of the HBase cluster and the target table
    host = 'emr1:2181,emr2:2181,emr3:2181'
    table = 'article_test'

    # Converters from the spark-examples jar that turn Python objects into
    # ImmutableBytesWritable / Put objects for the HBase TableOutputFormat
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    # Each record is "rowkey,column_family,qualifier,value"
    rawData = ['1,info,article_type,pm', '2,info,article_type,analyze']
    print('writing data')

    # Key is the row key, value is the [rowkey, cf, qualifier, value] list
    spark.sparkContext.parallelize(rawData) \
        .map(lambda x: (x.split(',')[0], x.split(','))) \
        .saveAsNewAPIHadoopDataset(conf=conf,
                                   keyConverter=keyConv,
                                   valueConverter=valueConv)


if __name__ == '__main__':
    write2hbase()
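For completeness, the rows can be read back through the same spark-examples converter jar. The sketch below is not from the original post; it follows the standard hbase_inputformat.py example that ships with Spark and assumes the same cluster, table, and classpath setup described in section 3.

def read_from_hbase():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("nlp").getOrCreate()

    host = 'emr1:2181,emr2:2181,emr3:2181'
    table = 'article_test'

    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapreduce.inputtable": table}

    # Converters from the same spark-examples jar, this time for reads
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = spark.sparkContext.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

    # Each element is (rowkey, string describing the row's cells)
    for rowkey, cells in hbase_rdd.collect():
        print(rowkey, cells)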
2. Submit command:
nohup spark-submit --master yarn --deploy-mode client --conf "spark.pyspark.driver.python=/usr/bin/python3" --conf "spark.pyspark.python=/usr/bin/python3" /pm/project/cbs-nlp/cbs-nlp-base/main/hbase_test.py > /pm/project/cbs-nlp/cbs-nlp-base/main/hbase_test.out &
3. Problems encountered:
Problem 1: class StringToImmutableBytesWritableConverter not found
Solution:
1. Create an hbase directory under Spark's jars directory; mine is /usr/lib/spark-current/jars/hbase.
2. Copy the jars whose names start with hbase from HBase's lib directory (on my cluster, /usr/lib/hbase-current/lib) into the newly created hbase directory.
3. Download spark-examples_2.11-1.6.0-typesafe-001.jar and put it in that hbase directory (link: https://pan.baidu.com/s/1ZGyqIrfpRmIZGU6Sz7AYmQ, password: 5n1r).
4. Add the configuration below to spark-env.sh. On Alibaba Cloud EMR, spark-env.sh is at /etc/ecm/spark-conf/spark-env.sh. On EMR you also need to edit the following template file, otherwise the change in /etc/ecm/spark-conf/spark-env.sh is lost after a restart: /var/lib/ecm-agent/cache/ecm/service/SPARK/2.4.5.300.3/package/templates/spark-env.sh
export SPARK_DIST_CLASSPATH="$(/usr/lib/hadoop-current/bin/hadoop classpath):$(/usr/lib/hbase-current/bin/hbase classpath):/usr/lib/spark-current/jars/hbase/*"
Here /usr/lib/hadoop-current/bin/hadoop is the hadoop command under the Hadoop installation directory, /usr/lib/hbase-current/bin/hbase is the hbase command under the HBase installation directory, and /usr/lib/spark-current/jars/hbase/* is the hbase directory created in step 1.
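A quick way to confirm that the converter class is actually visible to the PySpark driver after this change is to load it through the Py4J gateway. This check is my own addition, not part of the original fix; it relies on the internal sparkContext._jvm handle, so treat it as a diagnostic sketch only.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("classpath-check").getOrCreate()
jvm = spark.sparkContext._jvm  # internal Py4J gateway to the driver JVM

# Raises a Py4J error if the spark-examples converter jar is not on the classpath
converter = jvm.java.lang.Class.forName(
    "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter")
print(converter.getName())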
Problem 2: method not found: org.apache.hadoop.hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put
Cause: the StringListToPutConverter class in spark-examples calls the Put.add method of hbase-client. After HBase was upgraded to 2.x, that hbase-client API changed: Put.add(byte[], byte[], byte[]) was replaced by Put.addColumn(byte[], byte[], byte[]).
Solution:
1. Rebuild the spark-examples source against the new API; see https://blog.csdn.net/yanpenggong/article/details/115492035 for details. A jar I have already rebuilt is available here (link: https://pan.baidu.com/s/1jFwfwGsMSh-yuvH2xmmo6g, password: ua81).
2. Replace the previously downloaded spark-examples_2.11-1.6.0-typesafe-001.jar with original-spark-examples_2.11_hardfixed-2.4.3.jar.
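If you are unsure which Put API the hbase-client on your classpath exposes (and therefore whether you need the rebuilt converter jar), you can inspect the class by reflection from PySpark. Again, this is my own diagnostic sketch using the internal _jvm gateway, not part of the original post.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("put-api-check").getOrCreate()
jvm = spark.sparkContext._jvm  # internal Py4J gateway to the driver JVM

put_cls = jvm.java.lang.Class.forName("org.apache.hadoop.hbase.client.Put")

# List the add/addColumn overloads with their parameter types.
# If no add(byte[], byte[], byte[]) overload is printed (hbase-client 2.x),
# the stock StringListToPutConverter will fail with the error above.
for m in put_cls.getMethods():
    if m.getName() in ("add", "addColumn"):
        params = [p.getSimpleName() for p in m.getParameterTypes()]
        print(m.getName(), params)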
Note: in cluster deploy mode, the configuration above must be applied on every node and the Spark cluster restarted. I use client mode, so only the machine that submits the script needs to be modified.