IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python多线程读写文件 -> 正文阅读

[Python知识库]python多线程读写文件

RT

#encoding:utf-8 
import numpy as np 
import sys
import time
import threading
from queue import Queue 


# Seed NumPy's global RNG with a fixed value.
np.random.seed(1)


# Capacities of the two bounded queues below. READ_BATCH_SIZE is also the
# number of None sentinels the producer emits at end of input, so it must be
# at least as large as the number of compute threads.
READ_BATCH_SIZE = 25
WRITE_BATCH_SIZE = 25


# Producer -> compute workers: (pin_list, pin_vec_list) batches; None = stop.
r_q = Queue(maxsize=READ_BATCH_SIZE)
# Compute workers -> writer: formatted text chunks; None = one worker is done.
w_q = Queue(maxsize=WRITE_BATCH_SIZE)



class ProducerThread(threading.Thread):
    """Reader thread that streams vectors from a text file into ``r_q``.

    A valid input line has exactly 101 comma-separated fields: an identifier
    (``pin``) followed by 100 values that are parsed as float32. Lines with
    any other field count are skipped. Rows are grouped into batches of
    ``batch_size`` and put on the module-level ``r_q`` queue as
    ``(pin_list, pin_vec_list)`` tuples. When reading ends,
    ``READ_BATCH_SIZE`` ``None`` sentinels are queued so that each compute
    consumer can stop.

    Args:
        pin_cid3_vec_path: Path of the input file to read.
        batch_size: Number of valid rows per queued batch.
        name: Thread name.
    """

    def __init__(self, pin_cid3_vec_path, batch_size=100, name='read_producer'):
        super(ProducerThread, self).__init__()
        self.pin_cid3_vec_path = pin_cid3_vec_path
        self.batch_size = batch_size
        self.name = name

    def run(self):
        """Read the whole file, then always emit the termination sentinels."""
        try:
            self._read_batches()
        finally:
            # Sent even when reading fails (missing file, malformed number);
            # otherwise every consumer would block forever on r_q.get().
            for _ in range(READ_BATCH_SIZE):
                r_q.put(None)

    def _read_batches(self):
        """Parse the input file and enqueue it batch by batch."""
        pin_list = []
        pin_vec_list = []
        num = 0  # count of valid rows seen so far

        with open(self.pin_cid3_vec_path, 'r') as f:
            for line in f:
                buf = line.strip().split(',')
                if len(buf) != 101:
                    continue

                num += 1
                pin = buf[0]
                pin_vec = np.array(buf[1:], dtype=np.float32)
                pin_list.append(pin)
                pin_vec_list.append(pin_vec)

                if num % self.batch_size == 0:
                    # Blocks while r_q is full.
                    r_q.put((pin_list, pin_vec_list))
                    print('Read %d' % num)
                    # Start fresh lists; the queued ones now belong to the
                    # consumer that picks them up.
                    pin_list = []
                    pin_vec_list = []
                    time.sleep(0.1)

        # Flush the final, partially filled batch.
        if pin_list:
            r_q.put((pin_list, pin_vec_list))
            print('Read %d' % num)



class ComputeConsumerThread(threading.Thread):
    """Worker thread that scores each batch from ``r_q`` and ranks the result.

    For every ``(pin_list, pin_vec_list)`` batch taken from ``r_q`` the worker
    multiplies the stacked pin vectors by ``self.cid3_vec``, sorts each row of
    scores in descending order, and puts one text chunk on ``w_q``. Each line
    of the chunk is ``pin<TAB>i1,i2,...`` where the ``i`` values are the
    column indices of ``self.cid3_vec`` with the ``top_K`` highest scores.
    A ``None`` taken from ``r_q`` ends the loop. On exit the worker puts a
    single ``None`` on ``w_q``.

    Args:
        cid3_vec_path: Path passed to ``get_cid3_vec`` to load the matrix.
        top_K: Number of best-scoring indices to keep per pin (cast to int).
    """

    def __init__(self, cid3_vec_path, top_K):
        super(ComputeConsumerThread, self).__init__()

        self.cid3_vec = get_cid3_vec(cid3_vec_path)
        self.top_K = int(top_K)

    def run(self):
        """Consume batches until a ``None`` sentinel arrives."""
        try:
            while True:
                # Blocks until an item is available.
                queue_item = r_q.get()
                if queue_item is None:
                    break

                pin_list, pin_vec_list = queue_item
                cid3_ref = np.dot(np.array(pin_vec_list), self.cid3_vec)

                # Negate so argsort's ascending order yields highest first.
                sorted_idx = np.argsort(-cid3_ref, axis=1)

                out_lines = [
                    pin + '\t' + ','.join(row[:self.top_K].astype(str))
                    for pin, row in zip(pin_list, sorted_idx)
                ]

                w_q.put('\n'.join(out_lines) + '\n')
                time.sleep(0.1)
        finally:
            # Always tell the writer this worker is done, including when a
            # batch raises; otherwise the writer waits forever for the
            # missing sentinel.
            w_q.put(None)
        



class WriteConsumerThread(threading.Thread):
    """Writer thread that drains ``w_q`` into the output file.

    Every string taken from ``w_q`` is written verbatim. Each ``None`` counts
    as one finished compute worker; the thread stops once it has seen
    ``n_compute_threads`` of them.

    Args:
        output_path: File to create (or truncate) and write to.
        n_compute_threads: Number of ``None`` sentinels to wait for.
        thread_id: Stored on the instance; not used by ``run``.
    """

    def __init__(self, output_path, n_compute_threads, thread_id):
        super(WriteConsumerThread, self).__init__()
        self.output_path = output_path
        self.n_compute_threads = n_compute_threads
        self.thread_id = thread_id

    def run(self):
        """Write queued chunks until all compute workers have signalled."""
        finished_workers = 0
        write_count = 0

        # The context manager flushes and closes the file even if a write
        # raises part-way through.
        with open(self.output_path, 'w') as fw:
            while True:
                out_lines = w_q.get()

                if out_lines is None:
                    finished_workers += 1
                    if finished_workers >= self.n_compute_threads:
                        break
                    continue

                fw.write(out_lines)
                write_count += 1

                # Flush every tenth chunk so progress reaches disk.
                if write_count % 10 == 0:
                    fw.flush()
                print('Write')
                time.sleep(0.1)




def get_cid3_vec(cid3_vec_path):
    """Load the vector file and return its values as a transposed matrix.

    Each non-blank line is split on single spaces; the first field is
    discarded and the remaining fields are parsed as float32. Blank lines
    (for example a trailing empty line) are skipped instead of producing an
    empty row that would make the array ragged.

    Args:
        cid3_vec_path: Path of the space-separated vector file.

    Returns:
        A float32 array of shape ``(n_values_per_line, n_lines)``; that is,
        one column per input line.
    """
    cid3_vec = []

    with open(cid3_vec_path, 'r') as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            cid3_vec.append(stripped.split(' ')[1:])

    return np.transpose(np.array(cid3_vec, dtype=np.float32))




def thread_main(pin_cid3_vec_path, 
    cid3_vec_path, 
    output_path, 
    top_K, 
    batch_size, 
    n_compute_threads=5):
    """Run the read -> compute -> write pipeline and wait for it to finish.

    Starts one ProducerThread, ``n_compute_threads`` ComputeConsumerThread
    workers and one WriteConsumerThread, joins them all, and prints the start
    time, end time and elapsed seconds.

    Args:
        pin_cid3_vec_path: Input file handed to the producer.
        cid3_vec_path: Vector file handed to every compute worker.
        output_path: File the writer thread writes to.
        top_K: Number of top-ranked indices kept per pin.
        batch_size: Number of rows per batch queued by the producer.
        n_compute_threads: Number of compute workers to start.

    Raises:
        ValueError: If ``n_compute_threads`` is below 1 or above
            ``READ_BATCH_SIZE``.
    """
    # The producer emits exactly READ_BATCH_SIZE stop sentinels and every
    # compute worker consumes one, so more workers than sentinels (or no
    # workers at all) would leave threads blocked forever and hang the joins.
    if not 1 <= n_compute_threads <= READ_BATCH_SIZE:
        raise ValueError(
            'n_compute_threads must be between 1 and %d, got %r'
            % (READ_BATCH_SIZE, n_compute_threads))

    p = ProducerThread(pin_cid3_vec_path, batch_size)

    threads = []

    p.start()
    threads.append(p)

    for _ in range(n_compute_threads):
        compute_c = ComputeConsumerThread(cid3_vec_path, top_K)
        compute_c.start()
        threads.append(compute_c)

    # A single writer owns the output file.
    write_c = WriteConsumerThread(output_path, n_compute_threads, thread_id=0)
    write_c.start()
    threads.append(write_c)

    start_time = time.time()
    print("Compute Start: %s" % time.ctime(int(start_time)))

    for thread in threads:
        thread.join()

    end_time = time.time()
    print("Compute End: %s" % time.ctime(int(end_time)))

    # time.time() already returns seconds, so no unit conversion is needed.
    last_time = end_time - start_time

    print("Last time: %.2f seconds" % last_time)



if __name__ == '__main__':

    # Usage: script PIN_VEC_PATH CID3_VEC_PATH OUTPUT_PATH TOP_K BATCH_SIZE
    #        N_COMPUTE_THREADS
    # With fewer than six arguments the built-in defaults below are used.
    cli_args = sys.argv[1:]

    if len(cli_args) >= 6:
        pin_cid3_vec_path, cid3_vec_path, output_path = cli_args[:3]
        K, batch_size, n_compute_threads = (int(v) for v in cli_args[3:6])
    else:
        home_path = '../../data/'
        pin_cid3_vec_path = home_path + 'pin_vec_by_cid3_vec'
        cid3_vec_path = home_path + 'forU_cid3_vec'
        output_path = home_path + 'forU_pin_cid3_ref'
        K = 10  # top K
        batch_size = 10000
        n_compute_threads = 20

    thread_main(
        pin_cid3_vec_path,
        cid3_vec_path,
        output_path,
        K,
        batch_size,
        n_compute_threads,
    )

  Python知识库 最新文章
Python中String模块
【Python】 14-CSV文件操作
python的pandas库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-09-23 11:25:08  更:2021-09-23 11:26:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 5:46:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计