一、队列与线程
1.队列
在训练样本的时候,希望读入的训练样本时有序的
tf.FIFOQueue 先进先出队列,按顺序出队列
tf.RandomShuffleQueue 随机出队列
(1)tf.FIFOQueue:
FIFOQueue(capacity, dtypes, name='fifo_queue')
创建一个以先进先出的顺序对元素进行排队的队列
capacity:整数。可能存储在此队列中的元素数量的上限
dtypes:DType对象列表。长度dtypes必须等于每个队列元
素中的张量数,dtype的类型形状,决定了后面进队列元素形状
method方法:
dequeue(name=None)
enqueue(vals, name=None):
enqueue_many(vals, name=None):vals列表或者元组
返回一个进队列操作
size(name=None)
(2)举例使用tf.FIFOQueue :
完成一个出队列、+1、入队列操作(同步操作):
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import tensorflow as tf
Q = tf.FIFOQueue(3,tf.float32)
enq_mqny = Q.enqueue_many([[0.1,0.2,0.3],])
out_q = Q.dequeue()
data = out_q+1
en_q = Q.enqueue(data)
with tf.Session() as sess:
sess.run(enq_mqny)
for i in range(100):
sess.run(en_q)
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue()))
运行结果:
注意: 分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中, 主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个 线程,实现异步读取。
2.队列管理器
(1)tf.RandomShuffleQueue
tf.train.QueueRunner(queue, enqueue_ops=None)
创建一个QueueRunner
queue:A Queue
enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
create_threads(sess, coord=None,start=False)
创建线程来运行给定会话的入队操作
start:布尔值,如果True启动线程;如果为False调用者
必须调用start()启动线程
coord:线程协调器,后面线程管理需要用到
return:
(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):
Q = tf.FIFOQueue(1000, tf.float32)
var = tf.Variable(0.0)
data = tf.assign_add(var, tf.constant(1.0))
en_q = Q.enqueue(data)
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
threads = qr.create_threads(sess, coord=coord, start=True)
for i in range(300):
print(sess.run(Q.dequeue()))
3.线程协调器
(1)
tf.train.Coordinator()
线程协调员,实现一个简单的机制来协调一
组线程的终止
request_stop()
should_stop() 检查是否要求停止
join(threads=None, stop_grace_period_secs=120)
等待线程终止
return:线程协调员实例
(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):
Q = tf.FIFOQueue(1000, tf.float32)
var = tf.Variable(0.0)
data = tf.assign_add(var, tf.constant(1.0))
en_q = Q.enqueue(data)
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = qr.create_threads(sess, coord=coord, start=True)
for i in range(300):
print(sess.run(Q.dequeue()))
coord.request_stop()
coord.join(threads)
(3)分析: 这时候有一个问题就是,入队自顾自的去执行,在需要的出 队操作完成之后,程序没法结束。需要一个实现线程间的同步,终 止其他线程。
|