Content
package TEST

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rank")
      .master("local[*]").enableHiveSupport().getOrCreate()
    import spark.implicits._

    // Sample data: "id,name,age" strings
    val array = Array("1,xgs,22", "2,ls,23", "2,ww,18", "3,wemz,17", "3,zy,16", "4,lb,25", "5,zf,26", "6,gy,27")
    val rdd = spark.sparkContext.parallelize(array).map { row =>
      val Array(id, name, age) = row.split(",")
      Row(id, name, age.toInt)
    }

    // Schema matching the Row fields above
    val structType = new StructType(Array(
      StructField("id", StringType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val df = spark.createDataFrame(rdd, structType)
    df.show()

    // SQL style: rank rows within each id by age (descending)
    df.createTempView("tmp")
    spark.sql("select id,name,age, row_number() over (partition by id order by age desc) top from tmp").show()

    // DSL style: same window, then keep only the top-ranked row per id
    df.withColumn("rk", row_number().over(Window.partitionBy("id").orderBy(desc("age"))))
      .where($"rk" <= 1)
      .drop("rk")
      .show()
  }
}
Results
Original data
+---+----+---+
| id|name|age|
+---+----+---+
| 1| xgs| 22|
| 2| ls| 23|
| 2| ww| 18|
| 3|wemz| 17|
| 3| zy| 16|
| 4| lb| 25|
| 5| zf| 26|
| 6| gy| 27|
+---+----+---+
SQL style
+---+----+---+---+
| id|name|age|top|
+---+----+---+---+
| 3|wemz| 17| 1|
| 3| zy| 16| 2|
| 5| zf| 26| 1|
| 6| gy| 27| 1|
| 1| xgs| 22| 1|
| 4| lb| 25| 1|
| 2| ls| 23| 1|
| 2| ww| 18| 2|
+---+----+---+---+
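The SQL-style query above returns every row together with its rank. To get the same one-row-per-id result as the DSL version below, the window query can be wrapped in a subquery and the rank column filtered there; a minimal sketch against the same tmp view and column names used in the code above:

    spark.sql(
      """
        |select id, name, age from (
        |  select id, name, age,
        |         row_number() over (partition by id order by age desc) top
        |  from tmp
        |) t
        |where top <= 1
      """.stripMargin).show()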
DSL style (only the top-ranked row per id is kept, and the helper rk column is dropped)
+---+----+---+
| id|name|age|
+---+----+---+
| 3|wemz| 17|
| 5| zf| 26|
| 6| gy| 27|
| 1| xgs| 22|
| 4| lb| 25|
| 2| ls| 23|
+---+----+---+
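The where($"rk" <= 1) filter in the DSL version generalizes directly to a per-group top-N. A sketch assuming the same df, keeping the two oldest people per id (the topN value here is just an illustrative threshold):

    val topN = 2  // hypothetical threshold, change to keep more or fewer rows per id
    df.withColumn("rk", row_number().over(Window.partitionBy("id").orderBy(desc("age"))))
      .where($"rk" <= topN)
      .drop("rk")
      .show()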