Skip to content

Commit d3f50ad

Browse files
authored
feat(llc): relisten to websocket events after reconnect (#43)
1 parent 260323e commit d3f50ad

File tree

3 files changed

+44
-3
lines changed

3 files changed

+44
-3
lines changed

packages/stream_feeds/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## Unreleased
22
- [BREAKING] Renamed `AppLifecycleStateProvider` to `LifecycleStateProvider` and `AppLifecycleState` to `LifecycleState`.
3+
- Re-watch websocket events for feeds when the websocket reconnects.
34

45
## 0.2.0
56
- [BREAKING] Update API client code, specifically the FeedOwnCapability enum.

packages/stream_feeds/lib/src/client/feeds_client_impl.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:rxdart/rxdart.dart';
34
import 'package:stream_core/stream_core.dart';
45

56
import '../cdn/cdn_api.dart';
@@ -270,6 +271,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
270271
feedsRepository: _feedsRepository,
271272
pollsRepository: _pollsRepository,
272273
eventsEmitter: events,
274+
onReconnectEmitter: onReconnectEmitter,
273275
);
274276
}
275277

@@ -474,4 +476,25 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
474476
Future<Result<void>> deleteImage({required String url}) {
475477
return _cdnClient.deleteImage(url);
476478
}
479+
480+
Stream<void> get onReconnectEmitter {
481+
return connectionState.stream.scan(
482+
(state, connectionStatus, i) => switch (connectionStatus) {
483+
Initialized() || Connecting() => (
484+
wasDisconnected: state.wasDisconnected,
485+
reconnected: false,
486+
),
487+
Disconnecting() || Disconnected() => (
488+
wasDisconnected: true,
489+
reconnected: false,
490+
),
491+
Connected() => (
492+
wasDisconnected: false,
493+
reconnected: state.wasDisconnected,
494+
),
495+
_ => state,
496+
},
497+
(wasDisconnected: false, reconnected: false),
498+
).mapNotNull((state) => state.reconnected ? () : null);
499+
}
477500
}

packages/stream_feeds/lib/src/state/feed.dart

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22

33
import 'package:freezed_annotation/freezed_annotation.dart';
4+
import 'package:rxdart/rxdart.dart';
45
import 'package:state_notifier/state_notifier.dart';
56
import 'package:stream_core/stream_core.dart';
67

@@ -48,6 +49,7 @@ class Feed with Disposable {
4849
required this.feedsRepository,
4950
required this.pollsRepository,
5051
required this.eventsEmitter,
52+
required Stream<void> onReconnectEmitter,
5153
}) {
5254
final fid = query.fid;
5355

@@ -65,7 +67,12 @@ class Feed with Disposable {
6567

6668
// Attach event handlers for the feed events
6769
final handler = FeedEventHandler(fid: fid, state: _stateNotifier);
68-
_eventsSubscription = eventsEmitter.listen(handler.handleEvent);
70+
_feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent));
71+
72+
// Automatically refetch data on reconnection
73+
if (query.watch) {
74+
_subscribeToReconnectionUpdates(onReconnectEmitter: onReconnectEmitter);
75+
}
6976
}
7077

7178
FeedId get fid => query.fid;
@@ -86,11 +93,11 @@ class Feed with Disposable {
8693
late final FeedStateNotifier _stateNotifier;
8794

8895
final SharedEmitter<WsEvent> eventsEmitter;
89-
StreamSubscription<WsEvent>? _eventsSubscription;
96+
final CompositeSubscription _feedSubscriptions = CompositeSubscription();
9097

9198
@override
9299
void dispose() {
93-
_eventsSubscription?.cancel();
100+
_feedSubscriptions.cancel();
94101
_stateNotifier.dispose();
95102
_memberList.dispose();
96103
super.dispose();
@@ -628,4 +635,14 @@ class Feed with Disposable {
628635
}
629636

630637
// endregion
638+
639+
void _subscribeToReconnectionUpdates({
640+
required Stream<void> onReconnectEmitter,
641+
}) {
642+
_feedSubscriptions.add(
643+
onReconnectEmitter.listen((_) {
644+
getOrCreate();
645+
}),
646+
);
647+
}
631648
}

0 commit comments

Comments
 (0)