package com.wang;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.wang.conf.Config;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
 * Demo publisher that sends 100 messages to the {@code hello} queue with
 * publisher confirms enabled, handling the broker's confirmations
 * asynchronously through a confirm listener.
 *
 * <p>Each message is tracked in a sorted concurrent map keyed by its publish
 * sequence number; the ack callback removes entries as they are confirmed.
 *
 * @author username
 * @date 2021/11/7 21:06
 * @since 11
 */
@Component
public class Producer {
    private static final String QUEUE_NAME = "hello";

    /**
     * Declares the queue, publishes 100 messages and prints the time spent publishing.
     *
     * @param args unused
     * @throws IOException if obtaining the channel, enabling confirms or
     *                     registering the listener fails; I/O errors raised while
     *                     declaring the queue or publishing are caught and printed
     * @throws InterruptedException declared for signature compatibility; nothing
     *                              in this method currently throws it
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = Config.getChannel();
        // Sorted, thread-safe map of sequence number -> message body. A sorted map is
        // needed so that headMap() can select every entry up to a confirmed tag;
        // ConcurrentHashMap is unordered and cannot do that.
        ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
        // Switch the channel into publisher-confirm mode.
        channel.confirmSelect();

        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println(deliveryTag + "success received");
            // The broker may confirm several messages in one callback. When
            // "multiple" is set, drop every tracked entry up to and including
            // deliveryTag; otherwise only that single entry.
            if (multiple) {
                outstanding.headMap(deliveryTag, true).clear();
            } else {
                outstanding.remove(deliveryTag);
            }
            // NOTE: this fires whenever the map is momentarily empty, which can also
            // happen mid-run if confirms keep pace with publishing.
            if (outstanding.isEmpty()) {
                System.out.println("success send all message");
            }
        };
        // Negative acknowledgements are only logged; the entry stays in the map.
        ConfirmCallback nackCallback = (deliveryTag, multiple) ->
                System.out.println(deliveryTag + "failed received");
        channel.addConfirmListener(ackCallback, nackCallback);

        long begin = System.currentTimeMillis();
        try {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            for (int i = 0; i < 100; i++) {
                String mes = "hello world" + i;
                // Record the message BEFORE publishing: getNextPublishSeqNo() is the
                // sequence number the upcoming publish will get. Tracking it afterwards
                // lets the confirm callback run first, find nothing to remove, and
                // leave a stale entry behind.
                outstanding.put(channel.getNextPublishSeqNo(), mes);
                channel.basicPublish("", QUEUE_NAME, null, mes.getBytes(StandardCharsets.UTF_8));
            }
            long end = System.currentTimeMillis();
            // Elapsed publishing time in milliseconds.
            System.out.println("time:" + (end - begin));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
采用生产者创建监听器方式监听传来的异步确认消息
|