IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flinkScala -> 正文阅读

[大数据]flinkScala

package com. gu.networkflow_analysis

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: UserBehaviorAnalysis
  • Package: com. gu.networkflow_analysis
  • Version: 1.0
  • Created by wushengran on 2019/9/23 9:21
    */

// 输入数据样例类
case class ApacheLogEvent( ip: String, userId: String, eventTime: Long, method: String, url: String)

// 窗口聚合结果样例类
case class UrlViewCount( url: String, windowEnd: Long, count: Long )

object NetworkFlow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// val dataStream = env.readTextFile(“D:\Projects\BigData\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\apache.log”)
val dataStream = env.socketTextStream(“localhost”, 7777)
.map( data => {
val dataArray = data.split(" ")
// 定义时间转换
val simpleDateFormat = new SimpleDateFormat(“dd/MM/yyyy:HH:mm:ss”)
val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
ApacheLogEvent( dataArray(0).trim, dataArray(1).trim, timestamp, dataArray(5).trim, dataArray(6).trim )
} )
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractorApacheLogEvent {
override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
} )
.keyBy(_.url)
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.seconds(60))
.aggregate( new CountAgg(), new WindowResult() )

val processedStream = dataStream
  .keyBy(_.windowEnd)
  .process( new TopNHotUrls(5) )

dataStream.print("aggregate")
processedStream.print("process")

env.execute("network flow job")

}
}

// 自定义预聚合函数
class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

// 自定义窗口处理函数
class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
out.collect( UrlViewCount( key, window.getEnd, input.iterator.next() ) )
}
}

// 自定义排序输出处理函数
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String]{
lazy val urlState: MapState[String, Long] = getRuntimeContext.getMapState( new MapStateDescriptor[String, Long](“url-state”, classOf[String], classOf[Long] ) )

override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
urlState.put(value.url, value.count)
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 从状态中拿到数据
val allUrlViews: ListBuffer[(String, Long)] = new ListBuffer(String, Long)
val iter = urlState.entries().iterator()
while(iter.hasNext){
val entry = iter.next()
allUrlViews += (( entry.getKey, entry.getValue ))
}

// urlState.clear()

val sortedUrlViews = allUrlViews.sortWith(_._2 > _._2).take(topSize)

// 格式化结果输出
val result: StringBuilder = new StringBuilder()
result.append("时间:").append( new Timestamp( timestamp - 1 ) ).append("\n")
for( i <- sortedUrlViews.indices ){
  val currentUrlView = sortedUrlViews(i)
  result.append("NO").append(i + 1).append(":")
    .append(" URL=").append(currentUrlView._1)
    .append(" 访问量=").append(currentUrlView._2).append("\n")
}
result.append("=============================")
Thread.sleep(1000)
out.collect(result.toString())

}
}
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: UserBehaviorAnalysis
  • Package: com. gu.networkflow_analysis
  • Version: 1.0
  • Created by wushengran on 2019/9/23 10:28
    */

// 定义输入数据的样例类
case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long )

object PageView {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// 用相对路径定义数据源
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // 只统计pv操作
  .map( data => ("pv", 1) )
  .keyBy(_._1)
  .timeWindow(Time.hours(1))
  .sum(1)

dataStream.print("pv count")

env.execute("page view jpb")

}
}
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: UserBehaviorAnalysis
  • Package: com. gu.networkflow_analysis
  • Version: 1.0
  • Created by wushengran on 2019/9/23 10:43
    */
    case class UvCount( windowEnd: Long, uvCount: Long )

object UniqueVisitor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// 用相对路径定义数据源
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // 只统计pv操作
  .timeWindowAll( Time.hours(1) )
  .apply( new UvCountByWindow() )

dataStream.print()
env.execute("uv job")

}
}

class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
// 定义一个scala set,用于保存所有的数据userId并去重
var idSet = SetLong
// 把当前窗口所有数据的ID收集到set中,最后输出set的大小
for( userBehavior <- input ){
idSet += userBehavior.userId
}
out.collect( UvCount( window.getEnd, idSet.size ) )
}
}
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: UserBehaviorAnalysis

  • Package: com. gu.networkflow_analysis

  • Version: 1.0

  • Created by wushengran on 2019/9/23 11:34
    */
    object UvWithBloom {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // 用相对路径定义数据源
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)
    .map(data => {
    val dataArray = data.split(",")
    UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
    })
    .assignAscendingTimestamps(.timestamp * 1000L)
    .filter(
    .behavior == “pv”) // 只统计pv操作
    .map(data => (“dummyKey”, data.userId))
    .keyBy(_._1)
    .timeWindow(Time.hours(1))
    .trigger(new MyTrigger())
    .process(new UvCountWithBloom())

    dataStream.print()

    env.execute(“uv with bloom job”)
    }
    }

// 自定义窗口触发器
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}

override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 每来一条数据,就直接触发窗口操作,并清空所有窗口状态
TriggerResult.FIRE_AND_PURGE
}
}

// 定义一个布隆过滤器
class Bloom(size: Long) extends Serializable {
// 位图的总大小,默认16M
private val cap = if (size > 0) size else 1 << 27

// 定义hash函数
def hash(value: String, seed: Int): Long = {
var result = 0L
for( i <- 0 until value.length ){
result = result * seed + value.charAt(i)
}
result & ( cap - 1 )
}
}

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
// 定义redis连接
lazy val jedis = new Jedis(“localhost”, 6379)
lazy val bloom = new Bloom(1<<29)

override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
// 位图的存储方式,key是windowEnd,value是bitmap
val storeKey = context.window.getEnd.toString
var count = 0L
// 把每个窗口的uv count值也存入名为count的redis表,存放内容为(windowEnd -> uvCount),所以要先从redis中读取
if( jedis.hget(“count”, storeKey) != null ){
count = jedis.hget(“count”, storeKey).toLong
}
// 用布隆过滤器判断当前用户是否已经存在
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
// 定义一个标识位,判断reids位图中有没有这一位
val isExist = jedis.getbit(storeKey, offset)
if(!isExist){
// 如果不存在,位图对应位置1,count + 1
jedis.setbit(storeKey, offset, true)
jedis.hset(“count”, storeKey, (count + 1).toString)
out.collect( UvCount(storeKey.toLong, count + 1) )
} else {
out.collect( UvCount(storeKey.toLong, count) )
}
}
}

package com. gu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: UserBehaviorAnalysis
  • Package: com. gu.marketanalysis
  • Version: 1.0
  • Created by wushengran on 2019/9/24 10:10
    */
    // 输入的广告点击事件样例类
    case class AdClickEvent( userId: Long, adId: Long, province: String, city: String, timestamp: Long )
    // 按照省份统计的输出结果样例类
    case class CountByProvince( windowEnd: String, province: String, count: Long )
    // 输出的黑名单报警信息
    case class BlackListWarning( userId: Long, adId: Long, msg: String )

object AdStatisticsByGeo {
// 定义侧输出流的tag
val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTagBlackListWarning

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// 读取数据并转换成AdClickEvent
val resource = getClass.getResource("/AdClickLog.csv")
val adEventStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    AdClickEvent( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)

// 自定义process function,过滤大量刷点击的行为
val filterBlackListStream = adEventStream
  .keyBy( data => (data.userId, data.adId) )
  .process( new FilterBlackListUser(100) )

// 根据省份做分组,开窗聚合
val adCountStream = filterBlackListStream
  .keyBy(_.province)
  .timeWindow( Time.hours(1), Time.seconds(5) )
  .aggregate( new AdCountAgg(), new AdCountResult() )

adCountStream.print("count")
filterBlackListStream.getSideOutput(blackListOutputTag).print("blacklist")

env.execute("ad statistics job")

}

class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]{
// 定义状态,保存当前用户对当前广告的点击量
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long](“count-state”, classOf[Long]))
// 保存是否发送过黑名单的状态
lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext.getState( new ValueStateDescriptor[Boolean](“issent-state”, classOf[Boolean]) )
// 保存定时器触发的时间戳
lazy val resetTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long](“resettime-state”, classOf[Long]) )

override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
  // 取出count状态
  val curCount = countState.value()

  // 如果是第一次处理,注册定时器,每天00:00触发
  if( curCount == 0 ){
    val ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)
    resetTimer.update(ts)
    ctx.timerService().registerProcessingTimeTimer(ts)
  }

  // 判断计数是否达到上限,如果到达则加入黑名单
  if( curCount >= maxCount ){
    // 判断是否发送过黑名单,只发送一次
    if( !isSentBlackList.value() ){
      isSentBlackList.update(true)
      // 输出到侧输出流
      ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today.") )
    }
    return
  }
  // 计数状态加1,输出数据到主流
  countState.update( curCount + 1 )
  out.collect( value )
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
  // 定时器触发时,清空状态
  if( timestamp == resetTimer.value() ){
    isSentBlackList.clear()
    countState.clear()
    resetTimer.clear()
  }
}

}
}

// 自定义预聚合函数
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

// 自定义窗口处理函数
class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
out.collect( CountByProvince( new Timestamp(window.getEnd).toString, key, input.iterator.next() ) )
}
}
package com. gu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: UserBehaviorAnalysis

  • Package: com. gu.marketanalysis

  • Version: 1.0

  • Created by wushengran on 2019/9/23 15:37
    */
    object AppMarketing {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource( new SimulatedEventSource() )
    .assignAscendingTimestamps(_.timestamp)
    .filter( .behavior != “UNINSTALL” )
    .map( data => {
    ( “dummyKey”, 1L )
    } )
    .keyBy(
    ._1) // 以渠道和行为类型作为key分组
    .timeWindow( Time.hours(1), Time.seconds(10) )
    .aggregate( new CountAgg(), new MarketingCountTotal() )

    dataStream.print()
    env.execute(“app marketing job”)
    }
    }

class CountAgg() extends AggregateFunction[(String, Long), Long, Long]{
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

class MarketingCountTotal() extends WindowFunction[Long, MarketingViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(window.getStart).toString
val endTs = new Timestamp(window.getEnd).toString
val count = input.iterator.next()
out.collect( MarketingViewCount(startTs, endTs, “app marketing”, “total”, count) )
}
}
package com. gu.marketanalysis

import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: UserBehaviorAnalysis
  • Package: com. gu.marketanalysis
  • Version: 1.0
  • Created by wushengran on 2019/9/23 15:06
    */

// 输入数据样例类
case class MarketingUserBehavior( userId: String, behavior: String, channel: String, timestamp: Long )
// 输出结果样例类
case class MarketingViewCount( windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long )

object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.addSource( new SimulatedEventSource() )
  .assignAscendingTimestamps(_.timestamp)
  .filter( _.behavior != "UNINSTALL" )
  .map( data => {
    ( (data.channel, data.behavior), 1L )
  } )
  .keyBy(_._1)     // 以渠道和行为类型作为key分组
  .timeWindow( Time.hours(1), Time.seconds(10) )
  .process( new MarketingCountByChannel() )

dataStream.print()
env.execute("app marketing by channel job")

}
}

// 自定义数据源
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior]{
// 定义是否运行的标识位
var running = true
// 定义用户行为的集合
val behaviorTypes: Seq[String] = Seq(“CLICK”, “DOWNLOAD”, “INSTALL”, “UNINSTALL”)
// 定义渠道的集合
val channelSets: Seq[String] = Seq(“wechat”, “weibo”, “appstore”, “huaweistore”)
// 定义一个随机数发生器
val rand: Random = new Random()

override def cancel(): Unit = running = false

override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
// 定义一个生成数据的上限
val maxElements = Long.MaxValue
var count = 0L

// 随机生成所有数据
while( running && count < maxElements ){
  val id = UUID.randomUUID().toString
  val behavior = behaviorTypes(rand.nextInt(behaviorTypes.size))
  val channel = channelSets(rand.nextInt(channelSets.size))
  val ts = System.currentTimeMillis()

  ctx.collect( MarketingUserBehavior( id, behavior, channel, ts ) )

  count += 1
  TimeUnit.MILLISECONDS.sleep(10L)
}

}
}

// 自定义处理函数
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow]{
override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(context.window.getStart).toString
val endTs = new Timestamp(context.window.getEnd).toString
val channel = key._1
val behavior = key._2
val count = elements.size
out.collect( MarketingViewCount(startTs, endTs, channel, behavior, count) )
}
}

package com. gu.apitest.sinktest

import java.util

import com. gu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest.sinktest

  • Version: 1.0

  • Created by wushengran on 2019/9/17 16:27
    */
    object EsSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)

    // transform
    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
    }
    )

    val httpHosts = new util.ArrayListHttpHost
    httpHosts.add(new HttpHost(“localhost”, 9200))

    // 创建一个esSink 的builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
    httpHosts,
    new ElasticsearchSinkFunction[SensorReading] {
    override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    println("saving data: " + element)
    // 包装成一个Map或者JsonObject
    val json = new util.HashMapString, String
    json.put(“sensor_id”, element.id)
    json.put(“temperature”, element.temperature.toString)
    json.put(“ts”, element.timestamp.toString)

      // 创建index request,准备发送数据
      val indexRequest = Requests.indexRequest()
        .index("sensor")
        .`type`("readingdata")
        .source(json)
    
      // 利用index发送请求,写入数据
      indexer.add(indexRequest)
      println("data saved.")
    }
    

    }
    )

    // sink
    dataStream.addSink( esSinkBuilder.build() )

    env.execute(“es sink test”)
    }
    }
    package com. gu.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com. gu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest.sinktest

  • Version: 1.0

  • Created by wushengran on 2019/9/17 16:44
    */
    object JdbcSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)

    // transform
    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    }
    )

    // sink
    dataStream.addSink( new MyJdbcSink() )

    env.execute(“jdbc sink test”)
    }
    }

class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// 定义sql连接、预编译器
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

// 初始化,创建连接和预编译语句
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection(“jdbc:mysql://localhost:3306/test”, “root”, “123456”)
insertStmt = conn.prepareStatement(“INSERT INTO temperatures (sensor, temp) VALUES (?,?)”)
updateStmt = conn.prepareStatement(“UPDATE temperatures SET temp = ? WHERE sensor = ?”)
}

// 调用连接,执行sql
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
// 执行更新语句
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// 如果update没有查到数据,那么执行插入语句
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}

// 关闭时做清理工作
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
package com. gu.apitest.sinktest

import com. gu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest.sinktest

  • Version: 1.0

  • Created by wushengran on 2019/9/17 16:12
    */
    object RedisSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)

    // transform
    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
    }
    )

    val conf = new FlinkJedisPoolConfig.Builder()
    .setHost(“localhost”)
    .setPort(6379)
    .build()

    // sink
    dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

    env.execute(“redis sink test”)
    }
    }

class MyRedisMapper() extends RedisMapper[SensorReading]{

// 定义保存数据到redis的命令
override def getCommandDescription: RedisCommandDescription = {
// 把传感器id和温度值保存成哈希表 HSET key field value
new RedisCommandDescription( RedisCommand.HSET, “sensor_temperature” )
}

// 定义保存到redis的value
override def getValueFromData(t: SensorReading): String = t.temperature.toString

// 定义保存到redis的key
override def getKeyFromData(t: SensorReading): String = t.id
}
package com. gu.apitest.sinktest

import java.util.Properties

import com. gu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest.sinktest

  • Version: 1.0

  • Created by wushengran on 2019/9/17 15:43
    */
    object KafkaSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    // val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)
    val properties = new Properties()
    properties.setProperty(“bootstrap.servers”, “localhost:9092”)
    properties.setProperty(“group.id”, “consumer-group”)
    properties.setProperty(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
    properties.setProperty(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
    properties.setProperty(“auto.offset.reset”, “latest”)

    val inputStream = env.addSource(new FlinkKafkaConsumer011[String](“sensor”, new SimpleStringSchema(), properties))

    // Transform操作

    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // 转成String方便序列化输出
    }
    )

    // sink
    dataStream.addSink( new FlinkKafkaProducer011[String]( “sinkTest”, new SimpleStringSchema(), properties) )
    dataStream.print()

    env.execute(“kafka sink test”)
    }
    }
    package com. gu.apitest

import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest

  • Version: 1.0

  • Created by wushengran on 2019/8/24 10:14
    */
    object ProcessFunctionTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(100000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))

// env.setStateBackend( new RocksDBStateBackend("") )

val stream = env.socketTextStream("localhost", 7777)

val dataStream = stream.map(data => {
  val dataArray = data.split(",")
  SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
  .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
  override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )

val processedStream = dataStream.keyBy(_.id)
  .process( new TempIncreAlert() )

val processedStream2 = dataStream.keyBy(_.id)

// .process( new TempChangeAlert(10.0) )
.flatMap( new TempChangeAlert(10.0) )

val processedStream3 = dataStream.keyBy(_.id)
  .flatMapWithState[(String, Double, Double), Double]{
  // 如果没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
  case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
  // 如果有状态,就应该与上次的温度值比较差值,如果大于阈值就输出报警
  case ( input: SensorReading, lastTemp: Some[Double] ) =>
    val diff = ( input.temperature - lastTemp.get ).abs
    if( diff > 10.0 ){
      ( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
    } else
      ( List.empty, Some(input.temperature) )
}

dataStream.print("input data")
processedStream3.print("processed data")

env.execute("process function test")

}
}

class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{

// 定义一个状态,用来保存上一个数据的温度值
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]) )
// 定义一个状态,用来保存定时器的时间戳
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long](“currentTimer”, classOf[Long]) )

override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 先取出上一个温度值
val preTemp = lastTemp.value()
// 更新温度值
lastTemp.update( value.temperature )

val curTimerTs = currentTimer.value()


if( value.temperature < preTemp || preTemp == 0.0 ){
  // 如果温度下降,或是第一条数据,删除定时器并清空状态
  ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
  currentTimer.clear()
} else if ( value.temperature > preTemp && curTimerTs == 0 ){
  // 温度上升且没有设过定时器,则注册定时器
  val timerTs = ctx.timerService().currentProcessingTime() + 5000L
  ctx.timerService().registerProcessingTimeTimer( timerTs )
  currentTimer.update( timerTs )
}

}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// 输出报警信息
out.collect( ctx.getCurrentKey + " 温度连续上升" )
currentTimer.clear()
}
}

class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{

private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {
// 初始化的时候声明state变量
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]))
}

override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
// 获取上次的温度值
val lastTemp = lastTempState.value()
// 用当前的温度值和上次的求差,如果大于阈值,输出报警信息
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}

}

class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
// 定义一个状态变量,保存上次的温度值
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]) )

override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
// 获取上次的温度值
val lastTemp = lastTempState.value()
// 用当前的温度值和上次的求差,如果大于阈值,输出报警信息
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
package com. gu.apitest

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest

  • Version: 1.0

  • Created by wushengran on 2019/8/24 11:16
    */
    object SideOutputTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream(“localhost”, 7777)

    val dataStream = stream.map(data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
    .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractorSensorReading {
    override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
    } )

    val processedStream = dataStream
    .process( new FreezingAlert() )

// dataStream.print(“input data”)
processedStream.print(“processed data”)
processedStream.getSideOutput( new OutputTag[String](“freezing alert”) ).print(“alert data”)

env.execute("side output test")

}
}

// 冰点报警,如果小于32F,输出报警信息到侧输出流
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{

// lazy val alertOutput: OutputTag[String] = new OutputTag[String]( “freezing alert” )

override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if( value.temperature < 32.0 ){
ctx.output( new OutputTag[String]( “freezing alert” ), "freezing alert for " + value.id )
}
out.collect( value )
}
}
package com. gu.apitest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: FlinkTutorial
  • Package: com. gu.apitest
  • Version: 1.0
  • Created by wushengran on 2019/9/17 10:11
    */

// 定义传感器数据样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )

object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// 1. 从集合中读取数据
val stream1 = env.fromCollection(List(
  SensorReading("sensor_1", 1547718199, 35.80018327300259),
  SensorReading("sensor_6", 1547718201, 15.402984393403084),
  SensorReading("sensor_7", 1547718202, 6.720945201171228),
  SensorReading("sensor_10", 1547718205, 38.101067604893444)
))

// env.fromElements(“flink”, 1, 32, 3213, 0.324).print(“test”)

// 2. 从文件中读取数据
val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

// 3. 从kafka中读取数据
// 创建kafka相关的配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

// 4. 自定义数据源
val stream4 = env.addSource(new SensorSource())

// sink输出
stream4.print("stream4")

env.execute("source api test")

}
}

class SensorSource() extends SourceFunction[SensorReading]{
// 定义一个flag:表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = running = false

override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 创建一个随机数发生器
val rand = new Random()

// 随机初始换生成10个传感器的温度数据,之后在它基础随机波动生成流数据
var curTemp = 1.to(10).map(
  i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
)

// 无限循环生成流数据,除非被cancel
while(running){
  // 更新温度值
  curTemp = curTemp.map(
    t => (t._1, t._2 + rand.nextGaussian())
  )
  // 获取当前的时间戳
  val curTime = System.currentTimeMillis()
  // 包装成SensorReading,输出
  curTemp.foreach(
    t => ctx.collect( SensorReading(t._1, curTime, t._2) )
  )
  // 间隔100ms
  Thread.sleep(100)
}

}
}
package com. gu.apitest

import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest

  • Version: 1.0

  • Created by wushengran on 2019/9/17 11:41
    */
    object TransformTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 读入数据
    val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)

    // Transform操作

    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
    }
    )

    // 1. 聚合操作
    val stream1 = dataStream
    .keyBy(“id”)
    // .sum(“temperature”)
    .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )

    // 2. 分流,根据温度是否大于30度划分
    val splitStream = dataStream
    .split( sensorData => {
    if( sensorData.temperature > 30 ) Seq(“high”) else Seq(“low”)
    } )

    val highTempStream = splitStream.select(“high”)
    val lowTempStream = splitStream.select(“low”)
    val allTempStream = splitStream.select(“high”, “low”)

    // 3. 合并两条流
    val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
    val connectedStreams = warningStream.connect(lowTempStream)

    val coMapStream = connectedStreams.map(
    warningData => ( warningData._1, warningData._2, “high temperature warning” ),
    lowData => ( lowData.id, “healthy” )
    )

    val unionStream = highTempStream.union(lowTempStream)

    // 函数类
    dataStream.filter( new MyFilter() ).print()

    // 输出数据
    // dataStream.print()
    // highTempStream.print(“high”)
    // lowTempStream.print(“low”)
    // allTempStream.print(“all”)
    // unionStream.print(“union”)

    env.execute(“transform test job”)
    }
    }

class MyFilter() extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith(“sensor_1”)
}
}

class MyMapper() extends RichMapFunction[SensorReading, String]{
override def map(value: SensorReading): String = {
“flink”
}

override def open(parameters: Configuration): Unit = super.open(parameters)
}
package com. gu.apitest

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.apitest

  • Version: 1.0

  • Created by wushengran on 2019/9/18 9:31
    */
    object WindowTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(500)

    // 读入数据
    // val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)

    val inputStream = env.socketTextStream(“localhost”, 7777)

    val dataStream = inputStream
    .map(
    data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    }
    )
    // .assignAscendingTimestamps(.timestamp * 1000L)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading {
    override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
    })
    // .assignTimestampsAndWatermarks( new MyAssigner() )
    .map(data => (data.id, data.temperature))
    .keyBy(
    ._1)
    // .process( new MyProcess() )
    .timeWindow(Time.seconds(10), Time.seconds(3))
    .reduce((result, data) => (data._1, result._2.min(data._2))) // 统计10秒内的最低温度值

    dataStream.print()

    env.execute(“window api test”)
    }
    }

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
// 定义固定延迟为3秒
val bound: Long = 3 * 1000L
// 定义当前收到的最大的时间戳
var maxTs: Long = Long.MinValue

override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}

override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
maxTs = maxTs.max(element.timestamp * 1000L)
element.timestamp * 1000L
}
}

class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
val bound: Long = 1000L

override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
if( lastElement.id == “sensor_1” ){
new Watermark(extractedTimestamp - bound)
}else{
null
}
}

override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
element.timestamp * 1000L
}
}
package com. gu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**

  • Copyright ? 2018-2028 All Rights Reserved

  • Project: FlinkTutorial

  • Package: com. gu.wc

  • Version: 1.0

  • Created by wushengran on 2019/9/16 14:08
    */
    object StreamWordCount {
    def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val host: String = params.get(“host”)
    val port: Int = params.getInt(“port”)

    // 创建一个流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    // env.disableOperatorChaining()

    // 接收socket数据流
    val textDataStream = env.socketTextStream(host, port)

    // 逐一读取数据,分词之后进行wordcount
    val wordCountDataStream = textDataStream.flatMap(.split("\s"))
    .filter(
    .nonEmpty).startNewChain()
    .map( (_, 1) )
    .keyBy(0)
    .sum(1)

    // 打印输出
    wordCountDataStream.print().setParallelism(1)

    // 执行任务
    env.execute(“stream word count job”)
    }
    }
    package com. gu.wc

import org.apache.flink.api.scala._

/**

  • Copyright ? 2018-2028 All Rights Reserved
  • Project: FlinkTutorial
  • Package: com. gu.wc
  • Version: 1.0
  • Created by wushengran on 2019/9/16 11:48
    */

// 批处理代码
object WordCount {
def main(args: Array[String]): Unit = {
// 创建一个批处理的执行环境
val env = ExecutionEnvironment.getExecutionEnvironment

// 从文件中读取数据
val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
val inputDataSet = env.readTextFile(inputPath)

// 分词之后做count
val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
  .map( (_, 1) )
  .groupBy(0)
  .sum(1)

// 打印输出
wordCountDataSet.print()

}
}
package streaming.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**

  • 把数据写入redis
    */
    public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource text = env.socketTextStream(“192.168.167.254”, 8888, “\n”);
    //lpsuh l_words word
    //对数据进行组装,把string转化为tuple2<String,String>
    DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(String value) throws Exception {
    return new Tuple2<>(“b”, value);
    }
    });
    //创建redis的配置
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(“192.168.167.254”).setPort(6379).build();

     //创建redissink
     RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
     l_wordsData.addSink(redisSink);
     env.execute("StreamingDemoToRedis");
    

    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
    //表示从接收的数据中获取需要操作的redis key
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
    return data.f0;
    }
    //表示从接收的数据中获取需要操作的redis value
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
    return data.f1;
    }

     @Override
     public RedisCommandDescription getCommandDescription() {
         return new RedisCommandDescription(RedisCommand.LPUSH);
     }
    

    }
    }

<?xml version="1.0" encoding="UTF-8"?>


4.0.0

<groupId>com.    .flink</groupId>
<artifactId>    Pro</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
    <module>ETL</module>
    <module>Report</module>
</modules>


<properties>
    <flink.version>1.9.0</flink.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
        </dependency>
        <!-- 日志相关依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- redis依赖 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- json依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

        <!--es依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <testExcludes>
                    <testExclude>/src/test/**</testExclude>
                </testExcludes>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
<?xml version="1.0" encoding="UTF-8"?>



Pro
com. .flink
1.0-SNAPSHOT

4.0.0

<artifactId>ETL</artifactId>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <!-- 日志相关依赖 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <!-- redis依赖 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
    <!-- json依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>


</dependencies>

package com. .core;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com. .source. RedisSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Properties;

/**

  • 数据清洗
    */
    public class DataClean {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //我们是从Kafka里面读取数据,所以这儿就是topic有多少个partition,那么就设置几个并行度。
    env.setParallelism(3);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //注释,我们这儿其实需要设置state backed类型,我们要把checkpoint的数据存储到
    //rocksdb里面

     //第一步:从Kafka里面读取数据 消费者 数据源需要kafka
     //topic的取名还是有讲究的,最好就是让人根据这个名字就能知道里面有什么数据。
     //xxxx_xxx_xxx_xxx
     String topic="allData";
     Properties consumerProperties = new Properties();
     consumerProperties.put("bootstrap.servers","192.168.167.254:9092");
     consumerProperties.put("group.id","allTopic_consumer");
    
     /**
      * String topic, 主题
      * KafkaDeserializationSchema<T> deserializer,
      * Properties props
      */
     FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic,
             new SimpleStringSchema(),
             consumerProperties);
     //{"dt":"2019-11-24 19:54:23","countryCode":"PK","data":[{"type":"s4","score":0.8,"level":"C"},{"type":"s5","score":0.2,"level":"C"}]}
     DataStreamSource<String> allData = env.addSource(consumer);
     //设置为广播变量
     DataStream<HashMap<String, String>> mapData = env.addSource(new     RedisSource()).broadcast();
     SingleOutputStreamOperator<String> etlData = allData.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
         HashMap<String, String> allMap = new HashMap<String, String>();
    
         //里面处理的是kafka的数据
         @Override
         public void flatMap1(String line, Collector<String> out) throws Exception {
             JSONObject jsonObject = JSONObject.parseObject(line);
             String dt = jsonObject.getString("dt");
             String countryCode = jsonObject.getString("countryCode");
             //可以根据countryCode获取大区的名字
             String area = allMap.get(countryCode);
             JSONArray data = jsonObject.getJSONArray("data");
             for (int i = 0; i < data.size(); i++) {
                 JSONObject dataObject = data.getJSONObject(i);
                 System.out.println("大区:"+area);
                 dataObject.put("dt", dt);
                 dataObject.put("area", area);
                 //下游获取到数据的时候,也就是一个json格式的数据
                 out.collect(dataObject.toJSONString());
             }
    
    
         }
    
         //里面处理的是redis里面的数据
         @Override
         public void flatMap2(HashMap<String, String> map,
                              Collector<String> collector) throws Exception {
             System.out.println(map.toString());
             allMap = map;
    
         }
     });
    
     //ETL -> load kafka
    
    
     etlData.print().setParallelism(1);
    
     /**
      * String topicId,
      * SerializationSchema<IN> serializationSchema,
      * Properties producerConfig)
      */
    

// String outputTopic=“allDataClean”;
// Properties producerProperties = new Properties();
// producerProperties.put(“bootstrap.servers”,“192.168.167.254:9092”);
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
//
// //搞一个Kafka的生产者
// etlData.addSink(producer);

    env.execute("DataClean");


}

}
package com. .producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**

  • 模拟数据源
    */
    public class kafkaProducer {

    public static void main(String[] args) throws Exception{
    Properties prop = new Properties();
    //指定kafka broker地址
    prop.put(“bootstrap.servers”, “192.168.167.254:9092”);
    //指定key value的序列化方式
    prop.put(“key.serializer”, StringSerializer.class.getName());
    prop.put(“value.serializer”, StringSerializer.class.getName());
    //指定topic名称
    String topic = “allData”;

     //创建producer链接
     KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
    
     //{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
    
    
     while(true){
         String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}";
         System.out.println(message);
         //同步的方式,往Kafka里面生产数据
        producer.send(new ProducerRecord<String, String>(topic,message));
         Thread.sleep(2000);
     }
     //关闭链接
     //producer.close();
    

    }

    public static String getCurrentTime(){
    SimpleDateFormat sdf = new SimpleDateFormat(“YYYY-MM-dd HH:mm:ss”);
    return sdf.format(new Date());
    }

    public static String getCountryCode(){
    String[] types = {“US”,“TW”,“HK”,“PK”,“KW”,“SA”,“IN”};
    Random random = new Random();
    int i = random.nextInt(types.length);
    return types[i];
    }

    public static String getRandomType(){
    String[] types = {“s1”,“s2”,“s3”,“s4”,“s5”};
    Random random = new Random();
    int i = random.nextInt(types.length);
    return types[i];
    }

    public static double getRandomScore(){
    double[] types = {0.3,0.2,0.1,0.5,0.8};
    Random random = new Random();
    int i = random.nextInt(types.length);
    return types[i];
    }

    public static String getRandomLevel(){
    String[] types = {“A”,“A+”,“B”,“C”,“D”};
    Random random = new Random();
    int i = random.nextInt(types.length);
    return types[i];
    }

}
package com. .source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;

/**
*

  • hset areas AREA_US US
  • hset areas AREA_CT TW,HK
  • hset areas AREA_AR PK,KW,SA
  • hset areas AREA_IN IN
  • IN,AREA_IN

*/
public class RedisSource implements SourceFunction<HashMap<String,String>> {

private Logger logger=LoggerFactory.getLogger(    RedisSource.class);

private Jedis jedis;
private boolean isRunning=true;

@Override
public void run(SourceContext<HashMap<String, String>> cxt) throws Exception {
    this.jedis = new Jedis("192.168.167.254",6379);
    HashMap<String, String> map = new HashMap<>();
    while(isRunning){
      try{
          map.clear();
          Map<String, String> areas = jedis.hgetAll("areas");
          for(Map.Entry<String,String> entry: areas.entrySet()){
              String area = entry.getKey();
              String value = entry.getValue();
              String[] fields = value.split(",");
              for(String country:fields){
                  map.put(country,area);
              }

          }
          if(map.size() > 0 ){
              cxt.collect(map);
          }
          Thread.sleep(60000);
      }catch (JedisConnectionException e){
          logger.error("redis连接异常",e.getCause());
          this.jedis = new Jedis("192.168.167.254",6379);
      }catch (Exception e){
          logger.error("数据源异常",e.getCause());
      }

    }

}

@Override
public void cancel() {
    isRunning=false;
    if(jedis != null){
        jedis.close();
    }

}

}

<?xml version="1.0" encoding="UTF-8"?>



Pro
com. .flink
1.0-SNAPSHOT

4.0.0

<artifactId>Report</artifactId>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <!-- 日志相关依赖 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <!-- redis依赖 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
    <!-- json依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>

</dependencies>

package com. .core;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com. .function.MySumFuction;
import com. .watermark.MyWaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**

  • ETL:对数据进行预处理

  • 报表:就是要计算一些指标
    */
    public class DataReport {

    public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //env.setStateBackend(new RocksDBStateBackend(""));
    //设置time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    String topic=“auditLog”;
    Properties consumerProperties = new Properties();
    consumerProperties.put(“bootstrap.servers”,“192.168.167.254:9092”);
    consumerProperties.put(“group.id”,“auditLog_consumer”);
    //读取Kafka里面对数据
    //{“dt”:“2019-11-24 21:19:47”,“type”:“child_unshelf”,“username”:“shenhe1”,“area”:“AREA_ID”}
    FlinkKafkaConsumer011 consumer =
    new FlinkKafkaConsumer011(topic,new SimpleStringSchema(),consumerProperties);
    DataStreamSource data = env.addSource(consumer);
    Logger logger= LoggerFactory.getLogger(DataReport.class);
    //对数据进行处理
    SingleOutputStreamOperator<Tuple3<Long, String, String>> preData = data.map(new MapFunction<String, Tuple3<Long, String, String>>() {
    /**
    * Long:time
    * String: type
    * String: area
    * @return
    * @throws Exception
    */
    @Override
    public Tuple3<Long, String, String> map(String line) throws Exception {
    JSONObject jsonObject = JSON.parseObject(line);
    String dt = jsonObject.getString(“dt”);
    String type = jsonObject.getString(“type”);
    String area = jsonObject.getString(“area”);
    long time = 0;
    try {
    SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
    time = sdf.parse(dt).getTime();
    } catch (ParseException e) {
    logger.error(“时间解析失败,dt:” + dt, e.getCause());
    }
    return Tuple3.of(time, type, area);
    }
    });

     /**
      *过滤无效的数据
      */
     SingleOutputStreamOperator<Tuple3<Long, String, String>> filterData = preData.filter(tuple3 -> tuple3.f0 != 0);
    
    
     /**
      * 收集迟到太久的数据
      */
     OutputTag<Tuple3<Long,String,String>> outputTag=
             new OutputTag<Tuple3<Long,String,String>>("late-date"){};
     /**
      * 进行窗口的统计操作
      * 统计的过去的一分钟的每个大区的,不同类型的有效视频数量
      */
    
     SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(new MyWaterMark())
             .keyBy(1, 2)
             .window(TumblingEventTimeWindows.of(Time.seconds(30)))
             .sideOutputLateData(outputTag)
             .apply(new MySumFuction());
    
    
     /**
      * 收集到延迟太多的数据,业务里面要求写到Kafka
      */
    
     SingleOutputStreamOperator<String> sideOutput =
             //java8
             resultData.getSideOutput(outputTag).map(line -> line.toString());
    

// String outputTopic=“lateData”;
// Properties producerProperties = new Properties();
// producerProperties.put(“bootstrap.servers”,“192.168.167.254:9092”);
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
// sideOutput.addSink(producer);

    /**
     * 业务里面需要吧数据写到ES里面
     * 而我们公司是需要把数据写到kafka
     */

    resultData.print();


    env.execute("DataReport");

}

}
package com. .function;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

/**

  • IN,输入的数据类型
  • OUT,输出的数据类型
  • KEY,在flink里面这儿其实就是分组的字段,大家永远看到的是一个tuple字段
  • 只不过,如果你的分组的字段是有一个,那么这个tuple里面就只会有一个字段
  • 如果说你的分组的字段有多个,那么这个里面就会有多个字段。
  • W extends Window

*/
public class MySumFuction implements WindowFunction<Tuple3<Long,String,String>,
Tuple4<String,String,String,Long>,Tuple,TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<Tuple3<Long, String, String>> input,
Collector<Tuple4<String, String, String, Long>> out) {
//获取分组字段信息
String type = tuple.getField(0).toString();
String area = tuple.getField(1).toString();

    java.util.Iterator<Tuple3<Long, String, String>> iterator = input.iterator();
    long count=0;
    while(iterator.hasNext()){
        iterator.next();
        count++;
    }
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String time = sdf.format(new Date(timeWindow.getEnd()));


    Tuple4<String, String, String, Long> result =
            new Tuple4<String, String, String, Long>(time, type, area, count);
    out.collect(result);
}

}
package com. .source;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
*
*/
public class ProducerDataReport {

public static void main(String[] args) throws Exception{
Properties prop = new Properties();
//指定kafka broker地址
prop.put(“bootstrap.servers”, “192.168.167.254:9092”);
//指定key value的序列化方式
prop.put(“key.serializer”, StringSerializer.class.getName());
prop.put(“value.serializer”, StringSerializer.class.getName());
//指定topic名称
String topic = “auditLog”;

//创建producer链接
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//{“dt”:“2018-01-01 10:11:22”,“type”:“shelf”,“username”:“shenhe1”,“area”:“AREA_US”}
//生产消息
while(true){
String message = “{“dt”:”"+getCurrentTime()+"",“type”:""+getRandomType()+"",“username”:""+getRandomUsername()+"",“area”:""+getRandomArea()+""}";
System.out.println(message);
producer.send(new ProducerRecord<String, String>(topic,message));
Thread.sleep(500);
}
//关闭链接
//producer.close();
}
public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat(“YYYY-MM-dd HH:mm:ss”);
return sdf.format(new Date());
}
public static String getRandomArea(){
String[] types = {“AREA_US”,“AREA_CT”,“AREA_AR”,“AREA_IN”,“AREA_ID”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomType(){
String[] types = {“shelf”,“unshelf”,“black”,“chlid_shelf”,“child_unshelf”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomUsername(){
String[] types = {“shenhe1”,“shenhe2”,“shenhe3”,“shenhe4”,“shenhe5”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

}
package com. .watermark;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

/**
*
*/
public class MyWaterMark
implements AssignerWithPeriodicWatermarks<Tuple3<Long,String,String>> {
long currentMaxTimestamp=0L;
final long maxOutputOfOrderness=20000L;//允许乱序时间。
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutputOfOrderness);
}
@Override
public long extractTimestamp(Tuple3<Long, String, String>
element, long l) {
Long timeStamp = element.f0;
currentMaxTimestamp=Math.max(timeStamp,currentMaxTimestamp);
return timeStamp;
}
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-09 11:45:16  更:2021-12-09 11:47:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:58:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码