使用Matplotlib生成专业的数据可视化仪表盘(下篇)
投资结果的可视化(下篇)
在上一篇文章《投资组合的评价和可视化(上)》中,我们了解了几种常见的投资组合评价指标的计算方法,并且通过一个实例,一步步根据这个投资组合在过去十年的模拟交易结果,计算出了它的各项指标,接下来,我们将一步步实现所有指标的可视化。
在这篇文章里,我们使用matplotlib来实现可视化。我们使用前一篇文章里计算完成的数据,把他们组合显示在一张图表中(如下图):
看上去图表比较复杂,但是我们会一步步将它们实现。图表中的原始数据可以在这里下载,原始数据包含一个大小盘轮动策略在过去十年里的模拟交易结果。在上一篇文章中,我们在原始数据的基础上一步步计算了投资组合的所有相关评价指标,包括:
- 收益率和年化收益率:整个模拟交易历史上的投资收益率、年化收益率和当日收益率存储在looped_value的rtn, annual_rtn, pct_change三列中
- 月度收益率:投资组合在模拟交易历史上每个月的月度收益率存储在一个名为montly_return_df的DataFrame中
- 波动率Volatility:这里我们以250个交易日为单位,滚动计算每一天的波动率,这个数字保存在looped_value的volatility列中,同时,计算出十年间的滚动波动率的平均值:0.211
- 最大回撤Max Drawdwon:我们通过遍历十年间的资产总额,统计出所有的回撤情况,存储在一个名为dd_df的DataFrame中,并按回撤深度从大到小排序,知道最大回撤位20%。
- 夏普率Sharp Ratio:反映投资组合每承担一份风险,可以获取多少超额收益,我们滚动计算了十年间每一天的250日夏普率,存储在looped_value的sharp列中,并计算出十年间滚动夏普率的平均值:0.94,说明承担的风险稍大于获取的超额收益。
- 卡尔玛比率 Calmar Ratio:反映投资组合每承受一份回撤,能够获取多少超额收益,我们滚动计算了十年间每一天的滚动卡尔玛比率,存储在looped_value的sharp列中,并计算出十年间滚动卡尔玛率的平均值:2.07,说明相对于收益来说,回撤幅度可以接受。
- 贝塔系数:体现投资组合随市场波动的情况,我们同样计算了滚动贝塔系数,存储在beta列中,同时计算出其平均值:0.62,说明投资组合的总体风险(波动率)小于市场平均水平。
- 阿尔法系数:揭示投资组合获取超越市场因素带来的超市场收益,滚动阿尔法系数存储在looped_value的alpha列中,平均值为27%,这是超过市场因素的“内生”收益率。
下面,我们就来一步步把上面的信息通过matplotlib可视化出来。不过,本文所有代码都是基于上一篇文章的结果,因此,还没有读上一篇文章的同学,需要复制完整代码以确保上述的计算都已经完成。
图表的布局规划及格式设定
由于需要显示的内容非常多,一张简单的图表是无法表现所有内容的,因此我们需要一张组合图表。在Matplotlib中,一张图表被称为一个figure,在一个figure上可以放置若干个绘图区域Axes,每个绘图区域内都有自己独立的数据、网格、标题等等元素,我们要在一个figure上有计划第放置多个Axes来制作组合图表。
我们首先需要加载matplotlib模块:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
图表布局
我们希望整张图表既能够展现所有的内容,但是又主次分明,因此需要对图表的格式做出一个大致的规划: 首先,图表应该表现三大类内容:
- 回测结果的文字总结,用文字的形式展示出关键信息,如回报率、夏普率等等
- 历史过程曲线图,以交易历史日期为X轴,以不同的图表展示各种历史过程曲线,例如收益率、波动率、回撤水平等,这部分图表又包含两部分:
2.1. 收益率曲线,由于收益率是回测结果的核心信息,因此这部分图表要占据最大的版面,占据视觉重心位置,而且以三种不同的方式展现 2.2. 回测评价指标曲线,包括风险敞口评价指标如beta、波动率,以及盈利能力评价指标如alpha等 - 收益率统计图表,通过热力图、直方图等多种形式展示历史收益率的水平。
根据上面的规划,我们可以按下面的格式在一个figure上创建九张图表,并调整它们的位置。留出最大的一张表,位于视觉重心处,用于显示最重要的收益率曲线。
chart_width = 0.88
fig = plt.figure(figsize=(12, 15), facecolor=(0.82, 0.83, 0.85))
ax1 = fig.add_axes([0.05, 0.67, 0.88, 0.20])
ax2 = fig.add_axes([0.05, 0.57, 0.88, 0.08], sharex=ax1)
ax3 = fig.add_axes([0.05, 0.49, 0.88, 0.06], sharex=ax1)
ax4 = fig.add_axes([0.05, 0.41, 0.88, 0.06], sharex=ax1)
ax5 = fig.add_axes([0.05, 0.33, 0.88, 0.06], sharex=ax1)
ax6 = fig.add_axes([0.05, 0.25, 0.88, 0.06], sharex=ax1)
ax7 = fig.add_axes([0.05, 0.04, 0.35, 0.16])
ax8 = fig.add_axes([0.45, 0.04, 0.15, 0.16])
ax9 = fig.add_axes([0.64, 0.04, 0.29, 0.16])
使用plt.show() 或者fig.show() 即可看到图表的外观如下。
同学们可以自行微调add_axes() 里列表中的四个数字,来调整每个表的位置大小,这四个数字分别代表: [图表左下角X坐标,图表左下角Y坐标,图表的宽度,图表的高度] 每个数字都是一个小数,单位为整个figure 的高度或宽度,例如,[0.64, 0.04, 0.29, 0.16] 表示图> 表的左下角位于figure 宽度的64%处、高度的4%处,宽度为figure 的29%,高度为figure 的16%
格式设定
为了图表的美观起见,我们还可以调整一下各个图表的格式,例如,前六张图表可以共享一个X轴,因此图表的X轴坐标便可以隐藏,另外,我们希望在图表左边放置名称或说明,那么Y轴坐标就可以放在右边,等等。
这部分代码可以放置在最后。
for ax in [ax1, ax2, ax3, ax4, ax5, ax6]:
ax.yaxis.tick_right()
ax.xaxis.set_ticklabels([])
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.grid(True)
years = mdates.YearLocator()
months = mdates.MonthLocator()
weekdays = mdates.WeekdayLocator()
years_fmt = mdates.DateFormatter('%Y')
month_fmt_none = mdates.DateFormatter('')
month_fmt_l = mdates.DateFormatter('%y/%m')
month_fmt_s = mdates.DateFormatter('%m')
major_locator = years
major_formatter = years_fmt
minor_locator = months
minor_formatter = month_fmt_none
ax6.xaxis.set_major_locator(major_locator)
ax6.xaxis.set_major_formatter(major_formatter)
ax6.xaxis.set_minor_locator(minor_locator)
ax6.xaxis.set_minor_formatter(minor_formatter)
for ax in [ax1, ax2, ax3, ax4, ax5]:
plt.setp(ax.get_xticklabels(), visible=False)
表头和回测结果摘要信息
图表的格式设置完毕后,我们可以先把所有文字信息输出到图表上的空白处。 在这里,我们需要计算出需要显示的所有数据:
change = (looped_value[stock_holdings] - looped_value[stock_holdings].shift(1)).sum(1)
start_point = looped_value['value'].iloc[0]
ref_start = looped_value['benchmark'].iloc[0]
ret = looped_value['value'] - looped_value['value'].shift(1)
position = 1 - (looped_value['cash'] / looped_value['value'])
beta = looped_value['beta']
alpha = looped_value['alpha']
volatility = looped_value['volatility']
sharp = looped_value['sharp']
underwater = looped_value['underwater']
drawdowns = dd_df
return_rate = (looped_value.value - start_point) / start_point * 100
ref_rate = (looped_value.benchmark - ref_start) / ref_start * 100
adjusted_bench_start = looped_value.benchmark / ref_start * start_point
使用fig.suptitle() 设置图表的大标题 而其他所有的文字都可以通过fig.text() 来输出。fig.text() 的头两个参数是文字的坐标,需要注意,文字的坐标是以文字块左下角的位置来定义的,因此如果文字中包含换行符,第一行文字就会被“顶到”上面去。
请注意,下面的代码中使用了fontname='pingfang HK' 以使用中文字体,否则中文会显示为乱码
title_asset_pool = '沪深300/创业板指 大小盘轮动策略'
fig.suptitle(f'回测交易结果: {title_asset_pool} - 业绩基准: 沪深300指数',
fontsize=14,
fontweight=10,
fontname='pingfang HK')
fig.text(0.07, 0.955, f'回测期长: {total_years:3.1f} 年, '
f' 从: {looped_value.index[0].date()} 起至 {looped_value.index[-1].date()}止',
fontname='pingfang HK')
fig.text(0.21, 0.90, f'交易操作汇总:\n\n'
f'投资总金额:\n'
f'期末总资产:', ha='right',
fontname='pingfang HK')
fig.text(0.23, 0.90, f'{op_counts.buy.sum()} 次买入 \n'
f'{op_counts.sell.sum()} 次卖出\n'
f'¥{total_invest:13,.2f}\n'
f'¥{final_value:13,.2f}',
fontname='pingfang HK')
fig.text(0.50, 0.90, f'总投资收益率:\n'
f'平均年化收益率:\n'
f'基准投资收益率:\n'
f'基准投资平均年化收益率:\n'
f'最大回撤:\n', ha='right',
fontname='pingfang HK')
fig.text(0.52, 0.90, f'{total_return:.2%} \n'
f'{annual_return: .2%} \n'
f'{ref_return:.2%} \n'
f'{ref_annual_rtn:.2%}\n'
f'{mdd:.1%} \n'
f' 底部日期 {mdd_date.date()}',
fontname='pingfang HK')
fig.text(0.82, 0.90, f'alpha / 阿尔法系数:\n'
f'Beta / 贝塔系数:\n'
f'Sharp ratio / 夏普率:\n'
f'Calmar ratio / 卡尔玛比率:\n'
f'250-日滚动波动率:', ha='right',
fontname='pingfang HK')
fig.text(0.84, 0.90, f'{avg_alpha:.3f} \n'
f'{avg_beta:.3f} \n'
f'{avg_sharp:.3f} \n'
f'{avg_calmar:.3f} \n'
f'{avg_volatility:.3f}',
fontname='pingfang HK')
如果输入的文字正常,显示效果应该如下图所示:
有的朋友在运行上述代码时,可能会遇到错误说使用的中文字体不存在,因而中文显示为乱码,这里给出一个解决方案供大家参考:
为了显示系统中有哪些中文字体,可以先导入matplotlib 的FontManager 类,调用这个类的ttflist 属性,就可以看到系统中已经存在的所有可以被matplotlib 使用的字体了,选择其中的中文字体即可(中文字体名称中一般都带有拼音,或者含有TC 、SC 之类的关键字:
>>> from matplotlib.font_manager import FontManager
>>> fm = FontManager()
>>> fm.ttflist
Out:
[<Font 'STIXSizeOneSym' (STIXSizOneSymBol.ttf) normal normal 700 normal>,
<Font 'STIXSizeOneSym' (STIXSizOneSymReg.ttf) normal normal 400 normal>,
...
<Font 'PingFang HK' (PingFang.ttc) normal normal 400 normal>,
...
<Font 'STIXIntegralsUpD' (STIXIntUpDReg.otf) normal normal 400 normal>,
<Font 'Apple Braille' (Apple Braille Pinpoint 6 Dot.ttf) normal normal 400 normal>]
清单中的字体可能会比较多,也有多种中文字体,比如上面例子中的PingFang HK 就是中文字体,将字体名称PingFang HK 用于font 就可以了:font_name='PingFang HK'
接下来进入重头戏,各个图表的绘制。由于每张图表数据区别甚大,因此我们分别来看。
表1:绘制收益率曲线图
收益率曲线图是整张图表的视觉重心,也是一张颇为复杂的复合图表,我们希望它能体现出最重要的关键信息,因此信息集成度比较高。
这张图表由三部分组成:
- 收益率曲线:包括基准收益率曲线和投资收益率曲线,为了体现基准收益率的变化,基准收益率曲线还被填充为两种颜色:正收益区间填充绿色,负收益区间填充红色,投资收益率就只是一条深红色曲线,在偏绿的背景下最为显眼。
- 交易持股指示:用于指示整个交易过程中什么时候买入、什么时候卖出,或者什么时候持股、什么时候空仓
- 最大回撤区间:使用箭头指出最大回撤区间和回撤比例
上面三部分我们逐步绘制。
1,绘制投资收益率以及基准收益率
收益率曲线用最简单的axex.plot() 函数即可实现,传入必要的参数如颜色color ,线形linestyle 、透明度alpha 等即可:
ax1.set_title('总收益率、基准收益率和交易历史', fontname='pingfang HK')
ax1.plot(looped_value.index, ref_rate, linestyle='-',
color=(0.4, 0.6, 0.8), alpha=0.85, label='Benchmark')
ax1.plot(looped_value.index, return_rate, linestyle='-',
color=(0.8, 0.2, 0.0), alpha=0.85, label='Return')
ax1.set_ylabel('收益率', fontname='pingfang HK')
ax1.yaxis.set_major_formatter(mtick.PercentFormatter())
用axes.fill_between() 在参考收益率的正负区间分别填充红色和绿色,使它更显眼。
ax1.fill_between(looped_value.index, 0, ref_rate,
where=ref_rate >= 0,
facecolor=(0.4, 0.6, 0.2), alpha=0.35)
ax1.fill_between(looped_value.index, 0, ref_rate,
where=ref_rate < 0,
facecolor=(0.8, 0.2, 0.0), alpha=0.35)
绘图效果如下,红色的投资曲线和红绿填充的基准曲线都显示在图表上了。
2,添加买卖区间
买卖区间可以有两种不同的方式表示:
- 方式1: 使用条状着色的方式,在持股区间(持有多头仓位)填充浅绿色,而空仓区间维持白色(更进一步,如果持有空头仓位,可以填充浅红色)
我们可以用axes.axvspan() 来填充纵向条纹,以表现某一个时间区间的状态,不过所有的条纹需要逐个填充,因此我们使用for 循环来填充所有持股区间。下面代码中change表示股票仓位发生变化的时间点,找出所有这样的时间点,再根据当时的仓位填充颜色就可以了。
注意facecolor=((1 - 0.6 * long_short), (1 - 0.4 * long_short), (1 - 0.8 * long_short)) 这行代码,实际上是根据仓位的高低来计算一个RGB 颜色值,当仓位为0 (空仓)时计算结果为(1.0, 1.0, 1.0) 即纯白色,而仓位为1 (满仓)时,计算结果为(0.4, 0.6, 0.2) 即绿色。因此条纹的颜色随仓位而变,仓位越高,颜色越深。
设置alpha=0.2 是为了确保填充的条纹足够透明,不会遮挡曲线图。
position_bounds = [looped_value.index[0]]
position_bounds.extend(looped_value.loc[change != 0].index)
position_bounds.append(looped_value.index[-1])
for first, second, long_short in zip(position_bounds[:-2], position_bounds[1:],
position.loc[position_bounds[:-2]]):
if long_short > 0:
if long_short > 1:
long_short = 1
ax1.axvspan(first, second,
facecolor=((1 - 0.6 * long_short), (1 - 0.4 * long_short), (1 - 0.8 * long_short)),
alpha=0.2)
else:
if long_short < -1:
long_short = -1
ax1.axvspan(first, second,
facecolor=((1 + 0.2 * long_short), (1 + 0.8 * long_short), (1 + long_short)),
alpha=0.2)
填充的效果如下:
- 方式2: 还有另外一种显示买卖点的方式:使用红绿箭头标记买卖点。
使用axes.scatter 可以在图表上任意坐标显示一个图形,marker="^" 表示向上箭头,marker="v" 表示向下箭头。因此,我们可以遍历历史上的所有买入和卖出点,在买入点曲线上绘制一个向上绿色箭头,在卖出点曲线上绘制一个向下红色箭头,这样也能展示出买卖点了。不过,如果买卖点特别密集的情况下,买卖点是无法分辨清楚的。
buy_points= np.where(change > 0, ref_rate, np.nan)
sell_points = np.where(change < 0, ref_rate, np.nan)
ax1.scatter(looped_value.index, buy_points, color='green',
label='Buy', marker='^', alpha=0.9)
ax1.scatter(looped_value.index, sell_points, color='red',
label='Sell', marker='v', alpha=0.9)
如下图所示,买卖点过于密集,难以分辨。 朋友们可以根据具体情况选择不同的买卖区间展示方式。
3,使用箭头标记最大回撤区间
使用axes.annotate() 函数可以在图表上绘制箭头,通过xy 参数可以指定箭头指向的位置、而xytext 的位置是箭头的出发(文字)坐标。在这里指定坐标非常方便,只需要按照数据坐标指定即可,例如,需要箭头指向图表中(2012年3月17日,125%)所在的点,需要传入的坐标就是:(2012-3-17, 1.25)
ax1.annotate(f"{mdd_date.date()}",
xy=(mdd_date, return_rate[mdd_date]),
xycoords='data',
xytext=(mdd_peak, return_rate[mdd_peak]),
textcoords='data',
arrowprops=dict(width=1, headwidth=3, facecolor='black', shrink=0.),
ha='right',
va='bottom')
if pd.notna(mdd_recover):
ax1.annotate(f"-{mdd:.1%}\n{mdd_date.date()}",
xy=(mdd_recover, return_rate[mdd_recover]),
xycoords='data',
xytext=(mdd_date, return_rate[mdd_date]),
textcoords='data',
arrowprops=dict(width=1, headwidth=3, facecolor='black', shrink=0.),
ha='right',
va='top')
else:
ax1.text(x=mdd_date,
y=return_rate[mdd_date],
s=f"-{mdd:.1%}\nnot recovered",
ha='right',
va='top')
ax1.legend()
如下图所示,最大回撤区间已经在图表上标记出来了。 有了上面这张图,整个投资过程中最关键的几个信息都已经完整展现出来了。不过,考虑到投资的复利效应,如果我们的投资组合非常给力,后期收益率非常高时,往往前期的收益会被压缩成一条直线,很难分辨,甚至有时基准首页也被压缩成直线了,这时我们可以使用一张对数比例的收益率曲线图来放大低收益区间的图形,以便看清整个投资区间的变化。
表2:绘制对数比例的收益率曲线图
绘制对数比例曲线图很简单,只需要设置axes.set_yscale('log') 即可:
ax2.set_title('对数比例回测收益率与基准收益率', fontname='pingfang HK')
ax2.plot(looped_value.index, adjusted_bench_start, linestyle='-',
color=(0.4, 0.6, 0.8), alpha=0.85, label='Benchmark')
ax2.plot(looped_value.index, looped_value.value, linestyle='-',
color=(0.8, 0.2, 0.0), alpha=0.85, label='Cum Value')
ax2.set_ylabel('收益率\n对数比例', fontname='pingfang HK')
ax2.yaxis.set_major_formatter(mtick.PercentFormatter())
ax2.set_yscale('log')
ax2.legend()
对比线性比例和对数比例的两张图,是否发现对数比例下,早期的投资收益情况更加清楚了?这是由于投资收益本身就是自带“指数”效应的,使用对数比例抵消指数效应,可以更清晰地展示整个投资全过程的收益情况。
表3:绘制收益额柱状图
接下来几张图表都比较简单,每日收益额可以用柱状图来展现,axes.bar() 函数专门用来显示柱状图:
ax3.set_title('收益额', fontname='pingfang HK')
ax3.bar(looped_value.index, ret)
ax3.set_ylabel('收益额', fontname='pingfang HK')
表4:绘制盈利能力指数变动图(阿尔法系数/夏普率)
投资组合的盈利能力通过alpha 和sharp 两个指标来体现,它们本身就是滚动数据,因此使用线图axes.plot() 来显示非常合适:
ax4.set_title('投资组合盈利能力: 滚动阿尔法系数和夏普率', fontname='pingfang HK')
ax4.plot(looped_value.index, sharp, label='sharp')
ax4.plot(looped_value.index, alpha, label='alpha')
ax4.set_ylabel('盈利能力', fontname='pingfang HK')
ax4.legend()
表5:绘制风险系数变动图(波动率/贝塔系数)
与前面一张表相似,波动率和贝塔系数都体现了投资组合的风险敞口,可以放到同一张图表中用线图表示,别忘记绘制图例axes.legend() 即可:
ax5.set_title('投资组合风险敞口: 滚动波动率和贝塔系数', fontname='pingfang HK')
ax5.plot(looped_value.index, volatility, label='volatility')
ax5.plot(looped_value.index, beta, label='beta')
ax5.set_ylabel('风险敞口', fontname='pingfang HK')
ax5.legend()
完成上面几张图的绘制后,使用plt.show()可以看到图表如下:
表6:历史回撤区间(潜水图——回撤比例图)
作为历史曲线图的最后一张图表,回撤区间图包含两部分:
历史回撤比例图显示了历史上每一天相对于前期高点的回撤比例,由于最大值为0(无回撤),最小值为1 ,而曲线总是在0 以下的某个位置波动,很像一条鱼只能在水下活动,不能超出水面,因此也叫潜水图。潜水图的数据已经存放在underwater 数据中了,因此可以直接调用,为了突出数据,可以把曲线和0之间的部分填充红色(使用axes.fill_between() )
ax6.set_title('历史最大回撤及收益率潜水图', fontname='pingfang HK')
ax6.plot(underwater, label='underwater')
ax6.set_ylabel('潜水图', fontname='pingfang HK')
ax6.set_xlabel('date')
ax6.set_ylim(-1, 0)
ax6.fill_between(looped_value.index, 0, underwater,
where=underwater < 0,
facecolor=(0.8, 0.2, 0.0), alpha=0.35)
dd_starts = drawdowns['peak_date'].head().values
dd_ends = drawdowns['recover_date'].head().values
dd_valley = drawdowns['valley_date'].head().values
dd_value = drawdowns['drawdown'].head().values
填充后的回撤比例图(潜水图)如下图所示: 紧接着我们要填充历史上的五个最大回撤区间。 使用for循环,找到历史上的最大五个回撤区间的开始、结束时间,使用axes.axvspan() 填充纵向条纹,同时使用axes.text() 标注最大回撤的比例,如下图所示:
for start, end, valley, dd in zip(dd_starts, dd_ends, dd_valley, dd_value):
if np.isnan(end):
end = looped_value.index[-1]
ax6.axvspan(start, end,
facecolor='grey',
alpha=0.3)
if dd > -0.6:
ax6.text(x=valley,
y=dd - 0.05,
s=f"-{dd:.1%}\n",
ha='center',
va='top')
else:
ax6.text(x=valley,
y=dd + 0.15,
s=f"-{dd:.1%}\n",
ha='center',
va='bottom')
这样,五个最大回撤区间就直观地展现出来了:
至此,我们已经完成了6张历史曲线图表的绘制。完整地展现了整个投资历史中的重要信息。不过,如果我们还想以月为单位,了解整个投资过程中收益率的统计信息,例如,月度收益率的分布情况如何?收益率平均分布,还是两极分化?这些信息都不容易从历史曲线图中看出来,这是我们就需要更多的统计图表了。
表7:月度收益热力图
热力图的好处是可以让人一目了然地同时看到所有年份所有月份的收益率,而且收益率以颜色来展示,非常直观。我们在前一篇文章中,计算了历史所有月份的收益率,并存储在了一个DataFrame中:
monthly_return_df
Out[50]:
Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec y-cum
2011 -0.029565 0.004999 -0.082161 -0.027755 0.000771 0.017896 0.036120 -0.089740 0.009318 0.001401 -0.029218 0.002102 -0.134492
2012 0.029766 0.043612 0.010853 0.008814 -0.004902 -0.036098 -0.031729 -0.039526 -0.020958 -0.022452 -0.013789 0.085873 0.053817
2013 0.077273 0.033820 -0.020762 0.002204 0.173830 -0.024849 0.026676 -0.074971 0.030109 -0.077782 0.048875 -0.054761 0.154530
2014 0.121668 -0.063620 -0.009860 -0.002996 0.009583 0.064157 0.048142 -0.015106 0.060307 -0.028095 0.117326 0.252034 0.751125
2015 -0.060772 0.135744 0.174663 0.064227 0.216783 -0.143777 0.045193 0.048436 -0.019001 0.131076 0.098636 0.002394 1.135973
2016 -0.028169 -0.009662 0.079805 -0.040826 0.007602 0.013114 0.026142 -0.011543 -0.046850 -0.007132 0.053011 -0.043511 -0.036378
2017 0.015494 0.034207 -0.013918 -0.017658 0.018582 0.047976 0.023719 0.007475 0.003254 0.027097 0.002371 -0.010655 0.184567
2018 0.043260 -0.044899 0.057105 -0.040944 -0.041885 0.021316 -0.028817 -0.048421 0.049822 0.002025 -0.014282 -0.028081 -0.114206
2019 0.006190 0.153495 0.079587 -0.026437 0.001902 0.033116 -0.009870 0.008138 -0.002944 0.011165 0.009196 0.041282 0.456783
2020 0.051435 0.152352 -0.081691 0.056878 -0.011090 0.128821 0.164158 -0.031281 -0.041442 -0.057777 0.039970 0.031110 0.548550
这样我们就可以绘制一张热力图,包含十行、十二列色块,每行代表一年,每列代表一个月,共计120个色块,每个色块的颜色代表当月的收益率大小。由于我们本来就已经用了DataFrame 格式,数据已经存储在行列中,因此我们用axes.imshow() 就可以轻松显示出热力图。 cmap='RdYlGn' 表示热力图的涂色方案,表示“red-yellow-green”红黄绿渐变着色,红色是最低收益率,绿色为最高收益率,如果要反过来,可以用cmap='GnYlRd' 。更多的涂色方案可以参考matplotlib 的文档。
monthly_df = monthly_return_df[['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']]
return_years = monthly_df.index
return_months = monthly_df.columns
return_values = monthly_df.values
c = ax7.imshow(return_values, cmap='RdYlGn')
ax7.set_title('月度收益热力图', fontname='pingfang HK')
ax7.set_xticks(np.arange(len(return_months)))
ax7.set_yticks(np.arange(len(return_years)))
ax7.set_xticklabels(return_months, rotation=45)
ax7.set_yticklabels(return_years)
base_aspect_ratio = 0.72
if len(return_years) <= 12:
aspect_ratio = base_aspect_ratio
else:
aspect_ratio = base_aspect_ratio * 12 / len(return_years)
ax7.set_aspect(aspect_ratio)
ax7.grid(False)
fig.colorbar(c, ax=ax7)
表8:年度收益率柱状图
每年的收益率可以显示为柱状图,为了确保每根柱子与表7里每一行(每一年)对齐,我们可以把柱状图显示为水平柱子,因此,不要使用axes.bar(),而是使用axes.barh()图显示水平柱状图。
y_cum = monthly_return_df['y-cum']
y_count = len(return_years)
pos_y_cum = np.where(y_cum >= 0, y_cum, 0)
neg_y_cum = np.where(y_cum < 0, y_cum, 0)
return_years = y_cum.index
ax8.barh(np.arange(y_count), pos_y_cum, 1, align='center', facecolor='green', alpha=0.85)
ax8.barh(np.arange(y_count), neg_y_cum, 1, align='center', facecolor='red', alpha=0.85)
ax8.set_yticks(np.arange(y_count))
ax8.set_ylim(y_count - 0.5, -0.5)
ax8.set_yticklabels(list(return_years))
ax8.set_title('年度收益率', fontname='pingfang HK')
ax8.grid(False)
表9:月度收益率直方图
最后是月度收益率的直方图,通过这张直方图,我们可以看到收益率的概率分布,从而判断收益率分布均匀,还是两极分化。
ax9.set_title('月度收益山积图', fontname='pingfang HK')
ax9.hist(monthly_return_df.values.flatten(), bins=18, alpha=0.5,
label='monthly returns')
ax9.grid(False)
最终效果如下:
总结及完整代码
至此,我们的整个图表就绘制完成了。通过这两篇文章,我们通过一个例子介绍了投资组合历史回测结果的评价,并且通过一张可视化图表展示了投资组合的结果。
完整的代码如下(包括前一篇文章中的代码),示例数据在这里下载:
if __name__ == '__main__':
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as mtick
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
looped_value = pd.read_csv('example_data.csv', index_col=0)
looped_value.index = pd.to_datetime(looped_value.index)
total_rounds = len(looped_value.index)
total_days = (looped_value.index[-1] - looped_value.index[0]).days
total_years = total_days / 365.
total_months = int(np.round(total_days / 30))
total_invest = looped_value.iloc[0].cash
final_value = looped_value.iloc[-1].value
holding_stocks = looped_value.copy()
holding_stocks.drop(columns=['cash', 'value', 'benchmark'], inplace=True)
holding_movements = holding_stocks - holding_stocks.shift(1)
holding_long = np.where(holding_stocks > 0, np.sign(holding_stocks), 0)
holding_short = np.where(holding_stocks < 0, np.sign(holding_stocks), 0)
holding_inc = np.where(holding_movements > 0, np.sign(holding_movements), 0)
holding_dec = np.where(holding_movements < 0, np.sign(holding_movements), 0)
sell_counts = -holding_dec.sum(axis=0)
buy_counts = holding_inc.sum(axis=0)
long_percent = holding_long.sum(axis=0) / total_rounds
short_percent = -holding_short.sum(axis=0) / total_rounds
op_counts = pd.DataFrame(sell_counts, index=holding_stocks.columns, columns=['sell'])
op_counts['buy'] = buy_counts
op_counts['total'] = op_counts.buy + op_counts.sell
op_counts['long'] = long_percent
op_counts['short'] = short_percent
op_counts['empty'] = 1 - op_counts.long - op_counts.short
looped_value['invest'] = total_invest
looped_value['rtn'] = looped_value.value / looped_value['invest'] - 1
total_return = looped_value['rtn'].iloc[-1]
ys = (looped_value.index - looped_value.index[0]).days / 365.
looped_value['annual_rtn'] = (looped_value.rtn + 1) ** (1 / ys) - 1
annual_return = looped_value['annual_rtn'].iloc[-1]
looped_value['pct_change'] = looped_value.value / looped_value.value.shift(1) - 1
ref_return = looped_value.benchmark.iloc[-1] / looped_value.benchmark.iloc[0]
ref_annual_rtn = (ref_return + 1) ** (1 / ys[-1]) - 1
first_year = looped_value.index[0].year
last_year = looped_value.index[-1].year
starts = pd.date_range(start=str(first_year - 1) + '1231',
end=str(last_year) + '1130',
freq='M') + pd.Timedelta(1, 'd')
ends = pd.date_range(start=str(first_year) + '0101',
end=str(last_year) + '1231',
freq='M')
monthly_returns = list()
for start, end in zip(starts, ends):
val = looped_value['value'].loc[start:end]
if len(val) > 0:
monthly_returns.append(val.iloc[-1] / val.iloc[0] - 1)
else:
monthly_returns.append(np.nan)
year_count = len(monthly_returns) // 12
monthly_returns = np.array(monthly_returns).reshape(year_count, 12)
monthly_return_df = pd.DataFrame(monthly_returns,
columns=['Jan', 'Feb', 'Mar', 'Apr',
'May', 'Jun', 'Jul', 'Aug',
'Sep', 'Oct', 'Nov', 'Dec'],
index=range(first_year, last_year + 1))
starts = pd.date_range(start=str(first_year - 1) + '1231',
end=str(last_year) + '1130',
freq='Y') + pd.Timedelta(1, 'd')
ends = pd.date_range(start=str(first_year) + '0101',
end=str(last_year) + '1231',
freq='Y')
yearly_returns = []
for start, end in zip(starts, ends):
val = looped_value['value'].loc[start:end]
if len(val) > 0:
yearly_returns.append(val.iloc[-1] / val.iloc[0] - 1)
else:
yearly_returns.append(np.nan)
monthly_return_df['y-cum'] = yearly_returns
ret = (looped_value['value'] / looped_value['value'].shift(1)) - 1
volatility = ret.rolling(250).std() * np.sqrt(250)
looped_value['volatility'] = volatility
avg_volatility = looped_value.volatility.mean()
cummax = looped_value['value'].cummax()
looped_value['underwater'] = (looped_value['value'] - cummax) / cummax
drawdown_sign = np.sign(looped_value.underwater)
diff = drawdown_sign - drawdown_sign.shift(1)
drawdown_starts = np.where(diff == -1)[0]
drawdown_ends = np.where(diff == 1)[0]
drawdown_count = min(len(drawdown_starts), len(drawdown_ends))
all_drawdowns = []
for i_start, i_end in zip(drawdown_starts[:drawdown_count], drawdown_ends[:drawdown_count]):
dd_start = looped_value.index[i_start - 1]
dd_end = looped_value.index[i_end]
dd_min = looped_value['underwater'].iloc[i_start:i_end].idxmin()
dd = looped_value['underwater'].loc[dd_min]
all_drawdowns.append((dd_start, dd_min, dd_end, dd))
if len(drawdown_starts) > drawdown_count:
dd_start = looped_value.index[drawdown_starts[-1] - 1]
dd_end = np.nan
dd_min = looped_value['underwater'].iloc[drawdown_starts[-1]:].idxmin()
dd = looped_value['underwater'].loc[dd_min]
all_drawdowns.append((dd_start, dd_min, dd_end, dd))
dd_df = pd.DataFrame(all_drawdowns, columns=['peak_date', 'valley_date', 'recover_date', 'drawdown'])
dd_df.sort_values(by='drawdown', inplace=True)
mdd = dd_df.iloc[0].drawdown
mdd_date = dd_df.iloc[0].valley_date
mdd_peak = dd_df.iloc[0].peak_date
mdd_recover = dd_df.iloc[0].recover_date
loop_len = len(looped_value)
ret = looped_value['value'] / looped_value['value'].shift(1) - 1
roll_yearly_return = ret.rolling(250).mean() * 250
looped_value['sharp'] = (roll_yearly_return - 0.035) / looped_value['volatility']
avg_sharp = looped_value.sharp.mean()
value = looped_value['value']
cummax = value.cummax()
drawdown = (cummax - value) / cummax
ret = value / value.shift(250) - 1
looped_value['calmar'] = ret / drawdown.rolling(250).max()
avg_calmar = looped_value['calmar'].mean()
ref = looped_value['benchmark']
ref_ret = (ref / ref.shift(1)) - 1
ret_dev = looped_value['pct_change'].rolling(250).var()
looped_value['beta'] = looped_value['pct_change'].rolling(250).cov(ref_ret) / ret_dev
avg_beta = looped_value['beta'].mean()
loop_len = len(looped_value)
year_ret = looped_value.value / looped_value['value'].shift(250) - 1
bench = looped_value['benchmark']
bench_ret = (bench / bench.shift(250)) - 1
looped_value['alpha'] = (year_ret - 0.035) - looped_value['beta'] * (bench_ret - 0.035)
avg_alpha = looped_value['alpha'].mean()
chart_width = 0.88
fig = plt.figure(figsize=(12, 15), facecolor=(0.82, 0.83, 0.85))
ax1 = fig.add_axes([0.05, 0.67, 0.88, 0.20])
ax2 = fig.add_axes([0.05, 0.57, 0.88, 0.08], sharex=ax1)
ax3 = fig.add_axes([0.05, 0.49, 0.88, 0.06], sharex=ax1)
ax4 = fig.add_axes([0.05, 0.41, 0.88, 0.06], sharex=ax1)
ax5 = fig.add_axes([0.05, 0.33, 0.88, 0.06], sharex=ax1)
ax6 = fig.add_axes([0.05, 0.25, 0.88, 0.06], sharex=ax1)
ax7 = fig.add_axes([0.05, 0.04, 0.35, 0.16])
ax8 = fig.add_axes([0.45, 0.04, 0.15, 0.16])
ax9 = fig.add_axes([0.64, 0.04, 0.29, 0.16])
for ax in [ax1, ax2, ax3, ax4, ax5, ax6]:
ax.yaxis.tick_right()
ax.xaxis.set_ticklabels([])
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.grid(True)
years = mdates.YearLocator()
months = mdates.MonthLocator()
weekdays = mdates.WeekdayLocator()
years_fmt = mdates.DateFormatter('%Y')
month_fmt_none = mdates.DateFormatter('')
month_fmt_l = mdates.DateFormatter('%y/%m')
month_fmt_s = mdates.DateFormatter('%m')
result_columns = looped_value.columns
fixed_column_items = ['fee', 'cash', 'value', 'reference', 'ref', 'ret',
'invest', 'underwater', 'volatility', 'pct_change',
'beta', 'sharp', 'alpha']
stock_holdings = [item for
item in
result_columns if
item not in fixed_column_items and
item[-2:] != '_p']
change = (looped_value[stock_holdings] - looped_value[stock_holdings].shift(1)).sum(1)
start_point = looped_value['value'].iloc[0]
ref_start = looped_value['benchmark'].iloc[0]
ret = looped_value['value'] - looped_value['value'].shift(1)
position = 1 - (looped_value['cash'] / looped_value['value'])
beta = looped_value['beta']
alpha = looped_value['alpha']
volatility = looped_value['volatility']
sharp = looped_value['sharp']
underwater = looped_value['underwater']
drawdowns = dd_df
return_rate = (looped_value.value - start_point) / start_point * 100
ref_rate = (looped_value.benchmark - ref_start) / ref_start * 100
adjusted_bench_start = looped_value.benchmark / ref_start * start_point
title_asset_pool = '沪深300/创业板指 大小盘轮动策略'
fig.suptitle(f'回测交易结果: {title_asset_pool} - 业绩基准: 沪深300指数',
fontsize=14,
fontweight=10,
fontname='pingfang HK')
fig.text(0.07, 0.955, f'回测期长: {total_years:3.1f} 年, '
f' 从: {looped_value.index[0].date()} 起至 {looped_value.index[-1].date()}止',
fontname='pingfang HK')
fig.text(0.21, 0.90, f'交易操作汇总:\n\n\n'
f'投资总金额:\n'
f'期末总资产:', ha='right',
fontname='pingfang HK')
fig.text(0.23, 0.90, f'{op_counts.buy.sum():.0f} 次买入 \n'
f'{op_counts.sell.sum():.0f} 次卖出\n\n'
f'¥{total_invest:13,.2f}\n'
f'¥{final_value:13,.2f}',
fontname='pingfang HK')
fig.text(0.50, 0.90, f'总投资收益率:\n'
f'平均年化收益率:\n'
f'基准投资收益率:\n'
f'基准投资平均年化收益率:\n'
f'最大回撤:\n', ha='right',
fontname='pingfang HK')
fig.text(0.52, 0.90, f'{total_return:.2%} \n'
f'{annual_return: .2%} \n'
f'{ref_return:.2%} \n'
f'{ref_annual_rtn:.2%}\n'
f'{mdd:.1%} \n'
f' 底部日期 {mdd_date.date()}',
fontname='pingfang HK')
fig.text(0.82, 0.90, f'alpha / 阿尔法系数:\n'
f'Beta / 贝塔系数:\n'
f'Sharp ratio / 夏普率:\n'
f'Calmar ratio / 卡尔玛比率:\n'
f'250-日滚动波动率:', ha='right',
fontname='pingfang HK')
fig.text(0.84, 0.90, f'{avg_alpha:.3f} \n'
f'{avg_beta:.3f} \n'
f'{avg_sharp:.3f} \n'
f'{avg_calmar:.3f} \n'
f'{avg_volatility:.3f}',
fontname='pingfang HK')
ax1.set_title('总收益率、基准收益率和交易历史', fontname='pingfang HK')
ax1.plot(looped_value.index, ref_rate, linestyle='-',
color=(0.4, 0.6, 0.8), alpha=0.85, label='Benchmark')
ax1.plot(looped_value.index, return_rate, linestyle='-',
color=(0.8, 0.2, 0.0), alpha=0.85, label='Return')
ax1.set_ylabel('收益率', fontname='pingfang HK')
ax1.yaxis.set_major_formatter(mtick.PercentFormatter())
ax1.fill_between(looped_value.index, 0, ref_rate,
where=ref_rate >= 0,
facecolor=(0.4, 0.6, 0.2), alpha=0.35)
ax1.fill_between(looped_value.index, 0, ref_rate,
where=ref_rate < 0,
facecolor=(0.8, 0.2, 0.0), alpha=0.35)
position_bounds = [looped_value.index[0]]
position_bounds.extend(looped_value.loc[change != 0].index)
position_bounds.append(looped_value.index[-1])
for first, second, long_short in zip(position_bounds[:-2], position_bounds[1:],
position.loc[position_bounds[:-2]]):
if long_short > 0:
if long_short > 1:
long_short = 1
ax1.axvspan(first, second,
facecolor=((1 - 0.6 * long_short), (1 - 0.4 * long_short), (1 - 0.8 * long_short)),
alpha=0.2)
else:
if long_short < -1:
long_short = -1
ax1.axvspan(first, second,
facecolor=((1 + 0.2 * long_short), (1 + 0.8 * long_short), (1 + long_short)),
alpha=0.2)
ax1.annotate(f"{mdd_date.date()}",
xy=(mdd_date, return_rate[mdd_date]),
xycoords='data',
xytext=(mdd_peak, return_rate[mdd_peak]),
textcoords='data',
arrowprops=dict(width=1, headwidth=3, facecolor='black', shrink=0.),
ha='right',
va='bottom')
if pd.notna(mdd_recover):
ax1.annotate(f"-{mdd:.1%}\n{mdd_date.date()}",
xy=(mdd_recover, return_rate[mdd_recover]),
xycoords='data',
xytext=(mdd_date, return_rate[mdd_date]),
textcoords='data',
arrowprops=dict(width=1, headwidth=3, facecolor='black', shrink=0.),
ha='right',
va='top')
else:
ax1.text(x=mdd_date,
y=return_rate[mdd_date],
s=f"-{mdd:.1%}\nnot recovered",
ha='right',
va='top')
ax1.legend()
ax2.set_title('对数比例回测收益率与基准收益率', fontname='pingfang HK')
ax2.plot(looped_value.index, adjusted_bench_start, linestyle='-',
color=(0.4, 0.6, 0.8), alpha=0.85, label='Benchmark')
ax2.plot(looped_value.index, looped_value.value, linestyle='-',
color=(0.8, 0.2, 0.0), alpha=0.85, label='Cum Value')
ax2.set_ylabel('收益率\n对数比例', fontname='pingfang HK')
ax2.yaxis.set_major_formatter(mtick.PercentFormatter())
ax2.set_yscale('log')
ax2.legend()
ax3.set_title('收益额', fontname='pingfang HK')
ax3.bar(looped_value.index, ret)
ax3.set_ylabel('收益额', fontname='pingfang HK')
ax4.set_title('投资组合盈利能力: 滚动阿尔法系数和夏普率', fontname='pingfang HK')
ax4.plot(looped_value.index, sharp, label='sharp')
ax4.plot(looped_value.index, alpha, label='alpha')
ax4.set_ylabel('盈利能力', fontname='pingfang HK')
ax4.legend()
ax5.set_title('投资组合风险敞口: 滚动波动率和贝塔系数', fontname='pingfang HK')
ax5.plot(looped_value.index, volatility, label='volatility')
ax5.plot(looped_value.index, beta, label='beta')
ax5.set_ylabel('风险敞口', fontname='pingfang HK')
ax5.legend()
ax6.set_title('历史最大回撤及收益率潜水图', fontname='pingfang HK')
ax6.plot(underwater, label='underwater')
ax6.set_ylabel('潜水图', fontname='pingfang HK')
ax6.set_xlabel('date')
ax6.set_ylim(-1, 0)
ax6.fill_between(looped_value.index, 0, underwater,
where=underwater < 0,
facecolor=(0.8, 0.2, 0.0), alpha=0.35)
dd_starts = drawdowns['peak_date'].head().values
dd_ends = drawdowns['recover_date'].head().values
dd_valley = drawdowns['valley_date'].head().values
dd_value = drawdowns['drawdown'].head().values
for start, end, valley, dd in zip(dd_starts, dd_ends, dd_valley, dd_value):
if np.isnan(end):
end = looped_value.index[-1]
ax6.axvspan(start, end,
facecolor='grey',
alpha=0.3)
if dd > -0.6:
ax6.text(x=valley,
y=dd - 0.05,
s=f"-{dd:.1%}\n",
ha='center',
va='top')
else:
ax6.text(x=valley,
y=dd + 0.15,
s=f"-{dd:.1%}\n",
ha='center',
va='bottom')
monthly_df = monthly_return_df[['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']]
return_years = monthly_df.index
return_months = monthly_df.columns
return_values = monthly_df.values
c = ax7.imshow(return_values, cmap='RdYlGn')
ax7.set_title('月度收益热力图', fontname='pingfang HK')
ax7.set_xticks(np.arange(len(return_months)))
ax7.set_yticks(np.arange(len(return_years)))
ax7.set_xticklabels(return_months, rotation=45)
ax7.set_yticklabels(return_years)
base_aspect_ratio = 0.72
if len(return_years) <= 12:
aspect_ratio = base_aspect_ratio
else:
aspect_ratio = base_aspect_ratio * 12 / len(return_years)
ax7.set_aspect(aspect_ratio)
ax7.grid(False)
fig.colorbar(c, ax=ax7)
y_cum = monthly_return_df['y-cum']
y_count = len(return_years)
pos_y_cum = np.where(y_cum >= 0, y_cum, 0)
neg_y_cum = np.where(y_cum < 0, y_cum, 0)
return_years = y_cum.index
ax8.barh(np.arange(y_count), pos_y_cum, 1, align='center', facecolor='green', alpha=0.85)
ax8.barh(np.arange(y_count), neg_y_cum, 1, align='center', facecolor='red', alpha=0.85)
ax8.set_yticks(np.arange(y_count))
ax8.set_ylim(y_count - 0.5, -0.5)
ax8.set_yticklabels(list(return_years))
ax8.set_title('年度收益率', fontname='pingfang HK')
ax8.grid(False)
ax9.set_title('月度收益山积图', fontname='pingfang HK')
ax9.hist(monthly_return_df.values.flatten(), bins=18, alpha=0.5,
label='monthly returns')
ax9.grid(False)
major_locator = years
major_formatter = years_fmt
minor_locator = months
minor_formatter = month_fmt_none
ax6.xaxis.set_major_locator(major_locator)
ax6.xaxis.set_major_formatter(major_formatter)
ax6.xaxis.set_minor_locator(minor_locator)
ax6.xaxis.set_minor_formatter(minor_formatter)
for ax in [ax1, ax2, ax3, ax4, ax5]:
plt.setp(ax.get_xticklabels(), visible=False)
plt.show()
|