RxGo v2

This issue is a placeholder to discuss the future v2 release.

I know, for example, that migrating Observable, Observer, etc. should be part of v1 according to this card: https://github.com/ReactiveX/RxGo/projects/1#card-6433015

Yet if we do that, it will no longer be backward compatible with the existing API. For example, a user can no longer wait for an Observable to end by doing:

<-myObservable
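
To make the compatibility concern concrete, here is a minimal sketch rather than actual RxGo code. It assumes the existing Observable is a channel-based type, and it invents a hypothetical interface with a `Done()` method (both `ChanObservable` and `IfaceObservable` are illustrative names, not part of the library). A receive expression only works on channel values, so an interface-based Observable would need an explicit method to wait on.

```go
package main

import "fmt"

// ChanObservable is an assumed stand-in for a channel-based Observable.
type ChanObservable <-chan interface{}

// IfaceObservable is a hypothetical interface-based Observable.
// A receive expression (<-x) is not valid on an interface value,
// so waiting has to go through a method instead.
type IfaceObservable interface {
	Done() <-chan struct{}
}

// closedSource implements IfaceObservable for this sketch.
type closedSource struct{ done chan struct{} }

func (s closedSource) Done() <-chan struct{} { return s.done }

// newChanObservable returns a channel-based observable that has already ended.
func newChanObservable() ChanObservable {
	ch := make(chan interface{})
	close(ch)
	return ch
}

// newIfaceObservable returns an interface-based observable that has already ended.
func newIfaceObservable() IfaceObservable {
	done := make(chan struct{})
	close(done)
	return closedSource{done: done}
}

func main() {
	chanObs := newChanObservable()
	<-chanObs // compiles because the value is a channel

	ifaceObs := newIfaceObservable()
	<-ifaceObs.Done() // the interface needs an explicit method to wait on

	fmt.Println("both observables completed")
}
```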

Regarding v2, I've started to implement a few things, which are part of this PR: #95. In a nutshell, it contains a migration of the main types into interfaces, the creation of new types (Optional, Single, etc.) and new operators. Is this PR going in the right direction for v2?

What else should we include?

Furthermore, what are the plans for v2 in terms of schedule? Should we release something before or after Go 2?

16 Answers

✔️Accepted Answer

Hey,

My opinion about what should be part of RxGo v2.

General

  • Iterable should be moved to an interface. That's the last type we need to move.
  • Generalize the management of the operator options (introduced in Subscribe, for example)
  • Improve the assertion API started in #109 (extending its capabilities, including and enriching the ObserverMock, etc.)

Creating Observables

  • Migration of the Create operator: The signature should comply with Rx standards. Moreover, there's now a mechanism to dispose of a subscription. It's not necessary to do something specific for this operator.
  • Today, there's a big difference in behavior compared with other Rx libraries. If we do:
    just := Just(1, 2, 3)
    just.Subscribe(...)
    just.Subscribe(...)

    The second observer receives nothing. This is because when an Observable is created from a slice, every element is sent to a channel and then received by a single Observer. After that, the elements are gone.
    We need to implement something smart to handle this use case (a new interface), because it could overlap with the ability to retry an Observable created from a channel, for example.

  • New Timer operator
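
The double-subscription behavior of Just described in the list above can be reproduced with plain channels. The sketch below is only an illustration under assumptions: `justOnce` mimics the "items pushed into one channel" behavior, and `justReplay` is a hypothetical alternative that keeps the items and builds a fresh channel per subscriber. Neither function is RxGo API.

```go
package main

import "fmt"

// drain reads every remaining item from ch until it is closed.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

// justOnce pushes the items into a single channel exactly once.
// Whatever the first reader consumes is gone for later readers.
func justOnce(items ...int) <-chan int {
	ch := make(chan int, len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return ch
}

// justReplay (hypothetical) keeps the items and returns a factory
// that creates a new channel for each subscription.
func justReplay(items ...int) func() <-chan int {
	return func() <-chan int { return justOnce(items...) }
}

func main() {
	once := justOnce(1, 2, 3)
	first, second := drain(once), drain(once)
	fmt.Println(len(first), len(second)) // the second reader gets nothing

	replay := justReplay(1, 2, 3)
	first, second = drain(replay()), drain(replay())
	fmt.Println(len(first), len(second)) // both readers get every item
}
```

The factory approach only works when the source can be recreated, which is the case for a slice but not for an arbitrary channel. That is where the overlap with retrying mentioned above comes in.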

Transforming Observables

  • New Buffer operator
  • New GroupBy operator
  • New Window operator

Filtering Observables

  • New Debounce operator
  • New IgnoreElements operator
  • New Sample operator

Combining Observables

  • New CombineLatest operator
  • New Join operator
  • New Merge operator
  • New StartWith operator
  • New Switch operator

Error Handling Operators

  • New Retry operator (Retry, RetryUntil, RetryWhen): See my comment above.

Observable Utility Operators

  • New CombineLatest operator
  • Do operator: Today only DoOnEach is implemented
  • New Materialize/Dematerialize operator
  • New ObserveOn operator
  • New Serialize operator
  • New SubscribeOn operator
  • New TimeInterval operator
  • New Timeout operator
  • New Timestamp operator + new type to handle timestamped items
  • New Using operator

Conditional and Boolean Operators

  • New Amb operator
  • New SequenceEqual operator
  • New TakeUntil operator

Mathematical and Aggregate Operators

  • New Average operator
  • New Concat operator
  • New Max operator
  • New Min operator
  • New Sum operator

Backpressure

  • We need to find a way to implement a first version of backpressure. It is not easy at all in my opinion: today we rely heavily on channels, which makes it difficult, if not impossible, to handle different strategy types. It has to be designed and implemented cautiously.
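
For illustration only, here is a sketch of the two simplest strategies one could express directly on a Go channel: blocking the producer, or dropping the item when the buffer is full. Which strategies the comment above had in mind is an assumption on my part, `sendBlock` and `sendDrop` are hypothetical helper names, and richer strategies (buffering with eviction, keeping only the latest item, and so on) are not covered.

```go
package main

import "fmt"

// sendBlock waits until the consumer has room. Nothing is lost,
// but a slow consumer slows the producer down.
func sendBlock(ch chan<- int, v int) {
	ch <- v
}

// sendDrop discards the item when the channel cannot accept it
// right away, and reports whether the item was delivered.
func sendDrop(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	// Drop strategy: a buffer of 2 and no consumer.
	dropping := make(chan int, 2)
	delivered := 0
	for i := 0; i < 5; i++ {
		if sendDrop(dropping, i) {
			delivered++
		}
	}
	fmt.Println("delivered with drop strategy:", delivered)

	// Block strategy: the producer advances only as fast as the consumer.
	blocking := make(chan int)
	done := make(chan int)
	go func() {
		sum := 0
		for v := range blocking {
			sum += v
		}
		done <- sum
	}()
	for i := 0; i < 5; i++ {
		sendBlock(blocking, i)
	}
	close(blocking)
	fmt.Println("sum received with block strategy:", <-done)
}
```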

Operators to Convert Observables

  • To operator: some implementations are missing (ToSortedList etc.)

Documentation

  • Code documentation
  • README improvement to give examples (as described by @avelino in #114)
  • Adding more concrete examples (in /examples)

Then we should improve the visibility of RxGo and its integration with the ReactiveX documentation. For example, RxGo should appear on pages such as http://reactivex.io/documentation/operators/to.html.
We could also communicate more about RxGo: about the concepts, about the companies that are using it, etc.

Other Answers:

It is a great pleasure to review your contribution (#95); thanks for the many hours you have dedicated to it.

My idea is to release version 2 of RxGo before Go 2.
After the launch of Go 2, we can see whether it is necessary to create a new version of RxGo or whether we can keep it backward compatible with Go 1 and Go 2.

Tell me what you think.

I just released v2.0.0-beta.1: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0-beta.1
In total, the v2 contains 87 different operators.

Installation

go get -u github.com/reactivex/rxgo/v2@v2.0.0-beta.1

Release

General

  • Hot vs cold observable (see readme)
  • Backpressure management (see readme)
  • Lazy vs eager observation (see readme)
  • Sequential vs parallel operators (see readme)

New Operators

  • All
  • Amb
  • Average
  • BackOffRetry
  • Buffer
  • Catch
  • CombineLatest
  • Concat
  • Contains
  • Count
  • Debounce
  • DefaultIfEmpty
  • Defer
  • ElementAt
  • Error
  • FirstOrDefault
  • FromEventSource
  • GroupBy
  • IgnoreElements
  • Interval
  • LastOrDefault
  • Marshal
  • Max
  • Merge
  • Min
  • Never
  • Range
  • Reduce
  • Retry
  • Run
  • Sample
  • Scan
  • SequenceEqual
  • Send
  • Serialize
  • SkipWhile
  • StartWith
  • Sum
  • TakeUntil
  • TakeWhile
  • TimeInterval
  • Timestamp
  • Thrown
  • Timer
  • ToMap
  • ToMapWithValueSelector
  • ToSlice
  • Unmarshal
  • Window
  • ZipFromIterable

Make sure to check the documentation (develop branch) and to raise any issues you find!

v2.0.0-beta.2 published: https://github.com/ReactiveX/RxGo/releases/tag/v2.0.0-beta.2

In a nutshell:

  • Emphasis on the documentation (update of the README, /doc folder containing the documentation for each operator)
  • Connectable Observable (see README documentation)
  • Serialize option. To be used together with the rxgo.WithPool(n) option.
    For example, if we want to spin up n goroutines to handle a map operation while guaranteeing that the items are still emitted sequentially, we can do this:
    Map(func(_ context.Context, item interface{}) (interface{}, error) {
        // Map implementation
    },
        // Create multiple instances of the map operator
        rxgo.WithPool(pool),
        // Serialize the items emitted by their Customer.ID
        rxgo.Serialize(func(item interface{}) int {
            customer := item.(Customer)
            return customer.ID
        }), rxgo.WithBufferedChannel(1))
  • New Join operator (thanks to @v-zubko)
  • Update of the Just operator for more syntactic sugar (it allows passing variadic arguments for both the items and the options). Example:
    rxgo.Just(1, 2, 3)(rxgo.WithContext(ctx), rxgo.WithBufferedChannel(5))
  • A couple of fixes (mainly missing context propagation) and minor changes
  • Improvements to the unit tests (using, for example, a deterministic time API; see #233 (comment))
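
The idea behind combining a pool with the Serialize option from the list above (process in parallel, emit in order) can be sketched with the standard library alone. This is not how RxGo implements it. `mapOrdered` is a hypothetical helper, and it uses the input index as the sequence number, whereas the snippet above derives it from `Customer.ID`.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a processed value with the sequence number of its input.
type result struct {
	seq   int
	value int
}

// mapOrdered applies fn to items using `workers` goroutines (at least 1)
// and returns the results in input order. Results that arrive early are
// held back until every earlier sequence number has been emitted.
func mapOrdered(items []int, workers int, fn func(int) int) []int {
	type job struct{ seq, value int }
	jobs := make(chan job)
	results := make(chan result)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- result{seq: j.seq, value: fn(j.value)}
			}
		}()
	}

	// Feed the jobs, then close results once every worker is done.
	go func() {
		for i, it := range items {
			jobs <- job{seq: i, value: it}
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	// Reorder: emit only when the next expected sequence number is available.
	pending := make(map[int]int)
	out := make([]int, 0, len(items))
	next := 0
	for r := range results {
		pending[r.seq] = r.value
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			out = append(out, v)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	doubled := mapOrdered([]int{1, 2, 3, 4, 5}, 3, func(v int) int { return v * 2 })
	fmt.Println(doubled)
}
```

The reorder buffer grows when one item is much slower than its successors, so a real implementation would likely need to bound it.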
