Can conditional state be used in RxJava Observable streams?

Few examples of RxJava contain conditional operations on a stream that involve state transitions based on stream content. Sure there are operators such as filter and the rest, but since examples are “pure”, that is, using pure functions, there is no setting of state that can influence the next event.

It doesn’t seem that this is part of the use-case of reactive streams. At least, that is my current conclusion. I hope it is incorrect. If not then a large type of use-case is not amendable to Reactive approach.

Here is the general problem statement: A stream consists of items x. If the current x has a property p set, then subsequent x will be skipped if they are related to that first x. Note that those skipped x could also have the same property set. This scenario could be generalized to specify an operation on a subgroup besides skipping.

A concrete example of this ‘pattern’ is found at Prune deleted folders of repo diff report using Java. IN that blog post, the x are the output of a Subversion diff report, and the conditional property is folder deletion. We want to prune deleted subfolders.

In Listing 1 below, I solve this with Java 8 streams in method pruneDeletedFoldersJavaStream(List). I also solve it using RxJava in method pruneDeletedFoldersRx(List). However, the Rx approach is just a copy of the Java 8 streams approach, but just using the filter operator. A State object model is used in both since Java 8 requires final fields.

Can this be implemented with the use of Rx operators and no explicit state handling? I tried many approaches, even use of BehaviorSubject, to no avail. I don’t even know how to phrase the question on Stackoverflow.

Listing 1, Full Source


4 thoughts on “Can conditional state be used in RxJava Observable streams?”

  1. Not sure what you call `no explicit state handling` but you can use the `scan` operator to simulate a state machine. Something like `input$.scan((state, input) -> new_state, initial_state)` (this is javascript, have a look at the doc for RxJava). So your state object would live and be nicely encapsulated inside the `scan` operator.

Leave a Reply

Your email address will not be published. Required fields are marked *