IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> UDP传输文件,netty实现 -> 正文阅读

[网络协议]UDP传输文件,netty实现

UDP传输文件

  • 由于公司项目上线之际,甲方装了网闸只能使用udp传输数据,并开放固定端口,之前在网上找了几个demo都是关于netty实现tcp传输文件,后去netty官网查看关于UDP的demo,但是案例很少,遂自己模仿着tcp的实现方案,实现了一个udp传输文件的方案。都是自己摸爬滚打的研究出来的方案,不尽完善,希望各位高人不吝赐教。虽然实现了此功能,但还有几点困惑,
  1. UDP为何不能像TCP传输那样,自定义编解码器,做对象的处理工作,本文中UDP的序列化是采用protostuff序列化之后传到服务端,服务端采用这种方式反序列化,没有像tcp的object序列化工具。

  2. UDP最大传输字节是否为65535,那么ChannelOption.SO_SNDBUF, 20 * 1024 *1024设置的缓存区感觉完全无效。不太理解这个缓存区的意义。

  3. protostuff无法序列化map和反序列化list,其中原因不太理解,目前如果不能序列化的方案就是采用一个类包装list或者map实现序列化和反序列化。

  4. 目前项目上因为有多个业务,并且物联网的项目数据量较大,因为UDP包大小的限制,需要多个包才能将文件传输完毕。所以采用多个业务分了多个端口传输,别的服务是如何一个端口做到多个业务共享。目前我部分业务因为数据包较小可以一次传输完毕,所以在业务中采用类型做区分,包特别大的就采用端口做区分。不知是否合理或者有没有更好的方案。

    FileUploadClient.java

package com.file.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.File;

//文件上传客户端
public class FileUploadClient {

	public void connect(int port, String host,final FileUploadFile fileUploadFile) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioDatagramChannel.class)
			.option(ChannelOption.SO_BROADCAST, true)
			.option(ChannelOption.SO_SNDBUF, 20 * 1024 * 1024)
					//配置接收缓冲区
					//设置udp接收的字符长度可以超过2048
			.handler(new ChannelInitializer<NioDatagramChannel>() {
				@Override
				protected void initChannel(NioDatagramChannel ch) throws Exception {
					ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));
				}
			});
			ChannelFuture f = b.bind(0).sync();
			f.channel().closeFuture().sync();
			System.out.println("FileUploadClient connect()结束");
		} finally {
			group.shutdownGracefully();
		}
	}

	public static void main(String[] args) {
		int port = FILE_PORT;
		if (args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			} catch (NumberFormatException e) {
				e.printStackTrace();
			}
		}
		try {
			FileUploadFile uploadFile = new FileUploadFile();
//			File file = new File("d:/source.rar");// d:/source.rar,D:/2014work/apache-maven-3.5.0-bin.tar.gz
			File file = new File("d:/123.mp4");
			String fileMd5 = file.getName();// 文件名
			uploadFile.setFile(file);
			uploadFile.setFile_md5(fileMd5);
			uploadFile.setStarPos(0);// 文件开始位置
			new FileUploadClient().connect(port, "127.0.0.1", uploadFile);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static final int FILE_PORT = 9991;
}


FileUploadClientHanler.java

package com.file.udp;

import com.netty.codec.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;


public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
	private int byteRead;
	private volatile int start = 0;
	private volatile int lastLength = 0;
	public RandomAccessFile randomAccessFile;
	private FileUploadFile fileUploadFile;


	public FileUploadClientHandler(FileUploadFile ef) {
		if (ef.getFile().exists()) {
			if (!ef.getFile().isFile()) {
				System.out.println("Not a file :" + ef.getFile());
				return;
			}
		}
		this.fileUploadFile = ef;
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// TODO Auto-generated method stub
		super.channelInactive(ctx);
		System.out.println("客户端结束传递文件channelInactive()");
	}

	public void channelActive(ChannelHandlerContext ctx) {
		System.out.println("客户端channelActive()");
		try {
			randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),
					"r");
			//文件分几次发送
			long length = (int) randomAccessFile.length() / (10*1024);
			System.out.println("一共读取次数="+length+1);
			for (int i = 0; i < length+1; i++) {
				randomAccessFile.seek(start);
				lastLength = 1024 * 10;
				int a = (int) (randomAccessFile.length() - start);
				if (a < lastLength) {
					lastLength = a;
				}
				byte[] bytes = new byte[lastLength];
				if ((byteRead = randomAccessFile.read(bytes)) != -1) {
					start = start + lastLength;
					fileUploadFile.setEndPos(byteRead);
					fileUploadFile.setBytes(bytes);
					byte[] object = ProtostuffUtil.serializer(fileUploadFile);
					ByteBuf buf = Unpooled.copiedBuffer(object);
					ctx.writeAndFlush(new DatagramPacket(buf,
							new InetSocketAddress("127.0.0.1", 8000)));
				} else {

				}
				System.out.println("其实字节的值start为"+start);
				System.out.println("channelActive()文件已经读完 "+byteRead);
			}

		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException i) {
			i.printStackTrace();
		}
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
	
	}

	

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}


FileUploadServer.java

package com.file.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

文件上传服务端
public class FileUploadServer {
    public void bind(int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group( workerGroup).
                    channel(NioDatagramChannel.class).
                    option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.SO_RCVBUF, 20 * 1024 * 1024)
                    //设置udp接收的字符长度可以超过2048 每次读取缓冲区最大长度
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535)).
                    handler(new ChannelInitializer<NioDatagramChannel>() {

                @Override
                protected void initChannel(NioDatagramChannel ch) throws Exception {
                    ch.pipeline().addLast(new FileUploadServerHandler());
                }
            });
            ChannelFuture f = b.bind(port).sync();
            ChannelFuture f2 = b.bind(8000).sync();
            System.out.println("file server  等待连接");
            f.channel().closeFuture().sync().wait();
            f2.channel().closeFuture().sync().wait();
            System.out.println("file end");
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port =  FILE_PORT;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        try {
            new FileUploadServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static final int FILE_PORT = 9991;
}

FileUploadServerHandler.java

package com.zhaoxiang.file.udp;

import com.tuling.netty.codec.ProtostuffUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import java.io.File;
import java.io.RandomAccessFile;

public class FileUploadServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
	private int byteRead;
    private volatile int start = 0;
    private String file_dir = "D:/123";
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	super.channelActive(ctx);
        System.out.println("服务端channelActive()");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	super.channelInactive(ctx);
        System.out.println("服务端channelInactive()");
    	ctx.flush();
    	ctx.close();
    }
    
    
    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
        System.out.println("服务端读数据==");
        ByteBuf byteBuf = datagramPacket.content();
        byte[] bytes1 = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes1);
        // 在这里转换为实体类
        Object fileUploadFile = ProtostuffUtil.deserializer(bytes1, FileUploadFile.class);
        if (fileUploadFile instanceof FileUploadFile) {
            FileUploadFile ef = (FileUploadFile) fileUploadFile;
            byte[] bytes = ef.getBytes();
            byteRead = ef.getEndPos();
            String md5 = ef.getFile_md5();//文件名
            String path = file_dir + File.separator + md5;
            File file = new File(path);
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            randomAccessFile.seek(start);
            System.out.println("服务端start的值为"+start);
            randomAccessFile.write(bytes);
            start = start + byteRead;
            System.out.println("path:"+path+","+byteRead);
            if (byteRead > 0) {
                randomAccessFile.close();
            } else {
                ctx.close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
        System.out.println("FileUploadServerHandler--exceptionCaught()");
    }
}

FileUploadFile.java

package com.zhaoxiang.file.udp;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;

public class FileUploadFile implements Serializable {
	private static final long serialVersionUID = 1L;
	private File file;// 文件
	private String file_md5;//文件名
	private int starPos;// 开始位置
	private byte[] bytes;// 开始位置
	private int endPos;// 结尾位置
	public File getFile() {
		return file;
	}
	public void setFile(File file) {
		this.file = file;
	}
	public String getFile_md5() {
		return file_md5;
	}
	public void setFile_md5(String file_md5) {
		this.file_md5 = file_md5;
	}
	public int getStarPos() {
		return starPos;
	}
	public void setStarPos(int starPos) {
		this.starPos = starPos;
	}
	public byte[] getBytes() {
		return bytes;
	}
	public void setBytes(byte[] bytes) {
		this.bytes = bytes;
	}
	public int getEndPos() {
		return endPos;
	}
	public void setEndPos(int endPos) {
		this.endPos = endPos;
	}
	public static long getSerialversionuid() {
		return serialVersionUID;
	}

	@Override
	public String toString() {
		return "FileUploadFile{" +
				"file=" + file +
				", file_md5='" + file_md5 + '\'' +
				", starPos=" + starPos +
				", bytes=" + Arrays.toString(bytes) +
				", endPos=" + endPos +
				'}';
	}
}

ProtostuffUtil .java

package com.netty.codec;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * protostuff 序列化工具类,基于protobuf封装
 */
public class ProtostuffUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
        User user = ProtostuffUtil.deserializer(userBytes, User.class);
        System.out.println(user);
    }
}
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-30 15:58:44  更:2021-11-30 16:00:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 21:19:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码