注:需要先在 app 文件夹内 build.gradle 的 dependencies 中添加 RxJava 依赖库:
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
1.创建Rxbus
首先创建 Rxbus 类,这里的 Rxbus 仅支持基本的功能,示例如下:
import androidx.lifecycle.Observer;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;
/**
 * Minimal process-wide event bus built on an RxJava 3 {@link PublishSubject}.
 *
 * <p>Producers call {@link #post(Object)}; consumers call
 * {@link #toObservable(Class)} and subscribe to the returned stream, which
 * only carries events of the requested type.
 */
public class Rxbus {

    /**
     * Lazily created singleton. Declared {@code volatile} so the unlocked read
     * in {@link #getInstance()} never observes a partially constructed object.
     */
    private static volatile Rxbus rxbus;

    /** Serialized view of the subject that every event is pushed through. */
    private final Subject<Object> subject;

    private Rxbus() {
        this.subject = PublishSubject.create().toSerialized();
    }

    /**
     * Returns the single shared bus, creating it on first use.
     *
     * <p>The null check is repeated inside the lock: without the second check,
     * two threads that both pass the first check would each create and publish
     * their own instance, and subscribers registered on one would never see
     * events posted to the other.
     *
     * @return the shared {@code Rxbus} instance, never {@code null}
     */
    public static Rxbus getInstance() {
        Rxbus instance = rxbus;
        if (instance == null) {
            synchronized (Rxbus.class) {
                instance = rxbus;
                if (instance == null) {
                    instance = new Rxbus();
                    rxbus = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Pushes an event to the subject.
     *
     * @param ob the event object to publish; RxJava 3 does not accept
     *           {@code null} values, so callers should pass a real object
     */
    public void post(Object ob) {
        subject.onNext(ob);
    }

    /**
     * Returns a stream restricted to events of one type.
     *
     * @param eventType class of the events the caller wants to receive
     * @param <T>       event type
     * @return an observable that emits only the posted objects that are
     *         instances of {@code eventType}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return subject.ofType(eventType);
    }
}
2.新建发送的事件类
/**
 * Immutable event object that carries a single text message.
 */
public class MessageEvent {

    /** Text payload; assigned once in the constructor and never changed. */
    private final String message;

    /**
     * Creates an event with the given text.
     *
     * @param message text payload, stored as-is without validation
     */
    public MessageEvent(String message) {
        this.message = message;
    }

    /**
     * Returns the text supplied at construction.
     *
     * @return the message passed to the constructor
     */
    public String getMessage() {
        return message;
    }
}
3.订阅事件及接收和处理
// Subscribe to MessageEvent objects posted on the shared bus.
Rxbus.getInstance()
        .toObservable(MessageEvent.class)
        .subscribe(new Observer<MessageEvent>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                // Keep the Disposable so the subscription can be cancelled when
                // the Activity is torn down, preventing a memory leak.
                compositeDisposable.add(d);
            }

            @Override
            public void onNext(@NonNull MessageEvent messageEvent) {
                // No null check needed: RxJava 3 never delivers null items.
                Log.e("TAG", "onNext: " + messageEvent.getMessage());
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // Log instead of silently dropping the failure.
                Log.e("TAG", "onError: ", e);
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        });
compositeDisposable 实例化方式:
// Create the container that onSubscribe adds each Disposable to; this must run
// before subscribing, because onSubscribe calls compositeDisposable.add(d).
compositeDisposable = new CompositeDisposable();
4.发送事件
// Publish a MessageEvent on the shared bus via Rxbus.post.
Rxbus.getInstance().post(new MessageEvent("Rxjava 事件"));
5.Activity 销毁时,取消订阅
/**
 * Clears the collected subscriptions, if the container was created, and then
 * delegates to the superclass.
 */
@Override
protected void onDestroy() {
    final boolean hasSubscriptions = compositeDisposable != null;
    if (hasSubscriptions) {
        // Drop every Disposable that was registered in onSubscribe.
        compositeDisposable.clear();
    }
    super.onDestroy();
}
以上就是在 RxJava 3.x 中使用 RxBus 实现事件订阅和处理的方式。
|