一、网络协议概念
- 我们当前使用的互联网是一系列协议在工作,这些协议规定了电脑如何连接和组网。
- 网络协议是网络上设备(网络服务器、计算机及交换机、路由器,防火墙等)之间通信约定的规则,它规定了通信时信息必须采用的格式和格式的意义。网络通常采用分层的体系结构划分,每一层都建立在它的下层之上,向它的上一层提供特定的服务,而把如何实现这一服务的细节对上一层加以屏蔽。
- 理解了网络协议,就理解了互联网的原理
二、网络协议划分
-
我们把互联网在概念上划分7个层次,物理层,数据链路层,网络层,传输层,会话层,表示层,应用层。而我们大多数程序员工作在应用层(http) -
物理层,调制解调器,同轴电缆,双绞线,光纤等 -
数据链路层,Wi-Fi,以太网,L2TP,PPTP等 -
网络层,IP(IPv4,IPv6),ICMP(ICMPv4,ICMPv6),IGMP,IPsec,ARP,RARP,RIP等 -
传输层,TCP,UDP,TLS,SCTP,OSPF等 -
应用层,DHCP,DNS,FTP, HTTP,POP3 ,SMTP,SNMP ,SSH ,TELNET 等
三、UDP编程
-
UDP协议,是用户数据报协议,是一种无连接的传输层协议,不需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域。 -
//客户端代码
package udp
import (
"fmt"
"net"
"strings"
"time"
)
type Service struct {
Name string
Addr string
Port uint32
Timeout time.Duration
}
/**
*/
func NewService(name, addr string, port uint32, timeout time.Duration) iudp.IService {
if strings.TrimSpace(name) == "" {
name = "udp spa service"
}
ip, err := net.LookupIP(addr)
if err != nil {
log.Fatalf("spa address format err:%s", err.Error())
}
return &Service{
Name: name,
Addr: ip[0].String(),
Port: port,
Timeout: timeout,
}
}
/**
udp request
*/
func (s *Service) Request(message iudp.IMessage) (iudp.IMessage, error) {
address := fmt.Sprintf("%s:%d", s.Addr, s.Port)
conn, err := net.DialTimeout("udp", address, s.Timeout*time.Second)
if err != nil {
log.Errorf("udp dial err:%s", err)
return nil, err
}
defer conn.Close()
dp := NewDataPack()
data, err := dp.Pack(message)
if err != nil {
log.Errorf("udp request data pack err:%s", err)
return nil, err
}
conn.SetDeadline(time.Now().Add(s.Timeout * time.Second))
if _, err := conn.Write(data); err != nil {
log.Errorf("udp write to serve err:%s", err)
return nil, err
}
buf := make([]byte, 1<<10)
if _, err := conn.Read(buf); err != nil {
log.Errorf("udp read from serve err:%s", err)
return nil, err
}
msg, err := dp.Unpack(buf)
if err != nil {
log.Errorf("udp response data unpack err:%s", err)
return nil, err
}
return msg, nil
}
func (s *Service) Stop() {
log.Printf("udp client stopped.")
}
-
//服务端代码
package udp
import (
"fmt"
"math"
"net"
"strings"
"time"
)
const (
StatusStart = 1 // 启动动中
StatusRun = 2 // 运行中
StatusStop = 3 // 已关闭
)
type Service struct {
Name string
Addr string
Port uint32
Timeout time.Duration
Status byte //1:启动,2:运行,3:关闭
UdpConn *net.UDPConn
//Router udp.IRouter
msgHandle iudp.IMsgHandle
ExitChan chan bool
MaxWorkerTaskLen uint32 // 队列最大任务数
WorkerPoolSize uint32 //业务工作Worker池的数量
TaskQueue []chan iudp.IRequest //Worker负责取任务的消息队列
}
/**
udp server 初始化
*/
func NewServer(name, addr string, port uint32, timeout time.Duration) iudp.IServer {
if strings.TrimSpace(name) == "" {
name = "SPA Serve"
}
poolSize := config.Cfg.Spa.WorkPoolSize
if poolSize <= 0 {
poolSize = 10
}
taskLen := config.Cfg.Spa.WorkTaskLen
if taskLen <= 0 {
taskLen = 1024
}
return &Service{
Name: name,
Addr: addr,
Port: port,
Timeout: timeout,
Status: StatusStart,
UdpConn: nil,
ExitChan: make(chan bool),
msgHandle: NewMsgHandle(),
MaxWorkerTaskLen: taskLen,
WorkerPoolSize: poolSize,
TaskQueue: make([]chan iudp.IRequest, taskLen),
}
}
//将消息交给TaskQueue,由worker进行处理
func (s *Service) sendMsgToTaskQueue(request iudp.IRequest) {
//根据requestId来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetId() % s.WorkerPoolSize
log.Printf("Add Request id:%d to worker id:%d", request.GetId(), workerID)
//将请求消息发送给任务队列
s.TaskQueue[workerID] <- request
}
//启动服务
func (s *Service) Start() {
address := fmt.Sprintf("%s:%d", s.Addr, s.Port)
log.Printf("[start] %s :%s.", s.Name, address)
var addr *net.UDPAddr
var err error
if addr, err = net.ResolveUDPAddr("udp", address); err != nil {
log.Fatalf("udp addr resolve err:%s", err)
}
s.UdpConn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("udp spa server listen err:%s", err)
}
go s.startReader()
s.Status = StatusRun
}
//启动业务服务
func (s *Service) startReader() {
var reqId uint32
data := pool.BufferSmallPool.Get().([]byte) //make([]byte, 1<<16)
defer pool.BufferSmallPool.Put(data)
for {
n, cAddr, err := s.UdpConn.ReadFromUDP(data)
if err != nil {
if s.Status == StatusStop {
return
}
log.Errorf("reader from udp err:%s", err)
continue
}
reqId++
if reqId >= math.MaxInt32 {
reqId = 1
}
log.Printf("udp server reader:%s,%d", string(data[:n]), len(data[:n]))
dp := NewDataPack()
msg, err := dp.Unpack(data[:n])
if err != nil {
log.Errorf("client:%s udp data of unpack err:%s", cAddr.String(), err)
continue
}
req := NewRequest(reqId, s.UdpConn, cAddr, msg)
//go s.msgHandle.DoMsgHandler(req)
s.sendMsgToTaskQueue(req)
}
}
//启动一个Worker工作流程
func (s *Service) startOneWorker(workerID int, taskQueue chan iudp.IRequest) {
//log.Printf("%s Worker ID:%d is started.", s.Name, workerID)
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
s.msgHandle.DoMsgHandler(request)
case <-s.ExitChan:
//log.Printf("%s service stopped", s.Name)
return
}
}
}
//启动worker工作池
func (s *Service) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
go func() {
for i := 0; i < int(s.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
s.TaskQueue[i] = make(chan iudp.IRequest, s.MaxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go s.startOneWorker(i, s.TaskQueue[i])
}
}()
}
//启动业务服务
func (s *Service) Serve() {
s.StartWorkerPool()
s.Start()
}
//停止服务
func (s *Service) Stop() {
if s.Status == StatusStop {
return
}
s.Status = StatusStop
//通知读业务,此连接已关闭
s.ExitChan <- true
// 关闭通道
close(s.ExitChan)
//关闭连接
s.UdpConn.Close()
log.Printf("%s stopped.", s.Name)
}
//添加路由处理
func (s *Service) AddRouter(msgId uint32, router iudp.IRouter) {
//s.Router = router
s.msgHandle.AddRouter(msgId, router)
}
func (s *Service) AddPEP(pep ipolicyengine.IService) {
s.msgHandle.AddPEP(pep)
}
四、TCP编程
-
TCP 即传输控制协议,是一种面向连接(连接导向)的、可靠的、基于字节流的传输层(Transport layer)通信协议,因为是面向连接的协议,数据像水流一样传输,会存在黏包问题 -
//客户端代码
package tls
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"net"
"sync"
"time"
)
type TLServe struct {
Name string
IP string
Port uint32
Timeout time.Duration
}
type Connection struct {
//1. tls 服务器参数
TLServe
Conn *tls.Conn
//4.读channel
readChan map[uint32]chan itls.IResponse
//5. conn 是否已关闭
isClosed bool
isConnected chan bool
//7.服务是否退出
isExit bool
exitChan chan bool
l sync.Mutex
}
/**
*/
func NewConnection(name, ip string, port uint32, timeout time.Duration) itls.IConnection {
return &Connection{
TLServe: TLServe{Name: name, IP: ip, Port: port, Timeout: timeout},
Conn: nil,
readChan: make(map[uint32]chan itls.IResponse),
spa: nil,
isClosed: true,
isConnected: make(chan bool, 1),
isExit: false,
exitChan: make(chan bool, 1),
}
}
/**
tls 连接
需要加互斥锁 ,防止其他应用请求进来 重复处理 用户数据同步
*/
func (c *Connection) connect(message itls.IMessage) error {
c.l.Lock()
defer c.l.Unlock()
if c.isClosed == false {
return nil
}
var tryCnt int = 3
SPAStart:
if err := c.knock(); err != nil {
time.Sleep(time.Second * time.Duration(3-tryCnt))
if tryCnt > 0 {
tryCnt--
goto SPAStart
}
log.Errorf("spa has knock 3 times but failed,please check network situation:%s", err)
return err
}
address := fmt.Sprintf("%s:%d", c.IP, c.Port)
var err error
tryCnt = 3
TLSStart:
c.Conn, err = tls.Dial("tcp", address, base.ClientTLSConfig())
if err != nil {
time.Sleep(time.Millisecond * 500 * time.Duration(3-tryCnt))
if tryCnt > 0 {
tryCnt--
goto TLSStart
}
log.Errorf("client tls dial failed:%s", err)
return err
}
c.isConnected <- true
if c.GetHasLogin() && message.GetMsgId() != TypeControllerLogin {
data, err := c.userInfoPack(c)
if err != nil {
log.Errorf("client tls data pack failed:%s", err)
return err
}
typ := uint32(TypeControllerRegion)
msg := NewMessage(typ, data)
c.AddRouter(typ)
wRes := c.write(msg)
if wRes.GetCode() != 200 {
return fmt.Errorf(wRes.GetMsg())
}
res := <-c.readChan[msg.GetMsgId()]
if res.GetCode() != 200 {
log.Errorf("client tls user info sync failed:%s", res.GetMsg())
return fmt.Errorf(res.GetMsg())
}
c.SetLogin(true)
}
c.isClosed = false
log.Debug("tls tunnel:%s established.", c.Conn.RemoteAddr())
return nil
}
/**
*/
func (c *Connection) GetName() string {
return c.Name
}
/**
路由添加
*/
func (c *Connection) AddRouter(typ uint32) {
if _, ok := c.readChan[typ]; ok {
return
}
c.readChan[typ] = make(chan itls.IResponse)
}
/**
客户端请求流量
*/
func (c *Connection) write(message itls.IMessage) itls.IResponse {
dp := NewDataPack()
reqMsg, err := dp.Pack(message)
if err != nil {
return c.responseBad(fmt.Sprintf("pack message id:%d pack err:%s", message.GetMsgId(), err), message)
}
if err := c.Conn.SetWriteDeadline(time.Now().Add(time.Second * 5)); err != nil {
log.Errorf("client tls set write dead line err:%s", err)
return c.responseBad(fmt.Sprintf("message id:%d set write dead line err:%s", message.GetMsgId(), err), message)
}
if _, err := c.Conn.Write(reqMsg); err != nil {
log.Errorf("socket writing data err:%s", err)
c.disconnect()
return c.responseBad(fmt.Sprintf("message id:%d write to client err:%s", message.GetMsgId(), err), message)
}
return c.responseOk(message)
}
/**
服务端响应流量
*/
func (c *Connection) read() itls.IResponse {
//bufConn := bufio.NewReader(c.Conn)
dp := NewDataPack()
headBuf := pool.BufferDelimiterPool.Get().([]byte)
defer pool.BufferDelimiterPool.Put(headBuf)
//headBuf := make([]byte, dp.GetHeadLen())
//_, err := bufConn.Read(headBuf)
_, err := c.Conn.Read(headBuf)
if err != nil {
log.Errorf("socket read head err:%s", err)
c.disconnect()
return c.responseBad(err.Error(), nil)
}
msg, err := dp.Unpack(headBuf)
if err != nil {
return c.responseBad(fmt.Sprintf("conn socket header unpack err:%s ", err), nil)
}
var body []byte
if msg.GetDataLen() > 0 { //拿到数据部分 读取字节长度
bufConn := bufio.NewReaderSize(c.Conn, int(msg.GetDataLen()))
body = make([]byte, msg.GetDataLen())
n, err := bufConn.Read(body)
if err != nil {
log.Errorf("socket read body err:%s", err)
c.disconnect()
return c.responseBad(err.Error(), nil)
}
if uint32(n) < msg.GetDataLen() {
a := n
y := 0
for i := 0; i < 10000; i++ {
y, err = bufConn.Read(body[a:])
if err != nil {
log.Errorf("socket read body err:%s", err)
c.disconnect()
return c.responseBad(err.Error(), nil)
}
a = a + y
if msg.GetDataLen() <= uint32(a) {
break
}
}
}
}
//拿到数据部分
msg.SetData(body)
return c.responseOk(msg)
}
/**
客户端请求
考虑互斥锁
*/
func (c *Connection) Request(message itls.IMessage) (response itls.IResponse) {
//1.判断隧道是否可用
if !c.Available() {
if err := c.connect(message); err != nil {
return c.response(408, "网络请求超时!", message)
}
}
//2.
wRes := c.write(message)
if wRes.GetCode() != 200 {
return c.responseBad(fmt.Sprintf("connect controller err:%s", wRes.GetMsg()), message)
}
timeout, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
select {
//2.读取消息头
case r := <-c.readChan[message.GetMsgId()]:
return r
case <-timeout.Done():
return c.response(408, "网络请求超时", message)
}
}
/**
客户端响应
*/
func (c *Connection) responseOk(data itls.IMessage) (response itls.IResponse) {
res := &Response{}
res.SetCode(200)
res.SetMsg("ok")
if data != nil {
res.SetData(data.GetMsgId(), data.GetData())
}
return res
}
/**
客户端响应
*/
func (c *Connection) responseBad(message string, data itls.IMessage) (response itls.IResponse) {
res := &Response{}
res.SetCode(500)
res.SetMsg(message)
if data != nil {
res.SetData(data.GetMsgId(), data.GetData())
}
return res
}
/**
综合性
*/
func (c *Connection) response(code int, message string, data itls.IMessage) (response itls.IResponse) {
res := &Response{}
res.SetCode(code)
res.SetMsg(message)
if data != nil {
res.SetData(data.GetMsgId(), data.GetData())
}
return res
}
/**
监控读线程
*/
func (c *Connection) StartReader() {
//log.Printf("tls connection started.")
for {
select {
case connected := <-c.isConnected:
for {
//1.连接掉线或未启动 退出
if connected == true {
response := c.read()
if response.GetCode() != 200 {
break
}
c.readChan[response.GetMsgId()] <- response
continue
}
if connected == false {
//等待 新连接 到来
log.Printf("waiting new conn ")
break
}
}
case <-c.exitChan:
log.Printf("socket reader service stop.")
return
}
}
}
//1. 启动连接,让当前连接开始工作
func (c *Connection) Start() {
go c.StartReader()
}
func (c *Connection) Logout() bool {
return false
}
/**
主动断开连接
*/
func (c *Connection) disconnect() {
//1.当前连接已关闭
if c.isClosed == true {
return
}
c.isClosed = true
c.isConnected <- false
//关闭连接
if err := c.Conn.Close(); err != nil {
log.Errorf("conn shutdown err:%s", err)
return
}
c.Conn = nil
log.Printf("conn socket closed.")
}
//2.服务终止
func (c *Connection) Stop() {
//1.当前连接已关闭
if c.isExit == true {
return
}
c.isExit = true
//关闭连接
c.disconnect()
//通知读业务,此连接已关闭
c.exitChan <- true
// 关闭通道
close(c.isConnected)
close(c.exitChan)
}
//3.获取当前连接
func (c *Connection) GetConn() net.Conn {
return c.Conn
}
//5.获取客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
-
//服务端代码
package tls
import (
"bufio"
"errors"
"fmt"
"net"
"time"
)
type Connection struct {
//1.当前连接的socket套接字
Conn net.Conn
//2.当前连接的sessionID,全局唯一
ConnID uint32
//3.当前连接状态
isClosed bool
//4.连接关联的处理方法 :废弃
MsgHandler itls.IMsgHandle
//5.策略执行单元
PEP ipolicyengine.IService
//6.连接停止channel
ExitChan chan bool
//7.无缓冲通道,用于读写俩个goroutine 信息同步
msgChan chan []byte
stopCall imonitor.ICallback
}
/**
*/
func NewConnection(conn net.Conn, connID uint32, handle itls.IMsgHandle, pep ipolicyengine.IService, callback imonitor.ICallback) *Connection {
return &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
//Router: router,
MsgHandler: handle,
PEP: pep,
ExitChan: make(chan bool, 1),
msgChan: make(chan []byte, 10),
stopCall: callback,
}
}
// 策略执行单元
func (c *Connection) GetPEP() ipolicyengine.IService {
return c.PEP
}
//启动read引擎
func (c *Connection) StartReader() {
//log.Printf("Reader Goroutine:%d is running", c.ConnID)
//defer log.Printf("client:%s reader connId:%d goroutine exit.", c.RemoteAddr().String(), c.ConnID)
bufConn := bufio.NewReader(c.Conn)
for {
select {
case <-c.ExitChan:
return
default:
//1.创建拆包解包对象
dp := &DataPack{}
//2.读取消息头
headBuf := make([]byte, dp.GetHeadLen())
_, err := bufConn.Read(headBuf)
//_, err := io.ReadFull(c.Conn, headBuf)
if err != nil {
log.Errorf("conn socket read head err:%s ", err)
c.Stop()
return
}
//对message 结构
msg, err := dp.Unpack(headBuf)
if err != nil {
log.Errorf("conn socket header unpack err:%s ", err)
c.Stop()
return
}
var data []byte
if msg.GetDataLen() > 0 { //拿到数据部分 读取字节长度
data = make([]byte, msg.GetDataLen())
_, err := bufConn.Read(data)
//_, err := io.ReadFull(c.Conn, data)
if err != nil {
log.Errorf("conn socket read body data err:%s ", err)
c.Stop()
return
}
}
//拿到数据部分
msg.SetData(data)
req := Request{
connection: c,
msg: msg,
}
//把客户端流量数据转化成请求
c.MsgHandler.SendMsgToTaskQueue(&req)
}
}
}
//启动写引擎
func (c *Connection) StartWriter() {
for {
select {
case data := <-c.msgChan:
if _, err := c.Conn.Write(data); err != nil {
log.Errorf("send to client err:%s", err)
c.Stop()
time.Sleep(time.Second)
return
}
case <-c.ExitChan:
return
}
}
}
//1. 启动服务
func (c *Connection) Start() {
defer log.Printf("client:%s connection socket exit.", c.RemoteAddr().String())
// 读 goroutine
go c.StartReader()
//写 goroutine
go c.StartWriter()
select {
case <-c.ExitChan: //进程退出
return
}
}
//2.终止服务
func (c *Connection) Stop() {
//1.当前连接已关闭
if c.isClosed == true {
return
}
c.isClosed = true
//执行回调函数
c.stopCall.Call(c)
//通知读业务,此连接已关闭
c.ExitChan <- true
// 关闭通道
close(c.ExitChan)
//关闭连接
c.Conn.Close()
}
//3.获取当前连接
func (c *Connection) GetConn() net.Conn {
return c.Conn
}
//4.获取当前连接id
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
//5.获取客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//服务端消息写回客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("conn closed when send msg ")
}
//1.data []byte 封装
dp := NewDataPack()
msg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
c.Stop()
return errors.New(fmt.Sprintf("pack message id:%d pack err:%s", msgId, err))
}
c.msgChan <- msg
return nil
}
func (c *Connection) GetLogin() bool {
return c.isLogin
}
func (c *Connection) SetLogin(f bool) {
c.isLogin = f
}
func (c *Connection) GetMsgAuth() bool {
return c.isMsgAuth
}
func (c *Connection) SetMsgAuth(f bool) {
c.isMsgAuth = f
}
/**
账号登录&短信认证同时具备 才可同步应用到客户端
*/
func (c *Connection) Available() bool {
if c.isLogin && c.isMsgAuth {
return true
}
return false
}
|