package com.gu.networkflow_analysis
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.networkflow_analysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 9:21
 */
// Input log event sample class
case class ApacheLogEvent( ip: String, userId: String, eventTime: Long, method: String, url: String )

// Windowed aggregation result sample class
case class UrlViewCount( url: String, windowEnd: Long, count: Long )
object NetworkFlow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // val dataStream = env.readTextFile("D:\\Projects\\BigData\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
    val dataStream = env.socketTextStream("localhost", 7777)
      .map( data => {
        val dataArray = data.split(" ")
        // Define the time format conversion
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
        ApacheLogEvent( dataArray(0).trim, dataArray(1).trim, timestamp, dataArray(5).trim, dataArray(6).trim )
      } )
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {  // 1 s out-of-orderness bound (assumed)
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      } )
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.seconds(60))
      .aggregate( new CountAgg(), new WindowResult() )
val processedStream = dataStream
.keyBy(_.windowEnd)
.process( new TopNHotUrls(5) )
dataStream.print("aggregate")
processedStream.print("process")
env.execute("network flow job")
} }
// Custom pre-aggregation function
class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b }
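// Note (added for clarity): CountAgg counts incrementally inside the window, so only a single Long per
// key and window reaches the WindowFunction below; that is why WindowResult can safely read
// input.iterator.next() as the final count.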
// Custom window function: wraps the pre-aggregated count together with the window end
class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
    out.collect( UrlViewCount( key, window.getEnd, input.iterator.next() ) )
  }
}
// Custom process function that sorts and emits the top N URLs
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String]{
  lazy val urlState: MapState[String, Long] = getRuntimeContext.getMapState( new MapStateDescriptor[String, Long]("url-state", classOf[String], classOf[Long]) )
  override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
    urlState.put(value.url, value.count)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }
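  // Note (added for clarity): the timer is set to windowEnd + 1 ms, so it fires only after the
  // watermark has passed windowEnd, i.e. once every UrlViewCount for that window has arrived.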
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Pull all the data out of the state
    val allUrlViews: ListBuffer[(String, Long)] = new ListBuffer[(String, Long)]()
    val iter = urlState.entries().iterator()
    while(iter.hasNext){
      val entry = iter.next()
      allUrlViews += (( entry.getKey, entry.getValue ))
    }
// urlState.clear()
val sortedUrlViews = allUrlViews.sortWith(_._2 > _._2).take(topSize)
// Format the result for output
val result: StringBuilder = new StringBuilder()
result.append("时间:").append( new Timestamp( timestamp - 1 ) ).append("\n")
for( i <- sortedUrlViews.indices ){
val currentUrlView = sortedUrlViews(i)
result.append("NO").append(i + 1).append(":")
.append(" URL=").append(currentUrlView._1)
.append(" 访问量=").append(currentUrlView._2).append("\n")
}
result.append("=============================")
Thread.sleep(1000)
out.collect(result.toString())
  }
}

package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.networkflow_analysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 10:28
 */
// Sample class for the input data
case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long )

object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
// Define the data source with a relative path
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map( data => {
val dataArray = data.split(",")
UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
} )
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter( _.behavior == "pv" ) // 只统计pv操作
.map( data => ("pv", 1) )
.keyBy(_._1)
.timeWindow(Time.hours(1))
.sum(1)
dataStream.print("pv count")
env.execute("page view jpb")
  }
}

package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.networkflow_analysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 10:43
 */
case class UvCount( windowEnd: Long, uvCount: Long )
object UniqueVisitor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
// Define the data source with a relative path
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map( data => {
val dataArray = data.split(",")
UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
} )
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter( _.behavior == "pv" ) // 只统计pv操作
.timeWindowAll( Time.hours(1) )
.apply( new UvCountByWindow() )
dataStream.print()
env.execute("uv job")
} }
class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // Define a scala Set to hold all userIds and deduplicate them
    var idSet = Set[Long]()
    // Collect the ids of all records in the current window into the set, then emit the set size
    for( userBehavior <- input ){
      idSet += userBehavior.userId
    }
    out.collect( UvCount( window.getEnd, idSet.size ) )
  }
}

package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.networkflow_analysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 11:34
 */
object UvWithBloom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Define the data source with a relative path
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior == "pv")    // only count "pv" events
      .map(data => ("dummyKey", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    dataStream.print()
    env.execute("uv with bloom job")
  }
}
// Custom window trigger
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    // Fire the window for every incoming element and purge all window state
    TriggerResult.FIRE_AND_PURGE
  }
}
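// Note (added for clarity): because every element fires and purges the window, the ProcessWindowFunction
// below is invoked with a single element at a time and nothing accumulates in window state; the
// deduplication state (bitmap and count) lives in Redis instead.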
// A simple Bloom-filter-style bitmap helper
class Bloom(size: Long) extends Serializable {
  // Total bitmap size in bits; defaults to 2^27 bits (16 MB)
  private val cap = if (size > 0) size else 1 << 27
  // Hash function
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for( i <- 0 until value.length ){
      result = result * seed + value.charAt(i)
    }
    result & ( cap - 1 )
  }
}
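// Note (added for clarity): `result & (cap - 1)` only behaves like `result % cap` when cap is a power
// of two, which holds for the sizes used here (1 << 27 by default, 1 << 29 below). For example, with
// cap = 8 the mask 0b111 keeps the low three bits, so hash values map onto bit offsets 0..7.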
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
  // Redis connection and Bloom filter
  lazy val jedis = new Jedis("localhost", 6379)
  lazy val bloom = new Bloom(1 << 29)
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    // Bitmap storage: the key is the windowEnd, the value is the bitmap
    val storeKey = context.window.getEnd.toString
    var count = 0L
    // The uv count of each window is also stored in a Redis hash named "count" as (windowEnd -> uvCount), so read it first
    if( jedis.hget("count", storeKey) != null ){
      count = jedis.hget("count", storeKey).toLong
    }
    // Use the Bloom filter to check whether the current user has been seen already
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    // Check whether this bit is already set in the Redis bitmap
    val isExist = jedis.getbit(storeKey, offset)
    if(!isExist){
      // If not, set the corresponding bit and increment the count
      jedis.setbit(storeKey, offset, true)
      jedis.hset("count", storeKey, (count + 1).toString)
      out.collect( UvCount(storeKey.toLong, count + 1) )
    } else {
      out.collect( UvCount(storeKey.toLong, count) )
    }
  }
}
package com.gu.marketanalysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.marketanalysis
 * Version: 1.0
 * Created by wushengran on 2019/9/24 10:10
 */
// Input ad-click event sample class
case class AdClickEvent( userId: Long, adId: Long, province: String, city: String, timestamp: Long )
// Output sample class for counts aggregated by province
case class CountByProvince( windowEnd: String, province: String, count: Long )
// Output blacklist warning message
case class BlackListWarning( userId: Long, adId: Long, msg: String )
object AdStatisticsByGeo {
  // Define the side-output tag
  val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blacklist")
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
// Read the data and convert it into AdClickEvent
val resource = getClass.getResource("/AdClickLog.csv")
val adEventStream = env.readTextFile(resource.getPath)
.map( data => {
val dataArray = data.split(",")
AdClickEvent( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong )
} )
.assignAscendingTimestamps(_.timestamp * 1000L)
// Custom process function that filters out users generating large volumes of fake clicks
val filterBlackListStream = adEventStream
.keyBy( data => (data.userId, data.adId) )
.process( new FilterBlackListUser(100) )
// Key by province, then aggregate over windows
val adCountStream = filterBlackListStream
.keyBy(_.province)
.timeWindow( Time.hours(1), Time.seconds(5) )
.aggregate( new AdCountAgg(), new AdCountResult() )
adCountStream.print("count")
filterBlackListStream.getSideOutput(blackListOutputTag).print("blacklist")
env.execute("ad statistics job")
}
  class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]{
    // State holding the current user's click count for the current ad
    lazy val countState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("count-state", classOf[Long]) )
    // State recording whether a blacklist warning has already been sent
    lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("issent-state", classOf[Boolean]) )
    // State holding the timestamp of the reset timer
    lazy val resetTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("resettime-state", classOf[Long]) )
override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
// Get the current count from state
val curCount = countState.value()
// If this is the first element for the key, register a timer that fires at 00:00 the next day
if( curCount == 0 ){
val ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)
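// Worked example (added for clarity): with currentProcessingTime = 1_600_000_123_456 ms,
// dividing by 86_400_000 ms/day gives 18518, so ts = 18519 * 86_400_000 = 1_600_041_600_000 ms,
// i.e. the next midnight in UTC.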
resetTimer.update(ts)
ctx.timerService().registerProcessingTimeTimer(ts)
}
// Check whether the count has reached the limit; if so, add the user to the blacklist
if( curCount >= maxCount ){
// Only send the blacklist warning once
if( !isSentBlackList.value() ){
isSentBlackList.update(true)
// Emit to the side output
ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today.") )
}
return
}
// Increment the count state and emit the record to the main stream
countState.update( curCount + 1 )
out.collect( value )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
// When the timer fires, clear all state
if( timestamp == resetTimer.value() ){
isSentBlackList.clear()
countState.clear()
resetTimer.clear()
}
}
} }
// Custom pre-aggregation function
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
  override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b }
// Custom window function
class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
    out.collect( CountByProvince( new Timestamp(window.getEnd).toString, key, input.iterator.next() ) )
  }
}

package com.gu.marketanalysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.marketanalysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 15:37
 */
object AppMarketing {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource( new SimulatedEventSource() )
      .assignAscendingTimestamps(_.timestamp)
      .filter( _.behavior != "UNINSTALL" )
      .map( data => { ( "dummyKey", 1L ) } )
      .keyBy(_._1)    // group everything under a single dummy key to get a total count
      .timeWindow( Time.hours(1), Time.seconds(10) )
      .aggregate( new CountAgg(), new MarketingCountTotal() )

    dataStream.print()
    env.execute("app marketing job")
  }
}
class CountAgg() extends AggregateFunction[(String, Long), Long, Long]{
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b }
class MarketingCountTotal() extends WindowFunction[Long, MarketingViewCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingViewCount]): Unit = {
    val startTs = new Timestamp(window.getStart).toString
    val endTs = new Timestamp(window.getEnd).toString
    val count = input.iterator.next()
    out.collect( MarketingViewCount(startTs, endTs, "app marketing", "total", count) )
  }
}

package com.gu.marketanalysis
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: UserBehaviorAnalysis
 * Package: com.gu.marketanalysis
 * Version: 1.0
 * Created by wushengran on 2019/9/23 15:06
 */
// Input data sample class
case class MarketingUserBehavior( userId: String, behavior: String, channel: String, timestamp: Long )
// Output result sample class
case class MarketingViewCount( windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long )
object AppMarketingByChannel {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource( new SimulatedEventSource() )
.assignAscendingTimestamps(_.timestamp)
.filter( _.behavior != "UNINSTALL" )
.map( data => {
( (data.channel, data.behavior), 1L )
} )
.keyBy(_._1)    // key by (channel, behavior)
.timeWindow( Time.hours(1), Time.seconds(10) )
.process( new MarketingCountByChannel() )
dataStream.print()
env.execute("app marketing by channel job")
} }
// Custom data source
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior]{
  // Flag indicating whether the source is still running
  var running = true
  // The set of user behaviors
  val behaviorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
  // The set of channels
  val channelSets: Seq[String] = Seq("wechat", "weibo", "appstore", "huaweistore")
  // Random number generator
  val rand: Random = new Random()
override def cancel(): Unit = running = false
  override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
    // Upper bound on the number of generated elements
    val maxElements = Long.MaxValue
    var count = 0L
// Randomly generate the data
while( running && count < maxElements ){
val id = UUID.randomUUID().toString
val behavior = behaviorTypes(rand.nextInt(behaviorTypes.size))
val channel = channelSets(rand.nextInt(channelSets.size))
val ts = System.currentTimeMillis()
ctx.collect( MarketingUserBehavior( id, behavior, channel, ts ) )
count += 1
TimeUnit.MILLISECONDS.sleep(10L)
}
} }
// Custom process window function
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow]{
  override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
    val startTs = new Timestamp(context.window.getStart).toString
    val endTs = new Timestamp(context.window.getEnd).toString
    val channel = key._1
    val behavior = key._2
    val count = elements.size
    out.collect( MarketingViewCount(startTs, endTs, channel, behavior, count) )
  }
}
package com.gu.apitest.sinktest
import java.util
import com.gu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest.sinktest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 16:27
 */
object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      } )

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Create an elasticsearch sink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("saving data: " + element)
        // Wrap the record into a Map (or a JsonObject)
        val json = new util.HashMap[String, String]()
        json.put("sensor_id", element.id)
        json.put("temperature", element.temperature.toString)
        json.put("ts", element.timestamp.toString)
        // Create the index request and prepare to send the data
val indexRequest = Requests.indexRequest()
.index("sensor")
.`type`("readingdata")
.source(json)
// Send the request via the indexer to write the data
indexer.add(indexRequest)
println("data saved.")
}
    } )

    // sink
    dataStream.addSink( esSinkBuilder.build() )
    env.execute("es sink test")
  }
}

package com.gu.apitest.sinktest
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.gu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest.sinktest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 16:44
 */
object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      } )

    // sink
    dataStream.addSink( new MyJdbcSink() )
    env.execute("jdbc sink test")
  }
}
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _
  // Initialization: create the connection and the prepared statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }
  // Use the connection to execute SQL for each record
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update matched no rows, insert instead
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }
  // Clean up on close
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

package com.gu.apitest.sinktest
import com.gu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest.sinktest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 16:12
 */
object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      } )

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // sink
    dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )
    env.execute("redis sink test")
  }
}
class MyRedisMapper() extends RedisMapper[SensorReading]{
  // Define the command used to write to Redis: store sensor id and temperature in a hash (HSET key field value)
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
  }
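  // Note (added for clarity): with HSET the additional key "sensor_temperature" is the Redis hash name;
  // getKeyFromData below supplies the field (sensor id) and getValueFromData supplies the value (temperature).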
  // The value written to Redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString

  // The key (hash field) written to Redis
  override def getKeyFromData(t: SensorReading): String = t.id
}

package com.gu.apitest.sinktest
import java.util.Properties
import com.gu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest.sinktest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 15:43
 */
object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    // val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // transform
    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString    // convert to String so it can be serialized for output
      } )

    // sink
    dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
    dataStream.print()
    env.execute("kafka sink test")
  }
}

package com.gu.apitest
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest
 * Version: 1.0
 * Created by wushengran on 2019/8/24 10:14
 */
object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(100000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))
// env.setStateBackend( new RocksDBStateBackend("") )
val stream = env.socketTextStream("localhost", 7777)
val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )
val processedStream = dataStream.keyBy(_.id)
.process( new TempIncreAlert() )
val processedStream2 = dataStream.keyBy(_.id)
      // .process( new TempChangeAlert(10.0) )
      .flatMap( new TempChangeAlert(10.0) )
val processedStream3 = dataStream.keyBy(_.id)
.flatMapWithState[(String, Double, Double), Double]{
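// Note (added for clarity): the two type parameters are the output element type (String, Double, Double)
// and the per-key state type Double (the last temperature seen).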
// If there is no state yet (no previous record for this key), store the current temperature in state
case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
// If there is state, compare with the last temperature; if the difference exceeds the threshold, emit an alert
case ( input: SensorReading, lastTemp: Some[Double] ) =>
val diff = ( input.temperature - lastTemp.get ).abs
if( diff > 10.0 ){
( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
} else
( List.empty, Some(input.temperature) )
}
dataStream.print("input data")
processedStream3.print("processed data")
env.execute("process function test")
} }
class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{
  // State holding the temperature of the previous record
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
  // State holding the timestamp of the registered timer
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // Get the previous temperature
    val preTemp = lastTemp.value()
    // Update the temperature state
    lastTemp.update( value.temperature )
val curTimerTs = currentTimer.value()
if( value.temperature < preTemp || preTemp == 0.0 ){
// If the temperature dropped, or this is the first record, delete the timer and clear the state
ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
currentTimer.clear()
} else if ( value.temperature > preTemp && curTimerTs == 0 ){
// Temperature rose and no timer is set yet, so register one
val timerTs = ctx.timerService().currentProcessingTime() + 5000L
ctx.timerService().registerProcessingTimeTimer( timerTs )
currentTimer.update( timerTs )
}
}
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Emit the alert
    out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
    currentTimer.clear()
  }
}
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{
private var lastTempState: ValueState[Double] = _
  override def open(parameters: Configuration): Unit = {
    // Initialize the state handle here
    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  }

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Get the last temperature
    val lastTemp = lastTempState.value()
    // Compare the current temperature with the last one; if the difference exceeds the threshold, emit an alert
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}
class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
  // State variable holding the last temperature
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
    // Get the last temperature
    val lastTemp = lastTempState.value()
    // Compare the current temperature with the last one; if the difference exceeds the threshold, emit an alert
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}

package com.gu.apitest
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest
 * Version: 1.0
 * Created by wushengran on 2019/8/24 11:16
 */
object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("localhost", 7777)

    val dataStream = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {  // 1 s out-of-orderness bound (assumed)
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream
      .process( new FreezingAlert() )

    // dataStream.print("input data")
    processedStream.print("processed data")
    processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")
env.execute("side output test")
} }
// Freezing alert: if the temperature is below 32°F, emit an alert to the side output
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{

  // lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )

  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if( value.temperature < 32.0 ){
      ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
    }
    out.collect( value )
  }
}

package com.gu.apitest
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 10:11
 */
// Sensor reading sample class
case class SensorReading( id: String, timestamp: Long, temperature: Double )
object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
// 1. Read data from a collection
val stream1 = env.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.80018327300259),
SensorReading("sensor_6", 1547718201, 15.402984393403084),
SensorReading("sensor_7", 1547718202, 6.720945201171228),
SensorReading("sensor_10", 1547718205, 38.101067604893444)
))
// env.fromElements("flink", 1, 32, 3213, 0.324).print("test")

// 2. Read data from a file
val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// 3. Read data from Kafka
// Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// 4. Custom data source
val stream4 = env.addSource(new SensorSource())
// sink / output
stream4.print("stream4")
env.execute("source api test")
} }
class SensorSource() extends SourceFunction[SensorReading]{
  // Flag indicating whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator
    val rand = new Random()

    // Initialize the temperatures of 10 sensors randomly; the stream then fluctuates around these values
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
)
// Generate stream data in an endless loop until the source is cancelled
while(running){
// Update the temperatures
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian())
)
// Get the current timestamp
val curTime = System.currentTimeMillis()
// Wrap into SensorReading and emit
curTemp.foreach(
t => ctx.collect( SensorReading(t._1, curTime, t._2) )
)
// Wait 100 ms between emissions
Thread.sleep(100)
}
  }
}

package com.gu.apitest
import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest
 * Version: 1.0
 * Created by wushengran on 2019/9/17 11:41
 */
object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the data
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // Transform
    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      } )

    // 1. Aggregations
    val stream1 = dataStream
      .keyBy("id")
      // .sum("temperature")
      .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )

    // 2. Split the stream depending on whether the temperature is above 30 degrees
    val splitStream = dataStream
      .split( sensorData => {
        if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
      } )
    val highTempStream = splitStream.select("high")
    val lowTempStream = splitStream.select("low")
    val allTempStream = splitStream.select("high", "low")

    // 3. Connect and co-map the two streams
    val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
    val connectedStreams = warningStream.connect(lowTempStream)
    val coMapStream = connectedStreams.map(
      warningData => ( warningData._1, warningData._2, "high temperature warning" ),
      lowData => ( lowData.id, "healthy" )
    )

    val unionStream = highTempStream.union(lowTempStream)

    // Function classes
    dataStream.filter( new MyFilter() ).print()

    // Output
    // dataStream.print()
    // highTempStream.print("high")
    // lowTempStream.print("low")
    // allTempStream.print("all")
    // unionStream.print("union")

    env.execute("transform test job")
  }
}
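// Note (added for clarity): in the connect/co-map step above, the first function is applied to elements
// of warningStream and the second to elements of lowTempStream; the two inputs may have different types,
// which is what distinguishes connect from union.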
class MyFilter() extends FilterFunction[SensorReading]{
  override def filter(value: SensorReading): Boolean = {
    value.id.startsWith("sensor_1")
  }
}
class MyMapper() extends RichMapFunction[SensorReading, String]{
  override def map(value: SensorReading): String = {
    "flink"
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)
}

package com.gu.apitest
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.apitest
 * Version: 1.0
 * Created by wushengran on 2019/9/18 9:31
 */
object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(500)

    // Read the data
    // val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val inputStream = env.socketTextStream("localhost", 7777)

    val dataStream = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      } )
      // .assignAscendingTimestamps(_.timestamp * 1000L)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {  // 1 s out-of-orderness bound (assumed)
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
      // .assignTimestampsAndWatermarks( new MyAssigner() )
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      // .process( new MyProcess() )
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((result, data) => (data._1, result._2.min(data._2)))    // minimum temperature within the 10-second window

    dataStream.print()
    env.execute("window api test")
  }
}
class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
  // Fixed delay of 3 seconds
  val bound: Long = 3 * 1000L
  // Maximum timestamp seen so far
  var maxTs: Long = Long.MinValue
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timestamp * 1000L)
    element.timestamp * 1000L
  }
}
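// Note (added for clarity): as a periodic assigner, getCurrentWatermark is polled at the configured
// auto-watermark interval (500 ms in WindowTest above) and always lags the largest seen timestamp by `bound`.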
class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
  val bound: Long = 1000L
  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
    if( lastElement.id == "sensor_1" ){
      new Watermark(extractedTimestamp - bound)
    } else {
      null
    }
  }
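  // Note (added for clarity): a punctuated assigner is asked for a watermark after every element;
  // returning null means no watermark is emitted, so watermarks only advance on "sensor_1" records.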
  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    element.timestamp * 1000L
  }
}

package com.gu.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.wc
 * Version: 1.0
 * Created by wushengran on 2019/9/16 14:08
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // Create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    // env.disableOperatorChaining()

    // Receive the socket text stream
    val textDataStream = env.socketTextStream(host, port)

    // Split each line into words, then count them
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map( (_, 1) )
      .keyBy(0)
      .sum(1)

    // Print the result
    wordCountDataStream.print().setParallelism(1)

    // Run the job
    env.execute("stream word count job")
  }
}

package com.gu.wc
import org.apache.flink.api.scala._
/**
 * Copyright © 2018-2028 All Rights Reserved
 * Project: FlinkTutorial
 * Package: com.gu.wc
 * Version: 1.0
 * Created by wushengran on 2019/9/16 11:48
 */
// Batch-processing word count
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
// Read data from a file
val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
val inputDataSet = env.readTextFile(inputPath)
// Split into words, then count
val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
.map( (_, 1) )
.groupBy(0)
.sum(1)
// Print the result
wordCountDataSet.print()
  }
}

package streaming.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
 * Write data into Redis
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.167.254", 8888, "\n");
        // lpush l_words word
        // Wrap the data: turn each String into a Tuple2<String, String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("b", value);
            }
        });
        // Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.167.254").setPort(6379).build();
        // Create the redis sink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // The Redis key to operate on, taken from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // The Redis value to operate on, taken from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
<groupId>com. .flink</groupId>
<artifactId> Pro</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>ETL</module>
<module>Report</module>
</modules>
<properties>
<flink.version>1.9.0</flink.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
<!-- 日志相关依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- redis依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<!--es依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<testExcludes>
<testExclude>/src/test/**</testExclude>
</testExcludes>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Pro</artifactId>
        <groupId>com. .flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
<artifactId>ETL</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- 日志相关依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- redis依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
package com. .core;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com. .source.RedisSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Properties;
/**
 * Data cleaning
 */
public class DataClean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // We read from Kafka, so set the parallelism to the number of partitions of the topic.
        env.setParallelism(3);
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Note: we should also configure a state backend here and store the checkpoints in RocksDB.
        // Step 1: read the data from Kafka (the consumer is the data source)
        // The topic name matters: ideally it tells you what data it contains.
        // xxxx_xxx_xxx_xxx
String topic="allData";
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers","192.168.167.254:9092");
consumerProperties.put("group.id","allTopic_consumer");
/**
* String topic, the topic name
* KafkaDeserializationSchema<T> deserializer,
* Properties props
*/
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic,
new SimpleStringSchema(),
consumerProperties);
//{"dt":"2019-11-24 19:54:23","countryCode":"PK","data":[{"type":"s4","score":0.8,"level":"C"},{"type":"s5","score":0.2,"level":"C"}]}
DataStreamSource<String> allData = env.addSource(consumer);
// Broadcast the dimension data
DataStream<HashMap<String, String>> mapData = env.addSource(new RedisSource()).broadcast();
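// Note (added for clarity): broadcasting the Redis-backed mapping stream means every parallel subtask of
// the connected operator receives the full countryCode -> area map, so flatMap1 can look it up locally in allMap.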
SingleOutputStreamOperator<String> etlData = allData.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
HashMap<String, String> allMap = new HashMap<String, String>();
// This branch handles the Kafka data
@Override
public void flatMap1(String line, Collector<String> out) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(line);
String dt = jsonObject.getString("dt");
String countryCode = jsonObject.getString("countryCode");
// Look up the area name by countryCode
String area = allMap.get(countryCode);
JSONArray data = jsonObject.getJSONArray("data");
for (int i = 0; i < data.size(); i++) {
JSONObject dataObject = data.getJSONObject(i);
System.out.println("大区:"+area);
dataObject.put("dt", dt);
dataObject.put("area", area);
// Downstream operators receive the data as a JSON string
out.collect(dataObject.toJSONString());
}
}
// This branch handles the data from Redis
@Override
public void flatMap2(HashMap<String, String> map,
Collector<String> collector) throws Exception {
System.out.println(map.toString());
allMap = map;
}
});
//ETL -> load kafka
etlData.print().setParallelism(1);
/**
* String topicId,
* SerializationSchema<IN> serializationSchema,
* Properties producerConfig)
*/
        // String outputTopic = "allDataClean";
        // Properties producerProperties = new Properties();
        // producerProperties.put("bootstrap.servers", "192.168.167.254:9092");
        // FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(outputTopic,
        //         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        //         producerProperties);
        //
        // // Create a Kafka producer and write the cleaned data back to Kafka
        // etlData.addSink(producer);
env.execute("DataClean");
}
}

package com. .producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
 * Simulated data source
 */
public class kafkaProducer {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.167.254:9092");
        // Key and value serializers
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        // Topic name
        String topic = "allData";
        // Create the producer connection
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
while(true){
String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}";
System.out.println(message);
//produce the record to Kafka (send() is asynchronous unless the returned Future is awaited)
producer.send(new ProducerRecord<String, String>(topic,message));
Thread.sleep(2000);
}
//close the producer (not reached while the loop runs)
//producer.close();
    }

    public static String getCurrentTime() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }

    public static String getCountryCode() {
        String[] types = {"US", "TW", "HK", "PK", "KW", "SA", "IN"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static String getRandomType() {
        String[] types = {"s1", "s2", "s3", "s4", "s5"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static double getRandomScore() {
        double[] types = {0.3, 0.2, 0.1, 0.5, 0.8};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static String getRandomLevel() {
        String[] types = {"A", "A+", "B", "C", "D"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }
}

package com. .source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;
/**
 * Reads the country-code -> area mapping from the Redis hash "areas", e.g.:
 *   hset areas AREA_US US
 *   hset areas AREA_CT TW,HK
 *   hset areas AREA_AR PK,KW,SA
 *   hset areas AREA_IN IN
 * The emitted map is inverted: country -> area (e.g. IN -> AREA_IN).
 */
public class RedisSource implements SourceFunction<HashMap<String, String>> {
private Logger logger=LoggerFactory.getLogger( RedisSource.class);
private Jedis jedis;
private boolean isRunning=true;
@Override
public void run(SourceContext<HashMap<String, String>> cxt) throws Exception {
this.jedis = new Jedis("192.168.167.254",6379);
HashMap<String, String> map = new HashMap<>();
while(isRunning){
try{
map.clear();
Map<String, String> areas = jedis.hgetAll("areas");
for(Map.Entry<String,String> entry: areas.entrySet()){
String area = entry.getKey();
String value = entry.getValue();
String[] fields = value.split(",");
for(String country:fields){
map.put(country,area);
}
}
if(map.size() > 0 ){
cxt.collect(map);
}
Thread.sleep(60000);
            } catch (JedisConnectionException e) {
                logger.error("Redis connection error, re-creating the connection", e);
                this.jedis = new Jedis("192.168.167.254", 6379);
            } catch (Exception e) {
                logger.error("Redis source error", e);
}
}
}
@Override
public void cancel() {
isRunning=false;
if(jedis != null){
jedis.close();
}
}
}
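To try RedisSource locally, the "areas" hash it polls can be seeded with the values from the comment above. A minimal sketch (the class name SeedAreas is an assumption; the host matches the one used in the source):

import redis.clients.jedis.Jedis;

public class SeedAreas {
    public static void main(String[] args) {
        // seed the "areas" hash that RedisSource reads every 60 s
        try (Jedis jedis = new Jedis("192.168.167.254", 6379)) {
            jedis.hset("areas", "AREA_US", "US");
            jedis.hset("areas", "AREA_CT", "TW,HK");
            jedis.hset("areas", "AREA_AR", "PK,KW,SA");
            jedis.hset("areas", "AREA_IN", "IN");
        }
    }
}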
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Pro</artifactId>
        <groupId>com. .flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
<artifactId>Report</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- 日志相关依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- redis依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
package com. .core;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com. .function.MySumFuction;
import com. .watermark.MyWaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
 * ETL: pre-process the audit log.
 * Report: compute metrics over the cleaned data.
 */
public class DataReport {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // checkpointing configuration
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //env.setStateBackend(new RocksDBStateBackend(""));

        // use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String topic = "auditLog";
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "192.168.167.254:9092");
        consumerProperties.put("group.id", "auditLog_consumer");

        // read the audit log from Kafka, e.g.:
        // {"dt":"2019-11-24 21:19:47","type":"child_unshelf","username":"shenhe1","area":"AREA_ID"}
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), consumerProperties);
        DataStreamSource<String> data = env.addSource(consumer);

        Logger logger = LoggerFactory.getLogger(DataReport.class);

        // parse each record into (event time, type, area)
        SingleOutputStreamOperator<Tuple3<Long, String, String>> preData =
                data.map(new MapFunction<String, Tuple3<Long, String, String>>() {
                    /**
                     * Long:   event time
                     * String: type
                     * String: area
                     */
                    @Override
                    public Tuple3<Long, String, String> map(String line) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(line);
                        String dt = jsonObject.getString("dt");
                        String type = jsonObject.getString("type");
                        String area = jsonObject.getString("area");
                        long time = 0;
                        try {
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            time = sdf.parse(dt).getTime();
                        } catch (ParseException e) {
                            logger.error("failed to parse timestamp, dt: " + dt, e);
                        }
                        return Tuple3.of(time, type, area);
                    }
                });

        /**
         * Filter out records whose event time could not be parsed (time == 0)
         */
SingleOutputStreamOperator<Tuple3<Long, String, String>> filterData = preData.filter(tuple3 -> tuple3.f0 != 0);
/**
 * Side-output tag for records that arrive too late
 */
OutputTag<Tuple3<Long,String,String>> outputTag=
new OutputTag<Tuple3<Long,String,String>>("late-date"){};
/**
 * Windowed aggregation:
 * count, per area and per type, the number of valid video records
 * in each tumbling event-time window (30 s in the code below)
 */
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(new MyWaterMark())
.keyBy(1, 2)
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.sideOutputLateData(outputTag)
.apply(new MySumFuction());
/**
 * Records that arrived too late end up in the side output;
 * the business requires writing them to Kafka
 */
SingleOutputStreamOperator<String> sideOutput =
//java8
resultData.getSideOutput(outputTag).map(line -> line.toString());
//        String outputTopic = "lateData";
//        Properties producerProperties = new Properties();
//        producerProperties.put("bootstrap.servers", "192.168.167.254:9092");
//        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(outputTopic,
//                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
//                producerProperties);
//        sideOutput.addSink(producer);
/**
 * The business case calls for writing the result to Elasticsearch;
 * in our setup the result is written to Kafka instead
 */
resultData.print();
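        // Hedged sketch (not in the original source) of the Kafka sink mentioned in the
        // comment above: the Tuple4 result is serialized via toString() and handed to a
        // FlinkKafkaProducer011. The topic name "reportData" and the variable names here
        // are illustrative assumptions.
        Properties reportProducerProps = new Properties();
        reportProducerProps.put("bootstrap.servers", "192.168.167.254:9092");
        FlinkKafkaProducer011<String> reportProducer = new FlinkKafkaProducer011<>(
                "reportData",
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                reportProducerProps);
        resultData.map(tuple4 -> tuple4.toString()).addSink(reportProducer);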
env.execute("DataReport");
}
}

package com. .function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * WindowFunction<IN, OUT, KEY, W>:
 * - IN:  input element type
 * - OUT: output element type
 * - KEY: the grouping key; when keying by field positions, Flink passes it as a Tuple
 *        whose arity equals the number of key fields (one field per position in keyBy(1, 2))
 * - W extends Window: the window type
 */
public class MySumFuction implements WindowFunction<Tuple3<Long, String, String>,
        Tuple4<String, String, String, Long>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow,
                      Iterable<Tuple3<Long, String, String>> input,
                      Collector<Tuple4<String, String, String, Long>> out) {
        // read the grouping key fields: type and area
        String type = tuple.getField(0).toString();
        String area = tuple.getField(1).toString();
java.util.Iterator<Tuple3<Long, String, String>> iterator = input.iterator();
long count=0;
while(iterator.hasNext()){
iterator.next();
count++;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = sdf.format(new Date(timeWindow.getEnd()));
Tuple4<String, String, String, Long> result =
new Tuple4<String, String, String, Long>(time, type, area, count);
out.collect(result);
}
}
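A quick illustration of the KEY parameter described above (hypothetical demo class, not part of the project): keyBy(1, 2) on a Tuple3<Long, String, String> stream produces a two-field Tuple key, which is why apply() reads the type and the area positionally.

import org.apache.flink.api.java.tuple.Tuple2;

public class KeyTupleDemo {
    public static void main(String[] args) {
        // key fields arrive in the order listed in keyBy(1, 2): type first, then area
        Tuple2<String, String> key = Tuple2.of("child_unshelf", "AREA_IN");
        String type = key.getField(0);   // "child_unshelf"
        String area = key.getField(1);   // "AREA_IN"
        System.out.println(type + " / " + area);
    }
}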
package com. .source;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
 * Mock data source: produces audit-log JSON records into the "auditLog" topic.
 */
public class ProducerDataReport {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.167.254:9092");
        // key/value serializers
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        // topic name
        String topic = "auditLog";

        // create the producer connection
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        // {"dt":"2018-01-01 10:11:22","type":"shelf","username":"shenhe1","area":"AREA_US"}
        while (true) {
            String message = "{\"dt\":\"" + getCurrentTime() + "\",\"type\":\"" + getRandomType()
                    + "\",\"username\":\"" + getRandomUsername() + "\",\"area\":\"" + getRandomArea() + "\"}";
            System.out.println(message);
            producer.send(new ProducerRecord<String, String>(topic, message));
            Thread.sleep(500);
        }
        // producer.close();
    }

    public static String getCurrentTime() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }

    public static String getRandomArea() {
        String[] types = {"AREA_US", "AREA_CT", "AREA_AR", "AREA_IN", "AREA_ID"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static String getRandomType() {
        String[] types = {"shelf", "unshelf", "black", "chlid_shelf", "child_unshelf"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static String getRandomUsername() {
        String[] types = {"shenhe1", "shenhe2", "shenhe3", "shenhe4", "shenhe5"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }
}

package com. .watermark;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
/**
 * Periodic watermark assigner: the watermark trails the maximum timestamp
 * seen so far by maxOutputOfOrderness milliseconds.
 */
public class MyWaterMark implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, String>> {

    long currentMaxTimestamp = 0L;
    // allowed out-of-orderness
    final long maxOutputOfOrderness = 20000L;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutputOfOrderness);
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, String> element, long previousElementTimestamp) {
        Long timeStamp = element.f0;
        currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
        return timeStamp;
    }
}
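A small standalone sketch (hypothetical demo class, assumed to live in the same package as MyWaterMark) showing how the emitted watermark trails the largest timestamp seen so far by 20 s:

import org.apache.flink.api.java.tuple.Tuple3;

public class MyWaterMarkDemo {
    public static void main(String[] args) {
        MyWaterMark assigner = new MyWaterMark();
        // an element with event time 1574600000000 ms advances currentMaxTimestamp
        assigner.extractTimestamp(Tuple3.of(1574600000000L, "shelf", "AREA_US"), 0L);
        // watermark = max timestamp seen so far - 20000 ms
        System.out.println(assigner.getCurrentWatermark().getTimestamp()); // 1574599980000
    }
}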