Apache Spark real-time monitoring: implementing a custom KafkaSink, and how to find the code we need to modify from a business scenario
Background
The client runs a big-data computing platform with one particular constraint: roughly 95% of its compute jobs are Spark jobs. We were asked to extend Spark with real-time runtime metrics monitoring for these jobs.
1: Introduction to Spark's monitoring metrics
Anyone who works with big data, or has read the source code of Hadoop-ecosystem components, knows that HDFS, HBase and the other components expose their own monitoring metrics (Metrics). The Spark ecosystem is no different.
2: Reading the Spark source code
PS: This assumes you have some experience reading the Spark source, so we will skip the basics.
2.1: Start in Spark's DAGSchedulerEventProcessLoop class
Its onReceive method is invoked to handle the different events:
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    /**
     * doOnReceive pattern-matches on the event type and dispatches, case by case,
     * to the corresponding dagScheduler.handle...() method.
     */
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}
2.1.1: The doOnReceive method
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  /**
   * JobSubmitted event — this is the handler we will focus on.
   */
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)
  case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
2.1.2: The handleJobSubmitted method (here comes the key part — this is where Spark's monitoring code shows up)
PS: The method is long and mostly deals with stage splitting, which is not what this article is about, so that part has been trimmed.
/**
 * 1. Stage splitting
 * 2. Stage submission
 */
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int],
callSite: CallSite, listener: JobListener, properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
/**
 * 1. Application
 * 2. Job
 * 3. Stage
 *    A job has multiple stages; the last one is the ResultStage, and the ones before it are ShuffleMapStages.
 *    When a ShuffleMapStage finishes, its data can be persisted or shuffled to the next stage.
 *    When a ResultStage finishes, the data is persisted as the program requires (unless it is simply printed).
 * 4. Task
 */
} catch {
case e: BarrierJobSlotsNumberCheckFailed => logWarning(
s"The job $jobId requires to run a barrier stage that requires more slots " + "than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(new Runnable {
override def run(): Unit = eventProcessLoop
.post(JobSubmitted(jobId, finalRDD, func, partitions, callSite, listener, properties))
}, timeIntervalNumTasksCheck, TimeUnit.SECONDS)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
.............................. (omitted) ..............................
// Step into this call:
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
2.1.3: Step into the following code. The names alone let us guess, roughly, that the event (and with it the monitoring information) is being posted to a queue.
/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
// And here, sure enough, we run into metrics
metrics.numEventsPosted.inc()
// If the event buffer is null, it means the bus has been started and we can avoid
// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
if (queuedEvents == null) {
postToQueues(event)
return
}
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
// calling start() picks up the new event.
synchronized {
if (!started.get()) {
queuedEvents += event
return
}
}
// If the bus was already started when the check above was made, just post directly to the
// queues.
postToQueues(event)
}
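Side note: this is the same bus that a user-registered SparkListener hangs off, so job-level events can already be observed from application code before we touch the metrics system at all. A minimal sketch for illustration only — the listener class and its log format below are made up and are not part of this article's monitoring solution:
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

// Hypothetical listener, for illustration only.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("listener-demo").getOrCreate()
    // Registering the listener attaches it to the same LiveListenerBus whose post() we just read.
    spark.sparkContext.addSparkListener(new JobLoggingListener)
    spark.sparkContext.parallelize(1 to 100).count()   // triggers JobSubmitted -> SparkListenerJobStart
    spark.stop()
  }
}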
2.2: Next, let's find out who consumes this queue, and where it all gets wired up.
2.2.1: Anyone who has read the Spark source knows the submission flow: the spark-submit script --> SparkSubmit.main() --> submit() --> doRunMain() --> runMain().
runMain() then uses reflection to instantiate our application class and invoke its main() method, which is where our SparkContext gets initialized.
SparkContext creates and initializes a large number of Spark components, so the class is long; here we only look at the code that creates the SparkEnv.
private[spark] def createSparkEnv(conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = {
/**
 * Create the SparkEnv.
 */
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
2.2.2: Following the call, we land in SparkEnv's create method:
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { ... }
This method likewise creates and initializes many components — the Netty-based RPC layer, the shuffle manager, the security manager, serializers, the block manager, and so on — but here we only care about metricsSystem, the monitoring-related component:
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
// The driver's metrics system is created here
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
// An executor's metrics system is created here
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
Stepping into createMetricsSystem eventually leads to MetricsConfig.initialize():
def initialize() {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
// Also look for the properties in provided Spark configuration
// These are the keys Spark reads for metrics configuration.
// The official docs list the sink implementations Spark ships by default:
// ConsoleSink
// CsvSink
// MetricsServlet, etc.
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
// Now, let's populate a list of sub-properties per instance, instance being the prefix that
// appears before the first dot in the property name.
// Add to the sub-properties per instance, the default properties (those with prefix "*"), if
// they don't have that exact same sub-property already defined.
//
// For example, if properties has ("*.class"->"default_class", "*.path"->"default_path,
// "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
// ("driver"->Map("path"->"driver_path", "class"->"default_class")
// Note how class got added to based on the default property, but path remained the same
// since "driver.path" already existed and took precedence over "*.path"
//
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}
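One detail worth pausing on: why must a sink's constructor take exactly (Properties, MetricRegistry, SecurityManager)? Because MetricsSystem.registerSinks instantiates every configured sink class reflectively through that constructor — roughly as in the paraphrased sketch below (check registerSinks in your Spark version for the exact code). This is why the custom KafkaSink in section 3 keeps the same constructor signature as ConsoleSink.
// Paraphrased sketch of MetricsSystem.registerSinks() in Spark 2.x — not a verbatim quote.
val sinkConfigs = subProperties(instanceProperties, MetricsSystem.SINK_REGEX)  // e.g. "sink.kafka" -> { class=..., period=..., ... }
sinkConfigs.foreach { case (name, kv) =>
  val classPath = kv.getProperty("class")
  if (classPath != null) {
    // The sink class is loaded by name and instantiated through this exact constructor,
    // which is why a custom sink must expose (Properties, MetricRegistry, SecurityManager).
    val sink = Utils.classForName(classPath)
      .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
      .newInstance(kv, registry, securityMgr)
    sinks += sink.asInstanceOf[Sink]
  }
}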
Now let's look at ConsoleSink:
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}
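For reference, this built-in ConsoleSink can be switched on through exactly the configuration path we just traced. A minimal metrics.properties sketch (the period/unit values are only examples):
# conf/metrics.properties — enable the built-in ConsoleSink for all instances (driver, executors, ...)
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds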
3: Following this example, we can implement a sink that writes to Kafka, and then add Structured Streaming plus the Prometheus time-series database on top to complete the monitoring of Spark compute jobs.
3.1: In the same package, create a KafkaSink class that implements the Sink trait:
private[spark] trait Sink {
def start(): Unit
def stop(): Unit
def report(): Unit
}
3.2: From the ConsoleSink code we can see that start() performs some initialization when the sink is registered with the metrics system, report() does the actual metrics output, and stop() handles shutdown.
3.3: The implementation
Package: org.apache.spark.metrics.sink
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging

private[spark] class KafkaSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink with Logging {
  // Property key for the reporting period.
  val KAFKA_KEY_PERIOD = "period"
  // Default period; adjust to your own needs. Hard-coding it here is not elegant — ideally it would be configurable.
  val KAFKA_DEFAULT_PERIOD = 3
  val KAFKA_KEY_UNIT = "unit"
  val KAFKA_DEFAULT_UNIT = "SECONDS"
  // Property key for the target topic, and its default.
  val KAFKA_TOPIC = "metrics"
  val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"
  val KAFKA_BROKERS = "kafka-brokers"
  val KAFKA_DEFAULT_BROKERS = "XXX:9092"

  // Resolve the Kafka topic we write to.
  val TOPIC = Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC)
  val BROKERS = Option(property.getProperty(KAFKA_BROKERS))
    .getOrElse(throw new IllegalStateException("Please configure the Kafka broker list!"))

  private val kafkaProducerConfig = new Properties()
  kafkaProducerConfig.put("bootstrap.servers", BROKERS)
  kafkaProducerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProducerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  private val producer = new KafkaProducer[String, String](kafkaProducerConfig)

  // To keep the sink plumbing separate from the reporting logic, the actual output of
  // monitoring information lives in a dedicated reporter class (KafkaReporter, below).
  private val reporter: KafkaReporter = KafkaReporter.forRegistry(registry)
    .topic(TOPIC)
    .build(producer)

  val pollPeriod = Option(property.getProperty(KAFKA_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => KAFKA_DEFAULT_PERIOD
  }
  val pollUnit: TimeUnit = Option(property.getProperty(KAFKA_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(KAFKA_DEFAULT_UNIT)
  }

  override def start(): Unit = {
    log.info("Starting the Kafka metrics sink...")
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop(): Unit = {
    log.info("Stopping the Kafka metrics sink...")
    reporter.stop()
    producer.close()
  }

  override def report(): Unit = {
    log.info("Reporting metrics to Kafka...")
    reporter.report()
  }
}
Because the real implementation details are confidential, the reporter below is adapted from code found online and is shown without detailed comments; it is worth reading through carefully.
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

// JSONObject here is assumed to be fastjson's (it provides toJSONString()).
import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaReporter extends ScheduledReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);
public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {
return new KafkaReporter.Builder(registry);
}
private KafkaProducer producer;
private Clock clock;
private String topic;
private KafkaReporter(MetricRegistry registry,
TimeUnit rateUnit,
TimeUnit durationUnit,
MetricFilter filter,
Clock clock,
String topic,
KafkaProducer producer) {
super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
this.producer = producer;
this.topic = topic;
this.clock = clock;
}
@Override
public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());
// Gauge
for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
reportGauge(timestamp,entry.getKey(), entry.getValue());
}
// Histogram
// for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
// reportHistogram(timestamp, entry.getKey(), entry.getValue());
// }
}
private void reportGauge(long timestamp, String name, Gauge gauge) {
report(timestamp, name, gauge.getValue());
}
private void reportHistogram(long timestamp, String name, Histogram histogram) {
final Snapshot snapshot = histogram.getSnapshot();
report(timestamp, name, snapshot.getMax());
}
private void report(long timestamp, String name, Object values) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("name",name);
jsonObject.put("timestamp",timestamp);
jsonObject.put("value",values);
producer.send(new ProducerRecord(topic,name, jsonObject.toJSONString()));
}
public static class Builder {
private final MetricRegistry registry;
private TimeUnit rateUnit;
private TimeUnit durationUnit;
private MetricFilter filter;
private Clock clock;
private String topic;
private Builder(MetricRegistry registry) {
this.registry = registry;
this.rateUnit = TimeUnit.SECONDS;
this.durationUnit = TimeUnit.MILLISECONDS;
this.filter = MetricFilter.ALL;
this.clock = Clock.defaultClock();
}
/**
* Convert rates to the given time unit.
*
* @param rateUnit a unit of time
* @return {@code this}
*/
public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {
this.rateUnit = rateUnit;
return this;
}
/**
* Convert durations to the given time unit.
*
* @param durationUnit a unit of time
* @return {@code this}
*/
public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {
this.durationUnit = durationUnit;
return this;
}
/**
* Use the given {@link Clock} instance for the time.
*
* @param clock a {@link Clock} instance
* @return {@code this}
*/
public Builder withClock(Clock clock) {
this.clock = clock;
return this;
}
/**
* Only report metrics which match the given filter.
*
* @param filter a {@link MetricFilter}
* @return {@code this}
*/
public KafkaReporter.Builder filter(MetricFilter filter) {
this.filter = filter;
return this;
}
/**
 * Write metrics to the given Kafka topic.
 *
 * @param topic the target Kafka topic
 * @return {@code this}
 */
public KafkaReporter.Builder topic(String topic) {
this.topic = topic;
return this;
}
/**
 * Builds a {@link KafkaReporter} with the given properties, sending metrics to the
 * given Kafka producer.
 *
 * @return a {@link KafkaReporter}
 */
public KafkaReporter build(KafkaProducer producer) {
return new KafkaReporter(registry,
rateUnit,
durationUnit,
filter,
clock,
topic,
producer);
}
}
}
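To sanity-check the reporter outside of Spark, it can be driven directly from a plain Dropwizard MetricRegistry. A minimal sketch — the broker address, topic and gauge name are placeholders, not part of the original solution:
import java.util.Properties
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.kafka.clients.producer.KafkaProducer

object KafkaReporterSmokeTest {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // Register a toy gauge so there is something to report.
    registry.register("demo.currentTimeMillis", new Gauge[java.lang.Long] {
      override def getValue: java.lang.Long = java.lang.Long.valueOf(System.currentTimeMillis())
    })

    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](producerProps)

    val reporter = KafkaReporter.forRegistry(registry)
      .topic("kafka-sink-topic")                               // placeholder topic
      .build(producer)

    reporter.report()   // one-shot report; reporter.start(3, TimeUnit.SECONDS) would schedule it periodically
    reporter.close()    // close() also stops the reporter's internal scheduler
    producer.close()
  }
}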
Finally, to use our custom sink, add the configuration to the metrics.properties file under conf/; Spark will load it automatically at startup. Note that inside metrics.properties the keys are written without the spark.metrics.conf. prefix; the prefix is only needed when the same settings are passed through SparkConf / --conf (that is exactly the prefix-stripping we saw in MetricsConfig.initialize() above).
# conf/metrics.properties
*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
*.sink.kafka.kafka-brokers=IP:9092
# or equivalently, via --conf / SparkConf:
spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=IP:9092
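From here, the Structured Streaming + Prometheus half mentioned in section 3 can start from something like the sketch below, which only reads the metrics topic and parses the name/timestamp/value JSON produced by KafkaReporter; pushing the values into Prometheus (for example through a Pushgateway) is left out. The broker, topic and console output are placeholders, and the spark-sql-kafka connector package must be on the classpath:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object MetricsTopicReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                      // for a quick local test; drop when submitting to a cluster
      .appName("spark-metrics-consumer")
      .getOrCreate()
    import spark.implicits._

    // Schema of the JSON written by KafkaReporter.report(): {"name":..., "timestamp":..., "value":...}
    val schema = new StructType()
      .add("name", StringType)
      .add("timestamp", LongType)
      .add("value", StringType)                // gauge values vary by metric; keep them as strings here

    val metrics = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
      .option("subscribe", "kafka-sink-topic")                // placeholder topic
      .load()
      .select(from_json($"value".cast("string"), schema).as("m"))
      .select("m.*")

    // For a first test, just print to the console; a real job would push the values to Prometheus.
    metrics.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}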