IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (一) -> 正文阅读

[Java知识库]Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (一)

Spring响应式编程

【 好书分享:《Spring 响应式编程》-- 京东】

C2 Spring 响应式编程基本概念

RxJava 库,Java 第一个响应式库

2.1 早期方案

  • 方法1: 可以用 回调 (callback) 来实现跨组件通信。
  • 方法2: 用 Future (java.util.concurrent.Future)
  • 方法3: 更好的 CompletionStageCompletableFuture
  • 方法4: Spring 4 里 的 ListenableFutureAsyncRestTemplate

为了更好的理解相关的原理,我们需要理解一个经典的设计模式。

观察者模式:
主题(Subject)包含一个依赖者(观察者)列表,主题通过调用自身的一个方法通知观察者有状态变化

观察者模式在运行时注册对象之间的一对多依赖,单向通信,解耦实现,高效分配事件。

public interface Subject<T> {
    void registerObserver(Observer<T> observer); 
    void unregisterObserver(Observer<T> observer); 
	  void notifyObservers(T event); 
}

public interface Observer<T> {
    void observe(T event);
}

依赖注入 (Dependency Injection) 容器可以负责查找所有 Subject 实例和注册程序。(@EventListener)

接下来我们做一点简单实现:

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event){
        System.out.println("ConcreteObserverA:" + event);
    }
}

public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event){
        System.out.println("ConcreteObserverB:" + event);
    }
}

//String event
public class ConcreteSubject implements Subject<String> {
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

    public void registerObserver(Observer<String> observer){
        observers.add(observer);
    }
    
    public void unregisterObserver(Observer<String> observer){
        observers.remove(observer);
    }

	  public void notifyObservers(String event){
        observers.forEach( ob -> ob.observe(event));
    }
}

CopyOnWriteArraySet 是一个线程安全的实现。

通信务必要注意线程安全问题。

针对 Funtional interfaces,比如 Observer 只有一个抽象方法接口,所以可以直接用 lambda 实现。

@Test
public void test(){
    Subject<String> subOne = new ConcreteSubject();
    Observer<String> obsA1 = Mockito.spy(new ConcreteObserverA());
    Observer<String> obsB1 = Mockito.spy(new ConcreteObserverB());
    subOne.registerObserver(obsA1);
    subOne.registerObserver(obsB1);

    //lambda way
    Subject<String> subTwo = new ConcreteSubject();
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverA:" + event));
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverB:" + event));
    // same as override observe(String event)
}

并行传播消息:

private final ExecutorService executorService = Executors.newCachedThreadPool();

public void notifyObservers(String event){
    observers.forEach(ob -> executorService.submit(
        () -> ob.observe(event)
    ));
}

多线程要小心。为了防止资源滥用,我们可以限制线程池大小,并且活跃度(liveness)属性设置为 violate 。

基于上述观察者模式的思想,可以变体的变成另一种发布订阅模式。为了实现事件分发,Spring 提供了 @EventListener 注解。

  • 观察者模式: 主题 <==> 观察者 (–>触发 订阅<–)
  • 发布订阅模式:
    发布者 -发布-> 事件通道 <==> 订阅者(–>触发 订阅<–)

事件通道 event channel,又称为消息代理或事件总线。

实现发布订阅模式的库:

  • MBassador
  • Guava 提供的 EventBus 库

Demo练习:显示房间温度:用Spring原生的方案来实现一个原始的响应式接口

领域模型:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Temperature{
    private final double value;
}

模拟传感器:

@Component
public class TemperatureSensor{
    private final ApplicationEventPublisher publisher;
    private final Random random = new Random();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    public TemperatureSensor(ApplicationEventPublisher publisher){
        this.publisher = publisher;
    }

    @PostConstruct
    public void startProcessing(){
        executor.schedule(this::probe, 1, SECONDS);
    }

    private void probe(){
        var temp = random.nextGaussian() * 10 + 8;
        publisher.publishEvent(new TemperatureSensor(temp));
        executor.schedule(this::probe, random.nextInt(3000), MILLISECONDS);
    }
}

@Component 将其注册为一个 bean ,在 bean 准备就绪时,@PostConstruct注解的非静态函数会被 Spring 框架调用并且触发,开始执行随机温度序列的发布。事件的生成发生在单独的 ScheduledExecutorService executor 里面。

Spring Web MVC 中,不仅可以返回 @Controller 定义的泛型,还可以返回:

  • Callable<T> 非容器线程内阻塞调用
  • DeferredResult<T> 可以调用 setResult(T res) 在非容器线程内生成异步响应。

4.2版本之后,可以返回 ResponseBodyEmitter 用于发送多个对象,每个对象和消息转换器解耦。SseEmitter进一步拓展,可以一个传入请求发送多个传出消息。

StreamimgResponseBody 异步发送原始数据,更方便流式传输大数据而不阻塞 Servlet 线程。

暴露SSE端点:

下面继续构建组件,编写 Controller 用于 HTTP 通信以实现 demo。

@RestController
public class TemperatureController {
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    @GetMapping("/temperatre-stream")
    public SseEmitter events(HttpServletRequest request) {
        SseEmitter emitter = new SseEmitter();
        emitter.onTimeout(() -> clients.remove(emitter));
        emitter.onComplete(() -> clients.remove(emitter));
        clients.add(emitter);
        return emitter;
    }

    @Async
    @EventListener
    public void handleMessage(Temperatre t) {
        List<SseEmitter> deadEmitters = new ArrayList<>();
        clients.forEach(emitter -> {
            try {
                emitter.send(t, MediaType.APPLICATION_JSON);
            } catch (Exception e) {
                deadEmitters.add(emitter);
            }
        });
        clients.removeAll(deadEmitters);
    }
}

SseEmitter 的唯一用途就是用于发送 SSE 事件。客户端请求URI将会得到一个新的 SseEmitter 实例,并且这个实例会注册到服务端的 Set<SseEmitter> clients 里,如果这个 SseEmitter 超时或者完成处理,就会从 clients 列表里删掉自己。

SseEmitter 已经建立通信渠道,所以需要服务端在收到有关温度变化的事件的时候通知到订阅客户。 @EventListener 可以从 Spring 中接受事件,@Async 标记异步执行,所以他在手动配置的线程池里被调用。handleMessage() 接受 Temperatre 事件之后,发送结果到各个客户端。因为 SseEmitter 没有处理错误的回调,所以这里用 try catch 处理异常。

配置异步支持:

@SpringBootApplication
@EnableAsync
public class Application implements AsyncConfigurer {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

    @Override
    public Executor getAsyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(60);
        executor.setQueueCapacity(5);
        executor.initialize();
        return executor;
    }

}

CorePoolSize 是执行单元的线程限制。

QueueCapacity 是任务队列的容量,默认 Integer.MAX_VALUE ,非正数进去会导致无法并发。

Set the capacity for the ThreadPoolExecutor’s BlockingQueue.
Default is Integer.MAX_VALUE.
Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance.
See Also: LinkedBlockingQueue, SynchronousQueue

MaxPoolSize 控制总量,当线程数量大于等于maxPoolSize时,根据RejectedExecutionHandler设置的 Policy 来处理新加入的任务。

直接运行上面写好的服务端程序,即可测试我们的响应式接口

> curl http://localhost:8080/temperature-stream
data:{"value":14.51242412312}
data:{"value":23.08610531321}

问题和思考:

  • Spring Event 的发布订阅机制设计初衷是处理应用程序生命周期事件,不适合高负载高性能场景,并且把业务架构依赖于Spring有风险,以后框架的变动更新可能会导致程序出错。
  • 手动分配线程池用于异步广播事件很原始,所以我们应该使用更好的异步响应式框架。
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 01:24:15  更:2021-08-17 01:25:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 8:56:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码