UDP传输文件
- 由于公司项目上线之际,甲方装了网闸只能使用udp传输数据,并开放固定端口,之前在网上找了几个demo都是关于netty实现tcp传输文件,后去netty官网查看关于UDP的demo,但是案例很少,遂自己模仿着tcp的实现方案,实现了一个udp传输文件的方案。都是自己摸爬滚打的研究出来的方案,不尽完善,希望各位高人不吝赐教。虽然实现了此功能,但还有几点困惑,
-
UDP为何不能像TCP传输那样,自定义编解码器,做对象的处理工作,本文中UDP的序列化是采用protostuff序列化之后传到服务端,服务端采用这种方式反序列化,没有像tcp的object序列化工具。 -
UDP最大传输字节是否为65535,那么ChannelOption.SO_SNDBUF, 20 * 1024 *1024设置的缓存区感觉完全无效。不太理解这个缓存区的意义。 -
protostuff无法序列化map和反序列化list,其中原因不太理解,目前如果不能序列化的方案就是采用一个类包装list或者map实现序列化和反序列化。 -
目前项目上因为有多个业务,并且物联网的项目数据量较大,因为UDP包大小的限制,需要多个包才能将文件传输完毕。所以采用多个业务分了多个端口传输,别的服务是如何一个端口做到多个业务共享。目前我部分业务因为数据包较小可以一次传输完毕,所以在业务中采用类型做区分,包特别大的就采用端口做区分。不知是否合理或者有没有更好的方案。 FileUploadClient.java
package com.file.udp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.io.File;
public class FileUploadClient {
public void connect(int port, String host,final FileUploadFile fileUploadFile) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_SNDBUF, 20 * 1024 * 1024)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));
}
});
ChannelFuture f = b.bind(0).sync();
f.channel().closeFuture().sync();
System.out.println("FileUploadClient connect()结束");
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = FILE_PORT;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
try {
FileUploadFile uploadFile = new FileUploadFile();
File file = new File("d:/123.mp4");
String fileMd5 = file.getName();
uploadFile.setFile(file);
uploadFile.setFile_md5(fileMd5);
uploadFile.setStarPos(0);
new FileUploadClient().connect(port, "127.0.0.1", uploadFile);
} catch (Exception e) {
e.printStackTrace();
}
}
public static final int FILE_PORT = 9991;
}
FileUploadClientHanler.java
package com.file.udp;
import com.netty.codec.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile int start = 0;
private volatile int lastLength = 0;
public RandomAccessFile randomAccessFile;
private FileUploadFile fileUploadFile;
public FileUploadClientHandler(FileUploadFile ef) {
if (ef.getFile().exists()) {
if (!ef.getFile().isFile()) {
System.out.println("Not a file :" + ef.getFile());
return;
}
}
this.fileUploadFile = ef;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("客户端结束传递文件channelInactive()");
}
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端channelActive()");
try {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),
"r");
long length = (int) randomAccessFile.length() / (10*1024);
System.out.println("一共读取次数="+length+1);
for (int i = 0; i < length+1; i++) {
randomAccessFile.seek(start);
lastLength = 1024 * 10;
int a = (int) (randomAccessFile.length() - start);
if (a < lastLength) {
lastLength = a;
}
byte[] bytes = new byte[lastLength];
if ((byteRead = randomAccessFile.read(bytes)) != -1) {
start = start + lastLength;
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
byte[] object = ProtostuffUtil.serializer(fileUploadFile);
ByteBuf buf = Unpooled.copiedBuffer(object);
ctx.writeAndFlush(new DatagramPacket(buf,
new InetSocketAddress("127.0.0.1", 8000)));
} else {
}
System.out.println("其实字节的值start为"+start);
System.out.println("channelActive()文件已经读完 "+byteRead);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException i) {
i.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
FileUploadServer.java
package com.file.udp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
public class FileUploadServer {
public void bind(int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group( workerGroup).
channel(NioDatagramChannel.class).
option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_RCVBUF, 20 * 1024 * 1024)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535)).
handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new FileUploadServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
ChannelFuture f2 = b.bind(8000).sync();
System.out.println("file server 等待连接");
f.channel().closeFuture().sync().wait();
f2.channel().closeFuture().sync().wait();
System.out.println("file end");
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = FILE_PORT;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
try {
new FileUploadServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
public static final int FILE_PORT = 9991;
}
FileUploadServerHandler.java
package com.zhaoxiang.file.udp;
import com.tuling.netty.codec.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import java.io.File;
import java.io.RandomAccessFile;
public class FileUploadServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private int byteRead;
private volatile int start = 0;
private String file_dir = "D:/123";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("服务端channelActive()");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("服务端channelInactive()");
ctx.flush();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
System.out.println("服务端读数据==");
ByteBuf byteBuf = datagramPacket.content();
byte[] bytes1 = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes1);
Object fileUploadFile = ProtostuffUtil.deserializer(bytes1, FileUploadFile.class);
if (fileUploadFile instanceof FileUploadFile) {
FileUploadFile ef = (FileUploadFile) fileUploadFile;
byte[] bytes = ef.getBytes();
byteRead = ef.getEndPos();
String md5 = ef.getFile_md5();
String path = file_dir + File.separator + md5;
File file = new File(path);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.seek(start);
System.out.println("服务端start的值为"+start);
randomAccessFile.write(bytes);
start = start + byteRead;
System.out.println("path:"+path+","+byteRead);
if (byteRead > 0) {
randomAccessFile.close();
} else {
ctx.close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
System.out.println("FileUploadServerHandler--exceptionCaught()");
}
}
FileUploadFile.java
package com.zhaoxiang.file.udp;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
public class FileUploadFile implements Serializable {
private static final long serialVersionUID = 1L;
private File file;
private String file_md5;
private int starPos;
private byte[] bytes;
private int endPos;
public File getFile() {
return file;
}
public void setFile(File file) {
this.file = file;
}
public String getFile_md5() {
return file_md5;
}
public void setFile_md5(String file_md5) {
this.file_md5 = file_md5;
}
public int getStarPos() {
return starPos;
}
public void setStarPos(int starPos) {
this.starPos = starPos;
}
public byte[] getBytes() {
return bytes;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public int getEndPos() {
return endPos;
}
public void setEndPos(int endPos) {
this.endPos = endPos;
}
public static long getSerialversionuid() {
return serialVersionUID;
}
@Override
public String toString() {
return "FileUploadFile{" +
"file=" + file +
", file_md5='" + file_md5 + '\'' +
", starPos=" + starPos +
", bytes=" + Arrays.toString(bytes) +
", endPos=" + endPos +
'}';
}
}
ProtostuffUtil .java
package com.netty.codec;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ProtostuffUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static void main(String[] args) {
byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
User user = ProtostuffUtil.deserializer(userBytes, User.class);
System.out.println(user);
}
}
|