IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> MIT 6.824 Lec2.RPC and Threads -> 正文阅读

[网络协议]MIT 6.824 Lec2.RPC and Threads

学习本次课程需要对Go语言有一定的了解,推荐Google官方的Go教程 A Tour of Go

线程

多个线程允许一个程序同时进行多项任务,每个线程内部程序串行运行,并且有自己的程序计数器、寄存器和栈空间。

使用线程的优点

多线程的应用在分布式系统中非常常见,因为它能够支持并发操作,非常契合分布式系统的特点。

  • I/O concurrency
    – 使用线程可以同时处理大量的I/O任务。一种常见的场景是,client构建多个线程来向不同的server发起rpc请求,每个请求线程得到响应后再执行对应的处理任务。
  • Multicore performance
    – 使用多线程可以最大限度的利用多核CPU的性能。多个线程可以由不同的CPU核心进行处理,不同CPU核心的线程拥有独立的CPU周期。
  • Convenience
    – 很多时候多线程可以大大简化我们的编程难度。比如在分布式系统中,我们想要每隔一定的时间进行一次事件检查(如 MapReduce中Master节点检查Worker是否异常),我们就可以创建一个线程,让其专门负责定期检查Worker是否存活。

事件驱动编程

如果要实现并发I/O,除了采取多线程的方式,还可以采用事件驱动编程的思想来实现,如epoll模型等。在事件驱动编程中,有一个线程会负责循环检测所有的事件状态,如客户端发起的rpc请求等,当该线程检测到事件到来时,如服务器响应rpc请求,该线程就会调用相应的处理函数,并继续进行循环监听。事件驱动编程相比多线程的实现方式有以下不同:

  • 优点
    – 开销更小(多线程的创建和删除以及空间占用远大于事件驱动)
  • 缺点
    – 无法充分利用多核CPU的性能
    – 实现较为复杂

线程中的挑战

在进行多线程编程时,通常需要仔细考虑以下几个重要问题。

  • shared data
    – 线程间是可以共享进程数据的,但是在使用共享数据的过程中,很可能会出现冲突问题。如两个线程同时执行n=n+1,由于读写的先后顺序不一致,程序产生的结果也会不一样。
  • coordination
    – 我们经常需要线程间能够相互协作,比如经典的消费者-生产者模型。在Go语言中,线程间的相互协作通常有以下几种实现方式,channelsync.Condsync.WaitGroup
  • deadlock

Example: web crawler

下面用一个简单的网页爬虫来展示Go中多线程的应用,对于一个网页爬虫,我们需要其从给定的url出发不断递归查询,并且每个url只能爬取一次。我们首先先给出基本的数据结构。

//
// main
//

func main() {
	fmt.Printf("=== Serial===\n")
	Serial("http://golang.org/", fetcher, make(map[string]bool))

	fmt.Printf("=== ConcurrentMutex ===\n")
	ConcurrentMutex("http://golang.org/", fetcher, makeState())

	fmt.Printf("=== ConcurrentChannel ===\n")
	ConcurrentChannel("http://golang.org/", fetcher)
}

//
// Fetcher
//

type Fetcher interface {
	// Fetch returns a slice of URLs found on the page.
	Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
	if res, ok := f[url]; ok {
		fmt.Printf("found:   %s\n", url)
		return res.urls, nil
	}
	fmt.Printf("missing: %s\n", url)
	return nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

fakeFetcher是一个网页爬虫器,其实现了Fetcher接口,其会根据给出的url找到对应的结果。爬虫的结构被存储到fakeResult中。

Serial crawler

//
// Serial crawler
//

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	

在串行爬虫中,我们通过递归调用Serial函数来实现需求,但是这种方式一次只能爬一个网页,效率很低。

ConcurrentMutex crawler

我们使用shared data + WaitGroup的方式来实现并发爬虫。

//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	already := f.fetched[url]
	f.fetched[url] = true
	f.mu.Unlock()

	if already {
		return
	}

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1)
		go func(u string) {
			defer done.Done()
			ConcurrentMutex(u, fetcher, f)
		}(u)
	}
	done.Wait()
	return
}

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

相比串行版本,该版本进行了以下改变:

  • 由于我们需要每个goroutine在执行fetch任务时保证url的唯一性,因此我们需要使用一个map来作为线程间的共享变量。
  • ConcurrentMutex中,我们使用sync.Mutex来保证map结构的读写正确
  • 使用sync.WaitGroup来同步等待创建的gorountines执行完毕后再退出函数。
  • 在go func入口使用defer done.Done(),来确保即使goroutinue执行异常,也能正确的更新WaitGroup计数器

ConcurrentChannel crawler

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func coordinator(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	coordinator(ch, fetcher)
}

区别于使用MutexWaitGroup,我们也可以用channel来实现功能:

  • coordinator函数负责分配任务,worker负责执行任务
  • coordinatorchannel中循环读取数据,并使用变量n来记录分配的任务数量。
  • worker将查询到的结果放入到channel中,并等待coordinator接收。

RPC

调用流程

RPC用于clientserver进行远程通信的一种调用框架,其基本组成如下图所示。

RPC框架调用流程如下:

  • client调用server上的函数f(x,y)
  • client stub将调用的函数及相关参数进行打包,通过网络发送给server
  • server stub接收到数据包后进行参数和函数解析,调用server中的方法f(x,y)
  • server将函数调用解决通过server stub返回,返回过程与发送过程相同

异常处理

在进行RPC通信的时候,可能出现的异常情况是client在发送了rpc request之后,没有收到server的响应。对于这种异常错误,一般有以下几种处理机制。

at-least-once

client会一直等待server的回复,并不断的重复的发送请求,直到达到发送上限或受到服务器的应答。可以发现,这种处理机制对读操作是可以正常运行的,但是对于写操作,需要server有处理重复写操作的能力。比如有一个K/V数据库,我们要求使用Put(10)方法往银行账户上增长10块钱,如果server端没有处理重复写操作的能力,就会造成数据错误。

at-most-once

当发生异常时,server会检测重复的rpc request并且会返回之前运行的请求,而不是重新执行该请求。每个client在发送请求时,都会携带一个XID的唯一标识符,XID通常由随机数,IP和sequence number组合而成。

  server:
    if seen[xid]:
      r = old[xid]
    else
      r = handler()
      old[xid] = r
      seen[xid] = true

如何确保rpc可以安全的丢弃重复的rpc request,具体的做法可以参考TCP的实现思路。

Go中的rpc框架采用的简化版的at-most-once做法:

  • 使用的是TCP连接
  • client不会发送重复的request
  • client会返回错误,如果其没有受到response

exactly-once

舒服分布式系统中的难题,比较难实现,目前通用的解决方案是重传+冗余检测+异常处理。

Example:K/V

下面我们用一个简单的K/V数据库来学习如何用Go来实现RPC通信。该例子中的数据库包含两个功能,putgetput操作支持clientserver中插入一个任意的键值对数据,get操作支持client查询server中的数据。

package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sync"
)

//
// Common RPC request/reply definitions
//

const (
	OK       = "OK"
	ErrNoKey = "ErrNoKey"
)

type Err string

type PutArgs struct {
	Key   string
	Value string
}

type PutReply struct {
	Err Err
}

type GetArgs struct {
	Key string
}

type GetReply struct {
	Err   Err
	Value string
}

//
// Client
//

func connect() *rpc.Client {
	client, err := rpc.Dial("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	return client
}

func get(key string) string {
	client := connect()
	args := GetArgs{"subject"}
	reply := GetReply{}
	err := client.Call("KV.Get", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
	return reply.Value
}

func put(key string, val string) {
	client := connect()
	args := PutArgs{"subject", "6.824"}
	reply := PutReply{}
	err := client.Call("KV.Put", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
}

//
// Server
//

type KV struct {
	mu   sync.Mutex
	data map[string]string
}

func server() {
	kv := new(KV)
	kv.data = map[string]string{}
	rpcs := rpc.NewServer()
	rpcs.Register(kv)
	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go func() {
		for {
			conn, err := l.Accept()
			if err == nil {
				go rpcs.ServeConn(conn)
			} else {
				break
			}
		}
		l.Close()
	}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	val, ok := kv.data[args.Key]
	if ok {
		reply.Err = OK
		reply.Value = val
	} else {
		reply.Err = ErrNoKey
		reply.Value = ""
	}
	return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	kv.data[args.Key] = args.Value
	reply.Err = OK
	return nil
}

//
// main
//

func main() {
	server()

	put("subject", "6.824")
	fmt.Printf("Put(subject, 6.824) done\n")
	fmt.Printf("get(subject) -> %s\n", get("subject"))
}

下面对代码进行分析和解释。

通用数据结构

  • 定义getput的请求格式和应答格式
  • 定义K/V数据的基本格式,包括键值对和互斥锁

client流程

  • connect()函数用于与server建立tcp连接。
  • get()put()函数相当于client stubs,用于打包请求数据
  • Call()函数通知RPC准备发起请求。在本例中我们在Call之前,已经定义好了请求和应答的格式,RPC库会打包参数,发送请求,然后等待回复,收到回复后再根据回复格式解析参数。

server流程

  • 创建一个基于tcp连接的server,并将K/V数据库注册到RPC库中
  • RPC框架在收到请求后,会为新的请求启动一个goroutine。新线程会解析请求参数,并在已注册的服务中找到匹配的服务,调用对应的函数,打包执行结果写入TCP连接。
  • Get()Put()函数必须加锁,因为RPC会为每个请求都单独创建一个goroutine
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-16 19:11:36  更:2021-11-16 19:13:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 20:09:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码