具体实现的功能: 每5分钟爬取阿里云漏洞库的页面,找到漏洞评分大于9分的漏洞并告警,再通过阿里云的API找出存在这些新增漏洞的服务器。 整体思路:(总共有5个模块) 1、漏洞类AVD:属性有漏洞详情,漏洞名,漏洞id,漏洞评分(属性用变量表示,这种变量叫实例变量); 漏洞类里还有通过API获取漏洞类型的函数def avd_gettype(self, client)以及存在漏洞的服务器信息的函数def avd_getuuid(self, client),需要传入API的客户端的参数; 2、数据库连接函数connect_db(),并返回一个连接,便于连接函数 3、游标函数conndb_cursor(conn),便于爬取页面时对数据库进行插入操作 4、爬虫函数grab(client, cursor, url),需要传入的参数有客户端,游标,爬取的页面地址。 5、钉钉群消息的推送dingding(mes),要注意钉钉群机器人关键字的设置,没有关键字就会被屏蔽掉
import json
import sqlite3
import time
from operator import itemgetter
import requests
from bs4 import BeautifulSoup
from alibabacloud_sas20181203.client import Client as Sas20181203Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sas20181203 import models as sas_20181203_models
access_key_id = 'key_id'
access_key_secret = 'key_secret'
avd_id_list = []
class AVD:
"""
这是一个关于漏洞的类,有漏洞的AVD编号avd_id,漏洞名称avd_name,漏洞危险评分avd_sorce,漏洞的详情页面avd_href
"""
avd_url = ''
avd_name = ''
avd_id = ''
avd_sorce = 0.0
def __init__(self, detail_url):
d_response = requests.get(url=detail_url)
detail_object = BeautifulSoup(d_response.text, "html.parser", from_encoding="utf-8")
d_name = detail_object.find('span', class_='header__title__text').text
d_score = float(detail_object.find('div', class_='cvss-breakdown__score cvss-breakdown__score--high').text.strip())
d_id = detail_object.find('ol', class_="breadcrumbs__list").find_all('li',class_="breadcrumbs__list-item-last CMSBreadCrumbsLink")[2].text.strip()
self.avd_url = detail_url
self.avd_name = d_name
self.avd_id = d_id
self.avd_sorce = d_score
def avd_gettype(self, client):
describe_grouped_vul_request = sas_20181203_models.DescribeGroupedVulRequest(
alias_name=self.avd_name
)
groupedvulitems = client.describe_grouped_vul(describe_grouped_vul_request)
avd_type = groupedvulitems.body.grouped_vul_items
if len(avd_type) == 0:
print(self.avd_name, '没有找到该漏洞类型')
return ''
return avd_type[0].type
def avd_getuuid(self, client):
tp = self.avd_gettype(client)
if tp == '':
print('没有服务器有该漏洞')
return ''
else:
describe_vul_list_request = sas_20181203_models.DescribeVulListRequest(
type=tp,
alias_name=self.avd_name
)
vul_records = client.describe_vul_list(describe_vul_list_request)
uuid = vul_records.body.vul_records[0].uuid
print('存在该漏洞的服务器为:', uuid)
dd_uuid = {
"at": {
"atMobiles": [
"180xxxxxx"
],
"atUserIds": [
"user123"
],
"isAtAll": False
},
"text": {
"content": "存在新增高危漏洞:\n%s\n出现该漏洞的服务器为:\n%s" % (self.avd_name, uuid)
},
"msgtype": "text"
}
dingding(dd_uuid)
return uuid
def client_sas():
global access_key_id
global access_key_secret
config = open_api_models.Config(access_key_id, access_key_secret)
config.endpoint = f'tds.aliyuncs.com'
client_s = Sas20181203Client(config)
return client_s
def connect_db():
conn = sqlite3.connect('D:/vul.db')
conn.isolation_level = None
return conn
def conndb_cursor(conn):
cursor = conn.cursor()
sql_create = 'create table if not exists avd_table(avd_id varchar(30) PRIMARY KEY, avd_name varchar(30), avd_score integer)'
try:
cursor.execute(sql_create)
except Exception as e:
print('抛出异常', e)
exit(1)
sql_selectID = 'select avd_id from avd_table'
cursor.execute(sql_selectID)
fetchall = cursor.fetchall()
if len(fetchall):
for i in fetchall:
global avd_id_list
avd_id_list.append(i[0])
else:
print('表内还没有数据')
return cursor
def dingding(mes):
webhook = 'webhook地址'
headers = {'content-type': 'application/json'}
r = requests.post(webhook, headers=headers, data=json.dumps(mes))
r.encoding = 'utf-8'
return r.text
def parse_url(url):
response = requests.get(url)
htm_page = BeautifulSoup(response.text, "html.parser", from_encoding="utf-8")
spqn_page = htm_page.find('span', class_="text-muted")
print(spqn_page.text)
print(spqn_page.text[8:10])
def grab(client, cursor, url):
response = requests.get(url)
high_risk_list = BeautifulSoup(response.text, "html.parser", from_encoding="utf-8")
a_label = high_risk_list.find('tbody').find_all('a')
if len(a_label) == 0:
print('该页面没有漏洞数据')
return 0
for a in a_label:
detail = 'https://avd.aliyun.com' + a.get('href')
vul_item = AVD(detail)
if vul_item.avd_id not in avd_id_list:
sql_insert = 'insert into avd_table(avd_id,avd_name,avd_score) values("%s","%s","%s")' % (vul_item.avd_id, vul_item.avd_name, vul_item.avd_sorce)
cursor.execute(sql_insert)
avd_id_list.append(vul_item.avd_id)
if vul_item.avd_sorce > 9.0:
vul_item.avd_getuuid(client)
dding = {
"msgtype": "link",
"link": {
"text": vul_item.avd_name,
"title": "新增高危漏洞",
"picUrl": "",
"messageUrl": url
}
}
dingding(dding)
return 1
if __name__ == '__main__':
client = client_sas()
conn = connect_db()
cursor = conndb_cursor(conn)
for i in range(1, 3):
j = 0
while True:
j = j+1
pre_url = r'https://avd.aliyun.com/high-risk/list?page=' + str(j)
print(pre_url)
res = grab(client, conn, pre_url)
if res == 0:
print('退出本次%d爬取', i)
break
time.sleep(5*60)
sql_select = 'select * from avd_table'
cursor.execute(sql_select)
print('----查询表的数据-----')
data = cursor.fetchall()
for d in data:
print(d)
cursor.close()
conn.close()
|