| RabbitMQ进阶(二)消息确认机制RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction,但是后者的优势在于强一致性。1.消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
 2. 为了确认消息是否发送到queue,应该在发送消息中启用参数mandatory=true,使用ReturnListerner接受未被发送成功的消息
 3. 接下来需要确认消息是否被有效消费,publisher未提供监听事件,但是提供了应答机制来保证消息被成功消费,应答方式:
 Channel.basicAck(用于肯定确认)
 RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
 Channel.basicNack(用于否定确认)
 Channel.basicReject(用于否定确认)
 与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了
 basicRecover:消息重新进入队列,requeue,发给新的consummer,false发送给相同的consumer
 同步public class TransactionalSend {
    private final static String EXCHANGE_NAME = "publisherconfirm-exchange";
    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;
        try {
            //建立TCP连接
            connection = factory.newConnection();
            //开启信道
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "TransactionSend!" + System.currentTimeMillis();
            try {
                //开启事务
                channel.txSelect();
                while (num-- > 0) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                    System.out.println("sent+[" + num + "]'" + message + "'");
                }
                channel.txCommit();
            } catch (IOException e) {
                e.printStackTrace();
                channel.txRollback();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm";
        executorService.submit(()->{
           TransactionalSend.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey,1);
        });
    }
}
 异步处理public class AsynConfirmSend {
   protected final static String EXCHANGE_NAME = "publisherconfirm-exchange";
    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;
        try {
            //建立TCP连接
            connection = factory.newConnection();
            //开启信道
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "AsynSend!" + System.currentTimeMillis();
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleAck:deliveryTag="+l+"multiple"+b);
                }
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleNAck:deliveryTag="+l+"multiple"+b);
                }
            });
            channel.confirmSelect();
                while (num-- > 0) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                    System.out.println("sent+[" + num + "]'" + message + "'");
                }
               Thread.sleep(1*1000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm2";
        executorService.submit(()->{
           AsynConfirmSend.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey,10);
        });
    }
}
 异步核心代码标注             channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleAck:deliveryTag="+l+"multiple"+b);
                }
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleNAck:deliveryTag="+l+"multiple"+b);
                }
            });
            channel.confirmSelect();
 异步消息确认public class ConsumerConfiremRecv {
    private  final static String QUEUE_NAME="consumerconfirm";
    private final static String EXCHANGE_NAME=AsynConfirmSend.EXCHANGE_NAME;
    public static void execute(String host, String userName, String password, String routingKey) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        try {
            //建立TCP连接
            connection = factory.newConnection();
            //在TCP连接的基础上创建一个信道
            final Channel channel = connection.createChannel();
            //声明一个持久话队列
            //queueDeclare(名字,是否持久化,独占的queue, 不使用时是否自动删除,其他参数)
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            //绑定路由,同一个队列可以绑定多个值
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
            System.out.println("[consumerconfirmRecv]waiting for messages.");
            Consumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body,"UTF-8");
                    System.out.println("[consumerconfirmRecv]Received"+message);
                    //正向确认
                    channel.basicAck(envelope.getDeliveryTag(),true);
                    //消息否定确认
                    //channel.basicNack(envelope.getDeliveryTag(),true,false);
                }
            };
            //接受消息,设置非自动确认
           channel.basicConsume(QUEUE_NAME,true,consumer);
           Thread.sleep(5*1000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm2";
        executorService.submit(()->{
            ConsumerConfiremRecv.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey);
        });
    }
}
 如果你有更多的案列需要实现,可以将rabbitmq的相关参数抽离成一个工具类,直接调用即可。 public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("moer");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
 Channel channel = RabbitMqUtils.getChannel();
 希望大家可以和我一起进步,提升自己永远没有错! |