Public/Subscribe (发布/订阅)模式, 简写 Pub/Sub 给 subscribe 发布消息的 publisher (发布者/发布方) 可能位于不同的节点上面 Pub/Sub 有两种形式
- 端对端
- 通过
message broker 来通信
publisher 不需要提前直到谁会接受它所发出的消息, 消息需要由 subscriber 根据自己的兴趣去订阅
Pub/Sub 模式的双发是松耦合 的, 使用打造持续进化的分布式系统 , 如果使用了 broker , 耦合程序可以继续降低, 因为 subscriber 只需要和 broker 互动就行, broker 本身还支持消息队列, 通信不畅的情况下将消息保存起来, 保证传输的可靠性
极简即时聊天程序
服务器实现
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new ws.Server({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(msg)
})
})
function broadcast (msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(process.argv[2] || 8080)
www/index.html 文件的实现
<body>
Messages:
<div id="messages"></div>
<form id="msgForm">
<input type="text" placeholder="Send a message" id="msgBox" />
<input type="submit" value="Send" />
</form>
<script>
const ws = new WebSocket(
`ws://127.0.0.1:8080`
)
ws.onmessage = function (message) {
const msgDiv = document.createElement('div')
msgDiv.innerHTML = message.data
document.getElementById('messages').appendChild(msgDiv)
}
const form = document.getElementById('msgForm')
form.addEventListener('submit', (event) => {
event.preventDefault()
const message = document.getElementById('msgBox').value
ws.send(message)
document.getElementById('msgBox').value = ''
})
</script>
</body>
如下, 访问相同端口的多个浏览器页面, 显示相同的内容
以 redis 充当 message broker(消息中介) - 实现不同端口实时聊天
如下图, 服务器会将从客户端收到的消息 发布到 Redis broker 上, broker 会将消息派发给所有订阅者 Redis for Windows 5.0.14.1 百度网盘链接 提取码:g2ki
下载完成后打开 redis-server.exe 即可
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis'
const redisSub = new Redis()
const redisPub = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new ws.Server({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisPub.publish('chat_messages', msg)
})
})
redisSub.subscribe('chat_messages')
redisSub.on('message', (channel, msg) => {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
})
server.listen(process.argv[2] || 8080)
开启三个服务器
node index.js 8080
node index.js 8081
node index.js 8082
访问其中一个网站, 其他网站数据也得到更新
紫色代码注释写错了: client.send 应该是广播给连接了 websocket 的所有的客户端
ZeroMQ 实现端对端的 Publish/Subscribe
message broker 可能产生过高的延迟 , 或不允许系统里面出现单点故障 等, 这种情况不适合 broker
如果项目很适合蝉蛹端对端 的架构, 应该考虑的开发工具, 应该是ZeroMQ
ZeroMQ 是一个网络库 , 可构建多种消息传递模式, ZeroMQ 库位于底层, 速度快
利用 ZeroMQ 提供的 publish/subscribe socket, 可实现端对端的通信
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import yargs from 'yargs'
import zmq from 'zeromq'
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
let pubSocket
async function initializeSockets () {
pubSocket = new zmq.Publisher()
await pubSocket.bind(`tcp://127.0.0.1:${yargs.argv.pub}`)
const subSocket = new zmq.Subscriber()
const subPorts = [].concat(yargs.argv.sub)
for (const port of subPorts) {
console.log(`Subscribing to ${port}`)
subSocket.connect(`tcp://127.0.0.1:${port}`)
}
subSocket.subscribe('chat')
for await (const [msg] of subSocket) {
console.log(`Message from another server: ${msg}`)
broadcast(msg.toString().split(' ').slice(1).join(' '))
}
}
initializeSockets()
const wss = new ws.Server({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(msg)
pubSocket.send(`chat ${msg}`)
})
})
function broadcast (msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(yargs.argv.http || 8080)
node index.js --http 8080 --pub 5000 --sub 5001 --sub 5002
node index.js --http 8081 --pub 5001 --sub 5000 --sub 5002
node index.js --http 8082 --pub 5002 --sub 5000 --sub 5001
AMQP 高级消息队列协议
消息队列的使用场景是怎样的? 1?? 解耦 2?? 提速 3??广播 4??削峰
AMQP (advanced message queuing protocol 高级消息队列协议) 是个标准的开源协议 , 许多消息队列系统都支持该协议, 其定义了一套常用的通信协议
AMQP 协议的各种组件
- exchange (交换点) 处理信息
- binding (绑定点) 筛选从
exchange 发来的信息 - Queue (队列) 存储需要由客户端消费的信息
上面几种组件都有 broker 管理, 并会公布一套创建并操作这些组件的API
客户端连接 broker 的时候, 会创建一条 channel channel : 对 connection 所作的抽象, 负责维持客户端 与 broker 之间的通信状态
🌰? 给小型聊天应用程序添加一项 history 服务 架构如下 使用一个模式为 Fanout (散播信息) 的 Exchange : channel.assertExchange('chat', 'fanout') , 之后将队列绑定到 Exchange 上 channel.bindQueue(queue, 'chat')
消费者: 采用 AMQP 协议使用信息, 负责捕获聊天信息, 并保存到本地数据库中
import { createServer } from 'http'
import level from 'level'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
async function main () {
const db = level('./msgHistory')
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = channel.assertQueue('chat_history')
await channel.bindQueue(queue, 'chat')
channel.consume(queue, async msg => {
const content = msg.content.toString()
console.log(`Saving message: ${content}`)
await db.put(timestamp(), content)
channel.ack(msg)
})
createServer((req, res) => {
res.writeHead(200)
db.createValueStream()
.pipe(JSONStream.stringify())
.pipe(res)
}).listen(8090)
}
main().catch(err => console.error(err))
服务器: 将聊天记录公布给客户端
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'
const httpPort = process.argv[2] || 8080
async function main () {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const { queue } = await channel.assertQueue(
`chat_srv_${httpPort}`,
{ exclusive: true }
)
await channel.bindQueue(queue, 'chat')
channel.consume(queue, msg => {
msg = msg.content.toString()
console.log(`From queue: ${msg}`)
broadcast(msg)
}, { noAck: true })
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new ws.Server({ server })
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
channel.publish('chat', '', Buffer.from(msg))
})
superagent
.get('http://localhost:8090')
.on('error', err => console.error(err))
.pipe(JSONStream.parse('*'))
.on('data', msg => client.send(msg))
})
function broadcast (msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(httpPort)
}
main().catch(err => console.error(err))
安装 erlang , RabbitMQ erlang 官网下载地址 RabbitMQ 官网下载地址 level 6.0.1
查看 leveldb 全部数据
import level from 'level'
const db = level('./msgHistory')
db.createReadStream()
.on('data', data => {
console.log(`${data.key} = ${data.value}`)
})
使用 Redis 流实现聊天应用程序
除了消息队列 , 流 也可传递消息 如下情况可使用流 的方案
如日志 , 消息更接近于记录 , 另外, 记录是由消费者拉去的, 使得消费者能够按照自己的接受处理记录
使用 Reids 流来运做的服务器代码
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis'
const redisClient = new Redis()
const redisClientXRead = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, { public: 'www' })
})
const wss = new ws.Server({ server })
wss.on('connection', async client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisClient.xadd('chat_stream', '*', 'message', msg)
})
const logs = await redisClient.xrange(
'chat_stream', '-', '+')
for (const [, [, message]] of logs) {
client.send(message)
}
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
let lastRecordId = '$'
async function processStreamMessages() {
while (true) {
const [[, records]] = await redisClientXRead.xread(
'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
for (const [recordId, [, message]] of records) {
console.log(`Message from stream: ${message}`)
broadcast(message)
lastRecordId = recordId
}
}
}
processStreamMessages().catch(err => console.error(err))
server.listen(process.argv[2] || 8080)
根据流的名称chat_stream :
- 初始启动服务器, 通过
redisClient.xrange 读取流中早前记录, - websocket 收到客户端发来的消息时, 通过
redisClient.xadd 给流的尾部添加新纪录 => redisClientXRead.xread : 定义流里面出现新的记录的函数 因为数据是保存在流中的, 刷新浏览器, 或重启服务器, 数据不会消失
|