前言
网上能找到的 Netty 发送 UDP 报文的示例,多为收到消息后被动回复,很少有自定义报文、主动推送的写法。经过几番研究,我找到了一种可以实现主动发送的写法,如有错误或有更好的写法,请不吝赐教。另外,我还在此基础上扩展出了“指定使用本机的某个 IP 发送 UDP”的功能。
以下是本篇文章正文内容,下面案例可供参考
一、UDP的接收
废话不多说,直接上代码
import com.cdwx.central.config.SocketPropertiesConfig;
import com.cdwx.central.modem.receiveHandler.AirHandler;
import com.cdwx.central.modem.receiveHandler.RemoteHandler;
import com.cdwx.central.modem.receiveHandler.StationHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class InitRunner implements ApplicationRunner {
private final SocketPropertiesConfig portConfig;
private final AirHandler airHandler;
private final StationHandler stationHandler;
public InitRunner(SocketPropertiesConfig portConfig,
AirHandler airHandler,
StationHandler stationHandler) {
this.portConfig = portConfig;
this.airHandler = airHandler;
this.stationHandler = stationHandler;
}
@Override
public void run(ApplicationArguments args) throws Exception {
this.startUdp();
}
private void startUdp(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(30));
executor.execute(() -> createReceiver(portConfig.getAirReceivePort(),airHandler));
executor.execute(() -> createReceiver(portConfig.getStationReceivePort(),stationHandler));
log.info("[系统信息]-异步UDP信息接收器创建成功!");
}
private void createReceiver(Integer port,ChannelInboundHandlerAdapter handler) {
try {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(handler);
}
});
b.bind(port).sync().channel().closeFuture().await();
} catch (InterruptedException e) {
e.printStackTrace();
if(handler instanceof AirHandler){
log.error("创建AIR-UDP接收器出错" + e.getMessage());
}else if(handler instanceof StationHandler){
log.error("创建STATION-UDP接收器出错" + e.getMessage());
}
}
}
}
上面代码中用到的两个 handler 大同小异,这里只贴出其中 AirHandler 的代码。
import com.cdwx.central.common.utils.ByteUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Inbound handler for AIR UDP datagrams.
 *
 * <p>Each datagram payload is copied into a byte array on the event loop thread and
 * then handed to a dedicated thread pool for analysis, so the event loop is not
 * blocked by signaling processing.
 */
@Slf4j
@Component
public class AirHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /** Worker pool for signaling analysis; rejected tasks are logged and dropped. */
    private final Executor executor;

    {
        executor = new ThreadPoolExecutor(10, 15, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(30), (task, pool) -> {
            log.warn("AIR广播异步线程池已满![{}]线程拒绝执行[{}]任务", pool.toString(), task.toString());
        });
    }

    /**
     * Copies the datagram payload, logs it as hex and schedules its analysis.
     *
     * @param ctx    channel context; its local address supplies the logged port
     * @param packet received datagram
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String ip = packet.sender().getAddress().getHostAddress();
        // Local port the datagram arrived on (not the sender's source port).
        int port = ((InetSocketAddress) ctx.channel().localAddress()).getPort();
        // Read straight from the packet's own buffer. Calling packet.copy() would
        // allocate a second reference-counted buffer that nothing releases.
        ByteBuf content = packet.content();
        byte[] buf = new byte[content.readableBytes()];
        content.getBytes(content.readerIndex(), buf);
        log.info("[收到AIR信息]-收到{}:{}的信令[{}]", ip, port, ByteUtils.getHexFromByteArray(buf));
        // buf is an independent heap copy, so it stays valid after the packet is released.
        executor.execute(() -> signalingAnalysis(buf));
    }

    /**
     * Analyses one received signaling payload. Intentionally left empty here.
     *
     * @param buf raw payload bytes of a single datagram
     */
    private void signalingAnalysis(byte[] buf) {
    }
}
接收部分到此为止。
二、UDP的发送
同样废话不多说,上代码
import com.cdwx.central.common.utils.ByteUtils;
import com.cdwx.central.config.SocketPropertiesConfig;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
@Slf4j
@Component
public class AirSendHandler {
private final SocketPropertiesConfig portConfig;
private Channel channel;
public AirSendHandler(SocketPropertiesConfig portConfig) {
this.portConfig = portConfig;
}
@SneakyThrows
@PostConstruct
private void init(){
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workGroup = new NioEventLoopGroup();
bootstrap.group(workGroup).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel channel) throws Exception {
}
});
channel = bootstrap.bind(0).sync().channel();
}
public void sendMsg(String ip, int port, byte[] buf){
InetSocketAddress socketAddress = new InetSocketAddress(ip, port);
ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
buffer.writeBytes(buf);
DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
channel.writeAndFlush(packet);
log.info("[发送AIR信息]-往{}:{}地址发送信令[{}]",ip,port, ByteUtils.getHexFromByteArray(buf));
}
public void sendMsg(byte[] buf){
InetSocketAddress socketAddress = new InetSocketAddress(portConfig.getNetManagerIp(),portConfig.getAirSendPort());
ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
buffer.writeBytes(buf);
DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
channel.writeAndFlush(packet);
log.info("[发送AIR信息]-往{}:{}地址发送信令[{}]",portConfig.getNetManagerIp(),portConfig.getAirSendPort(),
ByteUtils.getHexFromByteArray(buf));
}
}
在其它代码中调用时,只需要注入这个 Handler 类(构造器注入):
// Sender beans used by this handler; assigned once in the constructor below.
private final AirSendHandler airSendHandler;
private final StationSendHandler stationSendHandler;
/**
 * Constructor injection of the two sender beans.
 *
 * @param airSendHandler     sender stored in the airSendHandler field
 * @param stationSendHandler sender stored in the stationSendHandler field
 */
public StationHandler(AirSendHandler airSendHandler,
StationSendHandler stationSendHandler) {
this.airSendHandler = airSendHandler;
this.stationSendHandler = stationSendHandler;
}
或者自动注入
// Field injection alternative: the fields are annotated with @Autowired and are not final.
@Autowired
private AirSendHandler airSendHandler;
@Autowired
private StationSendHandler stationSendHandler;
调用
/**
 * Example call site: sends {@code buf} through the injected AirSendHandler.
 *
 * @param buf payload bytes to send
 */
private void yourMethod(byte[] buf){
// NOTE(review): ip and port are not declared in this snippet — the caller must supply the destination.
airSendHandler.sendMsg(ip,port,buf);
}
三、指定本机IP来执行UDP的发送
与上面的发送基本一致,只是在创建 channel 对象时,循环为本机的每个 IPv4 地址各创建一个 channel 并存入 Map。上代码:
import com.cdwx.central.common.utils.ByteUtils;
import com.cdwx.central.config.SocketPropertiesConfig;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class StationSendHandler {
private final SocketPropertiesConfig portConfig;
public StationSendHandler(SocketPropertiesConfig portConfig) {
this.portConfig = portConfig;
}
private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
@SneakyThrows
@PostConstruct
private void init(){
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workGroup = new NioEventLoopGroup();
bootstrap.group(workGroup).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel channel) throws Exception {
}
});
List<String> ips = getAllIp();
for (String ip : ips) {
Channel channel = bootstrap.bind(ip,0).sync().channel();
channelMap.put(ip,channel);
}
}
@SneakyThrows
private List<String> getAllIp(){
ArrayList<String> ips = new ArrayList<>();
Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress ip;
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
ip = addresses.nextElement();
if (ip instanceof Inet4Address) {
ips.add(ip.getHostAddress());
}
}
}
return ips;
}
public void sendMsg(String ip, int port, byte[] buf,Channel channel){
InetSocketAddress socketAddress = new InetSocketAddress(ip, port);
ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
buffer.writeBytes(buf);
DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
channel.writeAndFlush(packet);
log.info("[发送站控信息]-使用IP<{}>,往<{}:{}>地址发送信令[{}]",((InetSocketAddress)channel.localAddress()).getHostString(),ip,port, ByteUtils.getHexFromByteArray(buf));
}
public void sendMsg(byte[] buf,Channel channel){
InetSocketAddress socketAddress = new InetSocketAddress(portConfig.getNetManagerIp(), portConfig.getStationSendPort());
ByteBuf buffer = new UnpooledByteBufAllocator(false).buffer();
buffer.writeBytes(buf);
DatagramPacket packet = new DatagramPacket(buffer, socketAddress);
channel.writeAndFlush(packet);
log.info("[发送站控信息]-使用IP<{}>,往<{}:{}>地址发送信令[{}]",((InetSocketAddress)channel.localAddress()).getHostString(),
portConfig.getNetManagerIp(),portConfig.getStationSendPort(),
ByteUtils.getHexFromByteArray(buf));
}
public void yourMethod(){
Channel channel = channelMap.get("192.168.0.11");
byte[] bytes = "艰难苦恨繁霜鬓,潦倒新停浊酒杯".getBytes();
sendMsg("192.168.88.88",7100,bytes,channel);
}
}
四、总结
以上就是接收和发送的方法啦。如果有哪里不对,请指正;如果有更优的方法,也请不吝赐教。
欢迎三连哦。
|