目的:获取Tushare中的历史数据以及实时金融数据,并将其导入本地数据库中。
#====================历史数据(df)导入=========================
import pandas as pd
import numpy as np
import tushare as ts
import datetime
import pymysql
#=================Tushare数据的获得===================
连接Tushare
pro = ts.pro_api('*********')
#获取最新交易日期
latest_trade_day = pro.daily_basic\(ts_code='000001.SZ',fields='trade_date').trade_date[0]
trade_cal = pro.trade_cal(exchange='', start_date='19900101', end_date='20210929')
trade_date = trade_cal[(trade_cal.is_open==1)&(trade_cal.cal_date>='20210101')]
trade_date = trade_date.cal_date
begin = trade_date.iloc[0]
#获得最新股票列表
df = pro.daily_basic\
(ts_code='',trade_date = begin,\
fields='ts_code,trade_date,close,turnover_rate,volume_ratio,pe,pe_ttm,pb,ps,ps_ttm,dv_ratio,dv_ttm,total_share,float_share,free_share,total_mv,circ_mv')
#获取总股票数据列表
for i in list(trade_date):
df1 = pro.daily_basic\
(ts_code='',trade_date = i,fields='ts_code,trade_date,close,turnover_rate,volume_ratio,pe,pe_ttm,pb,ps,ps_ttm,dv_ratio,dv_ttm,total_share,float_share,free_share,total_mv,circ_mv')
df = pd.concat([df,df1],ignore_index = True) #默认纵向合并,行数增加
#========================================================================
# 参数设置 DictCursor使输出为字典模式
config = dict(host='localhost', user='root', password='lhy20020127',
cursorclass=pymysql.cursors.DictCursor
)
# 建立连接
conn = pymysql.Connect(**config)
# 自动确认commit True
conn.autocommit(1)
# 设置光标
cursor = conn.cursor()
#========================================================================
df_1 = df.where(df.notnull(),None)
# 一个根据pandas自动识别type来设定table的type
def make_table_sql(df): #传入dataframe
columns = df.columns.tolist()
#df.dtypes 返回每一列的数据类型
types = df.dtypes
# 添加id 制动递增主键模式
make_table = []
for item in columns:
if 'int' in str(types[item]):
char = item + ' INT' + ' null'
elif 'float' in str(types[item]):
char = item + ' FLOAT' + ' null'
elif 'object' in str(types[item]):
char = item + ' VARCHAR(255)' + ' null'
elif 'datetime' in str(types[item]):
char = item + ' DATETIME' + ' null'
make_table.append(char)
return ','.join(make_table) #返回字符串
#=============输入 mysql 中===============
def my_sql(db_name, table_name, df):
#传入数据库名字(注意为字符串),表名和要传输的dataframe
# 创建database
cursor.execute('CREATE DATABASE IF NOT EXISTS {}'.format(db_name))
# 选择连接database
cursor.execute('USE {}'.format(db_name))
# 创建新table
cursor.execute('CREATE TABLE IF NOT EXISTS {}({})'.format(table_name,make_table_sql(df)))
#========================================================================
# 提取数据转list 这里由于pandas时间模式无法写入因此换成str
lst = []
for i in df.dtypes:
lst.append(str(i))
for i in lst:
if 'datetime' in i:
s = df.columns[lst.index(i)]
df[s] = df[s].astype('str')
#========================================================================
values = df.values.tolist()
# 根据columns个数
s = ','.join(['%s' for i in range(len(df.columns))])
# executemany批量操作 插入数据 批量操作比逐个操作速度快很多
cursor.executemany('INSERT INTO {} VALUES ({})'.format(table_name,s), values)
my_sql('test','test_df',df_1)
# 光标关闭
cursor.close()
# 连接关闭
conn.close()
检验
#连接数据库
mysql -u root -p
#输入密码
show databases;
?运行程序
?
?
运行程序之后的数据库插入了Tushare金融数据
?
Tushare金融数据成功存入mysql数据库
?
?
|