// TCPServer runs the accept loop for listener and hands every accepted
// connection to handler.Handle on its own goroutine.
//
// It returns only when listener.Accept fails; the returned error describes
// that failure. logf is part of the signature but is not used by this loop.
//
// NOTE(review): wg is declared outside this function; it is presumably a
// sync.WaitGroup tracking in-flight handlers - confirm.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	for {
		clientConn, err := listener.Accept()
		if err != nil {
			// A failed Accept yields no usable connection. Dispatching it
			// would hand a nil conn to the handler and spin this loop.
			return fmt.Errorf("listener.Accept() error - %s", err)
		}

		// Register the handler before its goroutine starts so the Done
		// below always has a matching Add.
		wg.Add(1)
		go func() {
			defer wg.Done()
			handler.Handle(clientConn)
		}()
	}
}
// Handle runs the protocol IO loop for a client and, if the loop ends with
// an error, logs it together with the remote address of conn.
//
// NOTE(review): prot and client are resolved outside this function and are
// not derived from conn here - confirm where they are set up.
func (p *tcpServer) Handle(conn net.Conn) {
	// err is scoped to this call: TCPServer invokes Handle on a separate
	// goroutine per connection, so the result must not live in a variable
	// shared between calls.
	if err := prot.IOLoop(client); err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
	}
}
// IOLoop drives the V2 protocol for one client. It starts the client's
// message pump goroutine, then repeatedly reads a newline-terminated
// command, executes it through Exec and writes back any non-nil response.
//
// The loop ends when a command cannot be read or a response cannot be sent.
// On exit the client's ExitChan is closed and, if the client has a Channel,
// the client is removed from it. The returned error describes why the loop
// ended.
//
// c must hold a *clientV2; any other type makes the assertion panic.
func (p *protocolV2) IOLoop(c protocol.Client) error {
	var (
		err      error
		line     []byte
		response []byte
	)

	client := c.(*clientV2)
	go p.messagePump(client, messagePumpStartedChan)

	for {
		// With heartbeats enabled, allow two intervals of silence before the
		// read times out; otherwise pass zeroTime (presumably the zero
		// time.Time, i.e. no deadline - confirm).
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			// Without this exit a closed or timed-out connection would keep
			// the loop spinning and the cleanup below would never run.
			// NOTE(review): a clean disconnect is reported as an error too;
			// consider mapping end-of-stream to a nil return.
			err = fmt.Errorf("failed to read command - %s", err)
			break
		}

		// Drop the line terminator ("\n", optionally preceded by "\r") so
		// the last token can match a command name exactly.
		line = bytes.TrimSuffix(line, []byte("\n"))
		line = bytes.TrimSuffix(line, []byte("\r"))
		params := bytes.Split(line, separatorBytes)

		// NOTE(review): an error returned by Exec is neither reported to the
		// client nor acted on here; it is overwritten on the next iteration.
		// Confirm the intended handling.
		response, err = p.Exec(client, params)
		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}
// Exec routes one parsed command to its handler method. params[0] is the
// command name and the full params slice is forwarded to the handler.
//
// IDENTIFY is dispatched before enforceTLSPolicy is consulted; every other
// command is dispatched only if enforceTLSPolicy returns no error. A command
// name that matches no handler yields a fatal E_INVALID client error.
//
// params must contain at least one element.
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	command := params[0]

	if string(command) == "IDENTIFY" {
		return p.IDENTIFY(client, params)
	}

	if err := enforceTLSPolicy(client, p, command); err != nil {
		return nil, err
	}

	switch string(command) {
	case "FIN":
		return p.FIN(client, params)
	case "RDY":
		return p.RDY(client, params)
	case "REQ":
		return p.REQ(client, params)
	case "PUB":
		return p.PUB(client, params)
	case "MPUB":
		return p.MPUB(client, params)
	case "DPUB":
		return p.DPUB(client, params)
	case "NOP":
		return p.NOP(client, params)
	case "TOUCH":
		return p.TOUCH(client, params)
	case "SUB":
		return p.SUB(client, params)
	case "CLS":
		return p.CLS(client, params)
	case "AUTH":
		return p.AUTH(client, params)
	}

	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", command))
}
// SUB registers client with channel and then publishes channel on the
// client's SubEventChan.
//
// If AddClient fails, a fatal E_SUB_FAILED client error wrapping the cause
// is returned and nothing is sent on SubEventChan. On success the response
// body is "OK", the success reply the NSQ TCP protocol defines for SUB.
//
// NOTE(review): channel is resolved outside this function, and params is
// not inspected here - confirm where the topic/channel lookup happens.
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	// Register exactly once: repeating AddClient after a success would
	// never terminate and the notification below would be unreachable.
	if err := channel.AddClient(client.ID, client); err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
	}

	client.SubEventChan <- channel

	return []byte("OK"), nil
}
|