Skip to content

Commit

Permalink
patch: refactor api to use a single topic
Browse files Browse the repository at this point in the history
  • Loading branch information
aethernet committed Apr 22, 2024
1 parent b1d2bda commit 6582260
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 65 deletions.
56 changes: 33 additions & 23 deletions lib/gui/app/modules/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,35 +128,45 @@ function startApiAndSpawnChild({
return new Promise((resolve, reject) => {
ipc.serve();

// log is special message which brings back the logs from the child process and prints them to the console
ipc.server.on('log', (message: string) => {
console.log(message);
// parse and route messages
const messagesHandler: any = {
log: (message: any) => {
console.log(message);
},

error: (error: any) => {
terminateServer(ipc.server);
const errorObject = errors.fromJSON(error);
reject(errorObject);
},

// once api is ready (means child process is connected) we pass the emit and terminate function to the caller
ready: (_: any, socket: any) => {
const emit = (type: string, payload: any) => {
ipc.server.emit(socket, 'message', { type, payload });
};
resolve({
emit,
terminateServer: () => terminateServer(ipc.server),
registerHandler,
});
},
};

ipc.server.on('message', (data: any, socket: any) => {
const message = messagesHandler[data.type];
if (message) {
message(data.payload, socket);
} else {
throw new Error(`Unknown message type: ${data.type}`);
}
});

// api to register more handlers with callbacks
const registerHandler = (event: string, handler: any) => {
ipc.server.on(event, handler);
messagesHandler[event] = handler;
};

// once api is ready (means child process is connected) we pass the emit and terminate function to the caller
ipc.server.on('ready', (_: any, socket) => {
const emit = (channel: string, data: any) => {
ipc.server.emit(socket, channel, data);
};
resolve({
emit,
terminateServer: () => terminateServer(ipc.server),
registerHandler,
});
});

// on api error we terminate
ipc.server.on('error', (error: any) => {
terminateServer(ipc.server);
const errorObject = errors.fromJSON(error);
reject(errorObject);
});

// when the api is started we spawn the child process
ipc.server.on('start', async () => {
try {
Expand Down
99 changes: 57 additions & 42 deletions lib/util/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ ipc.config.stopRetrying = 0;
const DISCONNECT_DELAY = 100;
const IPC_SERVER_ID = process.env.IPC_SERVER_ID as string;

console.log('starting ');
if (!IPC_SERVER_ID) {
console.log('IPC_SERVER_ID is not defined, exiting');
}

/**
* @summary Send a message to the IPC server
*/
function emit(channel: string, message?: any) {
ipc.of[IPC_SERVER_ID].emit(channel, message);
function emit(type: string, payload?: any) {
ipc.of[IPC_SERVER_ID].emit('message', { type, payload });
}

/**
Expand Down Expand Up @@ -129,49 +134,59 @@ ipc.connectTo(IPC_SERVER_ID, () => {
await terminate(SUCCESS);
});

ipc.of[IPC_SERVER_ID].on('sourceMetadata', async (params) => {
const { selected, SourceType, auth } = JSON.parse(params);
try {
const sourceMatadata = await getSourceMetadata(
selected,
SourceType,
auth,
);
emitSourceMetadata(sourceMatadata);
} catch (error: any) {
emitFail(error);
const messagesHandler: any = {
scan: () => {
startScanning();
},

write: async (options: WriteOptions) => {
// Remove leftover tmp files older than 1 hour
cleanup(Date.now() - 60 * 60 * 1000);

let exitCode = SUCCESS;

ipc.of[IPC_SERVER_ID].on('cancel', () => onAbort(exitCode));

ipc.of[IPC_SERVER_ID].on('skip', () => onSkip(exitCode));

const results = await write(options);

if (results.errors.length > 0) {
results.errors = results.errors.map((error: any) => {
return toJSON(error);
});
exitCode = GENERAL_ERROR;
}

emit('done', { results });
await delay(DISCONNECT_DELAY);
await terminate(exitCode);
},

sourceMetadata: async (params: any) => {
const { selected, SourceType, auth } = JSON.parse(params);
try {
const sourceMatadata = await getSourceMetadata(
selected,
SourceType,
auth,
);
emitSourceMetadata(sourceMatadata);
} catch (error: any) {
emitFail(error);
}
},
};

ipc.of[IPC_SERVER_ID].on('message', async (data: any) => {
const message = messagesHandler[data.type];
if (message) {
await message(data.payload);
} else {
throw new Error(`Unknown message type: ${data.type}`);
}
});

ipc.of[IPC_SERVER_ID].on('scan', async () => {
startScanning();
});

// write handler
ipc.of[IPC_SERVER_ID].on('write', async (options: WriteOptions) => {
// Remove leftover tmp files older than 1 hour
cleanup(Date.now() - 60 * 60 * 1000);

let exitCode = SUCCESS;

ipc.of[IPC_SERVER_ID].on('cancel', () => onAbort(exitCode));

ipc.of[IPC_SERVER_ID].on('skip', () => onSkip(exitCode));

const results = await write(options);

if (results.errors.length > 0) {
results.errors = results.errors.map((error: any) => {
return toJSON(error);
});
exitCode = GENERAL_ERROR;
}

emit('done', { results });
await delay(DISCONNECT_DELAY);
await terminate(exitCode);
});

ipc.of[IPC_SERVER_ID].on('connect', () => {
log(
`Successfully connected to IPC server: ${IPC_SERVER_ID}, socket root ${ipc.config.socketRoot}`,
Expand Down

0 comments on commit 6582260

Please sign in to comment.