学习使用Rxjava

RxJava

What:

异步

基本概念

  • Observable 通用的被观察者
  • Observer 通用的观察者
  • Scheduler 线程调度器
  • Disposable 一次性的

how

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//首先创建被观察者
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("hello");
int error = 1;
if (error == 0) {
observableEmitter.onError(new Exception("error == 0"));
}
observableEmitter.onComplete();
}
});
//创建观察者

Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");//产生订阅关系调用,即在调用subscribe线程
}

@Override
public void onNext(String str) {
System.out.println(str);
}

@Override
public void onError(Throwable throwable) {
System.out.println("error!!");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
};

//订阅,(注入灵魂233)
observable.subscribe(oberver);

//output
onSubscribe
hello
onComplete

结合使用lambda链式写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create(subscribe-> {
subscribe.onNext("hello");
int errorCode = 0;
if (errorCode == 1) {
subscribe.onError(new Exception("errorCode = 1"));
}
subscribe.onComplete();
}).doOnSubscribe(str -> {
System.out.println("onSubscribe");
}).subscribe(
str-> System.out.println(str),
throwable -> throwable.printStackTrace(),
()-> System.out.println("complete"));
//output
onSubscribe
hello
complete

线程调度

Rxjava提供了以下几种线程调度器:

1
2
3
4
5
6
Schedulers.single();//所有任务在单线程
Schedulers.newThread();//使用新线程
Schedulers.trampoline();//
Schedulers.io();//用于IO密集型操作,比如读取文件,数据库,网络
Schedulers.computation();//用于CPU密集型计算,比如xml,json解析,Bitmap压缩取样
AndroidSchedulers.mainThread();//android 主线程

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 Observable.create(emitter -> {
System.out.println("observable");
emitter.onNext("hello");
System.out.println(Thread.currentThread().getId());
}) .subscribeOn(Schedulers.newThread())//被观察者执行动作所在线程
.observeOn(Schedulers.newThread())//观察者执行动作所在线程
.subscribe(str->{
System.out.println("observer");
System.out.println(Thread.currentThread().getId());
System.out.println(str);
});
//output
observable
14
observer
15
hello

这样就基本上能完成很多线程切换的操作。

操作符

格式各样的操作符,我就记下我看懂的操作符吧呃呃

变换

  • map
  • flatMap

过滤

  • filter
  • take
  • takeLast
  • skip
  • skipLast
  • elementAt
  • debounce
  • distinct
  • first
  • last

组合

  • merge
  • startWith
  • concat
  • zip

其它

  • interval
  • compose
map

将发射值进行转换成另一种发设值

map(Func1<? super T, ? extends R> func))

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,3,4,5)
.map(integer -> {
char tmp = (char) ('a' + integer);
return String.valueOf(integer) + tmp;
}).subscribe(System.out::println);
//output
1b
2c
3d
4e
5f
flatMap

发射值进行进一步转换成Observable序列,平摊成Observable序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just("1234", "5678")
.flatMap((Function<String, ObservableSource<Character>>) s -> {
Character[] characters = new Character[s.length()];
for (int i = 0; i < s.length(); i++) {
characters[i] = s.charAt(i);
}
return Observable.fromArray(characters);
}).subscribe(System.out::println);
//output
1
2
3
4
5
6
7
8
scan

将上一个经过函数处理的发射值作为下一个参数继续处理。

好比如:第一次处理为1,第二次就是1+2=3,然后第三次3+3=6,依次类推

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4, 5)
.scan((integer, integer2) -> integer + integer2)
.subscribe(System.out::println);
//output
1
3
6
10
15
filter

判断式过滤

1
2
3
4
5
6
7
  Observable.just(1,2,3,4,5,6)
.filter(integer -> integer % 2 == 0)
.subscribe(System.out::println);
//output
2
4
6
take

只取前n个元素

1
2
3
4
5
6
7
Observable.just(1,2,3,4,5,6)
.take(3)
.subscribe(System.out::println);
//output
1
2
3
takeLast

只去后n个元素

1
2
3
4
5
6
7
 Observable.just(1,2,3,4,5,6)
.takeLast(3)
.subscribe(System.out::println);
//output
4
5
6
skip

跳过前n个元素

1
2
3
4
5
6
7
8
 Observable.just(1,2,3,4,5,6)
.skip(2)
.subscribe(System.out::println);
//output
3
4
5
6
skipLast

跳过后n个元素

1
2
3
4
5
6
7
8
 Observable.just(1,2,3,4,5,6)
.skipLast(2)
.subscribe(System.out::println);
//output
1
2
3
4
elementAt

只去固定序号元素

1
2
3
4
5
 Observable.just(1,2,3,4,5,6)
.elementAt(3)
.subscribe(System.out::println);
//output
4
debounce

过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。通常我们用来结合RxBinding(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击。

distinct

去重

1
2
3
4
5
6
7
Observable.just(1,2,3,3,2,1)
.distinct()
.subscribe(System.out::println);
//output
1
2
3

distinct(Func1) 则需要你给出key作为去重依据

distinctUntilChanged(Func1)和distinct(Func1)一样,根据Func1中call方法产生一个Key来判断两个相邻的数据项是否相同。

first

只发送第一个元素,first参数为,当没有元素发出,则使用参数发出

1
2
3
4
5
Observable.just(1,2,3,4)
.first(2)
.subscribe(System.out::println);
//output
1
last

类似first,最后一个

merge

两个事件序列和一个,合并的数据是无序的。

startWith

在某序列前合并

1
2
3
4
startWith(T);
startWith(T...);
startWith(Observable<T>);
startWith(Iterable<T>);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable a = Observable.just(1,2,3,4);
Observable b = Observable.just('a','b','c');
a.startWith(Arrays.asList(0,9,22)).subscribe(System.out::println);
System.out.println("--------------");
a.startWith(b).subscribe(System.out::println);
//output
0
9
22
1
2
3
4
--------------
a
b
c
1
2
3
4
concat

严格按照发射顺序组合,上一个没发射完,下一个不会发射。

zip

将两个事件序列,通过函数组合新的,数量不匹配的元素丢弃。

1
2
3
4
5
6
7
8
9
Observable a = Observable.just(1,2,3,4,6);
Observable b = Observable.just('a','b','c');
Observable.zip(a,b,(BiFunction<Integer, Character, String>)
(integer, character) -> integer + String.valueOf(character))
.subscribe(System.out::println);
//output
1a
2b
3c
interval

按照间隔执行,但是要产生订阅关系的线程不能立即执行完毕,否则无法执行

1
2
3
4
5
6
7
8
Observable.interval(300,TimeUnit.MILLISECONDS).subscribe(position -> System.out.println(position));
try {
Thread.sleep(20000);
} catch (Exception e) {
e.printStackTrace();
}
//output
按照间隔300ms打印positon,从0开始,代表次数
compose

对事件序列的变换也就是Obervable的变化,之前是mapflatMap是对发射元素的变换,这个可以直接处理事件流

1
2
3
4
5
6
7
8
        Observable.just(1,2,3,4)
.compose(upstream -> upstream.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()))
.subscribe();
//等价于
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread()))
.subscribe();

比如重复代码:线程切换,可以写成通用代码,然后直接传给compose就行了同理有其他的流处理操作也可以复用,然后调用compose

1
2
3
public static <T> ObservableTransformer<T, T> rxSchedulerhelper(){
return observable -> observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}

why

调用还是挺简单的,内部实现的话可以概括为:

自上而下生成装饰的Observable,在subscribe时,从下往上订阅上游Observable并且生成装饰的Observer。

自上而下包装Observable,subscriber自下而上生调用上流的observale,同时消费对应的Observer,即自下而上包装Obserer,在顺序上表现为,下流Obervable不断的调用存储上流的Obervable,在onNext时,上游Oberver依次调用下游Oberver。

分析最简单的订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println(Thread.currentThread().getId());
System.out.println("run");
emitter.onNext("test");
emitter.onComplete();
}
});

Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getId());
System.out.println("onSubscribe");
}

@Override
public void onNext(String o) {
System.out.println(Thread.currentThread().getId());
System.out.println("onNext" + o);
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
};

observable.subscribe(observer);

首先Observable.create创键出Obervable。

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

返回了一个ObservableCreate的并且把我们的源储存起来了

调用subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);/
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

接着实际调用subscribeActual,这个是抽象方法,所以得看ObservableCreate得实现内容

1
2
3
4
5
6
7
8
9
10
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);//将observer传入CreateEmitter
observer.onSubscribe(parent);//触发onSubscribe
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

调用我们源头的source,也就是开始定义得。然后就执行

1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println(Thread.currentThread().getId());
System.out.println("run");
emitter.onNext("test");//触发observer的onNext
emitter.onComplete();//触发observer的onComplete
}
});

这样就完成一个订阅关系。可以看出每一层Observable和Observer通过subscribe订阅触发,如果调用了线程切换,各种操作符,其实也是在从上层生成新的Observable包裹着上游的Observable,最后在最下层订阅,从下层开始一层一层调用subscribe的最终调到源头的Observable。

带线程的调度分析

知道他的触发原理后我们可以直接去看那一层的Observable和Observer

ObservableSubscribeOn

这个就是我们使用subscribeOn即在Observable线程调度,看看它实际触发订阅的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);//新的Observer

s.onSubscribe(parent);//onSubscribe总是在订阅触发线程执行

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));//使用具体的调度器取执行调度
}


final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);//触发上一层的订阅
}
}

额,scheduler的调度我下次分析把,反正大抵上就是使用线程池啊等来进行线程调度。

ObservableObserveOn

这个就是形成obsrverOn的地方,也就是observer的线程调度

1
2
3
4
5
6
7
8
9
10
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//这次就没有再subscribe上做手脚,而是再ObserveOnObserver包装类上做了线程切换,下游的onNext进行线程切换处理

因为Obervable是从上到下进行包裹,所以subscribOn多次调用也只会是第一起作用。而Observer是订阅时从下往上包裹,所以observerOn可以变化许多次。

其它操作符也是如此把。

RxBus

RxBus 实现是在一个既为Observable和Observer的实例中内部保存了事件数组,通过触发onNext遍历到对应的事件,进行调用。

待补充。。。