SparkSQL Offline Data Processing
Preface: these are my study notes on offline data extraction and offline data processing with SparkSQL. The blog posts I consulted are referenced throughout the text.
I. Hive Environment Preparation
1. Configuration files
/opt/hive/conf/hive-site.xml (modified 2021/12/31: added &useSSL=false&useUnicode=true&characterEncoding=utf8 to the connection URL for Chinese character support; note that & must be written as &amp; inside the XML):
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8</value>
<description>Hive metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>MySQL JDBC driver</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Database username</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>xxx</value>
<description>Database password</description>
</property>
<property>
<name>hive.exec.max.dynamic.partitions</name>
<value>100000</value>
<description>Maximum total number of dynamic partitions that may be created across all nodes executing MR</description>
</property>
<property>
<name>hive.exec.max.dynamic.partitions.pernode</name>
<value>100000</value>
<description>Maximum number of dynamic partitions that may be created on each node executing MR</description>
</property>
</configuration>
To run from IDEA, copy
hdfs-site.xml
core-site.xml
hive-site.xml
into the resources folder; otherwise the hive.exec.max.dynamic.partitions.pernode and hive.exec.max.dynamic.partitions settings will not take effect.
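As a quick sanity check (a minimal sketch, not from the original notes), the session's effective values can be queried back; the code in Section III also sets the dynamic-partition options per session with spark.sql("set ..."):

// Assumes an existing Hive-enabled SparkSession named `spark`
spark.sql("set hive.exec.dynamic.partition = true")
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions = 10000")
spark.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")
// "set <key>" with no value returns the current setting as a (key, value) row
spark.sql("set hive.exec.max.dynamic.partitions").show(false)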
2. hosts configuration
If you are on a different network from the cluster, add the cluster hostnames to your local hosts file.
On Ubuntu the hosts file is under /etc.
Reference: java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx (小健的博客, CSDN)
3. Start the Hive metastore service for remote connections
hive --service metastore
Reference: hive的几种启动方式 (lbl的博客, CSDN)
4. Miscellaneous
Start the MySQL service:
service mysqld start
Turn off the firewall:
systemctl stop firewalld
II. IDEA Environment Preparation
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sparkDome1</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>HiveAndMysql</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<hadoop.version>2.7.7</hadoop.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2. Hadoop environment
Running on Windows requires a local Hadoop environment.
Point to it in code via hadoop.home.dir:
System.setProperty("hadoop.home.dir","........")
3. Miscellaneous
Download the Scala plugin and its dependencies in advance.
Make sure the versions match the cluster; this document uses Scala 2.11.
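Putting the Windows-specific settings together (a minimal sketch assembled from the snippets in this document; the hadoop.home.dir path, host, and port are placeholders):

import org.apache.spark.sql.SparkSession

object LocalEnvBootstrap {
  def main(args: Array[String]): Unit = {
    // Windows only: local Hadoop installation directory (placeholder path)
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.7")
    // User Spark acts as when talking to HDFS/Hive
    System.setProperty("HADOOP_USER_NAME", "root")
    val spark = SparkSession.builder()
      .appName("LocalEnvBootstrap")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    // If the configuration files are on the classpath, the cluster databases should be listed
    spark.sql("show databases").show()
    spark.stop()
  }
}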
III. Code
1. Full extraction
import org.apache.spark.sql.SparkSession
object ShopTest {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("ShopTest")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
val mysqlMap = Map(
"url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false",
"user" -> "root",
"password" -> "xxx",
"driver" -> "com.mysql.jdbc.Driver"
)
val inputTable = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable", "EcData_tb_1")
.load()
inputTable.createOrReplaceTempView("inputTable")
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")
spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000")
spark.sqlContext.sql(
"""
|create table if not exists clown_test_db.ShopTest_ods_tb_1
|(
| InvoiceNo string ,
| StockCode string ,
| Description string ,
| Quantity int ,
| InvoiceDate string ,
| UnitPrice double ,
| CustomerID int ,
| Country string
|)
|partitioned by (country_pid string,customer_pid int)
|row format delimited
|fields terminated by '\t' -- field values in this dataset contain ',', so ',' cannot be used as the delimiter
|lines terminated by '\n'
|stored as textfile
|""".stripMargin)
spark.sqlContext.sql(
"""
|insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid)
|select *,Country,CustomerID from inputTable
|""".stripMargin)
}
}
2. Incremental extraction
import java.text.SimpleDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}
object ShopTest2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark = SparkSession.builder()
.appName("ShopTest2")
.master("local[*]")
.config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")
val timeStr = "2011/01/01 00:00"
val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime
println(timeTemp)
val timeFormat = inputData
.withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))
.where(s"InvoiceDate>$timeTemp/1000")
.withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))
.where("Country='United Kingdom' or Country = 'Finland'")
spark.sqlContext.sql(
"""
|create table if not exists clown_dwd_db.shoptest_dwd_tb_1
|like clown_test_db.ShopTest_ods_tb_1
|""".stripMargin)
timeFormat.write.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_dwd_db.shoptest_dwd_tb_1")
}
}
3. Data cleaning
import org.apache.spark.sql.{SaveMode, SparkSession}
object ShopTest3 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark = SparkSession.builder()
.appName("ShopTest3")
.master("local[*]")
.config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")
spark.sqlContext.sql(
"""
|create table if not exists clown_dwd_db.shopTest_dwd_tb_3
|(
| InvoiceNo string ,
| StockCode string ,
| Description string ,
| Quantity int ,
| InvoiceDate string ,
| UnitPrice double ,
| CustomerID int ,
| Country string
|)
|partitioned by (country_pid string)
|row format delimited
|fields terminated by '\t'
|lines terminated by '\n'
|stored as textfile
|""".stripMargin)
data.na.fill(
Map(
"Country"->"Country_Null",
"CustomerID"->0
)
)
.na.drop(
Seq("UnitPrice","Quantity")
) .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")
.limit(10000)
.write
.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_dwd_db.shopTest_dwd_tb_3")
}
}
4. Metric computation
import org.apache.spark.sql.SparkSession
object ShopTest4 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("ShopTest4")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
data.dropDuplicates("CustomerID","Country")
.withColumn("x",lit(1))
.groupBy("Country")
.sum("x")
.show(20)
data.withColumn("x", $"Quantity" * $"UnitPrice")
.groupBy("Country")
.sum("x")
.withColumn("sum(x)", round($"sum(x)", 2))
data.groupBy("StockCode")
.sum("Quantity")
.coalesce(1)
.orderBy(desc("sum(Quantity)"))
.show(10)
data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))
.withColumn("x",$"Quantity"*$"UnitPrice")
.groupBy("InvoiceDate")
.sum("x")
.coalesce(1)
.orderBy(desc("InvoiceDate"))
.withColumn("sum(x)",round($"sum(x)",2))
.show(100)
data.select(col("Description"))
.flatMap(x=>x.toString().split("\\W"))
.withColumn("x",lit(1))
.groupBy("value")
.sum("x")
.where("value != '' ")
.coalesce(1)
.orderBy(desc("sum(x)"))
.show(300)
}
}
The same metrics can also be computed with Spark SQL instead of the DataFrame API:
import org.apache.spark.sql.SparkSession
object ShopTest5 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("ShopTest5")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
data.createOrReplaceTempView("dataTable")
spark.sqlContext.sql(
"""
|select Country,count(distinct Country,CustomerID) from dataTable group by Country
|""".stripMargin)
.show()
spark.sqlContext.sql(
"""
|select Country ,round(sum(Quantity*UnitPrice),2)
|from dataTable
|group by Country
|""".stripMargin)
.show()
spark.sqlContext.sql(
"""
|select StockCode,round(sum(Quantity*UnitPrice),2) as xl
|from dataTable
|group by StockCode
|order by xl desc
|""".stripMargin)
.show(10)
spark.sqlContext.sql(
"""
|select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum
|from dataTable
|group by substring_index(InvoiceDate,"/",2)
|order by substring_index(InvoiceDate,"/",2)
|""".stripMargin)
.show()
}
}
IV. Miscellaneous
1. Adding, modifying, and dropping Hive partitions
Reference: HIve学习:Hive分区修改 (u011047968的专栏, CSDN)
Add partitions (the parts in [] are optional):
alter table tb_name add partition (pid1 = '', pid2 = '') [location 'xxx']
Multiple partitions:
alter table tb_name add partition (pid1 = '', pid2 = '') partition (pid1 = '', pid2 = '') [location 'xxx']
Modify partitions:
alter table tb_name partition(pid1='') rename to partition(pid1='');
alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....';
alter table tb_name partition column (pid1 string);
Drop partitions:
alter table tb_name drop partition (pid1 = '', pid2 = '') [partition (pid1 = '', pid2 = '') ...]
List partition values:
show partitions tb_name;
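These statements can also be issued from Spark through a Hive-enabled session (a minimal sketch; tb_name and the partition values are placeholders, the partition columns follow the table created in Section III):

// Assumes `spark` is a Hive-enabled SparkSession as in the examples above
spark.sql("alter table clown_test_db.tb_name add partition (country_pid = 'Finland', customer_pid = 0)")
spark.sql("show partitions clown_test_db.tb_name").show(false)
spark.sql("alter table clown_test_db.tb_name drop partition (country_pid = 'Finland', customer_pid = 0)")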
2. Packaging and running with spark-submit
Command:
spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
If the job uses a JDBC connection, the driver jar mysql-connector-java-5.1.48.jar must be supplied:
spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
Alternatively, place the MySQL driver in the $SPARK_HOME/jars directory.
3. Date/time format patterns
A date-time pattern string specifies the time format. In such patterns, all ASCII letters are reserved as pattern letters, defined as follows:
| Letter | Description | Example |
| --- | --- | --- |
| G | Era designator | AD |
| y | Year (four digits) | 2001 |
| M | Month | July or 07 |
| d | Day of month | 10 |
| h | Hour, A.M./P.M. (1~12) | 12 |
| H | Hour of day (0~23) | 22 |
| m | Minute | 30 |
| s | Second | 55 |
| S | Millisecond | 234 |
| E | Day of week | Tuesday |
| D | Day of year | 360 |
| F | Day of week in month | 2 (second Wed. in July) |
| w | Week of year | 40 |
| W | Week of month | 1 |
| a | A.M./P.M. marker | PM |
| k | Hour of day (1~24) | 24 |
| K | Hour, A.M./P.M. (0~11) | 10 |
| z | Time zone | Eastern Standard Time |
| ' | Escape for literal text | Delimiter |
| '' | Single quote | ' |
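For example, the "MM/dd/yyyy HH:mm" pattern used by the incremental-extraction code above can be exercised like this (a small sketch; assumes a SparkSession named spark):

import org.apache.spark.sql.functions.{unix_timestamp, from_unixtime}
// Assumes an existing SparkSession `spark`
import spark.implicits._

val df = Seq("12/01/2010 08:26", "01/04/2011 10:00").toDF("InvoiceDate")
df.withColumn("epoch_seconds", unix_timestamp($"InvoiceDate", "MM/dd/yyyy HH:mm")) // string -> unix seconds
  .withColumn("reformatted", from_unixtime($"epoch_seconds", "yyyy/MM/dd HH:mm"))  // unix seconds -> new pattern
  .show(false)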
4. Scala regular expressions
Scala regular expressions inherit Java's syntax, which in turn mostly follows Perl.
The table below lists commonly used regex constructs. (Note: backslashes must be escaped — write \\ in DataFrame operators and \\\\ inside SQL statements.)
| Expression | Matches |
| --- | --- |
| ^ | The position at the start of the input string |
| $ | The position at the end of the input string |
| . | Any single character except "\r\n" |
| [...] | Character set: any one of the enclosed characters; e.g. "[abc]" matches the "a" in "plain" |
| [^...] | Negated character set: any character not enclosed; e.g. "[^abc]" matches the "p", "l", "i", "n" in "plain" |
| \A | Start of the string (no multiline support) |
| \z | End of the string (like $, but unaffected by the multiline option) |
| \Z | End of the string or line (unaffected by the multiline option) |
| re* | Zero or more repetitions |
| re+ | One or more repetitions |
| re? | Zero or one repetition |
| re{n} | Exactly n repetitions |
| re{n,} | n or more repetitions |
| re{n,m} | Between n and m repetitions |
| a\|b | Matches a or b |
| (re) | Matches re and captures the text into an automatically numbered group |
| (?:re) | Matches re without capturing text or assigning a group number |
| (?>re) | Greedy (atomic) subexpression |
| \w | A letter, digit, underscore, or Chinese character |
| \W | Any character that is not a letter, digit, underscore, or Chinese character |
| \s | Any whitespace character, equivalent to [\t\n\r\f] |
| \S | Any non-whitespace character |
| \d | A digit, equivalent to [0-9] |
| \D | Any non-digit character |
| \G | The start of the current search |
| \n | Newline |
| \b | A word boundary (backspace when used inside a character class) |
| \B | A position that is not a word boundary |
| \t | Tab |
| \Q | Start of a literal quote: \Q(a+b)*3\E matches the text "(a+b)*3" |
| \E | End of a literal quote: \Q(a+b)*3\E matches the text "(a+b)*3" |
Regex examples

| Example | Description |
| --- | --- |
| . | Matches any single character except "\r\n" |
| [Rr]uby | Matches "Ruby" or "ruby" |
| rub[ye] | Matches "ruby" or "rube" |
| [aeiou] | Matches any one lowercase vowel: a, e, i, o, u |
| [0-9] | Matches any digit, same as [0123456789] |
| [a-z] | Matches any ASCII lowercase letter |
| [A-Z] | Matches any ASCII uppercase letter |
| [a-zA-Z0-9] | Matches digits and letters |
| [^aeiou] | Matches any character other than a, e, i, o, u |
| [^0-9] | Matches any non-digit character |
| \d | Matches a digit, same as [0-9] |
| \D | Matches a non-digit, same as [^0-9] |
| \s | Matches whitespace, same as [ \t\r\n\f] |
| \S | Matches non-whitespace, same as [^ \t\r\n\f] |
| \w | Matches a letter, digit, or underscore, same as [A-Za-z0-9_] |
| \W | Matches anything but a letter, digit, or underscore, same as [^A-Za-z0-9_] |
| ruby? | Matches "rub" or "ruby": the y is optional |
| ruby* | Matches "rub" followed by zero or more y's |
| ruby+ | Matches "rub" followed by one or more y's |
| \d{3} | Matches exactly 3 digits |
| \d{3,} | Matches 3 or more digits |
| \d{3,5} | Matches 3, 4, or 5 digits |
| \D\d+ | No group: + repeats \d |
| (\D\d)+ | Group: + repeats the \D\d pair |
| ([Rr]uby(, )?)+ | Matches "Ruby", "Ruby, ruby, ruby", and so on |
Functions that commonly take a regex (see the sketch below):
.split("") — split a string
regexp_extract(string subject, string pattern, int index) — match subject against the regex pattern and return the capture group selected by index
regexp_replace(string A, string B, string C) — replace every part of string A that matches Java regex B with C
.equals("") — exact match
5. SQL like vs. rlike
like is wildcard matching, not regex:
%: matches zero or more of any character
_: matches any single character
[]: matches a range
[^]: excludes a range
rlike is regex matching.
regexp behaves much like rlike.
Reference: sparksql 正则匹配总结 (Andrew LD, CSDN)
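For example (a sketch; assumes the dataTable temp view created in the metric-computation code above):

// like: wildcard match on countries starting with "United"
spark.sql(
  """
    |select distinct Country from dataTable
    |where Country like 'United%'
    |""".stripMargin).show()
// rlike: regex match on countries ending in "land"
spark.sql(
  """
    |select distinct Country from dataTable
    |where Country rlike '.*land$'
    |""".stripMargin).show()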
6. Chinese data
If a CSV file contains Chinese text, set the relevant options when reading it:
val inputData = spark.sqlContext.read.format("csv")
.option("sep","\t")
.option("encoding","GBK")
.option("header","true")
.load("file:///C:\\Users\\61907\\Desktop\\BigData\\Spark\\sparkDome1\\HiveAndMysql\\src\\main\\resources\\cov19.csv")
When reading from a database over JDBC, Chinese data requires these connection parameters:
&useUnicode=true&characterEncoding=utf8
val mysqlMap = Map(
"url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
"user"->"root",
"password"->"xxx",
"driver"->"com.mysql.jdbc.Driver"
)
Storing Chinese data, Chinese comments, and Chinese partition values (indexes) in Hive:
References for Ⅰ–Ⅲ:
hive设置中文编码格式utf-8 (2020xyz的博客, CSDN)
hive修改使用utf8编码支持中文字符集 (那又怎样?的博客, CSDN)
Ⅰ. Metastore database settings
The metastore database must use UTF-8 encoding:
mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
mysql>alter database hive character set utf8;
mysql>use hive;
mysql>show variables like 'character_set_database';
Ⅱ. Related metastore tables
1) Change the character set of column comments:
mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
2) Change the character set of table comments:
mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
Similarly, PARAM_KEY can also be set to utf8 if it needs to hold Chinese.
3) Change the partition parameters so that partitions can be expressed in Chinese:
mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
In addition, the column of the PARTITIONS table that stores partition names must also be changed to utf8:
mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;
4) Change the index comments:
mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;
Ⅲ. hive-site.xml settings
The JDBC connection URL must enable Chinese encoding support:
&useSSL=false&useUnicode=true&characterEncoding=utf8
Inside the XML, & must be escaped as &amp;.
Reference: 【已解决】The reference to entity "useSSL" must end with the ';' delimiter (清宵尚温的博客, CSDN)
/opt/hive/conf/hive-site.xml: same configuration as shown in Section I, with the connection URL carrying &useSSL=false&useUnicode=true&characterEncoding=utf8 (written with &amp; in the XML).
Ⅳ. Unresolved issue
In the HDFS web UI the Chinese values display correctly, but opening the folder shows:
Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS
This is probably caused by the Chinese path. So far it has not affected normal operation of the partitioned tables; the actual impact still needs testing.
Ⅴ. Brute-force script
Reference: hive分区字段含中文导致的报错 (一定要努力努力再努力的博客, CSDN)
alter database hive_meta default character set utf8;
alter table BUCKETING_COLS default character set utf8;
alter table CDS default character set utf8;
alter table COLUMNS_V2 default character set utf8;
alter table DATABASE_PARAMS default character set utf8;
alter table DBS default character set utf8;
alter table FUNCS default character set utf8;
alter table FUNC_RU default character set utf8;
alter table GLOBAL_PRIVS default character set utf8;
alter table PARTITIONS default character set utf8;
alter table PARTITION_KEYS default character set utf8;
alter table PARTITION_KEY_VALS default character set utf8;
alter table PARTITION_PARAMS default character set utf8;
alter table ROLES default character set utf8;
alter table SDS default character set utf8;
alter table SD_PARAMS default character set utf8;
alter table SEQUENCE_TABLE default character set utf8;
alter table SERDES default character set utf8;
alter table SERDE_PARAMS default character set utf8;
alter table SKEWED_COL_NAMES default character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8;
alter table SKEWED_STRING_LIST default character set utf8;
alter table SKEWED_STRING_LIST_VALUES default character set utf8;
alter table SKEWED_VALUES default character set utf8;
alter table SORT_COLS default character set utf8;
alter table TABLE_PARAMS default character set utf8;
alter table TAB_COL_STATS default character set utf8;
alter table TBLS default character set utf8;
alter table VERSION default character set utf8;
alter table BUCKETING_COLS convert to character set utf8;
alter table CDS convert to character set utf8;
alter table COLUMNS_V2 convert to character set utf8;
alter table DATABASE_PARAMS convert to character set utf8;
alter table DBS convert to character set utf8;
alter table FUNCS convert to character set utf8;
alter table FUNC_RU convert to character set utf8;
alter table GLOBAL_PRIVS convert to character set utf8;
alter table PARTITIONS convert to character set utf8;
alter table PARTITION_KEYS convert to character set utf8;
alter table PARTITION_KEY_VALS convert to character set utf8;
alter table PARTITION_PARAMS convert to character set utf8;
alter table ROLES convert to character set utf8;
alter table SDS convert to character set utf8;
alter table SD_PARAMS convert to character set utf8;
alter table SEQUENCE_TABLE convert to character set utf8;
alter table SERDES convert to character set utf8;
alter table SERDE_PARAMS convert to character set utf8;
alter table SKEWED_COL_NAMES convert to character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8;
alter table SKEWED_STRING_LIST convert to character set utf8;
alter table SKEWED_STRING_LIST_VALUES convert to character set utf8;
alter table SKEWED_VALUES convert to character set utf8;
alter table SORT_COLS convert to character set utf8;
alter table TABLE_PARAMS convert to character set utf8;
alter table TAB_COL_STATS convert to character set utf8;
alter table TBLS convert to character set utf8;
alter table VERSION convert to character set utf8;
SET character_set_client = utf8 ;
SET character_set_database = utf8 ;
SET character_set_results = utf8 ;
SET character_set_server = utf8 ;
SET NAMES 'utf8';
Only the table-altering part of the blog post is copied above.
Treat it as a reference only; it is better to change just what your use case requires.
Ⅵ. Example
import org.apache.spark.sql.{SaveMode, SparkSession}
object CNHivePartitionTest {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "D:\\BaiduNetdiskDownload\\hadoop-2.7.3")
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("Cov19DataDome4")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val mysqlMap = Map(
"url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
"user" -> "root",
"password" -> "xxx",
"driver" -> "com.mysql.jdbc.Driver"
)
val mysqlData = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable","tc_hotel2")
.load()
spark.sqlContext.sql(
"""
|create table if not exists clown_test_db.CNTest
|(
| `hname` string,
| `hbrand` string,
| `province` string,
| `city` string,
| `starlevel` string,
| `rating` string,
| `comment_count` string,
| `price` string
|)
|partitioned by (pid string)
|row format delimited
|fields terminated by '\t'
|lines terminated by '\n'
|stored as textfile
|""".stripMargin)
mysqlData
.select(col("*"),col("province"))
.write
.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_test_db.CNTest")
}
}
7. Table joins: join/union
Reference: https://blog.csdn.net/m0_37809146/article/details/91282446
val tb1 = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable", "cov19_test_tb")
.load()
val tb2 = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable", "cov19_test_tb_2")
.load()
.withColumnRenamed("", "")
tb1.join(tb2, Seq("provinceName", "cityName"), "inner")
tb1.join(tb2, Seq("provinceName", "cityName"), "right")
tb1.join(tb2, Seq("provinceName", "cityName"), "left")
val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")
val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")
testTb1.join(testTb2, "tb1CN")
tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")
tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")
tb1.join(tb2, Seq("provinceName", "cityName"), "outer")
tb1.join(tb2, Seq("provinceName", "cityName"), "full")
tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")
tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")
.show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")
.show(100)
8. Custom UDF and UDAF functions
References:
Spark 2.4.0编程指南–Spark SQL UDF和UDAF (阿里云开发者社区, aliyun.com)
Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator (L, CSDN)
UDF和UDAF,UDTF的区别 (山海, CSDN)
[Spark] 自定义函数 udf & pandas_udf (風の唄を聴け的博客, CSDN)
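A minimal UDF sketch (not taken from the referenced posts): it defines a function that classifies unit prices and uses it from both the DataFrame API and SQL; the threshold, price_level name, and sample rows are made up for illustration:

import org.apache.spark.sql.functions.udf
// Assumes an existing SparkSession `spark`
import spark.implicits._

// Classify a unit price as "high" or "low"
val priceLevel = udf((price: Double) => if (price >= 10.0) "high" else "low")
val df = Seq(("85123A", 2.55), ("22423", 12.75)).toDF("StockCode", "UnitPrice")

// As a DataFrame operator
df.withColumn("priceLevel", priceLevel($"UnitPrice")).show()

// Registered for use inside SQL
spark.udf.register("price_level", (price: Double) => if (price >= 10.0) "high" else "low")
df.createOrReplaceTempView("price_tb")
spark.sql("select StockCode, price_level(UnitPrice) as priceLevel from price_tb").show()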
9. Where to get datasets
UCI Machine Learning Repository: nearly 300 datasets of various sizes and types, usable for classification, regression, clustering, and recommender-system tasks. Dataset list: http://archive.ics.uci.edu/ml/
Amazon AWS public datasets: typically large datasets accessible through Amazon S3, including the Human Genome Project, the Common Crawl web corpus, Wikipedia data, and Google Books Ngrams. See: http://aws.amazon.com/publicdatasets/
Kaggle: datasets used in Kaggle machine-learning competitions, covering classification, regression, ranking, recommender systems, and image analysis; downloadable from the
Competitions area: http://www.kaggle.com/competitions
KDnuggets: a detailed list of public datasets, including some of those mentioned above: http://www.kdnuggets.com/datasets/index.html
10. Data warehouse layering
Reference: 数据仓库--数据分层(ETL、ODS、DW、APP、DIM) (hello_java_lcl的博客, CSDN)
V. Practice Review
1. 2022/1/3
Task:
Data sources:
CSV file (unmodified)
MySQL table (dirty data added)
Environment preparation:
1. MySQL source table 2. Hive target table 3. pom file
Time taken:
3h+
Problems encountered:
1. Saving data from Spark to MySQL
SaveMode.Overwrite overwrites not only the data but also the table definition, including column names.
During the exercise, saving with SaveMode.Append failed; switching to Overwrite made the error go away, reason unknown.
Resolved: ?
Error encountered:
Unknown column 'sum' in 'field list'
The cause was a column name that differed from the one in the target MySQL table.
Renaming the column to match fixes it (see the sketch below):
.withColumnRenamed("sum","total_price")
Does Hive behave the same way?
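A sketch of that write path (the result DataFrame, the target_tb table, and its total_price column are placeholders; mysqlMap is the JDBC option map used earlier):

import org.apache.spark.sql.SaveMode

// Assumes `spark`, `mysqlMap`, and a DataFrame `result` that has a column named "sum"
result
  .withColumnRenamed("sum", "total_price") // match the column name in the MySQL target table
  .write
  .format("jdbc")
  .options(mysqlMap)
  .option("dbtable", "target_tb")
  .mode(SaveMode.Append)                   // Overwrite would also replace the table definition
  .save()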
2. Using join and other table connections
Still not fluent with join and union; select subqueries are also rusty.
Resolved: ??
For join, the join-type diagram (not reproduced here; see also section 7 of Part IV above) is enough to understand the behaviour.
union requires both sides to have the same columns, otherwise it errors out (see the sketch below).
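A minimal union sketch (assumes a SparkSession named spark; the columns and rows are made up): both sides must have the same number and types of columns, matched by position:

// Assumes an existing SparkSession `spark`
import spark.implicits._

val part1 = Seq(("United Kingdom", 10), ("Finland", 5)).toDF("Country", "Quantity")
val part2 = Seq(("Germany", 7)).toDF("Country", "Quantity")

// union appends rows; a schema mismatch raises an AnalysisException
part1.union(part2).show()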
3. Date arithmetic
Reference: https://blog.csdn.net/wybshyy/article/details/52064337
datediff does not require converting the time format first.
Resolved: ?
References:
Spark-SQL常用内置日期时间函数 (绿萝蔓蔓绕枝生, CSDN)
sparksql 时间函数 (OH LEI``, CSDN)
datediff computes the difference between two dates in days and returns an integer.
It can be picky about the format: a value like '2021/1/4' cannot be computed (in SQL; the DataFrame operator does not seem to have this problem).
SQL form:
spark.sql(
"""
|select datediff('2021-1-4','2020-12-30')
|""".stripMargin).show()
Operator form:
.withColumn("o",datediff(col("delivery_date"),col("order_date")))
months_between computes the difference between two dates in months and returns a floating-point number.
SQL form:
spark.sql(
"""
|select months_between('2021-1-4','2020-12-30')
|""".stripMargin).show()
Returns: 0.16129032
To get whole months back, drop the day part:
spark.sql(
"""
|select months_between('2021-1','2020-12')
|""".stripMargin).show()
Returns: 1.0
Operator form:
.withColumn("o",months_between(col("delivery_date"),col("order_date")))
Subtracting unix timestamps and dividing through also works:
spark.sql(
"""
|select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24
|""".stripMargin).show()