业务场景:在当前遇到的业务场景中,我们需要启一个间隔任务,这个间隔任务跑一个算法,然后把算法的结果进行一些处理,并入库。任务目前间隔是一小时,算法运行时间要50多分钟,留给结果处理的时间并不多,所以有可能会出现超时。目前来说,优化方向在算法上会更为合理,因为结果处理本来就不用很多时间。但是在这个业务场景下,想要把结果处理的时间进行无限压缩,压缩到0,其实也是可以实现的,说是压缩为0,实际上就是在算法执行完成后,再启一个进程去处理,这样就不会由于需要进行数据处理而影响到算法的运行,将算法和结果处理分为两个独立的进程去处理。在最开始的程序中,是把算法运行和结果处理作为一个周期,而现在是把算法运行和结果处理分为两个周期去处理。
技术实现方案:
启动二个进程,其中一个运行算法,在算法运行结束后,发送一个状态值到另外一个进程,另外一个进程在收到状态量后启动数据处理即可。两个进程间互不影响即可。其实也相当于算法进程控制数据处理进程
测试场景构造代码:
from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
for i in range(1000):
print('send_message:%d'%i)
print(os.getpid())
conn.send(i)
time.sleep(3)
def send_message1(conn):
# for i in range(1000):
print(conn.recv())
while True:
if conn.recv() % 5 == 0:
print(' today is nice day')
time.sleep(1)
if __name__ == '__main__':
????????#创建一个进程通信管道
left,right = Pipe()
t1 = Process(target=send_message,args=(left,))
t2 = Process(target=send_message1,args=(right,))
t1.start()
t2.start()
在这个案例场景下有一些需要注意的点:
一、time.sleep()的问题,睡眠指定时间,总是会出错,具体的出错原因到现在也没有找到,这是原来出现的问题,在这里没有做长时间的测试,所以不一定会出现,但是还是要注意
二、代码实现中与上述的描述差异有一些,如未启用调度任务,只是启了一个间隔运行的任务。
三、数据处理进程一直处理空跑状态,会造成资源的浪费(更合理的应该是形成阻塞状态,但是对于阻塞状态的构造缺乏认知,所以先牺牲资源
四、在上述描述的需求中,在算法运行及数据处理的上一节点还有一个调度任务在控制,这里未做出体现,其实应该把定时任务和数据处理作为两个周期独立出来才更符合上述描述中的需求。
|