Skip to content

Commit

Permalink
refactor(realtime): general improvements (#552)
Browse files Browse the repository at this point in the history
* refactor(realtime): general improvements for realtime

* remove select action

* add more tests
  • Loading branch information
grdsdev authored Oct 9, 2024
1 parent 494e8f4 commit bfb6620
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 191 deletions.
70 changes: 70 additions & 0 deletions Sources/Realtime/Deprecated.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,76 @@
//

import Foundation
import Helpers

@available(*, deprecated, renamed: "RealtimeMessage")
public typealias Message = RealtimeMessage

extension RealtimeClientV2 {
@available(*, deprecated, renamed: "channels")
public var subscriptions: [String: RealtimeChannelV2] {
channels
}

@available(*, deprecated, renamed: "RealtimeClientOptions")
public struct Configuration: Sendable {
var url: URL
var apiKey: String
var headers: [String: String]
var heartbeatInterval: TimeInterval
var reconnectDelay: TimeInterval
var timeoutInterval: TimeInterval
var disconnectOnSessionLoss: Bool
var connectOnSubscribe: Bool
var logger: (any SupabaseLogger)?

public init(
url: URL,
apiKey: String,
headers: [String: String] = [:],
heartbeatInterval: TimeInterval = 15,
reconnectDelay: TimeInterval = 7,
timeoutInterval: TimeInterval = 10,
disconnectOnSessionLoss: Bool = true,
connectOnSubscribe: Bool = true,
logger: (any SupabaseLogger)? = nil
) {
self.url = url
self.apiKey = apiKey
self.headers = headers
self.heartbeatInterval = heartbeatInterval
self.reconnectDelay = reconnectDelay
self.timeoutInterval = timeoutInterval
self.disconnectOnSessionLoss = disconnectOnSessionLoss
self.connectOnSubscribe = connectOnSubscribe
self.logger = logger
}
}

@available(*, deprecated, renamed: "RealtimeClientStatus")
public typealias Status = RealtimeClientStatus

@available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)")
public convenience init(config: Configuration) {
self.init(
url: config.url,
options: RealtimeClientOptions(
headers: config.headers,
heartbeatInterval: config.heartbeatInterval,
reconnectDelay: config.reconnectDelay,
timeoutInterval: config.timeoutInterval,
disconnectOnSessionLoss: config.disconnectOnSessionLoss,
connectOnSubscribe: config.connectOnSubscribe,
logger: config.logger
)
)
}
}

extension RealtimeChannelV2 {
@available(*, deprecated, renamed: "RealtimeSubscription")
public typealias Subscription = ObservationToken

@available(*, deprecated, renamed: "RealtimeChannelStatus")
public typealias Status = RealtimeChannelStatus
}
12 changes: 0 additions & 12 deletions Sources/Realtime/RealtimeChannel+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ extension RealtimeChannelV2 {
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: SelectAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<SelectAction> {
postgresChange(event: .select, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? SelectAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: AnyAction.Type,
Expand Down
11 changes: 0 additions & 11 deletions Sources/Realtime/V2/PostgresAction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,18 @@ public struct DeleteAction: PostgresAction, HasOldRecord, HasRawMessage {
public let rawMessage: RealtimeMessageV2
}

public struct SelectAction: PostgresAction, HasRecord, HasRawMessage {
public static let eventType: PostgresChangeEvent = .select

public let columns: [Column]
public let commitTimestamp: Date
public let record: [String: AnyJSON]
public let rawMessage: RealtimeMessageV2
}

public enum AnyAction: PostgresAction, HasRawMessage {
public static let eventType: PostgresChangeEvent = .all

case insert(InsertAction)
case update(UpdateAction)
case delete(DeleteAction)
case select(SelectAction)

var wrappedAction: any PostgresAction & HasRawMessage {
switch self {
case let .insert(action): action
case let .update(action): action
case let .delete(action): action
case let .select(action): action
}
}

Expand Down
69 changes: 24 additions & 45 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public struct RealtimeChannelConfig: Sendable {

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientV2.Status
var status: @Sendable () -> RealtimeClientStatus
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
Expand Down Expand Up @@ -64,16 +64,6 @@ extension Socket {
}

public final class RealtimeChannelV2: Sendable {
@available(*, deprecated, renamed: "RealtimeSubscription")
public typealias Subscription = ObservationToken

public enum Status: Sendable {
case unsubscribed
case subscribing
case subscribed
case unsubscribing
}

struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
Expand All @@ -88,14 +78,14 @@ public final class RealtimeChannelV2: Sendable {
let socket: Socket

let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)

public private(set) var status: Status {
public private(set) var status: RealtimeChannelStatus {
get { statusEventEmitter.lastEvent }
set { statusEventEmitter.emit(newValue) }
}

public var statusChange: AsyncStream<Status> {
public var statusChange: AsyncStream<RealtimeChannelStatus> {
statusEventEmitter.stream()
}

Expand All @@ -105,7 +95,7 @@ public final class RealtimeChannelV2: Sendable {
///
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (Status) -> Void
_ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
}
Expand Down Expand Up @@ -137,10 +127,15 @@ public final class RealtimeChannelV2: Sendable {
await socket.connect()
}

guard status != .subscribed else {
logger?.warning("Channel \(topic) is already subscribed")
return
}

socket.addChannel(self)

status = .subscribing
logger?.debug("subscribing to channel \(topic)")
logger?.debug("Subscribing to channel \(topic)")

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
Expand All @@ -157,7 +152,7 @@ public final class RealtimeChannelV2: Sendable {
let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }

logger?.debug("subscribing to channel with body: \(joinConfig)")
logger?.debug("Subscribing to channel with body: \(joinConfig)")

await push(
RealtimeMessageV2(
Expand All @@ -175,17 +170,17 @@ public final class RealtimeChannelV2: Sendable {
}
} catch {
if error is TimeoutError {
logger?.debug("subscribe timed out.")
logger?.debug("Subscribe timed out.")
await subscribe()
} else {
logger?.error("subscribe failed: \(error)")
logger?.error("Subscribe failed: \(error)")
}
}
}

public func unsubscribe() async {
status = .unsubscribing
logger?.debug("unsubscribing from channel \(topic)")
logger?.debug("Unsubscribing from channel \(topic)")

await push(
RealtimeMessageV2(
Expand Down Expand Up @@ -324,7 +319,7 @@ public final class RealtimeChannelV2: Sendable {
)
}

func onMessage(_ message: RealtimeMessageV2) {
func onMessage(_ message: RealtimeMessageV2) async {
do {
guard let eventType = message.eventType else {
logger?.debug("Received message without event type: \(message)")
Expand All @@ -349,7 +344,7 @@ public final class RealtimeChannelV2: Sendable {
throw RealtimeError("Received a reply with unexpected payload: \(message)")
}

didReceiveReply(ref: ref, status: status)
await didReceiveReply(ref: ref, status: status)

if message.payload["response"]?.objectValue?.keys
.contains(ChannelEvent.postgresChanges) == true
Expand Down Expand Up @@ -409,16 +404,6 @@ public final class RealtimeChannelV2: Sendable {
)
)

case "SELECT":
action = .select(
SelectAction(
columns: postgresActions.columns,
commitTimestamp: postgresActions.commitTimestamp,
record: postgresActions.record ?? [:],
rawMessage: message
)
)

default:
throw RealtimeError("Unknown event type: \(postgresActions.type)")
}
Expand All @@ -435,13 +420,9 @@ public final class RealtimeChannelV2: Sendable {
callbackManager.triggerBroadcast(event: event, json: payload)

case .close:
Task { [weak self] in
guard let self else { return }

await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed
}
await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

case .error:
logger?.debug(
Expand Down Expand Up @@ -601,12 +582,10 @@ public final class RealtimeChannelV2: Sendable {
return await push.send()
}

private func didReceiveReply(ref: String, status: String) {
Task {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
private func didReceiveReply(ref: String, status: String) async {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Loading

0 comments on commit bfb6620

Please sign in to comment.