A CDH 5.14.4 Hadoop data warehouse cluster, using Hive and Hive on Spark for batch processing. As job counts and data volumes grew, the cluster ran at full load every night, with HDFS I/O and network traffic reaching 40 Gb/s, job runtimes slowly stretching, and the queue backlog averaging around 5,000 and peaking near 15,000. There was another standing problem: Hive on Spark ran at a core-to-memory ratio as high as 1:8, i.e. one container used 1 core with 8 GB or even 10 GB of memory, and with anything smaller Hive on Spark jobs simply wouldn't get through. To raise the cluster's concurrency and cut memory waste, and because we genuinely needed a faster ETL engine, I tested Spark 3.1.2's SQL performance: a 5-10x improvement. Jobs that used to take an hour and a half now take 20 minutes; 30-minute jobs now take about 5-7 minutes. On top of that, Spark SQL is roughly 99% compatible with Hive SQL, so most Hive SQL can be migrated to Spark SQL as-is, often by changing nothing but a JDBC URL.
After some research and testing, Kyuubi + Spark 3.1.2 turned out to be a good fit. What follows is simply a memo.
1. CDH 5.14.4's Hadoop does not ship the hadoop-client-api artifact, which Spark 3.2 requires, so Spark 3.2 fails to compile against it; I settled on Spark 3.1.2 instead.
2. For Kyuubi I picked the 1.2.0 "without Spark" package (the Spark that Kyuubi bundles is a Hadoop 2.7 build). Why Kyuubi rather than the Spark Thrift Server? Because the cluster has to support multi-tenancy; I didn't test the Thrift Server in depth, so I can't say whether it could provide that.
3. The cluster runs Kerberos + Sentry. That is largely orthogonal to Kyuubi + Spark; whether it is enabled or not makes no difference to the steps below.
1) Compile Spark 3.1.2. Download the Spark source code from the official site.
Set up the build environment (I'm on a Mac, which is why JAVA_HOME looks a little odd):
1. Add environment variables
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home
export MAVEN_HOME=/Users/jia-long.wang/Downloads/apache-maven-3.8.1
export MAVEN_OPTS="-Xmx4g -XX:ReservedCodeCacheSize=1024m"
export SCALA_HOME=/Users/jia-long.wang/Downloads/scala-2.12.8
export PATH=$MAVEN_HOME/bin:$PATH:$SCALA_HOME/bin
2. Modify pom.xml.
Add the Cloudera Maven repository:
<repository>
<id>cloudera</id>
<name>cloudera Repository</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
Change the corresponding Hadoop version and nothing else. For instance, the Hive version defaults to 2.3.7; don't touch it, or you'll run into a pile of dependency errors. Spark has long been able to work with any Hive metastore version anyway; you just point it at your own Hive with:
spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0
<hadoop.version>2.6.0-cdh5.14.4</hadoop.version>
Source change 1: Spark 3 has one spot that breaks when building against Hadoop 2.6.x. See the official fix: [SPARK-19545][YARN] Fix compile issue for Spark on Yarn when building against Hadoop 2.6.0~2.6.3 by jerryshao · Pull Request #16884 · apache/spark · GitHub
Source change 2: Spark 3.1.2 has a permissions bug: ordinary users get "no privileges" when creating a table. The root cause is that Spark validates the table's HDFS location, but at CREATE TABLE time that location does not exist yet (the table hasn't been created, so where would a location come from?), so the check fails; the hive superuser is not affected. Reference (in Chinese): sparksql集成sentry遇到的问题_u012477420的博客-CSDN博客
In short, replace:
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
  verifyColumnDataType(table.dataSchema)
  client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
}
with:
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
  verifyColumnDataType(table.dataSchema)
  val hiveTable = toHiveTable(table, Some(userName))
  // With Sentry enabled, clear the storage-descriptor location so that the
  // not-yet-existing HDFS path is not privilege-checked at CREATE TABLE time.
  if (sparkConf.getBoolean("spark.sql.enable.sentry", defaultValue = false)) {
    hiveTable.getTTable.getSd.setLocation(null)
  }
  client.createTable(hiveTable, ignoreIfExists)
}
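Note that spark.sql.enable.sentry is a custom switch introduced by this patch, not a stock Spark setting, so clusters without Sentry keep the default behavior; it is turned on later in kyuubi-defaults.conf.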
Modify dev/make-distribution.sh. This is Spark's recommended packaging route; compiling directly with Maven works too, but then you can't get a standalone binary tarball (lots of source files end up inside), so for cleanliness use the recommended script.
VERSION=3.1.2
SCALA_VERSION=2.12
SPARK_HADOOP_VERSION=2.6.0-cdh5.14.4
SPARK_HIVE=1
#VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | fgrep --count "<id>hive</id>";\
# # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# # because we use "set -o pipefail"
# echo -n)
Start the build:
dev/make-distribution.sh --tgz --name 2.6.0-cdh5.14.4 -Pyarn -Phadoop-2.7 -Dhadoop.version=2.6.0-cdh5.14.4 -Phive -Phive-thriftserver
Have a cup of tea and it should be about done. In the spark-3.1.2 directory you'll see the package: spark-3.1.2-bin-2.6.0-cdh5.14.4.tgz
Modify spark-env.sh
JAVA_HOME=/usr/java/jdk1.8.0_121
SPARK_CONF_DIR=/data1/software/spark-3.1.2/conf
HADOOP_CONF_DIR=/etc/hadoop/conf
YARN_CONF_DIR=/etc/hadoop/conf
Modify spark-defaults.conf
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://nameservice/tmp/spark-events
spark.serializer org.apache.spark.serializer.KryoSerializer
Copy hive-site.xml, core-site.xml, hdfs-site.xml and yarn-site.xml into the Spark conf directory; Spark then needs no further changes. As a sketch of the copy step, assuming the standard CDH client config locations (adjust paths to your cluster):
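cp /etc/hive/conf/hive-site.xml /data1/software/spark-3.1.2/conf/
cp /etc/hadoop/conf/{core-site.xml,hdfs-site.xml,yarn-site.xml} /data1/software/spark-3.1.2/conf/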
2) Install Kyuubi. I chose the officially compiled release, the "without Spark" variant. Unpacking it is the entire installation; everything else is editing config files.
The spark-defaults.conf above really only adds the event-log directory and a couple of simple settings, because all the configuration lives in Kyuubi: at startup, Kyuubi loads its Spark settings and passes them to the engine. So does spark-defaults.conf need more entries? It doesn't matter; with or without them it works either way.
Modify kyuubi-defaults.conf
## Kyuubi Configurations
kyuubi.engine.share.level=CONNECTION
#kyuubi.engine.share.level=USER
#kyuubi.session.engine.idle.timeout=PT30M
kyuubi.session.engine.initialize.timeout=PT5M
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=hive/host224.slave.cluster@xxx.CN
kyuubi.kinit.keytab=/data1/software/kyuubi/kyuubi224.keytab
kyuubi.frontend.bind.host 10.37.54.224
kyuubi.frontend.bind.port 10009
#kyuubi.ha.enabled=true
#kyuubi.ha.zookeeper.quorum=10.37.54.187
#kyuubi.ha.zookeeper.client.port=2181
#kyuubi.ha.zookeeper.namespace=kyuubi
#kyuubi.ha.zookeeper.session.timeout=60000
#kyuubi.ha.zookeeper.acl.enabled=false
kyuubi.zookeeper.embedded.max.session.timeout=120000
hive.metastore.uris thrift://host12.master.cluster.xxx.cn:9083,thrift://host13.master.cluster.xxx.cn:9083
javax.jdo.option.ConnectionURL jdbc:mysql://manager.cluster.xxx.cn:3306/hive?useUnicode=true&characterEncoding=UTF-8
javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver
javax.jdo.option.ConnectionUserName root
javax.jdo.option.ConnectionPassword 123456
spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0
spark.master=yarn
#spark.yarn.queue=root.default
spark.submit.deployMode=cluster
spark.executor.cores=2
spark.yarn.am.memory=4G
spark.driver.memory=4G
spark.executor.memory=12G
#spark.default.parallelism=10
spark.shuffle.service.port=7337
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.authenticate=false
spark.authenticate.enableSaslEncryption=false
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.maxExecutors=1000
spark.shuffle.service.enabled=true
spark.shuffle.useOldFetchProtocol=true
spark.executor.heartbeatInterval=20s
spark.hadoop.fs.hdfs.impl.disable.cache=true
#spark.kerberos.keytab=/data1/software/kyuubi/kyuubi224.keytab
#spark.kerberos.principal=hive/host224.slave.cluster@xxx.CN
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
#spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG=true
#spark.yarn.am.extraJavaOptions="-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true"
spark.security.credentials.hdfs.enabled=true
spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.xxx.cn:8020,hdfs://host13.master.cluster.xxx.cn:8020
#spark.kerberos.renewal.credentials=ccache
spark.sql.hive.filesourcePartitionFileCacheSize=786432000
spark.sql.hive.verifyPartitionPath=true
spark.sql.hive.convertMetastoreParquet.mergeSchema=true
spark.hadoopRDD.ignoreEmptySplits=true
spark.sql.hive.convertMetastoreParquet=false
#spark.yarn.jars=hdfs://nameservice:8020/sparkjars
#spark.jars=/data1/software/kyuubi_2/sparkjars/hive-hcatalog-core-1.1.0-cdh5.14.4.jar
spark.sql.enable.sentry=true
spark.hive.warehouse.subdir.inherit.perms=false
Quite a lot of configuration, right? All of it came out of a long grind of trial and error; I think you can use it as-is, basically without problems.
Modify kyuubi-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121
export HADOOP_CONF_DIR=/etc/hadoop/conf
export SPARK_HOME=/data1/software/spark-3.1.2
export SPARK_CONF_DIR=/data1/software/spark-3.1.2/conf
export KYUUBI_JAVA_OPTS="-Xmx10g -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark -XX:MaxDirectMemorySize=1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./logs -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:./logs/kyuubi-server-gc-%t.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=5M -XX:NewRatio=3 -XX:MetaspaceSize=512m"
Create the Kerberos principal that starts Kyuubi: whatever the machine is named, create a principal of the form xxx/hostname@REALM; anyone familiar with Kerberos needs no further explanation. Mine is hive/host224.slave.cluster@xxx.CN. Why did I choose hive? Because Kyuubi is multi-tenant and relies on impersonation: with a user such as kyuubi I would also have to add proxy-user entries to core-site.xml and then restart the Hadoop cluster. Good heavens, restarting is simply not an option. The hive user already has impersonation rights from when Hadoop was installed. For reference, a minimal sketch of creating the principal and exporting the keytab, assuming an MIT KDC (run on the KDC host):
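kadmin.local -q "addprinc -randkey hive/host224.slave.cluster@xxx.CN"
kadmin.local -q "xst -norandkey -k /data1/software/kyuubi/kyuubi224.keytab hive/host224.slave.cluster@xxx.CN"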
3) Start Kyuubi
bin/kyuubi start, then check the log and port 10009; if nothing throws, you're done. A quick sanity check might look like this (the log file name pattern and the availability of ss are assumptions):
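bin/kyuubi start
ss -tlnp | grep 10009          # the frontend should be listening
tail -n 50 logs/kyuubi-*.out   # no exceptions expected in the server log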
4) Log in to port 10009 with beeline
beeline -u "jdbc:hive2://host224.slave.cluster.xxx.cn:10009/;principal=hive/host224.slave.cluster.xxx.cn@xxx.CN"
The server log for a beeline login looks like this:
21/11/29 21:53:52 INFO service.FrontendService: Client protocol version: HIVE_CLI_SERVICE_PROTOCOL_V7
21/11/29 21:53:52 INFO imps.CuratorFrameworkImpl: Starting
21/11/29 21:53:52 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=10.37.54.224:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@79fed433
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Opening socket connection to server host224.slave.cluster.enn.cn/10.37.54.224:2181. Will not attempt to authenticate using SASL (unknown error)
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Socket connection established to host224.slave.cluster.enn.cn/10.37.54.224:2181, initiating session
21/11/29 21:53:52 INFO server.NIOServerCnxnFactory: Accepted socket connection from /10.37.54.224:43544
21/11/29 21:53:52 INFO server.ZooKeeperServer: Client attempting to establish new session at /10.37.54.224:43544
21/11/29 21:53:52 INFO server.ZooKeeperServer: Established session 0x100cec978840010 with negotiated timeout 60000 for client /10.37.54.224:43544
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Session establishment complete on server host224.slave.cluster.enn.cn/10.37.54.224:2181, sessionid = 0x100cec978840010, negotiated timeout = 60000
21/11/29 21:53:52 INFO state.ConnectionStateManager: State change: CONNECTED
21/11/29 21:53:52 INFO engine.EngineRef: Launching engine:
/data1/software/spark-3.1.2/bin/spark-submit \
--class org.apache.kyuubi.engine.spark.SparkSQLEngine \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.sql.hive.verifyPartitionPath=true \
--conf spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.xxx.cn:8020,hdfs://host13.master.cluster.xxx.cn:8020 \
--conf spark.executor.heartbeatInterval=20s \
--conf spark.sql.adaptive.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
--conf spark.kyuubi.engine.share.level=CONNECTION \
--conf spark.app.name=kyuubi_CONNECTION_hive_8124e89e-ce69-42e8-9ab2-390b9c6321e3 \
--conf spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/* \
--conf spark.security.credentials.hdfs.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet.mergeSchema=true \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
--conf spark.sql.hive.filesourcePartitionFileCacheSize=786432000 \
--conf spark.sql.enable.sentry=true \
--conf spark.driver.memory=4G \
--conf spark.hive.warehouse.subdir.inherit.perms=false \
--conf spark.dynamicAllocation.maxExecutors=1000 \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_CONNECTION/hive/8124e89e-ce69-42e8-9ab2-390b9c6321e3 \
--conf spark.yarn.am.memory=4G \
--conf spark.kyuubi.ha.zookeeper.quorum=10.37.54.224:2181 \
--conf spark.submit.deployMode=cluster \
--conf spark.shuffle.service.port=7337 \
--conf spark.master=yarn \
--conf spark.authenticate.enableSaslEncryption=false \
--conf spark.shuffle.useOldFetchProtocol=true \
--conf spark.yarn.tags=KYUUBI \
--conf spark.authenticate=false \
--conf spark.kyuubi.sesson.engine.initiaize.timeout=300000 \
--conf spark.executor.memory=12G \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.executor.cores=2 \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.hadoopRDD.ignoreEmptySplits=true \
--conf spark.kyuubi.ha.zookeeper.acl.enabled=false \
--conf spark.dynamicAllocation.executorIdleTimeout=60 \
--conf spark.sql.hive.metastore.version=1.1.0 \
--proxy-user hive /data1/software/kyuubi/externals/engines/spark/kyuubi-spark-sql-engine-1.2.0.jar
21/11/29 21:53:52 INFO engine.ProcBuilder: Logging to /data1/software/kyuubi/work/hive/kyuubi-spark-sql-engine.log.19
21/11/29 21:54:27 INFO server.NIOServerCnxnFactory: Accepted socket connection from /10.37.54.150:34834
21/11/29 21:54:27 INFO server.ZooKeeperServer: Client attempting to establish new session at /10.37.54.150:34834
21/11/29 21:54:27 INFO server.ZooKeeperServer: Established session 0x100cec978840011 with negotiated timeout 60000 for client /10.37.54.150:34834
21/11/29 21:54:28 INFO client.ServiceDiscovery: Get service instance:host150.slave.cluster.enn.cn:32823 and version:Some(1.2.0) under /kyuubi_CONNECTION/hive/8124e89e-ce69-42e8-9ab2-390b9c6321e3
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Connecting to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Connected to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Sending TOpenSessionReq to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Received TOpenSessionResp from engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO imps.CuratorFrameworkImpl: backgroundOperationsLoop exiting
21/11/29 21:54:28 INFO server.PrepRequestProcessor: Processed session termination for sessionid: 0x100cec978840010
21/11/29 21:54:28 INFO zookeeper.ZooKeeper: Session: 0x100cec978840010 closed
21/11/29 21:54:28 INFO server.NIOServerCnxn: Closed socket connection for client /10.37.54.224:43544 which had sessionid 0x100cec978840010
21/11/29 21:54:28 INFO session.KyuubiSessionManager: hive's session with SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] is opened, current opening sessions 1
21/11/29 21:54:28 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x100cec978840010
What follows is the Q&A part, self-asked and self-answered:
1. Why doesn't compiling Spark require changing the Hive version?
Because Spark can adapt to different Hive metastore versions; you only need to set 2 parameters:
spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0
(Don't write 1.1.0-cdh5.14.4; only 1.1.0 works. If yours is 1.2.0, write 1.2.0; never include the cdh suffix.)
2. Why does the following setting list 3 filesystems rather than the usual 2 namenodes?
spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.xxx.cn:8020,hdfs://host13.master.cluster.xxx.cn:8020
Because in testing (maybe I was just careless), whether I configured only the nameservice or only the two actual namenodes, HDFS Kerberos connection errors would show up. Not every time, just occasionally, which is infuriating. So I put all three in, and the errors never came back; hence, three.
3. Why are the following 2 parameters commented out?
#spark.kerberos.keytab=/data1/software/kyuubi/kyuubi224.keytab
#spark.kerberos.principal=hive/host224.slave.cluster.xxx.cn@xxx.CN
Only one of --proxy-user and keytab can be used. Kyuubi is multi-tenant and impersonates users, adding --proxy-user at engine launch, so these two must stay commented out.
4. Why add spark.hive.warehouse.subdir.inherit.perms=false?
Because under multi-tenancy you may hold ALL privileges on a table while not owning its files, in which case a warning appears, roughly "xxxx user is not the owner of the node=xxxxxxxx", and it's annoying to look at.
5. Spark SQL fails on TRUNCATE with insufficient privileges
Use DROP TABLE + CREATE TABLE instead. The error here is again "xxxx user is not the owner". Which puzzles me: Spark, what's your problem? I'm not the owner, but I have rwx; why do you care whether I'm the owner? There's no way around it short of patching the code. A sketch of the workaround via beeline (the table name and columns are hypothetical):
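KYUUBI_URL="jdbc:hive2://host224.slave.cluster.xxx.cn:10009/;principal=hive/host224.slave.cluster.xxx.cn@xxx.CN"
beeline -u "$KYUUBI_URL" \
  -e "DROP TABLE IF EXISTS tmp.t_demo" \
  -e "CREATE TABLE tmp.t_demo (id BIGINT, name STRING) STORED AS PARQUET"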
6. Why is Kyuubi high availability commented out?
Because the Kyuubi developers wrote too many bugs, especially around HA: it's unusable whether or not ZooKeeper has Kerberos enabled. The difference is that with ZK Kerberos enabled, Kyuubi starts fine but beeline fails while creating the connection; with it disabled, both fail.
7. How to do HA then?
Wait for the 1.3.0 release, or patch the code yourself. The approach I adopted is a front-end load balancer, the same idea as Hive HA. A minimal HAProxy sketch is below, assuming two Kyuubi instances (the second one, on .225, is hypothetical); any TCP load balancer works the same way:
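frontend kyuubi_front
    bind *:10010
    mode tcp
    default_backend kyuubi_servers

backend kyuubi_servers
    mode tcp
    balance leastconn
    server ky1 10.37.54.224:10009 check
    server ky2 10.37.54.225:10009 check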
8. How are user-defined functions used?
My approach: hdfs://nameservice:8020/sparkjars is dedicated to custom jars; just ADD JAR when you need one. For example, reusing KYUUBI_URL from the sketch above (the jar, class, and function names are hypothetical):
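beeline -u "$KYUUBI_URL" \
  -e "ADD JAR hdfs://nameservice:8020/sparkjars/my-udfs.jar" \
  -e "CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'" \
  -e "SELECT my_upper(name) FROM tmp.t_demo"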
9. Shuffle parameters:
spark.shuffle.service.enabled=true
spark.shuffle.useOldFetchProtocol=true
Because of the Netty protocol-version gap between Spark 3 and the cluster's old external shuffle service, the old fetch protocol has to be enabled; otherwise jobs fail with an error along the lines of "message type 9".