问题背景
在go语言的http服务中,我们常常会使用到Context来取消一个请求,或者取消数据的读取。偶然的一次尝试,让我对Context有了一定的兴趣。接下来本文围绕下面的例子,分析http如何利用Context来控制请求的取消和影响数据读取。
例子
我们开启一个http服务,发送大量数据给每个请求,代码如下: srv.go:http服务
package main
import (
"fmt"
"net/http"
)
func hello(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 100*10000; i++ {
w.Write([]byte("hello world"))
}
}
func main() {
fmt.Println("listening 8888:")
http.HandleFunc("/hello", hello)
_ = http.ListenAndServe(":8888", nil)
}
client.go: 发送请求的客户端
package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"time"
)
func main() {
client := http.Client{}
request, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:8888/hello", nil)
ctx, cancelFunc := context.WithCancel(request.Context())
request = request.WithContext(ctx)
if err != nil {
return
}
response, err := client.Do(request)
if err != nil {
log.Fatal(err)
}
cache := make([]byte, 128)
timer := time.NewTimer(time.Millisecond)
go func() {
select {
case <-timer.C:
cancelFunc()
}
}()
for {
read, err := response.Body.Read(cache)
if err == nil {
fmt.Println(string(cache[:read]))
continue
}
if err == io.EOF {
fmt.Println(string(cache[:read]))
break
}
log.Fatal(err)
}
}
代码很简单,就不做注释啦。分别启动服务和client,我们将得到如下结果: 我们看到这句话Process finished with the exit code 1,程序非正常退出,那么首先是追踪这个错误,下面我们追踪这个错误。
错误追踪
首先清楚这个“context canceled” 是客户端打印出来的:
log.Fatal(err)
断点入口:
read, err := response.Body.Read(cache)
我们会进入transport.go文件中:
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
closed, rerr := es.closed, es.rerr
es.mu.Unlock()
if closed {
return 0, errReadOnClosedResBody
}
if rerr != nil {
return 0, rerr
}
n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
}
然后我们继续进入到bodyEOFSignal的condfn(error)函数中:
func (es *bodyEOFSignal) condfn(err error) error {
if es.fn == nil {
return err
}
err = es.fn(err)
es.fn = nil
return err
}
那我们继续到es.fn(err)中一探究竟:
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
继续到pc.canceled()中:
func (pc *persistConn) canceled() error {
pc.mu.Lock()
defer pc.mu.Unlock()
return pc.canceledErr
}
1. 是什么?
canceledErr error
2. 如何被赋值?
根据canceledErr,我们找被赋值的函数如下:
func (pc *persistConn) cancelRequest(err error) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.canceledErr = err
pc.closeLocked(errRequestCanceled)
}
错误追踪先到这里。接下来我们换一个角度,我们从Context的角度来看。
Context
这里就不讲context了,有兴趣的伙伴去官网获取吧!!!回到客户端代码,给request传入了一个WithCancel context,看看这个函数做了什么:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() {
c.cancel(true, Canceled)
}
}
进入到c.cancel(),会发现Canceled作为一个错误类型,定义如下:
var Canceled = errors.New("context canceled")
...
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
...
c.err = err
...
}
context先到这里,在context里找到了错误信息的来源,接下来看看错误是如何传给前面我们谈到的canceledErr。 似乎还有一个入口没有看,就是http.client.Do的方法: 我们打断点进入到RoundTrip方法的调用入口,看看下面是如何感知context被取消:
resp, err = rt.RoundTrip(req)
...
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, err = rt.RoundTrip(req)
...
}
然后跟着RoundTrip(…), 进入到:
func (t *Transport) roundTrip(req *Request) (*Response, error) {
...
var resp *Response
if pconn.alt != nil {
t.setReqCanceler(cancelKey, nil)
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
}
进入到persistConn的实现方法roundTrip(),我们看看这个for循环:
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
pcClosed := pc.closech
canceled := false
for {
testHookWaitResLoop()
select {
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
}
case <-pcClosed:
pcClosed = nil
if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
}
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
case re := <-resc:
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
case <-cancelChan:
canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
cancelChan = nil
case <-ctxDoneChan:
canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
因而这里的监听不是在客户端取消的context的监听,根据客户端的输出显示,表明请求已经发送到服务端,请求并未超时,response也返回了,那么这里的函数监听是与我们读取数据没有联系。小编最开始也以为是在这里监听返回,然而这里打断点,怎么进不来。在前面提到,连接是类型为persistConn,其次是读取数据过程中,context的取消会产生影响,那么表明错误发生在tcp连接中的读取数据。接下来,根据连接建立过程,看看http做了什么?其次是真正的数据读取来自哪里?
pconn, err := t.getConn(treq, cm)
...
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
...
select{
case <-w.ready:
if w.err != nil {
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
}
}
return w.pc, w.err
...
通过这个w建立连接,进入到dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)。 在这里面开启了一个协程pconn.readLoop(),读取连接里面的数据。
(t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
...
go pconn.readLoop()
}
因为错误与数据读取有直接联系,至少错误发生readloop中的某一个地方:
for alive {
...
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
...
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
...
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
熟悉context的便知道,当我们调用context的cancel方法时,在前面的context的cancel()方法中有如下代码:
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
再回到:
ccase <-rc.req.Context().Done():// 当contex取消,便进入这个代码块
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
进入到cancelRequest(…)的rc.req.Context().Err()
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
因而传入cancelRequest的err便是cancelErr,我们进入cancelRequest:
func (t *Transport) cancelRequest(key cancelKey, err error) bool {
t.reqMu.Lock()
defer t.reqMu.Unlock()
cancel := t.reqCanceler[key]
delete(t.reqCanceler, key)
if cancel != nil {
cancel(err)
}
return cancel != nil
}
进入cancel(err):
func (pc *persistConn) cancelRequest(err error) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.canceledErr = err
pc.closeLocked(errRequestCanceled)
}
到这里我们的err就传给了body bodyEOFSignal,整个错误传递流程便走通了。 还剩最后一个问题,bodyEOFSignal的read函数中n, err = es.body.Read§ 所遇到的错误是什么?
n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
那么关闭连接又是在哪里呢? 我们回到cancelRequest函数:
pc.closeLocked(errRequestCanceled)
这样err整个传递逻辑和原因便都走同通了!
总结
经过上面的分析,将整个Context取消过程总结如下:
- 当创建一个带有取消的Context,会把Context的内部类中的err变量赋值为CancelErr;
- 客户端的调用cancelFunc,会向context的Done所绑定的channel写入值;
- 当channel写入值后,transport.go中的readLoop方法会监听这个channel的写入,从而把context取消的err传给persistConn,并关闭连接;
- 关闭连接后,数据读取便会遇到连接关闭的网络错误错误,当遇到这个错误,在bodySignal中进行错误处理,这里并不感知连接的关闭,只利用fn分别错误类型,当错误为io.EOF,直接将这个错误置为nil,若不是,便通过bodySignal获取到连接中的错误,再返回这个错误;
- 最后通过body.read()方法将错误打印出来。
- 这里复杂在于,每个角色只做自己的工作,遇到错误不是直接返回,而是等待其他角色来读取错误;具体表现为:context负责生成错误消息、传递取消指令给persistConn;persistConn基于bodySignal建立读取数据和连接的关联,响应Context的取消并关闭连接,拿到context的错误信息;client读取数据和错误;bodySignal:分析错误,并传递数据和persistConn的错误消息给client。
|