1、Project Introduction
Since the previous article already introduced the project itself, this one focuses on the main class. It is the entry point for the data and a good place to get familiar with the overall processing flow:
1、User operation logs (tracking / buried-point data) are sent to Kafka.
2、Operations staff configure rules on the big-data platform (e.g. "new user", "viewed a certain page", ...), which are stored in MySQL.
3、The Flink job loads the MySQL rules on a schedule (newly added rules are picked up promptly) and processes the logs against them.
4、Log records that match a rule are written to ES (or ClickHouse).
5、While applying the MySQL rules (new user, page view, ...), Flink also queries ES (ClickHouse) where needed; users that satisfy a rule are tagged (each rule has its own label) and stored in HBase.
6、An API layer is built on top and exposed to other platforms.
7、In short, the whole flow is: load the rules, apply them, persist the users that match, and tag them.
2、Main Class Flow Diagram
Before going into anything else, here is the code; a detailed walkthrough follows below.
3、Main Class Code
Code:
package com.task
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.TimeUnit
import com.bean.{BuriedPointDetailBean, UserLastTimeBean}
import com.conf.Constants.{ENV, PARALLELISM, baseConf}
import com.conf.{BaseConf, Constants, LocalConf, OnlineConf, TestConf}
import com.func.{BroadcastProcessFunc, BroadcastProcessRuleFunc, ElasticsearchSinkFunc, HbaseSinkFunc, MysqlSourceFunc, ProcessETLWindowFunc}
import com.utils.StringUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder
import scala.collection.immutable._
object RealTimeLabel {
System.setProperty("HADOOP_USER_NAME", "root")
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L)
val run_env: String = params.get(ENV)
var conf: BaseConf = null
if (StringUtils.isNotEmpty(run_env)) {
if ("online".equals(run_env)) {
println("online init...")
conf = new OnlineConf
} else {
println("test init...")
conf = new TestConf
}
} else {
println("local init...")
conf = new LocalConf
}
Constants.initConf(conf)
val parallelism: Int = params.getInt(PARALLELISM, 1)
env.setParallelism(parallelism)
val properties = params.getProperties
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
properties.setProperty(conf.BOOTSTRAP_SERVERS, conf.bootstrap_servers_value)
properties.setProperty(conf.GROUP_ID, conf.group_id_label_platform)
properties.setProperty(conf.ENABLE_AUTO_COMMIT, conf.enable_auto_commit_value)
properties.setProperty(conf.AUTO_COMMIT_INTERVAL_MS, conf.auto_commit_interval_ms_value)
properties.setProperty(conf.AUTO_OFFSET_RESET, conf.auto_offset_reset_value)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,
org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES),
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)))
env.setStateBackend(new FsStateBackend(conf.checkPointPath, true))
val etlRule = new MapStateDescriptor[String, Tuple4[String, String, String, String]]("config", classOf[String], classOf[Tuple4[String, String, String, String]])
val mysqlSource = env.addSource(new MysqlSourceFunc(baseConf))
val mysqlBroadcast = mysqlSource.broadcast(etlRule)
val dwdEventLogTopic: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)
val kafkaDataStream: DataStream[BuriedPointDetailBean] = env.addSource(dwdEventLogTopic)
.map(item => BuriedPointDetailBean(item))
.setParallelism(parallelism)
val connMysql: DataStream[BuriedPointDetailBean] = kafkaDataStream.connect(mysqlBroadcast)
.process(new BroadcastProcessFunc())
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost(conf.ES_HOST, 9200, "http"))
val esSinkBuilder: ElasticsearchSink.Builder[BuriedPointDetailBean] = new ElasticsearchSink.Builder[BuriedPointDetailBean](
httpHosts,
new ElasticsearchSinkFunc(conf.ES_INDEX, conf.ES_TYPE)
)
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.setRestClientFactory(
new RestClientFactory {
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(conf.ES_USERNAME, conf.ES_PASSWORD))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
}
})
}
})
connMysql.addSink(esSinkBuilder.build).name("sinkES")
val totalStream = connMysql
.keyBy(_.event_id)
.timeWindow(Time.seconds(5))
.process(new ProcessETLWindowFunc)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLastTimeBean](Time.minutes(1)) {
override def extractTimestamp(element: UserLastTimeBean): Long = {
val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ENGLISH) // parse event_time like "2021-01-01 12:00:00.000"
val dt2 = fm.parse(element.event_time)
dt2.getTime
}
}).setParallelism(parallelism)
val hbaseStream = totalStream.connect(mysqlBroadcast)
.process(new BroadcastProcessRuleFunc(conf))
hbaseStream.addSink(new HbaseSinkFunc(conf)).name("sinkHBase")
env.execute("real_time_label")
}
}
4、Main Class Walkthrough
(1) The test environment and production are distinguished by a command-line argument:
ParameterTool.fromArgs(args).get(ENV)
The corresponding conf class is then instantiated; the environment-specific settings live inside each conf class.
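The conf classes themselves are not shown in this post. Purely for orientation, here is a minimal sketch of what the hierarchy referenced by the main class might look like; every concrete value below, and any member not visible in RealTimeLabel, is an assumption rather than the project's actual configuration.
abstract class BaseConf extends Serializable {
  // Kafka consumer property keys referenced by RealTimeLabel
  val BOOTSTRAP_SERVERS = "bootstrap.servers"
  val GROUP_ID = "group.id"
  val ENABLE_AUTO_COMMIT = "enable.auto.commit"
  val AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"
  val AUTO_OFFSET_RESET = "auto.offset.reset"

  // Environment-specific values, overridden by each environment class
  def bootstrap_servers_value: String
  def group_id_label_platform: String = "real_time_label"
  def enable_auto_commit_value: String = "false"
  def auto_commit_interval_ms_value: String = "5000"
  def auto_offset_reset_value: String = "latest"
  def product_kafka_dwd_topic: String
  def checkPointPath: String
  def ES_HOST: String
  def ES_INDEX: String
  def ES_TYPE: String = "_doc"
  def ES_USERNAME: String
  def ES_PASSWORD: String
}

class OnlineConf extends BaseConf {
  override def bootstrap_servers_value: String = "kafka-prod-01:9092,kafka-prod-02:9092" // placeholder
  override def product_kafka_dwd_topic: String = "dwd_event_log"                         // placeholder
  override def checkPointPath: String = "hdfs:///flink/checkpoints/real_time_label"      // placeholder
  override def ES_HOST: String = "es-prod"
  override def ES_INDEX: String = "buried_point_detail"
  override def ES_USERNAME: String = "elastic"
  override def ES_PASSWORD: String = "******"
}

// TestConf and LocalConf extend BaseConf the same way, pointing at test/local endpoints.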
(2) Build the streaming environment:
StreamExecutionEnvironment.getExecutionEnvironment
All job-level settings (checkpointing, restart strategy, state backend, parallelism, and so on) are applied on this environment object; I won't go through the effect of every individual setting here.
(3) Configure the Kafka source:
new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)
The consumer is then registered as a source through env.addSource(), as shown above.
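BuriedPointDetailBean is a project class that is not shown here. As a rough sketch of what the map(item => BuriedPointDetailBean(item)) step relies on, the companion object could expose an apply factory that parses the raw JSON record; the field names and the use of fastjson below are assumptions, not the project's actual implementation.
import com.alibaba.fastjson.JSON

// Hypothetical shape of the event bean; the real class carries the project's full field list.
case class BuriedPointDetailBean(user_id: String,
                                 event_id: String,
                                 event_time: String,
                                 properties: String)

object BuriedPointDetailBean {
  // Parse one raw Kafka record (assumed to be a JSON string) into the bean.
  def apply(raw: String): BuriedPointDetailBean = {
    val obj = JSON.parseObject(raw)
    BuriedPointDetailBean(
      user_id    = obj.getString("user_id"),
      event_id   = obj.getString("event_id"),
      event_time = obj.getString("event_time"),
      properties = obj.getString("properties")
    )
  }
}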
(4) MySQL source. Its main purpose is to load the filtering/rule configuration; as explained in the previous article, the Kafka stream is joined with the MySQL rules through a broadcast stream.
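MysqlSourceFunc is the custom source that keeps re-reading the rule table so that newly added rules are picked up without restarting the job. A minimal sketch of that pattern follows; the table name, column names, reload interval and the mysqlUrl/mysqlUser/mysqlPassword conf members are assumptions.
import java.sql.{Connection, DriverManager}
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Polls the MySQL rule table on a fixed interval and emits every rule as a Tuple4,
// matching the value type of the MapStateDescriptor used for the broadcast state.
class MysqlSourceFunc(conf: BaseConf) extends RichSourceFunction[(String, String, String, String)] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[(String, String, String, String)]): Unit = {
    while (running) {
      var conn: Connection = null
      try {
        // mysqlUrl / mysqlUser / mysqlPassword are assumed members of the project's BaseConf
        conn = DriverManager.getConnection(conf.mysqlUrl, conf.mysqlUser, conf.mysqlPassword)
        val rs = conn.createStatement()
          .executeQuery("SELECT rule_id, rule_type, rule_condition, label FROM etl_rule")
        while (rs.next()) {
          ctx.collect((rs.getString(1), rs.getString(2), rs.getString(3), rs.getString(4)))
        }
      } finally {
        if (conn != null) conn.close()
      }
      Thread.sleep(60 * 1000L) // reload interval: one minute, adjust to the desired rule refresh latency
    }
  }

  override def cancel(): Unit = running = false
}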
(5) Configure the ES sink. For details on configuring the Elasticsearch connector, see the Flink documentation. Data is written to ES mainly for querying and cross-checking, and for the aggregation queries that the labeling rules rely on.
addSink(esSinkBuilder.build).name("sinkES")
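ElasticsearchSinkFunc passed to the builder is the per-record ElasticsearchSinkFunction. A minimal sketch based on the standard flink-connector-elasticsearch6 API is shown below; the document fields simply mirror the assumed bean fields from the earlier sketch and are not the project's real mapping.
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.client.Requests

// Turns every incoming bean into one IndexRequest; index name and type come from the conf.
class ElasticsearchSinkFunc(index: String, docType: String)
  extends ElasticsearchSinkFunction[BuriedPointDetailBean] {

  override def process(element: BuriedPointDetailBean,
                       ctx: RuntimeContext,
                       indexer: RequestIndexer): Unit = {
    val doc = new java.util.HashMap[String, String]()
    doc.put("user_id", element.user_id)       // assumed bean fields
    doc.put("event_id", element.event_id)
    doc.put("event_time", element.event_time)
    indexer.add(
      Requests.indexRequest()
        .index(index)
        .`type`(docType)
        .source(doc)
    )
  }
}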
(6) ETL processing. Events are processed against the rules loaded from MySQL, which is why a (broadcast) process function is needed.
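BroadcastProcessFunc itself will be covered in a later post; the sketch below only illustrates the broadcast-state pattern it builds on: rules coming from the MySQL source are written into broadcast state, and every Kafka event is checked against the currently loaded rules. The Tuple4 element type, the matchesRule helper and the matching condition are assumptions.
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// Forwards only the events that satisfy at least one of the broadcast rules.
class BroadcastProcessFunc
  extends BroadcastProcessFunction[BuriedPointDetailBean, (String, String, String, String), BuriedPointDetailBean] {

  // Must match the descriptor used to build the broadcast stream in RealTimeLabel.
  private val ruleDescriptor = new MapStateDescriptor[String, (String, String, String, String)](
    "config", classOf[String], classOf[(String, String, String, String)])

  override def processElement(value: BuriedPointDetailBean,
                              ctx: BroadcastProcessFunction[BuriedPointDetailBean, (String, String, String, String), BuriedPointDetailBean]#ReadOnlyContext,
                              out: Collector[BuriedPointDetailBean]): Unit = {
    val rules = ctx.getBroadcastState(ruleDescriptor).immutableEntries().iterator()
    var matched = false
    while (rules.hasNext && !matched) {
      matched = matchesRule(value, rules.next().getValue)
    }
    if (matched) out.collect(value)
  }

  override def processBroadcastElement(rule: (String, String, String, String),
                                       ctx: BroadcastProcessFunction[BuriedPointDetailBean, (String, String, String, String), BuriedPointDetailBean]#Context,
                                       out: Collector[BuriedPointDetailBean]): Unit = {
    // rule._1 is assumed to be the rule id; the whole tuple is kept as the state value.
    ctx.getBroadcastState(ruleDescriptor).put(rule._1, rule)
  }

  // Placeholder for the project's real matching logic (new user, page viewed, ...).
  private def matchesRule(event: BuriedPointDetailBean, rule: (String, String, String, String)): Boolean =
    rule._2 == event.event_id // assumption: the rule's second field targets an event id
}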
(7) Write the results to HBase. HBase mainly serves the downstream real-time API queries.
addSink(new HbaseSinkFunc(conf)).name("sinkHBase")
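HbaseSinkFunc is a RichSinkFunction around the HBase client. A minimal sketch follows; the LabeledUserBean output type, the user_label table, the f column family and the hbaseZookeeperQuorum conf member are all assumptions made for illustration, since the real output bean of BroadcastProcessRuleFunc is shown in a later post.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical output bean of the labeling step: one labeled user per record.
case class LabeledUserBean(userId: String, label: String, eventTime: String)

// Writes one HBase row per labeled user so the downstream API can look users up by row key.
class HbaseSinkFunc(conf: BaseConf) extends RichSinkFunction[LabeledUserBean] {

  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", conf.hbaseZookeeperQuorum) // assumed conf member
    connection = ConnectionFactory.createConnection(hbaseConf)
    table = connection.getTable(TableName.valueOf("user_label"))       // assumed table name
  }

  override def invoke(value: LabeledUserBean, context: SinkFunction.Context[_]): Unit = {
    val put = new Put(Bytes.toBytes(value.userId))                     // row key: user id
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("label"), Bytes.toBytes(value.label))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("update_time"), Bytes.toBytes(value.eventTime))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}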
5、Links
The code of the individual classes will be shared one by one in follow-up posts. Continuously updated.