初始化 queue(保存为 bullTest.js)
'use strict';

const Queue = require('bull');

// Redis server backing the queue: local instance, logical database 4, no password.
const redisOptions = {
  port: 6379,
  host: '127.0.0.1',
  db: 4,
  password: null,
};

// Options applied to every job unless overridden in the queue.add() call:
// a single attempt, no backoff, no delay, and completed jobs are removed.
const defaultJobOptions = {
  attempts: 1,
  removeOnComplete: true,
  backoff: false,
  delay: 0,
};

// Bull rate limiter: `max` jobs per `duration` (milliseconds, per the Bull reference).
const limiterOptions = {
  max: 20000,
  duration: 1000,
};

// Advanced Bull settings. NOTE(review): guardInterval is 1 here, far below
// Bull's documented default — presumably so delayed jobs are picked up as
// soon as they are due; confirm this is intended outside of a demo.
const advancedSettings = {
  maxStalledCount: 1,
  guardInterval: 1,
  retryProcessDelay: 500,
};

// Shared queue instance named 'nike'; Redis keys are prefixed with 'nike_'.
const queue = new Queue('nike', {
  redis: redisOptions,
  prefix: 'nike_',
  defaultJobOptions,
  limiter: limiterOptions,
  settings: advancedSettings,
});

module.exports = queue;
队列基本使用(保存为 test.js)
'use strict';

// Demo script: registers queue-wide listeners and a consumer on the shared
// queue, then produces seven delayed jobs.
const queue = require('./bullTest');
const random = require('random-string');
const log4js = require('log4js');

const logger = log4js.getLogger();
logger.level = 'info';

// Log progress reports for any job; the reported value is multiplied by 100
// before printing.
queue.on('global:progress', function (jobId, progress) {
  logger.info(`Job ${jobId} is ${progress * 100}% ready!`);
});

// Log the id of every job that completes.
queue.on('global:completed', jobId => {
  logger.info(`global:completed Job with id ${jobId} has been completed`);
});

// Consumer: print the job payload and immediately mark the job as done.
queue.process((job, done) => {
  console.log('消耗-----', job.data);
  done();
});

/**
 * Producer: adds seven jobs one after another, each carrying a random
 * 10-character key, and logs the job data together with queue.count().
 *
 * The delay is (3 - i)^2 * 1000 + 10000 ms, i.e. 19s, 14s, 11s, 10s, 11s,
 * 14s, 19s for i = 0..6, so jobs do not become due in insertion order.
 *
 * @returns {Promise<void>} resolves once all seven jobs have been added;
 *   rejects if queue.add() or queue.count() rejects.
 */
const main = async () => {
  for (let i = 0; i < 7; i++) {
    const job = await queue.add({
      key: random(10),
    }, {
      delay: (3 - i) * (3 - i) * 1000 + 10000,
    });
    logger.info('生产者:', job.data, await queue.count());
  }
};

// Do not leave the promise floating: a failed add()/count() would otherwise
// surface only as an unhandled promise rejection.
main().catch(err => {
  logger.error('producer failed:', err);
  process.exitCode = 1;
});
运行:node test.js
|