From c070ea9e2c24483892875d921f621668a85f3a9a Mon Sep 17 00:00:00 2001 From: Gon Pombo Date: Fri, 18 Oct 2024 19:02:52 -0300 Subject: [PATCH] expose stateIsSyncronized flag to check if the crdt state is initialized (#1020) * expose stateIsSyncronized flag to check if the crdt state is initialized * fix system * move logic to comms flag instead of player enter/leave scnee * add a max amount of retries * increment the delay to 5s * prevent for reconnecting if its disconnected * avoid requesting state on changes that are not in the bool flag * revert * add logs * remove commented code --- packages/@dcl/sdk/src/network/index.ts | 9 +- .../@dcl/sdk/src/network/message-bus-sync.ts | 99 ++++++++++++++----- 2 files changed, 75 insertions(+), 33 deletions(-) diff --git a/packages/@dcl/sdk/src/network/index.ts b/packages/@dcl/sdk/src/network/index.ts index aa09f36ce..7acc82e34 100644 --- a/packages/@dcl/sdk/src/network/index.ts +++ b/packages/@dcl/sdk/src/network/index.ts @@ -4,10 +4,7 @@ import { addSyncTransport } from './message-bus-sync' import { getUserData } from '~system/UserIdentity' // initialize sync transport for sdk engine -const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild } = addSyncTransport( - engine, - sendBinary, - getUserData -) +const { getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, getFirstChild, isStateSyncronized } = + addSyncTransport(engine, sendBinary, getUserData) -export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent } +export { getFirstChild, getChildren, syncEntity, parentEntity, getParent, myProfile, removeParent, isStateSyncronized } diff --git a/packages/@dcl/sdk/src/network/message-bus-sync.ts b/packages/@dcl/sdk/src/network/message-bus-sync.ts index 0a9faf970..f2ecc2a56 100644 --- a/packages/@dcl/sdk/src/network/message-bus-sync.ts +++ b/packages/@dcl/sdk/src/network/message-bus-sync.ts @@ -1,4 +1,4 @@ -import { IEngine, Transport, RealmInfo } from '@dcl/ecs' +import { IEngine, Transport, RealmInfo, PlayerIdentityData } from '@dcl/ecs' import { type SendBinaryRequest, type SendBinaryResponse } from '~system/CommunicationsController' import { syncFilter } from './filter' @@ -34,8 +34,11 @@ export function addSyncTransport( pendingMessageBusMessagesToSend.length = 0 return messages } + const players = definePlayerHelper(engine) + let stateIsSyncronized = false let transportInitialzed = false + // Add Sync Transport const transport: Transport = { filter: syncFilter(engine), @@ -55,61 +58,103 @@ export function addSyncTransport( engine.addTransport(transport) // End add sync transport - // If we dont have any state initialized, and recieve a state message. + // Receive & Process CRDT_STATE binaryMessageBus.on(CommsMessage.RES_CRDT_STATE, (value) => { const { sender, data } = decodeCRDTState(value) if (sender !== myProfile.userId) return DEBUG_NETWORK_MESSAGES() && console.log('[Processing CRDT State]', data.byteLength) transport.onmessage!(data) + stateIsSyncronized = true }) - binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, (message, userId) => { + // Answer to REQ_CRDT_STATE + binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => { + console.log(`Sending CRDT State to: ${userId}`) transport.onmessage!(message) binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine))) }) - const players = definePlayerHelper(engine) + // Process CRDT messages here + binaryMessageBus.on(CommsMessage.CRDT, (value) => { + DEBUG_NETWORK_MESSAGES() && + console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine))) + transport.onmessage!(value) + }) - let requestCrdtStateWhenConnected = false + async function requestState(retryCount: number = 1) { + let players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`) - players.onEnterScene((player) => { - DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) - if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) { - if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state') - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + if (!RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log(`Aborting Requesting state?. Disconnected`) + return + } + + binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + + // Wait ~5s for the response. + await sleep(5000) + + players = Array.from(engine.getEntitiesWith(PlayerIdentityData)) + + if (!stateIsSyncronized) { + if (players.length > 1 && retryCount <= 2) { + DEBUG_NETWORK_MESSAGES() && + console.log(`Requesting state again ${retryCount} (no response). Players connected: ${players.length - 1}`) + void requestState(retryCount + 1) } else { - DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted') - requestCrdtStateWhenConnected = true + DEBUG_NETWORK_MESSAGES() && console.log('No active players. State syncronized') + stateIsSyncronized = true } } + } + + players.onEnterScene((player) => { + DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId) }) + // Asks for the REQ_CRDT_STATE when its connected to comms RealmInfo.onChange(engine.RootEntity, (value) => { - if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) { - DEBUG_NETWORK_MESSAGES() && console.log('Requesting state.') - requestCrdtStateWhenConnected = false - binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine)) + if (!value?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms') + stateIsSyncronized = false + } + + if (value?.isConnectedSceneRoom) { + DEBUG_NETWORK_MESSAGES() && console.log('Connected to comms') + } + + if (value?.isConnectedSceneRoom && !stateIsSyncronized) { + void requestState() } }) players.onLeaveScene((userId) => { DEBUG_NETWORK_MESSAGES() && console.log('[onLeaveScene]', userId) - if (userId === myProfile.userId) { - requestCrdtStateWhenConnected = false - } }) - // Process CRDT messages here - binaryMessageBus.on(CommsMessage.CRDT, (value) => { - DEBUG_NETWORK_MESSAGES() && - console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine))) - transport.onmessage!(value) - }) + function isStateSyncronized() { + return stateIsSyncronized + } + + function sleep(ms: number) { + return new Promise((resolve) => { + let timer = 0 + function sleepSystem(dt: number) { + timer += dt + if (timer * 1000 >= ms) { + engine.removeSystem(sleepSystem) + resolve() + } + } + engine.addSystem(sleepSystem) + }) + } return { ...entityDefinitions, - myProfile + myProfile, + isStateSyncronized } }