前言
这是一个influxdb源码分析系列的文章,上一章分析了Store结构,Store是数据库核心功能的一个总体抽象。Store里面包含了其他核心组件,例如Shard,series,index等。本篇文章分析的是Shard结构。
Shard 的定位
Shard是一个物理的概念,在存储时,最小的存储粒度就是Shard。Shard封装了单个分片的存储查询功能。这个结构定义在了tsdb/shard.go中,首先看一下具体的结构:
type Shard struct {
path string
walPath string
id uint64
database string
retentionPolicy string
sfile *SeriesFile
options EngineOptions
mu sync.RWMutex
_engine Engine
index Index
enabled bool
stats *ShardStatistics
defaultTags models.StatisticTags
baseLogger *zap.Logger
logger *zap.Logger
EnableOnOpen bool
CompactionDisabled bool
}
这里面的一些结构在之前的文章都有详细的分析,如series,index,所以这里就不再分析。除此之外,还有一个核心结构:Engine,这个是存储引擎,负责数据真正写入到influxdb中。我们本篇文章说是分析Shard,其实主要是分析Engine结构。
Engine
上面提到了,Engine是shard的存储引擎,负责读写数据用的。和大多数结构一样,为了能够更好地拓展,Engine也定义了一个顶层的抽象,在tsdb/engine.go中。部分结构如下:
type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
ScheduleFullCompaction() error
WithLogger(*zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
Digest() (io.ReadCloser, int64, error)
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursorIterator(ctx context.Context) (CursorIterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFieldSet() *MeasurementFieldSet
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
TagKeyCardinality(name, key []byte) int
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
Free() error
io.WriterTo
}
这里的设计和Index部分很像。不是很了解的可以去看看influxdb源码解析-inmem index 如果想要自己实现一个存储引擎。需要做:
这个流程和index是一致的,对于每个引擎的具体实现:
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
var newEngineFuncs = make(map[string]NewEngineFunc)
func RegisterEngine(name string, fn NewEngineFunc) {
if _, ok := newEngineFuncs[name]; ok {
panic("engine already registered: " + name)
}
newEngineFuncs[name] = fn
}
需要注册一个NewEngineFunc到map中(key是engine的名字),作为当前engine的构造函数。使用的时候根据当前启动参数选择不同的engine:
func NewEngine(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) (Engine, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
engine := newEngineFuncs[options.EngineVersion](id, i, path, walPath, sfile, options)
if options.OnNewEngine != nil {
options.OnNewEngine(engine)
}
return engine, nil
}
format := DefaultEngine
if fi, err := os.Stat(path); err != nil {
return nil, err
} else if !fi.Mode().IsDir() {
return nil, ErrUnknownEngineFormat
} else {
format = "tsm1"
}
fn := newEngineFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid engine format: %q", format)
}
engine := fn(id, i, path, walPath, sfile, options)
if options.OnNewEngine != nil {
options.OnNewEngine(engine)
}
return engine, nil
}
具体的实现在tsdb/engine/tsm1下,实现的是一个基于TSM(Time Series Merge Tree)引擎。看一下这里的实现。
TSM Engine
TSM Engine是influxdb实现的一个存储时序数据的存储引擎,结构的定义在tsdb/engine/tsm1/engine.go 这个结构的字段很多,只关心核心逻辑,做一下简化:
type Engine struct {
mu sync.RWMutex
index tsdb.Index
id uint64
path string
sfile *tsdb.SeriesFile
fieldset *tsdb.MeasurementFieldSet
WAL *WAL
Cache *Cache
Compactor *Compactor
CompactionPlan CompactionPlanner
FileStore *FileStore
}
这个结构主要分为三部分
- index 和series 信息。database 粒度的
- id,path,fieldSet 基本信息
- WAL,Cache,Compactor,FileStore TSM 相关。
index和series之前有过介绍,id,path也不再细说。主要来分析 TSM相关的部分。
Engine Cache
Cache部分作为LSM Based的存储引擎是一个基本的组件。主要功能是缓存磁盘上的数据。提供series-> seires Value 的映射。Cache结构位于tsdb/engine/tsm1/cache.go里。
type Cache struct {
mu sync.RWMutex
store storer
snapshot *Cache
snapshotting bool
stats *CacheStatistics
}
简化过后如上,核心字段是两个:store和snapshot。store 是一个抽象的interface,可以有多种实现。snapshot是cache的快照。
storer
storer结构是cache 存储数据的部分。结构定义如下:
type storer interface {
entry(key []byte) *entry
write(key []byte, values Values) (bool, error)
add(key []byte, entry *entry)
remove(key []byte)
keys(sorted bool) [][]byte
apply(f func([]byte, *entry) error) error
applySerial(f func([]byte, *entry) error) error
reset()
split(n int) []storer
count() int
}
这里先插一句,可以看到add 的参数是(key []byte,entry),这个entry是当前这个key对应的value集合,封装了一个结构:
type entry struct {
mu sync.RWMutex
values Values
vtype byte
}
key指的是series key=name+tags+fieldKey
Storer 提供了一些基本函数,例如add,write等。实现crud的功能。storer的实现是:ring.位于tsdb/engine/tsm1/ring.go,具体结构:
type ring struct {
keysHint int64
partitions []*partition
}
ring把数据做了一下分桶,分为多个partition。写入数据是,首先要计算写到哪个partition,这里的逻辑和写series ,计算shard 有点像:
func (r *ring) getPartition(key []byte) *partition {
return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}
做了个hash然后取模。partition 结构其实是一个map:
type partition struct {
mu sync.RWMutex
store map[string]*entry
}
所以在得到具体的partition之后,写入到这个partition就成了map数据的添加,这里可能会有并发问题,所以需要加锁
func (p *partition) add(key []byte, entry *entry) {
p.mu.Lock()
p.store[string(key)] = entry
p.mu.Unlock()
}
add 函数不返回值,write函数会返回是否有新的entry写入:
func (p *partition) write(key []byte, values Values) (bool, error) {
p.mu.RLock()
e := p.store[string(key)]
p.mu.RUnlock()
if e != nil {
return false, e.add(values)
}
p.mu.Lock()
defer p.mu.Unlock()
if e = p.store[string(key)]; e != nil {
return false, e.add(values)
}
e, err := newEntryValues(values)
if err != nil {
return false, err
}
p.store[string(key)] = e
return true, nil
}
这里的逻辑也不复杂,首先是加读锁,check数据是不是已经存在,然后写入。到这里一些核心的函数基本就分析完毕了。得到以下结论:
- Cache的真正存数据是交给了storer的一个实现ring
- ring是一个curde hash ring,里面会分多个partition 缓存数据
- partition 本质上市一个map结构,key是series key,value是这个series 对应的value集合。
接下来看看Cache本身的一些函数
Cache Init和Free
Cache的吃初始化和清空
func (c *Cache) init() {
if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
return
}
c.mu.Lock()
c.store, _ = newring(ringShards)
c.mu.Unlock()
}
func (c *Cache) Free() {
if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) {
return
}
c.mu.Lock()
c.store = emptyStore{}
c.mu.Unlock()
}
初始化时,核心逻辑是初始化storer,指定ring作为实现。Free时改变了store结构的指向,作为清空数据操作。
Cache Write和WriteMulti
写入单条数据:Write
func (c *Cache) Write(key []byte, values []Value) error {
c.init()
addedSize := uint64(Values(values).Size())
limit := c.maxSize
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
}
newKey, err := c.store.write(key, values)
if err != nil {
atomic.AddInt64(&c.stats.WriteErr, 1)
return err
}
if newKey {
addedSize += uint64(len(key))
}
c.increaseSize(addedSize)
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)
return nil
}
这里委托给了store.write执行真正的写入。
写入多条数据:WriteMulti,代码有点长,做一下简化
func (c *Cache) WriteMulti(values map[string][]Value) error {
c.init()
var werr error
c.mu.RLock()
store := c.store
c.mu.RUnlock()
for k, v := range values {
newKey, err := store.write([]byte(k), v)
}
return werr
}
核心逻辑就是遍历values map,循环调用单条写入。说到这里,多提一句,还记得这个函数在哪里调用的吗?在之前的文章里分析写入的时候influxdb 数据写入 倒数第二段:
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
if e.WALEnabled {
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}
if pointsWritten, ok := ctx.Value(tsdb.StatPointsWritten).(*int64); ok {
*pointsWritten = npoints
}
if valuesWritten, ok := ctx.Value(tsdb.StatValuesWritten).(*int64); ok {
*valuesWritten = nvalues
}
数据在shard内部的真正写入逻辑,首先是写入到Cache,然后才是写入到WAL。这里调用的就是这个方法,好了,现在任督二脉,快打通了,和之前的连起来了!
Cache Snapshot和ClearSnapshot
snapshot是快照功能,快照的生成和清理,其实对应的就是Cache结构的snapshot字段的赋值和清除。代码比较长,简化一下:
生成快照:
func (c *Cache) Snapshot() (*Cache, error) {
c.init()
c.mu.Lock()
defer c.mu.Unlock()
if c.snapshotting {
return nil, ErrSnapshotInProgress
}
c.snapshotting = true
c.snapshotAttempts++
if c.snapshot == nil {
store, err := newring(ringShards)
if err != nil {
return nil, err
}
c.snapshot = &Cache{
store: store,
}
}
return c.snapshot, nil
}
清理快照:
func (c *Cache) ClearSnapshot(success bool) {
c.init()
c.mu.RLock()
snapStore := c.snapshot.store
c.mu.RUnlock()
if success {
snapStore.reset()
}
}
这个主要依赖了ring的reset 实现,目的是把map里面的所有数据清理掉。
Cache Split
除了这些比较重要的操作之外,Cache还有一个重要的操作:split.这个是分割cache的,后面会提到,Cache会做compact生成TSM File,为了加速这个操作,经常会使用多个goroutine 一起生成,所以会把Cache 分割成多份,每份一个goroutine
func (c *Cache) Split(n int) []*Cache {
if n == 1 {
return []*Cache{c}
}
caches := make([]*Cache, n)
storers := c.store.split(n)
for i := 0; i < n; i++ {
caches[i] = &Cache{
store: storers[i],
}
}
return caches
}
这里委托给了storer的实现,做split。
func (r *ring) split(n int) []storer {
var keys int
storers := make([]storer, n)
for i := 0; i < n; i++ {
storers[i], _ = newring(len(r.partitions))
}
for i, p := range r.partitions {
r := storers[i%n].(*ring)
r.partitions[i] = p
keys += len(p.store)
}
return storers
}
这里会生成多个Cache的实现(ring),然后把当前的partition赋值到这些新的ring里面。
CacheLoader
上面分析了一下Cache的一些核心依赖和函数,那么Cache是怎么初始化的呢?这里的初始化指的不是创建对应的结构,而是灌数据到这个初始化好的结构里面去。这部分逻辑交给了CacheLoader来实现。
type CacheLoader struct {
files []string
Logger *zap.Logger
}
func NewCacheLoader(files []string) *CacheLoader {
return &CacheLoader{
files: files,
Logger: zap.NewNop(),
}
}
CacheLoader结构成员是一个files 数组,代表了WAL文件,然后读取这些WAL 文件,来把数据写入到Cache里。这部分逻辑在CacheLoader的Load函数里,代码有点长,简化一下:
func (cl *CacheLoader) Load(cache *Cache) error {
var r *WALSegmentReader
for _, fn := range cl.files {
if err := func() error {
f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
r = NewWALSegmentReader(f)
for r.Next() {
entry, err := r.Read()
switch t := entry.(type) {
case *WriteWALEntry:
if err := cache.WriteMulti(t.Values); err != nil {
return err
}
case *DeleteRangeWALEntry:
cache.DeleteRange(t.Keys, t.Min, t.Max)
case *DeleteWALEntry:
cache.Delete(t.Keys)
}
}
return r.Close()
}(); err != nil {
return err
}
}
return nil
}
可以看到核心的逻辑是读取WAL,然后根据WAL Entry的类型调用相关的方法,更新cache。
总结
到这里Cache部分算是告一段落。本篇文章主要揭示了:
- Cache的基本结构
- Cache是怎么缓存数据的
- ring的实现和核心函数
- Cache的核心操作
- 从WAL中恢复Cache
下一章会分析一下WAL相关的内容。
|