是rxjava1.X的版本,现在有2.X了
/**
被观察者
**/
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
//当 Observable被订阅的时候,Observable.OnSubscribe的call()方法会自动被调用,
//事件序列就会依照设定依次触发。这样,由被观察者调用了观察者的回调方法,
//就实现了由被观察者向观察者的事件传递
public void call(Subscriber<? super String> subscriber) {
System.out.println("test call");
//定义事件队列
subscriber.onNext("test next");
subscriber.onCompleted();
}
});
/**
* 观察者
*/
Observer<String> observer = new Observer<String>() {
//被观察者调用onCompleted时触发
@Override
public void onCompleted() {
System.out.println("1");
}
//被观察者调用onError时触发
@Override
public void onError(Throwable e) {
System.out.println("2");
}
//被观察者调用onNext时触发
@Override
public void onNext(String s) {
System.out.println("3");
}
};
/**
* 订阅
*/
observable.subscribe(observer);
/**
* Action只对某些事件做出响应
*/
//只对事件序列中的onNext做出响应
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
System.out.println("action1--->onNext "+s);
}
};
//只对事件序列中的onError做出响应
Action1<Throwable> action11 = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("action11--->onError "+throwable.getMessage());
}
};
//只对事件序列中的onCompleted做出响应
Action0 action0 = new Action0() {
@Override
public void call() {
System.out.println("action11--->onCompleted ");
}
};
//订阅事件,只处理onNext事件
observable.subscribe(action1);
//订阅事件,处理onNext、onError和onCompleted事件
observable.subscribe(action1,action11,action0);
/**
快速创建
*/
//just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据
observable = Observable.just("hello");
//from()方法将传入的数组或Iterable拆分成具体对象后,订阅之后自动调用onNext方法依次发送,
// 再发送结束后发送onCompleted结束整个事件
observable = Observable.from(new ArrayList<>());
//创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。订阅之后即按照固定时长调用onNext()方法
Observable<Long> observablelong = Observable.interval(1, TimeUnit.SECONDS);
observablelong.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("时长"+aLong);
}
});
//timer()该方法可以在订阅之后延迟发送特定事件
observablelong = Observable.timer(2000,TimeUnit.MILLISECONDS);
/**
变换操作符
*/
// map将被观察者发送的事件转换为任意类型的事件
Integer[] ints = {1, 2, 3};
//map方法把Intger类型转换为String类型
Observable.from(ints).map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {
//对Integer数据进行处理,转换成String类型返回
return i + "i";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s+" mapamap");
}
});
//FlatMap操作符将被观察者发送的多个事件进行拆分,分别转换为不同的事件,多线程下顺序不一定
Observable.just("1,2,3").flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
String[] split = s.split(",");
return Observable.from(split);
}
}).subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
});
//ConcatMap操作符,和FlatMap操作符原理一样,
区别在于 FlatMap把事件拆分后在发送时,顺序可能和被观察者发送的事件顺序不一样,
而ConcatMap操作符处理后的事件顺序和被观察者发送事件的顺序是一样的
//buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射
Observable.just(1,2,3,4).buffer(3);
//filter()操作符加入逻辑判断,返回false表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中
/**
*线程调度
*默认情况下,在哪个线程调subscribe(),就会在哪个线程生产事件
*如果需要切换线程,则需要 Scheduler (调度器)
* Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
* Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
* Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
* 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,
* Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,
* 这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
*对线程进行控制方法
*subscribeOn(): 指定Observable(被观察者)发布消息所在的线程,或者叫做事件产生的线程。
*observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程
**/
Observable.just(1)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation());
|