Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bridge AsyncSequence to Combine Publisher #23823

Draft
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Modules/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ let package = Package(
.library(name: "WordPressFlux", targets: ["WordPressFlux"]),
.library(name: "WordPressShared", targets: ["WordPressShared"]),
.library(name: "WordPressUI", targets: ["WordPressUI"]),
.library(name: "AsyncCombine", targets: ["AsyncCombine"]),
],
dependencies: [
.package(url: "https://github.com/airbnb/lottie-ios", from: "4.4.0"),
Expand Down Expand Up @@ -49,6 +50,7 @@ let package = Package(
.package(url: "https://github.com/Automattic/wordpress-rs", revision: "alpha-swift-20240813"),
.package(url: "https://github.com/wordpress-mobile/GutenbergKit", revision: "6cc307e7fc24910697be5f71b7d70f465a9c0f63"),
.package(url: "https://github.com/Automattic/color-studio", branch: "trunk"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
],
targets: XcodeSupport.targets + [
.target(name: "JetpackStatsWidgetsCore"),
Expand All @@ -61,12 +63,14 @@ let package = Package(
.target(name: "WordPressSharedObjC", resources: [.process("Resources")]),
.target(name: "WordPressShared", dependencies: [.target(name: "WordPressSharedObjC")], resources: [.process("Resources")]),
.target(name: "WordPressUI", dependencies: [.target(name: "WordPressShared")], resources: [.process("Resources")]),
.target(name: "AsyncCombine"),
.testTarget(name: "JetpackStatsWidgetsCoreTests", dependencies: [.target(name: "JetpackStatsWidgetsCore")]),
.testTarget(name: "DesignSystemTests", dependencies: [.target(name: "DesignSystem")]),
.testTarget(name: "WordPressFluxTests", dependencies: ["WordPressFlux"]),
.testTarget(name: "WordPressSharedTests", dependencies: [.target(name: "WordPressShared")]),
.testTarget(name: "WordPressSharedObjCTests", dependencies: [.target(name: "WordPressShared")], resources: [.process("Resources")]),
.testTarget(name: "WordPressUITests", dependencies: [.target(name: "WordPressUI")]),
.testTarget(name: "AsyncCombineTests", dependencies: [.target(name: "AsyncCombine"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms")]),
]
)

Expand Down Expand Up @@ -134,6 +138,7 @@ enum XcodeSupport {
"WordPressFlux",
"WordPressShared",
"WordPressUI",
"AsyncCombine",
.product(name: "Alamofire", package: "Alamofire"),
.product(name: "AlamofireImage", package: "AlamofireImage"),
.product(name: "AutomatticAbout", package: "AutomatticAbout-swift"),
Expand Down
59 changes: 59 additions & 0 deletions Modules/Sources/AsyncCombine/Just.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import Foundation

public struct JustAsyncSequence<Element>: AsyncSequence {
var producer: () async -> Element

public init(_ output: Element) {
self.init({ output })
}

public init(_ producer: @escaping () async -> Element) {
self.producer = producer
}

public func makeAsyncIterator() -> Iterator {
Iterator(producer: producer)
}

public struct Iterator: AsyncIteratorProtocol {
var started = false
let producer: () async -> Element

public mutating func next() async -> Element? {
guard !started else { return nil }

started = true
let result = await producer()
return Task.isCancelled ? nil : result
}
}
}

public struct JustThrowingAsyncSequence<Element>: AsyncSequence {
var producer: () async throws -> Element

public init(_ error: Error) {
self.init({ throw error })
}

public init(_ producer: @escaping () async throws -> Element) {
self.producer = producer
}

public func makeAsyncIterator() -> Iterator {
Iterator(producer: producer)
}

public struct Iterator: AsyncIteratorProtocol {
var started = false
let producer: () async throws -> Element

public mutating func next() async throws -> Element? {
guard !started else { return nil }

started = true
let result = try await producer()
return Task.isCancelled ? nil : result
}
}
}
87 changes: 87 additions & 0 deletions Modules/Sources/AsyncCombine/Publisher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import Foundation
import Combine

public extension AsyncSequence {
var publisher: AnyPublisher<Element, any Error> {
precondition(!(self is AsyncStream<Element>) && !(self is AsyncThrowingStream<Element, Error>), "Use sharedPublisher for AsyncStream and AsyncThrowingStream")

return StreamPublisher(sequence: self).eraseToAnyPublisher()
}

var sharedPublisher: Publishers.Share<AnyPublisher<Element, any Error>> {
StreamPublisher(sequence: self).eraseToAnyPublisher().share()
}
}

public extension AsyncStream {
@available(*, deprecated, message: "Use sharedPublisher for AsyncStream and AsyncThrowingStream")
var publisher: AnyPublisher<Element, any Error> {
fatalError("Use sharedPublisher for AsyncStream and AsyncThrowingStream")
}
}

public extension AsyncThrowingStream {
@available(*, deprecated, message: "Use sharedPublisher for AsyncStream and AsyncThrowingStream")
var publisher: AnyPublisher<Element, any Error> {
fatalError("Use sharedPublisher for AsyncStream and AsyncThrowingStream")
}
}

class StreamPublisher<Sequence: AsyncSequence>: Publisher {

typealias Output = Sequence.Element
typealias Failure = Error

let sequence: Sequence

init(sequence: Sequence) {
self.sequence = sequence
}

func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
let subscription = Subscription(sequence: sequence, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}

class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
let sequence: Sequence

var subscriber: S?
var task: Task<Void, Never>?
var outputSent: Int = 0

init(sequence: Sequence, subscriber: S) {
self.sequence = sequence
self.subscriber = subscriber
}

func request(_ demand: Subscribers.Demand) {
task = Task {
do {
if let max = demand.max {
for try await element in sequence.prefix(max) {
try Task.checkCancellation()

_ = subscriber?.receive(element)
}
} else {
for try await element in sequence {
try Task.checkCancellation()

_ = subscriber?.receive(element)
}
}
subscriber?.receive(completion: .finished)
} catch {
subscriber?.receive(completion: .failure(error))
}
}
}

func cancel() {
task?.cancel()
task = nil
subscriber = nil
}
}
}
38 changes: 38 additions & 0 deletions Modules/Sources/AsyncCombine/Task.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import Foundation
import Combine

extension Task where Failure == Never {
var stream: AsyncStream<Success> {
AsyncStream(unfolding: { await self.value }, onCancel: cancel)
}

public var publisher: Publishers.Share<AnyPublisher<Success, Error>> {
stream.sharedPublisher
}
}

extension Task where Failure == Error {
var stream: AsyncThrowingStream<Success, Failure> {
let builder: (AsyncThrowingStream<Success, Failure>.Continuation) -> Void = { continuation in
Task<Void, Never> {
do {
let output = try await self.value
continuation.yield(output)
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
continuation.onTermination = {
if case .cancelled = $0 {
self.cancel()
}
}
}
return AsyncThrowingStream(Success.self, builder)
}

public var publisher: Publishers.Share<AnyPublisher<Success, Error>> {
stream.sharedPublisher
}
}
Loading