前言
有个朋友业务需要存大量的流水数据, 还希望可以实时根据流水聚合统计, 需求计算的精度到小数点18位(Es和Doris就可以直接ps了), 正好可以看看很火的clickhouse 不看不知道, 一看吓一跳, 查询速度是真快啊, 数据压缩也是香了一匹, 运维再也不怕磁盘报警了!
简单操作一下
因为我要测精度, 就不用官方示例了, 有兴趣的建议了解一下, 比较全
搭建
直接在 docker仓库 上找到clickhouse的镜像, 拉取就完事了
// 拉取镜像
docker pull yandex/clickhouse-server
// 建一个挂载目录
mkdir -p /data/clickhouse
// 启动
docker run -d -p 8123:8123 -p 9000:9000 --name house --ulimit nofile=262144:262144 --volume=/data/clickhouse:/var/lib/clickhouse yandex/clickhouse-server
建库建表
连接ck
docker exec -it house /bin/bash
clickhouse-client
建库建表
create database if not exists house;
CREATE TABLE house.asset \
( \
`user_id` String, \
`order_id` String, \
`currency` String, \
`from_wallet` UInt16, \
`to_wallet` UInt16, \
`op_type` UInt16, \
`amount` Decimal128(18), \
`c_time` UInt32 \
) \
ENGINE = MergeTree() \
PARTITION BY toYYYYMM(toDateTime(c_time)) \
ORDER BY (c_time) \
SETTINGS index_granularity = 8192;
表结构
- ENGINE = MergeTree() MergeTree系列表引擎的基础表引擎, 使用最为广泛;
- PARTITION BY toYYYYMM(toDateTime(c_time)) 把时间戳转成年月日格式按月进行分区;
- ORDER BY (c_time) ck是支持在插入表时就按照排序存储的; 注意, 如果没有指定 PRIMARY KEY, 则默认用排序字段作为主键;
- SETTINGS index_granularity = 8192 索引粒度, 按默认的8192就行;
可以进入库use house 查看表的结构desc asset
准备数据
既然号称PB数据存储, 我这点字段就以亿为单位开始存吧, 一亿数据也就 5.6G 的csv文件; 使用命令行方式导入
clickhouse-client --query "INSERT INTO house.asset FORMAT CSV" --max_insert_block_size=100000 < data.csv
确认下导入数量 可以看到count 1亿数据用了0.154s;
查询性能
最通用的业务场景应该就是分组聚合了, 那就用下面两条语句来进行测试
select sum(amount) as amount, from_wallet from asset group by from_wallet;
第一次1.183s, 第二次应该是有缓存了 0.172s;
select sum(amount) as amount, from_wallet, to_wallet from asset group by from_wallet, to_wallet; 第一次1.33s, 第二次 0.24s;
那这个性能足以支撑业务了, 因为他们每天的流水数据是千万级别的, 实时查询可以每天做份留存, 然后统计当天实时流水就可以; 留存就每天预计算统计所有历史流水, 保证准确性;
数据压缩
最后值得一提的当然就是数据压缩功能了, 进入数据目录看一下大小
可以看到占用了1.3G的磁盘, 压缩率≈23%! 那如果数据再大点呢? 这块就直接把当前数据在复制两遍试一下 insert into house.asset select * from house.asset
现在是4亿的数据, 磁盘占用5G, 压缩率≈22%!
Go Demo
package main
import (
"fmt"
"github.com/mailru/dbr"
_ "github.com/mailru/go-clickhouse"
"github.com/shopspring/decimal"
"log"
"time"
)
type (
asset struct {
UserId string
OrderId string
Currency string
FromWallet int
ToWallet int
OpType int
Amount decimal.Decimal
CTime int64
}
)
func main() {
connect, err := dbr.Open("clickhouse", "http://127.0.0.1:8123/house", nil)
if err != nil {
log.Fatal(err)
}
sess := connect.NewSession(nil)
_, err = sess.Exec("CREATE TABLE asset (`user_id` String, `order_id` String, `currency` String, " +
"`from_wallet` UInt16, `to_wallet` UInt16, `op_type` UInt16, `amount` Decimal128(18), `c_time` UInt32) " +
"ENGINE = MergeTree() PARTITION BY toYYYYMM(toDateTime(c_time)) ORDER BY (c_time) " +
"SETTINGS index_granularity = 8192;")
if err != nil {
log.Fatal("create table err: " + err.Error())
}
add := sess.InsertInto("asset").Columns("user_id", "order_id", "currency", "from_wallet", "to_wallet",
"op_type", "amount", "c_time")
amount, _ := decimal.NewFromString("0.123456789123456789")
for i := 0; i < 10000; i++ {
add.Record(asset{
UserId: fmt.Sprintf("%d", i),
OrderId: fmt.Sprintf("%d", time.Now().UnixNano()),
Currency: "USD",
FromWallet: 1,
ToWallet: 1,
OpType: 1,
Amount: amount.Add(decimal.NewFromInt(int64(i))),
CTime: time.Now().Unix(),
})
}
res, err := add.Exec()
log.Printf("res: %+v, err: %v", res, err)
countQ := sess.SelectBySql("select count(*) as total from asset")
var total int
_, err = countQ.Load(&total)
log.Printf("err: %v, total: %d", err, total)
var items []struct {
Amount decimal.Decimal `json:"amount"`
FromWallet int `json:"fromWallet"`
}
query := sess.SelectBySql("select sum(amount) as amount, from_wallet from asset group by from_wallet")
if _, err := query.Load(&items); err != nil {
log.Fatal(err)
}
for _, item := range items {
log.Printf("amount: %v, fromWallet: %v", item.Amount, item.FromWallet)
}
}
总结
ck的优劣势百度一搜一大堆, 大家就根据自己的业务场景来选择是否用它吧, 用了就是真香!
官方文档 数据查询 时间操作 可视化工具下载
|