Skip to content

Commit a200a47

Browse files
committed
Added unwrappedStream() method to AsyncObservable that allows you to read a stream of only non-nil values when the type is Optional.
1 parent c52b053 commit a200a47

File tree

4 files changed

+69
-15
lines changed

4 files changed

+69
-15
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
All notable changes to AsyncObservable will be documented in this file.
44

5+
## [0.3.1] - 2025-03-08
6+
7+
### Added
8+
- Added unwrappedStream() method to AsyncObservable that allows you to read a stream of only non-nil values when the type is Optional.
9+
510
## [0.3.0] - 2025-03-08
611

712
### Changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,19 @@ for await value in someProperty.stream {
111111
}
112112
```
113113

114+
## Unwrapped Stream
115+
116+
If you want to read a stream of non-nil values, but your type is an optional, you can use the `unwrappedStream` method.
117+
118+
```swift
119+
let someProperty = AsyncObservable(Data?)
120+
let stream = someProperty.unwrappedStream()
121+
122+
for await value in stream {
123+
print(value) // only non-nil values
124+
}
125+
```
126+
114127
## Mutate
115128

116129
Sometimes you just want to mutate the original value instead of having to copy and return a new value. This still updates all the observers correctly and is safe.

Sources/AsyncObservable/AsyncObservable.swift

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import Foundation
21
import Dispatch
2+
import Foundation
33

44
#if canImport(Observation)
5-
import Observation
5+
import Observation
66
#endif
77

88
/// A thread-safe state management system that provides both async stream-based observation
@@ -85,7 +85,7 @@ open class AsyncObservable<T: Sendable>: AsyncObservableReadOnly, @unchecked Sen
8585

8686
/// An async stream of values that can be used with Swift concurrency.
8787
/// This property provides a convenient way to access the value stream without calling `stream()`.
88-
public var stream: StreamOf<T> {
88+
open var stream: StreamOf<T> {
8989
streamOf()
9090
}
9191

@@ -152,7 +152,7 @@ open class AsyncObservable<T: Sendable>: AsyncObservableReadOnly, @unchecked Sen
152152
}
153153

154154
/// Creates a new state manager with the given initial value.
155-
/// This initializer can be called from any context
155+
/// This initializer can be called from any context
156156
///
157157
/// - Parameters:
158158
/// - initialValue: The initial value to manage
@@ -162,8 +162,7 @@ open class AsyncObservable<T: Sendable>: AsyncObservableReadOnly, @unchecked Sen
162162
_ initialValue: T,
163163
bufferingPolicy: AsyncStream<T>.Continuation.BufferingPolicy = .unbounded,
164164
serialQueue: DispatchQueue = DispatchQueue(label: "AsyncObservable")
165-
)
166-
{
165+
) {
167166
_raw = initialValue
168167
self.bufferingPolicy = bufferingPolicy
169168
self.serialQueue = serialQueue
@@ -237,7 +236,7 @@ open class AsyncObservable<T: Sendable>: AsyncObservableReadOnly, @unchecked Sen
237236
/// The stream immediately yields the current value and then yields all subsequent updates.
238237
///
239238
/// - Returns: A StreamOf<T> instance that can be used with async/await code
240-
private func streamOf() -> StreamOf<T> {
239+
internal func streamOf() -> StreamOf<T> {
241240
let id = UUID()
242241
return StreamOf<T>(
243242
bufferingPolicy: bufferingPolicy,
@@ -251,4 +250,10 @@ open class AsyncObservable<T: Sendable>: AsyncObservableReadOnly, @unchecked Sen
251250
continuation.yield(self.raw)
252251
})
253252
}
253+
254+
@available(iOS 17.0, macOS 15.0, tvOS 17.0, watchOS 10.0, *)
255+
func unwrappedStream<U>() -> any AsyncSequence<U, Never> where T == U? {
256+
stream.compactMap { $0 }
257+
}
258+
254259
}

Tests/AsyncObservableTests/StreamTests.swift

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import Foundation
22
import Testing
3+
34
@testable import AsyncObservable
45

56
@Suite("AsyncObservable Stream Tests")
@@ -36,7 +37,7 @@ struct AsyncObservableStreamTests {
3637

3738
#expect(receivedValues == [0, 1, 2])
3839
}
39-
40+
4041
@Test("Should handle multiple observers")
4142
@available(iOS 17.0, macOS 14.0, tvOS 17.0, watchOS 10.0, *)
4243
func testMultipleObservers() async {
@@ -45,7 +46,7 @@ struct AsyncObservableStreamTests {
4546
// Create two separate streams
4647
let stream1 = observable.stream
4748
let stream2 = observable.stream
48-
49+
4950
// Consume initial values first
5051
var values1: [Int] = []
5152
var values2: [Int] = []
@@ -56,7 +57,7 @@ struct AsyncObservableStreamTests {
5657
values1.append(value)
5758
}
5859
}
59-
60+
6061
let task2 = Task {
6162
for await value in stream2 {
6263
values2.append(value)
@@ -90,7 +91,7 @@ struct AsyncObservableStreamTests {
9091

9192
// Now start observing - we should get the current value (4) and possibly some buffered values
9293
var receivedValues: [Int] = []
93-
94+
9495
// Create a task with a timeout to avoid hanging
9596
let task = Task {
9697
for await value in observable.stream {
@@ -100,7 +101,7 @@ struct AsyncObservableStreamTests {
100101
}
101102
}
102103
}
103-
104+
104105
// Add a timeout to ensure the test doesn't hang if the stream doesn't yield enough values
105106
try? await Task.sleep(nanoseconds: 100_000_000)
106107
task.cancel()
@@ -109,7 +110,7 @@ struct AsyncObservableStreamTests {
109110
#expect(receivedValues.count > 0)
110111
#expect(receivedValues.contains(4), "Should contain the latest value")
111112
}
112-
113+
113114
@Test("Should not update observers when notifyObservers is false")
114115
@available(iOS 17.0, macOS 14.0, tvOS 17.0, watchOS 10.0, *)
115116
func testSilentUpdate() async {
@@ -140,9 +141,39 @@ struct AsyncObservableStreamTests {
140141

141142
// No value should have been received
142143
#expect(receivedValue == false)
143-
144+
144145
// But the internal value should be updated
145146
#expect(observable.raw == 20)
146147
await #expect(observable.observable == 20)
147148
}
148-
}
149+
150+
@Test("unsrapped steram")
151+
@available(iOS 17.0, macOS 15.0, tvOS 17.0, watchOS 10.0, *)
152+
func testUnrappedStream() async {
153+
let observable: AsyncObservable<Int?> = AsyncObservable(nil)
154+
let stream = observable.unwrappedStream()
155+
156+
// should only get values that are not nil
157+
var sum = 0
158+
let task = Task {
159+
for await value in stream {
160+
sum += value
161+
#expect(value != nil)
162+
}
163+
}
164+
165+
observable.update(nil)
166+
observable.update(1)
167+
observable.update(2)
168+
observable.update(nil)
169+
observable.update(3)
170+
observable.update(nil)
171+
observable.update(5)
172+
173+
// Give some time for potential values to arrive
174+
try? await Task.sleep(for: .milliseconds(100))
175+
task.cancel()
176+
177+
#expect(sum == 11)
178+
}
179+
}

0 commit comments

Comments
 (0)