IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> [go]大规模并发常见操作 -> 正文阅读

[C++知识库][go]大规模并发常见操作

总结于《go语言并发之道》, 建议直接看原书
仓库地址

心跳

  1. 工作开始时的心跳
// DoWork2 在工作单元开始时发出的心跳 防止设置超时导致还没开始工作就超时了
// 根据给定数组内容生成int流
func DoWork2(ctx context.Context, nums ...int) (<-chan interface{}, <-chan int) {
	heartStream := make(chan interface{}, 1) // 至少可以发送一个心跳
	intStream := make(chan int)
	go func() {
		defer func() {
			close(heartStream)
			close(intStream)
		}()
		time.Sleep(time.Second)
		for _, num := range nums {
			select {
			case heartStream <- struct{}{}: // 开始任务时发送信号
			default: // 防止没人接收心跳
			}
			select {
			case <-ctx.Done():
				return
			case intStream <- num:
			}
		}
	}()
	return heartStream, intStream
}

// test
func TestDoWork2(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	intSlice := []int{0, 1, 2, 3, 4}
	heartBeat, results := DoWork2(ctx, intSlice...)
	<-heartBeat // 等待go程开始处理的信号
	i := 0
	for r := range results {
		if want := intSlice[i]; r != want {
			require.Equal(t, want, r, "idx=", i)
		}
		i++
	}
}

  1. 间隔心跳可以用于防止超时
// DoWork3 如果一个迭代会持续很长时间,可以使用间隔心跳保证安全
func DoWork3(ctx context.Context, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) {
	heartStream := make(chan interface{}, 1) // 至少可以发送一个心跳
	intStream := make(chan int)
	go func() {
		defer close(heartStream)
		defer close(intStream)

		time.Sleep(2 * time.Second)
		pulse := time.NewTicker(pulseInterval)
		defer pulse.Stop()
	numLoop:
		for _, n := range nums {
			for {
				select {
				case <-ctx.Done():
					return
				case <-pulse.C:
					select {
					case heartStream <- struct{}{}:
					default:
					}
				case intStream <- n:
					continue numLoop
				}
			}
		}
	}()
	return heartStream, intStream
}

// test
func TestDoWork3(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	intSlice := []int{0, 1, 2, 3, 4}
	const timeout = 2 * time.Second
	heartBeat, results := DoWork3(ctx, timeout, intSlice...)
	<-heartBeat
	i := 0
	for {
		select {
		case r, ok := <-results:
			if ok == false {
				return
			}
			require.Equal(t, intSlice[i], r)
			i++
		case <-heartBeat: // 接收心跳 防止超时
		case <-time.After(timeout):
			t.Fatal("time out")
		}
	}
}

速率限制

使用"golang.org/x/time/rate"包实现混合限流

// limit/limit.go

// RateLimiter 限流接口
type RateLimiter interface {
	Wait(ctx context.Context) error
	Limit() rate.Limit
}

type multiLimiter struct {
	limiters []RateLimiter
}

// Wait 阻塞等待直到获取令牌或者超时
func (m *multiLimiter) Wait(ctx context.Context) error {
	for _, l := range m.limiters {
		if err := l.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}

// Limit 返回当前限制速率
func (m *multiLimiter) Limit() rate.Limit {
	return m.limiters[0].Limit() // 直接返回限制最多的元素
}

// MultiLimiter 混合多个限流桶
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
	byLimit := func(i, j int) bool { return limiters[i].Limit() < limiters[j].Limit() } // 细粒度在前
	sort.Slice(limiters, byLimit)
	return &multiLimiter{limiters: limiters}
}

// Per 返回速率为 每duration,eventCount个请求
func Per(eventCount int, duration time.Duration) rate.Limit {
	return rate.Every(duration / time.Duration(eventCount))
}

这里模拟一个需要混合限流的API

// api/api.go

type API interface {
	ReadFile(ctx context.Context) error
	ResolveAddress(ctx context.Context) error
}

type testApi struct {
	netWorkLimit, diskLimit, apiLimit limit.RateLimiter // 多个维度进行限制
}

func Open() API {
	apiLimit := limit.MultiLimiter(
		rate.NewLimiter(limit.Per(2, time.Second), 1),   // 每秒的限制,防止突发请求,每1秒补充两个
		rate.NewLimiter(limit.Per(10, time.Minute), 10), // 每分钟的限制,设置初始池,每10秒补充一个
	)
	diskLimit := limit.MultiLimiter(
		rate.NewLimiter(rate.Limit(1), 1),
	)
	netWorkLimit := limit.MultiLimiter(
		rate.NewLimiter(limit.Per(3, time.Second), 3),
	)
	return &testApi{
		apiLimit:     apiLimit,
		diskLimit:    diskLimit,
		netWorkLimit: netWorkLimit,
	}
}

func (t *testApi) ReadFile(ctx context.Context) error {
	if err := limit.MultiLimiter(t.apiLimit, t.diskLimit).Wait(ctx); err != nil { // 融合api限流和磁盘限流
		return err
	}
	return nil
}

func (t *testApi) ResolveAddress(ctx context.Context) error {
	if err := limit.MultiLimiter(t.apiLimit, t.netWorkLimit).Wait(ctx); err != nil {
		return err
	}
	return nil
}

最后测试限流情况

func main() {
	defer log.Println("Done")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.Lshortfile)
	apiConn := api.Open()
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			if err := apiConn.ReadFile(context.Background()); err != nil {
				log.Println("cannot read file:", err)
				return
			}
			log.Println("read file")
		}()
	}
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			if err := apiConn.ResolveAddress(context.Background()); err != nil {
				log.Println("cannot resolve address:", err)
				return
			}
			log.Println("ResolveAddress")
		}()
	}
	wg.Wait()
}

复制请求

将一个请求复制到多个处理程序,选择最快处理完成的结果

// Work 处理程序
func Work(ctx context.Context, id int, wg *sync.WaitGroup, resultChan chan<- int) {
	defer wg.Done()
	start := time.Now()
	costTime := time.Duration(rand.Intn(5)+1) * time.Second
	select {
	case <-ctx.Done():
	case <-time.After(costTime):
		select {
		case <-ctx.Done():
		case resultChan <- id:
		}
	}
	tookTime := time.Since(start)
	if tookTime < costTime {
		tookTime = costTime
	}
	log.Printf("%v cost %v\n", id, tookTime)
}

//开启10个处理程序,选择最快的
func test1() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	resultChan := make(chan int)
	n := 10
	wg.Add(n)
	for i := 0; i < n; i++ {
		go Work(ctx, i, &wg, resultChan)
	}
	first := <-resultChan
	cancel()
	wg.Wait()
	close(resultChan)
	fmt.Println("received result:", first)
}

治愈异常的go程

当一些goroutine处于异常状态时,尝试对其进行重启
通过心跳机制判断goroutine的状态(最好在心跳中包含某些信息用于判断其不是活锁)
我们需要一个管理员来负责监控并重启goroutine
// StartGoroutineFn 创建一个可以监控和重启的goroutine的方式
// 参数:退出channel,管理员的心跳时间
// 返回值:返回管理员心跳的channel
type StartGoroutineFn func(done <-chan any, pulseInterval time.Duration) <-chan any

// NewSteward 新建一个管理员
// 参数:下游的超时时间,创建一个可以监控和重启的goroutine的方式
// 返回值:返回一个创建一个受管理的goroutine和其管理者的函数的创建方式
func NewSteward(timeout time.Duration, startGoroutine StartGoroutineFn) StartGoroutineFn {
	return func(done <-chan any, pulseInterval time.Duration) <-chan any {
		heartBeat := make(chan any)
		go func() {
			defer close(heartBeat)

			var wardDone chan any        // 管理者用于通知下游退出的channel
			var wardHeartbeat <-chan any // 管理员用于接收下游心跳的channel
			startWard := func() {
				wardDone = make(chan any)                                            // 初始化退出channel
				wardHeartbeat = startGoroutine(common.Or(wardDone, done), timeout/2) // 启动下游,其心跳间隔是超时间隔的一半
			}
			startWard()                       // 启动受监管的goroutine
			pulse := time.Tick(pulseInterval) // 定时回复上游的心跳
		monitorLoop:
			for {
				timeoutSignal := time.After(timeout)
				for {
					select {
					case <-pulse: // 回复心跳
						select {
						case heartBeat <- struct{}{}:
						default:
						}
					case <-wardHeartbeat: // 接收到下游的心跳则继续监视
						continue monitorLoop
					case <-timeoutSignal: // 没收到下游的心跳则重启下游
						log.Println("stewart: ward unhealthy;restarting")
						close(wardDone)
						startWard() // 使用之前的方式重新启动下游
						continue monitorLoop
					case <-done:
						return
					}
				}
			}
		}()
		return heartBeat
	}
}

// 不正常的go程
func badWorker() StartGoroutineFn {
	return func(done <-chan any, pulseInterval time.Duration) <-chan any {
		log.Println("ward: Hello, I am irresponsible")
		go func() {
			<-done
			log.Println("ward: I am halting")
		}()
		return nil // 故意阻塞
	}
}

// 受管理的go程:生成int流,可以启动多个管理区副本
func generatorIntStream(done <-chan interface{}, intList ...int) (<-chan interface{}, StartGoroutineFn) {
	intChanStream := make(chan (<-chan interface{}))
	intStream := common.Bridge(done, intChanStream) // 从intChanStream读int流
	return intStream, func(done <-chan any, pulseInterval time.Duration) <-chan any {
		intStream := make(chan interface{}) // 给管理区传递信息
		heartBeat := make(chan interface{})
		go func() {
			defer close(intStream)
			defer log.Println("over")
			log.Println("start")
			select {
			case intChanStream <- intStream: // 尝试塞intStream进intChanStream 塞不进去说明有其他的实例正在工作
			case <-done:
				return
			}
			pulse := time.Tick(pulseInterval)
			for {
			valueLoop:
				// 真正的工作 -> 往intStream里一直塞数据
				for _, intVal := range intList {
					if intVal < 0 {
						log.Printf("negative value:%v\n", intVal)
						return
					}
					time.Sleep(pulseInterval * 2)// 模拟下不正常运行
					for {
						select {
						case <-pulse:
							select {
							case heartBeat <- struct{}{}:
							default:
							}
						case intStream <- intVal:
							continue valueLoop
						case <-done:
							return
						}
					}
				}
			}
		}()
		return heartBeat
	}
}
// 测试一下
func main() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC | log.Lshortfile)

	done := make(chan any)
	time.AfterFunc(time.Minute, func() { // 1min 后退出
		log.Println("main: halting stewart and ward.")
		close(done)
	})
	intStream, startFn := generatorIntStream(done, 1, 2, 3, 4)
	heartBeat := NewSteward(4*time.Second, startFn)(done, 4*time.Second)
	for {
		select {
		case <-done:
			log.Println("Done")
			return
		case <-heartBeat:
			log.Println("Steward is healthy")
		case val := <-intStream:
			log.Println("received intVal:", val.(int))
		}
	}
}

实际运行截图
在这里插入图片描述

  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章           查看所有文章
加:2022-05-12 16:18:51  更:2022-05-12 16:20:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/11 3:59:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码