From 880bd9db778bbd62d2402676a435c71d8eb13f82 Mon Sep 17 00:00:00 2001 From: anomit ghosh Date: Tue, 29 Oct 2024 18:42:22 +0530 Subject: [PATCH 1/3] chore: remove command entry point on local collector image --- docker-compose.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 824653f..3791af3 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -26,8 +26,6 @@ services: - MAX_STREAM_POOL_SIZE=$MAX_STREAM_POOL_SIZE - STREAM_POOL_HEALTH_CHECK_INTERVAL=$STREAM_POOL_HEALTH_CHECK_INTERVAL - ENABLE_CRON_RESTART_LOCAL_COLLECTOR=$ENABLE_CRON_RESTART_LOCAL_COLLECTOR - command: - bash -c "bash server_autofill.sh && bash init_processes.sh" networks: - custom_network ulimits: From 9a8989382c4de4a6510e6a9dfb9ab8a80c427e6b Mon Sep 17 00:00:00 2001 From: Akshay Dahiya Date: Fri, 1 Nov 2024 16:39:36 +0530 Subject: [PATCH 2/3] chore: cleanup processor distributor and related files --- snapshotter/processor_distributor.py | 113 --------------------- snapshotter/utils/models/message_models.py | 16 --- snapshotter/utils/redis/redis_keys.py | 10 -- 3 files changed, 139 deletions(-) diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index c57a5f9..9188878 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -46,18 +46,15 @@ from snapshotter.utils.models.data_models import SnapshotterStateUpdate from snapshotter.utils.models.message_models import EpochBase from snapshotter.utils.models.message_models import PowerloomCalculateAggregateMessage -from snapshotter.utils.models.message_models import PowerloomProjectsUpdatedMessage from snapshotter.utils.models.message_models import PowerloomSnapshotBatchSubmittedMessage from snapshotter.utils.models.message_models import PowerloomSnapshotFinalizedMessage from snapshotter.utils.models.message_models import PowerloomSnapshotProcessMessage from snapshotter.utils.models.message_models import PowerloomSnapshotSubmittedMessage -from snapshotter.utils.models.message_models import ProcessHubCommand from snapshotter.utils.models.settings_model import AggregateOn from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.redis.redis_keys import active_status_key from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping -from snapshotter.utils.redis.redis_keys import process_hub_core_start_timestamp from snapshotter.utils.redis.redis_keys import project_finalized_data_zset from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key from snapshotter.utils.rpc import RpcHelper @@ -110,7 +107,6 @@ def __init__(self, name, **kwargs): _payload_commit_routing_key (str): The routing key for payload commits. _upcoming_project_changes (defaultdict): Dictionary of upcoming project changes. _preload_completion_conditions (defaultdict): Dictionary of preload completion conditions. - _newly_added_projects (set): Set of newly added projects. _shutdown_initiated (bool): Flag indicating if shutdown has been initiated. _all_preload_tasks (set): Set of all preload tasks. _project_type_config_mapping (dict): Dictionary mapping project types to their configurations. @@ -147,7 +143,6 @@ def __init__(self, name, **kwargs): dict, ) # epoch ID to preloading complete event - self._newly_added_projects = set() self._shutdown_initiated = False self._all_preload_tasks = set() self._project_type_config_mapping = dict() @@ -234,33 +229,6 @@ async def _init_httpx_client(self): transport=self._async_transport, ) - async def _send_proc_hub_respawn(self): - """ - Sends a respawn command to the process hub. - - This method creates a ProcessHubCommand object with the command 'respawn', - acquires a channel from the channel pool, gets the exchange, and publishes - the command message to the exchange. - - Args: - None - - Returns: - None - """ - proc_hub_cmd = ProcessHubCommand( - command='respawn', - ) - async with self._rmq_channel_pool.acquire() as channel: - await channel.set_qos(10) - exchange = await channel.get_exchange( - name=f'{settings.rabbitmq.setup.core.exchange}:{settings.namespace}', - ) - await exchange.publish( - routing_key=f'processhub-commands:{settings.namespace}:{settings.instance_id}', - message=Message(proc_hub_cmd.json().encode('utf-8')), - ) - async def _init_preloader_compute_mapping(self): """ Initializes the preloader compute mapping by importing the preloader module and class and @@ -354,19 +322,6 @@ async def init_worker(self): self._initialized = True - async def _get_proc_hub_start_time(self) -> int: - """ - Retrieves the start time of the process hub core from Redis. - - Returns: - int: The start time of the process hub core, or 0 if not found. - """ - _ = await self._redis_conn.get(process_hub_core_start_timestamp()) - if _: - return int(_) - else: - return 0 - async def _preloader_waiter( self, epoch: EpochBase, @@ -538,11 +493,6 @@ async def _epoch_release_processor(self, message: IncomingMessage): 'Unexpected message format of epoch callback', ) return - - self._newly_added_projects = self._newly_added_projects.union( - await self._enable_pending_projects_for_epoch(msg_obj.epochId), - ) - self._logger.debug('Newly added projects for epoch {}: {}', msg_obj.epochId, self._newly_added_projects) self._logger.debug('Pushing epoch release to preloader coroutine: {}', msg_obj) current_time = time.time() task = asyncio.create_task( @@ -597,16 +547,10 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: Epo # Handling projects with no data sources if project_config.projects is None: project_id = f'{project_type}:{settings.namespace}' - if project_id.lower() in self._newly_added_projects: - genesis = True - self._newly_added_projects.remove(project_id.lower()) - else: - genesis = False process_unit = PowerloomSnapshotProcessMessage( begin=epoch.begin, end=epoch.end, epochId=epoch.epochId, - genesis=genesis, ) msg_body = Message(process_unit.json().encode('utf-8')) @@ -625,12 +569,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: Epo for project in project_config.projects: project_id = f'{project_type}:{project}:{settings.namespace}' static_source_project_ids.append(project_id) - if project_id.lower() in self._newly_added_projects: - genesis = True - self._newly_added_projects.remove(project_id.lower()) - else: - genesis = False - data_sources = project.split('_') if len(data_sources) == 1: data_source = data_sources[0] @@ -643,7 +581,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: Epo epochId=epoch.epochId, data_source=data_source, primary_data_source=primary_data_source, - genesis=genesis, ) msg_body = Message(process_unit.json().encode('utf-8')) @@ -667,37 +604,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: Epo f' for epoch {epoch.epochId}', ) - async def _enable_pending_projects_for_epoch(self, epoch_id) -> Set[str]: - """ - Enables pending projects for the given epoch ID and returns a set of project IDs that were allowed. - - Args: - epoch_id: The epoch ID for which to enable pending projects. - - Returns: - A set of project IDs that were allowed. - """ - pending_project_msgs: List[PowerloomProjectsUpdatedMessage] = self._upcoming_project_changes.pop(epoch_id, []) - if not pending_project_msgs: - return set() - else: - for msg_obj in pending_project_msgs: - # Update projects list - for project_type, project_config in self._project_type_config_mapping.items(): - projects_set = set(project_config.projects) - if project_type in msg_obj.projectId: - if project_config.projects is None: - continue - data_source = msg_obj.projectId.split(':')[-2] - if msg_obj.allowed: - projects_set.add(data_source) - else: - if data_source in project_config.projects: - projects_set.discard(data_source) - project_config.projects = list(projects_set) - - return set([msg.projectId.lower() for msg in pending_project_msgs if msg.allowed]) - def _fetch_base_project_list(self, project_type: str) -> List[str]: """ Fetches the base project list for the given project type. @@ -744,25 +650,6 @@ def _gen_projects_to_wait_for(self, project_type: str) -> List[str]: projects_to_wait_for.update(self._gen_projects_to_wait_for(project_type)) return projects_to_wait_for - async def _update_all_projects(self, message: IncomingMessage): - """ - Updates all projects based on the incoming message. - - Args: - message (IncomingMessage): The incoming message containing the project updates. - """ - - event_type = message.routing_key.split('.')[-1] - - if event_type == 'ProjectsUpdated': - msg_obj: PowerloomProjectsUpdatedMessage = ( - PowerloomProjectsUpdatedMessage.parse_raw(message.body) - ) - else: - return - - self._upcoming_project_changes[msg_obj.enableEpochId].append(msg_obj) - # NOTE: Considering SequencerFinalized state as Finalized for now # data data is overwritten upon receiving SnapshotFinalized message for the project # TODO: Create separate states for SequencerFinalized and SnapshotFinalized diff --git a/snapshotter/utils/models/message_models.py b/snapshotter/utils/models/message_models.py index c768198..d555c0b 100644 --- a/snapshotter/utils/models/message_models.py +++ b/snapshotter/utils/models/message_models.py @@ -49,7 +49,6 @@ class PowerloomSnapshotProcessMessage(EpochBase): """Model for Powerloom snapshot process messages.""" data_source: Optional[str] = None primary_data_source: Optional[str] = None - genesis: Optional[bool] = False bulk_mode: Optional[bool] = False @@ -70,13 +69,6 @@ class PowerloomSnapshotBatchSubmittedMessage(BaseModel): transactionHash: str -class PowerloomProjectsUpdatedMessage(BaseModel): - """Model for Powerloom project update messages.""" - projectId: str - allowed: bool - enableEpochId: int - - class PowerloomSnapshotSubmittedMessage(BaseModel): """Model for Powerloom snapshot submission messages.""" snapshotCid: str @@ -112,14 +104,6 @@ class PowerloomCalculateAggregateMessage(BaseModel): timestamp: int -class ProcessHubCommand(BaseModel): - """Model for process hub commands.""" - command: str - pid: Optional[int] = None - proc_str_id: Optional[str] = None - init_kwargs: Optional[Dict] = dict() - - class AggregateBase(BaseModel): """Base model for aggregate-related data.""" epochId: int diff --git a/snapshotter/utils/redis/redis_keys.py b/snapshotter/utils/redis/redis_keys.py index 451249c..5358f54 100644 --- a/snapshotter/utils/redis/redis_keys.py +++ b/snapshotter/utils/redis/redis_keys.py @@ -273,16 +273,6 @@ def submitted_unfinalized_snapshot_cids(project_id): return f'projectID:{project_id}:unfinalizedSnapshots' -def process_hub_core_start_timestamp(): - """ - Generate Redis key for process hub core start timestamp. - - Returns: - str: Redis key for the process hub core start timestamp. - """ - return f'processHubCoreStartTimestamp:{settings.namespace}' - - def callback_last_sent_by_issue(issue_type): """ Generate Redis key for callback last sent timestamp. Stores the last sent timestamp for each issueType. From d4bbfe53825267d63eb2f08659812470f6f93350 Mon Sep 17 00:00:00 2001 From: Akshay Dahiya Date: Fri, 1 Nov 2024 16:47:39 +0530 Subject: [PATCH 3/3] fix: naming `processor_task` > `process_task` --- snapshotter/utils/aggregation_worker.py | 4 ++-- snapshotter/utils/delegate_worker.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/snapshotter/utils/aggregation_worker.py b/snapshotter/utils/aggregation_worker.py index fc5cbd2..10a3f46 100644 --- a/snapshotter/utils/aggregation_worker.py +++ b/snapshotter/utils/aggregation_worker.py @@ -120,7 +120,7 @@ def _gen_project_id(self, task_type, epoch): else: raise ValueError(f'Unknown project type {task_type}') - async def _processor_task( + async def _process_task( self, msg_obj: Union[PowerloomSnapshotSubmittedMessage, PowerloomCalculateAggregateMessage], task_type: str, @@ -336,7 +336,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage): 'Unknown task type {}', task_type, ) return - await self._create_tracked_task(self._processor_task(msg_obj=msg_obj, task_type=task_type)) + await self._create_tracked_task(self._process_task(msg_obj=msg_obj, task_type=task_type)) async def _init_project_calculation_mapping(self): """ diff --git a/snapshotter/utils/delegate_worker.py b/snapshotter/utils/delegate_worker.py index 1570fda..20e0e55 100644 --- a/snapshotter/utils/delegate_worker.py +++ b/snapshotter/utils/delegate_worker.py @@ -51,7 +51,7 @@ def __init__(self, name, **kwargs): self._q, self._rmq_routing = get_delegate_worker_request_queue_routing_key() - async def _processor_task(self, msg_obj: PowerloomDelegateWorkerRequestMessage): + async def _process_task(self, msg_obj: PowerloomDelegateWorkerRequestMessage): """ Process a delegate task for the given message object. @@ -216,7 +216,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage): return # Start processing the task asynchronously current_time = time.time() - task = asyncio.create_task(self._processor_task(msg_obj=msg_obj)) + task = asyncio.create_task(self._process_task(msg_obj=msg_obj)) self._active_tasks.add((current_time, task)) task.add_done_callback(lambda _: self._active_tasks.discard((current_time, task)))