#延时双删
#首先导入 四个模块
import queue
import datetime
import functools
import threading
#延时任务类
class DelayTask:
#初始化了 延时时间 和 具体任务
def __init__(self,delay_time,job_func):
#时间 秒 浮点数 为单位
self.delay_time = delay_time
#方法
self.job_func = job_func
#将时间转换为秒
def total_seconds(dt):
# 具体用来查看是否到达时间
return dt.seconds + dt.days * 24 * 60 * 60
#延时(delay) 队列(queue)
class DelayQueue(queue.PriorityQueue):
#其他来时实现我们的延迟操作
def __init__(self):
#初始化 线程里的Condition 它维护了一个(Lock,Rlock)锁和一个waiting池
# 首先acquire一个变量条件 然后进项判断一下条件 条件不满足则wait 满足
# 进行一些处理后 通过notify方法通知其他处于wait状态的线程 接收到通知会重新判断条件 ,
self.can_done = threading.Condition()
#然后使用super 掉用父类的构造器函数进行初始化 #不用会覆盖父类方法的
# 调用父类的构造函数进行初始化
super(DelayQueue,self).__init__()
#入队操作 以元祖的方式存对象(执行时间,执行方法)
def put_task(self,task):
self.put((task.delay_time,task))
#查看队顶数据
def _peek(self):
#获取锁
self.not_empty.acquire()
try:
#遍历队列
while not self._qsize():#队列长度
self.not_empty.wait() #等待
return self.queue[0][1]
except Exception as e:
print(e)
finally:
#释放锁
self.not_empty.release()
#出队
def get_task(self):
#获取锁
self.can_done.acquire()
try:
#获取对顶任务
task = self._peek()
#获取延时的时间
delta = total_seconds(task.delay_time- datetime.datetime.now())
#判断时间是否满足
while delta > 0:
#延时等待 线程为是否cindition内部的锁 并进程blocked状态 # 同时在waiting池中记录这个线程
self.can_done.wait(delta)
#更新最新的时间 #否则陷入死循环
task = self._peek()
delta = total_seconds(task.delay_time - datetime.datetime.now())
item = self.get() #从队列中获取任务并移除此任务
self.can_done.notify_all() #唤醒此条件下的说有线程 防止处于沉默状态
return item[1]
except Exception as e:
print(e)
finally:
#解锁
self.can_done.release()
# 测试方法
def test(id):
print("5秒以后删除用户%s的缓存" % id)
#声明装饰器任务
task = functools.partial(test,id=29)#生产一个新的函数
#声明延时任务
task_delay = DelayTask(delay_time=datetime.datetime.now() + datetime.timedelta(seconds=4),job_func=task)
#实例化队列
delay_queue = DelayQueue()
#入队
delay_queue.put_task(task_delay)
#出队
res = delay_queue.get_task()
#执行方法
res.job_func() #执行方法
|