"""
Time: 2022/4/14 11:13
Author: Jyun
Version: V 0.1
File: demo.py
Blog: https://ctrlcv.blog.csdn.net
"""
import datetime
import gzip
import json
import time
import pyshark
class GetPacket:
def __init__(self, network_name, item_filter):
"""
:param network_name: 网卡名称
:param item_filter: 抓包过滤条件/规则同wireshark
"""
tshark_path = r"D:\Program Files\Wireshark\tshark.exe"
self.cap = pyshark.LiveCapture(interface=network_name, display_filter=item_filter, tshark_path=tshark_path)
def action(self):
requests_dict = {}
for pak in self.cap.sniff_continuously():
try:
request_in = pak.http.request_in
except AttributeError:
requests_dict[pak.frame_info.number] = pak
continue
self.parse_http(requests_dict[request_in], pak)
del requests_dict[request_in]
def gzip(self, data):
""" 由于pytshark没有帮我们进行gzip解压缩,所以这里手动判断解压一下
:param data: 16进制原数据
:return: 文本数据
"""
try:
return gzip.decompress(bytearray.fromhex(data)).decode('utf8')
except ValueError:
return data
def parse_http(self, request, response):
print('\n', request.http.request_full_uri, request.http.request_method)
data: dict = {'Request': {}, 'Response': {}}
for name, value in request.http._all_fields.items():
if name[:4] == 'http':
data['Request'][name] = value
for name, value in response.http._all_fields.items():
if name[:4] == 'http':
data['Response'][name] = value
if name == 'http.file_data':
if response.http.get('content_encoding') == 'gzip':
data['Response'][name] = self.gzip(response.http.data)
if response.http.get('content_type') == "application/json":
data['Response'][name] = json.loads(data['Response'][name])
self.write_file(data)
def write_file(self, data):
_time = datetime.datetime.now().strftime(f"%Y%m%d_{int(time.time() // 1000)}")
with open(f'packet_{_time}.txt', 'a') as f:
f.write(json.dumps(data, indent=4, ensure_ascii=False))
f.write('\n')
if __name__ == '__main__':
gp = GetPacket('VPN6 - VPN Client', 'http')
gp.action()
|