学习本次课程需要对Go语言有一定的了解,推荐Google官方的Go教程 A Tour of Go
线程
多个线程允许一个程序同时进行多项任务,每个线程内部程序串行运行,并且有自己的程序计数器、寄存器和栈空间。
使用线程的优点
多线程的应用在分布式系统中非常常见,因为它能够支持并发操作,非常契合分布式系统的特点。
- I/O concurrency
– 使用线程可以同时处理大量的I/O任务。一种常见的场景是,client 构建多个线程来向不同的server 发起rpc 请求,每个请求线程得到响应后再执行对应的处理任务。 - Multicore performance
– 使用多线程可以最大限度的利用多核CPU的性能。多个线程可以由不同的CPU核心进行处理,不同CPU核心的线程拥有独立的CPU周期。 - Convenience
– 很多时候多线程可以大大简化我们的编程难度。比如在分布式系统中,我们想要每隔一定的时间进行一次事件检查(如 MapReduce中Master节点检查Worker是否异常),我们就可以创建一个线程,让其专门负责定期检查Worker是否存活。
事件驱动编程
如果要实现并发I/O,除了采取多线程的方式,还可以采用事件驱动编程的思想来实现,如epoll 模型等。在事件驱动编程中,有一个线程会负责循环检测所有的事件状态,如客户端发起的rpc 请求等,当该线程检测到事件到来时,如服务器响应rpc 请求,该线程就会调用相应的处理函数,并继续进行循环监听。事件驱动编程相比多线程的实现方式有以下不同:
- 优点
– 开销更小(多线程的创建和删除以及空间占用远大于事件驱动) - 缺点
– 无法充分利用多核CPU的性能 – 实现较为复杂
线程中的挑战
在进行多线程编程时,通常需要仔细考虑以下几个重要问题。
- shared data
– 线程间是可以共享进程数据的,但是在使用共享数据的过程中,很可能会出现冲突问题。如两个线程同时执行n=n+1 ,由于读写的先后顺序不一致,程序产生的结果也会不一样。 - coordination
– 我们经常需要线程间能够相互协作,比如经典的消费者-生产者模型。在Go 语言中,线程间的相互协作通常有以下几种实现方式,channel ,sync.Cond 和sync.WaitGroup 。 - deadlock
Example: web crawler
下面用一个简单的网页爬虫来展示Go 中多线程的应用,对于一个网页爬虫,我们需要其从给定的url 出发不断递归查询,并且每个url 只能爬取一次。我们首先先给出基本的数据结构。
func main() {
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))
fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())
fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
}
type Fetcher interface {
Fetch(url string) (urls []string, err error)
}
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
fakeFetcher是一个网页爬虫器,其实现了Fetcher接口,其会根据给出的url 找到对应的结果。爬虫的结构被存储到fakeResult 中。
Serial crawler
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
在串行爬虫中,我们通过递归调用Serial函数来实现需求,但是这种方式一次只能爬一个网页,效率很低。
ConcurrentMutex crawler
我们使用shared data + WaitGroup的方式来实现并发爬虫。
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
already := f.fetched[url]
f.fetched[url] = true
f.mu.Unlock()
if already {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u)
}
done.Wait()
return
}
func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}
相比串行版本,该版本进行了以下改变:
- 由于我们需要每个
goroutine 在执行fetch任务时保证url 的唯一性,因此我们需要使用一个map 来作为线程间的共享变量。 - 在
ConcurrentMutex 中,我们使用sync.Mutex 来保证map 结构的读写正确 - 使用
sync.WaitGroup 来同步等待创建的gorountines 执行完毕后再退出函数。 - 在go func入口使用
defer done.Done() ,来确保即使goroutinue 执行异常,也能正确的更新WaitGroup 计数器
ConcurrentChannel crawler
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}
区别于使用Mutex 和WaitGroup ,我们也可以用channel 来实现功能:
coordinator 函数负责分配任务,worker 负责执行任务coordinator 从channel 中循环读取数据,并使用变量n 来记录分配的任务数量。worker 将查询到的结果放入到channel 中,并等待coordinator 接收。
RPC
调用流程
RPC 用于client 与server 进行远程通信的一种调用框架,其基本组成如下图所示。 RPC 框架调用流程如下:
client 调用server 上的函数f(x,y) client stub 将调用的函数及相关参数进行打包,通过网络发送给server server stub 接收到数据包后进行参数和函数解析,调用server 中的方法f(x,y) server 将函数调用解决通过server stub 返回,返回过程与发送过程相同
异常处理
在进行RPC 通信的时候,可能出现的异常情况是client 在发送了rpc request 之后,没有收到server 的响应。对于这种异常错误,一般有以下几种处理机制。
at-least-once
client 会一直等待server 的回复,并不断的重复的发送请求,直到达到发送上限或受到服务器的应答。可以发现,这种处理机制对读操作是可以正常运行的,但是对于写操作,需要server 有处理重复写操作的能力。比如有一个K/V数据库,我们要求使用Put(10) 方法往银行账户上增长10块钱,如果server 端没有处理重复写操作的能力,就会造成数据错误。
at-most-once
当发生异常时,server 会检测重复的rpc request 并且会返回之前运行的请求,而不是重新执行该请求。每个client 在发送请求时,都会携带一个XID 的唯一标识符,XID 通常由随机数,IP和sequence number组合而成。
server:
if seen[xid]:
r = old[xid]
else
r = handler()
old[xid] = r
seen[xid] = true
如何确保rpc 可以安全的丢弃重复的rpc request ,具体的做法可以参考TCP 的实现思路。
Go 中的rpc 框架采用的简化版的at-most-once 做法:
- 使用的是
TCP 连接 client 不会发送重复的request client 会返回错误,如果其没有受到response
exactly-once
舒服分布式系统中的难题,比较难实现,目前通用的解决方案是重传+冗余检测+异常处理。
Example:K/V
下面我们用一个简单的K/V数据库来学习如何用Go 来实现RPC 通信。该例子中的数据库包含两个功能,put 和get ,put 操作支持client 向server 中插入一个任意的键值对数据,get 操作支持client 查询server 中的数据。
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
)
type Err string
type PutArgs struct {
Key string
Value string
}
type PutReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Err Err
Value string
}
func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}
func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}
func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}
type KV struct {
mu sync.Mutex
data map[string]string
}
func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
val, ok := kv.data[args.Key]
if ok {
reply.Err = OK
reply.Value = val
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
return nil
}
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.data[args.Key] = args.Value
reply.Err = OK
return nil
}
func main() {
server()
put("subject", "6.824")
fmt.Printf("Put(subject, 6.824) done\n")
fmt.Printf("get(subject) -> %s\n", get("subject"))
}
下面对代码进行分析和解释。
通用数据结构
- 定义
get 和put 的请求格式和应答格式 - 定义K/V数据的基本格式,包括键值对和互斥锁
client 流程
connect() 函数用于与server 建立tcp 连接。get() 和put() 函数相当于client stubs ,用于打包请求数据Call() 函数通知RPC 准备发起请求。在本例中我们在Call 之前,已经定义好了请求和应答的格式,RPC 库会打包参数,发送请求,然后等待回复,收到回复后再根据回复格式解析参数。
server 流程
- 创建一个基于
tcp 连接的server ,并将K/V 数据库注册到RPC 库中 RPC 框架在收到请求后,会为新的请求启动一个goroutine 。新线程会解析请求参数,并在已注册的服务中找到匹配的服务,调用对应的函数,打包执行结果写入TCP 连接。Get() 和Put() 函数必须加锁,因为RPC 会为每个请求都单独创建一个goroutine 。
|