具体配置见 https://blog.csdn.net/ssehs/article/details/104617657 大佬的文章 结合 官方文档
我用的是5.0.5版本 maven配置
<!-- Spring Integration IP module: supplies the org.springframework.integration.ip.tcp classes used in the snippets of this article; pinned to 5.0.5.RELEASE. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>5.0.5.RELEASE</version>
</dependency>
这是该版本的官方文档 https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/ip.html 。有个小技巧:URL 中的版本号(即 5.0.5.RELEASE 这一段)可以直接替换成想看的版本,例如想看 5.0.3.RELEASE 的文档,就把它改为 https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/ip.html 。同理,想找对应版本的 MQTT 配置,把末尾的 ip 换为 mqtt 即可: https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/mqtt.html
回归正题
服务器接收不到消息是因为
TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(port);
默认使用的是 ByteArrayCrLfSerializer 作为序列化/反序列化器:序列化时在字节数组后面追加回车符和换行符 (\r\n) 再写入输出流;反序列化时一直读到 \r\n 才认为一条消息结束。也就是说,发送的数据必须以回车符和换行符 (\r\n) 结尾,才会被认为发送完毕
我找了好久原因,发现只有在 client 关闭应用程序(断开连接)的时候,服务器才收到消息(这种问题发生在使用调试工具发送数据的时候;如果用 spring-integration 配置的客户端是没有这个问题的)
查看ByteArrayCrLfSerializer的源码 发现
if (n > 0 && bite == 10 && buffer[n - 1] == 13) {
return n - 1;
}
10 是换行符(LF,\n),13 是回车符(CR,\r)。可以用下面的测试验证一下
/**
 * Demonstrates that byte 13 (CR, '\r') followed by byte 10 (LF, '\n') is the
 * line terminator that ByteArrayCrLfSerializer looks for: logging the decoded
 * string between two "123" markers produces an empty line.
 */
@Test
public void test(){
    // Exactly two bytes in CR-then-LF order. This matches the serializer's check
    // (previous byte == 13, current byte == 10) and avoids a trailing NUL byte.
    byte[] a = new byte[2];
    a[0]=(byte)13;
    a[1]=(byte)10;
    // Decode with an explicit charset so the result does not depend on the platform default.
    String s1=new String(a, java.nio.charset.StandardCharsets.US_ASCII);
    log.error("123");
    log.error(s1);
    log.error("123");
}
14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils - 123
14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils -
14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils - 123
我的调试工具发送数据时应该没有在末尾加上回车和换行,所以服务器一直收不到消息
下面是ByteArrayCrLfSerializer的源码
package org.springframework.integration.ip.tcp.serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Byte-array (de)serializer that delimits messages with CR LF.
 * <p>
 * {@link #serialize} writes the payload followed by {@code "\r\n"};
 * {@link #doDeserialize} reads bytes until a CR (13) immediately followed by
 * an LF (10) is seen and returns the bytes before that terminator.
 */
public class ByteArrayCrLfSerializer extends AbstractPooledBufferByteArraySerializer {
/** Shared instance. */
public static final ByteArrayCrLfSerializer INSTANCE = new ByteArrayCrLfSerializer();
/** Terminator appended to every serialized message. */
private static final byte[] CRLF = "\r\n".getBytes();
public ByteArrayCrLfSerializer() {
}
/**
 * Reads one CRLF-terminated message into {@code buffer} and returns a copy
 * trimmed to the message length.
 *
 * @param inputStream stream to read from
 * @param buffer working buffer supplied by the superclass
 * @return the message bytes without the CRLF terminator
 * @throws IOException if reading fails or no terminator is found in time
 */
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
int n = this.fillToCrLf(inputStream, buffer);
return this.copyToSizedArray(buffer, n);
}
/**
 * Reads byte by byte into {@code buffer} until CR LF is encountered.
 *
 * @param inputStream stream to read from
 * @param buffer destination for the message bytes
 * @return the number of message bytes in {@code buffer}, excluding the CR
 * @throws SoftEndOfStreamException if the stream ends before any byte of a
 *         new message was read
 * @throws IOException if {@code maxMessageSize} bytes are read without
 *         finding CR LF, or if reading fails
 */
public int fillToCrLf(InputStream inputStream, byte[] buffer) throws IOException {
int n = 0;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Available to read: " + inputStream.available());
}
try {
do {
int bite = inputStream.read();
// A negative read before any byte was stored means the stream ended between messages.
if (bite < 0 && n == 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
// Superclass helper; presumably rejects a negative read in the middle of a message - confirm in the superclass.
this.checkClosure(bite);
// LF (10) directly after a stored CR (13) terminates the message; n - 1 drops the CR.
if (n > 0 && bite == 10 && buffer[n - 1] == 13) {
return n - 1;
}
buffer[n++] = (byte)bite;
} while(n < this.maxMessageSize);
throw new IOException("CRLF not found before max message length: " + this.maxMessageSize);
} catch (SoftEndOfStreamException var6) {
// Rethrown as-is, without publishing an event.
throw var6;
} catch (IOException var7) {
// Publish the failure together with the bytes read so far, then rethrow.
this.publishEvent(var7, buffer, n);
throw var7;
} catch (RuntimeException var8) {
this.publishEvent(var8, buffer, n);
throw var8;
}
}
/**
 * Writes {@code bytes} followed by the CRLF terminator.
 *
 * @param bytes payload to send
 * @param outputStream stream to write to
 * @throws IOException if writing fails
 */
public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
outputStream.write(bytes);
outputStream.write(CRLF);
}
}
我们将其改进一下 自定义一下
package com.qhdsx.apollo.module.msgcenter.config;
import java.io.*;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.ip.tcp.serializer.AbstractPooledBufferByteArraySerializer;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
/**
 * Byte-array (de)serializer for peers that do not terminate messages with CR LF.
 * <p>
 * Outbound payloads are written unchanged (no terminator is appended). Inbound,
 * a "message" is whatever bytes can be read without blocking once the first
 * byte has arrived.
 * <p>
 * NOTE(review): TCP does not preserve message boundaries, so a logical frame
 * may be split across two reads or merged with the next one. If the peer
 * protocol has a length field or an end marker, framing on that is more robust.
 */
@Slf4j
public class CustomSerializerDeserializer extends AbstractPooledBufferByteArraySerializer{

    /**
     * Writes {@code message} as-is and flushes the stream.
     *
     * @param message payload to send
     * @param outputStream stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void serialize(byte[] message, OutputStream outputStream) throws IOException {
        log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }

    /**
     * Reads one chunk of inbound data and returns it as a right-sized array.
     *
     * @param inputStream stream to read from
     * @param bytes working buffer supplied by the superclass
     * @return the bytes that were read
     * @throws IOException if the stream is closed or reading fails
     */
    @Override
    protected byte[] doDeserialize(InputStream inputStream,byte[] bytes) throws IOException{
        int n = this.fillToCrLf(inputStream, bytes);
        return this.copyToSizedArray(bytes, n);
    }

    /**
     * Fills {@code buffer} with the bytes currently readable from the stream.
     * <p>
     * The name is kept for compatibility with existing callers; this method
     * does not look for CR LF.
     *
     * @param inputStream stream to read from
     * @param buffer destination for the received bytes
     * @return number of bytes stored in {@code buffer}
     * @throws SoftEndOfStreamException if the stream ended before any byte was read
     * @throws IOException if more data is pending than fits into the buffer,
     *         or if reading fails
     */
    public int fillToCrLf(InputStream inputStream, byte[] buffer) throws IOException {
        int n = 0;
        try {
            // Block until the first byte arrives or the stream ends. This replaces
            // polling available() in a tight loop, which burned CPU and never
            // terminated once the peer had closed the connection.
            int bite = inputStream.read();
            if (bite < 0) {
                throw new SoftEndOfStreamException("Stream closed between payloads");
            }
            buffer[n++] = (byte) bite;

            // Never write past the buffer, even if maxMessageSize was changed
            // after the buffer was allocated.
            int limit = Math.min(buffer.length, this.maxMessageSize);
            int pending;
            // Drain what has already been received. Any byte value, including
            // 0x00, is accepted as the last byte of a message.
            while ((pending = inputStream.available()) > 0) {
                if (n >= limit) {
                    throw new IOException("Message exceeds max message length: " + this.maxMessageSize);
                }
                int read = inputStream.read(buffer, n, Math.min(pending, limit - n));
                // A negative result here means the stream closed in the middle of a message.
                this.checkClosure(read);
                n += read;
            }
            return n;
        } catch (SoftEndOfStreamException e) {
            // Rethrown without publishing an event, like the CRLF serializer does.
            throw e;
        } catch (IOException e) {
            this.publishEvent(e, buffer, n);
            throw e;
        } catch (RuntimeException e) {
            this.publishEvent(e, buffer, n);
            throw e;
        }
    }
}
public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
/**
 * Creates the TCP server connection factory listening on {@code port}.
 * <p>
 * The custom (de)serializer is registered for both directions. If only the
 * deserializer were set, outbound messages would still go through the
 * factory's default serializer, which appends CR LF to every payload.
 *
 * @return the configured server connection factory
 */
@Bean
public AbstractServerConnectionFactory serverCF() {
    TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(port);
    serverCf.setLookupHost(false);
    serverCf.setSerializer(SERIALIZER);
    serverCf.setDeserializer(SERIALIZER);
    return serverCf;
}
这样就能解决接收不到消息的问题了
关于消息拦截器(ChannelInterceptor):好像没啥用
/**
 * Inbound channel: a {@link DirectChannel} with a {@link TcpChannelInterptor}
 * attached.
 *
 * @return the channel registered as bean {@code serverIn}
 */
@Bean
public MessageChannel serverIn() {
    DirectChannel inbound = new DirectChannel();
    TcpChannelInterptor interceptor = new TcpChannelInterptor();
    inbound.addInterceptor(interceptor);
    return inbound;
}
在定义入站通道的时候 加入Interceptor
package com.qhdsx.apollo.module.msgcenter.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
@Slf4j
public class TcpChannelInterptor implements ChannelInterceptor{
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
log.error("preSend message:{}, channel:{}", message.toString(), channel.toString());
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
log.error("postSend message:{}, channel:{}, sent:{}", message.toString(), channel.toString(), sent);
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
log.error("afterSendCompletion message:{}, channel:{}, sent:{}, ex:{}", message.toString(), channel.toString(), sent, ex);
}
@Override
public boolean preReceive(MessageChannel channel) {
log.error("preReceive channel:{}", channel.toString());
return false;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
log.error("postReceive Completion message:{}, channel:{}", message.toString(), channel.toString());
return message;
}
@Override
public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
log.error("afterReceiveCompletion=====> message:{}, channel:{}, ex:{}", message.toString(), channel.toString(), ex);
}
}
服务器完整配置:
package com.qhdsx.apollo.module.msgcenter.config;
import com.qhdsx.apollo.module.msgcenter.listener.TcpChannelInterptor;
import com.qhdsx.apollo.module.msgcenter.listener.TcpInboundListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.*;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
@EnableIntegration
@Configuration
@Slf4j
public class TcpServerConfig implements ApplicationListener<ApplicationEvent>{
/** Gateway used by {@code upCase} to send reply messages. */
private final TcpServerGateway tcpServerGateway;
/** Shared custom (de)serializer instance installed on the server connection factory. */
public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
/** TCP listen port, resolved from the {@code tcp_port} property. */
@Value("${tcp_port}")
private int port;
/** Application listener that {@code upCase} hands every inbound message to. */
@Resource
private TcpInboundListener tcpInboundListener;
/**
 * Creates the configuration with its outbound gateway.
 *
 * @param tcpServerGateway gateway stored for sending replies
 */
public TcpServerConfig(TcpServerGateway tcpServerGateway) {
this.tcpServerGateway = tcpServerGateway;
}
/**
 * Inbound channel: a {@link DirectChannel} with a {@link TcpChannelInterptor}
 * attached.
 *
 * @return the channel registered as bean {@code serverIn}
 */
@Bean
public MessageChannel serverIn() {
    DirectChannel inbound = new DirectChannel();
    TcpChannelInterptor interceptor = new TcpChannelInterptor();
    inbound.addInterceptor(interceptor);
    return inbound;
}
/**
 * Creates the TCP server connection factory listening on {@code port}.
 * <p>
 * The custom (de)serializer is registered for both directions. If only the
 * deserializer were set, outbound messages would still go through the
 * factory's default serializer, which appends CR LF to every payload.
 *
 * @return the configured server connection factory
 */
@Bean
public AbstractServerConnectionFactory serverCF() {
    TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(port);
    serverCf.setLookupHost(false);
    serverCf.setSerializer(SERIALIZER);
    serverCf.setDeserializer(SERIALIZER);
    return serverCf;
}
/**
 * Inbound adapter that forwards messages received on the server connection
 * factory to the {@code serverIn} channel.
 *
 * @param connectionFactory the server connection factory bean
 * @return the configured receiving adapter
 */
@Bean
public TcpReceivingChannelAdapter tcpInAdapter(AbstractServerConnectionFactory connectionFactory) {
    TcpReceivingChannelAdapter inGate = new TcpReceivingChannelAdapter();
    // Use the injected factory (as tcpOutAdapter does) instead of calling serverCF() and ignoring the parameter.
    inGate.setConnectionFactory(connectionFactory);
    inGate.setOutputChannelName("serverIn");
    return inGate;
}
/**
 * Handles every message arriving on {@code serverIn}: logs the connection id
 * and payload, passes the message to the inbound listener, and, when the
 * listener's result is empty, sends the same payload and headers back through
 * the gateway.
 *
 * @param in the inbound TCP message
 */
@ServiceActivator(inputChannel = "serverIn")
public void upCase(Message<byte[]> in) {
    Object connectionId = in.getHeaders().get("ip_connectionId");
    String text = new String(in.getPayload());
    log.error(connectionId + "=" + text);

    String flag = tcpInboundListener.handleMessage(in);
    if (!StringUtils.isEmpty(flag)) {
        // The listener returned a non-empty result, so no echo is sent.
        return;
    }
    Message<byte[]> reply = MessageBuilder.createMessage(in.getPayload(), in.getHeaders());
    tcpServerGateway.send(reply);
}
/**
 * Outbound channel: a {@link DirectChannel} with a {@link TcpChannelInterptor}
 * attached.
 *
 * @return the channel registered as bean {@code serverOut}
 */
@Bean
public MessageChannel serverOut() {
    DirectChannel outbound = new DirectChannel();
    TcpChannelInterptor interceptor = new TcpChannelInterptor();
    outbound.addInterceptor(interceptor);
    return outbound;
}
/**
 * Outbound handler subscribed to {@code serverOut}; it sends messages through
 * the given server connection factory.
 *
 * @param connectionFactory the server connection factory bean
 * @return the configured sending handler
 */
@Bean
@ServiceActivator(inputChannel = "serverOut")
public MessageHandler tcpOutAdapter(AbstractServerConnectionFactory connectionFactory) {
    TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
    sender.setConnectionFactory(connectionFactory);
    return sender;
}
/**
 * Messaging gateway whose default request channel is {@code serverOut}.
 */
@MessagingGateway(defaultRequestChannel = "serverOut")
public interface TcpServerGateway {
/**
 * Sends a message to the gateway's request channel.
 *
 * @param out the message to send
 */
void send(Message<byte[]> out);
}
/**
 * Logs TCP connection life-cycle events: connection opened (id, port, host
 * address, host name), deserialization failure (cause message and received
 * bytes), and connection closed (id). Other events are ignored.
 *
 * @param event the published application event
 */
@Override
public void onApplicationEvent(ApplicationEvent event){
    if (event instanceof TcpConnectionOpenEvent) {
        TcpConnection opened = (TcpConnection) event.getSource();
        log.error("############started################");
        log.error("连接的成功 ip_connectionId======>{}", opened.getConnectionId());
        log.error("连接的成功 端口为======>{}", opened.getPort());
        log.error("连接的成功 host为======>{}", opened.getHostAddress());
        log.error("连接的成功 hostname为======>{}", opened.getHostName());
        log.error("############end################");
    } else if (event instanceof TcpDeserializationExceptionEvent) {
        TcpDeserializationExceptionEvent failure = (TcpDeserializationExceptionEvent) event;
        byte[] received = failure.getBuffer();
        String reason = failure.getCause().getMessage();
        log.error("############序列化################");
        log.error("异常消息为 {}", reason);
        log.error("序列化为 {}", new String(received));
    } else if (event instanceof TcpConnectionCloseEvent) {
        TcpConnectionCloseEvent closed = (TcpConnectionCloseEvent) event;
        log.error("断开连接、======>{}", closed.getConnectionId());
    }
}
个人理解 不对的地方还望大佬指正
|