IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 使用netty发送UDP信息和接收UDP信息 -> 正文阅读

[网络协议]使用netty发送UDP信息和接收UDP信息

作者:token annotation punctuation

前言

网上找到的netty发送UDP报文多为收到消息后被动发送,没有自定义报文主动推送的,经过几番研究发现一种写法可以实现主动发送,如有错误或不是好的写法请不吝赐教。
另外,我衍生出指定用本机的哪个IP发送UDP的功能。


以下是本篇文章正文内容,下面案例可供参考

一、UDP的接收

废话不多说,直接上代码

import com.cdwx.central.config.SocketPropertiesConfig;
import com.cdwx.central.modem.receiveHandler.AirHandler;
import com.cdwx.central.modem.receiveHandler.RemoteHandler;
import com.cdwx.central.modem.receiveHandler.StationHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: xiongz
 * @time: 2021/8/20 10:55
 */
@Slf4j
@Component
public class InitRunner implements ApplicationRunner {

    private final SocketPropertiesConfig portConfig;
    private final AirHandler airHandler;
    private final StationHandler stationHandler;

    public InitRunner(SocketPropertiesConfig portConfig,
                      AirHandler airHandler,
                      StationHandler stationHandler) {
        this.portConfig = portConfig;
        this.airHandler = airHandler;
        this.stationHandler = stationHandler;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.startUdp();
    }

    private void startUdp(){
        //用线程池开启异步
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(30));
        executor.execute(() -> createReceiver(portConfig.getAirReceivePort(),airHandler));
        executor.execute(() -> createReceiver(portConfig.getStationReceivePort(),stationHandler));
        log.info("[系统信息]-异步UDP信息接收器创建成功!");
    }

    /**
     * 创建报文接收器
     */
    private void createReceiver(Integer port,ChannelInboundHandlerAdapter handler) {
        try {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            //由于我们用的是UDP协议,所以要用NioDatagramChannel来创建
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)//支持广播
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //设置处理handler.执行具体处理方法
                            pipeline.addLast(handler);
                        }
                    });
            b.bind(port).sync().channel().closeFuture().await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if(handler instanceof AirHandler){
                log.error("创建AIR-UDP接收器出错" + e.getMessage());
            }else if(handler instanceof StationHandler){
                log.error("创建STATION-UDP接收器出错" + e.getMessage());
            }
        }
    }

}

上面代码定义的两个handler 都是大同小异,贴上handler代码。

import com.cdwx.central.common.utils.ByteUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description: AIR-UDP信息处理器
 * @author: xiongz
 * @time: 2021/8/20 11:31
 */
@Slf4j
@Component
public class AirHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private final Executor executor;

    {
        executor = new ThreadPoolExecutor(10,15,5,
                TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(30),((r, executor) -> {
            log.warn("AIR广播异步线程池已满![{}]线程拒绝执行[{}]任务",executor.toString(),r.toString());
        }));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String ip = packet.sender().getAddress().getHostAddress();
        int port = ((InetSocketAddress)ctx.channel().localAddress()).getPort();
        ByteBuf bytes = packet.copy().content();
        byte[] buf = new byte[bytes.readableBytes()];
        int readerIndex = bytes.readerIndex();
        bytes.getBytes(readerIndex, buf);
        log.info("[收到AIR信息]-收到{}:{}的信令[{}]",ip,port, ByteUtils.getHexFromByteArray(buf));
        //线程池异步处理信令
        executor.execute(() -> signalingAnalysis(buf));
    }

    /**
     * 信令解析器
     */
    private void signalingAnalysis(byte[] buf){

    }

}

接收到此为止

二、UDP的发送

同样废话不多说,上代码

import com.cdwx.central.common.utils.ByteUtils;
import com.cdwx.central.config.SocketPropertiesConfig;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;

/**
 * @description: AIR 信令发送器
 * @author: xiongz
 * @time: 2021/8/20 15:40
 */
@Slf4j
@Component
public class AirSendHandler {

    private final SocketPropertiesConfig portConfig;

    private Channel channel;

    public AirSendHandler(SocketPropertiesConfig portConfig) {
        this.portConfig = portConfig;
    }

    @SneakyThrows
    @PostConstruct
    private void init(){
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        bootstrap.group(workGroup).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>(){
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                    }
                });
        channel = bootstrap.bind(0).sync().channel();
    }

    public void sendMsg(String ip, int port, byte[] buf){
        InetSocketAddress socketAddress = new InetSocketAddress(ip, port);
        ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
        buffer.writeBytes(buf);
        DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
        channel.writeAndFlush(packet);
        log.info("[发送AIR信息]-往{}:{}地址发送信令[{}]",ip,port, ByteUtils.getHexFromByteArray(buf));
    }

    public void sendMsg(byte[] buf){
        InetSocketAddress socketAddress = new InetSocketAddress(portConfig.getNetManagerIp(),portConfig.getAirSendPort());
        ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
        buffer.writeBytes(buf);
        DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
        channel.writeAndFlush(packet);
        log.info("[发送AIR信息]-往{}:{}地址发送信令[{}]",portConfig.getNetManagerIp(),portConfig.getAirSendPort(),
                ByteUtils.getHexFromByteArray(buf));
    }

}

在其它代码中的调用只需要 注入这个Handler类

	private final AirSendHandler airSendHandler;
    private final StationSendHandler stationSendHandler;

    public StationHandler(AirSendHandler airSendHandler,
                          StationSendHandler stationSendHandler) {
        this.airSendHandler = airSendHandler;
        this.stationSendHandler = stationSendHandler;
    }

或者自动注入

	@Autowired
	private AirSendHandler airSendHandler;
	@Autowired
    private StationSendHandler stationSendHandler;

调用

	private void yourMethod(byte[] buf){
		//指定ip,和端口
		airSendHandler.sendMsg(ip,port,buf);
	}

三、指定本机IP来执行UDP的发送

与发送一致,只是创建channel对象的时候循环创建并存入MAP
上代码

import com.cdwx.central.common.utils.ByteUtils;
import com.cdwx.central.config.SocketPropertiesConfig;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: station信令发送器
 * @author: xiongz
 * @time: 2021/8/20 15:40
 */
@Slf4j
@Component
public class StationSendHandler {

    private final SocketPropertiesConfig portConfig;

    public StationSendHandler(SocketPropertiesConfig portConfig) {
        this.portConfig = portConfig;
    }

    private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();

    @SneakyThrows
    @PostConstruct
    private void init(){
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        bootstrap.group(workGroup).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>(){
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                    }
                });
        List<String> ips = getAllIp();
        for (String ip : ips) {
            Channel channel = bootstrap.bind(ip,0).sync().channel();
            channelMap.put(ip,channel);
        }
    }

    /**
     * 获取本机所有ip
     * @return ip集合
     */
    @SneakyThrows
    private List<String> getAllIp(){
        ArrayList<String> ips = new ArrayList<>();
        Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        InetAddress ip;
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
            while (addresses.hasMoreElements()) {
                ip = addresses.nextElement();
                if (ip instanceof Inet4Address) {
                    ips.add(ip.getHostAddress());
                }
            }
        }
        return ips;
    }

    public void sendMsg(String ip, int port, byte[] buf,Channel channel){
        InetSocketAddress socketAddress = new InetSocketAddress(ip, port);
        ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
        buffer.writeBytes(buf);
        DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
        channel.writeAndFlush(packet);
        log.info("[发送站控信息]-使用IP<{}>,往<{}:{}>地址发送信令[{}]",((InetSocketAddress)channel.localAddress()).getHostString(),ip,port, ByteUtils.getHexFromByteArray(buf));
    }

    public void sendMsg(byte[] buf,Channel channel){
        InetSocketAddress socketAddress = new InetSocketAddress(portConfig.getNetManagerIp(), portConfig.getStationSendPort());
        ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
        buffer.writeBytes(buf);
        DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
        channel.writeAndFlush(packet);
        log.info("[发送站控信息]-使用IP<{}>,往<{}:{}>地址发送信令[{}]",((InetSocketAddress)channel.localAddress()).getHostString(),
                portConfig.getNetManagerIp(),portConfig.getStationSendPort(),
                ByteUtils.getHexFromByteArray(buf));
    }

    /**
     * 你自己的方法
     */
    public void yourMethod(){
        Channel channel = channelMap.get("192.168.0.11");
        byte[] bytes = "艰难苦恨繁霜鬓,潦倒新停浊酒杯".getBytes();
        sendMsg("192.168.88.88",7100,bytes,channel);

    }
}

四、总结

以上就是接收和发送的方法啦,如果有哪儿不对请指正。或者有更优的方法请不吝赐教。

欢迎三连哦。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-22 13:50:08  更:2021-08-22 13:52:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 6:32:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计