IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 工作中关于kafka的一个奇葩问题 -> 正文阅读

[大数据]工作中关于kafka的一个奇葩问题

kafka的一个奇葩问题

相信大部分人都使用过kafka吧!

我主导的业务场景中,通常根据业务类型分为两大类:
1、 系统消息(不是很频繁的那种)
2、 业务消息(吞吐大)

至少为何这么分, 我相信有编程洁癖的程序不会问, 对, 我就是!

层次化, 我司是公用一个的kafka集群(100多台), 消息满天飞, 我想不用解释为什么了。

进入主题:

那么问题来了, 在共用消息主题的时候, 通常都会设置key来区别不同的消息体。

生产者指定key–>hash(key)%/ numPartitions=分区

附上product代码:

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

但是! 但是~ 奇怪了~~
conusmer 却没有根据key获取分区的代码,反正我没找到,有找到的朋友告诉我····

至于作者为何这么设计, 个人觉得:
1、 为何用字符串key(为了业务中见名知意)
2、 还是跟业务场景有关, 那就是hash 冲突的问题,从作者代码的角度, 作者是希望我们指定具体的分区数
2.1, 因为一个好的hash函数是要花功夫的
2.2, hash函数执行的时间复杂度,mq要尽可能的保持高吞吐量,要排除一切干扰因素(个人在业务中使用了hashmap的hash函数),还不如直接上具体分区数来得痛快, 因为具体的分区我事先是知道的

好了, 除了以上的解决办法, 还有:

1、 自定义分区策略(重在hash函数的设计,跟product保持统一), 分布式场景中很头痛, 不同服务由不同的部分维护, 需要上下打通。。。。哈哈哈~ 烦烦, 人家不听你的

实现这个接口就可以了:
org.apache.kafka.clients.producer.Partitioner

2、使用它自己默认的分区策略, 但是那个序列化工具又是很头痛。
附上consumer代码:

----
public BaseKafkaConsumer(String name, String topic, String configPath, String userPrincipal, List<String> keys) {
        super(name, true);
        this.topic = topic;
        this.subscribePartitions = new ArrayList<>(5);
        SecCryptUtil256.securityPrepareForKafka(userPrincipal, configPath);
        this.consumer = new KafkaConsumer<>(initConsumerProps(configPath));

        if (!keys.isEmpty()) {
            List<PartitionInfo> topicPartitions = this.consumer.partitionsFor(topic);
            if (null == topicPartitions || topicPartitions.isEmpty()) {
                log.warn("topic【{}】 partitions is empty, keys ={}", topic, keys.toArray());
                return;
            }
            int numPartitions = topicPartitions.size();
            StringSerializer stringSerializer = new StringSerializer();
            for (String key : keys) {
                byte[] keyBytes = stringSerializer.serialize(topic, key);
                // 跟product保持统一
                int Partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                this.subscribePartitions.add(new TopicPartition(topic, Partition));
            }
        }
    }

贴上自带的序列化工作, 性能垃圾的一批, 大家可以测试一下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);//看看这是什么鬼, 在大数据环境整洁给零分。
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

好! 今天的分享到这里, 下期再见~~

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-08 11:24:53  更:2021-08-08 11:26:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 15:33:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码