Netty 作为 Java 的高性能 NIO 框架,相比 Java 的 BIO(阻塞 IO),它的并发性能得到了很大提高,在许多即时通讯领域都用到了 Netty。比如音视频通话对数据传输的实时性要求比较高,使用 Netty 框架实现起来比较简单。话不多说,下面通过一个案例来实现视频和音频的传输。
1.首先定义数据的类型
/**
 * Message type tags stored in the {@code type} field of a {@code Datas} message.
 *
 * <p>The values are compile-time {@code byte} constants so that they can be used
 * directly as {@code case} labels when an incoming message is dispatched.
 */
public interface DATA_TYPE {
    byte VIDEO = 0,           // invitation to a video call
            VIDEO_OK = 1,     // video invitation accepted
            VIDEO_NO = 2,     // video invitation declined
            VIDEO_END = 9,    // video call ended by the peer
            FILE = 3,         // file payload
            IMAGE = 4,        // image payload
            TEXT = 5,         // text payload
            AUDIO = 6,        // invitation to an audio call
            AUDIO_OK = 7,     // audio invitation accepted
            AUDIO_NO = 8,     // audio invitation declined
            AUDIO_END = 10;   // audio call ended by the peer
}
2.定义消息实体类
/**
 * Serializable message exchanged between the two peers over the TCP channel.
 *
 * <p>Accessors, chained setters, the builder and the no-args/all-args constructors
 * are generated by the Lombok annotations below.
 */
@Data
@Accessors(chain=true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Builder
public class Datas implements Serializable {
private static final long serialVersionUID = 1L;
// Message kind; senders fill this with one of the DATA_TYPE constants.
private byte type;
// Size in bytes of the original file (senders pass File.length()).
private long data_size;
// Set to 1 by the chunked sender on every partial-file message.
private int isfen;
// Set to 1 by the chunked sender on the trailing "transfer finished" message.
private int isend;
// Display name of the sending user.
private String user_name;
// Name of the transferred file, when the message carries one.
private String file_name;
// Raw payload bytes (text, image or file content); may be absent on control messages.
private byte[] datas;
/**
 * Creates a message carrying only a type, a sender name and a payload.
 *
 * @param type      message kind
 * @param user_name sender's display name
 * @param datas     payload bytes
 */
public Datas(byte type, String user_name, byte[] datas) {
super();
this.type = type;
this.user_name = user_name;
this.datas = datas;
}
}
3.定义Netty服务端消息收发部分,服务端同时使用了TCP和UDP两种协议来收发数据
/**
 * JavaFX background task that runs the server side of the chat: a TCP endpoint bound to
 * localhost:8082 for {@code Datas} control/content messages and a UDP endpoint bound to
 * localhost:8083 for audio/video frames. Outgoing UDP frames are addressed to
 * {@code inet_socket} (localhost:8084).
 *
 * <p>The class is named {@code STasks} because that is the name used by its own
 * constructors and logger and by the UI class that instantiates it; the task's result
 * type is {@code Server_Task}, matching what {@code call()} returns.
 */
public class STasks extends Task<Server_Task> implements Serializable {
private static final long serialVersionUID = 1L;
// Full-screen rectangle captured for every outgoing video frame.
private static final Rectangle dimension = new Rectangle(Toolkit.getDefaultToolkit().getScreenSize());
// Screen grabber; stays null when the AWT robot cannot be created.
private static Robot robots;
// Destination of outgoing UDP frames.
private InetSocketAddress inet_socket = new InetSocketAddress("localhost",8084);
static {
try {
robots = new Robot();
} catch (AWTException e) {
e.printStackTrace();
}
}
private Service service = new Service();
// Owning UI object; supplies stages, alert, list view and the worker pool.
private Server_Task messages;
// True while incoming audio should also be buffered for saving.
private volatile boolean is_luzhi = false;
private ExecutorService forkJoin;
private Channel udp_channel;
// Context of the most recent TCP read; used to reply to the peer.
private ChannelHandlerContext tcp_channels;
private EventLoopGroup udp_group = new NioEventLoopGroup();
private EventLoopGroup tcp_group1 = new NioEventLoopGroup();
private EventLoopGroup tcp_group2 = new NioEventLoopGroup();
private ClassResolver classLoader = ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader());
// Accumulates received audio bytes while recording is switched on.
private ByteArrayOutputStream bytess = new ByteArrayOutputStream();
private Logger logger = LoggerFactory.getLogger(STasks.class);
public STasks() {}
public Service getService() {
return service;
}
public ChannelHandlerContext getTcp_channels() {
return tcp_channels;
}
public void setIs_luzhi(boolean is_luzhi) {
this.is_luzhi = is_luzhi;
}
public boolean getIs_luzhi() {
return this.is_luzhi;
}
public void setTcp_channels(ChannelHandlerContext tcp_channels) {
this.tcp_channels = tcp_channels;
}
public void setUdp_channel(Channel udp_channel) {
this.udp_channel = udp_channel;
}
public STasks(Server_Task messages) {
this.messages = messages;
this.forkJoin = messages.getForkJoin();
}
@Override
protected Server_Task call() throws Exception {
stratServerTCP();
stratServerUDP();
return messages;
}
/**
 * Continuously captures the screen on a pool thread, shows each frame locally and sends
 * the JPEG bytes to {@code inet_socket} over the UDP channel.
 *
 * <p>The loop ends when the worker thread is interrupted or a capture/encode step fails.
 *
 * <p>NOTE(review): the UDP receive handlers in this file read a 4-byte type prefix before
 * the payload, while this method sends the bare JPEG bytes; confirm whether a prefix is
 * required before wiring this method in.
 *
 * @param video_image view that displays the locally captured frame
 */
private void send_video(ImageView video_image) {
    forkJoin.execute(() -> {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ByteArrayOutputStream byte_out = new ByteArrayOutputStream();
                Thumbnails.of(robots.createScreenCapture(dimension))
                        .scale(0.3f)
                        .outputQuality(0.3f)
                        .outputFormat("jpg")
                        .toOutputStream(byte_out);
                byte[] bytes = byte_out.toByteArray();
                // Decode here, but touch the scene graph only on the FX application thread.
                Image frame = new Image(new ByteArrayInputStream(bytes));
                Platform.runLater(() -> video_image.setImage(frame));
                if (udp_channel != null) {
                    udp_channel.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(bytes), inet_socket));
                }
            }
        } catch (Exception e) {
            logger.error("screen capture / video send loop failed", e);
        }
    });
}
/**
 * Starts the TCP endpoint on localhost:8082 on a pool thread and keeps that thread parked
 * until the listening channel closes. Every decoded {@code Datas} message is passed to
 * {@code exetype}, and the reading context is remembered so the UI can reply to the peer.
 * Both event loop groups are shut down when the endpoint stops or fails to start.
 */
private void stratServerTCP() {
    forkJoin.execute(() -> {
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(tcp_group1, tcp_group2)
                    .channel(NioServerSocketChannel.class) // non-blocking NIO transport
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler("DEBUG"))
                                    // SECURITY NOTE(review): native Java deserialization with an
                                    // unlimited object size is unsafe for untrusted peers.
                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, classLoader))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new SimpleChannelInboundHandler<Object>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, Object msg)
                                                throws Exception {
                                            if (!(msg instanceof Datas)) {
                                                logger.warn("ignoring unexpected TCP message: {}", msg);
                                                return;
                                            }
                                            setTcp_channels(ctx);
                                            exetype(ctx, (Datas) msg);
                                        }
                                    });
                        }
                    });
            ChannelFuture channel = b.bind("localhost", 8082).sync();
            System.out.println("stratServerTCP");
            channel.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the pool can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("TCP server failed", e);
        } finally {
            tcp_group1.shutdownGracefully();
            tcp_group2.shutdownGracefully();
        }
    });
}
/**
 * Starts the UDP endpoint on localhost:8083 on a pool thread and keeps that thread parked
 * until the channel closes.
 *
 * <p>Each datagram is read as a 4-byte int type followed by the payload: type 0 is shown
 * as an image in the video view, type 1 is played on the audio line and, while recording
 * is on, also appended to the recording buffer. Datagrams shorter than the type prefix
 * are dropped. The event loop group is shut down when the endpoint stops.
 */
private void stratServerUDP() {
    forkJoin.execute(() -> {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(udp_group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(65535, 65535))
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
                    .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, new DefaultMessageSizeEstimator(65535))
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler("DEBUG"))
                                    .addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
                                                throws Exception {
                                            ByteBuf bytebuf = msg.content();
                                            if (bytebuf.readableBytes() < Integer.BYTES) {
                                                return; // too short to carry the type prefix
                                            }
                                            int type = bytebuf.readInt();
                                            byte[] bytes = new byte[bytebuf.readableBytes()];
                                            bytebuf.readBytes(bytes);
                                            if (type == 0) {
                                                // Video frame: decode here, update the view on the FX thread.
                                                Image frame = new Image(new ByteBufInputStream(Unpooled.wrappedBuffer(bytes)));
                                                Platform.runLater(() -> messages.getVideo_image().setImage(frame));
                                            } else if (type == 1) {
                                                // Audio chunk.
                                                if (getIs_luzhi()) {
                                                    System.out.println("录制");
                                                    bytess.write(bytes);
                                                }
                                                SourceDataLine source = service.getLine2();
                                                if (!source.isOpen()) {
                                                    source.open(service.getAudioFormat());
                                                    source.start();
                                                }
                                                source.write(bytes, 0, bytes.length);
                                            }
                                        }
                                    });
                        }
                    });
            ChannelFuture channel = bootstrap.bind("localhost", 8083).sync();
            System.out.println("stratServerUDP");
            setUdp_channel(channel.channel());
            channel.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the pool can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("UDP server failed", e);
        } finally {
            udp_group.shutdownGracefully();
            logger.info("UDP server endpoint stopped");
        }
    });
}
/**
 * Handles one {@code Datas} message received on the TCP channel, dispatching on its
 * {@code DATA_TYPE} tag.
 *
 * <p>This method is called from the Netty handler, so every change to JavaFX objects
 * (alert, stages, list view, file chooser) is posted to the FX application thread.
 *
 * @param ctx  channel context the message arrived on; used to answer video invitations
 * @param data decoded message
 */
private void exetype(ChannelHandlerContext ctx, Datas data) {
    logger.info("服务端收到数据............");
    Alert alert = messages.getAlertss();
    Stage audio_stage = messages.getAudio_stage();
    Stage video_stage = messages.getVideo_stage();
    try {
        switch (data.getType()) {
            case DATA_TYPE.VIDEO: {
                String question = "用户 " + data.getUser_name() + " 邀请你视频通话是否接受";
                Platform.runLater(() -> {
                    boolean accepted = confirm(alert, question);
                    if (accepted) {
                        video_stage.show();
                    }
                    ctx.writeAndFlush(Datas.builder()
                            .user_name(messages.getUser_name())
                            .type(accepted ? DATA_TYPE.VIDEO_OK : DATA_TYPE.VIDEO_NO)
                            .build());
                });
                break;
            }
            case DATA_TYPE.VIDEO_END: {
                // NOTE(review): Thread.stop() is deprecated; replacing it needs a cooperative
                // stop in Service, which is not visible from here.
                Thread videoThread = service.getVideo_stop();
                if (videoThread != null && videoThread.isAlive()) {
                    videoThread.stop();
                }
                Platform.runLater(() -> {
                    if (video_stage.isShowing()) {
                        video_stage.close();
                    }
                    showInfo(alert, "通话已经结束");
                });
                break;
            }
            case DATA_TYPE.VIDEO_NO: {
                String notice = "用户 " + data.getUser_name() + " 拒绝了你的请求";
                Platform.runLater(() -> showInfo(alert, notice));
                break;
            }
            case DATA_TYPE.VIDEO_OK: {
                Platform.runLater(() -> video_stage.show());
                service.send_video(udp_channel, messages.getVideo_image(), inet_socket);
                break;
            }
            case DATA_TYPE.AUDIO: {
                String question = "用户 " + data.getUser_name() + " 邀请你语音通话是否接受";
                Platform.runLater(() -> {
                    boolean accepted = confirm(alert, question);
                    if (accepted) {
                        audio_stage.show();
                    }
                    // A declined invitation must answer AUDIO_NO, otherwise the caller starts streaming.
                    messages.send_byte(Datas.builder()
                            .user_name(messages.getUser_name())
                            .type(accepted ? DATA_TYPE.AUDIO_OK : DATA_TYPE.AUDIO_NO)
                            .build());
                });
                break;
            }
            case DATA_TYPE.AUDIO_END: {
                Thread audioThread = service.getAudio_stop();
                if (audioThread != null && audioThread.isAlive()) {
                    audioThread.stop();
                }
                Platform.runLater(() -> {
                    if (audio_stage.isShowing()) {
                        audio_stage.close();
                    }
                });
                String notice = "通话已经结束";
                if (getIs_luzhi()) {
                    String thistime = LocalTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss"));
                    File outfile = new File("D:\\桌面\\audio", thistime + "录制.wav");
                    if (!outfile.getParentFile().exists()) {
                        outfile.getParentFile().mkdirs();
                    }
                    byte[] datas = bytess.toByteArray();
                    AudioFormat audioFormat = service.getAudioFormat();
                    AudioInputStream inputs = new AudioInputStream(
                            new ByteArrayInputStream(datas), audioFormat, datas.length / audioFormat.getFrameSize());
                    AudioSystem.write(inputs, AudioFileFormat.Type.WAVE, outfile);
                    // Start the next recording from an empty buffer.
                    bytess.reset();
                    notice = "通话已经结束\n录音保存:" + outfile.getPath();
                }
                setIs_luzhi(false);
                String finalNotice = notice;
                Platform.runLater(() -> showInfo(alert, finalNotice));
                break;
            }
            case DATA_TYPE.AUDIO_NO: {
                String notice = "用户 " + data.getUser_name() + " 拒绝了你的请求";
                Platform.runLater(() -> showInfo(alert, notice));
                break;
            }
            case DATA_TYPE.AUDIO_OK: {
                Platform.runLater(() -> audio_stage.show());
                service.send_audio(udp_channel, inet_socket);
                break;
            }
            case DATA_TYPE.TEXT: {
                // NOTE(review): decoded with the platform charset, mirroring the sender's getBytes().
                String content = new String(data.getDatas());
                String times = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                Platform.runLater(() -> {
                    Button button = new Button(content);
                    button.setTextFill(Color.WHITE);
                    button.setStyle("-fx-background-color:cadetblue");
                    messages.getListView().getItems().addAll(times, button);
                });
                break;
            }
            case DATA_TYPE.IMAGE: {
                ByteArrayOutputStream outs = new ByteArrayOutputStream();
                Thumbnails
                        .of(new ByteArrayInputStream(data.getDatas()))
                        .size(300, 300)
                        .outputQuality(1.0f)
                        .outputFormat("jpeg")
                        .toOutputStream(outs);
                Image thumbnail = new Image(new ByteArrayInputStream(outs.toByteArray()));
                String times2 = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                Platform.runLater(() ->
                        messages.getListView().getItems().addAll(times2, new ImageView(thumbnail)));
                break;
            }
            case DATA_TYPE.FILE: {
                byte[] payload = data.getDatas();
                if (payload == null) {
                    // Nothing to save, e.g. the trailing end-of-transfer marker of a chunked send.
                    break;
                }
                String question = "用户 " + data.getUser_name() + "\n发来文件 " + data.getFile_name() + "\n文件大小 "
                        + getSize(data.getData_size() * 1.0) + " 是否接收";
                Platform.runLater(() -> {
                    try {
                        if (!confirm(alert, question)) {
                            return;
                        }
                        FileChooser fileChooser = messages.getFileChooser();
                        fileChooser.setTitle("选择保存位置");
                        // setAll keeps the filter list from growing with every received file.
                        fileChooser.getExtensionFilters().setAll(
                                new ExtensionFilter("ALL", "*.*"),
                                new ExtensionFilter("JPG", "*.jpg"),
                                new ExtensionFilter("PNG", "*.png"));
                        fileChooser.setInitialFileName(data.getFile_name());
                        File filess = fileChooser.showSaveDialog(messages.getStage());
                        if (filess == null) {
                            return;
                        }
                        Files.write(payload, filess);
                        showInfo(alert, "保存成功");
                    } catch (IOException e) {
                        alert.setAlertType(AlertType.ERROR);
                        alert.setContentText("保存失败");
                        alert.showAndWait();
                        logger.error("saving received file failed", e);
                    }
                });
                break;
            }
        }
    } catch (Exception e) {
        logger.error("handling message of type " + data.getType() + " failed", e);
    }
}

/** Shows {@code text} in the shared alert as an information dialog; call on the FX thread. */
private static void showInfo(Alert alert, String text) {
    alert.setAlertType(AlertType.INFORMATION);
    alert.setContentText(text);
    alert.showAndWait();
}

/**
 * Asks {@code question} in the shared alert; call on the FX thread.
 *
 * @return true only when the user pressed OK; a dismissed dialog counts as "no"
 */
private static boolean confirm(Alert alert, String question) {
    alert.setAlertType(AlertType.CONFIRMATION);
    alert.setContentText(question);
    Optional<ButtonType> choice = alert.showAndWait();
    return choice.isPresent() && choice.get() == ButtonType.OK;
}
/**
 * Formats a byte count as a short human-readable size using 1024-based units.
 *
 * <p>Values of at least one unit are shown in that unit with two decimals (for example
 * {@code 1024 -> "1.00KB"}); smaller values are shown as whole bytes (for example
 * {@code 512 -> "512B"}). Formatting is locale-independent.
 *
 * @param length size in bytes; must not be {@code null}
 * @return the formatted size
 */
public static String getSize(Double length) {
    final double kb = 1024d;
    final double mb = kb * 1024d;
    final double gb = mb * 1024d;
    final double tb = gb * 1024d;
    final double bytes = length;
    final java.util.Locale locale = java.util.Locale.ROOT;
    if (bytes >= tb) {
        return String.format(locale, "%.2fTB", bytes / tb);
    }
    if (bytes >= gb) {
        return String.format(locale, "%.2fGB", bytes / gb);
    }
    if (bytes >= mb) {
        return String.format(locale, "%.2fMB", bytes / mb);
    }
    if (bytes >= kb) {
        return String.format(locale, "%.2fKB", bytes / kb);
    }
    return String.format(locale, "%.0fB", bytes);
}
}
4.定义Netty客户端消息收发部分,客户端也使用了TCP、UDP两种模式,与服务端代码基本相同
/**
 * JavaFX background task that runs the client side of the chat: a TCP connection to
 * localhost:8082 for {@code Datas} control/content messages and a UDP endpoint bound to
 * localhost:8084 for audio/video frames. Outgoing UDP frames are addressed to
 * {@code inet_socket} (localhost:8083).
 *
 * <p>The class is named {@code CTasks} because that is the name used by its own
 * constructor and by the UI class that instantiates it; the task's result type is
 * {@code Client_Task}, matching what {@code call()} returns.
 */
public class CTasks extends Task<Client_Task> implements Serializable {
private static final long serialVersionUID = 1L;
// Full-screen rectangle captured for every outgoing video frame.
private static final Rectangle dimension = new Rectangle(Toolkit.getDefaultToolkit().getScreenSize());
// Screen grabber; stays null when the AWT robot cannot be created.
private static Robot robots;
// Destination of outgoing UDP frames.
private static InetSocketAddress inet_socket = new InetSocketAddress("localhost",8083);
static {
try {
robots = new Robot();
} catch (AWTException e) {
e.printStackTrace();
}
}
// Owning UI object; supplies stages, alert, list view and the worker pool.
private Client_Task messages;
// True while incoming audio should also be buffered for saving.
private volatile boolean is_luzhi = false;
private ExecutorService forkJoin;
// Accumulates received audio bytes while recording is switched on.
private ByteArrayOutputStream bytess = new ByteArrayOutputStream();
private Service service = new Service();
private Logger logger = LoggerFactory.getLogger(CTasks.class);
private Channel tcp_channel;
private Channel udp_channel;
private EventLoopGroup udp_group = new NioEventLoopGroup();
private EventLoopGroup tcp_group = new NioEventLoopGroup();
private ClassResolver classLoader = ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader());
public boolean getIs_luzhi() {
return this.is_luzhi;
}
public Service getService() {
return service;
}
public Channel getTcp_channel() {
return tcp_channel;
}
public void setUdp_channel(Channel udp_channel) {
this.udp_channel = udp_channel;
}
public void setIs_luzhi(boolean is_luzhi) {
this.is_luzhi = is_luzhi;
}
public CTasks(Client_Task messages) {
this.messages = messages;
this.forkJoin = messages.getForkJoin();
}
@Override
protected Client_Task call() throws Exception {
stratClientTCP();
stratClientUDP();
return messages;
}
private void send_video(ImageView video_image) {
forkJoin.execute(() -> {
try {
ByteArrayOutputStream byte_out = new ByteArrayOutputStream();
Thumbnails.of(robots.createScreenCapture(dimension))
.scale(0.3f)
.outputQuality(0.3f)
.outputFormat("jpg")
.toOutputStream(byte_out);
byte[] bytes = byte_out.toByteArray();
ByteArrayInputStream inputs = new ByteArrayInputStream(bytes);
video_image.setImage(new Image(inputs));
if (udp_channel != null)udp_channel.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(bytes),inet_socket));
//byte_out.flush();
byte_out.close();
inputs.close();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
/**
 * Connects to the TCP endpoint at localhost:8082 on a pool thread and keeps that thread
 * parked until the connection closes. Every decoded {@code Datas} message is passed to
 * {@code exetype}. The event loop group is shut down when the connection ends or cannot
 * be established.
 */
private void stratClientTCP() {
    forkJoin.execute(() -> {
        try {
            Bootstrap b = new Bootstrap();
            b.group(tcp_group)
                    .channel(NioSocketChannel.class) // non-blocking NIO transport
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler("DEBUG"))
                                    // SECURITY NOTE(review): native Java deserialization with an
                                    // unlimited object size is unsafe for untrusted peers.
                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, classLoader))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new SimpleChannelInboundHandler<Object>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, Object msg)
                                                throws Exception {
                                            if (!(msg instanceof Datas)) {
                                                logger.warn("ignoring unexpected TCP message: {}", msg);
                                                return;
                                            }
                                            exetype(ctx, (Datas) msg);
                                        }
                                    });
                        }
                    });
            ChannelFuture channel = b.connect("localhost", 8082).sync();
            System.out.println("stratclientTCP");
            tcp_channel = channel.channel();
            // Greeting with a type outside DATA_TYPE: the server remembers the context of the
            // last message it read, so this lets it address the client before any real traffic.
            tcp_channel.writeAndFlush(new Datas().setType((byte) 100));
            tcp_channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the pool can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("TCP client failed", e);
        } finally {
            tcp_group.shutdownGracefully();
        }
    });
}
/**
 * Starts the UDP endpoint on localhost:8084 on a pool thread and keeps that thread parked
 * until the channel closes.
 *
 * <p>Each datagram is read as a 4-byte int type followed by the payload: type 0 is shown
 * as an image in the video view, type 1 is played on the audio line and, while recording
 * is on, also appended to the recording buffer. Datagrams shorter than the type prefix
 * are dropped. The event loop group is shut down when the endpoint stops.
 */
private void stratClientUDP() {
    forkJoin.execute(() -> {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(udp_group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(65535, 65535))
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
                    .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, new DefaultMessageSizeEstimator(65535))
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler("DEBUG"))
                                    .addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
                                                throws Exception {
                                            ByteBuf bytebuf = msg.content();
                                            if (bytebuf.readableBytes() < Integer.BYTES) {
                                                return; // too short to carry the type prefix
                                            }
                                            int type = bytebuf.readInt();
                                            byte[] bytes = new byte[bytebuf.readableBytes()];
                                            bytebuf.readBytes(bytes);
                                            if (type == 0) {
                                                // Video frame: decode here, update the view on the FX thread.
                                                Image frame = new Image(new ByteBufInputStream(Unpooled.wrappedBuffer(bytes)));
                                                Platform.runLater(() -> messages.getVideo_image().setImage(frame));
                                            } else if (type == 1) {
                                                // Audio chunk.
                                                if (getIs_luzhi()) {
                                                    System.out.println("录制");
                                                    bytess.write(bytes);
                                                }
                                                SourceDataLine source = service.getLine2();
                                                if (!source.isOpen()) {
                                                    source.open(service.getAudioFormat());
                                                    source.start();
                                                }
                                                source.write(bytes, 0, bytes.length);
                                            }
                                        }
                                    });
                        }
                    });
            ChannelFuture channel = bootstrap.bind("localhost", 8084).sync();
            System.out.println("stratclientUDP");
            setUdp_channel(channel.channel());
            channel.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the pool can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("UDP client failed", e);
        } finally {
            udp_group.shutdownGracefully();
            logger.info("UDP client endpoint stopped");
        }
    });
}
/**
 * Handles one {@code Datas} message received on the TCP channel, dispatching on its
 * {@code DATA_TYPE} tag.
 *
 * <p>This method is called from the Netty handler, so every change to JavaFX objects
 * (alert, stages, list view, file chooser) is posted to the FX application thread.
 *
 * @param ctx  channel context the message arrived on (replies go through the UI's
 *             {@code send_byte})
 * @param data decoded message
 */
private void exetype(ChannelHandlerContext ctx, Datas data) {
    logger.info("客户端收到数据............");
    Alert alert = messages.getAlertss();
    Stage audio_stage = messages.getAudio_stage();
    Stage video_stage = messages.getVideo_stage();
    try {
        switch (data.getType()) {
            case DATA_TYPE.VIDEO: {
                String question = "用户 " + data.getUser_name() + " 邀请你视频通话是否接受";
                Platform.runLater(() -> {
                    boolean accepted = confirm(alert, question);
                    if (accepted) {
                        video_stage.show();
                    }
                    messages.send_byte(Datas.builder()
                            .user_name(messages.getUser_name())
                            .type(accepted ? DATA_TYPE.VIDEO_OK : DATA_TYPE.VIDEO_NO)
                            .build());
                });
                break;
            }
            case DATA_TYPE.VIDEO_END: {
                // NOTE(review): Thread.stop() is deprecated; replacing it needs a cooperative
                // stop in Service, which is not visible from here.
                Thread videoThread = service.getVideo_stop();
                if (videoThread != null && videoThread.isAlive()) {
                    videoThread.stop();
                }
                Platform.runLater(() -> {
                    if (video_stage.isShowing()) {
                        video_stage.close();
                    }
                    showInfo(alert, "通话已经结束");
                });
                break;
            }
            case DATA_TYPE.VIDEO_NO: {
                String notice = "用户 " + data.getUser_name() + " 拒绝了你的请求";
                Platform.runLater(() -> showInfo(alert, notice));
                break;
            }
            case DATA_TYPE.VIDEO_OK: {
                Platform.runLater(() -> video_stage.show());
                service.send_video(udp_channel, messages.getVideo_image(), inet_socket);
                break;
            }
            case DATA_TYPE.AUDIO: {
                String question = "用户 " + data.getUser_name() + " 邀请你语音通话是否接受";
                Platform.runLater(() -> {
                    boolean accepted = confirm(alert, question);
                    if (accepted) {
                        audio_stage.show();
                    }
                    // A declined invitation must answer AUDIO_NO, otherwise the caller starts streaming.
                    messages.send_byte(Datas.builder()
                            .user_name(messages.getUser_name())
                            .type(accepted ? DATA_TYPE.AUDIO_OK : DATA_TYPE.AUDIO_NO)
                            .build());
                });
                break;
            }
            case DATA_TYPE.AUDIO_END: {
                Thread audioThread = service.getAudio_stop();
                if (audioThread != null && audioThread.isAlive()) {
                    audioThread.stop();
                }
                Platform.runLater(() -> {
                    if (audio_stage.isShowing()) {
                        audio_stage.close();
                    }
                });
                String notice = "通话已经结束";
                if (getIs_luzhi()) {
                    String thistime = LocalTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss"));
                    File outfile = new File("D:\\桌面\\audio", thistime + "录制.wav");
                    if (!outfile.getParentFile().exists()) {
                        outfile.getParentFile().mkdirs();
                    }
                    byte[] datas = bytess.toByteArray();
                    AudioFormat audioFormat = service.getAudioFormat();
                    AudioInputStream inputs = new AudioInputStream(
                            new ByteArrayInputStream(datas), audioFormat, datas.length / audioFormat.getFrameSize());
                    AudioSystem.write(inputs, AudioFileFormat.Type.WAVE, outfile);
                    // Start the next recording from an empty buffer.
                    bytess.reset();
                    notice = "通话已经结束\n录音保存位置:" + outfile.getPath();
                }
                setIs_luzhi(false);
                String finalNotice = notice;
                Platform.runLater(() -> showInfo(alert, finalNotice));
                break;
            }
            case DATA_TYPE.AUDIO_NO: {
                String notice = "用户 " + data.getUser_name() + " 拒绝了你的请求";
                Platform.runLater(() -> showInfo(alert, notice));
                break;
            }
            case DATA_TYPE.AUDIO_OK: {
                Platform.runLater(() -> audio_stage.show());
                service.send_audio(udp_channel, inet_socket);
                break;
            }
            case DATA_TYPE.TEXT: {
                // NOTE(review): decoded with the platform charset, mirroring the sender's getBytes().
                String content = new String(data.getDatas());
                String times = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                Platform.runLater(() -> {
                    Button button = new Button(content);
                    button.setTextFill(Color.WHITE);
                    button.setStyle("-fx-background-color:cadetblue");
                    messages.getListView().getItems().addAll(times, button);
                });
                break;
            }
            case DATA_TYPE.IMAGE: {
                ByteArrayOutputStream outs = new ByteArrayOutputStream();
                Thumbnails
                        .of(new ByteArrayInputStream(data.getDatas()))
                        .size(300, 300)
                        .outputQuality(1.0f)
                        .outputFormat("jpeg")
                        .toOutputStream(outs);
                Image thumbnail = new Image(new ByteArrayInputStream(outs.toByteArray()));
                String times2 = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                Platform.runLater(() -> {
                    ImageView imageView = new ImageView(thumbnail);
                    imageView.setOnMouseClicked(x -> {
                        ContextMenu contextMenu = new ContextMenu();
                        contextMenu.getItems().addAll(new MenuItem("保存"), new MenuItem("复制"));
                        Platform.runLater(() -> contextMenu.show(imageView, imageView.getX(), imageView.getY()));
                        System.out.println(x);
                    });
                    imageView.setOnDragDropped(x -> System.out.println("1" + x.getGestureSource()));
                    imageView.setOnDragEntered(x -> System.out.println("2" + x.getGestureSource()));
                    imageView.setOnDragExited(x -> System.out.println("3" + x.getGestureSource()));
                    messages.getListView().getItems().addAll(times2, imageView);
                });
                break;
            }
            case DATA_TYPE.FILE: {
                byte[] payload = data.getDatas();
                if (payload == null) {
                    // Nothing to save, e.g. the trailing end-of-transfer marker of a chunked send.
                    break;
                }
                String question = "用户 " + data.getUser_name() + "\n发来文件 " + data.getFile_name() + "\n文件大小 "
                        + getSize(data.getData_size() * 1.0) + " 是否接收";
                Platform.runLater(() -> {
                    try {
                        if (!confirm(alert, question)) {
                            return;
                        }
                        FileChooser fileChooser = messages.getFileChooser();
                        fileChooser.setTitle("选择保存位置");
                        // setAll keeps the filter list from growing with every received file.
                        fileChooser.getExtensionFilters().setAll(
                                new ExtensionFilter("ALL", "*.*"),
                                new ExtensionFilter("JPG", "*.jpg"),
                                new ExtensionFilter("PNG", "*.png"));
                        fileChooser.setInitialFileName(data.getFile_name());
                        File filess = fileChooser.showSaveDialog(messages.getStage());
                        if (filess == null) {
                            return;
                        }
                        Files.write(payload, filess);
                        showInfo(alert, "保存成功");
                    } catch (IOException e) {
                        alert.setAlertType(AlertType.ERROR);
                        alert.setContentText("保存失败");
                        alert.showAndWait();
                        logger.error("saving received file failed", e);
                    }
                });
                break;
            }
        }
    } catch (Exception e) {
        logger.error("handling message of type " + data.getType() + " failed", e);
    }
}

/** Shows {@code text} in the shared alert as an information dialog; call on the FX thread. */
private static void showInfo(Alert alert, String text) {
    alert.setAlertType(AlertType.INFORMATION);
    alert.setContentText(text);
    alert.showAndWait();
}

/**
 * Asks {@code question} in the shared alert; call on the FX thread.
 *
 * @return true only when the user pressed OK; a dismissed dialog counts as "no"
 */
private static boolean confirm(Alert alert, String question) {
    alert.setAlertType(AlertType.CONFIRMATION);
    alert.setContentText(question);
    Optional<ButtonType> choice = alert.showAndWait();
    return choice.isPresent() && choice.get() == ButtonType.OK;
}
/**
 * Formats a byte count as a short human-readable size using 1024-based units.
 *
 * <p>Values of at least one unit are shown in that unit with two decimals (for example
 * {@code 1024 -> "1.00KB"}); smaller values are shown as whole bytes (for example
 * {@code 512 -> "512B"}). Formatting is locale-independent.
 *
 * @param length size in bytes; must not be {@code null}
 * @return the formatted size
 */
public static String getSize(Double length) {
    final double kb = 1024d;
    final double mb = kb * 1024d;
    final double gb = mb * 1024d;
    final double tb = gb * 1024d;
    final double bytes = length;
    final java.util.Locale locale = java.util.Locale.ROOT;
    if (bytes >= tb) {
        return String.format(locale, "%.2fTB", bytes / tb);
    }
    if (bytes >= gb) {
        return String.format(locale, "%.2fGB", bytes / gb);
    }
    if (bytes >= mb) {
        return String.format(locale, "%.2fMB", bytes / mb);
    }
    if (bytes >= kb) {
        return String.format(locale, "%.2fKB", bytes / kb);
    }
    return String.format(locale, "%.0fB", bytes);
}
}
5.定义服务端UI部分,这里使用了javafx UI界面
/**
 * JavaFX application window of the server-side peer: a message list, an input area and
 * buttons for sending text/files and starting audio or video calls. Network work is
 * delegated to the {@code STasks} instance created here.
 *
 * <p>The class is named {@code Server_Task} because that is the name used by its own
 * constructor and by the networking task that holds a reference to it.
 */
public class Server_Task extends Application implements Serializable, EventHandler<ActionEvent> {
private static final long serialVersionUID = 1L;
private Integer count = 2;
// Captions of the four main buttons; handle() dispatches on these exact texts.
private String[] buttontxt = {"发送","发送文件","视频通话","音频通话"};
private ListView<Object> listView;
private ImageView imageView = new ImageView();
private String user_name = "张三";
private Text text = new Text("用户聊天窗口");
private Button[] button = new Button[buttontxt.length];
private TextArea textarea = new TextArea();
private FileChooser fileChooser = new FileChooser();
// Single alert instance reused for every prompt and notification.
private Alert alertss = new Alert(AlertType.INFORMATION);
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private List<Object> contentLabelList = new ArrayList<>();
// Worker pool shared with the networking task.
private ExecutorService forkJoin = Executors.newFixedThreadPool(10);
private ImageView video_image = new ImageView();
private ImageView audio_image = new ImageView();
private Stage video_stage = UI.get_video_ui("服务端视频通话",video_image, this);
private Stage audio_stage = UI.get_audio_ui("服务端音频通话",audio_image, this);
private STasks tasks = new STasks(this);
private Stage stage;
public Server_Task() throws IOException{
// datagram.socket().setReceiveBufferSize(Integer.MAX_VALUE);
// datagram.socket().setSendBufferSize(Integer.MAX_VALUE);
// datagram.socket().setBroadcast(true);
// datagram.socket().setReuseAddress(true);
fileChooser.setInitialDirectory(new File("D:\\桌面"));
fileChooser.setTitle("选择文件");
// fileChooser.getExtensionFilters().addAll(
// new ExtensionFilter("JPG","*.jpg"),
// new ExtensionFilter("PNG","*.png"),
// new ExtensionFilter("GIF","*.gif"),
// new ExtensionFilter("PSD","*.psd"),
// new ExtensionFilter("PM4","*.mp4"),
// new ExtensionFilter("MP3","*.mp3")
// );
for (int a = 0; a < buttontxt.length; a++){
button[a] = new Button(buttontxt[a]);
button[a].setOnAction(this);
}
textarea.setTooltip(new Tooltip("此处为发送内容"));
textarea.setPromptText("请输入发送内容");
textarea.setFocusTraversable(false);
textarea.setOnDragOver(x->{
if(x.getGestureSource()!=textarea)
x.acceptTransferModes(TransferMode.ANY);
});
textarea.setOnDragDropped(x->{
Dragboard dragboard = x.getDragboard();
List<File> file = dragboard.getFiles();
for (File file2 : file) {
File[] files = file2.isDirectory()?file2.listFiles(b->b.length()<1024*1024*50):new File[]{file2};
for (File file3 : files)send_files(file3);
}
});
}
/**
 * Reacts to a button press by dispatching on the button's caption: sends text or a file,
 * starts or ends an audio/video call, or switches call recording on.
 *
 * @param event action event whose source is the pressed {@code Button}
 */
@Override
public void handle(ActionEvent event) {
    try {
        Button pressed = (Button) event.getSource();
        switch (pressed.getText()) {
            case "发送":
                // NOTE(review): encoded with the platform charset; the receiver decodes the same way.
                send_byte(new Datas(DATA_TYPE.TEXT, user_name, textarea.getText().getBytes()));
                break;
            case "发送文件": {
                File files = fileChooser.showOpenDialog(stage);
                if (files == null) {
                    break;
                }
                // Read and send off the FX thread so the window stays responsive.
                forkJoin.execute(() -> send_files(files));
                break;
            }
            case "视频通话":
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.VIDEO).build());
                break;
            case "音频通话":
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.AUDIO).build());
                break;
            case "结束视频通话": {
                // Same guard as for the audio thread: there may be no running sender.
                Thread videoThread = tasks.getService().getVideo_stop();
                if (videoThread != null && videoThread.isAlive()) {
                    videoThread.stop();
                }
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.VIDEO_END).build());
                if (video_stage.isShowing()) {
                    video_stage.close();
                }
                break;
            }
            case "结束音频通话": {
                Thread audioThread = tasks.getService().getAudio_stop();
                if (audioThread != null && audioThread.isAlive()) {
                    audioThread.stop();
                }
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.AUDIO_END).build());
                if (audio_stage.isShowing()) {
                    audio_stage.close();
                }
                break;
            }
            case "通话录音":
                tasks.setIs_luzhi(true);
                System.out.println(tasks.getIs_luzhi());
                pressed.setText("正在录制");
                // -fx-text-fill is the JavaFX property for label text colour.
                pressed.setStyle("-fx-background-color:orangered;-fx-text-fill:white");
                break;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Override
public void start(Stage primaryStage) {
this.stage = primaryStage;
alertss.setTitle("服务端系统提示");
// contentLabelList.add(sdf.format(new Date(System.currentTimeMillis())) + ":");
// ImageView imageView = new ImageView(image[0]);
// imageView.setFitWidth(800);
// imageView.setFitHeight(400);
// contentLabelList.add(imageView);
listView = new ListView<Object>(FXCollections.observableArrayList(contentLabelList));
listView.getStylesheets().add(getClass().getResource("/javafx/listview.css").toExternalForm());
HBox hroot = new HBox(10);
hroot.getChildren().addAll(button[0],button[1],button[2],button[3]);
hroot.setAlignment(Pos.CENTER_RIGHT);
//HBox.setMargin(button[1], new Insets(0, 0, 0, 20));
VBox vroot = new VBox(20);
vroot.getChildren().addAll(text, listView, textarea[0], hroot);
vroot.setAlignment(Pos.CENTER);
vroot.setPadding(new Insets(10, 10, 10, 10));
//VBox.setMargin(textarea[0], new Insets(20, 20, 0, 0));
//KeyCodeCombination keys = new KeyCodeCombination(KeyCode.ENTER);
Scene scene = new Scene(vroot,500,600);
//scene.getAccelerators().put(keys,()->button[0].fire());
scene.getStylesheets().add(getClass().getResource("/javafx/javafx.css").toExternalForm());
primaryStage.getIcons().add(image[0]);
primaryStage.setTitle("服务端");
//primaryStage.setAlwaysOnTop(true);
primaryStage.setScene(scene);
primaryStage.show();
forkJoin.execute(tasks);
}
private InetSocketAddress inet = new InetSocketAddress("127.0.0.1", 8083);
public void send_byte(Datas data){
ChannelHandlerContext channel = tasks.getTcp_channels();
if(channel!=null){
System.out.println("socketChannel "+channel);
channel.writeAndFlush(data);
}
}
/**
 * Reads {@code files} fully into memory and sends it to the peer.
 *
 * <p>Files under 10 MiB go out as one message, tagged {@code IMAGE} when the name ends in
 * jpg/png/gif/jpeg (any letter case) and {@code FILE} otherwise. Larger files are handed
 * to {@code send_byte_limit} in 1 MiB chunks.
 *
 * @param files file to send
 */
public void send_files(File files){
    // (?i): "PHOTO.JPG" is as much an image as "photo.jpg".
    boolean is_image = files.getName().matches("(?i).*\\.(jpg|png|gif|jpeg)$");
    try (ByteArrayOutputStream outputs = new ByteArrayOutputStream()) {
        Files.copy(files, outputs);
        if (files.length() < 1024 * 1024 * 10) {
            Datas datas = Datas.builder()
                    .type(is_image ? DATA_TYPE.IMAGE : DATA_TYPE.FILE)
                    .file_name(files.getName())
                    .data_size(files.length())
                    .datas(outputs.toByteArray())
                    .user_name(user_name)
                    .build();
            send_byte(datas);
        } else {
            // send_byte_limit is the chunked sender defined in this class.
            send_byte_limit(files, outputs.toByteArray(), 1024 * 1024);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends {@code bytes} to the peer as a sequence of {@code FILE} messages of at most
 * {@code skip} bytes each (every one flagged {@code isfen = 1}), followed by one
 * payload-free message flagged {@code isend = 1}.
 *
 * @param files source file; supplies the name and total size put on every chunk
 * @param bytes complete file content
 * @param skip  maximum number of payload bytes per chunk
 */
private void send_byte_limit(File files,byte[] bytes,int skip){
    for (int offset = 0; offset < bytes.length; ) {
        int limit = Math.min(offset + skip, bytes.length);
        byte[] chunk = Arrays.copyOfRange(bytes, offset, limit);
        offset = limit;
        send_byte(Datas.builder()
                .type(DATA_TYPE.FILE)
                .file_name(files.getName())
                .data_size(files.length())
                .isfen(1)
                .datas(chunk)
                .user_name(user_name)
                .build());
    }
    // Trailing marker: tells the receiver that the transfer is complete.
    Datas endMarker = Datas.builder().type(DATA_TYPE.FILE).isend(1).build();
    send_byte(endMarker);
}
public static void main(String[] args) throws Exception {
launch(args);
}
6.定义客户端UI部分,与服务端代码大同小异
public class Client_Task extends Application implements Serializable, EventHandler<ActionEvent> {
private static final long serialVersionUID = 1L;
private Integer count = 2;
private String[] buttontxt = {"发送","发送文件","视频通话","音频通话"};
private ListView<Object> listView;
private ImageView imageView = new ImageView();
private Text text = new Text("用户聊天窗口");
private String user_name = "李四";
private Button[] button = new Button[buttontxt.length];
private TextArea textarea = new TextArea();
private FileChooser fileChooser = new FileChooser();
private Alert alertss = new Alert(AlertType.INFORMATION);
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private List<Object> contentLabelList = new ArrayList<>();
private ExecutorService forkJoin = Executors.newFixedThreadPool(10);
// private volatile DatagramChannel datagram = DatagramChannel.open();
// private volatile ServerSocketChannel server = ServerSocketChannel.open();
private ImageView video_image = new ImageView();
private ImageView audio_image = new ImageView();
private Stage video_stage = UI.get_video_ui("客户端视频通话",video_image, this);
private Stage audio_stage = UI.get_audio_ui("客户端音频通话",audio_image, this);
private CTasks tasks = new CTasks(this);
private Stage stage;
/**
 * Builds the widgets that do not depend on the primary stage: configures the file
 * chooser, creates the action buttons and wires drag-and-drop file sending onto the
 * input area.
 *
 * @throws IOException declared for callers; nothing in the visible body throws it
 */
public Client_Task() throws IOException{
    // A non-existent initial directory makes the chooser fail when it is shown.
    File desktop = new File("D:\\桌面");
    if (desktop.isDirectory()) {
        fileChooser.setInitialDirectory(desktop);
    }
    fileChooser.setTitle("选择文件");
    for (int a = 0; a < buttontxt.length; a++){
        button[a] = new Button(buttontxt[a]);
        button[a].setOnAction(this);
    }
    // textarea is a single TextArea created by its field initializer.
    textarea.setTooltip(new Tooltip("此处为发送内容"));
    textarea.setPromptText("请输入发送内容");
    textarea.setFocusTraversable(false);
    textarea.setOnDragOver(x -> {
        if (x.getGestureSource() != textarea) {
            x.acceptTransferModes(TransferMode.ANY);
        }
    });
    textarea.setOnDragDropped(x -> {
        Dragboard dragboard = x.getDragboard();
        for (File dropped : dragboard.getFiles()) {
            // For a directory, send its direct children smaller than 50 MiB.
            File[] candidates = dropped.isDirectory()
                    ? dropped.listFiles(b -> b.length() < 1024 * 1024 * 50)
                    : new File[]{dropped};
            if (candidates == null) {
                continue; // listFiles returns null when the directory cannot be read
            }
            for (File candidate : candidates) {
                // Same as the send-file button: read and send off the FX thread.
                forkJoin.execute(() -> send_files(candidate));
            }
        }
    });
}
/**
 * Reacts to a button press by dispatching on the button's caption: sends text or a file,
 * starts or ends an audio/video call, or switches call recording on.
 *
 * @param event action event whose source is the pressed {@code Button}
 */
@Override
public void handle(ActionEvent event) {
    try {
        Button pressed = (Button) event.getSource();
        switch (pressed.getText()) {
            case "发送":
                // NOTE(review): encoded with the platform charset; the receiver decodes the same way.
                send_byte(new Datas(DATA_TYPE.TEXT, user_name, textarea.getText().getBytes()));
                break;
            // Must equal the caption in buttontxt, otherwise the send-file button does nothing.
            case "发送文件": {
                File files = fileChooser.showOpenDialog(stage);
                if (files == null) {
                    break;
                }
                // Read and send off the FX thread so the window stays responsive.
                forkJoin.execute(() -> send_files(files));
                break;
            }
            case "视频通话":
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.VIDEO).build());
                break;
            case "音频通话":
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.AUDIO).build());
                break;
            case "结束视频通话": {
                // Same guard as for the audio thread: there may be no running sender.
                Thread videoThread = tasks.getService().getVideo_stop();
                if (videoThread != null && videoThread.isAlive()) {
                    videoThread.stop();
                }
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.VIDEO_END).build());
                if (video_stage.isShowing()) {
                    video_stage.close();
                }
                break;
            }
            case "结束音频通话": {
                Thread audioThread = tasks.getService().getAudio_stop();
                if (audioThread != null && audioThread.isAlive()) {
                    audioThread.stop();
                }
                send_byte(Datas.builder().user_name(user_name).type(DATA_TYPE.AUDIO_END).build());
                if (audio_stage.isShowing()) {
                    audio_stage.close();
                }
                break;
            }
            case "通话录音":
                tasks.setIs_luzhi(true);
                System.out.println(tasks.getIs_luzhi());
                pressed.setText("正在录制");
                // -fx-text-fill is the JavaFX property for label text colour.
                pressed.setStyle("-fx-background-color:orangered;-fx-text-fill:white");
                break;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Builds and shows the main client window, then submits {@code tasks} to {@code forkJoin}.
 *
 * <p>The layout is a vertical stack: {@code text}, the message list, the input area and a
 * right-aligned row holding the first four buttons. Pressing ENTER fires {@code button[0]}.
 *
 * @param primaryStage the stage the client window is shown on; also stored in {@code stage}
 */
@Override
public void start(Stage primaryStage) {
    this.stage = primaryStage;
    alertss.setTitle("客户端系统提示");

    // The message list starts with a single timestamp line.
    contentLabelList.add(sdf.format(new Date()) + ":");
    listView = new ListView<Object>(FXCollections.observableArrayList(contentLabelList));
    String listCss = getClass().getResource("/javafx/listview.css").toExternalForm();
    listView.getStylesheets().add(listCss);

    // Button row.
    HBox buttonRow = new HBox(10);
    buttonRow.setAlignment(Pos.CENTER_RIGHT);
    buttonRow.getChildren().addAll(button[0], button[1], button[2], button[3]);

    // Main column.
    VBox content = new VBox(20);
    content.setAlignment(Pos.CENTER);
    content.setPadding(new Insets(10));
    content.getChildren().addAll(text, listView, textarea[0], buttonRow);

    Scene scene = new Scene(content, 500, 600);
    String sceneCss = getClass().getResource("/javafx/javafx.css").toExternalForm();
    scene.getStylesheets().add(sceneCss);
    // ENTER triggers the first button.
    KeyCodeCombination enterKey = new KeyCodeCombination(KeyCode.ENTER);
    scene.getAccelerators().put(enterKey, () -> button[0].fire());

    primaryStage.setTitle("客户端");
    primaryStage.getIcons().add(image[0]);
    primaryStage.setScene(scene);
    primaryStage.show();

    forkJoin.execute(tasks);
}
/**
 * Writes a {@code Datas} message to the TCP channel obtained from {@code tasks}.
 *
 * <p>The message is written only when that channel is non-null, open and active;
 * otherwise the call returns without sending and without reporting anything.
 *
 * @param data the wrapped message to send
 */
public void send_byte(Datas data) {
    Channel tcp = tasks.getTcp_channel();
    boolean usable = tcp != null && tcp.isOpen() && tcp.isActive();
    if (!usable) {
        return;
    }
    System.out.println("socketChannel " + tcp);
    tcp.writeAndFlush(data);
}
/**
 * Sends a file to the peer through {@link #send_byte(Datas)}.
 *
 * <p>Names ending in {@code .jpg}, {@code .png}, {@code .gif} or {@code .jpeg} (lower case only)
 * are typed as {@code DATA_TYPE.IMAGE}; everything else as {@code DATA_TYPE.FILE}.
 * A file smaller than 10 MiB is sent as one message carrying the whole content.
 * A larger file is sent as a header message without content, followed by chunks of up to
 * 1 MiB flagged {@code isfen=1}, and finally an end marker flagged {@code isend=1}.
 * Any exception is printed and swallowed.
 *
 * @param files the file to send
 */
public void send_files(File files) {
    boolean is_image = files.getName().matches(".*\\.(jpg|png|gif|jpeg)$");
    try (ByteArrayOutputStream outputs = new ByteArrayOutputStream();
         FileInputStream inputs = new FileInputStream(files)) {
        Datas datas = Datas.builder()
                .type(is_image ? DATA_TYPE.IMAGE : DATA_TYPE.FILE)
                .file_name(files.getName())
                .data_size(files.length())
                .user_name(user_name)
                .build();
        if (files.length() < 1024 * 1024 * 10) {
            Files.copy(files, outputs);
            datas.setDatas(outputs.toByteArray());
            send_byte(datas);
        } else {
            send_byte(datas);
            byte[] bytes = new byte[1024 * 1024];
            for (int data; (data = inputs.read(bytes, 0, bytes.length)) > -1;) {
                // Each chunk gets its own copy holding exactly the bytes read: the read buffer
                // is reused on the next iteration and the write may be serialized later, and a
                // short final read must not drag stale bytes from the previous chunk along.
                byte[] chunk = Arrays.copyOf(bytes, data);
                Datas datass = Datas.builder()
                        .type(DATA_TYPE.FILE)
                        .file_name(files.getName())
                        .data_size(files.length())
                        .isfen(1)
                        .datas(chunk)
                        .user_name(user_name)
                        .build();
                send_byte(datass);
            }
            send_byte(Datas.builder().type(DATA_TYPE.FILE).isend(1).build());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends already-loaded file content in fixed-size segments, followed by an end marker.
 *
 * <p>Every segment is a {@code DATA_TYPE.FILE} message flagged {@code isfen=1} that carries
 * the file's name and total length; the last segment may be shorter than {@code skip}.
 * After the segments, a message flagged {@code isend=1} is sent.
 *
 * @param files the file the bytes belong to (only its name and length are used)
 * @param bytes the file content
 * @param skip  the segment size in bytes; must be positive
 * @throws IllegalArgumentException if {@code skip} is not positive
 */
private void send_byte_limit(File files, byte[] bytes, int skip) {
    if (skip <= 0) {
        // A zero step would never advance and would loop forever.
        throw new IllegalArgumentException("skip must be positive: " + skip);
    }
    int start = 0;
    while (start < bytes.length) {
        // Computed in long so that start + skip cannot overflow for a very large skip.
        int end = (int) Math.min((long) start + skip, bytes.length);
        byte[] inputs = Arrays.copyOfRange(bytes, start, end);
        start = end;
        Datas datas = Datas.builder()
                .type(DATA_TYPE.FILE)
                .file_name(files.getName())
                .data_size(files.length())
                .isfen(1)
                .datas(inputs)
                .user_name(user_name)
                .build();
        send_byte(datas);
    }
    send_byte(Datas.builder().type(DATA_TYPE.FILE).isend(1).build());
}
/**
 * Program entry point; passes the command-line arguments straight to {@code launch}.
 *
 * @param args command-line arguments
 * @throws Exception declared for whatever {@code launch} propagates
 */
public static void main(final String[] args) throws Exception {
    launch(args);
}
7.定义视频和音频的发送部分
/**
 * Captures audio and video on background threads and sends them as UDP datagrams
 * through a Netty channel.
 *
 * <p>Every datagram starts with a 4-byte int tag: {@code 1} for audio, {@code 0} for video.
 * The payload follows the tag: raw audio bytes in {@link #getAudioFormat()}, or one
 * JPEG-encoded frame.
 *
 * <p>The capture threads are exposed through {@link #getVideo_stop()} and
 * {@link #getAudio_stop()} so that callers can terminate them.
 */
public final class Service {
    private Thread video_stop, audio_stop;
    private static Java2DFrameConverter converter = new Java2DFrameConverter();
    // Grabber for OpenCV capture device 0, shared by all instances.
    private static OpenCVFrameGrabber grabber = new OpenCVFrameGrabber(0);
    private AudioFormat audioFormat;
    private SourceDataLine line2;

    /** @return the video capture thread, or {@code null} if {@link #send_video} has not been called */
    public Thread getVideo_stop() {
        return video_stop;
    }

    /** @return the audio capture thread, or {@code null} if {@link #send_audio} has not been called */
    public Thread getAudio_stop() {
        return audio_stop;
    }

    /** @return the playback line obtained in the constructor, or {@code null} if that lookup failed */
    public SourceDataLine getLine2() {
        return line2;
    }

    /** @return the audio format used for capture and for the playback line */
    public AudioFormat getAudioFormat() {
        return audioFormat;
    }

    /**
     * Sets the audio format (44.1 kHz, 16-bit, 2 channels, signed, little-endian) and looks up
     * a playback line for it. A lookup failure is printed and leaves the playback line unset.
     */
    public Service() {
        try {
            audioFormat = new AudioFormat(44100.0F, 16, 2, true, false);
            DataLine.Info dataLineInfo2 = new DataLine.Info(SourceDataLine.class, audioFormat);
            line2 = (SourceDataLine) AudioSystem.getLine(dataLineInfo2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts a thread that reads audio from a capture line and sends it to {@code inet_socket}.
     *
     * @param udp_channel the channel the datagrams are written to
     * @param inet_socket the destination address
     */
    public void send_audio(Channel udp_channel, InetSocketAddress inet_socket) {
        audio_stop = new Thread(() -> {
            TargetDataLine line = null;
            try {
                DataLine.Info dataLineInfo = new DataLine.Info(TargetDataLine.class, audioFormat);
                line = (TargetDataLine) AudioSystem.getLine(dataLineInfo);
                line.open(audioFormat);
                line.start();
                byte[] bytes = new byte[1024];
                AudioInputStream inputStream = new AudioInputStream(line);
                for (int data; (data = inputStream.read(bytes)) > 0;) {
                    // Tag plus exactly the bytes read; writeBytes copies them, so the
                    // read buffer can be reused on the next iteration.
                    ByteBuf byte_buffer = Unpooled.buffer(Integer.BYTES + data);
                    byte_buffer.writeInt(1);
                    byte_buffer.writeBytes(bytes, 0, data);
                    udp_channel.writeAndFlush(new DatagramPacket(byte_buffer, inet_socket));
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            } finally {
                // The line is still null when the lookup itself failed.
                if (line != null) {
                    line.stop();
                    line.close();
                }
            }
        });
        audio_stop.start();
    }

    /**
     * Starts a thread that grabs camera frames, shows each one in {@code images} and sends it
     * to {@code inet_socket} as a JPEG scaled to 40% at quality 0.4.
     *
     * @param udp_channel the channel the datagrams are written to
     * @param images      the view that displays the locally captured frame
     * @param inet_socket the destination address
     */
    public void send_video(Channel udp_channel, ImageView images, InetSocketAddress inet_socket) {
        video_stop = new Thread(() -> {
            try {
                grabber.start();
                for (Frame frames; (frames = grabber.grab()) != null;) {
                    ByteArrayOutputStream byte_out = new ByteArrayOutputStream();
                    Thumbnails.of(converter.convert(frames))
                            .scale(0.4f).outputQuality(0.4f)
                            .outputFormat("jpeg").toOutputStream(byte_out);
                    byte[] bytes = byte_out.toByteArray();
                    // NOTE(review): this runs on the capture thread, not the JavaFX Application
                    // Thread; consider Platform.runLater if the view is part of a showing scene.
                    images.setImage(new Image(new ByteArrayInputStream(bytes)));
                    ByteBuf byte_buffer = Unpooled.buffer(Integer.BYTES + bytes.length);
                    byte_buffer.writeInt(0);
                    byte_buffer.writeBytes(bytes);
                    // NOTE(review): the whole frame goes into one datagram; confirm that
                    // encoded frames stay below the UDP payload limit.
                    udp_channel.writeAndFlush(new DatagramPacket(byte_buffer, inet_socket));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    grabber.stop();
                    grabber.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        video_stop.start();
    }
}
8.定义视频和音频通话的窗口
/**
 * Factory methods for the video-call and audio-call windows.
 *
 * <p>Both windows share one layout: a caption, an image view, an "end call" button and a
 * "record call" button. The returned stage is configured but not shown.
 */
public abstract class UI {

    /**
     * Creates the video-call window.
     *
     * @param title       the window title
     * @param video_image the view placed in the window; it is resized and given the placeholder image
     * @param value       the handler attached to both buttons
     * @return the configured, not yet shown stage
     */
    public static Stage get_video_ui(String title, ImageView video_image, EventHandler<ActionEvent> value) {
        return build_call_stage(title, video_image, value, "视频通话中...", "结束视频通话");
    }

    /**
     * Creates the audio-call window.
     *
     * @param title       the window title
     * @param audio_image the view placed in the window; it is resized and given the placeholder image
     * @param value       the handler attached to both buttons
     * @return the configured, not yet shown stage
     */
    public static Stage get_audio_ui(String title, ImageView audio_image, EventHandler<ActionEvent> value) {
        return build_call_stage(title, audio_image, value, "音频通话中...", "结束音频通话");
    }

    /** Builds the call-window layout shared by the video and audio variants. */
    private static Stage build_call_stage(String title, ImageView call_image,
            EventHandler<ActionEvent> value, String caption, String end_button_text) {
        VBox vBox = new VBox(10);
        Text text = new Text(caption);
        text.setFont(Font.font("黑体", 20));

        String[] button_text = {end_button_text, "通话录音"};
        Button[] button = new Button[button_text.length];
        for (int i = 0; i < button.length; i++) {
            button[i] = new Button(button_text[i]);
            button[i].setFont(Font.font("黑体", 20));
            button[i].setPrefSize(360, 40);
            button[i].setOnAction(value);
        }

        call_image.setFitWidth(360);
        call_image.setFitHeight(340);
        call_image.setImage(new Image("/0.png"));

        vBox.getChildren().addAll(text, call_image, button[0], button[1]);
        vBox.setAlignment(Pos.CENTER);

        Stage stage = new Stage(StageStyle.UTILITY);
        stage.setTitle(title);
        stage.setResizable(false);
        stage.setScene(new Scene(vBox, 400, 500));
        return stage;
    }
}
9.先运行服务端,再运行客户端,运行效果如下图所示
10.用到的Maven依赖包
<!-- Netty: provides the Channel, ByteBuf/Unpooled and DatagramPacket types used for TCP/UDP transport. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<!-- Thumbnailator: used via Thumbnails.of(...) to scale and JPEG-encode captured video frames. -->
<dependency>
<groupId>net.coobird</groupId>
<artifactId>thumbnailator</artifactId>
<version>0.4.8</version>
</dependency>
<!-- JavaCV: provides OpenCVFrameGrabber and Java2DFrameConverter for camera capture. -->
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>javacv-platform</artifactId>
<version>1.4.1</version>
</dependency>
<!-- OpenCV preset matching JavaCV 1.4.1. NOTE(review): presumably supplies the native OpenCV binaries; confirm it is not already pulled in transitively by javacv-platform. -->
<dependency>
<groupId>org.bytedeco.javacpp-presets</groupId>
<artifactId>opencv-platform</artifactId>
<version>3.4.1-1.4.1</version>
</dependency>
|