Skip to content

Commit d01941c

Browse files
committed
Update Workspace to support AsyncSequence subscribed object updates.
1 parent 82d0cbe commit d01941c

File tree

3 files changed

+281
-6
lines changed

3 files changed

+281
-6
lines changed

src/Workspace.swift

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ public protocol WorkspaceDictionary {
3434
/**
3535
* Get the latest value, whether it is in memory or from disk.
3636
* Set the value, it will persist asynchronously.
37+
*
38+
* - Parameters:
39+
* - key: Key to index into the persisted dictionary.
40+
*
41+
* - Returns: Value associated with the provided key. Nil if doesn't exist.
3742
*/
3843
subscript<T: Codable & Equatable>(_: String, _: T.Type) -> T? { get set }
3944
subscript<T: FlatBuffersCodable & Equatable>(_: String, _: T.Type) -> T? { get set }
@@ -55,6 +60,8 @@ public protocol WorkspaceDictionary {
5560
/**
5661
* Return all keys available in the dictionary. This is an expensive (for this dictionary)
5762
* method as it fetches from disk, from in-memory structures, and acquire locks if needed.
63+
*
64+
* - Returns: List of all keys available at the point of time in persisted dictionary.
5865
*/
5966
var keys: [String] { get }
6067
/**
@@ -160,22 +167,51 @@ public protocol Workspace: Queryable {
160167
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
161168
// MARK - Combine-compliant
162169
/**
163-
* Return a publisher for object subscription in Combine.
164-
*/
170+
* Return a publisher for object subscription in Combine.
171+
*/
165172
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
166173
func publisher<Element: Atom & Equatable>(for: Element) -> AtomPublisher<Element>
167174
/**
168-
* Return a publisher for fetched result subscription in Combine.
169-
*/
175+
* Return a publisher for fetched result subscription in Combine.
176+
*/
170177
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
171178
func publisher<Element: Atom & Equatable>(for: FetchedResult<Element>)
172179
-> FetchedResultPublisher<Element>
173180
/**
174-
* Return a publisher builder for query subscription in Combine.
175-
*/
181+
* Return a publisher builder for query subscription in Combine.
182+
*/
176183
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
177184
func publisher<Element: Atom & Equatable>(for: Element.Type) -> QueryPublisherBuilder<Element>
178185
#endif
186+
#if compiler(>=5.5) && canImport(_Concurrency)
187+
/**
188+
* Subscribe to changes to the said object, and return the AsyncSequence you can iterate over.
189+
*
190+
* - Parameters:
191+
* - object: The object previously fetched that we want to observe the new updates.
192+
* - bufferingPolicy: The buffering policy to avoid issuing all updates to concerned parties. Default will be the newest of 1.
193+
*
194+
* - Returns: An AsyncSequence that can await for new object updates. Finishes only if the object deletes.
195+
*/
196+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
197+
func subscribe<Element: Atom & Equatable>(
198+
object: Element, bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy
199+
) -> AsyncStream<Element>
200+
/**
201+
* Subscribe to changes to the said fetched result, and return the AsyncSequence you can iterate over.
202+
*
203+
* - Parameters:
204+
* - fetchedResult: The result fetched that we want to observe the new updates.
205+
* - bufferingPolicy: The buffering policy to avoid issuing all updates to concerned parties. Default will be the newest of 1.
206+
*
207+
* - Returns: An AsyncSequence that can await for new fetched result. It never finishes.
208+
*/
209+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
210+
func subscribe<Element: Atom & Equatable>(
211+
fetchedResult: FetchedResult<Element>,
212+
bufferingPolicy: AsyncStream<FetchedResult<Element>>.Continuation.BufferingPolicy
213+
) -> AsyncStream<FetchedResult<Element>>
214+
#endif
179215
}
180216

181217
extension WorkspaceDictionary {
@@ -224,6 +260,12 @@ extension WorkspaceDictionary {
224260
* call this method again with different default value (while
225261
* the underlying kept nil), it will return that different default
226262
* value.
263+
*
264+
* - Parameters:
265+
* - key: Key to index into the persisted dictionary.
266+
* - default: The default value to use if the key is missing.
267+
*
268+
* - Returns: Value associated with the provided key. Return the default value if missing.
227269
*/
228270
public subscript<T: Codable & Equatable>(key: String, default value: T) -> T {
229271
get { self[key] ?? value }
@@ -260,4 +302,16 @@ extension Workspace {
260302
) {
261303
performChanges(transactionalObjectTypes, changesHandler: changesHandler, completionHandler: nil)
262304
}
305+
#if compiler(>=5.5) && canImport(_Concurrency)
306+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
307+
public func subscribe<Element: Atom & Equatable>(object: Element) -> AsyncStream<Element> {
308+
subscribe(object: object, bufferingPolicy: .bufferingNewest(1))
309+
}
310+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
311+
public func subscribe<Element: Atom & Equatable>(fetchedResult: FetchedResult<Element>)
312+
-> AsyncStream<FetchedResult<Element>>
313+
{
314+
subscribe(fetchedResult: fetchedResult, bufferingPolicy: .bufferingNewest(1))
315+
}
316+
#endif
263317
}

src/sqlite/SQLiteWorkspace.swift

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ public final class SQLiteWorkspace: Workspace {
254254
)
255255
}
256256
}
257+
257258
#if compiler(>=5.5) && canImport(_Concurrency)
258259
/**
259260
* Perform a transaction for given object types and await either success or failure boolean.
@@ -541,6 +542,44 @@ public final class SQLiteWorkspace: Workspace {
541542

542543
#endif
543544

545+
// MARK - Subscription as AsyncSequence
546+
547+
#if compiler(>=5.5) && canImport(_Concurrency)
548+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
549+
public func subscribe<Element: Atom & Equatable>(
550+
object: Element, bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy
551+
) -> AsyncStream<Element> {
552+
AsyncStream { continuation in
553+
let cancellable = self.subscribe(object: object) { object in
554+
switch object {
555+
case .deleted:
556+
continuation.finish()
557+
case .updated(let object):
558+
continuation.yield(object)
559+
}
560+
}
561+
continuation.onTermination = { @Sendable _ in
562+
cancellable.cancel()
563+
}
564+
}
565+
}
566+
567+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
568+
public func subscribe<Element: Atom & Equatable>(
569+
fetchedResult: FetchedResult<Element>,
570+
bufferingPolicy: AsyncStream<FetchedResult<Element>>.Continuation.BufferingPolicy
571+
) -> AsyncStream<FetchedResult<Element>> {
572+
AsyncStream { continuation in
573+
let cancellable = self.subscribe(fetchedResult: fetchedResult) { fetchedResult in
574+
continuation.yield(fetchedResult)
575+
}
576+
continuation.onTermination = { @Sendable _ in
577+
cancellable.cancel()
578+
}
579+
}
580+
}
581+
#endif
582+
544583
// MARK - Internal
545584

546585
static func setUpFilePathWithProtectionLevel(

src/tests/AsyncTests.swift

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,187 @@ class AsyncTests: XCTestCase {
3333
let firstMonster = fetchedResult[0]
3434
XCTAssertEqual(firstMonster.name, "What's my name")
3535
}
36+
37+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
38+
func testSubscribeObjectAsyncSequence() async {
39+
guard let dflat = dflat else { return }
40+
await dflat.performChanges(
41+
[MyGame.Sample.Monster.self],
42+
changesHandler: { (txnContext) in
43+
for i in 0..<10 {
44+
let creationRequest = MyGame.Sample.MonsterChangeRequest.creationRequest()
45+
creationRequest.name = "name \(i)"
46+
try! txnContext.submit(creationRequest)
47+
}
48+
}
49+
)
50+
let fetchedResult = dflat.fetch(for: MyGame.Sample.Monster.self).all()
51+
XCTAssertEqual(fetchedResult.count, 10)
52+
let firstMonster = fetchedResult[0]
53+
XCTAssertEqual(firstMonster.name, "name 0")
54+
let subscribeTask = Task {
55+
var updatedObject: MyGame.Sample.Monster? = nil
56+
for await newObject in dflat.subscribe(object: firstMonster, bufferingPolicy: .unbounded) {
57+
updatedObject = newObject
58+
}
59+
XCTAssertEqual(updatedObject?.color, .red)
60+
}
61+
await dflat.performChanges(
62+
[MyGame.Sample.Monster.self],
63+
changesHandler: { (txnContext) in
64+
guard let changeRequest = MyGame.Sample.MonsterChangeRequest.changeRequest(firstMonster)
65+
else { return }
66+
changeRequest.color = .red
67+
try! txnContext.submit(changeRequest)
68+
}
69+
)
70+
await dflat.performChanges(
71+
[MyGame.Sample.Monster.self],
72+
changesHandler: { (txnContext) in
73+
guard
74+
let deletionRequest = MyGame.Sample.MonsterChangeRequest.deletionRequest(firstMonster)
75+
else { return }
76+
try! txnContext.submit(deletionRequest)
77+
}
78+
)
79+
await subscribeTask.value
80+
}
81+
82+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
83+
func testSubscribeOutdatedObjectAndCacnelBeforeNextUpdateAsyncSequence() async {
84+
guard let dflat = dflat else { return }
85+
await dflat.performChanges(
86+
[MyGame.Sample.Monster.self],
87+
changesHandler: { (txnContext) in
88+
for i in 0..<10 {
89+
let creationRequest = MyGame.Sample.MonsterChangeRequest.creationRequest()
90+
creationRequest.name = "name \(i)"
91+
try! txnContext.submit(creationRequest)
92+
}
93+
}
94+
)
95+
let fetchedResult = dflat.fetch(for: MyGame.Sample.Monster.self).all()
96+
XCTAssertEqual(fetchedResult.count, 10)
97+
let firstMonster = fetchedResult[0]
98+
XCTAssertEqual(firstMonster.name, "name 0")
99+
await dflat.performChanges(
100+
[MyGame.Sample.Monster.self],
101+
changesHandler: { (txnContext) in
102+
guard let changeRequest = MyGame.Sample.MonsterChangeRequest.changeRequest(firstMonster)
103+
else { return }
104+
changeRequest.color = .red
105+
try! txnContext.submit(changeRequest)
106+
}
107+
)
108+
let subscribeTask = Task {
109+
var updatedObject: MyGame.Sample.Monster? = nil
110+
for await newObject in dflat.subscribe(object: firstMonster, bufferingPolicy: .unbounded) {
111+
updatedObject = newObject
112+
break
113+
}
114+
XCTAssertEqual(updatedObject?.color, .red)
115+
}
116+
await subscribeTask.value
117+
await dflat.performChanges(
118+
[MyGame.Sample.Monster.self],
119+
changesHandler: { (txnContext) in
120+
guard
121+
let deletionRequest = MyGame.Sample.MonsterChangeRequest.deletionRequest(firstMonster)
122+
else { return }
123+
try! txnContext.submit(deletionRequest)
124+
}
125+
)
126+
}
127+
128+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
129+
func testSubscribeFetchedResultAsyncSequence() async {
130+
guard let dflat = dflat else { return }
131+
await dflat.performChanges(
132+
[MyGame.Sample.Monster.self],
133+
changesHandler: { (txnContext) in
134+
for i in 0..<10 {
135+
let creationRequest = MyGame.Sample.MonsterChangeRequest.creationRequest()
136+
creationRequest.name = "name \(i)"
137+
creationRequest.mana = Int16(i * 10)
138+
try! txnContext.submit(creationRequest)
139+
}
140+
}
141+
)
142+
let fetchedResult = dflat.fetch(for: MyGame.Sample.Monster.self).where(
143+
MyGame.Sample.Monster.mana <= 50, orderBy: [MyGame.Sample.Monster.mana.ascending])
144+
XCTAssertEqual(fetchedResult.count, 6)
145+
let subscribeTask = Task { () -> FetchedResult<MyGame.Sample.Monster> in
146+
var updateCount = 0
147+
var updatedFetchedResult = fetchedResult
148+
for await newFetchedResult in dflat.subscribe(
149+
fetchedResult: fetchedResult, bufferingPolicy: .unbounded)
150+
{
151+
updatedFetchedResult = newFetchedResult
152+
updateCount += 1
153+
if updateCount == 4 {
154+
break
155+
}
156+
}
157+
return updatedFetchedResult
158+
}
159+
// Add one.
160+
await dflat.performChanges(
161+
[MyGame.Sample.Monster.self],
162+
changesHandler: { (txnContext) in
163+
let creationRequest = MyGame.Sample.MonsterChangeRequest.creationRequest()
164+
creationRequest.name = "name 10"
165+
creationRequest.mana = 15
166+
try! txnContext.submit(creationRequest)
167+
}
168+
)
169+
// Mutate one, move to later.
170+
await dflat.performChanges(
171+
[MyGame.Sample.Monster.self],
172+
changesHandler: { (txnContext) in
173+
let monster = dflat.fetch(for: MyGame.Sample.Monster.self).where(
174+
MyGame.Sample.Monster.name == "name 2")[0]
175+
guard let changeRequest = MyGame.Sample.MonsterChangeRequest.changeRequest(monster) else {
176+
return
177+
}
178+
changeRequest.mana = 43
179+
try! txnContext.submit(changeRequest)
180+
}
181+
)
182+
// Mutate one, move to earlier.
183+
await dflat.performChanges(
184+
[MyGame.Sample.Monster.self],
185+
changesHandler: { (txnContext) in
186+
let monster = dflat.fetch(for: MyGame.Sample.Monster.self).where(
187+
MyGame.Sample.Monster.name == "name 4")[0]
188+
guard let changeRequest = MyGame.Sample.MonsterChangeRequest.changeRequest(monster) else {
189+
return
190+
}
191+
changeRequest.mana = 13
192+
try! txnContext.submit(changeRequest)
193+
}
194+
)
195+
// Delete one, move to earlier.
196+
await dflat.performChanges(
197+
[MyGame.Sample.Monster.self],
198+
changesHandler: { (txnContext) in
199+
let monster = dflat.fetch(for: MyGame.Sample.Monster.self).where(
200+
MyGame.Sample.Monster.name == "name 3")[0]
201+
guard let deletionRequest = MyGame.Sample.MonsterChangeRequest.deletionRequest(monster)
202+
else { return }
203+
try! txnContext.submit(deletionRequest)
204+
}
205+
)
206+
let updatedFetchedResult = await subscribeTask.value
207+
XCTAssertEqual(updatedFetchedResult.count, 6)
208+
XCTAssertEqual(updatedFetchedResult[0].name, "name 0")
209+
XCTAssertEqual(updatedFetchedResult[1].name, "name 1")
210+
XCTAssertEqual(updatedFetchedResult[2].name, "name 4")
211+
XCTAssertEqual(updatedFetchedResult[3].name, "name 10")
212+
XCTAssertEqual(updatedFetchedResult[4].name, "name 2")
213+
XCTAssertEqual(updatedFetchedResult[5].name, "name 5")
214+
let finalFetchedResult = dflat.fetch(for: MyGame.Sample.Monster.self).where(
215+
MyGame.Sample.Monster.mana <= 50, orderBy: [MyGame.Sample.Monster.mana.ascending])
216+
XCTAssertEqual(updatedFetchedResult, finalFetchedResult)
217+
}
36218
#endif
37219
}

0 commit comments

Comments
 (0)