subscribe 订阅模式封装
Redis 里的订阅/发布命令
命令 | 用例和描述 |
---|
subscribe | subscribe channel [channel …] 订阅一个或多个频道 | unsubscribe | unsubscribe [channel [channel …]] 退订频道,如果没有指定频道,则退订所有的频道 | publish | publish channel message 给指定的频道发消息 | psubscribe | psubscribe pattern [pattern …] 订阅给定模式相匹配的所有频道 | punsubscribe | punsubscribe [pattern [pattern …]] 退订给定的模式,如果没有指定模式,则退订所有模式 |
废话不多说直接上代码
let EventEmitter = require('events');
let util = require('util');
const ioredis = require("ioredis");
let RedisSub = function (redisConfig ){
EventEmitter.call(this);
this.LOGGER = false;
this.redisC = new ioredis({
port: redisConfig.port,
host: redisConfig.host,
password: redisConfig.password,
db: redisConfig.db || 0,
family: 4,
});
this.redisC.on("subscribe", (channel, count) => {
this.LOGGER && console.info("client subscribed to " + channel + "," + count + " total subscriptions");
});
this.redisC.on("message", (channel, message) => {
this.LOGGER && console.log(`Received ${message} from ${channel}`);
this.emit(channel, message);
});
this.redisC.on("unsubscribe", (channel, count) => {
this.LOGGER && console.info(`client subscribed unsubscribed from ${channel}, ${count} subscriptions`)
});
}
util.inherits(RedisSub, EventEmitter);
RedisSub.prototype.name = '__redisSubscribeClient__';
RedisSub.prototype.subscribe = function (...channels) {
let self = this;
channels.forEach( function (channel) {
self.redisC.subscribe(`${channel}`, function (err, count) {
if (err) {
console.error("Failed to channel: %s subscribe: %s", channel, err.message);
} else {
self.LOGGER && console.log(`Subscribed channel ${channel} successfully!`);
self.LOGGER && console.log(`This client is currently subscribed to ${count} total channels.`);
}
});
});
}
RedisSub.prototype.unsubscribe = function (...channels) {
let self = this;
_.forEach(channels, function (channel){
self.LOGGER && console.info("subscribe channel: %j", channel);
self.redisC.unsubscribe(`${channel}`);
});
}
module.exports.init = function (redisConfig ) {
return new RedisSub(redisConfig );
};
使用方式
测试用代码
let ioredis = require('ioredis');
let RedisSub = require('./redisSub');
let opts = {
host:'127.0.0.1',
port: 6379,
password: ''
}
let subC = RedisSub.init(opts);
subC.LOGGER=true;
subC.subscribe("test");
subC.on("test", function (msg) {
console.log("==> ", msg)
});
let redisC = new ioredis({
port: opts.port,
host: opts.host,
password: opts.password,
db: opts.db || 0,
family: 4,
});
setInterval(()=>{
redisC.publish("test", "aaaa")
}, 1000);
后记
功能都很简单,没什么特别难的东西,代码弄到本地自己跑了多试试,就理解了,所有都是官方文档里面的,这里也只是进行了功能整合,将平时常用的功能,进行封装了。 具体可以根据项目需要进行扩展。
|