Skip to content

Commit

Permalink
feat: Waku Sync Protocol
Browse files Browse the repository at this point in the history
periodic sync & peer manager

Fixes

ingess messages

Bindings first draft

Wrapping

String to/from bytes

Vector

added missing binds & wraps

added header files

feat: negentropy c integ (#2448)

* chore: move negentropy to a submodule

* chore: add negentropy folder in vendor dir

* moved submodule to c-wrapper branch

* chore: updated negentropy

* chore: udpate submodule URL to use https

* chore: started integrating negetropy C wrapper

* chore: fixed all compilation errors wrt C-wrapper integration

* chore: include sync peers to be returned as part of REST API

* chore: tested insert into storage and changes done for it.

* chore: experimenting with callback

* chore: first test for sync

* chore: revert callback changes

* chore: revert temp changes

* chore: write tests to verify c integration

* draft: in progress changes to integrate callback based method from C

* chore: in progress callback integration

* chore: first working sync example with c bindings

* feat: added few tests for sync protocol

* chore: copy negentropy so for build to work

* chore: add negentropy as dependency for test targets

* chore: try to fix CI compilation issue of negentropy

* chore: apply suggestions from code review

Co-authored-by: Ivan FB <[email protected]>

* chore: fix naming convention changes

---------

Co-authored-by: Ivan FB <[email protected]>

update ref

update submodule

chore: consider leak fix in negentropy

wrapping pointers

wrapping pointers

clean-up, renaming, wrapping
  • Loading branch information
SionoiS committed Mar 7, 2024
1 parent 3e7a950 commit 203c0fc
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 92 deletions.
36 changes: 16 additions & 20 deletions tests/waku_sync/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,49 +29,45 @@ suite "Waku Sync - Protocol Tests":

asyncTest "test c integration":
let
s1 = negentropyNewStorage()
s2 = negentropyNewStorage()
ng1 = negentropyNew(s1,10000)
ng2 = negentropyNew(s2,10000)
s1 = Storage.new()
s2 = Storage.new()
ng1 = Negentropy.new(s1,10000)
ng2 = Negentropy.new(s2,10000)

let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1)
var ret = negentropyStorageInsert(s1, msg1.timestamp, msgHash)

check:
ret == true

ret = negentropyStorageInsert(s2, msg1.timestamp, msgHash)
check:
ret == true
s1.insert(msg1.timestamp, msgHash).isOk()
s2.insert(msg1.timestamp, msgHash).isOk()

let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2)
ret = negentropyStorageInsert(s2, msg2.timestamp, msgHash2)

check:
ret == true
s2.insert(msg2.timestamp, msgHash2).isOk()

let ng1_q1 = negentropyInitiate(ng1)
let ng1_q1 = ng1.initiate()
check:
ng1_q1.len > 0
ng1_q1.isOk()

let ng2_q1 = negentropyServerReconcile(ng2, ng1_q1)
let ng2_q1 = ng2.serverReconcile(ng1_q1.get())
check:
ng2_q1.len > 0
ng2_q1.isOk()

var
haveHashes: seq[WakuMessageHash]
needHashes: seq[WakuMessageHash]
let ng1_q2 = negentropyClientReconcile(ng1, ng2_q1, haveHashes, needHashes)
let ng1_q2 = ng1.clientReconcile(ng2_q1.get(), haveHashes, needHashes)

check:
needHashes.len() == 1
haveHashes.len() == 0
ng1_q2.len == 0
ng1_q2.isOk()
needHashes[0] == msgHash2

ret = negentropyStorageErase(s1, msg1.timestamp, msgHash)
check:
ret == true
s1.erase(msg1.timestamp, msgHash).isOk()

asyncTest "sync 2 nodes different hashes":
## Setup
Expand Down
93 changes: 51 additions & 42 deletions waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,49 +29,48 @@ const DefaultFrameSize = 153600 # using a random number for now
const DefaultSyncInterval = 60.minutes

type
#TODO maybe add the remote peer info?
WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.}

WakuSync* = ref object of LPProtocol
storage: pointer
negentropy: pointer
storage: Storage
negentropy: Negentropy
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: Duration
callback: Option[WakuSyncCallback]
periodicSyncFut: Future[void]

proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
return

let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
debug "inserting message into storage ", hash=msgHash
let result: bool = negentropyStorageInsert(self.storage, msg.timestamp, msgHash)
if not result :

if self.storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash=msgHash.toHex()

proc serverReconciliation(self: WakuSync, message: seq[byte]): Result[seq[byte], string] =
let payload: seq[byte] = negentropyServerReconcile(self.negentropy, message)
ok(payload)
proc serverReconciliation(self: WakuSync, payload: NegentropyPayload): Result[NegentropyPayload, string] =
return self.negentropy.serverReconcile(payload)

proc clientReconciliation(
self: WakuSync, message: seq[byte],
self: WakuSync, payload: NegentropyPayload,
haveHashes: var seq[WakuMessageHash],
needHashes: var seq[WakuMessageHash],
): Result[Option[seq[byte]], string] =
let payload: seq[byte] = negentropyClientReconcile(self.negentropy, message, haveHashes, needHashes)
ok(some(payload))
): Result[Option[NegentropyPayload], string] =
return self.negentropy.clientReconcile(payload, haveHashes, needHashes)

proc intitialization(self: WakuSync): Future[Result[seq[byte], string]] {.async.} =
let payload: seq[byte] = negentropyInitiate(self.negentropy)
info "initialized negentropy ", value=payload

ok(payload)
proc intitialization(self: WakuSync): Future[Result[NegentropyPayload, string]] {.async.} =
return self.negentropy.initiate()

proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let request: seq[byte] = (await self.intitialization()).valueOr:
let payload = (await self.intitialization()).valueOr:
return err(error)
debug "sending request to server", req=request
let writeRes = catch: await conn.writeLP(request)

debug "sending request to server", payload = toHex(seq[byte](payload))

let writeRes = catch: await conn.writeLP(seq[byte](payload))
if writeRes.isErr():
return err(writeRes.error.msg)

Expand All @@ -81,24 +80,28 @@ proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHas

while true:
let readRes = catch: await conn.readLp(self.maxFrameSize)

let buffer: seq[byte] = readRes.valueOr:
return err(error.msg)
debug "Received Sync request from peer", request=buffer
let responseOpt: Option[seq[byte]] = self.clientReconciliation(buffer, haveHashes, needHashes).valueOr:

debug "Received Sync request from peer", payload = toHex(buffer)

let request = NegentropyPayload(buffer)

let responseOpt = self.clientReconciliation(request, haveHashes, needHashes).valueOr:
return err(error)

let response: seq[byte] =
if responseOpt.isNone() or responseOpt.get().len == 0:
debug "Closing connection as sync response is none"
let response = responseOpt.valueOr:
debug "Closing connection, sync session is done"
await conn.close()
break
else:
responseOpt.get()
debug "Sending Sync response to peer", response=response
let writeRes = catch: await conn.writeLP(response)

debug "Sending Sync response to peer", payload = toHex(seq[byte](response))

let writeRes = catch: await conn.writeLP(seq[byte](response))
if writeRes.isErr():
return err(writeRes.error.msg)
#Need to handle empty needhashes return

return ok(needHashes)

proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
Expand All @@ -124,16 +127,21 @@ proc sync*(self: WakuSync, peer: RemotePeerInfo): Future[Result[seq[WakuMessageH

proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
while not conn.isClosed: # Not sure if this works as I think it does...
# Not sure if this works as I think it does...
while not conn.isClosed:
let requestRes = catch: await conn.readLp(self.maxFrameSize)
let buffer: seq[byte] = requestRes.valueOr:

let buffer = requestRes.valueOr:
error "Connection reading error", error=error.msg
return
let response: seq[byte] = self.serverReconciliation(buffer).valueOr:

let request = NegentropyPayload(buffer)

let response = self.serverReconciliation(request).valueOr:
error "Reconciliation error", error=error
return

let writeRes= catch: await conn.writeLP(response)
let writeRes= catch: await conn.writeLP(seq[byte](response))
if writeRes.isErr():
error "Connection write error", error=writeRes.error.msg
return
Expand All @@ -147,9 +155,8 @@ proc new*(T: type WakuSync,
syncInterval: Duration = DefaultSyncInterval,
callback: Option[WakuSyncCallback] = none(WakuSyncCallback)
): T =
let storage = negentropyNewStorage()

let negentropy = negentropyNew(storage, uint64(maxFrameSize))
let storage = Storage.new()
let negentropy = Negentropy.new(storage, uint64(maxFrameSize))

let sync = WakuSync(
storage: storage,
Expand All @@ -167,20 +174,22 @@ proc new*(T: type WakuSync,
return sync

proc periodicSync(self: WakuSync) {.async.} =
while self.started and self.callback.isSome():
while true:
await sleepAsync(self.syncInterval)

let hashes = (await self.sync()).valueOr:
error "periodic sync error", error = error
continue

let callback = self.callback.get()
let callback = self.callback.valueOr:
continue

await callback(hashes)

proc start*(self: WakuSync) =
self.started = true

asyncSpawn self.periodicSync()

proc stop*(self: WakuSync) =
self.started = false
self.periodicSyncFut = self.periodicSync()
proc stopWait*(self: WakuSync) {.async.} =
await self.periodicSyncFut.cancelAndWait()
Loading

0 comments on commit 203c0fc

Please sign in to comment.