| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> kafka produce 消息分配原则-思考指定key -> 正文阅读 |
|
[大数据]kafka produce 消息分配原则-思考指定key |
我们向kafka里发送消息的时候,kafka有多个分区,是如何将数以百万的消息发送到各个分区的,如果其中的分区节点出现了问题?这个时候又会如何? 当我们在new一个produceRecord的时候,最常见的是直接 topic+value 还有一种是topic+key+value 查源码kafkaProduce.java里 这三行代码决定了分配的分区在哪里? 前面两行都相似 byte[] serializedKey = valueSerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); 都是通过序列化 得到一个byte数组,那我们看下这个序列化怎么计算的? head没用,不管 StringSerializer.java里? topic这个值也没用,那么制用看key和value了 ?发现直接就是string.getBytes 由此可以知道 serializedKey=key.getBytes serializedKey=value.getBytes 再看? int partition = partition(record, serializedKey, serializedValue, cluster); 到? private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } ?partition=null 继续看partitioner.patition方法 那这个partitioner是啥?就是我最上面的截图的分区器 ?最关键的代码来了 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { ????????//获取了所有的分区个数 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { //如果key为null int nextValue = nextValue(topic); // 这个nextvalue方法是根据topic名称随机生成一个值 123456 第二次调用+1变成123457 第三次+1变成123458 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); //获取可用的分区个数 if (availablePartitions.size() > 0) { ???????? //分配到可用的分区上 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { ? //所有分区都不可用那么就随便分配了 // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { //指明了根据key的hash值取分到一个区 ?????????????//注意这里和上面不同,上面会获取了所有的可用分区,这里还是按照所有分区分配 // hash the keyBytes to choose a partition ????????// Utils.murmur2(keyBytes) 这个b方法就是 你随便传个topic 无论传多少次都会返回一个相同的数,传不同的返回不同的数,就是和hash差不多不知道为啥这么麻烦。但是这个可以控制,比如我知道cc=24 我知道分区数=3 那么我知道他肯定会分配到0号分区。 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } 看到这大家伙都明白了把。 如果我们不指定key,那么kafka比较智能,获取可用分区,然后自己给这个topic搞个随机值然后就会按照分区轮询。 开始思考思考1 如果我这边启动produce 发送一条消息到partition1.然后停掉,再重启produce再发一条有可能还是会到1号分区,依次类推大概率导致最后分区offset不一样,不过这概率小之又小,但是有时候我们offset有误差,如下图。? 思考2? 为什么要指定key,指定key有哪些好处? 好处:我知道这条数据会分配到哪个分区,知道了又能怎么样呢?? 比如如下场景。 我们设定好分区数3? 其中已知 cc1%3=0?cc2%3=1 cc2%3=2 双十一的时候阿里使用kafka记录消费数据,比如用户下一个单就会朝kafka发送一条消息 {"id":xx,tyoe:"male","amount":1000} {"id":xx,tyoe:"female","amount":10000} {"id":xx,tyoe:"children","amount":100} 在向kafka发送消息的时候,那么就是 key=cc1 value=male key=cc2 value=female key=cc3 value=children 这样有什么好处? 比如说我要实时计算 女性消费了多少数据,那么我只需要消费这个topic的 1号分区,就可以计算出女性的消费额? 如果不这么做,还是采用随机的分配原则,那么我们还要订阅整个topic,消费全部数据 consumer.subscribe(Lists.newArrayList(topic)); 然后比如flink.addsource(kafaConsumer).filter(x->x.type=female).sum() 一个是消费所有数据一个是消费部分数据,明显减少了服务器和客户端的压力,我真是太聪明了,这都想到了。 这个时候又有杠精来了,女的消费能力强,男的消费一般,小孩穷b消费差,你这么做会导致我计算总销售额的时候出现问题,有的消费者消费压力大。 确实按照上述,存在有的topic数据多 有的topic少。 但是我马上想到解决办法了。 比如 消费能力来算 女:男:小孩=6:3:1 那么我准备好10个分区 女的放在123456? 小孩放在0? 男的放在789 list1[ cc1 cc2 cc3 cc4 cc5 cc6 ]%10= 1 2 3 4 5 6?? list2 [cc0]%10=0 list3 [cc7 cc8 cc9]%10=7 8 9? 那么 key=list1.get(new Random().nextInt(6))? value=male 就会随机进入到分区(123456) key=cc0?value=children 就会指定分区进入 0 key=list2.get(new Random().nextInt(3)) value=female 就会随机进入到分区(789) 这样也会减少40%的数据读取 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 4:52:28- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |