From 8c16376d210416535a94c1554741e5859c8c34b1 Mon Sep 17 00:00:00 2001 From: Michael Chouinard <46358556+chouinar@users.noreply.github.com> Date: Mon, 8 Jul 2024 11:27:03 -0400 Subject: [PATCH] Adjust transformation deletes to handle cascading deletes (navapbc/simpler-grants-gov#103) Note this is a duplicate of https://github.com/HHS/simpler-grants-gov/pull/2000 - just want to pull it into this repo first Updates the transformation code to handle a case where a parent record (ie. opportunity or opportunity_summary) is deleted, AND the child records (everything else) is marked to be deleted as well. Also added a new way to set metrics that handles adding more specific prefixed ones (eg. `total_records_processed` and `opportunity.total_records_processed`) - will expand more on this later. Imagine a scenario an opportunity with a summary (synopsis) and a few applicant types gets deleted. The update process for loading from Oracle will mark all of our staging table records for those as `is_deleted=True`. When we go to process, we'll first process the opportunity, and delete it uneventfully, however we have cascade-deletes setup. This means that all of the children (the opportunity summary, and assistance listing tables among many others) also need to be deleted. SQLAlchemy handles this for us. However, this means when we then start processing the synopsis record that was marked as deleted - we would error and say "I can't delete something that doesn't exist". To work around this, we're okay with these orphan deletes, and we just assume we already took care of it. To further test this, I loaded a subset of the prod data locally (~2500 opportunities, 35k records total). I then marked all of the data is `is_deleted=True, transformed_at=null` and ran it again. It went through the opportunities deleting them. When it got to the other tables, it didn't have to do very much as they all hit the new case. The metrics produced look like: ``` total_records_processed=37002 total_records_deleted=2453 total_delete_orphans_skipped=34549 total_error_count=0 opportunity.total_records_processed=2453 opportunity.total_records_deleted=2453 assistance_listing.total_records_processed=3814 assistance_listing.total_delete_orphans_skipped=3814 opportunity_summary.total_records_processed=3827 opportunity_summary.total_delete_orphans_skipped=3827 applicant_type.total_records_processed=17547 applicant_type.total_delete_orphans_skipped=17547 funding_category.total_records_processed=4947 funding_category.total_delete_orphans_skipped=4947 funding_instrument.total_records_processed=4414 funding_instrument.total_delete_orphans_skipped=4414 ``` And as a sanity check, running again processes nothing. --------- Co-authored-by: nava-platform-bot --- .../transform_oracle_data_task.py | 197 ++++++------ api/src/task/task.py | 4 + .../test_transform_oracle_data_task.py | 283 ++++++++++++++---- 3 files changed, 345 insertions(+), 139 deletions(-) diff --git a/api/src/data_migration/transformation/transform_oracle_data_task.py b/api/src/data_migration/transformation/transform_oracle_data_task.py index 46b58b05c..4018f0767 100644 --- a/api/src/data_migration/transformation/transform_oracle_data_task.py +++ b/api/src/data_migration/transformation/transform_oracle_data_task.py @@ -49,9 +49,17 @@ logger = logging.getLogger(__name__) -# Constants +### Constants ORPHANED_CFDA = "orphaned_cfda" ORPHANED_HISTORICAL_RECORD = "orphaned_historical_record" +ORPHANED_DELETE_RECORD = "orphaned_delete_record" + +OPPORTUNITY = "opportunity" +ASSISTANCE_LISTING = "assistance_listing" +OPPORTUNITY_SUMMARY = "opportunity_summary" +APPLICANT_TYPE = "applicant_type" +FUNDING_CATEGORY = "funding_category" +FUNDING_INSTRUMENT = "funding_instrument" class TransformOracleDataTask(Task): @@ -63,6 +71,7 @@ class Metrics(StrEnum): TOTAL_RECORDS_ORPHANED = "total_records_orphaned" TOTAL_DUPLICATE_RECORDS_SKIPPED = "total_duplicate_records_skipped" TOTAL_HISTORICAL_ORPHANS_SKIPPED = "total_historical_orphans_skipped" + TOTAL_DELETE_ORPHANS_SKIPPED = "total_delete_orphans_skipped" TOTAL_ERROR_COUNT = "total_error_count" @@ -89,6 +98,33 @@ def run_task(self) -> None: self.process_link_funding_categories() self.process_link_funding_instruments() + def _handle_delete( + self, + source: S, + target: D | None, + record_type: str, + extra: dict, + error_on_missing_target: bool = False, + ) -> None: + # If the target we want to delete is None, we have nothing to delete + if target is None: + # In some scenarios we want to error when this happens + if error_on_missing_target: + raise ValueError("Cannot delete %s record as it does not exist" % record_type) + + # In a lot of scenarios, we actually just want to log a message as it is expected to happen + # For example, if we are deleting an opportunity_summary record, and already deleted the opportunity, + # then SQLAlchemy would have deleted the opportunity_summary for us already. When we later go to delete + # it, we'd hit this case, which isn't a problem. + logger.info("Cannot delete %s record as it does not exist", record_type, extra=extra) + source.transformation_notes = ORPHANED_DELETE_RECORD + self.increment(self.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED, prefix=record_type) + return + + logger.info("Deleting %s record", record_type, extra=extra) + self.increment(self.Metrics.TOTAL_RECORDS_DELETED, prefix=record_type) + self.db_session.delete(target) + def fetch( self, source_model: Type[S], destination_model: Type[D], join_clause: Sequence ) -> list[Tuple[S, D | None]]: @@ -175,7 +211,7 @@ def process_opportunities(self) -> None: try: self.process_opportunity(source_opportunity, target_opportunity) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=OPPORTUNITY) logger.exception( "Failed to process opportunity", extra={"opportunity_id": source_opportunity.opportunity_id}, @@ -184,18 +220,18 @@ def process_opportunities(self) -> None: def process_opportunity( self, source_opportunity: Topportunity, target_opportunity: Opportunity | None ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=OPPORTUNITY) extra = {"opportunity_id": source_opportunity.opportunity_id} logger.info("Processing opportunity", extra=extra) if source_opportunity.is_deleted: - logger.info("Deleting opportunity", extra=extra) - - if target_opportunity is None: - raise ValueError("Cannot delete opportunity as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_opportunity) + self._handle_delete( + source_opportunity, + target_opportunity, + OPPORTUNITY, + extra, + error_on_missing_target=True, + ) else: # To avoid incrementing metrics for records we fail to transform, record @@ -208,10 +244,10 @@ def process_opportunity( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=OPPORTUNITY) self.db_session.add(transformed_opportunity) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=OPPORTUNITY) self.db_session.merge(transformed_opportunity) logger.info("Processed opportunity", extra=extra) @@ -239,7 +275,7 @@ def process_assistance_listings(self) -> None: source_assistance_listing, target_assistance_listing, opportunity ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=ASSISTANCE_LISTING) logger.exception( "Failed to process assistance listing", extra={ @@ -253,35 +289,31 @@ def process_assistance_listing( target_assistance_listing: OpportunityAssistanceListing | None, opportunity: Opportunity | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=ASSISTANCE_LISTING) extra = { "opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id, "opportunity_id": source_assistance_listing.opportunity_id, } logger.info("Processing assistance listing", extra=extra) - if opportunity is None: + if source_assistance_listing.is_deleted: + self._handle_delete( + source_assistance_listing, target_assistance_listing, ASSISTANCE_LISTING, extra + ) + + elif opportunity is None: # The Oracle system we're importing these from does not have a foreign key between # the opportunity ID in the TOPPORTUNITY_CFDA table and the TOPPORTUNITY table. # There are many (2306 as of writing) orphaned CFDA records, created between 2007 and 2011 # We don't want to continuously process these, so won't error for these, and will just # mark them as transformed below. - self.increment(self.Metrics.TOTAL_RECORDS_ORPHANED) + self.increment(self.Metrics.TOTAL_RECORDS_ORPHANED, prefix=ASSISTANCE_LISTING) logger.info( "Assistance listing is orphaned and does not connect to any opportunity", extra=extra, ) source_assistance_listing.transformation_notes = ORPHANED_CFDA - elif source_assistance_listing.is_deleted: - logger.info("Deleting assistance listing", extra=extra) - - if target_assistance_listing is None: - raise ValueError("Cannot delete assistance listing as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_assistance_listing) - else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -293,10 +325,10 @@ def process_assistance_listing( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=ASSISTANCE_LISTING) self.db_session.add(transformed_assistance_listing) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=ASSISTANCE_LISTING) self.db_session.merge(transformed_assistance_listing) logger.info("Processed assistance listing", extra=extra) @@ -359,7 +391,7 @@ def process_opportunity_summary_group( try: self.process_opportunity_summary(source_summary, target_summary, opportunity) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=OPPORTUNITY_SUMMARY) logger.exception( "Failed to process opportunity summary", extra=transform_util.get_log_extra_summary(source_summary), @@ -371,21 +403,26 @@ def process_opportunity_summary( target_summary: OpportunitySummary | None, opportunity: Opportunity | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=OPPORTUNITY_SUMMARY) extra = transform_util.get_log_extra_summary(source_summary) logger.info("Processing opportunity summary", extra=extra) + if source_summary.is_deleted: + self._handle_delete(source_summary, target_summary, OPPORTUNITY_SUMMARY, extra) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we don't have anything to link these to. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity is None and source_summary.is_historical_table: + elif opportunity is None and source_summary.is_historical_table: logger.warning( "Historical opportunity summary does not have a corresponding opportunity - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment( + self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=OPPORTUNITY_SUMMARY + ) source_summary.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity is None: @@ -395,15 +432,6 @@ def process_opportunity_summary( "Opportunity summary cannot be processed as the opportunity for it does not exist" ) - elif source_summary.is_deleted: - logger.info("Deleting opportunity summary", extra=extra) - - if target_summary is None: - raise ValueError("Cannot delete opportunity summary as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_summary) - else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -415,10 +443,10 @@ def process_opportunity_summary( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=OPPORTUNITY_SUMMARY) self.db_session.add(transformed_opportunity_summary) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=OPPORTUNITY_SUMMARY) self.db_session.merge(transformed_opportunity_summary) logger.info("Processed opportunity summary", extra=extra) @@ -504,7 +532,7 @@ def process_link_applicant_types_group( source_applicant_type, target_applicant_type, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=APPLICANT_TYPE) logger.exception( "Failed to process opportunity summary applicant type", extra=transform_util.get_log_extra_applicant_type(source_applicant_type), @@ -516,21 +544,24 @@ def process_link_applicant_type( target_applicant_type: LinkOpportunitySummaryApplicantType | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=APPLICANT_TYPE) extra = transform_util.get_log_extra_applicant_type(source_applicant_type) logger.info("Processing applicant type", extra=extra) + if source_applicant_type.is_deleted: + self._handle_delete(source_applicant_type, target_applicant_type, APPLICANT_TYPE, extra) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity_summary is None and source_applicant_type.is_historical_table: + elif opportunity_summary is None and source_applicant_type.is_historical_table: logger.warning( "Historical applicant type does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=APPLICANT_TYPE) source_applicant_type.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -539,15 +570,6 @@ def process_link_applicant_type( raise ValueError( "Applicant type record cannot be processed as the opportunity summary for it does not exist" ) - - elif source_applicant_type.is_deleted: - logger.info("Deleting applicant type", extra=extra) - - if target_applicant_type is None: - raise ValueError("Cannot delete applicant type as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_applicant_type) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -566,18 +588,18 @@ def process_link_applicant_type( is_insert and transformed_applicant_type.applicant_type in opportunity_summary.applicant_types ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=APPLICANT_TYPE) logger.warning( "Skipping applicant type record", extra=extra | {"applicant_type": transformed_applicant_type.applicant_type}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=APPLICANT_TYPE) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_applicant_types.append(transformed_applicant_type) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=APPLICANT_TYPE) self.db_session.merge(transformed_applicant_type) logger.info("Processed applicant type", extra=extra) @@ -663,7 +685,7 @@ def process_link_funding_categories_group( source_funding_category, target_funding_category, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=FUNDING_CATEGORY) logger.exception( "Failed to process opportunity summary funding category", extra=transform_util.get_log_extra_funding_category(source_funding_category), @@ -675,21 +697,26 @@ def process_link_funding_category( target_funding_category: LinkOpportunitySummaryFundingCategory | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=FUNDING_CATEGORY) extra = transform_util.get_log_extra_funding_category(source_funding_category) logger.info("Processing funding category", extra=extra) + if source_funding_category.is_deleted: + self._handle_delete( + source_funding_category, target_funding_category, FUNDING_CATEGORY, extra + ) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity_summary is None and source_funding_category.is_historical_table: + elif opportunity_summary is None and source_funding_category.is_historical_table: logger.warning( "Historical funding category does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=FUNDING_CATEGORY) source_funding_category.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -698,15 +725,6 @@ def process_link_funding_category( raise ValueError( "Funding category record cannot be processed as the opportunity summary for it does not exist" ) - - elif source_funding_category.is_deleted: - logger.info("Deleting funding category", extra=extra) - - if target_funding_category is None: - raise ValueError("Cannot delete funding category as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_funding_category) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -728,19 +746,21 @@ def process_link_funding_category( and transformed_funding_category.funding_category in opportunity_summary.funding_categories ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment( + self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=FUNDING_CATEGORY + ) logger.warning( "Skipping funding category record", extra=extra | {"funding_category": transformed_funding_category.funding_category}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=FUNDING_CATEGORY) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_funding_categories.append(transformed_funding_category) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=FUNDING_CATEGORY) self.db_session.merge(transformed_funding_category) logger.info("Processed funding category", extra=extra) @@ -826,7 +846,7 @@ def process_link_funding_instruments_group( source_funding_instrument, target_funding_instrument, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=FUNDING_INSTRUMENT) logger.exception( "Failed to process opportunity summary funding instrument", extra=transform_util.get_log_extra_funding_instrument( @@ -840,21 +860,26 @@ def process_link_funding_instrument( target_funding_instrument: LinkOpportunitySummaryFundingInstrument | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=FUNDING_INSTRUMENT) extra = transform_util.get_log_extra_funding_instrument(source_funding_instrument) logger.info("Processing funding instrument", extra=extra) + if source_funding_instrument.is_deleted: + self._handle_delete( + source_funding_instrument, target_funding_instrument, FUNDING_INSTRUMENT, extra + ) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity_summary is None and source_funding_instrument.is_historical_table: + elif opportunity_summary is None and source_funding_instrument.is_historical_table: logger.warning( "Historical funding instrument does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=FUNDING_INSTRUMENT) source_funding_instrument.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -864,14 +889,6 @@ def process_link_funding_instrument( "Funding instrument record cannot be processed as the opportunity summary for it does not exist" ) - elif source_funding_instrument.is_deleted: - logger.info("Deleting funding instrument", extra=extra) - - if target_funding_instrument is None: - raise ValueError("Cannot delete funding instrument as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_funding_instrument) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -893,19 +910,21 @@ def process_link_funding_instrument( and transformed_funding_instrument.funding_instrument in opportunity_summary.funding_instruments ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment( + self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=FUNDING_INSTRUMENT + ) logger.warning( "Skipping funding instrument record", extra=extra | {"funding_instrument": transformed_funding_instrument.funding_instrument}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=FUNDING_INSTRUMENT) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_funding_instruments.append(transformed_funding_instrument) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=FUNDING_INSTRUMENT) self.db_session.merge(transformed_funding_instrument) logger.info("Processed funding instrument", extra=extra) diff --git a/api/src/task/task.py b/api/src/task/task.py index 8c29744bc..f1619d0ac 100644 --- a/api/src/task/task.py +++ b/api/src/task/task.py @@ -63,6 +63,10 @@ def increment(self, name: str, value: int = 1, prefix: str | None = None) -> Non self.metrics[name] += value + if prefix is not None: + # Rather than re-implement the above, just re-use the function without a prefix + self.increment(f"{prefix}.{name}", value, prefix=None) + def cls_name(self) -> str: return self.__class__.__name__ diff --git a/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py b/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py index 20ee40c11..f4d07bfdd 100644 --- a/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py +++ b/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py @@ -727,7 +727,9 @@ def test_process_opportunity_delete_but_current_missing( # Verify an error is raised when we try to delete something that doesn't exist delete_but_current_missing = setup_opportunity(create_existing=False, is_delete=True) - with pytest.raises(ValueError, match="Cannot delete opportunity as it does not exist"): + with pytest.raises( + ValueError, match="Cannot delete opportunity record as it does not exist" + ): transform_oracle_data_task.process_opportunity(delete_but_current_missing, None) validate_opportunity(db_session, delete_but_current_missing, expect_in_db=False) @@ -823,16 +825,16 @@ def test_process_opportunity_assistance_listings(self, db_session, transform_ora assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 3 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning just attempts to re-process the error record, nothing else gets picked up + # Rerunning finds nothing - no metrics update transform_oracle_data_task.process_assistance_listings() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 11 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 10 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 3 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 def test_process_assistance_listing_orphaned_record( self, db_session, transform_oracle_data_task @@ -880,14 +882,13 @@ def test_process_assistance_listing_delete_but_current_missing( create_existing=False, is_delete=True, opportunity=opportunity ) - with pytest.raises( - ValueError, match="Cannot delete assistance listing as it does not exist" - ): - transform_oracle_data_task.process_assistance_listing( - delete_but_current_missing, None, opportunity - ) + transform_oracle_data_task.process_assistance_listing( + delete_but_current_missing, None, opportunity + ) validate_assistance_listing(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" class TestTransformOpportunitySummary(BaseTestClass): @@ -1044,15 +1045,17 @@ def test_process_opportunity_summaries(self, db_session, transform_oracle_data_t assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 5 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 3 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 4 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 3 transform_oracle_data_task.process_opportunity_summaries() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 22 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 21 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 5 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 8 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 6 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 5), (False, 10)] @@ -1071,12 +1074,13 @@ def test_process_opportunity_summary_delete_but_current_missing( opportunity=opportunity, ) - with pytest.raises( - ValueError, match="Cannot delete opportunity summary as it does not exist" - ): - transform_oracle_data_task.process_opportunity_summary( - delete_but_current_missing, None, opportunity - ) + transform_oracle_data_task.process_opportunity_summary( + delete_but_current_missing, None, opportunity + ) + + validate_opportunity_summary(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,source_values,expected_error", @@ -1426,7 +1430,7 @@ def test_process_applicant_types(self, db_session, transform_oracle_data_task): ) validate_applicant_type( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_applicant_type( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -1441,17 +1445,19 @@ def test_process_applicant_types(self, db_session, transform_oracle_data_task): assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 8 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 assert metrics[transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 1 transform_oracle_data_task.process_link_applicant_types() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 25 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 24 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 8 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 5), (False, 10)] @@ -1469,10 +1475,13 @@ def test_process_applicant_types_but_current_missing( is_delete=True, ) - with pytest.raises(ValueError, match="Cannot delete applicant type as it does not exist"): - transform_oracle_data_task.process_link_applicant_type( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_applicant_type( + delete_but_current_missing, None, opportunity_summary + ) + + validate_applicant_type(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,legacy_lookup_value", @@ -1714,7 +1723,7 @@ def test_process_funding_instruments(self, db_session, transform_oracle_data_tas ) validate_funding_instrument( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_funding_instrument( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -1725,15 +1734,17 @@ def test_process_funding_instruments(self, db_session, transform_oracle_data_tas assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 transform_oracle_data_task.process_link_funding_instruments() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 16 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 15 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 1), (False, 4)] @@ -1751,12 +1762,13 @@ def test_process_funding_instrument_but_current_missing( is_delete=True, ) - with pytest.raises( - ValueError, match="Cannot delete funding instrument as it does not exist" - ): - transform_oracle_data_task.process_link_funding_instrument( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_funding_instrument( + delete_but_current_missing, None, opportunity_summary + ) + + validate_funding_instrument(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,legacy_lookup_value", @@ -2073,7 +2085,7 @@ def test_process_funding_categories(self, db_session, transform_oracle_data_task ) validate_funding_category( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_funding_category( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -2084,15 +2096,17 @@ def test_process_funding_categories(self, db_session, transform_oracle_data_task assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 9 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 4 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 1 transform_oracle_data_task.process_link_funding_categories() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 24 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 23 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 9 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 4 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 1), (False, 70)] @@ -2110,10 +2124,13 @@ def test_process_funding_category_but_current_missing( is_delete=True, ) - with pytest.raises(ValueError, match="Cannot delete funding category as it does not exist"): - transform_oracle_data_task.process_link_funding_category( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_funding_category( + delete_but_current_missing, None, opportunity_summary + ) + + validate_funding_category(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,legacy_lookup_value", @@ -2581,6 +2598,7 @@ def test_mix_of_inserts_updates_deletes(self, db_session, transform_oracle_data_ ) validate_summary_and_nested(db_session, synopsis_hist_insert, [], [], []) + print(transform_oracle_data_task.metrics) assert { transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 41, transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED: 8, @@ -2588,5 +2606,170 @@ def test_mix_of_inserts_updates_deletes(self, db_session, transform_oracle_data_ transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 8, transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED: 15, transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED: 0, - transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT: 1, + transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 1, + }.items() <= transform_oracle_data_task.metrics.items() + + def test_delete_opportunity_with_deleted_children(self, db_session, transform_oracle_data_task): + # We create an opportunity with a synopsis/forecast record, and various other child values + # We then delete all of them at once. Deleting the opportunity will recursively delete the others + # but we'll still have delete events for the others - this verfies how we handle that. + existing_opportunity = f.OpportunityFactory( + no_current_summary=True, opportunity_assistance_listings=[] + ) + opportunity = f.StagingTopportunityFactory( + opportunity_id=existing_opportunity.opportunity_id, cfdas=[], is_deleted=True + ) + + cfda = setup_cfda(create_existing=True, is_delete=True, opportunity=existing_opportunity) + + ### Forecast - has several children that will be deleted + summary_forecast = f.OpportunitySummaryFactory( + is_forecast=True, opportunity=existing_opportunity, no_link_values=True + ) + forecast = f.StagingTforecastFactory(opportunity=opportunity, is_deleted=True) + forecast_applicant_type = f.StagingTapplicanttypesForecastFactory( + forecast=forecast, at_id="04", at_frcst_id=91001, is_deleted=True + ) + f.LinkOpportunitySummaryApplicantTypeFactory( + opportunity_summary=summary_forecast, + applicant_type=ApplicantType.SPECIAL_DISTRICT_GOVERNMENTS, + legacy_applicant_type_id=91001, + ) + forecast_funding_category = f.StagingTfundactcatForecastFactory( + forecast=forecast, fac_id="ST", fac_frcst_id=92001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingCategoryFactory( + opportunity_summary=summary_forecast, + funding_category=FundingCategory.SCIENCE_TECHNOLOGY_AND_OTHER_RESEARCH_AND_DEVELOPMENT, + legacy_funding_category_id=92001, + ) + forecast_funding_instrument = f.StagingTfundinstrForecastFactory( + forecast=forecast, fi_id="O", fi_frcst_id=93001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingInstrumentFactory( + opportunity_summary=summary_forecast, + funding_instrument=FundingInstrument.OTHER, + legacy_funding_instrument_id=93001, + ) + + ### Synopsis + summary_synopsis = f.OpportunitySummaryFactory( + is_forecast=False, opportunity=existing_opportunity, no_link_values=True + ) + synopsis = f.StagingTsynopsisFactory(opportunity=opportunity, is_deleted=True) + synopsis_applicant_type = f.StagingTapplicanttypesSynopsisFactory( + synopsis=synopsis, at_id="21", at_syn_id=81001, is_deleted=True + ) + f.LinkOpportunitySummaryApplicantTypeFactory( + opportunity_summary=summary_synopsis, + applicant_type=ApplicantType.INDIVIDUALS, + legacy_applicant_type_id=81001, + ) + synopsis_funding_category = f.StagingTfundactcatSynopsisFactory( + synopsis=synopsis, fac_id="HL", fac_syn_id=82001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingCategoryFactory( + opportunity_summary=summary_synopsis, + funding_category=FundingCategory.HEALTH, + legacy_funding_category_id=82001, + ) + synopsis_funding_instrument = f.StagingTfundinstrSynopsisFactory( + synopsis=synopsis, fi_id="G", fi_syn_id=83001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingInstrumentFactory( + opportunity_summary=summary_synopsis, + funding_instrument=FundingInstrument.GRANT, + legacy_funding_instrument_id=83001, + ) + # Need to put an expire all so SQLAlchemy doesn't read from its cache + # otherwise when it does the recursive deletes, it doesn't see the later-added link table objects + db_session.expire_all() + + transform_oracle_data_task.run_task() + print(transform_oracle_data_task.metrics) + + # verify everything is not in the DB + validate_opportunity(db_session, opportunity, expect_in_db=False) + validate_assistance_listing(db_session, cfda, expect_in_db=False) + validate_opportunity_summary(db_session, forecast, expect_in_db=False) + validate_opportunity_summary(db_session, synopsis, expect_in_db=False) + + validate_applicant_type(db_session, forecast_applicant_type, expect_in_db=False) + validate_applicant_type(db_session, synopsis_applicant_type, expect_in_db=False) + + validate_funding_category(db_session, forecast_funding_category, expect_in_db=False) + validate_funding_category(db_session, synopsis_funding_category, expect_in_db=False) + + validate_funding_instrument(db_session, forecast_funding_instrument, expect_in_db=False) + validate_funding_instrument(db_session, synopsis_funding_instrument, expect_in_db=False) + + assert { + transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 10, + # Despite processing 10 records, only the opportunity is actually deleted directly + transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 1, + f"opportunity.{transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED}": 1, + transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 9, + }.items() <= transform_oracle_data_task.metrics.items() + + def test_delete_opportunity_summary_with_deleted_children( + self, db_session, transform_oracle_data_task + ): + # Similar to the above test, but we're leaving the opportunity alone and just deleting + # an opportunity summary. Should be the same thing, just on a smaller scale. + existing_opportunity = f.OpportunityFactory( + no_current_summary=True, opportunity_assistance_listings=[] + ) + opportunity = f.StagingTopportunityFactory( + opportunity_id=existing_opportunity.opportunity_id, cfdas=[], already_transformed=True + ) + + summary_synopsis = f.OpportunitySummaryFactory( + is_forecast=False, opportunity=existing_opportunity, no_link_values=True + ) + synopsis = f.StagingTsynopsisFactory(opportunity=opportunity, is_deleted=True) + synopsis_applicant_type = f.StagingTapplicanttypesSynopsisFactory( + synopsis=synopsis, at_id="21", at_syn_id=71001, is_deleted=True + ) + f.LinkOpportunitySummaryApplicantTypeFactory( + opportunity_summary=summary_synopsis, + applicant_type=ApplicantType.INDIVIDUALS, + legacy_applicant_type_id=71001, + ) + synopsis_funding_category = f.StagingTfundactcatSynopsisFactory( + synopsis=synopsis, fac_id="HL", fac_syn_id=72001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingCategoryFactory( + opportunity_summary=summary_synopsis, + funding_category=FundingCategory.HEALTH, + legacy_funding_category_id=72001, + ) + synopsis_funding_instrument = f.StagingTfundinstrSynopsisFactory( + synopsis=synopsis, fi_id="G", fi_syn_id=73001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingInstrumentFactory( + opportunity_summary=summary_synopsis, + funding_instrument=FundingInstrument.GRANT, + legacy_funding_instrument_id=73001, + ) + # Need to put an expire all so SQLAlchemy doesn't read from its cache + # otherwise when it does the recursive deletes, it doesn't see the later-added link table objects + db_session.expire_all() + + transform_oracle_data_task.run_task() + + # verify everything is not in the DB + validate_opportunity( + db_session, opportunity, expect_in_db=True, expect_values_to_match=False + ) + validate_opportunity_summary(db_session, synopsis, expect_in_db=False) + validate_applicant_type(db_session, synopsis_applicant_type, expect_in_db=False) + validate_funding_category(db_session, synopsis_funding_category, expect_in_db=False) + validate_funding_instrument(db_session, synopsis_funding_instrument, expect_in_db=False) + + assert { + transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 4, + # Despite processing 4 records, only the opportunity_summary is actually deleted directly + transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 1, + f"opportunity_summary.{transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED}": 1, + transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 3, }.items() <= transform_oracle_data_task.metrics.items()