? ? ? ? 利用baostock下在股票历史数据
bs.query_history_k_data_plus(code, fields, start_date, end_date, frequency, adjustflag)
? ? ? ? code,股票代码;
? ? ? ? fields,表字段date,open,high,low,close,preclose,volume,amount,adjustflag,
turn,tradestatus,pctChg,peTTM,pbMRQ,psTTM,pcfNcfTTM,isST。也可以选取这其中一部分。
? ? ? ? start_date,起始日期;
? ? ? ? end_date,结束日期;
? ? ? ? frequency,该参数为'd'时,表示下载日线数据;
? ? ? ? adjustflag,复权类型,=’2‘为前复权
要下载全部A股数据,耗时会相当长,因此,这里可以引入多进程模块multiprocessing,但是,运行多进程的时候,又总有进程会莫名的卡死,导致程序即不报错,也不停止。于是,再引入一个超时模块func_timeout.
from multiprocessing import Pool
from func_timeout import func_set_timeout
? ? ? ? ?定义函数get_history_data()用来获取股票历史数据,用func_timeout盯着它。
import baostock as bs
from func_timeout import func_set_timeout
# 设置50秒超时
@func_set_timeout(50)
def get_history_data(code, start='1990-12-19', end=datetime.date.today().strftime('%Y-%m-%d'),
adjustflag='2', ):
"""
获取股票历史数据
:param code: 股票代码
:param start: 开始日期
:param end: 结束日期
:param adjustflag: 复权类型,默认为 2 ,前复权
:return: 股票历史数据的DataFrame
"""
# 登录baostock
bs.login()
print(f'正在建立{code}的下载链接......')
df = bs.query_history_k_data_plus(code,fields=self.baostock_fields, start_date=start, end_date=end,
frequency='d', adjustflag=adjustflag).get_data()
print(f'{code} is done!')
return df
? ? ? ? ?再定义函数data_handle(),用来检查获取数据是否超时,顺便把相应的数据类型也改一下
import pandas as pd
def data_handle(code):
"""
数据处理
:code: 股票代码
:return: 处理后的股票历史数据
"""
df = pd.DataFrame()
i = 0
while df.shape[0] == 0:
i += 1
try:
df = get_history_data(code)
except:
print(f'**********************************{code}第{i}次超时了啊****************************************')
# 剔除停盘数据
df = df[(df['volume'] != '0') & (df['volume'] != '')]
# 删除重复数据
df.drop_duplicates(['date'], inplace=True)
# 重置索引
df.reset_index(drop=True, inplace=True)
# 把日期改为datetime型
df['date'] = pd.to_datetime(df['date'])
# 将数值数据转为float型,便于后续处理
convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
df[convert_list] = df[convert_list].astype(float)
return df
? ? ? ? 再定义函数data_to_myslq(),把下载来的数据写到MySQL数据库,操作MySQL数据库,我用的是sqlalchemy.。其中create_engine里面有个参数poolclass如果不写的话,运行起来好像更快,但它却不能把所有数据都写完,写着写着就不写了,程序也不报错,还能运行完。设置poolclass=NullPool,能把数据都写完,就是相对慢一点。
import sqlalchemy
def data_to_mysql(code ):
"""
MySQL数据库写入
:param code: 要些入的股票代码
:return: None
"""
# 建立数据库连接
engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/stock_databases?charset=utf8",
poolclass=NullPool)
df = data_handle(code)
# 写入MySQL数据库
df.to_sql("{}_{}".format(code[:2], code[3:]), con=engine, if_exists="replace", index=True)
print(f"{code}已完成数据库写入。")
? ? ? ? 最后,再用多进程函数把程序跑起来。
import baostock as bs
from func_timeout import func_set_timeout
import datetime
import pandas as pd
from multiprocessing import Pool
import sqlalchemy
from sqlalchemy.pool import NullPool
class DownData:
def __init__(self):
"""定义常用变量"""
# baostock下载日线数据用的表字段
self.baostock_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'
def get_stock_codes(self, day=None):
"""
获取沪深两市的股票代码,剔除掉中小板、创业板、北交所和指数代码,
:param day: 指定日期,“YYYY-MM-DD",默认为空,表示今日。
:return: 筛选后的股票代码
"""
# 登录baostock
bs.login()
# 获取股票代码
df = bs.query_all_stock(day=day).get_data()
# 如果数据为空
if len(df) == 0:
# 把日期向前推,直到数据不为空,即为历史最近交易日
delta = 1
while len(df) == 0:
df = bs.query_all_stock(day=datetime.date.today() - datetime.timedelta(delta)).get_data()
delta += 1
# 登出baostock
bs.logout()
# 股票代码筛选
codes_df = df[((df['code'] < 'sz.300000') & (df['code'] > 'sh.689000') |
(df['code'] < 'sh.688000') & (df['code'] > 'sh.009999'))]
print(codes_df.shape[0])
return codes_df['code'].to_list()
# 设置50秒超时
@func_set_timeout(50)
def get_history_data(self, code, start='1990-12-19', end=datetime.date.today().strftime('%Y-%m-%d'),
adjustflag='2', ):
"""
获取股票历史数据
:param code: 股票代码
:param start: 开始日期
:param end: 结束日期
:param adjustflag: 复权类型,默认为 2 ,前复权
:return: 股票历史数据的DataFrame
"""
# 登录baostock
bs.login()
print(f'正在建立{code}的下载链接......')
df = bs.query_history_k_data_plus(code,fields=self.baostock_fields, start_date=start, end_date=end,
frequency='d', adjustflag=adjustflag).get_data()
print(f'{code} is done!')
return df
def data_handle(self, code):
"""
数据处理
:code: 股票代码
:return: 处理后的股票历史数据
"""
df = pd.DataFrame()
i = 0
while df.shape[0] == 0:
i += 1
try:
df = self.get_history_data(code)
except:
print(f'**********************************{code}第{i}次超时了啊****************************************')
# 剔除停盘数据
df = df[(df['volume'] != '0') & (df['volume'] != '')]
# 删除重复数据
df.drop_duplicates(['date'], inplace=True)
# 重置索引
df.reset_index(drop=True, inplace=True)
# 把日期改为datetime型
df['date'] = pd.to_datetime(df['date'])
# 将数值数据转为float型,便于后续处理
convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
df[convert_list] = df[convert_list].astype(float)
return df
def data_to_mysql(self, code ):
"""
MySQL数据库写入
:param code: 要些入的股票代码
:return: None
"""
# 建立数据库连接
engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/stock_databases?charset=utf8",
poolclass=NullPool)
df = self.data_handle(code)
# 写入MySQL数据库
df.to_sql("{}_{}".format(code[:2], code[3:]), con=engine, if_exists="replace", index=True)
print(f"{code}已完成数据库写入。")
def mp_func(self, func, ):
"""
多进程函数
:param func: 要运行的函数名
:return:None
"""
# 创建进程池,61个进程是网上看来的,什么科学原理我也不知道
pool = Pool(61)
# 多进程异步计算
for code in self.get_stock_codes():
pool.apply_async(func, args=(code,))
# 阻止后续任务提交到进程池
pool.close()
# 等待所有进程结束
pool.join()
if __name__ == '__main__':
s = DownData()
s.mp_func(s.data_to_mysql)
?运行完后是这样的
|