From 36ca324205427c8f43630739e9d6a225344bf087 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Fri, 14 Jun 2024 16:14:30 -0500 Subject: [PATCH] Added RNTuple read/write prototype Added RNTupleSource and RNTupleOutputModule --- FWIO/RNTuple/bin/BuildFile.xml | 4 + FWIO/RNTuple/bin/edmRntupleStorage.cc | 152 +++++ FWIO/RNTuple/plugins/BuildFile.xml | 8 + FWIO/RNTuple/plugins/DataProductsRNTuple.cc | 86 +++ FWIO/RNTuple/plugins/DataProductsRNTuple.h | 78 +++ FWIO/RNTuple/plugins/RNTupleDelayedReader.cc | 58 ++ FWIO/RNTuple/plugins/RNTupleDelayedReader.h | 60 ++ FWIO/RNTuple/plugins/RNTupleInputFile.cc | 199 ++++++ FWIO/RNTuple/plugins/RNTupleInputFile.h | 66 ++ FWIO/RNTuple/plugins/RNTupleOutputFile.cc | 589 ++++++++++++++++++ FWIO/RNTuple/plugins/RNTupleOutputFile.h | 149 +++++ FWIO/RNTuple/plugins/RNTupleOutputModule.cc | 261 ++++++++ FWIO/RNTuple/plugins/RNTupleSource.cc | 328 ++++++++++ FWIO/RNTuple/test/BuildFile.xml | 4 + FWIO/RNTuple/test/test_catch2_main.cc | 2 + FWIO/RNTuple/test/test_catch2_output2input.cc | 224 +++++++ .../Common/test/test_catch2_output2input.cc | 66 +- 17 files changed, 2333 insertions(+), 1 deletion(-) create mode 100644 FWIO/RNTuple/bin/BuildFile.xml create mode 100644 FWIO/RNTuple/bin/edmRntupleStorage.cc create mode 100644 FWIO/RNTuple/plugins/BuildFile.xml create mode 100644 FWIO/RNTuple/plugins/DataProductsRNTuple.cc create mode 100644 FWIO/RNTuple/plugins/DataProductsRNTuple.h create mode 100644 FWIO/RNTuple/plugins/RNTupleDelayedReader.cc create mode 100644 FWIO/RNTuple/plugins/RNTupleDelayedReader.h create mode 100644 FWIO/RNTuple/plugins/RNTupleInputFile.cc create mode 100644 FWIO/RNTuple/plugins/RNTupleInputFile.h create mode 100644 FWIO/RNTuple/plugins/RNTupleOutputFile.cc create mode 100644 FWIO/RNTuple/plugins/RNTupleOutputFile.h create mode 100644 FWIO/RNTuple/plugins/RNTupleOutputModule.cc create mode 100644 FWIO/RNTuple/plugins/RNTupleSource.cc create mode 100644 FWIO/RNTuple/test/BuildFile.xml create mode 100644 
FWIO/RNTuple/test/test_catch2_main.cc create mode 100644 FWIO/RNTuple/test/test_catch2_output2input.cc diff --git a/FWIO/RNTuple/bin/BuildFile.xml b/FWIO/RNTuple/bin/BuildFile.xml new file mode 100644 index 0000000000000..cb7f71a6cbd08 --- /dev/null +++ b/FWIO/RNTuple/bin/BuildFile.xml @@ -0,0 +1,4 @@ + + + + diff --git a/FWIO/RNTuple/bin/edmRntupleStorage.cc b/FWIO/RNTuple/bin/edmRntupleStorage.cc new file mode 100644 index 0000000000000..f98bf2f9ebcd4 --- /dev/null +++ b/FWIO/RNTuple/bin/edmRntupleStorage.cc @@ -0,0 +1,152 @@ +#include "TFile.h" +#include "ROOT/RNTupleReader.hxx" +#include "ROOT/RNTuple.hxx" +#include +#include +#include + +namespace { + + class InfoDump { + public: + InfoDump(std::string iOut) : dump_(std::move(iOut)) {} + + std::optional nextLine(); + std::optional> nextFieldInfo(); + void moveToStartOfFields(); + void moveToLineWith(std::string_view); + + private: + std::string dump_; + std::string::size_type start_ = 0; + }; + + std::optional InfoDump::nextLine() { + auto lastStart = start_; + start_ = dump_.find('\n', start_); + if (start_ == std::string::npos) { + return std::optional(); + } + return std::string_view(dump_.data() + lastStart, start_++ - lastStart); + } + + void InfoDump::moveToStartOfFields() { + auto line = nextLine(); + while (line) { + if (*line == "COLUMN DETAILS") { + nextLine(); + return; + } + line = nextLine(); + } + return; + } + + std::optional> InfoDump::nextFieldInfo() { + auto line = nextLine(); + if (not line) { + return {}; + } + auto name = line->substr(2, line->find_first_of(" .", 2) - 2); + + nextLine(); + nextLine(); + nextLine(); + nextLine(); + line = nextLine(); + //std::cout <substr(line->find_first_not_of(" ",line->find_first_of(":")+1))<substr(line->find_first_not_of(" ", line->find_first_of(":") + 1)).data()); + moveToLineWith("............................................................"); + return std::make_pair(name, size); + } + + void InfoDump::moveToLineWith(std::string_view iCheck) { + auto 
line = nextLine(); + while (line) { + if (*line == iCheck) { + return; + } + line = nextLine(); + } + return; + } +} // namespace + +int main(int iArgc, char const* iArgv[]) { + // Add options here + + boost::program_options::options_description desc("Allowed options"); + desc.add_options()("help,h", "print help message")( + "file,f", boost::program_options::value(), "data file")("print,P", "Print list of data products")( + "verbose,v", "Verbose printout")("printProductDetails,p", "Call PrintInfo() for selected rntuple")( + "rntuple,r", boost::program_options::value(), "Select rntuple used with -P and -p options")( + "sizeSummary,s", "Print size on disk for each data product")( + "events,e", + "Print list of all Events, Runs, and LuminosityBlocks in the file sorted by run number, luminosity block number, " + "and event number. Also prints the entry numbers and whether it is possible to use fast copy with the file.")( + "eventsInLumis", "Print how many Events are in each LuminosityBlock."); + + // What rntuples do we require for this to be a valid collection? 
+ std::vector expectedRNTuples; + expectedRNTuples.push_back("MetaData"); + expectedRNTuples.push_back("Events"); + + boost::program_options::positional_options_description p; + p.add("file", -1); + + boost::program_options::variables_map vm; + + try { + boost::program_options::store( + boost::program_options::command_line_parser(iArgc, iArgv).options(desc).positional(p).run(), vm); + } catch (boost::program_options::error const& x) { + std::cerr << "Option parsing failure:\n" << x.what() << "\n\n"; + std::cerr << desc << "\n"; + return 1; + } + + boost::program_options::notify(vm); + + if (vm.count("help")) { + std::cout << desc << "\n"; + return 1; + } + + using namespace ROOT::Experimental; + + auto file = TFile::Open(vm["file"].as().c_str(), "r"); + + auto ntuple = RNTupleReader::Open(*file->Get("Events")); + + if (vm.count("printProductDetails")) { + ntuple->PrintInfo(ENTupleInfo::kStorageDetails, std::cout); + return 0; + } + std::stringstream s; + ntuple->PrintInfo(ENTupleInfo::kStorageDetails, s); + + InfoDump info{s.str()}; + + info.moveToStartOfFields(); + + std::string presentField; + unsigned long long size = 0; + auto field = info.nextFieldInfo(); + while (field) { + if (field->first == presentField) { + size += field->second; + } else { + if (not presentField.empty()) { + std::cout << presentField << " " << size << std::endl; + } + presentField = field->first; + size = field->second; /* start the new field's total with its first column instead of discarding it */ + } + field = info.nextFieldInfo(); + } + if (not presentField.empty()) { + std::cout << presentField << " " << size << std::endl; + } + + return 0; +} diff --git a/FWIO/RNTuple/plugins/BuildFile.xml b/FWIO/RNTuple/plugins/BuildFile.xml new file mode 100644 index 0000000000000..f39c83c373b3d --- /dev/null +++ b/FWIO/RNTuple/plugins/BuildFile.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/FWIO/RNTuple/plugins/DataProductsRNTuple.cc b/FWIO/RNTuple/plugins/DataProductsRNTuple.cc new file mode 100644 index 0000000000000..c7ea3df87d1cc --- /dev/null +++ 
b/FWIO/RNTuple/plugins/DataProductsRNTuple.cc @@ -0,0 +1,86 @@ +#include "DataProductsRNTuple.h" +#include "ROOT/RNTuple.hxx" +#include "IOPool/Common/interface/getWrapperBasePtr.h" +#include "FWCore/Utilities/interface/EDMException.h" + +using namespace edm::input; +using namespace ROOT::Experimental; + +namespace { + std::string fixName(std::string_view iName) { + if (not iName.empty() and iName.back() == '.') { + iName.remove_suffix(1); + } + return std::string(iName); + } + + std::unique_ptr get_and_check_RNTuple(TFile* iFile, std::string const& iName) { + auto ret = std::unique_ptr(iFile->Get(iName.c_str())); + if (not ret) { + throw edm::Exception(edm::errors::FileReadError) + << "The entry for '" << iName << "' does not exist or is not an RNTuple"; + } + return ret; + } + +} // namespace + +DataProductsRNTuple::DataProductsRNTuple(TFile* iFile, + std::string const& iName, + std::string const& iAux, + ROOT::Experimental::RNTupleReadOptions const& iOps) + : reader_(RNTupleReader::Open(*get_and_check_RNTuple(iFile, iName), iOps)) { + auxDesc_ = reader_->GetDescriptor().FindFieldId(iAux); +} + +bool DataProductsRNTuple::setupToReadProductIfAvailable(ProductDescription& iProduct) { + auto fixedName = fixName(iProduct.branchName()); + auto desc = reader_->GetDescriptor().FindFieldId(fixedName); + if (desc == ROOT::Experimental::kInvalidDescriptorId) { + return false; + } + iProduct.initFromDictionary(); + iProduct.setOnDemand(true); + infos_.emplace(iProduct.branchID().id(), ProductInfo(iProduct.wrappedName(), desc, std::move(fixedName))); + return true; +} + +TClass const* DataProductsRNTuple::WrapperFactory::wrapperBase() { + static TClass const* const s_base = TClass::GetClass("edm::WrapperBase"); + return s_base; +} + +DataProductsRNTuple::WrapperFactory::WrapperFactory(std::string const& iTypeName) + : wrapperClass_(TClass::GetClass(iTypeName.c_str())) { + offsetToWrapperBase_ = wrapperClass_->GetBaseClassOffset(wrapperBase()); +} + +void 
DataProductsRNTuple::WrapperFactory::Deleter::operator()(void* iPtr) const { class_->Destructor(iPtr); } + +std::unique_ptr DataProductsRNTuple::WrapperFactory::newWrapper() + const { + return std::unique_ptr(wrapperClass_->New(), Deleter(wrapperClass_)); +} + +std::shared_ptr DataProductsRNTuple::WrapperFactory::toWrapperBase( + std::unique_ptr iProduct) const { + return getWrapperBasePtr(iProduct.release(), offsetToWrapperBase_); +} + +std::shared_ptr DataProductsRNTuple::dataProduct(edm::BranchID const& iProduct, int iEntry) { + auto const& info = infos_.find(iProduct.id()); + if (info == infos_.end()) { + throw cms::Exception("RNTupleError") << " unable to find branch id " << iProduct.id() << " for entry " << iEntry; + } + + //std::cout <<"dataProduct "<second.name_<second.factory_.newWrapper(); + if (not info->second.view_) { + info->second.view_ = reader_->GetView(info->second.descriptor_, product.get()); + } else { + info->second.view_->BindRawPtr(product.get()); + } + (*info->second.view_)(iEntry); + + return info->second.factory_.toWrapperBase(std::move(product)); +} diff --git a/FWIO/RNTuple/plugins/DataProductsRNTuple.h b/FWIO/RNTuple/plugins/DataProductsRNTuple.h new file mode 100644 index 0000000000000..dd4ef1b60dd1d --- /dev/null +++ b/FWIO/RNTuple/plugins/DataProductsRNTuple.h @@ -0,0 +1,78 @@ +#if !defined(DataProductsRNTuple_h) +#define DataProductsRNTuple_h + +#include "DataFormats/Provenance/interface/ProductDescription.h" +#include "DataFormats/Common/interface/WrapperBase.h" +#include "FWCore/Utilities/interface/InputType.h" +#include "ROOT/RNTupleReader.hxx" +#include "ROOT/RNTupleReadOptions.hxx" +#include "TFile.h" +#include "TClass.h" +#include +#include +#include + +namespace edm::input { + class DataProductsRNTuple { + public: + DataProductsRNTuple(TFile*, + std::string const& iName, + std::string const& iAux, + ROOT::Experimental::RNTupleReadOptions const& iOpts); + + bool setupToReadProductIfAvailable(ProductDescription&); + + 
std::shared_ptr dataProduct(edm::BranchID const&, int iEntry); + + template + ROOT::Experimental::RNTupleView auxView(std::shared_ptr oStorage) { + return reader_->GetView(auxDesc_, std::move(oStorage)); + } + + ROOT::Experimental::NTupleSize_t numberOfEntries() const { return reader_->GetNEntries(); } + + ROOT::Experimental::DescriptorId_t findDescriptorID(std::string const& iFieldName) { + return reader_->GetDescriptor().FindFieldId(iFieldName); + } + + template + ROOT::Experimental::RNTupleView viewFor(ROOT::Experimental::DescriptorId_t iID, std::shared_ptr oStorage) { + return reader_->GetView(iID, std::move(oStorage)); + } + + void printInfo(std::ostream& iStream) { reader_->PrintInfo(ROOT::Experimental::ENTupleInfo::kMetrics, iStream); } + + private: + struct WrapperFactory { + struct Deleter { + Deleter(TClass* iClass) : class_(iClass) {} + void operator()(void*) const; + TClass* class_; + }; + + WrapperFactory(std::string const& iTypeName); + + std::unique_ptr newWrapper() const; + std::shared_ptr toWrapperBase(std::unique_ptr) const; + TClass* wrapperClass_; + Int_t offsetToWrapperBase_; + + static TClass const* wrapperBase(); + }; + + struct ProductInfo { + ProductInfo(std::string const& iTypeName, ROOT::Experimental::DescriptorId_t iDesc, std::string iName) + : factory_(iTypeName), descriptor_(iDesc), name_(std::move(iName)) {} + WrapperFactory factory_; + ROOT::Experimental::DescriptorId_t descriptor_; + std::string name_; + std::optional> view_; + }; + + std::unique_ptr reader_; + std::unordered_map infos_; + ROOT::Experimental::DescriptorId_t auxDesc_; + }; +} // namespace edm::input + +#endif diff --git a/FWIO/RNTuple/plugins/RNTupleDelayedReader.cc b/FWIO/RNTuple/plugins/RNTupleDelayedReader.cc new file mode 100644 index 0000000000000..5dadf43670142 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleDelayedReader.cc @@ -0,0 +1,58 @@ +#include "RNTupleDelayedReader.h" +#include "DataProductsRNTuple.h" + +#include 
"DataFormats/Common/interface/EDProductGetter.h" +#include "DataFormats/Common/interface/RefCoreStreamer.h" + +#include "FWCore/Framework/interface/SharedResourcesAcquirer.h" +#include "FWCore/Framework/interface/SharedResourcesRegistry.h" + +#include "IOPool/Common/interface/getWrapperBasePtr.h" + +#include "FWCore/Utilities/interface/EDMException.h" + +namespace edm::input { + RNTupleDelayedReader::RNTupleDelayedReader(DataProductsRNTuple* iRNTuple, + SharedResourcesAcquirer* iAcquirer, + std::recursive_mutex* iMutex) + : rntuple_(iRNTuple), resourceAcquirer_(iAcquirer), mutex_(iMutex) {} + + std::pair RNTupleDelayedReader::sharedResources_() const { + return std::make_pair(resourceAcquirer_, mutex_); + } + + std::shared_ptr RNTupleDelayedReader::getProduct_(BranchID const& k, EDProductGetter const* ep) { + if (lastException_) { + try { + std::rethrow_exception(lastException_); + } catch (edm::Exception const& e) { + //avoid growing the context each time the exception is rethrown. + auto copy = e; + copy.addContext("Rethrowing an exception that happened on a different read request."); + throw copy; + } catch (cms::Exception& e) { + //If we do anything here to 'copy', we would lose the actual type of the exception. + e.addContext("Rethrowing an exception that happened on a different read request."); + throw; + } + } + + setRefCoreStreamer(ep); + //make code exception safe + std::shared_ptr refCoreStreamerGuard(nullptr, [](void*) { setRefCoreStreamer(false); }); + + std::shared_ptr edp; + try { + edp = rntuple_->dataProduct(k, entry_); + } catch (...) 
{ + lastException_ = std::current_exception(); + std::rethrow_exception(lastException_); + } + //if (rntuple_->branchType() == InEvent) { + // CMS-THREADING For the primary input source calls to this function need to be serialized + //InputFile::reportReadBranch(inputType_, std::string(br->GetName())); + //} + + return edp; + } +} // namespace edm::input diff --git a/FWIO/RNTuple/plugins/RNTupleDelayedReader.h b/FWIO/RNTuple/plugins/RNTupleDelayedReader.h new file mode 100644 index 0000000000000..b025c18a34781 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleDelayedReader.h @@ -0,0 +1,60 @@ +#if !defined(RNTupleDelayedReader_h) +#define RNTupleDelayedReader_h + +#include "DataFormats/Provenance/interface/BranchID.h" +#include "FWCore/Framework/interface/DelayedReader.h" +#include "FWCore/Utilities/interface/InputType.h" +#include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Utilities/interface/thread_safety_macros.h" + +namespace edm::input { + class DataProductsRNTuple; + + class RNTupleDelayedReader : public DelayedReader { + public: + RNTupleDelayedReader(DataProductsRNTuple* iRNTuple, SharedResourcesAcquirer*, std::recursive_mutex*); + + void setEntry(int iEntry) { entry_ = iEntry; } + + signalslot::Signal const* preEventReadFromSourceSignal() + const final { + return preEventReadFromSourceSignal_; + } + signalslot::Signal const* postEventReadFromSourceSignal() + const final { + return postEventReadFromSourceSignal_; + } + + void setSignals( + signalslot::Signal const* preEventReadSource, + signalslot::Signal const* postEventReadSource) { + preEventReadFromSourceSignal_ = preEventReadSource; + postEventReadFromSourceSignal_ = postEventReadSource; + } + + private: + std::shared_ptr getProduct_(BranchID const& k, EDProductGetter const* ep) final; + void mergeReaders_(DelayedReader*) final {} + void reset_() final {} + std::pair sharedResources_() const final; + + DataProductsRNTuple* rntuple_; + SharedResourcesAcquirer* resourceAcquirer_; // We 
do not use propagate_const because the acquirer is itself mutable. + std::recursive_mutex* mutex_; + InputType inputType_; + int entry_ = 0; + + signalslot::Signal const* preEventReadFromSourceSignal_ = + nullptr; + signalslot::Signal const* postEventReadFromSourceSignal_ = + nullptr; + + //If a fatal exception happens we need to make a copy so we can + // rethrow that exception on other threads. This avoids TTree + // non-exception safety problems on later calls to TTree. + //All uses of the ROOT file are serialized + CMS_SA_ALLOW mutable std::exception_ptr lastException_; + }; +} // namespace edm::input + +#endif diff --git a/FWIO/RNTuple/plugins/RNTupleInputFile.cc b/FWIO/RNTuple/plugins/RNTupleInputFile.cc new file mode 100644 index 0000000000000..61ef1ffd24eb7 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleInputFile.cc @@ -0,0 +1,199 @@ +#include "RNTupleInputFile.h" + +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" +#include "DataFormats/Provenance/interface/FileID.h" +#include "DataFormats/Provenance/interface/StoredMergeableRunProductMetadata.h" +#include "DataFormats/Provenance/interface/ProcessHistory.h" +#include "DataFormats/Provenance/interface/ProductRegistry.h" +#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h" +#include "DataFormats/Provenance/interface/ProductDependencies.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/Parentage.h" +#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "ROOT/RNTupleReadOptions.hxx" + +#include +#include +using namespace ROOT::Experimental; +namespace { + ROOT::Experimental::RNTupleReadOptions options(edm::RNTupleInputFile::Options const& iOpt) { + ROOT::Experimental::RNTupleReadOptions opt; + opt.SetMetricsEnabled(iOpt.enableMetrics_); + 
opt.SetClusterCache(iOpt.useClusterCache_ ? RNTupleReadOptions::EClusterCache::kOn + : RNTupleReadOptions::EClusterCache::kOff); + return opt; + } +} // namespace +namespace edm { + + RNTupleInputFile::RNTupleInputFile(std::string const& iName, Options const& iOpt) + : file_(TFile::Open(iName.c_str())), + runs_(file_.get(), "Runs", "RunAuxiliary", {}), + lumis_(file_.get(), "LuminosityBlocks", "LuminosityBlockAuxiliary", {}), + events_(file_.get(), "Events", "EventAuxiliary", options(iOpt)) {} + + std::vector RNTupleInputFile::readParentage() { + auto parentageTuple = RNTupleReader::Open(*file_->Get("Parentage")); + auto entry = parentageTuple->GetModel().CreateBareEntry(); + + edm::Parentage parentage; + entry->BindRawPtr("Description", &parentage); + std::vector retValue; + + ParentageRegistry& registry = *ParentageRegistry::instance(); + + retValue.reserve(parentageTuple->GetNEntries()); + + for (ROOT::Experimental::NTupleSize_t i = 0; i < parentageTuple->GetNEntries(); ++i) { + parentageTuple->LoadEntry(i, *entry); + registry.insertMapped(parentage); + retValue.push_back(parentage.id()); + } + + return retValue; + } + + void RNTupleInputFile::readMeta(edm::ProductRegistry& iReg, + edm::ProcessHistoryRegistry& iHist, + BranchIDLists& iBranchIDLists) { + auto meta = RNTupleReader::Open(*file_->Get("MetaData")); + assert(meta.get()); + + //BEWARE, if you do not 'BindRawPtr' to all top level Fields, + // using CreateBareEntry with LoadEntry will seg-fault! 
+ //auto entry = meta->GetModel().CreateBareEntry(); + auto entry = meta->GetModel().CreateEntry(); + + entry->BindRawPtr("IndexIntoFile", &index_); + + edm::FileID id; + entry->BindRawPtr("FileIdentifier", &id); + + edm::StoredMergeableRunProductMetadata mergeable; + entry->BindRawPtr("MergeableRunProductMetadata", &mergeable); + + std::vector processHist; + entry->BindRawPtr("ProcessHistory", &processHist); + + edm::ProductRegistry reg; + entry->BindRawPtr("ProductRegistry", &reg); + + entry->BindRawPtr("BranchIDLists", &iBranchIDLists); + + edm::ThinnedAssociationsHelper thinned; + entry->BindRawPtr("ThinnedAssociationsHelper", &thinned); + + edm::ProductDependencies productDependencies; + entry->BindRawPtr("ProductDependencies", &productDependencies); + + meta->LoadEntry(0, *entry); + + { + auto& pList = reg.productListUpdator(); + for (auto& product : pList) { + ProductDescription& prod = product.second; + prod.initBranchName(); + if (not prod.present()) + continue; + if (prod.branchType() == InEvent) { + prod.setDropped(not events_.setupToReadProductIfAvailable(prod)); + } else if (prod.branchType() == InLumi) { + prod.setDropped(not lumis_.setupToReadProductIfAvailable(prod)); + } else if (prod.branchType() == InRun) { + prod.setDropped(not runs_.setupToReadProductIfAvailable(prod)); /* as for events and lumis: dropped when the field is not in the file */ + } + } + } + reg.setFrozen(false); + iReg.updateFromInput(reg.productList()); + + for (auto const& h : processHist) { + iHist.registerProcessHistory(h); + } + + std::vector orderedHistory; + index_.fixIndexes(orderedHistory); + index_.setNumberOfEvents(events_.numberOfEntries()); + //index_.setEventFinder(); + bool needEventNumbers = false; + bool needEventEntries = false; + index_.fillEventNumbersOrEntries(needEventNumbers, needEventEntries); + + iter_ = index_.begin(IndexIntoFile::firstAppearanceOrder); + iterEnd_ = index_.end(IndexIntoFile::firstAppearanceOrder); + } + + IndexIntoFile::EntryType RNTupleInputFile::getNextItemType() { + if (*iter_ == *iterEnd_) { + return 
IndexIntoFile::kEnd; + } + return iter_->getEntryType(); + } + + IndexIntoFile::EntryNumber_t RNTupleInputFile::readLuminosityBlock() { + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kLumi); + auto v = iter_->entry(); + ++(*iter_); + return v; + } + + std::shared_ptr RNTupleInputFile::readLuminosityBlockAuxiliary() { + auto lumiAux = std::make_shared(); + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kLumi); + if (!lumiAuxView_) { + lumiAuxView_ = lumis_.auxView(lumiAux); + } else { + lumiAuxView_->Bind(lumiAux); + } + (*lumiAuxView_)(iter_->entry()); + return lumiAux; + } + + IndexIntoFile::EntryNumber_t RNTupleInputFile::readEvent() { + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kEvent); + auto v = iter_->entry(); + ++(*iter_); + return v; + } + + std::shared_ptr RNTupleInputFile::readEventAuxiliary() { + auto eventAux = std::make_shared(); + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kEvent); + if (!eventAuxView_) { + eventAuxView_ = events_.auxView(eventAux); + } else { + eventAuxView_->Bind(eventAux); + } + (*eventAuxView_)(iter_->entry()); + return eventAux; + } + + std::shared_ptr RNTupleInputFile::readRunAuxiliary() { + auto runAux = std::make_shared(); + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kRun); + + if (!runAuxView_) { + runAuxView_ = runs_.auxView(runAux); + } else { + runAuxView_->Bind(runAux); + } + (*runAuxView_)(iter_->entry()); + return runAux; + } + + IndexIntoFile::EntryNumber_t RNTupleInputFile::readRun() { + assert(*iter_ != *iterEnd_); + assert(iter_->getEntryType() == IndexIntoFile::kRun); + auto v = iter_->entry(); + ++(*iter_); + return v; + } + +} // namespace edm diff --git a/FWIO/RNTuple/plugins/RNTupleInputFile.h b/FWIO/RNTuple/plugins/RNTupleInputFile.h new file mode 100644 index 0000000000000..14b257894c8b4 --- /dev/null +++ 
b/FWIO/RNTuple/plugins/RNTupleInputFile.h @@ -0,0 +1,66 @@ +#ifndef FWIO_RNTuple_RNTupleInputFile_h +#define FWIO_RNTuple_RNTupleInputFile_h + +#include "DataFormats/Provenance/interface/IndexIntoFile.h" +#include "DataFormats/Provenance/interface/ParentageID.h" +#include "DataProductsRNTuple.h" + +#include "TFile.h" +#include "ROOT/RNTuple.hxx" +#include "ROOT/RNTupleReader.hxx" + +#include +#include + +namespace edm { + class RunAuxiliary; + class LuminosityBlockAuxiliary; + class EventAuxiliary; + class ProductRegistry; + class ProcessHistoryRegistry; + + class RNTupleInputFile { + public: + struct Options { + bool enableMetrics_ = false; + bool useClusterCache_ = true; + }; + RNTupleInputFile(std::string const& iFileName, Options const& iOpts); + + IndexIntoFile::EntryType getNextItemType(); + + std::shared_ptr readLuminosityBlockAuxiliary(); + IndexIntoFile::EntryNumber_t readLuminosityBlock(); + + std::shared_ptr readEventAuxiliary(); + IndexIntoFile::EntryNumber_t readEvent(); + + std::shared_ptr readRunAuxiliary(); + IndexIntoFile::EntryNumber_t readRun(); + + void readMeta(ProductRegistry&, ProcessHistoryRegistry&, BranchIDLists& iBranchIDLists); + std::vector readParentage(); + + input::DataProductsRNTuple* runProducts() { return &runs_; } + input::DataProductsRNTuple* luminosityBlockProducts() { return &lumis_; } + input::DataProductsRNTuple* eventProducts() { return &events_; } + + void printInfoForEvent(std::ostream& iOStream) { events_.printInfo(iOStream); } + + private: + std::unique_ptr file_; + + input::DataProductsRNTuple runs_; + input::DataProductsRNTuple lumis_; + input::DataProductsRNTuple events_; + + std::optional> runAuxView_; + std::optional> lumiAuxView_; + std::optional> eventAuxView_; + + IndexIntoFile index_; + std::optional iter_; + std::optional iterEnd_; + }; +} // namespace edm +#endif diff --git a/FWIO/RNTuple/plugins/RNTupleOutputFile.cc b/FWIO/RNTuple/plugins/RNTupleOutputFile.cc new file mode 100644 index 
0000000000000..4c37fa636af74 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleOutputFile.cc @@ -0,0 +1,589 @@ +#include "RNTupleOutputFile.h" + +#include "FWCore/Framework/interface/RunForOutput.h" +#include "FWCore/Framework/interface/LuminosityBlockForOutput.h" +#include "FWCore/Framework/interface/EventForOutput.h" +#include "FWCore/Framework/interface/FileBlock.h" +#include "FWCore/Framework/interface/ProductProvenanceRetriever.h" + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/Registry.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/GlobalIdentifier.h" +#include "FWCore/Utilities/interface/ConvertException.h" + +#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "DataFormats/Provenance/interface/ProductRegistry.h" +#include "DataFormats/Provenance/interface/FileID.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/Provenance.h" +#include "DataFormats/Provenance/interface/IndexIntoFile.h" +#include "DataFormats/Provenance/interface/ProductDependencies.h" + +#include "IOPool/Common/interface/getWrapperBasePtr.h" + +#include "TFile.h" +#include "ROOT/RNTuple.hxx" +#include "ROOT/RNTupleWriter.hxx" +#include +#include +#include + +namespace { + ROOT::RCompressionSetting::EAlgorithm::EValues convert(edm::rntuple::CompressionAlgos iAlgos) { + using namespace edm::rntuple; + using namespace ROOT; + switch (iAlgos) { + case CompressionAlgos::kLZMA: + return RCompressionSetting::EAlgorithm::kLZMA; + case CompressionAlgos::kZSTD: + return RCompressionSetting::EAlgorithm::kZSTD; + case CompressionAlgos::kZLIB: + return RCompressionSetting::EAlgorithm::kZLIB; + case CompressionAlgos::kLZ4: + 
return RCompressionSetting::EAlgorithm::kLZ4; + } + return RCompressionSetting::EAlgorithm::kZSTD; + } +} // namespace + +using namespace ROOT::Experimental; +namespace edm { + RNTupleOutputFile::RNTupleOutputFile(std::string const& iFileName, + FileBlock const& iFileBlock, + SelectedProductsForBranchType const& iSelected, + Config const& iConfig, + bool anyProductProduced) + : file_(iFileName.c_str(), "recreate", ""), + wrapperBaseTClass_(TClass::GetClass("edm::WrapperBase")), + selectorConfig_(iConfig.selectorConfig), + dropMetaData_(iConfig.dropMetaData) { + setupRuns(iSelected[InRun], iConfig); + setupLumis(iSelected[InLumi], iConfig); + setupEvents(iSelected[InEvent], iConfig, anyProductProduced); + setupPSets(iConfig); + setupParentage(iConfig); + setupMetaData(iConfig); + + auto const& branchToChildMap = iFileBlock.productDependencies().childLookup(); + for (auto const& parentToChildren : branchToChildMap) { + for (auto const& child : parentToChildren.second) { + productDependencies_.insertChild(parentToChildren.first, child); + } + } + } + + namespace { + std::string fixBranchName(std::string const& iName) { + //need to remove the '.' at the end of the branch name + return (not iName.empty() and iName.back() == '.') ? iName.substr(0, iName.size() - 1) : iName; /* only strip a real trailing '.', matching the reader-side fixName */ + } + + /* By default RNTuple will take a multi-byte intrinsic data type and break +it into multiple output fields to separate the high-bytes from the low-bytes (or mantissa from exponent). +This typically allows for better compression. Empirically we have found that some important +member data of some classes actually take more space on disk when this is done. +This function allows one to override the default RNTuple behavior and instead store +all bytes of a data type in one field. To do that one must find the storage type (typeName) and +explicitly pass the correct variable to `SetColumnRepresentatives`. 
+ */ + void noSplitField(ROOT::Experimental::RFieldBase& iField) { + auto const& typeName = iField.GetTypeName(); + if (typeName == "std::uint16_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kUInt16}}); + } else if (typeName == "std::uint32_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kUInt32}}); + } else if (typeName == "std::uint64_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kUInt64}}); + } else if (typeName == "std::int16_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kInt16}}); + } else if (typeName == "std::int32_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kInt32}}); + } else if (typeName == "std::int64_t") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kInt64}}); + } else if (typeName == "float") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kReal32}}); + } else if (typeName == "double") { + iField.SetColumnRepresentatives({{ROOT::Experimental::EColumnType::kReal64}}); + } + } + + void findSubFieldsForNoSplitThenApply(ROOT::Experimental::RFieldBase& iField, + std::vector const& iNoSplitFields) { + for (auto const& name : iNoSplitFields) { + if (name.starts_with(iField.GetFieldName())) { + bool found = false; + for (auto& subfield : iField) { + if (subfield.GetQualifiedFieldName() == name) { + found = true; + noSplitField(subfield); + break; + } + } + if (not found) { + throw edm::Exception(edm::errors::Configuration) + << "The data product was found but the requested subfield '" << name << "' is not part of the class"; + } + } + } + } + } // namespace + + void RNTupleOutputFile::setupDataProducts(SelectedProducts const& iProducts, + std::vector const& iUseStreamer, + std::vector const& iNoSplitFields, + RNTupleModel& iModel) { + unsigned int index = 0; + const bool noSplitSubFields = (iNoSplitFields.size() == 1 and iNoSplitFields[0] == "all") ? 
true : false; + for (auto const& prod : iProducts) { + try { + edm::convertException::wrap([&]() { + if (index >= iUseStreamer.size() or not iUseStreamer[index]) { + auto field = ROOT::Experimental::RFieldBase::Create(fixBranchName(prod.first->branchName()), + prod.first->wrappedName()) + .Unwrap(); + if (noSplitSubFields) { + //use the 'conventional' way to store fields + for (auto& subfield : *field) { + noSplitField(subfield); + } + } else if (not iNoSplitFields.empty()) { + findSubFieldsForNoSplitThenApply(*field, iNoSplitFields); + } + iModel.AddField(std::move(field)); + } else { + auto field = std::make_unique(fixBranchName(prod.first->branchName()), + prod.first->wrappedName()); + iModel.AddField(std::move(field)); + } + branchesWithStoredHistory_.insert(prod.first->branchID()); + }); + ++index; + } catch (cms::Exception& iExcept) { + using namespace std::string_literals; + iExcept.addContext("while setting up field "s + prod.first->branchName()); + throw; + } + } + } + + std::vector RNTupleOutputFile::associateDataProducts(SelectedProducts const& iProducts, + RNTupleModel const& iModel) { + std::vector ret; + ret.reserve(iProducts.size()); + for (auto const& prod : iProducts) { + ret.emplace_back(prod.second, prod.first, iModel.GetToken(fixBranchName(prod.first->branchName()))); + } + return ret; + } + + std::unique_ptr RNTupleOutputFile::setupCommonModels(SelectedProducts const& iProducts, + std::string const& iAuxName, + std::string const& iAuxType) { + auto model = ROOT::Experimental::RNTupleModel::CreateBare(); + { + auto field = ROOT::Experimental::RFieldBase::Create(iAuxName, iAuxType).Unwrap(); + model->AddField(std::move(field)); + } + const std::vector streamerNothing; + const std::vector unsplitNothing; + setupDataProducts(iProducts, streamerNothing, unsplitNothing, *model); + return model; + } + + void RNTupleOutputFile::setupRuns(SelectedProducts const& iProducts, Config const& iConfig) { + std::string kRunAuxName = "RunAuxiliary"; + { + auto 
model = setupCommonModels(iProducts, "RunAuxiliary", "edm::RunAuxiliary"); + + auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + runs_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "Runs", file_, writeOptions); + } + /* Tokens are looked up on the model owned by the writer, after Append has taken the model over. */ products_[InRun] = associateDataProducts(iProducts, runs_->GetModel()); + runAuxField_ = runs_->GetModel().GetToken(kRunAuxName); + } + /* Creates the LuminosityBlocks writer in the same way as setupRuns: common model, compression-only write options, then products_[InLumi] and the auxiliary field token. The auxiliary name is likewise spelled both in kLumiAuxName and as a literal. */ void RNTupleOutputFile::setupLumis(SelectedProducts const& iProducts, Config const& iConfig) { + std::string kLumiAuxName = "LuminosityBlockAuxiliary"; + { + auto model = setupCommonModels(iProducts, "LuminosityBlockAuxiliary", "edm::LuminosityBlockAuxiliary"); + + auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + lumis_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "LuminosityBlocks", file_, writeOptions); + } + products_[InLumi] = associateDataProducts(iProducts, lumis_->GetModel()); + lumiAuxField_ = lumis_->GetModel().GetToken(kLumiAuxName); + } + /* Creates the Events writer. The model holds four bookkeeping fields (event auxiliary, per-event product provenance, event selections, branch list indexes) followed by the product fields; afterwards the four tokens and products_[InEvent] are cached and extendSelectorConfig_ is derived from anyProductProduced and iConfig.wantAllEvents. */ void RNTupleOutputFile::setupEvents(SelectedProducts const& iProducts, + Config const& iConfig, + bool anyProductProduced) { + std::string kEventAuxName = "EventAuxiliary"; + std::string kEventProvName = "EventProductProvenance"; + std::string kEventSelName = "EventSelections"; + std::string kBranchListName = "BranchListIndexes"; + { + auto model = ROOT::Experimental::RNTupleModel::CreateBare(); + { + auto field = ROOT::Experimental::RFieldBase::Create(kEventAuxName, "edm::EventAuxiliary").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = + ROOT::Experimental::RFieldBase::Create(kEventProvName, "std::vector").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = ROOT::Experimental::RFieldBase::Create(kEventSelName, "std::vector >").Unwrap(); + model->AddField(std::move(field)); + } + { + auto
field = ROOT::Experimental::RFieldBase::Create(kBranchListName, "std::vector").Unwrap(); + model->AddField(std::move(field)); + } + /* Unlike the Runs and LuminosityBlocks models, the Events model forwards the configured per-product streamer flags and no-split subfield names. */ setupDataProducts(iProducts, iConfig.streamerProduct, iConfig.doNotSplitSubFields, *model); + + /* The cluster, page, buffering and direct-IO settings are applied only to this writer; the other writers in this file set compression only. */ auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + writeOptions.SetApproxZippedClusterSize(iConfig.approxZippedClusterSize); + writeOptions.SetMaxUnzippedClusterSize(iConfig.maxUnzippedClusterSize); + writeOptions.SetInitialUnzippedPageSize(iConfig.initialUnzippedPageSize); + writeOptions.SetMaxUnzippedPageSize(iConfig.maxUnzippedPageSize); + writeOptions.SetPageBufferBudget(iConfig.pageBufferBudget); + writeOptions.SetUseBufferedWrite(iConfig.useBufferedWrite); + writeOptions.SetUseDirectIO(iConfig.useDirectIO); + events_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "Events", file_, writeOptions); + } + /* As for runs and lumis, the tokens come from the model owned by the writer once Append has returned. */ products_[InEvent] = associateDataProducts(iProducts, events_->GetModel()); + + eventAuxField_ = events_->GetModel().GetToken(kEventAuxName); + eventProvField_ = events_->GetModel().GetToken(kEventProvName); + eventSelField_ = events_->GetModel().GetToken(kEventSelName); + branchListField_ = events_->GetModel().GetToken(kBranchListName); + + // Note: The EventSelectionIDVector should have a one to one correspondence with the processes in the process history. + // Therefore, a new entry should be added if and only if the current process has been added to the process history, + // which is done if and only if there is a produced product.
+ extendSelectorConfig_ = anyProductProduced || !iConfig.wantAllEvents; + } + /* Creates the ParameterSets writer: a bare model with the single field IdToParameterSetsBlobs, written with compression-only options. */ void RNTupleOutputFile::setupPSets(Config const& iConfig) { + auto model = ROOT::Experimental::RNTupleModel::CreateBare(); + { + auto field = ROOT::Experimental::RFieldBase::Create("IdToParameterSetsBlobs", + "std::pair,edm::ParameterSetBlob>") + .Unwrap(); + model->AddField(std::move(field)); + } + auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + parameterSets_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "ParameterSets", file_, writeOptions); + } + + /* Writes one ParameterSets entry per element of pset::Registry. A single local pair is bound once by raw pointer and overwritten on every iteration with the registry key and the result of toString() before each Fill. */ void RNTupleOutputFile::fillPSets() { + std::pair idToBlob; + + auto rentry = parameterSets_->CreateEntry(); + rentry->BindRawPtr("IdToParameterSetsBlobs", static_cast(&idToBlob)); + + for (auto const& pset : *pset::Registry::instance()) { + idToBlob.first = pset.first; + idToBlob.second.pset() = pset.second.toString(); + + parameterSets_->Fill(*rentry); + } + } + + /* Creates the Parentage writer: a bare model with the single field Description of type edm::Parentage, written with compression-only options. */ void RNTupleOutputFile::setupParentage(Config const& iConfig) { + auto model = ROOT::Experimental::RNTupleModel::CreateBare(); + { + auto field = ROOT::Experimental::RFieldBase::Create("Description", "edm::Parentage").Unwrap(); + model->AddField(std::move(field)); + } + auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + parentage_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "Parentage", file_, writeOptions); + } + /* Writes one Parentage entry per ID collected in parentageIDs_, ordered by the index stored as the map value, so that entry number equals the parentageIDIndex_ recorded with each stored provenance. NOTE(review): the pointer returned by getMapped is bound without a null check; presumably every collected ID is present in the registry - confirm. */ void RNTupleOutputFile::fillParentage() { + ParentageRegistry& ptReg = *ParentageRegistry::instance(); + + std::vector orderedIDs(parentageIDs_.size()); + for (auto const& parentageID : parentageIDs_) { + orderedIDs[parentageID.second] = parentageID.first; + } + + auto rentry = parentage_->CreateEntry(); + //now put them into the RNTuple in the correct order + for (auto const& orderedID : orderedIDs) { + auto desc =
ptReg.getMapped(orderedID); + rentry->BindRawPtr("Description", const_cast(static_cast(desc))); + parentage_->Fill(*rentry); + } + } + + /* Creates the MetaData writer. The bare model declares eight fields (FileIdentifier, IndexIntoFile, MergeableRunProductMetadata, ProcessHistory, ProductRegistry, BranchIDLists, ThinnedAssociationsHelper, ProductDependencies); the FileFormatVersion field is commented out. NOTE(review): fillMetaData, as visible in this file, binds every declared field except MergeableRunProductMetadata, so presumably a default-constructed value is written for it - confirm this is intended. NOTE(review): the ProductDependencies field is declared with the type name edm::BranchChildren while the member bound to it is declared as ProductDependencies - confirm both name the same class. */ void RNTupleOutputFile::setupMetaData(Config const& iConfig) { + auto model = ROOT::Experimental::RNTupleModel::CreateBare(); + { + //Ultimately will need a new class specific for RNTuple + //auto field = ROOT::Experimental::RFieldBase::Create("FileFormatVersion", "edm::FileFormatVersion").Unwrap(); + //model->AddField(std::move(field)); + } + { + auto field = ROOT::Experimental::RFieldBase::Create("FileIdentifier", "edm::FileID").Unwrap(); + model->AddField(std::move(field)); + } + + { + auto field = ROOT::Experimental::RFieldBase::Create("IndexIntoFile", "edm::IndexIntoFile").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = ROOT::Experimental::RFieldBase::Create("MergeableRunProductMetadata", + "edm::StoredMergeableRunProductMetadata") + .Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = + ROOT::Experimental::RFieldBase::Create("ProcessHistory", "std::vector").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = ROOT::Experimental::RFieldBase::Create("ProductRegistry", "edm::ProductRegistry").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = + ROOT::Experimental::RFieldBase::Create("BranchIDLists", "std::vector >").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = + ROOT::Experimental::RFieldBase::Create("ThinnedAssociationsHelper", "edm::ThinnedAssociationsHelper").Unwrap(); + model->AddField(std::move(field)); + } + { + auto field = ROOT::Experimental::RFieldBase::Create("ProductDependencies", "edm::BranchChildren").Unwrap(); + model->AddField(std::move(field)); + } + + auto writeOptions = ROOT::Experimental::RNTupleWriteOptions(); + writeOptions.SetCompression(convert(iConfig.compressionAlgo), iConfig.compressionLevel); + metaData_ = ROOT::Experimental::RNTupleWriter::Append(std::move(model), "MetaData", file_, writeOptions); + }
+ + /* Writes the single MetaData entry. Binds a freshly generated FileID, the IndexIntoFile after sorting its run-or-lumi entries, a vector copy of the registered process histories, a pruned copy of the product registry, and the caller-supplied branch ID lists and thinned-associations helper plus productDependencies_, then fills once. Everything bound by raw pointer is a local, a member or a reference parameter that outlives the Fill at the end of the function. */ void RNTupleOutputFile::fillMetaData(BranchIDLists const& iBranchIDLists, + ThinnedAssociationsHelper const& iThinnedHelper, + ProductRegistry const& iReg) { + auto rentry = metaData_->CreateEntry(); + + FileID id(createGlobalIdentifier()); + + rentry->BindRawPtr("FileIdentifier", &id); + + indexIntoFile_.sortVector_Run_Or_Lumi_Entries(); + rentry->BindRawPtr("IndexIntoFile", &indexIntoFile_); + + ProcessHistoryVector procHistoryVector; + for (auto const& ph : processHistoryRegistry_) { + procHistoryVector.push_back(ph.second); + } + rentry->BindRawPtr("ProcessHistory", &procHistoryVector); + + // Make a local copy of the ProductRegistry, removing any transient or pruned products. + using ProductList = ProductRegistry::ProductList; + ProductRegistry pReg(iReg.productList()); + ProductList& pList = const_cast(pReg.productList()); + /* First pass: when a kept product has a branchID different from its originalBranchID, mark the original as kept too. Note that this inserts into the member set, not a local copy. */ for (auto const& prod : pList) { + if (prod.second.branchID() != prod.second.originalBranchID()) { + if (branchesWithStoredHistory_.find(prod.second.branchID()) != branchesWithStoredHistory_.end()) { + branchesWithStoredHistory_.insert(prod.second.originalBranchID()); + } + } + } + /* Second pass: erase from the local copy every product whose branchID is not in the kept set. */ std::set::iterator end = branchesWithStoredHistory_.end(); + for (ProductList::iterator it = pList.begin(); it != pList.end();) { + if (branchesWithStoredHistory_.find(it->second.branchID()) == end) { + // avoid invalidating iterator on deletion + ProductList::iterator itCopy = it; + ++it; + pList.erase(itCopy); + + } else { + ++it; + } + } + + rentry->BindRawPtr("ProductRegistry", &pReg); + rentry->BindRawPtr("ThinnedAssociationsHelper", const_cast(static_cast(&iThinnedHelper))); + rentry->BindRawPtr("BranchIDLists", const_cast(static_cast(&iBranchIDLists))); + rentry->BindRawPtr("ProductDependencies", const_cast(static_cast(&productDependencies_))); + + metaData_->Fill(*rentry); + } + + /* Adds every parent-to-child pair from the child lookup of the file block to productDependencies_. Nothing is cleared first, so pairs accumulate over successive calls. */ void RNTupleOutputFile::openFile(FileBlock const& fb) { + auto const& branchToChildMap = fb.productDependencies().childLookup(); + for (auto const& parentToChildren :
branchToChildMap) { + for (auto const& child : parentToChildren.second) { + productDependencies_.insertChild(parentToChildren.first, child); + } + } + } + + /* Writes the file-level tuples in a fixed order: parameter sets, parentage, then metadata. Only these three fill calls happen here; no writer or file is closed in this function. */ void RNTupleOutputFile::reallyCloseFile(BranchIDLists const& iBranchIDLists, + ThinnedAssociationsHelper const& iThinnedHelper, + ProductRegistry const& iReg) { + fillPSets(); + fillParentage(); + fillMetaData(iBranchIDLists, iThinnedHelper, iReg); + } + + RNTupleOutputFile::~RNTupleOutputFile() {} + + /* Binds every product in iProducts to its field in iEntry by raw pointer. When the occurrence has no wrapper for a product, a default instance of the wrapped type is created through its TClass and bound instead. The returned vector owns those stand-in instances, so the caller must keep it alive until the entry has been filled. NOTE(review): inside the missing-product branch the local void pointer named p shadows the loop variable p until the end of that branch. */ std::vector> RNTupleOutputFile::writeDataProducts(std::vector const& iProducts, + OccurrenceForOutput const& iOccurence, + REntry& iEntry) { + std::vector> dummies; + + for (auto const& p : iProducts) { + auto h = iOccurence.getByToken(p.get_, p.desc_->unwrappedTypeID()); + auto product = h.wrapper(); + if (nullptr == product) { + // No product with this ID is in the event. + // Add a null product. + TClass* cp = p.desc_->wrappedType().getClass(); + assert(cp != nullptr); + int offset = cp->GetBaseClassOffset(wrapperBaseTClass_); + void* p = cp->New(); + std::unique_ptr dummy = getWrapperBasePtr(p, offset); + product = dummy.get(); + dummies.emplace_back(std::move(dummy)); + } + iEntry.BindRawPtr(p.field_, const_cast(static_cast(product))); + } + return dummies; + } + + /* Collects the stored provenance of every valid product in the event and returns it as a vector in set order. When dropMetaData_ is true nothing is collected and the result is empty. If the handle carries no product provenance, the retriever of the event is asked using the original branch ID. */ std::vector RNTupleOutputFile::writeDataProductProvenance( + std::vector const& iProducts, EventForOutput const& iEvent) { + std::set provenanceToKeep; + + if (not dropMetaData_) { + for (auto const& p : iProducts) { + auto h = iEvent.getByToken(p.get_, p.desc_->unwrappedTypeID()); + if (h.isValid()) { + auto prov = h.provenance()->productProvenance(); + if (not prov) { + prov = iEvent.productProvenanceRetrieverPtr()->branchIDToProvenance(p.desc_->originalBranchID()); + } + if (prov) { + insertProductProvenance(*prov, provenanceToKeep); + } + } + } + } + return std::vector(provenanceToKeep.begin(), provenanceToKeep.end()); + } + + /* Adds the stored form of iProv to oToInsert unless an equivalent element is already present. Returns true when a new element was inserted and false otherwise. A parentage ID not seen before is assigned the next free index in parentageIDs_. */ bool RNTupleOutputFile::insertProductProvenance(ProductProvenance const& iProv, + std::set& oToInsert) { +
StoredProductProvenance toStore; + toStore.branchID_ = iProv.branchID().id(); + /* The set lookup happens before parentageIDIndex_ is filled in, so presumably the set ordering depends on branchID_ only - TODO confirm against StoredProductProvenance. */ auto itFound = oToInsert.find(toStore); + if (itFound == oToInsert.end()) { + //get the index to the ParentageID or insert a new value if not already present + auto i = parentageIDs_.emplace(iProv.parentageID(), static_cast(parentageIDs_.size())); + toStore.parentageIDIndex_ = i.first->second; + if (toStore.parentageIDIndex_ >= parentageIDs_.size()) { + throw edm::Exception(errors::LogicError) + << "RNTupleOutputFile::insertProductProvenance\n" + << "The parentage ID index value " << toStore.parentageIDIndex_ + << " is out of bounds. The maximum value is currently " << parentageIDs_.size() - 1 << ".\n" + << "This should never happen.\n" + << "Please report this to the framework developers."; + } + + oToInsert.insert(toStore); + return true; + } + return false; + } + + /* Walks the parents of iProv recursively: each parent branch ID is added to branchesWithStoredHistory_, and each parent provenance that the retriever can supply is inserted into oToKeep. Recursion continues only through parents that were newly inserted, which bounds the walk. NOTE(review): apart from its own recursive call, no function visible in this chunk calls this method. */ void RNTupleOutputFile::insertAncestorsProvenance(ProductProvenance const& iProv, + ProductProvenanceRetriever const& iMapper, + std::set& oToKeep) { + std::vector const& parentIDs = iProv.parentage().parents(); + for (auto const& parentID : parentIDs) { + branchesWithStoredHistory_.insert(parentID); + ProductProvenance const* info = iMapper.branchIDToProvenance(parentID); + if (info) { + if (insertProductProvenance(*info, oToKeep)) { + //haven't seen this one yet + insertAncestorsProvenance(*info, iMapper, oToKeep); + } + } + } + } + + /* Writes one Events entry (auxiliary, selections, branch list indexes, products, per-event provenance), then registers the process history of the event and records the entry in indexIntoFile_ under the reduced process history ID. NOTE(review): eventSelField_ is bound twice below, first to the selection IDs of the event and then to the local copy esids, which may have selectorConfig_ appended; presumably the later bind supersedes the first, making the first redundant - confirm. */ void RNTupleOutputFile::write(EventForOutput const& e) { + { + auto rentry = events_->CreateEntry(); + rentry->BindRawPtr(*eventAuxField_, const_cast(static_cast(&(e.eventAuxiliary())))); + rentry->BindRawPtr(*eventSelField_, const_cast(static_cast(&(e.eventSelectionIDs())))); + rentry->BindRawPtr(*branchListField_, const_cast(static_cast(&(e.branchListIndexes())))); + + EventSelectionIDVector esids = e.eventSelectionIDs(); + if (extendSelectorConfig_) { + esids.push_back(selectorConfig_); + } + rentry->BindRawPtr(*eventSelField_, &esids); + + auto dummies =
writeDataProducts(products_[InEvent], e, *rentry); + /* dummies and prov are bound into the entry by raw pointer, so both locals must stay alive until the Fill call below. */ auto prov = writeDataProductProvenance(products_[InEvent], e); + rentry->BindRawPtr(*eventProvField_, &prov); + events_->Fill(*rentry); + } + + processHistoryRegistry_.registerProcessHistory(e.processHistory()); + // Store the reduced ID in the IndexIntoFile + ProcessHistoryID reducedPHID = processHistoryRegistry_.reducedProcessHistoryID(e.processHistoryID()); + // Add event to index + indexIntoFile_.addEntry(reducedPHID, e.run(), e.luminosityBlock(), e.event(), eventEntryNumber_); + ++eventEntryNumber_; + } + + /* Writes one LuminosityBlocks entry (auxiliary plus products; no selections or provenance), registers the process history, and records the entry in indexIntoFile_ with the event number given as 0U. */ void RNTupleOutputFile::writeLuminosityBlock(LuminosityBlockForOutput const& iLumi) { + { + auto rentry = lumis_->CreateEntry(); + rentry->BindRawPtr(*lumiAuxField_, + const_cast(static_cast(&(iLumi.luminosityBlockAuxiliary())))); + auto dummies = writeDataProducts(products_[InLumi], iLumi, *rentry); + lumis_->Fill(*rentry); + } + processHistoryRegistry_.registerProcessHistory(iLumi.processHistory()); + // Store the reduced ID in the IndexIntoFile + ProcessHistoryID reducedPHID = processHistoryRegistry_.reducedProcessHistoryID(iLumi.processHistoryID()); + // Add lumi to index. + indexIntoFile_.addEntry(reducedPHID, iLumi.run(), iLumi.luminosityBlock(), 0U, lumiEntryNumber_); + ++lumiEntryNumber_; + } + + /* Writes one Runs entry (auxiliary plus products), registers the process history, and records the entry in indexIntoFile_ with both the luminosity block and the event number given as 0U. */ void RNTupleOutputFile::writeRun(RunForOutput const& iRun) { + { + auto rentry = runs_->CreateEntry(); + rentry->BindRawPtr(*runAuxField_, const_cast(static_cast(&(iRun.runAuxiliary())))); + auto dummies = writeDataProducts(products_[InRun], iRun, *rentry); + runs_->Fill(*rentry); + } + processHistoryRegistry_.registerProcessHistory(iRun.processHistory()); + // Store the reduced ID in the IndexIntoFile + ProcessHistoryID reducedPHID = processHistoryRegistry_.reducedProcessHistoryID(iRun.processHistoryID()); + // Add run to index.
+ indexIntoFile_.addEntry(reducedPHID, iRun.run(), 0U, 0U, runEntryNumber_); + ++runEntryNumber_; + } + +} // namespace edm diff --git a/FWIO/RNTuple/plugins/RNTupleOutputFile.h b/FWIO/RNTuple/plugins/RNTupleOutputFile.h new file mode 100644 index 0000000000000..b2ec160be1c35 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleOutputFile.h @@ -0,0 +1,149 @@ +#if !defined(FWIO_RNTuple_RNTupleOutputFile_h) +#define FWIO_RNTuple_RNTupleOutputFile_h + +#include "FWCore/Framework/interface/EventForOutput.h" +#include "FWCore/Framework/interface/FileBlock.h" + +#include "FWCore/ParameterSet/interface/Registry.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/GlobalIdentifier.h" + +#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "DataFormats/Provenance/interface/FileID.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/Provenance.h" +#include "DataFormats/Provenance/interface/IndexIntoFile.h" +#include "DataFormats/Provenance/interface/ProductDependencies.h" +#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h" +#include "DataFormats/Provenance/interface/SelectedProducts.h" +#include "DataFormats/Provenance/interface/StoredProductProvenance.h" + +#include "TFile.h" +#include "ROOT/RNTuple.hxx" +#include "ROOT/RNTupleWriter.hxx" +#include +#include +#include +#include +#include + +/* NOTE(review): a using-directive at global scope in a header becomes visible in every file that includes this header. */ using namespace ROOT::Experimental; +namespace edm { + namespace rntuple { + /* Compression algorithms selectable from configuration; the output module in this patch maps the strings LZMA, ZSTD, LZ4 and ZLIB onto these values. */ enum class CompressionAlgos { kLZMA, kZSTD, kZLIB, kLZ4 }; + } + + /* Owns the output TFile and one RNTupleWriter per stored tuple: Events, Runs, LuminosityBlocks, ParameterSets, Parentage and MetaData. */ class RNTupleOutputFile { + public: + /* Settings handed to the constructor. Only compressionAlgo, compressionLevel and dropMetaData have default member initializers; the cluster and page sizes, pageBufferBudget, useBufferedWrite, useDirectIO and wantAllEvents have none, so a default-initialized Config leaves them indeterminate until the caller assigns them. */ struct Config { + ParameterSetID selectorConfig; + std::vector streamerProduct; + std::vector doNotSplitSubFields; + rntuple::CompressionAlgos compressionAlgo = rntuple::CompressionAlgos::kZSTD; + int compressionLevel = 4; + unsigned long long approxZippedClusterSize; + unsigned long long
maxUnzippedClusterSize; + unsigned long long initialUnzippedPageSize; + unsigned long long maxUnzippedPageSize; + unsigned long long pageBufferBudget; + bool useBufferedWrite; + bool useDirectIO; + + bool wantAllEvents; + bool dropMetaData = false; + }; + + explicit RNTupleOutputFile(std::string const& iFileName, + FileBlock const& iFileBlock, + SelectedProductsForBranchType const& iSelected, + Config const&, + bool anyProductProduced); + ~RNTupleOutputFile(); + + /* Per-transition writers: each fills one entry in the matching tuple and adds one entry to the file index. */ void write(EventForOutput const& e); + void writeLuminosityBlock(LuminosityBlockForOutput const&); + void writeRun(RunForOutput const&); + /* Writes the ParameterSets, Parentage and MetaData tuples. */ void reallyCloseFile(BranchIDLists const& iBranchIDLists, + ThinnedAssociationsHelper const& iThinnedHelper, + ProductRegistry const& iReg); + void openFile(FileBlock const& fb); + + /* Per-product bookkeeping built by associateDataProducts: the token used to get the product, its description, and the entry field token used when binding it for a write. */ struct Product { + Product(EDGetToken iGet, ProductDescription const* iDesc, REntry::RFieldToken iField) + : get_(iGet), desc_(iDesc), field_(iField) {} + + EDGetToken get_; + ProductDescription const* desc_; + REntry::RFieldToken field_; + }; + + private: + /* The setup functions build a model and append a writer for one tuple each; the fill functions write the file-level tuples and are called from reallyCloseFile. */ void setupRuns(SelectedProducts const&, Config const&); + void setupLumis(SelectedProducts const&, Config const&); + std::unique_ptr setupCommonModels(SelectedProducts const&, + std::string const& iAuxName, + std::string const& iAuxType); + void setupEvents(SelectedProducts const&, Config const&, bool anyProductProduced); + void setupPSets(Config const&); + void setupParentage(Config const&); + void setupMetaData(Config const&); + + void fillPSets(); + void fillParentage(); + void fillMetaData(BranchIDLists const& iBranchIDLists, + ThinnedAssociationsHelper const& iThinnedHelper, + ProductRegistry const&); + + void setupDataProducts(SelectedProducts const&, + std::vector const&, + std::vector const&, + RNTupleModel&); + //Can't call until the model is frozen + std::vector associateDataProducts(SelectedProducts const&, RNTupleModel const&); + + std::vector> writeDataProducts(std::vector const& iProduct, + OccurrenceForOutput const&
iOccurence, + REntry&); + std::vector writeDataProductProvenance(std::vector const& iProduct, + EventForOutput const& iEvent); + bool insertProductProvenance(ProductProvenance const& iProv, std::set& oToKeep); + void insertAncestorsProvenance(ProductProvenance const& iProv, + ProductProvenanceRetriever const&, + std::set& oToKeep); + TFile file_; + std::unique_ptr events_; + std::optional eventAuxField_; + std::optional eventProvField_; + std::optional eventSelField_; + std::optional branchListField_; + + std::unique_ptr runs_; + std::optional runAuxField_; + + std::unique_ptr lumis_; + std::optional lumiAuxField_; + + std::unique_ptr parameterSets_; + std::unique_ptr parentage_; + std::unique_ptr metaData_; + + /* Maps each parentage ID seen so far to the index assigned on first insertion in insertProductProvenance; fillParentage writes the IDs in index order. */ std::map parentageIDs_; + ProcessHistoryRegistry processHistoryRegistry_; + /* Branch IDs whose products are written; fillMetaData keeps only these in the stored copy of the product registry. */ std::set branchesWithStoredHistory_; + + IndexIntoFile::EntryNumber_t eventEntryNumber_ = 0LL; + IndexIntoFile::EntryNumber_t lumiEntryNumber_ = 0LL; + IndexIntoFile::EntryNumber_t runEntryNumber_ = 0LL; + IndexIntoFile indexIntoFile_; + ProductDependencies productDependencies_; + + std::array, NumBranchTypes> products_; + /* No in-class initializer; presumably set by the constructor, whose body is not visible in this chunk - confirm. */ TClass const* wrapperBaseTClass_; + + ParameterSetID selectorConfig_; + bool extendSelectorConfig_ = true; + /* When true, writeDataProductProvenance returns an empty list. */ bool dropMetaData_ = false; + }; +} // namespace edm +#endif diff --git a/FWIO/RNTuple/plugins/RNTupleOutputModule.cc b/FWIO/RNTuple/plugins/RNTupleOutputModule.cc new file mode 100644 index 0000000000000..72df372590542 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleOutputModule.cc @@ -0,0 +1,261 @@ +#include "FWCore/Framework/interface/one/OutputModule.h" +#include "FWCore/Framework/interface/EventForOutput.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/FileBlock.h" + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include
"FWCore/ParameterSet/interface/Registry.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/GlobalIdentifier.h" + +#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "DataFormats/Provenance/interface/ProductRegistry.h" +#include "DataFormats/Provenance/interface/FileID.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/Provenance.h" +#include "DataFormats/Provenance/interface/IndexIntoFile.h" +#include "DataFormats/Provenance/interface/BranchChildren.h" + +#include "RNTupleOutputFile.h" + +#include +#include +#include +#include "boost/algorithm/string.hpp" + +using namespace ROOT::Experimental; + +namespace { + /* Maps the configuration string onto CompressionAlgos. Matching is exact and case-sensitive; any other string throws a cms::Exception of category UnknownCompression. */ edm::rntuple::CompressionAlgos convertTo(std::string const& iName) { + if (iName == "LZMA") { + return edm::rntuple::CompressionAlgos::kLZMA; + } + if (iName == "ZSTD") { + return edm::rntuple::CompressionAlgos::kZSTD; + } + if (iName == "LZ4") { + return edm::rntuple::CompressionAlgos::kLZ4; + } + if (iName == "ZLIB") { + return edm::rntuple::CompressionAlgos::kZLIB; + } + throw cms::Exception("UnknownCompression") << "An unknown compression algorithm was specified: " << iName; + } + + /* Pairs a branch-name pattern, compiled to a std::regex at construction, with the streamer choice to apply to products whose name matches it in full. */ struct SetStreamerForDataProduct { + SetStreamerForDataProduct(std::string const& iName, bool iUseStreamer) + : branch_(convert(iName)), useStreamer_(iUseStreamer) {} + bool match(std::string const& iName) const; + std::regex convert(std::string const& iGlobBranchExpression) const; + + std::regex branch_; + bool useStreamer_; + }; + + inline bool SetStreamerForDataProduct::match(std::string const& iBranchName) const { + return std::regex_match(iBranchName, branch_); + } + + /* Turns the glob-style expression into a regex by replacing each star with dot-star and each question mark with a single dot. NOTE(review): no other regex metacharacter is escaped, so for example a literal dot in the expression matches any character. */ std::regex SetStreamerForDataProduct::convert(std::string const& iGlobBranchExpression) const { + std::string tmp(iGlobBranchExpression); + boost::replace_all(tmp, "*", ".*"); + boost::replace_all(tmp, "?", "."); +
return std::regex(tmp); + } + + /* Builds one SetStreamerForDataProduct per configuration entry from its untracked parameters named product and useStreamer, preserving the configured order. */ std::vector fromConfig(std::vector const& iConfig) { + std::vector returnValue; + returnValue.reserve(iConfig.size()); + + for (auto const& prod : iConfig) { + returnValue.emplace_back(prod.getUntrackedParameter("product"), + prod.getUntrackedParameter("useStreamer")); + } + return returnValue; + } + + /* Returns the streamer flag of the first entry in iSpecial whose pattern matches iName with its last character removed, or an empty optional when no entry matches. Presumably the removed character is a trailing dot on the branch name - TODO confirm against the caller. */ std::optional useStreamer(std::string const& iName, std::vector const& iSpecial) { + auto nameNoDot = iName.substr(0, iName.size() - 1); + for (auto const& prod : iSpecial) { + if (prod.match(nameNoDot)) { + return prod.useStreamer_; + } + } + return {}; + } + +} // namespace + +namespace edm { + + /* Output module that keeps its settings as members and forwards the write callbacks to an RNTupleOutputFile created in openFile. NOTE(review): compressionLevel_ is unsigned int here while Config::compressionLevel in RNTupleOutputFile.h is int. */ class RNTupleOutputModule : public one::OutputModule<> { + public: + explicit RNTupleOutputModule(ParameterSet const& pset); + ~RNTupleOutputModule() final; + static void fillDescriptions(ConfigurationDescriptions& descriptions); + + private: + void write(EventForOutput const& e) final; + void writeLuminosityBlock(LuminosityBlockForOutput const&) final; + void writeRun(RunForOutput const&) final; + void reallyCloseFile() final; + void openFile(FileBlock const& fb) final; + void initialRegistry(edm::ProductRegistry const& iReg) final; + std::string fileName_; + std::unique_ptr reg_; + std::unique_ptr file_; + std::vector overrideStreamer_; + std::vector noSplitSubFields_; + rntuple::CompressionAlgos compressionAlgo_; + unsigned int compressionLevel_; + unsigned long long approxZippedClusterSize_; + unsigned long long maxUnzippedClusterSize_; + unsigned long long initialUnzippedPageSize_; + unsigned long long maxUnzippedPageSize_; + unsigned long long pageBufferBudget_; + bool useBufferedWrite_; + bool useDirectIO_; + bool dropMetaData_; + bool useStreamer_; + }; + + /* Reads every setting from untracked parameters; convertTo throws for an unrecognized compressionAlgorithm string, and the override patterns are compiled to regexes here. */ RNTupleOutputModule::RNTupleOutputModule(ParameterSet const& pset) + : one::OutputModuleBase(pset), + one::OutputModule<>(pset), + fileName_(pset.getUntrackedParameter("fileName")), + overrideStreamer_( + fromConfig(pset.getUntrackedParameter>("overrideDataProductStreamer"))), +
noSplitSubFields_(pset.getUntrackedParameter>("noSplitSubFields")), + compressionAlgo_(convertTo(pset.getUntrackedParameter("compressionAlgorithm"))), + compressionLevel_(pset.getUntrackedParameter("compressionLevel")), + approxZippedClusterSize_(pset.getUntrackedParameter("approxZippedClusterSize")), + maxUnzippedClusterSize_(pset.getUntrackedParameter("maxUnzippedClusterSize")), + initialUnzippedPageSize_(pset.getUntrackedParameter("initialUnzippedPageSize")), + maxUnzippedPageSize_(pset.getUntrackedParameter("maxUnzippedPageSize")), + pageBufferBudget_(pset.getUntrackedParameter("pageBufferBudget")), + useBufferedWrite_(pset.getUntrackedParameter("useBufferedWrite")), + useDirectIO_(pset.getUntrackedParameter("useDirectIO")), + dropMetaData_(pset.getUntrackedParameter("dropPerEventDataProductProvenance")), + useStreamer_(pset.getUntrackedParameter("useStreamer")) {} + + /* Copies the module settings into a Config, works out the per-product streamer flags for the kept event products, and creates the RNTupleOutputFile. Every Config member that lacks a default initializer is assigned here before use. Requires that initialRegistry has already supplied reg_. */ void RNTupleOutputModule::openFile(FileBlock const& fb) { + RNTupleOutputFile::Config conf; + conf.wantAllEvents = wantAllEvents(); + conf.selectorConfig = selectorConfig(); + conf.compressionAlgo = compressionAlgo_; + conf.compressionLevel = compressionLevel_; + conf.approxZippedClusterSize = approxZippedClusterSize_; + conf.maxUnzippedClusterSize = maxUnzippedClusterSize_; + conf.initialUnzippedPageSize = initialUnzippedPageSize_; + conf.maxUnzippedPageSize = maxUnzippedPageSize_; + conf.pageBufferBudget = pageBufferBudget_; + conf.useBufferedWrite = useBufferedWrite_; + conf.useDirectIO = useDirectIO_; + conf.dropMetaData = dropMetaData_; + conf.doNotSplitSubFields = noSplitSubFields_; + /* Three cases: streamer for everything, per-product overrides on top of the global default, or neither. In the last case streamerProduct stays empty, which setupDataProducts treats as no streamer for any product. Only the InEvent products are considered. */ if (useStreamer_ and overrideStreamer_.empty()) { + auto const& prods = keptProducts()[InEvent]; + conf.streamerProduct = std::vector(prods.size(), true); + } else if (not overrideStreamer_.empty()) { + auto const& prods = keptProducts()[InEvent]; + conf.streamerProduct = std::vector(prods.size(), useStreamer_); + unsigned int index = 0; + for (auto const& prod : prods) { + auto choice =
useStreamer(prod.first->branchName(), overrideStreamer_); + if (choice) { + if (*choice != useStreamer_) { + conf.streamerProduct[index] = *choice; + } + } + ++index; + } + } + assert(reg_); + file_ = std::make_unique(fileName_, fb, keptProducts(), conf, reg_->anyProductProduced()); + } + + /* Keeps a private ProductRegistry built from the product list of iReg; openFile and reallyCloseFile both assert that it exists. */ void RNTupleOutputModule::initialRegistry(edm::ProductRegistry const& iReg) { + reg_ = std::make_unique(iReg.productList()); + } + + /* Forwards to the output file when one exists; does nothing when no file was ever opened. */ void RNTupleOutputModule::reallyCloseFile() { + if (file_) { + assert(reg_); + file_->reallyCloseFile(*branchIDLists(), *thinnedAssociationsHelper(), *reg_); + } + } + + RNTupleOutputModule::~RNTupleOutputModule() = default; + + /* NOTE(review): the three forwarding functions below dereference file_ without the null check used in reallyCloseFile; presumably the framework always calls openFile first - confirm. */ void RNTupleOutputModule::write(EventForOutput const& e) { file_->write(e); } + + void RNTupleOutputModule::writeLuminosityBlock(LuminosityBlockForOutput const& iLumi) { + file_->writeLuminosityBlock(iLumi); + } + + void RNTupleOutputModule::writeRun(RunForOutput const& iRun) { file_->writeRun(iRun); } + + /* Declares every untracked parameter read by the constructor. Defaults for the cluster, page, buffering and direct-IO parameters are taken from a default-constructed RNTupleWriteOptions. NOTE(review): the comment text attached to fileName says the file is read, although this module writes it; presumably the text was carried over from the source module - confirm and correct the wording. */ void RNTupleOutputModule::fillDescriptions(ConfigurationDescriptions& descriptions) { + ParameterSetDescription desc; + desc.setComment("Outputs event information into an RNTuple container."); + desc.addUntracked("fileName")->setComment("RNTuple file to read"); + desc.addUntracked("compressionAlgorithm", "ZSTD") + ->setComment( + "Algorithm used to compress data in the ROOT output file, allowed values are ZLIB, LZMA, LZ4, and ZSTD"); + desc.addUntracked("compressionLevel", 4)->setComment("ROOT compression level of output file."); + ROOT::Experimental::RNTupleWriteOptions ops; + desc.addUntracked("approxZippedClusterSize", ops.GetApproxZippedClusterSize()) + ->setComment("Approximation of the target compressed cluster size"); + desc.addUntracked("maxUnzippedClusterSize", ops.GetMaxUnzippedClusterSize()) + ->setComment("Memory limit for committing a cluster.
High compression leads to high IO buffer size."); + + desc.addUntracked("initialUnzippedPageSize", ops.GetInitialUnzippedPageSize()) + ->setComment("Initially, columns start with a page of this size (bytes)."); + desc.addUntracked("maxUnzippedPageSize", ops.GetMaxUnzippedPageSize()) + ->setComment("Pages can grow only to the given limit (bytes)."); + desc.addUntracked("pageBufferBudget", ops.GetPageBufferBudget()) + ->setComment( + "The maximum size that the sum of all page buffers used for writing into a persistent sink are allowed to " + "use." + " If set to zero, RNTuple will auto-adjust the budget based on the value of 'approxZippedClusterSize'." + " If set manually, the size needs to be large enough to hold all initial page buffers."); + + desc.addUntracked("useBufferedWrite", ops.GetUseBufferedWrite()) + ->setComment( + "Turn on use of buffered writing. This buffers compressed pages in memory, reorders them to keep pages of " + "the same column adjacent, and coalesces the writes when committing a cluster."); + desc.addUntracked("useDirectIO", ops.GetUseDirectIO()) + ->setComment( + "Set use of direct IO. this introduces alignment requirements that may vary between filesystems and " + "platforms"); + + desc.addUntracked("dropPerEventDataProductProvenance", false) + ->setComment( + "do not store which data products were consumed to create a given data product for a given event."); + + desc.addUntracked>("noSplitSubFields", {}) + ->setComment( + "fully qualified subfield names for fields which should not be split. A single value of 'all' means all " + "possible subfields will be unsplit"); + desc.addUntracked("useStreamer", false) + ->setComment("Use streamer storage for top level fields when storing data products"); + + { + ParameterSetDescription specialStreamer; + specialStreamer.addUntracked("product")->setComment( + "Name of data product needing a special split setting. 
The name can contain wildcards '*' and '?'"); + specialStreamer.addUntracked("useStreamer", true) + ->setComment("Explicitly set if should or should not use streamer (default is to use streamer)"); + desc.addVPSetUntracked("overrideDataProductStreamer", specialStreamer, std::vector()); + } + + OutputModule::fillDescription(desc); + descriptions.addDefault(desc); + } +} // namespace edm + +using edm::RNTupleOutputModule; +DEFINE_FWK_MODULE(RNTupleOutputModule); diff --git a/FWIO/RNTuple/plugins/RNTupleSource.cc b/FWIO/RNTuple/plugins/RNTupleSource.cc new file mode 100644 index 0000000000000..dbec134db6931 --- /dev/null +++ b/FWIO/RNTuple/plugins/RNTupleSource.cc @@ -0,0 +1,328 @@ +#include "DataFormats/Provenance/interface/BranchType.h" +#include "DataFormats/Provenance/interface/BranchIDListHelper.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/StoredProductProvenance.h" +#include "FWCore/Catalog/interface/InputFileCatalog.h" +#include "FWCore/Framework/interface/Frameworkfwd.h" +#include "FWCore/Framework/interface/ProcessingController.h" +#include "FWCore/Framework/interface/ProductSelectorRules.h" +#include "FWCore/Framework/interface/InputSource.h" +#include "FWCore/Framework/interface/SharedResourcesAcquirer.h" +#include "FWCore/Framework/interface/InputSourceMacros.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" +#include "FWCore/Framework/interface/RunPrincipal.h" +#include "FWCore/Framework/interface/SharedResourcesRegistry.h" +#include "FWCore/Framework/interface/InputSourceDescription.h" +#include "FWCore/Framework/interface/PreallocationConfiguration.h" +#include "FWCore/Framework/interface/FileBlock.h" +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" + +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" + 
+#include "FWCore/Utilities/interface/propagate_const.h" + +#include "RNTupleInputFile.h" +#include "RNTupleDelayedReader.h" +#include + +namespace edm { + + namespace { + class ProvenanceReader : public edm::ProvenanceReaderBase { + public: + ProvenanceReader(edm::input::DataProductsRNTuple* iRNTuple, + std::vector const* iParentageIDLookup, + std::vector::const_iterator iStreamToEntryBegin, + SharedResourcesAcquirer* iAcquirer, + std::recursive_mutex* iMutex) + : rntuple_(iRNTuple), + acquirer_(iAcquirer), + mutex_(iMutex), + parentageIDLookup_(iParentageIDLookup), + entryBegin_(iStreamToEntryBegin), + descriptor_(rntuple_->findDescriptorID("EventProductProvenance")) {} + + std::set readProvenance(unsigned int transitionIndex) const final; + void readProvenanceAsync(WaitingTaskHolder task, + ModuleCallingContext const* moduleCallingContext, + unsigned int transitionIndex, + std::atomic*>& writeTo) const noexcept final; + + public: + edm::input::DataProductsRNTuple* rntuple_; + SharedResourcesAcquirer* acquirer_; + std::recursive_mutex* mutex_; + std::vector const* parentageIDLookup_; + std::vector::const_iterator entryBegin_; + ROOT::Experimental::DescriptorId_t descriptor_; + }; + + std::set ProvenanceReader::readProvenance(unsigned int transitionIndex) const { + std::vector provVec; + { + std::shared_ptr> provPtr{&provVec, [](auto) {}}; + std::lock_guard guard(*mutex_); + auto v = rntuple_->viewFor(descriptor_, provPtr); + auto entry = *(entryBegin_ + transitionIndex); + v(entry); + } + + std::set retValue; + for (auto const& prov : provVec) { + if (prov.parentageIDIndex_ >= parentageIDLookup_->size()) { + throw edm::Exception(errors::LogicError) + << "RNTuple ProvenanceReader::ReadProvenance\n" + << "The parentage ID index value " << prov.parentageIDIndex_ + << " is out of bounds. 
The maximum value is " << parentageIDLookup_->size() - 1 << ".\n" + << "This should never happen.\n" + << "Please report this to the framework developers."; + } + retValue.emplace(BranchID(prov.branchID_), (*parentageIDLookup_)[prov.parentageIDIndex_]); + } + + return retValue; + } + + void ProvenanceReader::readProvenanceAsync(WaitingTaskHolder task, + ModuleCallingContext const* moduleCallingContext, + unsigned int transitionIndex, + std::atomic*>& writeTo) const noexcept { + if (nullptr == writeTo.load()) { + //need to be sure the task isn't run until after the read + WaitingTaskHolder taskHolder{task}; + auto pWriteTo = &writeTo; + + auto serviceToken = ServiceRegistry::instance().presentToken(); + + acquirer_->serialQueueChain().push( + *taskHolder.group(), + [holder = std::move(taskHolder), + pWriteTo, + this, + transitionIndex, + moduleCallingContext, + serviceToken]() mutable { + if (nullptr == pWriteTo->load()) { + ServiceRegistry::Operate operate(serviceToken); + std::unique_ptr> prov; + CMS_SA_ALLOW try { + prov = std::make_unique>(this->readProvenance(transitionIndex)); + } catch (...) 
{ + holder.doneWaiting(std::current_exception()); + return; + } + const std::set* expected = nullptr; + + if (pWriteTo->compare_exchange_strong(expected, prov.get())) { + prov.release(); + } + } + holder.doneWaiting(std::exception_ptr()); + }); + } + } + } // namespace + class RNTupleSource : public InputSource { + public: + explicit RNTupleSource(ParameterSet const& pset, InputSourceDescription const& desc); + using InputSource::processHistoryRegistryForUpdate; + using InputSource::productRegistryUpdate; + + static void fillDescriptions(ConfigurationDescriptions& descriptions); + + private: + ItemTypeInfo getNextItemType() override; + void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override; + std::shared_ptr readLuminosityBlockAuxiliary_() override; + void readEvent_(EventPrincipal& eventPrincipal) override; + std::shared_ptr readRunAuxiliary_() override; + void readRun_(RunPrincipal& runPrincipal) override; + std::shared_ptr readFile_() override; + void closeFile_() override; + + std::pair resourceSharedWithDelayedReader_() override; + + std::unique_ptr + resourceSharedWithDelayedReaderPtr_; // We do not use propagate_const because the acquirer is itself mutable. 
+ std::shared_ptr mutexSharedWithDelayedReader_; + + std::unique_ptr file_; + std::vector runReaders_; + std::vector lumiReaders_; + std::vector eventReaders_; + + std::vector parentageIDLookup_; + std::vector> provenanceRetrievers_; + std::vector entryForStream_; + + EventToProcessBlockIndexes processBlockIndexes_; + + ROOT::Experimental::DescriptorId_t eventSelectionsID_; + ROOT::Experimental::DescriptorId_t eventProductProvenanceID_; + ROOT::Experimental::DescriptorId_t eventBranchListIndexesID_; + bool enableMetrics_ = false; + bool useClusterCache_ = true; + bool startedFirstFile_ = false; + }; + + RNTupleSource::RNTupleSource(ParameterSet const& pset, InputSourceDescription const& desc) + : InputSource(pset, desc), + entryForStream_(std::size_t(desc.allocations_->numberOfStreams()), int(0)), + enableMetrics_(pset.getUntrackedParameter("enableMetrics")), + useClusterCache_(pset.getUntrackedParameter("useClusterCache")) { + auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader(); + resourceSharedWithDelayedReaderPtr_ = std::make_unique(std::move(resources.first)); + mutexSharedWithDelayedReader_ = resources.second; + + RNTupleInputFile::Options ops; + ops.enableMetrics_ = enableMetrics_; + ops.useClusterCache_ = useClusterCache_; + file_ = std::make_unique(pset.getUntrackedParameter("fileName"), ops); + + BranchIDLists branchIDLists; + file_->readMeta(productRegistryUpdate(), processHistoryRegistryForUpdate(), branchIDLists); + branchIDListHelper()->updateFromInput(branchIDLists); + + runReaders_.reserve(desc.allocations_->numberOfRuns()); + for (unsigned int i = 0; i < desc.allocations_->numberOfRuns(); ++i) { + runReaders_.emplace_back( + file_->runProducts(), resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get()); + } + lumiReaders_.reserve(desc.allocations_->numberOfLuminosityBlocks()); + for (unsigned int i = 0; i < desc.allocations_->numberOfLuminosityBlocks(); ++i) { + 
lumiReaders_.emplace_back(file_->luminosityBlockProducts(), + resourceSharedWithDelayedReaderPtr_.get(), + mutexSharedWithDelayedReader_.get()); + } + eventReaders_.reserve(desc.allocations_->numberOfStreams()); + for (unsigned int i = 0; i < desc.allocations_->numberOfStreams(); ++i) { + eventReaders_.emplace_back( + file_->eventProducts(), resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get()); + } + { + std::string kEventProvName = "EventProductProvenance"; + std::string kEventSelName = "EventSelections"; + std::string kBranchListName = "BranchListIndexes"; + + eventSelectionsID_ = file_->eventProducts()->findDescriptorID(kEventSelName); + eventProductProvenanceID_ = file_->eventProducts()->findDescriptorID(kEventProvName); + eventBranchListIndexesID_ = file_->eventProducts()->findDescriptorID(kBranchListName); + } + + parentageIDLookup_ = file_->readParentage(); + + provenanceRetrievers_.reserve(desc.allocations_->numberOfStreams()); + + for (unsigned int i = 0; i < desc.allocations_->numberOfStreams(); ++i) { + provenanceRetrievers_.push_back(std::make_shared( + std::make_unique(file_->eventProducts(), + &parentageIDLookup_, + entryForStream_.begin(), + resourceSharedWithDelayedReaderPtr_.get(), + mutexSharedWithDelayedReader_.get()))); + } + } + + void RNTupleSource::fillDescriptions(ConfigurationDescriptions& descriptions) { + ParameterSetDescription desc; + desc.addUntracked("fileName"); + desc.addUntracked("enableMetrics", false); + desc.addUntracked("useClusterCache", true); + + descriptions.addDefault(desc); + } + + InputSource::ItemTypeInfo RNTupleSource::getNextItemType() { + if (not startedFirstFile_) { + return InputSource::ItemType::IsFile; + } + auto entryType = file_->getNextItemType(); + switch (entryType) { + case IndexIntoFile::kEnd: + return InputSource::ItemType::IsStop; + case IndexIntoFile::kRun: + return InputSource::ItemType::IsRun; + case IndexIntoFile::kLumi: + return InputSource::ItemType::IsLumi; + case 
IndexIntoFile::kEvent: + return InputSource::ItemType::IsEvent; + default: + assert(false); + } + assert(false); + return InputSource::ItemType::IsStop; + } + + void RNTupleSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) { + auto history = processHistoryRegistry().getMapped(lumiPrincipal.aux().processHistoryID()); + auto entry = file_->readLuminosityBlock(); + auto& reader = lumiReaders_[lumiPrincipal.index()]; + reader.setEntry(entry); + lumiPrincipal.fillLuminosityBlockPrincipal(history, &reader); + } + std::shared_ptr RNTupleSource::readLuminosityBlockAuxiliary_() { + return file_->readLuminosityBlockAuxiliary(); + } + void RNTupleSource::readEvent_(EventPrincipal& eventPrincipal) { + auto aux = file_->readEventAuxiliary(); + + auto history = processHistoryRegistry().getMapped(aux->processHistoryID()); + auto entry = file_->readEvent(); + entryForStream_[eventPrincipal.streamID().value()] = entry; + auto& reader = eventReaders_[eventPrincipal.streamID().value()]; + reader.setEntry(entry); + + EventSelectionIDVector esids; + BranchListIndexes bli; + + { + std::shared_ptr pEsids{&esids, [](auto) {}}; + auto v = file_->eventProducts()->viewFor(eventSelectionsID_, pEsids); + v(entry); + } + { + std::shared_ptr pBli{&bli, [](auto) {}}; + auto v = file_->eventProducts()->viewFor(eventBranchListIndexesID_, pBli); + v(entry); + } + branchIDListHelper()->fixBranchListIndexes(bli); + + eventPrincipal.fillEventPrincipal(*aux, + history, + std::move(esids), + std::move(bli), + processBlockIndexes_, + *provenanceRetrievers_[eventPrincipal.streamID().value()], + &reader); + } + + std::shared_ptr RNTupleSource::readRunAuxiliary_() { return file_->readRunAuxiliary(); } + void RNTupleSource::readRun_(RunPrincipal& runPrincipal) { + auto entry = file_->readRun(); + auto& reader = runReaders_[runPrincipal.index()]; + reader.setEntry(entry); + runPrincipal.fillRunPrincipal(processHistoryRegistry(), &reader); + runPrincipal.setShouldWriteRun(RunPrincipal::kNo); 
+ } + + std::pair RNTupleSource::resourceSharedWithDelayedReader_() { + return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get()); + } + + std::shared_ptr RNTupleSource::readFile_() { + startedFirstFile_ = true; + return std::make_shared(); + } + + void RNTupleSource::closeFile_() { + if (enableMetrics_) { + file_->printInfoForEvent(std::cout); + } + } + +} // namespace edm + +using edm::RNTupleSource; +DEFINE_FWK_INPUT_SOURCE(RNTupleSource); diff --git a/FWIO/RNTuple/test/BuildFile.xml b/FWIO/RNTuple/test/BuildFile.xml new file mode 100644 index 0000000000000..e112cb9b4650b --- /dev/null +++ b/FWIO/RNTuple/test/BuildFile.xml @@ -0,0 +1,4 @@ + + + + diff --git a/FWIO/RNTuple/test/test_catch2_main.cc b/FWIO/RNTuple/test/test_catch2_main.cc new file mode 100644 index 0000000000000..0c7c351f437f5 --- /dev/null +++ b/FWIO/RNTuple/test/test_catch2_main.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp" diff --git a/FWIO/RNTuple/test/test_catch2_output2input.cc b/FWIO/RNTuple/test/test_catch2_output2input.cc new file mode 100644 index 0000000000000..28b61b3eec690 --- /dev/null +++ b/FWIO/RNTuple/test/test_catch2_output2input.cc @@ -0,0 +1,224 @@ +#include "FWCore/TestProcessor/interface/TestProcessor.h" +#include "FWCore/TestProcessor/interface/TestSourceProcessor.h" +#include "FWCore/Utilities/interface/Exception.h" + +#include "DataFormats/TestObjects/interface/Thing.h" +#include "DataFormats/TestObjects/interface/OtherThing.h" +#include +#include +#include "catch.hpp" + +static constexpr auto s_tag = "[RNTupleOutputSource]"; + +namespace { + std::string setOutputFile(std::string const& iConfig, std::string const& iFileName) { + using namespace std::string_literals; + return iConfig + "\nprocess.out.fileName = '"s + iFileName + "'\n"; + } + + std::string setInputFile(std::string const& iConfig, std::string const& iFileName) { + using namespace std::string_literals; + return iConfig + 
"\nprocess.source.fileName = 'file:"s + iFileName + "'\n"; + } +} // namespace +TEST_CASE("Tests of RNTupleOuput -> RNTupleSource", s_tag) { + const std::string baseOutConfig{ + R"_(from FWCore.TestProcessor.TestProcess import * +process = TestProcess() +process.out = cms.OutputModule('RNTupleOutputModule', + fileName = cms.untracked.string('') +) +process.add_(cms.Service("InitRootHandlers")) +process.add_(cms.Service("JobReportService")) + +process.moduleToTest(process.out) +)_"}; + + const std::string baseSourceConfig{ + R"_(from FWCore.TestProcessor.TestSourceProcess import * +process = TestSourceProcess() +process.source = cms.Source("RNTupleSource", fileName = cms.untracked.string('')) +process.add_(cms.Service("InitRootHandlers")) +process.add_(cms.Service("SiteLocalConfigService")) +process.add_(cms.Service("JobReportService")) + )_"}; + + SECTION("OneEmptyEvent") { + const std::string fileName = "one_event.rntpl"; + { + auto configString = setOutputFile(baseOutConfig, fileName); + + edm::test::TestProcessor::Config config{configString}; + + edm::test::TestProcessor tester(config); + tester.test(); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + 
} + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } + + SECTION("EventWithThing") { + const std::string fileName = "thing.rntpl"; + { + auto configString = setOutputFile(baseOutConfig, fileName); + + edm::test::TestProcessor::Config config{configString}; + auto thingToken = config.produces>("thing"); + + edm::test::TestProcessor tester(config); + tester.test(std::make_pair(thingToken, std::make_unique>(1, edmtest::Thing{1}))); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + auto v = r.get>("thing", "", "TEST"); + REQUIRE(v.isValid()); + REQUIRE(v->size() == 1); + REQUIRE((*v)[0].a == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } + + SECTION("EventWithRef") { + const std::string fileName = "ref.rntpl"; + { + auto configString = setOutputFile(baseOutConfig, fileName) + + "process.other = cms.EDProducer('OtherThingProducer', thingTag = cms.InputTag('thing'))\n" + "process.moduleToTest(process.out,cms.Task(process.other))\n"; + + 
edm::test::TestProcessor::Config config{configString}; + + auto thingToken = config.produces>("thing"); + + edm::test::TestProcessor tester(config); + tester.test(std::make_pair(thingToken, std::make_unique>(1, edmtest::Thing{1}))); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + auto v = r.get>("thing", "", "TEST"); + REQUIRE(v.isValid()); + REQUIRE(v->size() == 1); + REQUIRE((*v)[0].a == 1); + auto o = r.get>("other", "testUserTag", "TEST"); + REQUIRE(o.isValid()); + REQUIRE(o->size() == 20); + REQUIRE((*o)[0].a == 0); + REQUIRE((*o)[1].a == 1); + REQUIRE((*o)[0].ref.isNonnull()); + REQUIRE((*o)[0].ref->a == 1); + REQUIRE((*o)[0].refProd.isNonnull()); + REQUIRE((*o)[0].refProd->size() == 1); + REQUIRE((*o)[0].refVec.size() == 2); + REQUIRE((*o)[0].refVec[0]->a == 1); + REQUIRE((*o)[0].ptr.isNonnull()); + REQUIRE((*o)[0].ptr->a == 1); + REQUIRE((*o)[0].ptrVec.size() == 2); + REQUIRE((*o)[0].ptrVec[0]->a == 1); + REQUIRE((*o)[0].refToBaseProd.isNonnull()); + REQUIRE((*o)[0].refToBaseProd->size() == 1); + REQUIRE((*o)[0].refToBase.isNonnull()); + REQUIRE((*o)[0].refToBase->a == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == 
edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } +} diff --git a/IOPool/Common/test/test_catch2_output2input.cc b/IOPool/Common/test/test_catch2_output2input.cc index 8dd8679d9b4ca..3ef41bcf883a3 100644 --- a/IOPool/Common/test/test_catch2_output2input.cc +++ b/IOPool/Common/test/test_catch2_output2input.cc @@ -3,6 +3,7 @@ #include "FWCore/Utilities/interface/Exception.h" #include "DataFormats/TestObjects/interface/Thing.h" +#include "DataFormats/TestObjects/interface/OtherThing.h" #include #include #include "catch.hpp" @@ -143,4 +144,67 @@ process.add_(cms.Service("JobReportService")) } std::filesystem::remove(fileName); } -} \ No newline at end of file + SECTION("EventWithRef") { + const std::string fileName = "ref.root"; + { + auto configString = setOutputFile(baseOutConfig, fileName) + + "process.other = cms.EDProducer('OtherThingProducer', thingTag = cms.InputTag('thing'))\n" + "process.moduleToTest(process.out,cms.Task(process.other))\n"; + + edm::test::TestProcessor::Config config{configString}; + + auto thingToken = config.produces>("thing"); + + edm::test::TestProcessor tester(config); + tester.test(std::make_pair(thingToken, std::make_unique>(1, edmtest::Thing{1}))); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = 
tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + auto v = r.get>("thing", "", "TEST"); + REQUIRE(v.isValid()); + REQUIRE(v->size() == 1); + REQUIRE((*v)[0].a == 1); + auto o = r.get>("other", "testUserTag", "TEST"); + REQUIRE(o.isValid()); + REQUIRE(o->size() == 20); + REQUIRE((*o)[0].a == 0); + REQUIRE((*o)[1].a == 1); + REQUIRE((*o)[0].ref.isNonnull()); + REQUIRE((*o)[0].ref->a == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } +}