kotlinx.coroutines Flow.collect receives items after it was cancelled on a single thread

Could anyone explain to me why this code throws an exception (on Android)? The collect and cancel functions are both called on the main thread. I tried this with both versions 1.2.1 and 1.3.0-M1.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Called on the main thread, e.g. from Activity.onCreate
// (Dispatchers.Main requires kotlinx-coroutines-android).
fun reproduce() {
    var cancelled = false
    val job = GlobalScope.launch(Dispatchers.Main) {
        val flow = flow {
            while (true) {
                emit(Unit)
            }
        }
        flow.flowOn(Dispatchers.IO)
            .collect {
                if (cancelled) { // main thread
                    throw IllegalStateException() // this exception is thrown
                }
            }
    }

    GlobalScope.launch(Dispatchers.Main) {
        delay(1000)
        job.cancel() // main thread
        cancelled = true // set to true after the Flow was cancelled
    }
}

Here is an Android sample project; just run the app and it will crash.

I'm not sure whether this is intended behaviour or a bug. I need to make sure all Flows are cancelled in the Android onDestroy so they do not try to update the view after it is gone. If this is not a bug, I am not sure how else I should handle it.

There is a long discussion about this with more code examples on Slack: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1559908916083600

We had some trouble reproducing this in a unit test (it did not throw the exception), which is why I attached an Android project.

14 Answers

✔️Accepted Answer

I have replaced every single collect with a safeCollect function in my project:

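import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
// On older kotlinx.coroutines versions, import kotlinx.coroutines.flow.collect as well.

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after the coroutine was cancelled.
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive() // throws CancellationException if the collector is cancelled
    action(it)
  }
}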

It would be great if this could be implemented as the default, as it's really unexpected behavior and it's error-prone to always have to remember not to use specific library functions.
On Android, my collect calls always come right before code that touches the views, so they must not be invoked after the view is gone.
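A minimal, deterministic JVM sketch of what the `ensureActive()` check changes. It does not reproduce the Android `flowOn` race itself (that depends on thread timing); instead it cancels the collecting coroutine from inside the action. It assumes kotlinx.coroutines 1.6 or later and uses `(1..5).asFlow()` because, per the kotlinx.coroutines flow documentation, that source does not check for cancellation on each emission. `collectedItems` is a hypothetical helper for illustration; `safeCollect` is copied from the answer above.

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same as the accepted answer: check cancellation before every action call.
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
    collect {
        coroutineContext.ensureActive()
        action(it)
    }
}

// Hypothetical helper: collects 1..5 and cancels the collecting coroutine after item 2.
// Returns every item that actually reached the action.
fun collectedItems(useSafe: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        val numbers = (1..5).asFlow() // no per-emission cancellation check
        if (useSafe) {
            numbers.safeCollect { value ->
                seen += value
                if (value == 2) cancel() // cancels this launch, like onDestroy would
            }
        } else {
            numbers.collect { value ->
                seen += value
                if (value == 2) cancel()
            }
        }
    }
    job.join() // join returns normally even though the child was cancelled
    seen
}

fun main() {
    println("collect:     ${collectedItems(useSafe = false)}") // [1, 2, 3, 4, 5]
    println("safeCollect: ${collectedItems(useSafe = true)}")  // [1, 2]
}
```

With plain `collect`, the action keeps running for 3, 4 and 5 after the coroutine is cancelled; with `safeCollect`, `ensureActive()` throws a `CancellationException` before item 3 reaches the action.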

Other Answers:

I've found that Rx does not have the same problem because Rx does not have atomic cancellation (which I've figured out is the issue here).

Rx's ObservableObserveOn checks whether it is disposed on the observe-side thread. Kotlin's ChannelFlow does no such check, so it inherits Channel's atomic cancellation behavior.

This reframes the issue in my mind as: Should Flow operators like flowOn use atomic cancellation?

I'm leaning towards no, myself. A single Channel can guarantee delivery OR cancellation, but a flow may be built from multiple channels (for example, multiple flowOns with a suspending map between them), so the benefit of atomic cancellation is easily lost end-to-end.

I also think dropping it would help more developers avoid race-condition bugs than leaving the behavior as-is.

And if a developer really does need atomic cancellation, they can still use a Channel.

Note: I've since noticed Issue 1177, which is similar. That specific repro was fixed by changing withContext, but the race condition still exists for flowOn and collect.

Here's an example that has failed consistently for me on Android in an Activity's onCreate:

    var timeToCancel = false
    GlobalScope.launch(Dispatchers.Main) {
        flow {
            while (isActive) {
                delay(100)
                if (timeToCancel) {
                    runOnUiThread { //Pretend unfortunate timing of onDestroy
                        cancel()
                    }
                }
                emit(Unit)
            }
        }.flowOn(Dispatchers.IO).collect {
            if (!isActive) {
                throw IllegalStateException("How'd I get here?!")
            }
        }
    }
    GlobalScope.launch(Dispatchers.Main) {
        delay(1000)
        timeToCancel = true
    }

Hi @elizarov,

I believe the purpose of the example was to point out that collect calls its lambda without checking for cancellation first. If the example ignores cancelled and checks !isActive instead, the result is the same.

The documentation you linked states:

All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

I think it's easy for developers to gather from this that collect will check for cancellation before calling its provided lambda. This also seems like an intuitive assumption for developers to make.

Here's a use case I've dealt with where this seems especially problematic:

  • An Activity uses C libraries to share code with other platforms
  • Those C libraries are manipulated based on events from objects that live on after the Activity
  • The Activity cleans up its native resources and subscriptions in onDestroy

Note: no thread switching is involved, only different lifetimes between source and observer.

Currently, this code base uses RxJava, but if it were naively switched to Flow and collect, it could corrupt the heap: collect could call a lambda that manipulates freed native resources after the coroutine is no longer active.

While creating a custom collectWhileActive terminal operator is trivial, other developers in a similar situation could easily not know they need such an operator.
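One possible shape for such an operator, as a sketch: `collectWhileActive` and `collectedWhileActive` are hypothetical names, not kotlinx.coroutines APIs. Unlike `safeCollect`, this variant uses `takeWhile` with `currentCoroutineContext().isActive`, so collection stops and returns normally instead of throwing. It assumes kotlinx.coroutines 1.6 or later.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical terminal operator: stop delivering items once the collecting
// coroutine is no longer active. The predicate runs in the collector's coroutine,
// immediately before the item is handed to the action.
suspend fun <T> Flow<T>.collectWhileActive(action: suspend (T) -> Unit) {
    takeWhile { currentCoroutineContext().isActive }
        .collect { value -> action(value) }
}

// Hypothetical demo: cancel the collector after item 2 and record what got through.
fun collectedWhileActive(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        (1..5).asFlow().collectWhileActive { value ->
            seen += value
            if (value == 2) cancel() // simulate onDestroy cancelling mid-stream
        }
    }
    job.join()
    seen
}

fun main() {
    println(collectedWhileActive()) // [1, 2]
}
```

Returning normally is convenient in UI code, but it also hides the cancellation: code after `collectWhileActive` keeps running until the coroutine reaches its next suspension point or cancellation check.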

This is pretty unexpected to me, but it explains the issue with flows. It does feel like very dangerous behavior on Android, where objects tend to be nulled out in response to lifecycle methods. This would definitely not happen with Rx since, as @nickallendev said, Rx checks for cancellation on both the producer and consumer sides.
