IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> JavaScript知识库 -> 4.3 worker_thread从postgres取数据并发 -> 正文阅读

[JavaScript知识库]4.3 worker_thread从postgres取数据并发

1、数据库配置

config/config.js

exports.config = {
    options:{
        host: 'postgres-testdata.lan',
        port: 5432,
        user: 'kear',
        database: 'test',
        password: '123123',
    }
}

2、数据库连接并返回数据

api:https://node-postgres.com/api/pool/

utils/connect_database.js

const {config} = require('../config/config')
const { Pool } = require('pg')
const pool = new Pool(config.options)

/**
 * @description - This function is used to connect to the database
 * @param index - 需要单次返回的数据条数
 * @param offset - 数据偏移量
 * @returns {Promise<object>}
 */
function getData(index,offset){
    return new Promise((resolve,reject)=>{
        pool.connect( (err, client, release) => {
            if (err) throw err
            client.query(`SELECT * FROM "base-performance_1000w" offset ${offset} limit ${index}`,  (err, res) => {
                release();
                if (err) {
                    reject (err.stack)
                } else {
                    resolve (res.rows)
                }
            })
        })
    })
}
module.exports = getData

3、worker_thread/check/run.js

const {Worker} = require('worker_threads')
const getData = require('../../utils/connetct_database')
const thread_num = 100
const limit = 100//每次取数据条数
const large_limit = 10000000//大数据量取数据条数
const result_memory = new SharedArrayBuffer(32);
const result = new Int32Array(result_memory);
const memory_size = thread_num * 40000;
const share_memory = new SharedArrayBuffer(memory_size);
const thread_data = new Int16Array(share_memory);

async function performanceTest() {
    let result_data = {
        passTotal: 0,//断言通过的数量
        successTotal: 0,//请求成功数量
        count: 0,//重新取数据标记
        totalReq: 0,//总请求数
        maxData: 0,//最大数据量
        exceptionTotal: 0,//请求成功且断言正确的数量
        start_time: 0,//开始时间
        end_time: 0,//结束时间
    }
    for (let i = 0; i < 8; i++) {
        result[i] = 0
    }
    //根据线程数取数据
    for (let i = 0; i < thread_num; i++) {
        await getData(limit, limit * i).then(res => {
            //构造数据结构,绑定线程id
            let data = {
                id: i + 1,//线程id
                value: res
            }
            //将data转换成字符串
            let data_str = JSON.stringify(data);
            //将字符串转成unicode码并存储数据
            if (data_str.length >= (memory_size / 2)) {
                console.log('数据过大,超出内存空间');
            } else {
                for (let j = 0; j < data_str.length; j++) {
                    thread_data[i * (memory_size / (2 * thread_num)) + j] = data_str.charCodeAt(j);
                }
            }
        })
    }
    //创建工作线程
    for (let i = 0; i < thread_num; i++) {
        const worker = new Worker('./wechat_text.js');
        worker.postMessage({
            threadId: worker.threadId,
            data: thread_data,
            memory_size: memory_size,
            thread_num: thread_num,
            statistics: result,
        });
        worker.on('message', async (message) => {
            if (Atomics.load(result, 4) < large_limit) {
                let data = {
                    id: message.threadId,//线程id
                }
                let count = Atomics.load(result, 2);
                await getData(limit, (thread_num + count - 1) * limit).then(res => {
                    data.value = res
                })
                //将data转换成字符串
                let data_str = JSON.stringify(data);
                for (let j = (message.threadId - 1) * 20000; j < 20000 + (message.threadId - 1) * 20000; j++) {
                    // Atomics.store(thread_data, j, data_str.charCodeAt(j - (message.threadId-1) * 20000));
                    if (j < data_str.length) {
                        thread_data[j] = data_str.charCodeAt(j - (message.threadId - 1) * 20000);
                    } else {
                        thread_data[j] = "\0".charCodeAt(0);
                        break
                    }
                }
                worker.postMessage({
                    threadId: message.threadId,
                    data: thread_data,
                    memory_size: memory_size,
                    thread_num: thread_num,
                    statistics: result,
                })
            } else {
                worker.terminate().then(() => {
                })
            }
        })
    }
}

performanceTest().then(() => {
    console.log('正在启动')
}).catch(err => {
    console.log(err)
})
process.on('exit', () => {
    console.table({
        '运行时长': (Atomics.load(result, 7) - Atomics.load(result, 6)) / 1000 + "s",
        '工作线程': thread_num,
        '请求总数': Atomics.load(result, 3),
        '请求成功': Atomics.load(result, 1),
        '请求通过': Atomics.load(result, 0),
        '请求失败': Atomics.load(result, 5),
        '每秒并发': Math.floor(Atomics.load(result, 3) / ((Atomics.load(result, 7) - Atomics.load(result, 6)) / 1000))
    })
})

4、worker_thread/check/text.js

const {parentPort} = require('worker_threads');
const axios = require('axios');
let body = {
    id:"",
    data:{}
}

let config = {
    method: 'post',
    url: 'http://192.168.3.74:8002/api/inspect/v1/context',
    headers: {
        'Content-Type': 'application/json'
    },
    data: body,
    timeout: 10000
};
parentPort.on('message', async (message) => {
    let text = '';
    let tempId = '';
    for (let j = 0; j < message.thread_num; j++) {
        //i每次循环的初值分别是0,20000,40000等差数列,即每个线程存储数据所占用的40000byte,即20000个字符
        for (let i = j * message.memory_size / (2 * message.thread_num); i < (j + 1) * message.memory_size / (2 * message.thread_num); i++) {
            //存储结构中没有字符存入时,默认值是\0,如果遇到\0就跳出,说明已经达到字符串的末尾了,后续的字节就无需再取了
            if (String.fromCharCode(message.data[i]) === '\0') {
                break;
            } else {
                let temp_text = `"id":${message.threadId}`;
                //取前面10个字符,判断threadId是否相等,不相等则跳过,减少循环次数
                if (i === 9 && !new RegExp(temp_text).test(text)) {
                    text = '';
                    tempId = '';
                    break;
                } else {
                    tempId = message.threadId;
                    text += String.fromCharCode(message.data[i]);
                }
            }
        }
        if (text !== '') {
            break;
        }
    }
    if (text !== '') {
        let temp = JSON.parse(text);
        Atomics.add(message.statistics, 4, temp.value.length)
        for (let k of temp.value) {
            body['textContent'] = JSON.stringify(k);
            Atomics.add(message.statistics, 3, 1)
            if (Atomics.load(message.statistics, 6) === 0) {
                Atomics.store(message.statistics, 6, new Date().getTime())
            }
            console.log(Atomics.load(message.statistics, 3))
            await axios(config).then(res => {
                Atomics.add(message.statistics, 1, 1)
                if (res.data.data.result === "nopass") {
                    Atomics.add(message.statistics, 0, 1)
                }

            }).catch(err => {
                Atomics.add(message.statistics, 5, 1)
                console.log(err);
            })
        }
        Atomics.add(message.statistics, 2, 1)
        Atomics.store(message.statistics, 7, new Date().getTime())
        parentPort.postMessage({threadId: tempId, status: 0});

    }
});

5、run.js和text.js中的一些参数,可以挪到config.js里,方便维护

  JavaScript知识库 最新文章
ES6的相关知识点
react 函数式组件 & react其他一些总结
Vue基础超详细
前端JS也可以连点成线(Vue中运用 AntVG6)
Vue事件处理的基本使用
Vue后台项目的记录 (一)
前后端分离vue跨域,devServer配置proxy代理
TypeScript
初识vuex
vue项目安装包指令收集
上一篇文章      下一篇文章      查看所有文章
加:2022-05-13 11:40:51  更:2022-05-13 11:42:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/11 5:37:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码