An alternative to stream operations in Riverpod 3

Yuichi Fujiki
Oct 3, 2023

Riverpod offers StreamProvider to manage streams, and in combination with rxdart, you can explore the deep sea of stream programming.

For example, just to scratch the surface of stream programming, it is very easy to receive the latest combination of values from two different streams.

import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:rxdart/rxdart.dart';

final streamProviderA = StreamProvider<int>((ref) async* {
  yield* Stream.periodic(const Duration(seconds: 2), (index) => index);
});

final streamProviderB = StreamProvider<int>((ref) async* {
  yield* Stream.periodic(const Duration(seconds: 3), (index) => index);
});

final combinedStreamProvider = StreamProvider.autoDispose<List<int>>((ref) {
  final streamA = ref.watch(streamProviderA.stream);
  final streamB = ref.watch(streamProviderB.stream);

  return Rx.combineLatest2(streamA, streamB, (a, b) => [a, b]);
});

In this example, streamProviderA provides a stream that emits an event every 2 seconds, and streamProviderB provides a stream that emits an event every 3 seconds. By using the Rx.combineLatest2 operator, combinedStreamProvider receives the latest combination of the two streams: once both streams have emitted at least once, it emits a new pair every time either of them emits.
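To make "latest combination" concrete, here is a minimal pure-Dart sketch that replays a list of events and collects the pairs a combine-latest operator would produce. It is not how rxdart implements the operator; `Emission` and `combineLatestPairs` are hypothetical names used only for illustration.

```dart
// Illustration only: a synchronous replay of combine-latest semantics.
// `Emission` and `combineLatestPairs` are hypothetical names, not rxdart APIs.
typedef Emission = ({String source, int value});

/// Replays [events] in order and returns every pair that a combine-latest
/// operator would emit: nothing until both sources have produced a value,
/// then one pair for each incoming event.
List<(int, int)> combineLatestPairs(List<Emission> events) {
  int? latestA;
  int? latestB;
  final pairs = <(int, int)>[];

  for (final event in events) {
    // Remember the most recent value of whichever source just emitted.
    if (event.source == 'A') {
      latestA = event.value;
    } else {
      latestB = event.value;
    }

    // Emit only once both sources have a value.
    final a = latestA;
    final b = latestB;
    if (a != null && b != null) pairs.add((a, b));
  }
  return pairs;
}
```

With the timings above, the first events arrive as A (2 s), B (3 s), A (4 s). Passing `[(source: 'A', value: 0), (source: 'B', value: 0), (source: 'A', value: 1)]` returns `[(0, 0), (1, 0)]`: nothing is emitted at 2 s because B has no value yet.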

Displaying the snapshot of combinedStreamProvider in a widget shows the latest pair of values, refreshed whenever either stream emits.

The problem: StreamProvider.stream will be removed in Riverpod 3

In the above code, I am accessing StreamProvider's stream property. However, it was deprecated in Riverpod 2.3 and will be removed in Riverpod 3.

Riverpod's author, Remi Rousselet, explains the reasoning himself in a Stack Overflow thread. I can't entirely follow it, but it seems that accessing the stream property can cause issues that are hard to debug.

Anyway, that’s that. We can’t access the stream directly anymore. How can we achieve what we used to do with the stream before?

EDIT: 2023/10/13 I added an example using AsyncNotifierProvider after the StateNotifierProvider one to keep up with the latest standards.

We could use good ol’ StateNotifierProvider 🤔

Actually, the answer from Randal Schwartz in the same Stack Overflow thread gave me a hint. He says:

“… Streams are useful for places where having every single intermediate state is important to record, like a log or a bank transaction ledger. But in state management, having the latest value is all that’s important. You merely need your view or your provider to consume the other provider’s latest value, …”

Yes, when it comes to elements on the UI, we are only interested in the latest value of the stream. From that perspective, the good ol' StateNotifierProvider already offers what we need.

The implementation becomes a bit more involved, though. You first need to prepare a StateNotifier subclass that can hold the latest combined values and notify listeners about them.

import 'package:flutter_riverpod/flutter_riverpod.dart';

// Define States
sealed class CombinedValueState {}

class CombinedValueStateData extends CombinedValueState {
  final List<int> values;

  CombinedValueStateData(this.values);
}

class CombinedValueStateLoading extends CombinedValueState {}

class CombinedValueStateError extends CombinedValueState {
  final Object error;
  final StackTrace stackTrace;

  CombinedValueStateError(this.error, this.stackTrace);
}

// StateNotifier subtype to update states
class CombinedValueStateNotifier extends StateNotifier<CombinedValueState> {
  CombinedValueStateNotifier() : super(CombinedValueStateLoading());

  void updateWithError(Object error, StackTrace stackTrace) {
    state = CombinedValueStateError(error, stackTrace);
  }

  void updateLoading() {
    if (state is CombinedValueStateLoading) return;
    state = CombinedValueStateLoading();
  }

  // We need to call this function when we receive an event from streamA
  void updateA(int value) {
    final current = state;
    // Always assign a new state object: StateNotifier does not notify
    // listeners when the assigned state is identical to the previous one.
    state = CombinedValueStateData(
      [value, current is CombinedValueStateData ? current.values[1] : 0],
    );
  }

  // We need to call this function when we receive an event from streamB
  void updateB(int value) {
    final current = state;
    state = CombinedValueStateData(
      [current is CombinedValueStateData ? current.values[0] : 0, value],
    );
  }
}

Then, the counterpart of combinedStreamProvider can be written like this using the aforementioned CombinedValueStateNotifier.

final combinedStateNotifierProvider = StateNotifierProvider.autoDispose<
    CombinedValueStateNotifier, CombinedValueState>((ref) {
  // Watching the providers rebuilds this provider on every new event.
  final asyncValueA = ref.watch(streamProviderA);
  final asyncValueB = ref.watch(streamProviderB);

  final stateNotifier = CombinedValueStateNotifier();

  asyncValueA.when(data: (value) {
    // updating the latest value of streamA
    stateNotifier.updateA(value);
  }, error: (error, stackTrace) {
    stateNotifier.updateWithError(error, stackTrace);
  }, loading: () {
    stateNotifier.updateLoading();
  });

  asyncValueB.when(data: (value) {
    // updating the latest value of streamB
    stateNotifier.updateB(value);
  }, error: (error, stackTrace) {
    stateNotifier.updateWithError(error, stackTrace);
  }, loading: () {
    stateNotifier.updateLoading();
  });

  // Return the notifier itself, not its state.
  return stateNotifier;
});

By watching this Provider, we can display exactly the same thing as the original deprecated example.
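As a rough sketch of the display side, the sealed state can be mapped to text with an exhaustive switch expression. The three state classes are repeated here only so the snippet compiles on its own, and `describeCombinedState` is a hypothetical helper name, not part of Riverpod.

```dart
// State classes repeated from above so this snippet is self-contained.
sealed class CombinedValueState {}

class CombinedValueStateData extends CombinedValueState {
  CombinedValueStateData(this.values);
  final List<int> values;
}

class CombinedValueStateLoading extends CombinedValueState {}

class CombinedValueStateError extends CombinedValueState {
  CombinedValueStateError(this.error, this.stackTrace);
  final Object error;
  final StackTrace stackTrace;
}

/// Hypothetical helper: turns a state into the text a widget could show.
/// Because CombinedValueState is sealed, the compiler checks that every
/// subtype is handled, so no default branch is needed.
String describeCombinedState(CombinedValueState state) {
  return switch (state) {
    CombinedValueStateData(:final values) =>
      'Stream A: ${values[0]}, Stream B: ${values[1]}',
    CombinedValueStateLoading() => 'Loading...',
    CombinedValueStateError(:final error) => 'Error: $error',
  };
}
```

Inside a ConsumerWidget, this could be used as `Text(describeCombinedState(ref.watch(combinedStateNotifierProvider)))`, with the loading branch swapped for a CircularProgressIndicator if you prefer.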

We could use AsyncNotifierProvider too 💃

As Randal Schwartz pointed out in the comments, StateNotifierProvider is no longer recommended in Riverpod 2.x, even though there is no plan to remove it at this point.

So, to keep up with the latest standards, I took a stab at this problem using AsyncNotifierProvider next.

Unlike StateNotifierProvider, AsyncNotifierProvider does not take a callback that receives a ProviderRef, so the heart of the implementation goes into the AsyncNotifier itself. My implementation goes as follows.

import 'dart:async';

import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

class CombinedValueAsyncNotifier extends AsyncNotifier<(int, int)?> {
  @override
  FutureOr<(int, int)?> build() {
    debugPrint("rebuilding...");
    // Watching both providers re-runs build() on every new event.
    final asyncValueA = ref.watch(streamProviderA);
    final asyncValueB = ref.watch(streamProviderB);

    asyncValueA.when(
        data: (value) {
          debugPrint("new data for stream A: $value");
          state = AsyncData(updateA(value));
        },
        error: (error, stackTrace) {
          // Rethrow with the original stack trace; the state becomes AsyncError.
          Error.throwWithStackTrace(error, stackTrace);
        },
        loading: () {});

    asyncValueB.when(
        data: (value) {
          debugPrint("new data for stream B: $value");
          state = AsyncData(updateB(value));
        },
        error: (error, stackTrace) {
          Error.throwWithStackTrace(error, stackTrace);
        },
        loading: () {});

    if (state.asData == null) {
      // means that both streams are still loading
      debugPrint("both streams are still loading");
      return null;
    }

    debugPrint("new data from rebuild: ${state.asData!.value}");
    return state.asData!.value;
  }

  // Returns the combo with the value of stream A replaced.
  (int, int) updateA(int value) {
    debugPrint("state before updateA: $state");
    final currentCombo = state.valueOrNull;
    if (currentCombo == null) {
      return (value, 0);
    } else {
      return (value, currentCombo.$2);
    }
  }

  // Returns the combo with the value of stream B replaced.
  (int, int) updateB(int value) {
    debugPrint("state before updateB: $state");
    final currentCombo = state.valueOrNull;
    if (currentCombo == null) {
      return (0, value);
    } else {
      return (currentCombo.$1, value);
    }
  }
}

...

final combinedValueNotifierProvider =
    AsyncNotifierProvider<CombinedValueAsyncNotifier, (int, int)?>(
        CombinedValueAsyncNotifier.new);

The outcome behaves exactly the same as the prior examples 👍 and the code is slightly shorter than the StateNotifierProvider example.

My beef with this example, though, is that we cannot set the AsyncLoading status at an arbitrary point. Once the build function returns, the state of the AsyncNotifier becomes AsyncData. The AsyncLoading status is only assigned (automatically) at the beginning of the build function and lasts until the state changes to something else. There is no control over it.

So, if you want to show a CircularProgressIndicator at a point of your choosing, you need to improvise a bit: e.g., return null from the build function and show a CircularProgressIndicator when the AsyncData's value is null.

import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

class MyCombinedWidget extends ConsumerWidget {
  const MyCombinedWidget({super.key});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final combineValue = ref.watch(combinedValueNotifierProvider);

    switch (combineValue) {
      case AsyncData(value: (int, int)? value):
        if (value == null) {
          // Show CircularProgressIndicator when stream is loading
          return const CircularProgressIndicator();
        } else {
          return Text('Stream A: ${value.$1}, Stream B: ${value.$2}');
        }
      case AsyncLoading():
        // This rarely happens
        return const CircularProgressIndicator();
      // Match any error object, not only subclasses of Error,
      // so that thrown Exceptions are displayed too.
      case AsyncError(:final error):
        return Text('Error: $error');
      case _:
        // Doesn't happen
        return const Text('Unknown state');
    }
  }
}

Conclusion

Accessing StreamProvider.stream has been deprecated in Riverpod and will be removed soon, even though it was handy. I guess there are different alternatives (for example, in the Riverpod release notes, Remi suggests using FutureProvider), but in some cases, I think you can get by using StateNotifierProvider or AsyncNotifierProvider. If you come up with a better alternative to solve this case, please share 🙏🏻
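For reference, the FutureProvider route mentioned above might look roughly like the sketch below. It assumes Riverpod 2.x, where a StreamProvider exposes a `.future` that can be watched; `combinedFutureProvider` is a hypothetical name, and I have not compared its behavior with the examples above in every edge case.

```dart
import 'package:flutter_riverpod/flutter_riverpod.dart';

// Same periodic streams as in the first example.
final streamProviderA = StreamProvider<int>((ref) {
  return Stream.periodic(const Duration(seconds: 2), (index) => index);
});

final streamProviderB = StreamProvider<int>((ref) {
  return Stream.periodic(const Duration(seconds: 3), (index) => index);
});

// Hypothetical provider: combines the latest values without touching `.stream`.
final combinedFutureProvider =
    FutureProvider.autoDispose<(int, int)>((ref) async {
  // Watch both futures before awaiting, so both streams start right away
  // and this provider re-runs whenever either stream emits a new value.
  final futureA = ref.watch(streamProviderA.future);
  final futureB = ref.watch(streamProviderB.future);

  return (await futureA, await futureB);
});
```

A widget can then call `ref.watch(combinedFutureProvider)` and handle the resulting AsyncValue in the same way as the other examples.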

You can access my sample code below:
