Skip to content

Commit

Permalink
fix: fix gc coredump (#3561)
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 authored Nov 15, 2023
1 parent aa8e756 commit bb6bc09
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 19 deletions.
36 changes: 17 additions & 19 deletions src/storage/mem_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,18 @@ bool MemTable::Put(uint64_t time, const std::string& value, const Dimensions& di
PDLOG(WARNING, "invalid schema version %u, tid %u pid %u", version, id_, pid_);
return false;
}
std::map<int32_t, uint64_t> ts_map;
std::map<uint32_t, std::map<int32_t, uint64_t>> ts_value_map;
for (const auto& kv : inner_index_key_map) {
auto inner_index = table_index_.GetInnerIndex(kv.first);
if (!inner_index) {
PDLOG(WARNING, "invalid inner index pos %d. tid %u pid %u", kv.first, id_, pid_);
return false;
}
std::map<int32_t, uint64_t> ts_map;
for (const auto& index_def : inner_index->GetIndex()) {
if (!index_def->IsReady()) {
continue;
}
auto ts_col = index_def->GetTsColumn();
if (ts_col) {
int64_t ts = 0;
Expand All @@ -192,34 +196,28 @@ bool MemTable::Put(uint64_t time, const std::string& value, const Dimensions& di
return false;
}
ts_map.emplace(ts_col->GetId(), ts);
}
if (index_def->IsReady()) {
real_ref_cnt++;
}
}
if (!ts_map.empty()) {
ts_value_map.emplace(kv.first, std::move(ts_map));
}
}
if (ts_map.empty()) {
if (ts_value_map.empty()) {
return false;
}
auto* block = new DataBlock(real_ref_cnt, value.c_str(), value.length());
for (const auto& kv : inner_index_key_map) {
auto inner_index = table_index_.GetInnerIndex(kv.first);
bool need_put = false;
for (const auto& index_def : inner_index->GetIndex()) {
if (index_def->IsReady()) {
// TODO(hw): if we don't find this ts(has_found_ts==false), but it's ready, will put too?
need_put = true;
break;
}
auto iter = ts_value_map.find(kv.first);
if (iter == ts_value_map.end()) {
continue;
}
if (need_put) {
uint32_t seg_idx = 0;
if (seg_cnt_ > 1) {
seg_idx = ::openmldb::base::hash(kv.second.data(), kv.second.size(), SEED) % seg_cnt_;
}
Segment* segment = segments_[kv.first][seg_idx];
segment->Put(::openmldb::base::Slice(kv.second), ts_map, block);
uint32_t seg_idx = 0;
if (seg_cnt_ > 1) {
seg_idx = ::openmldb::base::hash(kv.second.data(), kv.second.size(), SEED) % seg_cnt_;
}
Segment* segment = segments_[kv.first][seg_idx];
segment->Put(::openmldb::base::Slice(kv.second), iter->second, block);
}
record_byte_size_.fetch_add(GetRecordSize(value.length()));
return true;
Expand Down
73 changes: 73 additions & 0 deletions src/storage/snapshot_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,79 @@ TEST_F(SnapshotTest, Recover_only_snapshot) {
ASSERT_FALSE(it->Valid());
}

// Writes a snapshot file by hand for a table with two indexes on the same key column,
// sets flag 1 on the second index in the table meta (presumably marking it as deleted,
// going by the test name -- confirm against the index flag semantics), then recovers a
// MemTable from that snapshot and runs SchedGc() on it. Besides the explicit assertions,
// the test passes when recovery and GC complete without crashing.
TEST_F(SnapshotTest, RecoverWithDeleteIndex) {
    const uint32_t tid = 12;
    const uint32_t pid = 0;

    // Schema: one string key, two bigint ts columns, one payload column;
    // "index1" and "index2" share the key and differ only in their ts column.
    ::openmldb::api::TableMeta meta;
    meta.set_tid(tid);
    meta.set_pid(pid);
    SchemaCodec::SetColumnDesc(meta.add_column_desc(), "userid", ::openmldb::type::kString);
    SchemaCodec::SetColumnDesc(meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt);
    SchemaCodec::SetColumnDesc(meta.add_column_desc(), "ts2", ::openmldb::type::kBigInt);
    SchemaCodec::SetColumnDesc(meta.add_column_desc(), "val", ::openmldb::type::kString);
    SchemaCodec::SetIndex(meta.add_column_key(), "index1", "userid", "ts1", ::openmldb::type::kLatestTime, 0, 1);
    SchemaCodec::SetIndex(meta.add_column_key(), "index2", "userid", "ts2", ::openmldb::type::kLatestTime, 0, 1);

    std::string snapshot_dir = absl::StrCat(FLAGS_db_root_path, "/", tid, "_", pid, "/snapshot");
    ::openmldb::base::MkdirRecur(snapshot_dir);

    // The snapshot file name carries the compression type as a suffix unless compression is off.
    std::string snapshot_name = "20231018.sdb";
    if (FLAGS_snapshot_compression != "off") {
        snapshot_name += ".";
        snapshot_name += FLAGS_snapshot_compression;
    }

    // Number of log entries written so far; also used as the log index of the next entry.
    uint64_t offset = 0;
    {
        // The writer lives only inside this scope, so it is destroyed before recovery starts.
        std::string snapshot_path = absl::StrCat(snapshot_dir, "/", snapshot_name);
        FILE* fp = fopen(snapshot_path.c_str(), "ab+");
        ASSERT_TRUE(fp != nullptr);
        ::openmldb::log::WritableFile* wf = ::openmldb::log::NewWritableFile(snapshot_name, fp);
        ::openmldb::log::Writer writer(FLAGS_snapshot_compression, wf);
        ::openmldb::codec::SDKCodec sdk_codec(meta);

        // 5 timestamps (100..104) x 10 keys = 50 records, each carrying one dimension per index.
        for (uint32_t ts = 100; ts < 105; ts++) {
            std::string ts_text = std::to_string(ts);
            for (int key_idx = 0; key_idx < 10; key_idx++) {
                std::string userid = absl::StrCat("userid", key_idx);
                std::vector<std::string> row = {userid, ts_text, ts_text, "aa"};
                std::string encoded_row;
                sdk_codec.EncodeRow(row, &encoded_row);

                ::openmldb::api::LogEntry entry;
                entry.set_log_index(offset++);
                entry.set_value(encoded_row);
                for (int idx = 0; idx < meta.column_key_size(); idx++) {
                    auto dimension = entry.add_dimensions();
                    dimension->set_key(userid);
                    dimension->set_idx(idx);
                }
                entry.set_ts(ts);
                entry.set_term(1);

                std::string serialized;
                ASSERT_TRUE(entry.SerializeToString(&serialized));
                Slice record(serialized.c_str(), serialized.size());
                ASSERT_TRUE(writer.AddRecord(record).ok());
            }
        }
        writer.EndLog();
    }

    // Flag the second index ("index2") only after the snapshot was written, so every
    // record in the snapshot still carries a dimension for it.
    meta.mutable_column_key(1)->set_flag(1);

    std::shared_ptr<MemTable> table = std::make_shared<MemTable>(meta);
    table->Init();
    LogParts* log_part = new LogParts(12, 4, scmp);
    MemTableSnapshot snapshot(tid, pid, log_part, FLAGS_db_root_path);
    ASSERT_TRUE(snapshot.Init());
    ASSERT_EQ(0, snapshot.GenManifest(snapshot_name, 50, offset, 1));

    uint64_t recovered_offset = 0;
    ASSERT_TRUE(snapshot.Recover(table, recovered_offset));
    ASSERT_EQ(recovered_offset, offset);

    // GC over the recovered data is the final step of the scenario.
    table->SchedGc();
}

TEST_F(SnapshotTest, MakeSnapshot) {
LogParts* log_part = new LogParts(12, 4, scmp);
MemTableSnapshot snapshot(1, 2, log_part, FLAGS_db_root_path);
Expand Down

0 comments on commit bb6bc09

Please sign in to comment.