IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 《Head First 设计模式》学习笔记---观察者模式+异步实现消息发送 -> 正文阅读

[Java知识库]《Head First 设计模式》学习笔记---观察者模式+异步实现消息发送

1.使用场景

平时工作中都会用到MQ, 主要作用是解耦, 异步, 消峰. 但是很多简单的场景其实没有必要使用MQ来发送消息, 假设有一个用户注册, 注册成功需要发送各种消息通知, 但可能代码是这么编码的:

@PostMapping
    public BaseRspVo test() {
        // 注册成功的逻辑
        // 发送短信通知
        msg.send();
        // 发送邮件通知
        email.send();
    }

不考虑同步的性能问题, 而且以后也不会再增加新的信息通知类型, 这种编码实际上是没有问题的.

来分析下这种编码存在的问题:

1.发送短信和邮件通知不应该有先后循序, 不考虑代码异常, 使用同步会影响程序性能

2.如果后续增加新的通知类型, 那么这个接口需要更改, 这就违反了开闭原则, 如果注册成功后需要处理的业务逻辑越来越多, 这个接口就会变得相当复杂, 代码不易阅读, 且日后很难维护, 逐渐形成代码屎山, 谁接手谁痛苦

围绕着上面2个问题, 来做一下优化, 可以使用设计模式中的观察者模式, 下面结合demo和业务案例来实现下观察者模式

提示: 本文篇幅较长

2.观察者模式

首选观察者模式的使用场景非常广泛, 小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。

《Head First 设计模式》书中是这么定义观察者模式的:

观察者模式定义了对象之间的一对多依赖, 这样一来, 当一个对象的状态发生改变, 他的所有依赖者都会收到通知并自动更新

这里的对象就是被观察者, 他的所有依赖就是观察者, 并让他们之间松耦合

来看一下书中定义的观察者模式的类图:

根据这个类图可以设计一个demo, 模拟气象站在更新天气后, 把天气信息推送到气象面板, 具体代码如下 :

2.1 首先定义被气象站顶级接口, 以及他的实现类 , 他们都可以定义为被观察者

package com.demo03;

/**
 * @author canxiusi.yan
 * @description WeatherSubject 主题接口
 * @date 2022/2/6 18:02
 */
public interface WeatherSubject {

    /**
     * 注册观察者
     * @param o
     */
    void registerObserver(Observer o);

    /**
     * 移除观察者
     * @param o
     */
    void removeObserver(Observer o);

    /**
     * 通知观察者
     */
    void notifyObservers();
}
package com.demo03.observer;

import com.demo03.Observer;
import com.demo03.WeatherSubject;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @author canxiusi.yan
 * @description WeatherData 具体主题总是实现主题接口
 * @date 2022/2/6 18:41
 */
public class WeatherData implements WeatherSubject {

    private List<Observer> observerList;
    private float temp;
    private float humidity;
    private float pressure;

    public WeatherData() {
        observerList = new CopyOnWriteArrayList<>();
    }

    /**
     * 注册观察者
     * @param o
     */
    @Override
    public void registerObserver(Observer o) {
        if (!observerList.contains(o)) {
            observerList.add(o);
        }
    }

    @Override
    public void removeObserver(Observer o) {
        observerList.remove(o);
    }

    /**
     * 通知观察者
     */
    @Override
    public void notifyObservers() {
        for (Observer observer : observerList) {
            // 调用子类的方法
            observer.update(temp, humidity, pressure);
        }
    }

    public void setMeasurements(float temp, float humidity, float pressure) {
        this.temp = temp;
        this.humidity = humidity;
        this.pressure = pressure;
        // 调用方法
        measurementsChanged();
    }

    private void measurementsChanged() {
        notifyObservers();
    }
}

2.2 定义气象面板接口以及实现类,? 他们都可以定义为观察者

package com.demo03;

/**
 * @author canxiusi.yan
 * @description Observer 观察者父接口
 * @date 2022/2/6 18:06
 */
public interface Observer {

    /**
     * 更新天气
     * @param temp 温度
     * @param humidity 湿度
     * @param pressure 气压
     */
    void update(float temp, float humidity, float pressure);
}
package com.demo03.observer;

import com.demo03.DisplayElement;
import com.demo03.Observer;
import com.demo03.WeatherSubject;

/**
 * @author canxiusi.yan
 * @description CurrentConditionsDisplay 具体观察者实例
 * @date 2022/2/6 19:15
 */
public class CurrentConditionsDisplay implements Observer, DisplayElement {

    private float temp;
    private float humidity;
    private WeatherSubject weatherSubject;

    @Override
    public void display() {
        System.out.println("当前状况 : " + "CurrentConditionsDisplay{" +
                "temp=" + temp +
                ", humidity=" + humidity
                + "}");
    }

    @Override
    public void update(float temp, float humidity, float pressure) {
        this.temp = temp;
        this.humidity = humidity;
        display();
    }

    /**
     * 注册观察者
     * @param weatherSubject
     */
    public CurrentConditionsDisplay(WeatherSubject weatherSubject) {
        this.weatherSubject = weatherSubject;
        weatherSubject.registerObserver(this);
    }
}

2.3 定义mini气象站并输出演示

package com.demo03;

import com.demo03.observer.CurrentConditionsDisplay;
import com.demo03.observer.WeatherData;

/**
 * @author canxiusi.yan
 * @description WeatherStation
 * @date 2022/2/6 19:18
 */
public class WeatherStation {

    public static void main(String[] args) {

        WeatherData weatherData = new WeatherData();
        CurrentConditionsDisplay conditionsDisplay = new CurrentConditionsDisplay(weatherData);

        // 被观察者控制数据, 观察者打印输出
        weatherData.setMeasurements(30, 60, 1000);
        weatherData.setMeasurements(36, 50, 1000);
        weatherData.setMeasurements(33, 55, 1000);
    }
}
/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home/bin/java 
当前状况 : CurrentConditionsDisplay{temp=30.0, humidity=60.0}
当前状况 : CurrentConditionsDisplay{temp=36.0, humidity=50.0}
当前状况 : CurrentConditionsDisplay{temp=33.0, humidity=55.0}

Process finished with exit code 0

3.分析实现过程

在main方法中, 显示创建被观察者实例WeatherData, 以及观察者实例CurrentConditionDisplay

在CurrentConditionDisplay的构造器中, 传递被观察者实例声明自己要观察的对象, 并调用registerObserver()把它注册到被观察者集合中

在main方法中使用被观察者对象调用了setMeasurements()方法, 更新天气信息, 也就是推送数据

之后会依次调用, 直至调用观察者对象的display()方法, 打印输出刚才被观察者推送的数据

总的来看其实就是一种发布订阅模型, 每当气象站更新天气数据, 气象面板都会打印出最新的数据, 并且气象站不用关心都有哪些气象面板, 气象面板也不用知道是谁推送的消息, 做到了对象之间的松耦合

也就是上面所说的观察者模式定义了对象之间的一对多依赖, 这样一来, 当一个对象的状态发生改变, 他的所有依赖者都会收到通知并自动更新

实际上,观察者模式是一个比较抽象的模式,根据不同的应用场景和需求,有完全不同的实现方式,等会我会做一个具体的场景。现在,我们先来看其中最经典的一种实现方式。这也是在说到这种模式的时候,很多书籍或资料给出的最常见的实现方式。具体的代码如下所示:

package com.result;

import java.util.ArrayList;
import java.util.List;

public interface Observable {
  void registerObserver(Observer observer);
  void removeObserver(Observer observer);
  void notifyObservers(String message);
}

public interface Observer {
  void update(String message);
}

public class ConcreteObservable implements Observable {
  private List<Observer> observers = new ArrayList<Observer>();

  @Override
  public void registerObserver(Observer observer) {
    observers.add(observer);
  }

  @Override
  public void removeObserver(Observer observer) {
    observers.remove(observer);
  }

  @Override
  public void notifyObservers(String message) {
    for (Observer observer : observers) {
      observer.update(message);
    }
  }

}

public class ConcreteObserverOne implements Observer {
  @Override
  public void update(String message) {
    // 消息推送
  }
}

public class ConcreteObserverTwo implements Observer {
  @Override
  public void update(String message) {
    // 消息推送
  }
}

public class Demo {
  public static void main(String[] args) {
    Observable subject = new ConcreteObservable();
    subject.registerObserver(new ConcreteObserverOne());
    subject.registerObserver(new ConcreteObserverTwo());
    subject.notifyObservers("aaa");
  }
}

其实上面的代码算是观察者模式的“模板代码”,具体设计思路,? 我们可以根据自己的业务场景重新设计代码, 现在就可以改造文章一开始的注册, 发送通知的功能了!

4.改造用户注册代码

4.1 首先是被观察者, 无例外, 我们需要接口和实现类(图片依次列出), 以及具体的业务场景消息发送器

package com.demo09.observable;

import java.util.Collection;

/**
 * @author canxiusi.yan
 * @description Observable 被观察者父接口
 * @date 2022/2/15 10:32
 */
public interface Observable<E> {

    /**
     * 添加观察者
     * @param e
     */
    void addObserver(E e);

    /**
     * 添加全部观察者
     */
    void addObserverAll(Collection<? extends E> collection);

    /**
     * 移除观察者
     * @param e
     */
    void deleteObserver(E e);

    /**
     * 通知观察者
     */
    void notifyObservers();

    /**
     * 通知观察者
     * @param arg
     */
    void notifyObservers(Object arg);

    /**
     * 移除所有观察者
     */
    void deleteObservers();

    /**
     * 观察者数量
     * @return
     */
    int countObservers();

}
package com.demo09.observable;

import com.demo09.observer.Observer;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author canxiusi.yan
 * @description MsgObservable 被观察者抽象实现
 * @date 2022/2/15 11:29
 */
public abstract class MsgObservable<E> implements Observable<E> {

    /**
     * 观察者集合
     */
    private final List<E> observerList = new ArrayList<>();

    @Autowired
    ThreadPoolExecutor threadPoolExecutor;

    @Override
    public void addObserver(E e) {
        observerList.add(e);
    }

    @Override
    public void addObserverAll(Collection<? extends E> collection) {
        observerList.addAll(collection);
    }

    @Override
    public void deleteAllObserver(Collection<? extends E> collection) {
        observerList.removeAll(collection);
    }

    @Override
    public void notifyObservers() {
        notifyObservers(null);
    }

    @Override
    public void notifyObservers(Object arg) {
        for (E e : observerList) {
            if (e instanceof Observer) {
                // 考虑到该场景发送消息没有固定的顺序, 可以使用开启多线程实现进程间的异步
                threadPoolExecutor.execute(() -> {
                    Observer observer = (Observer) e;
                    observer.send(arg);
                });
            }
        }
    }

    @Override
    public int countObservers() {
        return observerList.size();
    }

    @Override
    public void removeObserver(Observer observer){
        observerList.remove(observer);
    }
}
package com.demo09.observable.impl;

import com.demo09.observable.MsgObservable;
import com.demo09.observer.EmailSendObserver;
import com.demo09.observer.MsgSendObserver;
import com.demo09.observer.Observer;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author canxiusi.yan
 * @description MsgSender 注册消息发送器
 * @date 2022/2/15 13:29
 */
@Component
public class RegisterSender extends MsgObservable {

    @Autowired
    EmailSendObserver emailSendObserver;

    @Autowired
    MsgSendObserver msgSendObserver;

    /**
     * 初始化注册观察者集合
     */
    @PostConstruct
    @SuppressWarnings("unchecked")
    public void initRegisterSender() {
        List<Observer> registerSender = Lists.newArrayList(emailSendObserver, msgSendObserver);
        super.addObserverAll(registerSender);
    }

    @Override
    public void notifyObservers(Object arg) {
        super.notifyObservers(arg);
    }

}
package com.demo09.observable.impl;

import com.demo09.observable.MsgObservable;
import com.demo09.observer.*;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author canxiusi.yan
 * @description OtherSender 根据业务可以定义别的发送器, 这里模拟微信小程序和公共号推送消息
 * @date 2022/2/16 10:22
 */
@Component
public class WxSender extends MsgObservable {

    @Autowired
    WxMiniProgramSendObserver wxMiniProgramSendObserver;

    @Autowired
    WxPublicAccountSendObserver wxPublicAccountSendObserver;

    @PostConstruct
    @SuppressWarnings("unchecked")
    public void initWxSender() {
        List<Observer> registerSender = Lists.newArrayList(wxPublicAccountSendObserver, wxMiniProgramSendObserver);
        super.addObserverAll(registerSender);
    }

    @Override
    public void notifyObservers(Object arg) {
        super.notifyObservers(arg);
    }
}

先说明一下引入发送器的目的, 第一是初始化注册观察者集合, 第二是推送消息, 也是为了开闭原则和解耦合, 比如设计初期只需要注册业务发送消息, 可能后续的增加了用户注销账户也要发送消息, 这样就可以定义新的DestorySender, 去继承MsgObservable类即可

4.2 接下来是观察者接口, 以及具体的观察者实现类

package com.demo09.observer;

/**
 * @author canxiusi.yan
 * @description MsgObserver
 * @date 2022/2/15 10:08
 */
public interface Observer {

    /**
     * 发送消息
     * @param arg
     */
    void send(Object arg);
}
package com.demo09.observer;

import com.demo09.service.MsgSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;


/**
 * @author canxiusi.yan
 * @description MsgSendObserver 短信发送观察者
 * @date 2022/2/15 10:30
 */
@Component
public class MsgSendObserver implements Observer {

    @Autowired
    MsgSendService msgSendService;

    @Autowired
    ThreadPoolExecutor threadPoolExecutor;

    @Override
    public void send(Object arg) {
        threadPoolExecutor.execute(() -> msgSendService.sendMsg(arg.toString()));
    }
}
package com.demo09.observer;

import com.demo09.service.EmailSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author canxiusi.yan
 * @description EmailSendObserver 邮件观察者
 * @date 2022/2/15 10:26
 */
@Component
public class EmailSendObserver implements Observer {

    @Autowired
    ThreadPoolExecutor threadPoolExecutor;

    @Autowired
    private EmailSendService emailSendService;

    
    @Override
    public void send(Object arg) {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // 调用业务service 发送邮件
                emailSendService.sendEmail(arg.toString());
            }
        });
    }
}

4.3 控制层注入依赖, 并调用

package com.demo09.controller;

import com.demo09.info.RegisterInfo;
import com.demo09.observable.impl.RegisterSender;
import com.demo09.observable.impl.WxSender;
import com.demo09.service.UserRegisterService;
import com.result.BaseRspVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author canxiusi.yan
 * @description Controller
 * @date 2022/2/15 14:10
 */
@RestController
@RequestMapping("/user")
public class RegisterController {

    @Autowired
    private RegisterSender msgSender;

    @Autowired
    private UserRegisterService userRegisterService;

    @Autowired
    private WxSender wxSender;

    @PostMapping("/register")
    public BaseRspVo doRegister(@RequestBody RegisterInfo registerInfo) {
        String userId = userRegisterService.doRegister(registerInfo);
        // 注册成功发送短信邮件...
        msgSender.notifyObservers(userId);
        return BaseRspVo.ok();
    }

    @PostMapping("/wxSend")
    public BaseRspVo wxSend() {
        // 发送之前的业务逻辑
        // 发送微信消息
        wxSender.notifyObservers("你有新消息");
        return BaseRspVo.ok();
    }

}

4.4 运行效果

?由于只是模拟调用, 所以异步效果不是很直观

再来测试一下微信消息推送

至此, 使用观察者模式改造了原有的注册-发消息代码, 并且使用线程池实现了进程内的非阻塞, 后续拓展业务只需要增加新的Sender去继承MsgObservable即可, 便于拓展, 且高内聚低耦合, 如果想实现一个跨进程的, 可以使用消息队列MQ

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-02-19 01:00:55  更:2022-02-19 01:01:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 12:36:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码