前言
在设计公链时,节点与节点之间建立连接需要 P2P 协议,从而实现数据的同步,于此同时上层应用还需要封装一些通信逻辑,比如节点之间的区块同步、交易数据同步等。
本篇文章将对 P2P 网络发展进行简单概述,同时将从源码角度对以太坊中的节点发现机制、分布式哈希表、节点查找、节点新增、节点移除等进行简单介绍,并对其 P2P 网络安全性设计进行简要分析。
基础知识
P2P网络
P2P 网络不同于传统的 CS 结构,在 P2P 网络中每个节点既可以是客户端也可以是服务端,节点之间的通信协议一般直接通过 Socket 实现。
P2P 技术发展至今经历了以下四个发展阶段:
一.集中式: 是 P2P 网络模式中最简单的路由方式,即存在一个中心节点,它保存了其他所有节点的索引信息,索引信息一般包括节点 IP、端口、节点资源等,集中式路由的优点是结构简单、实现容易,但缺点也很明显,由于中心节点需要存储所有节点的路由信息,当节点规模不断扩展时,就很容易出现性能瓶颈,而且也存在单节点故障问题
二.分布式: 是指移除了中心节点,在 P2P 节点之间建立随机网络,在新加入节点与 P2P 网络中的某个节点之间随机建立连接通道,从而形成一个随机拓扑结构,新节点加入该网络时随机选择一个已经存在的节点并建立邻居关系,在节点与邻居节点建立连接后,还需要进行全网广播,让整个网络知道该节点的存在
具体的广播步骤是:该节点首先向邻居节点广播,邻居节点收到广播消息后,在继续向自己的邻居节点广播,以此类推,这种广播方式也被称之为"泛洪机制",而泛洪机制的问题在于可控性差,其主要包括两个较大的问题:一个是容易形成泛洪循环,比如节点 A 给发出的消息结果节点 B 到节点 C,节点 C 再广播到节点 A,这就形成了一个循环,另一个问题是消息响应分包,比如节点 A 想要请求的资源被很多节点所拥有,那么在短时间内,会出现大量节点同时向 A 节点发送响应消息,这就很可能让节点A瞬间奔溃
而消除泛洪循环的方法可以借鉴 IP 网络路由协议中有关泛洪广播的控制,一种方法是对每一个查询消息设置TTL值,泛洪消息每被转发一次,TTL 值减 1,当节点接受的 TTL 为 0 时,不再转发消息,这样可以避免查询消息在网络中产生死循环,还可以为泛洪消息设置唯一的标志,对接收到的重复消息不再进行转发从而规避死循环,解决响应风暴的方法可以在数据链路层进行网络分段,减少消息跨段广播
三.混合式: 混合式其实就是混合集中式和分布式结构,网络中存在很多超级节点组成的分布式网络,而每个超级节点有多个普通节点与它组成局部集中网络,一个新的普通节点加入是可以先选择一个超级节点进行通信,该超级节点再推送其他超级节点列表给新加入节点,加入节点根据列表中的超级节点状态决定选择那个具体的超级节点作为父节点,这种结构的泛洪广播只是发生在超级节点之间,因此可以避免大规模泛洪问题,在实际应用中,混合式结构是相对灵活且比较有效的组网架构,实现难度也相对较小,因此目前较多系统基于混合式结构进行开发实现
四.结构化: 结构化 P2P 网络是一种分布式网络结构,与上面所讲的分布式结构不同,分布式网络就是一个随机网络,而结构化网络则将所有节点按照某种结构进行有序组织,比如形成一个环状网络或树状网络,结构化网络在具体实现上普遍基于分布式哈希表 (Distributed Hash Table,DHI) 算法,具体的实现方案有 Chord、Pasty、CAN、Kademlia 等算法
四种网络结构对比如下:
节点发现
节点发现是任何节点接入 P2P 网络的第一步,节点发现可以分为两种:
-初始节点发现: 指节点是一个全新的、从未运行的节点,该节点没有网络中的其他节点的任何数据,此时节点发现只能依靠节点中的硬编码的种子节点获得 P2P 网络的信息
-已知节点发现: 节点之前运行过,节点数据库中保存着网络中的其他节点信息,此时节点发现可以依靠节点数据库汇总的节点获取 P2P 网络的信息,从而构建自己的网络拓扑
种子节点
在 P2P 网络中,初始节点在启动时会通过一些长期稳定运行的节点快速发现网络中的其他节点,这些节点被称为"种子节点"(一般代码中会硬编码种子节点信息),一般情况下种子节点可以分为两种:
DNS-Seed: 也被称之为"DNS种子节点",DNS 是互联网提供的一种域名查询服务,它将域名和 IP 地址相互映射保存在一个分布式的数据库中,当我们访问 DNS 服务器时,给它提供一个域名,DNS 服务器会将该域名对应的 IP 地址返回
IP-Seed: 即将种子节点的 IP 地址硬编码到代码中去,硬编码的这些节点的地址被称为种子节点
KDA算法
Kademlia 是一种分布式哈希表(DHT)技术,与其他 DHT 技术相比,KDA 算法使用异或算法计算节点之间的距离,进而建立了全新的 DHT 拓扑结构,这种算法可以极大地提高路由的查询速度。
HashTable
哈希表是用于存储键值对的一种容器,键值对有被称为 Key/Value 对,哈希表数据结构中包含 N 个 bucket(桶),对于某个具体的哈希表,N (桶的数量)通常是固定不变的,于是可以对每个桶编号,0~N-1,桶是用来存储键值对的,可以简单的将其理解为一个动态数组,里面存放多个键值对。
下图展示了哈希表的查找原理,我们可以方便快速地通过 Key 来获取 value,当使用某个 key 进行查找时,先用某个哈希函数计算这个 key 的哈希值,得到的哈希值通常是一个整数,之后使用哈希值对N(桶数)进行取模运算(除法求余数),就可以算出对应的桶编号
HashCollision
说到哈希表不得不提一下哈希表碰撞,当两个不同的 Key 进行哈希计算得到相同的哈希值时,就是所谓的哈希函数碰撞,一旦出现这种情况,这两个 key 对应的两个键值对就会被存在在同一个桶中 (bucket) 中,另一中散列碰撞是虽然计算出来的哈希值不同,但经过取模运算之后得到相同的桶编号,这时候也会将两个键值对存储在一个桶中,哈希碰撞原理如下图所示:
如果某个哈希表在存储数据时完全没有碰撞,那么每个桶里都只有 0 个或 1 个键值对,这样查找起来就非常快,反之,如果某个哈希表在存储数据时出现严重碰撞,那么就会导致某些桶里存储了很多键值对,那么在查找 key 的时候需要在这个桶里面逐一对比 key 是否相同,查找效率会变得很低~
分布式哈希表
分布式哈希表在概念上类似于传统的哈希表,差异在于传统的哈希表主要用于单机上的某个软件中,分布式哈希表主要用于分布式系统(此时,分布式系统的节点可以通俗的理解为 hash 表中的 bucket),分布式哈希表主要用于存储大量(甚至海量)的数据,分布式哈希表的原理如下图所示:
源码分析
以太坊底层的 P2PServer 大致可以分为以下三层: -顶层: 以太坊中各个协议的具体实现 -中层: 以太坊中的 p2p 通信链路层,负责启动监听、处理新加入连接或维护连接 -底层: 以太坊中的数据通信网络 IO 层,主要负责路由表的管理以及数据库的读写操作
表的结构
表数据结构如下所示:
const (
alpha = 3
bucketSize = 16
maxReplacements = 10
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits / 15
bucketMinDistance = hashBits - nBuckets
bucketIPLimit, bucketSubnet = 2, 24
tableIPLimit, tableSubnet = 10, 24
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)
type Table struct {
mutex sync.Mutex
buckets [nBuckets]*bucket
nursery []*node
rand *mrand.Rand
ips netutil.DistinctNetSet
log log.Logger
db *enode.DB
net transport
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}
nodeAddedHook func(*node)
}
type bucket struct {
entries []*node
replacements []*node
ips netutil.DistinctNetSet
}
关键的几个变量:
- buckets:K桶,每个K桶包含节点(依据最近活跃情况进行降序排列),用于按距离列出已知节点索引
- nursery:种子节点,一个节点启动的时候最多能够链接35个种子节点,其中有五个是以太坊官方指定的,另外30个从数据库里面提取
- db:用于存储P2P节点的数据库(以太坊中有两个,另一个用于存储链上数据)
- refreshReq:刷新K-桶事件的管道
表的创建
newTable函数用于创建新的表:
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
tab := &Table{
net: t,
db: db,
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
tab.loadSeedNodes()
return tab, nil
}
在上述代码中首先使用传入的参数初始化了一个Table的对象tab,调用setFallbackNodes函数设置初始链接节点(即获得5个nursey节点,后面如果table为空或者数据库中没有节点信息时这些节点将被用于去链接网络),之后通过一个for循环结合函数ValidateComplete来验证节点是否有效
之后初始化K桶:
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
之后从table.buckets中随机取30个节点加载种子节点到相应的bucket:
tab.seedRand()
tab.loadSeedNodes()
return tab, nil
loadSeedNodes函数的具体实现如下所示(这里的seedCount为table.go中最上方定义的全局变量,值为30):
func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.addSeenNode(seed)
}
}
这里的addSeenNode即用于添加节点到bucket,在这里会检查要添加的节点是否已经存在以及bucket是否已满,如果已满则调用tab.addReplacement(b, n)将节点添加到replacement列表中去,之后添加IP,之后更新bucket:
func (tab *Table) addSeenNode(n *node) {
if n.ID() == tab.self().ID() {
return
}
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
if contains(b.entries, n.ID()) {
return
}
if len(b.entries) >= bucketSize {
tab.addReplacement(b, n)
return
}
if !tab.addIP(b, n.IP()) {
return
}
b.entries = append(b.entries, n)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
}
事件监听
loop函数是table.go中的主循环,在函数开头出定义了后续会使用到的局部变量,之后通过deRefresh进行刷新桶操作,在这里的loop循环会每隔30分钟自动刷新一次K桶,每隔10秒钟验证K桶节点是否可以ping通,每30秒将K桶中存在超过5分钟的节点存储本地数据库,视作稳定节点:
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
refreshDone = make(chan struct{})
revalidateDone chan struct{}
waiting = []chan struct{}{tab.initDone}
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
revalidateDone = nil
case <-copyNodes.C:
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
if revalidateDone != nil {
<-revalidateDone
}
close(tab.closed)
}
节点查找
getNode函数用于根据ID来查找节点,如果不存在则返回nil:
func (tab *Table) getNode(id enode.ID) *enode.Node {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(id)
for _, e := range b.entries {
if e.ID() == id {
return unwrapNode(e)
}
}
return nil
}
节点发现
以太坊分布式网络采用了结构化网络模型,其实现方案使用Kademlia协议,下面我们对节点发现进行简单介绍,在以太坊中k值是16,也就是说每个k桶包含16个节点,一共256个k桶,K桶中记录节点的NodeId,Distance,Endpoint,IP等信息,并按照与Target节点的距离排序,节点查找由doRefresh()实现:
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
tab.loadSeedNodes()
tab.net.lookupSelf()
for i := 0; i < 3; i++ {
tab.net.lookupRandom()
}
}
从上述代码中可以看到这里首先调用tab.loadSeedNodes()从数据库中加载节点并将其插入到表中去:
func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.addSeenNode(seed)
}
}
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id ID
)
defer it.Release()
seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(nodeKey(id))
n := nextNode(it)
if n == nil {
id[0] = 0
continue seek
}
if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
continue seek
}
for i := range nodes {
if nodes[i].ID() == n.ID() {
continue seek
}
}
nodes = append(nodes, n)
}
return nodes
}
之后通过lookupSelf来发现新的节点,这里会优先使用当前节点的ID来运行newLookup发现邻居节点:
func (t *UDPv5) lookupSelf() []*enode.Node {
return t.newLookup(t.closeCtx, t.Self().ID()).run()
}
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.lookupWorker(n, target)
})
}
func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup {
it := &lookup{
tab: tab,
queryfunc: q,
asked: make(map[enode.ID]bool),
seen: make(map[enode.ID]bool),
result: nodesByDistance{target: target},
replyCh: make(chan []*node, alpha),
cancelCh: ctx.Done(),
queries: -1,
}
it.asked[tab.self().ID()] = true
return it
}
最后随机一个target,进行lookup:
func (t *UDPv5) lookupRandom() []*enode.Node {
return t.newRandomLookup(t.closeCtx).run()
}
func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {
var target enode.ID
crand.Read(target[:])
return t.newLookup(ctx, target)
}
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.lookupWorker(n, target)
})
}
func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
var (
dists = lookupDistances(target, destNode.ID())
nodes = nodesByDistance{target: target}
err error
)
var r []*enode.Node
r, err = t.findnode(unwrapNode(destNode), dists)
if err == errClosed {
return nil, err
}
for _, n := range r {
if n.ID() != t.Self().ID() {
nodes.push(wrapNode(n), findnodeResultLimit)
}
}
return nodes.entries, err
}
func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {
resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})
return t.waitForNodes(resp, distances)
}
func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {
c := &callV5{
node: node,
packet: packet,
responseType: responseType,
reqid: make([]byte, 8),
ch: make(chan v5wire.Packet, 1),
err: make(chan error, 1),
}
crand.Read(c.reqid)
packet.SetRequestID(c.reqid)
select {
case t.callCh <- c:
case <-t.closeCtx.Done():
c.err <- errClosed
}
return c
}
服务结构
server端的的数据结构如下所示:
type Server struct {
Config
newTransport func(net.Conn, *ecdsa.PublicKey) transport
newPeerHook func(*Peer)
listenFunc func(network, addr string) (net.Listener, error)
lock sync.Mutex
running bool
listener net.Listener
ourHandshake *protoHandshake
loopWG sync.WaitGroup
peerFeed event.Feed
log log.Logger
nodedb *enode.DB
localnode *enode.LocalNode
ntab *discover.UDPv4
DiscV5 *discover.UDPv5
discmix *enode.FairMix
dialsched *dialScheduler
quit chan struct{}
addtrusted chan *enode.Node
removetrusted chan *enode.Node
peerOp chan peerOpFunc
peerOpDone chan struct{}
delpeer chan peerDrop
checkpointPostHandshake chan *conn
checkpointAddPeer chan *conn
inboundHistory expHeap
}
Server配置(本地节点秘钥、拨号比率、节点最大链接数、拨号比率、事件记录等):
type Config struct {
PrivateKey *ecdsa.PrivateKey `toml:"-"`
MaxPeers int
MaxPendingPeers int `toml:",omitempty"`
DialRatio int `toml:",omitempty"`
NoDiscovery bool
DiscoveryV5 bool `toml:",omitempty"`
Name string `toml:"-"`
BootstrapNodes []*enode.Node
BootstrapNodesV5 []*enode.Node `toml:",omitempty"`
StaticNodes []*enode.Node
TrustedNodes []*enode.Node
NetRestrict *netutil.Netlist `toml:",omitempty"`
NodeDatabase string `toml:",omitempty"`
Protocols []Protocol `toml:"-"`
ListenAddr string
NAT nat.Interface `toml:",omitempty"`
Dialer NodeDialer `toml:"-"`
NoDial bool `toml:",omitempty"`
EnableMsgEvents bool
Logger log.Logger `toml:",omitempty"`
clock mclock.Clock
}
新增节点
AddPeer函数用于新增一个给定的节点,其实现代码如下所示:
func (srv *Server) AddPeer(node *enode.Node) {
srv.dialsched.addStatic(node)
}
func (d *dialScheduler) addStatic(n *enode.Node) {
select {
case d.addStaticCh <- n:
case <-d.ctx.Done():
}
}
AddTrustedPeer函数用于新增一个可信任节点:
func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
case srv.addtrusted <- node:
case <-srv.quit:
}
}
移除节点
RemovePeer函数用于移除节点并断开与节点之间的连接:
func (srv *Server) RemovePeer(node *enode.Node) {
var (
ch chan *PeerEvent
sub event.Subscription
)
srv.doPeerOp(func(peers map[enode.ID]*Peer) {
srv.dialsched.removeStatic(node)
if peer := peers[node.ID()]; peer != nil {
ch = make(chan *PeerEvent, 1)
sub = srv.peerFeed.Subscribe(ch)
peer.Disconnect(DiscRequested)
}
})
if ch != nil {
defer sub.Unsubscribe()
for ev := range ch {
if ev.Peer == node.ID() && ev.Type == PeerEventTypeDrop {
return
}
}
}
}
func (d *dialScheduler) removeStatic(n *enode.Node) {
select {
case d.remStaticCh <- n:
case <-d.ctx.Done():
}
}
RemoveTrustedPeer函数用于移除一个可信任节点:
func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
case srv.removetrusted <- node:
case <-srv.quit:
}
}
终止服务
Stop函数用于终止节点运行,具体代码如下所示:
func (srv *Server) Stop() {
srv.lock.Lock()
if !srv.running {
srv.lock.Unlock()
return
}
srv.running = false
if srv.listener != nil {
srv.listener.Close()
}
close(srv.quit)
srv.lock.Unlock()
srv.loopWG.Wait()
}
服务启动
位于go-ethereum-1.10.2\p2p\server.go中的start函数用于启动一个P2P节点:
func (srv *Server) Start() (err error) {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.Root()
}
if srv.clock == nil {
srv.clock = mclock.System{}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.listenFunc == nil {
srv.listenFunc = net.Listen
}
srv.quit = make(chan struct{})
srv.delpeer = make(chan peerDrop)
srv.checkpointPostHandshake = make(chan *conn)
srv.checkpointAddPeer = make(chan *conn)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {
if err := srv.setupListening(); err != nil {
return err
}
}
if err := srv.setupDiscovery(); err != nil {
return err
}
srv.setupDialScheduler()
srv.loopWG.Add(1)
go srv.run()
return nil
}
在这里首先检查当前节点是否处于运行状态,如果是则直接返回并给出错误提示信息,如果不是则将srv.running设置为true,之后进入服务启动流程,之后检查log是否开启等,之后初始化配置P2P服务信息:
func (srv *Server) Start() (err error) {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.Root()
}
if srv.clock == nil {
srv.clock = mclock.System{}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.listenFunc == nil {
srv.listenFunc = net.Listen
}
srv.quit = make(chan struct{})
srv.delpeer = make(chan peerDrop)
srv.checkpointPostHandshake = make(chan *conn)
srv.checkpointAddPeer = make(chan *conn)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
之后调用setupLocalNode来启动一个本地节点,并建立本地监听,然后配置一个DiscoveryV5网络协议,生成节点路由表。
之后调用setupDialScheduler启动主动拨号连接过程,然后开一个协程,在其中做peer的维护:
srv.setupDialScheduler()
srv.loopWG.Add(1)
go srv.run()
return nil
}
setupDialScheduler代码如下所示,这里通过newDialScheduler来建立连接,参数discmix确定了进行主动建立连接时的节点集,它是一个迭代器 ,同时将setupConn连接建立函数传入:
func (srv *Server) setupDialScheduler() {
config := dialConfig{
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Logger,
netRestrict: srv.NetRestrict,
dialer: srv.Dialer,
clock: srv.clock,
}
if srv.ntab != nil {
config.resolver = srv.ntab
}
if config.dialer == nil {
config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
for _, n := range srv.StaticNodes {
srv.dialsched.addStatic(n)
}
}
newDialScheduler函数如下所示,在这里通过d.readNodes(it)从迭代器中取得节点,之后通过通道传入d.loop(it)中进行连接:
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
d := &dialScheduler{
dialConfig: config.withDefaults(),
setupFunc: setupFunc,
dialing: make(map[enode.ID]*dialTask),
static: make(map[enode.ID]*dialTask),
peers: make(map[enode.ID]connFlag),
doneCh: make(chan *dialTask),
nodesIn: make(chan *enode.Node),
addStaticCh: make(chan *enode.Node),
remStaticCh: make(chan *enode.Node),
addPeerCh: make(chan *conn),
remPeerCh: make(chan *conn),
}
d.lastStatsLog = d.clock.Now()
d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
go d.loop(it)
return d
}
服务监听
在上面的服务启动过程中有一个setupListening函数,该函数用于监听事件,具体代码如下所示:
func (srv *Server) setupListening() error {
listener, err := srv.listenFunc("tcp", srv.ListenAddr)
if err != nil {
return err
}
srv.listener = listener
srv.ListenAddr = listener.Addr().String()
if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
srv.localnode.Set(enr.TCP(tcp.Port))
if !tcp.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
}
srv.loopWG.Add(1)
go srv.listenLoop()
return nil
}
在上述代码中又调用了一个srv.listenLoop(),该函数是一个死循环的goroutine,它会监听端口并接收外部的请求:
func (srv *Server) listenLoop() {
srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
}
}()
for {
<-slots
var (
fd net.Conn
err error
lastLog time.Time
)
for {
fd, err = srv.listener.Accept()
if netutil.IsTemporaryError(err) {
if time.Since(lastLog) > 1*time.Second {
srv.log.Debug("Temporary read error", "err", err)
lastLog = time.Now()
}
time.Sleep(time.Millisecond * 200)
continue
} else if err != nil {
srv.log.Debug("Read error", "err", err)
slots <- struct{}{}
return
}
break
}
remoteIP := netutil.AddrIP(fd.RemoteAddr())
if err := srv.checkInboundConn(remoteIP); err != nil {
srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
fd.Close()
slots <- struct{}{}
continue
}
if remoteIP != nil {
var addr *net.TCPAddr
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
addr = tcp
}
fd = newMeteredConn(fd, true, addr)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
}
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
这里的SetupConn主要执行执行握手协议,并尝试把链接创建为一个peer对象:
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
c := &conn{fd: fd, flags: flags, cont: make(chan error)}
if dialDest == nil {
c.transport = srv.newTransport(fd, nil)
} else {
c.transport = srv.newTransport(fd, dialDest.Pubkey())
}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
}
return err
}
在上述代码中又去调用了srv.setupConn(c, flags, dialDest)函数,该函数用于执行握手协议:
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
if !running {
return errServerStopped
}
var dialPubkey *ecdsa.PublicKey
if dialDest != nil {
dialPubkey = new(ecdsa.PublicKey)
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
err = errors.New("dial destination doesn't have a secp256k1 public key")
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
}
remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
if err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
if dialDest != nil {
c.node = dialDest
} else {
c.node = nodeFromConn(remotePubkey, c.fd)
}
clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
err = srv.checkpoint(c, srv.checkpointPostHandshake)
if err != nil {
clog.Trace("Rejected peer", "err", err)
return err
}
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed p2p handshake", "err", err)
return err
}
if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
err = srv.checkpoint(c, srv.checkpointAddPeer)
if err != nil {
clog.Trace("Rejected peer", "err", err)
return err
}
return nil
}
秘钥握手通过deEncHandshake函数实现,在函数之中调用了Handshake()函数:
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
return t.conn.Handshake(prv)
}
Handshake代码如下所示,在这里会根据是主动握手还是被动握手来进行执行对应的握手逻辑:
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
var (
sec Secrets
err error
)
if c.dialDest != nil {
sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest)
} else {
sec, err = receiverEncHandshake(c.conn, prv)
}
if err != nil {
return nil, err
}
c.InitWithSecrets(sec)
return sec.remote, err
}
主动发起握手过程过程如下,在这里会调用makeAuthMsg来生成Auth身份信息,包含签名,随机nonce生成的与签名对应的公钥和版本号,之后调用sealEIP8方法进行rlpx编码,之后发起加密握手,之后接收返回的authResp消息,并验证解密,获取对方公钥,之后生成AES,MAC:
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
authMsg, err := h.makeAuthMsg(prv)
if err != nil {
return s, err
}
authPacket, err := sealEIP8(authMsg, h)
if err != nil {
return s, err
}
if _, err = conn.Write(authPacket); err != nil {
return s, err
}
authRespMsg := new(authRespV4)
authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
if err != nil {
return s, err
}
if err := h.handleAuthResp(authRespMsg); err != nil {
return s, err
}
return h.secrets(authPacket, authRespPacket)
}
receiverEncHandshake如下所示,和initiatorEncHandshake相差无几:
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
authMsg := new(authMsgV4)
authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
if err != nil {
return s, err
}
h := new(encHandshake)
if err := h.handleAuthMsg(authMsg, prv); err != nil {
return s, err
}
authRespMsg, err := h.makeAuthResp()
if err != nil {
return s, err
}
var authRespPacket []byte
if authMsg.gotPlain {
authRespPacket, err = authRespMsg.sealPlain(h)
} else {
authRespPacket, err = sealEIP8(authRespMsg, h)
}
if err != nil {
return s, err
}
if _, err = conn.Write(authRespPacket); err != nil {
return s, err
}
return h.secrets(authPacket, authRespPacket)
}
之后通过doProtoHandshake来完成协议握手操作,在这里调用send发送一次握手操作,之后通过readProtocolHandshake来读取返回信息,之后进行检查:
func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
werr := make(chan error, 1)
go func() { werr <- Send(t, handshakeMsg, our) }()
if their, err = readProtocolHandshake(t); err != nil {
<-werr
return nil, err
}
if err := <-werr; err != nil {
return nil, fmt.Errorf("write error: %v", err)
}
t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
return their, nil
}
服务循环
run函数是服务的主循环,监听服务器终止、增加信任节点、移除信任节点、增加检查节点等:
func (srv *Server) run() {
srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
defer srv.loopWG.Done()
defer srv.nodedb.Close()
defer srv.discmix.Close()
defer srv.dialsched.stop()
var (
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
)
for _, n := range srv.TrustedNodes {
trusted[n.ID()] = true
}
running:
for {
select {
case <-srv.quit:
break running
case n := <-srv.addtrusted:
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID()] = true
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
srv.log.Trace("Removing trusted node", "node", n)
delete(trusted, n.ID())
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
op(peers)
srv.peerOpDone <- struct{}{}
case c := <-srv.checkpointPostHandshake:
if trusted[c.node.ID()] {
c.flags |= trustedConn
}
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
case c := <-srv.checkpointAddPeer:
err := srv.addPeerChecks(peers, inboundCount, c)
if err == nil {
p := srv.launchPeer(c)
peers[c.node.ID()] = p
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.dialsched.peerAdded(c)
if p.Inbound() {
inboundCount++
}
}
c.cont <- err
case pd := <-srv.delpeer:
d := common.PrettyDuration(mclock.Now() - pd.created)
delete(peers, pd.ID())
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.dialsched.peerRemoved(pd.rw)
if pd.Inbound() {
inboundCount--
}
}
}
srv.log.Trace("P2P networking is spinning down")
if srv.ntab != nil {
srv.ntab.Close()
}
if srv.DiscV5 != nil {
srv.DiscV5.Close()
}
for _, p := range peers {
p.Disconnect(DiscQuitting)
}
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)")
delete(peers, p.ID())
}
}
节点信息
NodeInfo用于查看节点信息,PeersInfo用于查看连接的节点信息:
func (srv *Server) NodeInfo() *NodeInfo {
node := srv.Self()
info := &NodeInfo{
Name: srv.Name,
Enode: node.URLv4(),
ID: node.ID().String(),
IP: node.IP().String(),
ListenAddr: srv.ListenAddr,
Protocols: make(map[string]interface{}),
}
info.Ports.Discovery = node.UDP()
info.Ports.Listener = node.TCP()
info.ENR = node.String()
for _, proto := range srv.Protocols {
if _, ok := info.Protocols[proto.Name]; !ok {
nodeInfo := interface{}("unknown")
if query := proto.NodeInfo; query != nil {
nodeInfo = proto.NodeInfo()
}
info.Protocols[proto.Name] = nodeInfo
}
}
return info
}
func (srv *Server) PeersInfo() []*PeerInfo {
infos := make([]*PeerInfo, 0, srv.PeerCount())
for _, peer := range srv.Peers() {
if peer != nil {
infos = append(infos, peer.Info())
}
}
for i := 0; i < len(infos); i++ {
for j := i + 1; j < len(infos); j++ {
if infos[i].ID > infos[j].ID {
infos[i], infos[j] = infos[j], infos[i]
}
}
}
return infos
}
请求处理
下面为peer.run函数的代码:
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
loop:
for {
select {
case err = <-writeErr:
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
从上述代码中可以看到函数的开头首先定义了一些局部变量,之后启用了两个协程,一个是readLoop,它通过调用ReadMsg()读取msg,之后又通过调用peer.handle(msg)来处理msg
如果msg是pingMsg,则发送一个pong回应,如果msg与下述特殊情况不相匹配则将msg交给proto.in通道,等待protocolManager.handleMsg()从通道中取出。另一个协程是pingLoop,它主要通过调用SendItems(p.rw, pingMsg)来发起ping请求
之后调用starProtocols()函数让协议运行起来:
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running))
for _, proto := range p.running {
proto := proto
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
var rw MsgReadWriter = proto
if p.events != nil {
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
defer p.wg.Done()
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
}()
}
}
最后通过一个loop循环来处理错误或者断开连接等操作:
loop:
for {
select {
case err = <-writeErr:
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
创数据库
newPersistentDB函数用于创建一个持久化的数据库用于存储节点信息:
func newPersistentDB(path string) (*DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
blob, err := db.Get([]byte(dbVersionKey), nil)
switch err {
case leveldb.ErrNotFound:
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
db.Close()
return nil, err
}
case nil:
if !bytes.Equal(blob, currentVer) {
db.Close()
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return newPersistentDB(path)
}
}
return &DB{lvl: db, quit: make(chan struct{})}, nil
}
节点超时
ensureExpirer函数用于检查节点是否超时,具体实现代码如下所示:
func (db *DB) ensureExpirer() {
db.runner.Do(func() { go db.expirer() })
}
func (db *DB) expirer() {
tick := time.NewTicker(dbCleanupCycle)
defer tick.Stop()
for {
select {
case <-tick.C:
db.expireNodes()
case <-db.quit:
return
}
}
}
func (db *DB) expireNodes() {
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
defer it.Release()
if !it.Next() {
return
}
var (
threshold = time.Now().Add(-dbNodeExpiration).Unix()
youngestPong int64
atEnd = false
)
for !atEnd {
id, ip, field := splitNodeItemKey(it.Key())
if field == dbNodePong {
time, _ := binary.Varint(it.Value())
if time > youngestPong {
youngestPong = time
}
if time < threshold {
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
}
}
atEnd = !it.Next()
nextID, _ := splitNodeKey(it.Key())
if atEnd || nextID != id {
if youngestPong > 0 && youngestPong < threshold {
deleteRange(db.lvl, nodeKey(id))
}
youngestPong = 0
}
}
}
状态更新
下面是一些状态更新函数:
func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
if ip = ip.To16(); ip == nil {
return time.Time{}
}
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
}
func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
}
func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
if ip = ip.To16(); ip == nil {
return time.Time{}
}
db.ensureExpirer()
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
}
func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
}
func (db *DB) FindFails(id ID, ip net.IP) int {
if ip = ip.To16(); ip == nil {
return 0
}
return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
}
func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
}
func (db *DB) FindFailsV5(id ID, ip net.IP) int {
if ip = ip.To16(); ip == nil {
return 0
}
return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails)))
}
func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
}
节点挑选
QuerySeeds函数用于从数据库里面随机挑选合适种子节点:
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id ID
)
defer it.Release()
seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(nodeKey(id))
n := nextNode(it)
if n == nil {
id[0] = 0
continue seek
}
if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
continue seek
}
for i := range nodes {
if nodes[i].ID() == n.ID() {
continue seek
}
}
nodes = append(nodes, n)
}
return nodes
}
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, rest := splitNodeKey(it.Key())
if string(rest) != dbDiscoverRoot {
continue
}
return mustDecodeNode(id[:], it.Value())
}
return nil
}
文末小结
P2P网络是区块链分布式网络结构的基础,本篇文章详细介绍了P2P网络的基本原理,包括节点发现机制、分布式哈希表、节点查找、节点新增、节点移除、请求处理等,同时从源码角度对以太坊源码中P2P网络的实现做了较为细致的分析,探索了以太坊P2P网络的工作流程以以及安全设计,而公链安全体系的建设依旧是长路漫漫,有待进一步深入探索
|