Preface
I am currently a fourth-year undergraduate at the School of Computer Science, Harbin Institute of Technology, Weihai, and a beginner in quantitative investing. Because very few seniors or classmates at my school work on quant and I had little contact with the industry, for a long time I believed quant was just technical indicators like MACD, or prediction with fancy deep-learning models (many domestic quant books are essentially "Python basics + technical indicators + machine learning"). Fortunately I kept trying, and after a long stretch of anxiety and exploration I finally arrived at a reasonably accurate, though far from complete, picture of the quant industry. Hence this series of posts on quantitative methodology, in which I organize and share what I have learned, hoping to exchange ideas with anyone who finds it useful. Given my limited ability and energy, flaws and even errors are unavoidable; criticism and corrections are welcome. If you are interested in this work, feel free to contact me: cai_jinhang@foxmail.com
Series
This is the first post in the multi-factor stock selection series of my quantitative investing methodology. It covers data preparation (based on Tushare) and the single-factor testing module.
Overview of vectorized stock-selection backtesting
Tip: if something is unclear, refer to the code later in the post.
Key point 1: data format
Organize the data into a specific DataFrame layout, the (di, ii) format: columns are stock codes, the index is trade dates, and the values are the data (close price, volume, factor values, and so on). Each indicator gets its own DataFrame. Not everything needs a DataFrame, though; one-dimensional data, such as the return series of a particular index, can simply be a Series. The figure below shows the adjusted close data for A shares from 2010 to 2021.
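To make the layout concrete, here is a minimal sketch with made-up codes, dates, and prices (not real market data):
import numpy as np
import pandas as pd

dates = pd.to_datetime(['2021-01-04', '2021-01-05', '2021-01-06'])  # index: trade dates
codes = ['000001.SZ', '600000.SH']                                  # columns: stock codes
close_df = pd.DataFrame([[18.6, 9.8], [18.9, 9.7], [18.4, 9.9]],
                        index=dates, columns=codes)                 # one DataFrame per indicator
idx_rtn = pd.Series([0.002, -0.001, 0.003], index=dates)            # 1-D data stays a Series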
Key point 2: the stock universe
The stock universe is usually the constituent matrix of a common index (or a combination, such as CSI 300 + CSI 500), conventionally named univ_a or univ_data. The columns of univ_a contain only the codes of stocks that are, or have ever been, constituents of that index. univ_a[stk_code][trade_date] == 1 means stk_code was a constituent of the index on trade_date; otherwise univ_a[stk_code][trade_date] is NaN. We usually compute factors on all A shares and only apply the universe when running the selection backtest, via factor_df = factor_df.reindex_like(univ_a)*univ_a, where factor_df is the factor data and reindex_like aligns the rows and columns of factor_df with those of univ_a.
Example of the CSI 300 universe data
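A small illustration of the masking step on toy data (the real univ_a is built from the index weight data in the next section):
import numpy as np
import pandas as pd

dates = pd.to_datetime(['2021-01-04', '2021-01-05'])
codes = ['000001.SZ', '600000.SH', '600519.SH']
univ_a = pd.DataFrame([[1, np.nan, 1], [1, 1, np.nan]],
                      index=dates, columns=codes)        # 1 = constituent, NaN = not
factor_df = pd.DataFrame(np.random.randn(2, 3),
                         index=dates, columns=codes)     # factor computed on all A shares
factor_df = factor_df.reindex_like(univ_a) * univ_a      # NaN knocks out non-constituents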
Key point 3: excluding ST, suspended, and limit-up/down stocks
Construct a matrix ST_valid such that ST_valid[stk_code][trade_date] == 1 means stk_code was not an ST stock on trade_date, i.e. it passes the ST filter; otherwise the value is NaN. Construct suspend_valid and limit_valid the same way. Then forbid_days = ST_valid*suspend_valid*limit_valid: a stock's value on a given day is 1 only if it passes all three filters that day.
To apply the exclusion, simply multiply the position matrix by forbid_days; any stock that fails a filter becomes NaN.
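A toy example of the three filters combined (all values invented for illustration):
import numpy as np
import pandas as pd

dates = pd.to_datetime(['2021-01-04', '2021-01-05'])
codes = ['000001.SZ', '600000.SH']
ST_valid = pd.DataFrame(1.0, index=dates, columns=codes)
suspend_valid = ST_valid.copy()
limit_valid = ST_valid.copy()
limit_valid.iloc[0, 1] = np.nan                       # 600000.SH at its limit on day 1
forbid_days = ST_valid * suspend_valid * limit_valid  # 1 only if all three filters pass
pos_df = pd.DataFrame(0.5, index=dates, columns=codes)
pos_df = pos_df * forbid_days                         # the filtered stock becomes NaN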
Key point 4: position construction
Step 1: rank the cross-section, select stocks by percentile, then normalize the weights to obtain the initial position pos_1. Step 2: account for the rebalancing period; for monthly rebalancing, i.e. every 20 trading days, pos_2 = pos_1.reindex(pos_1.index[::20]).fillna(0).reindex(pos_1.index).ffill(). In other words, take the positions on rebalancing days, set all other dates to NaN, then ffill(). The fillna(0) matters: without it, a stock selected in one period whose subsequent positions should be NaN would inherit that period's weight after the ffill (a sketch follows the next paragraph). Step 3: handle stocks that cannot be traded on the rebalancing day (see to_final_position).
One point worth stressing: converting factor_df into the position pos_df requires shift(1), because the factor is computed after the close using data up to and including that day, while the earliest the trade can happen is the next trading day.
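A sketch of steps 1 and 2 on toy data, with the 20-day rebalance shortened to 2 days so the effect of fillna(0) is visible:
import numpy as np
import pandas as pd

dates = pd.date_range('2021-01-04', periods=6, freq='B')
pos_1 = pd.DataFrame(np.random.rand(6, 2),
                     index=dates, columns=['000001.SZ', '600000.SH'])
pos_1 = pos_1.div(pos_1.sum(axis=1), axis=0)   # step 1: normalize weights per day
pos_1.iloc[2:, 0] = np.nan                     # this stock drops out after the first period
pos_2 = (pos_1.reindex(pos_1.index[::2])       # keep rebalance days only (every 2 here)
              .fillna(0)                       # without this, ffill would carry the old
              .reindex(pos_1.index)            # weight past the stock's exit
              .ffill())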
Key point 5: the backtest
With the finished position fin_pos, compute fin_pos.shift(1)*rtn_df and sum across each row to get the position's daily pnl, where rtn_df is the stock return matrix. The shift() is needed because rtn_df is today's close (or open) over the previous day's close (or open), and since we place the order today rather than yesterday, we cannot capture the return on the day the order is placed.
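Putting the pieces together on toy data, the daily pnl and net value would be computed roughly as follows (fin_pos here is a stand-in for the output of to_final_position shown later):
import numpy as np
import pandas as pd

dates = pd.date_range('2021-01-04', periods=5, freq='B')
codes = ['000001.SZ', '600000.SH']
close_df = pd.DataFrame(np.cumprod(1 + np.random.randn(5, 2) * 0.01, axis=0),
                        index=dates, columns=codes)      # toy price paths
fin_pos = pd.DataFrame(0.5, index=dates, columns=codes)  # equal-weight toy position
rtn_df = close_df.pct_change()                           # stock return matrix
daily_pnl = (fin_pos.shift(1) * rtn_df).sum(axis=1)      # no return on the order day
nav = (1 + daily_pnl).cumprod()                          # cumulative net value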
Data preparation
I fetch the commonly needed research data from tushare (points required), such as daily market data, index constituents, ST flags, and limit-up/down lists, and process it into the (di, ii) format described above.
The code for downloading, storing, and reading the data follows. Note that tushare limits both the call frequency and the amount of data per call. With limited time I have not fully optimized the code, so parts of it are verbose and slow. Also, because some methods use multiprocessing to download in parallel, they cannot run in interactive environments such as Jupyter notebooks; please run the `if __name__ == '__main__'` block in an IDE such as PyCharm.
The data preparation part consists of three classes: DataDownloader, which fetches data from the tushare API and shapes it into the required format; DataWriter, which calls DataDownloader to download, update, and store the data locally; and DataReader, which reads the stored data.
import tushare as ts
import numpy as np
import pandas as pd
from multiprocessing import Manager, Pool
import datetime
import os
import pickle
import warnings
warnings.filterwarnings('ignore')
ts.set_token('')  # fill in your own tushare token
pro = ts.pro_api(timeout=5)
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = curPath[:curPath.find("多因子框架\\")+len("多因子框架\\")]
dataBase = rootPath+'\\data\\'
def read_pickle(path):
with open(path, 'rb') as handle:
return pickle.load(handle)
def update_pickle(text, path):
with open(path, 'wb') as handle:
pickle.dump(text, handle)
class DataDownloader:
def __init__(self,start_date='20100101',end_date = None):
self.start_date = start_date
self.end_date = end_date
self.trade_dates = self.get_trade_dates()
self.stk_codes = self.get_stks()
    def get_trade_dates(self, start_date=None, end_date=None):
        if start_date is None:
            start_date = self.start_date
        if end_date is None:
            end_date = self.end_date or datetime.datetime.now().strftime('%Y%m%d')
        df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date)
        return df[df['is_open'] == 1]['cal_date'].to_list()
    def get_stks(self):
        stk_set = set()
        for list_status in ['L', 'D', 'P']:  # listed / delisted / paused listing
            stk_set |= set(pro.stock_basic(list_status=list_status, fields='ts_code')['ts_code'].to_list())
        return sorted(stk_set)
def get_IdxWeight(self,idx_code):
        '''
        Index constituent weights: published roughly monthly, so fetch in
        32-day windows and forward-fill to daily frequency.
        '''
start_date = pd.to_datetime(self.trade_dates[0]) - datetime.timedelta(days=32)
start_date = start_date.strftime('%Y%m%d')
trade_dates = self.get_trade_dates(start_date)
df_ls = []
while start_date < trade_dates[-1]:
end_date = pd.to_datetime(start_date) + datetime.timedelta(days=32)
end_date = end_date.strftime('%Y%m%d')
raw_df = pro.index_weight(index_code=idx_code, start_date=start_date,end_date=end_date)
df_ls.append(raw_df.pivot(index = 'trade_date',columns = 'con_code',values='weight'))
start_date = end_date
res_df = pd.concat(df_ls)
res_df = res_df[~res_df.index.duplicated(keep='first')]
res_df = res_df.reindex(trade_dates)
res_df = res_df.ffill().reindex(self.trade_dates)
return res_df.sort_index()
def get_ST_valid(self):
        '''
        ST stocks: 1 = not ST that day, NaN = ST.
        '''
res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)
df = pro.namechange(fields='ts_code,name,start_date,end_date')
df = df[df.name.str.contains('ST')]
for i in range(df.shape[0]):
ts_code = df.iloc[i,0]
if ts_code not in self.stk_codes:
continue
s_date = df.iloc[i, 2]
e_date = df.iloc[i, 3]
            if e_date is None:
res_df[ts_code].loc[s_date:]=np.nan
else:
res_df[ts_code].loc[s_date:e_date]=np.nan
return res_df.sort_index()
    def get_suspend_oneDate(self, trade_date, m_ls):
        '''
        The tushare API returns at most 5000 rows per call, so fetch one day
        at a time; the caller parallelizes across dates.
        '''
        try:
            df = pro.suspend_d(suspend_type='S', trade_date=trade_date)
        except Exception:
            # retry once on a transient API error (e.g. rate limiting)
            df = pro.suspend_d(suspend_type='S', trade_date=trade_date)
        m_ls.append([trade_date, df])
def get_suspend_valid(self):
        '''
        Suspended stocks: 1 = trading normally, NaN = suspended that day.
        '''
res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)
m_ls = Manager().list()
pools = Pool(4)
for date in self.trade_dates:
pools.apply_async(self.get_suspend_oneDate,
args=(date,m_ls)
)
pools.close()
pools.join()
m_ls = list(m_ls)
        for date, df in m_ls:
            res_df.loc[date, df['ts_code'].to_list()] = np.nan
return res_df.sort_index()
    def get_limit_oneDate(self, trade_date, m_ls):
        '''
        The tushare API returns at most 5000 rows per call, so fetch one day
        at a time; the caller parallelizes across dates.
        '''
        try:
            df = pro.limit_list(trade_date=trade_date)
        except Exception:
            # retry once on a transient API error (e.g. rate limiting)
            df = pro.limit_list(trade_date=trade_date)
        m_ls.append([trade_date, df])
def get_limit_valid(self):
        '''
        Limit-up/limit-down stocks: 1 = tradable, NaN = at its price limit that day.
        '''
res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)
m_ls = Manager().list()
pools = Pool(3)
for date in self.trade_dates:
pools.apply_async(self.get_limit_oneDate,
args=(date,m_ls)
)
pools.close()
pools.join()
m_ls = list(m_ls)
for date,df in m_ls:
res_df.loc[date,df['ts_code'].to_list()]=np.nan
return res_df.sort_index()
    def get_dailyMkt_oneStock(self, ts_code, m_ls):
        '''
        Forward-adjusted (qfq) daily bars. The tushare adjusted-bar API
        returns one stock per call, so the caller parallelizes across stocks.
        '''
        try:
            df = ts.pro_bar(ts_code=ts_code, adj='qfq', start_date=self.start_date, end_date=self.end_date)
        except Exception:
            # retry once on a transient API error (e.g. rate limiting)
            df = ts.pro_bar(ts_code=ts_code, adj='qfq', start_date=self.start_date, end_date=self.end_date)
        m_ls.append(df)
def get_dailyMkt_mulP(self):
m_ls = Manager().list()
pools = Pool(3)
for ts_code in self.stk_codes:
pools.apply_async(self.get_dailyMkt_oneStock,
args=(ts_code,m_ls))
pools.close()
pools.join()
m_ls = list(m_ls)
raw_df = pd.concat(m_ls)
res_dict = {}
for data_name in ['open','close','high','low','vol','amount']:
res_df = raw_df.pivot(index='trade_date',columns='ts_code',values=data_name)
res_dict[data_name] = res_df.sort_index()
return res_dict
class DataWriter:
@staticmethod
    def commonFunc(data_path, getFunc, cover, *args, **kwds):
        if not os.path.exists(data_path) or cover:
            t1 = datetime.datetime.now()
            print(f'--------{data_path}: first download of this data, this may take a while')
            newData_df = getattr(DataDownloader(), getFunc)(*args, **kwds)
            newData_df.to_pickle(data_path)
            t2 = datetime.datetime.now()
            print(f'--------download finished, elapsed {t2 - t1}')
        else:
            savedData_df = pd.read_pickle(data_path)
            savedLastDate = savedData_df.index[-1]
            print(f'---------{data_path}: last updated to {savedLastDate}, updating to the latest trading day')
            lastData_df = getattr(DataDownloader(savedLastDate), getFunc)(*args, **kwds)
            newData_df = pd.concat([savedData_df, lastData_df]).sort_index()
            newData_df = newData_df[~newData_df.index.duplicated(keep='first')]
            newData_df.to_pickle(data_path)
            print(f'---------updated to the latest date {newData_df.index[-1]}')
        newData_df.index = pd.to_datetime(newData_df.index)
        return newData_df
@staticmethod
def update_IdxWeight(stk_code,cover=False):
data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'
return DataWriter.commonFunc(data_path,'get_IdxWeight',cover,stk_code)
@staticmethod
def update_ST_valid(cover=False):
        data_path = dataBase + 'daily/valid/ST_valid.pkl'
return DataWriter.commonFunc(data_path,'get_ST_valid',cover)
@staticmethod
def update_suspend_valid(cover=False):
data_path = dataBase+'daily/valid/suspend_valid.pkl'
return DataWriter.commonFunc(data_path,'get_suspend_valid',cover)
@staticmethod
def update_limit_valid(cover=False):
data_path = dataBase+'daily/valid/limit_valid.pkl'
return DataWriter.commonFunc(data_path,'get_limit_valid',cover)
@staticmethod
def update_dailyMkt(cover=False):
        '''
        The stored OHLCV pickles are assumed to cover the same date range.
        '''
        if not os.path.exists(dataBase + 'daily/mkt/open.pkl') or cover:
            print('--------Mkt: first download of this data, this may take a while')
            res_dict = DataDownloader().get_dailyMkt_mulP()
            for data_name, df in res_dict.items():
                df.to_pickle(dataBase + f'daily/mkt/{data_name}.pkl')
        else:
            savedLastDate = pd.read_pickle(dataBase + 'daily/mkt/open.pkl').index[-1]
            print(f'---------Mkt: last updated to {savedLastDate}, updating to the latest trading day')
            res_dict = DataDownloader(savedLastDate).get_dailyMkt_mulP()
            for data_name, last_df in res_dict.items():
                data_path = dataBase + f'daily/mkt/{data_name}.pkl'
                saved_df = pd.read_pickle(data_path)  # concat each field with its own history
                new_df = pd.concat([saved_df, last_df]).sort_index()
                new_df = new_df[~new_df.index.duplicated(keep='first')]
                new_df.to_pickle(data_path)
            print(f'---------updated to the latest date {new_df.index[-1]}')
class DataReader:
@staticmethod
def commonFunc(data_path):
if not os.path.exists(data_path):
            print(f'{data_path} does not exist; call DataWriter.update_xx first')
return
df = pd.read_pickle(data_path)
df.index = pd.to_datetime(df.index)
return df
@staticmethod
def read_IdxWeight(stk_code):
data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'
return DataReader.commonFunc(data_path)
@staticmethod
def read_ST_valid():
        data_path = dataBase + 'daily/valid/ST_valid.pkl'
return DataReader.commonFunc(data_path)
@staticmethod
def read_suspend_valid():
data_path = dataBase+'daily/valid/suspend_valid.pkl'
return DataReader.commonFunc(data_path)
@staticmethod
def read_limit_valid():
data_path = dataBase + 'daily/valid/limit_valid.pkl'
return DataReader.commonFunc(data_path)
@staticmethod
def read_dailyMkt(data_name):
data_path = dataBase+f'daily/mkt/{data_name}.pkl'
return DataReader.commonFunc(data_path)
@staticmethod
def read_index_dailyRtn(index_code,start_date = '20100101'):
df = pro.index_daily(ts_code=index_code, start_date= start_date).set_index('trade_date').sort_index()
df.index = pd.to_datetime(df.index)
return df['pct_chg']/100
@staticmethod
def read_dailyRtn():
df = DataReader.read_dailyMkt('close')
return df.pct_change()
if __name__ == '__main__':
DataWriter.update_ST_valid(cover=True)
DataWriter.update_suspend_valid(cover=True)
DataWriter.update_IdxWeight('399300.SZ',cover=True)
DataWriter.update_dailyMkt(cover=True)
DataWriter.update_limit_valid(cover=True)
Single-factor testing
This module mainly provides:
1. Metrics such as annualized Sharpe, annualized return, and max drawdown.
2. The to_final_position function, which applies the universe, ST, suspension, and limit filters to the selected stocks (see the comments).
3. factor_group for grouped backtests: ten deciles by default, plotting NAV curves and a bar chart of annualized returns.
4. IC/IR computation, with a cumulative IC plot.
5. calc_daily_pnl, which computes the daily pnl of a position.
6. factor_stats, the entry point that ties everything together and is what you ultimately call.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
def Col_zscore(df, n, cap=None, min_periods=1, check_std=False):
df_mean = df.rolling(window=n,min_periods=min_periods).mean()
df_std = df.rolling(window=n, min_periods=min_periods).std()
if check_std:
df_std = df_std[df_std >= 0.00001]
target = (df - df_mean) / df_std
if cap is not None:
target[target > cap] = cap
target[target < -cap] = -cap
return target
def Row_zscore(df, cap=None, check_std=False):
df_mean = df.mean(axis=1)
df_std = df.std(axis=1)
if check_std:
df_std = df_std[df_std >= 0.00001]
target = df.sub(df_mean, axis=0).div(df_std, axis=0)
if cap is not None:
target[target > cap] = cap
target[target < -cap] = -cap
return target
def MaxDrawdown(asset_series):
return asset_series - np.maximum.accumulate(asset_series)
def Sharpe_yearly(pnl_series):
return (np.sqrt(250) * pnl_series.mean()) / pnl_series.std()
def AnnualReturn(pos_df, pnl_series, alpha_type):
    temp_pnl = (1 + pnl_series).prod()
    # total absolute position ~ number of fully invested days
    # (a long-short book counts each leg, hence the division by 2)
    if alpha_type == 'ls_alpha':
        temp_pos = pos_df.abs().sum().sum() / 2
    else:
        temp_pos = pos_df.abs().sum().sum()
    if temp_pos == 0:
        return .0
    else:
        return round(temp_pnl ** (250 / temp_pos) - 1, 2)
def IC(signal, pct_n, min_valids=None, lag=0):
signal = signal.shift(lag)
corr_df = signal.corrwith(pct_n, axis=1,method='spearman').dropna()
if min_valids is not None:
signal_valid = signal.count(axis=1)
signal_valid[signal_valid < min_valids] = np.nan
signal_valid[signal_valid >= min_valids] = 1
corr_signal = corr_df * signal_valid
else:
corr_signal = corr_df
return corr_signal
def IR(signal, pct_n, min_valids=None, lag=0):
corr_signal = IC(signal, pct_n, min_valids, lag)
ic_mean = corr_signal.mean()
ic_std = corr_signal.std()
ir = ic_mean / ic_std
return ir, corr_signal
def to_final_position(factor_score, forbid_day):
    '''
    factor_score: DataFrame, either factor values or an initial position matrix
                  built from factor ranks
    forbid_day: DataFrame, tradability mask (product of the ST, suspension, and
                limit filters); 1 = tradable that day, NaN = not tradable
    return:
    pos_fin: DataFrame, final positions; an untradable stock keeps its previous
             day's position via ffill
    '''
pos_fin = factor_score.shift(1).replace(np.nan, 0) * forbid_day
pos_fin = pos_fin.ffill()
return pos_fin
def calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method):
    '''
    :param factor_df: factor / position matrix
    :param univ_data: universe matrix (e.g. CSI 300 or CSI 500 constituents)
    :param idx_rtn: index return series
    :param forbid_days: tradability mask
    :param rtn_df: stock return matrix
    :param method: feature/factor/ls_alpha/hg_alpha
    :return: position matrix and the daily pnl series of the position
    '''
factor_sel = factor_df.copy()
factor_sel = factor_sel.reindex_like(univ_data)*univ_data
forbid_days = forbid_days.reindex_like(factor_sel)
return_df = rtn_df.reindex_like(factor_sel)
if method == 'feature' or method == 'factor':
factor_z = Row_zscore(factor_sel, cap=4.5)
pos_final = to_final_position(factor_z, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
return pos_final,daily_pnl_final
elif method == 'ls_alpha':
pos_final = to_final_position(factor_sel, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
return pos_final,daily_pnl_final
elif method == 'hg_alpha':
pos_final = to_final_position(factor_sel, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1) - idx_rtn
return pos_final,daily_pnl_final
def factor_group(factor_df,forb_day,rtn_df,idx_rtn,univ_data,split_pct_ls):
    '''
    Grouped (decile) backtest: plot each group's NAV curve and a bar chart of
    annualized returns.
    '''
factor_df = factor_df.reindex_like(univ_data)*univ_data
factor_score = factor_df
factor_rank_pct = factor_score.rank(ascending=False, pct=True, axis=1)
annual_rtn_ls = list()
plt.figure(figsize=(12, 6))
for split_pct in split_pct_ls:
pos_selected = factor_score[(factor_rank_pct > split_pct[0])&(factor_rank_pct <= split_pct[1])]
pos_selected = pos_selected.where(pd.isnull(pos_selected), 1)
pos = pos_selected.div(pos_selected.sum(axis=1), axis=0)
pos = to_final_position(pos, forb_day).reindex(factor_df.index)
daily_rtn = (pos.shift(1) * rtn_df).sum(axis=1).reindex(factor_df.index)
annual_rtn = AnnualReturn(pos,daily_rtn,'factor')
annual_rtn_ls.append(annual_rtn)
plt.plot((daily_rtn+1).cumprod(), label=str(split_pct))
plt.title('all factor group backtest return',fontsize = 14)
plt.legend()
plt.grid()
plt.show()
xticks = range(len(split_pct_ls))
plt.figure(figsize=(12, 6))
p = plt.subplot(111)
p.bar(x = xticks,height = annual_rtn_ls)
p.set_xticks(xticks)
p.set_xticklabels([x[1]*10 for x in split_pct_ls])
plt.title('factor group annual return',fontsize = 14)
plt.grid()
plt.show()
def factor_stats(
factor_df=None,
chg_n=1,
univ_data=None,
rtn_df=None,
idx_rtn=None,
forbid_days = None,
method='factor',
group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]
):
if method=='factor':
plt.figure(figsize=(12, 6))
pos_final,daily_pnl = calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method)
plt.plot(daily_pnl.cumsum())
plt.title('all factor row_Zscore position return',fontsize = 14)
plt.grid(1)
plt.show()
factor_group(
factor_df,
forbid_days,
rtn_df,
idx_rtn,
univ_data,
split_pct_ls=group_split_ls
)
pct_n = rtn_df.rolling(window=chg_n).sum()
ir,IC_series = IR(factor_df, pct_n, lag=chg_n)
plt.figure(figsize=(12, 6))
plt.plot(IC_series.cumsum(),label=f'IR:{round(ir,2)},IC_mean:{round(IC_series.mean(),2)}')
plt.title('IC cumsum',fontsize = 14)
plt.legend()
plt.grid(1)
plt.show()
else:
plt.figure(figsize=(16, 6))
p1 = plt.subplot(111)
pos = factor_df.reindex(factor_df.index[::chg_n])
pos = pos.reindex(factor_df.index).ffill()
pos_final,daily_pnl = calc_daily_pnl(pos, univ_data, rtn_df, idx_rtn,forbid_days,method)
sharpe = round(Sharpe_yearly(daily_pnl),2)
max_drawdown = round(MaxDrawdown((daily_pnl+1).cumprod()),2)
annual_return = round(AnnualReturn(pos_final,daily_pnl,method),2)
p1.plot(daily_pnl.cumsum(),label=f'SP:{sharpe},MD:{max_drawdown.min()},AR:{annual_return}')
p1.set_title('selected position return')
p1.grid(1)
p1.legend()
plt.show()
Example
Using the 20-day return factor as an example, this section demonstrates the single-factor tests. My local directory layout is shown in the figure below.
from my_lib.data_download.data_io import DataReader
from my_lib.factor_evaluate.factor_evaluate import factor_stats
import pandas as pd
import numpy as np
def calc_factor():
close_df = DataReader.read_dailyMkt('close')
return close_df.pct_change(20)
Compute the factor
factor_df = calc_factor()
factor_df.tail(5)
Stock universe
univ_a = DataReader.read_IdxWeight('399300.SZ')
univ_a = univ_a.where(pd.isnull(univ_a),1)
univ_a
ST, suspension, and limit-up/down filters
ST_valid = DataReader.read_ST_valid()
suspend_valid = DataReader.read_suspend_valid()
limit_valid = DataReader.read_limit_valid()
forb_days = ST_valid*suspend_valid*limit_valid
forb_days.tail(5)
Daily return matrix
rtn_df = DataReader.read_dailyRtn()
rtn_df.tail(5)
Factor test and backtest: the raw factor
idx_rtn = DataReader.read_index_dailyRtn('399300.SZ')
factor_stats(
factor_df=factor_df,
chg_n=20,
univ_data=univ_a,
rtn_df=rtn_df,
idx_rtn=idx_rtn,
forbid_days=forb_days,
method='factor',
group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]
)
Index hedge
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos = factor_selected.div(factor_selected.sum(axis=1), axis=0)
pos = pos.fillna(0)
factor_stats(
factor_df = pos,
chg_n=20,
univ_data=univ_a,
rtn_df=rtn_df,
idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
forbid_days=forb_days,
method='hg_alpha',
)
Long-short hedge
factor_df = factor_df.reindex_like(univ_a)*univ_a
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_long = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)
factor_selected = factor_df[factor_rank_pct<0.2]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_short = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)
factor_stats(
factor_df = pos_long.fillna(0) - pos_short.fillna(0),
chg_n=20,
univ_data=univ_a,
rtn_df=rtn_df,
idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
forbid_days=forb_days,
method='ls_alpha',
)