Learning State & Shared Flows with Unit Tests
State and shared flows are hot streams that can propagate items to multiple consumers. State flows offer features such as conflation and sharing strategies, whereas shared flows let you replay and buffer emissions. In this article, we will explore the features of state and shared flows through unit tests.
State Flow
Problem
One of the most common tasks in Android is managing and propagating state. Suppose we have a View Model that communicates with a view. We could use LiveData to communicate state changes to the view. However, StateFlow is an alternative provided by the coroutines library.
How do we set up a StateFlow?
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
The MutableStateFlow factory function takes an initial value. In this case, it is the Success state. The explicit UIState type argument lets the flow hold any UIState, not only Success. I have defined UIState as a sealed class. You could make the Success type a class instead and add any data your use case needs.
sealed class UIState {
object Success: UIState()
object Error: UIState()
}
How do we emit to the state flow?
stateFlow.emit(UIState.Error)
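Because emit is a suspending function, it can only be called from a coroutine. As a hedged sketch, a MutableStateFlow can also be updated without one, through its value property or the update extension found in recent kotlinx.coroutines releases. The UiState class and the finalState function below are illustrative stand-ins, not part of the original setup.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.runBlocking

// Illustrative stand-in for the article's UIState sealed class.
sealed class UiState {
    object Success : UiState()
    object Error : UiState()
}

fun finalState(): UiState {
    val stateFlow = MutableStateFlow<UiState>(UiState.Success)

    // Option 1: suspending emit, which needs a coroutine.
    runBlocking { stateFlow.emit(UiState.Error) }

    // Option 2: plain property assignment, no coroutine required.
    stateFlow.value = UiState.Success

    // Option 3: atomic read-modify-write based on the current value.
    stateFlow.update { current ->
        if (current is UiState.Success) UiState.Error else current
    }
    return stateFlow.value
}
```

Calling finalState() from a main function or a test returns UiState.Error, the result of the last update.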
State Flow Collection
Let’s look at simple unit tests to learn about the behavior of a MutableStateFlow.
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
@Test
fun `should emit default value`() = runBlockingTest {
stateFlow.test {
expectItem() shouldBe UIState.Success
}
}
Tests Passed
Test Setup
The runBlockingTest method creates a coroutine. It is the subscriber of the state flow.
I will use the Turbine library by Square to verify emissions from the flow. The library provides a test extension that internally launches a coroutine and collects from the flow. It provides methods such as expectItem, expectError and expectComplete to verify different events when collecting from a flow.
This test will succeed. When we collect from the state flow using the test extension, it will emit a successful UI state which is the default value. However, does the flow complete?
State Flow Completion
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
@Test
fun `should emit default value`() = runBlockingTest {
stateFlow.test {
expectItem() shouldBe UIState.Success
expectComplete()
}
}
The test fails when we attempt to verify that the flow has completed by using the expectComplete method. This is the error you will see in the test result.
Test Failed
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
The unit test failed with a timeout exception. The coroutine launched by the test extension timed out waiting for a completion event that the state flow never sends.
State Flow Never Completes
A call to [Flow.collect] on a state flow never completes normally, and neither does a coroutine started by the [Flow.launchIn] function.
Let’s look at another example of consuming a state flow. This example is not a unit test.
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
fun main() = runBlocking {
stateFlow
.onCompletion { println("ON COMPLETE") }
.collect {
println(it)
}
}
Output
UIState.Success
In this example, I am logging the completion and collection of a StateFlow. The completion will never be logged, because the flow never completes. But what happens if I add this logging to my unit test?
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
@Test
fun `should emit default value`() = runBlockingTest {
stateFlow
.onCompletion { println("ON COMPLETE") }
.test {
expectItem() shouldBe UIState.Success
}
}
Output
ON COMPLETE
Why does it log the completion in this unit test? The flow does not complete normally here; its collection is cancelled. If you look under the hood in the Turbine library, the test extension launches a coroutine to collect from the state flow, and that coroutine’s job is cancelled when the test block finishes. The onCompletion operator also runs on cancellation, which is why the message is printed.
fun <T> Flow<T>.test(
timeout: Duration = Duration.seconds(1),
validate: suspend FlowTurbine<T>.() -> Unit
) {
coroutineScope {
val events = Channel<Event<T>>(UNLIMITED)
val collectJob = launch(UNDISPATCHED, Unconfined) {
val terminalEvent = try {
collect { item ->
events.send(Event.Item(item))
}
val flowTurbine = ChannelBasedFlowTurbine(events, collectJob, timeout)
flowTurbine.cancel()
}
class ChannelBasedFlowTurbine<T>(…, val collectJob: Job, …,) : FlowTurbine<T> {
override suspend fun cancel() {
collectJob.cancel()
}
}
Source: Turbine Library
The flow in this example doesn’t complete normally.
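If a consumer needs the collection to end, it has to bound the collection itself. The following is a minimal sketch, assuming plain runBlocking rather than the test utilities used above. The function names collectFinishesWithin and firstValueOnly are illustrative.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if an unbounded collect finished on its own before the timeout.
fun collectFinishesWithin(timeoutMillis: Long): Boolean = runBlocking {
    // Typed as Flow so that collect has the ordinary Unit return type.
    val flow: Flow<String> = MutableStateFlow("Success")
    val outcome = withTimeoutOrNull(timeoutMillis) {
        flow.collect { /* keep listening for new states */ }
        "completed" // only reached if the flow completes
    }
    outcome != null
}

// Bounding the collection with take(1) lets it finish normally.
fun firstValueOnly(): List<String> = runBlocking {
    MutableStateFlow("Success").take(1).toList()
}
```

Here collectFinishesWithin(100) returns false, because the collection is still suspended when the timeout fires, while firstValueOnly() returns a list containing only "Success".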
Conflation
How do you emit to a state flow? We send an item to a state flow by calling the emit method on the flow. Here is a unit test where we emit the error state and then listen to the flow. We assert that we receive both the default state and the emitted item. Let’s see what happens when we run our unit test.
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
@Test
fun `should emit default value`() = runBlockingTest {
stateFlow.emit(UIState.Error)
stateFlow.test {
expectItem() shouldBe UIState.Success
expectItem() shouldBe UIState.Error
}
}
Test Failed
Expected <UIState.Success>, actual <UIState.Error> are not the same instance.
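The unit test fails because the state flow is conflated: it caches only the most recent value. By the time the subscriber starts collecting, the default value has already been replaced by the error state, so the error state is the only item emitted. Here is the passing test.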
val stateFlow = MutableStateFlow<UIState>(UIState.Success)
@Test
fun `should emit default value`() = runBlockingTest {
stateFlow.emit(UIState.Error)
stateFlow.test {
expectItem() shouldBe UIState.Error
}
}
Tests Passed
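The same conflation can be observed without a test library. The sketch below is illustrative only (the function name and the string states are assumptions). It overwrites the value twice before anyone subscribes and then reads what a late subscriber receives.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun valueSeenByLateSubscriber(): String = runBlocking {
    val stateFlow = MutableStateFlow("Success")

    // Both writes happen before any subscriber exists.
    stateFlow.value = "Loading"
    stateFlow.value = "Error"

    // A new subscriber only receives the latest cached value.
    stateFlow.first()
}
```

The function returns "Error"; the intermediate "Success" and "Loading" values are never delivered to the late subscriber.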
Cold Stream vs Hot Stream
Before we move on with state flows, let’s look at what cold and hot flows are, using unit tests.
Cold Stream
What is a cold stream? A cold stream is a flow that triggers the same code every time it is collected.
val coldFlow = flowOf(1, 2, 3).map { it + 1 }
@Test
fun `should emit from cold flow`() = runBlockingTest {
coldFlow.test {
expectItem() shouldBeEqualTo 2
expectItem() shouldBeEqualTo 3
expectItem() shouldBeEqualTo 4
expectComplete()
}
coldFlow.test {
expectItem() shouldBeEqualTo 2
expectItem() shouldBeEqualTo 3
expectItem() shouldBeEqualTo 4
expectComplete()
}
}
In this example, we have a flow that emits numbers 1, 2 and 3. A map operator is applied to it to increment each value. We’re collecting from the flow two times. Each time we collect from the flow, the emission is started again and the map operator is applied.
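To make the restart visible, a counter can be incremented inside the flow builder. This is a minimal sketch with an illustrative function name. It assumes the flow builder instead of flowOf, so that there is a block of code to count.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun countColdFlowStarts(): Int = runBlocking {
    var starts = 0
    val coldFlow = flow {
        starts++ // runs once for every collector
        emit(1)
        emit(2)
        emit(3)
    }.map { it + 1 }

    // Two independent collections, each re-running the builder block.
    val first = coldFlow.toList()
    val second = coldFlow.toList()
    check(first == listOf(2, 3, 4) && second == first)

    starts
}
```

The function returns 2, one start per collection.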
Hot Stream
What is a hot flow? A hot flow is a stream whose active instance exists independently of the presence of collectors. A state flow and a shared flow are examples of hot flows. We can convert a cold flow into a state or shared flow in different ways.
Convert Cold Flow to State Flow
The stateIn extension allows us to convert a cold flow to a state flow. Assume we had a simple cold flow that emitted two strings.
val flowOfEvents = flowOf(
"Event 1",
"Event 2"
)
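suspend fun <T> Flow<T>.stateIn(
scope: CoroutineScope
): StateFlow<T>
The stateIn method takes in the scope in which the cold flow is started. This overload is a suspending function: it suspends until the upstream flow emits its first value.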
@Test
fun `convert cold flow to state flow`() = runBlockingTest {
val stateFlow = flowOfEvents.stateIn(this)
stateFlow.test {
expectItem() shouldBeEqualTo "Event 2"
}
stateFlow.test {
expectItem() shouldBeEqualTo "Event 2"
}
}
In this unit test, the stateIn method is given the test coroutine scope created by runBlockingTest. We have two subscribers listening to the state flow. Each of them receives “Event 2”, the most recent value emitted by the upstream flow.
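kotlinx.coroutines also offers a non-suspending stateIn overload that takes a sharing policy and an initial value. The sketch below is a hedged illustration of it. The delay, the "Initial" placeholder and the function name are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun stateInWithInitialValue(): List<String> = runBlocking {
    // A separate job so the sharing coroutine can be cancelled explicitly.
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    val upstream = flow {
        delay(50)
        emit("Event 1")
    }

    // Returns immediately; the state flow holds "Initial" until upstream emits.
    val stateFlow = upstream.stateIn(sharingScope, SharingStarted.Eagerly, "Initial")

    val before = stateFlow.value
    val after = stateFlow.first { it != "Initial" } // suspends until upstream emits

    sharingJob.cancelAndJoin()
    listOf(before, after)
}
```

The function returns ["Initial", "Event 1"]: the initial value is visible right away, and the upstream value replaces it once it arrives.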
Shared Flow
A shared flow is also a hot flow. It offers features such as replaying and buffering.
How do you set up a shared flow?
val sharedFlow = MutableSharedFlow<String>()
Unlike a state flow, the shared flow doesn’t need a default value. The shared flow can be given a replay count, an extra buffer capacity and a buffer overflow strategy.
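As a rough illustration of those three parameters, the sketch below configures all of them and inspects the replay cache. The concrete values and the function name are assumptions. With no subscribers present, only the replay cache is retained, so the overflow strategy does not come into play here.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun replayCacheAfterFiveEmissions(): List<Int> {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 2,                                  // values replayed to new subscribers
        extraBufferCapacity = 1,                     // buffer slots on top of the replay cache
        onBufferOverflow = BufferOverflow.DROP_OLDEST // what to do when the buffer is full
    )

    // tryEmit is the non-suspending counterpart of emit.
    for (i in 1..5) sharedFlow.tryEmit(i)

    // Snapshot of the values a new subscriber would receive first.
    return sharedFlow.replayCache
}
```

The function returns [4, 5], the two most recent values.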
Shared Flow Collection
val sharedFlow = MutableSharedFlow<String>()
@Test
fun `collect from shared flow`() = runBlockingTest {
val job = launch(start = CoroutineStart.LAZY) {
sharedFlow.emit("Event 1")
}
sharedFlow.test {
job.start()
expectItem() shouldBeEqualTo "Event 1"
}
}
Tests Passed
In this test, we have a coroutine that is launched lazily and emits a string. It is started lazily because the replay count is 0: if I emitted before subscribing to the shared flow, the subscriber would not receive anything. We start collecting from the shared flow and then emit to it. We verify that we get the string “Event 1”.
Here is the failing unit test where the producer coroutine that emits to the shared flow wasn’t launched lazily.
val sharedFlow = MutableSharedFlow<String>()
@Test
fun `collect from shared flow`() = runBlockingTest {
sharedFlow.emit("Event 1")
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 1"
}
}
Test Failed
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
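The coroutine launched by the test extension times out waiting for an emission. The consumer subscribed to the shared flow only after “Event 1” had been emitted, and with a replay count of 0 that emission is not kept for late subscribers.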
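Outside of Turbine, one way to make sure the subscriber is in place before emitting is to wait on the shared flow’s subscriptionCount. This is a minimal sketch under that assumption; the function name and the extra “Event 0” emission are illustrative.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun emitBeforeAndAfterSubscribing(): String = runBlocking {
    val sharedFlow = MutableSharedFlow<String>() // replay = 0

    // No subscriber yet, so this emission is dropped.
    sharedFlow.emit("Event 0")

    // Start a subscriber that waits for the next emission.
    val received = async { sharedFlow.first() }

    // Suspend until the subscriber has actually registered.
    sharedFlow.subscriptionCount.first { it > 0 }

    sharedFlow.emit("Event 1")
    received.await()
}
```

The function returns "Event 1"; "Event 0" is lost because nobody was subscribed when it was emitted.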
Replay Count
We could set up a replay count in the shared flow to make the previous test pass. The replay count specifies how many previous emissions are replayed to new subscribers.
val sharedFlow = MutableSharedFlow<String>(replay = 1)
@Test
fun `collect from shared flow`() = runBlockingTest {
sharedFlow.emit("Event 1")
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 1"
}
}
Tests Passed
In this test, the shared flow has a replay count of 1. The previous emission “Event 1” was sent to the new subscriber launched with the test extension.
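A larger replay count behaves the same way, just with more cached values. The sketch below is an illustration with an assumed replay count of 2 and an illustrative function name. It emits three events and then subscribes late.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun replayedToLateSubscriber(): List<String> = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 2)

    // All three emissions happen before anyone subscribes.
    sharedFlow.emit("Event 1")
    sharedFlow.emit("Event 2")
    sharedFlow.emit("Event 3")

    // take(2) bounds the collection, since a shared flow never completes.
    sharedFlow.take(2).toList()
}
```

The late subscriber receives ["Event 2", "Event 3"]; "Event 1" has already been pushed out of the replay cache.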
Sharing Strategies
We could convert cold flows to hot flows using the shareIn operator. The shareIn operator takes in three arguments.
fun <T> Flow<T>.shareIn(
scope: CoroutineScope, // The coroutine scope in which sharing is started
started: SharingStarted, // The sharing policy
replay: Int = 0 // The number of values replayed to new subscribers
): SharedFlow<T>
There are three different strategies you could apply.
While Subscribed
Assume we want to convert the following cold flow to a shared flow.
val flow = flowOf(
"Event 1",
"Event 2",
"Event 3"
)
The SharingStarted.WhileSubscribed strategy starts the upstream flow when the first subscriber appears and stops it after the last subscriber disappears. Let’s explore this behavior with unit tests.
@Test
fun `collect with while subscribed strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.onStart { println("ON START") }
.shareIn(
sharingScope,
SharingStarted.WhileSubscribed(),
1
)
}
When we run this test, you’ll see that the statement in the onStart operator is not logged. Since we don’t have a subscriber, the flow hasn’t started.
@Test
fun `collect with while subscribed strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.onCompletion { println("SHARED FLOW COMPLETED") }
.shareIn(
sharingScope,
SharingStarted.WhileSubscribed(),
1
)
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 1"
expectItem() shouldBeEqualTo "Event 2"
expectItem() shouldBeEqualTo "Event 3"
}
}
Tests Passed
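This test will pass. Note that the onCompletion block is attached to the upstream flow, so the message is logged when the upstream flow finishes or is cancelled, for example after the test subscriber is cancelled internally and no subscribers remain. The shared flow itself never completes.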
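The start-on-first-subscriber behavior can also be checked with a simple counter. The following is a sketch that assumes plain runBlocking and a dedicated sharing job instead of TestCoroutineScope. The function name and the 50 ms pause are illustrative.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun upstreamStartsWhileSubscribed(): List<Int> = runBlocking {
    var starts = 0
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    val sharedFlow = flow {
        starts++ // counts how often the upstream flow is started
        emit("Event 1")
    }.shareIn(sharingScope, SharingStarted.WhileSubscribed(), replay = 1)

    // Give the sharing coroutine time to run; there is no subscriber yet.
    delay(50)
    val beforeSubscribing = starts

    // The first subscriber triggers the upstream flow.
    sharedFlow.first()
    val afterSubscribing = starts

    sharingJob.cancelAndJoin()
    listOf(beforeSubscribing, afterSubscribing)
}
```

The function returns [0, 1]: nothing runs upstream until the first subscriber appears.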
Eagerly
The SharingStarted.Eagerly strategy starts the upstream flow immediately, even when there are no subscribers, and never stops it.
@Test
fun `collect with eager strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.onStart { println("ON START") }
.shareIn(
sharingScope,
SharingStarted.Eagerly,
1
)
}
Output
ON START
@Test
fun `collect with eager strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.shareIn(
sharingScope,
SharingStarted.Eagerly,
1
)
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 3"
}
}
Tests Passed
In this test, we’re verifying the item emitted by the shared flow is the string “Event 3”. Since the flow starts eagerly and it is a hot flow, the collection will not start from the beginning. By the time we subscribe to the shared flow, it will have emitted “Event 3”.
Lazily
The SharingStarted.Lazily strategy starts the upstream flow when the first subscriber appears. This behavior is similar to the while subscribed strategy. However, the lazy strategy never stops the upstream flow.
Let’s run the test above with the lazily strategy.
@Test
fun `collect with lazy strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.onStart { println("ON START") }
.onCompletion { println("SHARED FLOW COMPLETED") }
.shareIn(
sharingScope,
SharingStarted.Lazily,
1
)
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 3"
}
}
Test Failed
java.lang.AssertionError: Expected <Event 3>, actual <Event 1>.
The assertion will fail this time. When the test extension begins to execute, the shared flow will have its first subscriber. It will emit the item which is “Event 1”.
Here is the passing test.
@Test
fun `collect with lazy strategy`() = runBlockingTest {
val sharingScope = TestCoroutineScope()
val sharedFlow = flow
.shareIn(
sharingScope,
SharingStarted.Lazily,
1
)
sharedFlow.test {
expectItem() shouldBeEqualTo "Event 1"
expectItem() shouldBeEqualTo "Event 2"
expectItem() shouldBeEqualTo "Event 3"
}
}
Tests Passed
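The difference between the eager and lazy strategies comes down to whether the upstream flow runs before anyone subscribes. The sketch below compares the two with a counter. It assumes plain runBlocking with a dedicated sharing job, and the function name and the 50 ms pause are illustrative.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how often the upstream flow has started while there are no subscribers.
fun startsBeforeFirstSubscriber(started: SharingStarted): Int = runBlocking {
    var starts = 0
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    // The returned shared flow is deliberately never collected.
    flow {
        starts++
        emit("Event 1")
    }.shareIn(sharingScope, started, replay = 1)

    // Give the sharing coroutine time to run without any subscriber.
    delay(50)

    sharingJob.cancelAndJoin()
    starts
}
```

startsBeforeFirstSubscriber(SharingStarted.Eagerly) returns 1, while startsBeforeFirstSubscriber(SharingStarted.Lazily) returns 0.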
Conclusion
We went over the features of state flows and shared flows with unit tests. We looked at how to set up a state flow and how to emit items to it. We looked at conflation with various unit tests. We also looked at how to set up a shared flow, how replay works, and how the three sharing strategies differ. I hope it was helpful for you.