Redis 的发布/订阅(Pub/Sub)本身并不提供“多个消费者同时订阅、但每条消息只被消费一次”的功能。如果想在多个消费者(实例)同时订阅的情况下保证每条消息只被消费一次,就需要在消费端自行实现。下面简单封装一下 Redisson 中 RTopic 的监听方法:
/**
 * Wraps a Redisson {@link RTopic} so that, when several subscribers listen on the same
 * topic, each published message is handed to the user listener by at most one of them.
 *
 * <p>How it works: every registration keeps a local sequence counter that starts from the
 * value of a shared {@link RAtomicLong} (named {@code topic + "_unrepeatable"}). For each
 * delivered message the subscriber advances its local counter from {@code n - 1} to
 * {@code n} and tries to move the shared counter the same way with a compare-and-set.
 * Only the subscriber whose compare-and-set succeeds forwards the message.
 *
 * <p>NOTE(review): the scheme relies on every subscriber receiving every message. A
 * subscriber whose local counter falls behind the shared counter (for example after a
 * missed delivery) will keep losing the compare-and-set; there is no resynchronization.
 */
public class RedisMessageUnrepeatable {
    private static final String INDEX_SUFFIX = "_unrepeatable";
    private static final String RW_SUFFIX = "_rwLock";

    private final RedissonClient client;
    private final String topic;
    private final Codec codec;
    private final RTopic rTopic;
    /** Shared counter of messages that have already been claimed by some subscriber. */
    private final RAtomicLong topicIndex;
    /** Distributed lock: registration takes the write side, message callbacks the read side. */
    private final RReadWriteLock rwLock;

    /**
     * Creates the wrapper and resolves the topic, shared counter and lock from the client.
     *
     * @param client Redisson client used to obtain the topic, counter and lock
     * @param topic  topic name; also the prefix of the counter and lock names
     * @param codec  codec passed to {@code client.getTopic(topic, codec)}
     */
    public RedisMessageUnrepeatable(@NonNull RedissonClient client, @NonNull String topic, Codec codec) {
        this.client = client;
        this.topic = topic;
        this.codec = codec;
        this.rTopic = client.getTopic(topic, codec);
        this.topicIndex = client.getAtomicLong(topic + INDEX_SUFFIX);
        this.rwLock = client.getReadWriteLock(topic + RW_SUFFIX);
    }

    /**
     * Registers {@code listener} on the topic; it is invoked only for messages this
     * subscriber manages to claim on the shared counter.
     *
     * @param type     message class passed through to {@code RTopic.addListener}
     * @param listener user callback, invoked while the distributed read lock is held
     * @param <M>      message type
     */
    public <M> void addListener(Class<M> type, MessageListener<M> listener) {
        // The write lock excludes running callbacks (they hold the read lock) while the
        // starting value of the shared counter is captured and the listener is registered.
        rwLock.writeLock().lock();
        try {
            final AtomicLong localIndex = new AtomicLong(topicIndex.get());
            rTopic.addListener(type, (channel, message) -> {
                rwLock.readLock().lock();
                try {
                    final boolean claimed;
                    // Advance the local counter and attempt the shared compare-and-set as one
                    // step. Reading and incrementing separately lets two concurrent callbacks
                    // build mismatched (expected, update) pairs such as (5, 6) and (5, 7),
                    // which makes the shared counter jump and a message get dropped.
                    synchronized (localIndex) {
                        final long next = localIndex.incrementAndGet();
                        claimed = topicIndex.compareAndSet(next - 1, next);
                    }
                    // Invoke user code outside the local monitor so a slow listener does not
                    // block sequencing of the following messages.
                    if (claimed) {
                        listener.onMessage(channel, message);
                    }
                } finally {
                    rwLock.readLock().unlock();
                }
            });
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** @return the Redisson client this wrapper was created with */
    public RedissonClient getClient() {
        return client;
    }

    /** @return the topic name this wrapper was created with */
    public String getTopic() {
        return topic;
    }

    /** @return the codec this wrapper was created with */
    public Codec getCodec() {
        return codec;
    }
}
简单使用:
/**
 * Usage example: a Spring component that subscribes to the {@code "test"} topic through
 * {@link RedisMessageUnrepeatable} once its dependencies have been injected.
 */
@Component
public class Test {
    @Autowired
    private RedissonClient client;

    /**
     * Runs after injection: wraps the {@code "test"} topic with a string codec and
     * registers a listener that prints the channel and the message to standard output.
     */
    @PostConstruct
    public void listener() {
        final RedisMessageUnrepeatable unrepeatableTopic =
                new RedisMessageUnrepeatable(client, "test", StringCodec.INSTANCE);
        unrepeatableTopic.addListener(String.class, (channel, payload) -> {
            System.out.printf("topic: %s, message: %s \n", channel, payload);
        });
    }
}
|