这是DHT协议中路由表的实现,在DHT网络中,每个节点维护着一张路由表(table),表中储存着已获取的状态良好的节点(node)。路由表又被划分为多个区间桶(bucket),节点应该储存在这些桶中,空的表只有一个桶。当桶满时不能再插入该桶中,除非当前节点(自己)ID也在这个桶中,在这种情况下,原桶需分裂为两个相同大小的桶,旧桶中的节点重新分配到新的子桶中。具体细节可查阅DHT协议。
以下代码逻辑主要来源于QQ卖号平台,我只是改了两个bug。。。
#coding:utf-8 from?time?import?time from?hashlib?import?sha1 from?random?import?randint #生成一个20字节长度的node_id def?node_id(): ? ? ??hash?=?sha1()?? ? ? s?=?"".join(chr(randint(0,?255))?for?i?in?xrange(20)) ? ??hash.update(s)?? ? ??return?hash.digest()?? #返回Node_id的10进制长整型表示 node_id->16->10 def?int_ify(nid): ? ? ??assert?len(nid)?==?20 ? ??return?long(nid.encode('hex'),?16) #node 节点类 每个节点都有id、ip和端口属性 重新定义了节点的等于和不等于操作 class?KNode: ? ??def?__init__(self,?nid,?ip,?port): ? ? ? ??self.nid?=?nid ? ? ? ??self.ip?=?ip ? ? ? ??self.port?=?port ? ??def?__eq__(self,?other): ? ? ? ??return?self.nid?==?other.nid ? ??def?__ne__(self,?other): ? ? ? ??return?self.nid?!=?other.nid #桶满的异常类 class?BucketFull: ? ??pass #bucket 桶类 class?KBucket: ? ??def?__init__(self,?min,?max): ? ? ? ??self.min?=?min ? ? ? ??self.max?=?max ? ? ? ??self.nodes?=?[] ? ? ? ??self.lastTime?=?time()?#当前桶的最后更新时间 ? ??#检查一个node_id是否在桶的范围内 ? ??def?nid_in_range(self,?nid): ? ? ? ??return?self.min?<=?int_ify(nid)?<?self.max ? ??#向桶中添加节点 ? ??def?append(self,?node): ? ? ? ??if?len(node.nid)?!=?20:?return ? ? ? ??if?len(self.nodes)?<?8: ? ? ? ? ? ??if?node?in?self.nodes: ? ? ? ? ? ? ? ??self.nodes.remove(node) ? ? ? ? ? ? ? ??self.nodes.append(node) ? ? ? ? ? ??else: ? ? ? ? ? ? ? ??self.nodes.append(node) ? ? ? ? ? ??self.lastTime?=?time() ? ? ? ??else: ? ? ? ? ? ??raise?BucketFull #路由表类 class?KTable: ? ??def?__init__(self,?nid): ? ? ? ??self.nid?=?nid ? ? ? ??self.nodeTotal?=?0; ? ? ? ??self.buckets?=?[KBucket(0,?2?**?160)]?#路由表中所有的桶的列表 默认只有一个桶 ? ??#向路由表添加节点,即向表中某个桶中添加节点,桶满时要进行拆分 ? ??def?append(self,?node): ? ? ? ??if?node.nid?==?self.nid:?return ? ? ? ? index?=?self.bucket_index(node.nid) ? ? ? ? bucket?=?self.buckets[index] ? ? ? ??try: ? ? ? ? ? ? bucket.append(node) ? ? ? ? ? ??self.nodeTotal?=?self.nodeTotal+1 ? ? ? ??except?BucketFull: ? ? ? ? ? ??if?not?bucket.nid_in_range(self.nid):?return ? ? ? ? ? ??self.split_bucket(index) ? ? ? ? ? ??self.append(node) ? ??#返回待添加节点id应该在哪个桶的范围中 ? ??def?bucket_index(self,?nid): ? ? ? ??for?index,?bucket?in?enumerate(self.buckets): ? ? ? ? ? ??if?bucket.nid_in_range(nid): ? ? ? ? ? ? ? ??return?index ? ? ? ??return?index ? ??#拆分桶 ? ??def?split_bucket(self,?index): ? ? ? ? old?=?self.buckets[index] ? ? ? ? point?=?old.max?-?(old.max?- old.min)/2 ? ? ? ??new?=?KBucket(point,?old.max) ? ? ? ? old.max?=?point ? ? ? ??self.buckets.insert(index +?1,?new) ? ? ? ??for?node?in?old.nodes: ? ? ? ? ? ??if?new.nid_in_range(node.nid): ? ? ? ? ? ? ? ??new.append(node) ? ? ? ??for?node?in?new.nodes: ? ? ? ? ? ? old.nodes.remove(node) ? ??#返回离目标最近的8个node ? ??def?get_neighbor(self,?target): ? ? ? ? nodes?=?[] ? ? ? ??if?len(self.buckets)?==?0:?return?nodes ? ? ? ? index?=?self.bucket_index(target) ? ? ? ? nodes?=?self.buckets[index].nodes ? ? ? ??min?=?index -?1 ? ? ? ??max?=?index +?1 ? ? ? ??while?len(nodes)?<?K?and?(min?>=?0?or?max?<?len(self.buckets)): ? ? ? ? ? ??if?min?>=?0: ? ? ? ? ? ? ? ? nodes.extend(self.buckets[min].nodes) ? ? ? ? ? ??if?max?<?len(self.buckets): ? ? ? ? ? ? ? ? nodes.extend(self.buckets[max].nodes) ? ? ? ? ? ??min?-=?1 ? ? ? ? ? ??max?+=?1 ? ? ? ? num?=?int_ify(target) ? ? ? ? nodes.sort(lambda?a,?b,?num=num:?cmp(num^int_ify(a.nid),?num^int_ify(b.nid))) ? ? ? ??return?nodes[:8] ? ??#打印调试信息 ? ??def?print_info(self): ? ? ? ??print?'桶数量:'+str(len(self.buckets)) ? ? ? ??print?'节点量:'+str(self.nodeTotal) #Demo #实例化出路由表, 随机生成一千个node,放入表中并打印表的状态 routeTable?=?KTable(node_id()) for?i?in?xrange(0,1000): ? ? routeTable.append(KNode(node_id(),?'127.0.0.1',?'80012')) routeTable.print_info()
打印出的结果显示路由表中的桶和节点数量十分有限,说明有大量的节点已经被抛弃,原因在代码中的第67行,当待加入节点所需要加入的桶已满且自身id不在这个桶中时直接忽略。 由于这种路由表实现复杂、需要不停的ping检查每个节点是否有效且储存的节点数量有限。实际做DHT爬虫时可不实现,爬虫只需要不停的认识新的node,并获取资源infohash,所以直接通过向有效的node发送完find_node后即可删除该node,只需等待node发送的get_peers和announce_peer通知即可。
|