Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

double write buffer #367

Merged
merged 24 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
51a540c
double write buffer #334
Wenbin1002 Apr 5, 2024
9d7c09e
change data type #334
Wenbin1002 Apr 5, 2024
801a4fb
fix deadlock bug #334
Wenbin1002 Apr 6, 2024
eb175ca
add open mode #334
Wenbin1002 Apr 6, 2024
b863c6a
add write read lock to disk buffer #334
Wenbin1002 Apr 7, 2024
87cf1bd
clang format #334
Wenbin1002 Apr 7, 2024
48e6d75
add write page function #334
Wenbin1002 Apr 8, 2024
ce4147d
debug log #334
Wenbin1002 Apr 8, 2024
605a4fc
init dblwr buffer #334
Wenbin1002 Apr 8, 2024
770c598
remove string head file #334
Wenbin1002 Apr 8, 2024
1a6e1be
fix deadlock #334
Wenbin1002 Apr 8, 2024
fff8f46
避免重复申请disk buffer #334
Wenbin1002 Apr 8, 2024
4f79896
init dblwr buffer #334
Wenbin1002 Apr 11, 2024
bd95b17
尝试修复double write buffer死锁的问题
hnwyllmm Apr 11, 2024
0e2197d
dblwr buffer recover #334
Wenbin1002 Apr 11, 2024
b869b5a
fix read error #334
Wenbin1002 Apr 11, 2024
2b9e45c
add header info for dwb #334
Wenbin1002 Apr 12, 2024
a01bbaf
Merge branch 'issue_334' into fix/close_bp
hnwyllmm Apr 12, 2024
037e684
Merge pull request #2 from hnwyllmm-test/fix/close_bp
Wenbin1002 Apr 13, 2024
c05a14d
Revert "修复关闭buffer pool 与访问double write buffer 冲突的问题"
Wenbin1002 Apr 13, 2024
ed9c4b2
Merge pull request #3 from Wenbin1002/revert-2-fix/close_bp
Wenbin1002 Apr 13, 2024
a46cc11
add dwb header #334
Wenbin1002 Apr 13, 2024
d132b94
fix memory leak in DoubleWriteBuffer::recover
hnwyllmm Apr 15, 2024
b5da6d7
fix compile error
hnwyllmm Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/observer/common/rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ See the Mulan PSL v2 for more details. */
DEFINE_RC(FILE_WRITE) \
DEFINE_RC(VARIABLE_NOT_EXISTS) \
DEFINE_RC(VARIABLE_NOT_VALID) \
DEFINE_RC(LOGBUF_FULL)

DEFINE_RC(LOGBUF_FULL) \
DEFINE_RC(DBLWR_RECOVER_ERRO)
enum class RC
{
#define DEFINE_RC(name) name,
Expand Down
311 changes: 293 additions & 18 deletions src/observer/storage/buffer/disk_buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ RC BufferPoolIterator::reset()
}

////////////////////////////////////////////////////////////////////////////////
DiskBufferPool::DiskBufferPool(BufferPoolManager &bp_manager, BPFrameManager &frame_manager)
: bp_manager_(bp_manager), frame_manager_(frame_manager)
DiskBufferPool::DiskBufferPool(
BufferPoolManager &bp_manager, BPFrameManager &frame_manager, DoubleWriteBuffer &dblwr_manager)
: bp_manager_(bp_manager), frame_manager_(frame_manager), dblwr_manager_(dblwr_manager)
{}

DiskBufferPool::~DiskBufferPool()
Expand Down Expand Up @@ -311,12 +312,17 @@ RC DiskBufferPool::get_this_page(PageNum page_num, Frame **frame)
// allocated_frame->pin(); // pined in manager::get
allocated_frame->access();

if ((rc = load_page(page_num, allocated_frame)) != RC::SUCCESS) {
LOG_ERROR("Failed to load page %s:%d", file_name_.c_str(), page_num);
purge_frame(page_num, allocated_frame);
return rc;
// check if the page is in double write buffer
optional<Page> ret = dblwr_manager_.get_page(file_name_, page_num);
if (ret != std::nullopt) {
hnwyllmm marked this conversation as resolved.
Show resolved Hide resolved
allocated_frame->page() = ret.value();
} else {
hnwyllmm marked this conversation as resolved.
Show resolved Hide resolved
if ((rc = load_page(page_num, allocated_frame)) != RC::SUCCESS) {
LOG_ERROR("Failed to load page %s:%d", file_name_.c_str(), page_num);
purge_frame(page_num, allocated_frame);
return rc;
}
}

*frame = allocated_frame;
return RC::SUCCESS;
}
Expand Down Expand Up @@ -490,17 +496,13 @@ RC DiskBufferPool::flush_page_internal(Frame &frame)
// so it is easier to flush data to file.

frame.set_check_sum(crc32(frame.page().data, BP_PAGE_DATA_SIZE));
Page &page = frame.page();
int64_t offset = ((int64_t)page.page_num) * sizeof(Page);
if (lseek(file_desc_, offset, SEEK_SET) == offset - 1) {
LOG_ERROR("Failed to flush page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_SEEK;
}
Page &page = frame.page();

if (writen(file_desc_, &page, sizeof(Page)) != 0) {
LOG_ERROR("Failed to flush page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
RC rc = dblwr_manager_.add_page(file_name_, page);
if (rc != RC::SUCCESS) {
return rc;
hnwyllmm marked this conversation as resolved.
Show resolved Hide resolved
}

frame.clear_dirty();
LOG_DEBUG("Flush block. file desc=%d, pageNum=%d, pin count=%d", file_desc_, page.page_num, frame.pin_count());

Expand Down Expand Up @@ -536,6 +538,44 @@ RC DiskBufferPool::recover_page(PageNum page_num)
return RC::SUCCESS;
}

RC DiskBufferPool::write_page(Page &page)
{
scoped_lock lock_guard(wr_lock_);
int64_t offset = ((int64_t)page.page_num) * sizeof(Page);
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to write page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_SEEK;
}

if (writen(file_desc_, &page, sizeof(Page)) != 0) {
LOG_ERROR("Failed to write page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
}

return RC::SUCCESS;
}

RC DiskBufferPool::open_file_for_dwb(const char *file_name)
{
int fd = open(file_name, O_RDWR);
if (fd < 0) {
LOG_ERROR("Failed to open file %s, because %s.", file_name, strerror(errno));
return RC::IOERR_ACCESS;
}
LOG_INFO("Successfully open buffer pool file %s.", file_name);

file_name_ = file_name;
file_desc_ = fd;

return RC::SUCCESS;
}

RC DiskBufferPool::close_file_for_dwb()
{
file_desc_ = -1;
return RC::SUCCESS;
}

RC DiskBufferPool::allocate_frame(PageNum page_num, Frame **buffer)
{
auto purger = [this](Frame *frame) {
Expand Down Expand Up @@ -584,7 +624,8 @@ RC DiskBufferPool::check_page_num(PageNum page_num)

RC DiskBufferPool::load_page(PageNum page_num, Frame *frame)
{
int64_t offset = ((int64_t)page_num) * BP_PAGE_SIZE;
std::scoped_lock lock_guard(wr_lock_);
int64_t offset = ((int64_t)page_num) * BP_PAGE_SIZE;
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %s:%d, due to failed to lseek:%s.", file_name_.c_str(), page_num, strerror(errno));

Expand All @@ -611,6 +652,7 @@ BufferPoolManager::BufferPoolManager(int memory_size /* = 0 */)
}
const int pool_num = std::max(memory_size / BP_PAGE_SIZE / DEFAULT_ITEM_NUM_PER_POOL, 1);
frame_manager_.init(pool_num);
dblwr_buffer_ = new DoubleWriteBuffer(*this);
LOG_INFO("buffer pool manager init with memory size %d, page num: %d, pool num: %d",
memory_size, pool_num * DEFAULT_ITEM_NUM_PER_POOL, pool_num);
}
Expand All @@ -623,6 +665,8 @@ BufferPoolManager::~BufferPoolManager()
for (auto &iter : tmp_bps) {
delete iter.second;
}

delete dblwr_buffer_;
}

RC BufferPoolManager::create_file(const char *file_name)
Expand Down Expand Up @@ -680,7 +724,7 @@ RC BufferPoolManager::open_file(const char *_file_name, DiskBufferPool *&_bp)
return RC::BUFFERPOOL_OPEN;
}

DiskBufferPool *bp = new DiskBufferPool(*this, frame_manager_);
DiskBufferPool *bp = new DiskBufferPool(*this, frame_manager_, *dblwr_buffer_);
RC rc = bp->open_file(_file_name);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to open file name");
Expand Down Expand Up @@ -744,6 +788,16 @@ RC BufferPoolManager::flush_page(Frame &frame)
return bp->flush_page(frame);
}

RC BufferPoolManager::get_disk_buffer(const char *file_name, DiskBufferPool **buf)
{

if (buffer_pools_.count(file_name) != 0) {
*buf = buffer_pools_[file_name];
}

return RC::SUCCESS;
}

static BufferPoolManager *default_bpm = nullptr;
void BufferPoolManager::set_instance(BufferPoolManager *bpm)
{
Expand All @@ -754,3 +808,224 @@ void BufferPoolManager::set_instance(BufferPoolManager *bpm
default_bpm = bpm;
}
BufferPoolManager &BufferPoolManager::instance() { return *default_bpm; }

DoubleWriteBuffer::DoubleWriteBuffer(BufferPoolManager &bp_manager) : bp_manager_(bp_manager) { open_file(); }

DoubleWriteBuffer::~DoubleWriteBuffer()
{
for (auto page : dblwr_pages_) {
delete page;
}
close(file_desc_);
}

RC DoubleWriteBuffer::open_file()
{
int fd = open(DBLWR_FILE_NAME, O_CREAT | O_RDWR, 0644);
if (fd < 0) {
LOG_ERROR("Failed to open or creat %s, due to %s.", DBLWR_FILE_NAME, strerror(errno));
return RC::SCHEMA_DB_EXIST;
}

file_desc_ = fd;
return RC::SUCCESS;
}

RC DoubleWriteBuffer::flush_page()
{
sync();

buffers_.clear();
for (const auto &page : dblwr_pages_) {
const char *file_name = page->get_file_name();

RC rc = get_disk_buffer(file_name);
if (rc != RC::SUCCESS) {
LOG_ERROR("failed to get disk buffer");
return rc;
}
}

for (const auto &page : dblwr_pages_) {
RC rc = write_page(page);
if (rc != RC::SUCCESS) {
return rc;
}
delete page;
}

clear_buffer();

dblwr_pages_.clear();
pages_.clear();

return RC::SUCCESS;
}

RC DoubleWriteBuffer::add_page(const std::string &file_name, Page &page)
{
std::scoped_lock lock_guard(lock_);
string key = file_name + to_string(page.page_num);

if (pages_.count(key) != 0) {
pages_.at(key)->get_page() = page;
return RC::SUCCESS;
}

if (dblwr_pages_.size() >= DBLWR_BUFFER_MAX_SIZE) {
RC rc = flush_page();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush pages in double write buffer");
return rc;
Wenbin1002 marked this conversation as resolved.
Show resolved Hide resolved
}
}

int64_t page_cnt = dblwr_pages_.size();
DoubleWritePage *dblwr_page = new DoubleWritePage((int)dblwr_pages_.size(), file_name, page);
dblwr_pages_.push_back(dblwr_page);

int64_t offset = page_cnt * DW_PAGE_SIZE + sizeof(int);
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to add page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
Wenbin1002 marked this conversation as resolved.
Show resolved Hide resolved
return RC::IOERR_SEEK;
}

if (writen(file_desc_, dblwr_page, DW_PAGE_SIZE) != 0) {
LOG_ERROR("Failed to add page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
}

if (page_cnt + 1 > header_.page_cnt) {
header_.page_cnt = page_cnt + 1;
if (lseek(file_desc_, 0, SEEK_SET) == -1) {
LOG_ERROR("Failed to add page header due to failed to seek %s.", strerror(errno));
return RC::IOERR_SEEK;
}

if (writen(file_desc_, &header_, sizeof(header_)) != 0) {
LOG_ERROR("Failed to add page header due to %s.", strerror(errno));
return RC::IOERR_WRITE;
}
}

pages_[key] = dblwr_page;

return RC::SUCCESS;
}

RC DoubleWriteBuffer::write_page(DoubleWritePage *dblwr_page)
{
if (buffers_.count(dblwr_page->get_file_name()) == 0) {
LOG_ERROR("can't find disk buffer when write page");
return RC::IOERR_WRITE;
}

DiskBufferPool *disk_buffer = buffers_[dblwr_page->get_file_name()];

return disk_buffer->write_page(dblwr_page->get_page());
}

RC DoubleWriteBuffer::get_disk_buffer(const char *file_name)
{
if (buffers_.count(file_name) != 0) {
return RC::SUCCESS;
}

DiskBufferPool *disk_buffer = nullptr;
bp_manager_.get_disk_buffer(file_name, &disk_buffer);

/**
* 如果bpm中没有对应的DiskBufferPool,就创建一个新的DiskBufferPool。
* 调用bpm中open_file时,需要申请一个新的frame,而如果此时frame manager已满,需要purge page,会导致无限循环
*/
if (disk_buffer == nullptr) {
disk_buffer = new DiskBufferPool(bp_manager_, bp_manager_.get_frame_manager(), *this);
RC rc = disk_buffer->open_file_for_dwb(file_name);
if (rc != RC::SUCCESS) {
LOG_ERROR("failed to open file for dwb");
return rc;
}
buffer_to_delete.push_back(disk_buffer);
}
buffers_[file_name] = disk_buffer;

return RC::SUCCESS;
}

RC DoubleWriteBuffer::recover()
{
scoped_lock lock_guard(lock_);

if (lseek(file_desc_, 0, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page header, due to failed to lseek:%s.", strerror(errno));
return RC::IOERR_SEEK;
}

int ret = readn(file_desc_, &header_, sizeof(header_));
if (ret != 0 && ret != -1) {
LOG_ERROR("Failed to load page header, file_desc:%d, due to failed to read data:%s, ret=%d",
file_desc_, strerror(errno), ret);
return RC::IOERR_READ;
}

auto dblwr_page = make_unique<DoubleWritePage>();
for (int page_num = 0; page_num < header_.page_cnt; page_num++) {
int64_t offset = ((int64_t)page_num) * DW_PAGE_SIZE + sizeof(int);

if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %d, due to failed to lseek:%s.", page_num, strerror(errno));
return RC::IOERR_SEEK;
}

Page &page = dblwr_page->get_page();
page.check_sum = (CheckSum)-1;

ret = readn(file_desc_, &dblwr_page, DW_PAGE_SIZE);
if (ret != 0) {
LOG_ERROR("Failed to load page, file_desc:%d, page num:%d, due to failed to read data:%s, ret=%d, page count=%d",
file_desc_, page_num, strerror(errno), ret, page_num);
return RC::IOERR_READ;
hnwyllmm marked this conversation as resolved.
Show resolved Hide resolved
}

if (crc32(page.data, BP_PAGE_DATA_SIZE) == page.check_sum) {
RC rc = get_disk_buffer(dblwr_page->get_file_name());
if (rc != RC::SUCCESS) {
clear_buffer();
return rc;
}

rc = write_page(dblwr_page);
if (rc != RC::SUCCESS) {
clear_buffer();
return rc;
}
}
}

clear_buffer();

return RC::SUCCESS;
}

void DoubleWriteBuffer::clear_buffer()
{
for (const auto &buffer : buffer_to_delete) {
buffer->close_file_for_dwb();
delete buffer;
}

buffers_.clear();
buffer_to_delete.clear();
}

std::optional<Page> DoubleWriteBuffer::get_page(const std::string &file_name, PageNum &page_num)
{
std::scoped_lock lock_guard(lock_);

string key = file_name + to_string(page_num);
if (pages_.count(key) != 0) {
return make_optional<Page>(pages_.at(key)->get_page());
}

return std::nullopt;
}
Loading
Loading