IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 区块链 -> fabric1.0学习笔记(5) -> 正文阅读

[区块链]fabric1.0学习笔记(5)

fabric1.0学习笔记(1)
fabric1.0学习笔记(2)
fabric1.0学习笔记(3)
fabric1.0学习笔记(4)
fabric1.0学习笔记(5)

本次接学习笔记(4),深入了解账本存储的内容
分为四个主要部分

  • 账本存储接口定义
  • 交易读写集
  • 状态数据库及历史状态数据库
  • 区块文件存储及区块索引

一、账本存储中的主要接口定义
fabric为账本存储相关的方法封装了6个接口
分别是

  • PeerLedgerProvider对账本实例操作的方法接口
  • PeerLedger账本中的方法接口包括账本对象及获取交易、获取区块等方法
  • ValidatedLedger有效账本,也就是账本文件存储接口,只有账本对象,没有其他方法
  • QueryExecutor账本查询方法接口如获取世界状态等
  • HistoryQueryExecutor历史查询方法接口只有一个获取key的变动历史
  • TxSimulator交易模拟方法接口,是交易执行的上下文,包括了前面的QueryExecutor、一些对世界状态的操作,以及一个返回交易模拟执行的结果即导出交易读写集
    相关代码在ledger路径下
    ledger_interface.go
// PeerLedgerProvider provides handle to ledger instances
type PeerLedgerProvider interface {
	// Create creates a new ledger with the given genesis block.
	// This function guarantees that the creation of ledger and committing the genesis block would an atomic action
	// The chain id retrieved from the genesis block is treated as a ledger id
	//根据创世区块创建一个新的账本,创世区块中的通道id作为账本id
	Create(genesisBlock *common.Block) (PeerLedger, error)
	// Open opens an already created ledger
	//根据账本id打开一个已经存在的账本
	Open(ledgerID string) (PeerLedger, error)
	// Exists tells whether the ledger with given id exists
	//判断某个账本id对应的账本是否存在
	Exists(ledgerID string) (bool, error)
	// List lists the ids of the existing ledgers
	//列出所有的账本id
	List() ([]string, error)
	// Close closes the PeerLedgerProvider
	//关闭接口?
	Close()
}

// PeerLedger differs from the OrdererLedger in that PeerLedger locally maintain a bitmask
 that tells apart valid transactions from invalid ones
//peer节点的账本和orderer节点的账本有所区别,peer会检验交易是否有效
//peerledgerprovider接口创建的账本都是创建的该对象
type PeerLedger interface {
	//账本的文件存储
	commonledger.Ledger
	// GetTransactionByID retrieves a transaction by id
	//根据交易id返回对应的交易
	GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
	// GetBlockByHash returns a block given it's hash
	//根据区块哈希返回对应的区块
	GetBlockByHash(blockHash []byte) (*common.Block, error)
	// GetBlockByTxID returns a block which contains a transaction
	//根据交易id返回交易所在的区块
	GetBlockByTxID(txID string) (*common.Block, error)
	// GetTxValidationCodeByTxID returns reason code of transaction validation
	//根据交易id返回对应交易的验证码(状态,有效、无效)
	GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
	//以上四个是账本的索引

	// NewTxSimulator gives handle to a transaction simulator.
	// A client can obtain more than one 'TxSimulator's for parallel execution.
	// Any snapshoting/synchronization should be performed at the implementation level if required
	//生成新的交易模拟器,可以理解为交易的上下文,封装了一些对世界状态的操作
	NewTxSimulator() (TxSimulator, error)
	// NewQueryExecutor gives handle to a query executor.
	// A client can obtain more than one 'QueryExecutor's for parallel execution.
	// Any synchronization should be performed at the implementation level if required
	//新的查询执行器,查询状态数据库
	NewQueryExecutor() (QueryExecutor, error)
	// NewHistoryQueryExecutor gives handle to a history query executor.
	// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
	// Any synchronization should be performed at the implementation level if required
	//新的历史查询器,查询历史数据库
	NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
	//Prune prunes the blocks/transactions that satisfy the given policy
	//根据策略对区块或交易进行裁剪,1.0是一个空方法
	Prune(policy commonledger.PrunePolicy) error
}

// ValidatedLedger represents the 'final ledger' after filtering out invalid transactions from PeerLedger.
// Post-v1
//经检验有效的账本,即最终账本,指代账本文件存储
type ValidatedLedger interface {
	commonledger.Ledger
}

// QueryExecutor executes the queries
// Get* methods are for supporting KV-based data model. ExecuteQuery method is for supporting a rich datamodel and query support
//
// ExecuteQuery method in the case of a rich data model is expected to support queries on
// latest state, historical state and on the intersection of state and transactions
//账本查询
type QueryExecutor interface {
	// GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
	//获取一个状态,namespace可以理解为账本id和链码名的组合,key是账本中的对象
	GetState(namespace string, key string) ([]byte, error)
	// GetStateMultipleKeys gets the values for multiple keys in a single call
	//获取多个状态
	GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
	// GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
	// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
	// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
	// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons.
	// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
	//获取状态区间
	GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
	//上面方法都是对键值对模型的操作
	// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
	// Only used for state databases that support query
	// For a chaincode, the namespace corresponds to the chaincodeId
	// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
	//富文本查询/模糊查询,只有couchdb支持,leveldb不支持
	ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)
	// Done releases resources occupied by the QueryExecutor
	Done()
}

// HistoryQueryExecutor executes the history queries
type HistoryQueryExecutor interface {
	// GetHistoryForKey retrieves the history of values for a key.
	// The returned ResultsIterator contains results of type *KeyModification which is defined in protos/ledger/queryresult.
	//根据key获取key的变动历史
	GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error)
}

// TxSimulator simulates a transaction on a consistent snapshot of the 'as recent state as possible'
// Set* methods are for supporting KV-based data model. ExecuteUpdate method is for supporting a rich datamodel and query support
//交易执行的上下文
type TxSimulator interface {
	QueryExecutor//上面的查询执行器
	// SetState sets the given value for the given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
	SetState(namespace string, key string, value []byte) error
	// DeleteState deletes the given namespace and key
	DeleteState(namespace string, key string) error
	// SetMultipleKeys sets the values for multiple keys in a single call
	SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
	// ExecuteUpdate for supporting rich data model (see comments on QueryExecutor above)
	//以上三个方法对state即世界状态进行更改
	ExecuteUpdate(query string) error
	// GetTxSimulationResults encapsulates the results of the transaction simulation.
	// This should contain enough detail for
	// - The update in the state that would be caused if the transaction is to be committed
	// - The environment in which the transaction is executed so as to be able to decide the validity of the environment
	//   (at a later time on a different peer) during committing the transactions
	// Different ledger implementation (or configurations of a single implementation) may want to represent the above two pieces
	// of information in different way in order to support different data-models or optimize the information representations.
	//获取交易模拟执行的结果,导出交易读写集
	GetTxSimulationResults() ([]byte, error)
}

接下来是构建读写集
lockbased_tx_simulator.go
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
	lockBasedQueryExecutor
	rwsetBuilder *rwsetutil.RWSetBuilder  //用于构建读写集
}

主要实现还是在RWSetBuilder中,进入RWSetBuilder看一下,主要包括的是对读写集的添加和导出操作
rwset_builder.go
// RWSetBuilder helps building the read-write set
type RWSetBuilder struct {
	rwMap map[string]*nsRWs //按照namespace隔离的map
}

// NewRWSetBuilder constructs a new instance of RWSetBuilder
func NewRWSetBuilder() *RWSetBuilder {
	return &RWSetBuilder{make(map[string]*nsRWs)}
}

// AddToReadSet adds a key and corresponding version to the read-set
//向读集里添加键值对
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
	nsRWs := rws.getOrCreateNsRW(ns)
	nsRWs.readMap[key] = NewKVRead(key, version)
}

// AddToWriteSet adds a key and value to the write-set
//向写集里添加键值对
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
	nsRWs := rws.getOrCreateNsRW(ns)
	nsRWs.writeMap[key] = newKVWrite(key, value)
}

// AddToRangeQuerySet adds a range query info for performing phantom read validation
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
	nsRWs := rws.getOrCreateNsRW(ns)
	key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
	_, ok := nsRWs.rangeQueriesMap[key]
	if !ok {
		nsRWs.rangeQueriesMap[key] = rqi
		nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
	}
}

// GetTxReadWriteSet returns the read-write set in the form that can be serialized
//导出builder中生成的读写集
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
	txRWSet := &TxRwSet{}
	sortedNamespaces := util.GetSortedKeys(rws.rwMap)
	for _, ns := range sortedNamespaces {
		//Get namespace specific read-writes
		nsReadWriteMap := rws.rwMap[ns]

		//add read set
		var reads []*kvrwset.KVRead
		sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
		for _, key := range sortedReadKeys {
			reads = append(reads, nsReadWriteMap.readMap[key])
		}

		//add write set
		var writes []*kvrwset.KVWrite
		sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
		for _, key := range sortedWriteKeys {
			writes = append(writes, nsReadWriteMap.writeMap[key])
		}

		//add range query info
		var rangeQueriesInfo []*kvrwset.RangeQueryInfo
		rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
		for _, key := range nsReadWriteMap.rangeQueriesKeys {
			rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
		}
		kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
		nsRWs := &NsRwSet{ns, kvRWs}
		txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
	}
	return txRWSet
}

二、交易读写集
交易读写集就是告诉区块链交易对哪些数据进行了读操作,对哪些数据进行了写操作或更新操作 包括读集、写集和版本号

  • 构建读写集
    相关代码在ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr路径下
    lockbased_tx_simulator.go
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
	lockBasedQueryExecutor
	rwsetBuilder *rwsetutil.RWSetBuilder  //用于构建读写集
}

主要实现还是在RWSetBuilder中,进入RWSetBuilder看一下,主要包括的是对读写集的添加和导出操作
rwset_builder.go

// RWSetBuilder helps building the read-write set
type RWSetBuilder struct {
	rwMap map[string]*nsRWs //按照namespace隔离的map
}

// NewRWSetBuilder constructs a new instance of RWSetBuilder
func NewRWSetBuilder() *RWSetBuilder {
	return &RWSetBuilder{make(map[string]*nsRWs)}
}

// AddToReadSet adds a key and corresponding version to the read-set
//向读集里添加键值对
func (rws *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
	nsRWs := rws.getOrCreateNsRW(ns)
	nsRWs.readMap[key] = NewKVRead(key, version)
}

// AddToWriteSet adds a key and value to the write-set
//向写集里添加键值对
func (rws *RWSetBuilder) AddToWriteSet(ns string, key string, value []byte) {
	nsRWs := rws.getOrCreateNsRW(ns)
	nsRWs.writeMap[key] = newKVWrite(key, value)
}

// AddToRangeQuerySet adds a range query info for performing phantom read validation
func (rws *RWSetBuilder) AddToRangeQuerySet(ns string, rqi *kvrwset.RangeQueryInfo) {
	nsRWs := rws.getOrCreateNsRW(ns)
	key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
	_, ok := nsRWs.rangeQueriesMap[key]
	if !ok {
		nsRWs.rangeQueriesMap[key] = rqi
		nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
	}
}

// GetTxReadWriteSet returns the read-write set in the form that can be serialized
//导出builder中生成的读写集
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
	txRWSet := &TxRwSet{}
	sortedNamespaces := util.GetSortedKeys(rws.rwMap)
	for _, ns := range sortedNamespaces {
		//Get namespace specific read-writes
		nsReadWriteMap := rws.rwMap[ns]

		//add read set
		var reads []*kvrwset.KVRead
		sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
		for _, key := range sortedReadKeys {
			reads = append(reads, nsReadWriteMap.readMap[key])
		}

		//add write set
		var writes []*kvrwset.KVWrite
		sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
		for _, key := range sortedWriteKeys {
			writes = append(writes, nsReadWriteMap.writeMap[key])
		}

		//add range query info
		var rangeQueriesInfo []*kvrwset.RangeQueryInfo
		rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
		for _, key := range nsReadWriteMap.rangeQueriesKeys {
			rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
		}
		kvRWs := &kvrwset.KVRWSet{Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
		nsRWs := &NsRwSet{ns, kvRWs}
		txRWSet.NsRwSets = append(txRWSet.NsRwSets, nsRWs)
	}
	return txRWSet
}

以上是交易读写集的构造,接下来看交易读写集的验证
相关文件在ledger/kvledger/txmgmt/validator/statebasedval目录下
state_based_validator.go

//validate endorser transaction
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
	// extract actions from the envelope message
	respPayload, err := putils.GetActionFromEnvelope(envBytes)
	if err != nil {
		return nil, peer.TxValidationCode_NIL_TXACTION, nil
	}

	//preparation for extracting RWSet from transaction
	txRWSet := &rwsetutil.TxRwSet{}

	// Get the Result from the Action
	// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling

	if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
		return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
	}

	txResult := peer.TxValidationCode_VALID
	//以上都是反序列化交易读写集
	//mvccvalidation, may invalidate transaction
	if doMVCCValidation {
		if txResult, err = v.validateTx(txRWSet, updates); err != nil {
			return nil, txResult, err
		} else if txResult != peer.TxValidationCode_VALID {
			txRWSet = nil
		}
	}

	return txRWSet, txResult, err
}

// ValidateAndPrepareBatch implements method in Validator interface
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
	logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
	//记录状态数据库的更改
	updates := statedb.NewUpdateBatch()
	logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))

	// Committer validator has already set validation flags based on well formed tran checks
	//获取区块里交易的状态
	txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

	// Precaution in case committer validator has not added validation flags yet
	//判断是否已经被其他验证者验证过
	if len(txsFilter) == 0 {
		txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
		block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
	}

	for txIndex, envBytes := range block.Data.Data {
		//略过无效交易
		if txsFilter.IsInvalid(txIndex) {
			// Skiping invalid transaction
			logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
				block.Header.Number, txIndex, txsFilter.Flag(txIndex))
			continue
		}

		env, err := putils.GetEnvelopeFromBlock(envBytes)
		if err != nil {
			return nil, err
		}

		payload, err := putils.GetPayload(env)
		if err != nil {
			return nil, err
		}

		chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			return nil, err
		}

		txType := common.HeaderType(chdr.Type)
		//略过非endorser交易
		if txType != common.HeaderType_ENDORSER_TRANSACTION {
			logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
				block.Header.Number, txIndex, txType)
			continue
		}
		//验证交易读写集,txRWSet为过滤后的交易读写集, txResult交易的状态, err错误信息
		txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)

		if err != nil {
			return nil, err
		}
		//更新交易的状态
		txsFilter.SetFlag(txIndex, txResult)

		//txRWSet != nil => t is valid
		//将过滤后的交易放入待更新的状态集中
		if txRWSet != nil {
			committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
			addWriteSetToBatch(txRWSet, committingTxHeight, updates)
			txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
		}

		if txsFilter.IsValid(txIndex) {
			logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
				block.Header.Number, txIndex, chdr.TxId)
		} else {
			logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
				block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
		}
	}
	//更新交易状态到区块的元数据中
	block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
	return updates, nil
}

func addWriteSetToBatch(txRWSet *rwsetutil.TxRwSet, txHeight *version.Height, batch *statedb.UpdateBatch) {
	for _, nsRWSet := range txRWSet.NsRwSets {
		ns := nsRWSet.NameSpace
		for _, kvWrite := range nsRWSet.KvRwSet.Writes {
			if kvWrite.IsDelete {
				batch.Delete(ns, kvWrite.Key, txHeight)
			} else {
				batch.Put(ns, kvWrite.Key, kvWrite.Value, txHeight)
			}
		}
	}
}

func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
	for _, nsRWSet := range txRWSet.NsRwSets {
		ns := nsRWSet.NameSpace
		//验证读集
		if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
			if err != nil {
				return peer.TxValidationCode(-1), err
			}
			return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
		}
		//rangequeries是读集的拓展
		if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
			if err != nil {
				return peer.TxValidationCode(-1), err
			}
			return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
		}
	}
	return peer.TxValidationCode_VALID, nil
}

func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
	for _, kvRead := range kvReads {
		if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
			return valid, err
		}
	}
	return true, nil
}

上面的一段是验证的外围操作,主要是判读交易是否需要验证以及验证成功后将交易放入待更新的状态集中、将一个区块中的所有的交易状态保存到区块的元数据中
核心的验证代码在validateKVRead中

// validateKVRead performs mvcc check for a key read during transaction simulation.
// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block)
// or in the updates (by a preceding valid transaction in the current block)
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
	//看状态更新集里面有没有与读集相同的内容,有则说明之前的交易修改了该键值对,该交易无效
	if updates.Exists(ns, kvRead.Key) {
		return false, nil
	}
	//读取世界状态中读集对应的版本号
	versionedValue, err := v.db.GetState(ns, kvRead.Key)
	if err != nil {
		return false, err
	}
	var committedVersion *version.Height
	if versionedValue != nil {
		committedVersion = versionedValue.Version
	}
	//验证世界状态中的version与读集中的version是否是一致的
	if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
		logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
			ns, kvRead.Key, committedVersion, kvRead.Version)
		return false, nil
	}
	return true, nil
}

主要验证逻辑是:首先判断状态更新集里有没有与读集相同的内容,有则说明之前的交易修改了该键值对,该交易判定为无效,如果没有则读取世界状态中读集对应的版本号,判断交易中的版本号与世界状态中的版本号是否一致,如果一致则为有效交易,否则为无效交易。

三、状态数据库及历史状态数据库
二者是不同的概念,分别对应世界状态和历史数据索引,历史状态数据库保存操作包含在哪个区块,并不保存值,只保存值变动的动作。

状态数据库要解决三个问题:

  • 如何联系智能合约中的键和底层存储的键值对
  • 如何持久化区块信息
  • 如何标识最新的区块信息

第一个问题:状态数据库通过保存和读取由通道名、链码名、对象名及分隔符组成的组合键来建立智能合约中的键与底层存储的键值对之间的关系

相关代码在ledger/kvledger/txmgmt/stateleveldb目录下
stateleveldb.go

// GetState implements method in VersionedDB interface
//获取状态,通过compositekey,由通道名、链码名、对象名及分隔符组成查询value
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
	compositeKey := constructCompositeKey(namespace, key)
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	}
	if dbVal == nil {
		return nil, nil
	}
	val, ver := statedb.DecodeValue(dbVal)
	return &statedb.VersionedValue{Value: val, Version: ver}, nil
}

// GetStateMultipleKeys implements method in VersionedDB interface
//查询多个键的值
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		}
		vals[i] = val
	}
	return vals, nil
}

// GetStateRangeScanIterator implements method in VersionedDB interface
// startKey is inclusive
// endKey is exclusive
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
	if endKey == "" {
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	}
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil
}

// ExecuteQuery implements method in VersionedDB interface
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
	return nil, errors.New("ExecuteQuery not supported for leveldb")
}

以上是对处理、保存、读取组合键的相关方法

第二、三个问题:区块信息的持久化、标识最新的区块信息
相关代码在同一个路径下(ledger/kvledger/txmgmt/stateleveldb)

// ApplyUpdates implements method in VersionedDB interface
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	//获取更更新器
	dbBatch := leveldbhelper.NewUpdateBatch()
	//获取namespace
	namespaces := batch.GetUpdatedNamespaces()
	//对每个namespace的更新内容循环进行更新
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
			logger.Debugf("Channel [%s]: Applying key=[%#v]", vdb.dbName, compositeKey)

			//如果要更新的值为空则删除对应键,否则将compositekey作为键,value和version共同作为值保存到状态数据库中
			if vv.Value == nil {
				dbBatch.Delete(compositeKey)
			} else {
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
			}
		}
	}
	//标识最新的区块高度,此处即是标识最新的区块信息
	dbBatch.Put(savePointKey, height.ToBytes())
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
		return err
	}
	return nil
}

状态数据库的实现代码,包括几个查询方法、一个更新方法以及一些辅助方法(如智能合约中的键值对与状态数据库中键值对进行转化的方法)。状态数据库中的键是以智能合约的通道名、智能合约的名字以及智能合约中的键拼接在一起作为键,值则是智能合约中的值以及对应的状态版本为值。
以上是区块持久化及标识最新区块的方法,状态数据库的点基本上就在上面的三个问题里了。

再看历史数据库的实现方法
同样有两个问题需要解决:

  • 如何标识某个键是在哪一个交易中被改变的
  • 如何查询某个键的变动历史

第一个问题:与状态数据库保存智能合约键的方法类似,也是通过组合键的形式,具体是通过由键和区块id,交易id组成的组合键来完成的

// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {

	blockNo := block.Header.Number
	//Set the starting tranNo to 0
	var tranNo uint64

	dbBatch := leveldbhelper.NewUpdateBatch()

	logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
		historyDB.dbName, blockNo, len(block.Data.Data))

	// Get the invalidation byte array for the block
	//获取区块的验证数据,用于判断区块中的每笔交易是否合法
	txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
	// Initialize txsFilter if it does not yet exist (e.g. during testing, for genesis block, etc)
	if len(txsFilter) == 0 {
		txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
		block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
	}

	// write each tran's write set to history db
	for _, envBytes := range block.Data.Data {

		// If the tran is marked as invalid, skip it
		//如果交易为无效交易则跳过
		if txsFilter.IsInvalid(int(tranNo)) {
			logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
				historyDB.dbName, tranNo)
			tranNo++
			continue
		}
		//获取envelope
		env, err := putils.GetEnvelopeFromBlock(envBytes)
		if err != nil {
			return err
		}
		//从envelope中获取交易体
		payload, err := putils.GetPayload(env)
		if err != nil {
			return err
		}
		//获取并解码交易体中通道头的部分
		chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			return err
		}
		//如果是普通交易(非配置交易)则修改历史数据库
		if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {

			// extract actions from the envelope message
			//从envelope中获取操作
			respPayload, err := putils.GetActionFromEnvelope(envBytes)
			if err != nil {
				return err
			}

			//preparation for extracting RWSet from transaction
			//用于保存交易读写集
			txRWSet := &rwsetutil.TxRwSet{}

			// Get the Result from the Action and then Unmarshal
			// it into a TxReadWriteSet using custom unmarshalling
			//从操作结果中获取读写集
			if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
				return err
			}
			// for each transaction, loop through the namespaces and writesets
			// and add a history record for each write
			for _, nsRWSet := range txRWSet.NsRwSets {
				ns := nsRWSet.NameSpace
				//namespace哪个通道、哪个智能合约

				for _, kvWrite := range nsRWSet.KvRwSet.Writes {
					writeKey := kvWrite.Key

					//composite key for history records is in the form ns~key~blockNo~tranNo
					//循环将写集中每个namespace下的每个键和区块id,交易id作为组合键保存在状态更新集中
					compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

					// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
					dbBatch.Put(compositeHistoryKey, emptyValue)
				}
			}

		} else {
			logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
		}
		tranNo++
	}

	// add savepoint for recovery purpose
	//将区块高度保存下来
	height := version.NewHeight(blockNo, tranNo)
	dbBatch.Put(savePointKey, height.ToBytes())

	// write the block's history records and savepoint to LevelDB
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	//把状态更新集存入历史数据库
	if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
		return err
	}

	logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
	return nil
}

第二个问题:它解决的是标识key是在哪一笔交易中修改的,具体解决方法是通过保存通道名,智能合约名,智能合约中的键,区块编号以及交易编号的组合键来标识。

获取历史信息
historyleveldb_query_executer.go
// GetHistoryForKey implements method in interface `ledger.HistoryQueryExecutor`
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) {

	//检查账本配置中历史数据库是否开启
	if ledgerconfig.IsHistoryDBEnabled() == false {
		return nil, errors.New("History tracking not enabled - historyDatabase is false")
	}

	var compositeStartKey []byte
	var compositeEndKey []byte
	compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false)
	compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true)

	// range scan to find any history records starting with namespace~key
	dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
	//获取历史查询对象,包括通道id、智能合约名的组合键、单独的通道id、智能合约名、智能合约键、iterator、区块存储对象
	return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}

//historyScanner implements ResultsIterator for iterating through history results
type historyScanner struct {
	compositePartialKey []byte //compositePartialKey includes namespace~key
	namespace           string
	key                 string
	dbItr               iterator.Iterator
	blockStore          blkstorage.BlockStore
}

func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
	dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
	return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}

func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
	if !scanner.dbItr.Next() {
		return nil, nil
	}
	historyKey := scanner.dbItr.Key() // history key is in the form namespace~key~blocknum~trannum

	// SplitCompositeKey(namespace~key~blocknum~trannum, namespace~key~) will return the blocknum~trannum in second position
	_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
	blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
	tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
	logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
		scanner.namespace, scanner.key, blockNum, tranNum)
	//首先根据历史查询对象获取某个历史记录的区块高度、交易编号
	// Get the transaction from block storage that is associated with this history record
	//根据区块高度和交易编号获取交易的envelope
	tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
	if err != nil {
		return nil, err
	}

	// Get the txid, key write value, timestamp, and delete indicator associated with this transaction
	//根据envelope获取交易的历史信息(交易id、更新值、时间戳等)
	queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
	if err != nil {
		return nil, err
	}
	logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
		scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
	return queryResult, nil
}
首先根据namespace(通道id、智能合约名)智能合约键查询所有相关的记录,根据查询出的内容找到区块高度和交易i,然后获取envelope再根据envelope获取交易的历史信息(交易id、更新值、时间戳等)

最后是区块文件存储的代码
相关文件在common/ledger/blkstorage目录下
blockstorage.go

// fsBlockStore - filesystem based implementation for `BlockStore`
type fsBlockStore struct {//区块文件存储
	id      string
	conf    *Conf
	fileMgr *blockfileMgr
}

// NewFsBlockStore constructs a `FsBlockStore`
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	dbHandle *leveldbhelper.DBHandle) *fsBlockStore {
	return &fsBlockStore{id, conf, newBlockfileMgr(id, conf, indexConfig, dbHandle)}
}

// AddBlock adds a new block
//增加区块方法
func (store *fsBlockStore) AddBlock(block *common.Block) error {
	return store.fileMgr.addBlock(block)
}

// GetBlockchainInfo returns the current info about blockchain
//返回链的信息
func (store *fsBlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) {
	return store.fileMgr.getBlockchainInfo(), nil
}

// RetrieveBlocks returns an iterator that can be used for iterating over a range of blocks
//返回区块的迭代器
func (store *fsBlockStore) RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error) {
	var itr *blocksItr
	var err error
	if itr, err = store.fileMgr.retrieveBlocks(startNum); err != nil {
		return nil, err
	}
	return itr, nil
}

// RetrieveBlockByHash returns the block for given block-hash
//通过区块哈希返回区块的迭代器
func (store *fsBlockStore) RetrieveBlockByHash(blockHash []byte) (*common.Block, error) {
	return store.fileMgr.retrieveBlockByHash(blockHash)
}

// RetrieveBlockByNumber returns the block at a given blockchain height
//通过区块高度返回区块
func (store *fsBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
	return store.fileMgr.retrieveBlockByNumber(blockNum)
}

// RetrieveTxByID returns a transaction for given transaction id
//通过交易id返回交易
func (store *fsBlockStore) RetrieveTxByID(txID string) (*common.Envelope, error) {
	return store.fileMgr.retrieveTransactionByID(txID)
}

// RetrieveTxByID returns a transaction for given transaction id
//通过区块高度、交易位置返回交易
func (store *fsBlockStore) RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
	return store.fileMgr.retrieveTransactionByBlockNumTranNum(blockNum, tranNum)
}
//通过交易id返回区块
func (store *fsBlockStore) RetrieveBlockByTxID(txID string) (*common.Block, error) {
	return store.fileMgr.retrieveBlockByTxID(txID)
}
//通过交易id返回交易验证结果
func (store *fsBlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	return store.fileMgr.retrieveTxValidationCodeByTxID(txID)
}

// Shutdown shuts down the block store
//关闭区块存储器
func (store *fsBlockStore) Shutdown() {
	logger.Debugf("closing fs blockStore:%s", store.id)
	store.fileMgr.close()
}

主要包括了区块增加、查询的方法
然后区块文件流,(但是对流的概念不怎么理解)

对账本存储进行一个总结:

  • 账本存储接口定义:从整体上把握账本存储的方法

  • 交易读写集:交易读写集的建立、校验

  • 状态数据库及历史状态数据库:组合键

  • 区块文件存储及区块索引:基于文件系统

主要接口方法:

  • peerledgerprovider:提供账本层面的方法如创建账本、打开账本查询账本等,一个账本对应三个数据状态数据、历史数据、区块数据。有点类似排序节点manager
  • peerledger:提供对账本的操作,如对区块的查询、对状态的查询等,对状态和历史状态的查询分别使用queryexecutor和historyqueryexecutor实现的。
  • txsimulator:交易模拟器,背书节点执行链码时底层的支持

读写集的概念:是防止双花的关键点,底层实现并不麻烦,就是进行操作前对状态值和版本号进行校验。

状态数据库:目前支持两种数据库couchdb、leveldb,couchdb支持模糊查询而leveldb不支持,二者可以在开发过程中切换,有三个重要问题:

  • 1、如何关联智能合约键值对与底层存储的键值对(数据隔离) 用组合键保存
  • 2、如何持久化区块的状态信息 交易读写集校验后生成状态更新集可用于状态信息的更新
  • 3、如何标识最新存储的区块编号 在更新状态信息的同时记录最新区块的编号

历史状态数据库:只保存操作包含在哪个区块即不保存值,只保存值变动的动作
两个重要问题:

  • 1、标识某key被某交易改变 也是通过保存组合键来标识
  • 2、如何查询某key的变动历史 组合键的前缀匹配查询

区块文件存储分为两块:区块的存储、区块索引的存储

  区块链 最新文章
盘点具备盈利潜力的几大加密板块,以及潜在
阅读笔记|让区块空间成为商品,打造Web3云
区块链1.0-比特币的数据结构
Team Finance被黑分析|黑客自建Token“瞒天
区块链≠绿色?波卡或成 Web3“生态环保”标
期货从入门到高深之手动交易系列D1课
以太坊基础---区块验证
进入以太坊合并的五个数字
经典同态加密算法Paillier解读 - 原理、实现
IPFS/Filecoin学习知识科普(四)
上一篇文章      下一篇文章      查看所有文章
加:2021-10-21 12:14:17  更:2021-10-21 12:14:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 20:29:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码