本文总结自《Go 语言并发之道》,建议直接阅读原书。仓库地址
心跳
- 工作开始时的心跳
// DoWork2 starts a goroutine that delivers nums, in order, on the returned int
// channel and emits a heartbeat on the returned heartbeat channel before each
// value.
//
// The goroutine first waits one second (a simulated start-up delay). For each
// value it then attempts a non-blocking heartbeat send and blocks until the
// value is received or ctx is cancelled. Both channels are closed when the
// goroutine exits.
func DoWork2(ctx context.Context, nums ...int) (<-chan interface{}, <-chan int) {
	// Capacity 1 lets a heartbeat sent before anyone is listening be kept;
	// further heartbeats are dropped while the buffer is full.
	heartStream := make(chan interface{}, 1)
	intStream := make(chan int)
	go func() {
		defer func() {
			close(heartStream)
			close(intStream)
		}()

		// Simulated start-up delay. Waiting on a timer instead of sleeping
		// lets a cancelled context stop the goroutine immediately.
		startup := time.NewTimer(time.Second)
		defer startup.Stop()
		select {
		case <-ctx.Done():
			return
		case <-startup.C:
		}

		for _, num := range nums {
			// Never block on the heartbeat: nobody may be listening.
			select {
			case heartStream <- struct{}{}:
			default:
			}
			select {
			case <-ctx.Done():
				return
			case intStream <- num:
			}
		}
	}()
	return heartStream, intStream
}
// TestDoWork2 checks that DoWork2 emits a heartbeat before producing results
// and then delivers every input value, in order, before closing the results
// channel.
func TestDoWork2(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	intSlice := []int{0, 1, 2, 3, 4}
	heartBeat, results := DoWork2(ctx, intSlice...)

	// Wait for the first heartbeat so the worker is known to have started.
	<-heartBeat

	i := 0
	for r := range results {
		// Guard the index so an extra result fails the test instead of panicking.
		if i >= len(intSlice) {
			t.Fatalf("unexpected extra result %v at idx=%d", r, i)
		}
		require.Equal(t, intSlice[i], r, "idx=%d", i)
		i++
	}
	// A worker that closes early must not pass silently.
	require.Equal(t, len(intSlice), i, "number of results received")
}
- 间隔心跳可以用于防止超时
// DoWork3 starts a goroutine that delivers nums, in order, on the returned int
// channel while emitting a heartbeat on the returned heartbeat channel every
// pulseInterval.
//
// The goroutine first waits two seconds (a simulated start-up delay) and then
// starts the pulse ticker. While a value is waiting to be received, each tick
// triggers a non-blocking heartbeat send. Both channels are closed when the
// goroutine exits, either after the last value is delivered or when ctx is
// cancelled.
func DoWork3(ctx context.Context, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) {
	// Capacity 1 keeps one pending heartbeat; further ones are dropped.
	heartStream := make(chan interface{}, 1)
	intStream := make(chan int)
	go func() {
		defer close(heartStream)
		defer close(intStream)

		// Simulated start-up delay. Waiting on a timer instead of sleeping
		// lets a cancelled context stop the goroutine immediately.
		startup := time.NewTimer(2 * time.Second)
		defer startup.Stop()
		select {
		case <-ctx.Done():
			return
		case <-startup.C:
		}

		pulse := time.NewTicker(pulseInterval)
		defer pulse.Stop()
	numLoop:
		for _, n := range nums {
			// Keep pulsing until this value has been handed over.
			for {
				select {
				case <-ctx.Done():
					return
				case <-pulse.C:
					// Never block on the heartbeat: nobody may be listening.
					select {
					case heartStream <- struct{}{}:
					default:
					}
				case intStream <- n:
					continue numLoop
				}
			}
		}
	}()
	return heartStream, intStream
}
// TestDoWork3 checks that DoWork3 delivers every input value in order and that
// heartbeats arrive often enough to keep the per-iteration timeout from firing.
func TestDoWork3(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	intSlice := []int{0, 1, 2, 3, 4}
	const timeout = 2 * time.Second
	// Pulse at half the timeout so a heartbeat always lands inside the window;
	// pulsing at exactly the timeout would race the two timers.
	heartBeat, results := DoWork3(ctx, timeout/2, intSlice...)

	// Wait for the first heartbeat so the worker is known to have started.
	<-heartBeat

	i := 0
	for {
		select {
		case r, ok := <-results:
			if !ok {
				// A worker that closes early must not pass silently.
				require.Equal(t, len(intSlice), i, "number of results received")
				return
			}
			// Guard the index so an extra result fails the test instead of panicking.
			if i >= len(intSlice) {
				t.Fatalf("unexpected extra result %v at idx=%d", r, i)
			}
			require.Equal(t, intSlice[i], r)
			i++
		case <-heartBeat:
			// A heartbeat restarts the timeout on the next loop iteration.
		case <-time.After(timeout):
			t.Fatal("time out")
		}
	}
}
速率限制
使用"golang.org/x/time/rate" 包实现混合限流
// RateLimiter is the behaviour required of every limiter that can be combined
// by MultiLimiter.
type RateLimiter interface {
// Wait takes a context and reports an error. Presumably it blocks until an
// event is permitted or ctx is done, as *rate.Limiter.Wait does; confirm
// against the golang.org/x/time/rate documentation.
Wait(ctx context.Context) error
// Limit reports the limiter's rate; MultiLimiter sorts limiters by this value.
Limit() rate.Limit
}
// multiLimiter combines several RateLimiter values into one.
type multiLimiter struct {
// limiters holds the wrapped limiters; MultiLimiter stores them sorted by
// ascending Limit().
limiters []RateLimiter
}
// Wait calls Wait on each wrapped limiter in stored order, passing ctx through.
// It stops at the first limiter that reports an error and returns that error;
// it returns nil once every limiter has returned without one.
func (m *multiLimiter) Wait(ctx context.Context) error {
	for idx := range m.limiters {
		err := m.limiters[idx].Wait(ctx)
		if err != nil {
			return err
		}
	}
	return nil
}
// Limit returns the limit of the first wrapped limiter. MultiLimiter stores the
// limiters sorted by ascending Limit(), so this is the most restrictive rate.
//
// With no wrapped limiters Wait never blocks, so Limit reports rate.Inf rather
// than panicking on an empty slice.
func (m *multiLimiter) Limit() rate.Limit {
	if len(m.limiters) == 0 {
		return rate.Inf
	}
	return m.limiters[0].Limit()
}
// MultiLimiter combines limiters into one limiter whose Wait consults every
// one of them. The limiters are stored sorted by ascending Limit(), so the most
// restrictive limiter comes first.
//
// The sort runs on a private copy: a caller passing an existing slice with
// `MultiLimiter(xs...)` does not have that slice reordered behind its back.
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
	sorted := make([]RateLimiter, len(limiters))
	copy(sorted, limiters)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Limit() < sorted[j].Limit()
	})
	return &multiLimiter{limiters: sorted}
}
// Per converts "eventCount events per duration" into a rate.Limit by computing
// the interval between two events and handing it to rate.Every.
//
// The interval uses integer Duration division, so eventCount == 0 panics with
// a division by zero.
func Per(eventCount int, duration time.Duration) rate.Limit {
	interval := duration / time.Duration(eventCount)
	return rate.Every(interval)
}
这里模拟一个需要混合限流的API
// API is the simulated service whose operations are rate limited.
type API interface {
// ReadFile simulates a file read; the testApi implementation only waits on
// its rate limiters and returns their error, if any.
ReadFile(ctx context.Context) error
// ResolveAddress simulates an address lookup; the testApi implementation
// only waits on its rate limiters and returns their error, if any.
ResolveAddress(ctx context.Context) error
}
// testApi is the API implementation returned by Open. It performs no real
// work; each operation only waits on the limiters that apply to it.
type testApi struct {
	// netWorkLimit is consulted by ResolveAddress.
	netWorkLimit limit.RateLimiter
	// diskLimit is consulted by ReadFile.
	diskLimit limit.RateLimiter
	// apiLimit is consulted by every operation.
	apiLimit limit.RateLimiter
}
// Open builds the simulated API with three limiter groups:
//   - apiLimit:     2 events per second (burst 1) combined with 10 per minute (burst 10)
//   - diskLimit:    rate.Limit(1) with burst 1
//   - netWorkLimit: 3 events per second (burst 3)
func Open() API {
	conn := &testApi{}
	conn.apiLimit = limit.MultiLimiter(
		rate.NewLimiter(limit.Per(2, time.Second), 1),
		rate.NewLimiter(limit.Per(10, time.Minute), 10),
	)
	conn.diskLimit = limit.MultiLimiter(
		rate.NewLimiter(rate.Limit(1), 1),
	)
	conn.netWorkLimit = limit.MultiLimiter(
		rate.NewLimiter(limit.Per(3, time.Second), 3),
	)
	return conn
}
// ReadFile waits on the API-wide and disk limiters combined and returns the
// error from that wait, or nil. No file is actually read.
func (t *testApi) ReadFile(ctx context.Context) error {
	combined := limit.MultiLimiter(t.apiLimit, t.diskLimit)
	return combined.Wait(ctx)
}
// ResolveAddress waits on the API-wide and network limiters combined and
// returns the error from that wait, or nil. No address is actually resolved.
func (t *testApi) ResolveAddress(ctx context.Context) error {
	combined := limit.MultiLimiter(t.apiLimit, t.netWorkLimit)
	return combined.Wait(ctx)
}
最后测试限流情况
// main exercises the rate-limited API: it fires 10 ReadFile and 10
// ResolveAddress calls concurrently, logs each outcome with a timestamp, and
// logs "Done" once every call has finished.
func main() {
	defer log.Println("Done")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.Lshortfile)

	const callsPerKind = 10
	apiConn := api.Open()

	var wg sync.WaitGroup
	wg.Add(2 * callsPerKind)

	readFile := func() {
		defer wg.Done()
		err := apiConn.ReadFile(context.Background())
		if err != nil {
			log.Println("cannot read file:", err)
			return
		}
		log.Println("read file")
	}
	resolveAddress := func() {
		defer wg.Done()
		err := apiConn.ResolveAddress(context.Background())
		if err != nil {
			log.Println("cannot resolve address:", err)
			return
		}
		log.Println("ResolveAddress")
	}

	for n := 0; n < callsPerKind; n++ {
		go readFile()
	}
	for n := 0; n < callsPerKind; n++ {
		go resolveAddress()
	}
	wg.Wait()
}
复制请求
将一个请求复制到多个处理程序,选择最快处理完成的结果
// Work simulates one replica handling a request. It picks a random cost of
// 1 to 5 seconds, waits that long unless ctx is cancelled first, and then
// tries to hand its id to resultChan (giving up if ctx is cancelled).
//
// It always calls wg.Done and logs the id with its cost. If the replica was
// cancelled early, the full simulated cost is logged rather than the shorter
// elapsed time.
func Work(ctx context.Context, id int, wg *sync.WaitGroup, resultChan chan<- int) {
	defer wg.Done()

	begin := time.Now()
	simulated := time.Duration(rand.Intn(5)+1) * time.Second

	select {
	case <-time.After(simulated):
		// Work finished; deliver unless another replica already won.
		select {
		case resultChan <- id:
		case <-ctx.Done():
		}
	case <-ctx.Done():
	}

	elapsed := time.Since(begin)
	if simulated > elapsed {
		elapsed = simulated
	}
	log.Printf("%v cost %v\n", id, elapsed)
}
// test1 replicates one request across ten Work goroutines, takes the first
// result that arrives, cancels the rest, waits for all of them to exit, and
// prints the winning id.
func test1() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const replicas = 10
	resultChan := make(chan int)

	var wg sync.WaitGroup
	for id := 0; id < replicas; id++ {
		wg.Add(1)
		go Work(ctx, id, &wg, resultChan)
	}

	// Only the fastest replica matters; stop the others as soon as it reports.
	winner := <-resultChan
	cancel()
	wg.Wait()
	// Safe to close: every sender has returned.
	close(resultChan)

	fmt.Println("received result:", winner)
}
治愈异常的go程
当一些goroutine处于异常状态时,尝试对其进行重启
通过心跳机制判断 goroutine 的状态(最好在心跳中携带一些信息,以便确认该 goroutine 没有陷入活锁)
我们需要一个管理员来负责监控并重启goroutine
// StartGoroutineFn starts a monitorable goroutine. It receives a done channel
// and a pulse interval and returns a channel of heartbeats. NewSteward both
// accepts and returns a function of this type.
type StartGoroutineFn func(done <-chan any, pulseInterval time.Duration) <-chan any
// NewSteward wraps startGoroutine in a steward that monitors the started
// goroutine (the "ward") and restarts it when it looks unhealthy.
//
// The returned function starts the steward. The steward launches the ward with
// a pulse interval of timeout/2 and a done channel built from common.Or over
// the ward's private stop channel and the steward's done. If no ward heartbeat
// arrives within timeout, the steward logs, closes the ward's stop channel and
// starts a new ward. The steward itself sends a non-blocking heartbeat every
// pulseInterval and closes its heartbeat channel when done is closed.
//
// As with time.Tick, a non-positive pulseInterval means the steward sends no
// heartbeats.
func NewSteward(timeout time.Duration, startGoroutine StartGoroutineFn) StartGoroutineFn {
	return func(done <-chan any, pulseInterval time.Duration) <-chan any {
		heartBeat := make(chan any)
		go func() {
			defer close(heartBeat)

			var wardDone chan any
			var wardHeartbeat <-chan any
			startWard := func() {
				wardDone = make(chan any)
				wardHeartbeat = startGoroutine(common.Or(wardDone, done), timeout/2)
			}
			startWard()

			// Use a stoppable ticker instead of time.Tick so it is released
			// when the steward exits. A nil channel never fires, which keeps
			// time.Tick's behaviour for non-positive intervals.
			var pulse <-chan time.Time
			if pulseInterval > 0 {
				ticker := time.NewTicker(pulseInterval)
				defer ticker.Stop()
				pulse = ticker.C
			}

		monitorLoop:
			for {
				// A fresh timeout window starts after every ward heartbeat or restart.
				timeoutSignal := time.After(timeout)
				for {
					select {
					case <-pulse:
						// Never block on the heartbeat: nobody may be listening.
						select {
						case heartBeat <- struct{}{}:
						default:
						}
					case <-wardHeartbeat:
						continue monitorLoop
					case <-timeoutSignal:
						log.Println("stewart: ward unhealthy;restarting")
						close(wardDone)
						startWard()
						continue monitorLoop
					case <-done:
						return
					}
				}
			}
		}()
		return heartBeat
	}
}
// badWorker returns a StartGoroutineFn for a ward that never sends heartbeats.
// The returned function logs a greeting, starts a goroutine that logs once
// done is closed, and returns a nil heartbeat channel.
func badWorker() StartGoroutineFn {
	irresponsible := func(done <-chan any, _ time.Duration) <-chan any {
		log.Println("ward: Hello, I am irresponsible")
		waitForHalt := func() {
			<-done
			log.Println("ward: I am halting")
		}
		go waitForHalt()
		// A nil channel never delivers, so no heartbeat is ever observed.
		return nil
	}
	return irresponsible
}
// generatorIntStream returns a stream of values plus a StartGoroutineFn that
// starts a ward feeding that stream.
//
// The returned stream is the result of common.Bridge over a channel of
// per-ward streams. Each started ward registers its own stream on that
// channel, so values keep reaching the same consumer-facing stream after a
// restart.
//
// A ward loops over intList forever. For each value it simulates work for
// 2*pulseInterval, then keeps sending non-blocking heartbeats on every pulse
// until the value is received. A negative value makes the ward log and exit,
// simulating a failure. The ward closes its own stream on exit and stops as
// soon as its done channel is closed.
//
// NOTE(review): with an empty intList the outer loop spins without ever
// checking done; confirm callers always pass at least one value.
func generatorIntStream(done <-chan interface{}, intList ...int) (<-chan interface{}, StartGoroutineFn) {
	intChanStream := make(chan (<-chan interface{}))
	intStream := common.Bridge(done, intChanStream)
	return intStream, func(done <-chan any, pulseInterval time.Duration) <-chan any {
		wardStream := make(chan interface{})
		heartBeat := make(chan interface{})
		go func() {
			defer close(wardStream)
			defer log.Println("over")
			log.Println("start")

			// Register this ward's stream with the bridge.
			select {
			case intChanStream <- wardStream:
			case <-done:
				return
			}

			// Wards are restarted repeatedly, so use a stoppable ticker rather
			// than time.Tick. A nil channel never fires, which keeps
			// time.Tick's behaviour for non-positive intervals.
			var pulse <-chan time.Time
			if pulseInterval > 0 {
				ticker := time.NewTicker(pulseInterval)
				defer ticker.Stop()
				pulse = ticker.C
			}

			for {
			valueLoop:
				for _, intVal := range intList {
					if intVal < 0 {
						log.Printf("negative value:%v\n", intVal)
						return
					}

					// Simulated work. Selecting on done lets a stopped ward
					// exit at once instead of finishing its sleep first.
					select {
					case <-time.After(pulseInterval * 2):
					case <-done:
						return
					}

					for {
						select {
						case <-pulse:
							// Never block on the heartbeat: nobody may be listening.
							select {
							case heartBeat <- struct{}{}:
							default:
							}
						case wardStream <- intVal:
							continue valueLoop
						case <-done:
							return
						}
					}
				}
			}
		}()
		return heartBeat
	}
}
// main runs the steward demo for one minute. It starts a steward that
// supervises a ward producing 1, 2, 3, 4 in a loop, logs every steward
// heartbeat and every received value, and exits when done is closed.
func main() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC | log.Lshortfile)

	done := make(chan any)
	time.AfterFunc(time.Minute, func() {
		log.Println("main: halting stewart and ward.")
		close(done)
	})

	intStream, startFn := generatorIntStream(done, 1, 2, 3, 4)
	heartBeat := NewSteward(4*time.Second, startFn)(done, 4*time.Second)
	for {
		select {
		case <-done:
			log.Println("Done")
			return
		case _, ok := <-heartBeat:
			if !ok {
				// The steward closes its heartbeat channel on shutdown.
				// Disable the case so a closed channel is not reported as healthy.
				heartBeat = nil
				continue
			}
			log.Println("Steward is healthy")
		case val, ok := <-intStream:
			if !ok {
				// A closed stream yields nil, and val.(int) would panic on
				// it. Disable the case instead.
				intStream = nil
				continue
			}
			log.Println("received intVal:", val.(int))
		}
	}
}
实际运行截图
|