期限套利策略,要能够稳定获得各币种期货和现货的报价,计算各币种的基差数据,然后当基差突破阈值时触发交易。以下代码可以得到稳定报价,计算基差数据,然后当突破阈值时收到提示邮件(还没写好交易模块)。
这里记录一下,开发过程遇到的主要问题,以及如何解决的,如果不感兴趣,可以跳过,直接看代码。
问题一:websocket报价数据不更新
简单的ws.run_forever,通常在运行两三天后,会出现报价卡在某个时点,不能更新的情况。例如,现货报价停留在2021年12月25日11:00。
解决方案:用while true + try except语句,一旦websock链接有问题,就发送邮件通知(不知道为啥,一直没有收到报错邮件,发送邮件的代码可能有问题),回收内存,再重新链接。
def s_start(self):
while True:
try:
s_ws=WebSocketApp(
url='wss://stream.binance.com:9443/ws',
on_open=self._s_on_open,
on_message=self._s_on_message,
on_error=self._s_on_error,
on_close=self._s_on_close
)
s_ws.run_forever()
except Exception as e:
content="%s the spot webscoket is broken,check if it restart"%e
self._email(content)
gc.collect()
问题二:启动阶段报价数据为空
因为涉及到多币种,刚刚建立链接的时候,报价数据获取有先后,有可能btc数据有了,但是bnb数据要到10秒甚至20秒后才能收到。这样程序会因报错而中断。
解决方案:定义好报错的类型,用@retry装饰器,保证程序在遇到特定类型报错的时候,能够持续运行下去。
def retry_if_index_error(exception):
return isinstance(exception,IndexError)
@retry(retry_on_exception=retry_if_index_error)
def get_basis(self):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 5 20:15:41 2021
It seems to work really good, the price and basis data is correct after running 13 days.
2021/12/13 16:30 begin
@author: yszhu
"""
from websocket import WebSocketApp
import json,threading
import pysnooper
import time
import smtplib
from email.mime.text import MIMEText
from retrying import retry
import logging
import gc
class basis_dif(object):
"""
create variable save dilivery contract price,spot price and basis difference
"""
def __init__(self,contract_date=220325):
#delivery contract price
self.d_eth_price=None
self.d_eth_time=None
self.d_btc_price=None
self.d_btc_time=None
self.d_bnb_price=None
self.d_bnb_price=None
#spot price
self.eth_price=None
self.eth_time=None
self.btc_price=None
self.btc_time=None
self.bnb_price=None
self.bnb_time=None
#delivery and spot basis difference
self.btc_basis_dif=None
self.eth_basis_dif=None
self.bnb_basis_dif=None
self.contract_date=contract_date
self.basis_dif_threshold=0.035
#email
self._mail_host="smtp.qq.com"
self._mail_user="315960451"
self._mail_pwd="brvjltflaofrbhcb"
self._sender="youradress@qq.com"
self._receivers="youradress@qq.com"
# I use self._break_threshold to avoid sending email repeatedly.
# when self._break_threshold = True ,it means the basis diffrence
# now is greater than the threshold,so if the diffence becomes smaller
# than the threshold, this is the first time of break during one operation period.
# So ,I will receive the email ,and then operate my account.
self._break_threshold = True
#websocket for delivery contrat price
def _d_on_open(self,d_ws):
data={
"method":"SUBSCRIBE",
"params":
[
"btcusd_%s@aggTrade"%self.contract_date,
"ethusd_%s@aggTrade"%self.contract_date,
"bnbusd_%s@aggTrade"%self.contract_date
],
"id": 1
}
d_ws.send(json.dumps(data))
def _d_on_message(self,d_ws,d_msg):
d_msg=json.loads(d_msg)
if 's' in d_msg and d_msg['s']=="BTCUSD_%s"%self.contract_date:
self.d_btc_price=float(d_msg['p'])
self.d_btc_time=d_msg['T']
self.d_btc_time=int(self.d_btc_time/1000)
self.d_btc_time=time.localtime(self.d_btc_time)
self.d_btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_btc_time)
if 's' in d_msg and d_msg['s']=="ETHUSD_%s"%self.contract_date:
self.d_eth_price=float(d_msg['p'])
self.d_eth_time=d_msg['T']
self.d_eth_time=int(self.d_eth_time/1000)
self.d_eth_time=time.localtime(self.d_eth_time)
self.d_eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_eth_time)
if 's' in d_msg and d_msg['s']=="BNBUSD_%s"%self.contract_date:
self.d_bnb_price=float(d_msg['p'])
self.d_bnb_time=d_msg['T']
self.d_bnb_time=int(self.d_bnb_time/1000)
self.d_bnb_time=time.localtime(self.d_bnb_time)
self.d_bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_bnb_time)
def _d_on_close(self,d_ws):
print("##connection closed##")
def _d_on_error(self,d_ws,error):
print(f"on error:{error}")
def d_start(self):
while True:
try:
d_ws=WebSocketApp(
url='wss://dstream.binance.com/ws',
on_open=self._d_on_open,
on_message=self._d_on_message,
on_error=self._d_on_error,
on_close=self._d_on_close
)
d_ws.run_forever()
except Exception as e:
content="%s the future webscoket is broken,check if it restart"%e
self._email(content)
gc.collect()
#websocket for spot price
def _s_on_open(self,s_ws):
data={
"method": "SUBSCRIBE",
"params": [
"btcusdt@aggTrade",
"ethusdt@aggTrade",
"bnbusdt@aggTrade"
],
"id": 2
}
s_ws.send(json.dumps(data))
def _s_on_message(self,s_ws,s_msg):
s_msg=json.loads(s_msg)
if 's' in s_msg and s_msg['s']=="BTCUSDT":
self.btc_price=float(s_msg['p'])
self.btc_time=s_msg['T']
self.btc_time=int(self.btc_time/1000)
self.btc_time=time.localtime(self.btc_time)
self.btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.btc_time)
if 's' in s_msg and s_msg['s']=="ETHUSDT":
self.eth_price=float(s_msg['p'])
self.eth_time=s_msg['T']
self.eth_time=int(self.eth_time/1000)
self.eth_time=time.localtime(self.eth_time)
self.eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.eth_time)
if 's' in s_msg and s_msg['s']=="BNBUSDT":
self.bnb_price=float(s_msg['p'])
self.bnb_time=s_msg['T']
self.bnb_time=int(self.bnb_time/1000)
self.bnb_time=time.localtime(self.bnb_time)
self.bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.bnb_time)
def _s_on_close(self,s_ws):
print("##connection closed##")
def _s_on_error(self,s_ws,error):
print(f"on error:{error}")
def s_start(self):
while True:
try:
s_ws=WebSocketApp(
url='wss://stream.binance.com:9443/ws',
on_open=self._s_on_open,
on_message=self._s_on_message,
on_error=self._s_on_error,
on_close=self._s_on_close
)
s_ws.run_forever()
except Exception as e:
content="%s the spot webscoket is broken,check if it restart"%e
self._email(content)
gc.collect()
#because there are 7 kind of coin with spot and future price , so at
#the begining , there maybe no data for self.bnb_price for the lack of liquidity,
#In this case , python will raise IndexError
#we need trying when TypeError is raised.
def retry_if_index_error(exception):
return isinstance(exception,IndexError)
@retry(retry_on_exception=retry_if_index_error)
def get_basis(self):
while True:
self.btc_basis_dif = self.d_btc_price/self.btc_price-1
self.eth_basis_dif = self.d_eth_price/self.eth_price-1
self.bnb_basis_dif = self.d_bnb_price/self.bnb_price-1
print("btc_basis_dif is %f" % self.btc_basis_dif)
print("btc_d_price is %f %s"%(self.d_btc_price,self.d_btc_time))
print("btc_s_price is %f %s"%(self.btc_price,self.btc_time))
print("eth_basis_dif is %f"%self.eth_basis_dif)
print("eth_d_price is %f %s"%(self.d_eth_price,self.d_eth_time))
print("eth_s_price is %f %s"%(self.eth_price,self.eth_time))
print("bnb_basis_dif is %f"%self.bnb_basis_dif)
print("bnb_d_price is %f %s"%(self.d_bnb_price,self.d_bnb_time))
print("bnb_s_price is %f %s"%(self.bnb_price,self.bnb_time))
basis_dif_dict={
"btc":[self.btc_basis_dif,self.btc_price,self.d_btc_price],
"eth":[self.eth_basis_dif,self.eth_price,self.d_eth_price],
"bnb":[self.bnb_basis_dif,self.bnb_price,self.d_bnb_price],
}
basis_dif_dict=sorted(basis_dif_dict.items(),key=lambda x:x[1],reverse=True)
greatest_basis_dif=basis_dif_dict[0][1][0]
print("the greatest basis is %s %f,Spot pirice %f,future price %f"%(
basis_dif_dict[0][0],
greatest_basis_dif,
basis_dif_dict[0][1][1],
basis_dif_dict[0][1][2]
))
if greatest_basis_dif>self.basis_dif_threshold:
if self._break_threshold == True:
content="the greatest basis is %s %f,Spot pirice %f,future price %f"%(
basis_dif_dict[0][0],
greatest_basis_dif,
basis_dif_dict[0][1][1],
basis_dif_dict[0][1][2]
)
self._email(content)
self._break_threshold = False
if greatest_basis_dif<self.basis_dif_threshold:
self._break_threshold= True
def _email(self,content):
'''
if the basis_dif reached the threshold, send an email
param:content: which coin's bais_dif has reach the threshold at what price and when
'''
message=MIMEText(content,'plain',"utf-8")
message['Subject']="from new greatest basis dif:basis difference reached the threshold"
message['From']=self._sender
message['to']=self._receivers[0]
smtpObj=smtplib.SMTP()
smtpObj.connect(self._mail_host,25)
smtpObj.login(self._mail_user,self._mail_pwd)
smtpObj.sendmail(self._sender,self._receivers,message.as_string())
smtpObj.quit()
if __name__=="__main__":
test=basis_dif()
# because I want to recieve spot and contract price at the same time, so I create two threads
t1=threading.Thread(target=test.d_start)
t2=threading.Thread(target=test.s_start)
t1.start()
t2.start()
test.get_basis()
|