Optimize record boundaries
Summary:
Knowing record boundaries is necessary:
- to compute disk stream-sizes (by adding each of the stream's record disk size)
- to know how much data we need to cache when streaming and prefetching records

The VRS file index only provides the start offset of each record. When records are stored in the file in timestamp order, each record ends where the next one starts (or where user records end), which is simple enough and doesn't require additional memory. That's the 99% case.

Sometimes, a few records aren't sorted correctly, and we only need to track those records' boundaries and the records around them. But knowing which records need tracking is complex, because each record may be followed by any other, and detecting record boundaries becomes difficult: you must effectively compile a sorted list of every boundary, then search it for the next boundary.
So when a file isn't fully sorted and only a relatively small number of boundaries need tracking, we record just those exceptions in a map, since for most records we can still use the offset of the following record as the record's limit.
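When only a few records are out of place, the exception map keeps memory usage low. A minimal sketch of that lookup, with hypothetical names (`followingOffset`, `endOfUserRecords`, and the parameters are illustrative, not the library's API):

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Sketch of the exception-map idea: 'offsets' holds each record's start
// offset in index order, and 'limits' only holds entries for records whose
// end is NOT the next record's start offset.
int64_t followingOffset(
    const std::vector<int64_t>& offsets,
    const std::map<uint32_t, int64_t>& limits,
    int64_t endOfUserRecords,
    uint32_t index) {
  auto it = limits.find(index); // out-of-place records are listed here
  if (it != limits.end()) {
    return it->second;
  }
  // common case: the record ends where the next one starts
  return index + 1 < offsets.size() ? offsets[index + 1] : endOfUserRecords;
}
```

With three records at offsets {0, 100, 50} and user records ending at 150, every record needs a map entry; with a fully sorted file, the map can stay nearly empty.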

In some extreme cases, however (from what I can tell, only with artificially generated files designed to stress-test this situation), many records aren't sorted, and tracking exceptions becomes more memory-consuming than keeping the complete sorted list of boundaries. In that case, we keep the full list and do a binary search each time we need to find where a record ends.
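The fallback with the complete boundary list can be sketched like this (`recordLimit` is a hypothetical name; the real code uses `std::upper_bound` in the same way):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Sketch of the fallback: all record boundaries, plus the end of user
// records, live in one sorted vector, and a record's limit is the first
// boundary strictly greater than the record's start offset.
int64_t recordLimit(const std::vector<int64_t>& sortedBoundaries, int64_t recordOffset) {
  auto next = std::upper_bound(sortedBoundaries.begin(), sortedBoundaries.end(), recordOffset);
  return next != sortedBoundaries.end() ? *next : -1; // -1: offset out of range
}
```

The record's disk size is then simply the limit minus its start offset, at the cost of one O(log n) search per query.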

With these changes, we make sure that:
- we only build the list of boundaries when the file isn't sorted, saving memory and compute 99% of the time.
- we track boundaries using a map containing only the unexpected limits around records that are out of place, when the number of exceptions is limited.
- we track boundaries with the complete sorted list of boundaries when there are many exceptions.
- for testing, we can force either method, so the results of both can be compared.

This is a deceptively simple problem, but making an efficient solution that works correctly was surprisingly tricky, hence both methods are preserved for unit testing.

Differential Revision: D67614158

fbshipit-source-id: 08b86551ef02e932831c151a3b126295f6816afd
Georges Berenger authored and facebook-github-bot committed Jan 3, 2025
1 parent 635ce51 commit c7862b3
Showing 4 changed files with 110 additions and 28 deletions.
101 changes: 77 additions & 24 deletions vrs/RecordFileReader.cpp
@@ -486,6 +486,8 @@ int RecordFileReader::closeFile() {
openProgressLogger_ = &defaultProgressLogger_;
streamIndex_.clear();
streamRecordCounts_.clear();
recordBoundaries_.clear();
recordLimits_.clear();
lastRequest_.clear();
fileHasAnIndex_ = false;
return result;
@@ -496,41 +498,92 @@ int RecordFileReader::clearStreamPlayers() {
return 0;
}

const vector<int64_t>& RecordFileReader::getRecordBoundaries() const {
if (recordBoundaries_.empty()) {
// records are not always perfectly sorted, so we can't tell easily where they end.
// The best guess, is the offset of the first record, after the current record...
// yep, that's a bit expensive, but we have few options...
recordBoundaries_.reserve(recordIndex_.size() + 1);
int64_t lastOffset = 0;
bool sortNeeded = false;
void RecordFileReader::buildRecordBoundaries(bool boundariesAndLimits) const {
if (recordIndex_.empty() || (!recordBoundaries_.empty() && !recordLimits_.empty()) ||
(!boundariesAndLimits && (!recordBoundaries_.empty() || !recordLimits_.empty()))) {
return;
}
int sortErrors = 0;
int64_t lastOffset = 0;
for (const auto& r : recordIndex_) {
if (r.fileOffset < lastOffset) {
sortErrors++;
}
lastOffset = r.fileOffset;
}
if (sortErrors == 0 && !boundariesAndLimits) {
// files are usually fully sorted, so we only need the last record's limit
recordLimits_[recordIndex_.size() - 1] = endOfUserRecordsOffset_;
} else {
vector<int64_t> boundaries;
boundaries.reserve(recordIndex_.size() + 1);
for (const auto& r : recordIndex_) {
recordBoundaries_.emplace_back(r.fileOffset);
if (r.fileOffset < lastOffset) {
sortNeeded = true;
boundaries.emplace_back(r.fileOffset);
}
boundaries.emplace_back(endOfUserRecordsOffset_);
sort(boundaries.begin(), boundaries.end());

// the array of boundaries can save memory if we have too many errors and the map is big
// We'll have to do a binary search for each record limit we need...
bool tooManyErrors = sortErrors > recordIndex_.size() / 10;

if (boundariesAndLimits || !tooManyErrors) {
recordLimits_.clear();
auto nextBoundary = boundaries.end();
for (size_t i = 0; i < recordIndex_.size(); ++i) {
if (nextBoundary == boundaries.end() || *nextBoundary != recordIndex_[i].fileOffset ||
++nextBoundary == boundaries.end() || i == recordIndex_.size() - 1 ||
*nextBoundary != recordIndex_[i + 1].fileOffset) {
nextBoundary =
upper_bound(boundaries.begin(), boundaries.end(), recordIndex_[i].fileOffset);
if (!XR_VERIFY(nextBoundary != boundaries.end())) {
tooManyErrors = true;
recordLimits_.clear();
break;
}
if (i + 1 >= recordIndex_.size() || recordIndex_[i + 1].fileOffset != *nextBoundary) {
recordLimits_[i] = *nextBoundary;
}
}
}
lastOffset = r.fileOffset;
}
recordBoundaries_.emplace_back(endOfUserRecordsOffset_);
if (sortNeeded) {
sort(recordBoundaries_.begin(), recordBoundaries_.end());
if (boundariesAndLimits || tooManyErrors) {
recordBoundaries_ = std::move(boundaries);
}
}
return recordBoundaries_;
}

uint32_t RecordFileReader::getRecordSize(uint32_t recordIndex) const {
if (recordIndex >= recordIndex_.size()) {
int64_t RecordFileReader::getFollowingRecordOffset(uint32_t recordIndex, bool useBoundaries) const {
if (!XR_VERIFY(recordIndex < recordIndex_.size())) {
return 0;
}
const IndexRecord::RecordInfo& record = recordIndex_[recordIndex];
const vector<int64_t>& boundaries = getRecordBoundaries();
auto nextBoundary = upper_bound(boundaries.begin(), boundaries.end(), record.fileOffset);
if (!XR_VERIFY(nextBoundary != boundaries.end()) ||
!XR_VERIFY(*nextBoundary > record.fileOffset)) {
if (recordBoundaries_.empty() && recordLimits_.empty()) {
buildRecordBoundaries(false);
}
if (useBoundaries && !recordBoundaries_.empty()) {
auto nextBoundary = upper_bound(
recordBoundaries_.begin(), recordBoundaries_.end(), recordIndex_[recordIndex].fileOffset);
if (XR_VERIFY(nextBoundary != recordBoundaries_.end())) {
return *nextBoundary;
}
}
auto nextIter = recordLimits_.find(recordIndex);
if (nextIter != recordLimits_.end()) {
return nextIter->second;
}
return XR_VERIFY(recordIndex < recordIndex_.size() - 1) ? recordIndex_[recordIndex + 1].fileOffset
: endOfUserRecordsOffset_;
}

uint32_t RecordFileReader::getRecordSize(uint32_t recordIndex, bool useBoundaries) const {
if (recordIndex >= recordIndex_.size()) {
return 0;
}
return *nextBoundary - record.fileOffset;
int64_t nextOffset = getFollowingRecordOffset(recordIndex, useBoundaries);
if (XR_VERIFY(nextOffset > recordIndex_[recordIndex].fileOffset)) {
return static_cast<uint32_t>(nextOffset - recordIndex_[recordIndex].fileOffset);
}
return 0;
}

bool RecordFileReader::prefetchRecordSequence(
14 changes: 10 additions & 4 deletions vrs/RecordFileReader.h
@@ -349,7 +349,11 @@ class RecordFileReader {
uint32_t getRecordStreamIndex(const IndexRecord::RecordInfo* record) const;

/// Get a record's disk size.
uint32_t getRecordSize(uint32_t recordIndex) const;
/// @param recordIndex: index of the record.
/// @param useBoundaries: if true, use the record's boundaries when available to compute the size.
/// For testing only: use the default value!
/// @return The record's size on disk, or 0 for invalid indexes.
uint32_t getRecordSize(uint32_t recordIndex, bool useBoundaries = true) const;

/// Timestamp for the first data record in the whole file.
/// @return The timestamp for the file data record, or 0, if the file contains no data record.
@@ -554,6 +558,8 @@ class RecordFileReader {
uint32_t totalCount() const;
};

void buildRecordBoundaries(bool boundariesAndLimits = false) const; ///< private, for testing only

private:
int doOpenFile(const FileSpec& fileSpec, bool autoWriteFixedIndex, bool checkSignatureOnly);
int readFileHeader(const FileSpec& fileSpec, FileFormat::FileHeader& outFileHeader);
@@ -568,8 +574,9 @@
static const string& getTag(const map<string, string>& tags, const string& name); ///< private
bool mightContainContentTypeInDataRecord(StreamId streamId, ContentType type) const; ///< private

/// Record boundaries, in sequential order, but not necessarily in record order!
const vector<int64_t>& getRecordBoundaries() const; ///< private
int64_t getFollowingRecordOffset(uint32_t recordIndex, bool useBoundaries) const; ///< private
mutable vector<int64_t> recordBoundaries_; ///< private
mutable map<uint32_t, int64_t> recordLimits_; ///< private

// Members to read an open VRS file
std::unique_ptr<FileHandler> file_;
@@ -591,7 +598,6 @@
ProgressLogger* openProgressLogger_{&defaultProgressLogger_};
unique_ptr<std::thread> detailsSaveThread_;
mutable map<StreamId, vector<const IndexRecord::RecordInfo*>> streamIndex_;
mutable vector<int64_t> recordBoundaries_;
// Location of the last record searched for a specific stream & record type
// The pair: index of the record for the type (query), index of the record in the stream (result)
mutable map<pair<StreamId, Record::Type>, pair<uint32_t, size_t>> lastRequest_;
Binary file added vrs/oss/test_data/VRS_Files/chunks-shuffled.vrs
Binary file not shown.
23 changes: 23 additions & 0 deletions vrs/test/GetRecordTest.cpp
@@ -34,6 +34,8 @@ namespace {
struct GetRecordTester : testing::Test {
string kTestFile = os::pathJoin(coretech::getTestDataDir(), "VRS_Files/sample_file.vrs");
string kTestFile2 = os::pathJoin(coretech::getTestDataDir(), "VRS_Files/simulated.vrs");
string kTestFile3 = os::pathJoin(coretech::getTestDataDir(), "VRS_Files/chunks.vrs");
string kTestFile4 = os::pathJoin(coretech::getTestDataDir(), "VRS_Files/chunks-shuffled.vrs");
};
} // namespace

@@ -374,3 +376,24 @@ TEST_F(GetRecordTester, GetRecordSizeTest) {
{StreamId(RecordableTypeId::SlamImuData, 1), 1673048}};
EXPECT_EQ(streamSizes, actualSizes);
}

static size_t testGetRecordSize(const string& filename) {
vrs::RecordFileReader file;
EXPECT_EQ(file.openFile(filename), 0);
file.buildRecordBoundaries(true);
size_t totalSize = 0;
for (uint32_t i = 0; i < file.getRecordCount(); i++) {
uint32_t recordSize = file.getRecordSize(i, true);
totalSize += recordSize;
EXPECT_EQ(file.getRecordSize(i, false), recordSize);
}
EXPECT_LT(totalSize, file.getTotalSourceSize());
return totalSize;
}

TEST_F(GetRecordTester, GetRecordLimitsTest) {
EXPECT_EQ(testGetRecordSize(kTestFile), 79955);
EXPECT_EQ(testGetRecordSize(kTestFile2), 21302235);
EXPECT_EQ(testGetRecordSize(kTestFile3), 79606);
EXPECT_EQ(testGetRecordSize(kTestFile4), 79612);
}
