kotlinx.coroutines Flow.shareIn and stateIn operators

This issue supersedes #1261 and is based on the SharedFlow #2034 and StateFlow #1973 framework. See #2034 for most of the conceptual details on shared flows.

Introduction

A MutableSharedFlow provides a convenient means to own a shared flow of values that other parts of the code can subscribe to. However, it is often convenient to take an existing cold Flow that is defined in some other piece of code and start sharing it by launching a coroutine that collects this upstream flow and emits into the shared flow.

shareIn operator

The shareIn operator is introduced:

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0,
): SharedFlow<T>

It has the following parameters:

  • scope - a coroutine scope in which the sharing coroutine is launched.
  • started - a strategy that controls when sharing is started and stopped.
  • replay - a number specifying how many values are replayed for each new subscriber (defaults to zero, meaning no replay).
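A minimal usage sketch, assuming kotlinx-coroutines-core 1.6 or later is on the classpath; the functions numbers and lateSubscriberSeesReplay are hypothetical names. The upstream is shared eagerly with replay = 2, and a subscriber that arrives after the upstream has finished emitting receives only the two most recent values from the replay cache.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical cold upstream: each collection re-runs this block and emits 1, 2, 3.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
        delay(10)
    }
}

// Hypothetical helper: returns what a late subscriber observes.
fun lateSubscriberSeesReplay(): List<Int> = runBlocking {
    // Scope for the sharing coroutine: a child of runBlocking that is cancelled explicitly.
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val shared: SharedFlow<Int> =
        numbers().shareIn(sharingScope, SharingStarted.Eagerly, replay = 2)
    delay(100) // Eagerly: the upstream emits 1, 2, 3 while nobody is subscribed
    val seen = shared.take(2).toList() // the late subscriber only gets the replay cache
    sharingScope.cancel()
    seen
}
```

The take(2) matters: a shared flow never completes, so collecting it without a limit would suspend forever.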

Starting sharing

There are the following out-of-the-box implementations of SharingStarted:

  • SharingStarted.Eagerly - start the sharing coroutine immediately.
  • SharingStarted.Lazily - start sharing after the first subscriber appears and keep it going forever.
  • SharingStarted.WhileSubscribed() - maintain the sharing coroutine only while there is at least one subscriber (start when the first one appears, stop when the last one disappears). By default, it keeps the replay cache forever after the subscribers disappear.
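The difference between Eagerly and Lazily can be observed by recording when the upstream is first collected. A small sketch, assuming kotlinx-coroutines-core 1.6 or later; lazilyWaitsForSubscriber is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reports whether the upstream ran before and after the first subscriber.
fun lazilyWaitsForSubscriber(): Pair<Boolean, Boolean> = runBlocking {
    var upstreamStarted = false
    val upstream = flow {
        upstreamStarted = true // records that the upstream is being collected
        emit("hello")
    }
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val shared = upstream.shareIn(sharingScope, SharingStarted.Lazily, replay = 1)
    delay(50) // no subscribers yet: Lazily keeps the upstream idle
    val beforeSubscribe = upstreamStarted
    shared.first() // the first subscriber triggers the upstream collection
    val afterSubscribe = upstreamStarted
    sharingScope.cancel()
    beforeSubscribe to afterSubscribe
}
```

With SharingStarted.Eagerly in place of Lazily, beforeSubscribe would already be true.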

The WhileSubscribed strategy is a function with optional parameters:

fun SharingStarted.Companion.WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted
  • stopTimeoutMillis - how long to wait (in ms) before stopping sharing after the number of subscribers becomes zero.
  • replayExpirationMillis - how long to wait (in ms) after stopping sharing before resetting the replay cache.

All the above values and the WhileSubscribed function are defined in the SharingStarted companion object, so additional values and functions can be defined as extensions. Variants of the WhileSubscribed function that work with other representations of duration (java.time.Duration, kotlin.time.Duration, Long, TimeUnit) will be defined by the library in the appropriate modules.
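A sketch of WhileSubscribed stopping and restarting the upstream, assuming kotlinx-coroutines-core 1.6 or later; whileSubscribedRestarts is a hypothetical name. The default stopTimeoutMillis = 0 stops sharing as soon as the last subscriber leaves, so a second subscriber causes a second upstream collection.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: counts how many times the upstream was (re)started.
fun whileSubscribedRestarts(): Int = runBlocking {
    var upstreamCollections = 0
    val upstream = flow {
        upstreamCollections++ // incremented each time sharing starts the upstream
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    }
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    // replay = 0 (the default), so every new subscriber must wait for a live value
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed())
    shared.first() // the first subscriber starts the upstream, then unsubscribes
    delay(50)      // zero subscribers and stopTimeoutMillis = 0: sharing stops
    shared.first() // a new subscriber starts the upstream again
    sharingScope.cancel()
    upstreamCollections
}
```

Passing a larger stopTimeoutMillis (for example, a few seconds) would keep the upstream alive across short gaps between subscribers.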

stateIn operator

As StateFlow is a specialized version of SharedFlow, the stateIn operator is a specialized version of the shareIn operator. It does not have a replay parameter (replay is always equal to 1 for a state flow) and has a required initialValue:

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>
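A minimal sketch of the non-suspending stateIn, assuming kotlinx-coroutines-core 1.6 or later; stateInExample and the "Loading"/"Loaded" values are hypothetical. The state flow exposes initialValue until the upstream emits.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the state value before and after the upstream emits.
fun stateInExample(): Pair<String, String> = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        delay(50) // stands in for slow work, such as loading data
        emit("Loaded")
    }
    val state: StateFlow<String> =
        upstream.stateIn(sharingScope, SharingStarted.Eagerly, initialValue = "Loading")
    val before = state.value                 // the upstream has not emitted yet
    val after = state.first { it != "Loading" } // wait for the first upstream value
    sharingScope.cancel()
    before to after
}
```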

When execution happens in a suspending context and you want to compute and wait for the initial value of the state to arrive from the upstream flow, there is a suspending variant of stateIn without an initial value and with a hard-coded started = SharingStarted.Eagerly:

suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
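A sketch of the suspending variant, assuming kotlinx-coroutines-core 1.6 or later; suspendingStateIn is a hypothetical name. The call suspends until the upstream produces its first value, so the returned StateFlow already holds it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reads the state value right after the suspending stateIn returns.
fun suspendingStateIn(): Int = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        delay(20)
        emit(42)
    }
    val state: StateFlow<Int> = upstream.stateIn(sharingScope) // suspends until 42 arrives
    val value = state.value
    sharingScope.cancel()
    value
}
```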

Custom starting strategies

SharingStarted is an interface that supports 3rd-party implementations, allowing any starting strategy to be plugged into the sharing operators:

interface SharingStarted {
    fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

An implementation of SharingStarted provides a single function, command, that transforms the subscriptionCount of the shared flow into a flow of commands that control the sharing coroutine. The commands are represented by the SharingCommand enum:

enum class SharingCommand { START, STOP, STOP_AND_RESET_REPLAY_CACHE }
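A custom strategy is just a mapping from subscriber counts to commands. The sketch below is a hypothetical WhileAtLeast strategy (not part of the library) that keeps sharing only while at least a given number of subscribers is present; it assumes kotlinx-coroutines-core 1.6 or later, and customStrategyDemo is also a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical strategy: START while subscriptionCount >= minSubscribers, STOP otherwise.
class WhileAtLeast(private val minSubscribers: Int) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .map { count -> if (count >= minSubscribers) SharingCommand.START else SharingCommand.STOP }
            .distinctUntilChanged()
}

// Hypothetical helper: checks whether the upstream started with one and with two subscribers.
fun customStrategyDemo(): List<Boolean> = runBlocking {
    var upstreamStarted = false
    val upstream = flow<Int> {
        upstreamStarted = true
        emit(1)
        awaitCancellation() // keep the upstream alive until sharing stops
    }
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val shared = upstream.shareIn(sharingScope, WhileAtLeast(2))
    val sub1 = launch { shared.collect { } }
    delay(50)
    val withOne = upstreamStarted // one subscriber is not enough
    val sub2 = launch { shared.collect { } }
    delay(50)
    val withTwo = upstreamStarted // the second subscriber triggers START
    sub1.cancel()
    sub2.cancel()
    sharingScope.cancel()
    listOf(withOne, withTwo)
}
```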

Error handling

Any error in the upstream flow cancels the sharing coroutine and resets the buffer of the shared flow. The error is delivered to the scope. If this behavior is not desirable, apply error-handling operators (such as retry and catch) to the upstream flow before the shareIn operator. If the upstream completes normally, nothing happens.
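A sketch of applying catch before shareIn, so that an upstream failure becomes a regular value instead of cancelling the sharing scope. It assumes kotlinx-coroutines-core 1.6 or later; catchBeforeShareIn and the "error: ..." value format are hypothetical choices.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects what subscribers see when the upstream fails.
fun catchBeforeShareIn(): List<String> = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        emit("a")
        throw IllegalStateException("boom")
    }
    val shared = upstream
        .catch { e -> emit("error: ${e.message}") } // handled before sharing, scope survives
        .shareIn(sharingScope, SharingStarted.Lazily, replay = 2)
    val seen = shared.take(2).toList()
    sharingScope.cancel()
    seen
}
```

Without the catch, the exception would propagate to sharingScope and, because its Job is a child of runBlocking here, fail the whole runBlocking call.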

Conceptual implementation

The conceptual implementation of shareIn operator is simple:

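import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Minimal capacity the total buffer is padded to, for performance reasons
private const val DEFAULT_BUFFER_SIZE = 64

@OptIn(ExperimentalCoroutinesApi::class) // resetReplayCache is an experimental API
fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val upstream = this
    val shared = MutableSharedFlow<T>(
        replay = replay,
        // pad replay + extra buffer up to DEFAULT_BUFFER_SIZE
        extraBufferCapacity = maxOf(replay, DEFAULT_BUFFER_SIZE) - replay,
    )
    scope.launch { // the single coroutine to rule the sharing
        started.command(shared.subscriptionCount)
            .distinctUntilChanged()
            .collectLatest { // cancels the previous block on each new command
                when (it) {
                    SharingCommand.START -> upstream.collect(shared) // can be cancelled
                    SharingCommand.STOP -> { /* just cancel and do nothing else */ }
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> shared.resetReplayCache()
                }
            }
    }
    return shared
}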

Note that the buffer is padded to a minimal default capacity (64) for performance reasons.

Operator fusion

The actual implementation of the shareIn operator is more complex. It fuses with immediately preceding flowOn operators, launching the sharing coroutine directly in the corresponding context (without an additional coroutine and channel to change the context).

It also fuses with immediately preceding Flow.buffer operators. This allows explicit configuration of the buffer size, creating a shared flow that takes the configured buffer size into account:

  • buffer(0).shareIn(scope, started, 0) - creates a shared flow with extraBufferCapacity = 0. This overrides the default buffer size and thus configures a full rendezvous between the upstream emitter and subscribers (the emitter is suspended until all subscribers process the value).
  • buffer(b).shareIn(scope, started, r) - creates a shared flow with replay = r and extraBufferCapacity = b.
  • conflate().shareIn(scope, started, r) - creates a shared flow with replay = r, extraBufferCapacity = 1 when replay == 0, and onBufferOverflow = DROP_OLDEST.
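A usage sketch of flowOn placed directly before shareIn, assuming kotlinx-coroutines-core 1.6 or later and that Dispatchers.Default worker threads carry their usual "DefaultDispatcher-worker" name prefix; upstreamThreadName is a hypothetical name. It only shows that the upstream runs in the flowOn context; whether the operators were fused is not observable from user code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reports the name of the thread the upstream ran on.
fun upstreamThreadName(): String = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val shared = flow { emit(Thread.currentThread().name) }
        .flowOn(Dispatchers.Default) // upstream runs on a Default worker thread
        .shareIn(sharingScope, SharingStarted.Lazily, replay = 1)
    val name = shared.first() // collected on the runBlocking thread
    sharingScope.cancel()
    name
}
```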

Note that the last fusion in the list above, conflate().shareIn(scope, started, r), departs from the true spirit of the fusion concept: it produces execution results that differ from a separate, sequential application of the conflate and shareIn operators without fusion. For example, take an upstream flow that emits consecutive numbers starting from 1 with a delay of 100ms between numbers, and a slow downstream subscriber that takes 530ms for each value. Without fusion, a conflate() followed by a shareIn with a buffer of 1 collects values 1, 2, 3, 6, 11, 16, etc. (three initial values are collected due to double buffering by conflation and sharing), and it would buffer even more initial values if the default buffer size were used. A fused implementation, however, has a single small buffer and collects 1, 6, 11, 16, etc., just as expected from a separate conflate() without sharing. The fused buffer decreases the overall latency, processing the number 6 as soon as possible at the 1060ms mark after start, as opposed to the non-fused version, where the number 6 would be collected at the 2120ms mark. We consider this to be a good thing.
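A scaled-down sketch of the conflation scenario, assuming kotlinx-coroutines-core 1.6 or later; conflatedSlowSubscriber is a hypothetical name, and the 10ms/35ms timings are illustrative rather than the 100ms/530ms from the text. It does not reproduce the exact value sequences above; it only checks that a slow subscriber skips intermediate values but still receives the first and the final one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the values a slow subscriber actually processes.
fun conflatedSlowSubscriber(): List<Int> = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        for (i in 1..10) {
            emit(i)
            delay(10) // fast upstream
        }
    }
    val shared = upstream.conflate().shareIn(sharingScope, SharingStarted.Lazily)
    val seen = mutableListOf<Int>()
    shared.first { value ->
        delay(35) // slow subscriber
        seen += value
        value == 10 // stop once the final upstream value has arrived
    }
    sharingScope.cancel()
    seen
}
```

The exact intermediate values depend on timing, which is why only the endpoints and the count are checked.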

Implementation

Implementation is in PR #2069.


βœ”οΈAccepted Answer

📣 There is an important question on the design of sharing operators that we need community help with. We need to figure out what the default behavior of SharingStarted.WhileSubscribed should be with respect to cache reset. We feel that the default should be based on the most common usage.

The question only matters for upstreamFlow.shareIn(scope, started, replay) with replay >= 1, or for the stateIn operator (where replay is always implicitly equal to one). The intended use for started = WhileSubscribed is the case when an upstream flow is expensive to maintain (it uses network resources, a device connection, or something like that) and you want to keep the upstream flow running only while at least one downstream subscriber is present. Now, when all subscribers disappear, we have two variants of behavior to pick as our default:

👍 Immediately reset the cache to the initial value (if it was specified; clear it if it was not), so that the next time subscribers appear they will not get stale value(s) from the previous upstream flow collection, but will instead receive some kind of initial value (or an empty flow) that explicitly tells them to wait until the upstream emits something (establishes a network connection, gets data, etc.).

🚀 Keep the last value(s) emitted by the upstream so that the next time subscribers appear they will immediately get the previously emitted last value(s) without having to wait until the upstream flow emits anything.

Let's do a quick poll: what do you use in your code most often and want to see as a default?
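The 👍 behavior can already be requested explicitly by passing replayExpirationMillis = 0 to WhileSubscribed. A sketch with stateIn, assuming kotlinx-coroutines-core 1.6 or later; resetOnExpiry and the "initial"/"fresh" values are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the value seen while subscribed and the value after reset.
fun resetOnExpiry(): Pair<String, String> = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        emit("fresh")
        awaitCancellation() // stands in for a long-lived connection
    }
    val state = upstream.stateIn(
        sharingScope,
        SharingStarted.WhileSubscribed(stopTimeoutMillis = 0, replayExpirationMillis = 0),
        initialValue = "initial"
    )
    val seen = state.first { it == "fresh" } // subscribing starts the upstream
    delay(50) // no subscribers: sharing stops and the cache resets to initialValue
    val afterReset = state.value
    sharingScope.cancel()
    seen to afterReset
}
```

With the default replayExpirationMillis = Long.MAX_VALUE (the 🚀 behavior), afterReset would remain "fresh".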

Other Answers:

📣 UPDATE: PR is now done. Additional changes in the design:

  • initialValue support is dropped from the shareIn operator. You can always use onStart { emit(initialValue) } on the upstream flow to mimic it if needed; this snippet is also added to the docs.
  • The replay cache is kept intact on completion or failure of the sharing coroutine, which simplifies the conceptual implementation of the shareIn operator. If a special action is needed on completion, it can be configured with the onCompletion operator. An example is given in the shareIn docs.
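A sketch of the onStart replacement for initialValue, assuming kotlinx-coroutines-core 1.6 or later; onStartInitialValue and the "loading"/"data" values are hypothetical. Because onStart is applied upstream of shareIn, the placeholder is emitted each time sharing starts the upstream.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects the first two values a subscriber sees.
fun onStartInitialValue(): List<String> = runBlocking {
    val sharingScope = CoroutineScope(coroutineContext + Job(coroutineContext.job))
    val upstream = flow {
        delay(20) // stands in for a slow source, such as a network request
        emit("data")
    }
    val shared = upstream
        .onStart { emit("loading") } // mimics the dropped initialValue parameter
        .shareIn(sharingScope, SharingStarted.Lazily, replay = 1)
    val seen = shared.take(2).toList()
    sharingScope.cancel()
    seen
}
```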

I can't wait to use this. I hope it comes out soon, like how StateFlow came out so quickly following its GitHub issue.

Any updates on when this might come out?

But move .onStart before the cache and you don't have that problem. The whole point of the cache is so you get the most recent value. If you're deliberately subverting the cache, the problem isn't the cache behavior it's your order of operations. The state machine which starts with the loading state is an upstream source of states from the cache.
