背景
FastNetMon+Influxdb+Grafana+GoBGP可搭建一套基于 NetFLOW / sFLOW 的流量统计报告系统,其中:
- FastNetMon 是一个基于多种抓包引擎(NetFlow, IPFIX, sFLOW, netmap, PF_RING, PCAP)的DoS/DDoS攻击高效分析工具,可以探测和分析网络中的异常流量情况,同时可以通过外部脚本通知或阻断攻击;
- InfluxDB 是一款开源开源时序型数据库,和FastNetMon集成,用于将数据统计进行存储;
- Grafana 是一款非常强大且易用的数据可视化工具;
- GoBGP 是一个开源 BGP 实现,可以提供 BGP 协议的控制平面功能;
清洗需求
查询Influxdb,当某个IP段超过一定的阈值时,我们需要进行以下操作:
- 将开始时间、IP段、流量、是否过期、开始时间记录至统计数据库并开始计时;
- 执行Gobgp打标命令;
- 对于已过期的IP段,不要重复录入Mysql;
- 对于未过期的IP段,需保留历史记录,因此按步骤1再次记录至Mysql;
- 当计时延迟3600s结束之后,更新IP段的过期字段、结束时间并执行Gobgp去标命令;
需求分析
1.统计数据库选型
统计数据库我们可以复用Influxdb或使用Mysql,但Influxdb作为时序数据库,后续的字段更新必须基于时间戳和tag查询,字段的更新不友好,因此最终我们选择Mysql。
2.异步解耦
需求中bgp打标、去标、延迟执行等操作,在程序运行过程中计时、命令等待、命令返回错误等意外情况,都会导致运行中断,因此我们考虑使用Python + Celery(消息队列工具,可用于处理实时数据以及任务调度),来与以上情况进行异步解耦。
另,在实际应用中Celery可通过队列来调度任务,不用担心并发量高时系统负载过大。
环境准备
1.InfluxDB环境
docker pull influxdb:1.8
docker run -p 8086:8086 \
--name influxdb \
--restart unless-stopped \
-e DOCKER_INFLUXDB_INIT_USERNAME=admin \
-e DOCKER_INFLUXDB_INIT_PASSWORD=admin@123 \
-v /data/influxdb/data:/var/lib/influxdb \
-v /data/influxdb/config/influxdb.conf:/etc/influxdb/influxdb.conf \
-v /etc/localtime:/etc/localtime \
-d influxdb:1.8
数据库记录(若有测试需求,可私聊获取):
2.Mysql数据库
docker pull mysql:5.7
docker run -p 3306:3306 --name mysql \
-v /usr/local/docker/mysql/conf:/etc/mysql \
-v /usr/local/docker/mysql/logs:/var/log/mysql \
-v /usr/local/docker/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:5.7
docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
docker exec -it mysql bash
mysql -uroot -p123456
create database fastnetmon
CREATE TABLE `statistic` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`network` varchar(40) DEFAULT NULL COMMENT 'IP段',
`bits_incoming` int(11) DEFAULT NULL COMMENT '进口流量',
`starttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`endtime` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '结束时间',
isexpire BOOLEAN DEFAULT NULL COMMENT '是否过期',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='统计表';
3.redis环境
yum install redis -y
systemctl start redis
4.python环境
conda create -n influx python=3.9
source activate influx
pip install influxdb celery redis pymysql
目录结构
influxdb
├── celery_app
│ ├── celeryconfig.py
│ ├── gobgp.py
│ ├── __init__.py
│ └── record.py
├── celery.log
├── dbconn.py
├── influx.log
├── influx.py
├── __init__.py
├── log.py
└── Readme
代码详解
1.主程序
vim log.py
import os
import logging
logging.basicConfig(
level = logging.INFO,
format = '%(asctime)s, %(filename)s, %(levelname)s, %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
filename = "influx.log",
filemode = 'a'
)
vim dbconn.py
import requests
import json
from log import logging
import pymysql
def QueryInflux(sql):
try:
url = "http://192.168.3.243:8086/query?pretty=true&db=fastnetmon&q=" + sql
res = requests.get(url, timeout=10)
return json.loads(res.text)
except Exception as e:
logging.error(e)
def ConnMySql():
mysql_conn = pymysql.connect(host= '192.168.3.243', port= 3306, user= 'root', password= '123456', db= 'fastnetmon')
return mysql_conn
def QueryMySql(sql):
try:
mysql_conn = ConnMySql()
with mysql_conn.cursor() as cursor:
cursor.execute(sql)
select_result = cursor.fetchone()
return select_result
except Exception as e:
logging.error(e)
def InsertMySql(sql):
try:
mysql_conn = ConnMySql()
with mysql_conn.cursor() as cursor:
cursor.execute(sql)
mysql_conn.commit()
except Exception as e:
mysql_conn.rollback()
logging.error(e)
vim influx.py
from celery_app import gobgp,record
from dbconn import QueryInflux,QueryMySql,InsertMySql
from log import logging
import datetime
def WorkHard(*lst):
try:
gobgp.cmd.delay("/opt/gobgp/gobgp global rib add -a ipv4 " + lst[0][2] +" community 65100:888")
starttime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = "INSERT INTO statistic (network, bits_incoming, isexpire, starttime ) VALUES ('{0}','{1}', '{2}', '{3}')".format(lst[0][2], lst[0][1], 0, starttime)
logging.info("插入IP段 %s 到 statistic表" % lst[0][2])
InsertMySql(sql)
record.update.apply_async(args = [lst[0][2]], countdown = 3600)
except Exception as e:
logging.error(e)
if __name__ == '__main__':
top_sql = "select top(bits_incoming, network, 30),network from networks_traffic where time > now() - 6s"
top_res = QueryInflux(top_sql)
try:
for item in top_res["results"]:
if "series" in item:
for series in item["series"]:
for value in series["values"]:
if value[1] > 184549376:
exist_measure = QueryMySql("select * from statistic limit 1;")
if exist_measure is None:
WorkHard(value)
else:
exist_isexp_sql = "select count(*) from statistic where network = \'" + value[2] + "\' and isexpire = '1'"
exist_isexp_ip = QueryMySql(exist_isexp_sql)
if exist_isexp_ip[0] == 0:
exist_noexp_sql = "select count(*) from statistic where network = \'" + value[2] + "\' and isexpire = '0'"
exist_noexp_ip = QueryMySql(exist_noexp_sql)
if exist_noexp_ip[0] == 0:
WorkHard(value)
else:
logging.info("IP段 %s 已存在且未过期,跳过" % value[2])
else:
WorkHard(value)
except Exception as e:
logging.error(e)
注意:当主程序第一次运行时,需要首先考虑到数据库为空的情况,然后再判断是否存在过期数据。
2.Celery异步操作
vim __init__.py
from celery import Celery
app = Celery('tasks')
app.config_from_object('celery_app.celeryconfig')
vim celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.gobgp',
'celery_app.record'
)
vim gobgp.py
import sys
import subprocess
from celery_app import app
sys.path.append("..")
@app.task
def cmd(command):
try:
subp = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
subp.wait(3)
if subp.poll() == 0:
res = subp.communicate()
else:
res = subp.stderr.read()
print(res)
except Exception as e:
print(e)
vim record.py
from celery_app import app,gobgp
import sys
sys.path.append("..")
from dbconn import QueryMySql,InsertMySql,QueryInflux
import datetime
@app.task
def update(network):
try:
gobgp.cmd("/opt/gobgp/gobgp globa rib del " + network + " -a ipv4")
endtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
InsertMySql("update statistic set endtime = '{0}', isexpire = '{1}' where network = '{2}' and isexpire = '{3}'".format(endtime, 1, network, 0))
print('设置IP段 %s 过期' % network)
except Exception as e:
print(e)
Celery任务主要分为:
- 命令执行模块,执行具体的命令操作,捕获命令返回结果、命令异常捕获;
- 数据库记录模块,命令执行及数据库记录更新等操作;
程序执行
cd influxdb
nohup celery -A celery_app worker --loglevel=info -c 4 >> celery.log 2>&1 &
python influx.py
cd influxdb
celery任务在此目录下查看celery.log
主程序日志在此目录下查看influx.log
Mysql数据库新记录、未过期记录、过期记录效果如下:
总结
通过这篇文章,如果你想快搭建一套基于 NetFLOW / sFLOW 的流量统计报告系统,你可以体验下FastNetMon+Influxdb+Grafana+GoBGP的解决方案;如果你想学习Python + Celery 的具体使用,也可参考清洗需求来进行实践。
|