TPCDS 数据生成工具
TPC-DS 测试数据的生成依赖tpcds-kit项目, Databricks forks了改项目,并支持将生产的数据写入到Stdout,然后直接生成测试数据。我们需要提前安装好这个工具。
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=MACOS
在编译过程中可能会报错
w_store_sales.c:141:7: error: implicit declaration of function 'is_set_filter' is invalid in C99 [-Werror,-Wimplicit-function-declaration]
if (!is_set_filter() || is_set_child()) {
^
w_store_sales.c:141:26: error: implicit declaration of function 'is_set_child' is invalid in C99 [-Werror,-Wimplicit-function-declaration]
if (!is_set_filter() || is_set_child()) {
^
2 errors generated.
make: *** [w_store_sales.o] Error 1
(base)
感觉这个错误是因为GCC版本太低导致的,修改一下makefile中的gcc版本, 编译通过
HPUX_CC = gcc-8
LINUX_CC = gcc-8
NCR_CC = cc
MACOS_CC = gcc-8
SOLARIS_CC = gcc-8
通过 spark-sql-perf 项目生成TPCDS测试数据
spark-sql-perf 项目编译
项目Git地址 https://github.com/databricks/spark-sql-perf.git ,后面会提到在本地编译安装的过程中,做了一些修改,也可以参考我fork的地址https://github.com/wankunde/spark-sql-perf . 然后通过 sbt package 命令对项目编译和打包。
下面说一下我自己在编译该项目的时候踩的一些坑吧~~
sbt-spark-package 插件无法下载
此项目在使用SBT进行编译的时候,会抱 addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.1.1") 这个插件无法下载的报错。最后在Maven中央仓库中找到了该插件: https://mvnrepository.com/artifact/org.spark-packages/sbt-spark-package/0.1.1 , 但是该插件的下载地址却并不在中央仓库中,而是 https://repos.spark-packages.org/org/spark-packages/sbt-spark-package/0.1.1/sbt-spark-package-0.1.1.jar ,所以按照常规的library 下载路径进行查找确实是找不到的。Workaround的方案是在 project/plugins.sbt 中手动添加该插件的 resolver 地址: resolvers += "sbt-spark-package-maven" at "https://repos.spark-packages.org/"
走过的弯路
- 因为国内的网络访问SBT相关资源的时候太慢,所以之前配置过一些用于加速sbt 下载的repo地址,还使用了加速下载的插件,感觉有可能会对resolver的解析有影响(0.13.* 和 1.* 都不确定~~) 所以比较稳妥的方案是直接移除本地的
.sbt 配置目录 - SBT 既支持ivy 格式的资源路径,又支持Maven 资源格式路径。现在真不确定SBT是否会根据 resolver 的名称是否含有 -maven 关键字来确定使用哪一种格式来尝试寻找资源。但是我是在name中加上
-maven 格式就改成maven 格式的地址了。如果是自定义的ivy资源地址,可以添加类似的地址 resolvers += Resolver.url("sbt-plugin-ivy", url("https://dl.bintray.com/sbt/sbt-plugin-releases/"))( Patterns("[organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]") )
- Resolve Doc : https://www.scala-sbt.org/1.x/docs/Resolvers.html
- 项目中一些其他的Plugin都可以正常下载, 参考下载地址: https://scala.jfrog.io/ui/native/sbt-plugin-releases/com.databricks/sbt-databricks/scala_2.10/sbt_0.13
生成测试数据
通过命令 build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>" 生成需要的测试数据。有不明白的参数,看玉明大佬写的PR
build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d /Users/wakun/ws/apache/tpcds-kit/tools -s 1 -l hdfs://localhost:9000/tpcds -f parquet"
[root@spark-3267648 spark-sql-perf]# build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData --help"
[info] Running com.databricks.spark.sql.perf.tpcds.GenTPCDSData --help
[info] Usage: Gen-TPC-DS-data [options]
[info]
[info] -m, --master <value> the Spark master to use, default to local[*]
[info] -d, --dsdgenDir <value> location of dsdgen
[info] -s, --scaleFactor <value>
[info] scaleFactor defines the size of the dataset to generate (in GB)
[info] -l, --location <value> root directory of location to create data in
[info] -f, --format <value> valid spark format, Parquet, ORC ...
[info] -i, --useDoubleForDecimal <value>
[info] true to replace DecimalType with DoubleType
[info] -e, --useStringForDate <value>
[info] true to replace DateType with StringType
[info] -o, --overwrite <value> overwrite the data that is already there
[info] -p, --partitionTables <value>
[info] create the partitioned fact tables
[info] -c, --clusterByPartitionColumns <value>
[info] shuffle to get partitions coalesced into single files
[info] -v, --filterOutNullPartitionValues <value>
[info] true to filter out the partition with NULL key value
[info] -t, --tableFilter <value>
[info] "" means generate all tables
[info] -n, --numPartitions <value>
[info] how many dsdgen partitions to run - number of input tasks.
[info] --help prints this usage text
注册TPCDS外部表
官方文档上有注册TPCDS 外部表的命令,但是不知道原作者是如何去执行的。 我最终选择了最简单粗暴的方式,直接编写并运行类test:runMain com.databricks.spark.sql.perf.tpcds.CreateTPCTables 搞定
package com.databricks.spark.sql.perf.tpcds
import org.apache.spark.sql.SparkSession
object CreateTPCTables {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Create TPC Tables")
.master("local")
.enableHiveSupport()
.getOrCreate()
val sqlContext = spark.sqlContext
val rootDir = "hdfs://localhost:9000/tpcds"
val databaseName = "tpcds_1g"
val scaleFactor = "1"
val format = "parquet"
val tables = new TPCDSTables(sqlContext,
dsdgenDir = "/Users/wakun/ws/apache/tpcds-kit/tools",
scaleFactor = scaleFactor,
useDoubleForDecimal = false,
useStringForDate = false)
tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = true)
tables.analyzeTables(databaseName, analyzeColumns = true)
}
}
下面说一下这个过程我踩过的一些坑:
SBT console 无法运行
- 首先再次提一下
sbt-spark-package 这个插件,这个插件的 console 命令会启动一个Spark 本地程序,同时会将项目的的所有程序代码加入到Classpath 中,这样本地快速开发和调试程序的时候非常方便 - 因为我们自己在开发spark application 的时候,会有很多我们自己常用的一些命令,同样的,这个项目定义了
initialCommands ,用于我们常用语句的执行
initialCommands in console :=
"""
|import org.apache.spark.sql._
|import org.apache.spark.sql.functions._
|import org.apache.spark.sql.types._
|import org.apache.spark.sql.hive.test.TestHive
|import TestHive.implicits
|import TestHive.sql
|
|val sqlContext = TestHive
|import sqlContext.implicits._
""".stripMargin
- 第一个问题来了, 上面的初始化代码里使用的都是 spark-hive 模块的test package里的代码,但是项目一般都不会引入 module的 tests 包的啊?没办法,手动加入该依赖
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion.value classifier "tests" 。 我本地为了加速依赖包的下载,增加了maven local resolver: resolvers += Resolver.mavenLocal , 之后正常进入console。 - 在执行 spark sql 语句的时候,无法正常连接hive metasotre。将对应的
hive-site.xml 和 spark-defaults.conf 拷贝到 resource 文件夹,连接hive metastore 正常 - 连接Hive metastore 之后,默认又报Hive thrift RPC error :
Invalid method name: 'get_all_functions' , 感觉是 hive 客户端和 hive 服务器端两边版本不匹配导致。spark 3.0 默认的Hive 客户端版本是 hive-cli-2.3.8.jar ,升级Hive 服务器版本到 2.3.* 连接恢复正常。 - 此时你以为现在一切都OK了吗?没有~~ 上面的
initialCommands 中使用的是 TestHive ~~, 搞崩溃了,直接转到上面的最终大招,搞定~~
Run tpcds query in spark
Run tpcds query in spark with SBT
sql/test:runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location file:///opt/tpcds --query-filter q5
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
[info] Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
[info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q5 6176 6308 188 0.9 1127.4 1.0X
Benchmark in Spark
BenchmarkBase 类是测试基础类,内部定义 main() 方法,然后调用各个子类实现的 runBenchmarkSuite() 方法
object JoinBenchmark extends SqlBasedBenchmark 类实现了 runBenchmarkSuite() 方法,并在内部定义了各种类型的Join测试
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("Join Benchmark") {
broadcastHashJoinLongKey()
broadcastHashJoinLongKeyWithDuplicates()
...
}
}
def broadcastHashJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
codegenBenchmark("Join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.noop()
}
}
|