前言:
比如使用同花顺或者富途等app软件炒股时,可以看到各种股票在某一时刻的数据,比如行情的成交信息(最高价,最低价,开盘价,收盘价)等,盘口信息(各个买卖档位数据)等。以港股为例,可能每秒要有数千条信息发送过来,然后处理展示在app上。因此,要通过各种数据压测,也确定自己的架构能在过程中,保证服务不会挂掉。最终实现的效果是 可以控制每秒钟行情种类数据的发送量,比如 成交 4000/s (条数/单位) 盘口数据 3000/s 挂单数据 3000/s 快照信息2000/s 。
实现方案:
由于在测试时,每次让第三方推送之前的数据比较麻烦,所以就根据第三方推送的日志,读取出来,然后转成json类型的数据,发送给下游,由于要保证每种数据都要按照指定的条数,时间间隔发送,所以每种数据要开启一个线程去发送。
netty服务端代码:
@Service
public class NettyServer {
@PostConstruct
public void init(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(2);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("\0".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10485760, true, false, delimiter));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture cf = bootstrap.bind(9090).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ps:
由于socket传输数据时,会有拆包黏包的问题,所以每条数据以\0结尾,避免出现client出现半条数据的情况
netty client:
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) //设置线程组
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向pipeline加入解码器
ByteBuf delimiter = Unpooled.copiedBuffer("\0".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10485760, true, false, delimiter));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
Channel channel = channelFuture.channel();
System.out.println("========" + channel.localAddress() + "========");
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
ps:由于客户端只是测试收取数据,所以没有集成到springboot
netty server handler:
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
executorService.execute(()->{
ConvertService service = new TradeConvertImpl();
service.send(ctx, NettyServerConstant.TRADE_PATH,NettyServerConstant.TRADE_NUM,NettyServerConstant.TRADE_TIME);
});
executorService.execute(()->{
ConvertService service = new StockConvertImpl();
service.send(ctx,NettyServerConstant.STOCK_PATH,NettyServerConstant.STOCK_NUM,NettyServerConstant.STOCK_TIME);
});
executorService.execute(()->{
ConvertService service = new BrokerConvertImpl();
service.send(ctx,NettyServerConstant.BROKER_PATH,NettyServerConstant.BROKER_NUM,NettyServerConstant.BROKER_TIME);
});
executorService.execute(()->{
ConvertService service = new OrderConvertImpl();
service.send(ctx, NettyServerConstant.ORDER_PATH, NettyServerConstant.ORDER_NUM, NettyServerConstant.ORDER_TIME);
});
executorService.execute(()->{
OtherConvert.sendOther(ctx, NettyServerConstant.ORDER_NUM, NettyServerConstant.ORDER_TIME);
});
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("收到客户端的消息:" + msg);
}
}
client handler:
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) {
System.out.println("接收到服务端数据"+s);
}
}
结尾:
由于数据解析是使用工厂模式来实现,实现类文件较多,所以直接把代码打包上传了,直接可以拿来测试。
代码地址:https://gitee.com/devil_1/netty.git
|