Android Coroutines - Suspend, Resume, and Dispatch

kotlin-coroutines-1.png?w=2924

In the Introduction section, we already have a big picture of the Coroutines feature, including what it is, how we can use it, and other basic concepts. In this article, I will introduce another critical keyword suspend:

What’s a suspend function?

Remember the example we use before:

1
2
3
4
5
6
7
8
9
10
11
suspend fun requestToken(): Token { ... }
suspend fun createPost(token: Token, item: Item): Post { ... }
fun processPost(post: Post) { ... }

suspend fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
}

You can see the keyword suspend in front of a function. What does suspend keyword mean? If you compile the Kotlin code, you can see something like this:

1
2
// declaration: requestToken(kotlin.coroutines.Continuation<? super kotlin.Unit>)
public final requestToken(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

The code snap above is what we decompile into java code.

1
2
3
4
@Nullable
public final Object requestToken(@NotNull Continuation $completion) {
return Unit.INSTANCE;
}

This is a bytecode/compiled-java-code sample for this suspend function - requestToken(). We can find there’s a parameter passed into requestToken(), which is a Continuation. What’s a Continuation?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext

/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result)
}

It’s an interface for resuming the suspended point. Here’s an example when you try to use CoroutineScope.Launch:

1
2
3
4
5
GlobalScope.launch {
val token = requestToken()
val post = createPost(token)
processPost(post)
}

it will become something like:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000;
label17: {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
CoroutineScope $this$launch;
Token token;
MainActivity var6;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
var6 = MainActivity.this;
this.L$0 = $this$launch;
this.label = 1;
var10000 = var6.requestToken(this);
if (var10000 == var5) {
return var5;
}
break;
case 1:
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
token = (Token)this.L$1;
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label17;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

token = (Token)var10000;
var6 = MainActivity.this;
this.L$0 = $this$launch;
this.L$1 = token;
this.label = 2;
var10000 = var6.createPost(token, this);
if (var10000 == var5) {
return var5;
}
}

Post post = (Post)var10000;
MainActivity.this.processPost(post);
return Unit.INSTANCE;
}

In a Coroutine, it doesn’t use a callback function to return value. Instead, it uses the state machine to control what will respond to the next state. A suspend function will block the Coroutine when it’s in, but it wouldn’t block the thread. When it executes the suspend function, it won’t block the whole thread. The current thread can still do other jobs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Thread {

val context = newSingleThreadContext("Test Thread")
GlobalScope.launch (context) {
println("Start first coroutine.")
delay(200)
println("Resume first coroutine after delay 200ms")
}

GlobalScope.launch (context){
println("Start second coroutine.")
delay(100)
println("Resume second coroutine after delay 100ms")
}

Thread.sleep(500)
}.start()

the result:

1
2
3
4
2019-08-17 21:25:22.437 I/System.out: Start first coroutine.
2019-08-17 21:25:22.473 I/System.out: Start second coroutine.
2019-08-17 21:25:22.574 I/System.out: Resume second coroutine after delay 100ms
2019-08-17 21:25:22.656 I/System.out: Resume first coroutine after delay 200ms

That’s why a Coroutine is very lightweight, and you can create it anytime without spending many resources on creating a Coroutine. After this, you might have a question: after resuming the suspended status, current Coroutine will still run on the same thread?

If you remember, I mentioned Dispatcher would decide which thread a Coroutine should run. There are two categories of dispatcher - confined & unconfined. The Default, Main, and IO belong to Confined category, and Unconfined is unconfined. Let’s create two different coroutines and use Confined & Unconfined dispatchers to see what will happen.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Thread {
GlobalScope.launch (Dispatchers.Main) {
println("First coroutine runs in ${Thread.currentThread().name}")
delay(100)
println("First coroutine resumes and runs in ${Thread.currentThread().name}")
}

GlobalScope.launch (Dispatchers.Unconfined){
println("Second coroutine runs in ${Thread.currentThread().name}")
delay(100)
println("Second coroutine resumes and runs in ${Thread.currentThread().name}")
}

Thread.sleep(500)
}.start()

Result:

1
2
3
4
2019-08-17 21:43:22.895 I/System.out: Second coroutine runs in Thread-4
2019-08-17 21:43:22.947 I/System.out: First coroutine runs in main
2019-08-17 21:43:23.005 I/System.out: Second coroutine resumes and runs in kotlinx.coroutines.DefaultExecutor
2019-08-17 21:43:23.050 I/System.out: First coroutine resumes and runs in main

When you use a confined dispatcher, no matter when you suspend the Coroutine, it would still run on the same thread after it resumes for the Dispatchers.Unconfined setting, it would first start on the caller thread but continue on a thread based on where the suspend function is.

So far, I introduce how the suspend function works, but it’s still worth going through how to create a Coroutine, start it and resume mechanism, especially it would be great to know how to switch different Coroutines. Let’s dig in a little bit deeper.

Go back to the CoroutineScope.Launch:

1
2
3
4
5
6
7
8
9
10
11
12
13
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block)
else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

In general, the start parameter will be CoroutineStart.DEFAULT. It would go to here once the start function calls:

1
2
3
4
5
6
7
8
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R,
completion: Continuation
) = runSafely(completion) {
createCoroutineUnintercepted(
receiver, completion
).intercepted().resumeCancellable(Unit)
}

It eventually calls the function - createCoroutineUnintercepted to create a Coroutine. What does this function do?

1
2
3
4
5
6
7
8
9
10
11
public actual fun (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation
): Continuation {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation, Any?>).invoke(it)
}
}

It creates a Coroutine, which is not intercepted without a receiver but with a result type T. This function creates a new and fresh instance of suspendable computation every time when it is invoked. To start executing this prepared Coroutine, it would call resume(Unit) on the returned Continuation instance. So, remember createCoroutineUnintercepted that will call resumeCancellable to execute a created Coroutine.

resumeCancellable function

1
2
3
4
internal fun Continuation.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}

resume function

1
2
public inline fun Continuation.resume(value: T): Unit =
resumeWith(Result.success(value))

In the resume function, it calls the resumeWith function in a Continuation. Let’s check how it works.

BaseContinuationImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation?
) : Continuation, CoroutineStackFrame, Serializable {

// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result) {
...

try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
...
}

protected abstract fun invokeSuspend(result: Result): Any?

...
}

BaseContinuationImpl is the implementation of the interface Continuation. You can see when a Coroutine tries to get the outcome, it will call invokeSuspend function which is generated by the compiler. If you still remember the example processPost we listed before, the compiled java code shows that the actual computation is in the invokeSuspend function. So the starting flow is:

1
resumeCancellable(value: T) -> resume(value) -> resumeWith(Result.success(value)) -> invokeSuspend(param)

What does intercepted() do in the startCoroutineCancellable? Let’s see the implementation:

ContinuationImpl

1
2
3
4
5
6
7
8
9
10
11
12
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
...

public fun intercepted(): Continuation<Any?> = intercepted ?:
(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

...
}

In intercepted(), it tries to call context[ContinuationInterceptor]?.interceptContinuation(this) to get the Continuation, what’s the interceptContinuation()?

1
2
3
4
5
6
public interface ContinuationInterceptor : CoroutineContext.Element {
...

public fun interceptContinuation(continuation: Continuation): Continuation
...
}

It’s been defined in the interface ContinuationInterceptor. You can find:

1
2
3
4
5
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor),
ContinuationInterceptor {
...
}

The CoroutineDispatcher implements this interface. Basically, the context[ContinuationInterceptor] is CoroutineDispatcher. When we use interceptContinuation(), it executes the DispatchedContinuation() in the CoroutineDispatcher class. What does the DispatchedContinuation() do?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
internal class DispatchedContinuation(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation
) : DispatchedTask(MODE_ATOMIC_DEFAULT),
CoroutineStackFrame,
Continuation by continuation {

...

override fun resumeWith(result: Result) {
// check if need to dispatch
if (dispatcher.isDispatchNeeded(context)) {
...

// start dispatching
dispatcher.dispatch(context, this)
} else {
// if not, execute the task as part of current event loop
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
// check if need to dispatch
if (dispatcher.isDispatchNeeded(context)) {
...

// start dispatching
dispatcher.dispatch(context, this)
} else {
// if not, execute the task as part of current event loop
executeUnconfined(value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
}
}
}

...
}

This class intercepts the resumeCancellable and resumeWith functions. And it checks if this Coroutine will need to dispatch.

Alright, so far, we already know how to suspend, resume, and dispatch a Coroutine. It uses different layers to implement different functionalities. For example, BaseContinuationImpl implements the Coroutine resume, DispatchedContinuation implements the dispatching logic. Let’s move on to how to control the Job itself in the next post.

Happy coding, enjoy.

Part1 - Introduction

Part3 - Callback, Interaction and Cancellation

Part4 - Exception

Part5 - Concurrency

Reference

  1. KotlinConf 2017 - Introduction to Coroutines by Roman Elizarov
  2. Asynchronous Programming Techniques - Kotlin Programming Language