本次实验的主要目的。 理解可靠数据传输的基本原理;掌握停等协议的工作原理;掌握基 于 UDP 设计并实现一个停等协议的过程与技术。 理解滑动窗口协议的基本原理;掌握 GBN 的工作原理;掌握基于 UDP 设计并实现一个 GBN 协议的过程与技术。
https://download.csdn.net/download/Franklins_Fan/21416813
GBN.java
package GBN;
import java.io.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class GBN {
protected int WINDOW_SIZE;
protected int DATA_NUMBER_Send = 0;
protected int DATA_NUMBER_Rev = 0;
protected int TIMEOUT;
protected String hostName;
protected int nextSeq = 1;
protected int base = 1;
protected InetAddress destAddress;
protected int destPort = 80;
protected int expectedSeq = 1;
protected int lastSave = 0 ;
protected DatagramSocket sendSocket;
protected DatagramSocket receiveSocket;
protected int MaxDataPacketSize=0;
protected boolean isSendCarryData =false;
protected boolean isRevCarriedData =false;
protected byte[] dataList = new byte[0];
protected List<Integer> dataLengthList = new ArrayList<>() ;
protected String outputFileName = new String();
public GBN( int RECEIVE_PORT, int WINDOW_SIZE, int DATA_NUMBER, int TIMEOUT, String name) throws IOException {
this.WINDOW_SIZE = WINDOW_SIZE;
this.DATA_NUMBER_Send = DATA_NUMBER;
this.DATA_NUMBER_Rev=DATA_NUMBER;
this.TIMEOUT = TIMEOUT;
this.hostName = name;
sendSocket = new DatagramSocket();
receiveSocket = new DatagramSocket(RECEIVE_PORT);
destAddress = InetAddress.getLocalHost();
}
public void sendData(String filename,int MaxDataPacketSize) throws IOException {
File file=new File(filename);
this.MaxDataPacketSize=MaxDataPacketSize;
if(file.length()==0){
System.out.println("文件为空!");
return;
}
try {
DATA_NUMBER_Send=0;
FileInputStream fis = new FileInputStream(file);
byte[] bytes = new byte[MaxDataPacketSize];
int length = 0;
while((length = fis.read(bytes, 0, bytes.length)) != -1) {
DATA_NUMBER_Send++;
dataList=addBytes(dataList,bytes);
dataLengthList.add(length);
}
isSendCarryData=true;
System.out.println(hostName+":文件被拆分为"+DATA_NUMBER_Send+"个包");
fis.close();
} catch (IOException e) {
e.printStackTrace();
return;
}
send();
}
public void send() throws IOException {
int maxACK = 0;
while (true) {
while (nextSeq < base + WINDOW_SIZE && nextSeq <= DATA_NUMBER_Send) {
if (nextSeq % 5 == 0||nextSeq == 8) {
System.out.println(hostName + "模拟丢失报文:Seq = " + nextSeq);
nextSeq++;
continue;
}
byte[] data=new byte[MaxDataPacketSize];
int length=0;
if(isSendCarryData){
length=dataLengthList.get(nextSeq-1);
int curByte=0;
for(int i=0 ;i<nextSeq-1;i++){
curByte+=dataLengthList.get(i);
}
System.arraycopy(dataList,curByte,data,0,length);
}
String sendDataLabel = hostName + ": Sending to port " + destPort + ", Seq = " + nextSeq
+" isDataCarried ="+isSendCarryData+" length = "+length +" DATA_NUMBER = "+DATA_NUMBER_Send +"@@@@@";
byte[] datagram = addBytes(sendDataLabel.getBytes(),data);
DatagramPacket datagramPacket = new DatagramPacket(datagram, datagram.length, destAddress, destPort);
sendSocket.send(datagramPacket);
System.out.println(hostName + "发送到" + destPort + "端口, Seq = " + nextSeq);
nextSeq=nextSeq+1;
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
byte[] bytes = new byte[4096];
DatagramPacket datagramPacket = new DatagramPacket(bytes, bytes.length);
sendSocket.setSoTimeout(1000*TIMEOUT);
try {
sendSocket.receive(datagramPacket);
} catch (SocketTimeoutException ex) {
System.out.println(hostName + " 等待ACK:Seq="+base+"超时");
timeOut();
continue;
}
String fromServer = new String(datagramPacket.getData(), 0, datagramPacket.getLength());
int ack = Integer.parseInt(fromServer.substring(fromServer.indexOf("ACK: ") + "ACK: ".length()).trim());
maxACK = Math.max(ack,maxACK);
base = maxACK+1;
System.out.println(hostName + "当前最后接收到的最大ACK: " + maxACK);
if (maxACK == DATA_NUMBER_Send) {
System.out.println(hostName + "发送完毕,发送方收到了全部的ACK信息");
return;
}
}
}
public void receiveData (String fileName,int MaxDataPacketSize) throws IOException {
isRevCarriedData = true;
this.MaxDataPacketSize=MaxDataPacketSize;
if(!fileName.isBlank()){
outputFileName=fileName;
receive();
}else {
System.out.println("文件名为空!");
}
}
public void receive() throws IOException {
File output = null;
FileOutputStream fos = null;
if(isRevCarriedData){
output = new File(outputFileName);
fos = new FileOutputStream(output);
}
while (true) {
byte[] receivedData = new byte[Math.max(4096,MaxDataPacketSize+500)];
DatagramPacket receivePacket = new DatagramPacket(receivedData, receivedData.length);
receiveSocket.setSoTimeout(1000*TIMEOUT);
try {
receiveSocket.receive(receivePacket);
} catch (SocketTimeoutException ex) {
System.out.println(hostName + " 在正在等待分组: Seq= "+expectedSeq+"的到来 ");
continue;
}
String receivedLabel = new String(receivedData, 0,receivedData.length );
String label =receivedLabel.split("@@@@@")[0];
int labelSize = (label+"@@@@@").getBytes().length;
String pattern = "\\w*: Sending to port \\d+, Seq = (\\d+) isDataCarried =(true|false) length = (\\d+) DATA_NUMBER = (\\d+)";
Matcher matcher = Pattern.compile(pattern).matcher(label);
if (!matcher.find()) {
System.out.println(hostName + " 收到错误数据"+label);
sendACK(expectedSeq - 1, receivePacket.getAddress(), receivePacket.getPort());
continue;
}
int receivedSeq=Integer.parseInt(matcher.group(1));
isRevCarriedData=Boolean.parseBoolean(matcher.group(2));
int dataLength = Integer.parseInt(matcher.group(3));
DATA_NUMBER_Rev = Integer.parseInt(matcher.group(4));
if (receivedSeq == expectedSeq) {
System.out.println(hostName + " 收到了期待的数据,发送ACK:Seq = " + expectedSeq);
if(isRevCarriedData&&lastSave == receivedSeq -1 ){
System.out.println(hostName + "写入数据 " + expectedSeq );
fos.write(receivedData,labelSize,dataLength);
lastSave = receivedSeq ;
}
if (expectedSeq % 7 == 0) {
System.out.println(hostName + "收到了期待的数据,但是模拟丢失ACK: " + expectedSeq);
} else {
sendACK(expectedSeq, receivePacket.getAddress(), receivePacket.getPort());
}
if (expectedSeq == DATA_NUMBER_Rev) {
System.out.println(hostName + "接受完成");
if(isRevCarriedData){
fos.flush();
fos.close();
}
return;
}
expectedSeq++;
} else {
System.out.println(hostName + " 实际收到的数据Seq ="+receivedSeq+",然而期待顺序收到数据Seq = " + expectedSeq+" 因此丢弃此分组");
sendACK(expectedSeq - 1, receivePacket.getAddress(), receivePacket.getPort());
}
}
}
public void timeOut() throws IOException {
int curByte=0;
for (int i =0;i<base-1&&isSendCarryData;i++){
curByte=curByte+dataLengthList.get(i);
}
System.out.println(hostName+" 接受ACK超时,重发Seq:"+base+"--"+(nextSeq-1));
for (int i = base; i < nextSeq; i++) {
byte[] data=new byte[MaxDataPacketSize];
int length=0;
if(isSendCarryData){
length=dataLengthList.get(i-1);
System.arraycopy(dataList,curByte,data,0,length);
curByte=curByte+length;
}
String sendDataLabel = hostName + ": Sending to port " + destPort + ", Seq = " + i
+" isDataCarried ="+isSendCarryData+" length = "+length +" DATA_NUMBER = "+DATA_NUMBER_Send +"@@@@@";
byte[] datagram = addBytes(sendDataLabel.getBytes(),data);
DatagramPacket datagramPacket = new DatagramPacket(datagram, datagram.length, destAddress, destPort);
sendSocket.send(datagramPacket);
System.out.println(hostName
+ "重新发送发送到" + destPort + "端口, Seq = " + i);
}
}
protected void sendACK(int seq, InetAddress toAddr, int toPort) throws IOException {
String response = hostName + " responses ACK: " + seq;
byte[] responseData = response.getBytes();
DatagramPacket responsePacket = new DatagramPacket(responseData, responseData.length, toAddr, toPort);
receiveSocket.send(responsePacket);
}
public String getHostName() {
return hostName;
}
public void setDestAddress(InetAddress destAddress) {
this.destAddress = destAddress;
}
public int getDestPort() {
return destPort;
}
public void setDestPort(int destPort) {
this.destPort = destPort;
}
public static byte[] addBytes(byte[] data1, byte[] data2) {
byte[] data3 = new byte[data1.length + data2.length];
System.arraycopy(data1, 0, data3, 0, data1.length);
System.arraycopy(data2, 0, data3, data1.length, data2.length);
return data3;
}
}
SR.java
package SR;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
public class SR {
protected int WINDOW_SIZE;
protected int DATA_NUMBER;
protected int TIMEOUT;
protected String hostName;
protected int nextSeq = 1;
protected int base = 1;
protected InetAddress destAddress;
protected int destPort = 80;
protected int expectedSeq = 1;
protected int lastSave = 0;
private Set<Integer> senderSentSet = new HashSet<>();
private Set<Integer> senderReceivedACKSet = new HashSet<>();
private Set<Integer> receiverReceivedSet = new HashSet<>();
protected DatagramSocket sendSocket;
protected DatagramSocket receiveSocket;
public SR(int RECEIVE_PORT, int WINDOW_SIZE, int DATA_NUMBER, int TIMEOUT, String name) throws IOException {
this.WINDOW_SIZE = WINDOW_SIZE;
this.DATA_NUMBER = DATA_NUMBER;
this.TIMEOUT = TIMEOUT;
this.hostName = name;
sendSocket = new DatagramSocket();
receiveSocket = new DatagramSocket(RECEIVE_PORT);
destAddress = InetAddress.getLocalHost();
}
public void send() throws IOException {
while (true) {
while (nextSeq < base + WINDOW_SIZE && nextSeq <= DATA_NUMBER) {
if (nextSeq % 5 == 0||nextSeq == 7) {
System.out.println(hostName + "模拟丢失报文:Seq = " + nextSeq);
nextSeq++;
continue;
}
String sendDataLabel = hostName + ": Sending to port " + destPort + ", Seq = " + nextSeq;
byte[] datagram = sendDataLabel.getBytes();
senderSentSet.add(nextSeq);
DatagramPacket datagramPacket = new DatagramPacket(datagram, datagram.length, destAddress, destPort);
sendSocket.send(datagramPacket);
System.out.println(hostName + "发送到" + destPort + "端口, Seq = " + nextSeq);
nextSeq = nextSeq + 1;
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
byte[] bytes = new byte[4096];
DatagramPacket datagramPacket = new DatagramPacket(bytes, bytes.length);
sendSocket.setSoTimeout(1000 * TIMEOUT);
try {
sendSocket.receive(datagramPacket);
} catch (SocketTimeoutException ex) {
System.out.println(hostName + " 等待ACK:Seq=" + base + "超时");
senderSentSet.remove(base);
timeOut();
continue;
}
String fromServer = new String(datagramPacket.getData(), 0, datagramPacket.getLength());
int ack = Integer.parseInt(fromServer.substring(fromServer.indexOf("ACK: ") + "ACK: ".length()).trim());
senderReceivedACKSet.add(ack);
if (base == ack) {
while (senderReceivedACKSet.contains(base)) {
base++;
}
System.out.println(hostName + " 当前窗口 [" + base + "," + (base + WINDOW_SIZE - 1) + "]");
}
System.out.println(hostName + "收到了 ACK: " + ack);
if (base == DATA_NUMBER + 1) {
System.out.println(hostName + "发送完毕,发送方收到了全部的ACK");
return;
}
}
}
public void receive() throws IOException {
int rcvBase = 1;
while (true) {
byte[] receivedData = new byte[4096];
DatagramPacket receivePacket = new DatagramPacket(receivedData, receivedData.length);
receiveSocket.setSoTimeout(1000 * TIMEOUT);
try {
receiveSocket.receive(receivePacket);
} catch (SocketTimeoutException ex) {
System.out.println(hostName + " 正在等待分组的到来");
continue;
}
String received = new String(receivedData, 0, receivePacket.getLength());
int seqIndex = received.indexOf("Seq = ");
if (seqIndex == -1) {
System.out.println(hostName + " 收到错误的数据");
continue;
}
int seq = Integer.parseInt(received.substring(seqIndex + "Seq = ".length()).trim());
if (seq >= rcvBase && seq <= rcvBase + WINDOW_SIZE - 1) {
receiverReceivedSet.add(seq);
System.out.println(hostName + "收到一个接收方窗口内的分组,Seq = " + seq + "已确认");
sendACK(seq, receivePacket.getAddress(), receivePacket.getPort());
if (seq == rcvBase) {
while (receiverReceivedSet.contains(rcvBase)) {
rcvBase++;
}
if (rcvBase == DATA_NUMBER + 1) {
System.out.println(hostName + "接受完毕,发送方收到了全部的数据");
return;
}
}
} else {
System.out.println(hostName + "收到一个不在窗口内的分组,Seq = " + seq + "因此丢弃此分组");
}
}
}
public void timeOut() throws IOException {
String resendData = hostName
+ ": Resending to port " + destPort + ", Seq = " + base;
byte[] data = resendData.getBytes();
DatagramPacket datagramPacket = new DatagramPacket(data,
data.length, destAddress, destPort);
sendSocket.send(datagramPacket);
senderSentSet.add(base);
System.out.println(hostName
+ "重新发送发送到" + destPort + "端口, Seq = " + base);
return;
}
protected void sendACK(int seq, InetAddress toAddr, int toPort) throws IOException {
String response = hostName + " responses ACK: " + seq;
byte[] responseData = response.getBytes();
DatagramPacket responsePacket = new DatagramPacket(responseData, responseData.length, toAddr, toPort);
receiveSocket.send(responsePacket);
}
public String getHostName() {
return hostName;
}
public void setDestAddress(InetAddress destAddress) {
this.destAddress = destAddress;
}
public int getDestPort() {
return destPort;
}
public void setDestPort(int destPort) {
this.destPort = destPort;
}
public static byte[] addBytes(byte[] data1, byte[] data2) {
byte[] data3 = new byte[data1.length + data2.length];
System.arraycopy(data1, 0, data3, 0, data1.length);
System.arraycopy(data2, 0, data3, data1.length, data2.length);
return data3;
}
}
|