Solved: kotlinx.coroutines: Provide abstraction for cold streams

All the currently provided channel abstractions in kotlinx.coroutines are hot. The data is produced regardless of the presence of a subscriber. This is good for data sources and applications that are inherently hot, like incoming network and UI events.

However, hot streams are not an ideal solution for cases where a data stream is produced on demand. Consider, for example, the following simple code that produces a ReceiveChannel<Int>:

produce<Int> {
    while (true) {
        val x = computeNextValue() // computed before any receiver asks for it
        send(x)                    // suspends here if there is no receiver
    }
}

One obvious downside is that computeNextValue() is invoked before send, so the next value gets computed even when the receiver is not ready. Of course, the producer gets suspended in send if there is no receiver, but it is not as lazy as what you get with a cold reactive Publisher/Observable/Flowable/Flux/Flow.

We need an abstraction for cold streams in kotlinx.coroutines that is just as lazy, computing data in "push" mode versus the "pull" mode of the hot channels that we have now.
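To make the laziness concrete, here is a minimal sketch of a cold, push-mode stream that uses only the Kotlin standard library. The names ColdStream, numbers, runSketch, and coldStreamDemo are hypothetical and exist only for illustration; this is not an API of kotlinx.coroutines. The producer body does not run until a consumer calls collect, and each value is computed right before it is pushed into that consumer.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical cold stream: it only stores the producer body.
// Nothing runs until collect() is called, and every collect() reruns the body.
class ColdStream<T>(private val producer: suspend (emit: suspend (T) -> Unit) -> Unit) {
    suspend fun collect(consumer: suspend (T) -> Unit) = producer(consumer)
}

// Builds a stream of 1..limit and records each computation in the log.
fun numbers(limit: Int, log: MutableList<String>) = ColdStream<Int> { emit ->
    for (i in 1..limit) {
        log += "compute $i" // stands in for computeNextValue()
        emit(i)             // pushes the value straight into the consumer
    }
}

// Tiny stdlib-only runner. It is enough here because nothing in this sketch really suspends.
fun <T> runSketch(block: suspend () -> T): T {
    val box = ArrayList<T>(1)
    block.startCoroutine(Continuation(EmptyCoroutineContext) { box.add(it.getOrThrow()) })
    return box.single()
}

fun coldStreamDemo(): List<String> {
    val log = mutableListOf<String>()
    val stream = numbers(2, log) // cold: nothing has been computed yet
    log += "stream created"
    runSketch { stream.collect { log += "got $it" } }
    return log
}
```

Calling coldStreamDemo() returns [stream created, compute 1, got 1, compute 2, got 2]: the computation starts only after collect is called, and each value is computed immediately before it is consumed.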

There are the following related discussions:

  • reactor/reactor-core#979 (comment) describes a preliminary performance test that indicates that "push" mode is much faster for same-thread cases.
  • #113 (SPSC channels) seems to be superseded by the support of cold streams.
42 Answers

✔️Accepted Answer

I question the very need of Single/Solo. The typealias digression was just to demonstrate what this concept actually does. I question the very use-cases for Single/Solo. Why would you ever use it? Why would you need a dedicated type to denote a reference to a computation that reruns every time you ask for it? Why would you need this level of indirection? Your code will be much easier to understand if you use suspending functions directly.

To illustrate. Instead of:

interface A { fun doSomething(): Single<T> } 

write

interface A { suspend fun doSomething(): T }

Instead of:

fun foo() = // return Single<R>
    doSomething().map { it.transform() }

do

suspend fun foo() = // return R
    doSomething().transform()  

You can continue this example with flatMap, etc. The code that does not use Single/Solo will invariably be more direct and easier to understand.
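As a sketch of how the flatMap case might look in the direct style, the snippet below chains two suspending calls with ordinary function application. The functions loadUserName, loadGreeting, greet, runSketch, and directStyleDemo are hypothetical and use only the Kotlin standard library; they are not taken from any library discussed here.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical asynchronous operations, written as plain suspending functions.
suspend fun loadUserName(id: Int): String = "user-$id"
suspend fun loadGreeting(name: String): String = "Hello, $name"

// With a Single-like type this step would be a flatMap over the first result.
// In the direct style it is just a nested call.
suspend fun greet(id: Int): String = loadGreeting(loadUserName(id))

// Tiny stdlib-only runner. It is enough here because nothing in this sketch really suspends.
fun <T> runSketch(block: suspend () -> T): T {
    val box = ArrayList<T>(1)
    block.startCoroutine(Continuation(EmptyCoroutineContext) { box.add(it.getOrThrow()) })
    return box.single()
}

fun directStyleDemo(): String = runSketch { greet(7) }
```

Calling directStyleDemo() returns "Hello, user-7".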

On the other hand, we need a dedicated abstraction for asynchronous streams of values simply because there is no other way to represent them in Kotlin than via a separate type.

Other Answers:

@pull-vert The mental model is simple. There are no cold operators. You just "set up your pipeline" (that is what cold operators do), and it is later executed by the terminal operator. Since every chain of operators ultimately ends in a terminal operator, you don't really care what happens in between. It just works.

With respect to the kind of operators we need to provide, we have a huge advantage over reactive libraries because we only need to define basic ones like map and filter. For example, there is no need to have a ready-to-use delayEach(time) in the library, because it can be trivially implemented with .map { delay(time); it }
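The two points above can be sketched together with a hypothetical stdlib-only ColdStream type (not a kotlinx.coroutines API). Intermediate operators such as map and filter only wrap the upstream, and the terminal collect runs the whole pipeline. Because map accepts a suspending lambda, a delayEach operator is a one-liner. The function pause is an assumed stand-in for the delay function of kotlinx.coroutines, so that the sketch needs no dependencies; it only records the requested pause.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical cold stream: stores the producer body and runs it on collect().
class ColdStream<T>(private val producer: suspend (emit: suspend (T) -> Unit) -> Unit) {
    suspend fun collect(consumer: suspend (T) -> Unit) = producer(consumer)
}

// Intermediate operators only wrap the upstream; they run nothing themselves.
fun <T, R> ColdStream<T>.map(transform: suspend (T) -> R): ColdStream<R> =
    ColdStream { emit -> collect { emit(transform(it)) } }

fun <T> ColdStream<T>.filter(predicate: suspend (T) -> Boolean): ColdStream<T> =
    ColdStream { emit -> collect { if (predicate(it)) emit(it) } }

// Stand-in for kotlinx.coroutines delay(): it only records the requested pause.
suspend fun pause(millis: Long, log: MutableList<String>) { log += "pause $millis" }

// "delayEach" needs no special support: it is map with a suspending body.
fun <T> ColdStream<T>.delayEach(millis: Long, log: MutableList<String>): ColdStream<T> =
    map { pause(millis, log); it }

// Tiny stdlib-only runner. It is enough here because nothing in this sketch really suspends.
fun <T> runSketch(block: suspend () -> T): T {
    val box = ArrayList<T>(1)
    block.startCoroutine(Continuation(EmptyCoroutineContext) { box.add(it.getOrThrow()) })
    return box.single()
}

fun pipelineDemo(): List<String> {
    val log = mutableListOf<String>()
    val source = ColdStream<Int> { emit -> for (i in 1..4) emit(i) }
    val pipeline = source.filter { it % 2 == 0 }.map { it * 10 }.delayEach(100, log)
    log += "pipeline built" // setting up the pipeline has executed nothing so far
    runSketch { pipeline.collect { log += "got $it" } } // terminal operator runs it
    return log
}
```

Calling pipelineDemo() returns [pipeline built, pause 100, got 20, pause 100, got 40].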

@pull-vert I'm not a big fan of a Single-like abstraction. Solo is a great name, btw, but if we include it in the library, then we'll have three slightly different ways to asynchronously perform an operation that returns a value:

  • async { something } -- starts computation in background immediately to produce result later.
  • async(start = CoroutineStart.LAZY) { something } -- starts computation only when it is requested, sharing result with subsequent requests.
  • solo { something } -- starts computation when it is requested, and does it again from scratch every time it is requested.

They are all different ways, but do we really need an abstraction for that last one? Consider this definition:

typealias Solo<T> = suspend () -> T

Isn't this functional type the Solo we are looking for? Do we really need to give it some other name like Solo? We can always declare all the extensions we might need directly on top of that suspend () -> T functional type.
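A minimal sketch of that idea follows, with an extension declared directly on the suspend () -> T functional type. The map extension, runSketch, and soloDemo are hypothetical names, and the code uses only the Kotlin standard library. It also shows the "again from scratch" behavior: every request reruns the computation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical operator declared directly on the functional type.
// It returns a new suspending function and runs nothing by itself.
fun <T, R> (suspend () -> T).map(transform: suspend (T) -> R): suspend () -> R {
    val source = this
    return { transform(source()) }
}

// Tiny stdlib-only runner. It is enough here because nothing in this sketch really suspends.
fun <T> runSketch(block: suspend () -> T): T {
    val box = ArrayList<T>(1)
    block.startCoroutine(Continuation(EmptyCoroutineContext) { box.add(it.getOrThrow()) })
    return box.single()
}

fun soloDemo(): List<Int> {
    var runs = 0
    val counter: suspend () -> Int = { ++runs } // recomputed on every request
    val doubled = counter.map { it * 2 }        // still nothing has run
    val runsBeforeRequest = runs
    val first = runSketch(doubled)              // runs the computation once
    val second = runSketch(doubled)             // runs it again from scratch
    return listOf(runsBeforeRequest, first, second)
}
```

Calling soloDemo() returns [0, 2, 4]: nothing runs while the value is being set up, and each request reruns the underlying block.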

Let me also quote Erik Meijer's tweet here: https://twitter.com/headinthebox/status/971492821151580160

Hi,
I was writing a kotlinx-coroutine reactive (=cold) library based on reactive-streams. But following Elizarov's advice on #201, I switched to a pure Kotlin suspend version, strongly inspired by your StreamBenchmarks project.
Here is the library: Reactivity

It provides two cold producers (inspired by Reactor):

  • Multi for multi values
  • Solo for single value

As they are reactive objects, they start producing items only when a client calls a Terminal (final/consuming) operation function.

Reactivity is a multiplatform project with common and platform-specific tests and extensions. For example, it provides the platform-specific Solo.toPromise in JS, and Solo.toCompletableFuture or Stream.toMulti on the JVM (in the JDK8 project).
There are only a few operators right now (map, filter, delay), but more can be easily added as they are very simple.

I would be really happy if Reactivity can save time or serve as a source of inspiration for this issue.

The biggest issue with Channel<T> in my experience is that .openSubscription() has to be called before operators can be applied. I personally really liked LiveData<T>'s approach.

It shipped with LiveData<T>, MutableLiveData<T>, and MediatorLiveData<T>, and two transformations, .map() and .switchMap{ }.

Implementing custom operators is easy, just add an extension function.

Unfortunately, LiveData<T> falls completely flat because it's not thread-safe and is highly coupled to the main thread.

Ultimately, my use case is that I need a lightweight RxJava for scenarios where I can't use RxJava but want a reactive-state based architecture, and I am hoping coroutines can solve this problem.
