public class ConfirmMessage {
public static final Integer MESSAGE_COUNT=1000;
public static void main(String[] args) throws Exception {
publishMessageAsync();
}
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> concurrentSkipListMap=new ConcurrentSkipListMap();
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
ConcurrentNavigableMap<Long, String> confirmed
= concurrentSkipListMap.headMap(deliveryTag);
if(multiple){
confirmed.clear();
}else {
confirmed.remove(deliveryTag);
}
System.out.println("确认的消息: " + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("未确认的消息: " + deliveryTag);
};
channel.addConfirmListener(ackCallback,nackCallback);
for (int i=0;i<MESSAGE_COUNT;i++){
String message=i+"消息";
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish("",Producer.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}
}
}
|