Solved: kotlinx.coroutines BroadcastChannel.asFlow().onStart(...) is invoked before the subscription is opened

In the docs, we have this example of onStart(...):

flowOf("a", "b", "c")
    .onStart { emit("Begin") }
    .collect { println(it) } // prints Begin, a, b, c

This works not only with emit(...) but also when sending to a Channel:

val channel = Channel<String>(4)

channel.consumeAsFlow()
    .onStart { 
      channel.send("Begin")
      channel.send("a")
      // etc.
    }
    .collect { println(it) } // prints Begin, a, b, c

However, with a BroadcastChannel the value sent in onStart(...) is never received:

val channel = broadcast {
  send(2)
}

channel.asFlow()
    .onStart { channel.send(1) }
    .collect { println(it) } // prints 2

If onStart(...) is primarily a callback to say that "we're collecting data", instead of a fancy way to prepend data to the Flow, then this feels wrong.

The difference comes from how the subscription is created. The subscription channel isn't opened until the flow { } builder body runs, which happens after the onStart(...) action:

public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
    emitAll(openSubscription())
}

onStart(...) is being invoked at the same time as always, and the value is being sent, but the subscription's ReceiveChannel is created afterwards and never gets it.
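The ordering described above can be observed without a BroadcastChannel at all. The following is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; the `traceOrder` function and the log strings are illustrative names, and the `flow { }` body merely stands in for the place where `openSubscription()` would be called.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which each stage runs.
fun traceOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        // In BroadcastChannel.asFlow(), openSubscription() is called at this point.
        log.add("flow builder body")
        emit(1)
    }
        .onStart { log.add("onStart action") } // runs before the upstream is collected
        .collect { log.add("collected $it") }
    log
}

fun main() {
    traceOrder().forEach(::println)
    // onStart action
    // flow builder body
    // collected 1
}
```

Anything sent to a hot source from the onStart action therefore happens before the builder body has had a chance to subscribe.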

Proposed Solution

Make asFlow() return its own internal type (BroadcastFlow?), and give it an update(...) function similar to ChannelFlow. This function would just accumulate the start actions and hold them until collect(...) is finally called.

Then make onStart(...) do a type check and just call update(...) if it's a BroadcastFlow.
When it's time to start collecting, create the subscription, then invoke the action, then call emitAll(...).

public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = BroadcastFlow(this)

internal class BroadcastFlow<T>(
    private val source: BroadcastChannel<T>,
    private val _startAction: suspend FlowCollector<T>.() -> Unit = {}
) : Flow<T> {

    fun update(
        startAction: suspend FlowCollector<T>.() -> Unit
    ): BroadcastFlow<T> = BroadcastFlow(source) {
        startAction()
        _startAction()
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        val channel = source.openSubscription()
        collector._startAction()
        collector.emitAll(channel)
    }
}

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = if (this is BroadcastFlow) {
    update(action)
} else {
    unsafeFlow {
        // Note: unsafe flow is used here, but safe collector is used to invoke start action
        SafeCollector<T>(this, coroutineContext).action()
        collect(this) // directly delegate
    }
}
27 Answers

✔️ Accepted Answer

UNDISPATCHED is predictable:

launch(start = CoroutineStart.UNDISPATCHED) {
    print(1)
    delay(100) // the actual suspension that matters here
    print(2)
}
print(3)

Prints: 132, playground: https://pl.kotl.in/R9w5PAcps

One advantage of onSubscription is that the code is easier to read and can be written as a single operator chain. Let's take a simplified form of the example from #1758 (comment).

Here's the code with undispatched start:

suspend fun awaitResponse(cmd: Cmd, id: ID): Response = coroutineScope {
    val response = async(start = CoroutineStart.UNDISPATCHED) {
        sharedConnectionFlow.filter { it.id == id }.first()
    }
    connection.writeOrThrow(cmd)
    response.await()
}

Here's how you'd write the same code with onSubscription:

suspend fun awaitResponse(cmd: Cmd, id: ID): Response = 
    sharedConnectionFlow
        .onSubscription { connection.writeOrThrow(cmd) }
        .filter { it.id == id }.first()

Unlike the async/await solution, this code can also be adapted easily to cases where there are multiple responses and we need to return a flow of them. onSubscription provides a composable solution. We can have a separate function to set up a flow of responses:

fun responseFlow(cmd: Cmd, id: ID): Flow<Response> = 
    sharedConnectionFlow
        .onSubscription { connection.writeOrThrow(cmd) }
        .filter { it.id == id }

And then use various terminal operators on it like responseFlow(cmd, id).first() or others.
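For readers who want to run the pattern, here is a minimal sketch, assuming kotlinx-coroutines-core 1.4 or later. The `Response` class, the `incoming` shared flow, the `write` function (a stand-in for `connection.writeOrThrow` that simply echoes a response), and `runDemo` are hypothetical and not part of the discussion above. The shared flow is given extra buffer capacity so that emitting from inside the subscriber's own coroutine does not suspend.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical response type for the sketch.
data class Response(val id: Int, val body: String)

// Hypothetical shared stream of incoming responses (no replay, small buffer).
val incoming = MutableSharedFlow<Response>(extraBufferCapacity = 16)

// Hypothetical "connection": writing a command immediately produces a response.
fun write(id: Int, cmd: String) {
    incoming.tryEmit(Response(id, "echo:$cmd"))
}

fun responseFlow(cmd: String, id: Int): Flow<Response> =
    incoming
        .onSubscription { write(id, cmd) } // runs after this subscriber is registered
        .filter { it.id == id }

fun runDemo(): Pair<Response?, Response?> = runBlocking {
    // onSubscription: the response is emitted after subscribing, so it is received.
    val received = responseFlow("ping", 7).first()

    // onStart: the response is emitted before subscribing and is dropped,
    // so the wait ends only when the timeout fires.
    val missed = withTimeoutOrNull(200) {
        incoming.onStart { write(8, "ping") }.filter { it.id == 8 }.first()
    }
    received to missed
}

fun main() {
    println(runDemo()) // (Response(id=7, body=echo:ping), null)
}
```

The second half mirrors the original report: with no subscriber registered yet and no replay cache, the value emitted from onStart has nowhere to go.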
