Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
* Provides `Infallible` versions of `combineLatest` without `resultSelector` requirement.
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods
* Ensure `AsyncSequence.asObservable()` runs on background thread using `Task.detached`.

## 6.5.0

Expand Down
10 changes: 7 additions & 3 deletions RxSwift/Observable+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public extension AsyncSequence {
/// values of the asynchronous sequence's type
///
/// - returns: An `Observable` of the async sequence's type
func asObservable() -> Observable<Element> {
func asObservable(detached: Bool = false) -> Observable<Element> {
Observable.create { observer in
let task = Task {
let taskBlock: @Sendable () async -> Void = {
do {
for try await value in self {
observer.onNext(value)
Expand All @@ -73,7 +73,11 @@ public extension AsyncSequence {
observer.onError(error)
}
}


let task: Task<Void, Never> = detached
? Task.detached(operation: taskBlock)
: Task(operation: taskBlock)

return Disposables.create { task.cancel() }
}
}
Expand Down
96 changes: 96 additions & 0 deletions Tests/RxSwiftTests/Observable+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,101 @@ extension ObservableConcurrencyTests {
task.cancel()
}

func testAsyncSequenceToObservableRunsOnBackgroundThread() async throws {

let asyncSequence = AsyncStream<Int> { continuation in
for i in 1...5 {
continuation.yield(i)
}
continuation.finish()
}

let expectation = XCTestExpectation(description: "Observable completes")

DispatchQueue.main.async {
let observable = asyncSequence.asObservable(detached: true)

var threadIsNotMain = false
var values = [Int]()

_ = observable.subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onCompleted: {
XCTAssertEqual(values, [1, 2, 3, 4, 5])
XCTAssertTrue(threadIsNotMain, "AsyncSequence.asObservable should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithSleep() async throws {
let asyncSequence = AsyncStream<Int> { continuation in
Task {
for i in 1...3 {
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(i)
}
continuation.finish()
}
}

let expectation = XCTestExpectation(description: "Observable with sleep completes")

DispatchQueue.main.async {
let startTime = Date()
var values = [Int]()
var executionThreads = Set<String>()

_ = asyncSequence.asObservable(detached: true).subscribe(
onNext: { value in
values.append(value)
let threadName = Thread.current.description
executionThreads.insert(threadName)
},
onCompleted: {
let duration = Date().timeIntervalSince(startTime)
XCTAssertGreaterThanOrEqual(duration, 0.3)
XCTAssertEqual(values, [1, 2, 3])
XCTAssertFalse(executionThreads.contains(where: { $0.contains("main") }))

expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithError() async throws {
struct TestError: Error {}

let asyncSequence = AsyncThrowingStream<Int, Error> { continuation in
for i in 1...3 {
continuation.yield(i)
}
continuation.finish(throwing: TestError())
}

let expectation = XCTestExpectation(description: "Observable with error completes")
var receivedError: Error?

_ = asyncSequence.asObservable(detached: true).subscribe(
onNext: { _ in },
onError: { error in
receivedError = error
expectation.fulfill()
}
)

await fulfillment(of: [expectation], timeout: 5.0)
XCTAssertTrue(receivedError is TestError)
}

}
#endif