原始代码
/** Demo job: reads comma-separated sensor lines from a text file, parses them into
  * `SensorReading` values, groups them by the `id` field, reduces each group, and
  * prints the reduced stream.
  */
object TransformTest {

  /** Builds the pipeline and submits it for execution.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStreamSource[String] = env.readTextFile("src/main/resources/hello.txt")

    // Every line is split on "," and the first three fields are read as
    // id, timestamp (Long) and temperature (Double), each trimmed first.
    val readings: DataStream[SensorReading] = lines.map { line =>
      val fields: Array[String] = line.split(",")
      new SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    }

    // NOTE(review): the key is looked up by field name; the note accompanying this
    // code says SensorReading needs a no-arg constructor for that to work — confirm
    // against the Flink API variant actually imported.
    val keyed = readings.keyBy("id")

    // The reducer keeps the first argument's id, adds 1 to its timestamp, and takes
    // the second argument's temperature plus 10.
    val reduced: DataStream[SensorReading] = keyed.reduce { (acc, next) =>
      new SensorReading(acc.id, acc.timestamp + 1, next.temperature + 10)
    }

    reduced.print()
    env.execute("tranform")
  }
}
/** A single sensor measurement.
  *
  * @param id          identifier of the sensor
  * @param timestamp   time value attached to the reading (unit not shown here — TODO confirm)
  * @param temperature temperature value of the reading (unit not shown here — TODO confirm)
  */
case class SensorReading(
    id: String,
    timestamp: Long,
    temperature: Double
)
解决办法
样例类(实体类)中要有无参构造方法,应把 SensorReading 类修改为如下形式:
/** Mutable sensor reading with a public no-argument constructor.
  *
  * The fields are deliberately `var`s and the primary constructor takes no
  * arguments: the accompanying note states that the entity class must offer a
  * no-arg constructor. NOTE(review): presumably this is so the streaming framework
  * can treat the class as a POJO when resolving `keyBy("id")` — confirm.
  */
class SensorReading() {
  // `_` initialises each field to its type's default (null / 0 / 0.0).
  var id: String = _
  var timestamp: Long = _
  var temperature: Double = _

  /** Convenience constructor that fills all three fields.
    *
    * @param id1          value for `id`
    * @param timestamp1   value for `timestamp`
    * @param temperature1 value for `temperature`
    */
  def this(id1: String, timestamp1: Long, temperature1: Double) = {
    // Explicit `this()` and `=`: the bare `this` / procedure-syntax form is not
    // accepted by newer Scala compilers.
    this()
    id = id1
    timestamp = timestamp1
    temperature = temperature1
  }

  /** Renders the reading as `id+timestamp+temperature`. */
  override def toString: String = s"$id+$timestamp+$temperature"
}
|