[Big Data] Monitoring Kafka consumption in Flink with custom metrics

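I. Background

A business requirement called for monitoring, inside Flink, both the volume of data consumed from Kafka and the consumer backlog (lag). After a long search online I could not find code that worked out of the box, so I am recording my own implementation here.

Part of the code is adapted from the article "Flink监控:自定义消费延迟Metrics" (Flink Monitoring: Custom Consumption-Lag Metrics).

II. Implementation

1. CustomerJsonDeserialization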

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
// IsolationLevel is needed by registerPtMetric. This package is assumed from the
// kafka-clients 2.x line; newer kafka-clients releases expose the enum as
// org.apache.kafka.common.IsolationLevel, so adjust the import to your version.
import org.apache.kafka.common.requests.IsolationLevel;

import java.lang.reflect.Field;
import java.util.Set;

/**
 * @Author: Zhang Yuyao
 * @Email: 
 * @Description:
 * @Date 2021-08-09
 */
public class CustomerJsonDeserialization extends JSONKeyValueDeserializationSchema {
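    // Metric group and gauge names used for the per-partition lag metric.
    private static final String DT_TOPIC_GROUP = "topic";
    private static final String DT_PARTITION_GROUP = "partition";
    private static final String DT_TOPIC_PARTITION_LAG_GAUGE = "topic_partition_lag";
    // Count records entering deserialize() and records it returns successfully.
    private Counter inCounter;
    private Counter outCounter;
    // True until the first record arrives; triggers the one-time gauge registration.
    boolean firstMsg = true;

    // Injected by CustomerKafkaConsumer once the fetcher has been created.
    private AbstractFetcher<Row, ?> fetcher;
    private ObjectMapper mapper;
    private final boolean includeMetadata;
    // Injected by CustomerKafkaConsumer before the source starts running.
    private RuntimeContext runtimeContext;
    // Prefix used in the counter names.
    private String confName;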

    public CustomerJsonDeserialization(boolean includeMetadata, String confName) {
        super(includeMetadata);
        this.includeMetadata = includeMetadata;
        this.confName = confName;
    }

    // Registers the in/out counters; setRuntimeContext() must have been called first.
    public void initMetric() {
        this.inCounter =
                runtimeContext.getMetricGroup()
                        .addGroup("web-streaming")
                        .counter(this.confName + "-" + "in-count");

        this.outCounter =
                runtimeContext.getMetricGroup()
                        .addGroup("web-streaming")
                        .counter(this.confName + "-" + "out-count");
    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        inCounter.inc();

        if (firstMsg) {
            // Executed only once, when the first record arrives.
            registerPtMetric(fetcher);

            firstMsg = false;
        }
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
        }
        outCounter.inc();
        return node;
    }

    public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
        this.fetcher = fetcher;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext){
        this.runtimeContext = runtimeContext;
    }

    protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
        // Use reflection to reach the Kafka consumer held by the fetcher. The field path is:
        // Flink: Fetcher -> KafkaConsumerThread -> KafkaConsumer ->
        // Kafka Consumer: KafkaConsumer -> SubscriptionState -> partitionLag()
        Field consumerThreadField = ((KafkaFetcher) fetcher).getClass().getDeclaredField("consumerThread");

        consumerThreadField.setAccessible(true);
        KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);

        Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
        hasAssignedPartitionsField.setAccessible(true);

        boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
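        if (!hasAssignedPartitions) {
            // No waiting happens here; the check simply fails fast if nothing is assigned yet.
            throw new RuntimeException("no partitions have been assigned to the Kafka consumer thread yet");
        }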
        Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
        consumerField.setAccessible(true);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
        Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
        subscriptionStateField.setAccessible(true);
        SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
        Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
        for (TopicPartition topicPartition : assignedPartitions) {
            // One lag gauge per assigned partition.
            runtimeContext.getMetricGroup()
                    .addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                    .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "")
                    .gauge(DT_TOPIC_PARTITION_LAG_GAUGE,
                            (Gauge<Long>) () -> subscriptionState.partitionLag(topicPartition, IsolationLevel.READ_UNCOMMITTED));
        }
    }
}
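
The reflection chain in registerPtMetric can be tried out in isolation. The following is a minimal sketch in plain Java, not the Flink or Kafka code itself: Worker, Client and State are hypothetical stand-ins for KafkaConsumerThread, KafkaConsumer and SubscriptionState, and readPrivateField is a helper name invented for this example.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins for the Flink/Kafka classes; only the private-field chain matters here.
final class ReflectionChainSketch {

    static final class State {
        private final long lag = 42L;
    }

    static final class Client {
        private final State subscriptions = new State();
    }

    static final class Worker {
        private final Client consumer = new Client();
    }

    /** Reads a private field declared directly on the runtime class of the target. */
    static Object readPrivateField(Object target, String name) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot read field '" + name + "'", e);
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        // Walk the chain one private field at a time: Worker -> Client -> State -> lag.
        Object client = readPrivateField(worker, "consumer");
        Object state = readPrivateField(client, "subscriptions");
        Object lag = readPrivateField(state, "lag");
        System.out.println("lag = " + lag); // prints "lag = 42"
    }
}
```

Note that getDeclaredField only sees fields declared on the exact class it is called on, not inherited ones, and that private field names are not a stable API. A lookup like this can therefore break when the Flink or Kafka client version changes.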

2. CustomerKafkaConsumer

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.util.SerializedValue;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @Author: Zhang Yuyao
 * @Email: 
 * @Description:
 * @Date 2021-08-09
 */
public class CustomerKafkaConsumer extends FlinkKafkaConsumer {
    private static final long serialVersionUID = -1234567890L;
    private CustomerJsonDeserialization customerJsonDeserialization;

    public CustomerKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) {
        super(topics, deserializer, props);
        this.customerJsonDeserialization = (CustomerJsonDeserialization) deserializer;
    }

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        // Hand the runtime context to the schema and register its counters before consuming.
        customerJsonDeserialization.setRuntimeContext(getRuntimeContext());
        customerJsonDeserialization.initMetric();
        super.run(sourceContext);
    }

    @Override
    protected AbstractFetcher createFetcher(SourceContext sourceContext, Map assignedPartitionsWithInitialOffsets, SerializedValue watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
        AbstractFetcher fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
        // Pass the fetcher to the schema so it can register the lag gauges on the first record.
        customerJsonDeserialization.setFetcher(fetcher);
        return fetcher;
    }
}
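
The createFetcher override above follows a simple pattern: let the parent class build the object, then hand a reference to a collaborator before returning it. Here is a minimal sketch of that pattern in plain Java. BaseSource, HookedSource, Fetcher and Listener are hypothetical stand-ins, not Flink classes.

```java
// Hypothetical stand-ins that mirror the "override the factory method and capture the result" pattern.
final class FetcherHookSketch {

    /** Stand-in for the object created by the parent class. */
    static class Fetcher {
    }

    /** Stand-in for the deserialization schema that needs the fetcher later. */
    static class Listener {
        private Fetcher fetcher;

        void setFetcher(Fetcher fetcher) {
            this.fetcher = fetcher;
        }

        Fetcher getFetcher() {
            return fetcher;
        }

        boolean hasFetcher() {
            return fetcher != null;
        }
    }

    /** Stand-in for the parent source, which owns the factory method. */
    static class BaseSource {
        protected Fetcher createFetcher() {
            return new Fetcher();
        }

        final Fetcher start() {
            return createFetcher();
        }
    }

    /** Subclass that delegates creation to the parent and then shares the result. */
    static class HookedSource extends BaseSource {
        private final Listener listener;

        HookedSource(Listener listener) {
            this.listener = listener;
        }

        @Override
        protected Fetcher createFetcher() {
            Fetcher fetcher = super.createFetcher();
            listener.setFetcher(fetcher);
            return fetcher;
        }
    }

    public static void main(String[] args) {
        Listener listener = new Listener();
        Fetcher created = new HookedSource(listener).start();
        System.out.println(listener.getFetcher() == created); // prints "true"
    }
}
```

Because the parent still does all the construction, the subclass does not need to know how the fetcher is built. It only observes the result.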

III. Usage

DataStream dataStream = env.addSource(new CustomerKafkaConsumer(topics, new CustomerJsonDeserialization(false, this.name), props).setStartFromEarliest());
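
To make the reported numbers easier to interpret, here is a small worked sketch of per-partition lag. It rests on an assumption about the Kafka client rather than on anything stated in this post: with READ_UNCOMMITTED, partitionLag is taken to be the partition's latest known end offset minus the consumer's current position, and it may be null before the end offset is known. The class name, method names and the topic name "orders" are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of lag arithmetic; it does not call Kafka or Flink.
final class PartitionLagSketch {

    /** Returns endOffset - position, or null when the end offset is not known yet. */
    static Long lag(Long endOffset, long position) {
        return endOffset == null ? null : endOffset - position;
    }

    /** Sums the known per-partition lags; partitions with unknown lag are skipped. */
    static long totalLag(Map<String, Long> lagByPartition) {
        long total = 0L;
        for (Long value : lagByPartition.values()) {
            if (value != null) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> lagByPartition = new LinkedHashMap<>();
        lagByPartition.put("orders-0", lag(1_000L, 940L)); // 60 records behind
        lagByPartition.put("orders-1", lag(500L, 500L));   // fully caught up
        lagByPartition.put("orders-2", lag(null, 0L));     // end offset unknown
        System.out.println(lagByPartition);                // prints "{orders-0=60, orders-1=0, orders-2=null}"
        System.out.println("total lag = " + totalLag(lagByPartition)); // prints "total lag = 60"
    }
}
```

Under the same assumption, a gauge can report null for a partition, so a dashboard or alert built on topic_partition_lag should tolerate missing values.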

Added: 2021-08-10 23:08:34  Updated: 2021-08-10 23:09:26
 