Observable
package com.example.testjavademo;
public abstract class Observable implements ObServableSource{
@Override
public void subscribeObserver(ObServer obServer) {
//把这个功能留给不同的Observable处理
sbuscribleActual(obServer);
}
//模板方法
protected abstract void sbuscribleActual(ObServer observer);
//创建具体被观察者
public static <T> Observable create(ObServableOnSubscrile<T> subscrile){
return new ObServableCreate<T>(subscrile);
}
}
ObServer
package com.example.testjavademo;
public interface ObServer<T> {
//接收消息 undate()
void onNext(T t);
//调用关联时调用
void onSubscribe();
//接收异常消息
void onError();
//接收消息完成
void onComplete();
}
ObServableSource
package com.example.testjavademo;
//抽象被观察者
public interface ObServableSource {
//订阅功能 绑定ObServable 与ObServer关联
public void subscribeObserver(ObServer obServer);
}
ObServableOnSubscrile
package com.example.testjavademo;
/**
* 绑定发射器 被观察者和发射器分离了
* @param <T>
*/
public interface ObServableOnSubscrile <T>{
void subscribe(Emitter<T> emitter);
}
Emitter
package com.example.testjavademo;
/**
* 给用户发送消息接口
* @param <T>
*/
public interface Emitter <T>{
//接收消息 undate()
void onNext(T t);
//接收异常消息
void onError();
//接收消息完成
void onComplete();
}
ObServableCreate
package com.example.testjavademo;
/**
* 具体被 观察者
*/
public class ObServableCreate<T> extends Observable{
ObServableOnSubscrile<T> subscrile;
public ObServableCreate ( ObServableOnSubscrile<T> subscrile){
this.subscrile=subscrile;
}
@Override
protected void sbuscribleActual(ObServer observer) {
observer.onSubscribe();
//创建发射器
CreateEmitter createEmitter=new CreateEmitter(observer);
subscrile.subscribe(createEmitter);
}
static final class CreateEmitter<T> implements Emitter<T>{
final ObServer<T> obServer;
CreateEmitter(ObServer<T> obServer) {
this.obServer = obServer;
}
@Override
public void onNext(T t) {
obServer.onNext(t);
}
@Override
public void onError() {
obServer.onError();
}
@Override
public void onComplete() {
obServer.onComplete();
}
}
}
MainActivity
package com.example.testjavademo;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
test();
}
private void test() {
Observable .create(new ObServableOnSubscrile<String>() {
@Override
public void subscribe(Emitter<String> emitter) {
emitter.onNext("大家好");
}
}).subscribeObserver(new ObServer<String>() {
@Override
public void onNext(String s) {
Log.i("sklfjskfs",s);
}
@Override
public void onSubscribe() {
}
@Override
public void onError() {
}
@Override
public void onComplete() {
Log.i("sklfjskfs","onComplete");
}
});
}
}
|