
New backup consolidated commit
flowguru committed Nov 7, 2024
1 parent 2b23111 commit 5bd814d
Showing 12 changed files with 848 additions and 16 deletions.
5 changes: 4 additions & 1 deletion fdbbackup/FileConverter.actor.cpp
@@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
if (fp->empty()) {
self->fileProgress.erase(self->fileProgress.begin());
} else {
// Keep fileProgress sorted
// Keep fileProgress sorted; only the first entry can have changed, so this single pass is enough
for (int i = 1; i < self->fileProgress.size(); i++) {
if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) {
break;
@@ -489,6 +489,9 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}

// Keep reading data until a new version is encountered, then flush all buffered data and start
// buffering for the new version.

ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
43 changes: 42 additions & 1 deletion fdbclient/BackupAgentBase.actor.cpp
@@ -187,9 +187,12 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
return ret;
}

// Given a begin and end version, compute the key ranges in the database for this version range.
// The keys have the form applyLogKeys.begin/backupUid/hash(uint8)/version(64 bits)/part.
// Returns multiple key ranges, each covering at most APPLY_BLOCK_SIZE versions, e.g.
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] (see the standalone sketch below)
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;

Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);
@@ -292,6 +295,7 @@ void _addResult(bool* tenantMapChanging,
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
*/
// hfu5: the value parameter is the Param2 payload of a backup mutation log entry
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
@@ -318,6 +322,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
throw incompatible_protocol_version();
}

// hfu5: the following decodes the Param2 format, starting with a 32-bit totalBytes header
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
@@ -332,6 +337,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,

while (consumed < totalBytes) {
uint32_t type = 0;
// hfu5: each record has the format type|kLen|vLen|key|value (see the decoding sketch below)
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len1 = 0;
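To make the layout in the comments above concrete, here is a hedged standalone sketch of a decoder for a buffer shaped as a 32-bit totalBytes header followed by repeated type|kLen|vLen|key|value records. It is not the FDB code path (decodeBackupLogValue above is); Record and parseMutationRecords are made-up names, and the sketch ignores the protocol-version prefix, encryption handling, and full bounds checking.

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

struct Record {
    uint32_t type;
    std::string key;
    std::string value;
};

// Parse records from a buffer laid out as [totalBytes][type|kLen|vLen|key|value]...
std::vector<Record> parseMutationRecords(const uint8_t* data, size_t size) {
    std::vector<Record> out;
    size_t offset = 0;
    uint32_t totalBytes = 0;
    std::memcpy(&totalBytes, data + offset, sizeof(uint32_t));
    offset += sizeof(uint32_t);

    uint32_t consumed = 0;
    while (consumed < totalBytes && offset + 3 * sizeof(uint32_t) <= size) {
        uint32_t type = 0, kLen = 0, vLen = 0;
        std::memcpy(&type, data + offset, sizeof(uint32_t));
        std::memcpy(&kLen, data + offset + sizeof(uint32_t), sizeof(uint32_t));
        std::memcpy(&vLen, data + offset + 2 * sizeof(uint32_t), sizeof(uint32_t));
        offset += 3 * sizeof(uint32_t);
        out.push_back(Record{ type,
                              std::string(reinterpret_cast<const char*>(data + offset), kLen),
                              std::string(reinterpret_cast<const char*>(data + offset + kLen), vLen) });
        offset += kLen + vLen;
        consumed += 3 * sizeof(uint32_t) + kLen + vLen;
    }
    return out;
}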
@@ -448,6 +454,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
// version is the version of this mutation as decoded from the log
// ver is the older version stored in the keyVersionMap
// as a result, only apply this logged mutation when its version is newer, so it composes correctly with range files
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
@@ -587,6 +596,7 @@ ACTOR Future<Void> readCommitted(Database cx,
}
}

// hfu5: read each version; a single version may consist of multiple parts
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
@@ -639,14 +649,23 @@ ACTOR Future<Void> readCommitted(Database cx,
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

// Iterate over a version range.
// Each (version, part) pair is one key-value entry.
// hfu5 question: in the edge case where two parts of the same version land in two different blocks,
// they cannot be combined here; what happens then?
for (auto& s : rangevalue) {
// hfu5 : (version, part)
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// hfu5: when a different group key (version) is seen, send the buffered result and start a new
// rcGroup; as a result each rcGroup holds a single version, but a single version can span
// multiple rcGroups (see the grouping sketch below)

//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
Expand All @@ -665,6 +684,7 @@ ACTOR Future<Void> readCommitted(Database cx,
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
// each item appended here is one part of the version; kvMutationLogToTransactions later treats each item as a partition
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
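The buffering behaviour described above (emit a group whenever the group key changes, so each group holds one version) can be sketched outside of the flow actor framework as follows; Row, groupRowsByVersion, and the callback shape are illustrative assumptions, not FDB types.

#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Row {
    uint64_t groupKey;  // the version decoded from the key
    std::string value;  // one part of that version's data
};

// Buffer consecutive rows with the same group key and emit each finished group.
void groupRowsByVersion(const std::vector<Row>& rows,
                        const std::function<void(uint64_t, std::vector<Row>)>& emit) {
    std::vector<Row> current;
    uint64_t currentKey = 0;
    bool open = false;
    for (const Row& r : rows) {
        if (open && r.groupKey != currentKey) {
            emit(currentKey, std::move(current)); // a new version starts: flush the finished one
            current.clear();
        }
        currentKey = r.groupKey;
        open = true;
        current.push_back(r);
    }
    if (open)
        emit(currentKey, std::move(current)); // flush the trailing group
}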
@@ -706,6 +726,8 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

// The restore transaction has to be the first in its batch, or the only transaction in the batch,
// to make sure it never conflicts with others.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
@@ -759,6 +781,8 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
// two layers of loops: the outer loop iterates over file ranges,
// the inner loop over transactions (versions)
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
@@ -774,10 +798,12 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,

BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
// hfu5: each value is one partition (part) of this version's mutation log
bw.serializeBytes(group.items[i].value);
}
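// (Illustrative note: if this version's log value was split into parts 0..N-1, the loop above
// re-concatenates the part values in order, so decodeBackupLogValue below sees one contiguous
// Param2 buffer for the whole version.)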
// Parse a single transaction from the backup mutation log
Standalone<StringRef> value = bw.toValue();
// ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md
wait(decodeBackupLogValue(&curReq.arena,
&curReq.transaction.mutations,
&curReq.transaction.encryptedMutations,
@@ -882,6 +908,13 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
lastVersion = it.value();
} else {
Version ver = it.value();
// ver: the version recorded for this key in the keyVersionMap
// endVersion: the largest version after applying a batch of versions from the log files
// if ver < endVersion, this key's entry in the keyVersionMap is outdated; in that case the
// keyVersionMapRange entry for this key is cleared, so that the applied log becomes the source of
// truth; otherwise the keyVersionMapRange entry remains the source of truth
// each key has to be checked individually, because a range file covers a whole key range but a
// log file does not
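// (Illustrative example with made-up numbers: with endVersion = 500, a map entry "a" -> 200 is
// outdated and gets cleared, while an entry "b" -> 600 is kept, because the applied log already
// covers "a" up to version 500 but not "b".)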
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
@@ -940,15 +973,22 @@ ACTOR Future<Void> applyMutations(Database cx,
}

int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// this bounds newEndVersion so that one batch spans at most rangeCount blocks of APPLY_BLOCK_SIZE
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);
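// (Illustrative numbers only: with APPLY_BLOCK_SIZE = 64, beginVersion = 70, rangeCount = 2 and
// *endVersion = 1000, newEndVersion = min(1000, (70 / 64 + 2) * 64) = 192, i.e. the batch ends on
// a block boundary at most rangeCount blocks ahead of beginVersion.)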

// the ranges each cover a partition of the version space, e.g. [100, 200], [201, 300], [301, 400]
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming a block size of 64
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
// keys in these ranges have the form applyLogKeys.begin/uid/hash(uint8)/version(64 bits)/part
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;

// each RCGroup holds a single version; each results[i] serves a single range
// one range may contain multiple versions
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
@@ -957,6 +997,7 @@ ACTOR Future<Void> applyMutations(Database cx,
}

maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);

for (idx = 0; idx < ranges.size(); ++idx) {
int bytes =
wait(kvMutationLogToTransactions(cx,
16 changes: 15 additions & 1 deletion fdbclient/BackupContainerFileSystem.actor.cpp
@@ -904,7 +904,15 @@ class BackupContainerFileSystemImpl {
// If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup.
// It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for
// restore times.
//
// hfu5 notes:
// 1. it first reads and parses the snapshot manifest; each snapshot maps to a list of range files
//    (under ranges/ and kvranges/), and the range files with keys intersecting the filter are collected
// 2. not sure why it continues (skips the snapshot) when restorable.targetVersion < maxKeyRangeVersion
// 3. minKeyRangeVersion is the minimum version over all collected range files
// 4. it then lists all log files that begin before targetVersion and end after minKeyRangeVersion
// 5. if the first log file's begin version is larger than minKeyRangeVersion, the versions in between
//    are unknown, so give up; otherwise return both the range and the log files
// 6. LogFile objects are created in BackupContainerFileSystem::listLogFiles, and tag IDs are populated
//    for partitioned logs (plogs)
// (a standalone continuity-check sketch follows this comment block)
// If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored,
// because the log can contain mutations of the whole key space, unlike range files that each
// is limited to a smaller key range.
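A rough standalone sketch of the continuity requirement mentioned in the notes above: keep only log files overlapping [minKeyRangeVersion, targetVersion] and verify that, sorted by begin version, they chain together with no gap. LogFileInfo and coversContiguously are hypothetical names, and the real isPartitionedLogsContinuous additionally checks each tag separately.

#include <algorithm>
#include <cstdint>
#include <vector>

struct LogFileInfo {
    int64_t beginVersion;
    int64_t endVersion; // exclusive
};

// True if the log files, taken together, cover every version in [begin, end).
bool coversContiguously(std::vector<LogFileInfo> logs, int64_t begin, int64_t end) {
    std::sort(logs.begin(), logs.end(), [](const LogFileInfo& a, const LogFileInfo& b) {
        return a.beginVersion < b.beginVersion;
    });
    int64_t covered = begin;
    for (const LogFileInfo& f : logs) {
        if (f.beginVersion > covered)
            return false; // gap: versions in [covered, f.beginVersion) are missing
        covered = std::max(covered, f.endVersion);
        if (covered >= end)
            return true;
    }
    return covered >= end;
}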
@@ -943,6 +951,7 @@ class BackupContainerFileSystemImpl {
state Version minKeyRangeVersion = MAX_VERSION;
state Version maxKeyRangeVersion = -1;

// iterate over each listed snapshot file; unclear why this still returns a vector
std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>> results =
wait(bc->readKeyspaceSnapshot(snapshots[i]));

@@ -955,6 +964,7 @@ class BackupContainerFileSystemImpl {
maxKeyRangeVersion = snapshots[i].endVersion;
} else {
for (const auto& rangeFile : results.first) {
// each range file holds one version of a [begin, end] key range
const auto& keyRange = results.second.at(rangeFile.fileName);
if (keyRange.intersects(keyRangesFilter)) {
restorable.ranges.push_back(rangeFile);
@@ -971,6 +981,8 @@ class BackupContainerFileSystemImpl {
// 'latestVersion' represents using the minimum restorable version in a snapshot.
restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion;
// Any version < maxKeyRangeVersion is not restorable.
// hfu5 question: why? what if the target version is 8500 and this snapshot has versions [8000, 8200, 8800];
// do we give up directly? why is it not restorable?
if (restorable.targetVersion < maxKeyRangeVersion)
continue;

@@ -993,6 +1005,7 @@ class BackupContainerFileSystemImpl {
store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true)));

if (plogs.size() > 0) {
// hfu5: this is where the set of log files is decided; partitioned logs are preferred when present
logs.swap(plogs);
// sort by tag ID so that filterDuplicates works.
std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) {
@@ -1005,6 +1018,7 @@ class BackupContainerFileSystemImpl {
restorable.logs.swap(filtered);
// sort by version order again for continuous analysis
std::sort(restorable.logs.begin(), restorable.logs.end());
// sort by version, but isPartitionedLogsContinuous will sort each tag separately
if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) {
restorable.continuousBeginVersion = minKeyRangeVersion;
restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive
1 change: 1 addition & 0 deletions fdbclient/ClientKnobs.cpp
@@ -182,6 +182,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 );
init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 );
init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20;
init( RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 1000000 );
init( RESTORE_WRITE_TX_SIZE, 256 * 1024 );
init( APPLY_MAX_LOCK_BYTES, 1e9 );
init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT
