fabric1.0学习笔记(1) fabric1.0学习笔记(2) fabric1.0学习笔记(3) fabric1.0学习笔记(4) fabric1.0学习笔记(5)
本次接学习笔记(4),深入了解账本存储的内容 分为四个主要部分
- 账本存储接口定义
- 交易读写集
- 状态数据库及历史状态数据库
- 区块文件存储及区块索引
一、账本存储中的主要接口定义 fabric为账本存储相关的方法封装了6个接口 分别是
- PeerLedgerProvider对账本实例操作的方法接口
- PeerLedger账本中的方法接口包括账本对象及获取交易、获取区块等方法
- ValidatedLedger有效账本,也就是账本文件存储接口,只有账本对象,没有其他方法
- QueryExecutor账本查询方法接口如获取世界状态等
- HistoryQueryExecutor历史查询方法接口只有一个获取key的变动历史
- TxSimulator交易模拟方法接口,是交易执行的上下文,包括了前面的QueryExecutor、一些对世界状态的操作,以及一个返回交易模拟执行的结果即导出交易读写集
相关代码在ledger路径下 ledger_interface.go
type PeerLedgerProvider interface {
Create(genesisBlock *common.Block) (PeerLedger, error)
Open(ledgerID string) (PeerLedger, error)
Exists(ledgerID string) (bool, error)
List() ([]string, error)
Close()
}
type PeerLedger interface {
commonledger.Ledger
GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
GetBlockByHash(blockHash []byte) (*common.Block, error)
GetBlockByTxID(txID string) (*common.Block, error)
GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
NewTxSimulator() (TxSimulator, error)
NewQueryExecutor() (QueryExecutor, error)
NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
Prune(policy commonledger.PrunePolicy) error
}
type ValidatedLedger interface {
commonledger.Ledger
}
type QueryExecutor interface {
GetState(namespace string, key string) ([]byte, error)
GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)
Done()
}
type HistoryQueryExecutor interface {
GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error)
}
type TxSimulator interface {
QueryExecutor
SetState(namespace string, key string, value []byte) error
DeleteState(namespace string, key string) error
SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
ExecuteUpdate(query string) error
GetTxSimulationResults() ([]byte, error)
}
接下来是构建读写集
lockbased_tx_simulator.go
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder
}
主要实现还是在RWSetBuilder中,进入RWSetBuilder看一下,主要包括的是对读写集的添加和导出操作
rwset_builder.go
type RWSetBuilder struct {
rwMap map[string]*nsRWs
}
func NewRWSetBuilder() *RWSetBuilder {
return &RWSetBuilder{make(map[string]*nsRWs)}
}
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.readMap[key] = NewKVRead(key, version)
}
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.writeMap[key] = newKVWrite(key, value)
}
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
nsRWs := rws.getOrCreateNsRW(ns)
key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
_, ok := nsRWs.rangeQueriesMap[key]
if !ok {
nsRWs.rangeQueriesMap[key] = rqi
nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
}
}
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
txRWSet := &TxRwSet{}
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
nsReadWriteMap := rws.rwMap[ns]
var reads []*kvrwset.KVRead
sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key])
}
var writes []*kvrwset.KVWrite
sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
var rangeQueriesInfo []*kvrwset.RangeQueryInfo
rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
for _, key := range nsReadWriteMap.rangeQueriesKeys {
rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
}
kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
nsRWs := &NsRwSet{ns, kvRWs}
txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
}
return txRWSet
}
二、交易读写集 交易读写集就是告诉区块链交易对哪些数据进行了读操作,对哪些数据进行了写操作或更新操作 包括读集、写集和版本号
- 构建读写集
相关代码在ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr路径下 lockbased_tx_simulator.go
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder
}
主要实现还是在RWSetBuilder中,进入RWSetBuilder看一下,主要包括的是对读写集的添加和导出操作 rwset_builder.go
type RWSetBuilder struct {
rwMap map[string]*nsRWs
}
func NewRWSetBuilder() *RWSetBuilder {
return &RWSetBuilder{make(map[string]*nsRWs)}
}
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.readMap[key] = NewKVRead(key, version)
}
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
nsRWs := rws.getOrCreateNsRW(ns)
nsRWs.writeMap[key] = newKVWrite(key, value)
}
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
nsRWs := rws.getOrCreateNsRW(ns)
key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
_, ok := nsRWs.rangeQueriesMap[key]
if !ok {
nsRWs.rangeQueriesMap[key] = rqi
nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
}
}
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
txRWSet := &TxRwSet{}
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
nsReadWriteMap := rws.rwMap[ns]
var reads []*kvrwset.KVRead
sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key])
}
var writes []*kvrwset.KVWrite
sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
var rangeQueriesInfo []*kvrwset.RangeQueryInfo
rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
for _, key := range nsReadWriteMap.rangeQueriesKeys {
rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
}
kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
nsRWs := &NsRwSet{ns, kvRWs}
txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
}
return txRWSet
}
以上是交易读写集的构造,接下来看交易读写集的验证 相关文件在ledger/kvledger/txmgmt/validator/statebasedval目录下 state_based_validator.go
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
txRWSet := &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
txResult := peer.TxValidationCode_VALID
if doMVCCValidation {
if txResult, err = v.validateTx(txRWSet, updates); err != nil {
return nil, txResult, err
} else if txResult != peer.TxValidationCode_VALID {
txRWSet = nil
}
}
return txRWSet, txResult, err
}
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
updates := statedb.NewUpdateBatch()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
if txsFilter.IsInvalid(txIndex) {
logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
block.Header.Number, txIndex, txsFilter.Flag(txIndex))
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
}
payload, err := putils.GetPayload(env)
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txType := common.HeaderType(chdr.Type)
if txType != common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
block.Header.Number, txIndex, txType)
continue
}
txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)
if err != nil {
return nil, err
}
txsFilter.SetFlag(txIndex, txResult)
if txRWSet != nil {
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
addWriteSetToBatch(txRWSet, committingTxHeight, updates)
txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
}
if txsFilter.IsValid(txIndex) {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
block.Header.Number, txIndex, chdr.TxId)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
}
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
return updates, nil
}
func addWriteSetToBatch(txRWSet *rwsetutil.TxRwSet, txHeight *version.Height, batch *statedb.UpdateBatch) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
if kvWrite.IsDelete {
batch.Delete(ns, kvWrite.Key, txHeight)
} else {
batch.Put(ns, kvWrite.Key, kvWrite.Value, txHeight)
}
}
}
}
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
上面的一段是验证的外围操作,主要是判读交易是否需要验证以及验证成功后将交易放入待更新的状态集中、将一个区块中的所有的交易状态保存到区块的元数据中 核心的验证代码在validateKVRead中
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, err
}
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
主要验证逻辑是:首先判断状态更新集里有没有与读集相同的内容,有则说明之前的交易修改了该键值对,该交易判定为无效,如果没有则读取世界状态中读集对应的版本号,判断交易中的版本号与世界状态中的版本号是否一致,如果一致则为有效交易,否则为无效交易。
三、状态数据库及历史状态数据库 二者是不同的概念,分别对应世界状态和历史数据索引,历史状态数据库保存操作包含在哪个区块,并不保存值,只保存值变动的动作。
状态数据库要解决三个问题:
- 如何联系智能合约中的键和底层存储的键值对
- 如何持久化区块信息
- 如何标识最新的区块信息
第一个问题:状态数据库通过保存和读取由通道名、链码名、对象名及分隔符组成的组合键来建立智能合约中的键与底层存储的键值对之间的关系
相关代码在ledger/kvledger/txmgmt/stateleveldb目录下 stateleveldb.go
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
compositeKey := constructCompositeKey(namespace, key)
dbVal, err := vdb.db.Get(compositeKey)
if err != nil {
return nil, err
}
if dbVal == nil {
return nil, nil
}
val, ver := statedb.DecodeValue(dbVal)
return &statedb.VersionedValue{Value: val, Version: ver}, nil
}
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
vals := make([]*statedb.VersionedValue, len(keys))
for i, key := range keys {
val, err := vdb.GetState(namespace, key)
if err != nil {
return nil, err
}
vals[i] = val
}
return vals, nil
}
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
compositeStartKey := constructCompositeKey(namespace, startKey)
compositeEndKey := constructCompositeKey(namespace, endKey)
if endKey == "" {
compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
}
dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
return newKVScanner(namespace, dbItr), nil
}
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
return nil, errors.New("ExecuteQuery not supported for leveldb")
}
以上是对处理、保存、读取组合键的相关方法
第二、三个问题:区块信息的持久化、标识最新的区块信息 相关代码在同一个路径下(ledger/kvledger/txmgmt/stateleveldb)
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
dbBatch := leveldbhelper.NewUpdateBatch()
namespaces := batch.GetUpdatedNamespaces()
for _, ns := range namespaces {
updates := batch.GetUpdates(ns)
for k, vv := range updates {
compositeKey := constructCompositeKey(ns, k)
logger.Debugf("Channel [%s]: Applying key=[%#v]", vdb.dbName, compositeKey)
if vv.Value == nil {
dbBatch.Delete(compositeKey)
} else {
dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
}
}
}
dbBatch.Put(savePointKey, height.ToBytes())
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
return err
}
return nil
}
状态数据库的实现代码,包括几个查询方法、一个更新方法以及一些辅助方法(如智能合约中的键值对与状态数据库中键值对进行转化的方法)。状态数据库中的键是以智能合约的通道名、智能合约的名字以及智能合约中的键拼接在一起作为键,值则是智能合约中的值以及对应的状态版本为值。 以上是区块持久化及标识最新区块的方法,状态数据库的点基本上就在上面的三个问题里了。
再看历史数据库的实现方法 同样有两个问题需要解决:
- 如何标识某个键是在哪一个交易中被改变的
- 如何查询某个键的变动历史
第一个问题:与状态数据库保存智能合约键的方法类似,也是通过组合键的形式,具体是通过由键和区块id,交易id组成的组合键来完成的
func (historyDB *historyDB) Commit(block *common.Block) error {
blockNo := block.Header.Number
var tranNo uint64
dbBatch := leveldbhelper.NewUpdateBatch()
logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
historyDB.dbName, blockNo, len(block.Data.Data))
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for _, envBytes := range block.Data.Data {
if txsFilter.IsInvalid(int(tranNo)) {
logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
historyDB.dbName, tranNo)
tranNo++
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return err
}
payload, err := putils.GetPayload(env)
if err != nil {
return err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
}
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return err
}
txRWSet := &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return err
}
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
writeKey := kvWrite.Key
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
dbBatch.Put(compositeHistoryKey, emptyValue)
}
}
} else {
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}
tranNo++
}
height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())
if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
return err
}
logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
return nil
}
第二个问题:它解决的是标识key是在哪一笔交易中修改的,具体解决方法是通过保存通道名,智能合约名,智能合约中的键,区块编号以及交易编号的组合键来标识。
获取历史信息
historyleveldb_query_executer.go
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) {
if ledgerconfig.IsHistoryDBEnabled() == false {
return nil, errors.New("History tracking not enabled - historyDatabase is false")
}
var compositeStartKey []byte
var compositeEndKey []byte
compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false)
compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true)
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}
type historyScanner struct {
compositePartialKey []byte
namespace string
key string
dbItr iterator.Iterator
blockStore blkstorage.BlockStore
}
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}
func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
if !scanner.dbItr.Next() {
return nil, nil
}
historyKey := scanner.dbItr.Key()
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
return queryResult, nil
}
首先根据namespace(通道id、智能合约名)智能合约键查询所有相关的记录,根据查询出的内容找到区块高度和交易i,然后获取envelope再根据envelope获取交易的历史信息(交易id、更新值、时间戳等)
最后是区块文件存储的代码 相关文件在common/ledger/blkstorage目录下 blockstorage.go
type fsBlockStore struct {
id string
conf *Conf
fileMgr *blockfileMgr
}
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
dbHandle *leveldbhelper.DBHandle) *fsBlockStore {
return &fsBlockStore{id, conf, newBlockfileMgr(id, conf, indexConfig, dbHandle)}
}
func (store *fsBlockStore) AddBlock(block *common.Block) error {
return store.fileMgr.addBlock(block)
}
func (store *fsBlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) {
return store.fileMgr.getBlockchainInfo(), nil
}
func (store *fsBlockStore) RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error) {
var itr *blocksItr
var err error
if itr, err = store.fileMgr.retrieveBlocks(startNum); err != nil {
return nil, err
}
return itr, nil
}
func (store *fsBlockStore) RetrieveBlockByHash(blockHash []byte) (*common.Block, error) {
return store.fileMgr.retrieveBlockByHash(blockHash)
}
func (store *fsBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
return store.fileMgr.retrieveBlockByNumber(blockNum)
}
func (store *fsBlockStore) RetrieveTxByID(txID string) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByID(txID)
}
func (store *fsBlockStore) RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByBlockNumTranNum(blockNum, tranNum)
}
func (store *fsBlockStore) RetrieveBlockByTxID(txID string) (*common.Block, error) {
return store.fileMgr.retrieveBlockByTxID(txID)
}
func (store *fsBlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
return store.fileMgr.retrieveTxValidationCodeByTxID(txID)
}
func (store *fsBlockStore) Shutdown() {
logger.Debugf("closing fs blockStore:%s", store.id)
store.fileMgr.close()
}
主要包括了区块增加、查询的方法 然后区块文件流,(但是对流的概念不怎么理解)
对账本存储进行一个总结:
-
账本存储接口定义:从整体上把握账本存储的方法 -
交易读写集:交易读写集的建立、校验 -
状态数据库及历史状态数据库:组合键 -
区块文件存储及区块索引:基于文件系统
主要接口方法:
- peerledgerprovider:提供账本层面的方法如创建账本、打开账本查询账本等,一个账本对应三个数据状态数据、历史数据、区块数据。有点类似排序节点manager
- peerledger:提供对账本的操作,如对区块的查询、对状态的查询等,对状态和历史状态的查询分别使用queryexecutor和historyqueryexecutor实现的。
- txsimulator:交易模拟器,背书节点执行链码时底层的支持
读写集的概念:是防止双花的关键点,底层实现并不麻烦,就是进行操作前对状态值和版本号进行校验。
状态数据库:目前支持两种数据库couchdb、leveldb,couchdb支持模糊查询而leveldb不支持,二者可以在开发过程中切换,有三个重要问题:
- 1、如何关联智能合约键值对与底层存储的键值对(数据隔离) 用组合键保存
- 2、如何持久化区块的状态信息 交易读写集校验后生成状态更新集可用于状态信息的更新
- 3、如何标识最新存储的区块编号 在更新状态信息的同时记录最新区块的编号
历史状态数据库:只保存操作包含在哪个区块即不保存值,只保存值变动的动作 两个重要问题:
- 1、标识某key被某交易改变 也是通过保存组合键来标识
- 2、如何查询某key的变动历史 组合键的前缀匹配查询
区块文件存储分为两块:区块的存储、区块索引的存储
|