一、计算交易信号
这里以双均线策略为例,金叉买入,死叉卖出。
import pandas as pd
def signal_moving_average(df: pd.DataFrame, para=(5, 60)):
    """
    Dual moving-average crossover strategy.

    A buy signal (1) is emitted on the row where the short moving average
    crosses above the long one, and a sell signal (0) on the row where it
    crosses back below. Rows without a crossover keep NaN in ``signal``.

    The frame is modified in place (``signal`` and ``pos`` columns are
    written) and the same object is returned.

    :param df: candle data; must contain a ``close`` column
    :param para: ``[ma_short, ma_long]`` rolling-window lengths
    :return: ``df`` with two extra columns:
        ``signal`` - 1 on a golden cross, 0 on a death cross, NaN otherwise;
        ``pos`` - the signal delayed by one row, forward-filled, and 0 before
        the first signal
    """
    ma_short, ma_long = para[0], para[1]

    # Keep the averages in local Series rather than temporary columns, so
    # nothing has to be written to (and dropped from) the caller's frame.
    short_ma = df['close'].rolling(ma_short, min_periods=1).mean()
    long_ma = df['close'].rolling(ma_long, min_periods=1).mean()
    prev_short = short_ma.shift(1)
    prev_long = long_ma.shift(1)

    # Golden cross: short is above long now, but was not on the previous row.
    golden_cross = (short_ma > long_ma) & (prev_short <= prev_long)
    df.loc[golden_cross, 'signal'] = 1

    # Death cross: short is below long now, but was not on the previous row.
    death_cross = (short_ma < long_ma) & (prev_short >= prev_long)
    df.loc[death_cross, 'signal'] = 0

    # The position follows the previous row's signal. Assign the filled
    # Series back to the column instead of calling fillna(inplace=True) on
    # df['pos']: that chained form operates on a temporary under pandas
    # Copy-on-Write and would leave the NaNs in place.
    df['pos'] = df['signal'].shift().ffill().fillna(0)

    return df
二、交易相关方法
from datetime import datetime, timedelta
import time
import pandas as pd
from email.mime.text import MIMEText
from smtplib import SMTP_SSL
def next_run_time(time_interval, ahead_time=1):
    """
    Compute the next wall-clock time at which the strategy should run.

    The result is the next multiple of the interval within the current hour,
    or the top of the next hour when that multiple would reach minute 60.
    If that moment is less than ``ahead_time + 1`` seconds away, one more
    interval is added so the caller has time to prepare.

    :param time_interval: period as a string of minutes ending in 'm', e.g. '5m'
    :param ahead_time: minimum lead time in seconds (one extra second is
        added to it in the comparison)
    :return: naive local ``datetime`` of the next run
    :raises SystemExit: if ``time_interval`` does not end with 'm'
    """
    if not time_interval.endswith('m'):
        raise SystemExit('time_interval doesn\'t end with m')

    interval_minutes = int(time_interval.strip('m'))
    now_time = datetime.now()
    hour_start = now_time.replace(minute=0, second=0, microsecond=0)

    target_min = (now_time.minute // interval_minutes + 1) * interval_minutes
    if target_min < 60:
        # Stay inside the current hour, at the computed minute. (Using
        # minute 0 here would return a time that has already passed.)
        target_time = hour_start + timedelta(minutes=target_min)
    else:
        # Roll over to the top of the next hour; timedelta arithmetic also
        # takes care of the 23:xx -> next-day case.
        target_time = hour_start + timedelta(hours=1)

    # total_seconds() keeps its sign, whereas .seconds wraps a negative
    # difference into a large positive number.
    if (target_time - datetime.now()).total_seconds() < ahead_time + 1:
        print('距离target_time不足', ahead_time, '秒,下下个周期再运行')
        target_time += timedelta(minutes=interval_minutes)

    print('下次运行时间', target_time)
    return target_time
def get_okex_candle_data(exchange, symbol, time_interval):
    """
    Fetch candles for ``symbol`` and return them as a DataFrame.

    :param exchange: object exposing ``fetch_ohlcv(symbol, timeframe=..., since=...)``
    :param symbol: trading pair passed through to ``fetch_ohlcv``
    :param time_interval: timeframe string passed through to ``fetch_ohlcv``
    :return: DataFrame with columns ``candle_begin_time_GMT8``, ``open``,
        ``high``, ``low``, ``close``, ``volume``
    """
    raw_candles = exchange.fetch_ohlcv(symbol, timeframe=time_interval, since=0)

    # Positional columns 0..5 are mapped to named fields; column 0 is treated
    # as a millisecond epoch timestamp below.
    column_names = {0: 'MTS', 1: 'open', 2: 'high', 3: 'low', 4: 'close', 5: 'volume'}
    candles = pd.DataFrame(raw_candles, dtype=float).rename(columns=column_names)

    # Shift the epoch-based open time forward by eight hours (GMT+8).
    open_time = pd.to_datetime(candles['MTS'], unit='ms')
    candles['candle_begin_time_GMT8'] = open_time + timedelta(hours=8)

    return candles[['candle_begin_time_GMT8', 'open', 'high', 'low', 'close', 'volume']]
def place_order(exchange, order_type, buy_or_sell, symbol, price, amount):
    """
    Place an order, retrying up to five times when the exchange call fails.

    :param exchange: exchange object exposing the ``create_limit_*_order`` /
        ``create_market_*_order`` methods
    :param order_type: 'limit' or 'market'
    :param buy_or_sell: 'buy' or 'sell'
    :param symbol: trading pair
    :param price: limit price; ignored for market orders
    :param amount: quantity to trade
    :return: whatever the exchange's order-creation call returns
    :raises ValueError: if ``order_type`` or ``buy_or_sell`` is not supported
    :raises SystemExit: (status 1) after five consecutive failed attempts
    """
    # Reject bad arguments immediately. Letting them fall through would
    # leave order_info unbound and burn all five retries on a NameError.
    if order_type not in ('limit', 'market'):
        raise ValueError('unsupported order_type: %r' % (order_type,))
    if buy_or_sell not in ('buy', 'sell'):
        raise ValueError('unsupported buy_or_sell: %r' % (buy_or_sell,))

    for _ in range(5):
        try:
            if order_type == 'limit':
                if buy_or_sell == 'buy':
                    order_info = exchange.create_limit_buy_order(symbol, amount, price)
                else:
                    order_info = exchange.create_limit_sell_order(symbol, amount, price)
            else:
                if buy_or_sell == 'buy':
                    order_info = exchange.create_market_buy_order(symbol=symbol, amount=amount)
                else:
                    order_info = exchange.create_market_sell_order(symbol=symbol, amount=amount)
        except Exception as e:
            print('下单报错,1s后重试', e)
            time.sleep(1)
        else:
            # Report outside the try body so a failure while printing can
            # never trigger a second submission of an accepted order.
            print('下单成功:', order_type, buy_or_sell, symbol, price, amount)
            print('下单信息:', order_info, '\n')
            return order_info

    print('下单报错次数过多,程序终止')
    # Non-zero status so a supervising process can tell this was a failure.
    raise SystemExit(1)
class QQMail:
    """
    Minimal sender that delivers plain-text mail through smtp.qq.com over SSL.

    Creating an instance opens the connection and logs in; call ``quit()``
    when finished to close the session.
    """

    # Sender address, also used as the login name and the From header.
    user = 'xxx@qq.com'
    # Login secret for the account (placeholder value; presumably to be
    # replaced with the real SMTP authorization code before use).
    pwd = '授权码'

    def __init__(self):
        """Connect to smtp.qq.com:465 and authenticate as ``user``."""
        self.smtp = SMTP_SSL('smtp.qq.com', 465)
        try:
            self.smtp.login(self.user, self.pwd)
        except Exception:
            # Do not leave the socket open when authentication fails.
            self.smtp.close()
            raise

    def send_message(self, to, subject, content):
        """
        Send a plain-text message.

        :param to: recipient address, used as the To header
        :param subject: subject line
        :param content: message body text
        """
        msg = MIMEText(content)
        msg['Subject'] = subject
        msg['From'] = self.user
        msg['To'] = to
        self.smtp.send_message(msg)

    def quit(self):
        """End the SMTP session."""
        self.smtp.quit()
def auto_send_email(to_address, subject, content):
    """
    Send one plain-text email through a fresh ``QQMail`` session.

    :param to_address: recipient address
    :param subject: subject line
    :param content: message body text
    """
    mail = QQMail()
    try:
        mail.send_message(to_address, subject, content)
    finally:
        # Always end the session, even when sending fails, so the
        # connection opened by QQMail() is not leaked.
        mail.quit()
三、主程序
import ccxt
from datetime import datetime, timedelta
from time import sleep
import pandas as pd
from .trade import next_run_time, auto_send_email, place_order, get_okex_candle_data
from .signals import signal_moving_average
"""
自动交易主要流程
# 通过while语句,不断的循环
# 每次循环中需要做的操作步骤
1. 更新账户信息
2. 获取实时数据
3. 根据最新数据计算买卖信号
4. 根据目前仓位、买卖信息,结束本次循环,或者进行交易
5. 交易
"""
# ---- Strategy configuration ----
time_interval = '1m'  # candle period; next_run_time requires the trailing 'm'
# Period length in minutes, parsed once. The unit suffix must be stripped
# first: int('1m') raises ValueError.
interval_minutes = int(time_interval.strip('m'))

exchange = ccxt.okex5()
# Route HTTP(S) traffic through a local proxy.
exchange.proxies = {
    'http': 'http://127.0.0.1:6666',
    'https': 'http://127.0.0.1:6666',
}
# API credentials (left blank here; fill in before running).
exchange.apiKey = ''
exchange.secret = ''
exchange.password = ''

symbol = 'ETH/USDT'
base_coin = symbol.split('/')[-1]   # quote currency used to pay, e.g. USDT
trade_coin = symbol.split('/')[0]   # asset being traded, e.g. ETH
para = [20, 200]                    # [ma_short, ma_long] for the signal

# ---- Main loop: one iteration per candle period ----
while True:
    email_title = '策略报表'
    email_content = ''

    # 1. Refresh account balances.
    balance = exchange.fetch_balance()['total']
    base_coin_amount = float(balance[base_coin])
    trade_coin_amount = float(balance[trade_coin])
    print('当前资产:\n', base_coin, base_coin_amount, trade_coin, trade_coin_amount)

    # 2. Wait for the next period boundary. total_seconds() keeps its sign,
    #    so a boundary that has already passed yields a zero sleep instead of
    #    the ~24h that the always-positive .seconds would produce.
    run_time = next_run_time(time_interval)
    sleep(max(0, (run_time - datetime.now()).total_seconds()))
    while datetime.now() < run_time:
        # Guard against an early wake-up without spinning at full CPU.
        sleep(0.001)

    # 3. Fetch candles until the one that just closed is present.
    expected_candle_time = run_time - timedelta(minutes=interval_minutes)
    while True:
        df = get_okex_candle_data(exchange, symbol, time_interval)
        _temp = df[df['candle_begin_time_GMT8'] == expected_candle_time]
        if not _temp.empty:
            break
        print('获取数据不包含最新的数据,重新获取')
        # Brief pause so the retry loop does not hammer the exchange API.
        sleep(0.5)

    # 4. Drop the still-forming candle and compute the signal. Copy the
    #    filtered frame because the signal function writes columns into it.
    df = df[df['candle_begin_time_GMT8'] < pd.to_datetime(run_time)].copy()
    df = signal_moving_average(df, para=para)
    signal = df.iloc[-1]['signal']
    print('\n 交易信号', signal)

    # 5a. Sell everything when holding the asset and the signal is 0.
    if trade_coin_amount > 0 and signal == 0:
        print('\n卖出')
        price = exchange.fetch_ticker(symbol)['bid']
        # Price 2% below the bid so the limit order fills immediately.
        place_order(exchange, order_type='limit', buy_or_sell='sell', symbol=symbol, price=price * 0.98, amount=trade_coin_amount)
        email_title += '_卖出_' + trade_coin
        email_content += '卖出信息:\n'
        email_content += '卖出数量:' + str(trade_coin_amount) + '\n'
        email_content += '卖出价格:' + str(price) + '\n'

    # 5b. Buy with the whole quote balance when flat and the signal is 1.
    if trade_coin_amount == 0 and signal == 1:
        print('\n买入')
        price = exchange.fetch_ticker(symbol)['ask']
        # Price 2% above the ask so the limit order fills immediately, and
        # size the order against that limit price: sizing it against the raw
        # ask would make amount * limit_price exceed the available balance.
        limit_price = price * 1.02
        buy_amount = base_coin_amount / limit_price
        place_order(exchange, order_type='limit', buy_or_sell='buy', symbol=symbol, price=limit_price, amount=buy_amount)
        email_title += '_买入_' + trade_coin
        email_content += '买入信息:\n'
        email_content += '买入数量:' + str(buy_amount) + '\n'
        email_content += '买入价格:' + str(price) + '\n'

    # 6. Email the report on the hour and half hour.
    if run_time.minute % 30 == 0:
        auto_send_email('462915202@qq.com', email_title, email_content)

    print(email_title)
    print(email_content)
    print('====本次运行完毕\n')
    sleep(6 * 1)
|