1. Introducción
RxJava es una implementación de Java Reactive Extensions que nos permite escribir aplicaciones asincrónicas y controladas por eventos. Puede encontrar más información sobre cómo usar RxJava en nuestro artículo de introducción aquí.
RxJava 2 fue reescrito desde cero, lo que trajo múltiples características nuevas; algunos de los cuales se crearon como respuesta a problemas que existían en la versión anterior del marco.
Una de esas características es io.reactivex.Flowable .
2. observable vs . Fluido
En la versión anterior de RxJava, solo había una clase base para tratar con fuentes conscientes y no conscientes de la contrapresión: Observable.
RxJava 2 introdujo una clara distinción entre estos dos tipos de fuentes: las fuentes conscientes de la contrapresión ahora se representan mediante una clase dedicada: Flowable.
Las fuentes observables no admiten la contrapresión. Por eso, deberíamos usarlo para fuentes que simplemente consumimos y no podemos influir.
Además, si estamos tratando con una gran cantidad de elementos, pueden ocurrir dos posibles escenarios relacionados con la contrapresión dependiendo del tipo de Observable .
En caso de utilizar un denominado " Observable frío ", los eventos se emiten de forma perezosa, por lo que estamos a salvo de desbordar a un observador.
Sin embargo, cuando se utiliza un " Observable caliente " , este seguirá emitiendo eventos, incluso si el consumidor no puede seguir el ritmo.
3. Creando un fluido
Hay diferentes formas de crear un Flowable . Convenientemente para nosotros, esos métodos se ven similares a los métodos en Observable en la primera versión de RxJava.
3.1. Fluido simple
Podemos crear un Flowable usando el método just () de manera similar a como lo haríamos con Observable:
Flowable integerFlowable = Flowable.just(1, 2, 3, 4);
Aunque usar just () es bastante simple, no es muy común crear un Flowable a partir de datos estáticos, y se usa con fines de prueba.
3.2. Fluido de observable
Cuando tenemos un Observable , podemos transformarlo fácilmente en Flowable usando el método toFlowable () :
Observable integerObservable = Observable.just(1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable(BackpressureStrategy.BUFFER);
Tenga en cuenta que para poder realizar la conversión, necesitamos enriquecer el Observable con una BackpressureStrategy. Describiremos las estrategias disponibles en la siguiente sección.
3.3. Fluido de FlowableOnSubscribe
RxJava 2 introdujo una interfaz funcional FlowableOnSubscribe , que representa un Flowable que comienza a emitir eventos después de que el consumidor se suscribe.
Debido a eso, todos los clientes recibirán el mismo conjunto de eventos, lo que hace que FlowableOnSubscribe sea seguro contra la contrapresión.
Cuando tenemos el FlowableOnSubscribe podemos usarlo para crear el Flowable :
FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext(1); Flowable integerFlowable = Flowable .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
La documentación describe muchos más métodos para crear Flowable.
4. Fluido BackpressureStrategy
Algunos métodos como toFlowable () o create () toman una BackpressureStrategy como argumento.
El BackpressureStrategy es una enumeración, que define el comportamiento contrapresión que vamos a aplicar a nuestro fluido.
Puede cachear o descartar eventos o no implementar ningún comportamiento, en el último caso, nosotros nos encargaremos de definirlo, utilizando operadores de contrapresión.
BackpressureStrategy es similar a BackpressureMode presente en la versión anterior de RxJava.
Hay cinco estrategias diferentes disponibles en RxJava 2.
4.1. Buffer
Si usamos BackpressureStrategy.BUFFER , la fuente almacenará en búfer todos los eventos hasta que el suscriptor pueda consumirlos :
public void thenAllValuesAreBufferedAndReceived() { List testList = IntStream.range(0, 100000) .boxed() .collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.BUFFER) .observeOn(Schedulers.computation()).test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertEquals(testList, receivedInts); }
Es similar a invocar el método onBackpressureBuffer () en Flowable, pero no permite definir un tamaño de búfer o la acción onOverflow explícitamente.
4.2. soltar
Podemos usar BackpressureStrategy.DROP para descartar los eventos que no se pueden consumir en lugar de almacenarlos en búfer.
Nuevamente, esto es similar a usar onBackpressureDrop () en Flowable :
public void whenDropStrategyUsed_thenOnBackpressureDropped() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.DROP) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(!receivedInts.contains(100000)); }
4.3. Último
El uso de BackpressureStrategy.LATEST forzará a la fuente a mantener solo los últimos eventos, sobrescribiendo así cualquier valor anterior si el consumidor no puede mantenerse al día:
public void whenLatestStrategyUsed_thenTheLastElementReceived() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.LATEST) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(receivedInts.contains(100000)); }
BackpressureStrategy.LATEST y BackpressureStrategy.DROP se ven muy similares cuando miramos el código.
Sin embargo, BackpressureStrategy.LATEST sobrescribirá los elementos que nuestro suscriptor no puede manejar y conservará solo los últimos, de ahí el nombre.
BackpressureStrategy.DROP, por otro lado, descartará los elementos que no se pueden manejar. Esto significa que no se emitirán necesariamente los elementos más nuevos.
4.4. Error
Cuando usamos BackpressureStrategy.ERROR, simplemente estamos diciendo que no esperamos que ocurra contrapresión . En consecuencia, se debe lanzar una MissingBackpressureException si el consumidor no puede mantenerse al día con la fuente:
public void whenErrorStrategyUsed_thenExceptionIsThrown() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.ERROR) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }
4.5. Desaparecido
Si usamos BackpressureStrategy.MISSING , la fuente empujará elementos sin descartarlos o almacenarlos en búfer.
El downstream tendrá que lidiar con desbordamientos en este caso:
public void whenMissingStrategyUsed_thenException() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.MISSING) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }
En nuestras pruebas, exceptuamos MissingbackpressureException para las estrategias ERROR y MISSING . Ya que ambos lanzarán dicha excepción cuando se desborde el búfer interno de la fuente.
Sin embargo, vale la pena señalar que ambos tienen un propósito diferente.
Deberíamos usar el primero cuando no esperamos contrapresión en absoluto, y queremos que la fuente arroje una excepción en caso de que ocurra.
Este último podría usarse si no queremos especificar un comportamiento predeterminado en la creación del Flowable . Y usaremos operadores de contrapresión para definirlo más adelante.
5. Resumen
En este tutorial, presentamos la nueva clase introducida en RxJava 2 llamada Flowable.
Para encontrar más información sobre Flowable y su API, podemos consultar la documentación.
Como siempre, todas las muestras de código se pueden encontrar en GitHub.