diff --git a/api/src/data_migration/transformation/transform_oracle_data_task.py b/api/src/data_migration/transformation/transform_oracle_data_task.py index 46b58b05c..4018f0767 100644 --- a/api/src/data_migration/transformation/transform_oracle_data_task.py +++ b/api/src/data_migration/transformation/transform_oracle_data_task.py @@ -49,9 +49,17 @@ logger = logging.getLogger(__name__) -# Constants +### Constants ORPHANED_CFDA = "orphaned_cfda" ORPHANED_HISTORICAL_RECORD = "orphaned_historical_record" +ORPHANED_DELETE_RECORD = "orphaned_delete_record" + +OPPORTUNITY = "opportunity" +ASSISTANCE_LISTING = "assistance_listing" +OPPORTUNITY_SUMMARY = "opportunity_summary" +APPLICANT_TYPE = "applicant_type" +FUNDING_CATEGORY = "funding_category" +FUNDING_INSTRUMENT = "funding_instrument" class TransformOracleDataTask(Task): @@ -63,6 +71,7 @@ class Metrics(StrEnum): TOTAL_RECORDS_ORPHANED = "total_records_orphaned" TOTAL_DUPLICATE_RECORDS_SKIPPED = "total_duplicate_records_skipped" TOTAL_HISTORICAL_ORPHANS_SKIPPED = "total_historical_orphans_skipped" + TOTAL_DELETE_ORPHANS_SKIPPED = "total_delete_orphans_skipped" TOTAL_ERROR_COUNT = "total_error_count" @@ -89,6 +98,33 @@ def run_task(self) -> None: self.process_link_funding_categories() self.process_link_funding_instruments() + def _handle_delete( + self, + source: S, + target: D | None, + record_type: str, + extra: dict, + error_on_missing_target: bool = False, + ) -> None: + # If the target we want to delete is None, we have nothing to delete + if target is None: + # In some scenarios we want to error when this happens + if error_on_missing_target: + raise ValueError("Cannot delete %s record as it does not exist" % record_type) + + # In a lot of scenarios, we actually just want to log a message as it is expected to happen + # For example, if we are deleting an opportunity_summary record, and already deleted the opportunity, + # then SQLAlchemy would have deleted the opportunity_summary for us already. When we later go to delete + # it, we'd hit this case, which isn't a problem. 
+ logger.info("Cannot delete %s record as it does not exist", record_type, extra=extra) + source.transformation_notes = ORPHANED_DELETE_RECORD + self.increment(self.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED, prefix=record_type) + return + + logger.info("Deleting %s record", record_type, extra=extra) + self.increment(self.Metrics.TOTAL_RECORDS_DELETED, prefix=record_type) + self.db_session.delete(target) + def fetch( self, source_model: Type[S], destination_model: Type[D], join_clause: Sequence ) -> list[Tuple[S, D | None]]: @@ -175,7 +211,7 @@ def process_opportunities(self) -> None: try: self.process_opportunity(source_opportunity, target_opportunity) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=OPPORTUNITY) logger.exception( "Failed to process opportunity", extra={"opportunity_id": source_opportunity.opportunity_id}, @@ -184,18 +220,18 @@ def process_opportunities(self) -> None: def process_opportunity( self, source_opportunity: Topportunity, target_opportunity: Opportunity | None ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=OPPORTUNITY) extra = {"opportunity_id": source_opportunity.opportunity_id} logger.info("Processing opportunity", extra=extra) if source_opportunity.is_deleted: - logger.info("Deleting opportunity", extra=extra) - - if target_opportunity is None: - raise ValueError("Cannot delete opportunity as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_opportunity) + self._handle_delete( + source_opportunity, + target_opportunity, + OPPORTUNITY, + extra, + error_on_missing_target=True, + ) else: # To avoid incrementing metrics for records we fail to transform, record @@ -208,10 +244,10 @@ def process_opportunity( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=OPPORTUNITY) self.db_session.add(transformed_opportunity) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=OPPORTUNITY) self.db_session.merge(transformed_opportunity) logger.info("Processed opportunity", extra=extra) @@ -239,7 +275,7 @@ def process_assistance_listings(self) -> None: source_assistance_listing, target_assistance_listing, opportunity ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=ASSISTANCE_LISTING) logger.exception( "Failed to process assistance listing", extra={ @@ -253,35 +289,31 @@ def process_assistance_listing( target_assistance_listing: OpportunityAssistanceListing | None, opportunity: Opportunity | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=ASSISTANCE_LISTING) extra = { "opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id, "opportunity_id": source_assistance_listing.opportunity_id, } logger.info("Processing assistance listing", extra=extra) - if opportunity is None: + if source_assistance_listing.is_deleted: + self._handle_delete( + source_assistance_listing, target_assistance_listing, ASSISTANCE_LISTING, extra + ) + + elif opportunity is None: # The Oracle system we're importing these from does not have a foreign key between # the opportunity ID in the TOPPORTUNITY_CFDA table and the TOPPORTUNITY table. 
# There are many (2306 as of writing) orphaned CFDA records, created between 2007 and 2011 # We don't want to continuously process these, so won't error for these, and will just # mark them as transformed below. - self.increment(self.Metrics.TOTAL_RECORDS_ORPHANED) + self.increment(self.Metrics.TOTAL_RECORDS_ORPHANED, prefix=ASSISTANCE_LISTING) logger.info( "Assistance listing is orphaned and does not connect to any opportunity", extra=extra, ) source_assistance_listing.transformation_notes = ORPHANED_CFDA - elif source_assistance_listing.is_deleted: - logger.info("Deleting assistance listing", extra=extra) - - if target_assistance_listing is None: - raise ValueError("Cannot delete assistance listing as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_assistance_listing) - else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -293,10 +325,10 @@ def process_assistance_listing( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=ASSISTANCE_LISTING) self.db_session.add(transformed_assistance_listing) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=ASSISTANCE_LISTING) self.db_session.merge(transformed_assistance_listing) logger.info("Processed assistance listing", extra=extra) @@ -359,7 +391,7 @@ def process_opportunity_summary_group( try: self.process_opportunity_summary(source_summary, target_summary, opportunity) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=OPPORTUNITY_SUMMARY) logger.exception( "Failed to process opportunity summary", extra=transform_util.get_log_extra_summary(source_summary), @@ -371,21 +403,26 @@ def process_opportunity_summary( target_summary: OpportunitySummary | None, opportunity: Opportunity | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=OPPORTUNITY_SUMMARY) extra = transform_util.get_log_extra_summary(source_summary) logger.info("Processing opportunity summary", extra=extra) + if source_summary.is_deleted: + self._handle_delete(source_summary, target_summary, OPPORTUNITY_SUMMARY, extra) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we don't have anything to link these to. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. 
- if opportunity is None and source_summary.is_historical_table: + elif opportunity is None and source_summary.is_historical_table: logger.warning( "Historical opportunity summary does not have a corresponding opportunity - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment( + self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=OPPORTUNITY_SUMMARY + ) source_summary.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity is None: @@ -395,15 +432,6 @@ def process_opportunity_summary( "Opportunity summary cannot be processed as the opportunity for it does not exist" ) - elif source_summary.is_deleted: - logger.info("Deleting opportunity summary", extra=extra) - - if target_summary is None: - raise ValueError("Cannot delete opportunity summary as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_summary) - else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -415,10 +443,10 @@ def process_opportunity_summary( ) if is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=OPPORTUNITY_SUMMARY) self.db_session.add(transformed_opportunity_summary) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=OPPORTUNITY_SUMMARY) self.db_session.merge(transformed_opportunity_summary) logger.info("Processed opportunity summary", extra=extra) @@ -504,7 +532,7 @@ def process_link_applicant_types_group( source_applicant_type, target_applicant_type, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=APPLICANT_TYPE) logger.exception( "Failed to process opportunity summary applicant type", extra=transform_util.get_log_extra_applicant_type(source_applicant_type), @@ -516,21 +544,24 @@ def process_link_applicant_type( target_applicant_type: LinkOpportunitySummaryApplicantType | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=APPLICANT_TYPE) extra = transform_util.get_log_extra_applicant_type(source_applicant_type) logger.info("Processing applicant type", extra=extra) + if source_applicant_type.is_deleted: + self._handle_delete(source_applicant_type, target_applicant_type, APPLICANT_TYPE, extra) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. 
- if opportunity_summary is None and source_applicant_type.is_historical_table: + elif opportunity_summary is None and source_applicant_type.is_historical_table: logger.warning( "Historical applicant type does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=APPLICANT_TYPE) source_applicant_type.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -539,15 +570,6 @@ def process_link_applicant_type( raise ValueError( "Applicant type record cannot be processed as the opportunity summary for it does not exist" ) - - elif source_applicant_type.is_deleted: - logger.info("Deleting applicant type", extra=extra) - - if target_applicant_type is None: - raise ValueError("Cannot delete applicant type as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_applicant_type) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -566,18 +588,18 @@ def process_link_applicant_type( is_insert and transformed_applicant_type.applicant_type in opportunity_summary.applicant_types ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=APPLICANT_TYPE) logger.warning( "Skipping applicant type record", extra=extra | {"applicant_type": transformed_applicant_type.applicant_type}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=APPLICANT_TYPE) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_applicant_types.append(transformed_applicant_type) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=APPLICANT_TYPE) self.db_session.merge(transformed_applicant_type) logger.info("Processed applicant type", extra=extra) @@ -663,7 +685,7 @@ def process_link_funding_categories_group( source_funding_category, target_funding_category, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=FUNDING_CATEGORY) logger.exception( "Failed to process opportunity summary funding category", extra=transform_util.get_log_extra_funding_category(source_funding_category), @@ -675,21 +697,26 @@ def process_link_funding_category( target_funding_category: LinkOpportunitySummaryFundingCategory | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=FUNDING_CATEGORY) extra = transform_util.get_log_extra_funding_category(source_funding_category) logger.info("Processing funding category", extra=extra) + if source_funding_category.is_deleted: + self._handle_delete( + source_funding_category, target_funding_category, FUNDING_CATEGORY, extra + ) + # Historical records are linked to other historical records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. 
Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity_summary is None and source_funding_category.is_historical_table: + elif opportunity_summary is None and source_funding_category.is_historical_table: logger.warning( "Historical funding category does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=FUNDING_CATEGORY) source_funding_category.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -698,15 +725,6 @@ def process_link_funding_category( raise ValueError( "Funding category record cannot be processed as the opportunity summary for it does not exist" ) - - elif source_funding_category.is_deleted: - logger.info("Deleting funding category", extra=extra) - - if target_funding_category is None: - raise ValueError("Cannot delete funding category as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_funding_category) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -728,19 +746,21 @@ def process_link_funding_category( and transformed_funding_category.funding_category in opportunity_summary.funding_categories ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment( + self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=FUNDING_CATEGORY + ) logger.warning( "Skipping funding category record", extra=extra | {"funding_category": transformed_funding_category.funding_category}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=FUNDING_CATEGORY) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_funding_categories.append(transformed_funding_category) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=FUNDING_CATEGORY) self.db_session.merge(transformed_funding_category) logger.info("Processed funding category", extra=extra) @@ -826,7 +846,7 @@ def process_link_funding_instruments_group( source_funding_instrument, target_funding_instrument, opportunity_summary ) except ValueError: - self.increment(self.Metrics.TOTAL_ERROR_COUNT) + self.increment(self.Metrics.TOTAL_ERROR_COUNT, prefix=FUNDING_INSTRUMENT) logger.exception( "Failed to process opportunity summary funding instrument", extra=transform_util.get_log_extra_funding_instrument( @@ -840,21 +860,26 @@ def process_link_funding_instrument( target_funding_instrument: LinkOpportunitySummaryFundingInstrument | None, opportunity_summary: OpportunitySummary | None, ) -> None: - self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED) + self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED, prefix=FUNDING_INSTRUMENT) extra = transform_util.get_log_extra_funding_instrument(source_funding_instrument) logger.info("Processing funding instrument", extra=extra) + if source_funding_instrument.is_deleted: + self._handle_delete( + source_funding_instrument, target_funding_instrument, FUNDING_INSTRUMENT, extra + ) + # Historical records are linked to other historical 
records, however # we don't import historical opportunity records, so if the opportunity # was deleted, we won't have created the opportunity summary. Whenever we do # support historical opportunities, we'll have these all marked with a # flag that we can use to reprocess these. - if opportunity_summary is None and source_funding_instrument.is_historical_table: + elif opportunity_summary is None and source_funding_instrument.is_historical_table: logger.warning( "Historical funding instrument does not have a corresponding opportunity summary - cannot import, but will mark as processed", extra=extra, ) - self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED) + self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED, prefix=FUNDING_INSTRUMENT) source_funding_instrument.transformation_notes = ORPHANED_HISTORICAL_RECORD elif opportunity_summary is None: @@ -864,14 +889,6 @@ def process_link_funding_instrument( "Funding instrument record cannot be processed as the opportunity summary for it does not exist" ) - elif source_funding_instrument.is_deleted: - logger.info("Deleting funding instrument", extra=extra) - - if target_funding_instrument is None: - raise ValueError("Cannot delete funding instrument as it does not exist") - - self.increment(self.Metrics.TOTAL_RECORDS_DELETED) - self.db_session.delete(target_funding_instrument) else: # To avoid incrementing metrics for records we fail to transform, record # here whether it's an insert/update and we'll increment after transforming @@ -893,19 +910,21 @@ def process_link_funding_instrument( and transformed_funding_instrument.funding_instrument in opportunity_summary.funding_instruments ): - self.increment(self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED) + self.increment( + self.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED, prefix=FUNDING_INSTRUMENT + ) logger.warning( "Skipping funding instrument record", extra=extra | {"funding_instrument": transformed_funding_instrument.funding_instrument}, ) elif is_insert: - self.increment(self.Metrics.TOTAL_RECORDS_INSERTED) + self.increment(self.Metrics.TOTAL_RECORDS_INSERTED, prefix=FUNDING_INSTRUMENT) # We append to the relationship so SQLAlchemy immediately attaches it to its cached # opportunity summary object so that the above check works when we receive dupes in the same batch opportunity_summary.link_funding_instruments.append(transformed_funding_instrument) else: - self.increment(self.Metrics.TOTAL_RECORDS_UPDATED) + self.increment(self.Metrics.TOTAL_RECORDS_UPDATED, prefix=FUNDING_INSTRUMENT) self.db_session.merge(transformed_funding_instrument) logger.info("Processed funding instrument", extra=extra) diff --git a/api/src/task/task.py b/api/src/task/task.py index 8c29744bc..f1619d0ac 100644 --- a/api/src/task/task.py +++ b/api/src/task/task.py @@ -63,6 +63,10 @@ def increment(self, name: str, value: int = 1, prefix: str | None = None) -> Non self.metrics[name] += value + if prefix is not None: + # Rather than re-implement the above, just re-use the function without a prefix + self.increment(f"{prefix}.{name}", value, prefix=None) + def cls_name(self) -> str: return self.__class__.__name__ diff --git a/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py b/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py index 20ee40c11..f4d07bfdd 100644 --- a/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py +++ b/api/tests/src/data_migration/transformation/test_transform_oracle_data_task.py @@ -727,7 +727,9 @@ def 
test_process_opportunity_delete_but_current_missing( # Verify an error is raised when we try to delete something that doesn't exist delete_but_current_missing = setup_opportunity(create_existing=False, is_delete=True) - with pytest.raises(ValueError, match="Cannot delete opportunity as it does not exist"): + with pytest.raises( + ValueError, match="Cannot delete opportunity record as it does not exist" + ): transform_oracle_data_task.process_opportunity(delete_but_current_missing, None) validate_opportunity(db_session, delete_but_current_missing, expect_in_db=False) @@ -823,16 +825,16 @@ def test_process_opportunity_assistance_listings(self, db_session, transform_ora assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 3 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning just attempts to re-process the error record, nothing else gets picked up + # Rerunning finds nothing - no metrics update transform_oracle_data_task.process_assistance_listings() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 11 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 10 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 3 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 def test_process_assistance_listing_orphaned_record( self, db_session, transform_oracle_data_task @@ -880,14 +882,13 @@ def test_process_assistance_listing_delete_but_current_missing( create_existing=False, is_delete=True, opportunity=opportunity ) - with pytest.raises( - ValueError, match="Cannot delete assistance listing as it does not exist" - ): - transform_oracle_data_task.process_assistance_listing( - delete_but_current_missing, None, opportunity - ) + transform_oracle_data_task.process_assistance_listing( + delete_but_current_missing, None, opportunity + ) validate_assistance_listing(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" class TestTransformOpportunitySummary(BaseTestClass): @@ -1044,15 +1045,17 @@ def test_process_opportunity_summaries(self, db_session, transform_oracle_data_t assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 5 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 3 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 4 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 3 
transform_oracle_data_task.process_opportunity_summaries() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 22 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 21 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 5 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 8 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 6 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 5), (False, 10)] @@ -1071,12 +1074,13 @@ def test_process_opportunity_summary_delete_but_current_missing( opportunity=opportunity, ) - with pytest.raises( - ValueError, match="Cannot delete opportunity summary as it does not exist" - ): - transform_oracle_data_task.process_opportunity_summary( - delete_but_current_missing, None, opportunity - ) + transform_oracle_data_task.process_opportunity_summary( + delete_but_current_missing, None, opportunity + ) + + validate_opportunity_summary(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,source_values,expected_error", @@ -1426,7 +1430,7 @@ def test_process_applicant_types(self, db_session, transform_oracle_data_task): ) validate_applicant_type( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_applicant_type( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -1441,17 +1445,19 @@ def test_process_applicant_types(self, db_session, transform_oracle_data_task): assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 8 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 assert metrics[transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 1 transform_oracle_data_task.process_link_applicant_types() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 25 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 24 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 8 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 assert metrics[transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED] == 1 + assert 
metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 5), (False, 10)] @@ -1469,10 +1475,13 @@ def test_process_applicant_types_but_current_missing( is_delete=True, ) - with pytest.raises(ValueError, match="Cannot delete applicant type as it does not exist"): - transform_oracle_data_task.process_link_applicant_type( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_applicant_type( + delete_but_current_missing, None, opportunity_summary + ) + + validate_applicant_type(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,legacy_lookup_value", @@ -1714,7 +1723,7 @@ def test_process_funding_instruments(self, db_session, transform_oracle_data_tas ) validate_funding_instrument( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_funding_instrument( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -1725,15 +1734,17 @@ def test_process_funding_instruments(self, db_session, transform_oracle_data_tas assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 transform_oracle_data_task.process_link_funding_instruments() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 16 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 15 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 5 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 2 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 1), (False, 4)] @@ -1751,12 +1762,13 @@ def test_process_funding_instrument_but_current_missing( is_delete=True, ) - with pytest.raises( - ValueError, match="Cannot delete funding instrument as it does not exist" - ): - transform_oracle_data_task.process_link_funding_instrument( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_funding_instrument( + delete_but_current_missing, None, opportunity_summary + ) + + validate_funding_instrument(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( 
"is_forecast,revision_number,legacy_lookup_value", @@ -2073,7 +2085,7 @@ def test_process_funding_categories(self, db_session, transform_oracle_data_task ) validate_funding_category( - db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=False + db_session, syn_delete_but_current_missing, expect_in_db=False, was_processed=True ) validate_funding_category( db_session, syn_hist_insert_invalid_type, expect_in_db=False, was_processed=False @@ -2084,15 +2096,17 @@ def test_process_funding_categories(self, db_session, transform_oracle_data_task assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 9 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 4 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 1 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 - # Rerunning will only attempt to re-process the errors, so total+errors goes up by 2 + # Rerunning will only attempt to re-process the errors, so total+errors goes up by 1 transform_oracle_data_task.process_link_funding_categories() - assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 24 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED] == 23 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED] == 7 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED] == 9 assert metrics[transform_oracle_data_task.Metrics.TOTAL_RECORDS_UPDATED] == 4 - assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 4 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT] == 2 + assert metrics[transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1 @pytest.mark.parametrize( "is_forecast,revision_number", [(True, None), (False, None), (True, 1), (False, 70)] @@ -2110,10 +2124,13 @@ def test_process_funding_category_but_current_missing( is_delete=True, ) - with pytest.raises(ValueError, match="Cannot delete funding category as it does not exist"): - transform_oracle_data_task.process_link_funding_category( - delete_but_current_missing, None, opportunity_summary - ) + transform_oracle_data_task.process_link_funding_category( + delete_but_current_missing, None, opportunity_summary + ) + + validate_funding_category(db_session, delete_but_current_missing, expect_in_db=False) + assert delete_but_current_missing.transformed_at is not None + assert delete_but_current_missing.transformation_notes == "orphaned_delete_record" @pytest.mark.parametrize( "is_forecast,revision_number,legacy_lookup_value", @@ -2581,6 +2598,7 @@ def test_mix_of_inserts_updates_deletes(self, db_session, transform_oracle_data_ ) validate_summary_and_nested(db_session, synopsis_hist_insert, [], [], []) + print(transform_oracle_data_task.metrics) assert { transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 41, transform_oracle_data_task.Metrics.TOTAL_RECORDS_INSERTED: 8, @@ -2588,5 +2606,170 @@ def test_mix_of_inserts_updates_deletes(self, db_session, transform_oracle_data_ transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 8, transform_oracle_data_task.Metrics.TOTAL_DUPLICATE_RECORDS_SKIPPED: 15, transform_oracle_data_task.Metrics.TOTAL_RECORDS_ORPHANED: 0, - transform_oracle_data_task.Metrics.TOTAL_ERROR_COUNT: 1, + 
transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 1,
+        }.items() <= transform_oracle_data_task.metrics.items()
+
+    def test_delete_opportunity_with_deleted_children(self, db_session, transform_oracle_data_task):
+        # We create an opportunity with a synopsis/forecast record, and various other child values
+        # We then delete all of them at once. Deleting the opportunity will recursively delete the others
+        # but we'll still have delete events for the others - this verifies how we handle that.
+        existing_opportunity = f.OpportunityFactory(
+            no_current_summary=True, opportunity_assistance_listings=[]
+        )
+        opportunity = f.StagingTopportunityFactory(
+            opportunity_id=existing_opportunity.opportunity_id, cfdas=[], is_deleted=True
+        )
+
+        cfda = setup_cfda(create_existing=True, is_delete=True, opportunity=existing_opportunity)
+
+        ### Forecast - has several children that will be deleted
+        summary_forecast = f.OpportunitySummaryFactory(
+            is_forecast=True, opportunity=existing_opportunity, no_link_values=True
+        )
+        forecast = f.StagingTforecastFactory(opportunity=opportunity, is_deleted=True)
+        forecast_applicant_type = f.StagingTapplicanttypesForecastFactory(
+            forecast=forecast, at_id="04", at_frcst_id=91001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryApplicantTypeFactory(
+            opportunity_summary=summary_forecast,
+            applicant_type=ApplicantType.SPECIAL_DISTRICT_GOVERNMENTS,
+            legacy_applicant_type_id=91001,
+        )
+        forecast_funding_category = f.StagingTfundactcatForecastFactory(
+            forecast=forecast, fac_id="ST", fac_frcst_id=92001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryFundingCategoryFactory(
+            opportunity_summary=summary_forecast,
+            funding_category=FundingCategory.SCIENCE_TECHNOLOGY_AND_OTHER_RESEARCH_AND_DEVELOPMENT,
+            legacy_funding_category_id=92001,
+        )
+        forecast_funding_instrument = f.StagingTfundinstrForecastFactory(
+            forecast=forecast, fi_id="O", fi_frcst_id=93001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryFundingInstrumentFactory(
+            opportunity_summary=summary_forecast,
+            funding_instrument=FundingInstrument.OTHER,
+            legacy_funding_instrument_id=93001,
+        )
+
+        ### Synopsis
+        summary_synopsis = f.OpportunitySummaryFactory(
+            is_forecast=False, opportunity=existing_opportunity, no_link_values=True
+        )
+        synopsis = f.StagingTsynopsisFactory(opportunity=opportunity, is_deleted=True)
+        synopsis_applicant_type = f.StagingTapplicanttypesSynopsisFactory(
+            synopsis=synopsis, at_id="21", at_syn_id=81001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryApplicantTypeFactory(
+            opportunity_summary=summary_synopsis,
+            applicant_type=ApplicantType.INDIVIDUALS,
+            legacy_applicant_type_id=81001,
+        )
+        synopsis_funding_category = f.StagingTfundactcatSynopsisFactory(
+            synopsis=synopsis, fac_id="HL", fac_syn_id=82001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryFundingCategoryFactory(
+            opportunity_summary=summary_synopsis,
+            funding_category=FundingCategory.HEALTH,
+            legacy_funding_category_id=82001,
+        )
+        synopsis_funding_instrument = f.StagingTfundinstrSynopsisFactory(
+            synopsis=synopsis, fi_id="G", fi_syn_id=83001, is_deleted=True
+        )
+        f.LinkOpportunitySummaryFundingInstrumentFactory(
+            opportunity_summary=summary_synopsis,
+            funding_instrument=FundingInstrument.GRANT,
+            legacy_funding_instrument_id=83001,
+        )
+        # Need to put an expire all so SQLAlchemy doesn't read from its cache
+        # otherwise when it does the recursive deletes, it doesn't see the later-added link table objects
+        db_session.expire_all()
+
+        transform_oracle_data_task.run_task()
+
print(transform_oracle_data_task.metrics) + + # verify everything is not in the DB + validate_opportunity(db_session, opportunity, expect_in_db=False) + validate_assistance_listing(db_session, cfda, expect_in_db=False) + validate_opportunity_summary(db_session, forecast, expect_in_db=False) + validate_opportunity_summary(db_session, synopsis, expect_in_db=False) + + validate_applicant_type(db_session, forecast_applicant_type, expect_in_db=False) + validate_applicant_type(db_session, synopsis_applicant_type, expect_in_db=False) + + validate_funding_category(db_session, forecast_funding_category, expect_in_db=False) + validate_funding_category(db_session, synopsis_funding_category, expect_in_db=False) + + validate_funding_instrument(db_session, forecast_funding_instrument, expect_in_db=False) + validate_funding_instrument(db_session, synopsis_funding_instrument, expect_in_db=False) + + assert { + transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 10, + # Despite processing 10 records, only the opportunity is actually deleted directly + transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 1, + f"opportunity.{transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED}": 1, + transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 9, + }.items() <= transform_oracle_data_task.metrics.items() + + def test_delete_opportunity_summary_with_deleted_children( + self, db_session, transform_oracle_data_task + ): + # Similar to the above test, but we're leaving the opportunity alone and just deleting + # an opportunity summary. Should be the same thing, just on a smaller scale. + existing_opportunity = f.OpportunityFactory( + no_current_summary=True, opportunity_assistance_listings=[] + ) + opportunity = f.StagingTopportunityFactory( + opportunity_id=existing_opportunity.opportunity_id, cfdas=[], already_transformed=True + ) + + summary_synopsis = f.OpportunitySummaryFactory( + is_forecast=False, opportunity=existing_opportunity, no_link_values=True + ) + synopsis = f.StagingTsynopsisFactory(opportunity=opportunity, is_deleted=True) + synopsis_applicant_type = f.StagingTapplicanttypesSynopsisFactory( + synopsis=synopsis, at_id="21", at_syn_id=71001, is_deleted=True + ) + f.LinkOpportunitySummaryApplicantTypeFactory( + opportunity_summary=summary_synopsis, + applicant_type=ApplicantType.INDIVIDUALS, + legacy_applicant_type_id=71001, + ) + synopsis_funding_category = f.StagingTfundactcatSynopsisFactory( + synopsis=synopsis, fac_id="HL", fac_syn_id=72001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingCategoryFactory( + opportunity_summary=summary_synopsis, + funding_category=FundingCategory.HEALTH, + legacy_funding_category_id=72001, + ) + synopsis_funding_instrument = f.StagingTfundinstrSynopsisFactory( + synopsis=synopsis, fi_id="G", fi_syn_id=73001, is_deleted=True + ) + f.LinkOpportunitySummaryFundingInstrumentFactory( + opportunity_summary=summary_synopsis, + funding_instrument=FundingInstrument.GRANT, + legacy_funding_instrument_id=73001, + ) + # Need to put an expire all so SQLAlchemy doesn't read from its cache + # otherwise when it does the recursive deletes, it doesn't see the later-added link table objects + db_session.expire_all() + + transform_oracle_data_task.run_task() + + # verify everything is not in the DB + validate_opportunity( + db_session, opportunity, expect_in_db=True, expect_values_to_match=False + ) + validate_opportunity_summary(db_session, synopsis, expect_in_db=False) + validate_applicant_type(db_session, synopsis_applicant_type, expect_in_db=False) + 
validate_funding_category(db_session, synopsis_funding_category, expect_in_db=False) + validate_funding_instrument(db_session, synopsis_funding_instrument, expect_in_db=False) + + assert { + transform_oracle_data_task.Metrics.TOTAL_RECORDS_PROCESSED: 4, + # Despite processing 4 records, only the opportunity_summary is actually deleted directly + transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED: 1, + f"opportunity_summary.{transform_oracle_data_task.Metrics.TOTAL_RECORDS_DELETED}": 1, + transform_oracle_data_task.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED: 3, }.items() <= transform_oracle_data_task.metrics.items()
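
Note on the metric prefixing used throughout this patch: Task.increment now records each counter twice when a prefix is passed, once under the plain metric name and once under "<prefix>.<name>", which is how the tests can assert on both TOTAL_RECORDS_DELETED and keys such as "opportunity.total_records_deleted" from the same metrics dict. A minimal sketch of that behavior, using a hypothetical stand-alone class with a plain defaultdict rather than the real Task base class:

    from collections import defaultdict

    # Hypothetical stand-in for src.task.task.Task, reduced to just the metric logic.
    class MetricsSketch:
        def __init__(self) -> None:
            self.metrics: dict[str, int] = defaultdict(int)

        def increment(self, name: str, value: int = 1, prefix: str | None = None) -> None:
            self.metrics[name] += value
            if prefix is not None:
                # Re-use the same code path to record the per-record-type copy of the metric
                self.increment(f"{prefix}.{name}", value, prefix=None)

    task = MetricsSketch()
    task.increment("total_records_deleted", prefix="opportunity")
    task.increment("total_delete_orphans_skipped", prefix="opportunity_summary")

    assert task.metrics["total_records_deleted"] == 1
    assert task.metrics["opportunity.total_records_deleted"] == 1
    assert task.metrics["opportunity_summary.total_delete_orphans_skipped"] == 1

The same split explains the delete-orphan accounting in the new tests: _handle_delete only bumps TOTAL_RECORDS_DELETED when the target row actually exists, and when the target is already gone (for example, cascaded away by an earlier opportunity delete) it bumps TOTAL_DELETE_ORPHANS_SKIPPED instead and marks the source row with "orphaned_delete_record", so reruns skip the record rather than raising.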