Unit Testing Delays, Errors & Retries with Kotlin Flows
Featured in Android Weekly #406
In February, I gave a presentation at the NY Android Meetup on unit testing Channels and Flows with practical use cases. In this blog post, I will share with you how to test delays, retries, and errors with Flows. I’ve also added useful tips and things to watch out for when unit testing.
Use Case
Before we can write unit tests, we need a use case. Suppose we are building a feature in an app that displays user details, with the following setup.
We have a Repository that calls the API to get the user’s details and hands them to the View Model, which displays them in the UI. In this article, I will focus on creating the repository and testing it. How would we go about implementing the Repository using a Flow?
Repository
The repository will use an API service to get the user’s details. We could implement this using Retrofit which has support for suspending functions.
interface ApiService {
    @GET("/users/{id}")
    suspend fun userDetails(@Path("id") id: Int): UserDetails
}
In this API service, we are going to hit an endpoint that takes an id and returns the data in a UserDetails response. To expose this call as a Flow, we can use the flow builder from the coroutines library.
fun <T> flow(
    block: suspend FlowCollector<T>.() -> Unit
): Flow<T>
Source: Builders.kt
Using the flow method, we could implement our Repository as follows.
class UserRepository(val apiService: ApiService) {
    fun getUserDetails(id: Int): Flow<Result<UserDetails>> {
        return flow {
            val users = apiService.userDetails(id)
            emit(Result.success(users))
        }
    }
}
Tips
This Flow emits a single item. I could call the single method on the Flow and have the repository method return Result<UserDetails> directly. However, at the time of writing, Kotlin did not allow the Result type to be returned from a function (newer Kotlin versions have lifted this restriction).
This could be implemented without Flows, but I want a simple example to illustrate testing them.
Error Handling
What happens if an error occurs in our example? If the method apiService.userDetails(id) throws an exception, it will cause a crash if not handled properly. Here is an example of collecting from a Flow inside a coroutine launched in a view model scope. It would crash due to the exception.
viewModelScope.launch {
    val flow = repository.getUserDetails(id = 1)
    flow.collect { result: Result<UserDetails> ->
        showUI(result.getOrNull())
    }
}
There are three ways to handle exceptions.
- Provide an exception handler to the coroutine in the launch method.
- Use a try and catch around the code that collects the Flow.
- Catch the exception in the Flow.
All of these approaches are valid depending on your use case. I will show you how to use the third approach.
class UserRepository(val apiService: ApiService) {
    fun getUserDetails(id: Int): Flow<Result<UserDetails>> {
        return flow {
            val users = apiService.userDetails(id)
            emit(Result.success(users))
        }
        .catch { throwable ->
            emit(Result.failure(throwable))
        }
    }
}
The catch extension on a Flow intercepts the exception. We can then emit it in a wrapper type such as Result, which is what I am doing above with Result.failure(throwable). With this approach, the exception is caught and won’t crash our application. Instead, it is incorporated into our Result type.
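To see the catch behaviour in isolation, here is a minimal sketch that runs outside of Android. The failingCall function and the safeFlow name are hypothetical stand-ins for the API call and the repository method; only kotlinx-coroutines-core is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical stand-in for an API call that always fails.
suspend fun failingCall(): String = throw IOException("network down")

// The exception thrown upstream is intercepted by catch
// and re-emitted as a failure Result instead of propagating.
fun safeFlow(): Flow<Result<String>> =
    flow<Result<String>> { emit(Result.success(failingCall())) }
        .catch { throwable -> emit(Result.failure(throwable)) }

fun main(): Unit = runBlocking {
    val results = safeFlow().toList()
    println(results.size)                                      // 1
    println(results.single().exceptionOrNull() is IOException) // true
}
```

The collector receives exactly one item, a failed Result wrapping the IOException, and nothing is thrown at the call site.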
Retry
If an error happens, I may want to retry the request a number of times. The coroutines library provides a retry extension that allows you to specify the number of retries and when to perform it. Using the retry extension, our Repository will now look as follows.
class UserRepository(val apiService: ApiService) {
    fun getUserDetails(id: Int): Flow<Result<UserDetails>> {
        return flow {
            val users = apiService.userDetails(id)
            emit(Result.success(users))
        }
        .retry(2) { e ->
            (e is Exception).also { if (it) delay(1000) }
        }
        .catch { throwable ->
            emit(Result.failure(throwable))
        }
    }
}
I am specifying that I want to perform two retries when an exception occurs. However, before retrying the request, I will delay the coroutine for one second. Each retry will execute the Flow block again.
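As a quick sanity check of that last point, the sketch below counts how often the flow block runs when every attempt fails. It is an illustration only: countAttempts is a hypothetical helper, and the delay is shortened to 10 ms so it runs quickly under plain runBlocking.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Runs a flow that always fails and returns how many times its block executed.
fun countAttempts(): Int = runBlocking {
    var attempts = 0
    flow<Result<String>> {
        attempts++
        throw IOException("attempt $attempts failed")
    }
        // Same shape as the repository: retry twice, pausing before each retry.
        .retry(2) { e -> (e is Exception).also { if (it) delay(10) } }
        .catch { throwable -> emit(Result.failure(throwable)) }
        .toList()
    attempts
}

fun main() {
    println(countAttempts()) // 3: the initial run plus two retries
}
```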
Dispatcher
In order to test delays and have control over them, we need to be able to inject a dispatcher. Currently, our repository will use a dispatcher from the coroutine it is called from. But, it is a good practice to explicitly inject into the repository which dispatcher the Flow will use. The coroutines test library provides a test dispatcher that allows us to control virtual time. We could modify our repository to take in a dispatcher in its constructor.
class UserRepository(
    val apiService: ApiService,
    val dispatcher: CoroutineDispatcher
) {
    fun getUserDetails(id: Int): Flow<Result<UserDetails>> {
        return flow {
            val users = apiService.userDetails(id)
            emit(Result.success(users))
        }
        .retry(2) { e ->
            (e is Exception).also { if (it) delay(1000) }
        }
        .catch { throwable ->
            emit(Result.failure(throwable))
        }
        .flowOn(dispatcher)
    }
}
The injected dispatcher is applied to the Flow with the flowOn extension. This means everything upstream of flowOn, that is the flow block itself, retry and catch, will run on the injected dispatcher.
Tips
A common pattern followed in the community is to inject a provider for the dispatcher. The provider could give you a Dispatchers.IO or Dispatchers.Default.
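One possible shape for such a provider is sketched below. The interface and class names are hypothetical and do not come from any library; the idea is only that production code receives the real dispatchers while a test hands in a single dispatcher it controls.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

// Hypothetical abstraction over the dispatchers an app uses.
interface DispatcherProvider {
    val io: CoroutineDispatcher
    val default: CoroutineDispatcher
}

// Production implementation backed by the standard dispatchers.
class DefaultDispatcherProvider : DispatcherProvider {
    override val io: CoroutineDispatcher = Dispatchers.IO
    override val default: CoroutineDispatcher = Dispatchers.Default
}

// Test implementation that routes everything to one dispatcher supplied by the test.
class SingleDispatcherProvider(dispatcher: CoroutineDispatcher) : DispatcherProvider {
    override val io: CoroutineDispatcher = dispatcher
    override val default: CoroutineDispatcher = dispatcher
}
```

A repository would then take a DispatcherProvider in its constructor and call flowOn(provider.io), and a test would pass SingleDispatcherProvider with its test dispatcher.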
Unit Tests
How do we unit test the repository? There are three cases we want to test in our example.
- Flow emits successfully.
- Flow completes all retries with an error.
- Flow retries and a successful response occurs.
Test Success
How do we test the simplest case of a successful emission? We need to mock the API service to return data. I will use mockito-kotlin for stubbing our API service.
class UserRepositoryTest {
    val testCoroutineDispatcher = TestCoroutineDispatcher()
    val userService = mock<ApiService>()
    val repository = UserRepository(userService, testCoroutineDispatcher)
}
We create the repository under test with a test dispatcher and a mocked API service. In order to write our test case, we have to create a coroutine that our test will run in. There are two methods we could use from the coroutines libraries to do this: runBlocking and runBlockingTest.
The runBlocking method is good for testing code without delays, while runBlockingTest gives you finer control over virtual time if you need it. For the successful case, we will use runBlocking to create a coroutine.
class UserRepositoryTest {
    val testCoroutineDispatcher = TestCoroutineDispatcher()
    val userService = mock<ApiService>()
    val repository = UserRepository(userService, testCoroutineDispatcher)

    @Test
    fun `flow emits successfully`() = runBlocking {
        // Mock API Service
        val userDetails = UserDetails(1, "User 1", "avatar_url")
        userService.stub {
            onBlocking { userDetails(1) } doReturn userDetails
        }
        // Test
        val flow = repository.getUserDetails(id = 1)
        // Verify
        flow.collect { result: Result<UserDetails> ->
            result.isSuccess.shouldBeTrue()
            result.onSuccess {
                it shouldBeEqualTo userDetails
            }
        }
    }
}
There are three steps in this test — mock, test and verify.
Mock API Service
I am stubbing out the API service to return a mock UserDetails response. Since the userDetails method is a suspending function, I cannot use on { userDetails(id = 1) } doReturn userDetails. It gives an error that a suspending function can only be called from a coroutine or another suspending function, because the lambda block of the on method is not suspending. Mockito-kotlin provides an onBlocking method that takes a suspending lambda and stubs the method out inside a runBlocking coroutine.
Test & Verify
In the test phase, I get the Flow from the repository, collect from it and verify the data in the result. I am using the library Kluent for verification. It provides methods such as shouldBeTrue and shouldBeEqualTo.
Tips
There is a library called MockK that supports coroutines, with helpers such as coEvery and coVerify for stubbing and verifying suspending functions. In the community, libraries have also built patterns to perform assertions on Flows. Check out SQLDelight’s Flow Assert pattern.
Test Retry
How do we test that the Flow handles exceptions and retries? There are two retry cases we want to test.
- Flow completes all retries with an error.
- Flow retries and a successful response occurs.
Let’s start with the first test case.
- Flow completes all retries with an error.
@Test
fun `should retry with error`() =
    testCoroutineDispatcher.runBlockingTest {
        userService.stub {
            onBlocking { userDetails(1) } doAnswer {
                throw IOException()
            }
        }
        val flow = repository.getUserDetails(id = 1)
        flow.collect { result: Result<UserDetails> ->
            result.isFailure.shouldBeTrue()
        }
    }
In our Repository, when an error occurs, we delay for 1 second before executing the flow block again.
flow {
    val users = apiService.userDetails(id)
    emit(Result.success(users))
}.retry(2) { e ->
    (e is Exception).also { if (it) delay(1000) }
}.catch { throwable ->
    emit(Result.failure(throwable))
}.flowOn(dispatcher)
As we have delays in our code, we will use the runBlockingTest method in our test.
What does runBlockingTest do?
This method creates a coroutine with a TestCoroutineScope that has a TestCoroutineDispatcher and a TestCoroutineExceptionHandler. It takes a lambda block, testBody, and an optional context.
fun runBlockingTest(
    context: CoroutineContext = EmptyCoroutineContext,
    testBody: suspend TestCoroutineScope.() -> Unit
)
In my test, I am using this method as
testCoroutineDispatcher.runBlockingTest {
    ...
}
This will create a coroutine with our own custom testCoroutineDispatcher.
This is equivalent to:
runBlockingTest(testCoroutineDispatcher) {
    ...
}
However, I am using the extension provided on TestCoroutineDispatcher to run the test with my own dispatcher.
fun TestCoroutineDispatcher.runBlockingTest(
    block: suspend TestCoroutineScope.() -> Unit
)
Source: TestBuilders.kt
After creating the coroutine my test will run in, I am mocking my API service to throw an exception. Then, I proceed to collect from my Flow and perform an assertion.
@Test
fun `should retry with error`() =
    testCoroutineDispatcher.runBlockingTest {
        // Mock
        userService.stub {
            onBlocking { userDetails(1) } doAnswer {
                throw IOException()
            }
        }
        // Test
        val flow = repository.getUserDetails(id = 1)
        // Verify
        flow.collect { result: Result<UserDetails> ->
            result.isFailure.shouldBeTrue()
        }
    }
Let’s trace how this test runs. If I add logs or add breakpoints in the IDE, it will help visualize what happens.
fun getUserDetails(id: Int): Flow<Result<UserDetails>> {
    return flow {
        println("STARTING FLOW")
        val users = apiService.userDetails(id)
        emit(Result.success(users))
    }
    .retry(retries = 2) { t ->
        println("RETRY")
        (t is IOException).also {
            if (it) {
                println("BEFORE DELAY")
                delay(DELAY_ONE_SECOND)
                println("AFTER DELAY")
            }
        }
    }
    .catch {
        println("CATCH")
        emit(Result.failure(it))
    }.flowOn(dispatcher)
}
Here is the output of running the test.
// Initial Run of Flow
STARTING FLOW
// 1st Retry
RETRY
BEFORE DELAY
AFTER DELAY
STARTING FLOW
// 2nd Retry
RETRY
BEFORE DELAY
AFTER DELAY
STARTING FLOW
// Both retries failed run catch block
CATCH
Initial Run of Flow
When I collect from the Flow in the test, it will execute the flow block and an exception will be thrown, since we used doAnswer to stub our API service to throw an IOException.
1st and 2nd Retry
It will then go into the retry block and delay for one second. After the delay, it will execute the Flow block again. An exception will occur again. This process will repeat itself for the 2nd retry.
Catch Error
Since we only specified two retries and both failed, it stops retrying, goes into the catch block and emits the error.
Under the hood
The TestCoroutineDispatcher keeps an ordered queue of the tasks it has to run.
// The ordered queue for the runnable tasks.
private val queue = ThreadSafeHeap<TimedRunnable>()
Every time it dispatches, it adds a TimedRunnable object to the queue. This object contains the Runnable task and a time to perform the task.
private fun post(block: Runnable) =
    queue.addLast(
        TimedRunnable(block, _counter.getAndIncrement())
    )
class TimedRunnable(
    @JvmField val runnable: Runnable,
    private val count: Long = 0,
    @JvmField val time: Long = 0
)
Source: TimedRunnable
As time is advanced forward on the test dispatcher, the Runnable tasks are executed.
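The idea of a time-ordered queue can be illustrated with a toy model in plain Kotlin. This is not the TestCoroutineDispatcher implementation; VirtualClock, schedule and runDemo are hypothetical names, and the sketch only shows how tasks tagged with a virtual time can run instantly as the clock is moved forward.

```kotlin
import java.util.PriorityQueue

// Toy model of virtual time; NOT the coroutines test library's implementation.
class VirtualClock {
    private data class Task(val time: Long, val order: Long, val block: () -> Unit)

    // Tasks are ordered by due time, then by insertion order.
    private val queue = PriorityQueue<Task>(compareBy<Task>({ it.time }, { it.order }))
    private var counter = 0L

    var currentTime = 0L
        private set

    // Schedule a task to run delayMillis after the current virtual time.
    fun schedule(delayMillis: Long, block: () -> Unit) {
        queue.add(Task(currentTime + delayMillis, counter++, block))
    }

    // Move virtual time forward and run every task that is due by then.
    fun advanceTimeBy(millis: Long) {
        val target = currentTime + millis
        while (true) {
            val next = queue.peek() ?: break
            if (next.time > target) break
            queue.poll()
            currentTime = next.time
            next.block()
        }
        currentTime = target
    }

    // Keep running tasks, jumping over the gaps, until the queue is empty.
    fun advanceUntilIdle() {
        while (true) {
            val next = queue.poll() ?: break
            currentTime = next.time
            next.block()
        }
    }
}

// Two chained one-second "delays", similar in spirit to the two retries.
fun runDemo(): List<String> {
    val clock = VirtualClock()
    val log = mutableListOf<String>()
    clock.schedule(1000) {
        log.add("first retry at ${clock.currentTime}")
        clock.schedule(1000) { log.add("second retry at ${clock.currentTime}") }
    }
    clock.advanceUntilIdle()
    return log
}

fun main() {
    println(runDemo()) // [first retry at 1000, second retry at 2000]
}
```

No real time passes in this sketch: the clock simply jumps to the due time of the next task, which is why a test with one-second delays can finish almost immediately.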
When you look at this test, I am not explicitly advancing time forward. How does it move virtual time forward?
@Test
fun `should retry with error`() =
    testCoroutineDispatcher.runBlockingTest {
        userService.stub {
            onBlocking { userDetails(1) } doAnswer {
                throw IOException()
            }
        }
        val flow = repository.getUserDetails(id = 1)
        flow.collect { result: Result<UserDetails> ->
            result.isFailure.shouldBeTrue()
        }
    }
Let’s look under the hood at the implementation of runBlockingTest. You will see that it advances time forward until there are no more runnables left to execute in your test.
val startingJobs = safeContext.activeJobs()
val scope = TestCoroutineScope(safeContext)
val deferred = scope.async {
    scope.testBody()
}
dispatcher.advanceUntilIdle()
deferred.getCompletionExceptionOrNull()?.let {
    throw it
}
scope.cleanupTestCoroutines()
val endingJobs = safeContext.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
    throw UncompletedCoroutinesError(
        "Test finished with active jobs: $endingJobs"
    )
}
Source: TestCoroutineDispatcher
The call to dispatcher.advanceUntilIdle() will advance the delays in our tests. In my tests, I am leveraging this to go through all the retries with delays and emit an error. This is how it works under the hood.
Tips
The most common error you will get when you use runBlockingTest is the Test finished with active jobs error. This happens because you still have an active coroutine running when your test ends. For example, if you had a producer using a Rendezvous Channel, called send and never received the value, you would get this error. That’s only one example of it, so watch out for that error.
Test Retry with Success
- Flow retries and a successful response occurs.
How do we test that a retry actually succeeds and emits the data?
@Test
fun `should retry with success`() =
    testCoroutineDispatcher.runBlockingTest {
        // Mock
        var throwError = true
        val userDetails = UserDetails(1, "User 1", "avatar_url")
        userService.stub {
            onBlocking { userDetails(1) } doAnswer {
                if (throwError) throw IOException() else userDetails
            }
        }
        // Test
        val flow = repository.getUserDetails(id = 1)
        // Verify
        launch {
            flow.collect { result ->
                result.isSuccess.shouldBeTrue()
            }
        }
        // 1st Retry
        advanceTimeBy(DELAY_ONE_SECOND)
        // 2nd Retry
        throwError = false
        advanceTimeBy(DELAY_ONE_SECOND)
    }
In this test, I am controlling whether an exception is thrown or a successful response is returned by using the throwError variable.
var throwError = true
val userDetails = UserDetails(1, "User 1", "avatar_url")
userService.stub {
    onBlocking { userDetails(1) } doAnswer {
        if (throwError) throw IOException() else userDetails
    }
}
I am collecting from the Flow in a child coroutine. This coroutine is the consumer. If I didn’t launch a child coroutine, this test would fail.
launch {
    flow.collect { result ->
        result.isSuccess.shouldBeTrue()
    }
}
I want to advance time forward when the Flow encounters a delay, so that the first retry runs and fails. But, I want the second retry to pass. I will set throwError to false so that when the flow block is executed again, it succeeds with data.
// 1st Retry
advanceTimeBy(DELAY_ONE_SECOND)
// 2nd Retry
throwError = false
advanceTimeBy(DELAY_ONE_SECOND)
Tips
Watch out for an undocumented edge case of runBlockingTest. If you wrote a test that has multiple errors, runBlockingTest will drop the first one. See TestCoroutineExceptionHandler.
Resources
I hope that was helpful to you. If you practice writing lots of tests using runBlockingTest for different scenarios, it will get easier. If you have any questions, please let me know in the responses. I gave more examples in my presentation. Please check out my slides on Unit Testing Channels & Flow in Practice.