RxJava
What:
异步
基本概念
- Observable 通用的被观察者
- Observer 通用的观察者
- Scheduler 线程调度器
- Disposable 一次性的
how
简单使用
1 | //首先创建被观察者 |
结合使用lambda链式写法
1 | Observable.create(subscribe-> { |
线程调度
Rxjava提供了以下几种线程调度器:
1 | Schedulers.single();//所有任务在单线程 |
使用
1 | Observable.create(emitter -> { |
这样就基本上能完成很多线程切换的操作。
操作符
格式各样的操作符,我就记下我看懂的操作符吧呃呃
变换
- map
- flatMap
过滤
- filter
- take
- takeLast
- skip
- skipLast
- elementAt
- debounce
- distinct
- first
- last
组合
- merge
- startWith
- concat
- zip
其它
- interval
- compose
map
将发射值进行转换成另一种发设值
map(Func1<? super T, ? extends R> func))
1 | Observable.just(1,2,3,4,5) |
flatMap
发射值进行进一步转换成Observable序列,平摊成Observable序列
1 | Observable.just("1234", "5678") |
scan
将上一个经过函数处理的发射值作为下一个参数继续处理。
好比如:第一次处理为1,第二次就是1+2=3,然后第三次3+3=6,依次类推
1 | Observable.just(1, 2, 3, 4, 5) |
filter
判断式过滤
1 | Observable.just(1,2,3,4,5,6) |
take
只取前n个元素
1 | Observable.just(1,2,3,4,5,6) |
takeLast
只去后n个元素
1 | Observable.just(1,2,3,4,5,6) |
skip
跳过前n个元素
1 | Observable.just(1,2,3,4,5,6) |
skipLast
跳过后n个元素
1 | Observable.just(1,2,3,4,5,6) |
elementAt
只去固定序号元素
1 | Observable.just(1,2,3,4,5,6) |
debounce
过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。通常我们用来结合RxBinding(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击。
distinct
去重
1 | Observable.just(1,2,3,3,2,1) |
distinct(Func1) 则需要你给出key作为去重依据
distinctUntilChanged(Func1)和distinct(Func1)一样,根据Func1中call方法产生一个Key来判断两个相邻的数据项是否相同。
first
只发送第一个元素,first参数为,当没有元素发出,则使用参数发出
1 | Observable.just(1,2,3,4) |
last
类似first,最后一个
merge
两个事件序列和一个,合并的数据是无序的。
startWith
在某序列前合并
1 | startWith(T); |
1 | Observable a = Observable.just(1,2,3,4); |
concat
严格按照发射顺序组合,上一个没发射完,下一个不会发射。
zip
将两个事件序列,通过函数组合新的,数量不匹配的元素丢弃。
1 | Observable a = Observable.just(1,2,3,4,6); |
interval
按照间隔执行,但是要产生订阅关系的线程不能立即执行完毕,否则无法执行
1 | Observable.interval(300,TimeUnit.MILLISECONDS).subscribe(position -> System.out.println(position)); |
compose
对事件序列的变换也就是Obervable的变化,之前是mapflatMap是对发射元素的变换,这个可以直接处理事件流
1 | Observable.just(1,2,3,4) |
比如重复代码:线程切换,可以写成通用代码,然后直接传给compose就行了同理有其他的流处理操作也可以复用,然后调用compose
1 | public static <T> ObservableTransformer<T, T> rxSchedulerhelper(){ |
why
调用还是挺简单的,内部实现的话可以概括为:
自上而下生成装饰的Observable,在subscribe时,从下往上订阅上游Observable并且生成装饰的Observer。
自上而下包装Observable,subscriber自下而上生调用上流的observale,同时消费对应的Observer,即自下而上包装Obserer,在顺序上表现为,下流Obervable不断的调用存储上流的Obervable,在onNext时,上游Oberver依次调用下游Oberver。
分析最简单的订阅
1 | Observable observable = Observable.create(new ObservableOnSubscribe<String>() { |
首先Observable.create创键出Obervable。
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
返回了一个ObservableCreate的并且把我们的源储存起来了
调用subscribe
1 | public final void subscribe(Observer<? super T> observer) { |
接着实际调用subscribeActual,这个是抽象方法,所以得看ObservableCreate得实现内容
1 | protected void subscribeActual(Observer<? super T> observer) { |
调用我们源头的source,也就是开始定义得。然后就执行
1 | Observable observable = Observable.create(new ObservableOnSubscribe<String>() { |
这样就完成一个订阅关系。可以看出每一层Observable和Observer通过subscribe订阅触发,如果调用了线程切换,各种操作符,其实也是在从上层生成新的Observable包裹着上游的Observable,最后在最下层订阅,从下层开始一层一层调用subscribe的最终调到源头的Observable。
带线程的调度分析
知道他的触发原理后我们可以直接去看那一层的Observable和Observer
ObservableSubscribeOn
这个就是我们使用subscribeOn即在Observable线程调度,看看它实际触发订阅的方法
1 | public void subscribeActual(final Observer<? super T> s) { |
额,scheduler的调度我下次分析把,反正大抵上就是使用线程池啊等来进行线程调度。
ObservableObserveOn
这个就是形成obsrverOn的地方,也就是observer的线程调度
1 | protected void subscribeActual(Observer<? super T> observer) { |
因为Obervable是从上到下进行包裹,所以subscribOn多次调用也只会是第一起作用。而Observer是订阅时从下往上包裹,所以observerOn可以变化许多次。
其它操作符也是如此把。
RxBus
RxBus 实现是在一个既为Observable和Observer的实例中内部保存了事件数组,通过触发onNext遍历到对应的事件,进行调用。
待补充。。。