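Example:
Requirement: use the netcat tool to continuously send text to port 9999, have Flink read the data from that port, and count how many times each word appears in every 5-second window.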
Implementation:
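import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountFromSocket {
  // One record per word occurrence; count is summed per word within each window
  case class WordWithCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read newline-delimited text from the netcat listener on 192.168.91.128:9999
    val stream = env.socketTextStream("192.168.91.128", 9999, '\n')

    val transformed = stream
      .flatMap(line => line.split("\\s+"))  // split on runs of whitespace
      .filter(_.nonEmpty)                   // drop empty tokens (e.g. from leading spaces)
      .map(w => WordWithCount(w, 1))
      .keyBy(_.word)                        // key by the word field instead of the deprecated keyBy(0)
      // Explicit 5-second processing-time windows; since Flink 1.12 timeWindow() is
      // deprecated and may pick event-time windows, which never fire without watermarks
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum("count")                         // sum the count field per word per window

    transformed.print()
    env.execute("WordCountFromSocket")
  }
}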
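To make the windowed semantics concrete, here is a plain-Scala sketch with no Flink dependency that mimics what the job computes for hypothetical timestamped input: each word is assigned to a tumbling window aligned to multiples of the window size (Flink's default zero offset), then counted per window. The names `TumblingWordCountSketch`, `windowStart`, and `countPerWindow`, and the sample records, are hypothetical; in the real job Flink assigns processing-time timestamps itself and emits each window's counts when the window closes.

```scala
// Plain-Scala simulation of per-window word counting; not Flink's runtime.
object TumblingWordCountSketch {
  final case class WordWithCount(word: String, count: Int)

  // Start of the tumbling window containing ts, assuming a zero offset and
  // non-negative timestamps (windows are [k * size, (k + 1) * size)).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

  // records: (timestamp in ms, line of text)
  // result:  window start -> (word -> count) for that window
  def countPerWindow(records: Seq[(Long, String)], sizeMs: Long): Map[Long, Map[String, Int]] =
    records
      .flatMap { case (ts, line) =>
        // same tokenizing as the job: split on whitespace runs, drop empty tokens
        line.split("\\s+").filter(_.nonEmpty).toSeq.map(w => (windowStart(ts, sizeMs), w))
      }
      .groupBy(_._1) // group word occurrences by window
      .map { case (start, pairs) =>
        start -> pairs.groupBy(_._2).map { case (w, ws) => w -> ws.size }
      }

  def main(args: Array[String]): Unit = {
    val sizeMs = 5000L
    // hypothetical input: two lines in the first 5 s window, one in the next
    val records = Seq((1000L, "hello flink"), (3000L, "hello world"), (6000L, "hello"))
    val result = countPerWindow(records, sizeMs)
    // print windows in time order and words alphabetically
    result.toSeq.sortBy(_._1).foreach { case (start, counts) =>
      counts.toSeq.sortBy(_._1).foreach { case (w, c) =>
        println(s"window [$start, ${start + sizeMs}): ${WordWithCount(w, c)}")
      }
    }
  }
}
```

With this input, `main` prints `flink`, `hello` (count 2) and `world` for window `[0, 5000)`, then `hello` (count 1) for `[5000, 10000)`: a word's count restarts in every window rather than accumulating over the whole stream.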
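Send data with netcat. Start the listener on 192.168.91.128 before launching the job, because the Flink socket source connects to it as a client:
Command: nc -lk 9999
(-l listens on the port; -k keeps listening after a client disconnects.) Then type words separated by spaces, pressing Enter after each line.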
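Output: every 5 seconds the job prints one WordWithCount(word,count) record for each word seen in that window to standard output (the IDE console when run locally).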