依赖
<!--socket io -->
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>${netty-socketio.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
代码
/**
 * Tracks connected Socket.IO clients, grouped by the {@code resourceID} URL parameter of their
 * handshake, and relays messages between those clients and the Kafka / MQTT connection managers.
 *
 * <p>Registry layout: {@code resourceID -> (sessionID -> client)}. Structural changes
 * (register / unregister) are serialized on {@link #registryLock}; both map levels are
 * {@code ConcurrentHashMap}s so the broadcast methods can iterate without holding that lock
 * while clients connect or disconnect on other threads.
 */
@Slf4j
@Component
public class SocketIOClientManager {
    @Autowired
    private KafkaConnectionManager kafkaConnectionManager;
    @Autowired
    private MqttConnectionManager mqttConnectionManager;
    @Resource
    private WebSocketEventHandler webSocketEventHandler;

    /** resourceID -> (sessionID -> client). Never contains an empty inner map. */
    private final Map<String, Map<String, SocketIOClient>> clientMap =
            new java.util.concurrent.ConcurrentHashMap<>();

    /** Guards compound check-then-act updates of {@link #clientMap}. */
    private final Object registryLock = new Object();

    /**
     * Registers a newly connected client under its {@code resourceID}.
     *
     * <p>A client whose handshake carries no {@code resourceID} is sent a {@code fail} event and
     * is NOT registered.
     *
     * @param client the connected client
     */
    public void addClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            rejectClientWithoutResource(client);
            // Without this return the client used to be stored under a null key.
            return;
        }
        synchronized (registryLock) {
            Map<String, SocketIOClient> sessions = clientMap.get(resourceID);
            if (sessions == null) {
                sessions = new java.util.concurrent.ConcurrentHashMap<>();
                clientMap.put(resourceID, sessions);
            }
            // Keep the first registration for a session, as before.
            if (!sessions.containsKey(sessionID)) {
                sessions.put(sessionID, client);
            }
        }
        log.info("在线客户端: {}", clientMap);
    }

    /**
     * Unregisters a client. When the last client of a {@code resourceID} leaves, the matching
     * Kafka and/or MQTT connection is released through the corresponding connection manager.
     *
     * @param client the disconnecting client
     */
    public void removeClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            rejectClientWithoutResource(client);
            return;
        }

        boolean lastClientLeft = false;
        synchronized (registryLock) {
            Map<String, SocketIOClient> sessions = clientMap.get(resourceID);
            if (sessions == null) {
                log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);
                return;
            }
            if (sessions.remove(sessionID) == null) {
                // Resource is known but this session was never registered: nothing to do.
                return;
            }
            log.info("移除客户端: {}", sessionID);
            if (sessions.isEmpty()) {
                clientMap.remove(resourceID);
                log.info("移除ID: {}", resourceID);
                lastClientLeft = true;
            }
        }

        // Connection teardown runs outside the lock so it cannot stall other (un)registrations.
        if (lastClientLeft) {
            releaseBackendConnection(resourceID);
        }
    }

    /**
     * Publishes a client message to the Kafka server bound to the client's {@code resourceID}.
     *
     * @param client sending client
     * @param topic  target topic
     * @param mesg   message payload
     * @throws ExecutionException   propagated from {@code KafkaPubSubServer.pushMesg}
     * @throws InterruptedException propagated from {@code KafkaPubSubServer.pushMesg}
     */
    public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {
        String resourceID = getParamsByClient(client);
        KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);
        if (kafkaServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (hasClients(resourceID)) {
            kafkaServer.pushMesg(topic, mesg);
        }
    }

    /**
     * Sends a Kafka message to every client registered for {@code resourceID}.
     *
     * @param resourceID resource whose clients should receive the message
     * @param mesg       message payload
     */
    public void pushKafkaMesg2Client(String resourceID, String mesg) {
        broadcast(resourceID, webSocketEventHandler.getClientSubKafkaEvent(), mesg);
    }

    /**
     * Publishes a client message to the MQTT server bound to the client's {@code resourceID}.
     *
     * @param client sending client
     * @param topic  target topic
     * @param mesg   message payload
     * @throws MqttException propagated from {@code MqttPubSubServer.pushMesg}
     */
    public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {
        String resourceID = getParamsByClient(client);
        MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);
        if (mqttServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (hasClients(resourceID)) {
            mqttServer.pushMesg(topic, mesg);
        }
    }

    /**
     * Sends an MQTT message to every client registered for {@code resourceID}.
     *
     * @param resourceID resource whose clients should receive the message
     * @param mesg       message payload
     */
    public void pushMQTTMesg2Client(String resourceID, String mesg) {
        broadcast(resourceID, webSocketEventHandler.getClientSubEmqEvent(), mesg);
    }

    /** Returns whether at least one client is registered for {@code resourceID}; null-safe. */
    private boolean hasClients(String resourceID) {
        // ConcurrentHashMap rejects null keys, so guard explicitly.
        return resourceID != null && clientMap.containsKey(resourceID);
    }

    /** Emits {@code eventName} carrying {@code mesg} to every client of {@code resourceID}. */
    private void broadcast(String resourceID, String eventName, String mesg) {
        if (resourceID == null) {
            return;
        }
        Map<String, SocketIOClient> sessions = clientMap.get(resourceID);
        if (sessions == null) {
            return;
        }
        for (SocketIOClient ioClient : sessions.values()) {
            ioClient.sendEvent(eventName, mesg);
        }
    }

    /** Logs the problem and tells the client which {@code resourceID} format is expected. */
    private void rejectClientWithoutResource(SocketIOClient client) {
        log.error("客户端未配置参数");
        // NOTE(review): splitStr is not declared in the visible part of this class — confirm
        // where it is defined.
        client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");
    }

    /** Releases the Kafka and/or MQTT connection whose type prefixes {@code resourceID}. */
    private void releaseBackendConnection(String resourceID) {
        // NOTE(review): the prefix comparison is case-sensitive; verify that
        // String.valueOf(ResourceType.X) matches the prefix clients actually send.
        if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {
            kafkaConnectionManager.removeConnection(resourceID);
        }
        if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {
            mqttConnectionManager.removeConnection(resourceID);
        }
    }

    /** Reads the {@code resourceID} URL parameter from the client's handshake data. */
    private String getParamsByClient(SocketIOClient client) {
        return client.getHandshakeData().getSingleUrlParam("resourceID");
    }
}
/**
 * Spring configuration that exposes the netty-socketio server and its annotation scanner as
 * beans. Host and port are injected from the {@code socket-io.host} and {@code socket-io.port}
 * properties.
 */
@Configuration
public class SocketIOConfig {

    @Value("${socket-io.host}")
    private String host;

    @Value("${socket-io.port}")
    private int port;

    /**
     * Creates the Socket.IO server bound to the configured host and port.
     *
     * @return a configured, not yet started, {@code SocketIOServer}
     */
    @Bean
    public SocketIOServer socketIOServer() {
        final com.corundumstudio.socketio.Configuration settings =
                new com.corundumstudio.socketio.Configuration();

        // Bind address.
        settings.setPort(port);
        settings.setHostname(host);

        // Timing knobs (presumably milliseconds — confirm against the netty-socketio docs).
        settings.setPingTimeout(60000);
        settings.setPingInterval(25000);
        settings.setUpgradeTimeout(10000);

        return new SocketIOServer(settings);
    }

    /**
     * Creates the scanner that is handed the Socket.IO server.
     *
     * @param socketServer the server bean
     * @return the annotation scanner
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }

    /**
     * Builds the plain-HTTP base URL from the configured host and port.
     *
     * @return {@code http://<host>:<port>}
     */
    public String getUrl() {
        return new StringBuilder("http://")
                .append(host)
                .append(":")
                .append(port)
                .toString();
    }
}
@Component
@Slf4j
public class WebSocketEventHandler {
@Autowired
private SocketIOClientManager socketIOClientManager;
public String getClientSubKafkaEvent() {
return clientSubKafkaEvent;
}
public String getClientPubKafkaEvent() {
return clientPubKafkaEvent;
}
public String getClientSubEmqEvent() {
return clientSubEmqEvent;
}
public String getClientPubEmqEvent() {
return clientPubEmqEvent;
}
private final String clientSubKafkaEvent = "subKafka";
private final String clientPubKafkaEvent = "pubKafka";
private final String clientSubEmqEvent = "subEmq";
private final String clientPubEmqEvent = "pubEmq";
@OnConnect
public void onConnect(SocketIOClient client) {
log.info("客户端发起连接. sessionId->{}", client.getSessionId());
socketIOClientManager.addClient(client);
}
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
final String sessionID = client.getSessionId().toString();
log.info("客户端断开连接, sessionId->{}" + sessionID);
socketIOClientManager.removeClient(client);
client.disconnect();
}
@OnEvent(value = clientPubKafkaEvent)
public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
if (StrUtil.isEmpty(topic)) {
ackRequest.sendAckData(400, "topic不能为空");
}
if (StrUtil.isEmpty(mesg)) {
ackRequest.sendAckData(400, "mesg不能为空");
}
try {
socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);
ackRequest.sendAckData(200, "id");
} catch (Exception e) {
e.printStackTrace();
ackRequest.sendAckData(500, e.getMessage());
}
}
@OnEvent(value = clientPubEmqEvent)
public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
if (StrUtil.isEmpty(topic)) {
ackRequest.sendAckData(400, "topic不能为空");
}
if (StrUtil.isEmpty(mesg)) {
ackRequest.sendAckData(400, "mesg不能为空");
}
try {
socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);
ackRequest.sendAckData(200, "id");
} catch (Exception e) {
e.printStackTrace();
ackRequest.sendAckData(500, e.getMessage());
}
}
}
/**
 * Starts the injected {@code SocketIOServer} from its {@code CommandLineRunner} callback.
 */
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(ServerRunner.class);

    /** Server to start; supplied through the constructor. */
    private final SocketIOServer server;

    /**
     * @param server the Socket.IO server bean to start
     */
    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    /**
     * Logs the start-up message and starts the server.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        LOG.info("SocketIO 启动...");
        this.server.start();
    }
}
@Slf4j
public class SocketClientEMQTest {
public static void main(String[] args) {
final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();
try {
socketClientTest.run();
} catch (Exception e) {
e.printStackTrace();
}
}
public void run(String... args) throws Exception {
URI uri = URI.create("http://127.0.0.1:9201");
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";
Socket socket = IO.socket(uri, options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("connect: {}", args);
}
});
socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("disconnect: {}", args);
}
});
socket.on("subEmq", new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("push_mqtt {}", args);
}
});
final ArrayList<String> arrayList = new ArrayList<>();
int i = 0;
while (true) {
i += 1;
socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {
@Override
public void call(Object... objects) {
log.info("userChat ack:{}|{}", objects[0], objects[1]);
}
});
if (i >= 10) {
break;
}
Thread.sleep(2000);
}
socket.connect();
LockSupport.park();
}
}
@Slf4j
public class SocketClientKAFKATest {
public static void main(String[] args) {
final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();
try {
socketClientTest.run();
} catch (Exception e) {
e.printStackTrace();
}
}
public void run(String... args) throws Exception {
URI uri = URI.create("http://127.0.0.1:9201");
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
Socket socket = IO.socket(uri, options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("connect: {}", args);
}
});
socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("disconnect: {}", args);
}
});
socket.on("subKafka", new Emitter.Listener() {
@Override
public void call(Object... args) {
log.info("push_kafka {}", args);
}
});
final ArrayList<String> arrayList = new ArrayList<>();
int i = 0;
while (true) {
i += 1;
socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {
@Override
public void call(Object... objects) {
log.info("userChat ack:{}|{}", objects[0], objects[1]);
}
});
if (i >= 10) {
break;
}
Thread.sleep(2000);
}
socket.connect();
LockSupport.park();
}
}
|