第八章 分布式计算框架
当使用zmq和其他技术构建产品时,会面临这些产品在现实世界中工作的问题。“现实世界”正在变成一个越来越多的移动部件的世界。我们的全球计算能力每两年就会翻一番。 本章尝试构建一个分布式应用程序的框架,包括API,协议和实现。 介绍如下内容: ? 分布式计算的需求 ? 无线网络在近距离网络中的利弊 ? 使用UDP和TCP探测 ? 基于消息的API ? 创建一个新的开源项目 ? 对等网络连接(和谐模式) ? 跟踪节点的存在和消失 ? 没有中央协调的消息群发 ? 大规模测试和模拟 ? 处理高水位标记和阻塞节点 ? 分布式日志记录和监控 一、用于真实世界的设计 发现 我们如何了解网络上的其他节点呢?我们是使用一个搜索服务、集中的中介,还是某种广播信标呢? 存在性 当其他节点来来去去时,我们如何跟踪它们呢?我们是使用某种集中注册服务,还是信号检测或信标呢? 连通性 我们到底怎么将一个节点连接到另一个节点呢?我们是使用本地网络、广域网络, 还是使用集中消息代理执行转发呢? 点对点的消息传递 我们如何将一个消息从一个节点发送到另一个节点呢?我们是把这个消息发送到节点的网络地址,还是通过一个集中的消息代理使用某个间接的寻址呢? 消息群发 我们如何将一个消息从一个节点发送到一组其他节点呢?我们是通过一个集中的消 息代理来工作,还是使用一种类似0MQ的发布-订阅模型来工作呢? 测试和模拟 我们如何模拟大量的节点,以便我们可以适当地测试性能呢?我们必须买两打 Android平板电脑,或者我们可以用纯软件来模拟吗? 分布式日志记录 我们如何跟踪这个节点云正在做什么,以便我们能发现性能问题和故障呢?我们是 要创建一个主日志服务,还是允许每台设备来记录它周围的世界呢? 内容分发 我们如何将内容从一个节点发送到另一个节点呢?我们是使用诸如FTP或HTTP的 以服务器为中心的协议,还是使用诸如FileMQ的分散的协议?
二、无线网络的秘密 WIFI借用没有许可的领空,并骑在开放,未经专利保护的,非常创新的互联网协议栈的背上。一个真正的无线世界将绕过所有中央审查,这个问题将是灾难。 连接两个点所需的功率按照它们距离的平方增加。对于WIFI网络,意味着两个无线电设备距离变远时.它们要么使用更多的电力要么必须降低它们的信号速率。因此,即使一个WIFI网络可以被标以一个额定速度,接入点(AP)和客户端之间的实际比特率还要取决于两者相距多远。 要构建健壮的分布式应用程序,我们应该意识到这些后果: ? 如果你有一组设备与AP交流,那么当AP与最慢的设备交流时,整个网络都必须等待。 ? 如果使用单播TCP并发送消息到多个设备,AP必须分别将数据包发送到每个设备,以太网的工作方式也是这样的。 ? 如果你使用多播或广播(在大多数情况下,这二者工作方式相同),则AP会一次性把一个数据包发送到整个网络,但它会用最慢的比特率发送(通常为1Mb/s)。 结论 WIFI不是以太网,虽然我相信未来ZMQ应用程序将有一个非常重要的分散式的无线存 在,但这不会是一帆风顺的。 三、发现 发现是网络编程的重要组成部分。每个zmq_connect()调用都提供一个端点字符串,该字符串必须来自某个地方。 1、服务发现 Wikipedia将“网络服务”定义为“计算机网络上托管的服务”,将“服务”定义为“一组可重用于不同目的的相关软件功能,以及应控制其使用的策略” 。在现代消息传递中,服务不会将一对一映射到端点。一个端点可以带来许多服务,并且服务可以随时间在端口之间甚至系统之间移动。因此,在现实的大型分布式应用程序中,我们需要某种服务发现机制。有一些经典模式: ? 我们可以强制从端点到服务进行旧的一对一映射,并简单地预先声明某个TCP端口号表示某个服务。然后,我们的协议应让我们进行检查(“请求的前4个字节是’HTTP’吗?”)。 ? 我们可以引导一项服务脱离另一项服务;连接到知名的端点和服务,请求“真实”服务,并返回端点。这为我们提供了服务查找服务。如果查找服务允许,则只要服务更新其位置,便可以移动。 ? 我们可以通过一项服务代理另一项服务,以便众所周知的端点和服务将间接提供其他服务(即,通过将消息转发给它们)。例如,这就是我们的Majordomo面向服务的经纪人的工作方式。 ? 我们可以使用八卦方法或集中式方法(如“克隆”模式)交换随时间变化的已知服务和端点的列表,以便分布式网络中的每个节点都可以建立整个网络的最终一致图。 ? 我们可以在网络端点和服务之间创建进一步的抽象层,例如为每个节点分配唯一的标识符,这样我们得到一个“节点网络”,其中每个节点可以提供一些服务,并且可以出现在随机的网络端点上。 ? 我们可以机会性地发现服务,例如,通过连接到端点,然后询问它们所提供的服务。“嗨,您提供共享打印机吗?如果是这样,制造商和型号是什么?”
2、网络发现 ? 使用硬编码的端点字符串,即固定的IP地址和约定的端口。 ? 从配置文件获取端点字符串。 ? 使用消息代理。仍然需要硬编码或配置的终结点字符串才能连接到代理,但是这种方法将网络中不同终结点的数量减少到一个。 ? 使用寻址代理。使用中央服务来中介地址信息(例如动态DNS设置),但允许节点直接相互发送消息。 ? 使用诸如ZeroConf的帮助程序库,它们无需任何集中式基础结构即可提供DNS服务。 ? 通过发出ARP或ICMP ECHO数据包,然后查询每个响应的节点来构建系统级发现。例如,您可以通过TCP连接进行查询,也可以通过发送UDP消息进行查询。 ? 通过尝试连接到网段中的每个单个地址来进行用户级暴力破解。 ? 滚动您自己的基于UDP的发现协议。 ? 八卦(gossip)发现协议。完全互连的网络对于较少数量的节点(例如最多100个或200个)非常有效。对于大量节点,我们需要某种八卦协议。 3、蛮力发现 连接到会议室中的每个IP地址。例如,如果您的网段是192.168.55.x,则执行以下操作:
connect to tcp://192.168.55.1:9000
connect to tcp://192.168.55.2:9000
connect to tcp://192.168.55.3:9000
...
connect to tcp://192.168.55.254:9000
用ZMQ描述: int address; for (address = 1; address < 255; address++) zsocket_connect (listener, “tcp://192.168.55.%d:9000”, address); 一个小型的分散式聊天程序,可让您与同一网段上的其他任何人交谈。该代码具有两个线程:监听器和广播器。监听器创建一个SUB套接字,并与网络中的所有对等节点进行暴力连接。 示例代码:Decentralized chat example:
try:
raw_input
except NameError:
raw_input = input
import argparse
import os
from threading import Thread
from netifaces import interfaces, ifaddresses, AF_INET
import zmq
def listen(masked):
"""listen for messages
masked is the first three parts of an IP address:
192.168.1
The socket will connect to all of X.Y.Z.{1-254}.
"""
ctx = zmq.Context.instance()
listener = ctx.socket(zmq.SUB)
for last in range(1, 255):
listener.connect("tcp://{0}.{1}:9000".format(masked, last))
listener.setsockopt(zmq.SUBSCRIBE, b'')
while True:
try:
print(listener.recv_string())
except (KeyboardInterrupt, zmq.ContextTerminated):
break
def main():
parser = argparse.ArgumentParser()
parser.add_argument("interface", type=str, help="the network interface",
choices=interfaces(),
)
parser.add_argument("user", type=str, default=os.environ['USER'],
nargs='?',
help="Your username",
)
args = parser.parse_args()
inet = ifaddresses(args.interface)[AF_INET]
addr = inet[0]['addr']
masked = addr.rsplit('.', 1)[0]
ctx = zmq.Context.instance()
listen_thread = Thread(target=listen, args=(masked,))
listen_thread.start()
bcast = ctx.socket(zmq.PUB)
bcast.bind("tcp://%s:9000" % args.interface)
print("starting chat on %s:9000 (%s.*)" % (args.interface, masked))
while True:
try:
msg = raw_input()
bcast.send_string("%s: %s" % (args.user, msg))
except KeyboardInterrupt:
break
bcast.close(linger=0)
ctx.term()
if __name__ == '__main__':
main()
该程序需要知道当前的IP地址,接口,和一个别名。 4、使用UDP广播的协作发现 使用UDP的主要问题是: (a)POSIX套接字API是为通用的灵活性设计的。 (b)UDP消息被限制为在LAN上大约为1500字节,在Internet上为512字节 (c)当你开始使用UDP对真实数据进行传输时.会发现有很多消息被丟弃,尤其是当基础结构倾向于使用TCP而不是UDP时。 示例代码:使用UDP,而不是ICMP_ECH0的一个最精简的ping程序。模型一
import os
import socket
import sys
import time
import zmq
PING_PORT_NUMBER = 9999
PING_MSG_SIZE = 1
PING_INTERVAL = 1
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(('', PING_PORT_NUMBER))
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
ping_at = time.time()
while True:
timeout = ping_at - time.time()
if timeout < 0:
timeout = 0
try:
events = dict(poller.poll(1000* timeout))
except KeyboardInterrupt:
print("interrupted")
break
if sock.fileno() in events:
msg, addrinfo = sock.recvfrom(PING_MSG_SIZE)
print "Found peer %s:%d" % addrinfo
if time.time() >= ping_at:
print ("Pinging peers...")
sock.sendto(b'!', 0, ("255.255.255.255", PING_PORT_NUMBER))
ping_at = time.time() + PING_INTERVAL
if __name__ == '__main__':
main()
这个模型的最紧迫的问题分别是: 1)使用255.255.255.255广播地址是可疑的。一方面,这个广播地址精确地是指“发送到本地网络上的所有节点,并且不转发。”不过,如果有多个接口(有线以太网,WIFI),将只在默认路由上广播出去,只通过一个接口。 2)如套接字编程的许多方面,获得网络接口的信息是不可移植的。我们希望把它隐藏在一个库中。 3)除了 “终止”,没有对错误的处理。 4)代码需要知道自己的IP地址,并且忽略它发送出去的信标。 示例代码:模型二
import os
import sys
import time
import zmq
from udplib import UDP
PING_PORT_NUMBER = 9999
PING_MSG_SIZE = 1
PING_INTERVAL = 1
def main():
udp = UDP(PING_PORT_NUMBER)
poller = zmq.Poller()
poller.register(udp.handle, zmq.POLLIN)
ping_at = time.time()
while True:
timeout = ping_at - time.time()
if timeout < 0:
timeout = 0
try:
events = dict(poller.poll(1000* timeout))
except KeyboardInterrupt:
print("interrupted")
break
if udp.handle.fileno() in events:
udp.recv(PING_MSG_SIZE)
if time.time() >= ping_at:
print ("Pinging peers...")
udp.send('!')
ping_at = time.time() + PING_INTERVAL
if __name__ == '__main__':
main()
示例代码:模型三
"""UDP ping command
Model 3, uses abstract network interface
"""
from interface import Interface
def main():
interface = Interface()
while True:
try:
print(interface.recv())
except KeyboardInterrupt:
print("interrupted")
break
interface.stop()
if __name__ == '__main__':
main()
示例代码:UDP ping接口
"""Interface class for Chapter on Distributed Computing
This implements an "interface" to our network of nodes
"""
import time
import uuid
from threading import Thread
import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream
import udplib
def pipe(ctx):
"""create an inproc PAIR pipe"""
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
url = "inproc://%s" % uuid.uuid1()
a.bind(url)
b.connect(url)
return a, b
class Interface(object):
"""Interface class.
Just starts a UDP ping agent in a background thread."""
ctx = None
pipe = None
def __init__(self):
self.ctx = zmq.Context()
p0, p1 = pipe(self.ctx)
self.agent = InterfaceAgent(self.ctx, p1)
self.agent_thread = Thread(target=self.agent.start)
self.agent_thread.start()
self.pipe = p0
def stop(self):
self.pipe.close()
self.agent.stop()
self.ctx.term()
def recv(self):
"""receive a message from our interface"""
return self.pipe.recv_multipart()
PING_PORT_NUMBER = 9999
PING_INTERVAL = 1.0
PEER_EXPIRY = 5.0
UUID_BYTES = 32
class Peer(object):
uuid = None
expires_at = None
def __init__(self, uuid):
self.uuid = uuid
self.is_alive()
def is_alive(self):
"""Reset the peers expiry time
Call this method whenever we get any activity from a peer.
"""
self.expires_at = time.time() + PEER_EXPIRY
class InterfaceAgent(object):
"""This structure holds the context for our agent so we can
pass that around cleanly to methods that need it
"""
ctx = None
pipe = None
udp = None
uuid = None
peers = None
def __init__(self, ctx, pipe, loop=None):
self.ctx = ctx
self.pipe = pipe
if loop is None:
loop = IOLoop.instance()
self.loop = loop
self.udp = udplib.UDP(PING_PORT_NUMBER)
self.uuid = uuid.uuid4().hex.encode('utf8')
self.peers = {}
def stop(self):
self.pipe.close()
self.loop.stop()
def __del__(self):
try:
self.stop()
except:
pass
def start(self):
loop = self.loop
loop.add_handler(self.udp.handle.fileno(), self.handle_beacon, loop.READ)
stream = ZMQStream(self.pipe, loop)
stream.on_recv(self.control_message)
pc = PeriodicCallback(self.send_ping, PING_INTERVAL * 1000, loop)
pc.start()
pc = PeriodicCallback(self.reap_peers, PING_INTERVAL * 1000, loop)
pc.start()
loop.start()
def send_ping(self, *a, **kw):
try:
self.udp.send(self.uuid)
except Exception as e:
self.loop.stop()
def control_message(self, event):
"""Here we handle the different control messages from the frontend."""
print("control message: %s"%event)
def handle_beacon(self, fd, event):
uuid = self.udp.recv(UUID_BYTES)
if uuid in self.peers:
self.peers[uuid].is_alive()
else:
self.peers[uuid] = Peer(uuid)
self.pipe.send_multipart([b'JOINED', uuid])
def reap_peers(self):
now = time.time()
for peer in list(self.peers.values()):
if peer.expires_at < now:
print("reaping %s" % peer.uuid, peer.expires_at, now)
self.peers.pop(peer.uuid)
self.pipe.send_multipart([b'LEFT', peer.uuid])
四、点对点消息传递 在最后的UDP ping程序上面建立一个点对点消息传递层。目标是能够在节点加入和离开网络时检测它们,传送消息给它们,并获得应答。 UDP信标帧 从信标消息格式开始。我们需要一个永远不会在未来版本中改变的固定的协议标头,以及一个取决于版本的正文。我们定义了信标帧结构:
typedef struct {
byte protocol [3];
byte version;
uuid_t uuid;
uint16_t port;
} beacon_t;
真正的对等连接 和谐模式归结为: ? 一个ROUTER套接字。将它绑定到一个临时端口,这是在我们的信标中广播的。 ? 每个对等节点一个DEALER套接字,我们把它连接到对等节点的ROUTER套接字。 ? 从我们的ROUTER套接字读取。 ? 写入对等节点的DEALER套接字。 如果发现不是整齐地同步的。我们可以进行如下步骤: ? 如果收到一个UDP信标,我们就连接到该对等节点。 ? 从我们的ROUTER套接字读取消息,每个消息都带有发送者的UUID。 ? 如果它是一个OHAI消息,我们连接回到那个节点(如果我们还没有连接到它)。 ? 如果它是任何其他消息,我们就必须已经连接到该节点(这是一个放置断言的好地 方)。 ? 我们使用专用的节点-节点DEALER套接字给每个节点发送消息,它必须是已连接 的。 ? 当连接到一个节点时,我们也告诉我们的应用程序该对等节点是存在的。 ? 每次从一个节点得到一个消息时,我们都可以把它看成是一个信号检测(它还活着)。 检测失踪 当有大量的TCP流量时,UDP数据包就会 被丢弃,所以如果我们依赖于UDP的信标,就会得到虚假的断开。 我们做的是,只有当一个特定的节点一段时间后还没有发给我们任何UDP信标时才切换到TCP信号检测。然后,我们只对一个对等节点发送TCP信号检测。如果对方继续沉默,我们可以得出它已消失的结论。如果对等节点回来的时候带有一个不同的IP地址和端口,z则必须断开我们的DEALER套接字并重新连接到新的端口。 五、群发消息 群发消息是一种常见且非常有用的模式。这个概念很简单:不是与单个节点交流,而是跟一群节点交流。群是一个名字,是你在应用程序中达成一致的一个字符串。这正如在PUB和SUB套接字中使用发布-订阅前缀。 我们想要对群做的操作: ? 我们想加入和离开群。 ? 我们想知道在任何给定的群中的其他节点是什么。 ? 我们希望将消息发送给一个群(其中的所有节点)。 对群和对等节点进行管理的数据结构不是最佳的,但它是可行的。我们需要: ? 接口的群列表,我们可以在一个HELLO命令中将它发送到新的节点。 ? 其他节点的群的散列,这是我们用来自HELLO、JOIN和LEAVE命令的信息更新的。 ? 每个群的对等节点的散列,我们用相同的三个命令来更新它。 六、测试和模拟 1、使用断言 2、前期测试 3、zyre测试仪 测试仪的好处是,当我连接到一个WIFI接入点,所有Zyre流量(即使是同一进程中的两个接口之间的)都会通过AP。这意味着我可以完全只用在一个房间里的两台PC对任 何无线网络基础设施执行压力测试。 4、追踪活动 要调试各种问题,我们需要大量的日志记录。我们必须捕获: 每个事件的时间和日期。 事件发生在哪个节点上。 对等节点(如果有)。 事件是什么(例如,到达时哪个命令)。 事件数据(如果有)。 七、分布式日志记录和监督 1、捕获正在发生什么的能力非常重要。特别是如下几点原因: ? 为了测量随时间推移的系统性能。 ? 要了解什么样的工作做得最多,以优化性能。 ? 要跟踪误差和它出现的频度。 ? 要对故障做事后检查。 ? 在发生争端时提供审计线索。 2、从我们认为必须要解决的问题角度来对此划定范围: ? 我们要跟踪重要事件(如节点离开和重新加入网络)。 ? 对于每个事件,我们要跟踪一组一致的数据:日期/时间,观察到该事件的节点,产生该事件的对等节点,事件本身的类型,以及其他事件数据。 ? 我们希望能够随时切换登录和注销。 ? 我们希望能够以机械方式处理日志数据,因为这将是相当大量的。 ? 我们希望能够监视正在运行的系统,也就是说,实时收集日志,并对它们进行分析。 ? 我们希望将记录流量的工作对网络的影响降到最小。 ? 我们希望能够在网络上的一个点收集日志数据。 分布式日志收集
八、编写协议 协议规范的核心是用于命令和字段的ABNF语法:
zre-protocol = greeting *traffic
greeting = S:HELLO
traffic = S:WHISPER
/ S:SHOUT
/ S:JOIN
/ S:LEAVE
/ S:PING R:PING-OK
; Greet a peer so it can connect back to us
S:HELLO = header %x01 ipaddress mailbox groups status headers
header = signature sequence
signature = %xAA %xA1
sequence = 2OCTET ; Incremental sequence number
ipaddress = string ; Sender IP address
string = size *VCHAR
size = OCTET
mailbox = 2OCTET ; Sender mailbox port number
groups = strings ; List of groups sender is in
strings = size *string
status = OCTET ; Sender group status sequence
headers = dictionary ; Sender header properties
dictionary = size *key-value
key-value = string ; Formatted as name=value
; Send a message to a peer
S:WHISPER = header %x02 content
content = FRAME ; Message content as ZeroMQ frame
; Send a message to a group
S:SHOUT = header %x03 group content
group = string ; Name of group
content = FRAME ; Message content as ZeroMQ frame
; Join a group
S:JOIN = header %x04 group status
status = OCTET ; Sender group status sequence
; Leave a group
S:LEAVE = header %x05 group status
; Ping a peer that has gone silent
S:PING = header %06
; Reply to a peer's ping
R:PING-OK = header %07
九、Zyre应用示例 使用Zyre在分布式网络上广播文件。此示例包含两个程序: 一个听众是加入Zyre网络只要收到一个文件报告。 一个发送者是加入一个Zyre网络,准确地播放一个文件。 listener代码:
int main (int argc, char *argv [])
{
zre_interface_t *interface = zre_interface_new ();
while (true) {
zmsg_t *incoming = zre_interface_recv (interface);
if (!incoming)
break;
zmsg_dump (incoming);
zmsg_destroy (&incoming);
}
zre_interface_destroy (&interface);
return 0;
}
sender代码:
int main (int argc, char *argv [])
{
if (argc < 2) {
puts ("Syntax: sender filename virtualname");
return 0;
}
printf ("Publishing %s as %s\n", argv [argv [2](1],));
zre_interface_t *interface = zre_interface_new ();
zre_interface_publish (interface, argv [argv [2](1],));
while (true) {
zmsg_t *incoming = zre_interface_recv (interface);
if (!incoming)
break;
zmsg_dump (incoming);
zmsg_destroy (&incoming);
}
zre_interface_destroy (&interface);
return 0;
}
十、结论 为不稳定的分散式网络构建应用程序是ZMQ的最终目标之一。Zyre不是唯一的,人们已经为应用程序打开此领域进行了很多尝试(ZeroConf, SLP、SSDP, UPnP、DDS)。但这些尝试似乎都因为过于复杂或很难被应用程序开发者用作构建的基础而告终。
|