1. Test Data
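The test data is an employee file, d:/Tools/emp.csv, read by the code in Section 3. Judging from how that code parses each line (zero-indexed field 5 = salary, field 6 = bonus, field 7 = department number), the file is assumed to be comma-separated in the classic emp column order:

empno,ename,job,mgr,hiredate,sal,comm,deptno

The comm (bonus) field may be empty; Tasks 2 through 4 below treat an empty bonus as 0.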
2. Task Requirements
- Task 1: compute the total salary for each department
- Task 2: compute the total salary plus bonus for each department
- Task 3: sort the result of Task 2 by department number in ascending order
- Task 4: sort the result of Task 2 by total salary in descending order
3. Implementation
- Create a Maven project.
- Add the Spark-related dependencies to pom.xml:

<packaging>jar</packaging>
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.8</spark.version>
    <!-- must match the Scala binary version above (2.11 for Scala 2.11.8) -->
    <spark.artifact.version>2.11</spark.artifact.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${spark.artifact.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.5.4</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
        </plugin>
    </plugins>
</build>
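With this pom in place, the project can typically be built from the project root with mvn clean package. Because the assembly plugin above is configured with the jar-with-dependencies descriptor but not bound to a lifecycle phase, a fat jar can be produced by invoking the goal explicitly, e.g. mvn clean package assembly:single (a sketch assuming a standard Maven setup; the version numbers above may need adjusting for your environment).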
- [Task 1] Implementation:
import org.apache.spark.{SparkConf, SparkContext}

object CountSalary {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(CountSalary.getClass.getName)
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt   // salary column
        val deptNo = strings(7).toInt   // department number column
        (deptNo, salary)                // key each record by department
      })
      .reduceByKey(_ + _)               // sum salaries per department
      .collect()
      .foreach(println)

    sc.stop()
  }
}
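A note on the parsing above: it assumes every line splits into at least eight comma-separated fields and that fields 5 and 7 are clean integers; a header row or malformed line would make toInt throw. A defensive variant (a sketch only, not part of the original code; field positions as assumed above) could filter such lines first:

// Hypothetical defensive variant: drop lines whose salary/deptno fields do not parse.
sc.textFile("d:/Tools/emp.csv")
  .map(_.split(","))
  .filter(f => f.length >= 8 && f(5).nonEmpty && f(5).forall(_.isDigit)
                             && f(7).nonEmpty && f(7).forall(_.isDigit))
  .map(f => (f(7).toInt, f(5).toInt))
  .reduceByKey(_ + _)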
- Result:
- [Task 2] Implementation:
import org.apache.spark.{SparkConf, SparkContext}

object CountBonusAndSalary {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(CountBonusAndSalary.getClass.getName)
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt
        val deptNo = strings(7).toInt
        // The bonus column may be empty; treat an empty field as 0.
        // (split(",") never yields null elements, so no null check is needed.)
        val bonus = if (strings(6).nonEmpty) strings(6).toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _)               // sum salary + bonus per department
      .collect()
      .foreach(println)

    sc.stop()
  }
}
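The empty-string check on the bonus field keeps empty comm values from breaking toInt. An equivalent, slightly more compact form (a sketch, assuming any unparsable bonus should simply count as 0) is to wrap the conversion in Try:

import scala.util.Try
// Hypothetical alternative to the if/else above: 0 for empty or non-numeric bonus fields.
val bonus = Try(strings(6).toInt).getOrElse(0)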
- Result:
- [Task 3] Implementation:
import org.apache.spark.{SparkConf, SparkContext}

object CountBonusAndSalaryByAsc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(CountBonusAndSalaryByAsc.getClass.getName)
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt
        val deptNo = strings(7).toInt
        // Treat an empty bonus field as 0.
        val bonus = if (strings(6).nonEmpty) strings(6).toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _)               // total salary + bonus per department
      .sortByKey(ascending = true)      // sort by department number, ascending
      .collect()
      .foreach(println)

    sc.stop()
  }
}
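sortByKey orders the pairs by their key, i.e. the department number; passing true (the default) gives ascending order. If the sorted result were written out with saveAsTextFile instead of collected, forcing a single partition would keep the whole ordering in one part file (a sketch; totals is a hypothetical name for the RDD produced by reduceByKey above, and the output path is made up for illustration):

// Hypothetical: write the ascending result as a single output file.
totals.sortByKey(ascending = true, numPartitions = 1)
      .saveAsTextFile("d:/Tools/salary_by_dept_asc")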
- Result:
- [Task 4] Implementation:
import org.apache.spark.{SparkConf, SparkContext}

object CountTotalByAsc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(CountTotalByAsc.getClass.getName)
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    sc.textFile("d:/Tools/emp.csv")
      .map(line => {
        val strings = line.split(",")
        val salary = strings(5).toInt
        val deptNo = strings(7).toInt
        // Treat an empty bonus field as 0.
        val bonus = if (strings(6).nonEmpty) strings(6).toInt else 0
        (deptNo, salary + bonus)
      })
      .reduceByKey(_ + _)                  // total salary + bonus per department
      .sortBy(_._2, ascending = false)     // sort by total, descending
      .collect()
      .foreach(println)

    sc.stop()
  }
}
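sortBy(_._2, ascending = false) orders the pairs by their value, i.e. the per-department total, from largest to smallest. When only the top few departments are needed, takeOrdered avoids sorting and collecting the whole RDD (a sketch; totals is a hypothetical name for the RDD produced by reduceByKey above):

// Hypothetical: top 3 departments by total salary + bonus, without a full sort.
val top3 = totals.takeOrdered(3)(Ordering.by[(Int, Int), Int](_._2).reverse)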
- Result:
4. Experiment Requirements