From ed7a88af1482c205f937bfcb66719ea358fc056b Mon Sep 17 00:00:00 2001
From: Argus Chiu
Date: Thu, 5 Dec 2024 13:07:48 -0800
Subject: [PATCH] 24478 Add corp processing logic to tombstone pipeline (#3114)

* 24478 Add corp processing logic to tombstone pipeline

* Tweak logic for marking corp processing as completed or error
---
 data-tool/flows/common/auth_service.py        |   2 -
 .../common/corp_processing_queue_service.py   | 173 ++++++++++++++++++
 data-tool/flows/corps_tombstone_flow.py       |  75 ++++++--
 .../flows/tombstone/tombstone_queries.py      |   6 +-
 .../scripts/colin_corps_extract_postgres_ddl  |   2 +
 5 files changed, 238 insertions(+), 20 deletions(-)
 create mode 100644 data-tool/flows/common/corp_processing_queue_service.py

diff --git a/data-tool/flows/common/auth_service.py b/data-tool/flows/common/auth_service.py
index 24b3f9f53c..5fb5c9d357 100644
--- a/data-tool/flows/common/auth_service.py
+++ b/data-tool/flows/common/auth_service.py
@@ -94,7 +94,6 @@ def create_affiliation(cls,
             timeout=cls.get_time_out(config)
         )
 
-        # @TODO delete affiliation and entity record next sprint when affiliation service is updated
         if affiliate.status_code != HTTPStatus.CREATED or entity_record.status_code != HTTPStatus.CREATED:
             return HTTPStatus.BAD_REQUEST
         return HTTPStatus.OK
@@ -175,7 +174,6 @@ def update_entity(cls,
     def delete_affiliation(cls, config, account: int, business_registration: str) -> Dict:
         """Affiliate a business to an account.
-        @TODO Update this when account affiliation is changed next sprint.
         """
         auth_url = config.AUTH_SVC_URL
         account_svc_entity_url = f'{auth_url}/entities'

diff --git a/data-tool/flows/common/corp_processing_queue_service.py b/data-tool/flows/common/corp_processing_queue_service.py
new file mode 100644
index 0000000000..eecd621b00
--- /dev/null
+++ b/data-tool/flows/common/corp_processing_queue_service.py
@@ -0,0 +1,173 @@
+from enum import Enum
+from typing import List
+from sqlalchemy import text
+import logging
+
+
+class ProcessingStatuses(str, Enum):
+    PENDING = 'PENDING'
+    PROCESSING = 'PROCESSING'
+    COMPLETED = 'COMPLETED'
+    FAILED = 'FAILED'
+
+
+class CorpProcessingQueueService:
+    def __init__(self, environment: str, db_engine, flow_name: str):
+        self.data_load_env = environment
+        self.db_engine = db_engine
+        self.flow_name = flow_name
+        self.logger = logging.getLogger(__name__)
+
+    def reserve_for_flow(self, base_query: str, flow_run_id: str) -> int:
+        """Reserve corporations for processing in a specific flow run.
+
+        Args:
+            base_query: SQL query that selects corps to be processed
+            flow_run_id: Unique identifier for this flow run
+
+        Returns:
+            Number of corporations successfully reserved for this flow
+        """
+        init_query = f"""
+        WITH candidate_corps AS ({base_query}),
+        available_corps AS (
+            SELECT corp_num, corp_type_cd
+            FROM candidate_corps
+            FOR UPDATE SKIP LOCKED
+        )
+        INSERT INTO corp_processing (
+            corp_num,
+            corp_type_cd,
+            flow_name,
+            processed_status,
+            environment,
+            flow_run_id,
+            create_date,
+            last_modified,
+            claimed_at
+        )
+        SELECT
+            corp_num,
+            corp_type_cd,
+            :flow_name,
+            :status,
+            :environment,
+            :flow_run_id,
+            NOW(),
+            NOW(),
+            NULL
+        FROM available_corps
+        ON CONFLICT (corp_num, flow_name, environment)
+        DO NOTHING
+        RETURNING corp_num
+        """
+
+        with self.db_engine.connect() as conn:
+            with conn.begin():
+                result = conn.execute(
+                    text(init_query),
+                    {
+                        'flow_name': self.flow_name,
+                        'status': ProcessingStatuses.PENDING,
+                        'environment': self.data_load_env,
+                        'flow_run_id': flow_run_id
+                    }
+                )
+                count = len(result.fetchall())
+                self.logger.info(f"Reserved {count} corps for flow run {flow_run_id}")
+                return count
+
+    def claim_batch(self, flow_run_id: str, batch_size: int) -> List[str]:
+        """Claim a batch of corporations for immediate processing within a flow run.
+
+        Args:
+            flow_run_id: Unique identifier for this flow run
+            batch_size: Maximum number of corps to claim
+
+        Returns:
+            List of corporation numbers that were successfully claimed for processing
+        """
+        query = """
+        WITH claimable AS (
+            SELECT corp_num
+            FROM corp_processing
+            WHERE processed_status = :pending_status
+                AND environment = :environment
+                AND flow_name = :flow_name
+                AND flow_run_id = :flow_run_id
+                AND claimed_at IS NULL
+            LIMIT :batch_size
+            FOR UPDATE SKIP LOCKED
+        )
+        UPDATE corp_processing
+        SET processed_status = :processing_status,
+            claimed_at = NOW(),
+            last_modified = NOW()
+        FROM claimable
+        WHERE corp_processing.corp_num = claimable.corp_num AND corp_processing.flow_run_id = :flow_run_id  -- pin to this run's rows
+        RETURNING corp_processing.corp_num, corp_processing.claimed_at
+        """
+
+        with self.db_engine.connect() as conn:
+            with conn.begin():
+                result = conn.execute(
+                    text(query),
+                    {
+                        'pending_status': ProcessingStatuses.PENDING,
+                        'processing_status': ProcessingStatuses.PROCESSING,
+                        'environment': self.data_load_env,
+                        'flow_name': self.flow_name,
+                        'flow_run_id': flow_run_id,
+                        'batch_size': batch_size
+                    }
+                )
+                claimed = result.fetchall()
+                claimed_corps = [row[0] for row in claimed]
+                if claimed_corps:
+                    self.logger.info(f"Claimed {len(claimed_corps)} corps for flow {flow_run_id}")
+                    self.logger.info(f"Corps: {', '.join(claimed_corps[:5])}...")
+                    for corp, claimed_at in claimed:
+                        self.logger.debug(f"  {corp} claimed at {claimed_at}")
+                return claimed_corps
+
+    def update_corp_status(
+        self,
+        flow_run_id: str,
+        corp_num: str,
+        status: ProcessingStatuses,
+        error: str = None
+    ) -> bool:
+        """Update status for a corp."""
+        query = """
+        UPDATE corp_processing
+        SET processed_status = :status,
+            last_modified = NOW(),
+            last_error = CASE
+                WHEN :error IS NOT NULL THEN :error
+                ELSE last_error
+            END
+        WHERE corp_num = :corp_num
+            AND flow_run_id = :flow_run_id
+            AND environment = :environment
+            AND flow_name = :flow_name
+        RETURNING corp_num
+        """
+
+        with self.db_engine.connect() as conn:
+            with conn.begin():
+                result = conn.execute(
+                    text(query),
+                    {
+                        'status': status,
+                        'error': error,
+                        'corp_num': corp_num,
+                        'flow_run_id': flow_run_id,
+                        'environment': self.data_load_env,
+                        'flow_name': self.flow_name
+                    }
+                )
+                success = result.rowcount > 0
+                if not success:
+                    self.logger.warning(
+                        f"Failed to update {corp_num} to {status} "
+                        f"(flow_run_id={flow_run_id})"
+                    )
+                return success
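[Editor's note: usage sketch, not part of the patch]
A minimal end-to-end sketch of how the new service is meant to be driven. The
engine URL and base query are illustrative assumptions; only the service calls
themselves come from the patch:

    from uuid import uuid4

    from sqlalchemy import create_engine

    from common.corp_processing_queue_service import (
        CorpProcessingQueueService,
        ProcessingStatuses,
    )

    # Assumed DSN for a local COLIN extract database.
    engine = create_engine('postgresql+psycopg2://user:pass@localhost/colin-extract')
    service = CorpProcessingQueueService('local', engine, 'tombstone-flow')
    flow_run_id = str(uuid4())

    # Illustrative stand-in for the real get_unprocessed_corps_query(...) output.
    base_query = "select corp_num, corp_type_cd from corporation limit 100"

    # 1. Reserve: insert PENDING rows for every corp the base query returns;
    #    ON CONFLICT DO NOTHING skips corps another run already reserved.
    service.reserve_for_flow(base_query, flow_run_id)

    # 2. Claim and process in batches until this run's reservation is drained.
    while corp_nums := service.claim_batch(flow_run_id, batch_size=10):
        for corp_num in corp_nums:
            # ... migrate the corp here, then record the outcome ...
            service.update_corp_status(flow_run_id, corp_num, ProcessingStatuses.COMPLETED)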
diff --git a/data-tool/flows/corps_tombstone_flow.py b/data-tool/flows/corps_tombstone_flow.py
index c44aa758fb..a5fab2d7a3 100644
--- a/data-tool/flows/corps_tombstone_flow.py
+++ b/data-tool/flows/corps_tombstone_flow.py
@@ -1,13 +1,16 @@
 import math
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from common.init_utils import colin_init, get_config, lear_init
 from common.query_utils import convert_result_set_to_dict
 from common.auth_service import AuthService
-from prefect import flow, task
+from prefect import flow, task, serve
 from prefect.futures import wait
+from prefect.context import get_run_context
 from sqlalchemy import Connection, text
 from sqlalchemy.engine import Engine
+
+from common.corp_processing_queue_service import CorpProcessingQueueService as CorpProcessingService, ProcessingStatuses
 from tombstone.tombstone_queries import (get_corp_snapshot_filings_queries,
                                          get_corp_users_query,
                                          get_total_unprocessed_count_query,
@@ -20,20 +23,21 @@
 
 @task
-def get_unprocessed_corps(config, colin_engine: Engine) -> list:
-    """Get unprocessed corp numbers."""
-    query = get_unprocessed_corps_query(
-        'local',
+def reserve_unprocessed_corps(config, processing_service, flow_run_id, num_corps) -> int:
+    """Reserve corps for a given flow run.
+
+    Note that this is not the same as claiming them for processing, which happens in a
+    subsequent step. Reserving up front keeps parallel flow runs from competing for the
+    same corps.
+    """
+    base_query = get_unprocessed_corps_query(
+        'tombstone-flow',
         config.DATA_LOAD_ENV,
-        config.TOMBSTONE_BATCH_SIZE
+        num_corps  # Pass the total number we want to process
     )
-    sql_text = text(query)
-    with colin_engine.connect() as conn:
-        rs = conn.execute(sql_text)
-        raw_data_dict = convert_result_set_to_dict(rs)
-        corp_nums = [x.get('corp_num') for x in raw_data_dict]
-        return corp_nums
+    # reserve corps
+    reserved = processing_service.reserve_for_flow(base_query, flow_run_id)
+    return reserved
 
 
 @task
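[Editor's note: concurrency sketch, not part of the patch]
The reserve/claim split introduced above is what lets parallel flow runs coexist:
reservation pins each corp to exactly one flow_run_id (the unq_corp_processing
constraint plus ON CONFLICT DO NOTHING), and claim_batch only ever returns rows
carrying the caller's flow_run_id. A sketch of that invariant, reusing the assumed
service and base_query from the previous note:

    from uuid import uuid4

    run_a, run_b = str(uuid4()), str(uuid4())
    service.reserve_for_flow(base_query, run_a)  # inserts PENDING rows for run A
    service.reserve_for_flow(base_query, run_b)  # picks up only corps run A did not reserve

    batch_a = service.claim_batch(run_a, batch_size=50)
    batch_b = service.claim_batch(run_b, batch_size=50)
    assert not set(batch_a) & set(batch_b)  # no corp can be claimed by both runs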
@@ -200,8 +204,8 @@ def load_placeholder_filings(conn: Connection, tombstone_data: dict, business_id
 @task(name='3.3-Update-Auth-Task')
 def update_auth(conn: Connection, config, corp_num: str, tombstone_data: dict):
     """Create auth entity and affiliate as required."""
-    # TODO affiliation to an account does not need to happen. only entity creation in auth is req'd.
-    # used for testing purposes to see how things look in entity dashboard - remove when done testing
+    # Note: affiliation to an account is not required; only entity creation in auth is.
+    # Affiliation is kept for testing purposes, to see how things look in the entity dashboard.
     if config.AFFILIATE_ENTITY:
         business_data = tombstone_data['businesses']
         account_id = config.AFFILIATE_ENTITY_ACCOUNT_ID
@@ -287,6 +291,8 @@ def tombstone_flow():
     config = get_config()
     colin_engine = colin_init(config)
     lear_engine = lear_init(config)
+    flow_run_id = get_run_context().flow_run.id
+    processing_service = CorpProcessingService(config.DATA_LOAD_ENV, colin_engine, 'tombstone-flow')
 
     total = get_unprocessed_count(config, colin_engine)
 
     batch_size = config.TOMBSTONE_BATCH_SIZE
     batches = min(math.ceil(total/batch_size), config.TOMBSTONE_BATCHES)
 
+    # Calculate max corps to reserve
+    max_corps = min(total, config.TOMBSTONE_BATCHES * config.TOMBSTONE_BATCH_SIZE)
+    print(f'max_corps: {max_corps}')
+    reserved_corps = reserve_unprocessed_corps(config, processing_service, flow_run_id, max_corps)
+    print(f'👷 Reserved {reserved_corps} corps for processing')
 
     print(f'👷 Going to migrate {total} corps with batch size of {batch_size}')
 
     cnt = 0
     migrated_cnt = 0
     while cnt < batches:
-        corp_nums = get_unprocessed_corps(config, colin_engine)
+        # Claim next batch of reserved corps for current flow
+        corp_nums = processing_service.claim_batch(flow_run_id, batch_size)
+        if not corp_nums:
+            print("No more corps available to claim")
+            break
 
         print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...')
 
                 print(f'❗ Skip migrating {corp_num} due to data collection error.')
 
         wait(corp_futures)
+
+        for f in corp_futures:
+            corp_num = f.result()
+            if corp_num:
+                processing_service.update_corp_status(
+                    flow_run_id,
+                    corp_num,
+                    ProcessingStatuses.COMPLETED
+                )
+            else:
+                # No corp num came back, so the corp_processing row cannot be
+                # matched here; update_corp_status will log a warning and the
+                # row is left in PROCESSING for later cleanup.
+                processing_service.update_corp_status(
+                    flow_run_id,
+                    corp_num,
+                    ProcessingStatuses.FAILED,
+                    error="Migration failed"
+                )
+
         succeeded = sum(1 for f in corp_futures if f.state.is_completed())
         failed = len(corp_futures) - succeeded
         print(f'🌟 Complete round {cnt}. Succeeded: {succeeded}. Failed: {failed}. Skip: {skipped}')
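[Editor's note: monitoring sketch, not part of the patch]
With processed_status, claimed_at and flow_run_id persisted per corp, a run's
progress can be watched from outside the flow. A sketch, assuming the same
engine as in the earlier notes; table and column names come from the DDL at the
end of this patch:

    from sqlalchemy import text

    with engine.connect() as conn:
        rows = conn.execute(text("""
            SELECT flow_run_id, processed_status, count(*) AS corps
            FROM corp_processing
            WHERE flow_name = 'tombstone-flow'
              AND environment = :env
            GROUP BY flow_run_id, processed_status
            ORDER BY flow_run_id
        """), {'env': 'local'}).fetchall()
        for run_id, status, corps in rows:
            print(f'{run_id}: {status} x {corps}')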
@@ -347,3 +380,13 @@ def tombstone_flow():
 
 if __name__ == "__main__":
     tombstone_flow()
+
+    # # Create deployment - only intended for local testing of parallel flows
+    # deployment = tombstone_flow.to_deployment(
+    #     name="tombstone-deployment",
+    #     interval=timedelta(seconds=8),  # Run every 8 seconds
+    #     tags=["tombstone-migration"]
+    # )
+    #
+    # # Start serving the deployment
+    # serve(deployment)

diff --git a/data-tool/flows/tombstone/tombstone_queries.py b/data-tool/flows/tombstone/tombstone_queries.py
index 8448bb041a..c6d671ad81 100644
--- a/data-tool/flows/tombstone/tombstone_queries.py
+++ b/data-tool/flows/tombstone/tombstone_queries.py
@@ -20,14 +20,16 @@ def get_unprocessed_corps_query(flow_name, environment, batch_size):
 -- and c.corp_num = 'BC0043406' -- lots of directors
 -- and c.corp_num in ('BC0326163', 'BC0395512', 'BC0883637') -- TODO: re-migrate issue (can be solved by adding tracking)
 -- and c.corp_num = 'BC0870626' -- lots of filings - IA, CoDs, ARs
--- and c.corp_num = 'BC0004969' -- lots of filings - IA, ARs, transition, alteration, COD, COA
+-- and c.corp_num = 'BC0004969' -- lots of filings - IA, ARs, transition, alteration, COD, COA
 -- and c.corp_num = 'BC0002567' -- lots of filings - IA, ARs, transition, COD
 -- and c.corp_num in ('BC0068889', 'BC0441359') -- test users mapping
 -- and c.corp_num in ('BC0326163', 'BC0046540', 'BC0883637', 'BC0043406', 'BC0068889', 'BC0441359')
 -- and c.corp_num in ('BC0472301', 'BC0649417', 'BC0808085', 'BC0803411', 'BC0756111', 'BC0511226', 'BC0833000', 'BC0343855', 'BC0149266') -- dissolution
     and c.corp_type_cd in ('BC', 'C', 'ULC', 'CUL', 'CC', 'CCC', 'QA', 'QB', 'QC', 'QD', 'QE') -- TODO: update transfer script
     and cs.end_event_id is null
-    and ((cp.processed_status is null or cp.processed_status != 'COMPLETED'))
+--    and ((cp.processed_status is null or cp.processed_status != 'COMPLETED'))
+    and cp.processed_status is null
+    and cp.flow_run_id is null
 -- and cs.state_type_cd = 'ACT'
 -- order by random()
 limit {batch_size}

diff --git a/data-tool/scripts/colin_corps_extract_postgres_ddl b/data-tool/scripts/colin_corps_extract_postgres_ddl
index f2578febe4..16938f1c02 100644
--- a/data-tool/scripts/colin_corps_extract_postgres_ddl
+++ b/data-tool/scripts/colin_corps_extract_postgres_ddl
@@ -629,6 +629,8 @@ create table if not exists corp_processing
     create_date timestamp with time zone,
     last_modified timestamp with time zone,
     last_error varchar(1000),
+    claimed_at timestamp with time zone,
+    flow_run_id uuid,
     constraint unq_corp_processing unique (corp_num, flow_name, environment)
 );
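[Editor's note: operational sketch, not part of the patch]
Nothing in the patch moves a corp out of PROCESSING if a run dies between
claim_batch and update_corp_status, so a crashed run leaves its rows claimed
indefinitely. The new claimed_at column makes such rows easy to surface; the
one-hour cutoff below is an arbitrary assumption, and how to requeue them is
left to a follow-up:

    from sqlalchemy import text

    stale_sql = text("""
        SELECT corp_num, flow_run_id, claimed_at
        FROM corp_processing
        WHERE flow_name = 'tombstone-flow'
          AND environment = :env
          AND processed_status = 'PROCESSING'
          AND claimed_at < NOW() - INTERVAL '1 hour'
        ORDER BY claimed_at
    """)
    with engine.connect() as conn:
        for corp_num, run_id, claimed_at in conn.execute(stale_sql, {'env': 'local'}):
            print(f'stale claim: {corp_num} (run {run_id}, claimed {claimed_at})')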