最近有需求将windows系统防火墙规则进行自动化管理,搜集了一些资料,也把网上的一些方法试验了一下,核心操作在这里分享给大家。 这里的规则是指的ip+端口的类似以白名单的方式进行的管理,默认将涉及的端口进行封闭,在远程,只允许指定的ip才能访问这些端口 最近大家都在卷评分,所以有空给我也来个五星好评,来了留言必回,我先上为敬!有空来玩~https://bbs.csdn.net/topics/603960367
防火墙增删操作
网上有很多关于pywin32库里关于防火墙的操作,如下是一个网上的示例,这个示例在测试过程中就会发现,规则可以添加,但是实际上是没有生效的。不知道是不是系统的原因,测试的环境是Python3.7和3.8的环境,然后操作系统是WIN10环境。同样的规则手动添加的可以生效,所以如果有大佬能解释的,麻烦留言指点一下,感激不尽!
import pythoncom
import win32com
class rule:
items = {}
items_name = {
"Action":'操作',
"ApplicationName":'程序',
"Description":'描述',
"Direction":'进站/出站',
"EdgeTraversal":'边缘穿越',
"EdgeTraversalOptions":'边缘穿越选项',
"Enabled":'已启用',
"Grouping":'组',
"IcmpTypesAndCodes":'ICMP设置',
"InterfaceTypes":'接口类型',
"Interfaces":'接口',
"LocalAddresses":'本地地址',
"LocalAppPackageId":'应用程序包',
"LocalPorts":'本地端口',
"LocalUserAuthorizedList":'授权的本地计算机',
"LocalUserOwner":'本地用户所有者',
"Name":'名称',
"Profiles":'配置文件',
"Protocol":'协议',
"RemoteAddresses":'远程地址',
"RemoteMachineAuthorizedList":'授权的远程计算机',
"RemotePorts":'远程端口',
"RemoteUserAuthorizedList":'授权的远程用户',
"SecureFlags":'安全',
"serviceName":'服务名'}
items_shell = {
"Action": 'action',
"ApplicationName": 'program',
"Description": 'description',
"Direction": 'dir',
"EdgeTraversal": 'edge',
"EdgeTraversalOptions": '边缘穿越选项',
"Enabled": 'enable',
"Grouping": '组',
"IcmpTypesAndCodes": 'ICMP设置',
"InterfaceTypes": 'interfacetype',
"Interfaces": '接口',
"LocalAddresses": 'localip',
"LocalAppPackageId": '应用程序包',
"LocalPorts": 'localport',
"LocalUserAuthorizedList": '授权的本地计算机',
"LocalUserOwner": '本地用户所有者',
"Name": 'name',
"Profiles": 'profile',
"Protocol": 'protocol',
"RemoteAddresses": 'remoteip',
"RemoteMachineAuthorizedList": 'rmtcomputergrp',
"RemotePorts": 'remoteport',
"RemoteUserAuthorizedList": 'rmtusrgrp',
"SecureFlags": 'security',
"serviceName": 'service'
}
def __init__(self,index):
self.index = index
for i in self.items_name.keys():
self.items[i] = ''
def init_by_app(self, app_in):
for key in self.items_name.keys():
self.items[key] = " " + str(eval("app_in."+key))
print(self.items[key] )
def init_by_dict(self,dirc_con):
flag = False
for item_key in self.items_name.keys():
if self.items_name[item_key] in dirc_con.keys():
flag = True
self.items[item_key] = dirc_con[self.items_name[item_key]]
if not flag:
for key in dirc_con.keys():
self.items[key] = dirc_con[key]
def create_rule(self):
app = win32com.client.Dispatch('HNetCfg.FwRule')
res = []
app.Action = int(self.items["Action"])
app.Description = self.items["Description"]
app.Direction = int(self.items["Direction"])
app.EdgeTraversal = self.items["EdgeTraversal"]
app.EdgeTraversalOptions = self.items["EdgeTraversalOptions"]
app.Enabled = self.items["Enabled"]
app.Grouping = self.items["Grouping"]
app.InterfaceTypes = self.items["InterfaceTypes"]
app.LocalAddresses = self.items["LocalAddresses"]
app.LocalAppPackageId = self.items["LocalAppPackageId"]
app.LocalUserOwner = self.items["LocalUserOwner"]
app.Name = self.items["Name"]
app.Profiles = self.items["Profiles"]
app.Protocol = self.items["Protocol"]
app.RemoteAddresses = self.items["RemoteAddresses"]
app.RemotePorts = self.items["RemotePorts"]
app.LocalPorts = self.items['LocalPorts']
app.SecureFlags = self.items["SecureFlags"]
return app
def __str__(self):
result = "="*10 + '\n序号 : ' + str(self.index) + '\n'
for key in self.items_name.keys():
result += self.items_name[key] + " : " + str(self.items[key]) +"\n"
return result
def add_rule(dict_value):
fw = win32com.client.gencache.EnsureDispatch('HNetCfg.FwPolicy2', 0)
apps = fw.Rules
print(apps.Count)
rule_obj = rule(-1)
rule_obj.init_by_dict(dict_value)
app = rule_obj.create_rule()
apps.Add(app)
def del_rule(dict_value):
fw = win32com.client.gencache.EnsureDispatch('HNetCfg.FwPolicy2', 0)
apps = fw.Rules
print("before :", apps.Count)
rule_obj = rule(-1)
rule_obj.init_by_dict(dict_value)
for app in apps:
print(rule_obj.items['Name'] , str(app.Name))
print(rule_obj.items['LocalPorts'] , str(app.LocalPorts))
print(rule_obj.items['RemoteAddresses'] , str(app.RemoteAddresses))
if rule_obj.items['Name'] == str(app.Name) and rule_obj.items['LocalPorts'] == str(app.LocalPorts) and rule_obj.items['RemoteAddresses'] == str(app.RemoteAddresses):
apps.Remove(str(app.Name))
print("after :", apps.Count)
if __name__ == '__main__':
my_dict = {
'序号' : '2',
'操作' : '0',
'程序' : '',
'描述' : '',
'进站/出站' : '1',
'边缘穿越' : 'False',
'边缘穿越选项' : '0',
'已启用' : 'True',
'组' : '',
'ICMP设置' : '',
'接口类型' : 'All',
'接口' : 'None',
'本地地址' : '*',
'应用程序包' : '',
'本地端口' : '9876',
'授权的本地计算机' : '',
'本地用户所有者' : '',
'名称' : 'test_cmd',
'配置文件' : '2',
'协议' : '6',
'远程地址' : '114.115.250.41/255.255.255.255',
'授权的远程计算机' : '',
'远程端口' : '*',
'授权的远程用户' : '',
'安全' : '0',
'服务名' : ''
}
add_rule(my_dict)
del_rule(my_dict)
所以我选择的是在用os的popen调用系统命令进行规则的调整
netsh advfirewall firewall add rule name="myrule" protocol=TCP dir=in localport=8080 action=allow
具体的参数解析可以参考 https://www.inqingdao.cn/360.html 这里主要分清楚add是添加,delete是删除,action中allow是允许,而block是阻止 remoteip是指的远程地址,localport指的是本地的端口 值得注意的是,删除操作不需要很精确的描述,在防火墙规则中是可以同时添加多个同名的规则的,所以在这个过程中必须做相应的配置,删除需要指定规则精度,选择的描述比较粗略就会删除一大堆的东西。
windows服务
windows 服务我参考的是网上比较推荐的用Python写的windows服务脚本,通过python 文件名 install的方式可以直接安装。 需要注意的是环境的配置,库安装的是pywin32 我们需要在windows系统的环境变量path下加入系统中python的斯特-packages中win32和pywin32_system32的文件夹位置,以及在 系统中Python本身的位置,好多人运行这个会发现系统没有响应的错误,主要是python环境变量没有设置。
import win32timezone
from logging.handlers import TimedRotatingFileHandler
import win32serviceutil
import win32service
import win32event
import os
import logging
import inspect
import time
import shutil
class PythonService(win32serviceutil.ServiceFramework):
_svc_name_ = "PythonService"
_svc_display_name_ = "Clearjob"
_svc_description_ = "Clear system files"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
self.logger = self._getLogger()
self.path = 'D:\\WebSite'
self.T = time.time()
self.run = True
def _getLogger(self):
'''日志记录'''
logger = logging.getLogger('[PythonService]')
this_file = inspect.getfile(inspect.currentframe())
dirpath = os.path.abspath(os.path.dirname(this_file))
if os.path.isdir('%s\\log'%dirpath):
pass
else:
os.mkdir('%s\\log'%dirpath)
dir = '%s\\log' % dirpath
handler = TimedRotatingFileHandler(os.path.join(dir, "Clearjob.log"),when="midnight",interval=1,backupCount=20)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def SvcDoRun(self):
self.logger.info("service is run....")
try:
while self.run:
self.logger.info('---Begin---')
for path, name, file in os.walk('D:\\Website'):
if path == 'D:\\Website':
for IISname in name:
floder = []
for i in os.listdir(os.path.join(path, IISname)):
if i.isdigit():
floder.append(int(i))
if len(floder) == 0:
pass
elif len(floder) >= 2:
floder.sort()
for x in floder[:(len(floder) - 2)]:
self.logger.info("delete dir: %s" % os.path.join(os.path.join(path, IISname), str(x)))
shutil.rmtree(os.path.join(os.path.join(path, IISname), str(x)))
self.logger.info('---End---')
time.sleep(10)
except Exception as e:
self.logger.info(e)
time.sleep(60)
def SvcStop(self):
self.logger.info("service is stop....")
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
self.run = False
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(PythonService)
服务的安装和使用的方式参考https://blog.csdn.net/wo446100076/article/details/80973551
业务的逻辑
这里选择将ip和port的数据从远端获取,利用requests库get到相关的json格式数据,并做定时循环。 实现动态管理的话就是集合的相关操作,将前一次的存入文件中,后一次得到的数据和他进行比较,重复的就是不操作的,新增的需要在规则上新增,没有了的就需要在规则上删除
其他
防火墙的操作一定要搞清楚防火墙的优先级,在windows中,规则设定为同样的规则,阻止的优先级高于允许的,精确的规则优先于粗略的规则,搞清楚这一点对于操作防火墙很重要。
我也是头一次操作windows的防火墙,如有不妥敬请批评指正。
|