Skip to content

Commit

Permalink
测试sysbench (#10)
Browse files Browse the repository at this point in the history
### What problem were solved in this pull request?

Problem:
增加调试日志

### What is changed and how it works?

### Other information
  • Loading branch information
hnwyllmm authored Apr 22, 2024
2 parents 46b949a + 105d715 commit 9c0d896
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 40 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ jobs:
mysql --version
mysql -S /tmp/miniob.sock -e "show tables"
# error number 41 is LOCKED_CONCURRENCY_CONFLICT
# we should change the error number if we update the code
- name: sysbench test
shell: bash
run: |
cd test/sysbench
sysbench --mysql-socket=/tmp/miniob.sock --threads=10 ${{ matrix.test_case }} prepare
sysbench --mysql-socket=/tmp/miniob.sock --threads=10 ${{ matrix.test_case }} run
sysbench --mysql-socket=/tmp/miniob.sock --mysql-ignore-errors=41 --threads=10 ${{ matrix.test_case }} prepare
sysbench --mysql-socket=/tmp/miniob.sock --mysql-ignore-errors=41 --threads=10 ${{ matrix.test_case }} run
- name: stop server
shell: bash
run: |
mysql -S /tmp/miniob.sock -e "create table t(id int)"
mysql -S /tmp/miniob.sock -e "show tables"
killall observer
Expand All @@ -80,7 +83,7 @@ jobs:
shell: bash
run: |
cd test/sysbench
sysbench --mysql-socket=/tmp/miniob.sock --threads=10 ${{ matrix.test_case }} run
sysbench --mysql-socket=/tmp/miniob.sock --mysql-ignore-errors=41 --threads=10 ${{ matrix.test_case }} run
benchmark-test:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion src/observer/net/mysql_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ RC MysqlCommunicator::write_state(SessionEvent *event, bool &need_disconnect)
char *buf = new char[buf_size];
const std::string &state_string = sql_result->state_string();
if (state_string.empty()) {
const char *result = RC::SUCCESS == sql_result->return_code() ? "SUCCESS" : "FAILURE";
const char *result = strrc(sql_result->return_code());
snprintf(buf, buf_size, "%s", result);
} else {
snprintf(buf, buf_size, "%s > %s", strrc(sql_result->return_code()), state_string.c_str());
Expand Down
4 changes: 2 additions & 2 deletions src/observer/net/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ void NetServer::accept(int fd)
return;
}

LOG_INFO("Accepted connection from %s\n", communicator->addr());

rc = thread_handler_->new_connection(communicator);
if (OB_FAIL(rc)) {
LOG_WARN("failed to handle new connection. rc=%s", strrc(rc));
delete communicator;
return;
}

LOG_INFO("Accepted connection from %s\n", communicator->addr());
}

int NetServer::start()
Expand Down
10 changes: 8 additions & 2 deletions src/observer/sql/operator/index_scan_physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,28 @@ RC IndexScanPhysicalOperator::next()
bool filter_result = false;
while (RC::SUCCESS == (rc = index_scanner_->next_entry(&rid))) {
rc = record_handler_->get_record(rid, current_record_);
if (rc != RC::SUCCESS) {
if (OB_FAIL(rc)) {
LOG_TRACE("failed to get record. rid=%s, rc=%s", rid.to_string().c_str(), strrc(rc));
return rc;
}

LOG_TRACE("got a record. rid=%s", rid.to_string().c_str());

tuple_.set_record(&current_record_);
rc = filter(tuple_, filter_result);
if (rc != RC::SUCCESS) {
if (OB_FAIL(rc)) {
LOG_TRACE("failed to filter record. rc=%s", strrc(rc));
return rc;
}

if (!filter_result) {
LOG_TRACE("record filtered");
continue;
}

rc = trx_->visit_record(table_, current_record_, mode_);
if (rc == RC::RECORD_INVISIBLE) {
LOG_TRACE("record invisible");
continue;
} else {
return rc;
Expand Down
4 changes: 3 additions & 1 deletion src/observer/sql/operator/table_scan_physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ RC TableScanPhysicalOperator::next()

bool filter_result = false;
while (OB_SUCC(rc = record_scanner_.next(current_record_))) {

LOG_TRACE("got a record. rid=%s", current_record_.rid().to_string().c_str());

tuple_.set_record(&current_record_);
rc = filter(tuple_, filter_result);
if (rc != RC::SUCCESS) {
LOG_TRACE("record filtered failed=%s", strrc(rc));
return rc;
}

Expand Down
4 changes: 3 additions & 1 deletion src/observer/storage/buffer/disk_buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ RC DiskBufferPool::write_page(PageNum page_num, Page &page)
return RC::IOERR_WRITE;
}

LOG_TRACE("write_page: buffer_pool_id:%d, page_num:%d, lsn=%d, check_sum=%d", id(), page_num, page.lsn, page.check_sum);
return RC::SUCCESS;
}

Expand Down Expand Up @@ -651,7 +652,8 @@ RC DiskBufferPool::redo_allocate_page(LSN lsn, PageNum page_num)
}

if (page_num > file_header_->page_count) {
LOG_WARN("page %d is not continuous. file=%s", page_num, file_name_.c_str());
LOG_WARN("page %d is not continuous. file=%s, page_count=%d",
page_num, file_name_.c_str(), file_header_->page_count);
return RC::INTERNAL;
}

Expand Down
59 changes: 41 additions & 18 deletions src/observer/storage/buffer/double_write_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ RC DiskDoubleWriteBuffer::open_file(const char *filename)
}

file_desc_ = fd;
return RC::SUCCESS;
return load_pages();
}

RC DiskDoubleWriteBuffer::flush_page()
Expand All @@ -90,6 +90,7 @@ RC DiskDoubleWriteBuffer::flush_page()
}

dblwr_pages_.clear();
header_.page_cnt = 0;

return RC::SUCCESS;
}
Expand All @@ -101,20 +102,16 @@ RC DiskDoubleWriteBuffer::add_page(DiskBufferPool *bp, PageNum page_num, Page &p
auto iter = dblwr_pages_.find(key);
if (iter != dblwr_pages_.end()) {
iter->second->page = page;
LOG_TRACE("[cache hit]add page into double write buffer. buffer_pool_id:%d,page_num:%d,lsn=%d, dwb size=%d",
bp->id(), page_num, page.lsn, static_cast<int>(dblwr_pages_.size()));
return RC::SUCCESS;
}

if (static_cast<int>(dblwr_pages_.size()) >= max_pages_) {
RC rc = flush_page();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush pages in double write buffer");
return rc;
}
}

int64_t page_cnt = dblwr_pages_.size();
DoubleWritePage *dblwr_page = new DoubleWritePage(bp->id(), page_num, page);
dblwr_pages_.insert(std::pair<DoubleWritePageKey, DoubleWritePage *>(key, dblwr_page));
LOG_TRACE("insert page into double write buffer. buffer_pool_id:%d,page_num:%d,lsn=%d, dwb size:%d",
bp->id(), page_num, page.lsn, static_cast<int>(dblwr_pages_.size()));

int64_t offset = page_cnt * DoubleWritePage::SIZE + DoubleWriteBufferHeader::SIZE;
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
Expand All @@ -140,6 +137,14 @@ RC DiskDoubleWriteBuffer::add_page(DiskBufferPool *bp, PageNum page_num, Page &p
}
}

if (static_cast<int>(dblwr_pages_.size()) >= max_pages_) {
RC rc = flush_page();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush pages in double write buffer");
return rc;
}
}

return RC::SUCCESS;
}

Expand All @@ -149,6 +154,9 @@ RC DiskDoubleWriteBuffer::write_page(DoubleWritePage *dblwr_page)
RC rc = bp_manager_.get_buffer_pool(dblwr_page->key.buffer_pool_id, disk_buffer);
ASSERT(OB_SUCC(rc) && disk_buffer != nullptr, "failed to get disk buffer pool of %d", dblwr_page->key.buffer_pool_id);

LOG_TRACE("double write buffer write page. buffer_pool_id:%d,page_num:%d,lsn=%d",
dblwr_page->key.buffer_pool_id, dblwr_page->key.page_num, dblwr_page->page.lsn);

return disk_buffer->write_page(dblwr_page->key.page_num, dblwr_page->page);
}

Expand Down Expand Up @@ -205,9 +213,17 @@ RC DiskDoubleWriteBuffer::clear_pages(DiskBufferPool *buffer_pool)
return RC::SUCCESS;
}

RC DiskDoubleWriteBuffer::recover()
RC DiskDoubleWriteBuffer::load_pages()
{
scoped_lock lock_guard(lock_);
if (file_desc_ < 0) {
LOG_ERROR("Failed to load pages, due to file desc is invalid.");
return RC::BUFFERPOOL_OPEN;
}

if (!dblwr_pages_.empty()) {
LOG_ERROR("Failed to load pages, due to double write buffer is not empty. opened?");
return RC::BUFFERPOOL_OPEN;
}

if (lseek(file_desc_, 0, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page header, due to failed to lseek:%s.", strerror(errno));
Expand All @@ -221,15 +237,15 @@ RC DiskDoubleWriteBuffer::recover()
return RC::IOERR_READ;
}

auto dblwr_page = make_unique<DoubleWritePage>();
for (int page_num = 0; page_num < header_.page_cnt; page_num++) {
int64_t offset = ((int64_t)page_num) * DoubleWritePage::SIZE + DoubleWriteBufferHeader::SIZE;

if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %d, due to failed to lseek:%s.", page_num, strerror(errno));
LOG_ERROR("Failed to load page %d, offset=%ld, due to failed to lseek:%s.", page_num, offset, strerror(errno));
return RC::IOERR_SEEK;
}

auto dblwr_page = make_unique<DoubleWritePage>();
Page &page = dblwr_page->page;
page.check_sum = (CheckSum)-1;

Expand All @@ -240,17 +256,24 @@ RC DiskDoubleWriteBuffer::recover()
return RC::IOERR_READ;
}

if (crc32(page.data, BP_PAGE_DATA_SIZE) == page.check_sum) {
RC rc = write_page(dblwr_page.get());
if (OB_FAIL(rc)) {
return rc;
}
const CheckSum check_sum = crc32(page.data, BP_PAGE_DATA_SIZE);
if (check_sum == page.check_sum) {
DoubleWritePageKey key = dblwr_page->key;
dblwr_pages_.insert(pair<DoubleWritePageKey, DoubleWritePage *>(key, dblwr_page.release()));
} else {
LOG_TRACE("got a page with an invalid checksum. on disk:%d, in memory:%d", page.check_sum, check_sum);
}
}

LOG_INFO("double write buffer load pages done. page num=%d", dblwr_pages_.size());
return RC::SUCCESS;
}

RC DiskDoubleWriteBuffer::recover()
{
return flush_page();
}

////////////////////////////////////////////////////////////////
RC VacuousDoubleWriteBuffer::add_page(DiskBufferPool *bp, PageNum page_num, Page &page)
{
Expand Down
7 changes: 6 additions & 1 deletion src/observer/storage/buffer/double_write_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class DiskDoubleWriteBuffer : public DoubleWriteBuffer
*/
RC write_page(DoubleWritePage *page);

/**
* @brief 将磁盘文件中的内容加载到内存中。在启动时调用
*/
RC load_pages();

private:
int file_desc_ = -1;
int max_pages_ = 0;
Expand All @@ -153,4 +158,4 @@ class VacuousDoubleWriteBuffer : public DoubleWriteBuffer
* @brief 清空所有与指定buffer pool关联的页面
*/
RC clear_pages(DiskBufferPool *bp) override { return RC::SUCCESS; }
};
};
6 changes: 4 additions & 2 deletions src/observer/storage/clog/log_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ RC LogFileReader::iterate(std::function<RC(LogEntry &)> callback, LSN start_lsn
vector<char> data(header.size);
ret = readn(fd_, data.data(), header.size);
if (0 != ret) {
LOG_WARN("read file failed. filename=%s, ret = %d, error=%s", filename_.c_str(), ret, strerror(errno));
LOG_WARN("read file failed. filename=%s, size=%d, ret=%d, error=%s", filename_.c_str(), header.size, ret, strerror(errno));
return RC::IOERR_READ;
}

LogEntry entry;
entry.init(header.lsn, LogModule(header.module_id), std::move(data));
rc = callback(entry);
if (OB_FAIL(rc)) {
LOG_INFO("iterate log entry failed. entry=%s, rc=%s", entry.to_string().c_str(), strrc(rc));
return rc;
}
LOG_TRACE("redo log iterate entry success. entry=%s", entry.to_string().c_str());
}

return RC::SUCCESS;
Expand Down Expand Up @@ -335,4 +337,4 @@ RC LogFileManager::next_file(LogFileWriter &file_writer)
log_files_.emplace(lsn, file_path);

return file_writer.open(file_path.c_str(), lsn + max_entry_number_per_file_ - 1);
}
}
6 changes: 4 additions & 2 deletions src/observer/storage/db/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ RC Db::sync()

RC Db::recover()
{
LOG_TRACE("db recover begin. check_point_lsn=%d", check_point_lsn_);

LogReplayer *trx_log_replayer = trx_kit_->create_log_replayer(*this, *log_handler_);
if (trx_log_replayer == nullptr) {
LOG_ERROR("Failed to create trx log replayer.");
Expand All @@ -290,7 +292,7 @@ RC Db::recover()
return rc;
}

LOG_INFO("Successfully recover db. db=%s", name_.c_str());
LOG_INFO("Successfully recover db. db=%s checkpoint_lsn=%d", name_.c_str(), check_point_lsn_);
return rc;
}

Expand Down Expand Up @@ -393,4 +395,4 @@ RC Db::init_dblwr_buffer()

LogHandler &Db::log_handler() { return *log_handler_; }
BufferPoolManager &Db::buffer_pool_manager() { return *buffer_pool_manager_; }
TrxKit &Db::trx_kit() { return *trx_kit_; }
TrxKit &Db::trx_kit() { return *trx_kit_; }
21 changes: 18 additions & 3 deletions src/observer/storage/record/record_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/trx/trx.h"
#include "storage/clog/log_handler.h"

using namespace std;
using namespace common;

static constexpr int PAGE_HEADER_SIZE = (sizeof(PageHeader));
Expand Down Expand Up @@ -50,6 +51,17 @@ int page_record_capacity(int page_size, int record_size)
*/
int page_bitmap_size(int record_capacity) { return (record_capacity + 7) / 8; }

string PageHeader::to_string() const
{
stringstream ss;
ss << "record_num:" << record_num
<< ",record_real_size:" << record_real_size
<< ",record_size:" << record_size
<< ",record_capacity:" << record_capacity
<< ",first_record_offset:" << first_record_offset;
return ss.str();
}

////////////////////////////////////////////////////////////////////////////////
RecordPageIterator::RecordPageIterator() {}
RecordPageIterator::~RecordPageIterator() {}
Expand Down Expand Up @@ -257,7 +269,8 @@ RC RecordPageHandler::delete_record(const RID *rid)
"cannot delete record from page while the page is readonly");

if (rid->slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, page_num %d.", rid->slot_num, frame_->page_num());
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, frame=%s, page_header=%s",
rid->slot_num, frame_->to_string().c_str(), page_header_->to_string().c_str());
return RC::INVALID_ARGUMENT;
}

Expand Down Expand Up @@ -285,7 +298,8 @@ RC RecordPageHandler::update_record(const RID &rid, const char *data)
ASSERT(rw_mode_ != ReadWriteMode::READ_ONLY, "cannot delete record from page while the page is readonly");

if (rid.slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, page_num %d.", rid.slot_num, frame_->page_num());
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, frame=%s, page_header=%s",
rid.slot_num, frame_->to_string().c_str(), page_header_->to_string().c_str());
return RC::INVALID_ARGUMENT;
}

Expand Down Expand Up @@ -317,7 +331,8 @@ RC RecordPageHandler::update_record(const RID &rid, const char *data)
RC RecordPageHandler::get_record(const RID &rid, Record &record)
{
if (rid.slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num:%d, exceed page's record capacity, page_num %d.", rid.slot_num, frame_->page_num());
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, frame=%s, page_header=%s",
rid.slot_num, frame_->to_string().c_str(), page_header_->to_string().c_str());
return RC::RECORD_INVALID_RID;
}

Expand Down
4 changes: 3 additions & 1 deletion src/observer/storage/record/record_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct PageHeader
int32_t record_size; ///< 每条记录占用实际空间大小(可能对齐)
int32_t record_capacity; ///< 最大记录个数
int32_t first_record_offset; ///< 第一条记录的偏移量

std::string to_string() const;
};

/**
Expand Down Expand Up @@ -375,4 +377,4 @@ class RecordFileScanner
RecordPageHandler record_page_handler_; ///< 处理文件某页面的记录
RecordPageIterator record_page_iterator_; ///< 遍历某个页面上的所有record
Record next_record_; ///< 获取的记录放在这里缓存起来
};
};
Loading

0 comments on commit 9c0d896

Please sign in to comment.