Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded OnDiskReader #355

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/reader/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ This file is the implementation of Parser class.
namespace xLearn {

// Max size of one line TXT data
static const uint32 kMaxLineSize = 10 * 1024 * 1024; // 10 MB
static const uint32 kMaxLineSize = 100 * 1024; // 100 kB

static char line_buf[kMaxLineSize];

//------------------------------------------------------------------------------
// Class register
Expand Down Expand Up @@ -73,6 +72,7 @@ void LibsvmParser::Parse(char* buf,
uint64 size,
DMatrix& matrix,
bool reset) {
char line_buf[kMaxLineSize];
CHECK_NOTNULL(buf);
CHECK_GT(size, 0);
// Clear the data matrix
Expand All @@ -83,6 +83,7 @@ void LibsvmParser::Parse(char* buf,
uint64 pos = 0;
for (;;) {
uint64 rd_size = get_line_from_buffer(line_buf, buf, pos, size);
char *next_token;
if (rd_size == 0) break;
pos += rd_size;
matrix.AddRow();
Expand All @@ -91,7 +92,7 @@ void LibsvmParser::Parse(char* buf,
}
// Add Y
if (has_label_) { // for training task
char *y_char = strtok(line_buf, splitor_.c_str());
char *y_char = strtok_r(line_buf, splitor_.c_str(), &next_token);
matrix.Y[i] = atof(y_char);
} else { // for predict task
matrix.Y[i] = -2;
Expand All @@ -100,8 +101,8 @@ void LibsvmParser::Parse(char* buf,
real_t norm = 0.0;
// The first element
if (!has_label_) {
char *idx_char = strtok(line_buf, ":");
char *value_char = strtok(nullptr, splitor_.c_str());
char *idx_char = strtok_r(line_buf,":", &next_token);
char *value_char = strtok_r(nullptr, splitor_.c_str(), &next_token);
if (idx_char != nullptr && *idx_char != '\n') {
index_t idx = atoi(idx_char);
real_t value = atof(value_char);
Expand All @@ -111,8 +112,8 @@ void LibsvmParser::Parse(char* buf,
}
// The remaining elements
for (;;) {
char *idx_char = strtok(nullptr, ":");
char *value_char = strtok(nullptr, splitor_.c_str());
char *idx_char = strtok_r(nullptr, ":", &next_token);
char *value_char = strtok_r(nullptr, splitor_.c_str(), &next_token);
if (idx_char == nullptr || *idx_char == '\n') {
break;
}
Expand All @@ -136,6 +137,8 @@ void FFMParser::Parse(char* buf,
uint64 size,
DMatrix& matrix,
bool reset) {
char *next_token;
char line_buf[kMaxLineSize];
CHECK_NOTNULL(buf);
CHECK_GT(size, 0);
// Clear the data matrix
Expand All @@ -152,7 +155,7 @@ void FFMParser::Parse(char* buf,
int i = matrix.row_length - 1;
// Add Y
if (has_label_) { // for training task
char *y_char = strtok(line_buf, splitor_.c_str());
char *y_char = strtok_r(line_buf, splitor_.c_str(), &next_token);
matrix.Y[i] = atof(y_char);
} else { // for predict task
matrix.Y[i] = -2;
Expand All @@ -161,9 +164,9 @@ void FFMParser::Parse(char* buf,
real_t norm = 0.0;
// The first element
if (!has_label_) {
char *field_char = strtok(line_buf, ":");
char *idx_char = strtok(nullptr, ":");
char *value_char = strtok(nullptr, splitor_.c_str());
char *field_char = strtok_r(line_buf, ":", &next_token);
char *idx_char = strtok_r(nullptr, ":", &next_token);
char *value_char = strtok_r(nullptr, splitor_.c_str(), &next_token);
if (idx_char != nullptr && *idx_char != '\n') {
index_t idx = atoi(idx_char);
real_t value = atof(value_char);
Expand All @@ -174,9 +177,9 @@ void FFMParser::Parse(char* buf,
}
// The remaining elements
for (;;) {
char *field_char = strtok(nullptr, ":");
char *idx_char = strtok(nullptr, ":");
char *value_char = strtok(nullptr, splitor_.c_str());
char *field_char = strtok_r(nullptr, ":", &next_token);
char *idx_char = strtok_r(nullptr, ":", &next_token);
char *value_char = strtok_r(nullptr, splitor_.c_str(), &next_token);
if (field_char == nullptr || *field_char == '\n') {
break;
}
Expand Down Expand Up @@ -204,6 +207,7 @@ void CSVParser::Parse(char* buf,
uint64 size,
DMatrix& matrix,
bool reset) {
char line_buf[kMaxLineSize];
CHECK_NOTNULL(buf);
CHECK_GT(size, 0);
// Clear the data matrix
Expand Down
96 changes: 81 additions & 15 deletions src/reader/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ void InmemReader::Reset() { pos_ = 0; }

// Create parser and open file
void OndiskReader::Initialize(const std::string& filename) {
finished = true;
reset = true;
CHECK_NE(filename.empty(), true);
this->filename_ = filename;
// Init parser_
Expand All @@ -310,31 +312,95 @@ void OndiskReader::Initialize(const std::string& filename) {
#endif
}

void OndiskReader::blockWorkerThread(std::string *buf, uint64 size, int thread_idx) {
DMatrix *samples = new DMatrix;
assert(size == buf -> size());
char *bufData = (char *)(buf -> data());
parser_->Parse(bufData, size, *samples, true);
{
std::unique_lock<std::mutex> lk(condition_mutex);
mats.push(samples);
finished_threads[thread_idx] = std::move(worker_threads.at(thread_idx));
worker_threads.erase(thread_idx);
}
wait_variable.notify_all();
delete buf;
}

void OndiskReader::readData() {
// Convert MB to Byte
uint64 read_byte = block_size_ * 1024 * 1024;
uint32 num_threads = std::thread::hardware_concurrency();
for (int thread_idx = 0;; thread_idx++) {
// Read a block of data from disk file
size_t ret = ReadDataFromDisk(file_ptr_, block_, read_byte);
if (ret == 0) {
finished = true;
wait_variable.notify_all();
return;
} else if (ret == read_byte) {
// Find the last '\n', and shrink back file pointer
shrink_block(block_, &ret, file_ptr_);
} // else ret < read_byte: we don't need shrink_block()
// Parse block to data_sample_
{
std::unique_lock<std::mutex> lk(condition_mutex);
wait_variable.wait(lk, [&]{return worker_threads.size() < num_threads and mats.size() < 10;});
worker_threads[thread_idx] = std::thread ([ret, thread_idx, this](std::string *block) { this -> blockWorkerThread(block, ret, thread_idx);}, new std::string(block_, ret));
}
}
}

// Rewind the reader to the start of the file.
// Marks the reader as reset, seeks file_ptr_ back to offset 0, releases the
// matrix handed out by the last Samples() call, and joins the reader thread
// plus every worker handle parked in finished_threads. In debug builds it
// asserts that reading has finished and nothing is queued or still running.
void OndiskReader::Reset() {
  reset = true;
  if (fseek(file_ptr_, 0, SEEK_SET) != 0) {
    LOG(FATAL) << "Fail to return to the head of file.";
  }
  if (last_ret) {
    last_ret->Reset();
    delete last_ret;
    last_ret = nullptr;
  }
  {
    // The shared state is inspected under the mutex that protects it.
    std::lock_guard<std::mutex> guard(condition_mutex);
    assert(finished && mats.empty() && worker_threads.empty());
  }
  if (mainReaderThread.joinable()) {
    mainReaderThread.join();
  }
  for (auto &entry : finished_threads) {
    entry.second.join();
  }
  finished_threads.clear();
}

// Sample data from disk file.
// Hands the caller the next parsed block through `matrix` and returns its row
// count; sets `matrix` to nullptr and returns 0 when no data is left.
// NOTE(review): this listing comes from a diff view that carries no +/-
// markers. The statements up to and including
// `return data_samples_.row_length;` appear to be the removed synchronous
// body (the file header for reader.cc reports 15 deletions), and everything
// after them the added threaded body -- confirm against the real file before
// relying on either half.
index_t OndiskReader::Samples(DMatrix* &matrix) {
// Convert MB to Byte
uint64 read_byte = block_size_ * 1024 * 1024;
// Read a block of data from disk file
size_t ret = ReadDataFromDisk(file_ptr_, block_, read_byte);
if (ret == 0) {
matrix = nullptr;
return 0;
} else if (ret == read_byte) {
// Find the last '\n', and shrink back file pointer
shrink_block(block_, &ret, file_ptr_);
} // else ret < read_byte: we don't need shrink_block()
// Parse block to data_sample_
parser_->Parse(block_, ret, data_samples_, true);
matrix = &data_samples_;
return data_samples_.row_length;
// Free the matrix that the previous call handed out through `matrix`
// (last_ret is assigned from the queue further down).
if (last_ret != nullptr) {
last_ret -> Reset();
delete last_ret;
last_ret = nullptr;
}
// `reset` is set by Initialize(), by Reset(), and below at end of data; when
// it is set, a new reader thread running readData() is started.
if (reset) {
reset = false;
finished = false;
mainReaderThread = std::thread ([&] { this->readData(); });
} {
std::unique_lock<std::mutex> lk(condition_mutex);
// Block until either a parsed matrix is queued, or reading has finished
// with nothing queued and no worker still running.
wait_variable.wait(lk, [&]{return (finished == true and mats.size() == 0 and worker_threads.size() == 0)
or mats.size() > 0;});
// End of data: report it and arm `reset` so a later call starts over.
if (finished and mats.size() == 0 and worker_threads.size() == 0) {
matrix = nullptr;
reset = true;
return 0;
}
last_ret = mats.front();
mats.pop();
}
// A queue slot was freed; wake threads blocked on wait_variable (readData()
// waits on it for mats.size() < 10).
wait_variable.notify_all();

matrix = last_ret;
return last_ret -> row_length;
}

void FromDMReader::Initialize(xLearn::DMatrix* &dmatrix) {
Expand Down
38 changes: 33 additions & 5 deletions src/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ reading data from data source.
#include <vector>
#include <thread>
#include <algorithm>
#include <set>
#include <thread>

#include "src/base/common.h"
#include "src/base/class_register.h"
Expand Down Expand Up @@ -240,6 +242,7 @@ class InmemReader : public Reader {
// Initialize Reader from a new txt file.
void init_from_txt();


private:
DISALLOW_COPY_AND_ASSIGN(InmemReader);
};
Expand All @@ -253,10 +256,22 @@ class InmemReader : public Reader {
class OndiskReader : public Reader {
public:
// Constructor and Destructor
OndiskReader() { }
~OndiskReader() {
OndiskReader() : last_ret(nullptr) { }
~OndiskReader() {
{
std::unique_lock<std::mutex> lk(condition_mutex);
assert(finished == true and mats.size() == 0 and worker_threads.size() == 0);
}
if (mainReaderThread.joinable())
mainReaderThread.join();
for (auto &thread : finished_threads)
thread.second.join();
Clear();
Close(file_ptr_);
Close(file_ptr_);
if (last_ret != nullptr) {
last_ret -> Reset();
delete last_ret;
}
}

// Create parser and open file
Expand Down Expand Up @@ -291,9 +306,23 @@ class OndiskReader : public Reader {
}

protected:
DMatrix *last_ret;
/* Maintain the file pointer */
FILE* file_ptr_;


void readData();

std::map<int, std::thread> worker_threads;
std::map<int, std::thread> finished_threads;
bool finished;
bool reset;
std::queue<DMatrix*> mats;

void blockWorkerThread(std::string *buf, uint64 size, int thread_idx);
std::thread mainReaderThread;

std::mutex condition_mutex;
std::condition_variable wait_variable;
private:
DISALLOW_COPY_AND_ASSIGN(OndiskReader);
};
Expand Down Expand Up @@ -343,7 +372,6 @@ class FromDMReader : public Reader {
/* For random shuffle */
std::vector<index_t> order_;


private:
DISALLOW_COPY_AND_ASSIGN(FromDMReader);
};
Expand Down