IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Python检测UDP端口是否正常通信 -> 正文阅读

[网络协议]Python检测UDP端口是否正常通信

  • 通过python脚本构建客户端,向服务器发送hello的UDP包,然后在跑客户端的机器上tcpdump抓包查看是否能正常收到UDP回包,即可判断UDP服务是否正常。
# -*- coding:utf-8 -*-
#!/usr/bin/python

import sys
import os
import socket
import threading
from threading import _Timer
import datetime
import time
import select

totalTime = 0
sendCnt = 0
failCnt = 0
timeout = 10
timeoutCnt = 0

bufsize = 1024

logFile = 'ping.data'

class RepeatTimer(_Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)

def GetTime(func):
    def Wrapper(*arg, **kwarg):
        begin = datetime.datetime.now()
        func(*arg, **kwarg)
        end = datetime.datetime.now()

        costMs = (end - begin).seconds * 1000 + (end - begin).microseconds / 1000

        global totalTime

        totalTime = totalTime + costMs

        print "costMs=%dms, totalTime=%dms" % (costMs, totalTime)

    return Wrapper

def SetTimeOut(seconds):
    def Decorator(func):
        def Wrapper(*arg, **kwarg):
            t = threading.Thread(target=func, args=arg, kwargs=kwarg)
            t.setDaemon(True)
            t.start()
            t.join(seconds)

            if t.isAlive():
                raise Exception('function timeout')

        return Wrapper
    return Decorator


@SetTimeOut(timeout)
def RecvData(handle):

    ready = select.select([handle], [], [], timeout)

    if ready[0]:
        recvMsg = handle.recv(bufsize).decode('utf-8')

        if recvMsg == 'hello':
            return True
        else:
            return False

    else:
        print "function timeout: %ds" % timeout
        return False


@GetTime
def PingUdpPort(ip, port, file):
    global sendCnt
    global failCnt
    global timeoutCnt
    sendCnt = sendCnt + 1
    data = 'hello'

    handle = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    handle.setblocking(0)

    handle.sendto(data, (ip, port))

    try:
        state = RecvData(handle)

        if state is False:
            failCnt = failCnt + 1

    #超时,目前设置的是10秒        
    except Exception, e:
        timeoutCnt = timeoutCnt + 1
        print "Run exception", e

    WritePingLog(file)

def WritePingLog(file):
    if totalTime is not 0 and sendCnt > failCnt:
        avgCost = totalTime / (sendCnt - failCnt)
    else:
        avgCost = 0

    file.write("Ping count=%d, fail count=%d, timeoutMsg count=%d, avgCost=%ds\n" % (sendCnt, failCnt, timeoutCnt, avgCost))
    file.flush()

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print "Invalid param count, should be python %s ip port interval" % os.path.basename(__file__)
        exit(-1)

    ip = sys.argv[1]
    port = int(sys.argv[2])
    interval = 2.0
    
    print "argv=", sys.argv

    with open(logFile, 'w') as file:

        testTimer = RepeatTimer(interval, PingUdpPort, args=[ip, port, file])

        testTimer.run()
    # PingUdpPort(handle, ip, port)

  • 测试如下
[root@test ~]# python ping_echosvr.py x.x.x.x x
argv= ['ping_echosvr.py', 'x.x.x.x', 'x']
costMs=63ms, totalTime=63ms
costMs=63ms, totalTime=126ms
  • 抓包结果如下
[root@test ~]# tcpdump -i eth0 udp and src net x.x.x.x
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on eth0, link-type EN10MB (Ethernet), capture size 65535 bytes
14:22:05.420606 IP x.x.x.x > vip.43751: UDP, length 5
14:22:07.486959 IP x.x.x.x > vip.44752: UDP, length 5
  • 如果抓包正常,说明UDP服务能正常通信,如果偶尔没抓到回包也是正常的,毕竟UDP是无连接的,可靠性较差。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:28:31  更:2021-08-26 12:30:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 20:52:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计