服务器上的web后端经常需要同时运行多个进程,各个进程之间需要交换数据,这个功能在Python中可以借助multiprocessing实现。更进一步,进程中断以后我们想要开发人员第一时间收到消息以进行改进和维护,这就需要借助进程管理包psutil和python自带的电子邮件发送模块smtplib和email实现。
一、 mutiprocessng多进程编程和通信
为了模拟多进程通信,我们设计了三个模块来实现这个功能,分别是接收(receiver),解析(parser),和发布(publish),数据通过mutiprocessing的队列Queue由接收模块传给解析,再由解析模块传给发布。
Receiver这里通过一个while循环不断产生自然数,把他放到队列:
def receiver(self, queue):
while True:
self.count += 1
if not queue.full():
print(multiprocessing.current_process().pid, "进程放数据", self.count)
queue.put(self.count)
time.sleep(1)
else:
time.sleep(2)
parser对数据进行进一步加工,他需要两个队列参数,一个负责从receiver接收,一个负责给publisher传递。当receiver传递过来的数据等于20时,故意让程序报错,模拟进程报错down掉。
def parser(self, queue, queue2):
while True:
if not queue.empty():
num = queue.get()
if num == 20:
a = int("hello world")
self.info = "Now count is %d" % a
queue2.put(self.info)
else:
print(multiprocessing.current_process().pid, "进程接收数据", num)
self.info = "Now count is {}".format(num)
queue2.put(self.info)
else:
pass
publisher负责发布数据
def publish(self, queue):
while True:
if not queue.empty():
msg = queue.get()
print("进程", multiprocessing.current_process().pid, "Publish info:", msg)
else:
pass
参考mutiliprocessing官方文档https://docs.python.org/zh-cn/3/library/multiprocessing.html#connection-objects
二、进程监测
分析
对于这种情况,我们可以使用如下的方案去实现: 方案一:使用 Zabbix/Prometheus监控系统,对Java应用程序做 TCP 端口检测。如果检测端口不通,就设置检测失败的触发器。然后实现告警. 方案二: 使用 Zabbix的自定义Key去实现告警,自定义key 的内容是执行一个shell脚本,Shell脚本用来检测进程是否存在,不存在的话,输出为 0,存在输出为1。然后Zabbix 的触发器 设置最近的T值 不为1,从而来实现进程状态检测的告警。 方案三:编写Python脚本用来检测进程的状态,然后使用Python的内置库,实现邮件告警。
这里我们选择方案三,要实现的功能:检测一个进程是否变变成了僵尸进程 思路: 1.首先Python程序需要检测给定进程是否正常运行。 2.检测到进程如果正常,不做任何处理;如果已经变成了僵尸进程,就需要触发邮件告警的函数 3.Python程序需要定时定期地去执行检测脚本。(可选,我这里没实现,可以通过shell脚本定期执行一遍python文件来实现,或者python自己sleep,然后设置成linux后台的守护进程)
def checkprocess(self, target_pid):
self.target_pid = target_pid
pid1, pid2, pid3 = self.target_pid
while True:
print('Monitoring')
process1 = psutil.Process(pid1)
process2 = psutil.Process(pid2)
process3 = psutil.Process(pid3)
if process1.is_running and process1.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "receiver进程变成了僵尸进程!,请检测原因")
if process2.is_running and process2.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "parser进程变成了僵尸进程!请检测原因")
if process3.is_running and process3.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "receiver进程变成了僵尸进程!请检测原因")
time.sleep(5)
关于进程的各种状态,参考psutil官方文档:https://psutil.readthedocs.io/en/latest/index.html?highlight=status#process-status-constants
三、Python邮件发送功能
使用python实现电子邮件发送,并使用类封装成一个模块,需要用到python自带的email和smtplib库
- smtplib负责创建SMTP的操作对象并连接smtp目标服务器,调用对象中的方法,发送邮件到目标地址
- email库用来创建邮件对象(常见的有纯文本邮件对象、作为附件的图片对象,多种对象混合的邮件对象)
- 用python代理登录qq邮箱发邮件,是需要更改自己qq邮箱设置的。在这里大家需要做两件事情:邮箱开启SMTP功能 、获得授权码,参考:https://zhuanlan.zhihu.com/p/25565454
from email.mime.text import MIMEText
from email.header import Header
import smtplib
class SendEmail(object):
def __init__(self):
self.host_server = 'smtp.qq.com'
self.sender_qq = 'xxxxx'
self.pwd = 'xxx'
self.sender_qq_mail = 'xxx@qq.com'
self.smtp = smtplib.SMTP_SSL(self.host_server)
self.smtp.ehlo(self.host_server)
self.smtp.login(self.sender_qq, self.pwd)
def sendmail(self, receivers, mail_title, mail_content):
msg = MIMEText(mail_content, "plain", 'utf-8')
msg["Subject"] = Header(mail_title, 'utf-8')
msg["From"] = self.sender_qq_mail
msg["To"] = ",".join(receivers)
try:
self.smtp.sendmail(self.sender_qq_mail, msg['To'].split(','), msg.as_string())
print("mail has been post successfully")
except smtplib.SMTPException as e:
print(repr(e))
if __name__ == "__main__":
sm = SendEmail()
sm.sendmail(['xxx@163.com', 'xxx@gmail.com'], "python测试","你好,现在在进行一项用python登录qq邮箱发邮件的测试")
四、完整代码
下面是封装成类以后的完整代码: receiver.py
import time
import multiprocessing
class Recver:
def __init__(self):
self.count = 0
def recver(self, queue):
while True:
self.count += 1
if not queue.full():
print(multiprocessing.current_process().pid, "进程放数据", self.count)
queue.put(self.count)
time.sleep(1)
else:
time.sleep(2)
parser.py
import multiprocessing
class Parser:
def __init__(self):
self.info = "null"
def parser(self, queue, queue2):
while True:
if not queue.empty():
num = queue.get()
if num == 20:
a = int("hello world")
self.info = "Now count is %d" % a
queue2.put(self.info)
else:
print(multiprocessing.current_process().pid, "进程接收数据", num)
self.info = "Now count is {}".format(num)
queue2.put(self.info)
else:
pas
publisher.py
import multiprocessing
class Publisher():
def __init__(self):
pass
def publish(self, queue):
while True:
if not queue.empty():
msg = queue.get()
print("进程", multiprocessing.current_process().pid, "Publish info:", msg)
else:
pass
监测脚本monitor.py
from email.mime.text import MIMEText
from email.header import Header
import smtplib
import psutil
import time
class Monitor(object):
def __init__(self):
self.target_pid = []
self.host_server = 'smtp.qq.com'
self.sender_qq = 'xxx'
self.pwd = 'xxxxx'
self.sender_qq_mail = 'xxx@qq.com'
self.receiver = ['xxx']
self.smtp = smtplib.SMTP_SSL(self.host_server)
self.smtp.ehlo(self.host_server)
self.smtp.login(self.sender_qq, self.pwd)
def sendmail(self, receivers, mail_title, mail_content):
msg = MIMEText(mail_content, "plain", 'utf-8')
msg["Subject"] = Header(mail_title, 'utf-8')
msg["From"] = self.sender_qq_mail
msg["To"] = ",".join(receivers)
try:
self.smtp.sendmail(self.sender_qq_mail, msg['To'].split(','), msg.as_string())
self.smtp.quit()
print("mail has been post successfully")
except smtplib.SMTPException as e:
print(repr(e))
def checkprocess(self, target_pid):
self.target_pid = target_pid
pid1, pid2, pid3 = self.target_pid
while True:
print('Monitoring')
process1 = psutil.Process(pid1)
process2 = psutil.Process(pid2)
process3 = psutil.Process(pid3)
if process1.is_running and process1.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "receiver进程变成了僵尸进程!,请检测原因")
if process2.is_running and process2.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "parser进程变成了僵尸进程!请检测原因")
if process3.is_running and process3.status() != psutil.STATUS_ZOMBIE:
pass
else:
self.sendmail(self.receiver, "进程监测", "receiver进程变成了僵尸进程!请检测原因")
time.sleep(5)
启动脚本start.py
from receiver import Recver
from parser import Parser
from publisher import Publisher
from monitor import Monitor
import multiprocessing
queue1 = multiprocessing.Queue(maxsize=5)
queue2 = multiprocessing.Queue(maxsize=5)
queue_pid = multiprocessing.Queue(maxsize=3)
if __name__ == '__main__':
jieshou = Recver()
jiexi = Parser()
publisher = Publisher()
monitor = Monitor()
process1 = multiprocessing.Process(target=jieshou.recver, args=(queue1,), daemon=True)
process2 = multiprocessing.Process(target=jiexi.parser, args=(queue1, queue2), daemon=True)
process3 = multiprocessing.Process(target=publisher.publish, args=(queue2,), daemon=True)
process1.start()
process2.start()
process3.start()
target_pid = (process1.pid, process2.pid, process3.pid)
process4 = multiprocessing.Process(target=monitor.checkprocess, args=(target_pid,), daemon=True)
process4.start()
process1.join()
运行结果:
在传过来20时process2报错
收到的告警邮件
|