Time for the quarterly tushare chore again. In this post I share the code I use to batch-download trading and financial data from tushare. The following fields are collected: closing price, percentage change, trading volume, basic earnings per share, total asset turnover, net assets per share, return on equity, debt-to-assets ratio, current assets / total assets, debt-to-equity ratio, total assets, and total share capital. Three interfaces are involved: pro.fina_indicator, pro.balancesheet, and ts.pro_bar. The code follows.

First, the setup:
import pandas as pd
import numpy as np
import tushare as ts
import time
# set the login token
ts.set_token('your own token')
pro = ts.pro_api()
# codes of all listed stocks
code = pro.stock_basic()
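A quick sanity check of what stock_basic returned never hurts; only the ts_code column is used below (this inspection is optional and not part of the download logic):

print(code.shape)
print(code['ts_code'].head())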
The three functions below fetch one stock's data for a given period. Note the recursive call: if a request fails, the function simply calls itself to retry, so the program shrugs off network drops (very important; otherwise every batch run dies mid-download before anything has been saved). The parameters codelist, start_t, and end_t are the stock code, start date, and end date.
def get_fina_indicator(codelist, start_t, end_t):
    try:
        data = pro.fina_indicator(ts_code=codelist, start_date=start_t, end_date=end_t)
    except Exception:
        # on any failure (e.g. a dropped connection), recurse to retry
        data = get_fina_indicator(codelist, start_t, end_t)
    return data

def get_balancesheet(codelist, start_t, end_t):
    try:
        data = pro.balancesheet(ts_code=codelist, start_date=start_t, end_date=end_t)
    except Exception:
        data = get_balancesheet(codelist, start_t, end_t)
    return data

def get_bar(codelist, start_t, end_t):
    try:
        data = ts.pro_bar(ts_code=codelist, start_date=start_t, end_date=end_t)
    except Exception:
        data = get_bar(codelist, start_t, end_t)
    return data
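One caveat with retry-by-recursion: every failed attempt adds a stack frame, so a long outage will eventually hit Python's recursion limit and raise RecursionError. A loop with a short pause avoids that. Here is a minimal sketch for the first fetcher (the max_retries cap and the 5-second pause are my own choices, not part of the original code):

def get_fina_indicator_safe(codelist, start_t, end_t, max_retries=100):
    # loop-based retry: same idea as above, but without unbounded recursion
    for attempt in range(max_retries):
        try:
            return pro.fina_indicator(ts_code=codelist, start_date=start_t, end_date=end_t)
        except Exception:
            time.sleep(5)  # brief pause before retrying; the duration is arbitrary
    raise RuntimeError('fina_indicator still failing after %d attempts' % max_retries)

The same pattern applies to the other two fetchers.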
The next three functions call the three fetchers above in batches. Note that tushare data often contains duplicates (I am not sure why), which have to be removed manually by date. The first two parameters, starting_year and ending_year, are the start and end years; the last two are not meant to be set by the user (strictly speaking, none of these functions are meant to be called directly) and are filled in by the final function further below. For simplicity the period always runs from January 1 of the start year to December 31 of the end year.
def getting_ratio(starting_year, ending_year, start_obs, end_obs):
    start = str(starting_year) + '0101'
    if ending_year is not None:
        end = str(ending_year) + '1231'
    else:
        end = None
    # first stock: fetch once, then split each indicator into its own table
    a = get_fina_indicator(code['ts_code'][start_obs], start, end)
    eps_test0 = a[['end_date', 'eps']]
    eps_test0.columns = ['date', code['ts_code'][start_obs]]
    assets_turn_test0 = a[['end_date', 'assets_turn']]
    assets_turn_test0.columns = ['date', code['ts_code'][start_obs]]
    bps_test0 = a[['end_date', 'bps']]
    bps_test0.columns = ['date', code['ts_code'][start_obs]]
    roe_test0 = a[['end_date', 'roe']]
    roe_test0.columns = ['date', code['ts_code'][start_obs]]
    debt_to_assets_test0 = a[['end_date', 'debt_to_assets']]
    debt_to_assets_test0.columns = ['date', code['ts_code'][start_obs]]
    ca_to_assets_test0 = a[['end_date', 'ca_to_assets']]
    ca_to_assets_test0.columns = ['date', code['ts_code'][start_obs]]
    debt_to_eqt_test0 = a[['end_date', 'debt_to_eqt']]
    debt_to_eqt_test0.columns = ['date', code['ts_code'][start_obs]]
    # remaining stocks: fetch, rename, outer-merge onto the running tables
    for i in range(start_obs + 1, end_obs):
        if i == code.shape[0]:
            break
        b = get_fina_indicator(code['ts_code'][i], start, end)
        eps2_test0 = b[['end_date', 'eps']]
        eps2_test0.columns = ['date', code['ts_code'][i]]
        assets_turn2_test0 = b[['end_date', 'assets_turn']]
        assets_turn2_test0.columns = ['date', code['ts_code'][i]]
        bps2_test0 = b[['end_date', 'bps']]
        bps2_test0.columns = ['date', code['ts_code'][i]]
        roe2_test0 = b[['end_date', 'roe']]
        roe2_test0.columns = ['date', code['ts_code'][i]]
        debt_to_assets2_test0 = b[['end_date', 'debt_to_assets']]
        debt_to_assets2_test0.columns = ['date', code['ts_code'][i]]
        ca_to_assets2_test0 = b[['end_date', 'ca_to_assets']]
        ca_to_assets2_test0.columns = ['date', code['ts_code'][i]]
        debt_to_eqt2_test0 = b[['end_date', 'debt_to_eqt']]
        debt_to_eqt2_test0.columns = ['date', code['ts_code'][i]]
        eps_test0 = eps_test0.merge(eps2_test0, how='outer').drop_duplicates()
        assets_turn_test0 = assets_turn_test0.merge(assets_turn2_test0, how='outer').drop_duplicates()
        bps_test0 = bps_test0.merge(bps2_test0, how='outer').drop_duplicates()
        roe_test0 = roe_test0.merge(roe2_test0, how='outer').drop_duplicates()
        debt_to_assets_test0 = debt_to_assets_test0.merge(debt_to_assets2_test0, how='outer').drop_duplicates()
        ca_to_assets_test0 = ca_to_assets_test0.merge(ca_to_assets2_test0, how='outer').drop_duplicates()
        debt_to_eqt_test0 = debt_to_eqt_test0.merge(debt_to_eqt2_test0, how='outer').drop_duplicates()
        # duplicate observations often survive drop_duplicates (the rows differ
        # slightly), so the tables keep growing until memory blows up;
        # drop duplicates again, keyed on the date column
        eps_test0 = eps_test0.loc[eps_test0['date'].drop_duplicates().index]
        assets_turn_test0 = assets_turn_test0.loc[assets_turn_test0['date'].drop_duplicates().index]
        bps_test0 = bps_test0.loc[bps_test0['date'].drop_duplicates().index]
        roe_test0 = roe_test0.loc[roe_test0['date'].drop_duplicates().index]
        debt_to_assets_test0 = debt_to_assets_test0.loc[debt_to_assets_test0['date'].drop_duplicates().index]
        ca_to_assets_test0 = ca_to_assets_test0.loc[ca_to_assets_test0['date'].drop_duplicates().index]
        debt_to_eqt_test0 = debt_to_eqt_test0.loc[debt_to_eqt_test0['date'].drop_duplicates().index]
        print('financial indicators:', i)
    return eps_test0, assets_turn_test0, bps_test0, roe_test0, debt_to_assets_test0, ca_to_assets_test0, debt_to_eqt_test0
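A side note on the date-keyed dedup above: drop_duplicates() alone only removes rows that are identical in every column, so two responses for the same report date that differ in any field both survive, and the table keeps growing. Selecting df.loc[df['date'].drop_duplicates().index] keeps only the first row per date, whatever the other columns contain. A tiny illustration with made-up numbers:

df = pd.DataFrame({'date': ['20171231', '20171231', '20170930'],
                   'eps': [1.02, 1.03, 0.88]})    # same date, slightly different values
print(df.drop_duplicates())                        # removes nothing: no row is fully identical
print(df.loc[df['date'].drop_duplicates().index])  # keeps the first row per date

An equivalent one-liner is df.drop_duplicates(subset='date').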
def getting_bs(starting_year, ending_year, start_obs, end_obs):
    start = str(starting_year) + '0101'
    if ending_year is not None:
        end = str(ending_year) + '1231'
    else:
        end = None
    a = get_balancesheet(code['ts_code'][start_obs], start, end)
    total_assets_test0 = a[['end_date', 'total_assets']]
    total_assets_test0.columns = ['date', code['ts_code'][start_obs]]
    total_share_test0 = a[['end_date', 'total_share']]
    total_share_test0.columns = ['date', code['ts_code'][start_obs]]
    for i in range(start_obs + 1, end_obs):
        if i == code.shape[0]:
            break
        b = get_balancesheet(code['ts_code'][i], start, end)
        total_assets2_test0 = b[['end_date', 'total_assets']]
        total_assets2_test0.columns = ['date', code['ts_code'][i]]
        total_share2_test0 = b[['end_date', 'total_share']]
        total_share2_test0.columns = ['date', code['ts_code'][i]]
        total_assets_test0 = total_assets_test0.merge(total_assets2_test0, how='outer').drop_duplicates()
        total_assets_test0 = total_assets_test0.loc[total_assets_test0['date'].drop_duplicates().index]
        total_share_test0 = total_share_test0.merge(total_share2_test0, how='outer').drop_duplicates()
        total_share_test0 = total_share_test0.loc[total_share_test0['date'].drop_duplicates().index]
        print('balance sheet:', i)
    return total_assets_test0, total_share_test0
def getting_return(starting_year, ending_year, start_obs, end_obs):
    start = str(starting_year) + '0101'
    if ending_year is not None:
        end = str(ending_year) + '1231'
    else:
        end = None
    a = get_bar(code['ts_code'][start_obs], start, end)
    close_test0 = a[['trade_date', 'close']]
    close_test0.columns = ['date', code['ts_code'][start_obs]]
    pct_chg_test0 = a[['trade_date', 'pct_chg']]
    pct_chg_test0.columns = ['date', code['ts_code'][start_obs]]
    vol_test0 = a[['trade_date', 'vol']]
    vol_test0.columns = ['date', code['ts_code'][start_obs]]
    for i in range(start_obs + 1, end_obs):
        if i == code.shape[0]:
            break
        b = get_bar(code['ts_code'][i], start, end)
        close2_test0 = b[['trade_date', 'close']]
        close2_test0.columns = ['date', code['ts_code'][i]]
        pct_chg2_test0 = b[['trade_date', 'pct_chg']]
        pct_chg2_test0.columns = ['date', code['ts_code'][i]]
        vol2_test0 = b[['trade_date', 'vol']]
        vol2_test0.columns = ['date', code['ts_code'][i]]
        close_test0 = close_test0.merge(close2_test0, how='outer').drop_duplicates()
        close_test0 = close_test0.loc[close_test0['date'].drop_duplicates().index]
        pct_chg_test0 = pct_chg_test0.merge(pct_chg2_test0, how='outer').drop_duplicates()
        pct_chg_test0 = pct_chg_test0.loc[pct_chg_test0['date'].drop_duplicates().index]
        vol_test0 = vol_test0.merge(vol2_test0, how='outer').drop_duplicates()
        vol_test0 = vol_test0.loc[vol_test0['date'].drop_duplicates().index]
        print('price data:', i)
    return close_test0, pct_chg_test0, vol_test0
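As an aside, the extract/rename/merge dance above is one way to build a wide date-by-stock table. pandas can also do it in one step from long-format data with pivot_table, whose default mean aggregation incidentally collapses duplicated (date, stock) pairs. A sketch of the idea for the EPS table, over the first five stocks only (my own rewrite, not the code this post actually uses):

parts = [get_fina_indicator(tc, '20070101', '20171231') for tc in code['ts_code'][:5]]
long_df = pd.concat(parts, ignore_index=True)
eps_wide = long_df.pivot_table(index='end_date', columns='ts_code', values='eps')

This is only a sketch; the batch functions above are what the rest of the post uses.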
Finally, the data is downloaded by calling the functions above in batches. You might ask why not simply loop the three batch functions over all stocks directly. The reason is that two of the three interfaces are limited to 50 calls per minute, and hammering them for a long stretch slows things down dramatically (at the initial speed the whole download would take about two hours, but calling the three fetchers alone takes eight hours or more because of the later slowdown). To download faster, the function below rotates through the three interfaces in batches of 49 stocks, sleeping between rounds (with this scheme the full download really does finish in a little over two hours).
def getdata(startyear, endyear=None):
    # first batch of 49 stocks, taking the three interfaces in turn
    i = 0
    eps_test1, assets_turn_test1, bps_test1, roe_test1, debt_to_assets_test1, ca_to_assets_test1, debt_to_eqt_test1 = getting_ratio(startyear, endyear, 49*i, 49*(i+1))
    total_assets_test1, total_share_test1 = getting_bs(startyear, endyear, 49*i, 49*(i+1))
    close_test1, pct_chg_test1, vol_test1 = getting_return(startyear, endyear, 49*i, 49*(i+1))
    time.sleep(60)  # let the per-minute call quota reset
    # remaining batches: fetch, merge onto the running tables, sort, sleep
    # (the batch count is rounded up so the final partial batch is not skipped)
    for i in range(1, -(-code.shape[0] // 49)):
        eps2_test1, assets_turn2_test1, bps2_test1, roe2_test1, debt_to_assets2_test1, ca_to_assets2_test1, debt_to_eqt2_test1 = getting_ratio(startyear, endyear, 49*i, 49*(i+1))
        total_assets2_test1, total_share2_test1 = getting_bs(startyear, endyear, 49*i, 49*(i+1))
        close2_test1, pct_chg2_test1, vol2_test1 = getting_return(startyear, endyear, 49*i, 49*(i+1))
        eps_test1 = eps_test1.merge(eps2_test1, how='outer').drop_duplicates()
        assets_turn_test1 = assets_turn_test1.merge(assets_turn2_test1, how='outer').drop_duplicates()
        bps_test1 = bps_test1.merge(bps2_test1, how='outer').drop_duplicates()
        roe_test1 = roe_test1.merge(roe2_test1, how='outer').drop_duplicates()
        debt_to_assets_test1 = debt_to_assets_test1.merge(debt_to_assets2_test1, how='outer').drop_duplicates()
        ca_to_assets_test1 = ca_to_assets_test1.merge(ca_to_assets2_test1, how='outer').drop_duplicates()
        debt_to_eqt_test1 = debt_to_eqt_test1.merge(debt_to_eqt2_test1, how='outer').drop_duplicates()
        total_assets_test1 = total_assets_test1.merge(total_assets2_test1, how='outer').drop_duplicates()
        total_share_test1 = total_share_test1.merge(total_share2_test1, how='outer').drop_duplicates()
        close_test1 = close_test1.merge(close2_test1, how='outer').drop_duplicates()
        pct_chg_test1 = pct_chg_test1.merge(pct_chg2_test1, how='outer').drop_duplicates()
        vol_test1 = vol_test1.merge(vol2_test1, how='outer').drop_duplicates()
        eps_test1 = eps_test1.sort_values(by='date')
        assets_turn_test1 = assets_turn_test1.sort_values(by='date')
        bps_test1 = bps_test1.sort_values(by='date')
        roe_test1 = roe_test1.sort_values(by='date')
        debt_to_assets_test1 = debt_to_assets_test1.sort_values(by='date')
        ca_to_assets_test1 = ca_to_assets_test1.sort_values(by='date')
        debt_to_eqt_test1 = debt_to_eqt_test1.sort_values(by='date')
        total_assets_test1 = total_assets_test1.sort_values(by='date')
        total_share_test1 = total_share_test1.sort_values(by='date')
        close_test1 = close_test1.sort_values(by='date')
        pct_chg_test1 = pct_chg_test1.sort_values(by='date')
        vol_test1 = vol_test1.sort_values(by='date')
        time.sleep(60)
    return eps_test1, assets_turn_test1, bps_test1, roe_test1, debt_to_assets_test1, ca_to_assets_test1, debt_to_eqt_test1, total_assets_test1, total_share_test1, close_test1, pct_chg_test1, vol_test1
Finally, the call itself. This is the only part the user actually touches; choose the time span to download here. Note that one of the interfaces appears to return at most 5,000 rows per call, so keep each download span moderate. Here, for example, is 2007-2017:
eps,assets_turn,bps,roe,debt_to_assets,ca_to_assets,debt_to_eqt,total_assets,total_share,close,pct_chg,vol=getdata(2007,2017)
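Given the earlier point about losing everything when a run dies before saving, it is worth writing the results to disk right away. A minimal sketch with to_csv (the file names are my own choice):

frames = {'eps': eps, 'assets_turn': assets_turn, 'bps': bps, 'roe': roe,
          'debt_to_assets': debt_to_assets, 'ca_to_assets': ca_to_assets,
          'debt_to_eqt': debt_to_eqt, 'total_assets': total_assets,
          'total_share': total_share, 'close': close, 'pct_chg': pct_chg, 'vol': vol}
for name, df in frames.items():
    df.to_csv(name + '.csv', index=False)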
That is all the code. If you want to download other fields, the code above is easy to adapt by analogy. Thanks for taking the time to read this post! Comments and discussion are welcome.