一、设计
听说 projection 可以加快 ClickHouse 的查询速度，于是重新设计了表结构：
# DDL for the minute-bar table (the same statement appears in create_table()).
# db_name, table_name and projection_name must be defined before this
# f-string is evaluated.
# Rows are stored sorted by datetime (table ORDER BY). The projection keeps a
# second copy of all 16 columns sorted by code, intended to speed up per-code
# lookups at the cost of roughly doubling storage.
create_sql = f"""CREATE TABLE if not exists {db_name}.{table_name}
(
code UInt64,
date Date,
datetime DateTime,
open Float32,
close Float32,
low Float32,
high Float32,
volume Float64,
money Float64,
factor Float32,
high_limit Float32,
low_limit Float32,
avg Float32,
pre_close Float32,
paused Float32,
open_interest Float64,
PROJECTION {projection_name}
(
SELECT
code,
date,
datetime,
open,
close,
low,
high,
volume,
money,
factor,
high_limit,
low_limit,
avg,
pre_close,
paused,
open_interest
ORDER BY code
)
)
ENGINE = MergeTree()
ORDER BY datetime """
数据量：

二、代码
from clickhouse_driver import Client
import pandas as pd
import os
from datetime import datetime, date
import time
import math
import numpy as np
# Module-wide clickhouse_driver client connected to 'localhost'; shared by
# create_table, insert_data and the query functions below.
CLIENT = Client('localhost')
def get_all_files_by_root_sub_dirs(directory, file_type):
    """Collect file paths ending with *file_type*.

    If *directory* is a directory it is walked recursively and every joined
    path whose text ends with *file_type* is returned (in os.walk order).
    Otherwise *directory* is treated as a single path and returned in a
    one-element list when it ends with *file_type*, else an empty list.
    """
    if not os.path.isdir(directory):
        return [directory] if directory.endswith(file_type) else []

    candidates = (
        os.path.join(root, name)
        for root, _subdirs, names in os.walk(directory)
        for name in names
    )
    # Match on the joined path (not just the file name), as before.
    return [path for path in candidates if path.endswith(file_type)]
def get_code_from_csv_file(file):
    """Extract the security code from a CSV file path.

    Takes the base name, keeps the part before the first '_', and strips a
    trailing '.csv' / '.CSV' if present, e.g. '/x/000001.XSHE.csv' ->
    '000001.XSHE' and '000001.XSHE_2021.csv' -> '000001.XSHE'.
    """
    stem = os.path.basename(file).split('_')[0]
    has_csv_suffix = stem.endswith((".csv", ".CSV"))
    return stem[:-4] if has_csv_suffix else stem
def create_table(db_name, table_name, projection_name="stock_p"):
    """Create the minute-bar table ``db_name.table_name`` if it does not exist.

    Rows are sorted by ``datetime`` (the table's ORDER BY). A projection named
    *projection_name* stores every column a second time sorted by ``code``,
    intended to speed up per-stock lookups. The DDL is printed before it is
    executed on the module-level CLIENT.

    Args:
        db_name: database name; interpolated directly as an SQL identifier,
            so it must come from trusted code, not user input.
        table_name: table name (same caveat).
        projection_name: name of the code-sorted projection. Defaults to the
            previously hard-coded "stock_p", so existing calls are unchanged.
    """
    create_sql = f"""CREATE TABLE if not exists {db_name}.{table_name}
(
code UInt64,
date Date,
datetime DateTime,
open Float32,
close Float32,
low Float32,
high Float32,
volume Float64,
money Float64,
factor Float32,
high_limit Float32,
low_limit Float32,
avg Float32,
pre_close Float32,
paused Float32,
open_interest Float64,
PROJECTION {projection_name}
(
SELECT
code,
date,
datetime,
open,
close,
low,
high,
volume,
money,
factor,
high_limit,
low_limit,
avg,
pre_close,
paused,
open_interest
ORDER BY code
)
)
ENGINE = MergeTree()
ORDER BY datetime """
    print(create_sql)
    CLIENT.execute(create_sql)
    print("create table finished!")
class Code:
    """A security code split into its parts.

    Attributes (stored verbatim from the constructor):
        header: value returned by get_simplecode().
        body: numeric part of the code, e.g. "000001"; read by jq_int_code.
        source: exchange suffix, e.g. "XSHG" or "XSHE"; read by jq_int_code.
        type: asset-type label, e.g. "stock".
    """

    def __init__(self, header, body, source, type):
        self.header, self.body, self.source, self.type = header, body, source, type

    def get_simplecode(self):
        """Return the header part of the code."""
        simple = self.header
        return simple

    def get_int_code(self, datasource):
        """Return the integer code via jq_int_code.

        *datasource* is accepted for API compatibility but currently ignored:
        the JoinQuant encoding is always used.
        """
        return jq_int_code(self)

    def get_asset_type(self):
        """Return the asset-type label given at construction."""
        asset_type = self.type
        return asset_type

    def get_fullcode(self, datasource):
        """Not implemented yet; always returns None."""
        return None
def jq_int_code(code):
    """Encode a Code as a single integer, JoinQuant style.

    Shanghai ("XSHG") codes map to 10000000 + body and Shenzhen ("XSHE")
    codes to 20000000 + body; any other exchange yields 0 without parsing
    the body.
    """
    for exchange, offset in (("XSHG", 10000000), ("XSHE", 20000000)):
        if code.source == exchange:
            return offset + int(code.body)
    return 0
def split_str_code(str_code):
    """Split 'BODY.EXCHANGE' at the first dot.

    Returns a two-element list [body, rest] when a dot is present, otherwise
    a one-element list containing the whole string.
    """
    head, separator, tail = str_code.partition('.')
    if not separator:
        return [head]
    return [head, tail]
def _dataframe_to_rows(df, intcode):
    """Convert one minute-bar DataFrame into lists in stock_tb column order.

    Args:
        df: frame read from one CSV. Its first column must hold the bar time
            as 'YYYY-mm-dd HH:MM:SS', and it must have the columns open, close,
            low, high, volume, money, factor, high_limit, low_limit, avg,
            pre_close, paused and open_interest.
        intcode: integer stock code written into the ``code`` column.

    Returns:
        A list of 16-element lists: code, date, datetime, then the twelve
        price/volume fields and open_interest.
    """
    rows = []
    for row in df.itertuples():
        # row[0] is the DataFrame index, so row[1] is the first CSV column.
        bar_time = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')
        numeric = [
            float(row.open), float(row.close), float(row.low), float(row.high),
            float(row.volume), float(row.money), float(row.factor),
            float(row.high_limit), float(row.low_limit), float(row.avg),
            float(row.pre_close), float(row.paused),
        ]
        # Missing open_interest is stored as 0.0 instead of NaN. pd.isna also
        # treats None as missing, where math.isnan would raise TypeError.
        if pd.isna(row.open_interest):
            open_interest = 0.0
        else:
            open_interest = float(row.open_interest)
        rows.append([intcode, bar_time.date(), bar_time, *numeric, open_interest])
    return rows


def insert_data(database_name="my_db", table_name="stock_tb",
                dir_path="/mnt/d/join_quant_data_stock_product/stock/minute/",
                max_files=50000):
    """Bulk-load minute-bar CSV files into ClickHouse, one INSERT per file.

    Every '.csv' file under *dir_path* (searched recursively) is read with
    pandas, converted row by row and inserted into the target table. After
    each file the table's total row count and the elapsed time are printed.

    Args:
        database_name: target database; interpolated as an SQL identifier, so
            it must be trusted.
        table_name: target table (same caveat).
        dir_path: directory (or single file) to scan for '.csv' files.
        max_files: load only the first *max_files* files found; None loads all.
    """
    target = f"{database_name}.{table_name}"
    files = get_all_files_by_root_sub_dirs(dir_path, ".csv")[:max_files]
    t0 = time.time()
    for file_num, _file in enumerate(files):
        t_file = time.time()
        print(f"{_file} => 第{file_num}个文件, 总共:{len(files)}个!")
        df = pd.read_csv(_file)
        # The file name is expected to be '<body>.<exchange>[_...].csv';
        # turn it into the integer code stored in the table.
        code_body, source = split_str_code(get_code_from_csv_file(_file))
        intcode = Code(code_body, code_body, source, "stock").get_int_code("jq")
        block_insert_data = _dataframe_to_rows(df, intcode)
        if block_insert_data:  # nothing to send for an empty file
            CLIENT.execute(f'INSERT INTO {target} VALUES', block_insert_data, types_check=True)
        table_info = CLIENT.execute(f'select count(1) from {target}')
        print(f"clickhouse stock_tb 表信息: {table_info}")
        print(f"第{file_num}个文件 总共:{len(files)}个 => {_file}读写完成! cost time:{time.time()-t_file}")
    print(f"文件总共:{len(files)} 读写完成! cost time:{time.time()-t0}")
def get_data_from_ch_by_code(database_name="my_db", table_name="stock_tb", code=20000001):
    """Fetch every row for one integer stock code.

    The table's sort key is ``datetime`` (ORDER BY datetime in the DDL), so a
    filter on ``code`` cannot use the primary index. Whether ClickHouse serves
    it from the code-sorted projection is decided by the server.

    Args:
        database_name: database name; interpolated as an SQL identifier, so it
            must be trusted.
        table_name: table name (same caveat).
        code: integer code as produced by jq_int_code, e.g. 20000001 for
            000001.XSHE (the previous hard-coded value).

    Returns:
        The list of row tuples returned by ``CLIENT.execute``.
    """
    # Bind the code as a parameter so it is rendered as a numeric literal.
    # The old query quoted it ('20000001'), which mismatches the UInt64
    # column. The unused start/end datetimes have been dropped.
    query_sql = f"SELECT * FROM {database_name}.{table_name} WHERE code = %(code)s"
    data = CLIENT.execute(query_sql, {"code": int(code)})
    print(f"data : {len(data)}")
    return data
def get_data_from_ch_by_code_and_datetime(database_name="my_db", table_name="stock_tb",
                                          startdate="2021-01-01", enddate="2022-02-02",
                                          code=None):
    """Fetch rows whose bar date lies in [startdate, enddate], optionally for one code.

    Args:
        database_name: database name; interpolated as an SQL identifier, so it
            must be trusted.
        table_name: table name (same caveat).
        startdate: inclusive start date, 'YYYY-MM-DD'.
        enddate: inclusive end date, 'YYYY-MM-DD'.
        code: integer stock code to restrict to. The default None returns all
            codes, which is what the previous version always did: its local
            ``code`` was defined but never applied to the query.

    Returns:
        The list of row tuples returned by ``CLIENT.execute``.

    NOTE(review): the whole result is materialised in memory. A very wide date
    range can exhaust it (the post shows such a run being "Killed");
    ``Client.execute_iter`` would stream rows instead.
    """
    conditions = [
        "toDate(datetime) >= toDate(%(startdate)s)",
        "toDate(datetime) <= toDate(%(enddate)s)",
    ]
    params = {"startdate": startdate, "enddate": enddate}
    if code is not None:
        conditions.append("code = %(code)s")
        params["code"] = int(code)
    # Values are bound as parameters (escaped by clickhouse_driver); only the
    # trusted identifiers are interpolated.
    query_sql = f"SELECT * FROM {database_name}.{table_name} WHERE " + " AND ".join(conditions)
    print(query_sql, params)
    data = CLIENT.execute(query_sql, params)
    print(f"data : {len(data)}")
    return data
if __name__ == "__main__":
    # Guarded so importing this module does not connect, create tables or
    # run queries. Pick the action with `mode`:
    #   -1 -> create the table, 0 -> bulk-load CSVs, anything else -> range query.
    t0 = time.time()
    db_name = "my_db"
    table_name = "stock_tb"
    mode = 1
    if mode == -1:
        create_table(db_name, table_name)
    elif mode == 0:
        insert_data()
    else:
        get_data_from_ch_by_code_and_datetime()
    t1 = time.time()
    print(f"get_data cost time : {t1-t0} s! ")
三、效果
1、按 code 查询单只股票，速度很慢：
[Running] python -u "/home/songroom/pyclick/click_test.py"
data : 707280
get_data cost time : 40.04917001724243 s!
[Done] exited with code=0 in 40.841 seconds
2、查多只股票：大约能查出所有个股半年左右的分钟数据；把时间范围扩大到一年以上时，进程被系统杀掉（Killed）：
[Running] python -u "/home/songroom/pyclick/click_test.py"
SELECT * FROM my_db.stock_tb WHERE toDate(datetime) >= toDate('2021-08-01') AND toDate(datetime) <= toDate('2022-02-02')
data : 32045280
get_data cost time : 53.07652807235718 s!
[Done] exited with code=0 in 53.738 seconds
[Running] python -u "/home/songroom/pyclick/click_test.py"
SELECT * FROM my_db.stock_tb WHERE toDate(datetime) >= toDate('2021-01-01') AND toDate(datetime) <= toDate('2022-02-02')
Killed
总体上看，效果不太明显，和分区的效果差不多。
|