24478 Add corp processing logic to tombstone pipeline (bcgov#3114)
* 24478 Add corp processing logic to tombstone pipeline

* Tweak logic for marking corp processing as completed or error
argush3 authored Dec 5, 2024
1 parent 50e41da commit ed7a88a
Showing 5 changed files with 238 additions and 20 deletions.
2 changes: 0 additions & 2 deletions data-tool/flows/common/auth_service.py
@@ -94,7 +94,6 @@ def create_affiliation(cls,
timeout=cls.get_time_out(config)
)

# @TODO delete affiliation and entity record next sprint when affiliation service is updated
if affiliate.status_code != HTTPStatus.CREATED or entity_record.status_code != HTTPStatus.CREATED:
return HTTPStatus.BAD_REQUEST
return HTTPStatus.OK
@@ -175,7 +174,6 @@ def update_entity(cls,
def delete_affiliation(cls, config, account: int, business_registration: str) -> Dict:
"""Affiliate a business to an account.
@TODO Update this when account affiliation is changed next sprint.
"""
auth_url = config.AUTH_SVC_URL
account_svc_entity_url = f'{auth_url}/entities'
173 changes: 173 additions & 0 deletions data-tool/flows/common/corp_processing_queue_service.py
@@ -0,0 +1,173 @@
from enum import Enum
from typing import List
from sqlalchemy import text
import logging

class ProcessingStatuses(str, Enum):
PENDING = 'PENDING'
PROCESSING = 'PROCESSING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'

class CorpProcessingQueueService:
def __init__(self, environment: str, db_engine, flow_name: str):
self.data_load_env = environment
self.db_engine = db_engine
self.flow_name = flow_name
self.logger = logging.getLogger(__name__)

def reserve_for_flow(self, base_query: str, flow_run_id: str) -> int:
"""Reserve corporations for processing in a specific flow run.
Args:
base_query: SQL query that selects corps to be processed
flow_run_id: Unique identifier for this flow run
Returns:
Number of corporations successfully reserved for this flow
"""
init_query = f"""
WITH candidate_corps AS ({base_query}),
available_corps AS (
SELECT corp_num, corp_type_cd
FROM candidate_corps
FOR UPDATE SKIP LOCKED
)
INSERT INTO corp_processing (
corp_num,
corp_type_cd,
flow_name,
processed_status,
environment,
flow_run_id,
create_date,
last_modified,
claimed_at
)
SELECT
corp_num,
corp_type_cd,
:flow_name,
:status,
:environment,
:flow_run_id,
NOW(),
NOW(),
NULL
FROM available_corps
ON CONFLICT (corp_num, flow_name, environment)
DO NOTHING
RETURNING corp_num
"""

with self.db_engine.connect() as conn:
with conn.begin():
result = conn.execute(
text(init_query),
{
'flow_name': self.flow_name,
'status': ProcessingStatuses.PENDING,
'environment': self.data_load_env,
'flow_run_id': flow_run_id
}
)
count = len(result.fetchall())
self.logger.info(f"Initialized {count} corps for flow run {flow_run_id}")
return count

def claim_batch(self, flow_run_id: str, batch_size: int) -> List[str]:
"""Claim a batch of corporations for immediate processing within a flow run.
Args:
flow_run_id: Unique identifier for this flow run
batch_size: Maximum number of corps to claim
Returns:
List of corporation numbers that were successfully claimed for processing
"""
query = """
WITH claimable AS (
SELECT corp_num
FROM corp_processing
WHERE processed_status = :pending_status
AND environment = :environment
AND flow_name = :flow_name
AND flow_run_id = :flow_run_id
AND claimed_at IS NULL
LIMIT :batch_size
FOR UPDATE SKIP LOCKED
)
UPDATE corp_processing
SET processed_status = :processing_status,
claimed_at = NOW(),
last_modified = NOW()
FROM claimable
WHERE corp_processing.corp_num = claimable.corp_num
RETURNING corp_processing.corp_num, corp_processing.claimed_at
"""

with self.db_engine.connect() as conn:
with conn.begin():
result = conn.execute(
text(query),
{
'pending_status': ProcessingStatuses.PENDING,
'processing_status': ProcessingStatuses.PROCESSING,
'environment': self.data_load_env,
'flow_name': self.flow_name,
'flow_run_id': flow_run_id,
'batch_size': batch_size
}
)
claimed = result.fetchall()
claimed_corps = [row[0] for row in claimed]
if claimed_corps:
self.logger.info(f"Claimed {len(claimed_corps)} corps for flow {flow_run_id}")
self.logger.info(f"Corps: {', '.join(claimed_corps[:5])}...")
for corp, claimed_at in claimed:
self.logger.debug(f" {corp} claimed at {claimed_at}")
return claimed_corps

def update_corp_status(
self,
flow_run_id: str,
corp_num: str,
status: ProcessingStatuses,
error: str = None
) -> bool:
"""Update status for a corp."""
query = """
UPDATE corp_processing
SET processed_status = :status,
last_modified = NOW(),
last_error = CASE
WHEN :error IS NOT NULL THEN :error
ELSE last_error
END
WHERE corp_num = :corp_num
AND flow_run_id = :flow_run_id
AND environment = :environment
AND flow_name = :flow_name
RETURNING corp_num
"""

with self.db_engine.connect() as conn:
with conn.begin():
result = conn.execute(
text(query),
{
'status': status,
'error': error,
'corp_num': corp_num,
'flow_run_id': flow_run_id,
'environment': self.data_load_env,
'flow_name': self.flow_name
}
)
success = result.rowcount > 0
if not success:
self.logger.warning(
f"Failed to update {corp_num} to {status} "
f"(flow_run_id={flow_run_id})"
)
return success
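
Taken together, the service gives each flow run a reserve, claim, and update lifecycle over the shared corp_processing table. Below is a minimal usage sketch, assuming a SQLAlchemy engine; the DSN, flow run id, and migrate() helper are hypothetical stand-ins, not part of this commit:

from sqlalchemy import create_engine

from common.corp_processing_queue_service import (
    CorpProcessingQueueService,
    ProcessingStatuses,
)

engine = create_engine('postgresql+psycopg2://user:pass@host/colin')  # hypothetical DSN
service = CorpProcessingQueueService('local', engine, 'tombstone-flow')
flow_run_id = 'ad-hoc-run-001'  # the real flow uses Prefect's flow_run.id

# Illustrative candidate query; the flow builds its own via get_unprocessed_corps_query.
base_query = "SELECT corp_num, corp_type_cd FROM corporation"
service.reserve_for_flow(base_query, flow_run_id)

while corp_nums := service.claim_batch(flow_run_id, batch_size=50):
    for corp_num in corp_nums:
        try:
            migrate(corp_num)  # hypothetical stand-in for the migration tasks
            service.update_corp_status(flow_run_id, corp_num, ProcessingStatuses.COMPLETED)
        except Exception as err:
            service.update_corp_status(
                flow_run_id, corp_num, ProcessingStatuses.FAILED, error=str(err))
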
75 changes: 59 additions & 16 deletions data-tool/flows/corps_tombstone_flow.py
@@ -1,13 +1,16 @@
import math
from datetime import datetime
from datetime import datetime, timedelta

from common.init_utils import colin_init, get_config, lear_init
from common.query_utils import convert_result_set_to_dict
from common.auth_service import AuthService
from prefect import flow, task
from prefect import flow, task, serve
from prefect.futures import wait
from prefect.context import get_run_context
from sqlalchemy import Connection, text
from sqlalchemy.engine import Engine

from common.corp_processing_queue_service import CorpProcessingQueueService as CorpProcessingService, ProcessingStatuses
from tombstone.tombstone_queries import (get_corp_snapshot_filings_queries,
get_corp_users_query,
get_total_unprocessed_count_query,
@@ -20,20 +23,21 @@


@task
def get_unprocessed_corps(config, colin_engine: Engine) -> list:
"""Get unprocessed corp numbers."""
query = get_unprocessed_corps_query(
'local',
def reserve_unprocessed_corps(config, processing_service, flow_run_id, num_corps) -> list:
"""Reserve corps for a given flow run.
    Note that this is not the same as claiming them for processing, which happens
    in subsequent steps; reserving up front keeps parallel flows from competing
    for the same corps.
"""
base_query = get_unprocessed_corps_query(
'tombstone-flow',
config.DATA_LOAD_ENV,
config.TOMBSTONE_BATCH_SIZE
num_corps # Pass the total number we want to process
)
sql_text = text(query)

with colin_engine.connect() as conn:
rs = conn.execute(sql_text)
raw_data_dict = convert_result_set_to_dict(rs)
corp_nums = [x.get('corp_num') for x in raw_data_dict]
return corp_nums
# reserve corps
reserved = processing_service.reserve_for_flow(base_query, flow_run_id)
return reserved


@task
@@ -200,8 +204,8 @@ def load_placeholder_filings(conn: Connection, tombstone_data: dict, business_id
@task(name='3.3-Update-Auth-Task')
def update_auth(conn: Connection, config, corp_num: str, tombstone_data: dict):
"""Create auth entity and affiliate as required."""
# TODO affiliation to an account does not need to happen. only entity creation in auth is req'd.
# used for testing purposes to see how things look in entity dashboard - remove when done testing
# Note: affiliation to an account does not need to happen; only entity creation in auth is req'd.
# used for testing purposes to see how things look in entity dashboard
if config.AFFILIATE_ENTITY:
business_data = tombstone_data['businesses']
account_id = config.AFFILIATE_ENTITY_ACCOUNT_ID
@@ -287,6 +291,8 @@ def tombstone_flow():
config = get_config()
colin_engine = colin_init(config)
lear_engine = lear_init(config)
flow_run_id = get_run_context().flow_run.id
processing_service = CorpProcessingService(config.DATA_LOAD_ENV, colin_engine, 'tombstone-flow')

total = get_unprocessed_count(config, colin_engine)

@@ -297,12 +303,21 @@
batch_size = config.TOMBSTONE_BATCH_SIZE
batches = min(math.ceil(total/batch_size), config.TOMBSTONE_BATCHES)

# Calculate max corps to initialize
max_corps = min(total, config.TOMBSTONE_BATCHES * config.TOMBSTONE_BATCH_SIZE)
print(f'max_corps: {max_corps}')
reserved_corps = reserve_unprocessed_corps(config, processing_service, flow_run_id, max_corps)
print(f'👷 Reserved {reserved_corps} corps for processing')
print(f'👷 Going to migrate {total} corps with batch size of {batch_size}')

cnt = 0
migrated_cnt = 0
while cnt < batches:
corp_nums = get_unprocessed_corps(config, colin_engine)
# Claim next batch of reserved corps for current flow
corp_nums = processing_service.claim_batch(flow_run_id, batch_size)
if not corp_nums:
print("No more corps available to claim")
break

print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...')

@@ -332,6 +347,24 @@
print(f'❗ Skip migrating {corp_num} due to data collection error.')

wait(corp_futures)

for f in corp_futures:
corp_num = f.result()
if corp_num:
processing_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED
)
else:
                # Falsy result: the migration task returned no corp number; record the failure
processing_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.FAILED,
error="Migration failed"
)

succeeded = sum(1 for f in corp_futures if f.state.is_completed())
failed = len(corp_futures) - succeeded
print(f'🌟 Complete round {cnt}. Succeeded: {succeeded}. Failed: {failed}. Skip: {skipped}')
@@ -347,3 +380,13 @@

if __name__ == "__main__":
tombstone_flow()

# # Create deployment - only intended to test locally for parallel flows
# deployment = tombstone_flow.to_deployment(
# name="tombstone-deployment",
# interval=timedelta(seconds=8), # Run every x seconds
# tags=["tombstone-migration"]
# )
#
# # Start serving the deployment
# serve(deployment)
6 changes: 4 additions & 2 deletions data-tool/flows/tombstone/tombstone_queries.py
@@ -20,14 +20,16 @@ def get_unprocessed_corps_query(flow_name, environment, batch_size):
-- and c.corp_num = 'BC0043406' -- lots of directors
-- and c.corp_num in ('BC0326163', 'BC0395512', 'BC0883637') -- TODO: re-migrate issue (can be solved by adding tracking)
-- and c.corp_num = 'BC0870626' -- lots of filings - IA, CoDs, ARs
-- and c.corp_num = 'BC0004969' -- lots of filings - IA, ARs, transition, alteration, COD, COA
-- and c.corp_num = 'BC0004969' -- lots of filings - IA, ARs, transition, alteration, COD, COA
-- and c.corp_num = 'BC0002567' -- lots of filings - IA, ARs, transition, COD
-- and c.corp_num in ('BC0068889', 'BC0441359') -- test users mapping
-- and c.corp_num in ('BC0326163', 'BC0046540', 'BC0883637', 'BC0043406', 'BC0068889', 'BC0441359')
-- and c.corp_num in ('BC0472301', 'BC0649417', 'BC0808085', 'BC0803411', 'BC0756111', 'BC0511226', 'BC0833000', 'BC0343855', 'BC0149266') -- dissolution
and c.corp_type_cd in ('BC', 'C', 'ULC', 'CUL', 'CC', 'CCC', 'QA', 'QB', 'QC', 'QD', 'QE') -- TODO: update transfer script
and cs.end_event_id is null
and ((cp.processed_status is null or cp.processed_status != 'COMPLETED'))
-- and ((cp.processed_status is null or cp.processed_status != 'COMPLETED'))
and cp.processed_status is null
and cp.flow_run_id is null
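-- note: reserved corps carry a flow_run_id, so only corps with no
-- corp_processing row at all remain candidates for a new flow run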
-- and cs.state_type_cd = 'ACT'
-- order by random()
limit {batch_size}
2 changes: 2 additions & 0 deletions data-tool/scripts/colin_corps_extract_postgres_ddl
@@ -629,6 +629,8 @@ create table if not exists corp_processing
create_date timestamp with time zone,
last_modified timestamp with time zone,
last_error varchar(1000),
claimed_at timestamp with time zone,
flow_run_id uuid,
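    -- claimed_at and flow_run_id back the reserve/claim lifecycle in
    -- corp_processing_queue_service.py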
constraint unq_corp_processing
unique (corp_num, flow_name, environment)
);
