Skip to content

Commit

Permalink
refactor(transmit): refactoring syncing through transport
Browse files Browse the repository at this point in the history
  • Loading branch information
RomainLanz committed Sep 25, 2023
1 parent 80f08cc commit 02fad53
Showing 1 changed file with 35 additions and 22 deletions.
57 changes: 35 additions & 22 deletions src/transmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
* file that was distributed with this source code.
*/

import { randomUUID } from 'node:crypto'
import Emittery from 'emittery'
import { Stream } from './stream.js'
import { StorageBag } from './storage_bag.js'
Expand All @@ -24,11 +23,6 @@ interface TransmitLifecycleHooks {
}

export class Transmit extends Emittery<TransmitLifecycleHooks> {
/**
* The unique id for the transmit instance.
*/
#id: string

/**
* The storage bag instance to store all the streams.
*/
Expand All @@ -45,24 +39,30 @@ export class Transmit extends Emittery<TransmitLifecycleHooks> {
#secureChannelCallbacks: Map<string, (ctx: HttpContext, params?: any) => Promise<boolean>> =
new Map()

/**
* The transport provider to synchronize messages and subscriptions
* across multiple instance.
*/
#transport: Transport | null

/**
* The config for the transmit instance.
*/
#config: TransmitConfig

constructor(config: TransmitConfig, transport: Transport | null) {
super()

this.#id = randomUUID()
this.#config = config
this.#storage = new StorageBag()
this.#secureChannelStore = new SecureChannelStore()
this.#transport = transport

// @ts-ignore
void this.#transport?.subscribe(this.#config.transport.channel, (message) => {
const { channel, payload, from } = JSON.parse(message)
const { channel, payload } = JSON.parse(message)

void this.broadcast(channel, payload, true, from)
void this.#broadcastLocally(channel, payload)
})
}

Expand Down Expand Up @@ -139,25 +139,38 @@ export class Transmit extends Emittery<TransmitLifecycleHooks> {
return this.#storage.removeChannelFromStream(uid, channel)
}

broadcast(channel: string, payload: Record<string, unknown>, internal = false, from?: string) {
if (from === this.#id) {
return
}

#broadcastLocally(
channel: string,
payload: Record<string, unknown>,
senderUid?: string | string[]
) {
const subscribers = this.#storage.findByChannel(channel)

for (const subscriber of subscribers) {
if (
Array.isArray(senderUid)
? senderUid.includes(subscriber.getUid())
: senderUid === subscriber.getUid()
) {
continue
}

subscriber.writeMessage({ data: { channel, payload } })
}
}

if (!internal) {
// @ts-ignore
void this.#transport?.send(this.#config.transport.channel, {
channel,
payload,
from: this.#id,
})
}
broadcastExcept(channel: string, payload: Record<string, unknown>, senderUid: string | string[]) {
return this.#broadcastLocally(channel, payload, senderUid)
}

broadcast(channel: string, payload: Record<string, unknown>) {
this.#broadcastLocally(channel, payload)

// @ts-ignore
void this.#transport?.send(this.#config.transport.channel, {
channel,
payload,
})

void this.emit('broadcast', { channel, payload })
}
Expand Down

0 comments on commit 02fad53

Please sign in to comment.