Skip to content
chengduo edited this page Jan 5, 2018 · 1 revision

Double Buffering

Motivation

当前Fluid在模型训练时,首先由Python端向C++端传入一个batch的训练数据和网络结构,然后开始后续的各种计算。如果计算是在GPU上进行的,内存的中的数据还要经过PCI-E接口将数据从CPU端拷贝到GPU端。而数据从磁盘中读取和从CPU端拷贝到GPU是非常耗时的。为此,希望通过双缓存机制减少数据传输延迟。

调研对象:

  • tensorflow
  • caffe2

为了更清楚的描述两种框架的缓存机制,调研是从两种框架的数据加载开始的。

Tensorflow

1. TF获取数据的方式:

  • tf.data API:这是TF当前极力推荐的,通过调用该API可以非常方便的获取不同输入格式的数据,同时改API支持对数据进行多种形式的变换(如:shuffling,banching、random crop等等)。tf.data API是对feeding和QueueRunner的改进。
  • Feeding:这是TF最不高效的数据读取方式,一般用在小实验或者debug中。Feeding机制能够使用户向计算图中注入新的数据,即数据从内存中获取数据比如:向计算图中的placeholder注入新数据。(当前fluid使用...)
  • QueueRunner:是一种基于队列流水线式数据读取方式,在V1.2之前,TF推荐使用多线程+基于队列的流水线式读取数据,支持从硬盘中获取数据,比如在C++端通过文件名加载数据。现在已经停止这方面的开发。
    • 典型的pipeline image
      • The list of filenames
      • Optional filename shuffling
      • Optional epoch limit
      • Filename queue
      • A Reader for the file format
      • A decoder for a record read by the reader
      • Optional preprocessing
      • Example queue
  • Preloaded data: 通过constant或variable来hold所有数据(一般对于适用于小数据集)。

2. Input pipelines

TF的input pipelines本质上是一个ETL过程,直接引用官方文档描述:

  1. Extract: Read data from persistent storage -- either local (e.g. HDD or SSD) or remote (e.g. GCS or HDFS).
  2. Transform: Use CPU cores to parse and perform preprocessing operations on the data such as image decompression, data augmentation transformations (such as random crop, flips, and color distortions), shuffling, and batching.
  3. Load: Load the transformed data onto the accelerator device(s) (for example, GPU(s) or TPU(s)) that execute the machine learning model.

当模型在GPU上训练的时,这种模式能够使得数据获取、数据变换与模型训练一同进行,使得模型训练更加高效。

2.1 tf.data的两个抽象概念

  • tf.data.Dataset
  • tf.data.Iterator

直接引用TF官方介绍:

  • A tf.data.Dataset represents a sequence of elements, in which each element contains one or more Tensor objects. For example, in an image pipeline, an element might be a single training example, with a pair of tensors representing the image data and a label. There are two distinct ways to create a dataset:

    • Creating a source (e.g. Dataset.from_tensor_slices()) constructs a dataset from one or more tf.Tensor objects.

    • Applying a transformation (e.g. Dataset.batch()) constructs a dataset from one or more tf.data.Dataset objects.

  • A tf.data.Iterator provides the main way to extract elements from a dataset. The operation returned by Iterator.get_next() yields the next element of a Dataset when executed, and typically acts as the interface between input pipeline code and your model. The simplest iterator is a "one-shot iterator", which is associated with a particular Dataset and iterates through it once. For more sophisticated uses, the Iterator.initializer operation enables you to reinitialize and parameterize an iterator with different datasets, so that you can, for example, iterate over training and validation data multiple times in the same program.

参考Google给出的DataSet和Interator关系结构图 image

  • Dataset: Base class containing methods to create and transform datasets. Also allows you initialize a dataset from data in memory, or from a Python generator.
  • TextLineDataset: Reads lines from text files.
  • TFRecordDataset: Reads records from TFRecord files.
  • FixedLengthRecordDataset: Reads fixed size records from binary files.
  • Iterator: Provides a way to access one dataset element at a time.

因为Extract和Transform都是在CPU上进行的,所以DataSet和Iterator的所有Op都只有CPU版本的实现。

2.2 Data Loading (CPU->CPU)

由于DataSet和QueueRunner机制中所有Op都只是在CPU上注册,所以都不涉及在GPU端缓存数据。为降低CPU->GPU的数据拷贝带来的延迟,TensorFlow了一个轻量级的队列:stage_op、unstage_op.

stage_op: 轻量级的队列

Tensorflow提供提供了一个轻量级的队列:stage/unstage,以此降低feed数据时因为CPU->GPU的数据拷贝带来的延迟。

  • 调用方式
    首先运行一次stage_op,向staging area中放入一批数据,然后一起运行stage_op和train_op,train_op在运行时从staging area中取出一批数据,stage_op向staging area中放入新的一批数据,即GPU端的计算和CPU到GPU的数据拷贝同时进行。下面是一个简单的例子:
   with ops.Graph().as_default() as G:
      with ops.device('/cpu:0'):
        x = array_ops.placeholder(dtypes.float32)
        v = 2. * (array_ops.zeros([128, 128]) + x)
      with ops.device(test.gpu_device_name()):
        stager = data_flow_ops.StagingArea([dtypes.float32])
        stage = stager.put([v])
        y = stager.get()
        y = math_ops.reduce_max(math_ops.matmul(y, y))

    G.finalize()

    with self.test_session(use_gpu=True, graph=G) as sess:
      sess.run(stage, feed_dict={x: -1})
      for i in range(10):
        _, yval = sess.run([stage, y], feed_dict={x: i})
        self.assertAllClose(4 * (i - 1) * (i - 1) * 128, yval, rtol=1e-4)
代码逻辑及实现
Op Op功能
Buffer Stage等Op的基类,通过线程安全的方式向buf中存放/获取数据
StageOp 向Buffer中存放数据
UnstageOp 从Buffer中获取数据
StagePeekOp 从Buffer中获取指定index的数据
StageSizeOp 获取当前Buffer的大小
StageClearOp 清除当前Buffer中所有数据

Buffer

Buffer在初始化时需要指定容器的大小(capacity_)以及Buffer占用的最大空间(memor_limit_)。Buffer内部将数据在std::deque中。代码如下:

class Buffer : public ResourceBase {
 public:
  // public types
  using Tuple = std::vector<Tensor>;

 private:
  // private variables
  std::size_t capacity_;
  std::size_t memory_limit_;
  std::size_t current_bytes_;
  std::mutex mu_;
  std::condition_variable non_empty_cond_var_;
  std::condition_variable full_cond_var_;
  std::deque<Tuple> buf_;

 public:
  // public methods
  explicit Buffer(std::size_t capacity, std::size_t memory_limit)
      : capacity_(capacity), memory_limit_(memory_limit), current_bytes_(0) {}
  // the Buffer takes ownership of the Tuple
  Status Put(Tuple* tuple){...}
  // Get tuple at front of the buffer
  void Get(Tuple* tuple){...}
  // Return tuple at index
  Status Peek(std::size_t index, Tuple* tuple){...}
  // Return Buffer size
  size_t Size(){...}
  void Clear(){...}
  
 private:
  // If the buffer is configured for bounded capacity, notify
  // waiting inserters that space is now available
  void notify_inserters_if_bounded(std::unique_lock<std::mutex>* lock){...}
  // Are there a limit number of elements or a memory limit
  // configued on this buffer?
  bool IsBounded() const { return capacity_ > 0 || memory_limit_ > 0; }
  bool IsCapacityFull() const { return buf_.size() >= capacity_; }
  bool WouldExceedMemoryLimit(std::size_t bytes) const {
    return bytes + current_bytes_ > memory_limit_;
  }
  std::size_t GetTupleBytes(const Tuple & tuple){...}
};

获取Buffer

在stageOp、unstageOp、StagePeekOp、StageSizeOp、StageClearOp中通过调用GetBuffer函数获取Buffer的实例。

Status GetBuffer(OpKernelContext* ctx, const NodeDef& ndef, Buffer** buf) {
  auto rm = ctx->resource_manager();
  ContainerInfo cinfo;

  // Lambda for creating the Staging Area
  auto create_fn = [&ndef](Buffer** ret) -> Status
  {
    int64 capacity;
    int64 memory_limit;
    TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "capacity", &capacity));
    TF_RETURN_IF_ERROR(GetNodeAttr(ndef, "memory_limit", &memory_limit));
    *ret = new Buffer(capacity, memory_limit);
    return Status::OK();
  };

  TF_RETURN_IF_ERROR(cinfo.Init(rm, ndef, true /* use name() */));
  TF_RETURN_IF_ERROR(rm->LookupOrCreate<Buffer>(cinfo.container(), cinfo.name(),
                                                buf, create_fn));
  return Status::OK();
}

StageOp

StageOp从ctx中获取输入数据,并将数据添加到buffer中。

class StageOp : public OpKernel {
 public:
  explicit StageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    Buffer* buf = nullptr;
    OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
    core::ScopedUnref scope(buf);
    Buffer::Tuple tuple;
    tuple.reserve(ctx->num_inputs());
    for (int i = 0; i < ctx->num_inputs(); ++i) {
      tuple.push_back(ctx->input(i));
    }
    OP_REQUIRES_OK(ctx, buf->Put(&tuple));
  }
};

UnstageOp

UnstageOp从buffer中获取数据,并将数据放入ctx中,之后该数据从buffer中移除。

class UnstageOp : public OpKernel {
 public:
  explicit UnstageOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error.  As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    Buffer* buf = nullptr;
    OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
    core::ScopedUnref scope(buf);
    Buffer::Tuple tuple;

    buf->Get(&tuple);

    OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(),
        errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(), " vs. ", ctx->num_outputs()));

    for (size_t i = 0; i < tuple.size(); ++i) {
      ctx->set_output(i, tuple[i]);
    }
  }
};

StagePeekOp

StagePeekOp从buffer中deque的指定位置(index)获取数据,之后改数据不会从buffer中移除。

class StagePeekOp : public OpKernel {
 public:
  explicit StagePeekOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error.  As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    Buffer* buf = nullptr;
    OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
    core::ScopedUnref scope(buf);
    Buffer::Tuple tuple;

    std::size_t index = ctx->input(0).scalar<int>()();

    OP_REQUIRES_OK(ctx, buf->Peek(index, &tuple));

    OP_REQUIRES(ctx, tuple.size() == (size_t)ctx->num_outputs(),
        errors::InvalidArgument("Mismatch stage/unstage: ", tuple.size(),  " vs. ", ctx->num_outputs()));

    for (size_t i = 0; i < tuple.size(); ++i) {
      ctx->set_output(i, tuple[i]);
    }
  }
};

StageSizeOp

StageSizeOp返回当前buffer中含有数据的个数。

class StageSizeOp : public OpKernel {
 public:
  explicit StageSizeOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error.  As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    Buffer* buf = nullptr;
    OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
    core::ScopedUnref scope(buf);

    // Allocate size output tensor
    Tensor * size = nullptr;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &size));

    // Set it to the actual size
    size->scalar<int32>().setConstant(buf->Size());
  }
};

StageClearOp

StageClearOp用于清除buffer中数据。

class StageClearOp : public OpKernel {
 public:
  explicit StageClearOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  // Using this op in such a way that it blocks forever
  // is an error.  As such cancellation is not handled.
  void Compute(OpKernelContext* ctx) override {
    Buffer* buf = nullptr;
    OP_REQUIRES_OK(ctx, GetBuffer(ctx, def(), &buf));
    core::ScopedUnref scope(buf);

    buf->Clear();
  }
};

带来的性能提升
源码

小结

  1. 在DataSet和QueueRunner机制中,数据加载和数据变换都可以分别使用不同的线程池完成。
  2. DataSet、Iterator和QueueRunner的所有Op都只是在CPU上注册,所以都不涉及在GPU端缓存数据。
  3. 为降低CPU->GPU的数据拷贝带来的延迟,TensorFlow了一个轻量级的队列:stage_op、unstage_op.

caffe2 - DataPrefetch

与tensorflow不同,caffe2有一个PrefetchOperator类,其中做的事情包括:从DB中读取数据、对数据进行各种transform(比如random crop, flips, and color distortions等等)、将数据放入缓冲区。

PrefetchOperator继承关系:

OperatorBase
 `-PrefetchOperator
   `-ImageInputOp
   `-TensorProtosDBInput
   `-VideoInputOp

PrefetchOperator代码逻辑

  • 输入:DBReader。在单机多GPU情况下DBReader只有一个实例,为各个GPU共享。
  • 在下一次迭代时,从buffer中取一组数据

PrefetchOperator内部以生产者和消费者机制进行工作。

  • 生产者:
    • 调用Prefetch方法,从DBReader中获取一个batch数据
    • 内部使用线程池对数据进行各种Transform
    • 将数据放进buffer中
  • 消费者
    • 从buffer中获取一组数据,作为PrefetchOperator的输出

PrefetchOperator中只有一个生产者和一个消费者,每次只放如一条数据。

  • 生产者:
  void PrefetchWorker() {
    context_.SwitchToDevice();
    std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
    while (prefetched_)
      producer_.wait(lock);
    while (!finalize_) {
      ...
      Prefetch();
      ...
      prefetched_ = true;
      consumer_.notify_one();
      while (prefetched_)
        producer_.wait(lock);
    }
  }
  • 消费者:
  bool Run(int /* unused */ /*stream_id*/) override {
    ...
    context_.SwitchToDevice(0);
    std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
    while (!prefetched_)
      consumer_.wait(lock);
    ...
    CopyPrefetched();
    ...
    prefetched_ = false;
    context_.FinishDeviceComputation();
    producer_.notify_one();
    return true;
  }

以ImageInputOp为例,介绍Prefetch操作

template <class Context>
bool ImageInputOp<Context>::Prefetch() {
  if (!owned_reader_.get()) {
    reader_ = &OperatorBase::Input<db::DBReader>(0); // 获取数据源
  }
  const int channels = color_ ? 3 : 1;
  if (gpu_transform_) {
    // we'll transfer up in int8, then convert later
    prefetched_image_.mutable_data<uint8_t>();
  } else {
    prefetched_image_.mutable_data<float>();
  }

  prefetched_label_.mutable_data<int>();

  for (int item_id = 0; item_id < batch_size_; ++item_id) {
    std::string key, value;
    cv::Mat img;
    
    reader_->Read(&key, &value);  // Extract Data

    // determine label type based on first item
    if( item_id == 0 ) {
      if( use_caffe_datum_ ) {
        prefetched_label_.mutable_data<int>();
      } else {
        TensorProtos protos;
        CAFFE_ENFORCE(protos.ParseFromString(value));
        TensorProto_DataType labeldt = protos.protos(1).data_type();
        if( labeldt == TensorProto::INT32 ) {
          prefetched_label_.mutable_data<int>();
        } else if ( labeldt == TensorProto::FLOAT) {
          prefetched_label_.mutable_data<float>();
        } else {
          LOG(FATAL) << "Unsupported label type.";
        }

        for (int i = 0; i < additional_inputs_count_; ++i) {
          int index = additional_inputs_offset_ + i;
          TensorProto additional_output_proto = protos.protos(index);

          if (additional_output_proto.data_type() == TensorProto::FLOAT) {
            prefetched_additional_outputs_[i].template mutable_data<float>();
          } else if (
              additional_output_proto.data_type() == TensorProto::INT32) {
            prefetched_additional_outputs_[i].template mutable_data<int>();
          } else if (
              additional_output_proto.data_type() == TensorProto::INT64) {
            prefetched_additional_outputs_[i].template mutable_data<int64_t>();
          } else {
            LOG(FATAL) << "Unsupported output type.";
          }
        }
      }
    }
    // launch into thread pool for processing
    // TODO: support color jitter and color lighting in gpu_transform
    if (gpu_transform_) {
      // output of decode will still be int8
      uint8_t* image_data = prefetched_image_.mutable_data<uint8_t>() +
          crop_ * crop_ * channels * item_id;
      thread_pool_->runTaskWithID(std::bind(
          &ImageInputOp<Context>::DecodeAndTransposeOnly,
          this,
          std::string(value),
          image_data,
          item_id,
          channels,
          std::placeholders::_1));
    } else {
      float* image_data = prefetched_image_.mutable_data<float>() +
          crop_ * crop_ * channels * item_id;
      thread_pool_->runTaskWithID(std::bind(
          &ImageInputOp<Context>::DecodeAndTransform,
          this,
          std::string(value),
          image_data,
          item_id,
          channels,
          std::placeholders::_1));
    }
  }
  thread_pool_->waitWorkComplete();

  // If the context is not CPUContext, we will need to do a copy in the
  // prefetch function as well.
  if (!std::is_same<Context, CPUContext>::value) {
    prefetched_image_on_device_.CopyFrom(prefetched_image_, &context_);
    prefetched_label_on_device_.CopyFrom(prefetched_label_, &context_);

    for (int i = 0; i < prefetched_additional_outputs_on_device_.size(); ++i) {
      prefetched_additional_outputs_on_device_[i].CopyFrom(
          prefetched_additional_outputs_[i], &context_);
    }
  }
  return true;
}

DBReader实现细节

/**
 * A reader wrapper for DB that also allows us to serialize it.
 */
class DBReader {
 private:
  string db_type_; //数据库的类型,包括minidb,leveldb,lmdb等等
  string source_;  //数据库的路径
  unique_ptr<DB> db_; //数据库对象
  unique_ptr<Cursor> cursor_; //数据库游标
  mutable std::mutex reader_mutex_; //单机多GPU环境下,应该是多线程进行训练,多线程共享同一个DBReader实例,因此需要用这个reader_mutex来控制对共享变量的访问。
  uint32_t num_shards_; //单机环境下,该值为0,分布式环境下,该值为节点数目
  uint32_t shard_id_; //节点id,从0开始,单机情况下为0,依次递增

  
  void InitializeCursor(const int32_t num_shards, const int32_t shard_id) {
    CAFFE_ENFORCE(num_shards >= 1);
    CAFFE_ENFORCE(shard_id >= 0);
    CAFFE_ENFORCE(shard_id < num_shards);
    num_shards_ = num_shards;
    shard_id_ = shard_id;
    cursor_ = db_->NewCursor();
    SeekToFirst();
  }

  void MoveToBeginning() const {
    cursor_->SeekToFirst();
    for (auto s = 0; s < shard_id_; s++) {
      cursor_->Next();
      CAFFE_ENFORCE(
          cursor_->Valid(), "Db has less rows than shard id: ", s, shard_id_);
    }
  }

 public:

  friend class DBReaderSerializer;
  DBReader() {}
  DBReader(
      const string& db_type,
      const string& source,
      const int32_t num_shards = 1,
      const int32_t shard_id = 0) {
    Open(db_type, source, num_shards, shard_id);
  }

  explicit DBReader(const DBReaderProto& proto) {
    Open(proto.db_type(), proto.source());
    if (proto.has_key()) {
      CAFFE_ENFORCE(cursor_->SupportsSeek(),
          "Encountering a proto that needs seeking but the db type "
          "does not support it.");
      cursor_->Seek(proto.key());
    }
    num_shards_ = 1;
    shard_id_ = 0;
  }

  explicit DBReader(std::unique_ptr<DB> db)
      : db_type_("<memory-type>"),
        source_("<memory-source>"),
        db_(std::move(db)) {
    CAFFE_ENFORCE(db_.get(), "Passed null db");
    cursor_ = db_->NewCursor();
  }

  void Open(
      const string& db_type,
      const string& source,
      const int32_t num_shards = 1,
      const int32_t shard_id = 0) {
    cursor_.reset();
    db_.reset();
    db_type_ = db_type;
    source_ = source;
    db_ = CreateDB(db_type_, source_, READ);
    CAFFE_ENFORCE(db_, "Cannot open db: ", source_, " of type ", db_type_);
    InitializeCursor(num_shards, shard_id);
  }

  void Open(
      unique_ptr<DB>&& db,
      const int32_t num_shards = 1,
      const int32_t shard_id = 0) {
    cursor_.reset();
    db_.reset();
    db_ = std::move(db);
    CAFFE_ENFORCE(db_.get(), "Passed null db");
    InitializeCursor(num_shards, shard_id);
  }

 public:
  void Read(string* key, string* value) const {
    CAFFE_ENFORCE(cursor_ != nullptr, "Reader not initialized.");
    std::unique_lock<std::mutex> mutex_lock(reader_mutex_); 只对单机多GPU会阻塞,不同机器之间不会阻塞,
    *key = cursor_->key();
    *value = cursor_->value();

    // In sharded mode, each read skips num_shards_ records
    for (int s = 0; s < num_shards_; s++) {
      cursor_->Next();
      if (!cursor_->Valid()) {
        MoveToBeginning();
        break;
      }
    }
  }
  ...
};