Skip to content

Commit

Permalink
test AsyncValueObservation with all DatabaseQueue & Pool
Browse files Browse the repository at this point in the history
Also use .trackingConstantRegion since it triggers optimizations
  • Loading branch information
groue committed Oct 15, 2021
1 parent 9a28c67 commit bbd7716
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 105 deletions.
2 changes: 1 addition & 1 deletion Tests/GRDBTests/ValueObservationRecorder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ extension XCTestCase {
// Both missing and repeated values are allowed in the recorded values.
// This is because of asynchronous DatabasePool observations.
if recordedSuffix.isEmpty {
XCTFail("missing expected value \(value) - \(message())", file: file, line: line)
XCTFail("missing expected value \(value) - \(message()) in \(recordedValues)", file: file, line: line)
}
}

Expand Down
266 changes: 162 additions & 104 deletions Tests/GRDBTests/ValueObservationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -520,144 +520,202 @@ class ValueObservationTests: GRDBTestCase {
}

#if swift(>=5.5)
// MARK: - Async Await

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func testAsyncAwait_values_prefix() async throws {
let dbQueue = try makeDatabaseQueue()

// We need something to change
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

let task = Task { () -> [Int] in
var counts: [Int] = []
func test(writer: DatabaseWriter) async throws {
// We need something to change
try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.trackingConstantRegion { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

for try await count in observation.values(in: dbQueue).prefix(3) {
counts.append(count)
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
let task = Task { () -> [Int] in
var counts: [Int] = []

for try await count in try observation.values(in: writer).prefix(while: { $0 < 3 }) {
counts.append(count)
try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
}
return counts
}
return counts

let counts = try await task.value

// All values were published
assertValueObservationRecordingMatch(recorded: counts, expected: [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
}

let counts = try await task.value

// All values were published
XCTAssertEqual(counts, [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
try await AsyncTest(test)
.run { DatabaseQueue() }
.runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) }
.runAtTemporaryDatabasePath { try DatabasePool(path: $0) }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func testAsyncAwait_values_prefix_immediate_scheduling() async throws {
let dbQueue = try makeDatabaseQueue()

// We need something to change
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

let task = Task { @MainActor () -> [Int] in
var counts: [Int] = []
func test(writer: DatabaseWriter) async throws {
// We need something to change
try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

for try await count in observation.values(in: dbQueue, scheduling: .immediate).prefix(3) {
counts.append(count)
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.trackingConstantRegion { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

let task = Task { @MainActor () -> [Int] in
var counts: [Int] = []

for try await count in try observation.values(in: writer, scheduling: .immediate).prefix(while: { $0 < 3 }) {
counts.append(count)
try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
}
return counts
}
return counts

let counts = try await task.value

// All values were published
assertValueObservationRecordingMatch(recorded: counts, expected: [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
}

let counts = try await task.value

// All values were published
XCTAssertEqual(counts, [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
try await AsyncTest(test)
.run { DatabaseQueue() }
.runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) }
.runAtTemporaryDatabasePath { try DatabasePool(path: $0) }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func testAsyncAwait_values_break() async throws {
let dbQueue = try makeDatabaseQueue()

// We need something to change
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })
func test(writer: DatabaseWriter) async throws {
// We need something to change
try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.trackingConstantRegion { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

let task = Task { () -> [Int] in
var counts: [Int] = []

for try await count in observation.values(in: writer) {
counts.append(count)
if count == 2 {
break
} else {
try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
}
}
return counts
}

let counts = try await task.value

// All values were published
assertValueObservationRecordingMatch(recorded: counts, expected: [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
}

let task = Task { () -> [Int] in
var counts: [Int] = []
try await AsyncTest(test)
.run { DatabaseQueue() }
.runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) }
.runAtTemporaryDatabasePath { try DatabasePool(path: $0) }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func testAsyncAwait_values_immediate_break() async throws {
func test(writer: DatabaseWriter) async throws {
// We need something to change
try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let observation = ValueObservation
.trackingConstantRegion { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(didCancel: { cancellationExpectation.fulfill() })

for try await count in observation.values(in: dbQueue) {
counts.append(count)
if count == 2 {
let task = Task { @MainActor () -> [Int] in
var counts: [Int] = []

for try await count in observation.values(in: writer, scheduling: .immediate) {
counts.append(count)
break
} else {
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
}
return counts
}
return counts

let counts = try await task.value

// A single value was published
assertValueObservationRecordingMatch(recorded: counts, expected: [0])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
}

let counts = try await task.value

// All values were published
XCTAssertEqual(counts, [0, 1, 2])

// Observation was ended
wait(for: [cancellationExpectation], timeout: 2)
try await AsyncTest(test)
.run { DatabaseQueue() }
.runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) }
.runAtTemporaryDatabasePath { try DatabasePool(path: $0) }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func testAsyncAwait_values_cancelled() async throws {
let dbQueue = try makeDatabaseQueue()

// We need something to change
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }

let cancellationExpectation = expectation(description: "cancelled")
let valueExpectation = expectation(description: "value")
valueExpectation.assertForOverFulfill = false
let observation = ValueObservation
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(
didReceiveValue: { _ in valueExpectation.fulfill() },
didCancel: { cancellationExpectation.fulfill() })

struct TestError: Error { }
do {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
// Infinite loop
for try await _ in observation.values(in: dbQueue) {
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
func test(writer: DatabaseWriter) async throws {
// We need something to change
try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }
let cancellationExpectation = expectation(description: "cancelled")
let valueExpectation = expectation(description: "value")
valueExpectation.assertForOverFulfill = false
let observation = ValueObservation
.trackingConstantRegion { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
.handleEvents(
didReceiveValue: { _ in valueExpectation.fulfill() },
didCancel: { cancellationExpectation.fulfill() })

struct TestError: Error { }
do {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
// Infinite loop
for try await _ in observation.values(in: writer) {
try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
}
}
group.addTask {
// Throw after a delay
try await Task.sleep(nanoseconds: 1_000_000)
throw TestError()
}

for try await _ in group { }
}
group.addTask {
// Throw after a delay
try await Task.sleep(nanoseconds: 1_000_000)
throw TestError()
}

for try await _ in group { }
XCTFail("Expected error")
} catch is TestError {
} catch {
XCTFail("Unexpected error \(error)")
}
XCTFail("Expected error")
} catch is TestError {
} catch {
XCTFail("Unexpected error \(error)")

// A value was observed, and observation was ended
wait(for: [valueExpectation, cancellationExpectation], timeout: 2)
}

// A value was observed, and observation was ended
wait(for: [valueExpectation, cancellationExpectation], timeout: 2)
try await AsyncTest(test)
.run { DatabaseQueue() }
.runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) }
.runAtTemporaryDatabasePath { try DatabasePool(path: $0) }
}
#endif
}

0 comments on commit bbd7716

Please sign in to comment.