java中使用jms操作webspheremq
一、简介
这里介绍webspheremq在spring boot中,借助jms,进行消息的发送和接收。
二、准备
在webspheremq中创建发送队列管理器和接收队列管理器,及相关队列、监听器。
2.1 接收队列管理器
接收队列管理器及相关队列、监听器创建如下:
crtmqm QUEUE_TWO_MANAGER
strmqm QUEUE_TWO_MANAGER
runmqsc QUEUE_TWO_MANAGER
define ql(QUEUE_TWO)
define channel(ONE_TWO_CHL) chltype (RCVR) trptype (TCP)
alter listener(system.default.listener.tcp) trptype(tcp) port(9004)
start listener(system.default.listener.tcp)
备注:也可以不进入runmqsc交互界面,直接通过管道执行MQSC命令,例如启动接收队列管理器的监听器:
echo "start listener(system.default.listener.tcp)" | runmqsc QUEUE_TWO_MANAGER
2.2 发送队列管理器
发送队列管理器及相关队列、监听器创建如下:
crtmqm QUEUE_ONE_MANAGER
strmqm QUEUE_ONE_MANAGER
runmqsc QUEUE_ONE_MANAGER
define qlocal(QUEUE_TR) usage (xmitq)
define qremote(QUEUE_TWO) rname (QUEUE_TWO) rqmname(QUEUE_TWO_MANAGER) xmitq (QUEUE_TR)
define channel(ONE_TWO_CHL) chltype(sdr) conname('127.0.0.1(9004)') xmitq(QUEUE_TR) trptype(tcp)
start channel(ONE_TWO_CHL)
2.3 消息类型
| 消息类型 | 说明 |
|---|---|
| StreamMessage | 主体中包含 Java 基元值流的消息。它的填充和读取均按顺序进行。 |
| MapMessage | 主体中包含一组名/值对的消息。没有定义条目顺序。 |
| TextMessage | 主体中包含 Java 字符串的消息,例如 XML 消息。 |
| ObjectMessage | 主体中包含序列化 Java 对象的消息。 |
| BytesMessage | 主体中包含连续字节流的消息。 |
| Message | 包含消息头和属性但不包含主体的消息。 |
三、使用示例
这里以spring boot,从发送队列管理器发送消息到接收队列管理器为例。
3.1 添加maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + IBM MQ (JMS) demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/>
    </parent>
    <groupId>com.dragon.study</groupId>
    <artifactId>spring-boot-websperemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websperemq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring JMS abstraction (JmsTemplate, listener containers); version managed by the Boot parent. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <!-- JMS 2.0 API used by the IBM MQ client and by the application code. -->
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- IBM MQ classes for JMS. -->
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>9.1.1.0</version>
        </dependency>
        <!-- Raised from 1.2.62: older 1.2.x releases are affected by known deserialization vulnerabilities. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Lombok is only needed at compile time, so it is marked optional. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.2 定义mq配置参数文件
在application.yaml定义如下:
# IBM MQ connection settings, bound to MqParam through the "mq" prefix.
# The nested keys must be indented under "mq:", otherwise they become
# top-level properties and are never bound.
mq:
  host: localhost
  # Port of the TCP listener defined for QUEUE_TWO_MANAGER in section 2.1.
  port: 9004
  # Demo credentials only; do not keep real passwords in plain text.
  username: mqm
  password: root
  sendQueueManager: QUEUE_ONE_MANAGER
  receiveQueueManager: QUEUE_TWO_MANAGER
  channel: ONE_TWO_CHL
  queue: QUEUE_TWO
  # 1208 is the coded character set id for UTF-8.
  ccsid: 1208
  # JmsTemplate receive timeout in milliseconds.
  receiveTimeout: 2000
  charset: utf-8
定义配置参数类MqParam.java:
package com.dragon.study.springbootwebsperemq.mq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Holder for the MQ settings declared under the {@code mq} prefix of the
 * application configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. Because
 * {@code ignoreInvalidFields} is enabled, a value that cannot be converted to
 * the field type is skipped instead of failing the binding, so the field keeps
 * its Java default.
 */
@Configuration
@ConfigurationProperties(prefix = "mq", ignoreInvalidFields = true)
@Data
public class MqParam {
    /** Host name passed to the MQ connection factories. */
    private String host;
    /** Port passed to the MQ connection factories. */
    private int port;
    /** User name applied by the credentials adapters. */
    private String username;
    /** Password applied by the credentials adapters. */
    private String password;
    /** Queue manager name used by the sending connection factory. */
    private String sendQueueManager;
    /** Queue manager name used by the receiving connection factory. */
    private String receiveQueueManager;
    /** Channel name passed to the MQ connection factories. */
    private String channel;
    /** Queue name used by the tests as the send/receive destination. */
    private String queue;
    /** Coded character set id passed to the MQ connection factories. */
    private int ccsid;
    /** Receive timeout handed to the receiving JmsTemplate. */
    private int receiveTimeout;
    /** Character set name; NOTE(review): not read by any code shown in this article. */
    private String charset;
}
3.3 定义发送和接收操作类
发送操作类MqSendConfig.java如下:
package com.dragon.study.springbootwebsperemq.mq.config;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Resource;
@Configuration
public class MqSendConfig {
@Resource
private MqParam mqParam;
@Bean
public MQQueueConnectionFactory sendMqQueueConnectionFactory(){
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName(mqParam.getHost());
try{
factory.setPort(mqParam.getPort());
factory.setCCSID(mqParam.getCcsid());
factory.setQueueManager(mqParam.getSendQueueManager());
factory.setChannel(mqParam.getChannel());
}catch (Exception e){
e.printStackTrace();
}
return factory;
}
@Bean
public UserCredentialsConnectionFactoryAdapter sendUserCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory sendMqQueueConnectionFactory){
UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
adapter.setUsername(mqParam.getUsername());
adapter.setPassword(mqParam.getPassword());
adapter.setTargetConnectionFactory(sendMqQueueConnectionFactory);
return adapter;
}
@Bean
public CachingConnectionFactory sendCachingConnectionFactory(UserCredentialsConnectionFactoryAdapter sendUserCredentialsConnectionFactoryAdapter){
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setTargetConnectionFactory(sendUserCredentialsConnectionFactoryAdapter);
cf.setSessionCacheSize(20);
cf.setReconnectOnException(true);
return cf;
}
@Bean
public PlatformTransactionManager sendPlatformTransactionManager(CachingConnectionFactory sendCachingConnectionFactory){
JmsTransactionManager tm = new JmsTransactionManager(sendCachingConnectionFactory);
return tm;
}
@Bean
public JmsOperations sendJmsOperations(CachingConnectionFactory sendCachingConnectionFactory){
JmsTemplate tp = new JmsTemplate(sendCachingConnectionFactory);
tp.setReceiveTimeout(2000);
return tp;
}
}
接收操作类MqReceiveConfig.java如下:
package com.dragon.study.springbootwebsperemq.mq.config;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.Resource;
@Configuration
public class MqReceiveConfig {
@Resource
private MqParam mqParam;
@Bean
public MQQueueConnectionFactory receiveMqQueueConnectionFactory(){
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName(mqParam.getHost());
try{
factory.setPort(mqParam.getPort());
factory.setCCSID(mqParam.getCcsid());
factory.setQueueManager(mqParam.getReceiveQueueManager());
factory.setChannel(mqParam.getChannel());
}catch (Exception e){
e.printStackTrace();
}
return factory;
}
@Bean
public UserCredentialsConnectionFactoryAdapter receiveUserCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory receiveMqQueueConnectionFactory){
UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
adapter.setUsername(mqParam.getUsername());
adapter.setPassword(mqParam.getPassword());
adapter.setTargetConnectionFactory(receiveMqQueueConnectionFactory);
return adapter;
}
@Bean
public CachingConnectionFactory receiveCachingConnectionFactory(UserCredentialsConnectionFactoryAdapter receiveUserCredentialsConnectionFactoryAdapter){
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setTargetConnectionFactory(receiveUserCredentialsConnectionFactoryAdapter);
cf.setSessionCacheSize(20);
cf.setReconnectOnException(true);
return cf;
}
@Bean
public PlatformTransactionManager receivePlatformTransactionManager(CachingConnectionFactory receiveCachingConnectionFactory){
JmsTransactionManager tm = new JmsTransactionManager(receiveCachingConnectionFactory);
return tm;
}
@Bean
public JmsOperations receiveJmsOperations(CachingConnectionFactory receiveCachingConnectionFactory){
JmsTemplate tp = new JmsTemplate(receiveCachingConnectionFactory);
tp.setReceiveTimeout(mqParam.getReceiveTimeout());
return tp;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(CachingConnectionFactory receiveCachingConnectionFactory){
DefaultJmsListenerContainerFactory lf = new DefaultJmsListenerContainerFactory();
lf.setConnectionFactory(receiveCachingConnectionFactory);
lf.setConcurrency("3-10");
lf.setRecoveryInterval(1000L);
return lf;
}
}
3.4 定义接收监听器
定义接收消息监听类ReceiveMessageListener.java如下:
package com.dragon.study.springbootwebsperemq.mq.service;
import lombok.SneakyThrows;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
/**
 * Listener that consumes messages from {@code QUEUE_TWO} and prints their id,
 * correlation id and payload to standard output.
 */
@Component
public class ReceiveMessageListener extends MessageListenerAdapter {

    /**
     * Prints one received message.
     *
     * <p>Object and text messages are printed with their payload. Any other
     * message type is printed via its {@code toString()} instead of being cast
     * to {@link TextMessage}, which would throw a {@code ClassCastException}.
     *
     * @param message the message delivered by the listener container
     */
    @SneakyThrows
    @JmsListener(destination = "QUEUE_TWO", containerFactory = "defaultJmsListenerContainerFactory")
    @Override
    public void onMessage(Message message) {
        Object payload;
        if (message instanceof ObjectMessage) {
            // getObject() deserializes the body; only consume from trusted senders.
            payload = ((ObjectMessage) message).getObject();
        } else if (message instanceof TextMessage) {
            payload = ((TextMessage) message).getText();
        } else {
            payload = message;
        }
        System.out.printf("msgId:%s correlationId:%s msg:%s\n",
                message.getJMSMessageID(), message.getJMSCorrelationID(), payload);
    }
}
3.5 定义其他相关类
定义实体Stu.java:
package com.dragon.study.springbootwebsperemq.mq.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
/**
 * Serializable sample payload carrying a single name; sent as an object
 * message by the tests.
 */
@Data
@AllArgsConstructor
public class Stu implements Serializable {

    private String name;

    /**
     * Renders the entity as {@code Stu{name='<name>'}}.
     *
     * @return the textual form of this entity
     */
    @Override
    public String toString() {
        return String.format("Stu{name='%s'}", name);
    }
}
启动类SpringBootWebsperemqApplication.java如下:
package com.dragon.study.springbootwebsperemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the demo; starting it also activates the message listener.
 */
@SpringBootApplication
public class SpringBootWebsperemqApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootWebsperemqApplication.class);
        application.run(args);
    }
}
3.6 测试
接收消息有两种方式:
- 通过消息接收监听器监听消费消息,此时运行SpringBootWebsperemqApplication,启动项目即可;
- 主动收取消息,此时不需要启动项目,注释掉监听器代码即可;
发送、接收或浏览消息测试如下:
package com.dragon.study.springbootwebsperemq;
import com.dragon.study.springbootwebsperemq.mq.bean.Stu;
import com.dragon.study.springbootwebsperemq.mq.config.MqParam;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.BrowserCallback;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
/**
 * Manual integration tests that send, receive and browse messages against the
 * configured queue. They need running queue managers; nothing is mocked.
 */
@SpringBootTest
class MqTests {

    @Resource
    private MqParam mqParam;
    @Resource
    private JmsOperations sendJmsOperations;
    @Resource
    private JmsOperations receiveJmsOperations;

    /**
     * Sends a plain text message, a serialized {@link Stu}, and a text message
     * carrying correlation id {@code 001} and the custom property {@code mId}.
     */
    @Test
    void sendTest() {
        String queue = mqParam.getQueue();
        sendJmsOperations.convertAndSend(queue, "hello");
        sendJmsOperations.convertAndSend(queue, new Stu("apple"));
        MessageCreator correlatedText = session -> {
            TextMessage m = session.createTextMessage();
            m.setText("hello");
            m.setJMSCorrelationID("001");
            m.setStringProperty("mId", "001");
            return m;
        };
        sendJmsOperations.send(queue, correlatedText);
    }

    /**
     * Receives messages in several ways: the raw message, the converted payload,
     * and payloads chosen by message selectors. Each call returns {@code null}
     * when nothing matches before the receive timeout elapses.
     */
    @Test
    @SneakyThrows
    void receiveTest() {
        String queue = mqParam.getQueue();
        printMessage(receiveJmsOperations.receive(queue));
        Object msg = receiveJmsOperations.receiveAndConvert(queue);
        System.out.println("msg:" + msg);
        // The message id below is a sample; replace it with an id present on the queue.
        Object msg1 = receiveJmsOperations.receiveSelectedAndConvert(queue, "JMSMessageID='ID:414D512051554555455F4F4E455F4D41F3E4F760047FEB20'");
        System.out.println("msg1:" + msg1);
        Object msg2 = receiveJmsOperations.receiveSelectedAndConvert(queue, "JMSCorrelationID='001'");
        System.out.println("msg2:" + msg2);
        // NOTE(review): "Cid" is the field name seen in the MQ header dump, not a
        // property set by sendTest; presumably this selector matches nothing.
        Object msg3 = receiveJmsOperations.receiveSelectedAndConvert(queue, "Cid='001'");
        System.out.println("msg3:" + msg3);
        Object msg4 = receiveJmsOperations.receiveSelectedAndConvert(queue, "mId='001'");
        System.out.println("msg4:" + msg4);
    }

    /**
     * Browses the queue without consuming: first all messages, then only those
     * with correlation id {@code 002}. A failure while reading a message now
     * fails the test instead of only being printed.
     */
    @Test
    @SneakyThrows
    void browseTest() {
        String queue = mqParam.getQueue();
        // QueueBrowser exposes a raw Enumeration, hence the unchecked conversion.
        @SuppressWarnings("unchecked")
        BrowserCallback<List<Message>> snapshot =
                (session, queueBrowser) -> Collections.list(queueBrowser.getEnumeration());

        List<Message> allMsgList = receiveJmsOperations.browse(queue, snapshot);
        for (Message message : allMsgList) {
            printMessage(message);
        }

        List<Message> msgList = receiveJmsOperations.browseSelected(queue, "JMSCorrelationID='002'", snapshot);
        for (Message message : msgList) {
            printMessage(message);
        }
    }

    /**
     * Prints id, correlation id and payload of a message. Handles {@code null}
     * (nothing received) and message types other than object/text without
     * throwing.
     *
     * @param message the message to print, or {@code null}
     * @throws JMSException if a header or the body cannot be read
     */
    private static void printMessage(Message message) throws JMSException {
        if (message == null) {
            System.out.println("no message available");
            return;
        }
        Object payload;
        if (message instanceof ObjectMessage) {
            payload = ((ObjectMessage) message).getObject();
        } else if (message instanceof TextMessage) {
            payload = ((TextMessage) message).getText();
        } else {
            payload = message;
        }
        System.out.printf("msgId:%s correlationId:%s msg:%s\n",
                message.getJMSMessageID(), message.getJMSCorrelationID(), payload);
    }
}
四、其他
4.1 常见问题
4.1.1 JMSFMQ6312报错
问题:JMSFMQ6312: An exception occurred in the Java(tm) MQI
参考:https://www.ibm.com/support/pages/node/476243
解决:
运行时通过JVM参数指定mqm本地库(native library)所在目录:
-Djava.library.path=/opt/mqm/java/lib64
4.1.2 消息通道一直处于RETRYING状态
问题:dis chs(ONE_TWO_CHL) 显示 STATUS(RETRYING)
原因:在网络和端口都正确的情况下,可能是通道两端的消息序列号不一致,此时需要先停止通道,重置通道序列号,再重新启动通道
解决:依次执行如下命令,重置通道
stop chl(ONE_TWO_CHL)
reset chl(ONE_TWO_CHL)
start chl(ONE_TWO_CHL)
4.2 消息形式
消息在mq管理器中和jms中显示形式如下:
mq管理器中消息:
$ ./amqsbcg QUEUE_TWO QUEUE_TWO_MANAGER
AMQSBCG0 - starts here
**********************
MQOPEN - 'QUEUE_TWO'
MQGET of message number 1, CompCode:0 Reason:0
****Message descriptor****
StrucId : 'MD ' Version : 2
Report : 0 MsgType : 8
Expiry : -1 Feedback : 0
Encoding : 273 CodedCharSetId : 1208
Format : 'MQHRF2 '
Priority : 4 Persistence : 1
MsgId : X'414D512051554555455F4F4E455F4D41F3E4F760029CEB20'
CorrelId : X'303031000000000000000000000000000000000000000000'
BackoutCount : 0
ReplyToQ : ' '
ReplyToQMgr : 'QUEUE_ONE_MANAGER '
** Identity Context
UserIdentifier : 'mqm '
AccountingToken :
X'0000000000000000000000000000000000000000000000000000000000000000'
ApplIdentityData : ' '
** Origin Context
PutApplType : '6'
PutApplName : 'java '
PutDate : '20210722' PutTime : '08221594'
ApplOriginData : ' '
GroupId : X'000000000000000000000000000000000000000000000000'
MsgSeqNumber : '1'
Offset : '0'
MsgFlags : '0'
OriginalLength : '-1'
**** Message ****
length - 205 of 205 bytes
00000000: 5246 4820 0000 0002 0000 00C8 0000 0111 'RFH ............'
00000010: 0000 04B8 4D51 5354 5220 2020 0000 0000 '....MQSTR ....'
00000020: 0000 04B8 0000 0020 3C6D 6364 3E3C 4D73 '....... <mcd><Ms'
00000030: 643E 6A6D 735F 7465 7874 3C2F 4D73 643E 'd>jms_text</Msd>'
00000040: 3C2F 6D63 643E 2020 0000 005C 3C6A 6D73 '</mcd> ...\<jms'
00000050: 3E3C 4473 743E 7175 6575 653A 2F2F 2F51 '><Dst>queue:///Q'
00000060: 5545 5545 5F54 574F 3C2F 4473 743E 3C54 'UEUE_TWO</Dst><T'
00000070: 6D73 3E31 3632 3730 3238 3137 3539 3237 'ms>1627028175927'
00000080: 3C2F 546D 733E 3C43 6964 3E30 3031 3C2F '</Tms><Cid>001</'
00000090: 4369 643E 3C44 6C76 3E32 3C2F 446C 763E 'Cid><Dlv>2</Dlv>'
000000A0: 3C2F 6A6D 733E 2020 0000 001C 3C75 7372 '</jms> ....<usr'
000000B0: 3E3C 6D49 643E 3030 313C 2F6D 4964 3E3C '><mId>001</mId><'
000000C0: 2F75 7372 3E20 2020 6865 6C6C 6F '/usr> hello '
jms中消息:
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d512051554555455f4f4e455f4d41f3e4f760029ceb20
JMSTimestamp: 1627028175927
JMSCorrelationID: 001
JMSDestination: queue:///QUEUE_TWO
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: java
JMSXDeliveryCount: 1
JMSXUserID: mqm
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 6
JMS_IBM_PutDate: 20210722
JMS_IBM_PutTime: 08221594
mId: 001
hello
4.3 相关文档
- https://docs.oracle.com/cd/E19148-01/820-0533/index.html
- https://docs.oracle.com/cd/E19148-01/820-0533/aerbg/index.html