Skip to content

Commit

Permalink
provider / socket v3
Browse files Browse the repository at this point in the history
  • Loading branch information
janthurau committed Sep 2, 2024
1 parent 524f2b6 commit 6ab93ef
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 110 deletions.
209 changes: 104 additions & 105 deletions packages/provider/src/HocuspocusProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { awarenessStatesToArray } from '@hocuspocus/common'
import * as bc from 'lib0/broadcastchannel'
import * as mutex from 'lib0/mutex'
import type { CloseEvent, Event, MessageEvent } from 'ws'
import { Awareness, removeAwarenessStates } from 'y-protocols/awareness'
Expand All @@ -15,14 +14,11 @@ import { MessageSender } from './MessageSender.js'
import { AuthenticationMessage } from './OutgoingMessages/AuthenticationMessage.js'
import { AwarenessMessage } from './OutgoingMessages/AwarenessMessage.js'
import { CloseMessage } from './OutgoingMessages/CloseMessage.js'
import { QueryAwarenessMessage } from './OutgoingMessages/QueryAwarenessMessage.js'
import { StatelessMessage } from './OutgoingMessages/StatelessMessage.js'
import { SyncStepOneMessage } from './OutgoingMessages/SyncStepOneMessage.js'
import { SyncStepTwoMessage } from './OutgoingMessages/SyncStepTwoMessage.js'
import { UpdateMessage } from './OutgoingMessages/UpdateMessage.js'
import {
ConstructableOutgoingMessage,
WebSocketStatus,
onAuthenticationFailedParameters,
onAwarenessChangeParameters,
onAwarenessUpdateParameters,
Expand Down Expand Up @@ -153,7 +149,7 @@ export class HocuspocusProvider extends EventEmitter {

unsyncedChanges = 0

status = WebSocketStatus.Disconnected
// status = WebSocketStatus.Disconnected

isAuthenticated = false

Expand All @@ -165,7 +161,7 @@ export class HocuspocusProvider extends EventEmitter {
forceSync: null,
}

isConnected = true
// isConnected = true

constructor(configuration: HocuspocusProviderConfiguration) {
super()
Expand Down Expand Up @@ -196,7 +192,7 @@ export class HocuspocusProvider extends EventEmitter {
this.configuration.websocketProvider.on('close', this.configuration.onClose)
this.configuration.websocketProvider.on('close', this.forwardClose)

this.configuration.websocketProvider.on('status', this.boundOnStatus)
// this.configuration.websocketProvider.on('status', this.boundOnStatus)

this.configuration.websocketProvider.on('disconnect', this.configuration.onDisconnect)
this.configuration.websocketProvider.on('disconnect', this.forwardDisconnect)
Expand Down Expand Up @@ -229,15 +225,15 @@ export class HocuspocusProvider extends EventEmitter {
this.configuration.websocketProvider.attach(this)
}

boundBroadcastChannelSubscriber = this.broadcastChannelSubscriber.bind(this)
// boundBroadcastChannelSubscriber = this.broadcastChannelSubscriber.bind(this)

boundPageHide = this.pageHide.bind(this)

boundOnOpen = this.onOpen.bind(this)

boundOnClose = this.onClose.bind(this)

boundOnStatus = this.onStatus.bind(this)
// boundOnStatus = this.onStatus.bind(this)

forwardConnect = (e: any) => this.emit('connect', e)

Expand All @@ -249,12 +245,12 @@ export class HocuspocusProvider extends EventEmitter {

forwardDestroy = (e: any) => this.emit('destroy', e)

public onStatus({ status } : onStatusParameters) {
this.status = status

this.configuration.onStatus({ status })
this.emit('status', { status })
}
// public onStatus({ status } : onStatusParameters) {
// this.status = status
//
// this.configuration.onStatus({ status })
// this.emit('status', { status })
// }

public setConfiguration(configuration: Partial<HocuspocusProviderConfiguration> = {}): void {
if (!configuration.websocketProvider && (configuration as CompleteHocuspocusProviderWebsocketConfiguration).url) {
Expand Down Expand Up @@ -373,29 +369,31 @@ export class HocuspocusProvider extends EventEmitter {

// not needed, but provides backward compatibility with e.g. lexical/yjs
async connect() {
if (this.configuration.broadcast) {
this.subscribeToBroadcastChannel()
}
console.warn('HocuspocusProvider::connect() is deprecated and does not do anything. Please connect/disconnect on the websocketProvider, or attach/deattach providers.')

this.configuration.websocketProvider.shouldConnect = true

return this.configuration.websocketProvider.attach(this)
// if (this.configuration.broadcast) {
// this.subscribeToBroadcastChannel()
// }
//
// return this.configuration.websocketProvider.connect()
}

disconnect() {
this.disconnectBroadcastChannel()
this.configuration.websocketProvider.detach(this)
this.isConnected = false

if (!this.configuration.preserveConnection) {
this.configuration.websocketProvider.disconnect()
}
console.warn('HocuspocusProvider::disconnect() is deprecated and does not do anything. Please connect/disconnect on the websocketProvider, or attach/deattach providers.')

// this.disconnectBroadcastChannel()
// this.configuration.websocketProvider.detach(this)
// this.isConnected = false
//
// if (!this.configuration.preserveConnection) {
// this.configuration.websocketProvider.disconnect()
// }
//
}

async onOpen(event: Event) {
this.isAuthenticated = false
this.isConnected = true
// this.isConnected = true

this.emit('open', { event })

Expand Down Expand Up @@ -441,13 +439,13 @@ export class HocuspocusProvider extends EventEmitter {
}

send(message: ConstructableOutgoingMessage, args: any, broadcast = false) {
if (!this.isConnected) {
return
}
// if (!this.isConnected) {
// return
// }

if (broadcast) {
this.mux(() => { this.broadcast(message, args) })
}
// if (broadcast) {
// this.mux(() => { this.broadcast(message, args) })
// }

const messageSender = new MessageSender(message, args)

Expand Down Expand Up @@ -505,13 +503,14 @@ export class HocuspocusProvider extends EventEmitter {
this.configuration.websocketProvider.off('close', this.boundOnClose)
this.configuration.websocketProvider.off('close', this.configuration.onClose)
this.configuration.websocketProvider.off('close', this.forwardClose)
this.configuration.websocketProvider.off('status', this.boundOnStatus)
// this.configuration.websocketProvider.off('status', this.boundOnStatus)
this.configuration.websocketProvider.off('disconnect', this.configuration.onDisconnect)
this.configuration.websocketProvider.off('disconnect', this.forwardDisconnect)
this.configuration.websocketProvider.off('destroy', this.configuration.onDestroy)
this.configuration.websocketProvider.off('destroy', this.forwardDestroy)

this.send(CloseMessage, { documentName: this.configuration.name })
this.configuration.websocketProvider.detach(this)
this.disconnect()

if (typeof window === 'undefined' || !('removeEventListener' in window)) {
Expand All @@ -524,8 +523,8 @@ export class HocuspocusProvider extends EventEmitter {
permissionDeniedHandler(reason: string) {
this.emit('authenticationFailed', { reason })
this.isAuthenticated = false
this.disconnect()
this.status = WebSocketStatus.Disconnected
// this.disconnect()
// this.status = WebSocketStatus.Disconnected
}

authenticatedHandler(scope: string) {
Expand All @@ -535,73 +534,73 @@ export class HocuspocusProvider extends EventEmitter {
this.emit('authenticated')
}

get broadcastChannel() {
return `${this.configuration.name}`
}

broadcastChannelSubscriber(data: ArrayBuffer) {
this.mux(() => {
const message = new IncomingMessage(data)

const documentName = message.readVarString()

message.writeVarString(documentName)

new MessageReceiver(message)
.setBroadcasted(true)
.apply(this, false)
})
}

subscribeToBroadcastChannel() {
if (!this.subscribedToBroadcastChannel) {
bc.subscribe(this.broadcastChannel, this.boundBroadcastChannelSubscriber)
this.subscribedToBroadcastChannel = true
}

this.mux(() => {
this.broadcast(SyncStepOneMessage, { document: this.document, documentName: this.configuration.name })
this.broadcast(SyncStepTwoMessage, { document: this.document, documentName: this.configuration.name })
this.broadcast(QueryAwarenessMessage, { document: this.document, documentName: this.configuration.name })
if (this.awareness) {
this.broadcast(AwarenessMessage, {
awareness: this.awareness,
clients: [this.document.clientID],
document: this.document,
documentName: this.configuration.name,
})
}
})
}

disconnectBroadcastChannel() {
// broadcast message with local awareness state set to null (indicating disconnect)
if (this.awareness) {
this.send(AwarenessMessage, {
awareness: this.awareness,
clients: [this.document.clientID],
states: new Map(),
documentName: this.configuration.name,
}, true)
}

if (this.subscribedToBroadcastChannel) {
bc.unsubscribe(this.broadcastChannel, this.boundBroadcastChannelSubscriber)
this.subscribedToBroadcastChannel = false
}
}

broadcast(Message: ConstructableOutgoingMessage, args?: any) {
if (!this.configuration.broadcast) {
return
}

if (!this.subscribedToBroadcastChannel) {
return
}

new MessageSender(Message, args).broadcast(this.broadcastChannel)
}
// get broadcastChannel() {
// return `${this.configuration.name}`
// }
//
// broadcastChannelSubscriber(data: ArrayBuffer) {
// this.mux(() => {
// const message = new IncomingMessage(data)
//
// const documentName = message.readVarString()
//
// message.writeVarString(documentName)
//
// new MessageReceiver(message)
// .setBroadcasted(true)
// .apply(this, false)
// })
// }

// subscribeToBroadcastChannel() {
// if (!this.subscribedToBroadcastChannel) {
// bc.subscribe(this.broadcastChannel, this.boundBroadcastChannelSubscriber)
// this.subscribedToBroadcastChannel = true
// }
//
// this.mux(() => {
// this.broadcast(SyncStepOneMessage, { document: this.document, documentName: this.configuration.name })
// this.broadcast(SyncStepTwoMessage, { document: this.document, documentName: this.configuration.name })
// this.broadcast(QueryAwarenessMessage, { document: this.document, documentName: this.configuration.name })
// if (this.awareness) {
// this.broadcast(AwarenessMessage, {
// awareness: this.awareness,
// clients: [this.document.clientID],
// document: this.document,
// documentName: this.configuration.name,
// })
// }
// })
// }
//
// disconnectBroadcastChannel() {
// // broadcast message with local awareness state set to null (indicating disconnect)
// if (this.awareness) {
// this.send(AwarenessMessage, {
// awareness: this.awareness,
// clients: [this.document.clientID],
// states: new Map(),
// documentName: this.configuration.name,
// }, true)
// }
//
// if (this.subscribedToBroadcastChannel) {
// bc.unsubscribe(this.broadcastChannel, this.boundBroadcastChannelSubscriber)
// this.subscribedToBroadcastChannel = false
// }
// }

// broadcast(Message: ConstructableOutgoingMessage, args?: any) {
// if (!this.configuration.broadcast) {
// return
// }
//
// if (!this.subscribedToBroadcastChannel) {
// return
// }
//
// new MessageSender(Message, args).broadcast(this.broadcastChannel)
// }

setAwarenessField(key: string, value: any) {
if (!this.awareness) {
Expand Down
8 changes: 3 additions & 5 deletions packages/provider/src/HocuspocusProviderWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,9 @@ export class HocuspocusProviderWebsocket extends EventEmitter {
provider.onOpen(this.receivedOnOpenPayload)
}

if (this.receivedOnStatusPayload) {
provider.onStatus(this.receivedOnStatusPayload)
}

return connectPromise
// if (this.receivedOnStatusPayload) {
// provider.onStatus(this.receivedOnStatusPayload)
// }
}

detach(provider: HocuspocusProvider) {
Expand Down

0 comments on commit 6ab93ef

Please sign in to comment.