diff --git a/GRDB/ValueObservation/ValueObservation.swift b/GRDB/ValueObservation/ValueObservation.swift index 86e6f89e5f..8baf5aeaad 100644 --- a/GRDB/ValueObservation/ValueObservation.swift +++ b/GRDB/ValueObservation/ValueObservation.swift @@ -211,6 +211,109 @@ extension ValueObservation: Refinable { } } +#if swift(>=5.5) +extension ValueObservation { + // MARK: - Asynchronous Observation + + /// The database observation, as an asynchronous sequence of + /// database changes. + /// + /// [**Experimental**](http://github.com/groue/GRDB.swift#what-are-experimental-features) + /// + /// - parameter reader: A DatabaseReader. + /// - parameter scheduler: A Scheduler. By default, fresh values are + /// dispatched asynchronously on the main queue. + @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) + public func values( + in reader: DatabaseReader, + scheduling scheduler: ValueObservationScheduler = .async(onQueue: .main)) + -> AsyncValueObservation + { + AsyncValueObservation { onError, onChange in + self.start(in: reader, scheduling: scheduler, onError: onError, onChange: onChange) + } + } +} + +/// An asynchronous sequence of database changes. +/// +/// [**Experimental**](http://github.com/groue/GRDB.swift#what-are-experimental-features) +/// +/// Usage: +/// +/// let observation = ValueObservation.tracking(Player.fetchAll) +/// let dbQueue: DatabaseQueue: ... +/// +/// // Each database change in the player prints "Fresh players: ..." +/// for try await players in observation.values(in: dbQueue) { +/// print("Fresh players: \(players)") +/// } +/// +/// See `ValueObservation` for more information. +/// +/// - note: This async sequence never ends. +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +public struct AsyncValueObservation: AsyncSequence { + public typealias AsyncIterator = Iterator + typealias StartFunction = ( + _ onError: @escaping (Error) -> Void, + _ onChange: @escaping (Element) -> Void) + -> DatabaseCancellable + private var start: StartFunction + + init(start: @escaping StartFunction) { + self.start = start + } + + public func makeAsyncIterator() -> Iterator { + // This cancellable will be retained by the Iterator, which itself will + // be retained by the Swift async runtime. + // + // We must not retain this cancellable in any other way, in order to + // cancel the observation when the Swift async runtime releases + // the iterator. + var cancellable: AnyDatabaseCancellable? + let stream = AsyncThrowingStream(Element.self, bufferingPolicy: .unbounded) { continuation in + cancellable = AnyDatabaseCancellable(start( + // onError + { error in + continuation.finish(throwing: error) + }, + // onChange + { [weak cancellable] element in + if case .terminated = continuation.yield(element) { + // TODO: I could never see this code running. Is it needed? + cancellable?.cancel() + } + })) + continuation.onTermination = { @Sendable [weak cancellable] _ in + cancellable?.cancel() + } + } + + let iterator = stream.makeAsyncIterator() + if let cancellable = cancellable { + return Iterator( + iterator: iterator, + cancellable: cancellable) + } else { + // GRDB bug: there is no point throwing any error. + fatalError("Expected AsyncThrowingStream to have started the observation already") + } + } + + /// An asynchronous iterator that supplies database changes one at a time. + public struct Iterator: AsyncIteratorProtocol { + var iterator: AsyncThrowingStream.AsyncIterator + let cancellable: AnyDatabaseCancellable + + public mutating func next() async throws -> Element? { + try await iterator.next() + } + } +} +#endif + #if canImport(Combine) extension ValueObservation { // MARK: - Publishing Observed Values diff --git a/Tests/GRDBTests/ValueObservationTests.swift b/Tests/GRDBTests/ValueObservationTests.swift index 6e7cd95ec4..8a8f7c25f4 100644 --- a/Tests/GRDBTests/ValueObservationTests.swift +++ b/Tests/GRDBTests/ValueObservationTests.swift @@ -518,4 +518,146 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabaseQueue()) try test(makeDatabasePool()) } + +#if swift(>=5.5) + @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) + func testAsyncAwait_values_prefix() async throws { + let dbQueue = try makeDatabaseQueue() + + // We need something to change + try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } + + let cancellationExpectation = expectation(description: "cancelled") + let observation = ValueObservation + .tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! } + .handleEvents(didCancel: { cancellationExpectation.fulfill() }) + + let task = Task { () -> [Int] in + var counts: [Int] = [] + + for try await count in observation.values(in: dbQueue).prefix(3) { + counts.append(count) + try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } + } + return counts + } + + let counts = try await task.value + + // All values were published + XCTAssertEqual(counts, [0, 1, 2]) + + // Observation was ended + wait(for: [cancellationExpectation], timeout: 2) + } + + @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) + func testAsyncAwait_values_prefix_immediate_scheduling() async throws { + let dbQueue = try makeDatabaseQueue() + + // We need something to change + try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } + + let cancellationExpectation = expectation(description: "cancelled") + let observation = ValueObservation + .tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! } + .handleEvents(didCancel: { cancellationExpectation.fulfill() }) + + let task = Task { @MainActor () -> [Int] in + var counts: [Int] = [] + + for try await count in observation.values(in: dbQueue, scheduling: .immediate).prefix(3) { + counts.append(count) + try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } + } + return counts + } + + let counts = try await task.value + + // All values were published + XCTAssertEqual(counts, [0, 1, 2]) + + // Observation was ended + wait(for: [cancellationExpectation], timeout: 2) + } + + @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) + func testAsyncAwait_values_break() async throws { + let dbQueue = try makeDatabaseQueue() + + // We need something to change + try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } + + let cancellationExpectation = expectation(description: "cancelled") + let observation = ValueObservation + .tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! } + .handleEvents(didCancel: { cancellationExpectation.fulfill() }) + + let task = Task { () -> [Int] in + var counts: [Int] = [] + + for try await count in observation.values(in: dbQueue) { + counts.append(count) + if count == 2 { + break + } else { + try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } + } + } + return counts + } + + let counts = try await task.value + + // All values were published + XCTAssertEqual(counts, [0, 1, 2]) + + // Observation was ended + wait(for: [cancellationExpectation], timeout: 2) + } + + @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) + func testAsyncAwait_values_cancelled() async throws { + let dbQueue = try makeDatabaseQueue() + + // We need something to change + try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } + + let cancellationExpectation = expectation(description: "cancelled") + let valueExpectation = expectation(description: "value") + valueExpectation.assertForOverFulfill = false + let observation = ValueObservation + .tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! } + .handleEvents( + didReceiveValue: { _ in valueExpectation.fulfill() }, + didCancel: { cancellationExpectation.fulfill() }) + + struct TestError: Error { } + do { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + // Infinite loop + for try await _ in observation.values(in: dbQueue) { + try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } + } + } + group.addTask { + // Throw after a delay + try await Task.sleep(nanoseconds: 1_000_000) + throw TestError() + } + + for try await _ in group { } + } + XCTFail("Expected error") + } catch is TestError { + } catch { + XCTFail("Unexpected error \(error)") + } + + // A value was observed, and observation was ended + wait(for: [valueExpectation, cancellationExpectation], timeout: 2) + } +#endif }