Redis pubsub
Komine Shunta edited this page Jul 2, 2021 · 2 revisions
Code that uses pub/sub to pass information to other processes or instances and to guarantee that each receiver has finished processing it.
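import Redis from 'ioredis';

// A connection in subscriber mode cannot issue regular commands,
// so a second connection is used for publishing.
const redis = new Redis();
const pub = new Redis();

// Callbacks waiting for acknowledgements, keyed by message id.
const onSuccessHandler: { [key: string]: () => void } = {};
// Application handlers, keyed by channel name.
const onReceiveHandler: {
  [channel: string]: (data: any) => Promise<void>;
} = {};

redis.on('message', async (channel: string, message: string) => {
  if (channel.startsWith('success:')) {
    // Acknowledgement from a receiver: notify the waiting sender.
    const [, id] = channel.split(':');
    onSuccessHandler[id] && onSuccessHandler[id]();
  } else {
    // Regular message: run the handler, then acknowledge on success:<id>.
    const { data, id } = JSON.parse(message);
    onReceiveHandler[channel] && (await onReceiveHandler[channel](data));
    pub.publish(`success:${id}`, id);
  }
});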
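// Register a handler for a channel and start listening on it.
export async function onReceive(
  channel: string,
  callback: (data: any) => Promise<void>
) {
  // Register first so a message arriving right after subscribe is not
  // acknowledged without having been handled.
  onReceiveHandler[channel] = callback;
  await redis.subscribe(channel);
}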
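// Publish a message and resolve once every receiver has acknowledged it.
export async function sendMessageAndWaitSuccess(channel: string, data: any) {
  const id = Math.random().toString();
  await redis.subscribe(`success:${id}`);
  let expected = -1; // receiver count, unknown until publish() returns
  let acked = 0;
  let finish = () => {};
  const done = new Promise<void>((resolve) => {
    finish = () => {
      if (expected < 0 || acked < expected) return;
      delete onSuccessHandler[id];
      redis.unsubscribe(`success:${id}`);
      resolve();
    };
  });
  onSuccessHandler[id] = () => {
    acked++;
    finish();
  };
  // publish() resolves with the number of clients that received the message.
  expected = await pub.publish(channel, JSON.stringify({ id, data }));
  // Covers zero receivers and acknowledgements that arrived before the count was known.
  finish();
  return done;
}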
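The wait in `sendMessageAndWaitSuccess` comes down to comparing the number of acknowledgements against the receiver count returned by `publish`. Two edge cases are easy to miss: an acknowledgement that arrives before the count is known, and a publish that reaches zero receivers. The following is a minimal sketch of that bookkeeping in isolation, with no Redis dependency. `AckCounter` and its method names are hypothetical and not part of the module above.

```typescript
// Hypothetical helper (an illustration, not part of the original module):
// tracks acknowledgements for a single published message.
class AckCounter {
  // Receiver count reported by publish(); null until it is known.
  private expected: number | null = null;
  private acked = 0;

  // Record one acknowledgement; returns true once all receivers have acked.
  ack(): boolean {
    this.acked += 1;
    return this.isComplete();
  }

  // Store the receiver count; returns true if the wait is already complete.
  setExpected(count: number): boolean {
    this.expected = count;
    return this.isComplete();
  }

  // Complete only when the count is known and enough acks have been seen.
  isComplete(): boolean {
    return this.expected !== null && this.acked >= this.expected;
  }
}
```

Keeping the count as `null` until it is known avoids treating an early acknowledgement as the final one, and checking completion in `setExpected` lets a message with zero receivers finish immediately.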
Usage example
import { onReceive, sendMessageAndWaitSuccess } from './pubsubUtil';

const sleep = async (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

onReceive('hoge', async (data) => {
  const { cur, id } = data;
  const diff = new Date().getTime() - cur;
  await sleep(1000);
  console.log(process.pid, 'received', diff, id);
});

setTimeout(async () => {
  const start = new Date().getTime();
  await sendMessageAndWaitSuccess('hoge', {
    cur: new Date().getTime(),
    id: Math.random(),
  });
  console.log('Took time:', new Date().getTime() - start);
}, 3000);
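The usage example awaits the acknowledgement indefinitely, so a receiver that crashes or never answers would leave the caller waiting. One possible safeguard is to race the wait against a timer. The sketch below assumes a hypothetical helper named `withTimeout`, which is not part of `pubsubUtil`, and the error message is likewise an illustration.

```typescript
// Hypothetical helper (an assumption, not part of pubsubUtil):
// rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`no acknowledgement within ${ms} ms`)),
      ms
    );
  });
  // Whichever settles first wins; the timer is cleared in both outcomes
  // so it does not keep the process alive.
  return Promise.race([work, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (err) => {
      clearTimeout(timer);
      throw err;
    }
  );
}
```

With this in place, the call in the usage example could be wrapped as `await withTimeout(sendMessageAndWaitSuccess('hoge', payload), 5000)`, where `payload` and the 5000 ms limit are placeholders. The wrapper only stops the caller from waiting; it does not cancel the underlying subscription.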