Introduction to Data Streaming with RxJava

Dharesh Vadalia
6 min read · Sep 16, 2020

Asynchronous processing of data streams has never been this easy…

Introduction to Reactive Extensions

Erik Meijer, a computer scientist at Microsoft, developed a reactive programming framework for .NET around 2010. It was later extended to multiple platforms, including JavaScript, Python, C++, Java and more. In November 2014, the RxJava 1.0 library was released in association with Netflix, followed by RxJava 2.0 in 2016. After that, ReactiveX quickly evolved into a cross-language standard, popularising reactive programming in the IT industry.

In very simple terms, reactive programming is an approach to asynchronous processing of data streams. It builds on the principles of event-driven and message-driven programming: the values in a data stream change over time, and the consumers (subscribers) of that stream react to each item as it is received.

Why Reactive Programming?

Reactive programming overcomes the limitations of traditional imperative programming in meeting the demand for highly scalable, resilient and elastic systems with minimal response times. It follows a functional programming style, which allows data streams to be transformed and manipulated on the fly: streams can be merged, filtered, mapped, zipped and more. The design of reactive systems follows the “Reactive Manifesto”, which describes the following key characteristics:

  • Responsive: responding in a timely manner
  • Resilient: staying responsive also in failure situations
  • Elastic: staying responsive under varying workload
  • Message Driven: relying on asynchronous message passing

The asynchronous, non-blocking model of Rx makes it more responsive, more tolerant of failures and capable of processing huge volumes of data quickly, as it needs only a small number of threads to scale.
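The stream transformations mentioned above (filter, map, merge, zip) can be sketched in a few lines. This is a minimal illustration, assuming RxJava 3 is on the classpath; the class name OperatorSketch and the sample values are made up for the example.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class OperatorSketch {

    // filter + map: keep the even numbers from 1..6 and square them.
    static List<Integer> evenSquares() {
        return Observable.range(1, 6)
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .toList()
                .blockingGet();
    }

    // merge: combine two streams into one. Both sources here are synchronous,
    // so the first is drained before the second is subscribed.
    static List<String> merged() {
        return Observable.merge(Observable.just("a", "b"), Observable.just("c"))
                .toList()
                .blockingGet();
    }

    // zip: pair up items from two streams by position.
    static List<String> zipped() {
        return Observable.zip(
                        Observable.just("x", "y"),
                        Observable.just(1, 2),
                        (letter, number) -> letter + number)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(evenSquares()); // [4, 16, 36]
        System.out.println(merged());      // [a, b, c]
        System.out.println(zipped());      // [x1, y2]
    }
}
```

blockingGet() is used only to collect the results for printing; in a reactive pipeline you would normally subscribe instead of blocking.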

Where and when should I use reactive programming?

Reactive programming is best suited to high-load, multi-user applications such as social networking, gaming, and video and audio streaming. It can also be used for real-time data streaming, for example in IoT applications, Artificial Intelligence and Machine Learning components, load balancers and highly interactive UI elements. With an Rx library, application events can be modelled as collections of data rather than as a series of callbacks, as in the traditional programming style. RxJava's Schedulers also take care of most thread management, so the developer rarely has to handle threads directly.

Let's understand this with a real-life example. Morris wants to throw a house party tonight, but first he needs to send invitations, clean the house and arrange for food. Suppose Morris performs all these activities synchronously: he first sends the invites, then cleans the house, and finally arranges for food before enjoying his party. In this case, Morris starts each task only after the previous one is complete, so there is a chance that the guests arrive before the food does, causing problems.

If he uses a reactive, asynchronous approach instead, Morris sends the invitations while returning home from work. On reaching home he orders food and starts cleaning the house. By the time he finishes cleaning, the food has arrived. In this case, Morris performs multiple tasks in parallel, completes them all and saves enough time before the guests arrive. He can now wait for the guests and enjoy the food with them. Reactive programming makes it possible to wait for asynchronous actions to complete before proceeding with the next event.
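Morris's asynchronous plan can be sketched roughly as follows. This is only an illustration, assuming RxJava 3; the class PartySketch, the task() helper and the sleep durations are invented stand-ins for real work.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class PartySketch {

    // Hypothetical helper: a task that takes `millis` to finish and runs
    // on one of RxJava's I/O threads instead of the caller's thread.
    static Single<String> task(String result, long millis) {
        return Single.fromCallable(() -> {
            Thread.sleep(millis); // stands in for the real work
            return result;
        }).subscribeOn(Schedulers.io());
    }

    // All three tasks start together; zip waits until every one of them
    // has produced its result before the party can begin.
    static String prepareParty() {
        return Single.zip(
                        task("invites sent", 100),
                        task("food delivered", 300),
                        task("house cleaned", 200),
                        (invites, food, house) -> invites + ", " + food + ", " + house)
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(prepareParty() + " -> party time!");
    }
}
```

Because the tasks overlap, the total waiting time is close to that of the slowest task rather than the sum of all three.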

Fundamentals of RxJava

RxJava is the ReactiveX implementation for the Java platform: a library for composing asynchronous and event-based programs using observable streams. It is licensed under Apache 2.0. The building blocks of RxJava are Observables, Operators and Observers.

Fundamental building blocks for RxJava

Three O’s of RxJava:

  1. Observable: acts as a source of data that emits items. It usually emits data only when someone subscribes to it. An Observable can have any number of subscribers.
  2. Operator: a function that transforms the data received from an Observable before it is pushed to a subscriber or to another Observable.
  3. Observer: also called a subscriber, it consumes the data items emitted by an Observable. Whenever the Observable emits a new item, the onNext() method is called on each subscriber. When the data stream completes successfully, onComplete() is called. Similarly, onError() is called if the stream terminates because of an error.
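The callback contract described in the list above can be observed with a short sketch. It assumes RxJava 3; the class name ObserverSketch and the failing source are illustrative only. The log shows that nothing is emitted before subscribe() is called, and that a stream ending in an error never reaches onComplete().

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

public class ObserverSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A source that emits two items and then fails.
        Observable<Integer> source = Observable.create(emitter -> {
            log.add("subscribed");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new IllegalStateException("boom"));
        });

        // The source body has not run yet: no one has subscribed.
        log.add("created");

        source.subscribe(
                item -> log.add("onNext " + item),
                error -> log.add("onError " + error.getMessage()),
                () -> log.add("onComplete"));

        return log;
    }

    public static void main(String[] args) {
        // created, subscribed, onNext 1, onNext 2, onError boom
        run().forEach(System.out::println);
    }
}
```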

Another important component of RxJava is the Scheduler, which controls the threads on which Observables emit items and subscribers receive them.
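As a rough sketch of how Schedulers are applied (assuming RxJava 3; the class name SchedulerSketch is made up), subscribeOn() chooses the thread on which the source does its work and observeOn() switches the thread for everything downstream:

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class SchedulerSketch {

    // Returns "<producer thread> -> <consumer thread>".
    static String threadsUsed() {
        return Observable
                // The callable runs on the thread chosen by subscribeOn.
                .fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())
                // Everything after observeOn runs on a computation thread.
                .observeOn(Schedulers.computation())
                .map(producer -> producer + " -> " + Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        // Prints two different RxJava worker thread names, neither of them "main".
        System.out.println(threadsUsed());
    }
}
```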

RxJava over Java Stream and Flow API

RxJava follows a push model, in contrast to Iterators, which exhibit the usual pull-based behaviour. Besides delivering each change, RxJava also signals the subscriber when the stream completes or when an error occurs during processing.

The Stream API, introduced in Java 8, wraps a source entity; only when a terminal operation is invoked does it traverse the source and return the result. In Java 9, the Reactive Streams interfaces were adopted as the Flow API. Unlike RxJava, the Flow API is intended as an interoperability specification rather than as an end-user API.

The table below highlights the differences between RxJava and the Java Stream API.
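The difference between pulling and pushing can be sketched side by side. This is an illustration only, assuming RxJava 3; the class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PushPullSketch {

    // Pull: the consumer drives the loop and asks for each element.
    static List<String> pull(List<String> data) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = data.iterator();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out; // no separate completion or error signal
    }

    // Push: the source calls the consumer for each element and then
    // explicitly signals completion (or an error).
    static List<String> push(List<String> data) {
        List<String> out = new ArrayList<>();
        Observable.fromIterable(data).subscribe(
                out::add,
                error -> out.add("error: " + error.getMessage()),
                () -> out.add("done"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pull(List.of("a", "b"))); // [a, b]
        System.out.println(push(List.of("a", "b"))); // [a, b, done]
    }
}
```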
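Both standard-library APIs can be tried without RxJava. The sketch below uses only the JDK (Java 9 or later); the class name and the multiply-by-ten logic are illustrative. SubmissionPublisher is the JDK's ready-made Flow.Publisher, and the subscriber requests one item at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamAndFlowSketch {

    // Stream API: map() is lazy; the source is traversed only when the
    // terminal operation collect() runs.
    static List<Integer> withStream() {
        return Stream.of(1, 2, 3).map(n -> n * 10).collect(Collectors.toList());
    }

    // Flow API: items are pushed to a subscriber that controls demand.
    static List<Integer> withFlow() throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first item
                }
                @Override public void onNext(Integer item) {
                    received.add(item * 10);
                    subscription.request(1); // ask for the next one
                }
                @Override public void onError(Throwable t) { finished.countDown(); }
                @Override public void onComplete() { finished.countDown(); }
            });
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
        } // closing the publisher signals onComplete to the subscriber

        finished.await(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(withStream()); // [10, 20, 30]
        System.out.println(withFlow());   // [10, 20, 30]
    }
}
```

Note how much wiring the Flow version needs compared with an RxJava pipeline, which is consistent with the Flow API being an interoperability layer rather than something meant to be used directly.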

Source: http://www.reactiveworld.net/2018/04/29/RxJava-vs-Java-Stream.html

Getting started with RxJava

Before we start implementing, the RxJava library needs to be configured in your Java project.

  1. Configure RxJava in your Java project

For a Maven project, add the RxJava library by placing the following snippet in the pom.xml file under the <dependencies> tag.

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.0.6</version>
</dependency>

For a Gradle project, the dependency details can be found on the Maven Central Repository.
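For convenience, the Gradle (Groovy DSL) equivalent of the Maven coordinates above would look roughly like this, assuming a build that uses the implementation configuration:

```groovy
dependencies {
    // Same group, artifact and version as the Maven snippet above
    implementation 'io.reactivex.rxjava3:rxjava:3.0.6'
}
```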

  2. Create Observables and subscribe to them

There are various types of Observables, each with its own use cases.

Types of Observables in RxJava

For example, a Flowable can be used to process touch events. We cannot stop the user from producing touch events, but with a Flowable we can limit the rate at which events are processed when the application cannot keep up with the rate at which they are produced.

The following is a basic example of how to create an Observable and subscribe to it.
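One way to picture this is a sketch in which the "touch events" are plain integers and the consumer only has capacity for three of them. It assumes RxJava 3; the class name, the event count and the choice of BackpressureStrategy.DROP are illustrative, and other strategies (buffering, keeping the latest item) exist as well.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
import java.util.ArrayList;
import java.util.List;

public class BackpressureSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A producer that cannot be slowed down: it fires 10 events at once.
        // With DROP, events that nobody has requested are discarded.
        Flowable<Integer> touches = Flowable.create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.DROP);

        touches.subscribe(new DefaultSubscriber<Integer>() {
            @Override protected void onStart() {
                request(3); // the consumer can only handle three events
            }
            @Override public void onNext(Integer event) {
                log.add("touch " + event);
            }
            @Override public void onError(Throwable t) {
                log.add("error");
            }
            @Override public void onComplete() {
                log.add("done");
            }
        });

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [touch 1, touch 2, touch 3, done]
    }
}
```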
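A minimal sketch of such an example, assuming RxJava 3 is on the classpath. The method name workingWithObserver() and the use of just() follow the description in this article; the class name, the emitted strings and the returned log are illustrative choices.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

public class ObservableBasics {

    static List<String> workingWithObserver() {
        List<String> log = new ArrayList<>();

        // The source: emits two strings and then completes.
        Observable<String> observable = Observable.just("Hello", "RxJava");

        // The consumer: records every signal it receives.
        Observer<String> observer = new Observer<String>() {
            @Override public void onSubscribe(Disposable d) {
                log.add("onSubscribe");
            }
            @Override public void onNext(String item) {
                log.add("onNext: " + item);
            }
            @Override public void onError(Throwable e) {
                log.add("onError: " + e.getMessage());
            }
            @Override public void onComplete() {
                log.add("onComplete");
            }
        };

        // Emission starts only once the observer subscribes.
        observable.subscribe(observer);
        return log;
    }

    public static void main(String[] args) {
        // onSubscribe, onNext: Hello, onNext: RxJava, onComplete
        workingWithObserver().forEach(System.out::println);
    }
}
```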

In the above example, we have created a method workingWithObserver() in which an Observable is created using just(). An Observer is then created and subscribed to the Observable. Once an Observable is subscribed to, it starts emitting its stream of data, and onNext() signals the subscriber each time a new item is emitted. On completion the subscriber is signalled through onComplete(), and in case of an error onError() is called instead.

There are several other convenience methods for creating Observables.

  • Observable.just("Hi"): creates an Observable that emits the given value (or values) of any type
  • Observable.fromIterable(): takes an Iterable, such as a List, and emits its items in iteration order
  • Observable.fromArray(): takes an array and emits its elements in order
  • Observable.fromCallable(): creates an Observable for a java.util.concurrent.Callable<V>
  • Observable.fromFuture(): creates an Observable for a java.util.concurrent.Future
  • Observable.interval(): emits increasing Long values at a given interval
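The factory methods listed above can be tried out with a short sketch, assuming RxJava 3; the class name FactorySketch and the sample values are made up for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FactorySketch {

    // Emits the list's items in iteration order.
    static List<Integer> fromIterable() {
        return Observable.fromIterable(Arrays.asList(1, 2, 3)).toList().blockingGet();
    }

    // Emits the array's elements in order.
    static List<String> fromArray() {
        return Observable.fromArray("a", "b").toList().blockingGet();
    }

    // The callable runs only when someone subscribes.
    static String fromCallable() {
        return Observable.fromCallable(() -> "computed").blockingFirst();
    }

    // Emits the future's result once it is available.
    static String fromFuture() {
        return Observable.fromFuture(CompletableFuture.completedFuture("done")).blockingFirst();
    }

    // interval() never completes on its own, so take(3) limits it.
    static List<Long> interval() {
        return Observable.interval(10, TimeUnit.MILLISECONDS).take(3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fromIterable()); // [1, 2, 3]
        System.out.println(fromArray());    // [a, b]
        System.out.println(fromCallable()); // computed
        System.out.println(fromFuture());   // done
        System.out.println(interval());     // [0, 1, 2]
    }
}
```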

Conclusion

Reactive programming has made asynchronous processing of data streams simpler and cleaner. The event-driven, non-blocking model of RxJava makes applications more resilient and responsive. RxJava also gives errors their own channel through onError(), and backpressure-aware types such as Flowable help avoid “out of memory” problems when a producer emits faster than its consumer can process.

References

  1. ReactiveX
  2. GitHub RxJava
  3. Reactive Manifesto
  4. Reactive Streams
  5. Java SE Flow API
