IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分享一个使用 redisson 实现 redis 消费队列只消费一次的实现方法 -> 正文阅读

[大数据]分享一个使用 redisson 实现 redis 消费队列只消费一次的实现方法

因为 redis 的消费队列并没有提供多消费只消费一次的功能,如果想要实现多消费(实例)保证只消费一次,则需要自己在消费端去实现。
简单封装一下 Redisson 中的 RTopic 的监听方法:

public class RedisMessageUnrepeatable {

    private static final String INDEX_SUFFIX = "_unrepeatable";

    private static final String RW_SUFFIX = "_rwLock";
	
    private final RedissonClient client;

	// 主题名
    private final String topic;
	
	// 序列化方式
    private final Codec codec;

    private final RTopic rTopic;

    private final RAtomicLong topicIndex;

    private final RReadWriteLock rwLock;

    public RedisMessageUnrepeatable(@NonNull RedissonClient client, @NonNull String topic, Codec codec) {
        this.client = client;
        this.topic = topic;
        this.codec = codec;
        this.rTopic = client.getTopic(topic, codec);
        this.topicIndex = client.getAtomicLong(topic + INDEX_SUFFIX);
        rwLock = client.getReadWriteLock(topic + RW_SUFFIX);
    }

    public <M> void addListener(Class<M> type, MessageListener<M> listener) {
        // 获取写锁, 保证在注册新的消费者的时候 topicIndex 不会被修改.
        // 若在注册的时候 topicIndex 在改变, 可能导致与 currentIndex 不一致,
        // 从而导致 currentIndex 永远比 topicIndex 小, 造成该消费者永远消费不到消息.
        rwLock.writeLock().lock();
        try {
            final AtomicLong currentIndex = new AtomicLong(topicIndex.get());
            rTopic.addListener(type, (charSequence, s) -> {
                // 获取读锁, 读锁是共享锁.
                rwLock.readLock().lock();
                try {
                    if (!topicIndex.compareAndSet(currentIndex.get(),
                            currentIndex.incrementAndGet())) {
                        return;
                    }
                    listener.onMessage(charSequence, s);
                }finally {
                    rwLock.readLock().unlock();
                }
            });
        }finally {
            rwLock.writeLock().unlock();
        }
    }

    public RedissonClient getClient() {
        return client;
    }

    public String getTopic() {
        return topic;
    }

    public Codec getCodec() {
        return codec;
    }
}

简单使用:

@Component
public class Test{

	@Autowired
    private RedissonClient client;
	
	@PostConstruct
	public void listener(){
		RedisMessageUnrepeatable redisMq = new RedisMessageUnrepeatable(client, "test", StringCodec.INSTANCE);
		redisMq.addListener(String.class, (t, m) -> System.out.printf("topic: %s, message: %s \n", t, m));
	}
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:18:09  更:2022-04-04 12:20:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 16:00:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码