[Insight go] http client分析
1. http client的示例
循环去请求 http://www.baidu.com/ ,把第一次的请求结果写入到文件中
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"
)
var tr *http.Transport
func init() {
tr = &http.Transport{
MaxIdleConns: 100,
}
}
func WriterFile(filename, content string) {
file, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Println("open file err, ", err)
}
defer file.Close()
writer := bufio.NewWriter(file)
writer.WriteString(content)
writer.Flush()
}
func main() {
flag := true
for {
value, err := Get("http://www.baidu.com/")
if err != nil {
fmt.Println("[Error]: ", err)
break
}
if flag {
content := string(value)
WriterFile("./baidu.html", content)
flag = false
}
time.Sleep(time.Second)
}
}
func Get(url string) ([]byte, error) {
m := make(map[string]interface{})
data, err := json.Marshal(m)
if err != nil {
return nil, err
}
body := bytes.NewReader(data)
req, _ := http.NewRequest("Get", url, body)
req.Header.Add("content-type", "application/json")
client := &http.Client{
Transport: tr,
Timeout: 3 * time.Second,
}
res, err := client.Do(req)
if res != nil {
defer res.Body.Close()
}
if err != nil {
return nil, err
}
resBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return resBody, nil
}
2. 程序运行分析
基于go1.18源码包
2.1 init
程序执行前首先执行init函数,设置Transport中的最大idle连接数
2.2 main
执行main函数,调用到Get函数,Get函数中
- 创建<string, interface{}> map m, 序列化m 为 byte[] data
- 创建请求的body, 设置context-type类型为application/json
- 创建client对象,发起请求client.Do(req)为阻塞请求,返回res,err
2.3 client.Do(req)分析
func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}
var testHookClientDoResult func(retres *Response, reterr error)
func (c *Client) do(req *Request) (retres *Response, reterr error) {
if testHookClientDoResult != nil {
defer func() { testHookClientDoResult(retres, reterr) }()
}
if req.URL == nil {
req.closeBody()
return nil, &url.Error{
Op: urlErrorOp(req.Method),
Err: errors.New("http: nil Request.URL"),
}
}
var (
deadline = c.deadline()
reqs []*Request
resp *Response
copyHeaders = c.makeHeadersCopier(req)
reqBodyClosed = false
redirectMethod string
includeBody bool
)
uerr := func(err error) error {
if !reqBodyClosed {
req.closeBody()
}
var urlStr string
if resp != nil && resp.Request != nil {
urlStr = stripPassword(resp.Request.URL)
} else {
urlStr = stripPassword(req.URL)
}
return &url.Error{
Op: urlErrorOp(reqs[0].Method),
URL: urlStr,
Err: err,
}
}
for {
if len(reqs) > 0 {
loc := resp.Header.Get("Location")
if loc == "" {
resp.closeBody()
return nil, uerr(fmt.Errorf("%d response missing Location header", resp.StatusCode))
}
u, err := req.URL.Parse(loc)
if err != nil {
resp.closeBody()
return nil, uerr(fmt.Errorf("failed to parse Location header %q: %v", loc, err))
}
host := ""
if req.Host != "" && req.Host != req.URL.Host {
if u, _ := url.Parse(loc); u != nil && !u.IsAbs() {
host = req.Host
}
}
ireq := reqs[0]
req = &Request{
Method: redirectMethod,
Response: resp,
URL: u,
Header: make(Header),
Host: host,
Cancel: ireq.Cancel,
ctx: ireq.ctx,
}
if includeBody && ireq.GetBody != nil {
req.Body, err = ireq.GetBody()
if err != nil {
resp.closeBody()
return nil, uerr(err)
}
req.ContentLength = ireq.ContentLength
}
copyHeaders(req)
if ref := refererForURL(reqs[len(reqs)-1].URL, req.URL); ref != "" {
req.Header.Set("Referer", ref)
}
err = c.checkRedirect(req, reqs)
if err == ErrUseLastResponse {
return resp, nil
}
const maxBodySlurpSize = 2 << 10
if resp.ContentLength == -1 || resp.ContentLength <= maxBodySlurpSize {
io.CopyN(io.Discard, resp.Body, maxBodySlurpSize)
}
resp.Body.Close()
if err != nil {
ue := uerr(err)
ue.(*url.Error).URL = loc
return resp, ue
}
}
reqs = append(reqs, req)
var err error
var didTimeout func() bool
if resp, didTimeout, err = c.send(req, deadline); err != nil {
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
req.closeBody()
}
}
2.4 c.send(req, deadline)
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
req := ireq
if rt == nil {
req.closeBody()
return nil, alwaysFalse, errors.New("http: no Client.Transport or DefaultTransport")
}
if req.URL == nil {
req.closeBody()
return nil, alwaysFalse, errors.New("http: nil Request.URL")
}
if req.RequestURI != "" {
req.closeBody()
return nil, alwaysFalse, errors.New("http: Request.RequestURI can't be set in client requests")
}
forkReq := func() {
if ireq == req {
req = new(Request)
*req = *ireq
}
}
if req.Header == nil {
forkReq()
req.Header = make(Header)
}
if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
username := u.Username()
password, _ := u.Password()
forkReq()
req.Header = cloneOrMakeHeader(ireq.Header)
req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
}
if !deadline.IsZero() {
forkReq()
}
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
resp, err = rt.RoundTrip(req)
if err != nil {
stopTimer()
if resp != nil {
log.Printf("RoundTripper returned a response & error; ignoring response")
}
if tlsErr, ok := err.(tls.RecordHeaderError); ok {
if string(tlsErr.RecordHeader[:]) == "HTTP/" {
err = errors.New("http: server gave HTTP response to HTTPS client")
}
}
return nil, didTimeout, err
}
if resp == nil {
return nil, didTimeout, fmt.Errorf("http: RoundTripper implementation (%T) returned a nil *Response with a nil error", rt)
}
if resp.Body == nil {
if resp.ContentLength > 0 && req.Method != "HEAD" {
return nil, didTimeout, fmt.Errorf("http: RoundTripper implementation (%T) returned a *Response with content length %d but a nil Body", rt, resp.ContentLength)
}
resp.Body = io.NopCloser(strings.NewReader(""))
}
if !deadline.IsZero() {
resp.Body = &cancelTimerBody{
stop: stopTimer,
rc: resp.Body,
reqDidTimeout: didTimeout,
}
}
return resp, nil, nil
}
2.5 rt.RoundTrip(req)
这里rt, c.transport() RoundTrip(*Request) (*Response, error)
package http
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}
2.6 Transport中的roundTrip方法
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
origReq := req
cancelKey := cancelKey{origReq}
req = setupRewindBody(req)
if altRT := t.alternateRoundTripper(req); altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
var err error
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
if !isHTTP {
req.closeBody()
return nil, badStringError("unsupported protocol scheme", scheme)
}
if req.Method != "" && !validMethod(req.Method) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
t.setReqCanceler(cancelKey, nil)
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
if http2isNoCachedConnError(err) {
if t.removeIdleConn(pconn) {
t.decConnsPerHost(pconn.cacheKey)
}
} else if !pconn.shouldRetryRequest(req, err) {
if e, ok := err.(nothingWrittenError); ok {
err = e.error
}
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
}
2.7 t.getConn(treq, cm)
尝试去获取一个空闲连接,用于发起 http 连接 pconn, err := t.getConn(treq, cm)
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}
cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
t.queueForDial(w)
select {
case <-w.ready:
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
这里分为几个主要的函数
- t.queueForIdleConn(w);查找可以作为长连接复用的空闲连接
- t.queueForDial(w) Queue for permission to dial
2.7.1 t.queueForIdleConn(w)
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
if t.DisableKeepAlives {
return false
}
t.idleMu.Lock()
defer t.idleMu.Unlock()
t.closeIdle = false
if w == nil {
return false
}
var oldTime time.Time
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
pconn := list[len(list)-1]
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
if tooOld {
go pconn.closeConnIfStillIdle()
}
if pconn.isBroken() || tooOld {
list = list[:len(list)-1]
continue
}
delivered = w.tryDeliver(pconn, nil)
if delivered {
if pconn.alt != nil {
} else {
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
delete(t.idleConn, w.key)
}
if stop {
return delivered
}
}
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
}
2.7.2 t.queueForDial(w)
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}
t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}
2.7.4 dialConn
persistConn是个具体的连接实例,包含了连接的上下文,会启动两个groutine分别执行go pconn.readLoop 和 go pconn.writeLoop ,
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
var err error
pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
if tc, ok := pconn.conn.(*tls.Conn); ok {
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
if err := tc.HandshakeContext(ctx); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
}
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
switch {
case cm.proxyURL == nil:
case cm.proxyURL.Scheme == "socks5":
conn := pconn.conn
d := socksNewDialer("tcp", conn.RemoteAddr().String())
if u := cm.proxyURL.User; u != nil {
auth := &socksUsernamePassword{
Username: u.Username(),
}
auth.Password, _ = u.Password()
d.AuthMethods = []socksAuthMethod{
socksAuthMethodNotRequired,
socksAuthMethodUsernamePassword,
}
d.Authenticate = auth.Authenticate
}
if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
conn.Close()
return nil, err
}
case cm.targetScheme == "http":
pconn.isProxy = true
if pa := cm.proxyAuth(); pa != "" {
pconn.mutateHeaderFunc = func(h Header) {
h.Set("Proxy-Authorization", pa)
}
}
case cm.targetScheme == "https":
conn := pconn.conn
var hdr Header
if t.GetProxyConnectHeader != nil {
var err error
hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
if err != nil {
conn.Close()
return nil, err
}
} else {
hdr = t.ProxyConnectHeader
}
if hdr == nil {
hdr = make(Header)
}
if pa := cm.proxyAuth(); pa != "" {
hdr = hdr.Clone()
hdr.Set("Proxy-Authorization", pa)
}
connectReq := &Request{
Method: "CONNECT",
URL: &url.URL{Opaque: cm.targetAddr},
Host: cm.targetAddr,
Header: hdr,
}
connectCtx := ctx
if ctx.Done() == nil {
newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
connectCtx = newCtx
}
didReadResponse := make(chan struct{})
var (
resp *Response
err error
)
go func() {
defer close(didReadResponse)
err = connectReq.Write(conn)
if err != nil {
return
}
br := bufio.NewReader(conn)
resp, err = ReadResponse(br, connectReq)
}()
select {
case <-connectCtx.Done():
conn.Close()
<-didReadResponse
return nil, connectCtx.Err()
case <-didReadResponse:
}
if err != nil {
conn.Close()
return nil, err
}
if resp.StatusCode != 200 {
_, text, ok := strings.Cut(resp.Status, " ")
conn.Close()
if !ok {
return nil, errors.New("unknown status code")
}
return nil, errors.New(text)
}
}
if cm.proxyURL != nil && cm.targetScheme == "https" {
if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
return nil, err
}
}
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
if e, ok := alt.(erringRoundTripper); ok {
return nil, e.RoundTripErr()
}
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
}
}
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
2.7.5 pconn.readLoop
读循环goroutine 通过 chan requestAndChan 接受主goroutine发送的request(rc := <-pc.reqch ), 并从tcp输出流中读取response, 然后反序列化到结构体中, 最后通过requestAndChan 中的 chan responseAndError 返给主goroutine (rc.ch <- responseAndError{resp, err} )
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
eofc := make(chan struct{})
defer close(eofc)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
pc.mu.Lock()
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
bodyWritable := resp.bodyIsWritable()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
alive = false
}
if !hasBody || bodyWritable {
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyWritable {
closeErr = errCallerOwnsConn
}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
testHookReadLoopBeforeNextRead()
continue
}
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}
2.7.6 pconn.writeLoop
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
wr.req.setError(err)
}
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
pc.writeErrCh <- err
wr.ch <- err
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
}
}
2.8 pconn.roundTrip(treq)
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.mu.Lock()
pc.numExpectedResponses++
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()
if headerFn != nil {
headerFn(req.extraHeaders())
}
requestedGzip := false
if !pc.t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
requestedGzip = true
req.extraHeaders().Set("Accept-Encoding", "gzip")
}
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
if pc.t.DisableKeepAlives &&
!req.wantsClose() &&
!isProtocolSwitchHeader(req.Header) {
req.extraHeaders().Set("Connection", "close")
}
gone := make(chan struct{})
defer close(gone)
defer func() {
if err != nil {
pc.t.setReqCanceler(req.cancelKey, nil)
}
}()
const debugRoundTrip = false
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
pcClosed := pc.closech
canceled := false
for {
testHookWaitResLoop()
select {
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
}
case <-pcClosed:
pcClosed = nil
if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
}
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
case re := <-resc:
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
case <-cancelChan:
canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
cancelChan = nil
case <-ctxDoneChan:
canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
2.9 summary
- 主goroutine ->requestAndChan -> 读循环goroutine
- 主goroutine ->writeRequest-> 写循环goroutine
- 主goroutine 通过select 监听各个channel上的数据, 比如请求取消, timeout,长连接挂了,写流出错,读流出错
type persistConn struct {
...
t *Transport
cacheKey connectMethodKey
conn net.Conn
tlsState *tls.ConnectionState
br *bufio.Reader
bw *bufio.Writer
nwrite int64
reqch chan requestAndChan
writech chan writeRequest
closech chan struct{}
isProxy bool
sawEOF bool
readLimit int64
writeErrCh chan error
...
}
|