// Consumer (消费者)
public class Consumers implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(Consumers.class);
private static final String BROKER_URL = "failover://tcp://localhost:61616";
private ConnectionFactory connectionFactory;
private Connection conn;
private Session session;
private Destination dest;
private MessageConsumer consumer;
public void creatConn() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", BROKER_URL);
//创建mq连接
conn = connectionFactory.createConnection();
//启动连接
conn.start();
//创建会话
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void connQueue(String SUBJECT) throws JMSException {
//通过会话创建目标
dest = session.createQueue(SUBJECT);
//创建mq消息的消费者
consumer = session.createConsumer(dest);
//初始化MessageListener
Consumers me = new Consumers();
//给消费者设定监听对象
consumer.setMessageListener(me);
}
public void closeConn()throws JMSException{
//关闭mq连接
// consumer.close();
// session.close();
conn.close();
System.out.println("消费者链接关闭" +"======="+conn);
}
public void onMessage(Message message) {
System.out.println(message);
TextMessage txtMessage = (TextMessage)message;
try {
msg=txtMessage.getText();
JSONObject jsonObject = JSON.parseObject(msg);
String textMessage = jsonObject.getString("message");
String from = jsonObject.getString("uname");
String to = jsonObject.getString("to");
Map<String, Object> mp1 = new HashMap<>();
mp1.put("messageType", 4);
mp1.put("textMessage", textMessage);
mp1.put("fromUser", from);
webSocket.sendMessageTo(JSON.toJSONString(mp1), to);
} catch (JMSException e) {
e.printStackTrace();
}
}
private static WebSocket webSocket= new WebSocket();
private String msg;
}
// Producer (生产者)
/**
 * JMS queue producer that sends text messages to a queue chosen at runtime.
 *
 * <p>Typical call order: {@link #createConn()}, then {@link #createQueue(String)} to
 * pick the target queue, then {@link #sendMsg(String)} any number of times, and finally
 * {@link #closeConn()}.
 */
public class Producers {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producers.class);
    private static final String BROKER_URL = "failover://tcp://localhost:61616";

    // Factory used to open the broker connection.
    private ConnectionFactory connectionFactory;
    // Open broker connection, or null before createConn() succeeds.
    private Connection conn;
    // Session created from conn (non-transacted, auto-acknowledge).
    private Session session;
    // Target queue; the name is supplied at runtime via createQueue().
    private Destination dest;
    // Producer bound to dest, created from the session.
    private MessageProducer producer;

    /**
     * Opens a connection to the broker at {@code BROKER_URL}, starts it and creates a
     * non-transacted, auto-acknowledge session.
     *
     * @throws JMSException if the connection cannot be created or started, or the
     *     session cannot be created; in that case the half-opened connection is closed
     *     before the exception is rethrown
     */
    public void createConn() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory("admin", "admin", BROKER_URL);
        Connection opened = connectionFactory.createConnection();
        try {
            opened.start();
            session = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            // Do not leak the broker connection when setup fails half-way.
            try {
                opened.close();
            } catch (JMSException closeFailure) {
                LOGGER.warn("Failed to close connection after unsuccessful setup", closeFailure);
            }
            throw e;
        }
        conn = opened;
    }

    /**
     * Closes the broker connection, if one was opened.
     *
     * @throws JMSException if the provider fails to close the connection
     */
    public void closeConn() throws JMSException {
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Selects the queue to send to and creates a producer for it.
     *
     * @param SUBJECT name of the target queue
     * @throws JMSException if no session exists yet ({@link #createConn()} was not
     *     called) or the queue/producer cannot be created
     */
    public void createQueue(String SUBJECT) throws JMSException {
        if (session == null) {
            throw new JMSException("No session available; call createConn() before createQueue()");
        }
        dest = session.createQueue(SUBJECT);
        producer = session.createProducer(dest);
    }

    /**
     * Sends {@code msg} as a text message to the queue chosen by
     * {@link #createQueue(String)}.
     *
     * @param msg text body of the message
     * @throws Exception if no producer exists yet (reported as a {@code JMSException})
     *     or the provider fails to create or send the message
     */
    public void sendMsg(String msg) throws Exception {
        if (session == null || producer == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new JMSException(
                    "No producer available; call createConn() and createQueue() before sendMsg()");
        }
        TextMessage message = session.createTextMessage(msg);
        producer.send(message);
        LOGGER.debug("send message");
    }
}
// https://download.csdn.net/download/Caiabcd/20628919?spm=1001.2014.3001.5503
// https://blog.csdn.net/Caiabcd/article/details/118556245
|