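Analysis
With the build done, the next step is to analyze the code in preparation for the modifications to come. There are already plenty of source-code analyses around, but judging by their dates (around 2018) they all cover 1.x (or even 0.8). 2.x changed how chaincode is invoked, and that is exactly the part I care about, so I have to read the code myself.
The version I am reading is 2.3. It differs in places from those older write-ups, so I am taking notes as I go for later reference.
A note on building fabric: after running make, the fabric docker images are added directly to the local docker image list.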
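Overall structure
A fresh git clone of fabric has the following structure (I opened the project in GoLand here). There are a lot of files and folders; the build folder is produced by the build. I will not analyze the files one by one, since that would take too much time; I will just record things as I come across them.
The cmd folder holds the main.go of each executable; my main focus here is peer. According to [1], peer is a conventional program that acts as both a server and a client. That is easy to understand, and ripple works the same way: one program can run as a service and can also be invoked from the command line. The command-line part presumably offers the same functionality as the SDK.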
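Tracing the peer command
So I need a rough picture of which parts peer uses and what its main functions are, to judge whether anything else needs to be changed.
Nodes are started with the node subcommand; the other commands operate on chaincode and channels.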
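Here are the subcommands of peer:
- chaincode:
  - install  a legacy command; the deployCC script uses lifecycle throughout
  - instantiate
  - invoke
  - package
  - query
  - signpackage
  - upgrade
  - list
- channel:
  - create  create a channel
  - fetch  fetch a specific block and write it to a file, <newest|oldest|config|(number)> [outputfile] [flags]
  - join  join the peer to a channel, peer channel join [flags]
  - joinbysnapshot
  - joinbysnapshotstatus
  - list  list the channels the peer has joined
  - update  update the channel configuration, for example the anchor peers
  - signconfigtx  sign a configtx update file in place
  - getinfo  get the blockchain information of a given channel
- lifecycle:  manage the chaincode lifecycle
  - chaincode
    - approveformyorg  approve a chaincode definition for my organization
    - checkcommitreadiness  check whether a chaincode definition is ready to be committed
    - commit  commit the chaincode definition
    - getinstalledpackage  retrieve an installed chaincode package from the peer
    - install  install a chaincode
    - package  package a chaincode
    - queryapproved  query the approval status (JSON-formatted data)
    - querycommitted  query the commit status
    - queryinstalled  query the installed chaincodes
- node:
  - start  start the node; this one is extremely important
  - reset  reset all channels to the genesis block; stop the peer before using it. After restarting, the peer receives the blocks again from the orderer or other peers.
  - rollback  roll a channel back to a given block; stop the peer before using it.
  - pause  pause a channel so the peer stops receiving blocks for it; stop the peer before using it.
  - resume  the opposite of pause
  - rebuild-dbs  drop the databases of all channels and rebuild them by receiving the blocks again; stop the peer before using it.
  - upgrade-dbs  upgrade the databases in place or drop them; dropped databases are rebuilt in the new format; stop the peer before using it.
- version  print the version
- help  print help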
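There are quite a lot of commands; my earlier reading of the code was too superficial. My feeling is that modifying peer alone should be enough. Block packaging and distribution can stay untouched and be left to the orderer.
Starting from cmd/peer/main.go, you can see that it imports the third-party libraries viper and cobra, plus the general-purpose crypto package bccsp; everything else comes from internal/peer. cobra looks quite handy: it is a library for building command-line tools, and it lets you nest commands layer by layer.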
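To make the idea of nested commands concrete, here is a stdlib-only sketch of how a command tree such as peer, node, start could be resolved from the argument list. It deliberately does not use cobra's API; the command type, the resolve function, and the small tree built in newPeerTree are hypothetical names made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// command is a hypothetical stand-in for one node of a CLI command tree.
// It is not cobra's type; it only models "a command with child commands".
type command struct {
	name string
	subs []*command
}

// add attaches child commands and returns the parent so calls can be chained.
func (c *command) add(children ...*command) *command {
	c.subs = append(c.subs, children...)
	return c
}

// resolve walks args down the tree and returns the path of the deepest
// matching command together with the arguments that were not consumed.
func resolve(root *command, args []string) (string, []string) {
	path := []string{root.name}
	cur := root
	for len(args) > 0 {
		var next *command
		for _, s := range cur.subs {
			if s.name == args[0] {
				next = s
				break
			}
		}
		if next == nil {
			break // first non-command token: the rest are flags or arguments
		}
		path = append(path, next.name)
		cur = next
		args = args[1:]
	}
	return strings.Join(path, " "), args
}

// newPeerTree builds a small excerpt of the subcommand list above.
func newPeerTree() *command {
	node := (&command{name: "node"}).add(&command{name: "start"}, &command{name: "reset"})
	channel := (&command{name: "channel"}).add(&command{name: "join"}, &command{name: "list"})
	return (&command{name: "peer"}).add(node, channel)
}

func main() {
	path, rest := resolve(newPeerTree(), []string{"node", "start", "--peer-chaincodedev"})
	fmt.Println(path, rest)
}
```

A real CLI library attaches a run function and flag set to each node as well; the sketch only shows the lookup that picks the subcommand.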
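A look at the start command under node
peer node start
The key part here is the serve function, recorded below. I removed most of the original comments (to save space) and the blank lines as well, and will write down my own views and understanding.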
package node
import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
cb "github.com/hyperledger/fabric-protos-go/common"
discprotos "github.com/hyperledger/fabric-protos-go/discovery"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/cauthdsl"
ccdef "github.com/hyperledger/fabric/common/chaincode"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/fabhttp"
"github.com/hyperledger/fabric/common/flogging"
floggingmetrics "github.com/hyperledger/fabric/common/flogging/metrics"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
"github.com/hyperledger/fabric/common/metadata"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/policydsl"
"github.com/hyperledger/fabric/core/aclmgmt"
"github.com/hyperledger/fabric/core/cclifecycle"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/chaincode/accesscontrol"
"github.com/hyperledger/fabric/core/chaincode/extcc"
"github.com/hyperledger/fabric/core/chaincode/lifecycle"
"github.com/hyperledger/fabric/core/chaincode/persistence"
"github.com/hyperledger/fabric/core/chaincode/platforms"
"github.com/hyperledger/fabric/core/committer/txvalidator/plugin"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/privdata"
coreconfig "github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/core/container"
"github.com/hyperledger/fabric/core/container/dockercontroller"
"github.com/hyperledger/fabric/core/container/externalbuilder"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/dispatcher"
"github.com/hyperledger/fabric/core/endorser"
authHandler "github.com/hyperledger/fabric/core/handlers/auth"
endorsement2 "github.com/hyperledger/fabric/core/handlers/endorsement/api"
endorsement3 "github.com/hyperledger/fabric/core/handlers/endorsement/api/identities"
"github.com/hyperledger/fabric/core/handlers/library"
validation "github.com/hyperledger/fabric/core/handlers/validation/api"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/snapshotgrpc"
"github.com/hyperledger/fabric/core/operations"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/core/policy"
"github.com/hyperledger/fabric/core/scc"
"github.com/hyperledger/fabric/core/scc/cscc"
"github.com/hyperledger/fabric/core/scc/lscc"
"github.com/hyperledger/fabric/core/scc/qscc"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/discovery"
"github.com/hyperledger/fabric/discovery/endorsement"
discsupport "github.com/hyperledger/fabric/discovery/support"
discacl "github.com/hyperledger/fabric/discovery/support/acl"
ccsupport "github.com/hyperledger/fabric/discovery/support/chaincode"
"github.com/hyperledger/fabric/discovery/support/config"
"github.com/hyperledger/fabric/discovery/support/gossip"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
gossipgossip "github.com/hyperledger/fabric/gossip/gossip"
gossipmetrics "github.com/hyperledger/fabric/gossip/metrics"
gossipprivdata "github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/service"
gossipservice "github.com/hyperledger/fabric/gossip/service"
peergossip "github.com/hyperledger/fabric/internal/peer/gossip"
"github.com/hyperledger/fabric/internal/peer/version"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/hyperledger/fabric/externaldb"
)
const (
chaincodeListenAddrKey = "peer.chaincodeListenAddress"
defaultChaincodePort = 7052
)
var chaincodeDevMode bool
func startCmd() *cobra.Command {
flags := nodeStartCmd.Flags()
flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false, "start peer in chaincode development mode")
return nodeStartCmd
}
var nodeStartCmd = &cobra.Command{
Use: "start",
Short: "Starts the node.",
Long: `Starts a node that interacts with the network.HYSHYSHYS`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 0 {
return fmt.Errorf("trailing args detected")
}
cmd.SilenceUsage = true
return serve(args)
},
}
type externalVMAdapter struct {
detector *externalbuilder.Detector
}
func (e externalVMAdapter) Build(
ccid string,
mdBytes []byte,
codePackage io.Reader,
) (container.Instance, error) {
i, err := e.detector.Build(ccid, mdBytes, codePackage)
if err != nil {
return nil, err
}
if i == nil {
return nil, nil
}
return i, err
}
type disabledDockerBuilder struct{}
func (disabledDockerBuilder) Build(string, *persistence.ChaincodePackageMetadata, io.Reader) (container.Instance, error) {
return nil, errors.New("docker build is disabled")
}
type endorserChannelAdapter struct {
peer *peer.Peer
}
func (e endorserChannelAdapter) Channel(channelID string) *endorser.Channel {
if peerChannel := e.peer.Channel(channelID); peerChannel != nil {
return &endorser.Channel{
IdentityDeserializer: peerChannel.MSPManager(),
}
}
return nil
}
type custodianLauncherAdapter struct {
launcher chaincode.Launcher
streamHandler extcc.StreamHandler
}
func (c custodianLauncherAdapter) Launch(ccid string) error {
return c.launcher.Launch(ccid, c.streamHandler)
}
func (c custodianLauncherAdapter) Stop(ccid string) error {
return c.launcher.Stop(ccid)
}
func serve(args []string) error {
mspType := mgmt.GetLocalMSP(factory.GetDefault()).GetType()
if mspType != msp.FABRIC {
panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
}
grpc.EnableTracing = true
logger.Infof("Starting %s", version.GetInfo())
coreConfig, err := peer.GlobalConfig()
if err != nil {
return err
}
platformRegistry := platforms.NewRegistry(platforms.SupportedPlatforms...)
identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
return mgmt.GetManagerForChain(chainID)
}
opsSystem := newOperationsSystem(coreConfig)
err = opsSystem.Start()
if err != nil {
return errors.WithMessage(err, "failed to initialize operations subsystem")
}
defer opsSystem.Stop()
metricsProvider := opsSystem.Provider
logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.SetObserver(logObserver)
mspID := coreConfig.LocalMSPID
membershipInfoProvider := privdata.NewMembershipInfoProvider(mspID, createSelfSignedData(), identityDeserializerFactory)
chaincodeInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "lifecycle", "chaincodes")
ccStore := persistence.NewStore(chaincodeInstallPath)
ccPackageParser := &persistence.ChaincodePackageParser{
MetadataProvider: ccprovider.PersistenceAdapter(ccprovider.MetadataAsTarEntries),
}
peerHost, _, err := net.SplitHostPort(coreConfig.PeerAddress)
if err != nil {
return fmt.Errorf("peer address is not in the format of host:port: %v", err)
}
listenAddr := coreConfig.ListenAddress
serverConfig, err := peer.GetServerConfig()
if err != nil {
logger.Fatalf("Error loading secure config for peer (%s)", err)
}
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
serverConfig.ServerStatsHandler = comm.NewServerStatsHandler(metricsProvider)
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
)
serverConfig.StreamInterceptors = append(
serverConfig.StreamInterceptors,
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
)
semaphores := initGrpcSemaphores(coreConfig)
if len(semaphores) != 0 {
serverConfig.UnaryInterceptors = append(serverConfig.UnaryInterceptors, unaryGrpcLimiter(semaphores))
serverConfig.StreamInterceptors = append(serverConfig.StreamInterceptors, streamGrpcLimiter(semaphores))
}
cs := comm.NewCredentialSupport()
if serverConfig.SecOpts.UseTLS {
logger.Info("Starting peer with TLS enabled")
cs = comm.NewCredentialSupport(serverConfig.SecOpts.ServerRootCAs...)
clientCert, err := peer.GetClientCertificate()
if err != nil {
logger.Fatalf("Failed to set TLS client certificate (%s)", err)
}
cs.SetClientCertificate(clientCert)
}
transientStoreProvider, err := transientstore.NewStoreProvider(
filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "transientstore"),
)
if err != nil {
return errors.WithMessage(err, "failed to open transient store")
}
deliverServiceConfig := deliverservice.GlobalConfig()
peerInstance := &peer.Peer{
ServerConfig: serverConfig,
CredentialSupport: cs,
StoreProvider: transientStoreProvider,
CryptoProvider: factory.GetDefault(),
OrdererEndpointOverrides: deliverServiceConfig.OrdererEndpointOverrides,
}
localMSP := mgmt.GetLocalMSP(factory.GetDefault())
signingIdentity, err := localMSP.GetDefaultSigningIdentity()
if err != nil {
logger.Panicf("Could not get the default signing identity from the local MSP: [%+v]", err)
}
signingIdentityBytes, err := signingIdentity.Serialize()
if err != nil {
logger.Panicf("Failed to serialize the signing identity: %v", err)
}
expirationLogger := flogging.MustGetLogger("certmonitor")
crypto.TrackExpiration(
serverConfig.SecOpts.UseTLS,
serverConfig.SecOpts.Certificate,
cs.GetClientCertificate().Certificate,
signingIdentityBytes,
expirationLogger.Infof,
expirationLogger.Warnf,
time.Now(),
time.AfterFunc,
)
policyMgr := policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager)
deliverGRPCClient, err := comm.NewGRPCClient(comm.ClientConfig{
Timeout: deliverServiceConfig.ConnectionTimeout,
KaOpts: deliverServiceConfig.KeepaliveOptions,
SecOpts: deliverServiceConfig.SecOpts,
})
if err != nil {
logger.Panicf("Could not create the deliver grpc client: [%+v]", err)
}
policyChecker := policy.NewPolicyChecker(
policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager),
mgmt.GetLocalMSP(factory.GetDefault()),
mgmt.NewLocalMSPPrincipalGetter(factory.GetDefault()),
)
aclProvider := aclmgmt.NewACLProvider(
aclmgmt.ResourceGetter(peerInstance.GetStableChannelConfig),
policyChecker,
)
lifecycleResources := &lifecycle.Resources{
Serializer: &lifecycle.Serializer{},
ChannelConfigSource: peerInstance,
ChaincodeStore: ccStore,
PackageParser: ccPackageParser,
}
privdataConfig := gossipprivdata.GlobalConfig()
lifecycleValidatorCommitter := &lifecycle.ValidatorCommitter{
CoreConfig: coreConfig,
PrivdataConfig: privdataConfig,
Resources: lifecycleResources,
LegacyDeployedCCInfoProvider: &lscc.DeployedCCInfoProvider{},
}
lsccInstallPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "chaincodes")
ccprovider.SetChaincodesPath(lsccInstallPath)
ccInfoFSImpl := &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()}
legacyMetadataManager, err := cclifecycle.NewMetadataManager(
cclifecycle.EnumerateFunc(
func() ([]ccdef.InstalledChaincode, error) {
return ccInfoFSImpl.ListInstalledChaincodes(ccInfoFSImpl.GetChaincodeInstallPath(), ioutil.ReadDir, ccprovider.LoadPackage)
},
),
)
if err != nil {
logger.Panicf("Failed creating LegacyMetadataManager: +%v", err)
}
metadataManager := lifecycle.NewMetadataManager()
chaincodeCustodian := lifecycle.NewChaincodeCustodian()
externalBuilderOutput := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "externalbuilder", "builds")
if err := os.MkdirAll(externalBuilderOutput, 0700); err != nil {
logger.Panicf("could not create externalbuilder build output dir: %s", err)
}
ebMetadataProvider := &externalbuilder.MetadataProvider{
DurablePath: externalBuilderOutput,
}
lifecycleCache := lifecycle.NewCache(
lifecycleResources,
mspID,
metadataManager,
chaincodeCustodian,
ebMetadataProvider,
)
txProcessors := map[common.HeaderType]ledger.CustomTxProcessor{
common.HeaderType_CONFIG: &peer.ConfigTxProcessor{},
}
peerInstance.LedgerMgr = ledgermgmt.NewLedgerMgr(
&ledgermgmt.Initializer{
CustomTxProcessors: txProcessors,
DeployedChaincodeInfoProvider: lifecycleValidatorCommitter,
MembershipInfoProvider: membershipInfoProvider,
ChaincodeLifecycleEventProvider: lifecycleCache,
MetricsProvider: metricsProvider,
HealthCheckRegistry: opsSystem,
StateListeners: []ledger.StateListener{lifecycleCache},
Config: ledgerConfig(),
HashProvider: factory.GetDefault(),
EbMetadataProvider: ebMetadataProvider,
},
)
peerServer, err := comm.NewGRPCServer(listenAddr, serverConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
gossipService, err := initGossipService(
policyMgr,
metricsProvider,
peerServer,
signingIdentity,
cs,
coreConfig.PeerAddress,
deliverGRPCClient,
deliverServiceConfig,
privdataConfig,
)
if err != nil {
return errors.WithMessage(err, "failed to initialize gossip service")
}
defer gossipService.Stop()
peerInstance.GossipService = gossipService
if err := lifecycleCache.InitializeLocalChaincodes(); err != nil {
return errors.WithMessage(err, "could not initialize local chaincodes")
}
if chaincodeDevMode {
logger.Info("Running in chaincode development mode")
logger.Info("Disable loading validity system chaincode")
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
}
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
return func(env *cb.Envelope, channelID string) error {
return aclProvider.CheckACL(resourceName, channelID, env)
}
}
metrics := deliver.NewMetrics(metricsProvider)
abServer := &peer.DeliverServer{
DeliverHandler: deliver.NewHandler(
&peer.DeliverChainManager{Peer: peerInstance},
coreConfig.AuthenticationTimeWindow,
mutualTLS,
metrics,
false,
),
PolicyCheckerProvider: policyCheckerProvider,
}
pb.RegisterDeliverServer(peerServer.Server(), abServer)
ca, err := tlsgen.NewCA()
if err != nil {
logger.Panic("Failed creating authentication layer:", err)
}
ccSrv, ccEndpoint, err := createChaincodeServer(coreConfig, ca, peerHost)
if err != nil {
logger.Panicf("Failed to create chaincode server: %s", err)
}
userRunsCC := chaincode.IsDevMode()
tlsEnabled := coreConfig.PeerTLSEnabled
authenticator := accesscontrol.NewAuthenticator(ca)
chaincodeHandlerRegistry := chaincode.NewHandlerRegistry(userRunsCC)
lifecycleTxQueryExecutorGetter := &chaincode.TxQueryExecutorGetter{
CCID: scc.ChaincodeID(lifecycle.LifecycleNamespace),
HandlerRegistry: chaincodeHandlerRegistry,
}
if coreConfig.VMEndpoint == "" && len(coreConfig.ExternalBuilders) == 0 {
logger.Panic("VMEndpoint not set and no ExternalBuilders defined")
}
chaincodeConfig := chaincode.GlobalConfig()
var dockerBuilder container.DockerBuilder
if coreConfig.VMEndpoint != "" {
client, err := createDockerClient(coreConfig)
if err != nil {
logger.Panicf("cannot create docker client: %s", err)
}
dockerVM := &dockercontroller.DockerVM{
PeerID: coreConfig.PeerID,
NetworkID: coreConfig.NetworkID,
BuildMetrics: dockercontroller.NewBuildMetrics(opsSystem.Provider),
Client: client,
AttachStdOut: coreConfig.VMDockerAttachStdout,
HostConfig: getDockerHostConfig(),
ChaincodePull: coreConfig.ChaincodePull,
NetworkMode: coreConfig.VMNetworkMode,
PlatformBuilder: &platforms.Builder{
Registry: platformRegistry,
Client: client,
},
LoggingEnv: []string{
"CORE_CHAINCODE_LOGGING_LEVEL=" + chaincodeConfig.LogLevel,
"CORE_CHAINCODE_LOGGING_SHIM=" + chaincodeConfig.ShimLogLevel,
"CORE_CHAINCODE_LOGGING_FORMAT=" + chaincodeConfig.LogFormat,
},
MSPID: mspID,
}
if err := opsSystem.RegisterChecker("docker", dockerVM); err != nil {
logger.Panicf("failed to register docker health check: %s", err)
}
dockerBuilder = dockerVM
}
if dockerBuilder == nil {
dockerBuilder = &disabledDockerBuilder{}
}
externalVM := &externalbuilder.Detector{
Builders: externalbuilder.CreateBuilders(coreConfig.ExternalBuilders, mspID),
DurablePath: externalBuilderOutput,
}
buildRegistry := &container.BuildRegistry{}
containerRouter := &container.Router{
DockerBuilder: dockerBuilder,
ExternalBuilder: externalVMAdapter{externalVM},
PackageProvider: &persistence.FallbackPackageLocator{
ChaincodePackageLocator: &persistence.ChaincodePackageLocator{
ChaincodeDir: chaincodeInstallPath,
},
LegacyCCPackageLocator: &ccprovider.CCInfoFSImpl{GetHasher: factory.GetDefault()},
},
}
builtinSCCs := map[string]struct{}{
"lscc": {},
"qscc": {},
"cscc": {},
"_lifecycle": {},
}
lsccInst := &lscc.SCC{
BuiltinSCCs: builtinSCCs,
Support: &lscc.SupportImpl{
GetMSPIDs: peerInstance.GetMSPIDs,
},
SCCProvider: &lscc.PeerShim{Peer: peerInstance},
ACLProvider: aclProvider,
GetMSPIDs: peerInstance.GetMSPIDs,
PolicyChecker: policyChecker,
BCCSP: factory.GetDefault(),
BuildRegistry: buildRegistry,
ChaincodeBuilder: containerRouter,
EbMetadataProvider: ebMetadataProvider,
}
chaincodeEndorsementInfo := &lifecycle.ChaincodeEndorsementInfoSource{
LegacyImpl: lsccInst,
Resources: lifecycleResources,
Cache: lifecycleCache,
BuiltinSCCs: builtinSCCs,
UserRunsCC: userRunsCC,
}
containerRuntime := &chaincode.ContainerRuntime{
BuildRegistry: buildRegistry,
ContainerRouter: containerRouter,
}
lifecycleFunctions := &lifecycle.ExternalFunctions{
Resources: lifecycleResources,
InstallListener: lifecycleCache,
InstalledChaincodesLister: lifecycleCache,
ChaincodeBuilder: containerRouter,
BuildRegistry: buildRegistry,
}
lifecycleSCC := &lifecycle.SCC{
Dispatcher: &dispatcher.Dispatcher{
Protobuf: &dispatcher.ProtobufImpl{},
},
DeployedCCInfoProvider: lifecycleValidatorCommitter,
QueryExecutorProvider: lifecycleTxQueryExecutorGetter,
Functions: lifecycleFunctions,
OrgMSPID: mspID,
ChannelConfigSource: peerInstance,
ACLProvider: aclProvider,
}
chaincodeLauncher := &chaincode.RuntimeLauncher{
Metrics: chaincode.NewLaunchMetrics(opsSystem.Provider),
Registry: chaincodeHandlerRegistry,
Runtime: containerRuntime,
StartupTimeout: chaincodeConfig.StartupTimeout,
CertGenerator: authenticator,
CACert: ca.CertBytes(),
PeerAddress: ccEndpoint,
ConnectionHandler: &extcc.ExternalChaincodeRuntime{},
}
if !chaincodeConfig.TLSEnabled {
chaincodeLauncher.CertGenerator = nil
}
chaincodeSupport := &chaincode.ChaincodeSupport{
ACLProvider: aclProvider,
AppConfig: peerInstance,
DeployedCCInfoProvider: lifecycleValidatorCommitter,
ExecuteTimeout: chaincodeConfig.ExecuteTimeout,
InstallTimeout: chaincodeConfig.InstallTimeout,
HandlerRegistry: chaincodeHandlerRegistry,
HandlerMetrics: chaincode.NewHandlerMetrics(opsSystem.Provider),
Keepalive: chaincodeConfig.Keepalive,
Launcher: chaincodeLauncher,
Lifecycle: chaincodeEndorsementInfo,
Peer: peerInstance,
Runtime: containerRuntime,
BuiltinSCCs: builtinSCCs,
TotalQueryLimit: chaincodeConfig.TotalQueryLimit,
UserRunsCC: userRunsCC,
}
custodianLauncher := custodianLauncherAdapter{
launcher: chaincodeLauncher,
streamHandler: chaincodeSupport,
}
go chaincodeCustodian.Work(buildRegistry, containerRouter, custodianLauncher)
ccSupSrv := pb.ChaincodeSupportServer(chaincodeSupport)
if tlsEnabled {
ccSupSrv = authenticator.Wrap(ccSupSrv)
}
csccInst := cscc.New(
aclProvider,
lifecycleValidatorCommitter,
lsccInst,
lifecycleValidatorCommitter,
policyChecker,
peerInstance,
factory.GetDefault(),
)
qsccInst := scc.SelfDescribingSysCC(qscc.New(aclProvider, peerInstance))
pb.RegisterChaincodeSupportServer(ccSrv.Server(), ccSupSrv)
go ccSrv.Start()
logger.Debugf("Running peer")
libConf, err := library.LoadConfig()
if err != nil {
return errors.WithMessage(err, "could not decode peer handlers configuration")
}
reg := library.InitRegistry(libConf)
authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
endorserSupport := &endorser.SupportImpl{
SignerSerializer: signingIdentity,
Peer: peerInstance,
ChaincodeSupport: chaincodeSupport,
ACLProvider: aclProvider,
BuiltinSCCs: builtinSCCs,
}
endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
ChannelStateRetriever: channelStateRetriever,
TransientStoreRetriever: peerInstance,
PluginMapper: pluginMapper,
SigningIdentityFetcher: signingIdentityFetcher,
})
endorserSupport.PluginEndorser = pluginEndorser
channelFetcher := endorserChannelAdapter{
peer: peerInstance,
}
serverEndorser := &endorser.Endorser{
PrivateDataDistributor: gossipService,
ChannelFetcher: channelFetcher,
LocalMSP: localMSP,
Support: endorserSupport,
Metrics: endorser.NewMetrics(metricsProvider),
}
for _, cc := range []scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC} {
if enabled, ok := chaincodeConfig.SCCAllowlist[cc.Name()]; !ok || !enabled {
logger.Infof("not deploying chaincode %s as it is not enabled", cc.Name())
continue
}
scc.DeploySysCC(cc, chaincodeSupport)
}
logger.Infof("Deployed system chaincodes")
legacyMetadataManager.AddListener(metadataManager)
metadataManager.AddListener(lifecycle.HandleMetadataUpdateFunc(func(channel string, chaincodes ccdef.MetadataSet) {
gossipService.UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChannelID(channel))
}))
peerInstance.Initialize(
func(cid string) {
lifecycleCache.InitializeMetadata(cid)
sub, err := legacyMetadataManager.NewChannelSubscription(cid, cclifecycle.QueryCreatorFunc(func() (cclifecycle.Query, error) {
return peerInstance.GetLedger(cid).NewQueryExecutor()
}))
if err != nil {
logger.Panicf("Failed subscribing to chaincode lifecycle updates")
}
cceventmgmt.GetMgr().Register(cid, sub)
},
peerServer,
plugin.MapBasedMapper(validationPluginsByName),
lifecycleValidatorCommitter,
lsccInst,
lifecycleValidatorCommitter,
coreConfig.ValidatorPoolSize,
)
if coreConfig.DiscoveryEnabled {
registerDiscoveryService(
coreConfig,
peerInstance,
peerServer,
policyMgr,
lifecycle.NewMetadataProvider(
lifecycleCache,
legacyMetadataManager,
peerInstance,
),
gossipService,
)
}
logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)
profileEnabled := coreConfig.ProfileEnabled
profileListenAddress := coreConfig.ProfileListenAddress
serve := make(chan error)
if profileEnabled {
go func() {
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGINT: func() { containerRouter.Shutdown(5 * time.Second); serve <- nil },
syscall.SIGTERM: func() { containerRouter.Shutdown(5 * time.Second); serve <- nil },
}))
logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)
ledgerIDs, err := peerInstance.LedgerMgr.GetLedgerIDs()
if err != nil {
return errors.WithMessage(err, "failed to get ledger IDs")
}
logger.Info("HYS ledgerIDs=", ledgerIDs)
rootFSPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "ledgersData")
preResetHeights, err := kvledger.LoadPreResetHeight(rootFSPath, ledgerIDs)
if err != nil {
return fmt.Errorf("error loading prereset height: %s", err)
}
for cid, height := range preResetHeights {
logger.Infof("Ledger rebuild: channel [%s]: preresetHeight: [%d]", cid, height)
}
if len(preResetHeights) > 0 {
logger.Info("Ledger rebuild: Entering loop to check if current ledger heights surpass prereset ledger heights. Endorsement request processing will be disabled.")
resetFilter := &reset{
reject: true,
}
authFilters = append(authFilters, resetFilter)
go resetLoop(resetFilter, preResetHeights, ledgerIDs, peerInstance.GetLedger, 10*time.Second)
}
auth := authHandler.ChainFilters(serverEndorser, authFilters...)
pb.RegisterEndorserServer(peerServer.Server(), auth)
snapshotSvc := &snapshotgrpc.SnapshotService{LedgerGetter: peerInstance, ACLProvider: aclProvider}
pb.RegisterSnapshotServer(peerServer.Server(), snapshotSvc)
go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
}
serve <- grpcErr
}()
return <-serve
}
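The tail of serve follows a common Go pattern: one error channel, a goroutine that runs the gRPC server and reports its result on that channel, signal handlers that clean up and send nil, and a final blocking receive. Below is a minimal stdlib sketch of that pattern. The names runUntilDone and errListenerClosed are hypothetical, the server is replaced by a plain function, and unlike the listing (which uses an unbuffered channel) the channel here is buffered so the late sender cannot block.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// errListenerClosed is a made-up error used by the stand-in server below.
var errListenerClosed = errors.New("listener closed")

// runUntilDone blocks until start returns or stop is closed.
// A stop request runs cleanup and yields nil; a server failure is wrapped
// and returned, mirroring the "serve <- grpcErr" / "return <-serve" shape.
func runUntilDone(start func() error, stop <-chan struct{}, cleanup func()) error {
	done := make(chan error, 2) // room for both senders
	go func() {
		<-stop
		cleanup()
		done <- nil
	}()
	go func() {
		if err := start(); err != nil {
			done <- fmt.Errorf("server exited with error: %w", err)
			return
		}
		done <- nil
	}()
	return <-done
}

func main() {
	// Translate SIGINT/SIGTERM into a closed stop channel.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	stop := make(chan struct{})
	go func() {
		<-sigs
		close(stop)
	}()

	// The stand-in server fails immediately so the example ends on its own.
	err := runUntilDone(func() error { return errListenerClosed }, stop, func() {})
	fmt.Println(err)
}
```

In the listing the cleanup step corresponds to containerRouter.Shutdown(5 * time.Second), and the start function corresponds to peerServer.Start().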
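Library notes
- viper is used to parse config files
- cobra is used to build the command-line interface (commands and nested subcommands)
- ginkgo is a BDD (Behavior Driven Development) testing framework written in Go, generally used for integration tests of Go services.
- healthz is Hyperledger's own library for checking the health of the components of an application or service. In other words, some service may be unavailable, so it gets checked. The default timeout is 30 seconds.
- Go kit, a toolkit for microservices
- zap, https://segmentfault.com/a/1190000022461706. This is an open-source logging library from Uber.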
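The health-check idea from the healthz note can be sketched with the standard library alone. In serve, the docker VM is registered through opsSystem.RegisterChecker("docker", dockerVM); the sketch below assumes a checker is simply something with a HealthCheck(ctx) error method and runs all registered checkers under one timeout. Everything here (checker, checkerFunc, runChecks, the helper constructors, and both timeout constants) is a hypothetical illustration, not the library's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	defaultTimeout = 30 * time.Second      // matches the 30-second default noted above
	shortTimeout   = 20 * time.Millisecond // only used to demonstrate a timeout quickly
)

// checker is the assumed shape of a component health check: nil means healthy.
type checker interface {
	HealthCheck(ctx context.Context) error
}

// checkerFunc adapts a plain function to the checker interface.
type checkerFunc func(ctx context.Context) error

func (f checkerFunc) HealthCheck(ctx context.Context) error { return f(ctx) }

// healthy returns a checker that always succeeds.
func healthy() checker {
	return checkerFunc(func(context.Context) error { return nil })
}

// failing returns a checker that always fails with the given reason.
func failing(reason string) checker {
	return checkerFunc(func(context.Context) error { return errors.New(reason) })
}

// slow returns a checker that needs d to answer but gives up when ctx expires.
func slow(d time.Duration) checker {
	return checkerFunc(func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
}

// runChecks runs every checker under one shared timeout and returns the
// failed component names mapped to the failure reason.
func runChecks(checkers map[string]checker, timeout time.Duration) map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	failed := map[string]string{}
	for name, c := range checkers {
		if err := c.HealthCheck(ctx); err != nil {
			failed[name] = err.Error()
		}
	}
	return failed
}

func main() {
	checkers := map[string]checker{
		"ledger": healthy(),
		"docker": failing("daemon unreachable"),
	}
	fmt.Println(runChecks(checkers, defaultTimeout))
	fmt.Println(runChecks(map[string]checker{"slow": slow(time.Second)}, shortTimeout))
}
```

An empty result map means every component reported healthy; a component that does not answer in time shows up with the context's deadline error.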
Folder notes
1. bccsp  BCCSP stands for Blockchain Cryptographic Service Provider; it provides cryptographic standards and a set of algorithms.
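As a rough picture of what a cryptographic service provider offers, here is a small sketch of a provider that can hash, sign, and verify, backed by Go's standard ECDSA P-256 and SHA-256. The cryptoProvider interface and softwareProvider type are hypothetical and far smaller than Fabric's real bccsp interface, which also covers things like key generation, key import, and encryption.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
)

// cryptoProvider is a hypothetical, much reduced cousin of a crypto
// service provider: callers depend on this interface, not on an algorithm.
type cryptoProvider interface {
	Hash(msg []byte) []byte
	Sign(digest []byte) ([]byte, error)
	Verify(signature, digest []byte) bool
}

// softwareProvider implements cryptoProvider purely in software.
type softwareProvider struct {
	key *ecdsa.PrivateKey
}

// newSoftwareProvider generates a fresh P-256 key pair for the provider.
func newSoftwareProvider() (*softwareProvider, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	return &softwareProvider{key: key}, nil
}

// Hash returns the SHA-256 digest of msg.
func (p *softwareProvider) Hash(msg []byte) []byte {
	sum := sha256.Sum256(msg)
	return sum[:]
}

// Sign produces an ASN.1 encoded ECDSA signature over digest.
func (p *softwareProvider) Sign(digest []byte) ([]byte, error) {
	return ecdsa.SignASN1(rand.Reader, p.key, digest)
}

// Verify reports whether signature is valid for digest under the provider's key.
func (p *softwareProvider) Verify(signature, digest []byte) bool {
	return ecdsa.VerifyASN1(&p.key.PublicKey, digest, signature)
}

func main() {
	var csp cryptoProvider
	csp, err := newSoftwareProvider()
	if err != nil {
		panic(err)
	}
	digest := csp.Hash([]byte("proposal response"))
	sig, err := csp.Sign(digest)
	if err != nil {
		panic(err)
	}
	fmt.Println("valid:", csp.Verify(sig, digest))
	fmt.Println("tampered:", csp.Verify(sig, csp.Hash([]byte("something else"))))
}
```

The point of the abstraction is that the rest of the code only sees the interface, so the software implementation could be swapped for a hardware-backed one without touching the callers.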
References
- An introduction to the source code of an older version
- zap library
- ginkgo
- healthz
- Learning Go microservices https://www.bilibili.com/video/BV1qs411u7ud?from=search&seid=11750324503139562819