An alternative to stream operations in Riverpod 3
Riverpod offers StreamProvider to manage streams, and in combination with rxdart, you can explore the deep sea of stream programming.
For example, to scratch the surface of stream programming, receiving the latest combination of two different streams is very easy.
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:rxdart/rxdart.dart';

final streamProviderA = StreamProvider<int>((ref) async* {
  yield* Stream.periodic(const Duration(seconds: 2), (index) => index);
});

final streamProviderB = StreamProvider<int>((ref) async* {
  yield* Stream.periodic(const Duration(seconds: 3), (index) => index);
});

final combinedStreamProvider = StreamProvider.autoDispose<List<int>>((ref) {
  final streamA = ref.watch(streamProviderA.stream);
  final streamB = ref.watch(streamProviderB.stream);
  return Rx.combineLatest2(streamA, streamB, (a, b) => [a, b]);
});
In this example, streamProviderA provides a stream that emits an event every 2 seconds, and streamProviderB provides a stream that emits an event every 3 seconds. By using the Rx.combineLatest2 operator, combinedStreamProvider can receive the latest combination of the two streams.
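To make "latest combination" concrete, here is a small plain-Dart sketch of the rule as I understand it: nothing is emitted until both sides have produced a value, and after that every incoming event produces a new pair. It does not use rxdart at all; StreamEvent and latestCombinations are hypothetical names made up for this illustration.

```dart
// Which stream fired ('A' or 'B') and the value it carried.
typedef StreamEvent = ({String source, int value});

/// Replays a merged timeline and returns every pair a combine-latest
/// style operator would emit: nothing until both sides have a value,
/// then one pair per incoming event.
List<(int, int)> latestCombinations(List<StreamEvent> timeline) {
  int? latestA;
  int? latestB;
  final emitted = <(int, int)>[];
  for (final event in timeline) {
    if (event.source == 'A') {
      latestA = event.value;
    } else {
      latestB = event.value;
    }
    if (latestA != null && latestB != null) {
      emitted.add((latestA, latestB));
    }
  }
  return emitted;
}

void main() {
  // The first few seconds of the example: A fires at 2s and 4s, B at 3s.
  final timeline = <StreamEvent>[
    (source: 'A', value: 0),
    (source: 'B', value: 0),
    (source: 'A', value: 1),
  ];
  print(latestCombinations(timeline)); // [(0, 0), (1, 0)]
}
```

Note that the first event from A alone produces no output, because B has not emitted yet.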
Displaying the snapshot of combinedStreamProvider would look like this:
The problem: StreamProvider.stream will be removed in Riverpod 3
In the above code, I am accessing StreamProvider's stream property. However, it was deprecated in Riverpod 2.3 and will be removed in Riverpod 3.
Riverpod's author Remi Rousselet himself explains the reasoning in a Stack Overflow thread. I can't entirely follow it, but it seems that accessing the stream property could cause issues that are hard to debug.
Anyway, that's that. We can't access the stream directly anymore. How can we achieve what we used to do with the stream before?
EDIT: 2023/10/13 I added an example using AsyncNotifierProvider after StateNotifierProvider to keep up with the latest standards.
We could use good ol’ StateNotifierProvider 🤔
Actually, the answer from Randal Schwartz in the same Stack Overflow thread gave me a hint. He says:
“… Streams are useful for places where having every single intermediate state is important to record, like a log or a bank transaction ledger. But in state management, having the latest value is all that’s important. You merely need your view or your provider to consume the other provider’s latest value, …”
Yes, we are only interested in the latest value of the stream when it comes to elements on the UI. From that perspective, the good ol' StateNotifierProvider already offers that.
The implementation becomes a bit more involved though. You first need to prepare a StateNotifier subtype that can hold and notify the latest combined values.
import 'package:flutter_riverpod/flutter_riverpod.dart';

// Define States
sealed class CombinedValueState {}

class CombinedValueStateData extends CombinedValueState {
  final List<int> values;
  CombinedValueStateData(this.values);
}

class CombinedValueStateLoading extends CombinedValueState {}

class CombinedValueStateError extends CombinedValueState {
  final Object error;
  final StackTrace stackTrace;
  CombinedValueStateError(this.error, this.stackTrace);
}

// StateNotifier subtype to update states
class CombinedValueStateNotifier extends StateNotifier<CombinedValueState> {
  CombinedValueStateNotifier() : super(CombinedValueStateLoading());

  void updateWithError(Object error, StackTrace stackTrace) {
    state = CombinedValueStateError(error, stackTrace);
  }

  void updateLoading() {
    if (state is CombinedValueStateLoading) return;
    state = CombinedValueStateLoading();
  }

  // We need to call this function when we receive an event from streamA
  void updateA(int value) {
    final current = state;
    // Assign a new state object instead of mutating the list in place,
    // so that the assignment counts as a state change.
    state = current is CombinedValueStateData
        ? CombinedValueStateData([value, current.values[1]])
        : CombinedValueStateData([value, 0]);
  }

  // We need to call this function when we receive an event from streamB
  void updateB(int value) {
    final current = state;
    state = current is CombinedValueStateData
        ? CombinedValueStateData([current.values[0], value])
        : CombinedValueStateData([0, value]);
  }
}
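A side note on update methods like updateA and updateB: as far as I understand, StateNotifier decides whether to notify listeners by comparing the old and the new state by identity. If that is right, mutating a list inside the current state and assigning the same object back may not count as a change, so creating a fresh state object is the safer habit. The plain-Dart sketch below only imitates such a check; Snapshot and looksChanged are hypothetical names, and the identity comparison is my assumption about the default behavior.

```dart
// Hypothetical stand-in for a state object that holds the combined values.
class Snapshot {
  Snapshot(this.values);
  final List<int> values;
}

/// Identity-based change check (assumed to mirror the notifier's default).
bool looksChanged(Object previous, Object next) => !identical(previous, next);

void main() {
  final current = Snapshot([0, 0]);

  // In-place mutation: still the same object, so the check sees no change.
  current.values[0] = 1;
  print(looksChanged(current, current)); // false

  // Copy-on-write: a fresh object with a fresh list is detected as a change.
  final next = Snapshot([2, current.values[1]]);
  print(looksChanged(current, next)); // true
}
```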
Then, the counterpart of combinedStreamProvider can be written like this using the aforementioned CombinedValueStateNotifier.
final combinedStateNotifierProvider = StateNotifierProvider.autoDispose<
    CombinedValueStateNotifier, CombinedValueState>((ref) {
  final asyncValueA = ref.watch(streamProviderA);
  final asyncValueB = ref.watch(streamProviderB);
  final stateNotifier = CombinedValueStateNotifier();

  asyncValueA.when(data: (value) {
    // updating the latest value of streamA
    stateNotifier.updateA(value);
  }, error: (error, stackTrace) {
    stateNotifier.updateWithError(error, stackTrace);
  }, loading: () {
    stateNotifier.updateLoading();
  });

  asyncValueB.when(data: (value) {
    // updating the latest value of streamB
    stateNotifier.updateB(value);
  }, error: (error, stackTrace) {
    stateNotifier.updateWithError(error, stackTrace);
  }, loading: () {
    stateNotifier.updateLoading();
  });

  // Return the notifier itself; the provider exposes its state
  return stateNotifier;
});
By watching this Provider, we can display exactly the same thing as the original deprecated example.
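To make "watching this Provider" a bit more concrete, here is a rough sketch of how the sealed state could be turned into display text with an exhaustive switch. It is plain Dart so it can run outside Flutter; the state classes are repeated from above, and describe is a hypothetical helper, not something from Riverpod.

```dart
sealed class CombinedValueState {}

class CombinedValueStateData extends CombinedValueState {
  CombinedValueStateData(this.values);
  final List<int> values;
}

class CombinedValueStateLoading extends CombinedValueState {}

class CombinedValueStateError extends CombinedValueState {
  CombinedValueStateError(this.error, this.stackTrace);
  final Object error;
  final StackTrace stackTrace;
}

/// Hypothetical helper: maps each state to the text a widget could show.
/// Because the base class is sealed, the compiler checks that every
/// subtype is handled.
String describe(CombinedValueState state) => switch (state) {
      CombinedValueStateData(:final values) =>
        'Stream A: ${values[0]}, Stream B: ${values[1]}',
      CombinedValueStateLoading() => 'Loading...',
      CombinedValueStateError(:final error) => 'Error: $error',
    };

void main() {
  print(describe(CombinedValueStateData([1, 0]))); // Stream A: 1, Stream B: 0
  print(describe(CombinedValueStateLoading())); // Loading...
}
```

In a real widget you would return Text or CircularProgressIndicator from each branch instead of a string.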
We could use AsyncNotifierProvider too 💃
As Randal Schwartz pointed out in the comment, StateNotifierProvider is not recommended in Riverpod 2.x anymore, even though there is no plan to remove it at this point.
So, to keep up with the latest standards, I took a stab at this problem using AsyncNotifierProvider next.
Unlike StateNotifierProvider, AsyncNotifierProvider does not receive ProviderRef, so the heart of the implementation goes into AsyncNotifier itself. My implementation goes as follows.
import 'dart:async';

import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

class CombinedValueAsyncNotifier extends AsyncNotifier<(int, int)?> {
  @override
  FutureOr<(int, int)?> build() {
    debugPrint("rebuilding...");
    final asyncValueA = ref.watch(streamProviderA);
    final asyncValueB = ref.watch(streamProviderB);

    asyncValueA.when(
        data: (value) {
          debugPrint("new data for stream A: $value");
          state = AsyncData(updateA(value));
        },
        error: (error, stackTrace) {
          // Rethrow while keeping the original stack trace
          Error.throwWithStackTrace(error, stackTrace);
        },
        loading: () {});

    asyncValueB.when(
        data: (value) {
          debugPrint("new data for stream B: $value");
          state = AsyncData(updateB(value));
        },
        error: (error, stackTrace) {
          // Rethrow while keeping the original stack trace
          Error.throwWithStackTrace(error, stackTrace);
        },
        loading: () {});

    if (state.asData == null) {
      // means that both streams are still loading
      debugPrint("both streams are still loading");
      return null;
    }
    debugPrint("new data from rebuild: ${state.asData!.value}");
    return state.asData!.value;
  }

  // Returns the combo with the latest value of stream A (0 stands in for B
  // until stream B has emitted)
  (int, int) updateA(int value) {
    debugPrint("state before updateA: $state");
    final currentCombo = state.valueOrNull;
    if (currentCombo == null) {
      return (value, 0);
    } else {
      return (value, currentCombo.$2);
    }
  }

  // Returns the combo with the latest value of stream B (0 stands in for A
  // until stream A has emitted)
  (int, int) updateB(int value) {
    debugPrint("state before updateB: $state");
    final currentCombo = state.valueOrNull;
    if (currentCombo == null) {
      return (0, value);
    } else {
      return (currentCombo.$1, value);
    }
  }
}
...
final combinedValueNotifierProvider =
    AsyncNotifierProvider<CombinedValueAsyncNotifier, (int, int)?>(
        CombinedValueAsyncNotifier.new);
The outcome behaves exactly the same as the prior examples 👍 with slightly shorter code than the StateNotifierProvider example.
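Stripped of the logging, the updateA and updateB helpers above boil down to two small pure functions on a nullable record: take the current combo (or null if nothing has arrived yet), replace one side, and fall back to 0 for the side that has no value. Here is a plain-Dart sketch of just that part; Combo, withA and withB are hypothetical names used only here.

```dart
typedef Combo = (int, int);

/// Returns a new combo with the A side replaced.
/// Falls back to 0 for B when there is no current combo.
Combo withA(Combo? current, int a) => (a, current?.$2 ?? 0);

/// Returns a new combo with the B side replaced.
/// Falls back to 0 for A when there is no current combo.
Combo withB(Combo? current, int b) => (current?.$1 ?? 0, b);

void main() {
  final first = withA(null, 3);
  print(first); // (3, 0)
  print(withB(first, 5)); // (3, 5)
}
```

Since records are immutable, each call returns a new value rather than modifying the current one.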
My beef with this example, though, is that we cannot set the AsyncLoading status at an arbitrary point. Once the build function returns, the state of AsyncNotifier becomes AsyncData. The AsyncLoading status is only (automatically) assigned at the beginning of the build function until state changes to something else. There is no control over it.
So, if you want to show CircularProgressIndicator at an arbitrary point, you need to improvise a bit: e.g., return null from the build function and, if AsyncData's data is null, show CircularProgressIndicator.
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

class MyCombinedWidget extends ConsumerWidget {
  const MyCombinedWidget({super.key});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final combineValue = ref.watch(combinedValueNotifierProvider);
    switch (combineValue) {
      case AsyncData(:final value):
        if (value == null) {
          // Show CircularProgressIndicator when stream is loading
          return const CircularProgressIndicator();
        } else {
          return Text('Stream A: ${value.$1}, Stream B: ${value.$2}');
        }
      case AsyncLoading():
        // This hardly happens
        return const CircularProgressIndicator();
      // Match any error object, not only subclasses of Error
      case AsyncError(:final error):
        return Text('Error: ${error.toString()}');
      case _:
        // Doesn't happen
        return const Text('Unknown state');
    }
  }
}
Conclusion
Accessing StreamProvider.stream has been deprecated in Riverpod and will be removed soon, even though it was handy. I guess there are different alternatives (for example, in the Riverpod release notes, Remi suggests using FutureProvider), but in some cases, I think you can get by using StateNotifierProvider or AsyncNotifierProvider.
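For reference, the FutureProvider route mentioned above might look roughly like the sketch below. I have not compared it side by side with the other examples, so treat it as an assumption-laden outline: it relies on StreamProvider's future property (instead of the deprecated stream), combinedFutureProvider is a name I made up, and I am assuming the provider re-runs whenever either source emits a new value.

```dart
import 'package:flutter_riverpod/flutter_riverpod.dart';

// Same source streams as in the first example.
final streamProviderA = StreamProvider<int>(
  (ref) => Stream.periodic(const Duration(seconds: 2), (index) => index),
);
final streamProviderB = StreamProvider<int>(
  (ref) => Stream.periodic(const Duration(seconds: 3), (index) => index),
);

// Hypothetical name. Awaiting `.future` yields each provider's latest
// value, so the result only becomes available once both streams have
// emitted at least once.
final combinedFutureProvider =
    FutureProvider.autoDispose<(int, int)>((ref) async {
  final a = await ref.watch(streamProviderA.future);
  final b = await ref.watch(streamProviderB.future);
  return (a, b);
});
```

A widget would watch combinedFutureProvider and handle the resulting AsyncValue the same way as in the earlier examples.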
If you come up with a better alternative to solve this case, please share 🙏🏻
You can access my sample code below: