I. SparkSQL
1. Introduction to SparkSQL
Hive is the predecessor of Shark, and Shark is the predecessor of SparkSQL; the fundamental reason SparkSQL emerged is that it breaks free of Hive's limitations entirely.
- SparkSQL can query native RDDs. The RDD is the core abstraction of the Spark platform and the foundation that lets Spark handle all kinds of big-data scenarios efficiently.
- SQL statements can be written in Scala and Java. Simple SQL syntax checking is supported, and Hive statements can be issued from Scala to access Hive data, with the result brought back as an RDD.
- Spark on Hive: Hive acts only as the storage layer; Spark is responsible for SQL parsing, optimization and execution.
- Hive on Spark: Hive acts both as the storage layer and as the SQL parser/optimizer; Spark is only responsible for execution.
In both modes the data source is a Hive table and the underlying jobs are Spark jobs; the key difference is whether Hive or Spark parses the SQL.
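To make the Spark on Hive mode concrete, here is a minimal sketch, not part of the original examples: the class name is made up, it assumes hive-site.xml is on the classpath so Spark can reach the Hive metastore, and it reuses the spark.student_infos table created in section II.7 below.
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.SparkSession;

// "Spark on Hive": Hive only supplies the metastore and storage,
// while Spark parses, optimizes and executes the SQL.
public class SparkOnHiveSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("sparkOnHive")
                .master("local")
                .enableHiveSupport()   // connect Spark to the Hive metastore
                .getOrCreate();
        spark.sql("select * from spark.student_infos").show();
        spark.stop();
    }
}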
2. Dataset and DataFrame
A Dataset is also a distributed data container. It is similar to an RDD, but a Dataset is more like a two-dimensional table in a traditional database: besides the data itself it also carries the data's structural information (metadata), i.e. the schema. Like Hive, a Dataset supports nested data types (struct, array and map). In terms of API usability, the Dataset API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API. Under the hood a Dataset wraps an RDD; when the RDD's element type is Row we also call it a DataFrame, i.e. Dataset<Row> = DataFrame.
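A small sketch of that equivalence (the class name is made up; data/person.txt is the file used in section II.3 below): a typed Dataset<String> and its untyped Dataset<Row> (DataFrame) view of the same data.
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameVsDataset {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("dfVsDs")
                .master("local")
                .getOrCreate();
        // Typed Dataset: every element is a String (one line of the file).
        Dataset<String> lines = sparkSession.read().textFile("data/person.txt");
        // Untyped view of the same data: Dataset<Row>, i.e. a DataFrame
        // with a single column named "value".
        Dataset<Row> df = lines.toDF();
        df.printSchema();
        df.show();
        sparkSession.stop();
    }
}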
3. SparkSQL Data Sources
SparkSQL can read data from JSON strings, JDBC, Parquet, Hive, HDFS, and so on.
4. SparkSQL Architecture
After the SQL text is received it is first parsed into an unresolved logical plan; analysis turns it into an analyzed logical plan; a set of optimization rules then rewrites it into an optimized logical plan; the SparkPlanner strategies convert that into physical plans; and finally the selected plan is turned into Spark tasks and executed.
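Each of these stages can be printed with Dataset.explain(true). A minimal sketch (class name made up; it reuses the data/json file from section II.1 below):
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExplainPlansSketch {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("explain")
                .master("local")
                .getOrCreate();
        Dataset<Row> ds = sparkSession.read().json("data/json");
        ds.createOrReplaceTempView("jtable");
        Dataset<Row> result = sparkSession.sql(
                "select age, count(*) as cnt from jtable where age > 18 group by age");
        // explain(true) prints the parsed (unresolved) logical plan, the analyzed
        // logical plan, the optimized logical plan and the physical plan in turn.
        result.explain(true);
        sparkSession.stop();
    }
}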
5. Predicate Pushdown
Predicate pushdown means pushing filter conditions as close to the data source as possible (for example into the Parquet scan or the generated JDBC query), so that less data is read and moved.
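A quick way to observe predicate pushdown is to filter a Parquet-backed DataFrame and look at the physical plan, where pushed-down conditions typically appear under "PushedFilters". A minimal sketch (class name made up; it assumes the data/parquet directory written by the example in section II.4 below):
package com.shsxt.java_Test.sql.dataset;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PredicatePushdownSketch {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("pushdown")
                .master("local")
                .getOrCreate();
        // Reuses the parquet directory written by the example in section II.4.
        Dataset<Row> df = sparkSession.read().parquet("data/parquet");
        // The age > 18 condition is pushed down to the parquet scan; it typically
        // shows up under "PushedFilters" in the physical plan printed by explain().
        df.filter(df.col("age").gt(18)).explain();
        sparkSession.stop();
    }
}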
II. Several Ways to Create a Dataset
1. Creating a Dataset from a JSON File
- The JSON records in the file must not contain nested JSON.
- A Dataset is an RDD of Row objects; the underlying RDD can be obtained with ds.rdd() / ds.javaRDD().
- There are two ways to read a JSON file (see the commented alternative in the code below).
- df.show() displays the first 20 rows by default.
- The native Dataset API can also operate on a Dataset (less convenient than SQL).
- When the Dataset is registered as a temporary table, the columns are displayed in ASCII order by default.
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class CreateDSFromJosonFile {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonfile")
.master("local")
.getOrCreate();
// Equivalent: sparkSession.read().json("data/json");
Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
ds.show();
ds.show(100);
ds.printSchema();
ds.select("name").show();
ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
ds.groupBy(ds.col("age")).count().show();
ds.createOrReplaceTempView("jtable");
Dataset<Row> sql = sparkSession.sql("select age,count(*) as gg from jtable group by age");
sql.show();
sparkSession.stop();
}
}
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+--------+
| name|
+--------+
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
+--------+
+--------+------+
| name|addage|
+--------+------+
|zhangsan| 28|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 30|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 38|
| lisi| null|
| wangwu| 28|
+--------+------+
+--------+---+
| name|age|
+--------+---+
| laoliu| 28|
|zhangsan| 20|
| laoliu| 28|
|zhangsan| 28|
+--------+---+
+----+---+
| age| gg|
+----+---+
|null| 3|
| 28| 3|
| 18| 4|
| 20| 1|
+----+---+
2. Creating a Dataset from a JSON-Formatted RDD
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.Arrays;
public class CreateDSFromJsonRDD {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonrdd")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList(
"{'name':'zhangsan','age':\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
Dataset<Row> nameds = sparkSession.read().json(nameRDD);
Dataset<Row> scoreds = sparkSession.read().json(scoreRDD);
nameds.createOrReplaceTempView("nameTable");
scoreds.createOrReplaceTempView("scoreTable");
Dataset<Row> result =
sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score "
+ "from nameTable join scoreTable "
+ "on nameTable.name = scoreTable.name");
result.show();
sc.stop();
}
}
+--------+---+-----+
| name|age|score|
+--------+---+-----+
| wangwu| 20| 300|
|zhangsan| 18| 100|
| lisi| 19| 200|
+--------+---+-----+
3. Creating a Dataset from a Non-JSON RDD
3.1 By Reflection
Convert a non-JSON RDD into a Dataset by reflection. In essence each record is first turned into an object of a custom class, giving a JavaRDD<Person>; under the hood, reflection on Person.class extracts Person's schema information (its fields), which is combined with the RDD itself to produce the Dataset.
- The custom class must be serializable.
- The custom class's access level must be public.
- After the RDD is converted to a Dataset, the columns are ordered by ASCII order according to the field mapping.
- When converting the Dataset back to an RDD, a field can be read from a Row in two ways: by index, e.g. row.getInt(0) (not recommended), or by column name, e.g. row.getAs("columnName") (recommended).
Person.java:
package com.shsxt.java_Test.sql.dataset;
import java.io.Serializable;
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
private String id ;
private String name;
private Integer age;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
}
}
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class CreateDSFromRDDWithReflect {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("reflect")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
@Override
public Person call(String line) throws Exception {
Person p = new Person();
p.setId(line.split(",")[0]);
p.setName(line.split(",")[1]);
p.setAge(Integer.valueOf(line.split(",")[2]));
return p;
}
});
Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);
dataFrame.show();
dataFrame.printSchema();
// registerTempTable is deprecated in newer Spark versions; createOrReplaceTempView is the replacement.
dataFrame.registerTempTable("person");
Dataset sql = sparkSession.sql("select name,id,age from person where id = 2");
sql.show();
JavaRDD<Row> javaRDD = dataFrame.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
p.setId(row.getAs("id"));
p.setName(row.getAs("name"));
p.setAge(row.getAs("age"));
return p;
}
});
map.foreach(x-> System.out.println(x));
sparkSession.stop();
}
}
+---+---+--------+
|age| id| name|
+---+---+--------+
| 18| 1|zhangsan|
| 19| 2| lisi|
| 20| 3| wangwu|
+---+---+--------+
root
|-- age: integer (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+----+---+---+
|name| id|age|
+----+---+---+
|lisi| 2| 19|
+----+---+---+
Person [id=1, name=zhangsan, age=18]
Person [id=2, name=lisi, age=19]
Person [id=3, name=wangwu, age=20]
3.2 Dynamically Building the Schema
Convert a non-JSON RDD into a Dataset by building the schema (StructType) dynamically.
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class CreateDSFromRDDWithStruct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("struct")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/person.txt");
final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
s.split(",")[0],
s.split(",")[1],
Integer.valueOf(s.split(",")[2])
);
}
});
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();
sc.stop();
}
}
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 18|
| 2| lisi| 19|
| 3| wangwu| 20|
+---+--------+---+
4. Creating a Dataset from a Parquet File
Reading and writing the binary Parquet format.
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
public class CreateDFFromParquet {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("parquet")
.master("local")
.getOrCreate();
Dataset<Row> df = sparkSession.read().json("data/json");
df.show();
df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");
Dataset load = sparkSession.read().format("parquet").load("data/parquet");
load.show();
sparkSession.stop();
}
}
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
5. Creating a Dataset from a Bean Class
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
import java.util.Arrays;
import java.util.List;
public class CreateDSFromBeanClass {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("beanclass")
.master("local")
.getOrCreate();
Person person = new Person();
person.setId("1");
person.setAge(18);
person.setName("zs");
Person person2 = new Person();
person2.setId("2");
person2.setAge(20);
person2.setName("ls");
List<Person> people = Arrays.asList(person, person2);
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset = sparkSession.createDataset(people, personEncoder);
dataset.printSchema();
dataset.show();
dataset.registerTempTable("person");
Dataset<Row> result = sparkSession.sql("select name , id from person");
result.show();
sparkSession.stop();
}
}
root
|-- age: integer (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+---+---+----+
|age| id|name|
+---+---+----+
| 18| 1| zs|
| 20| 2| ls|
+---+---+----+
+----+---+
|name| id|
+----+---+
| zs| 1|
| ls| 2|
+----+---+
6. Creating a Dataset from JDBC Data (MySQL Example)
Reading from and writing back to MySQL.
- The first way: read a MySQL table by passing an options map, loading it as a DataFrame.
- The second way: read a MySQL table by setting options one by one on a DataFrameReader, loading it as a Dataset.
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CreateDSFromMysql {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("mysql")
.master("local")
.getOrCreate();
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "root");
options.put("dbtable", "person");
Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();
person.show();
person.createOrReplaceTempView("person");
DataFrameReader reader = sparkSession.read().format("jdbc");
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "root");
reader.option("dbtable", "score");
Dataset<Row> score = reader.load();
score.show();
score.createOrReplaceTempView("score");
Dataset<Row> result =
sparkSession.sql("select person.id,person.name,person.age,score.score "
+ "from person,score "
+ "where person.name = score.name and score.score> 82");
result.show();
result.registerTempTable("result");
Dataset<Row> df = sparkSession.sql("select id,name,age,score from result where age > 18");
df.show();
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "root");
result.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
System.out.println("----Finish----");
sparkSession.stop();
}
}
7. Loading Hive Data into a Dataset
package com.shsxt.scala_Test.sql.dataset
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object CreateDFFromHive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("hive")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
spark.sql("use spark")
spark.sql("drop table if exists student_infos")
spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_infos' into table student_infos")
spark.sql("drop table if exists student_scores")
spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_scores' into table student_scores")
val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
spark.sql("drop table if exists good_student_infos2")
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos2")
sc.stop()
}
}
III. Serialization
Serialization (together with deserialization) is one way of producing objects: an object is written to a byte stream and later reconstructed from it. A version number can be fixed explicitly, e.g.:
private static final long serialVersionUID = 2L;
- Deserialization fails when the serialVersionUID does not match. In short, Java's serialization mechanism verifies version consistency at runtime via the class's serialVersionUID: during deserialization the JVM compares the serialVersionUID carried in the byte stream with the serialVersionUID of the corresponding local class, and deserializes only if they are equal; otherwise a serialization-version-mismatch exception is thrown. If a class that implements java.io.Serializable does not explicitly declare a long field named serialVersionUID, the serialization mechanism derives one automatically from the compiled class, so only classes produced by the same compilation share the same serialVersionUID. If you do not want to force a version split on every recompilation, i.e. you want the class to stay compatible with previously serialized data as long as it is unchanged, declare the long serialVersionUID field explicitly; as long as its value is not modified, instances can be serialized and deserialized across versions.
- If a subclass implements Serializable but its parent class does not, the parent's fields are not serialized; after deserialization they come back as null (more precisely, as whatever the parent's no-arg constructor assigns).
- Note: if the parent class implements Serializable, a subclass is serializable even if it does not implement Serializable itself. (Application: pull fields that should not be serialized up into a parent class that does not implement Serializable, and have the subclass implement it.)
- Fields marked transient are not serialized.
- Static fields are not serialized: they belong to the class rather than to any object or method, so they are not part of an object's serialized state.
- The value of a static field seen after deserialization comes from the class loaded in the current JVM, not from deserialization.
A minimal sketch illustrating these rules follows this list.
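The sketch (package, class and field names are made up for the demo): transient and static fields are not written to the stream, and the explicit serialVersionUID pins the version.
package com.shsxt.java_Test.sql.serialize;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializeDemo {

    static class User implements Serializable {
        // Fixing the version explicitly keeps old serialized data readable
        // after the class is recompiled (as long as it is still compatible).
        private static final long serialVersionUID = 2L;
        static String company = "shsxt";   // static: belongs to the class, not serialized
        transient String password;         // transient: skipped during serialization
        String name;
    }

    public static void main(String[] args) throws Exception {
        User u = new User();
        u.name = "zhangsan";
        u.password = "123456";

        // Serialize to an in-memory byte array.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(u);
        oos.flush();

        // Deserialize it back.
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        User copy = (User) ois.readObject();

        System.out.println(copy.name);      // zhangsan
        System.out.println(copy.password);  // null: transient fields are not restored
        System.out.println(User.company);   // read from the class in the current JVM, not from the stream
    }
}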
IV. User-Defined Functions: UDF and UDAF
1. User-Defined Function (UDF)
package com.shsxt.java_Test.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class UDF {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("udf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD,schema);
df.registerTempTable("user");
// UDF2 takes two input arguments (here: name, offset); the last register() argument is the UDF's return type.
sparkSession.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length() + t2;
}
} ,DataTypes.IntegerType );
sparkSession.sql("select name ,StrLen(name,100) as length from user").show();
sparkSession.stop();
}
}
+--------+------+
| name|length|
+--------+------+
|zhangsan| 108|
| lisi| 104|
| wangwu| 106|
+--------+------+
2. User-Defined Aggregate Function (UDAF)
To implement a UDAF, define a class that extends UserDefinedAggregateFunction.
package com.shsxt.java_Test.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class UDAF {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("udaf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<String> parallelize = sc.parallelize(
Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi","zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2);
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {
private static final long serialVersionUID = 1L;
// Initializes the aggregation buffer for a group (called per group per partition).
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
System.out.println("init ....." + buffer.get(0));
}
// Called for every input row within a partition; here we simply count rows.
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
buffer.update(0, buffer.getInt(0) + 1);
System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() );
}
// Combines partial counts produced by different partitions.
@Override
public void merge(MutableAggregationBuffer buffer1, Row arg1) {
buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0));
System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() );
}
// Schema of the aggregation buffer: a single integer counter.
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer", DataTypes.IntegerType, true)));
}
// Returns the final result for a group: the accumulated count.
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}
// Return type of the UDAF.
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
// Schema of the input arguments (one string column).
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
}
// The function is deterministic: the same input always produces the same result.
@Override
public boolean deterministic() {
return true;
}
});
sparkSession.sql("select name ,StringCount(name) as number from user group by name").show();
sc.stop();
}
}
+--------+------+
| name|number|
+--------+------+
| wangwu| 2|
|zhangsan| 6|
| lisi| 4|
+--------+------+
V. Window Functions
The row_number() over (partition by ... order by ...) window function numbers the rows within each partition; here it is used to keep the top three sales amounts per category.
package com.shsxt.java_Test.sql.windowfun;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class CreateDSFromRDDWithStruct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("struct")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("data/sales.txt");
final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
Integer.valueOf(s.split("\t")[0]),
s.split("\t")[1],
Integer.valueOf(s.split("\t")[2])
);
}
});
List<StructField> asList = Arrays.asList(
DataTypes.createStructField("riqi", DataTypes.IntegerType, true),
DataTypes.createStructField("leibie", DataTypes.StringType, true),
DataTypes.createStructField("jine", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();
df.createOrReplaceTempView("sales");
Dataset<Row> result = sparkSession.sql("select riqi,leibie,jine,rank "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show(100);
sc.stop();
}
}
+----+------+----+----+
|riqi|leibie|jine|rank|
+----+------+----+----+
| 6| F| 96| 1|
| 9| F| 87| 2|
| 9| F| 84| 3|
| 7| E| 97| 1|
| 4| E| 94| 2|
| 9| E| 90| 3|
| 8| B| 98| 1|
| 9| B| 82| 2|
| 7| B| 67| 3|
| 3| D| 93| 1|
| 8| D| 79| 2|
| 8| D| 76| 3|
| 5| C| 95| 1|
| 9| C| 86| 2|
| 9| C| 81| 3|
| 9| A| 99| 1|
| 2| A| 92| 2|
| 9| A| 88| 3|
| 1| G| 91| 1|
| 9| G| 89| 2|
| 8| G| 75| 3|
+----+------+----+----+