Skip to content

Commit

Permalink
expose stateIsSyncronized flag to check if the crdt state is initiali…
Browse files Browse the repository at this point in the history
…zed (#1020)

* expose stateIsSyncronized flag to check if the crdt state is initialized

* fix system

* move logic to comms flag instead of player enter/leave scnee

* add a max amount of retries

* increment the delay to 5s

* prevent for reconnecting if its disconnected

* avoid requesting state on changes that are not in the bool flag

* revert

* add logs

* remove commented code
  • Loading branch information
gonpombo8 authored Oct 18, 2024
1 parent d9e3bec commit c070ea9
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 33 deletions.
9 changes: 3 additions & 6 deletions packages/@dcl/sdk/src/network/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import { addSyncTransport } from './message-bus-sync'
import { getUserData } from '~system/UserIdentity'

// initialize sync transport for sdk engine
const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild } = addSyncTransport(
engine,
sendBinary,
getUserData
)
const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild, isStateSyncronized } =
addSyncTransport(engine, sendBinary, getUserData)

export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent }
export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, isStateSyncronized }
99 changes: 72 additions & 27 deletions packages/@dcl/sdk/src/network/message-bus-sync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IEngine, Transport, RealmInfo } from '@dcl/ecs'
import { IEngine, Transport, RealmInfo, PlayerIdentityData } from '@dcl/ecs'
import { type SendBinaryRequest, type SendBinaryResponse } from '~system/CommunicationsController'

import { syncFilter } from './filter'
Expand Down Expand Up @@ -34,8 +34,11 @@ export function addSyncTransport(
pendingMessageBusMessagesToSend.length = 0
return messages
}
const players = definePlayerHelper(engine)

let stateIsSyncronized = false
let transportInitialzed = false

// Add Sync Transport
const transport: Transport = {
filter: syncFilter(engine),
Expand All @@ -55,61 +58,103 @@ export function addSyncTransport(
engine.addTransport(transport)
// End add sync transport

// If we dont have any state initialized, and recieve a state message.
// Receive & Process CRDT_STATE
binaryMessageBus.on(CommsMessage.RES_CRDT_STATE, (value) => {
const { sender, data } = decodeCRDTState(value)
if (sender !== myProfile.userId) return
DEBUG_NETWORK_MESSAGES() && console.log('[Processing CRDT State]', data.byteLength)
transport.onmessage!(data)
stateIsSyncronized = true
})

binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, (message, userId) => {
// Answer to REQ_CRDT_STATE
binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => {
console.log(`Sending CRDT State to: ${userId}`)
transport.onmessage!(message)
binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine)))
})

const players = definePlayerHelper(engine)
// Process CRDT messages here
binaryMessageBus.on(CommsMessage.CRDT, (value) => {
DEBUG_NETWORK_MESSAGES() &&
console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine)))
transport.onmessage!(value)
})

let requestCrdtStateWhenConnected = false
async function requestState(retryCount: number = 1) {
let players = Array.from(engine.getEntitiesWith(PlayerIdentityData))
DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`)

players.onEnterScene((player) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId)
if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) {
if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) {
DEBUG_NETWORK_MESSAGES() && console.log('Requesting state')
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
if (!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) {
DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state?. Disconnected`)
return
}

binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))

// Wait ~5s for the response.
await sleep(5000)

players = Array.from(engine.getEntitiesWith(PlayerIdentityData))

if (!stateIsSyncronized) {
if (players.length > 1 && retryCount <= 2) {
DEBUG_NETWORK_MESSAGES() &&
console.log(`Requesting state again ${retryCount} (no response). Players connected: ${players.length - 1}`)
void requestState(retryCount + 1)
} else {
DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted')
requestCrdtStateWhenConnected = true
DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized')
stateIsSyncronized = true
}
}
}

players.onEnterScene((player) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId)
})

// Asks for the REQ_CRDT_STATE when its connected to comms
RealmInfo.onChange(engine.RootEntity, (value) => {
if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) {
DEBUG_NETWORK_MESSAGES() && console.log('Requesting state.')
requestCrdtStateWhenConnected = false
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
if (!value?.isConnectedSceneRoom) {
DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms')
stateIsSyncronized = false
}

if (value?.isConnectedSceneRoom) {
DEBUG_NETWORK_MESSAGES() && console.log('Connected to comms')
}

if (value?.isConnectedSceneRoom && !stateIsSyncronized) {
void requestState()
}
})

players.onLeaveScene((userId) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId)
if (userId === myProfile.userId) {
requestCrdtStateWhenConnected = false
}
})

// Process CRDT messages here
binaryMessageBus.on(CommsMessage.CRDT, (value) => {
DEBUG_NETWORK_MESSAGES() &&
console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine)))
transport.onmessage!(value)
})
function isStateSyncronized() {
return stateIsSyncronized
}

function sleep(ms: number) {
return new Promise<void>((resolve) => {
let timer = 0
function sleepSystem(dt: number) {
timer += dt
if (timer * 1000 >= ms) {
engine.removeSystem(sleepSystem)
resolve()
}
}
engine.addSystem(sleepSystem)
})
}

return {
...entityDefinitions,
myProfile
myProfile,
isStateSyncronized
}
}

Expand Down

0 comments on commit c070ea9

Please sign in to comment.