IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Python股票量化学习(2)——股票历史日线数据下载 -> 正文阅读

[Python知识库]Python股票量化学习(2)——股票历史日线数据下载

? ? ? ? 利用baostock下在股票历史数据

bs.query_history_k_data_plus(code, fields, start_date, end_date, frequency, adjustflag)

? ? ? ? code,股票代码;

? ? ? ? fields,表字段date,open,high,low,close,preclose,volume,amount,adjustflag,

turn,tradestatus,pctChg,peTTM,pbMRQ,psTTM,pcfNcfTTM,isST。也可以选取这其中一部分。

? ? ? ? start_date,起始日期;

? ? ? ? end_date,结束日期;

? ? ? ? frequency,该参数为'd'时,表示下载日线数据;

? ? ? ? adjustflag,复权类型,=’2‘为前复权

要下载全部A股数据,耗时会相当长,因此,这里可以引入多进程模块multiprocessing,但是,运行多进程的时候,又总有进程会莫名的卡死,导致程序即不报错,也不停止。于是,再引入一个超时模块func_timeout.

from multiprocessing import Pool

from func_timeout import func_set_timeout

? ? ? ? ?定义函数get_history_data()用来获取股票历史数据,用func_timeout盯着它。

import baostock as bs
from func_timeout import func_set_timeout

# 设置50秒超时
@func_set_timeout(50)
def get_history_data(code, start='1990-12-19', end=datetime.date.today().strftime('%Y-%m-%d'),
                adjustflag='2', ):
    """
    获取股票历史数据
    :param code: 股票代码
    :param start: 开始日期
    :param end: 结束日期
    :param adjustflag: 复权类型,默认为 2 ,前复权
    :return: 股票历史数据的DataFrame
    """
    # 登录baostock
    bs.login()
    print(f'正在建立{code}的下载链接......')

    df = bs.query_history_k_data_plus(code,fields=self.baostock_fields, start_date=start, end_date=end,
                                          frequency='d', adjustflag=adjustflag).get_data()
    print(f'{code} is done!')

    return df

? ? ? ? ?再定义函数data_handle(),用来检查获取数据是否超时,顺便把相应的数据类型也改一下

import pandas as pd

def data_handle(code):
    """
    数据处理
    :code: 股票代码
    :return: 处理后的股票历史数据
    """
    df = pd.DataFrame()

    i = 0
    while df.shape[0] == 0:
        i += 1
        try:
            df = get_history_data(code)
        except:
            print(f'**********************************{code}第{i}次超时了啊****************************************')

    # 剔除停盘数据
    df = df[(df['volume'] != '0') & (df['volume'] != '')]

    # 删除重复数据
    df.drop_duplicates(['date'], inplace=True)

    # 重置索引
    df.reset_index(drop=True, inplace=True)

    # 把日期改为datetime型
    df['date'] = pd.to_datetime(df['date'])

    # 将数值数据转为float型,便于后续处理
    convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
    df[convert_list] = df[convert_list].astype(float)

    return df

? ? ? ? 再定义函数data_to_myslq(),把下载来的数据写到MySQL数据库,操作MySQL数据库,我用的是sqlalchemy.。其中create_engine里面有个参数poolclass如果不写的话,运行起来好像更快,但它却不能把所有数据都写完,写着写着就不写了,程序也不报错,还能运行完。设置poolclass=NullPool,能把数据都写完,就是相对慢一点。

import sqlalchemy

def data_to_mysql(code ):
    """
    MySQL数据库写入
    :param code: 要些入的股票代码
    :return: None
    """
    # 建立数据库连接
    engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/stock_databases?charset=utf8",
                                          poolclass=NullPool)

    df = data_handle(code)

    # 写入MySQL数据库
    df.to_sql("{}_{}".format(code[:2], code[3:]), con=engine, if_exists="replace", index=True)
    print(f"{code}已完成数据库写入。")

? ? ? ? 最后,再用多进程函数把程序跑起来。

import baostock as bs
from func_timeout import func_set_timeout
import datetime
import pandas as pd
from multiprocessing import Pool
import sqlalchemy
from sqlalchemy.pool import NullPool




class DownData:
    def __init__(self):
            """定义常用变量"""
            # baostock下载日线数据用的表字段
            self.baostock_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'




    def get_stock_codes(self, day=None):
        """
        获取沪深两市的股票代码,剔除掉中小板、创业板、北交所和指数代码,
        :param day: 指定日期,“YYYY-MM-DD",默认为空,表示今日。
        :return: 筛选后的股票代码
        """

        # 登录baostock
        bs.login()

        # 获取股票代码
        df = bs.query_all_stock(day=day).get_data()

        # 如果数据为空
        if len(df) == 0:

            # 把日期向前推,直到数据不为空,即为历史最近交易日
            delta = 1
            while len(df) == 0:
                df = bs.query_all_stock(day=datetime.date.today() - datetime.timedelta(delta)).get_data()
                delta += 1

        # 登出baostock
        bs.logout()

        # 股票代码筛选
        codes_df = df[((df['code'] < 'sz.300000') & (df['code'] > 'sh.689000') |
                           (df['code'] < 'sh.688000') & (df['code'] > 'sh.009999'))]

        print(codes_df.shape[0])
        return codes_df['code'].to_list()


        # 设置50秒超时
    @func_set_timeout(50)
    def get_history_data(self, code, start='1990-12-19', end=datetime.date.today().strftime('%Y-%m-%d'),
                adjustflag='2', ):
        """
        获取股票历史数据
        :param code: 股票代码
        :param start: 开始日期
        :param end: 结束日期
        :param adjustflag: 复权类型,默认为 2 ,前复权
        :return: 股票历史数据的DataFrame
        """
        # 登录baostock
        bs.login()
        print(f'正在建立{code}的下载链接......')

        df = bs.query_history_k_data_plus(code,fields=self.baostock_fields, start_date=start, end_date=end,
                                          frequency='d', adjustflag=adjustflag).get_data()
        print(f'{code} is done!')

        return df


    def data_handle(self, code):
        """
        数据处理
        :code: 股票代码
        :return: 处理后的股票历史数据
        """
        df = pd.DataFrame()

        i = 0
        while df.shape[0] == 0:
            i += 1
            try:
                df = self.get_history_data(code)
            except:
                print(f'**********************************{code}第{i}次超时了啊****************************************')

        # 剔除停盘数据
        df = df[(df['volume'] != '0') & (df['volume'] != '')]

        # 删除重复数据
        df.drop_duplicates(['date'], inplace=True)

        # 重置索引
        df.reset_index(drop=True, inplace=True)

        # 把日期改为datetime型
        df['date'] = pd.to_datetime(df['date'])

        # 将数值数据转为float型,便于后续处理
        convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']
        df[convert_list] = df[convert_list].astype(float)

        return df





    def data_to_mysql(self, code ):
        """
        MySQL数据库写入
        :param code: 要些入的股票代码
        :return: None
        """
        # 建立数据库连接
        engine = sqlalchemy.create_engine("mysql+pymysql://root:123456@localhost:3306/stock_databases?charset=utf8",
                                          poolclass=NullPool)

        df = self.data_handle(code)

        # 写入MySQL数据库
        df.to_sql("{}_{}".format(code[:2], code[3:]), con=engine, if_exists="replace", index=True)
        print(f"{code}已完成数据库写入。")




    def mp_func(self, func, ):
        """
        多进程函数
        :param func: 要运行的函数名
        :return:None
        """

        # 创建进程池,61个进程是网上看来的,什么科学原理我也不知道
        pool = Pool(61)
        # 多进程异步计算
        for code in self.get_stock_codes():

            pool.apply_async(func, args=(code,))

        # 阻止后续任务提交到进程池
        pool.close()

        # 等待所有进程结束
        pool.join()




if __name__ == '__main__':
    s = DownData()

    s.mp_func(s.data_to_mysql)

?运行完后是这样的

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-09-21 00:24:34  更:2022-09-21 00:25:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 10:26:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码