1、数据库配置
config/config.js
exports.config = {
options:{
host: 'postgres-testdata.lan',
port: 5432,
user: 'kear',
database: 'test',
password: '123123',
}
}
2、数据库连接并返回数据
api:https://node-postgres.com/api/pool/
utils/connect_database.js
const {config} = require('../config/config')
const { Pool } = require('pg')
const pool = new Pool(config.options)
/**
* @description - This function is used to connect to the database
* @param index - 需要单次返回的数据条数
* @param offset - 数据偏移量
* @returns {Promise<object>}
*/
function getData(index,offset){
return new Promise((resolve,reject)=>{
pool.connect( (err, client, release) => {
if (err) throw err
client.query(`SELECT * FROM "base-performance_1000w" offset ${offset} limit ${index}`, (err, res) => {
release();
if (err) {
reject (err.stack)
} else {
resolve (res.rows)
}
})
})
})
}
module.exports = getData
3、worker_thread/check/run.js
const {Worker} = require('worker_threads')
const getData = require('../../utils/connetct_database')
const thread_num = 100
const limit = 100//每次取数据条数
const large_limit = 10000000//大数据量取数据条数
const result_memory = new SharedArrayBuffer(32);
const result = new Int32Array(result_memory);
const memory_size = thread_num * 40000;
const share_memory = new SharedArrayBuffer(memory_size);
const thread_data = new Int16Array(share_memory);
async function performanceTest() {
let result_data = {
passTotal: 0,//断言通过的数量
successTotal: 0,//请求成功数量
count: 0,//重新取数据标记
totalReq: 0,//总请求数
maxData: 0,//最大数据量
exceptionTotal: 0,//请求成功且断言正确的数量
start_time: 0,//开始时间
end_time: 0,//结束时间
}
for (let i = 0; i < 8; i++) {
result[i] = 0
}
//根据线程数取数据
for (let i = 0; i < thread_num; i++) {
await getData(limit, limit * i).then(res => {
//构造数据结构,绑定线程id
let data = {
id: i + 1,//线程id
value: res
}
//将data转换成字符串
let data_str = JSON.stringify(data);
//将字符串转成unicode码并存储数据
if (data_str.length >= (memory_size / 2)) {
console.log('数据过大,超出内存空间');
} else {
for (let j = 0; j < data_str.length; j++) {
thread_data[i * (memory_size / (2 * thread_num)) + j] = data_str.charCodeAt(j);
}
}
})
}
//创建工作线程
for (let i = 0; i < thread_num; i++) {
const worker = new Worker('./wechat_text.js');
worker.postMessage({
threadId: worker.threadId,
data: thread_data,
memory_size: memory_size,
thread_num: thread_num,
statistics: result,
});
worker.on('message', async (message) => {
if (Atomics.load(result, 4) < large_limit) {
let data = {
id: message.threadId,//线程id
}
let count = Atomics.load(result, 2);
await getData(limit, (thread_num + count - 1) * limit).then(res => {
data.value = res
})
//将data转换成字符串
let data_str = JSON.stringify(data);
for (let j = (message.threadId - 1) * 20000; j < 20000 + (message.threadId - 1) * 20000; j++) {
// Atomics.store(thread_data, j, data_str.charCodeAt(j - (message.threadId-1) * 20000));
if (j < data_str.length) {
thread_data[j] = data_str.charCodeAt(j - (message.threadId - 1) * 20000);
} else {
thread_data[j] = "\0".charCodeAt(0);
break
}
}
worker.postMessage({
threadId: message.threadId,
data: thread_data,
memory_size: memory_size,
thread_num: thread_num,
statistics: result,
})
} else {
worker.terminate().then(() => {
})
}
})
}
}
performanceTest().then(() => {
console.log('正在启动')
}).catch(err => {
console.log(err)
})
process.on('exit', () => {
console.table({
'运行时长': (Atomics.load(result, 7) - Atomics.load(result, 6)) / 1000 + "s",
'工作线程': thread_num,
'请求总数': Atomics.load(result, 3),
'请求成功': Atomics.load(result, 1),
'请求通过': Atomics.load(result, 0),
'请求失败': Atomics.load(result, 5),
'每秒并发': Math.floor(Atomics.load(result, 3) / ((Atomics.load(result, 7) - Atomics.load(result, 6)) / 1000))
})
})
4、worker_thread/check/text.js
const {parentPort} = require('worker_threads');
const axios = require('axios');
let body = {
id:"",
data:{}
}
let config = {
method: 'post',
url: 'http://192.168.3.74:8002/api/inspect/v1/context',
headers: {
'Content-Type': 'application/json'
},
data: body,
timeout: 10000
};
parentPort.on('message', async (message) => {
let text = '';
let tempId = '';
for (let j = 0; j < message.thread_num; j++) {
//i每次循环的初值分别是0,20000,40000等差数列,即每个线程存储数据所占用的40000byte,即20000个字符
for (let i = j * message.memory_size / (2 * message.thread_num); i < (j + 1) * message.memory_size / (2 * message.thread_num); i++) {
//存储结构中没有字符存入时,默认值是\0,如果遇到\0就跳出,说明已经达到字符串的末尾了,后续的字节就无需再取了
if (String.fromCharCode(message.data[i]) === '\0') {
break;
} else {
let temp_text = `"id":${message.threadId}`;
//取前面10个字符,判断threadId是否相等,不相等则跳过,减少循环次数
if (i === 9 && !new RegExp(temp_text).test(text)) {
text = '';
tempId = '';
break;
} else {
tempId = message.threadId;
text += String.fromCharCode(message.data[i]);
}
}
}
if (text !== '') {
break;
}
}
if (text !== '') {
let temp = JSON.parse(text);
Atomics.add(message.statistics, 4, temp.value.length)
for (let k of temp.value) {
body['textContent'] = JSON.stringify(k);
Atomics.add(message.statistics, 3, 1)
if (Atomics.load(message.statistics, 6) === 0) {
Atomics.store(message.statistics, 6, new Date().getTime())
}
console.log(Atomics.load(message.statistics, 3))
await axios(config).then(res => {
Atomics.add(message.statistics, 1, 1)
if (res.data.data.result === "nopass") {
Atomics.add(message.statistics, 0, 1)
}
}).catch(err => {
Atomics.add(message.statistics, 5, 1)
console.log(err);
})
}
Atomics.add(message.statistics, 2, 1)
Atomics.store(message.statistics, 7, new Date().getTime())
parentPort.postMessage({threadId: tempId, status: 0});
}
});
5、run.js和text.js中的一些参数,可以挪到config.js里,方便维护
|