33// Copyright (c) 2015-2025 Alexander Grebenyuk (github.com/kean).
44
55import Foundation
6- @preconcurrency import Combine
76
87#if canImport(UIKit)
98import UIKit
@@ -65,8 +64,8 @@ public final class ImageTask: Hashable {
6564 }
6665
6766 /// The stream of progress updates.
68- public nonisolated var progress : AsyncStream < Progress > {
69- makeStream {
67+ public nonisolated var progress : AsyncCompactMapSequence < AsyncStream < Event > , Progress > {
68+ events . compactMap {
7069 if case . progress( let value) = $0 { return value }
7170 return nil
7271 }
@@ -76,17 +75,27 @@ public final class ImageTask: Hashable {
7675 /// progressive decoding.
7776 ///
7877 /// - seealso: ``ImagePipeline/Configuration-swift.struct/isProgressiveDecodingEnabled``
79- public nonisolated var previews : AsyncStream < ImageResponse > {
80- makeStream {
78+ public nonisolated var previews : AsyncCompactMapSequence < AsyncStream < Event > , ImageResponse > {
79+ events . compactMap {
8180 if case . preview( let value) = $0 { return value }
8281 return nil
8382 }
8483 }
8584
86- // MARK: - Events
87-
8885 /// The events sent by the pipeline during the task execution.
89- public nonisolated var events : AsyncStream < Event > { makeStream { $0 } }
86+ public nonisolated var events : AsyncStream < Event > {
87+ AsyncStream { continuation in
88+ Task { @ImagePipelineActor in
89+ if case . suspended = state {
90+ startRunning ( )
91+ }
92+ guard case . running = state else {
93+ return continuation. finish ( )
94+ }
95+ streamContinuations. append ( continuation)
96+ }
97+ }
98+ }
9099
91100 let isDataTask : Bool
92101 private let nonisolatedState : Mutex < ImageTaskState >
@@ -95,8 +104,8 @@ public final class ImageTask: Hashable {
95104 private var subscription : TaskSubscription ?
96105
97106 // TODO: optimize (store one inline)
98- private var continuations = ContiguousArray < UnsafeContinuation < ImageResponse , Swift . Error > > ( )
99- private var _events : PassthroughSubject < ImageTask . Event , Never > ?
107+ private var taskContinuations = ContiguousArray < UnsafeContinuation < ImageResponse , Swift . Error > > ( )
108+ private var streamContinuations = ContiguousArray < AsyncStream < ImageTask . Event > . Continuation > ( )
100109
101110 nonisolated init ( taskId: Int64 , request: ImageRequest , isDataTask: Bool , pipeline: ImagePipeline , onEvent: ( @Sendable ( Event , ImageTask ) -> Void ) ? ) {
102111 self . taskId = taskId
@@ -113,10 +122,10 @@ public final class ImageTask: Hashable {
113122 try await withUnsafeThrowingContinuation {
114123 switch state {
115124 case . suspended:
116- continuations . append ( $0)
125+ taskContinuations . append ( $0)
117126 startRunning ( )
118127 case . running:
119- continuations . append ( $0)
128+ taskContinuations . append ( $0)
120129 case . cancelled:
121130 $0. resume ( throwing: ImageTask . Error. cancelled)
122131 case . completed( let result) :
@@ -202,15 +211,22 @@ public final class ImageTask: Hashable {
202211 /// - warning: The task needs to be fully wired (`_continuation` present)
203212 /// before it can start sending the events.
204213 private func dispatch( _ event: Event ) {
205- _events? . send ( event)
214+ for continuation in streamContinuations {
215+ continuation. yield ( event)
216+ }
206217
207218 func complete( with result: Result < ImageResponse , ImageTask . Error > ) {
208219 subscription = nil
209- _events? . send ( completion: . finished)
210- for continuation in continuations {
220+
221+ for continuation in streamContinuations {
222+ continuation. finish ( )
223+ }
224+ streamContinuations = [ ]
225+
226+ for continuation in taskContinuations {
211227 continuation. resume ( with: result)
212228 }
213- continuations = [ ]
229+ taskContinuations = [ ]
214230 }
215231
216232 switch event {
@@ -237,46 +253,6 @@ public final class ImageTask: Hashable {
237253 }
238254}
239255
240- // MARK: - ImageTask (Private)
241-
242- extension ImageTask {
243- private nonisolated func makeStream< T> ( of closure: @Sendable @escaping ( Event ) -> T ? ) -> AsyncStream < T > {
244- AsyncStream { continuation in
245- Task { @ImagePipelineActor in
246- if case . suspended = state {
247- startRunning ( )
248- }
249- guard case . running = state else {
250- return continuation. finish ( )
251- }
252- let cancellable = makeEvents ( ) . sink { _ in
253- continuation. finish ( )
254- } receiveValue: { event in
255- if let value = closure ( event) {
256- continuation. yield ( value)
257- }
258- switch event {
259- case . cancelled, . finished:
260- continuation. finish ( )
261- default :
262- break
263- }
264- }
265- continuation. onTermination = { _ in
266- cancellable. cancel ( )
267- }
268- }
269- }
270- }
271-
272- private func makeEvents( ) -> PassthroughSubject < ImageTask . Event , Never > {
273- if _events == nil {
274- _events = PassthroughSubject ( )
275- }
276- return _events!
277- }
278- }
279-
280256private struct ImageTaskState {
281257 var isCancelling = false
282258 var priority : ImageRequest . Priority
0 commit comments