IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> python3 UDP TCP 调试回射命令行 -> 正文阅读

[网络协议]python3 UDP TCP 调试回射命令行

下大雨了写一个 小demo,方便测试程序

#!/usr/bin/python3

import socket, argparse
import time
from datetime import datetime

MAX_BYTES=65535

class net_com:
    """
    通用最简传输协议使用
    """
    def __init__(self, proto: str, addr: tuple, opt: str):
        """
        :param proto: 选择使用的传输层的协议类型,
        :param addr: 传输层协议必须要有的IP地址和端口号
        :param opt: 选择使用的模式
        """
        try:
            self.__proto =  proto
            self.__addr = addr
            self.__opt = opt
        except TypeError as reason:
            print("init common net error: {}".format(reason))
    def transport_init(self) -> socket:
        try:
            if self.__proto == "UDP":
                udp_fd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                return udp_fd
            elif self.__proto == "TCP":
                tcp_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                return tcp_fd
        except OSError as error:
            print("transport init error:{}".format(error))

    def print_recv(self, msg: tuple):
        """
        :param msg: 普通调试使用的打印方法
        :return:
        """
        if self.__proto == "UDP":
            print("time:{} udp connect->{} {}:\n{}".format(datetime, msg[0][0], msg[0][1], msg[1]))
        elif self.__proto == "TCP":
            print("time:{} tcp connect->{} {}:\n{}".format(datatime, msg[0][0], msg[0][1], msg[1]))

    def __sfunc(self, func=print_recv):
        try:
            if self.__proto == "UDP":
                udp_fd = self.transport_init()
                udp_fd.bind(self.__addr)
                while True:
                    listen_udp_recv = udp_fd.recvfrom(4096)
                    func(listen_udp_recv)
            if self.__proto == "TCP":
                tcp_fd = self.transport_init()
                tcp_fd.bind(self.__addr)
                tcp_fd.listen(5)
                tcp_accept = tcp_fd.accept()
                print("tcp connect->{}:\n".format(tcp_accept))
                while True:
                    time.sleep(3)
                    msg = tcp_accept[0].recv(4096)
                    print(msg)

        except OSError as error:
            print("server func error:{}".format(error))

    def send_example(self, fd: socket, msg="hello world"):
        if self.__proto == "UDP":
            fd.sendto(msg.encode("ascii"), self.__addr)
        elif self.__proto == "TCP":
            fd.send(msg.encode("ascii"))

    def __cfunc(self, func=send_example):
        try:
            if self.__proto == "UDP":
                udp_fd = self.transport_init()
                while True:
                    time.sleep(3)
                    func(fd=udp_fd)
            if self.__proto == "TCP":
                tcp_fd = self.transport_init()
                tcp_con_fd = tcp_fd.connect(self.__addr) #没有做任何的发送
                while True:
                    time.sleep(3)
                    func(fd=tcp_con_fd)

        except OSError as error:
            print("client func is error:{}".format(error))

    def transport(self):
        try:
            if self.__opt == "server":
                self.__sfunc()
            elif self.__opt == "client":
                self.__cfunc()
        except OSError as error:
            print("transport error:{}".format(error))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Send and Recv locally")
    parser.add_argument('-t', type=str, default="UDP", help='trans type')
    parser.add_argument('-i', type=str, default="127.0.0.1", help="trans addr default 127.0.0.1")
    parser.add_argument('-p', type=int, default=8000, help="trans port default 8000")
    parser.add_argument('-m', type=str, default="server", help="trans mode default server")
    args = parser.parse_args()
    use = net_com(args.t, (args.i, args.p), args.m)
    use.transport()```

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-09-05 11:23:21  更:2021-09-05 11:26:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年6日历 -2024/6/27 1:59:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码