1. Información general
En este artículo, nos centraremos en el uso de Reactive Extensions (Rx) en Java para componer y consumir secuencias de datos.
De un vistazo, la API puede parecer similar a Java 8 Streams, pero de hecho, es mucho más flexible y fluida, lo que la convierte en un poderoso paradigma de programación.
Si desea leer más sobre RxJava, consulte este artículo.
2. Configuración
Para usar RxJava en nuestro proyecto Maven, necesitaremos agregar la siguiente dependencia a nuestro pom.xml:
io.reactivex rxjava ${rx.java.version}
O, para un proyecto de Gradle:
compile 'io.reactivex.rxjava:rxjava:x.y.z'
3. Conceptos reactivos funcionales
Por un lado, la programación funcional es el proceso de crear software componiendo funciones puras, evitando estados compartidos, datos mutables y efectos secundarios.
Por otro lado, la programación reactiva es un paradigma de programación asincrónica que se ocupa de los flujos de datos y la propagación del cambio.
En conjunto, la programación reactiva funcional forma una combinación de técnicas funcionales y reactivas que pueden representar un enfoque elegante para la programación impulsada por eventos, con valores que cambian con el tiempo y donde el consumidor reacciona a los datos a medida que ingresan.
Esta tecnología reúne diferentes implementaciones de sus principios básicos, algunos autores elaboraron un documento que define el vocabulario común para describir el nuevo tipo de aplicaciones.
3.1. Manifiesto reactivo
El Reactive Manifesto es un documento en línea que establece un alto estándar para las aplicaciones dentro de la industria del desarrollo de software. En pocas palabras, los sistemas reactivos son:
- Responsive: los sistemas deben responder de manera oportuna
- Controlado por mensajes: los sistemas deben utilizar el paso de mensajes asíncrono entre componentes para garantizar un acoplamiento flexible
- Elástico: los sistemas deben seguir respondiendo bajo cargas elevadas
- Resiliente: los sistemas deben seguir respondiendo cuando algunos componentes fallan
4. Observables
Hay dos tipos de claves que debe comprender al trabajar con Rx:
- Observable representa cualquier objeto que puede obtener datos de una fuente de datos y cuyo estado puede ser de interés de manera que otros objetos puedan registrar un interés.
- Un observador es cualquier objeto que desea ser notificado cuando cambia el estado de otro objeto.
Un observador se suscribe a una secuencia observable . La secuencia envía elementos al observador de uno en uno.
El observador maneja cada uno antes de procesar el siguiente. Si muchos eventos entran de forma asincrónica, deben almacenarse en una cola o descartarse.
En Rx , nunca se llamará a un observador con un artículo fuera de servicio o antes de que se haya devuelto la llamada del artículo anterior.
4.1. Tipos de observables
Hay dos tipos:
- Sin bloqueo: se admite la ejecución asíncrona y se permite cancelar la suscripción en cualquier punto del flujo de eventos. En este artículo, nos centraremos principalmente en este tipo de
- Bloqueo: todas las llamadas de observadores de onNext serán sincrónicas y no es posible cancelar la suscripción en medio de un flujo de eventos. Siempre podemos convertir un Observable en un Blocking Observable , usando el método toBlocking:
BlockingObservable blockingObservable = observable.toBlocking();
4.2. Operadores
Un operador es una función que toma un O bservable (la fuente) como primer argumento y devuelve otro Observable (el destino). Luego, para cada elemento que emite el observable de origen, aplicará una función a ese elemento y luego emitirá el resultado en el Observable de destino .
Los operadores se pueden encadenar para crear flujos de datos complejos que filtran eventos en función de determinados criterios. Se pueden aplicar varios operadores al mismo observable .
No es difícil entrar en una situación en la que un Observable emite elementos más rápido de lo que un operador u observador puede consumirlos. Puede leer más sobre la contrapresión aquí.
4.3. Crear observable
El operador básico solo produce un Observable que emite una única instancia genérica antes de completar, la Cadena "Hola". Cuando queremos obtener información de un Observable , implementamos una interfaz de observador y luego llamamos a subscribe en el Observable deseado :
Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));
4.4. OnNext, OnError y OnCompleted
Hay tres métodos en la interfaz del observador que queremos conocer:
- Nuestro observador llama a OnNext cada vez que se publica un nuevo evento en el Observable adjunto . Este es el método en el que realizaremos alguna acción en cada evento.
- OnCompleted se llama cuando la secuencia de eventos asociados con un Observable está completa, lo que indica que no debemos esperar más llamadas onNext en nuestro observador
- Se llama a OnError cuando se lanza una excepción no controlada durante el código del marco RxJava o nuestro código de manejo de eventos
El valor de retorno del método de suscripción de Observables es una interfaz de suscripción :
String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));
5. Transformaciones observables y operadores condicionales
5.1. Mapa
El operador de mapa transforma los elementos emitidos por un Observable aplicando una función a cada elemento.
Supongamos que hay una matriz declarada de cadenas que contiene algunas letras del alfabeto y queremos imprimirlas en modo mayúscula:
Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));
FlatMap se puede usar para aplanar Observables siempre que terminemos con Observables anidados .
Puede encontrar más detalles sobre la diferencia entre map y flatMap aquí.
Suponiendo que tenemos un método que devuelve un Observable de una lista de cadenas. Ahora imprimiremos para cada cadena de un nuevo Observable la lista de títulos en función de lo que ve el Suscriptor :
Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));
5.2. Escanear
El operador de escaneo aplica una función a cada elemento emitido por un Observable secuencialmente y emite cada valor sucesivo.
Nos permite transferir el estado de un evento a otro:
String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));
5.3. Agrupar por
Agrupar por operador nos permite clasificar los eventos en la entrada Observable en categorías de salida.
Supongamos que creamos una matriz de números enteros de 0 a 10, luego aplicamos group by que los dividirá en categorías pares e impares :
Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));
5.4. Filtrar
El filtro de operador emite solo aquellos elementos de un observable que pasan una prueba de predicado .
Así que filtremos en una matriz de enteros para los números impares:
Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));
5.5. Operadores condicionales
DefaultIfEmpty emite un elemento de la fuente Observable , o un elemento predeterminado si la fuente Observable está vacía:
Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));
El siguiente código emite la primera letra del alfabeto ' a' porque la matriz de letras no está vacía y esto es lo que contiene en la primera posición:
Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));
El operador TakeWhile descarta los elementos emitidos por un Observable después de que una condición especificada se vuelve falsa:
Observable.from(numbers) .takeWhile(i -> i sum[0] += s); assertTrue(sum[0] == 10);
Por supuesto, hay más operadores que podrían cubrir nuestras necesidades como Contain, SkipWhile, SkipUntil, TakeUntil, etc.
6. Observables conectables
A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.
In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:
String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));
7. Single
Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.
With this source of data, we can only use two methods to subscribe:
- OnSuccess returns a Single that also calls a method we specify
- OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));
8. Subjects
A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.
And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.
In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:
Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)
9. Resource Management
Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.
Aquí presentamos en comentarios los pasos que debemos seguir para lograr este objetivo y también un ejemplo de implementación:
String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));
10. Conclusión
En este artículo, hemos hablado de cómo usar la biblioteca RxJava y también cómo explorar sus características más importantes.
El código fuente completo del proyecto, incluidos todos los ejemplos de código utilizados aquí, se puede encontrar en Github.