由于我实际使用中遇到了,开了线程后,销毁线程但是由于奇怪的原因一直没有成功,定位后发现,是由于线程内有死循环的原因。
由于机制问题导致线程必须要完成当前的操作后才能退出,死循环的话就退不了,但是也有办法.
举个简单的例子如下:
import threading
import time
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.Flag=True #停止标志位
self.Parm=0 #用来被外部访问的
#自行添加参数
def run(self):
while(True):
if(not self.Flag):
break
else:
time.sleep(10)
print(time.strftime('%Y.%m.%d %H:%M:%S ',time.localtime(time.time())))
def setFlag(self,parm): #外部停止线程的操作函数
self.Flag=parm #boolean
def setParm(self,parm): #外部修改内部信息函数
self.Parm=parm
def getParm(self): #外部获得内部信息函数
return self.Parm
if __name__=="__main__":
testThread=MyThread()
testThread.setDaemon(True) #设为保护线程,主进程结束会关闭线程
testThread.getParm() #获得线程内部值
testThread.setParm(1) #修改线程内部值
testThread.start() #开始线程
print(testThread.getParm()) #输出内部信息
time.sleep(2) #主进程休眠 2 秒
print(time.strftime('%Y.%m.%d %H:%M:%S ',time.localtime(time.time())))
testThread.setFlag(False) #修改线程运行状态
time.sleep(2) #2019.04.25 修改
print(testThread.is_alive()) #查看线程运行状态
time.sleep(10)
print(testThread.is_alive()) #查看线程运行状态
运行后发现:
1
2022.03.30 10:34:23
True
2022.03.30 10:34:31
False
实际按需修改:
def track_strategy_worker(self, strategy, name, interval=10, **kwargs):
"""下单worker
:param strategy: 策略id
:param name: 策略名字
:param interval: 策略的时间间隔,单位为秒"""
while True:
if (not self.flag):
break
else:
#print('轮询间隔:'+time.strftime('%Y.%m.%d %H:%M:%S ',time.localtime(time.time())))
try:
transactions = self.query_strategy_transaction(
strategy, **kwargs
)
# pylint: disable=broad-except
except Exception as e:
logger.exception("无法获取策略 %s 调仓信息, 错误: %s, 跳过此次调仓查询", name, e)
time.sleep(3)
continue
for transaction in transactions:
trade_cmd = {
"strategy": strategy,
"strategy_name": name,
"action": transaction["action"],
"stock_code": transaction["stock_code"],
"amount": transaction["amount"],
"price": transaction["price"],
"datetime": transaction["datetime"],
}
if self.is_cmd_expired(trade_cmd):
continue
logger.info(
"策略 [%s] 发送指令到交易队列, 股票: %s 动作: %s 数量: %s 价格: %s 信号产生时间: %s",
name,
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
)
self.trade_queue.put(trade_cmd)
self.add_cmd_to_expired_cmds(trade_cmd)
try:
for _ in range(interval):
time.sleep(1)
except KeyboardInterrupt:
logger.info("程序退出")
break
修改后:
class track_strategy_worker(self, strategy, name, interval=10, **kwargs):
def __init__(self_t):
threading.Thread.__init__(self_t)
self_t.Flag=True #停止标志位
self_t.Parm=0 #用来被外部访问的
#自行添加参数
def run(self_t):
while True:
if (not self_t.flag):
break
else:
print('轮询间隔:'+time.strftime('%Y.%m.%d %H:%M:%S ',time.localtime(time.time())))
try:
transactions = self.query_strategy_transaction(
strategy, **kwargs
)
# pylint: disable=broad-except
except Exception as e:
logger.exception("无法获取策略 %s 调仓信息, 错误: %s, 跳过此次调仓查询", name, e)
time.sleep(3)
continue
for transaction in transactions:
trade_cmd = {
"strategy": strategy,
"strategy_name": name,
"action": transaction["action"],
"stock_code": transaction["stock_code"],
"amount": transaction["amount"],
"price": transaction["price"],
"datetime": transaction["datetime"],
}
if self.is_cmd_expired(trade_cmd):
continue
logger.info(
"策略 [%s] 发送指令到交易队列, 股票: %s 动作: %s 数量: %s 价格: %s 信号产生时间: %s",
name,
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
)
self.trade_queue.put(trade_cmd)
self.add_cmd_to_expired_cmds(trade_cmd)
try:
for _ in range(interval):
time.sleep(1)
except KeyboardInterrupt:
logger.info("程序退出")
break
def setFlag(self_t,parm): #外部停止线程的操作函数
self_t.Flag=parm #boolean
def setParm(self_t,parm): #外部修改内部信息函数
self_t.Parm=parm
def getParm(self_t): #外部获得内部信息函数
return self_t.Parm
|