Skip to content

Redis pubsub

Komine Shunta edited this page Jul 2, 2021 · 2 revisions

pub/subを利用して他のプロセス・インスタンスに情報を伝え、相手が何らかの処理をしたことを保証するためのコード

import Redis from 'ioredis';

const redis = new Redis();
const pub = new Redis();

const onSuccessHandler: { [key: string]: () => void } = {};
const onReceiveHandler: {
    [channel: string]: (data: any) => Promise<void>;
} = {};
redis.on('message', async (channel: string, message: string) => {
    if (channel.startsWith('success')) {
        const [_, id] = channel.split(':');
        onSuccessHandler[id] && onSuccessHandler[id]();
    } else {
        const { data, id } = JSON.parse(message);
        onReceiveHandler[channel] && (await onReceiveHandler[channel](data));
        pub.publish(`success:${id}`, id);
    }
});

export async function onReceive(
    channel: string,
    callback: (data: any) => Promise<void>
) {
    await redis.subscribe(channel);
    onReceiveHandler[channel] = callback;
}

export async function sendMessageAndWaitSuccess(channel: string, data: any) {
    const id = Math.random().toString();
    await redis.subscribe(`success:${id}`);
    let num = -1;
    const ret = new Promise<void>((resolve) => {
        onSuccessHandler[id] = () => {
            if (--num <= 0) {
                redis.unsubscribe(`success:${id}`);
                resolve();
            }
        };
    });
    num = await pub.publish(channel, JSON.stringify({ id, data }));
    return ret;
}

使用例

import { onReceive, sendMessageAndWaitSuccess } from './pubsubUtil';
const sleep = async (ms: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

onReceive('hoge', async (data) => {
    const { cur, id } = data;
    const diff = new Date().getTime() - cur;
    await sleep(1000);
    console.log(process.pid, 'received', diff, id);
});
setTimeout(async () => {
    const start = new Date().getTime();
    await sendMessageAndWaitSuccess('hoge', {
        cur: new Date().getTime(),
        id: Math.random(),
    });
    console.log('Took time:', new Date().getTime() - start);
}, 3000);