前言
上周接到一个活,说是分析一下机房新设备的告警日志,我没当回事,直到周五的时候看到40W+的告警日志之后,我炸了。 老设备平常一周也不过上万条,涉及规则20来个,一天就分析完了,新设备上来就是40W+的告警次数,涉及规则300+,最最痛苦的还是,虽然我能看到告警事件排名,虽然设备还不支持导出排名信息,虽然我还能接受手工复制每页的排名信息,但是,300+的规则好几页,设备无法完成翻页操作,卡出翔了!!!唯一的突破口,就是直接处理40W+的日志,python在手,冲冲冲
最终要提交的,举个例子,是如下的样子(图:要提交的报告)。从图片中可以看出:
- 事件Tid、事件名称、触发数量、判断依据,这4列的顺序是绑定的
- 表格制作思路是:
- 先获取到所有的tid,并进行次数统计
- 根据tid找到对应的事件名称
- 根据tid找到对应的数据包,对数据包进行分析
需求分析:
- 脚本要实现对原始的40W+告警信息做一个排名处理,获知究竟涉及哪些规则,以及触发数量
- 对于判断依据,最好可以使用脚本,实现规则明文与取证包匹配,并拿到匹配结果
获取攻击统计排名
首先下载所有的告警事件,有4个CSV文件,手工复制出所有的规则编号,存放到一个txt文本中,即可跑脚本了。注意:需要手动在第19行指明文件路径,脚本会在桌面生成测试结果。
from collections import Counter
import os
import re
'''
脚本主要是针对超大规模的txt文档
'''
def deal_result(demo):
    """Turn the text form of a Counter into tab-separated "key<TAB>count" lines.

    Args:
        demo: The string produced by ``str(Counter(...))``. Keys are expected
            to be the raw lines of the input file, so their repr usually ends
            with a literal backslash-n.

    Returns:
        One "key<TAB>count" entry per line, in the order the Counter printed
        them. An empty Counter ("Counter()") yields an empty string.
    """
    # Drop the "Counter({" prefix and "})" suffix; the braces are optional so
    # that the empty form "Counter()" collapses to nothing as well.
    body = re.sub(r'^Counter\(\{?', '', demo)
    body = re.sub(r'\}?\)$', '', body)
    body = body.replace("'", '')
    # The escaped newline is optional: a key read from a last line without a
    # trailing newline has no "\n" in its repr but still needs the tab.
    body = re.sub(r'(?:\\n)?\x3a\s+', '\t', body)
    return re.sub(r',\s+', '\n', body)
# Count how often each rule id (one per input line) occurs and save the
# ranking as "tid<TAB>count" lines on the current user's desktop.
word = []
# 'utf-8-sig' reads plain UTF-8 as well, but drops a leading BOM so that the
# first tid of a BOM-prefixed file is not counted as a different key.
with open(r'C:\Users\asuka\Desktop\alltid.txt', encoding='utf-8-sig') as f:
    for line in f:
        tid = line.strip()
        # Lines read from a file still carry their newline, so comparing the
        # raw line with '' never skipped anything; test the stripped text.
        if not tid:
            continue
        # deal_result() removes an escaped newline after every key, so each
        # normalised tid keeps exactly one trailing '\n'. This also merges a
        # final line without a newline with earlier occurrences of that tid.
        word.append(tid + '\n')
result = Counter(word)
str_result = str(result)
print('对你输入的内容进行次数统计:')
new_result = deal_result(str_result)
print(new_result)
desktop_path = os.path.join(os.path.expanduser("~"), 'Desktop')
write_path = os.path.join(desktop_path, '内容去重_结果统计.txt')
# The context manager closes the report even if the write fails.
with open(write_path, 'w', encoding='utf8') as out_file:
    out_file.write(new_result)
接下来就是一些简单的后续处理了,再利用Excel排序的一点技巧,就能获取如下图所示的图表了。
获取“判断依据”
核心逻辑,就是使用python读取规则明文,然后去跟取证包匹配,并把匹配的结果写到Excel表中。相当于是一个安全开发的工具了。
content版
一种思路是,从规则中提取出content关键字里的内容,将其填写到Excel表中。
脚本的逻辑是:
- 拿到所有规则对应的规则明文
- 遍历规则明文的每一行,分离出content和规则编号tid,存储到一个字典中。键:tid;值:content
- 拿着excel表第2列中的tid去遍历字典中的键,一旦匹配,就写入对应的值
通过修改第31、32行的代码,指明规则和excel的路径即可。
import re
import openpyxl
import pprint
# Maps a rule id (tid, as a string) to the evidence text built from the
# rule's content keywords.
all_message = {}


def get_tid_content():
    """Collect the content keywords of every rule in the file at rules_path.

    The file is read as bytes and decoded line by line as GBK. For each line
    that holds at least one ``content:"..."`` keyword and a ``tid:<digits>``
    field, the first tid is mapped to "存在攻击特征:" followed by the quoted
    keywords separated by ", ".

    Returns:
        The module-level ``all_message`` dict, updated in place.
    """
    with open(rules_path, 'rb') as f:
        contents = f.readlines()
    for line in contents:
        # Undecodable bytes become U+FFFD instead of aborting the whole run.
        line = line.decode('gbk', errors='replace')
        if "content" not in line:
            continue
        message = re.findall(r'content\x3a\x22(.*?)\x22', line)
        tid = re.findall(r'tid\x3a(\d+)', line)
        # Skip lines without a tid (tid[0] would raise IndexError) and lines
        # whose only "content" is not a plain content:"..." keyword, which
        # would otherwise be recorded with empty evidence.
        if not message or not tid:
            continue
        # Same text as str(list) without the outer brackets, but brackets
        # that belong to a keyword itself are preserved.
        message1 = ', '.join(repr(item) for item in message)
        all_message[tid[0]] = "存在攻击特征:" + message1
    return all_message
# Driver of the "content" version: load the rules, then write the evidence
# text next to every matching tid of the active worksheet.
rules_path = r'C:\Users\asuka\Desktop\xxx.txt'
excel_path = r'C:\Users\asuka\Desktop\test.xlsx'
get_tid_content()
workbook = openpyxl.load_workbook(excel_path)
sheet = workbook.active
print('当前活动表是:' + str(sheet))
rows = sheet.max_row
print('tid位于第2列')
tid_column = 2
evidence_column = 7
for i in range(1, rows + 1):
    cell_value = sheet.cell(i, tid_column).value
    if cell_value is None:
        continue
    # The keys of all_message are strings, while a numeric cell is returned
    # as a number; compare on the text form so both kinds of column match.
    tid_key = str(cell_value).strip()
    if tid_key in all_message:
        sheet.cell(i, evidence_column, all_message[tid_key])
workbook.save(excel_path)
workbook.close()
print('操作结束')
正则版
脚本的逻辑:
- 运行脚本,它首先从规则中提取出规则tid和第一个pcre(如果存在多个正则的话),将其存放到字典info中。键:tid;值:pcre
- 根据提取到的tid,去遍历所有的数据包,获取其绝对路径。考虑到一个tid对应多个数据包,这里会做去重,一个tid只记录一个数据包的绝对路径到字典info中
- 拿着数据包的绝对路径和pcre,去匹配出数据包的内容,并把匹配结果添加到字典info中。此时,一个合格的tid,对应的值必须有三个元素:pcre、数据包绝对路径、正则匹配的结果
- 将结果写入到excel表中
正则版的脚本,很贴心的一点是写入excel之前,会检查单元格中是否有数据,没有的话直接写,有的话会合并之前的数据。
修改97~99行,指定文件路径即可:
import urllib.parse
from scapy.all import *
import re
import openpyxl
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
def pcap_parser(filename, keyword):
    """Search one capture file for the first payload matching a rule regex.

    Every packet with a Raw layer has its payload decoded, stripped and
    URL-unquoted, then searched case-insensitively. The first match is
    printed and appended to the ``info`` entry of the tid taken from the
    file name (the text after the last "Event", up to the first dot).

    Args:
        filename: Path of the pcap/pcapng file to read.
        keyword: Regular expression text extracted from the rule.
    """
    base_name = os.path.basename(filename)
    # Use the base name only: splitting the full path at its first dot breaks
    # as soon as any directory name contains a dot.
    pcap_path_tid = base_name.split('.', 1)[0].split('Event')[-1]
    try:
        # Compile once instead of searching twice per packet.
        pattern = re.compile(keyword, re.I)
    except re.error as exc:
        # A rule regex is not necessarily valid for Python's re module;
        # report the reason instead of failing silently on every packet.
        print(base_name + '\t' + '匹配失败!' + '\t' + str(exc))
        return
    for pkt in rdpcap(filename).res:
        raw_layer = pkt.getlayer('Raw')
        if raw_layer is None:
            continue
        payload = raw_layer.fields.get('load')
        if not isinstance(payload, bytes):
            continue
        # Bytes that are not valid UTF-8 become U+FFFD, so the readable part
        # of a mixed text/binary payload can still be searched.
        pkt_load = payload.decode('utf-8', errors='replace').strip()
        pkt_load = urllib.parse.unquote(pkt_load)
        found = pattern.search(pkt_load)
        if found is None:
            continue
        match_re = found.group()
        print(os.path.basename(filename) + '\t' + '匹配成功:' + '\t' + match_re)
        if pcap_path_tid in info:
            info[pcap_path_tid].append(match_re)
        else:
            # The file name does not map to a known rule id; say so instead
            # of hiding the KeyError.
            print(base_name + '\t' + '匹配结果未记录:tid ' + pcap_path_tid + ' 不在规则中')
        return
    print(os.path.basename(filename) + '\t' + '匹配失败!')
def pcre_to_excel():
    """Write the regex match results stored in ``info`` to the workbook.

    For every row of the active sheet whose tid cell has a complete ``info``
    entry (exactly three elements), "正则匹配:" plus the last element is
    written to the evidence column. Text already in that cell is kept and
    the new text is appended to it. The workbook is saved back to excel_path.
    """
    workbook = openpyxl.load_workbook(excel_path)
    sheet = workbook.active
    print('====================================================')
    print('[1]:正则匹配数据包结束')
    print('[2]:开始把正则匹配的结果写到Excel,当前活动表是:' + str(sheet))
    rows = sheet.max_row
    for i in range(1, rows + 1):
        cell_value = sheet.cell(i, tid_column).value
        if cell_value is None:
            continue
        # The keys of info are strings, while a numeric cell is returned as
        # a number; look the tid up by its text form.
        entry = info.get(str(cell_value).strip())
        if entry is None or len(entry) != 3:
            continue
        evidence = '正则匹配:' + entry[-1]
        previous = sheet.cell(i, evidence_column).value
        if previous is not None:
            evidence = str(previous) + evidence
        sheet.cell(i, evidence_column, evidence)
    workbook.save(excel_path)
    workbook.close()
    print('[3]:pcre写入结束')
def deal_tid_pcre_pcap():
    """Fill ``info`` with the first pcre of each rule and one capture path.

    Step 1 reads the rules file (GBK) and stores ``info[tid] = [pcre]`` for
    every line that has both a pcre and a tid. Step 2 walks files_path and
    appends, for each tid, the path of the first capture file whose name
    encodes exactly that tid.
    """
    with open(rules_path, 'rb') as f:
        contents = f.readlines()
    for line in contents:
        # Undecodable bytes become U+FFFD instead of aborting the whole run.
        line = line.decode('gbk', errors='replace')
        tid = re.findall(r'tid\x3a(\d+)', line)
        pcre = re.findall(r'pcre\x3a\x22\x2f(.*?)\x2f\w{0,4}\x22\x3b', line)
        # A pcre without a tid cannot be keyed (tid[0] would raise).
        if not pcre or not tid:
            continue
        info[tid[0]] = [pcre[0]]
    # Index captures by the tid in their file name, using the same rule as
    # pcap_parser: the text after the last "Event", up to the first dot.
    # An exact lookup keeps tid "123" from picking up "Event1234.pcap".
    first_pcap_by_tid = {}
    for current_folder, list_folders, files in os.walk(files_path):
        for file_name in files:
            if not file_name.lower().endswith(('.pcap', '.pcapng')):
                continue
            pcap_tid = file_name.split('.', 1)[0].split('Event')[-1]
            # setdefault keeps only the first capture found for a tid.
            first_pcap_by_tid.setdefault(
                pcap_tid, os.path.join(current_folder, file_name))
    for re_tid, entry in info.items():
        pcap_path = first_pcap_by_tid.get(re_tid)
        if pcap_path is not None:
            entry.append(pcap_path)
# Driver of the regex version.
# Worksheet layout: column holding the tid and column receiving the evidence.
tid_column = 2
evidence_column = 7
# Input locations: rule text, workbook to update, folder with the captures.
rules_path = r'C:\Users\asuka\Desktop\xxx.txt'
excel_path = r'C:\Users\asuka\Desktop\test.xlsx'
files_path = r"C:\Users\asuka\Desktop\all"
# tid -> [pcre, capture path, match text]; filled step by step below.
info = {}
print('[+] 程序开始运行')
print('[+] 开始获取tid、pcre、数据包路径')
deal_tid_pcre_pcap()
print('[+] 开始使用正则匹配数据包中的内容')
for entry in info.values():
    # Only entries that hold exactly a pcre and a capture path are searched.
    if len(entry) != 2:
        continue
    pcre_text, pcap_path = entry
    pcap_parser(pcap_path, pcre_text)
print('[-] 匹配结束')
pcre_to_excel()
print('Enjoy It')
由于使用过content版的脚本之后,没有清空单元格,所以正则版的脚本会追加内容到content中来
完整版
完整版就是综合了上述二者。 脚本的逻辑是:首先读取规则库,单独提取出content,直接写到excel中,然后单独提取出正则,与经过url解码的数据包明文匹配,将匹配结果追加到Excel表中
import openpyxl
import re
import pprint
import logging
import urllib.parse
from scapy.all import *
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
def get_tid_content():
    """Collect the content keywords of every rule in the file at rules_path.

    The file is read as bytes and decoded line by line as GBK. For each line
    that holds at least one ``content:"..."`` keyword, every
    ``tid:<digits>`` found on that line is mapped to "存在攻击特征:" followed
    by the quoted keywords separated by ", ".

    Returns:
        The module-level ``all_message`` dict, updated in place.
    """
    with open(rules_path, 'rb') as f:
        contents = f.readlines()
    for line in contents:
        # Undecodable bytes become U+FFFD instead of aborting the whole run.
        line = line.decode('gbk', errors='replace')
        if "content" not in line:
            continue
        message = re.findall(r'content\x3a\x22(.*?)\x22', line)
        # A line whose only "content" is not a plain content:"..." keyword
        # would otherwise be recorded with empty evidence.
        if not message:
            continue
        # Same text as str(list) without the outer brackets, but brackets
        # that belong to a keyword itself are preserved.
        message1 = ', '.join(repr(item) for item in message)
        re_message = "存在攻击特征:" + message1
        for re_tid in re.findall(r'tid\x3a(\d+)', line):
            all_message[re_tid] = re_message
    return all_message
def content_to_excel():
    """Write the content evidence from ``all_message`` into the workbook.

    For every row of the active sheet whose tid cell matches a key of
    ``all_message``, the evidence text is written to the evidence column
    (replacing what was there). The workbook is saved back to excel_path.
    """
    workbook = openpyxl.load_workbook(excel_path)
    sheet = workbook.active
    print('[+] 开始写入content匹配的结果,当前活动表是:' + str(sheet))
    rows = sheet.max_row
    for i in range(1, rows + 1):
        cell_value = sheet.cell(i, tid_column).value
        if cell_value is None:
            continue
        # The keys of all_message are strings, while a numeric cell is
        # returned as a number; compare on the text form.
        tid_key = str(cell_value).strip()
        if tid_key in all_message:
            sheet.cell(i, evidence_column, all_message[tid_key])
    workbook.save(excel_path)
    workbook.close()
    print('[-] content写入结束')
def pcap_parser(filename, keyword):
    """Search one capture file for the first payload matching a rule regex.

    Every packet with a Raw layer has its payload decoded, stripped and
    URL-unquoted, then searched case-insensitively. The first match is
    printed and appended to the ``info`` entry of the tid taken from the
    file name (the text after the last "Event", up to the first dot).

    Args:
        filename: Path of the pcap/pcapng file to read.
        keyword: Regular expression text extracted from the rule.
    """
    base_name = os.path.basename(filename)
    # Use the base name only: splitting the full path at its first dot breaks
    # as soon as any directory name contains a dot.
    pcap_path_tid = base_name.split('.', 1)[0].split('Event')[-1]
    try:
        # Compile once instead of searching twice per packet.
        pattern = re.compile(keyword, re.I)
    except re.error as exc:
        # A rule regex is not necessarily valid for Python's re module;
        # report the reason instead of failing silently on every packet.
        print(base_name + '\t' + '匹配失败!' + '\t' + str(exc))
        return
    for pkt in rdpcap(filename).res:
        raw_layer = pkt.getlayer('Raw')
        if raw_layer is None:
            continue
        payload = raw_layer.fields.get('load')
        if not isinstance(payload, bytes):
            continue
        # Bytes that are not valid UTF-8 become U+FFFD, so the readable part
        # of a mixed text/binary payload can still be searched.
        pkt_load = payload.decode('utf-8', errors='replace').strip()
        pkt_load = urllib.parse.unquote(pkt_load)
        found = pattern.search(pkt_load)
        if found is None:
            continue
        match_re = found.group()
        print(os.path.basename(filename) + '\t' + '匹配成功:' + '\t' + match_re)
        if pcap_path_tid in info:
            info[pcap_path_tid].append(match_re)
        else:
            # The file name does not map to a known rule id; say so instead
            # of hiding the KeyError.
            print(base_name + '\t' + '匹配结果未记录:tid ' + pcap_path_tid + ' 不在规则中')
        return
    print(os.path.basename(filename) + '\t' + '匹配失败!')
def pcre_to_excel():
    """Write the regex match results stored in ``info`` to the workbook.

    For every row of the active sheet whose tid cell has a complete ``info``
    entry (exactly three elements), "正则匹配:" plus the last element is
    written to the evidence column. Text already in that cell is kept and
    the new text is appended to it. The workbook is saved back to excel_path.
    """
    workbook = openpyxl.load_workbook(excel_path)
    sheet = workbook.active
    print('====================================================')
    print('[1]:正则匹配数据包结束')
    print('[2]:开始把正则匹配的结果写到Excel,当前活动表是:' + str(sheet))
    rows = sheet.max_row
    for i in range(1, rows + 1):
        cell_value = sheet.cell(i, tid_column).value
        if cell_value is None:
            continue
        # The keys of info are strings, while a numeric cell is returned as
        # a number; look the tid up by its text form.
        entry = info.get(str(cell_value).strip())
        if entry is None or len(entry) != 3:
            continue
        evidence = '正则匹配:' + entry[-1]
        previous = sheet.cell(i, evidence_column).value
        if previous is not None:
            evidence = str(previous) + evidence
        sheet.cell(i, evidence_column, evidence)
    workbook.save(excel_path)
    workbook.close()
    print('[3]:pcre写入结束')
def deal_tid_pcre_pcap():
    """Fill ``info`` with the first pcre of each rule and one capture path.

    Step 1 reads the rules file (GBK) and stores ``info[tid] = [pcre]`` for
    every line that has both a pcre and a tid. Step 2 walks files_path and
    appends, for each tid, the path of the first capture file whose name
    encodes exactly that tid.
    """
    with open(rules_path, 'rb') as f:
        contents = f.readlines()
    for line in contents:
        # Undecodable bytes become U+FFFD instead of aborting the whole run.
        line = line.decode('gbk', errors='replace')
        tid = re.findall(r'tid\x3a(\d+)', line)
        pcre = re.findall(r'pcre\x3a\x22\x2f(.*?)\x2f\w{0,4}\x22\x3b', line)
        # A pcre without a tid cannot be keyed (tid[0] would raise).
        if not pcre or not tid:
            continue
        info[tid[0]] = [pcre[0]]
    # Index captures by the tid in their file name, using the same rule as
    # pcap_parser: the text after the last "Event", up to the first dot.
    # An exact lookup keeps tid "123" from picking up "Event1234.pcap".
    first_pcap_by_tid = {}
    for current_folder, list_folders, files in os.walk(files_path):
        for file_name in files:
            if not file_name.lower().endswith(('.pcap', '.pcapng')):
                continue
            pcap_tid = file_name.split('.', 1)[0].split('Event')[-1]
            # setdefault keeps only the first capture found for a tid.
            first_pcap_by_tid.setdefault(
                pcap_tid, os.path.join(current_folder, file_name))
    for re_tid, entry in info.items():
        pcap_path = first_pcap_by_tid.get(re_tid)
        if pcap_path is not None:
            entry.append(pcap_path)
def work():
    """Run the whole pipeline: content evidence first, regex evidence second.

    The content keywords are extracted and written to the workbook, then the
    pcre/capture pairs are collected, each pair is searched with
    pcap_parser, and the regex results are written to the workbook.
    """
    print('[+] 程序开始运行')
    get_tid_content()
    content_to_excel()
    print('[+] 开始获取tid、pcre、数据包路径')
    deal_tid_pcre_pcap()
    print('[+] 开始使用正则匹配数据包中的内容')
    # Lazy filter: each entry's length is checked when the loop reaches it,
    # and only entries holding exactly [pcre, capture path] are searched.
    searchable = (entry for entry in info.values() if len(entry) == 2)
    for pcre_text, pcap_path in searchable:
        pcap_parser(pcap_path, pcre_text)
    print('[-] 匹配结束')
    pcre_to_excel()
if __name__ == '__main__':
    # Input locations: rule text, folder with the captures, workbook to update.
    rules_path = r'C:\Users\asuka\Desktop\xxx.txt'
    files_path = r"C:\Users\asuka\Desktop\all"
    excel_path = r'C:\Users\asuka\Desktop\test.xlsx'
    # Worksheet layout: column holding the tid, column receiving the evidence.
    tid_column, evidence_column = 2, 7
    # Shared state filled by the pipeline steps called from work().
    info = {}
    all_message = {}
    work()
    print('Done,Enjoy It')
|