下大雨了写一个 小demo,方便测试程序
import socket, argparse
import time
from datetime import datetime
MAX_BYTES=65535
class net_com:
"""
通用最简传输协议使用
"""
def __init__(self, proto: str, addr: tuple, opt: str):
"""
:param proto: 选择使用的传输层的协议类型,
:param addr: 传输层协议必须要有的IP地址和端口号
:param opt: 选择使用的模式
"""
try:
self.__proto = proto
self.__addr = addr
self.__opt = opt
except TypeError as reason:
print("init common net error: {}".format(reason))
def transport_init(self) -> socket:
try:
if self.__proto == "UDP":
udp_fd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return udp_fd
elif self.__proto == "TCP":
tcp_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
return tcp_fd
except OSError as error:
print("transport init error:{}".format(error))
def print_recv(self, msg: tuple):
"""
:param msg: 普通调试使用的打印方法
:return:
"""
if self.__proto == "UDP":
print("time:{} udp connect->{} {}:\n{}".format(datetime, msg[0][0], msg[0][1], msg[1]))
elif self.__proto == "TCP":
print("time:{} tcp connect->{} {}:\n{}".format(datatime, msg[0][0], msg[0][1], msg[1]))
def __sfunc(self, func=print_recv):
try:
if self.__proto == "UDP":
udp_fd = self.transport_init()
udp_fd.bind(self.__addr)
while True:
listen_udp_recv = udp_fd.recvfrom(4096)
func(listen_udp_recv)
if self.__proto == "TCP":
tcp_fd = self.transport_init()
tcp_fd.bind(self.__addr)
tcp_fd.listen(5)
tcp_accept = tcp_fd.accept()
print("tcp connect->{}:\n".format(tcp_accept))
while True:
time.sleep(3)
msg = tcp_accept[0].recv(4096)
print(msg)
except OSError as error:
print("server func error:{}".format(error))
def send_example(self, fd: socket, msg="hello world"):
if self.__proto == "UDP":
fd.sendto(msg.encode("ascii"), self.__addr)
elif self.__proto == "TCP":
fd.send(msg.encode("ascii"))
def __cfunc(self, func=send_example):
try:
if self.__proto == "UDP":
udp_fd = self.transport_init()
while True:
time.sleep(3)
func(fd=udp_fd)
if self.__proto == "TCP":
tcp_fd = self.transport_init()
tcp_con_fd = tcp_fd.connect(self.__addr)
while True:
time.sleep(3)
func(fd=tcp_con_fd)
except OSError as error:
print("client func is error:{}".format(error))
def transport(self):
try:
if self.__opt == "server":
self.__sfunc()
elif self.__opt == "client":
self.__cfunc()
except OSError as error:
print("transport error:{}".format(error))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Send and Recv locally")
parser.add_argument('-t', type=str, default="UDP", help='trans type')
parser.add_argument('-i', type=str, default="127.0.0.1", help="trans addr default 127.0.0.1")
parser.add_argument('-p', type=int, default=8000, help="trans port default 8000")
parser.add_argument('-m', type=str, default="server", help="trans mode default server")
args = parser.parse_args()
use = net_com(args.t, (args.i, args.p), args.m)
use.transport()```
|