准备工作
如果没有现成的硬件设备,可以下载网络调试助手来模拟设备。
- 填写好远程主机地址,以及netty服务端口。
编写netty服务端
@Component
public class NettyServer {
@Value("${netty.port}")
private int port;
public static NioServerSocketChannel nioServerSocketChannel;
@Autowired
private ServerHandler serverHandler;
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 4 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(
new DecoderHandler()
, new StringEncoder(StandardCharsets.UTF_8)
, new StringDecoder(StandardCharsets.UTF_8)
, serverHandler);
}
});
ChannelFuture future;
try {
log.info("netty服务器在[{}]端口启动监听", port);
future = bootstrap.bind(port).sync();
if (future.isSuccess()) {
nioServerSocketChannel = (NioServerSocketChannel) future.channel();
log.info("netty服务开启成功" + nioServerSocketChannel);
} else {
log.info("netty服务开启失败");
}
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
编写服务处理器
@Component
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private final Map<String, ChannelHandlerContext> channelMap = new ConcurrentHashMap<>();
private final Map<ChannelHandlerContext, String> mark = new ConcurrentHashMap<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
log.info("new gateway socket from {}", channel.remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = msg.toString();
String message = s.substring(1, s.length() - 1);
System.out.println("gateway info : " + message);
String code = message.substring(0, 4);
boolean containsKey = channelMap.containsKey(code);
if (!containsKey) {
channelMap.put(code, ctx);
mark.put(ctx, code);
}
ctx.flush();
}
public void send(String code, String msg) {
if (channelMap.containsKey(code)) {
ChannelHandlerContext handlerContext = channelMap.get(code);
handlerContext.writeAndFlush(msg);
} else {
System.out.println("-------设备已经断开连接-------");
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("netty客户端与服务端连接关闭..." + ctx);
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
boolean containsKey = mark.containsKey(ctx);
if (containsKey) {
String code = mark.get(ctx);
channelMap.remove(code, ctx);
mark.remove(ctx);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
log.info("信息接收完毕...");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
System.out.println("异常信息:rn " + cause.getMessage());
}
}
编写解析器
/**
 * Frame decoder for messages delimited by {@code '#'} on both sides, e.g. {@code #A001...#}.
 *
 * <p>Each decoded frame is emitted as a UTF-8 {@link String} that still includes both
 * delimiters. Framing is done on the raw bytes that {@link ByteToMessageDecoder}
 * accumulates, so a frame split across any number of TCP reads is reassembled, several
 * frames in one read are emitted separately, and multi-byte characters are never cut.
 *
 * <p>This decoder is stateful and must not be shared: create one instance per channel.
 */
public class DecoderHandler extends ByteToMessageDecoder {

    /** Byte that opens and closes every frame. It is ASCII, so it never occurs inside a multi-byte UTF-8 sequence. */
    private static final byte DELIMITER = (byte) '#';

    /** Upper bound for a buffered, still unterminated frame; larger input is discarded. */
    private static final int MAX_FRAME_LENGTH = 64 * 1024;

    /**
     * Extracts at most one complete frame from {@code in}; the base class calls this
     * method repeatedly while it keeps making progress.
     *
     * @param ctx context of the channel this decoder belongs to
     * @param in  accumulated inbound bytes; unconsumed bytes are kept for the next call
     * @param out receives the decoded frame, delimiters included
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int start = in.indexOf(in.readerIndex(), in.writerIndex(), DELIMITER);
        if (start < 0) {
            // No frame start in sight: everything buffered is noise.
            in.skipBytes(in.readableBytes());
            return;
        }
        // Drop any bytes preceding the opening delimiter.
        in.readerIndex(start);

        int end = in.indexOf(start + 1, in.writerIndex(), DELIMITER);
        while (end == start + 1) {
            // "##" carries no payload. Treat the second '#' as the real frame start so
            // the stream re-synchronizes after a stray delimiter.
            start = end;
            in.readerIndex(start);
            end = in.indexOf(start + 1, in.writerIndex(), DELIMITER);
        }

        if (end < 0) {
            // Frame is incomplete; keep the bytes buffered unless they grow unreasonably.
            if (in.readableBytes() > MAX_FRAME_LENGTH) {
                in.skipBytes(in.readableBytes());
            }
            return;
        }

        out.add(in.toString(start, end - start + 1, StandardCharsets.UTF_8));
        in.readerIndex(end + 1);
    }
}
启动Netty服务
在主线程启动后,调用start启动netty服务
/**
 * Starts the Netty server once the Spring application context is up.
 *
 * <p>The class must be a Spring bean for its {@link #run(String...)} callback to be
 * invoked, hence the {@code @Component} stereotype.
 */
@Component
public class InitNettyServer implements CommandLineRunner {

    /** Server to start; supplied through {@link #setNettyServer(NettyServer)}. */
    private NettyServer nettyServer;

    /**
     * Injects the Netty server bean.
     *
     * @param nettyServer the server to start on application startup
     */
    @Autowired
    public void setNettyServer(NettyServer nettyServer) {
        this.nettyServer = nettyServer;
    }

    /**
     * Launches the Netty server on its own thread.
     *
     * <p>{@code NettyServer.start()} blocks until the server channel is closed; calling
     * it directly here would keep the application from ever finishing its startup.
     *
     * @param args command line arguments passed to the application (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        Thread serverThread = new Thread(nettyServer::start, "netty-server");
        serverThread.start();
    }
}
配置文件application.yml
# Application name
spring:
  application:
    name: netty
# HTTP port of the web application
server:
  port: 8010
# TCP port the Netty server listens on (read via ${netty.port})
netty:
  port: 7002
所需要依赖pom
<!-- Build properties: Java 8 source/target level and the Netty version used below. -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<netty-all-version>4.1.65.Final</netty-all-version>
</properties>
<dependencies>
<!-- Spring Boot web starter. NOTE(review): no version is given here; presumably it is managed by a parent POM or BOM, confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Netty, version taken from the netty-all-version property. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all-version}</version>
</dependency>
<!-- Encoder/decoder library (MessagePack). -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<!-- NOTE(review): org.example:netty:1.0-SNAPSHOT looks like a project-local module, possibly this project itself; confirm it is really required. -->
<dependency>
<groupId>org.example</groupId>
<artifactId>netty</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
编写测试程序
/**
 * REST endpoint for manually checking that the server can push a message to a
 * connected device.
 */
@RestController
public class SendTest {

    /** Handler that holds the device connections and performs the actual send. */
    @Autowired
    private ServerHandler serverHandler;

    /**
     * Pushes a fixed test payload to the device registered under the given code.
     *
     * @param code device code taken from the request path
     * @return the constant string {@code "o"}, regardless of whether the device was reachable
     */
    @GetMapping("/send/{code}")
    public String send(@PathVariable("code") String code) {
        final String payload = "123456789";
        serverHandler.send(code, payload);
        return "o";
    }
}
这里我通过调用API接口的方式,来测试主动发送是否成功。
启动程序
/**
 * Spring Boot entry point of the application.
 *
 * <p>{@code @EnableAsync} switches on Spring's asynchronous method execution support.
 */
@EnableAsync
@SpringBootApplication
public class NettyNioApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(NettyNioApplication.class);
        application.run(args);
    }
}
上图显示启动成功
连接设备
连接设备后发送一条数据去绑定设备。一定要先发送信息绑定设备(这里是模拟心跳,从心跳数据中获取对应的设备mac,这样我们就可以通过设备mac去主动向设备发送信息了)。
使用postman模拟控制命令
send后调试助手可以看到模拟的信息123456789
代码参考博客
https://blog.csdn.net/zhangleiyes123/article/details/103871450#comments_20159110
|