一、配置数据源
1、添加数据源
配置 --> 数据源 --> 添加数据源
2、添加存储数据的表
-- Backing table for the JDBC message store; the message store definition points at it
-- through its store.jdbc.table parameter.
CREATE TABLE jdbc_message_store(
-- Auto-incremented surrogate key of the stored row.
indexId BIGINT( 20 ) NOT NULL AUTO_INCREMENT ,
-- Identifier of the stored message (up to 200 characters).
msg_id VARCHAR( 200 ) NOT NULL ,
-- Binary payload of the message; the reader program in this guide decodes it with
-- java.io.ObjectInputStream, so it cannot be inspected as plain text.
message BLOB NOT NULL ,
PRIMARY KEY ( indexId )
);
二、配置接口
1、添加 Message Store
MsgStore.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Synapse message store named "MsgStore", implemented by the JDBCMessageStore class. -->
<messageStore class="org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore" name="MsgStore" xmlns="http://ws.apache.org/ns/synapse">
<!-- Name of the data source to use; presumably the one added in the data source step (verify that the names match). -->
<parameter name="store.jdbc.dsName">MySQL</parameter>
<!-- Producer guaranteed delivery is switched off. -->
<parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
<!-- Table that receives the stored messages; same name as the table created with the DDL in this guide. -->
<parameter name="store.jdbc.table">jdbc_message_store</parameter>
</messageStore>
2、在 API 中添加 Store
3、发布应用
4、Postman 测试
未接收到消息时,消息存储中的消息数为 0。使用 Postman 发送报文。
再次刷新消息存储,消息数变为 1,数据库表中也存储了这条消息。
三、查看存储的消息
1、查看消息内容
不能直接查看 message 字段的内容,需要另外编写程序查看,因为 BLOB 字段中存储的是经过 Java 序列化的对象。从源码可以看到,它是使用 ObjectInputStream 读取消息的。
使用 Eclipse Java 新建一个工程
导入以下依赖包:WSO2 包:WSO2ESB_HOME\repository\components\plugins\synapse-core_2.1.7.wso2v7.jar;JDBC 包:mysql-connector-java-5.1.26.jar;fastjson 包:fastjson-1.2.15.jar。
Main.java
package com;
import java.util.List;
import java.util.Map;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
import com.alibaba.fastjson.JSON;
/**
 * Command-line viewer for the messages persisted in the {@code jdbc_message_store} table.
 *
 * <p>For every stored row it prints, as JSON, the property map of the Axis2 part of the
 * message followed by the property map of the Synapse part.
 */
public class Main {

    /** Columns requested from the table, in the order they appear in the flattened result. */
    private static final String[] FIELDS = { "indexId", "msg_id", "message" };

    /** Query that loads every stored message. */
    private static final String QUERY = "SELECT indexId,msg_id,message FROM jdbc_message_store;";

    /**
     * Loads all rows and prints the properties of each decodable message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The helper returns one flat list: FIELDS.length consecutive entries per row.
        List<Object> msgList = Utils.getMysqlInfo(QUERY, FIELDS);
        int width = FIELDS.length;

        // Only complete rows are visited; a trailing partial row is ignored.
        for (int base = 0; base + width <= msgList.size(); base += width) {
            Object indexId = msgList.get(base);
            Object msgId = msgList.get(base + 1);
            Object payload = msgList.get(base + 2);

            // Skip rows without a usable message instead of reusing the previous row's
            // message or failing on a blind cast.
            if (!(payload instanceof StorableMessage)) {
                System.err.println("Skipping row indexId=" + indexId + ", msg_id=" + msgId
                        + ": message could not be read as a StorableMessage");
                continue;
            }
            StorableMessage message = (StorableMessage) payload;

            Map<?, ?> axis2Properties = message.getAxis2message().getProperties();
            System.out.println(JSON.toJSON(axis2Properties));

            Map<?, ?> synapseProperties = message.getSynapseMessage().getProperties();
            System.out.println(JSON.toJSON(synapseProperties));
        }
    }
}
Utils.java
package com;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
/**
 * JDBC helpers for reading rows of the message store table from MySQL.
 *
 * <p>Connection settings are hard-coded for this example; adjust them to your environment.
 */
public class Utils {

    /** JDBC URL of the database that contains the message store table. */
    private static final String DB_URL = "jdbc:mysql://localhost:3306/Carbon_WSO2?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8";

    /** Database user name. */
    private static final String USERNAME = "wxhntmy";

    /** Database password (example value only). */
    private static final String PASSWORD = "123456";

    /** Name of the column whose bytes are decoded into a {@link StorableMessage}. */
    private static final String MESSAGE_COLUMN = "message";

    /**
     * Opens a new connection to the configured MySQL database.
     *
     * <p>The caller owns the returned connection and must close it.
     *
     * @return an open connection, or {@code null} when the driver cannot be loaded or the
     *         connection attempt fails (the stack trace is printed)
     */
    public static Connection getsourcedata() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection connection = DriverManager.getConnection(DB_URL, USERNAME, PASSWORD);
            System.out.println("数据库连接成功!");
            return connection;
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Runs {@code sql} and returns the requested columns of every row as one flat list.
     *
     * <p>Each row contributes exactly {@code field.length} consecutive entries, in the order
     * of {@code field}. The column named {@code "message"} is decoded into a
     * {@link StorableMessage}; every other column is read as a {@code String}. When a message
     * is SQL NULL, cannot be decoded, or is not a {@code StorableMessage}, a {@code null}
     * placeholder is stored so that the row width stays constant.
     *
     * @param sql   query to execute
     * @param field column labels to read from each row
     * @return the flattened rows; empty when no connection is available, and possibly
     *         incomplete when a database error occurs while reading (the stack trace is
     *         printed)
     */
    public static List<Object> getMysqlInfo(String sql, String... field) {
        List<Object> rows = new ArrayList<>();
        // try-with-resources skips a null resource, so a failed connection is safe here.
        try (Connection con = getsourcedata()) {
            if (con == null) {
                return rows;
            }
            try (PreparedStatement pstmt = con.prepareStatement(sql);
                    ResultSet result = pstmt.executeQuery()) {
                System.out.println("utils 最终运行SQL: " + pstmt);
                while (result.next()) {
                    for (String column : field) {
                        if (MESSAGE_COLUMN.equals(column)) {
                            rows.add(readMessage(result.getBytes(column)));
                        } else {
                            rows.add(result.getString(column));
                        }
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return rows;
    }

    /**
     * Decodes the bytes of a stored message.
     *
     * <p>NOTE: this uses Java native deserialization; only run it against a database whose
     * contents you trust.
     *
     * @param bytes raw column value, may be {@code null}
     * @return the decoded message, or {@code null} when {@code bytes} is {@code null}, decoding
     *         fails, or the decoded object is not a {@link StorableMessage}
     */
    private static StorableMessage readMessage(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Object decoded = in.readObject();
            return (decoded instanceof StorableMessage) ? (StorableMessage) decoded : null;
        } catch (Exception e) {
            // Best effort: deserialization can fail in many ways (corrupt stream, missing
            // class, runtime errors inside readObject); report it and keep reading other rows.
            e.printStackTrace();
            return null;
        }
    }
}
2、查看消息
通过 message.getAxis2message().getProperties() 和 message.getSynapseMessage().getProperties() 的输出,可以看到我们设置的属性。
|