SolvedRxJava How to keep an Observable alive after onError?

(I'm learning reactive)

My source:

  • I have a cold Observable produced by a third party library (retrofit)
  • when subscribed upon it perform an http call to a rest service, parse the data and provide the value
  • then it immediately onComplete
  • if there's an error it provide the error

This is what I want:

  • "helper" singleton
  • the helper provide an hot Observable for data read on a remote REST http service
  • any subscriber should receive the last valid value emitted or nothing if the call has not finished yet
  • onComplete should never be received
  • the helper has a trigger refresh method to cause a new http call
  • all subscribers should receive it automatically as a new event
  • when an error occur subscribers should be able to handle in in catch / onError callback
  • but they should keep receiving values after that
  • if a new subscription is performed after an error it should get the previous non-error result if any, the error otherwise (or both eventually)

I've been able to achieve almost all of this playing with Subjects and RxRelay library. The last two points, in bold, are the one I can't achieve.

In my helper class triggerRefresh method I use RxRelay with a PublishRelay and a ReplaySubject with capacity 1:

coldObservable
    .materialize()
    .subscribe(relay);
relay
    .filter(notification -> notification.getKind() != Notification.Kind.OnCompleted)
    .dematerialize()
    .subscribe(subject);

I execute this code everytime I need to performe a new http call, I also keep a reference to the two subscription and unsubscribe them before doing it (I just didn't included that part of the code).

as hot observable I simply pass subject.asObservable();

It works as I want unless an error happens.

When an error occur all the Observer stop receiving events even if I trigger a refresh.

How should I handle this kind of situation with rx?

19 Answers

✔️Accepted Answer

The pattern for this abersnaze is referring to is (swallowing all errors, you can also log etc inside the flatmap using either doOnError(...) or in the onErrorResumeNext(...)):

observableThatShouldNotCompleteOnError
  .flatMap(
    e ->
      obsrevableThatMayThrow(e)
        .onErrorResumeNext(Observable.empty())
  )

By trapping the error in the inner observable, all the outer subscription sees is the non-error cases so never terminates.

Other Answers:

I'm facing the same issue, I feel like this is a fairly common use case. Take a simple stream of click events that you flatMap to a stream of something you get from the network using Retrofit.
OnError I'd like to inform my view that something is wrong, but I would like my button to continue triggering network calls.
In a more complex scenario, if I have multiple subscribers observing that observable, they will all stop working after an onError, I'll have to create a new observable and register all my subscriber again.

Before the Observable is returned from the flatMap add on an .onError...(...). It's like the try/catch of Rx.

That's exactly it! Thank you both! @abersnaze @jamesgorman2

More Issues: