Custom Kotlin Coroutine Context Use Cases

8 minute read

A Context is a set of elements that provides us with information about the coroutine. A Job, Dispatcher, and Exception Handler are examples of context elements provided by the coroutines library. But sometimes you may want to create your own context element. In this article, I will explore the use cases where a custom context element is useful.

What is a Context?

A Context is a map of elements that describes the environment of the coroutine. Internally, it is an interface that defines the operations you can perform on it, such as querying, updating, or adding elements. The coroutines library provides several elements such as Job, Dispatcher, or Coroutine Name.

Let’s look at the underlying implementation of a Context more closely.

public interface CoroutineContext {
     
     // Operations
     operator fun <E : Element> get(key: Key<E>): E?
     
     fun <R> fold(initial: R, operation: (R, Element) -> R): R
     
     operator fun plus(context: CoroutineContext): CoroutineContext
     
     fun minusKey(key: Key<*>): CoroutineContext

     // Key & Elements
     interface Key<E : Element>
     interface Element : CoroutineContext {
         
         public val key: Key<*>
         ...
     }
}

Source: CoroutineContext

This interface defines operations for getting and adding to a Context. Each item in the context is identified by a unique Key. To see a concrete implementation of this interface, let’s look at a Job under the hood.
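To make these operations concrete, here is a minimal sketch that uses only the Kotlin standard library. UserId, TraceId, and elementCount are hypothetical names invented for this illustration; they are not part of any library.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical elements used only for this illustration.
class UserId(val value: String) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class TraceId(val value: String) : AbstractCoroutineContextElement(TraceId) {
    companion object Key : CoroutineContext.Key<TraceId>
}

// Counts the elements in a context using fold.
fun elementCount(context: CoroutineContext): Int =
    context.fold(0) { count, _ -> count + 1 }

fun main() {
    // plus: combine two elements into one context.
    val context: CoroutineContext = UserId("u-1") + TraceId("t-1")

    // get: look up an element by its key (index notation).
    println(context[UserId]?.value)   // u-1
    println(elementCount(context))    // 2

    // plus with a key that is already present replaces the old element.
    val replaced = context + UserId("u-2")
    println(replaced[UserId]?.value)  // u-2
    println(elementCount(replaced))   // 2

    // minusKey: remove an element; looking up a missing key yields null.
    val withoutTrace = context.minusKey(TraceId)
    println(withoutTrace[TraceId])    // null
    println(elementCount(EmptyCoroutineContext)) // 0
}
```

Because each key maps to at most one element, adding a second UserId replaces the first one instead of storing both.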

What is a Job?


A Job lets us query the life cycle of a coroutine. From it, we can tell whether a coroutine is active, cancelled, or completed. When the launch method is called, it returns an instance of a Job. This instance can be used to cancel the coroutine or query its lifecycle state as shown below.

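import kotlinx.coroutines.*

// CoroutineScope() requires a context; a dispatcher is enough here.
val scope = CoroutineScope(Dispatchers.Default)
val job: Job = scope.launch {
    // coroutine body
}

job.isActive   // query the lifecycle state
job.cancel()   // request cancellation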

Besides getting the Job from the launch method, I could also get it by querying the context using index notation. The launch extension provides you with the scope of the coroutine as a receiver of a lambda block. See the signature of the method below.

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

Source: Builders.common

A CoroutineScope exposes its context through the coroutineContext property.

public interface CoroutineScope {
   val coroutineContext: CoroutineContext
}

Source: CoroutineScope

val scope = CoroutineScope(Dispatchers.IO)
scope.launch {
    // this.coroutineContext is the context of the running coroutine
    val job = this.coroutineContext[Job]
}

In this example, I am creating a coroutine by defining a scope with an IO Dispatcher. This injects the dispatcher the coroutine will run on into the context. The launch builder automatically creates a Job for the coroutine and adds it to the context.

I am able to get the Job from the context because the get operator is defined in the context interface. Another way to define a coroutine is to merge context elements together. 

val scope = CoroutineScope(Job() + Dispatchers.IO)

In the example above, I’m using the plus operator defined in the context interface. It merges a custom job and a dispatcher together, and the result becomes the context of the scope.
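To see what a launched coroutine finds in such a merged context, here is a small sketch, assuming kotlinx.coroutines on the JVM. The ContextCheck class and the inspectMergedContext function are hypothetical helpers written for this illustration.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical holder for what the coroutine observed in its own context.
data class ContextCheck(
    val sameJob: Boolean,
    val sameDispatcher: Boolean,
    val childOfParent: Boolean
)

suspend fun inspectMergedContext(): ContextCheck {
    val parentJob = Job()
    val scope = CoroutineScope(parentJob + Dispatchers.IO)

    var jobInContext: Job? = null
    var dispatcherInContext: ContinuationInterceptor? = null
    var childOfParent = false

    val job = scope.launch {
        // launch adds a new Job to the context it inherited from the scope.
        jobInContext = coroutineContext[Job]
        // The dispatcher is stored under the ContinuationInterceptor key.
        dispatcherInContext = coroutineContext[ContinuationInterceptor]
        // The new Job is registered as a child of the scope's Job.
        childOfParent = parentJob.children.any { it === jobInContext }
    }
    job.join()

    return ContextCheck(
        sameJob = jobInContext === job,
        sameDispatcher = dispatcherInContext === Dispatchers.IO,
        childOfParent = childOfParent
    )
}

fun main(): Unit = runBlocking {
    println(inspectMergedContext())
}
```

The Job found in the context is the same instance that launch returns, while the dispatcher is inherited unchanged from the scope.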

Job Implementation


If we look at how a Job is defined, it gives an idea of how to define our own context element.

interface Job : CoroutineContext.Element {
   /**
    * Key for [Job] instance in the coroutine context.
    */    
    companion object Key : CoroutineContext.Key<Job> {
    }
}

Source: Job

Steps for writing a Coroutine Context Element


There are two steps to define your context element. 

  1. Implement the CoroutineContext.Element interface. This gives you the ability to get, update, or merge your context element.

  2. Define a key for your context element. This is done by creating a companion object Key that implements CoroutineContext.Key. This allows the user to use the class name in the index notation to get the element from the context.
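The two steps above can be sketched as follows, assuming kotlinx.coroutines is on the classpath. RequestId and currentRequestId are hypothetical names chosen for this example.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical element carrying a request identifier.
// Step 1: implement CoroutineContext.Element.
class RequestId(val value: String) : CoroutineContext.Element {
    // Step 2: a companion object serves as the key, so callers can
    // write coroutineContext[RequestId].
    companion object Key : CoroutineContext.Key<RequestId>

    override val key: CoroutineContext.Key<RequestId> get() = Key
}

// Reads the element from whichever coroutine calls this function.
suspend fun currentRequestId(): String? = coroutineContext[RequestId]?.value

fun main(): Unit = runBlocking {
    println(currentRequestId())      // null: no element installed yet

    withContext(RequestId("req-42")) {
        println(currentRequestId())  // req-42
        launch {
            // Child coroutines inherit the parent's context elements.
            println(currentRequestId())  // req-42
        }
    }
}
```

Any suspending function further down the call chain can read the element without it being passed as a parameter.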


Use Cases

In what scenarios would you want to define your own context element?

Dispatcher Provider


Problem

A pattern for using a dispatcher is to inject it into the constructor via Dagger using a qualifier.

class Repository(@IODispatcher dispatcher: CoroutineDispatcher) {
   ...
}


While this approach is valid, is there a type-safe approach to using scopes and dispatchers?

Solution

An approach we could take is to create a custom context element that provides dispatchers. Around this context element we could create types that make sure we are launching a coroutine on the correct dispatcher. There is an awesome utility library by Rick Busarow called Dispatch that uses this pattern. Let’s explore how it works.

[Diagram: types and utilities provided by the Dispatch library]

This diagram shows the types and utilities provided by the Dispatch library. All the utilities are based on a custom dispatcher provider.

interface DispatcherProvider : CoroutineContext.Element {

  override val key: CoroutineContext.Key<*> get() = Key

  val default: CoroutineDispatcher
  val io: CoroutineDispatcher
  val main: CoroutineDispatcher
  val mainImmediate: CoroutineDispatcher
  val unconfined: CoroutineDispatcher

  companion object Key : CoroutineContext.Key<DispatcherProvider>
}

class DefaultDispatcherProvider : DispatcherProvider {

    override val default: CoroutineDispatcher = Dispatchers.Default
    
    override val io: CoroutineDispatcher = Dispatchers.IO
    
    override val main: CoroutineDispatcher get() = Dispatchers.Main
    
    override val mainImmediate: CoroutineDispatcher get() = Dispatchers.Main.immediate
    
    override val unconfined: CoroutineDispatcher = Dispatchers.Unconfined
}

Source: DispatcherProvider

The core of the library is this custom context element. This interface provides you with a default, io, main, mainImmediate, or unconfined dispatcher. A concrete implementation is provided by DefaultDispatcherProvider.

IO Scope


If we want to ensure that a coroutine is launched on an IO dispatcher, we could use the IOCoroutineScope type.

val scope = IOCoroutineScope()
scope.launch {
   println(this.coroutineContext[ContinuationInterceptor])
}

Output

LimitingDispatcher[dispatcher = DefaultDispatcher]


Internally, the IOCoroutineScope function creates a scope with the IO dispatcher taken from the custom context element shown earlier. It combines the elements using the plus operator.

fun IOCoroutineScope(
    job: Job = SupervisorJob(),
    dispatcherProvider: DispatcherProvider =   
                        DefaultDispatcherProvider()
): IOCoroutineScope = object : IOCoroutineScope {
 
   override val coroutineContext = job +
           dispatcherProvider.io + dispatcherProvider
}

Source: CoroutineScopes
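To illustrate why keeping the provider in the context is convenient, here is a trimmed-down sketch of the idea, assuming kotlinx.coroutines on the JVM. IoProvider, ioProvider, and onIo are hypothetical names and are not the Dispatch library's API.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical provider with a single dispatcher.
class IoProvider(
    val io: CoroutineDispatcher = Dispatchers.IO
) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<IoProvider>

    override val key: CoroutineContext.Key<IoProvider> get() = Key
}

// Falls back to a default provider when the context has none.
val CoroutineContext.ioProvider: IoProvider
    get() = this[IoProvider] ?: IoProvider()

// Hypothetical helper: switch to whatever IO dispatcher the context provides.
suspend fun <T> onIo(block: suspend () -> T): T =
    withContext(coroutineContext.ioProvider.io) { block() }

fun main(): Unit = runBlocking {
    // No provider installed, so the fallback with Dispatchers.IO is used.
    println(onIo { Thread.currentThread().name })

    // Swap the dispatcher without touching the callers of onIo.
    withContext(IoProvider(io = Dispatchers.Unconfined)) {
        println(onIo { Thread.currentThread().name })
    }
}
```

Since the dispatcher is resolved from the context at call time, a test can install a different provider at the top of the coroutine hierarchy and every helper below picks it up.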

FlowOn Utilities


Besides providing scopes with a specific dispatcher, the library provides utilities for changing the context of a Flow.

flowOf("A", "B", "C")
       .map {
           println(Thread.currentThread().name)
           "$it 1"
       }
       .flowOnIO()
       .collect {
           println(Thread.currentThread().name)
           println(it)
       }

Output

DefaultDispatcher-worker-1
DefaultDispatcher-worker-1
DefaultDispatcher-worker-1
main
A 1
main
B 1
main
C 1

Internally, it uses the same idea of updating the context of the Flow from the provider.

fun <T> Flow<T>.flowOnIO(): Flow<T> = flow {
  flowOn(coroutineContext.dispatcherProvider.io)
   .collect { emit(it) }
}

Source: Flow

Dispatch is a useful library that builds upon a custom context element, the dispatcher provider. It also has testing utilities.

Thread Context Element


Some libraries use ThreadLocal to store context data. Examples are Log4j and kroto-plus. How do we pass context data to a coroutine or from one coroutine to another?

Kroto-plus


Problem

Let’s look at how this is accomplished in the kroto-plus library. This library provides you with the ability to communicate with a gRPC system. It provides an API using coroutines.

The library generates an io.grpc.Context, which can be used to set values. When we launch a coroutine on a specific dispatcher, how can this context value be passed to it?

Solution

public class GrpcContextElement(
    /**
     * The value of [io.grpc.Context] grpc context.
     */
    public val context: io.grpc.Context = io.grpc.Context.current()
) : ThreadContextElement<io.grpc.Context>, AbstractCoroutineContextElement(Key) {
    /**
     * Key of [GrpcContextElement] in [CoroutineContext].
     */
    companion object Key : CoroutineContext.Key<GrpcContextElement>

    override fun updateThreadContext(context: CoroutineContext): io.grpc.Context =
        this@GrpcContextElement.context.attach()

    override fun restoreThreadContext(context: CoroutineContext, oldState: io.grpc.Context) {
        this@GrpcContextElement.context.detach(oldState)
    }

}

Source: GrpcContextElement

This is a custom context element much like the dispatcher provider. It defines a unique key for the context element. It overrides two methods from ThreadContextElement: one attaches its gRPC context to the current thread, and the other detaches it again and restores the previous state.

This allows you to set a value outside the coroutine in its context and read from it inside the coroutine.

// Create a gRPC context key for putting a value into io.grpc.Context
val KEY_FOR_DATA = io.grpc.Context.key<String>("data")

// Put a value into a gRPC context derived from the current one
val grpcContext = io.grpc.Context.current().withValue(KEY_FOR_DATA, "value")

launch(grpcContext.asContextElement()) {
   // Retrieve the value for KEY_FOR_DATA from the current io.grpc.Context
   // and print it
   println(KEY_FOR_DATA.get())
}

Source: GrpcContextElement

Log4j


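The same pattern is also used for the MDC (mapped diagnostic context) found in logging libraries such as Log4j. The coroutines library provides an integration for it in its kotlinx-coroutines-slf4j module.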

class MDCContext(
    
    public val contextMap: MDCContextMap = MDC.getCopyOfContextMap()
) : ThreadContextElement<MDCContextMap>, AbstractCoroutineContextElement(Key) {
    
    companion object Key : CoroutineContext.Key<MDCContext>

    override fun updateThreadContext(context: CoroutineContext): MDCContextMap {
        val oldState = MDC.getCopyOfContextMap()
        setCurrent(contextMap)
        return oldState
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: MDCContextMap) {
        setCurrent(oldState)
    }

    private fun setCurrent(contextMap: MDCContextMap) {
        if (contextMap == null) {
            MDC.clear()
        } else {
            MDC.setContextMap(contextMap)
        }
    }
}

Source: MDCContext

This code snippet sets the MDC context when the coroutine resumes on a thread and restores the previous one when it suspends. In this example, I can set a value for MDC outside the coroutine and perform logging inside of it.

MDC.put("key", "value") 

launch(MDCContext()) {
   logger.info { ... } 
}

This is a common pattern you will see in libraries that have their own context and use ThreadLocal.
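The same pattern can be reproduced with a plain ThreadLocal. The sketch below assumes kotlinx.coroutines on the JVM; currentUser and UserElement are hypothetical names standing in for a library's own thread-local storage.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical thread-local, standing in for a library's own storage.
val currentUser = ThreadLocal<String?>()

class UserElement(private val user: String) :
    ThreadContextElement<String?>, AbstractCoroutineContextElement(UserElement) {

    companion object Key : CoroutineContext.Key<UserElement>

    // Called when the coroutine starts or resumes on a thread:
    // install our value and return the previous one.
    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentUser.get()
        currentUser.set(user)
        return previous
    }

    // Called when the coroutine suspends or finishes: put the old value back.
    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentUser.set(oldState)
    }
}

fun main(): Unit = runBlocking {
    val seenInside = withContext(Dispatchers.Default + UserElement("alice")) {
        currentUser.get() // read on a worker thread
    }
    println(seenInside)         // alice
    println(currentUser.get())  // null: the calling thread was never changed
}
```

For a simple ThreadLocal like this one, kotlinx.coroutines already ships the asContextElement extension, so writing the element by hand is mainly useful when the library needs custom attach and detach logic.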

Database Transactions


Room, a persistence library, has support for performing database transactions with coroutines. It provides an extension, withTransaction, which accepts the operations you want to perform in its lambda block.

database.withTransaction {
    dao().deleteAll()
    dao().insert(...)
} 

Every time a transaction is performed, the library adds a custom context element called TransactionElement.

internal class TransactionElement(
    private val transactionThreadControlJob: Job,
    internal val transactionDispatcher: ContinuationInterceptor
) : CoroutineContext.Element {

    // Key & Element
    companion object Key : CoroutineContext.Key<TransactionElement>
    override val key: CoroutineContext.Key<TransactionElement>
        get() = TransactionElement
    
    private val referenceCount = AtomicInteger(0)

    fun acquire() {
        referenceCount.incrementAndGet()
    }
    fun release() {
        val count = referenceCount.decrementAndGet()
        if (count < 0) {
            throw IllegalStateException("Transaction was never started or was already released.")
        } else if (count == 0) {
            // Cancel the job that controls the transaction thread, causing it to be released.
            transactionThreadControlJob.cancel()
        }
    }
}


When a transaction is performed, it acquires the context element, which increments the counter as seen above. Once all the transactions are completed, the counter drops to zero and the Job that controls the transaction thread is cancelled, which releases the thread. This is a use case for handling transactions that you will commonly see in database coroutine integrations.
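The reference-counting idea can be tried out in isolation. The following sketch uses only the standard library and is not Room's implementation; SharedResource and withAcquired are hypothetical names, and a callback stands in for cancelling the control Job.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

// Hypothetical element: runs onLastRelease when the final holder lets go.
class SharedResource(private val onLastRelease: () -> Unit) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<SharedResource>

    override val key: CoroutineContext.Key<SharedResource> get() = Key

    private val referenceCount = AtomicInteger(0)

    fun acquire() {
        referenceCount.incrementAndGet()
    }

    fun release() {
        val count = referenceCount.decrementAndGet()
        check(count >= 0) { "release() called more often than acquire()" }
        if (count == 0) onLastRelease()
    }
}

// Acquires the resource for the duration of block, then releases it.
fun <T> SharedResource.withAcquired(block: () -> T): T {
    acquire()
    try {
        return block()
    } finally {
        release()
    }
}

fun main() {
    var released = false
    val context: CoroutineContext = SharedResource { released = true }

    // Nested code finds the shared element through the context.
    context[SharedResource]?.withAcquired {
        context[SharedResource]?.withAcquired { println("nested work") }
        println(released)  // false: the outer holder is still active
    }
    println(released)      // true: the last release ran the callback
}
```

Counting holders lets nested operations share one resource, and cleanup happens only when the outermost holder finishes.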

I hope seeing the various use cases for defining a custom context element helps you solve your own problems. Thanks for reading!

Resources