IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【无标题】spring integration-TCP服务器和TCP客户端的注解配置 服务器配置接收不到消息的问题和消息过滤器配置 -> 正文阅读

[Java知识库]【无标题】spring integration-TCP服务器和TCP客户端的注解配置 服务器配置接收不到消息的问题和消息过滤器配置

具体配置见 https://blog.csdn.net/ssehs/article/details/104617657 大佬的文章
结合 官方文档

我用的是5.0.5版本 maven配置

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ip</artifactId>
    <version>5.0.5.RELEASE</version>
</dependency>

这是该版本的官方文档
https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/ip.html
有个小技巧 如果想用5.0.5.RELEASE版本 就把加粗的部分变为 5.0.5.RELEASE 即可
将其变为
https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/ip.html
同理 想找该版本的MQTT配置就将 ip换为mqtt即可
https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/mqtt.html

回归正题

服务器接收不到消息是因为

TcpNetServerConnectionFactory  serverCf = new TcpNetServerConnectionFactory(port); 

默认使用的是 ByteArrayCrlfSerializer 反序列方法 目的是将字节数组转换为字节流,后跟回车符和换行符 (\r\n)。
就是说发送的结束符是以回车符和换行符 (\r\n)为结尾的 才被认为是发送完毕

我找好久的原因 发现 client 关闭应用程序的时候 客户端才收到消息(这种问题发生在使用的调试工具的时候 如果用spring-integration配置的客户端是没有这个问题的)

查看ByteArrayCrLfSerializer的源码 发现

   if (n > 0 && bite == 10 && buffer[n - 1] == 13) {
                    return n - 1;
                }

10 一个就是换行符 13 应该是回车

 @Test
public void test(){

    byte[] a = new byte[3];
    a[0]=(byte)10;
    a[1]=(byte)13;

    String s1=new String(a);
    log.error("123");
    log.error(s1);
    log.error("123");

}

14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils - 123
14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils - 
 
14:56:11.667 [main] ERROR com.qhdsx.apollo.module.msgcenter.util.DLT645Utils - 123

我的调试工具应该是不在发送的最后 填写回车和换行的
所以收不到消息

下面是ByteArrayCrLfSerializer的源码

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.integration.ip.tcp.serializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ByteArrayCrLfSerializer extends AbstractPooledBufferByteArraySerializer {
    public static final ByteArrayCrLfSerializer INSTANCE = new ByteArrayCrLfSerializer();
    private static final byte[] CRLF = "\r\n".getBytes();

    public ByteArrayCrLfSerializer() {
    }

    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        int n = this.fillToCrLf(inputStream, buffer);
        return this.copyToSizedArray(buffer, n);
    }

    public int fillToCrLf(InputStream inputStream, byte[] buffer) throws IOException {
        int n = 0;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Available to read: " + inputStream.available());
        }

        try {
            do {
                int bite = inputStream.read();
                if (bite < 0 && n == 0) {
                    throw new SoftEndOfStreamException("Stream closed between payloads");
                }

                this.checkClosure(bite);
                if (n > 0 && bite == 10 && buffer[n - 1] == 13) {
                    return n - 1;
                }

                buffer[n++] = (byte)bite;
            } while(n < this.maxMessageSize);

            throw new IOException("CRLF not found before max message length: " + this.maxMessageSize);
        } catch (SoftEndOfStreamException var6) {
            throw var6;
        } catch (IOException var7) {
            this.publishEvent(var7, buffer, n);
            throw var7;
        } catch (RuntimeException var8) {
            this.publishEvent(var8, buffer, n);
            throw var8;
        }
    }

    public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
        outputStream.write(bytes);
        outputStream.write(CRLF);
    }
}

我们将其改进一下 自定义一下

package com.qhdsx.apollo.module.msgcenter.config;


import java.io.*;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.ip.tcp.serializer.AbstractPooledBufferByteArraySerializer;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;

/**
 * <h4>ace-apollo</h4>
 * <p>序列化备用</p>
 *
 * @author : hc
 * @date : 2022-03-23 09:46
 **/
@Slf4j
public class CustomSerializerDeserializer  extends AbstractPooledBufferByteArraySerializer{

    @Override
    public void serialize(byte[] message, OutputStream outputStream) throws IOException {
        log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }

    @Override
    protected byte[] doDeserialize(InputStream inputStream,byte[] bytes) throws IOException{
        int n = this.fillToCrLf(inputStream, bytes);
        return this.copyToSizedArray(bytes, n);
    }

    public int fillToCrLf(InputStream inputStream, byte[] buffer) throws IOException {
        int n = 0;
        int count = 0;
        // 计算数量
        while (count == 0) {
            count = inputStream.available();
        }
        try {
            do {
                int bite = inputStream.read();
                if (bite < 0 && n == 0) {
                    throw new SoftEndOfStreamException("Stream closed between payloads");
                }

                this.checkClosure(bite);
                buffer[n++] = (byte)bite;
                // 数量相等就认为是接收完毕了
                if (n > 0&&bite>0&&n==count) {
                    return n ;
                }

            } while(n < this.maxMessageSize);

            throw new IOException("CRLF not found before max message length: " + this.maxMessageSize);
        } catch (SoftEndOfStreamException var6) {
            throw var6;
        } catch (IOException var7) {
            this.publishEvent(var7, buffer, n);
            throw var7;
        } catch (RuntimeException var8) {
            this.publishEvent(var8, buffer, n);
            throw var8;
        }
    }
}
 public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
  @Bean
    public AbstractServerConnectionFactory serverCF() {
        TcpNetServerConnectionFactory  serverCf = new TcpNetServerConnectionFactory(port);
        //serverCf.setDeserializer(new ByteArraySingleTerminatorSerializer((byte)0x16));
        //默认情况下,对入站数据包执行反向 DNS 查找,以将 IP 地址转换为主机名,以便在消息头中使用。在未配置 DNS 的环境中,这可能会导致连接延迟。lookup-host您可以通过将属性设置为 来覆盖此默认行为false
        // 不进行反向的dns 解析 https://docs.spring.io/spring-integration/reference/html/ip.html#overview
        serverCf.setLookupHost(false);
		//设置 自定义序列化 
        serverCf.setDeserializer(SERIALIZER);
        return serverCf;
    }

这样就能解决接收不到消息的问题了

关于 消息过滤器 好像没啥用

   @Bean
    public MessageChannel serverIn() {		//入站通道
        DirectChannel directChannel = new DirectChannel();
        directChannel.addInterceptor(new TcpChannelInterptor());
        return directChannel;
    }

在定义入站通道的时候 加入Interceptor

package com.qhdsx.apollo.module.msgcenter.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

/**
 * <h4>ace-apollo</h4>
 * <p>自定义的通道拦截器</p>
 *  展示不用  用到可以在这配置
 * @author : hc
 * @date : 2022-01-04 14:37
 **/
@Slf4j
public class TcpChannelInterptor implements ChannelInterceptor{
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        log.error("preSend message:{}, channel:{}", message.toString(), channel.toString());
        return message;
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        log.error("postSend message:{}, channel:{}, sent:{}", message.toString(), channel.toString(), sent);
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        log.error("afterSendCompletion message:{}, channel:{}, sent:{}, ex:{}", message.toString(), channel.toString(), sent, ex);
    }

    @Override
    public boolean preReceive(MessageChannel channel) {
        log.error("preReceive channel:{}", channel.toString());
        return false;
    }

    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        log.error("postReceive Completion message:{}, channel:{}", message.toString(), channel.toString());
        return message;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        log.error("afterReceiveCompletion=====> message:{}, channel:{}, ex:{}", message.toString(), channel.toString(), ex);
    }
}

服务器完整配置:

package com.qhdsx.apollo.module.msgcenter.config;

import com.qhdsx.apollo.module.msgcenter.listener.TcpChannelInterptor;
import com.qhdsx.apollo.module.msgcenter.listener.TcpInboundListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.*;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;

/**
 * <h4>ace-apollo</h4>
 * <p>tcp 服务端   无需应答版本</p>
 *
 * @author : hc
 * @date : 2022-03-21 10:25
 **/
@EnableIntegration
@Configuration
@Slf4j
public class TcpServerConfig  implements ApplicationListener<ApplicationEvent>{

    private final TcpServerGateway tcpServerGateway;

    public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
    @Value("${tcp_port}")
    private int port;
    @Resource
    private TcpInboundListener tcpInboundListener;


    public TcpServerConfig(TcpServerGateway tcpServerGateway) {		//注入网关,用于发送消息
        this.tcpServerGateway = tcpServerGateway;
    }


    @Bean
    public MessageChannel serverIn() {		//入站通道
        DirectChannel directChannel = new DirectChannel();
        directChannel.addInterceptor(new TcpChannelInterptor());
        return directChannel;
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() {
        TcpNetServerConnectionFactory  serverCf = new TcpNetServerConnectionFactory(port);
        //serverCf.setDeserializer(new ByteArraySingleTerminatorSerializer((byte)0x16));
        //默认情况下,对入站数据包执行反向 DNS 查找,以将 IP 地址转换为主机名,以便在消息头中使用。在未配置 DNS 的环境中,这可能会导致连接延迟。lookup-host您可以通过将属性设置为 来覆盖此默认行为false
        // 不进行反向的dns 解析 https://docs.spring.io/spring-integration/reference/html/ip.html#overview
        serverCf.setLookupHost(false);

        serverCf.setDeserializer(SERIALIZER);
        return serverCf;
    }

    @Bean
    public TcpReceivingChannelAdapter tcpInAdapter(AbstractServerConnectionFactory connectionFactory) {		//入站适配器
        TcpReceivingChannelAdapter inGate = new TcpReceivingChannelAdapter();
        inGate.setConnectionFactory(serverCF());
        inGate.setOutputChannelName("serverIn");
        //inGate.start();
        return inGate;
    }

    @ServiceActivator(inputChannel = "serverIn")		//消息处理器
    public void upCase(Message<byte[]> in) {
        //不直接用String或byte[]的原因是消息头的ip_connectionId字段,用来表明应答由哪个连接发送
        log.error(in.getHeaders().get("ip_connectionId") + "=" + new String(in.getPayload()));
        String flag=tcpInboundListener.handleMessage(in);
        //选择性回复 自定义
        if(StringUtils.isEmpty(flag)){
            //同样简单逻辑:转成大写应答
            Message<byte[]> reply = MessageBuilder.createMessage(in.getPayload(), in.getHeaders());
            tcpServerGateway.send(reply);
        }
    }

    @Bean
    public MessageChannel serverOut() {
        //出站通道
        DirectChannel directChannel = new DirectChannel();
        directChannel.addInterceptor(new TcpChannelInterptor());
        return directChannel;
    }

    @Bean
    @ServiceActivator(inputChannel = "serverOut")
    public MessageHandler tcpOutAdapter(AbstractServerConnectionFactory connectionFactory) {
        //出站适配器
        TcpSendingMessageHandler outGate = new TcpSendingMessageHandler();
        outGate.setConnectionFactory(connectionFactory);
        return outGate;
    }
    //@MessageEndpoint
    //public static class Echo {
    //
    //    @Transformer(inputChannel = "serverIn", outputChannel = "toEcho")
    //    public String convert(byte[] bytes) {
    //        return new String(bytes);
    //    }
    //
    //    @ServiceActivator(inputChannel = "toEcho")
    //    public String upCase(String in) {
    //        log.error("=================");
    //        log.error(in);
    //        log.error("=================");
    //        return in.toUpperCase();
    //    }
    //    //
    //    //@Transformer(inputChannel = "resultToString")
    //    //public String convertResult(byte[] bytes) {
    //    //    return new String(bytes);
    //    //}
    //
    //}

    /**
     * 出站消息网关
     */
    @MessagingGateway(defaultRequestChannel = "serverOut")
    public interface TcpServerGateway {

        //void send(@Payload String data,@Header(IpHeaders.CONNECTION_ID) String connectionId);
        void send(Message<byte[]> out);

        //void send(String out);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event){
        // 连接事件
        if(event instanceof TcpConnectionOpenEvent){
            TcpConnection connection = (TcpConnection) event.getSource();
            log.error("############started################");
            log.error("连接的成功        ip_connectionId======>{}", connection.getConnectionId());
            log.error("连接的成功        端口为======>{}", connection.getPort());
            log.error("连接的成功        host为======>{}", connection.getHostAddress());
            log.error("连接的成功        hostname为======>{}", connection.getHostName());
            log.error("############end################");
        }
        // 序列化异常事件
        if(event instanceof TcpDeserializationExceptionEvent){

            byte[] buffer=((TcpDeserializationExceptionEvent)event).getBuffer();
            String message=((TcpDeserializationExceptionEvent)event).getCause().getMessage();
            log.error("############序列化################");
            log.error("异常消息为        {}", (message));
            log.error("序列化为        {}", new String(buffer));
        }
        // 关闭链接事件
        if(event instanceof TcpConnectionCloseEvent){
            String connectionId=((TcpConnectionCloseEvent)event).getConnectionId();
            log.error("断开连接、======>{}",connectionId);

    }
}

个人理解 不对的地方还望大佬指正

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-31 23:49:43  更:2022-03-31 23:53:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 6:49:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码