1. Información general
En este tutorial, vamos a revisar el uso de Reactive Extensions (Rx) en Kotlin idiomático usando la biblioteca RxKotlin.
RxKotlin no es una implementación de Reactive Extensions, per se. En cambio, es principalmente una colección de métodos de extensión. Es decir, RxKotlin aumenta la biblioteca RxJava con una API diseñada con Kotlin en mente.
Por lo tanto, usaremos conceptos de nuestro artículo, Introducción a RxJava, así como el concepto de Flowables que presentamos en un artículo dedicado.
2. Configuración de RxKotlin
Para usar RxKotlin en nuestro proyecto Maven, necesitaremos agregar la dependencia rxkotlin a nuestro pom.xml:
io.reactivex.rxjava2 rxkotlin 2.3.0
O, para un proyecto de Gradle, a nuestro build.gradle:
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
Aquí, estamos usando RxKotlin 2.x, que apunta a RxJava 2. Los proyectos que usan RxJava 1 deben usar RxKotlin 1.x. Los mismos conceptos se aplican a ambas versiones.
Tenga en cuenta que RxKotlin depende de RxJava, pero no actualizan la dependencia con frecuencia a la última versión. Por lo tanto, recomendamos incluir explícitamente la versión específica de RxJava de la que vamos a depender, como se detalla en nuestro artículo de RxJava.
3. Creando s observables en RxKotlin
RxKotlin incluye varios métodos de extensión para crear objetos observables y fluidos a partir de colecciones.
En particular, cada tipo de matriz tiene un método toObservable () y un método toFlowable () :
val observable = listOf(1, 1, 2, 3).toObservable() observable.test().assertValues(1, 1, 2, 3)
val flowable = listOf(1, 1, 2, 3).toFlowable() flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))
3.1. Completable s
RxKotlin también proporciona algunos métodos para crear instancias Completables . En particular, podemos convertir Action s, Callable s, Future sy funciones zero-arity a Completable con el método de extensión toCompletable:
var value = 0 val completable = { value = 3 }.toCompletable() assertFalse(completable.test().isCancelled()) assertEquals(3, value)
4. observable y fluida a Mapa y Multimapa
Cuando tenemos un Observable o Flowable que produce instancias de Par , podemos transformarlas en un Observable Único que produce un Mapa:
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMap() assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())
Como podemos ver en el ejemplo anterior, toMap sobrescribe los valores emitidos anteriormente con valores posteriores si tienen la misma clave.
Si queremos acumular todos los valores asociados con una clave en una colección, usamos toMultimap en su lugar:
val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMultimap() assertEquals( mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))), map.blockingGet())
5. La combinación observables s y fluida s
Uno de los puntos de venta de Rx es la posibilidad de combinar Observable sy Flowable s de varias formas. De hecho, RxJava proporciona varios operadores listos para usar.
Además de eso, RxKotlin incluye algunos métodos de extensión más para combinar Observable sy similares.
5.1. Combinando emisiones observables
Cuando tenemos un Observable que emite otros Observable , podemos usar uno de los métodos de extensión en RxKotlin para combinar los valores emitidos.
En particular, mergeAll combina los observables con flatMap:
val subject = PublishSubject.create
() val observable = subject.mergeAll()
Que sería lo mismo que:
val observable = subject.flatMap { it }
El Observable resultante emitirá todos los valores del Observable s de origen en un orden no especificado.
De manera similar, concatAll usa concatMap (los valores se emiten en el mismo orden que las fuentes), mientras que switchLatest usa switchMap (los valores se emiten desde el último Observable emitido ).
Como hemos visto hasta ahora, todos los métodos anteriores se proporcionan también para fuentes fluidas , con la misma semántica.
5.2. Combinando s Completables , Maybe s y Single s
Cuando tenemos un Observable que emite instancias de Completable , Maybe o Single , podemos combinarlos con el método mergeAllXs apropiado como, por ejemplo, mergeAllMaybes :
val subject = PublishSubject.create
() val observable = subject.mergeAllMaybes() subject.onNext(Maybe.just(1)) subject.onNext(Maybe.just(2)) subject.onNext(Maybe.empty()) subject.onNext(Maybe.error(Exception("error"))) subject.onNext(Maybe.just(3)) observable.test().assertValues(1, 2).assertError(Exception::class.java)
5.3. Combinando Iterable es de observables s
Para las colecciones de observables o fluidas casos vez, RxKotlin tiene un par de otros operadores, fusión y mergeDelayError . Ambos tienen el efecto de combinar todos los Observable o Flowable s en uno que emitirá todos los valores en secuencia:
val observables = mutableListOf(Observable.just("first", "second")) val observable = observables.merge() observables.add(Observable.just("third", "fourth")) observable.test().assertValues("first", "second", "third", "fourth")
The difference between the two operators — which are directly derived from the same-named operators in RxJava — is their treatment of errors.
The merge method emits errors as soon as they're emitted by the source:
// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth")
Whereas mergeDelayError emits them at the end of the stream:
// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth", "fifth")
6. Handling Values of Different Types
Let's now look at the extension methods in RxKotlin for dealing with values of different types.
These are variants of RxJava methods, that make use of Kotlin's reified generics. In particular, we can:
- cast emitted values from one type to another, or
- filter out values that are not of a certain type
So, we could, for example, cast an Observable of Numbers to one of Ints:
val observable = Observable.just(1, 1, 2, 3) observable.cast().test().assertValues(1, 1, 2, 3)
Here, the cast is unnecessary. However, when combining different observables together, we might need it.
With ofType, instead, we can filter out values that aren't of the type we expect:
val observable = Observable.just(1, "and", 2, "and") observable.ofType().test().assertValues(1, 2)
As always, cast and ofType are applicable to both Observables and Flowables.
Furthermore, Maybe supports these methods as well. The Single class, instead, only supports cast.
7. Other Helper Methods
Finally, RxKotlin includes several helper methods. Let's have a quick look.
We can use subscribeBy instead of subscribe – it allows named parameters:
Observable.just(1).subscribeBy(onNext = { println(it) })
Similarly, for blocking subscriptions we can use blockingSubscribeBy.
Additionally, RxKotlin includes some methods that mimic those in RxJava but work around a limitation of Kotlin's type inference.
For example, when using Observable#zip, specifying the zipper doesn't look so great:
Observable.zip(Observable.just(1), Observable.just(2), BiFunction { a, b -> a + b })
So, RxKotlin adds Observables#zip for more idiomatic usage:
Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }
Notice the final “s” in Observables. Similarly, we have Flowables, Singles, and Maybes.
8. Conclusions
En este artículo, hemos revisado a fondo la biblioteca RxKotlin, que aumenta RxJava para hacer que su API se parezca más a Kotlin idiomático.
Para obtener más información, consulte la página de GitHub de RxKotlin. Para obtener más ejemplos, recomendamos las pruebas RxKotlin.
La implementación de todos estos ejemplos y fragmentos de código se puede encontrar en el proyecto GitHub como un proyecto Maven y Gradle, por lo que debería ser fácil de importar y ejecutar como está.