Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(realtime): add system event #589

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Sources/Realtime/RealtimeChannel+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ extension RealtimeChannelV2 {

return stream
}

/// Listen for `system` event.
public func system() -> AsyncStream<RealtimeMessageV2> {
let (stream, continuation) = AsyncStream<RealtimeMessageV2>.makeStream()

let subscription = onSystem {
continuation.yield($0)
}

continuation.onTermination = { _ in
subscription.cancel()
}

return stream
}

/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
@available(*, deprecated, renamed: "broadcastStream(event:)")
Expand Down
29 changes: 29 additions & 0 deletions Sources/Realtime/V2/CallbackManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ final class CallbackManager: Sendable {
}
}

@discardableResult
func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int {
mutableState.withValue {
$0.id += 1
$0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback)))
return $0.id
}
}

func setServerChanges(changes: [PostgresJoinConfig]) {
mutableState.withValue {
$0.serverChanges = changes
Expand Down Expand Up @@ -145,6 +154,19 @@ final class CallbackManager: Sendable {
}
}

func triggerSystem(message: RealtimeMessageV2) {
let systemCallbacks = mutableState.callbacks.compactMap {
if case .system(let callback) = $0 {
return callback
}
return nil
}

for systemCallback in systemCallbacks {
systemCallback.callback(message)
}
}

func reset() {
mutableState.setValue(MutableState())
}
Expand All @@ -167,16 +189,23 @@ struct PresenceCallback {
var callback: @Sendable (any PresenceAction) -> Void
}

struct SystemCallback {
var id: Int
var callback: @Sendable (RealtimeMessageV2) -> Void
}

enum RealtimeCallback {
case postgres(PostgresCallback)
case broadcast(BroadcastCallback)
case presence(PresenceCallback)
case system(SystemCallback)

var id: Int {
switch self {
case let .postgres(callback): callback.id
case let .broadcast(callback): callback.id
case let .presence(callback): callback.id
case let .system(callback): callback.id
}
}
}
50 changes: 39 additions & 11 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import ConcurrencyExtras
import Foundation
import Helpers
import HTTPTypes
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -59,7 +59,9 @@ extension Socket {
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
httpSend: { [weak client] in
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
}
)
}
}
Expand Down Expand Up @@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable {
@available(
*,
deprecated,
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
message:
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
)
public func updateAuth(jwt: String?) async {
logger?.debug("Updating auth token for channel \(topic)")
Expand Down Expand Up @@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable {
event: event,
payload: message,
private: config.isPrivate
),
],
)
]
]
)
)
Expand Down Expand Up @@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable {

func onMessage(_ message: RealtimeMessageV2) async {
do {
guard let eventType = message.eventType else {
guard let eventType = message._eventType else {
logger?.debug("Received message without event type: \(message)")
return
}

switch eventType {
case .tokenExpired:
logger?.debug(
"Received token expired event. This should not happen, please report this warning."
)
// deprecated type
break

case .system:
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
if message.status == .ok {
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
} else {
logger?.debug(
"Failed to subscribe to channel \(message.topic): \(message.payload)"
)
}

callbackManager.triggerSystem(message: message)

case .reply:
guard
Expand Down Expand Up @@ -545,6 +555,24 @@ public final class RealtimeChannelV2: Sendable {
}
}

/// Listen for `system` event.
public func onSystem(
callback: @escaping @Sendable (RealtimeMessageV2) -> Void
) -> RealtimeSubscription {
let id = callbackManager.addSystemCallback(callback: callback)
return RealtimeSubscription { [weak callbackManager, logger] in
logger?.debug("Removing system callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}
}

/// Listen for `system` event.
public func onSystem(
callback: @escaping @Sendable () -> Void
) -> RealtimeSubscription {
self.onSystem { _ in callback() }
}

@discardableResult
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
let push = mutableState.withValue {
Expand Down
21 changes: 15 additions & 6 deletions Sources/Realtime/V2/RealtimeMessageV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
self.payload = payload
}

var status: PushStatus? {
/// Status for the received message if any.
public var status: PushStatus? {
payload["status"]
.flatMap(\.stringValue)
.flatMap(PushStatus.init(rawValue:))
}

public var eventType: EventType? {
@available(
*, deprecated,
message: "Access to event type will be removed, please inspect raw event value instead."
)
public var eventType: EventType? { _eventType }

var _eventType: EventType? {
switch event {
case ChannelEvent.system where status == .ok: .system
case ChannelEvent.system: .system
case ChannelEvent.postgresChanges:
.postgresChanges
case ChannelEvent.broadcast:
Expand All @@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
.presenceDiff
case ChannelEvent.presenceState:
.presenceState
case ChannelEvent.system
where payload["message"]?.stringValue?.contains("access token has expired") == true:
.tokenExpired
case ChannelEvent.reply:
.reply
default:
Expand All @@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
case error
case presenceDiff
case presenceState
@available(
*, deprecated,
message:
"tokenExpired gets returned as system, check payload for verifying if is a token expiration."
)
case tokenExpired
case reply
}
Expand Down
38 changes: 29 additions & 9 deletions Tests/RealtimeTests/CallbackManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import ConcurrencyExtras
import CustomDump
import Helpers
@testable import Realtime
import XCTest

@testable import Realtime

final class CallbackManagerTests: XCTestCase {
func testIntegration() {
let callbackManager = CallbackManager()
Expand Down Expand Up @@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase {
let callbackManager = CallbackManager()
XCTAssertNoLeak(callbackManager)

let changes = [PostgresJoinConfig(
event: .update,
schema: "public",
table: "users",
filter: nil,
id: 1
)]
let changes = [
PostgresJoinConfig(
event: .update,
schema: "public",
table: "users",
filter: nil,
id: 1
)
]

callbackManager.setServerChanges(changes: changes)

Expand Down Expand Up @@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase {
receivedActions.withValue { $0.append(action) }
}

let deleteSpecificUserId = callbackManager
let deleteSpecificUserId =
callbackManager
.addPostgresCallback(filter: deleteSpecificUserFilter) { action in
receivedActions.withValue { $0.append(action) }
}
Expand Down Expand Up @@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase {
expectNoDifference(receivedAction.value?.joins, joins)
expectNoDifference(receivedAction.value?.leaves, leaves)
}

func testTriggerSystem() {
let callbackManager = CallbackManager()

let receivedMessage = LockIsolated(RealtimeMessageV2?.none)
callbackManager.addSystemCallback { message in
receivedMessage.setValue(message)
}

callbackManager.triggerSystem(
message: RealtimeMessageV2(
joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"]))

XCTAssertEqual(receivedMessage.value?._eventType, .system)
XCTAssertEqual(receivedMessage.value?.status, .ok)
}
}

extension XCTestCase {
Expand Down
13 changes: 11 additions & 2 deletions Tests/RealtimeTests/RealtimeChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
//

import InlineSnapshotTesting
@testable import Realtime
import XCTest
import XCTestDynamicOverlay

@testable import Realtime

final class RealtimeChannelTests: XCTestCase {
let sut = RealtimeChannelV2(
topic: "topic",
Expand Down Expand Up @@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase {

sut.onPresenceChange { _ in }.store(in: &subscriptions)

sut.onSystem {
}
.store(in: &subscriptions)

assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) {
"""
7 elements
8 elements
▿ RealtimeCallback
▿ postgres: PostgresCallback
- callback: (Function)
Expand Down Expand Up @@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase {
▿ presence: PresenceCallback
- callback: (Function)
- id: 7
▿ RealtimeCallback
▿ system: SystemCallback
- callback: (Function)
- id: 8

"""
}
Expand Down
Loading
Loading