【 好书分享:《Spring 响应式编程》-- 京东】
C2 Spring 响应式编程基本概念
RxJava 库,Java 第一个响应式库
2.1 早期方案
- 方法1: 可以用 回调 (callback) 来实现跨组件通信。
- 方法2: 用
Future (java.util.concurrent.Future) - 方法3: 更好的
CompletionStage 和 CompletableFuture 。 - 方法4: Spring 4 里 的
ListenableFuture 和 AsyncRestTemplate
为了更好的理解相关的原理,我们需要理解一个经典的设计模式。
观察者模式: 主题(Subject)包含一个依赖者(观察者)列表,主题通过调用自身的一个方法通知观察者有状态变化
观察者模式在运行时注册对象之间的一对多依赖,单向通信,解耦实现,高效分配事件。
public interface Subject<T> {
void registerObserver(Observer<T> observer);
void unregisterObserver(Observer<T> observer);
void notifyObservers(T event);
}
public interface Observer<T> {
void observe(T event);
}
依赖注入 (Dependency Injection) 容器可以负责查找所有 Subject 实例和注册程序。(@EventListener)
接下来我们做一点简单实现:
public class ConcreteObserverA implements Observer<String> {
@Override
public void observe(String event){
System.out.println("ConcreteObserverA:" + event);
}
}
public class ConcreteObserverB implements Observer<String> {
@Override
public void observe(String event){
System.out.println("ConcreteObserverB:" + event);
}
}
public class ConcreteSubject implements Subject<String> {
private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
public void registerObserver(Observer<String> observer){
observers.add(observer);
}
public void unregisterObserver(Observer<String> observer){
observers.remove(observer);
}
public void notifyObservers(String event){
observers.forEach( ob -> ob.observe(event));
}
}
CopyOnWriteArraySet 是一个线程安全的实现。
通信务必要注意线程安全问题。
针对 Funtional interfaces,比如 Observer 只有一个抽象方法接口,所以可以直接用 lambda 实现。
@Test
public void test(){
Subject<String> subOne = new ConcreteSubject();
Observer<String> obsA1 = Mockito.spy(new ConcreteObserverA());
Observer<String> obsB1 = Mockito.spy(new ConcreteObserverB());
subOne.registerObserver(obsA1);
subOne.registerObserver(obsB1);
Subject<String> subTwo = new ConcreteSubject();
subTwo.registerObserver(event -> System.out.println("ConcreteObserverA:" + event));
subTwo.registerObserver(event -> System.out.println("ConcreteObserverB:" + event));
}
并行传播消息:
private final ExecutorService executorService = Executors.newCachedThreadPool();
public void notifyObservers(String event){
observers.forEach(ob -> executorService.submit(
() -> ob.observe(event)
));
}
多线程要小心。为了防止资源滥用,我们可以限制线程池大小,并且活跃度(liveness)属性设置为 violate 。
基于上述观察者模式的思想,可以变体的变成另一种发布订阅模式。为了实现事件分发,Spring 提供了 @EventListener 注解。
- 观察者模式: 主题 <==> 观察者 (–>触发 订阅<–)
- 发布订阅模式:
发布者 -发布-> 事件通道 <==> 订阅者(–>触发 订阅<–)
事件通道 event channel,又称为消息代理或事件总线。
实现发布订阅模式的库:
- MBassador
- Guava 提供的 EventBus 库
Demo练习:显示房间温度:用Spring原生的方案来实现一个原始的响应式接口
领域模型:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Temperature{
private final double value;
}
模拟传感器:
@Component
public class TemperatureSensor{
private final ApplicationEventPublisher publisher;
private final Random random = new Random();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public TemperatureSensor(ApplicationEventPublisher publisher){
this.publisher = publisher;
}
@PostConstruct
public void startProcessing(){
executor.schedule(this::probe, 1, SECONDS);
}
private void probe(){
var temp = random.nextGaussian() * 10 + 8;
publisher.publishEvent(new TemperatureSensor(temp));
executor.schedule(this::probe, random.nextInt(3000), MILLISECONDS);
}
}
@Component 将其注册为一个 bean ,在 bean 准备就绪时,@PostConstruct注解的非静态函数会被 Spring 框架调用并且触发,开始执行随机温度序列的发布。事件的生成发生在单独的 ScheduledExecutorService executor 里面。
Spring Web MVC 中,不仅可以返回 @Controller 定义的泛型,还可以返回:
Callable<T> 非容器线程内阻塞调用DeferredResult<T> 可以调用 setResult(T res) 在非容器线程内生成异步响应。
4.2版本之后,可以返回 ResponseBodyEmitter 用于发送多个对象,每个对象和消息转换器解耦。SseEmitter 进一步拓展,可以一个传入请求发送多个传出消息。
StreamimgResponseBody 异步发送原始数据,更方便流式传输大数据而不阻塞 Servlet 线程。
暴露SSE端点:
下面继续构建组件,编写 Controller 用于 HTTP 通信以实现 demo。
@RestController
public class TemperatureController {
private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();
@GetMapping("/temperatre-stream")
public SseEmitter events(HttpServletRequest request) {
SseEmitter emitter = new SseEmitter();
emitter.onTimeout(() -> clients.remove(emitter));
emitter.onComplete(() -> clients.remove(emitter));
clients.add(emitter);
return emitter;
}
@Async
@EventListener
public void handleMessage(Temperatre t) {
List<SseEmitter> deadEmitters = new ArrayList<>();
clients.forEach(emitter -> {
try {
emitter.send(t, MediaType.APPLICATION_JSON);
} catch (Exception e) {
deadEmitters.add(emitter);
}
});
clients.removeAll(deadEmitters);
}
}
SseEmitter 的唯一用途就是用于发送 SSE 事件。客户端请求URI将会得到一个新的 SseEmitter 实例,并且这个实例会注册到服务端的 Set<SseEmitter> clients 里,如果这个 SseEmitter 超时或者完成处理,就会从 clients 列表里删掉自己。
SseEmitter 已经建立通信渠道,所以需要服务端在收到有关温度变化的事件的时候通知到订阅客户。 @EventListener 可以从 Spring 中接受事件,@Async 标记异步执行,所以他在手动配置的线程池里被调用。handleMessage() 接受 Temperatre 事件之后,发送结果到各个客户端。因为 SseEmitter 没有处理错误的回调,所以这里用 try catch 处理异常。
配置异步支持:
@SpringBootApplication
@EnableAsync
public class Application implements AsyncConfigurer {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public Executor getAsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(60);
executor.setQueueCapacity(5);
executor.initialize();
return executor;
}
}
CorePoolSize 是执行单元的线程限制。
QueueCapacity 是任务队列的容量,默认 Integer.MAX_VALUE ,非正数进去会导致无法并发。
Set the capacity for the ThreadPoolExecutor’s BlockingQueue. Default is Integer.MAX_VALUE. Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance. See Also: LinkedBlockingQueue, SynchronousQueue
MaxPoolSize 控制总量,当线程数量大于等于maxPoolSize时,根据RejectedExecutionHandler设置的 Policy 来处理新加入的任务。
直接运行上面写好的服务端程序,即可测试我们的响应式接口
> curl http://localhost:8080/temperature-stream
data:{"value":14.51242412312}
data:{"value":23.08610531321}
问题和思考:
- Spring Event 的发布订阅机制设计初衷是处理应用程序生命周期事件,不适合高负载高性能场景,并且把业务架构依赖于Spring有风险,以后框架的变动更新可能会导致程序出错。
- 手动分配线程池用于异步广播事件很原始,所以我们应该使用更好的异步响应式框架。
|