Computing the Top N hottest items
package hotitems_analysis
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L) // CSV timestamps are in seconds; Flink expects milliseconds
val aggStream: DataStream[ItemViewCount] = dataStream
.filter(_.behavior == "pv") // keep page-view events only
.keyBy("itemId")
.timeWindow(Time.hours(1), Time.minutes(5)) // 1-hour windows sliding every 5 minutes
.aggregate(new CountAgg, new ItemCountWindowResult())
val resultStream: DataStream[String] = aggStream
.keyBy("windowEnd") // group all item counts that belong to the same window
.process(new TopNHotItems(5))
resultStream.print()
env.execute("hot items job")
}
}
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
// AvgAgg is not used by the job above; it illustrates averaging with a (sum, count) accumulator
class AvgAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {
override def createAccumulator(): (Long, Int) = (0L, 0)
override def add(value: UserBehavior, accumulator: (Long, Int)): (Long, Int) = (accumulator._1 + value.timestamp, accumulator._2 + 1)
override def getResult(accumulator: (Long, Int)): Double = accumulator._1 / accumulator._2.toDouble
override def merge(a: (Long, Int), b: (Long, Int)): (Long, Int) = (a._1 + b._1, a._2 + b._2)
}
class ItemCountWindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0 // keyBy("itemId") yields a Java Tuple1 key
val windowEnd: Long = window.getEnd
val count: Long = input.iterator.next()
out.collect(ItemViewCount(itemId, windowEnd, count))
}
}
class TopNHotItems(n: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String]{
lazy val itemCountListState:ListState[ItemViewCount]=getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list",classOf[ItemViewCount]))
override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
itemCountListState.add(value)
ctx.timerService().registerEventTimeTimer(value.windowEnd + 100) // fire 100 ms after the window closes
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
val iter = itemCountListState.get().iterator()
while (iter.hasNext) {
allItemCountList += iter.next()
}
val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)
itemCountListState.clear()
val result:StringBuilder=new StringBuilder
result.append("时间:").append(new Timestamp(timestamp-100)).append("\n")
for(i<- sortedItemCountList.indices){
val currentItemCount = sortedItemCountList(i)
result.append("Top").append(i+1).append(":")
.append(" 商品ID=").append(currentItemCount.itemId)
.append(" 访问量=").append(currentItemCount.count)
.append("\n")
}
result.append("======================================================================================\n\n")
Thread.sleep(1000) // slow down console output for readability (demo only)
out.collect(result.toString())
}
}
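The job above reads a local file for testing. The otherwise-unused Kafka imports at the top of the file (FlinkKafkaConsumer, SimpleStringSchema, Properties) point at the intended production setup: consuming the same CSV lines from a Kafka topic. A minimal sketch, assuming the topic name and broker address used by the producer utility in the next section:
// hypothetical Kafka variant of the input; would replace env.readTextFile(...) above
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.1.103:9092") // assumed broker, matching KafkaProducerUtil below
properties.setProperty("group.id", "hotitems-consumer") // assumed consumer group id
val kafkaStream: DataStream[String] = env.addSource(
new FlinkKafkaConsumer[String]("hotItems", new SimpleStringSchema(), properties))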
Producing the data to Kafka
package hotitems_analysis
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerUtil {
def main(args: Array[String]): Unit = {
writeToKafkaWithTopic("hotItems")
}
def writeToKafkaWithTopic(topic: String):Unit={
val properties=new Properties()
properties.setProperty("bootstrap.servers", "192.168.1.103:9092")
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer=new KafkaProducer[String,String](properties)
val bufferedSource=io.Source.fromFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
for(line<-bufferedSource.getLines()){
val record=new ProducerRecord[String,String](topic,line)
producer.send(record)
}
bufferedSource.close() // release the file handle
producer.close()
}
}
Site total page views (PV)
package networkflow_analysis
import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class PvCount(windowEnd: Long, count: Long)
object PageView {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val pvStream: DataStream[PvCount] = dataStream
.filter(_.behavior == "pv")
.map(new MyMapper()) // assign a random key so the counting load spreads across parallel subtasks
.keyBy(_._1)
.timeWindow(Time.hours(1))
.aggregate(new PvCountAgg(), new PvCountResult())
val pvTotalStream: DataStream[PvCount] = pvStream
.keyBy(_.windowEnd)
.process(new TotalPvCountResult()) // sum("count") would emit a running partial per element; this fires once per window
pvTotalStream.print()
env.execute("pvJob")
}
}
class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
override def createAccumulator(): Long = 0L
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
out.collect(PvCount(window.getEnd, input.head))
}
}
class MyMapper() extends MapFunction[UserBehavior, (String, Long)] {
// random key in [0, 8): avoids piling all elements onto one subtask; the partial counts are merged downstream
override def map(value: UserBehavior): (String, Long) = (Random.nextInt(8).toString, 1L)
}
// merges the per-random-key partial counts into a single total per window
class TotalPvCountResult() extends KeyedProcessFunction[Long, PvCount, PvCount] {
lazy val totalCountState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-count", classOf[Long]))
override def processElement(value: PvCount, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context, out: Collector[PvCount]): Unit = {
val currentTotalCount: Long = totalCountState.value()
totalCountState.update(currentTotalCount + value.count)
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1) // fires once the watermark passes the window end
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
out.collect(PvCount(ctx.getCurrentKey, totalCountState.value))
totalCountState.clear()
}
}
Site unique visitors (UV)
package networkflow_analysis
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class UvCount(windowEnd:Long,count:Long)
object UniqueVisitor {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val uvStream: DataStream[UvCount] = dataStream
.filter(_.behavior == "pv")
.timeWindowAll(Time.hours(1)) // non-keyed window: all data flows through a single task
.aggregate(new UvCountAgg(), new UvCountResultWithIncreaseAgg()) // incremental dedup via a Set accumulator
uvStream.print()
env.execute("uvJob")
}
}
// Full-window alternative to the incremental aggregate used above: buffers every element, dedups at window end
class UvCountResult extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
var idSet: Set[Long] =Set[Long]()
for(userBehavior<-input){
idSet+=userBehavior.userId
}
out.collect(UvCount(window.getEnd,idSet.size))
}
}
class UvCountAgg()extends AggregateFunction[UserBehavior,Set[Long],Long]{
override def createAccumulator(): Set[Long] = Set[Long]()
override def add(value: UserBehavior, accumulator: Set[Long]): Set[Long] = accumulator+value.userId
override def getResult(accumulator: Set[Long]): Long = accumulator.size
override def merge(a: Set[Long], b: Set[Long]): Set[Long] = a++b
}
class UvCountResultWithIncreaseAgg()extends AllWindowFunction[Long,UvCount,TimeWindow]{
override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {
out.collect(UvCount(window.getEnd,input.head))
}
}
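Both UV variants above keep every distinct userId of the window in memory, which is the approach's weak point at scale. A rough back-of-the-envelope check, assuming on the order of 10^8 distinct visitors and counting only the 8-byte Long payload (the Set's own overhead is several times larger):
object UvStateSizeEstimate {
def main(args: Array[String]): Unit = {
val distinctUsers = 1e8 // assumed order of magnitude, not a measured figure
val bytesPerId = 8L // Long payload only; collection overhead comes on top
val mb = distinctUsers * bytesPerId / (1024 * 1024)
println(f"at least $mb%.0f MB of state for one window") // ~763 MB
}
}
That is the motivation for the Bloom-filter variant below: a fixed-size bitmap in Redis instead of an exact in-memory set.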
UV with a Bloom filter
package networkflow_analysis
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
object UvWithBloomFilter {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val uvStream: DataStream[UvCount] = dataStream
.filter(_.behavior == "pv")
.map(data => ("uv",data.userId))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.trigger(new MyTrigger) // fire and purge per element, so the window buffers almost nothing
.process(new UvCountResultWithBloomFilter())
uvStream.print()
env.execute("uv job")
}
}
// Fires and purges on every element: the dedup state lives in Redis, not in window state
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountResultWithBloomFilter() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
var jedis: Jedis = _
var bloom: Bloom = _
override def open(parameters: Configuration): Unit = {
jedis = new Jedis("localhost", 6379)
bloom=new Bloom(1<<30)
}
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
val storedKey = context.window.getEnd.toString // one Redis bitmap per window
val countMap = "countMap" // Redis hash: windowEnd -> current UV count
var count = 0L
if (jedis.hget(countMap, storedKey) != null) {
count = jedis.hget(countMap, storedKey).toLong
}
// with FIRE_AND_PURGE the window holds exactly one element per firing
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
val isExist = jedis.getbit(storedKey, offset)
if (!isExist) {
jedis.setbit(storedKey, offset, true)
jedis.hset(countMap, storedKey, (count + 1).toString)
out.collect(UvCount(storedKey.toLong, count + 1)) // emit the updated count so uvStream.print() shows output
} else {
out.collect(UvCount(storedKey.toLong, count))
}
}
}
class Bloom(size: Long) extends Serializable {
private val cap = size // capacity in bits; should be a power of two so (cap - 1) works as a bitmask
def hash(str: String, seed: Int): Long = {
var result = 0L // accumulate in a Long to avoid Int overflow
for (i <- 0 until str.length) {
result = result * seed + str.charAt(i)
}
(cap - 1) & result // keep only the low bits: an offset inside the bitmap
}
}
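For a feel of the footprint: the filter above is sized at 1 << 30 bits, and Redis grows the bitmap up to the highest offset actually set. A quick check of the numbers (the sparse-fill rule of thumb is an assumption, not something the code enforces):
object BloomSizeCheck {
def main(args: Array[String]): Unit = {
val bits = 1L << 30 // the capacity passed to Bloom above
val mb = bits / 8 / (1024 * 1024) // worst-case bitmap size per window key
println(s"$mb MB per window bitmap") // 128 MB
// with a single hash function, false positives stay rare only while the bitmap is sparsely filled,
// so this capacity is meant for up to roughly 1e8 distinct ids per window (assumption)
}
}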
Top N pages by views
package networkflow_analysis
import java.sql.Timestamp
import java.util
import java.text.SimpleDateFormat
import java.util.Map
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class PageViewCount(url: String, windowEnd: Long, count: Long)
object NetworkFlowTopNPage {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
val dataStream: DataStream[ApacheLogEvent] = inputStream
.map(data => {
val dataArray: Array[String] = data.split(" ")
val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(dataArray(3)).getTime
ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
})
val lateOutputTag = new OutputTag[ApacheLogEvent]("late data")
val aggStream = dataStream
.keyBy(_.url)
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.minutes(1)) // late events within 1 minute re-trigger updated results
.sideOutputLateData(lateOutputTag) // anything later still lands in the side output
.aggregate(new PageCountAgg(), new PageCountWindowResult)
val lateDataStream = aggStream.getSideOutput(lateOutputTag) // overly-late events; print or sink this stream as needed
val resultStream = aggStream
.keyBy(_.windowEnd)
.process(new TopNHotPage(3))
resultStream.print("result")
env.execute("top n page job")
}
}
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
out.collect(PageViewCount(key, window.getEnd, input.head))
}
}
class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
lazy val pageCountMapState: MapState[String, Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pageCount_list", classOf[String], classOf[Long]))
override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
pageCountMapState.put(value.url, value.count) // MapState lets a late update overwrite the old count for the same url
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000L) // output timer
ctx.timerService().registerEventTimeTimer(value.windowEnd + 60 * 1000L) // cleanup timer, matching allowedLateness
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
if (timestamp == ctx.getCurrentKey + 60 * 1000L) {
// cleanup timer: the lateness period has passed, drop the state and emit nothing
pageCountMapState.clear()
return
}
val allPageCountList: ListBuffer[(String, Long)] = ListBuffer()
val iter: util.Iterator[Map.Entry[String, Long]] = pageCountMapState.entries().iterator()
while (iter.hasNext) {
val entry: Map.Entry[String, Long] = iter.next()
allPageCountList += ((entry.getKey, entry.getValue))
}
val sortedPageCountList: ListBuffer[(String, Long)] = allPageCountList.sortWith(_._2 > _._2).take(n)
val result: StringBuilder = new StringBuilder
result.append("时间:").append(new Timestamp(timestamp)).append("\n")
for (i <- sortedPageCountList.indices) {
val currentItemCount: (String,Long) = sortedPageCountList(i)
result.append("Top").append(i + 1).append(":")
.append(" 页面url=").append(currentItemCount._1)
.append(" 访问量=").append(currentItemCount._2)
.append("\n")
}
result.append("======================================================================================\n\n")
Thread.sleep(1000) // slow down console output for readability (demo only)
out.collect(result.toString())
}
}
App marketing: total counts (all channels)
package market_analysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object AppMarketingTotal {
def main(args: Array[String]): Unit = {
val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream:DataStream[MarketUserBehavior]=env.addSource(new SimulateMarketEventSource)
.assignAscendingTimestamps(_.timestamp)
val resultStream: DataStream[MarketCount] = dataStream
.filter(_.behavior != "UNINSTALL") // drop uninstall events
.map(_ => ("total", 1L)) // a single dummy key: everything is counted together
.keyBy(_._1)
.timeWindow(Time.hours(1), Time.seconds(5))
.aggregate(new MarketCountAgg(), new MarketCountResult())
resultStream.print()
env.execute("market total count job")
}
}
class MarketCountAgg()extends AggregateFunction[(String,Long),Long,Long]{
override def createAccumulator(): Long = 0L
override def add(value: (String, Long), accumulator: Long): Long = accumulator+1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a+b
}
class MarketCountResult() extends WindowFunction[Long,MarketCount,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketCount]): Unit = {
val windowStart:String=new Timestamp(window.getStart).toString
val windowEnd:String=new Timestamp(window.getEnd).toString
val count:Long=input.head
out.collect(MarketCount(windowStart,windowEnd,"total","total",count))
}
}
App marketing: counts by channel
package market_analysis
import java.sql.Timestamp
import java.util.UUID
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
case class MarketUserBehavior(userId:String,behavior:String,channel:String,timestamp:Long)
case class MarketCount(windowStart:String,windowEnd:String,channel:String,behavior: String,count:Long)
class SimulateMarketEventSource extends RichParallelSourceFunction[MarketUserBehavior]{
var running:Boolean=true
val behaviorSet:Seq[String]=Seq("CLICK","DOWNLOAD","INSTALL","UNINSTALL")
val channelSet:Seq[String]=Seq("appStore","huaweiStore","weibo","wechat")
val rand:Random=Random
override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
val maxCounts: Long =Long.MaxValue
var count: Long =0L
while(running&&count<maxCounts){
val id: String =UUID.randomUUID().toString
val behavior: String =behaviorSet(rand.nextInt(behaviorSet.size))
val channel: String =channelSet(rand.nextInt(channelSet.size))
val ts: Long =System.currentTimeMillis()
ctx.collect(MarketUserBehavior(id,behavior,channel,ts))
count+=1
Thread.sleep(50L)
}
}
override def cancel(): Unit = running = false
}
object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream:DataStream[MarketUserBehavior]=env.addSource(new SimulateMarketEventSource)
.assignAscendingTimestamps(_.timestamp)
val resultStream:DataStream[MarketCount]=dataStream
.filter(_.behavior!="UNINSTALL")
.keyBy(data=>(data.channel,data.behavior))
.timeWindow(Time.hours(1),Time.seconds(5))
.process(new MarketCountByChannel())
resultStream.print()
env.execute("market count bu channel job")
}
}
class MarketCountByChannel() extends ProcessWindowFunction[MarketUserBehavior, MarketCount, (String, String), TimeWindow] {
// a ProcessWindowFunction buffers the whole window; fine for a demo, prefer aggregate() for large windows
override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketCount]): Unit = {
val windowStart:String=new Timestamp(context.window.getStart).toString
val windowEnd:String=new Timestamp(context.window.getEnd).toString
val channel:String=key._1
val behavior:String=key._2
val count:Long=elements.size
out.collect(MarketCount(windowStart,windowEnd,channel,behavior,count))
}
}
Ad clicks and blacklist filtering
package market_analysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class AdClickEvent(userId:Long,adId:Long,province: String,city:String,timestamp: Long)
case class AdCountByProvince(province:String,windowEnd:String,count:Long)
case class BlackListWarning(userId:Long,adId:Long,msg:String)
object AdAnalysisByProvince {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val resource=getClass.getResource("/AdClickLog.csv")
val adLogStream:DataStream[AdClickEvent]=env.readTextFile(resource.getPath)
.map(data=>{
val dataArray=data.split(",")
AdClickEvent(dataArray(0).toLong,dataArray(1).toLong,dataArray(2),dataArray(3),dataArray(4).toLong)
}).assignAscendingTimestamps(_.timestamp*1000L)
val filterBlackListStream: DataStream[AdClickEvent] = adLogStream
.keyBy(data => (data.userId, data.adId)) // key by (user, ad) so the click limit applies per pair
.process(new FilterBlackList(100L))
val adCountStream: DataStream[AdCountByProvince] = filterBlackListStream // count only the clicks that passed the filter
.keyBy(_.province)
.timeWindow(Time.hours(1), Time.seconds(5))
.aggregate(new AdCountAgg(), new AdCountResult())
adCountStream.print()
filterBlackListStream.getSideOutput(new OutputTag[BlackListWarning]("blacklist")).print("blacklist")
env.execute("ad analysis by province")
}
}
class AdCountAgg()extends AggregateFunction[AdClickEvent,Long,Long]{
override def createAccumulator(): Long = 0L
override def add(value: AdClickEvent, accumulator: Long): Long = accumulator+1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a+b
}
class AdCountResult()extends WindowFunction[Long,AdCountByProvince,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit = {
val province: String =key
val windowEnd: Long =window.getEnd
val count: Long =input.head
out.collect(AdCountByProvince(province,new Timestamp(windowEnd).toString,count))
}
}
class FilterBlackList(maxClickCount: Long)extends KeyedProcessFunction[(Long,Long),AdClickEvent,AdClickEvent]{
lazy val countState:ValueState[Long]=getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
lazy val isSentState:ValueState[Boolean]=getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent",classOf[Boolean]))
override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
val curCount: Long = countState.value()
if (curCount == 0) {
// first click today for this (user, ad): schedule a state reset at the next UTC midnight
val ts: Long = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24)
ctx.timerService().registerProcessingTimeTimer(ts)
}
if (curCount >= maxClickCount) {
if (!isSentState.value()) {
ctx.output(new OutputTag[BlackListWarning]("blacklist"), BlackListWarning(value.userId, value.adId, "click over " + maxClickCount + " times today"))
isSentState.update(true)
}
return // blacklisted clicks are swallowed, not forwarded
}
countState.update(curCount + 1)
out.collect(value)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
countState.clear()
isSentState.clear()
}
}
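The reset timer in FilterBlackList rounds the current processing time up to the next day boundary by integer division; since epoch milliseconds carry no time zone, that boundary is UTC midnight. A minimal check of the arithmetic:
object MidnightTimerCheck {
def main(args: Array[String]): Unit = {
val dayMs = 1000L * 60 * 60 * 24
val now = System.currentTimeMillis()
val ts = (now / dayMs + 1) * dayMs // round down to the current UTC day, then add one day
println(ts - now) // milliseconds until the next UTC midnight, always in (0, dayMs]
println(ts % dayMs == 0) // true: the timer lands exactly on a day boundary
}
}
To reset at a local midnight instead, subtract the zone offset before the division and add it back afterwards; that is a variation, not what the code above does.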
Detecting malicious logins
package loginFail_detect
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
case class LoginEvent(userId:Long,ip:String,eventType:String,eventTime:Long)
case class Warning(userID:Long,firstFailTime:Long,lastFailTime:Long, warningMsg:String)
object LoginFail {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val resource=getClass.getResource("/LoginLog.csv")
val loginEventStream:DataStream[LoginEvent]=env.readTextFile(resource.getPath)
.map(data=>{
val dataArray=data.split(",")
LoginEvent(dataArray(0).toLong,dataArray(1),dataArray(2),dataArray(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
override def extractTimestamp(element: LoginEvent): Long = element.eventTime*1000L
})
val loginWarningStream:DataStream[Warning]=loginEventStream
.keyBy(_.userId)
.process(new LoginFailWarning(2))
loginWarningStream.print()
env.execute("login fail job")
}
}
class LoginFailWarning(maxFailTimes: Int)extends KeyedProcessFunction[Long,LoginEvent,Warning]{
lazy val loginFailListState:ListState[LoginEvent]=getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("saved-loginEvent",classOf[LoginEvent]))
lazy val timerTsState:ValueState[Long]=getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts",classOf[Long]))
override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#Context, out: Collector[Warning]): Unit = {
if(value.eventType=="fail"){
loginFailListState.add(value)
if(timerTsState.value()==0){
val ts=value.eventTime*1000L+2000L
ctx.timerService().registerEventTimeTimer(ts)
timerTsState.update(ts)
}
}else{
ctx.timerService().deleteEventTimeTimer(timerTsState.value())
loginFailListState.clear()
timerTsState.clear()
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#OnTimerContext, out: Collector[Warning]): Unit = {
val allLoginFailList:ListBuffer[LoginEvent]=new ListBuffer[LoginEvent]
val iter=loginFailListState.get().iterator()
while (iter.hasNext){
allLoginFailList+=iter.next()
}
if(allLoginFailList.length>=maxFailTimes){
out.collect(Warning(ctx.getCurrentKey,allLoginFailList.head.eventTime,allLoginFailList.last.eventTime,"login fail in 2s for "+allLoginFailList.length+" times."))
}
loginFailListState.clear()
timerTsState.clear()
}
}
Detecting malicious logins with CEP
package loginFail_detect
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val resource = getClass.getResource("/LoginLog.csv")
val loginEventStream: DataStream[LoginEvent] = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L
})
val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
.begin[LoginEvent]("firstFail").where(_.eventType == "fail")
.next("secondFail").where(_.eventType == "fail") // next = strictly consecutive events
.within(Time.seconds(2)) // both failures must occur within 2 seconds
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
val loginFailStream:DataStream[Warning]=patternStream.select(new LoginFailDetect())
loginFailStream.print()
env.execute("login fail job")
}
}
class LoginFailDetect()extends PatternSelectFunction[LoginEvent,Warning]{
override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
val firstLoginFail=map.get("firstFail").get(0)
val secondLoginFail=map.get("secondFail").get(0)
Warning(firstLoginFail.userId,firstLoginFail.eventTime,secondLoginFail.eventTime,"login fail")
}
}
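The pattern above is hard-wired to exactly two failures. Flink CEP's quantifiers express the same rule for N consecutive failures; a sketch, with maxFailTimes and the 5-second window as assumed parameters:
// hypothetical generalization of loginFailPattern to N consecutive failures
val maxFailTimes = 3 // assumed threshold
val nFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
.begin[LoginEvent]("fail").where(_.eventType == "fail")
.times(maxFailTimes) // match the "fail" condition maxFailTimes times
.consecutive() // strict contiguity, like next()
.within(Time.seconds(5)) // assumed window; pick whatever matches your policy
The select function would then read all matched events from map.get("fail") instead of two named stages.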
Order payment timeout (with CEP)
package order_detect
import java.util
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class OrderEvent(orderId:Long,eventType:String,txId:String,eventTime:Long)
case class OrderResult(orderId:Long,resultMsg:String)
object OrderTimeout {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val resource=getClass.getResource("/OrderLog.csv")
val orderEventStream:DataStream[OrderEvent]=env.readTextFile(resource.getPath)
.map(data=>{
val array=data.split(",")
OrderEvent(array(0).toLong,array(1),array(2),array(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
override def extractTimestamp(element: OrderEvent): Long = element.eventTime*1000L
})
val orderPayPattern = Pattern
.begin[OrderEvent]("create").where(_.eventType == "create")
.followedBy("pay").where(_.eventType == "pay") // relaxed contiguity: other events may come in between
.within(Time.minutes(15)) // pay must follow create within 15 minutes
val patternStream: PatternStream[OrderEvent] =CEP.pattern(orderEventStream.keyBy(_.orderId),orderPayPattern)
val orderTimeoutOutputTag=new OutputTag[OrderResult]("order timeout")
val resultStream:DataStream[OrderResult]=patternStream
.select(orderTimeoutOutputTag,new OrderTimeoutSelect(),new OrderPaySelect())
resultStream.print("payed")
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
env.execute("order timeout detect job")
}
}
class OrderTimeoutSelect()extends PatternTimeoutFunction[OrderEvent,OrderResult]{
override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
val timeoutOrderId = map.get("create").iterator().next().orderId
OrderResult(timeoutOrderId, "timeout at " + l) // l is the timestamp at which the pattern timed out
}
}
class OrderPaySelect()extends PatternSelectFunction[OrderEvent,OrderResult]{
override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
val payOrderId=map.get("pay").get(0).orderId
OrderResult(payOrderId, "paid successfully")
}
}
Order payment timeout without CEP (state and timers)
package order_detect
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object OrderTimeoutWithoutCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath)
.map(data => {
val array: Array[String] = data.split(",")
OrderEvent(array(0).toLong, array(1), array(2), array(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(4)) {
override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
})
val orderResultStream: DataStream[OrderResult] = orderEventStream
.keyBy(_.orderId)
.process(new OrderPayMatchDetect())
orderResultStream.print("payed")
orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print()
env.execute("order timeout")
}
}
class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))
val orderTimeoutOutputTag: OutputTag[OrderResult] = new OutputTag[OrderResult]("timeout")
override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value()
val isCreated = isCreatedState.value()
val timerTs = timerTsState.value()
if (value.eventType == "create") {
if (isPayed) {
out.collect(OrderResult(value.orderId, "payed successfully"))
isPayedState.clear()
timerTsState.clear()
ctx.timerService().deleteEventTimeTimer(timerTs)
}
else {
val ts: Long = value.eventTime * 1000L + 15 * 60 * 1000L
ctx.timerService().registerEventTimeTimer(ts)
timerTsState.update(ts)
isCreatedState.update(true)
}
}
else if (value.eventType == "pay") {
if (isCreated) {
if (value.eventTime * 1000 < timerTs) {
out.collect(OrderResult(value.orderId, "payed successfully"))
}
else {
ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
}
isCreatedState.clear()
timerTsState.clear()
ctx.timerService().deleteEventTimeTimer(timerTs)
}
else {
val ts = value.eventTime * 1000L
ctx.timerService().registerEventTimeTimer(ts)
timerTsState.update(ts)
isPayedState.update(true)
}
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
if (isPayedState.value()) {
// a pay was seen but its create never arrived
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already paid but create log not found"))
} else {
// created but never paid within 15 minutes
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
}
isPayedState.clear()
isCreatedState.clear()
timerTsState.clear()
}
}
Real-time payment reconciliation
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
// these case classes are redefined here (the file has no package declaration) so the example is self-contained
case class ReceiptEvent(txId: String, payChannel: String, timeStamp: Long)
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
case class OrderResult(orderId: Long, resultMsg: String)
object OrderPayTxMatch {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath)
.map(data => {
val array: Array[String] = data.split(",")
OrderEvent(array(0).toLong, array(1), array(2), array(3).toLong)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(4)) {
override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
}).filter(_.txId != "").keyBy(_.txId)
val resource2 = getClass.getResource("/ReceiptLog.csv")
val receiptEventStream: DataStream[ReceiptEvent] = env.readTextFile(resource2.getPath)
.map(data => {
val array: Array[String] = data.split(",")
ReceiptEvent(array(0), array(1), array(2).toLong)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(4)) {
override def extractTimestamp(element: ReceiptEvent): Long = element.timeStamp * 1000L
})
.keyBy(_.txId)
val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
.connect(receiptEventStream)
.process(new OrderPayTxDetect())
val unmatchedPays: OutputTag[OrderEvent] = new OutputTag[OrderEvent]("unmatched-pays")
val unmatchedReceipts: OutputTag[ReceiptEvent] = new OutputTag[ReceiptEvent]("unmatched-receipts")
resultStream.print("matched")
resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")
env.execute("order pay tx match job")
}
}
class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))
val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")
override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
val receipt: ReceiptEvent = receiptState.value()
if (receipt != null) {
// the matching receipt already arrived: emit the matched pair
out.collect((pay, receipt))
receiptState.clear()
} else {
// no receipt yet: buffer the pay and wait up to 3 seconds (event time)
payState.update(pay)
ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 3000L)
}
}
override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
val pay: OrderEvent = payState.value()
if (pay != null) {
out.collect((pay, receipt))
payState.clear()
} else {
// no pay yet: buffer the receipt and wait up to 5 seconds (event time)
receiptState.update(receipt)
ctx.timerService().registerEventTimeTimer(receipt.timeStamp * 1000L + 5000L)
}
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// whichever side is still buffered when its timer fires never found a partner
if (payState.value() != null) {
ctx.output(unmatchedPays, payState.value())
}
if (receiptState.value() != null) {
ctx.output(unmatchedReceipts, receiptState.value())
}
payState.clear()
receiptState.clear()
}
}
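When only matched pairs matter, the hand-rolled state and timers can be replaced by Flink's interval join. A sketch, re-keying both inputs by txId (the vals above are typed as plain DataStream); the -5s/+3s bounds roughly mirror the timer offsets in OrderPayTxDetect. Note that an interval join silently drops unmatched events, so keep the CoProcessFunction when the side outputs are the point:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
val joinedStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.keyBy(_.txId)
.intervalJoin(receiptEventStream.keyBy(_.txId))
.between(Time.seconds(-5), Time.seconds(3)) // receipt from 5s before to 3s after the pay event
.process(new ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
override def processElement(pay: OrderEvent, receipt: ReceiptEvent, ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
out.collect((pay, receipt))
}
})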