Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL Analysis: move ownership of payloads to the fragment #13931

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions Framework/AnalysisSupport/src/RNTuplePlugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <ROOT/RFieldVisitor.hxx>
#include <ROOT/RNTupleInspector.hxx>
#include <ROOT/RVec.hxx>
#include <memory>
#include <TBufferFile.h>

#include <TDirectory.h>
Expand Down Expand Up @@ -51,10 +52,6 @@ class RNTupleFileSystem : public VirtualRootFileSystemBase
public:
~RNTupleFileSystem() override;

std::shared_ptr<VirtualRootFileSystemBase> GetSubFilesystem(arrow::dataset::FileSource source) override
{
return std::dynamic_pointer_cast<VirtualRootFileSystemBase>(shared_from_this());
};
virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0;
};

Expand Down Expand Up @@ -100,9 +97,28 @@ class RNTupleFileFragment : public arrow::dataset::FileFragment
std::shared_ptr<arrow::dataset::FileFormat> format,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> physical_schema)
: FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema))
: FileFragment(source, format, partition_expression, physical_schema)
{
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
if (!fs.get()) {
throw runtime_error_f("Do not know how to extract %s from %s", source.path().c_str(), fs->type_name().c_str());
}
auto handler = fs->GetObjectHandler(source);
if (!handler->format->Equals(*format)) {
throw runtime_error_f("Format for %s does not match. Found %s, expected %s.", source.path().c_str(),
handler->format->type_name().c_str(),
format->type_name().c_str());
}
mNTuple = handler->GetObjectAsOwner<ROOT::Experimental::RNTuple>();
}

ROOT::Experimental::RNTuple* GetRNTuple()
{
return mNTuple.get();
}

private:
std::unique_ptr<ROOT::Experimental::RNTuple> mNTuple;
};

class RNTupleFileFormat : public arrow::dataset::FileFormat
Expand Down Expand Up @@ -133,11 +149,10 @@ class RNTupleFileFormat : public arrow::dataset::FileFormat
arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
{
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
auto subFs = fs->GetSubFilesystem(source);
if (std::dynamic_pointer_cast<RNTupleFileSystem>(subFs)) {
return true;
if (!fs) {
return false;
}
return false;
return fs->CheckSupport(source);
}

arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
Expand Down Expand Up @@ -493,11 +508,12 @@ arrow::Result<std::shared_ptr<arrow::Schema>> RNTupleFileFormat::Inspect(const a

auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
// Actually get the TTree from the ROOT file.
auto ntupleFs = std::dynamic_pointer_cast<RNTupleFileSystem>(fs->GetSubFilesystem(source));
if (!ntupleFs.get()) {
throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
auto objectHandler = fs->GetObjectHandler(source);
if (objectHandler->format->type_name() != this->type_name()) {
throw runtime_error_f("Unexpected kind of filesystem %s to handle payload %s.\n", source.filesystem()->type_name().c_str(), source.path().c_str());
}
ROOT::Experimental::RNTuple* rntuple = ntupleFs->GetRNTuple(source);
// We know this is a RNTuple, so we can continue with the inspection.
auto rntuple = objectHandler->GetObjectAsOwner<ROOT::Experimental::RNTuple>().release();

auto inspector = ROOT::Experimental::RNTupleInspector::Create(rntuple);

Expand Down Expand Up @@ -526,11 +542,8 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
std::vector<std::shared_ptr<arrow::Array>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();

auto containerFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(ntupleFragment->source().filesystem());
auto fs = std::dynamic_pointer_cast<RNTupleFileSystem>(containerFS->GetSubFilesystem(ntupleFragment->source()));

int64_t rows = -1;
ROOT::Experimental::RNTuple* rntuple = fs->GetRNTuple(ntupleFragment->source());
ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple();
auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
auto& model = reader->GetModel();
for (auto& physicalField : fields) {
Expand Down Expand Up @@ -670,7 +683,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
if (!result.ok()) {
throw runtime_error("Cannot allocate offset buffer");
}
arrowOffsetBuffer = std::move(result).ValueUnsafe();
arrowOffsetBuffer = result.MoveValueUnsafe();

// Offset bulk
auto offsetBulk = model.CreateBulk(physicalField->name());
Expand All @@ -692,7 +705,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
if (!result.ok()) {
throw runtime_error("Cannot allocate values buffer");
}
arrowValuesBuffer = std::move(result).ValueUnsafe();
arrowValuesBuffer = result.MoveValueUnsafe();
ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
// Calculate the size of the buffer here.
for (size_t i = 0; i < total; i++) {
Expand Down Expand Up @@ -811,9 +824,9 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> RNTupleFileFormat::
{
std::shared_ptr<arrow::dataset::FileFormat> format = std::make_shared<RNTupleFileFormat>(mTotCompressedSize, mTotUncompressedSize);

auto fragment = std::make_shared<RNTupleFileFragment>(std::move(source), std::move(format),
std::move(partition_expression),
std::move(physical_schema));
auto fragment = std::make_shared<RNTupleFileFragment>(source, format,
partition_expression,
physical_schema);
return std::dynamic_pointer_cast<arrow::dataset::FileFragment>(fragment);
}

Expand All @@ -839,9 +852,6 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
return new RootArrowFactory{
.options = [context]() { return context->format->DefaultWriteOptions(); },
.format = [context]() { return context->format; },
.getSubFilesystem = [](void* handle) {
auto rntuple = (ROOT::Experimental::RNTuple*)handle;
return std::shared_ptr<VirtualRootFileSystemBase>(new SingleRNTupleFileSystem(rntuple)); },
};
}
};
Expand Down
Loading