Skip to content

Commit

Permalink
chore: change dispatch subscribe to run through the deserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
mabels committed Jan 30, 2025
1 parent 843fe42 commit 6328100
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/runtime/gateways/def-serde-gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class DefSerdeGateway implements SerdeGateway {
return this.gw.close(uri, sthis);
}

private subscribeFn = new Map<string, (meta: FPEnvelopeMeta) => Promise<void>>();
private subscribeFn = new Map<string, (raw: Uint8Array) => Promise<void>>();

async put<T>({ loader: { sthis }, encoder }: SerdeGatewayCtx, url: URI, env: FPEnvelope<T>): Promise<Result<void>> {
const rUint8 = await fpSerialize(sthis, env, encoder);
Expand All @@ -35,7 +35,7 @@ export class DefSerdeGateway implements SerdeGateway {
if (env.type === FPEnvelopeType.META) {
if (this.subscribeFn.has(url.toString())) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.subscribeFn.get(url.toString())!(env as FPEnvelopeMeta);
this.subscribeFn.get(url.toString())!(rUint8.Ok());
}
}
return ret;
Expand All @@ -52,23 +52,23 @@ export class DefSerdeGateway implements SerdeGateway {
url: URI,
callback: (meta: FPEnvelopeMeta) => Promise<void>,
): Promise<Result<() => void>> {
function rawCallback(raw: Uint8Array) {
return fpDeserialize<DbMetaEvent[]>(sthis, url, Result.Ok(raw), decoder).then((res) => {
if (res.isErr()) {
sthis.logger.Error().Err(res).Msg("Failed to deserialize");
return;
}
callback(res.Ok() as FPEnvelopeMeta);
});
}
if (!this.gw.subscribe) {
// memory leak possible
this.subscribeFn.set(url.toString(), callback);
this.subscribeFn.set(url.toString(), rawCallback);
return Result.Ok(() => {
this.subscribeFn.delete(url.toString());
});
}
const unreg = await this.gw.subscribe(
url,
(raw: Uint8Array) => {
fpDeserialize<DbMetaEvent[]>(sthis, url, Result.Ok(raw), decoder).then((res) => {
if (res.isErr()) return;
callback(res.Ok() as FPEnvelopeMeta);
});
},
sthis,
);
const unreg = await this.gw.subscribe(url, rawCallback, sthis);
return unreg;
}

Expand Down

0 comments on commit 6328100

Please sign in to comment.