Table of Contents
01: Recap of the Previous Article
02: What This Article Covers
03: Shortcomings of Spark Streaming
04: The Design of Structured Streaming
05: Official WordCount Example
06: Custom WordCount Implementation
07: Source Types and the File Source
08: Common Query Options
09: Sinks: Supported Types
10: Sinks: Foreach Sink
11: Structured Streaming Fault Tolerance
12: Kafka Integration: Consuming from Kafka
13: Kafka Integration: Producing to Kafka
14: IoT Device Analysis: Requirements and Environment
15: IoT Device Analysis: DSL Implementation
16: IoT Device Analysis: SQL Implementation
17: Deduplication
18: Continuous Processing
19: Event-Time Processing: Requirements and Implementation
20: Event-Time Processing: Handling Late Data
Appendix 1: Streaming Maven Dependencies
01: Recap of the Previous Article
https://blog.csdn.net/m0_57498038/article/details/119113968
- What functions does DStream provide?
- What are the three computation modes of stream processing, and which scenarios does each fit?
- How does Spark Streaming integrate with Kafka, and how does that integration work?
- How is a Spark Streaming program made recoverable after a failure?
02: What This Article Covers
- Introduction to Structured Streaming
- ==Using Structured Streaming==
  - Entry point: SparkSession
  - Data abstraction: Dataset
  - Development styles: DSL and SQL
  - Data sources
    - Source: Kafka
    - Sink: Kafka, MySQL, Redis
- IoT device analysis case study: DSL and SQL
- A look at some other features
  - Deduplication of streaming data
  - Continuous processing: true real-time computation
  - EventTime: processing based on the event time carried in the data
03: Shortcomings of Spark Streaming
- Goal: understand the design shortcomings of Spark Streaming
- Implementation
  - Shortcoming 1: for data-analysis work the programming model is complex, because everything bottoms out in functional programming over RDDs
  - Shortcoming 2: batch code and streaming code are not unified (a sketch of the unified API that fixes this follows this section's summary)
  - Shortcoming 3: end-to-end exactly-once semantics for a streaming application are hard to achieve; you have to guarantee exactly-once on the Input and Output sides yourself
  - Shortcoming 4: it works on processing time rather than event time
  - The way out: stream processing had no standardized model that could cover all of these scenarios until 2015, when Google published the paper "The Dataflow Model" ( https://yq.aliyun.com/articles/73255 ).
- Summary
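Not from the original notes: a minimal Scala sketch of the point behind shortcoming 2. In Structured Streaming the word-count transformations are identical for batch and streaming; only the read/write entry points change. The object name UnifiedBatchStream and the directory datas/input are hypothetical examples.
import org.apache.spark.sql.SparkSession

object UnifiedBatchStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UnifiedBatchStream")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // batch: read a static directory of text files and count words
    spark.read.textFile("datas/input")
      .flatMap(_.split("\\s+"))
      .groupBy("value").count()
      .show()

    // streaming: the same transformations, only the read/write entry points differ
    spark.readStream.textFile("datas/input")
      .flatMap(_.split("\\s+"))
      .groupBy("value").count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}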
04: The Design of Structured Streaming
05: Official WordCount Example
- Goal: run and verify the official WordCount example program
- Implementation
- Summary
06: Custom WordCount Implementation
07: Source Types and the File Source
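These notes leave this section empty; as a hedged placeholder, a minimal sketch of a File Source, assuming a directory datas/csv-input of CSV files. Streaming file sources must be given an explicit schema, and maxFilesPerTrigger limits how many new files each micro-batch picks up; all names and paths here are illustrative only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object FileSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FileSourceSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // streaming file sources require an explicit schema
    val schema = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)

    val csvStream = spark.readStream
      .format("csv")
      .schema(schema)
      .option("maxFilesPerTrigger", "1") // pick up at most one new file per micro-batch
      .load("datas/csv-input")

    csvStream.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}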
08: Common Query Options
09: Sinks: Supported Types
10: Sinks: Foreach Sink
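This section has no captured notes; a minimal sketch of a Foreach Sink, assuming the socket source used elsewhere in these notes. A ForeachWriter exposes an open/process/close lifecycle where a real implementation would manage a MySQL or Redis connection; this sketch just prints each row.
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object ForeachSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachSinkSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    lines.writeStream
      .outputMode("append")
      .foreach(new ForeachWriter[Row] {
        // called once per partition per epoch: open the external connection here in real code
        override def open(partitionId: Long, epochId: Long): Boolean = true
        // called once per result row: write the row to the external system
        override def process(row: Row): Unit = println(row.getString(0))
        // called at the end: release the connection / handle the error if any
        override def close(errorOrNull: Throwable): Unit = ()
      })
      .start()
      .awaitTermination()
  }
}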
11: Structured Streaming Fault Tolerance
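This section has no captured notes; the core of Structured Streaming's fault tolerance is the checkpoint location, where offsets and aggregation state are persisted so a restarted query resumes where it left off. A minimal sketch, with a hypothetical local checkpoint path (production would use HDFS):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CheckpointSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val counts = spark.readStream
      .format("socket").option("host", "node1").option("port", 9999).load()
      .as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value").count()

    counts.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      // offsets and aggregation state are persisted here; restart with the same path to recover
      .option("checkpointLocation", "datas/chkpt/wordcount")
      .start()
      .awaitTermination()
  }
}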
12: Kafka Integration: Consuming from Kafka
13: Kafka Integration: Producing to Kafka
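This section has no captured notes; a minimal sketch of writing a streaming result back to Kafka. The Kafka sink expects a string or binary value column (optionally a key column) and requires a checkpointLocation; the topic outputTopic and the paths here are hypothetical examples.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object KafkaSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaSinkSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // read from one topic ...
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      .load()

    // ... and write the value column straight into another topic
    source.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "outputTopic")
      .option("checkpointLocation", "datas/chkpt/kafka-sink")
      .start()
      .awaitTermination()
  }
}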
14: IoT Device Analysis: Requirements and Environment
- Goal: understand the requirements of the IoT device analysis case and set up the environment
- Implementation
  - Data format:
    - device: device name
    - deviceType: device type
    - signal: signal strength
    - time: timestamp (epoch millis)
{"device":"device_62","deviceType":"db","signal":31.0,"time":1608805779682}
{"device":"device_32","deviceType":"kafka","signal":85.0,"time":1608805780208}
{"device":"device_65","deviceType":"db","signal":73.0,"time":1608805780724}
{"device":"device_98","deviceType":"bigdata","signal":58.0,"time":1608805780914}
{"device":"device_43","deviceType":"route","signal":54.0,"time":1608805781028}
{"device":"device_71","deviceType":"bigdata","signal":31.0,"time":1608805781320}
{"device":"device_20","deviceType":"bigdata","signal":85.0,"time":1608805781481}
{"device":"device_96","deviceType":"bigdata","signal":26.0,"time":1608805782002}
{"device":"device_96","deviceType":"bigdata","signal":55.0,"time":1608805782411}
{"device":"device_62","deviceType":"db","signal":21.0,"time":1608805782980} -
需求条件
-
需求:各种设备类型的设备数量和平均信号强度 设备类型 个数 平均信号强度 -
条件:信号强度必须大于30
-
  - Create the topic
# create the topic
kafka-topics.sh --create --zookeeper node1:2181/kafka200 --replication-factor 1 --partitions 3 --topic iotTopic
# console producer (for manual testing)
kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic
# console consumer (for manual testing)
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
# delete the topic
kafka-topics.sh --delete --zookeeper node1:2181/kafka200 --topic iotTopic
  - Mock data
    - Create the DeviceData bean (case class)
package bigdata.spark.iot.mock

/**
 * Status data sent by an IoT device
 *
 * @param device     device identifier (ID)
 * @param deviceType device type, e.g. a server such as mysql, redis, kafka, or a router (route)
 * @param signal     device signal strength
 * @param time       time the data was sent
 */
case class DeviceData(
  device: String,     // device ID
  deviceType: String, // device type
  signal: Double,     // signal strength
  time: Long          // send time (epoch millis)
)
    - Mock data generator code
package bigdata.spark.iot.mock

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

object MockIotDatas {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration for the target topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )
    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // build a mock device record
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // serialize it to a JSON string
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))
      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    // unreachable while the loop above runs forever; kept for completeness
    producer.close()
  }
}
- Summary
15: IoT Device Analysis: DSL Implementation
- Goal: implement the IoT device analysis with the Structured Streaming DSL
- Implementation
package bigdata.spark.struct.iot

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DoubleType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @ClassName StructIOTDSL
 * @Description TODO IoT device analysis implemented with the Structured Streaming DSL
 */
object StructIOTDSL {
  def main(args: Array[String]): Unit = {
    // todo: 1 - build the SparkSession
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()
    // lower the log level
    spark.sparkContext.setLogLevel("WARN")
    // imports for $-columns and the SQL function library
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // todo: 2 - process the data
    // step1: read the data from Kafka
    val kafkaData = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      .load()

    // step2: transform the data
    val etlData = kafkaData
      .selectExpr("cast(value as string)")
      .as[String]
      // extract each field from the JSON value
      .select(
        // {"device":"device_95","deviceType":"bigdata","signal":96.0,"time":1626849208123}
        get_json_object($"value", "$.device").cast(StringType).as("device"),
        get_json_object($"value", "$.deviceType").cast(StringType).as("deviceType"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    val rsData = etlData
      .filter($"signal" > 30)
      .groupBy($"deviceType")
      .agg(
        count($"device").as("cnt"),
        round(avg($"signal"), 2).as("avgsignal")
      )

    // step3: write the result
    val query = rsData
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()

    // todo: 3 - start and keep running
    query.awaitTermination()
    query.stop()
  }
}
- Summary (an alternative parsing approach with from_json is sketched below)
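Not from the original notes: instead of calling get_json_object once per field, the same parsing can be done with from_json and an explicit schema, which parses each record once and then flattens the resulting struct. A minimal variant of the job above; the object name StructIOTFromJson is hypothetical.
package bigdata.spark.struct.iot

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}

object StructIOTFromJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // explicit schema matching the device JSON records
    val iotSchema = new StructType()
      .add("device", StringType)
      .add("deviceType", StringType)
      .add("signal", DoubleType)
      .add("time", LongType)

    val etlData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      .load()
      .selectExpr("cast(value as string) as value")
      // parse the whole JSON record in one call, then flatten the struct
      .select(from_json($"value", iotSchema).as("data"))
      .select($"data.*")

    etlData
      .filter($"signal" > 30)
      .groupBy($"deviceType")
      .agg(count($"device").as("cnt"), round(avg($"signal"), 2).as("avgsignal"))
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  }
}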
16: IoT Device Analysis: SQL Implementation
- Goal: implement the IoT device analysis with Structured Streaming SQL
- Implementation
package bigdata.spark.struct.iot

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DoubleType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @ClassName StructIOTSQL
 * @Description TODO IoT device analysis implemented with Structured Streaming SQL
 */
object StructIOTSQL {
  def main(args: Array[String]): Unit = {
    // todo: 1 - build the SparkSession
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()
    // lower the log level
    spark.sparkContext.setLogLevel("WARN")
    // imports for $-columns and the SQL function library
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // todo: 2 - process the data
    // step1: read the data from Kafka
    val kafkaData = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      .load()

    // step2: transform the data
    val etlData = kafkaData
      .selectExpr("cast(value as string)")
      .as[String]
      .select(
        // {"device":"device_80","deviceType":"bigdata","signal":42.0,"time":1626786347440}
        get_json_object($"value", "$.device").cast(StringType).as("device"),
        get_json_object($"value", "$.deviceType").cast(StringType).as("deviceType"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    // register a temporary view
    etlData.createOrReplaceTempView("tmp_iot")
    // run the SQL statement
    val rsData = spark.sql(
      """
        |select
        |  deviceType,
        |  count(device) as cnt,
        |  round(avg(signal),2) as avgsignal
        |from tmp_iot
        |where signal > 30
        |group by deviceType
      """.stripMargin)

    // step3: write the result
    val query = rsData
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()

    // todo: 3 - start and keep running
    query.awaitTermination()
    query.stop()
  }
}
- Summary
17: Deduplication
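This section has no captured notes; a minimal sketch of streaming deduplication with dropDuplicates, assuming a socket source with lines like "user_1,2021-07-21 10:00:01". Combining it with a watermark bounds how long the deduplication state is kept; all names here are illustrative.
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object DeduplicationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeduplicationSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // input lines like: "user_1,2021-07-21 10:00:01"
    val events = spark.readStream
      .format("socket").option("host", "node1").option("port", 9999).load()
      .as[String]
      .map { line =>
        val arr = line.trim.split(",")
        (arr(0), Timestamp.valueOf(arr(1)))
      }
      .toDF("userId", "eventTime")

    val deduped = events
      .withWatermark("eventTime", "10 minutes")  // bound how long dedup state is kept
      .dropDuplicates("userId", "eventTime")     // drop repeats of the same (userId, eventTime)

    deduped.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}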
18: Continuous Processing
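This section has no captured notes; a minimal sketch of continuous processing. With Trigger.Continuous the query runs as long-lived tasks with roughly millisecond latency instead of micro-batches, but only map-like operations (projections, selections) and a limited set of sources and sinks are supported; the console sink is used here for debugging.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ContinuousSketch")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      .load()
      .selectExpr("CAST(value AS STRING)") // no aggregations: continuous mode is map-only
      .writeStream
      .format("console")
      // "1 second" is the checkpoint interval, not the latency of individual records
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}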
19: Event-Time Processing: Requirements and Implementation
- Goal: understand the requirements of event-time processing and how to implement it
- Outline
  - step1: the problem and the requirement
  - step2: event-time processing
  - step3: how windows are generated
- Implementation
  - The problem and the requirement
  - Event-time processing
    - Requirement: count words based on the event time carried in the data, computing the last 10 seconds every 5 seconds
    - Test data: each line is the record's generation time followed by the words
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2019-10-12 08:59:55, 2019-10-12 09:00:05]|dog |3 |
|[2019-10-12 08:59:55, 2019-10-12 09:00:05]|cat |1 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|dog |3 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|owl |1 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|cat |2 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|cat |1 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|owl |2 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|dog |1 |
|[2019-10-12 09:00:10, 2019-10-12 09:00:20]|dog |1 |
|[2019-10-12 09:00:10, 2019-10-12 09:00:20]|owl |1 |
+------------------------------------------+----+-----+
  - Test code
package bigdata.spark.struct.window
import java.sql.Timestamp

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads data from a TCP socket with Structured Streaming, computes a word count over
 * event-time windows, and prints the result to the console.
 * TODO: every 5 seconds, count the words of the last 10 seconds.
 *
 * EventTime is the time at which the event was actually generated:
 * for example, a user clicks a button at 10:06 and the system records 10:06.
 * By the time that record has travelled through Kafka and reaches Spark it may already be
 * 10:08; that later time is the processing time.
 */
object StructuredWindow {
  def main(args: Array[String]): Unit = {
    // 1. build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // import implicit conversions and the SQL function library
    import org.apache.spark.sql.functions._
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    // 2. read streaming data from the TCP socket
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    // 3. run the word count over the stream
    val resultStreamDF = inputStreamDF
      // convert the DataFrame to a strongly typed Dataset
      .as[String]
      // filter out invalid lines
      .filter(line => null != line && line.trim.length > 0)
      // split each line into (timestamp, word) pairs: 2019-10-12 09:00:02,cat dog
      .flatMap{ line =>
        val arr = line.trim.split(",")
        arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
      }
      /**
       * 2019-10-12 09:00:02,cat
       * 2019-10-12 09:00:02,dog
       * 2019-10-12 09:00:03,dog
       * 2019-10-12 09:00:03,dog
       */
      // name the columns
      .toDF("insert_timestamp", "word")
      // TODO: window on the event-time column insert_timestamp, last 10 seconds every 5 seconds
      /*
        1. first group by window
        2. then group by word within each window
        3. finally aggregate with count
       */
      .groupBy(
        // build the windows from the event time carried in the data
        window($"insert_timestamp", "10 seconds", "5 seconds"),
        $"word"
      )
      .count()
      // sort by the window column
      .orderBy($"window")
    /*
      root
       |-- window: struct (nullable = true)
       |    |-- start: timestamp (nullable = true)
       |    |-- end: timestamp (nullable = true)
       |-- word: string (nullable = true)
       |-- count: long (nullable = false)
     */
    //resultStreamDF.printSchema()

    // 4. write the result to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start() // a streaming DataFrame must be started
    // block until the streaming query terminates
    query.awaitTermination()
    query.stop()
  }
}
  - Result: each record is counted in the windows that its event time falls into
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2019-10-12 08:59:55, 2019-10-12 09:00:05]|dog |3 |
|[2019-10-12 08:59:55, 2019-10-12 09:00:05]|cat |1 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|dog |3 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|owl |1 |
|[2019-10-12 09:00:00, 2019-10-12 09:00:10]|cat |2 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|cat |1 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|owl |2 |
|[2019-10-12 09:00:05, 2019-10-12 09:00:15]|dog |1 |
|[2019-10-12 09:00:10, 2019-10-12 09:00:20]|dog |1 |
|[2019-10-12 09:00:10, 2019-10-12 09:00:20]|owl |1 |
+------------------------------------------+----+-----+
  - How windows are generated (see the sketch after the data below)
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
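Not from the original notes: a small, hypothetical helper (it is not Spark's internal code) that reproduces the assignment rule behind window($"insert_timestamp", "10 seconds", "5 seconds"). An event time t belongs to every window [start, start + 10s) whose start is a multiple of the 5-second slide and satisfies start <= t < start + 10s, so each record lands in ceil(10 / 5) = 2 overlapping windows.
import java.sql.Timestamp

object WindowAssignmentSketch {
  // windowMs and slideMs are the window length and slide in milliseconds
  def windowsFor(t: Timestamp, windowMs: Long, slideMs: Long): Seq[(Timestamp, Timestamp)] = {
    val ms = t.getTime
    // the latest window start that is still <= the event time
    val lastStart = Math.floorDiv(ms, slideMs) * slideMs
    // walk backwards in slide-sized steps while the window still contains the event time
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > ms - windowMs)
      .map(start => (new Timestamp(start), new Timestamp(start + windowMs)))
      .toSeq
      .reverse
  }

  def main(args: Array[String]): Unit = {
    // 2019-10-12 09:00:02 -> [08:59:55, 09:00:05) and [09:00:00, 09:00:10),
    // matching the two windows in which the console output above counts "cat"
    windowsFor(Timestamp.valueOf("2019-10-12 09:00:02"), 10000L, 5000L)
      .foreach { case (start, end) => println(s"[$start, $end)") }
  }
}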
- Summary
20: Event-Time Processing: Handling Late Data
- Goal: understand how late data is handled when processing by event time
- Outline
- Implementation
  - Design and rules
    - Think about this: if a record reaches the system very late, computing for it may no longer be meaningful, so should it still take part in the computation?
    - Example: a real-time running total of Singles' Day (11.11) sales
      - ==If a record is so late that it can no longer affect the result, it is simply not computed.==
      - ==A watermark is set according to the business: data above the watermark is computed, data below the watermark is dropped.==
      - Assumption: the final figure is published at 00:00 on 11-13
        - If a record is at most 1 day late: compute it
        - If a record is more than 1 day late: do not compute it
        - Allowed lateness: 1 day
    - Official illustration (figure not reproduced here)
    - Watermark rule: watermark = maximum event time of the previous batch - allowed lateness (a worked example follows below)
    - Setting the watermark in code:
// TODO: set the watermark
.withWatermark("time", "10 seconds")
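Not from the original notes: a small worked example of the rule above, using the 10-second allowed lateness from the snippet and event times taken from the test data below. The helper is hypothetical and only illustrates the arithmetic; exactly when a late record is dropped also depends on the output mode.
import java.sql.Timestamp

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val allowedLatenessMs = 10 * 1000L // matches .withWatermark("time", "10 seconds")

    // suppose the previous batch's maximum event time was 12:00:21 (see the test data)
    val maxEventTime = Timestamp.valueOf("2019-10-10 12:00:21")
    val watermark = new Timestamp(maxEventTime.getTime - allowedLatenessMs)
    println(s"watermark = $watermark") // 2019-10-10 12:00:11.0

    // the record owl,2019-10-10 12:00:04 arrives afterwards: its event time is below the
    // watermark, so the windows it belongs to are no longer updated for it
    val lateEventTime = Timestamp.valueOf("2019-10-10 12:00:04")
    println(s"still accepted? ${!lateEventTime.before(watermark)}") // false
  }
}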
  - Test run
    - Test data (word, event time):
dog,2019-10-10 12:00:07
owl,2019-10-10 12:00:08
dog,2019-10-10 12:00:14
cat,2019-10-10 12:00:09
cat,2019-10-10 12:00:15
dog,2019-10-10 12:00:08
owl,2019-10-10 12:00:13
owl,2019-10-10 12:00:21
owl,2019-10-10 12:00:04
owl,2019-10-10 12:00:20
    - Test code
package bigdata.spark.struct.window
import java.sql.Timestamp

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads data from a TCP socket with Structured Streaming, counts words over event-time
 * windows, and prints the result to the console.
 * TODO: every 5 seconds, count the words of the last 10 seconds, with a 10-second watermark.
 */
object StructuredWatermarkUpdate {
  def main(args: Array[String]): Unit = {
    // 1. build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // import implicit conversions and the SQL function library
    import org.apache.spark.sql.functions._
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    // 2. read streaming data from the TCP socket
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    // 3. set the event-time window and the watermark on the stream
    val resultStreamDF = inputStreamDF
      // convert the DataFrame to a strongly typed Dataset
      .as[String]
      // filter out invalid lines
      .filter(line => null != line && line.trim.length > 0)
      // split each line into (word, timestamp): dog,2019-10-10 12:00:07
      .map{ line =>
        val arr = line.trim.split(",")
        (arr(0), Timestamp.valueOf(arr(1)))
      }
      // name the columns
      .toDF("word", "time")
      // TODO: set the watermark
      .withWatermark("time", "10 seconds")
      // TODO: window on the event-time column time, last 10 seconds every 5 seconds
      .groupBy(
        window($"time", "10 seconds", "5 seconds"),
        $"word"
      ).count()

    // 4. write the result to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start() // a streaming DataFrame must be started
    // block until the streaming query terminates
    query.awaitTermination()
    query.stop()
  }
}
  - Result
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2019-10-10 12:00:00, 2019-10-10 12:00:10]|dog |2 |
|[2019-10-10 12:00:00, 2019-10-10 12:00:10]|cat |1 |
|[2019-10-10 12:00:10, 2019-10-10 12:00:20]|cat |1 |
|[2019-10-10 12:00:00, 2019-10-10 12:00:10]|owl |1 |
|[2019-10-10 12:00:10, 2019-10-10 12:00:20]|dog |1 |
|[2019-10-10 12:00:05, 2019-10-10 12:00:15]|dog |3 |
|[2019-10-10 12:00:05, 2019-10-10 12:00:15]|cat |1 |
|[2019-10-10 12:00:15, 2019-10-10 12:00:25]|cat |1 |
|[2019-10-10 12:00:05, 2019-10-10 12:00:15]|owl |2 |
|[2019-10-10 12:00:10, 2019-10-10 12:00:20]|owl |2 |
|[2019-10-10 12:00:15, 2019-10-10 12:00:25]|owl |2 |
|[2019-10-10 12:00:20, 2019-10-10 12:00:30]|owl |1 |
+------------------------------------------+----+-----+
- Summary
Appendix 1: Streaming Maven Dependencies
<!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
<hbase.version>1.2.0-cdh5.16.2</hbase.version>
<kafka.version>2.0.0</kafka.version>
<mysql.version>8.0.19</mysql.version>
<jedis.version>3.2.0</jedis.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Structured Streaming + Kafka dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- HBase Client dependency -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- Kafka Client dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- MySQL Client dependency -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Jedis dependency -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- JSON parsing library: fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>