Skip to content

Commit

Permalink
fix: websocket concurrency id issues, switch to semaphores (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
koraykoska authored Oct 31, 2022
1 parent 561c1ff commit eff1a97
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions Sources/FoundationHTTP/Web3WebSocketProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class Web3WebSocketProvider: Web3Provider, Web3BidirectionalProvider {
public private(set) var webSocket: WebSocket!

// Stores ids and notification groups
private let pendingRequests: SynchronizedDictionary<Int, DispatchGroup> = [:]
private let pendingRequests: SynchronizedDictionary<Int, DispatchSemaphore> = [:]
// Stores responses as strings
private let pendingResponses: SynchronizedDictionary<Int, String> = [:]

Expand All @@ -41,9 +41,11 @@ public class Web3WebSocketProvider: Web3Provider, Web3BidirectionalProvider {
private var currentId = 1
private var nextId: Int {
get {
let retId = currentId
var retId: Int!

nextIdQueue.sync(flags: .barrier) {
retId = currentId

if currentId < UInt16.max {
currentId += 1
} else {
Expand Down Expand Up @@ -115,17 +117,16 @@ public class Web3WebSocketProvider: Web3Provider, Web3BidirectionalProvider {
return
}

let responseGroup = DispatchGroup()
self.pendingRequests[replacedIdRequest.id] = responseGroup
responseGroup.enter()
let responseSemaphore = DispatchSemaphore(value: 0)
self.pendingRequests[replacedIdRequest.id] = responseSemaphore

let promise = self.wsEventLoopGroup.next().makePromise(of: Void.self)
self.webSocket.send(String(data: body, encoding: .utf8) ?? "", promise: promise)
promise.futureResult.whenComplete { result in
switch result {
case .success(_):
self.queue.async {
let result = responseGroup.wait(timeout: DispatchTime(uptimeNanoseconds: DispatchTime.now().uptimeNanoseconds + self.timeoutNanoSeconds))
let result = responseSemaphore.wait(timeout: DispatchTime(uptimeNanoseconds: DispatchTime.now().uptimeNanoseconds + self.timeoutNanoSeconds))

defer {
// Remove from pending requests
Expand Down Expand Up @@ -271,7 +272,7 @@ public class Web3WebSocketProvider: Web3Provider, Web3BidirectionalProvider {

if let idOnly = try? self.decoder.decode(IdOnly.self, from: data) {
self.pendingResponses[idOnly.id] = string
self.pendingRequests[idOnly.id]?.leave()
self.pendingRequests[idOnly.id]?.signal()
} else if let subscriptionIdOnly = try? self.decoder.decode(SubscriptionIdOnly.self, from: data) {
self.pendingSubscriptionResponses[subscriptionIdOnly.params.subscription]?.append(string)
self.currentSubscriptions[subscriptionIdOnly.params.subscription]?.signal()
Expand Down

0 comments on commit eff1a97

Please sign in to comment.