3. TensorflowIO操作

IO操作

本节我们将介绍Tensorflow在IO处理上的一些知识点

  • 线程与队列
  • 数据读取
  • 图像操作

3.1. 读取数据

线程和队列

在使用TensorFlow进行异步计算时,队列是一种强大的机制。

为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

队列概述

队列,如FIFOQueue和RandomShuffleQueue,在TensorFlow的张量异步计算时都非常重要。

例如,一个典型的输入结构:是使用一个RandomShuffleQueue来作为模型训练的输入:

  • 多个线程准备训练样本,并且把这些样本推入队列。
  • 一个训练线程执行一个训练操作

同步执行队列

# 创建一个队列
Q = tf.FIFOQueue(3, dtypes=tf.float32)

# 数据进队列
init = Q.enqueue_many(([0.1, 0.2, 0.3],))

# 定义操作,op,出队列,+1,进队列,注意返回的都是op
out_q = Q.dequeue()
data = out_q + 1
en_q = Q.enqueue(data)


with tf.Session() as sess:

    # 初始化队列,是数据进入
    sess.run(init)

    # 执行两次入队加1
    for i in range(2):
        sess.run(en_q)

    # 循环取队列
    for i in range(3):
        print(sess.run(Q.dequeue()))

tf.QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

tf.Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:

  • should_stop():如果线程应该停止则返回True。
  • request_stop(): 请求该线程停止。
  • join():等待被指定的线程终止。

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

异步执行队列:

#主线程,不断的去取数据,开启其它线程来进行增加计数,入队
#主线程结束了,队列线程没有结束,就会抛出异常
#主线程没有结束,需要将队列线程关闭,防止主线程等待

Q = tf.FIFOQueue(1000,dtypes=tf.float32)

# 定义操作
var = tf.Variable(0.0)
increment_op = tf.assign_add(var,tf.constant(1.0))
en_op = Q.enqueue(increment_op)

# 创建一个队列管理器,指定线程数,执行队列的操作
qr = tf.train.QueueRunner(Q,enqueue_ops=[increment_op,en_op]*3)

with tf.Session() as sess:
    tf.global_variables_initializer().run()

    # 生成一个线程协调器
    coord = tf.train.Coordinator()

    # 启动线程执行操作
    threads_list = qr.create_threads(sess,coord=coord,start=True)

    print(len(threads_list),"----------")
    # 主线程去取数据
    for i in range(20):
        print(sess.run(Q.dequeue()))

    # 请求其它线程终止
    coord.request_stop()

    # 关闭线程
    coord.join(threads_list)

3.2. 线程和队列

读取数据

小数量数据读取

这仅用于可以完全加载到存储器中的小的数据集有两种方法:

  • 存储在常数中。
  • 存储在变量中,初始化后,永远不要改变它的值。

使用常数更简单一些,但是会使用更多的内存,因为常数会内联的存储在数据流图数据结构中,这个结构体可能会被复制几次。

training_data = ...
training_labels = ...
with tf.Session():
  input_data = tf.constant(training_data)
  input_labels = tf.constant(training_labels)

要改为使用变量的方式,您就需要在数据流图建立后初始化这个变量。

training_data = ...
training_labels = ...
with tf.Session() as sess:
  data_initializer = tf.placeholder(dtype=training_data.dtype,
                                    shape=training_data.shape)
  label_initializer = tf.placeholder(dtype=training_labels.dtype,
                                     shape=training_labels.shape)
  input_data = tf.Variable(data_initalizer, trainable=False, collections=[])
  input_labels = tf.Variable(label_initalizer, trainable=False, collections=[])
  ...
  sess.run(input_data.initializer,
           feed_dict={
   data_initializer: training_data})
  sess.run(input_labels.initializer,
           feed_dict={
   label_initializer: training_lables})

设定trainable=False可以防止该变量被数据流图的GraphKeys.TRAINABLE_VARIABLES收集,这样我们就不会在训练的时候尝试更新它的值;设定collections=[]可以防止GraphKeys.VARIABLES收集后做为保存和恢复的中断点。设定这些标志,是为了减少额外的开销

文件读取

先看下文件读取以及读取数据处理成张量结果的过程:

一般数据文件格式有文本、excel和图片数据。那么TensorFlow都有对应的解析函数,除了这几种。还有TensorFlow指定的文件格式。

标准TensorFlow格式

TensorFlow还提供了一种内置文件格式TFRecord,二进制数据和训练类别标签数据存储在同一文件。模型训练前图像等文本信息转换为TFRecord格式。TFRecord文件是protobuf格式。数据不压缩,可快速加载到内存。TFRecords文件包含 tf.train.Example protobuf,需要将Example填充到协议缓冲区,将协议缓冲区序列化为字符串,然后使用该文件将该字符串写入TFRecords文件。在图像操作我们会介绍整个过程以及详细参数。

数据读取实现

文件队列生成函数

  • tf.train.string_input_producer(string_tensor, num_epochs=None, shuffle=True, seed=None, capacity=32, name=None)

产生指定文件张量

文件阅读器类

  • class tf.TextLineReader

阅读文本文件逗号分隔值(CSV)格式

  • tf.FixedLengthRecordReader

要读取每个记录是固定数量字节的二进制文件

  • tf.TFRecordReader

读取TfRecords文件

解码

由于从文件中读取的是字符串,需要函数去解析这些字符串到张量

  • tf.decode_csv(records,record_defaults,field_delim = None,name = None)将CSV转换为张量,与tf.TextLineReader搭配使用
  • tf.decode_raw(bytes,out_type,little_endian = None,name = None) 将字节转换为一个数字向量表示,字节为一字符串类型的张量,与函数tf.FixedLengthRecordReader搭配使用

生成文件队列

将文件名列表交给tf.train.string_input_producer函数。string_input_producer来生成一个先入先出的队列,文件阅读器会需要它们来取数据。string_input_producer提供的可配置参数来设置文件名乱序和最大的训练迭代数,QueueRunner会为每次迭代(epoch)将所有的文件名加入文件名队列中,如果shuffle=True的话,会对文件名进行乱序处理。一过程是比较均匀的,因此它可以产生均衡的文件名队列。

这个QueueRunner工作线程是独立于文件阅读器的线程,因此乱序和将文件名推入到文件名队列这些过程不会阻塞文件阅读器运行。根据你的文件格式,选择对应的文件阅读器,然后将文件名队列提供给阅读器的 read 方法。阅读器的read方***输出一个键来表征输入的文件和其中纪录(对于调试非常有用),同时得到一个字符串标量,这个字符串标量可以被一个或多个解析器,或者转换操作将其解码为张量并且构造成为样本。

# 读取CSV格式文件
# 1、构建文件队列

# 2、构建读取器,读取内容

# 3、解码内容

# 4、现读取一个内容,如果有需要,就批处理内容
import tensorflow as tf
import os
def readcsv_decode(filelist):
    """ 读取并解析文件内容 :param filelist: 文件列表 :return: None """

    # 把文件目录和文件名合并
    flist = [os.path.join("./csvdata/",file) for file in filelist]

    # 构建文件队列
    file_queue = tf.train.string_input_producer(flist,shuffle=False)

    # 构建阅读器,读取文件内容
    reader = tf.TextLineReader()

    key,value = reader.read(file_queue)

    record_defaults = [["null"],["null"]] # [[0],[0],[0],[0]]

    # 解码内容,按行解析,返回的是每行的列数据
    example,label = tf.decode_csv(value,record_defaults=record_defaults)

    # 通过tf.train.batch来批处理数据
    example_batch,label_batch = tf.train.batch([example,label],batch_size=9,num_threads=1,capacity=9)


    with tf.Session() as sess:

        # 线程协调员
        coord = tf.train.Coordinator()

        # 启动工作线程
        threads = tf.train.start_queue_runners(sess,coord=coord)

        # 这种方法不可取
        # for i in range(9):
        # print(sess.run([example,label]))

        # 打印批处理的数据
        print(sess.run([example_batch,label_batch]))


        coord.request_stop()

        coord.join(threads)

    return None


if __name__=="__main__":
    filename_list = os.listdir("./csvdata")
    readcsv_decode(filename_list)

每次read的执行都会从文件中读取一行内容,注意,(这与后面的图片和TfRecords读取不一样),decode_csv操作会解析这一行内容并将其转为张量列表。如果输入的参数有缺失,record_default参数可以根据张量的类型来设置默认值。在调用run或者eval去执行read之前,你必须调用tf.train.start_queue_runners来将文件名填充到队列。否则read操作会被阻塞到文件名队列中有值为止。

3.3. 图像操作

图像基本概念

在图像数字化表示当中,分为黑白和彩色两种。在数字化表示图片的时候,有三个因素。分别是图片的长、图片的宽、图片的颜色通道数。那么黑白图片的颜色通道数为1,它只需要一个数字就可以表示一个像素位;而彩色照片就不一样了,它有三个颜色通道,分别为RGB,通过三个数字表示一个像素位。TensorFlow支持JPG、PNG图像格式,RGB、RGBA颜色空间。图像用与图像尺寸相同(heightwidthchnanel)张量表示。图像所有像素存在磁盘文件,需要被加载到内存。

图像大小压缩

大尺寸图像输入占用大量系统内存。训练CNN需要大量时间,加载大文件增加更多训练时间,也难存放多数系统GPU显存。大尺寸图像大量无关本征属性信息,影响模型泛化能力。最好在预处理阶段完成图像操作,缩小、裁剪、缩放、灰度调整等。图像加载后,翻转、扭曲,使输入网络训练信息多样化,缓解过拟合。Python图像处理框架PIL、OpenCV。TensorFlow提供部分图像处理方法。

  • tf.image.resize_images 压缩图片导致定大小

图像数据读取实例

同样图像加载与二进制文件相同。图像需要解码。输入生成器(tf.train.string_input_producer)找到所需文件,加载到队列。tf.WholeFileReader 加载完整图像文件到内存,WholeFileReader.read 读取图像,tf.image.decode_jpeg 解码JPEG格式图像。图像是三阶张量。RGB值是一阶张量。加载图像格 式为[batch_size,image_height,image_width,channels]。批数据图像过大过多,占用内存过高,系统会停止响应。直接加载TFRecord文件,可以节省训练时间。支持写入多个样本。

读取图片数据到Tensor

管道读端多文件内容处理

但是会发现read只返回一个图片的值。所以我们在之前处理文件的整个流程中,后面的内容队列的出队列需要用特定函数去获取。

  • tf.train.batch 读取指定大小(个数)的张量
  • tf.train.shuffle_batch 乱序读取指定大小(个数)的张量
def readpic_decode(file_list):
    """ 批量读取图片并转换成张量格式 :param file_list: 文件名目录列表 :return: None """

    # 构造文件队列
    file_queue = tf.train.string_input_producer(file_list)

    # 图片阅读器和读取数据
    reader = tf.WholeFileReader()
    key,value = reader.read(file_queue)

    # 解码成张量形式

    image_first = tf.image.decode_jpeg(value)

    print(image_first)

    # 缩小图片到指定长宽,不用指定通道数
    image = tf.image.resize_images(image_first,[256,256])

    # 设置图片的静态形状
    image.set_shape([256,256,3])

    print(image)

    # 批处理图片数据,tensors是需要具体的形状大小
    image_batch = tf.train.batch([image],batch_size=100,num_threads=1,capacity=100)

    tf.summary.image("pic",image_batch)

    with tf.Session() as sess:

        merged = tf.summary.merge_all()

        filewriter = tf.summary.FileWriter("/tmp/summary/dog/",graph=sess.graph)

        # 线程协调器
        coord = tf.train.Coordinator()

        # 开启线程
        threads = tf.train.start_queue_runners(sess=sess,coord=coord)

        print(sess.run(image_batch))

        summary = sess.run(merged)

        filewriter.add_summary(summary)

        # 等待线程回收
        coord.request_stop()
        coord.join(threads)


    return None


if __name__=="__main__":

    # 获取文件列表
    filename = os.listdir("./dog/")

    # 组合文件目录和文件名
    file_list = [os.path.join("./dog/",file) for file in filename]

    # 调用读取函数
    readpic_decode(file_list)

读取TfRecords文件数据

#CIFAR-10的数据读取以及转换成TFRecordsg格式

#1、数据的读取

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string("data_dir","./cifar10/cifar-10-batches-bin/","CIFAR数据目录")
tf.app.flags.DEFINE_integer("batch_size",50000,"样本个数")
tf.app.flags.DEFINE_string("records_file","./cifar10/cifar.tfrecords","tfrecords文件位置")

class CifarRead(object):

    def __init__(self,filename):
        self.filelist = filename

        # 定义图片的长、宽、深度,标签字节,图像字节,总字节数
        self.height = 32
        self.width = 32
        self.depth = 3
        self.label_bytes = 1
        self.image_bytes = self.height*self.width*self.depth
        self.bytes = self.label_bytes + self.image_bytes


    def readcifar_decode(self):
        """ 读取数据,进行转换 :return: 批处理的图片和标签 """

        # 1、构造文件队列
        file_queue = tf.train.string_input_producer(self.filelist)

        # 2、构造读取器,读取内容
        reader = tf.FixedLengthRecordReader(self.bytes)

        key,value = reader.read(file_queue)

        # 3、文件内容解码
        image_label = tf.decode_raw(value,tf.uint8)

        # 分割标签与图像张量,转换成相应的格式

        label = tf.cast(tf.slice(image_label,[0],[self.label_bytes]),tf.int32)

        image = tf.slice(image_label,[self.label_bytes],[self.image_bytes])

        print(image)

        # 给image设置形状,防止批处理出错
        image_tensor = tf.reshape(image,[self.height,self.width,self.depth])

        print(image_tensor.eval())
        # depth_major = tf.reshape(image, [self.depth,self.height, self.width])
        # image_tensor = tf.transpose(depth_major, [1, 2, 0])

        # 4、处理流程
        image_batch,label_batch = tf.train.batch([image_tensor,label],batch_size=10,num_threads=1,capacity=10)


        return image_batch,label_batch


    def convert_to_tfrecords(self,image_batch,label_batch):
        """ 转换成TFRecords文件 :param image_batch: 图片数据Tensor :param label_batch: 标签数据Tensor :param sess: 会话 :return: None """

        # 创建一个TFRecord存储器
        writer = tf.python_io.TFRecordWriter(FLAGS.records_file)

        # 构造每个样本的Example
        for i in range(10):
            print("---------")
            image = image_batch[i]
            # 将单个图片张量转换为字符串,以可以存进二进制文件
            image_string = image.eval().tostring()

            # 使用eval需要注意的是,必须存在会话上下文环境
            label = int(label_batch[i].eval()[0])

            # 构造协议块
            example = tf.train.Example(features=tf.train.Features(feature={
   
                "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_string])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
            })
            )

            # 写进文件
            writer.write(example.SerializeToString())

        writer.close()

        return None

    def read_from_tfrecords(self):
        """ 读取tfrecords :return: None """
        file_queue = tf.train.string_input_producer(["./cifar10/cifar.tfrecords"])

        reader = tf.TFRecordReader()

        key, value = reader.read(file_queue)

        features = tf.parse_single_example(value, features={
   
            "image":tf.FixedLenFeature([], tf.string),
            "label":tf.FixedLenFeature([], tf.int64),
        })

        image = tf.decode_raw(features["image"], tf.uint8)

        # 设置静态形状,可用于转换动态形状
        image.set_shape([self.image_bytes])

        print(image)

        image_tensor = tf.reshape(image,[self.height,self.width,self.depth])

        print(image_tensor)

        label = tf.cast(features["label"], tf.int32)

        print(label)

        image_batch, label_batch = tf.train.batch([image_tensor, label],batch_size=10,num_threads=1,capacity=10)
        print(image_batch)
        print(label_batch)

        with tf.Session() as sess:
            coord = tf.train.Coordinator()

            threads = tf.train.start_queue_runners(sess=sess,coord=coord)

            print(sess.run([image_batch, label_batch]))

            coord.request_stop()
            coord.join(threads)

        return None


if __name__=="__main__":
    # 构造文件名字的列表
    filename = os.listdir(FLAGS.data_dir)
    file_list = [os.path.join(FLAGS.data_dir, file) for file in filename if file[-3:] == "bin"]

    cfar = CifarRead(file_list)
    # image_batch,label_batch = cfar.readcifar_decode()
    cfar.read_from_tfrecords()

    with tf.Session() as sess:


        # 构建线程协调器
        coord = tf.train.Coordinator()

        # 开启线程
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        # print(sess.run(image_batch))

        # 存进文件
        # cfar.convert_to_tfrecords(image_batch, label_batch)


        coord.request_stop()
        coord.join(threads)