diff --git a/updates_and_signals/safe_message_handlers/src/activities.ts b/updates_and_signals/safe_message_handlers/src/activities.ts new file mode 100644 index 00000000..497fab4e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/activities.ts @@ -0,0 +1,33 @@ +import { proxyActivities } from '@temporalio/workflow'; +import { ActivityInput as AllocateNodesToJobInput } from './interfaces'; + +// Activities with TypeScript syntax and Temporal TypeScript SDK specifics +const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ + allocateNodesToJob(input: AllocateNodesToJobInput): Promise; + deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; + findBadNodes(input: FindBadNodesInput): Promise; +}>({ + startToCloseTimeout: '1 minute', +}); + +export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise { + console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`); + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation +} + +export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise { + console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`); + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation +} + +export async function findBadNodes(input: FindBadNodesInput): Promise { + await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation + const badNodes = input.nodesToCheck.filter(n => parseInt(n) % 5 === 0); + if (badNodes.length) { + console.log(`Found bad nodes: ${badNodes}`); + } else { + console.log("No new bad nodes found."); + } + return badNodes; +} + diff --git a/updates_and_signals/safe_message_handlers/src/starter.ts b/updates_and_signals/safe_message_handlers/src/starter.ts new file mode 100644 index 00000000..21a7b55e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/starter.ts @@ -0,0 +1,20 @@ +import { WorkflowClient } from '@temporalio/client'; +import { ClusterManagerWorkflow } from './workflow'; +import { doClusterLifecycle } from './utils'; + +async function main() { + const client = new WorkflowClient(); + + // Define the workflow handle + const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { + workflowId: 'cluster-management-workflow', + }); + + // Start the cluster lifecycle + await doClusterLifecycle(workflow); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/worker.ts b/updates_and_signals/safe_message_handlers/src/worker.ts new file mode 100644 index 00000000..dc3baa79 --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/worker.ts @@ -0,0 +1,17 @@ +import { Worker } from '@temporalio/worker'; +import path from 'path'; + +async function run() { + const worker = await Worker.create({ + workflowsPath: path.join(__dirname, './workflows'), + activitiesPath: path.join(__dirname, './activities'), + taskQueue: 'safe-message-handlers-task-queue', + }); + + await worker.run(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/updates_and_signals/safe_message_handlers/src/workflow.ts b/updates_and_signals/safe_message_handlers/src/workflow.ts new file mode 100644 index 00000000..ca984b7c --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflow.ts @@ -0,0 +1,102 @@ +// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals + +import { proxyActivities, defineSignal, defineQuery, setHandler, condition, sleep, defineWorkflow } from '@temporalio/workflow'; +import type { AllocateNodesToJobInput, DeallocateNodesForJobInput, FindBadNodesInput } from './interfaces'; + +// Define signals +const startClusterSignal = defineSignal('startCluster'); +const shutdownClusterSignal = defineSignal('shutdownCluster'); +const allocateNodesToJobSignal = defineSignal<[AllocateNodesToJobInput]>('allocateNodesToJob'); +const deallocateNodesForJobSignal = defineSignal<[DeallocateNodesForJobInput]>('deallocateNodesForJob'); + +// Define queries +const getClusterStatusQuery = defineQuery<{}>('getClusterStatus'); + +// Define activities +const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{ + allocateNodesToJob(input: AllocateNodesToJobInput): Promise; + deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise; + findBadNodes(input: FindBadNodesInput): Promise; +}>({ + startToCloseTimeout: '1 minute', +}); + +// Define workflow interface +export interface ClusterManagerWorkflow { + run(input: ClusterManagerWorkflowInput): Promise; +} + +// Define workflow input and result types +export interface ClusterManagerWorkflowInput { + testContinueAsNew: boolean; +} + +export interface ClusterManagerWorkflowResult { + maxAssignedNodes: number; + numCurrentlyAssignedNodes: number; + numBadNodes: number; +} + +// Workflow implementation +export const clusterManagerWorkflow: ClusterManagerWorkflow = defineWorkflow({ + async run(input: ClusterManagerWorkflowInput) { + let state = { + clusterStarted: false, + clusterShutdown: false, + nodes: {} as Record, + jobsAdded: new Set(), + maxAssignedNodes: 0, + }; + + // Signal handlers + setHandler(startClusterSignal, () => { + state.clusterStarted = true; + for (let i = 0; i < 25; i++) { + state.nodes[i.toString()] = null; + } + }); + + setHandler(shutdownClusterSignal, () => { + state.clusterShutdown = true; + }); + + setHandler(allocateNodesToJobSignal, async (input: AllocateNodesToJobInput) => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node allocation'); + } + // Allocate nodes to job logic + }); + + setHandler(deallocateNodesForJobSignal, async (input: DeallocateNodesForJobInput) => { + if (!state.clusterStarted || state.clusterShutdown) { + throw new Error('Cluster is not in a valid state for node deallocation'); + } + // Deallocate nodes from job logic + }); + + // Query handler + setHandler(getClusterStatusQuery, () => { + return { + clusterStarted: state.clusterStarted, + clusterShutdown: state.clusterShutdown, + numNodes: Object.keys(state.nodes).length, + numAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, + }; + }); + + // Main workflow logic + await condition(() => state.clusterStarted, 'Waiting for cluster to start'); + // Perform operations while cluster is active + while (!state.clusterShutdown) { + // Example: perform periodic health checks + await sleep(60000); // Sleep for 60 seconds + } + + // Return workflow result + return { + maxAssignedNodes: state.maxAssignedNodes, + numCurrentlyAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length, + numBadNodes: Object.values(state.nodes).filter(n => n === 'BAD').length, + }; + }, +}); diff --git a/updates_and_signals/safe_message_handlers/src/workflow_test.ts b/updates_and_signals/safe_message_handlers/src/workflow_test.ts new file mode 100644 index 00000000..6f95198e --- /dev/null +++ b/updates_and_signals/safe_message_handlers/src/workflow_test.ts @@ -0,0 +1,28 @@ +import { WorkflowClient } from '@temporalio/client'; +import { ClusterManagerWorkflow } from './workflow'; +import { v4 as uuidv4 } from 'uuid'; + +async function run() { + const client = new WorkflowClient(); + + // Define the workflow handle + const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, { + workflowId: `cluster-management-workflow-${uuidv4()}`, + }); + + // Test workflow functionality + await workflow.start(); + await workflow.signal.startCluster(); + await workflow.executeUpdate('allocateNodesToJob', { + numNodes: 5, + jobName: 'job1', + }); + await workflow.signal.shutdownCluster(); + const result = await workflow.result(); + console.log('Workflow result:', result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +});