生产者:
package com.atguigu.util; /**
* Copyright (c) 牧原 All Rights Reserved
* <p>
* Project: Producer
* Package: com.atguigu.util
* Version: 1.0
* <p>
* Created by songquan on 2022/4/26 上午10:13
*/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.HostSpec;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
/**
* @topic:
* @desc:
* @author: songquan tel:18211850987
* @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科
* @datatime: 2022/4/26 上午10:13
*/
public class Producer {
//队列名称
public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.department.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.position.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.rank.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.employee.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.university.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.taginfo.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.district.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bank.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.category.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.stordoc.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.measdoc.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.mattaxes.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.customer.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.material.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bankaccbas.queue";
// public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.supplier.queue";
//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂,该连接工厂其实就对应着我们访问http://182.92.210.39:15672/网站之后的rabbitmq,从这个工厂里可以获取队列
ConnectionFactory factory = new ConnectionFactory() {
public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String s, String s1, Properties properties) throws SQLException {
return null;
}
};
//工厂IP连接RabbitMQ的队列
factory.setHost("10.106.11.37");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
//创建连接
//Connection connection = factory.newConnection();
Connection connection = factory.newConnection();
//获取信道,通过这个信道可以连接交换机Exchange,然后再连接队列Queue
Channel channel = (connection).createChannel();
/**
* 生成一个队列,此队列中可以存放消息
*
* 1.队列名称
* 2.队列里面的消息是否持久化到磁盘中
* 3.该消息队列是否共享,true表示多个消费者可访问此消息队列,false表示只有一个消费者可访问此消息队列
* 4.是否自动删除,最后一个消费者断开连接以后,该消息队列是否自动删除
* 5.其它参数
* */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送到hello队列中的消息
// String message = "hello world";
String message = "{\"table_name\":\"md_company\", \"type\":\"insert\", \"time_now\":\"2022-05-09 11:20:10\", \"id\":\"2\", \"tenant_id\":\"2\", \"code\":\"2\", \"name\" :\"2\", \"short_name\":\"2\", \"bank_account\":\"2\", \"bank_name\":\"2\", \"tax_payer_no\":\"2\", \"legal_person\":\"2\", \"tel\":\"2\", \"established_date\":\"2\", \"business_license\":\"2\", \"is_region\":\"2\" , \"is_bm_payment\":\"2\", \"province_code\":\"2\", \"city_code\":\"2\" , \"address\":\"2\" , \"company_type_code\":\"2\", \"is_used\":\"2\", \"create_user\":\"2\", \"create_dept\":\"2\", \"create_time\":\"2022-05-09 11:20:10\", \"update_user\":\"2\", \"update_time\":\"2022-05-09 11:20:18\", \"status\":\"2\", \"is_deleted\":\"2\", \"edge_form_id\":\"2\" , \"data_type\":\"2\",\"exchange_status\":\"2\"}";
/**
* 往队列中发送一个消息
*
* 1.发送到哪个交换机
* 2.路由的Key值是哪个,本次是队列的名称
* 3.其它参数信息
* 4.发送消息的消息体,需要转换成Byte数组
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//如果消息成功的发送到了hello队列中,那么会输出这句代码
System.out.println("消息发送完毕");
}
}
消费者:
package com.atguigu.util; /**
* Copyright (c) 牧原 All Rights Reserved
* <p>
* Project: Consumer
* Package: com.atguigu.util
* Version: 1.0
* <p>
* Created by songquan on 2022/4/26 上午10:30
*/
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @topic:
* @desc:
* @author: songquan tel:18211850987
* @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科
* @datatime: 2022/4/26 上午10:30
*/
public class Consumer {
//消费者要获取哪个队列中的消息
public static final String QUEUE_NAME="data.etl.syn.rs.mdm.company.queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.106.11.37");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//如果能成功接收到消息会调用的回调函数
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println(new String(message.getBody()));
};
//如果取消从消息队列中获取消息时会调用的回调函数
CancelCallback cancelCallback= consumerTag->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息,也即是消费者从消息队列中取消息
*
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答,true代表的是自动应答,false代表的是手动应答
* 3.消费者成功消费的回调
* 4.消费者取消消费的回调
* */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
|