Skip to content

Commit

Permalink
Add on notification listener support
Browse files Browse the repository at this point in the history
Change-type: minor
  • Loading branch information
joshbwlng committed Sep 2, 2024
1 parent d9b0db8 commit 7c87023
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions src/database-layer/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ export interface Database extends BaseDatabase {
) => Promise<Result>;
transaction: TransactionFn;
readTransaction: TransactionFn;
on?: (
name: 'notification',
fn: (...args: any[]) => Promise<void>,
options?: {
channel?: string;
},
) => void;
}

interface EngineParams {
Expand Down Expand Up @@ -706,9 +713,61 @@ if (maybePg != null) {
`);
}
}

// Connect and listen for notifications
async function listen(
channel: string,
fn: (...args: any[]) => Promise<void>,
) {
let listenerClient: Pg.PoolClient | null = null;

// Clean up and reconnect listener
const reconnect = () => {
try {
listenerClient?.release();
} catch (err) {
// Ignore listener client release errors
}
setTimeout(() => {
void listen(channel, fn);
}, 1000);
};

try {
listenerClient = await pool.connect();
listenerClient.on('end', reconnect);
listenerClient.on('notification', (msg) => {
if (msg.channel === channel) {
void fn(msg).catch((err) => {
console.error(`Error handling message for '${channel}':`, err);
});
}
});
await listenerClient.query(`LISTEN "${channel}"`);
} catch (err) {
console.error(
`Error setting up listener client for '${channel}':`,
err,
);
reconnect();
}
}

return {
engine: Engines.postgres,
executeSql: atomicExecuteSql,
on: async (name, fn, options) => {
if (name !== 'notification') {
throw new Error(`Unsupported listener type: ${name}`);
} else if (options?.channel == null) {
throw new Error('Missing channel option for notification listener');
} else if (options.channel.includes('"')) {
throw new Error(
`Invalid channel name for task LISTEN: ${options.channel}`,
);
}
await listen(options.channel, fn);
},
transaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await pool.connect();
const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);
Expand Down

0 comments on commit 7c87023

Please sign in to comment.