IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka produce 消息分配原则-思考指定key -> 正文阅读

[大数据]kafka produce 消息分配原则-思考指定key

我们向kafka里发送消息的时候,kafka有多个分区,是如何将数以百万的消息发送到各个分区的,如果其中的分区节点出现了问题?这个时候又会如何?

当我们在new一个produceRecord的时候,最常见的是直接 topic+value

还有一种是topic+key+value

查源码kafkaProduce.java里

这三行代码决定了分配的分区在哪里?

前面两行都相似

byte[] serializedKey = valueSerializer.serialize(record.topic(), record.headers(), record.key());

byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

都是通过序列化 得到一个byte数组,那我们看下这个序列化怎么计算的?

head没用,不管

StringSerializer.java里? topic这个值也没用,那么制用看key和value了

?发现直接就是string.getBytes

由此可以知道

serializedKey=key.getBytes

serializedKey=value.getBytes

再看?

int partition = partition(record, serializedKey, serializedValue, cluster);

到?

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

?partition=null 继续看partitioner.patition方法 那这个partitioner是啥?就是我最上面的截图的分区器

?最关键的代码来了

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
????????//获取了所有的分区个数
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {  //如果key为null
        int nextValue = nextValue(topic); // 这个nextvalue方法是根据topic名称随机生成一个值 123456 第二次调用+1变成123457 第三次+1变成123458
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); //获取可用的分区个数
        if (availablePartitions.size() > 0) {
????????    //分配到可用的分区上
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
?           //所有分区都不可用那么就随便分配了
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else { //指明了根据key的hash值取分到一个区 
?????????????//注意这里和上面不同,上面会获取了所有的可用分区,这里还是按照所有分区分配
        // hash the keyBytes to choose a partition
????????// Utils.murmur2(keyBytes) 这个b方法就是 你随便传个topic 无论传多少次都会返回一个相同的数,传不同的返回不同的数,就是和hash差不多不知道为啥这么麻烦。但是这个可以控制,比如我知道cc=24 我知道分区数=3 那么我知道他肯定会分配到0号分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

看到这大家伙都明白了把。

如果我们不指定key,那么kafka比较智能,获取可用分区,然后自己给这个topic搞个随机值然后就会按照分区轮询。

开始思考

思考1

如果我这边启动produce 发送一条消息到partition1.然后停掉,再重启produce再发一条有可能还是会到1号分区,依次类推大概率导致最后分区offset不一样,不过这概率小之又小,但是有时候我们offset有误差,如下图。?

思考2?

为什么要指定key,指定key有哪些好处?

好处:我知道这条数据会分配到哪个分区,知道了又能怎么样呢??

比如如下场景。

我们设定好分区数3? 其中已知 cc1%3=0?cc2%3=1 cc2%3=2

双十一的时候阿里使用kafka记录消费数据,比如用户下一个单就会朝kafka发送一条消息

{"id":xx,tyoe:"male","amount":1000}

{"id":xx,tyoe:"female","amount":10000}

{"id":xx,tyoe:"children","amount":100}

在向kafka发送消息的时候,那么就是

key=cc1 value=male

key=cc2 value=female

key=cc3 value=children

这样有什么好处? 比如说我要实时计算

女性消费了多少数据,那么我只需要消费这个topic的 1号分区,就可以计算出女性的消费额?

如果不这么做,还是采用随机的分配原则,那么我们还要订阅整个topic,消费全部数据

consumer.subscribe(Lists.newArrayList(topic));

然后比如flink.addsource(kafaConsumer).filter(x->x.type=female).sum()

一个是消费所有数据一个是消费部分数据,明显减少了服务器和客户端的压力,我真是太聪明了,这都想到了。

这个时候又有杠精来了,女的消费能力强,男的消费一般,小孩穷b消费差,你这么做会导致我计算总销售额的时候出现问题,有的消费者消费压力大。

确实按照上述,存在有的topic数据多 有的topic少。

但是我马上想到解决办法了。

比如 消费能力来算 女:男:小孩=6:3:1

那么我准备好10个分区 女的放在123456? 小孩放在0? 男的放在789

list1[ cc1 cc2 cc3 cc4 cc5 cc6 ]%10= 1 2 3 4 5 6??

list2 [cc0]%10=0

list3 [cc7 cc8 cc9]%10=7 8 9?

那么

key=list1.get(new Random().nextInt(6))? value=male 就会随机进入到分区(123456)

key=cc0?value=children 就会指定分区进入 0

key=list2.get(new Random().nextInt(3)) value=female 就会随机进入到分区(789)

这样也会减少40%的数据读取

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-01 15:18:23  更:2022-06-01 15:21:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:24:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码