
Commit

Merge pull request #18 from PowerLoom/chore/processor_distributor_cleanup

Processor Distributor Cleanup
xadahiya authored Nov 1, 2024
2 parents 7209443 + d4bbfe5 commit 852d536
Showing 6 changed files with 4 additions and 145 deletions.
2 changes: 0 additions & 2 deletions docker-compose.yaml
@@ -26,8 +26,6 @@ services:
- MAX_STREAM_POOL_SIZE=$MAX_STREAM_POOL_SIZE
- STREAM_POOL_HEALTH_CHECK_INTERVAL=$STREAM_POOL_HEALTH_CHECK_INTERVAL
- ENABLE_CRON_RESTART_LOCAL_COLLECTOR=$ENABLE_CRON_RESTART_LOCAL_COLLECTOR
command:
bash -c "bash server_autofill.sh && bash init_processes.sh"
networks:
- custom_network
ulimits:
113 changes: 0 additions & 113 deletions snapshotter/processor_distributor.py
@@ -46,18 +46,15 @@
from snapshotter.utils.models.data_models import SnapshotterStateUpdate
from snapshotter.utils.models.message_models import EpochBase
from snapshotter.utils.models.message_models import PowerloomCalculateAggregateMessage
from snapshotter.utils.models.message_models import PowerloomProjectsUpdatedMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotBatchSubmittedMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotFinalizedMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotProcessMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotSubmittedMessage
from snapshotter.utils.models.message_models import ProcessHubCommand
from snapshotter.utils.models.settings_model import AggregateOn
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import active_status_key
from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import process_hub_core_start_timestamp
from snapshotter.utils.redis.redis_keys import project_finalized_data_zset
from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key
from snapshotter.utils.rpc import RpcHelper
@@ -110,7 +107,6 @@ def __init__(self, name, **kwargs):
_payload_commit_routing_key (str): The routing key for payload commits.
_upcoming_project_changes (defaultdict): Dictionary of upcoming project changes.
_preload_completion_conditions (defaultdict): Dictionary of preload completion conditions.
_newly_added_projects (set): Set of newly added projects.
_shutdown_initiated (bool): Flag indicating if shutdown has been initiated.
_all_preload_tasks (set): Set of all preload tasks.
_project_type_config_mapping (dict): Dictionary mapping project types to their configurations.
@@ -147,7 +143,6 @@ def __init__(self, name, **kwargs):
dict,
) # epoch ID to preloading complete event

self._newly_added_projects = set()
self._shutdown_initiated = False
self._all_preload_tasks = set()
self._project_type_config_mapping = dict()
@@ -234,33 +229,6 @@ async def _init_httpx_client(self):
transport=self._async_transport,
)

async def _send_proc_hub_respawn(self):
"""
Sends a respawn command to the process hub.
This method creates a ProcessHubCommand object with the command 'respawn',
acquires a channel from the channel pool, gets the exchange, and publishes
the command message to the exchange.
Args:
None
Returns:
None
"""
proc_hub_cmd = ProcessHubCommand(
command='respawn',
)
async with self._rmq_channel_pool.acquire() as channel:
await channel.set_qos(10)
exchange = await channel.get_exchange(
name=f'{settings.rabbitmq.setup.core.exchange}:{settings.namespace}',
)
await exchange.publish(
routing_key=f'processhub-commands:{settings.namespace}:{settings.instance_id}',
message=Message(proc_hub_cmd.json().encode('utf-8')),
)

async def _init_preloader_compute_mapping(self):
"""
Initializes the preloader compute mapping by importing the preloader module and class and
@@ -354,19 +322,6 @@ async def init_worker(self):

self._initialized = True

async def _get_proc_hub_start_time(self) -> int:
"""
Retrieves the start time of the process hub core from Redis.
Returns:
int: The start time of the process hub core, or 0 if not found.
"""
_ = await self._redis_conn.get(process_hub_core_start_timestamp())
if _:
return int(_)
else:
return 0

async def _preloader_waiter(
self,
epoch: EpochBase,
@@ -538,11 +493,6 @@ async def _epoch_release_processor(self, message: IncomingMessage):
'Unexpected message format of epoch callback',
)
return

self._newly_added_projects = self._newly_added_projects.union(
await self._enable_pending_projects_for_epoch(msg_obj.epochId),
)
self._logger.debug('Newly added projects for epoch {}: {}', msg_obj.epochId, self._newly_added_projects)
self._logger.debug('Pushing epoch release to preloader coroutine: {}', msg_obj)
current_time = time.time()
task = asyncio.create_task(
@@ -597,16 +547,10 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: EpochBase):
# Handling projects with no data sources
if project_config.projects is None:
project_id = f'{project_type}:{settings.namespace}'
if project_id.lower() in self._newly_added_projects:
genesis = True
self._newly_added_projects.remove(project_id.lower())
else:
genesis = False
process_unit = PowerloomSnapshotProcessMessage(
begin=epoch.begin,
end=epoch.end,
epochId=epoch.epochId,
genesis=genesis,
)

msg_body = Message(process_unit.json().encode('utf-8'))
@@ -625,12 +569,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: EpochBase):
for project in project_config.projects:
project_id = f'{project_type}:{project}:{settings.namespace}'
static_source_project_ids.append(project_id)
if project_id.lower() in self._newly_added_projects:
genesis = True
self._newly_added_projects.remove(project_id.lower())
else:
genesis = False

data_sources = project.split('_')
if len(data_sources) == 1:
data_source = data_sources[0]
@@ -643,7 +581,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: EpochBase):
epochId=epoch.epochId,
data_source=data_source,
primary_data_source=primary_data_source,
genesis=genesis,
)

msg_body = Message(process_unit.json().encode('utf-8'))
@@ -667,37 +604,6 @@ async def _distribute_callbacks_snapshotting(self, project_type: str, epoch: EpochBase):
f' for epoch {epoch.epochId}',
)

async def _enable_pending_projects_for_epoch(self, epoch_id) -> Set[str]:
"""
Enables pending projects for the given epoch ID and returns a set of project IDs that were allowed.
Args:
epoch_id: The epoch ID for which to enable pending projects.
Returns:
A set of project IDs that were allowed.
"""
pending_project_msgs: List[PowerloomProjectsUpdatedMessage] = self._upcoming_project_changes.pop(epoch_id, [])
if not pending_project_msgs:
return set()
else:
for msg_obj in pending_project_msgs:
# Update projects list
for project_type, project_config in self._project_type_config_mapping.items():
projects_set = set(project_config.projects)
if project_type in msg_obj.projectId:
if project_config.projects is None:
continue
data_source = msg_obj.projectId.split(':')[-2]
if msg_obj.allowed:
projects_set.add(data_source)
else:
if data_source in project_config.projects:
projects_set.discard(data_source)
project_config.projects = list(projects_set)

return set([msg.projectId.lower() for msg in pending_project_msgs if msg.allowed])

def _fetch_base_project_list(self, project_type: str) -> List[str]:
"""
Fetches the base project list for the given project type.
@@ -744,25 +650,6 @@ def _gen_projects_to_wait_for(self, project_type: str) -> List[str]:
projects_to_wait_for.update(self._gen_projects_to_wait_for(project_type))
return projects_to_wait_for

async def _update_all_projects(self, message: IncomingMessage):
"""
Updates all projects based on the incoming message.
Args:
message (IncomingMessage): The incoming message containing the project updates.
"""

event_type = message.routing_key.split('.')[-1]

if event_type == 'ProjectsUpdated':
msg_obj: PowerloomProjectsUpdatedMessage = (
PowerloomProjectsUpdatedMessage.parse_raw(message.body)
)
else:
return

self._upcoming_project_changes[msg_obj.enableEpochId].append(msg_obj)

# NOTE: Considering SequencerFinalized state as Finalized for now
# data is overwritten upon receiving SnapshotFinalized message for the project
# TODO: Create separate states for SequencerFinalized and SnapshotFinalized
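
Note on the publishing pattern: the helpers removed above (e.g. _send_proc_hub_respawn) all publish through the same aio_pika sequence: acquire a channel from the pool, set QoS, resolve the namespaced exchange, and publish the pydantic model serialized as JSON. A condensed sketch of that sequence; the exchange and routing-key names are illustrative stand-ins for the values built from settings:

    import aio_pika

    async def publish_json(rmq_channel_pool, payload_json: bytes) -> None:
        # Same call sequence as the removed _send_proc_hub_respawn:
        # pooled channel -> namespaced exchange -> publish.
        async with rmq_channel_pool.acquire() as channel:
            await channel.set_qos(10)
            exchange = await channel.get_exchange(
                name='powerloom-core:my-namespace',  # illustrative exchange name
            )
            await exchange.publish(
                routing_key='processhub-commands:my-namespace:my-instance',  # illustrative
                message=aio_pika.Message(payload_json),
            )
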
4 changes: 2 additions & 2 deletions snapshotter/utils/aggregation_worker.py
@@ -120,7 +120,7 @@ def _gen_project_id(self, task_type, epoch):
else:
raise ValueError(f'Unknown project type {task_type}')

async def _processor_task(
async def _process_task(
self,
msg_obj: Union[PowerloomSnapshotSubmittedMessage, PowerloomCalculateAggregateMessage],
task_type: str,
@@ -336,7 +336,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage):
'Unknown task type {}', task_type,
)
return
await self._create_tracked_task(self._processor_task(msg_obj=msg_obj, task_type=task_type))
await self._create_tracked_task(self._process_task(msg_obj=msg_obj, task_type=task_type))

async def _init_project_calculation_mapping(self):
"""
4 changes: 2 additions & 2 deletions snapshotter/utils/delegate_worker.py
@@ -51,7 +51,7 @@ def __init__(self, name, **kwargs):

self._q, self._rmq_routing = get_delegate_worker_request_queue_routing_key()

async def _processor_task(self, msg_obj: PowerloomDelegateWorkerRequestMessage):
async def _process_task(self, msg_obj: PowerloomDelegateWorkerRequestMessage):
"""
Process a delegate task for the given message object.
@@ -216,7 +216,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage):
return
# Start processing the task asynchronously
current_time = time.time()
task = asyncio.create_task(self._processor_task(msg_obj=msg_obj))
task = asyncio.create_task(self._process_task(msg_obj=msg_obj))
self._active_tasks.add((current_time, task))
task.add_done_callback(lambda _: self._active_tasks.discard((current_time, task)))

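
Both worker files carry the same rename (_processor_task becomes _process_task); the dispatch code around it, visible in the delegate worker hunk above, keeps a set of in-flight tasks so finished ones can be reaped. A self-contained sketch of that bookkeeping, using only the standard library (dispatch must be called from inside a running event loop):

    import asyncio
    import time

    active_tasks: set = set()

    async def _process_task(msg_obj: dict) -> None:
        # Stand-in for the worker's real processing coroutine.
        await asyncio.sleep(0)

    def dispatch(msg_obj: dict) -> None:
        # Tag each task with its start time, hold it in the set while it
        # runs, and discard it from the set once it completes.
        current_time = time.time()
        task = asyncio.create_task(_process_task(msg_obj))
        active_tasks.add((current_time, task))
        task.add_done_callback(lambda _: active_tasks.discard((current_time, task)))
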
16 changes: 0 additions & 16 deletions snapshotter/utils/models/message_models.py
@@ -49,7 +49,6 @@ class PowerloomSnapshotProcessMessage(EpochBase):
"""Model for Powerloom snapshot process messages."""
data_source: Optional[str] = None
primary_data_source: Optional[str] = None
genesis: Optional[bool] = False
bulk_mode: Optional[bool] = False


@@ -70,13 +69,6 @@ class PowerloomSnapshotBatchSubmittedMessage(BaseModel):
transactionHash: str


class PowerloomProjectsUpdatedMessage(BaseModel):
"""Model for Powerloom project update messages."""
projectId: str
allowed: bool
enableEpochId: int


class PowerloomSnapshotSubmittedMessage(BaseModel):
"""Model for Powerloom snapshot submission messages."""
snapshotCid: str
@@ -112,14 +104,6 @@ class PowerloomCalculateAggregateMessage(BaseModel):
timestamp: int


class ProcessHubCommand(BaseModel):
"""Model for process hub commands."""
command: str
pid: Optional[int] = None
proc_str_id: Optional[str] = None
init_kwargs: Optional[Dict] = dict()


class AggregateBase(BaseModel):
"""Base model for aggregate-related data."""
epochId: int
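
With the genesis field gone, PowerloomSnapshotProcessMessage round-trips through the same pydantic serialization the workers already use: .json().encode('utf-8') on the publish side, .parse_raw() on the consume side. A minimal sketch with illustrative field values:

    from snapshotter.utils.models.message_models import PowerloomSnapshotProcessMessage

    raw = PowerloomSnapshotProcessMessage(
        begin=100, end=109, epochId=7,  # illustrative epoch bounds
    ).json().encode('utf-8')

    decoded = PowerloomSnapshotProcessMessage.parse_raw(raw)
    assert decoded.epochId == 7
    assert decoded.bulk_mode is False  # optional fields keep their defaults
    # decoded.genesis would now raise AttributeError: the field no longer exists.
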
10 changes: 0 additions & 10 deletions snapshotter/utils/redis/redis_keys.py
@@ -273,16 +273,6 @@ def submitted_unfinalized_snapshot_cids(project_id):
return f'projectID:{project_id}:unfinalizedSnapshots'


def process_hub_core_start_timestamp():
"""
Generate Redis key for process hub core start timestamp.
Returns:
str: Redis key for the process hub core start timestamp.
"""
return f'processHubCoreStartTimestamp:{settings.namespace}'


def callback_last_sent_by_issue(issue_type):
"""
Generate Redis key for callback last sent timestamp. Stores the last sent timestamp for each issueType.
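
Each helper in redis_keys.py formats one namespaced key string, and callers treat a missing key as a zero default, as the deleted _get_proc_hub_start_time did. A sketch of that read pattern, assuming an async Redis client like the _redis_conn used elsewhere in the snapshotter:

    from typing import Optional

    async def get_timestamp(redis_conn, key: str) -> int:
        # Mirrors the deleted _get_proc_hub_start_time: absent key -> 0.
        val: Optional[bytes] = await redis_conn.get(key)
        return int(val) if val else 0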
