IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Nodejs 中的发布订阅模式: 以小型聊天应用程序为例子? -> 正文阅读

[网络协议]Nodejs 中的发布订阅模式: 以小型聊天应用程序为例子?

Public/Subscribe (发布/订阅)模式, 简写 Pub/Sub
subscribe 发布消息的 publisher (发布者/发布方) 可能位于不同的节点上面
Pub/Sub 有两种形式

  1. 端对端
  2. 通过 message broker 来通信
    在这里插入图片描述

publisher 不需要提前直到谁会接受它所发出的消息, 消息需要由 subscriber 根据自己的兴趣去订阅

Pub/Sub模式的双发是松耦合的, 使用打造持续进化分布式系统, 如果使用了 broker, 耦合程序可以继续降低, 因为 subscriber 只需要和 broker 互动就行, broker本身还支持消息队列, 通信不畅的情况下将消息保存起来, 保证传输的可靠性

极简即时聊天程序

服务器实现

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'

const server = createServer((req, res) => {
  // 请求转发, 展示 www 目录下的文件内容 (public: 设置服务的子目录,  默认 index.html)
  return staticHandler(req, res, { public: 'www' })
})

// 将 websocket 服务器与 http 服务器相关联
const wss = new ws.Server({ server })
wss.on('connection', client => {
  console.log('Client connected')
  // websocket 收到客户端发来的消息
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // 将消息广播给(已经连接到 webSocket 服务器)的所有客户端
    broadcast(msg)
  })
})

function broadcast (msg) {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
}

server.listen(process.argv[2] || 8080)

www/index.html 文件的实现

<body>
  Messages:
  <div id="messages"></div>
  <form id="msgForm">
    <input type="text" placeholder="Send a message" id="msgBox" />
    <input type="submit" value="Send" />
  </form>
  <script>
    const ws = new WebSocket(
      `ws://127.0.0.1:8080`
    )
    // 接受广播的消息
    ws.onmessage = function (message) {
      const msgDiv = document.createElement('div')
      msgDiv.innerHTML = message.data
      document.getElementById('messages').appendChild(msgDiv)
    }
    const form = document.getElementById('msgForm')
    form.addEventListener('submit', (event) => {
      event.preventDefault()
      const message = document.getElementById('msgBox').value
      // 发送消息
      ws.send(message)
      document.getElementById('msgBox').value = ''
    })
  </script>
</body>

如下, 访问相同端口的多个浏览器页面, 显示相同的内容
在这里插入图片描述

以 redis 充当 message broker(消息中介) - 实现不同端口实时聊天

如下图, 服务器会将从客户端收到的消息发布到 Redis broker 上, broker 会将消息派发给所有订阅者
在这里插入图片描述
Redis for Windows 5.0.14.1
百度网盘链接 提取码:g2ki

下载完成后打开 redis-server.exe 即可

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis'

const redisSub = new Redis()
const redisPub = new Redis()

const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'www' })
})

const wss = new ws.Server({ server })
wss.on('connection', client => {
  console.log('Client connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // 将收到的消息发布到 chat_messages 通道上
    redisPub.publish('chat_messages', msg)
  })
})

redisSub.subscribe('chat_messages')
// 将发布到 chat_messages 通道上的消息, 广播给连接到当前(已经连接到 webSocket 服务器)的所有客户端
redisSub.on('message', (channel, msg) => {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
})

server.listen(process.argv[2] || 8080)

开启三个服务器

node index.js 8080
node index.js 8081
node index.js 8082

访问其中一个网站, 其他网站数据也得到更新

在这里插入图片描述

紫色代码注释写错了: client.send 应该是广播给连接了 websocket 的所有的客户端

ZeroMQ 实现端对端的 Publish/Subscribe

message broker 可能产生过高的延迟, 或不允许系统里面出现单点故障等, 这种情况不适合 broker

如果项目很适合蝉蛹端对端的架构, 应该考虑的开发工具, 应该是ZeroMQ

ZeroMQ 是一个网络库, 可构建多种消息传递模式, ZeroMQ库位于底层, 速度快

利用 ZeroMQ 提供的 publish/subscribe socket, 可实现端对端的通信

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
// 解析命令行参数
import yargs from 'yargs'
// ZeroMQ 客户端
import zmq from 'zeromq'

const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'www' })
})

let pubSocket
async function initializeSockets () {
  // 发布消息: --pub
  pubSocket = new zmq.Publisher()
  await pubSocket.bind(`tcp://127.0.0.1:${yargs.argv.pub}`)

  // 接受订阅消息: --sub
  const subSocket = new zmq.Subscriber()
  const subPorts = [].concat(yargs.argv.sub)
  // 逐个开始订阅
  for (const port of subPorts) {
    console.log(`Subscribing to ${port}`)
    subSocket.connect(`tcp://127.0.0.1:${port}`)
  }
  // 只关心 chat 开头的消息
  subSocket.subscribe('chat')

  for await (const [msg] of subSocket) {
    console.log(`Message from another server: ${msg}`)
    broadcast(msg.toString().split(' ').slice(1).join(' '))
  }
}

initializeSockets()

const wss = new ws.Server({ server })
wss.on('connection', client => {
  console.log('Client connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    broadcast(msg)
    // 发布给程序里其他服务器实例, 即有 --sub 的所有服务器
    pubSocket.send(`chat ${msg}`)
  })
})

function broadcast (msg) {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
}

server.listen(yargs.argv.http || 8080)
node index.js --http 8080 --pub 5000 --sub 5001 --sub 5002
node index.js --http 8081 --pub 5001 --sub 5000 --sub 5002
node index.js --http 8082 --pub 5002 --sub 5000 --sub 5001

在这里插入图片描述

AMQP 高级消息队列协议

消息队列的使用场景是怎样的?
1?? 解耦 2?? 提速 3??广播 4??削峰

AMQP (advanced message queuing protocol 高级消息队列协议) 是个标准的开源协议, 许多消息队列系统都支持该协议, 其定义了一套常用的通信协议

AMQP 协议的各种组件

  • exchange (交换点) 处理信息
  • binding (绑定点) 筛选从 exchange 发来的信息
  • Queue (队列) 存储需要由客户端消费的信息

上面几种组件都有 broker 管理, 并会公布一套创建并操作这些组件API

客户端连接 broker 的时候, 会创建一条 channel
channel : 对 connection 所作的抽象, 负责维持客户端broker 之间的通信状态
在这里插入图片描述

🌰? 给小型聊天应用程序添加一项 history 服务
架构如下
在这里插入图片描述
使用一个模式为 Fanout (散播信息)Exchange: channel.assertExchange('chat', 'fanout'), 之后将队列绑定到 Exchangechannel.bindQueue(queue, 'chat')

消费者: 采用 AMQP 协议使用信息, 负责捕获聊天信息, 并保存到本地数据库中

import { createServer } from 'http'
import level from 'level'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'

async function main () {
  const db = level('./msgHistory')

  // 建立连接, 采用 AMQP 协议的 broker 相连
  const connection = await amqp.connect('amqp://localhost')
  // 创建 channel , 类似 session 机制, 维护通信状态
  const channel = await connection.createChannel()
  // 设置名为 chat 的 exchange(交换点), assert 断言, 确保 broker 上面必定会有这样一个交换点
  await channel.assertExchange('chat', 'fanout')  // 发散式
  // 创建队列, 默认持久式
  const { queue } = channel.assertQueue('chat_history')
  // 将队列绑定到 exchange 上
  await channel.bindQueue(queue, 'chat')

  // 监听队列发过来的消息, 保存到 levelDB 数据库
  channel.consume(queue, async msg => {
    const content = msg.content.toString()
    console.log(`Saving message: ${content}`)
    // 时间戳当作 key, 可按时间排序
    await db.put(timestamp(), content)
    // 确认消息已经收到, 如果 broker 没有接到 ACK, 会把信息留在队列里面, 以便再度处理
    channel.ack(msg)
  })

  createServer((req, res) => {
    res.writeHead(200)
    db.createValueStream()
      .pipe(JSONStream.stringify())
      .pipe(res)
  }).listen(8090)
}

main().catch(err => console.error(err))

服务器: 将聊天记录公布给客户端

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'

const httpPort = process.argv[2] || 8080

async function main () {
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  await channel.assertExchange('chat', 'fanout')
  // 服务器并不需要采用持久订阅模式, exclusive: true 表明为独占式队列: 发完就忘
  const { queue } = await channel.assertQueue(
    `chat_srv_${httpPort}`,
    { exclusive: true }
  )
  await channel.bindQueue(queue, 'chat')
  // 监听队列发过来的消息, 同时, 不需要回传 ack 信号
  channel.consume(queue, msg => {
    msg = msg.content.toString()
    console.log(`From queue: ${msg}`)
    broadcast(msg)
  }, { noAck: true })

  // serve static files
  const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'www' })
  })

  const wss = new ws.Server({ server })
  wss.on('connection', client => {
    console.log('Client connected')

    client.on('message', msg => {
      console.log(`Message: ${msg}`)
      // 指定目标交换点的名称及路由键
      channel.publish('chat', '', Buffer.from(msg))
    })

    // 向 history 服务器发出查询请求, 将查到的每条历史信息尽快发送给连接本服务器的客户端
    superagent
      .get('http://localhost:8090')
      .on('error', err => console.error(err))
      .pipe(JSONStream.parse('*'))
      .on('data', msg => client.send(msg))
  })

  function broadcast (msg) {
    for (const client of wss.clients) {
      if (client.readyState === ws.OPEN) {
        client.send(msg)
      }
    }
  }

  server.listen(httpPort)
}

main().catch(err => console.error(err))

安装 erlang, RabbitMQ
erlang 官网下载地址
RabbitMQ 官网下载地址
level 6.0.1
在这里插入图片描述

查看 leveldb 全部数据

import level from 'level'
const db = level('./msgHistory')

db.createReadStream()
  .on('data', data => {
    console.log(`${data.key} = ${data.value}`)
  })

使用 Redis 流实现聊天应用程序

除了消息队列, 也可传递消息
如下情况可使用的方案

  • 序列式数据
  • 批量处理
  • 要在过去消息之间寻找联系

日志, 消息更接近于记录, 另外, 记录是由消费者拉去的, 使得消费者能够按照自己的接受处理记录

使用 Reids 流来运做的服务器代码

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis'

const redisClient = new Redis()
const redisClientXRead = new Redis()

const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'www' })
})

const wss = new ws.Server({ server })
wss.on('connection', async client => {
  console.log('Client connected')

  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // 给流的尾部添加新纪录
    // ① 流的名字
    // ② 记录ID: * 意味着让 redis 替我们生成单调递增的 ID
    // ③ 键值对构成的列表
    redisClient.xadd('chat_stream', '*', 'message', msg)
  })

  // 加载消息历史
  // 如何从流中查询早前记录, 以获取聊天历史: "-" 有可能出现的最小 ID, "+" 有可能出现的最大 ID
  const logs = await redisClient.xrange(
    'chat_stream', '-', '+')
  // 解构指令, 完整形式: for (const [recordId, [propertyId, message]] of logs) {...}
  for (const [, [, message]] of logs) {
    client.send(message)
  }
})

function broadcast(msg) {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
}

let lastRecordId = '$'

// 编写等待流里面出现新的记录的函数, 这样每个聊天程序都可通过该函数尽快读取到流里面出现的新消息
async function processStreamMessages() {
  while (true) {
    // ① BLICK: 调用阻塞在这里, 直到有新的消息到来
    // ② 超时返回 null 作结果, 0: 永远等待, 直到有新消息为止
    // ③ streams: 意味我们接下来将要指定流的细节
    // ④ 流的名称
    // ⑤ 想要从那一条记录后面读取新的消息, $: 整条流里面 ID 最高的那条记录, 即从上次读取记录之后开始, 每读到一条记录, 更新依次 lastRecordId 变量
    const [[, records]] = await redisClientXRead.xread(
      'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
    for (const [recordId, [, message]] of records) {
      console.log(`Message from stream: ${message}`)
      broadcast(message)
      lastRecordId = recordId
    }
  }
}

processStreamMessages().catch(err => console.error(err))

server.listen(process.argv[2] || 8080)

根据流的名称chat_stream:

  • 初始启动服务器, 通过redisClient.xrange 读取流中早前记录,
  • websocket 收到客户端发来的消息时, 通过redisClient.xadd给流的尾部添加新纪录 => redisClientXRead.xread : 定义流里面出现新的记录的函数
    在这里插入图片描述
    因为数据是保存在流中的, 刷新浏览器, 或重启服务器, 数据不会消失
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:40:35  更:2022-04-15 00:40:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/30 3:53:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码