01 Introduction
In earlier posts we covered Flink's advanced features; interested readers can refer to those articles.
This post focuses on developing Flink applications in multiple languages (Scala and Python).
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/scala_api_extensions.html
02 Scala-Flink
2.1 Requirements
Use Flink to consume e-commerce clickstream log data from Kafka and process it in real time:
- Data preprocessing: widen each record into a "wide table" row to make later analysis easier
- Real-time channel hotspot analysis
- Real-time channel PV/UV analysis
2.2 Preparation
Kafka:
List the existing topics:
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
Create the topic:
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic pyg
List the topics again to confirm:
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
Start a console consumer:
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic pyg
Delete the topic (for reference only, no need to run):
/export/servers/kafka/bin/kafka-topics.sh --delete --zookeeper node01:2181 --topic pyg
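HBase:
The tasks below keep their state in three HBase tables (user_history, channel_realhot, channel_pvuv), each with a single column family named info. If the skeleton's HBaseUtil does not create tables on demand (an assumption about the skeleton), they can be created up front from the HBase shell:
create 'user_history', 'info'
create 'channel_realhot', 'info'
create 'channel_pvuv', 'info'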
Import the prepared skeleton code.
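The skeleton provides the case classes and utility classes used below (Message, ClickLog, ClickLogWide, TimeUtil, HBaseUtil). For orientation, here is a minimal sketch of the case classes as inferred from how they are used in the following code; the exact field types in the skeleton may differ:
// Sketch only: the actual definitions come from the skeleton project.
case class ClickLog(channelID: String, categoryID: String, produceID: String,
                    country: String, province: String, city: String,
                    network: String, source: String, browserType: String,
                    entryTime: String, leaveTime: String, userID: String)

case class Message(clickLog: ClickLog, count: Long, timeStamp: Long)

case class ClickLogWide(channelID: String, categoryID: String, produceID: String,
                        country: String, province: String, city: String,
                        network: String, source: String, browserType: String,
                        entryTime: String, leaveTime: String, userID: String,
                        count: Long, timeStamp: Long, address: String,
                        yearMonth: String, yearMonthDay: String, yearMonthDayHour: String,
                        isNew: Int, isHourNew: Int, isDayNew: Int, isMonthNew: Int)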
2.3 Code Implementation
2.3.1 Entry Class: Data Parsing
import java.lang
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs

// Message, ClickLog, ClickLogWide, ProcessTask, ChannelRealHotTask, ChannelRealPvUvTask,
// TimeUtil and HBaseUtil come from the imported skeleton project.
object App {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Use a local checkpoint directory when no path is passed on the command line.
    if (args.length < 1) {
      env.setStateBackend(new FsStateBackend("file:///D:/ckp"))
    } else {
      env.setStateBackend(new FsStateBackend(args(0)))
    }

    // Checkpoint configuration: exactly-once, every second, retained on cancellation.
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // Restart at most 3 times, waiting 10 seconds between attempts.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3,
      org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ))

    // Kafka source: topic "pyg", offsets committed on checkpoints, partition discovery every 5 s.
    val topic: String = "pyg"
    val schema = new SimpleStringSchema()
    val props: Properties = new Properties()
    props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node01:9092") // brokers as used in the preparation commands
    props.setProperty("group.id", "flink")
    props.setProperty("auto.offset.reset", "latest")
    props.setProperty("flink.partition-discovery.interval-millis", "5000")
    val kafkaSource: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](topic, schema, props)
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    val jsonStrDS: DataStream[String] = env.addSource(kafkaSource)

    // Parse each JSON record into a Message(clickLog, count, timeStamp).
    val messageDS: DataStream[Message] = jsonStrDS.map(jsonStr => {
      val jsonObj: JSONObject = JSON.parseObject(jsonStr)
      val count: lang.Long = jsonObj.getLong("count")
      val timeStamp: lang.Long = jsonObj.getLong("timeStamp")
      val clickLogStr: String = jsonObj.getString("message")
      val clickLog: ClickLog = JSON.parseObject(clickLogStr, classOf[ClickLog])
      Message(clickLog, count, timeStamp)
    })

    // Event time with watermarks that tolerate 5 seconds of out-of-orderness.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(200)
    val watermakerDS: DataStream[Message] = messageDS.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Message](org.apache.flink.streaming.api.windowing.time.Time.seconds(5)) {
        override def extractTimestamp(element: Message): Long = element.timeStamp
      }
    )

    // Preprocess into the wide table, then run the two analysis tasks.
    val clickLogWideDS: DataStream[ClickLogWide] = ProcessTask.process(watermakerDS)
    clickLogWideDS.print()
    ChannelRealHotTask.process(clickLogWideDS)
    ChannelRealPvUvTask.process(clickLogWideDS)
    env.execute()
  }
}
2.3.2 Data Preprocessing
To make later analysis easier, the clickstream logs are preprocessed in real time with Flink: extra fields are added on top of the original click log so that the subsequent statistics can be computed directly.
The raw click log consumed from Kafka contains the following fields:
- channelID: channel ID
- categoryID: product category ID
- produceID: product ID
- country: country
- province: province
- city: city
- network: network type
- source: traffic source
- browserType: browser type
- entryTime: time the user entered the site
- leaveTime: time the user left the site
- userID: user ID
On top of these, the following fields are added:
- count: number of visits in this record
- timestamp: visit time
- address: country + province + city (concatenated)
- yearMonth: year-month
- yearMonthDay: year-month-day
- yearMonthDayHour: year-month-day-hour
- isNew: whether the user is visiting this channel for the first time
- isHourNew: whether the user is new to this channel within the current hour
- isDayNew: whether the user is new to this channel within the current day
- isMonthNew: whether the user is new to this channel within the current month
The last four flags cannot be computed from the click log alone; they require a history table in HBase that stores each user's last visit to each channel. That history table (user_history) has the following structure:
- rowkey: user ID + ":" + channel ID, e.g. 10:220
- userid: user ID, e.g. 10
- channelid: channel ID, e.g. 220
- lastVisitTime: last visit time (timestamp), e.g. 1553653555
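HBaseUtil comes with the skeleton. As a reference, here is a minimal sketch of its single-cell read and write helpers, assuming the standard HBase Java client with an hbase-site.xml on the classpath (table creation and error handling omitted; the getMapData/putMapData helpers used later are analogous):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseUtil {
  // Sketch only: a single shared connection, configured from hbase-site.xml on the classpath.
  private val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())

  // Read one cell; returns "" when the row or column does not exist.
  def getData(tableName: String, rowkey: String, family: String, column: String): String = {
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      val result = table.get(new Get(Bytes.toBytes(rowkey)))
      val cell = result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))
      if (cell == null) "" else Bytes.toString(cell)
    } finally table.close()
  }

  // Write one cell.
  def putData(tableName: String, rowkey: String, family: String, column: String, value: String): Unit = {
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value))
      table.put(put)
    } finally table.close()
  }
}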
import org.apache.commons.lang3.StringUtils // adjust if the skeleton uses a different StringUtils
import org.apache.flink.streaming.api.scala.DataStream

object ProcessTask {
  def process(watermakerDS: DataStream[Message]): DataStream[ClickLogWide] = {
    import org.apache.flink.api.scala._
    // Widen each Message into a ClickLogWide by adding the derived fields.
    val clickLogWideDS: DataStream[ClickLogWide] = watermakerDS.map(message => {
      val address: String = message.clickLog.country + message.clickLog.province + message.clickLog.city
      val yearMonth: String = TimeUtil.parseTime(message.timeStamp, "yyyyMM")
      val yearMonthDay: String = TimeUtil.parseTime(message.timeStamp, "yyyyMMdd")
      val yearMonthDayHour: String = TimeUtil.parseTime(message.timeStamp, "yyyyMMddHH")
      val (isNew, isHourNew, isDayNew, isMonthNew) = getIsNew(message)
      val clickLogWide = ClickLogWide(
        message.clickLog.channelID,
        message.clickLog.categoryID,
        message.clickLog.produceID,
        message.clickLog.country,
        message.clickLog.province,
        message.clickLog.city,
        message.clickLog.network,
        message.clickLog.source,
        message.clickLog.browserType,
        message.clickLog.entryTime,
        message.clickLog.leaveTime,
        message.clickLog.userID,
        message.count,
        message.timeStamp,
        address,
        yearMonth,
        yearMonthDay,
        yearMonthDayHour,
        isNew,
        isHourNew,
        isDayNew,
        isMonthNew
      )
      clickLogWide
    })
    clickLogWideDS
  }

  // Decide whether this click comes from a new user for the channel overall and within the
  // current hour/day/month, based on the last visit time stored in HBase.
  def getIsNew(msg: Message): (Int, Int, Int, Int) = {
    var isNew: Int = 0
    var isHourNew: Int = 0
    var isDayNew: Int = 0
    var isMonthNew: Int = 0
    val tableName = "user_history"
    val columnFamily = "info"
    val rowkey = msg.clickLog.userID + ":" + msg.clickLog.channelID
    val queryColumn = "lastVisitTime"
    val lastVisitTime: String = HBaseUtil.getData(tableName, rowkey, columnFamily, queryColumn)
    if (StringUtils.isBlank(lastVisitTime)) {
      // No history record: the user is new at every granularity.
      isNew = 1
      isHourNew = 1
      isDayNew = 1
      isMonthNew = 1
    } else {
      isNew = 0
      isHourNew = TimeUtil.compareDate(msg.timeStamp, lastVisitTime.toLong, "yyyyMMddHH")
      isDayNew = TimeUtil.compareDate(msg.timeStamp, lastVisitTime.toLong, "yyyyMMdd")
      isMonthNew = TimeUtil.compareDate(msg.timeStamp, lastVisitTime.toLong, "yyyyMM")
    }
    // Update the last visit time for the next click.
    HBaseUtil.putData(tableName, rowkey, columnFamily, queryColumn, msg.timeStamp.toString)
    (isNew, isHourNew, isDayNew, isMonthNew)
  }
}
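getIsNew relies on TimeUtil from the skeleton. A minimal sketch, under the assumption that parseTime formats an epoch-millisecond timestamp with the given pattern and that compareDate returns 1 when the current click falls into a later hour/day/month than the last visit, 0 otherwise:
import org.apache.commons.lang3.time.FastDateFormat

object TimeUtil {
  // Format an epoch-millisecond timestamp using the given pattern, e.g. "yyyyMMddHH".
  def parseTime(timestamp: Long, pattern: String): String =
    FastDateFormat.getInstance(pattern).format(timestamp)

  // Return 1 if currentTime falls into a later period (hour/day/month, depending on
  // the pattern) than lastTime, otherwise 0.
  def compareDate(currentTime: Long, lastTime: Long, pattern: String): Int = {
    val current = parseTime(currentTime, pattern)
    val last = parseTime(lastTime, pattern)
    if (current.compareTo(last) > 0) 1 else 0
  }
}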
2.3.3 Real-Time Channel Hotspots
Channel hotspot analysis counts how many times each channel has been visited (clicked). Each window produces a per-channel visit count, which must then be added to the historical click count already stored in HBase.
import org.apache.commons.lang3.StringUtils
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

object ChannelRealHotTask {

  case class ChannelRealHot(channelId: String, visited: Long)

  def process(clickLogWideDS: DataStream[ClickLogWide]) = {
    import org.apache.flink.api.scala._
    // Sum the visit counts per channel in 10-second tumbling windows.
    val result: DataStream[ChannelRealHot] = clickLogWideDS
      .map(clickLogWide => {
        ChannelRealHot(clickLogWide.channelID, clickLogWide.count)
      })
      .keyBy(_.channelId)
      .timeWindow(Time.seconds(10))
      .reduce((c1, c2) => {
        ChannelRealHot(c2.channelId, c1.visited + c2.visited)
      })

    // Merge each window's result with the historical value stored in HBase (read, add, write back).
    result.addSink(new SinkFunction[ChannelRealHot] {
      override def invoke(value: ChannelRealHot, context: SinkFunction.Context): Unit = {
        val tableName = "channel_realhot"
        val columnFamily = "info"
        val queryColumn = "visited"
        val rowkey = value.channelId
        val historyValueStr: String = HBaseUtil.getData(tableName, rowkey, columnFamily, queryColumn)
        var currentFinalResult = 0L
        if (StringUtils.isBlank(historyValueStr)) {
          currentFinalResult = value.visited
        } else {
          currentFinalResult = value.visited + historyValueStr.toLong
        }
        HBaseUtil.putData(tableName, rowkey, columnFamily, queryColumn, currentFinalResult.toString)
      }
    })
  }
}
2.3.4 Real-Time Channel PV/UV
PV (Page View): every page load or refresh counts once. UV (Unique Visitor): within a given time range, the same client is counted only once.
The statistics are computed per channel at hourly, daily and monthly granularity. PV is accumulated from the count field, while UV is accumulated from the isHourNew/isDayNew/isMonthNew flags: each flag is 1 only for a user's first visit to the channel in that period, so summing the flags gives the number of distinct visitors per period.
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

object ChannelRealPvUvTask {

  case class ChannelRealPvUv(channelId: String, monthDayHour: String, pv: Long, uv: Long)

  def process(clickLogWideDS: DataStream[ClickLogWide]) = {
    import org.apache.flink.api.scala._
    // Emit one record per granularity (month, day, hour); pv comes from count,
    // uv from the corresponding "is new" flag (1 only for the first visit in that period).
    val result: DataStream[ChannelRealPvUv] = clickLogWideDS.flatMap(clickLogWide => {
      List(
        ChannelRealPvUv(clickLogWide.channelID, clickLogWide.yearMonth, clickLogWide.count, clickLogWide.isMonthNew),
        ChannelRealPvUv(clickLogWide.channelID, clickLogWide.yearMonthDay, clickLogWide.count, clickLogWide.isDayNew),
        ChannelRealPvUv(clickLogWide.channelID, clickLogWide.yearMonthDayHour, clickLogWide.count, clickLogWide.isHourNew)
      )
    })
      .keyBy("channelId", "monthDayHour")
      .timeWindow(Time.seconds(10))
      .reduce((c1, c2) => {
        ChannelRealPvUv(c2.channelId, c2.monthDayHour, c1.pv + c2.pv, c1.uv + c2.uv)
      })

    // Accumulate each window's PV/UV with the historical values stored in HBase.
    result.addSink(new SinkFunction[ChannelRealPvUv] {
      override def invoke(value: ChannelRealPvUv, context: SinkFunction.Context): Unit = {
        val tableName = "channel_pvuv"
        val columnFamily = "info"
        val queryColumn1 = "pv"
        val queryColumn2 = "uv"
        val rowkey = value.channelId + ":" + value.monthDayHour
        val map: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, columnFamily, List(queryColumn1, queryColumn2))
        val pvHistoryValueStr: String = map.getOrElse(queryColumn1, "0")
        val uvHistoryValueStr: String = map.getOrElse(queryColumn2, "0")
        val currentFinalPv = value.pv + pvHistoryValueStr.toLong
        val currentFinalUv = value.uv + uvHistoryValueStr.toLong
        HBaseUtil.putMapData(tableName, rowkey, columnFamily,
          Map(
            queryColumn1 -> currentFinalPv,
            queryColumn2 -> currentFinalUv
          )
        )
      }
    })
  }
}
03 PyFlink
3.1 Environment Setup
pip install apache-flink
Installation requires a good network connection and can take quite a while (roughly two hours), because many transitive dependencies are downloaded. To match the Flink 1.12 documentation referenced above, the version can be pinned, e.g. pip install apache-flink==1.12.0.
3.2 Official Documentation
See the PyFlink pages of the official Flink 1.12 documentation.
3.3 Example Code
Two examples follow: a DataStream API word count that writes its results to files, and a batch Table API word count over CSV files.
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # A small in-memory collection as the source.
    ds = env.from_collection(
        collection=["hadoop spark flink", "hadoop spark", "hadoop"],
        type_info=Types.STRING()
    )
    ds.print()
    # Word count: split lines into words, pair each word with 1, then sum per word.
    result = ds.flat_map(lambda line: line.split(" "), result_type=Types.STRING()) \
        .map(lambda word: (word, 1), output_type=Types.ROW([Types.STRING(), Types.INT()])) \
        .key_by(lambda x: x[0], key_type_info=Types.STRING()) \
        .reduce(lambda a, b: (a[0], a[1] + b[1]))
    result.print()
    # Write the results as rows of text under data/output/result1.
    result.add_sink(StreamingFileSink
                    .for_row_format('data/output/result1', SimpleStringEncoder())
                    .build())
    env.execute("tutorial_job")


if __name__ == '__main__':
    tutorial()
The second example uses the batch Table API to run a word count over CSV files: it reads words from data/input, groups by word, and writes (word, count) pairs to /tmp/output.
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

# Source table: one STRING column named "word", read from CSV files under data/input.
t_env.connect(FileSystem().path('data/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

# Sink table: tab-separated (word, count) pairs written under /tmp/output.
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

# Word count: group by word, count occurrences, and write into the sink table.
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute_insert('mySink').wait()
04 Conclusion
This post walked through simple examples of developing Flink applications in Scala and Python. Thanks for reading!