Python量化数据仓库搭建系列3:数据落库代码封装
本系列教程为量化开发者,提供本地量化金融数据仓库的搭建教程与全套源代码。我们以恒有数(UDATA)金融数据社区为数据源,将金融基础数据落到本地数据库。教程提供全套源代码,包括历史数据下载与增量数据更新,数据更新任务部署与日常监控等操作。
在上一节讲述中,我们封装了Python操作MySQL数据库的自定义类,存为MySQLOperation.py文件;本节内容操作数据库部分,将会调用MySQLOperation中的方法,以及pandas.to_sql和pandas.read_sql的操作。
一、恒有数(UDATA)操作简介
1、获取Token
A、在恒有数官网(https://udata.hs.net)注册并登录,在订阅页面,下单免费的体验套餐;
B、在右上角,头像下拉菜单中,进入总览页面,复制Token;
C、在数据页面,查看数据接口文档,获取接口名称、请求参数、返回参数和Python代码示例;
2、安装hs_udata
pip install hs_udata
使用示例如下:
import hs_udata as hs
hs.set_token(token = 'xxxxxxxxxxxxxxx')
df = hs.stock_quote_minutes(en_prod_code="000001.SZ",begin_date="20210501",end_date="20210601")
df['hs_code']='000001.SZ'
df.head()
其余接口使用过程与之类似;
二、数据落库示例
以股票列表(stock_list)为例,讲解建表、落库、查询等操作;全套代码见本文第三章;
1、准备工作
(1)在MySQL数据库中,创建数据库udata,创建过程见第一讲《Python量化数据仓库搭建系列1:数据库安装与操作》;
(2)在MySQL数据库中,创建数据更新记录表udata.tb_update_records,表结构如下: 建表SQL如下:
CREATE TABLE udata.tb_update_records (
table_name CHAR(40),
data_date CHAR(20),
update_type CHAR(20),
data_number INT(20),
elapsed_time INT(20),
updatetime CHAR(20)
)
(3)将Token与数据库参数写入配置文件DB_MySQL.config,文件内容如下:
[udata]
token='你的Token'
host='127.0.0.1'
port=3306
user='root'
passwd='密码'
db='udata'
(4)读取配置文件中的参数
import configparser
configFilePath = 'DB_MySQL.config'
section = 'udata'
config = configparser.ConfigParser()
config.read(configFilePath)
token = eval(config.get(section=section, option='token'))
host = eval(config.get(section=section, option='host'))
port = int(config.get(section=section, option='port'))
db = eval(config.get(section=section, option='db'))
user = eval(config.get(section=section, option='user'))
passwd = eval(config.get(section=section, option='passwd'))
2、建表
在社区数据页面,查看数据表返回参数字段,以股票列表(stock_list,https://udata.hs.net/datas/202/)为例: 将建表、删除表格、清空数据的SQL,写入配置文件DB_Table.config,文件内容如下:
[tb_stock_list]
DROP_TABLE='DROP TABLE IF EXISTS tb_stock_list'
CREATE_TABLE='''CREATE TABLE tb_stock_list (
secu_code CHAR(20),
hs_code CHAR(20),
secu_abbr CHAR(20),
chi_name CHAR(40),
secu_market CHAR(20),
listed_state CHAR(20),
listed_sector CHAR(20),
updatetime CHAR(20)
)'''
DELETE_DATA = 'truncate table tb_stock_list'
python建表代码如下,MySQLOperation为上一讲封装的方法。
MySQL = MySQLOperation(host, port, db, user, passwd)
configFilePath = 'DB_Table.config'
section = 'tb_stock_list'
config = configparser.ConfigParser()
config.read(configFilePath)
DROP_TABLE = eval(config.get(section=section, option='DROP_TABLE'))
CREATE_TABLE = eval(config.get(section=section, option='CREATE_TABLE'))
DELETE_DATA = eval(config.get(section=section, option='DELETE_DATA'))
MySQL.Execute_Code(DROP_TABLE)
MySQL.Execute_Code(CREATE_TABLE)
3、数据落库
import hs_udata as hs
from sqlalchemy import create_engine
from datetime import datetime
hs.set_token(token)
df = hs.stock_list()
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
MySQL.Execute_Code(DELETE_DATA)
engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user,passwd,host,port,db))
df.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')
在数据库中查看结果如下:
4、读取数据
import pandas as pdresult = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)print(result.head())
三、落库代码封装
将上述几步操作,封装到一起,定义并调用python中class类的属性和方法。代码中涉及主要技术点如下:
(1)使用pymysql、pandas.to_sql和pandas.read_sql操作MySQL数据库;
(2)使用class类的方法,集成建表、插入数据和查询数据的操作;
(3)使用配置文件的方式,从本地文件中,读取数据库参数与表操作的SQL代码;
(4)使用try容错机制,结合日志函数,将执行日志打印到本地的DB_MySQL_LOG.txt文件;
import pandas as pd
import hs_udata as hs
from MySQLOperation import *
from sqlalchemy import create_engine
from datetime import datetime
import time
import configparser
import logging
import traceback
import warnings
warnings.filterwarnings("ignore")
class TB_Stock_List:
def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name):
self.logging = logging
self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG
, format='%(asctime)s - %(levelname)s - %(message)s')
configFilePath = MySQL_Config
self.section1 = BD_Name
config = configparser.ConfigParser()
config.read(configFilePath)
self.token = eval(config.get(section=self.section1, option='token'))
self.host = eval(config.get(section=self.section1, option='host'))
self.port = int(config.get(section=self.section1, option='port'))
self.db = eval(config.get(section=self.section1, option='db'))
self.user = eval(config.get(section=self.section1, option='user'))
self.passwd = eval(config.get(section=self.section1, option='passwd'))
self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd)
self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user
, self.passwd
, self.host
, self.port
, self.db))
configFilePath = Table_Config
self.section2 = Table_Name
config = configparser.ConfigParser()
config.read(configFilePath)
self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE'))
self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE'))
self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))
self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))
def CREATE_TABLE(self):
try:
self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0')
self.MySQL.Execute_Code(self.DROP_TABLE_SQL)
self.MySQL.Execute_Code(self.CREATE_TABLE_SQL)
self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2))
except:
self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
def UPDATE_DATA(self):
try:
hs.set_token(self.token)
time_start = time.time()
df = hs.stock_list()
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
self.MySQL.Execute_Code(self.DELETE_DATA_SQL)
df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append')
time_end = time.time()
elapsed_time = round(time_end-time_start,2)
self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records
VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2
,dt.strftime('%Y-%m-%d'),len(df),elapsed_time)
self.MySQL.Execute_Code(self.RECORDS_SQL)
self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2))
except:
self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
def READ_DATA(self, WhereCondition=''):
try:
result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2)
+ WhereCondition, con=self.engine)
self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2))
return result
except:
self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
return 0
if __name__ == '__main__':
MySQL_Config = 'DB_MySQL.config'
BD_Name = 'udata'
Table_Config = 'DB_Table.config'
Table_Name = 'tb_stock_list'
TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name)
TB_Stock_List_Main.CREATE_TABLE()
TB_Stock_List_Main.UPDATE_DATA()
data = TB_Stock_List_Main.READ_DATA()
四、代码及配置文件下载
(1)源代码文件目录:
DB_MySQL.config
DB_Table.config
MySQLOperation.py
TB_Stock_List.py
(2)下载文件后,将配置文件DB_MySQL.config中的Token与数据库参数,修改为自己的参数;将代码放置在同一目录下执行。
代码文件见:https://developer.hs.net/thread/1501
下一节《Python量化数据仓库搭建系列4:股票数据落库》
|