1.使用场景
平时工作中都会用到MQ, 主要作用是解耦, 异步, 消峰. 但是很多简单的场景其实没有必要使用MQ来发送消息, 假设有一个用户注册, 注册成功需要发送各种消息通知, 但可能代码是这么编码的:
@PostMapping
public BaseRspVo test() {
// 注册成功的逻辑
// 发送短信通知
msg.send();
// 发送邮件通知
email.send();
}
不考虑同步的性能问题, 而且以后也不会再增加新的信息通知类型, 这种编码实际上是没有问题的.
来分析下这种编码存在的问题:
1.发送短信和邮件通知不应该有先后循序, 不考虑代码异常, 使用同步会影响程序性能
2.如果后续增加新的通知类型, 那么这个接口需要更改, 这就违反了开闭原则, 如果注册成功后需要处理的业务逻辑越来越多, 这个接口就会变得相当复杂, 代码不易阅读, 且日后很难维护, 逐渐形成代码屎山, 谁接手谁痛苦
围绕着上面2个问题, 来做一下优化, 可以使用设计模式中的观察者模式, 下面结合demo和业务案例来实现下观察者模式
提示: 本文篇幅较长
2.观察者模式
首选观察者模式的使用场景非常广泛, 小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。
《Head First 设计模式》书中是这么定义观察者模式的:
观察者模式定义了对象之间的一对多依赖, 这样一来, 当一个对象的状态发生改变, 他的所有依赖者都会收到通知并自动更新
这里的对象就是被观察者, 他的所有依赖就是观察者, 并让他们之间松耦合
来看一下书中定义的观察者模式的类图:
根据这个类图可以设计一个demo, 模拟气象站在更新天气后, 把天气信息推送到气象面板, 具体代码如下 :
2.1 首先定义被气象站顶级接口, 以及他的实现类 , 他们都可以定义为被观察者
package com.demo03;
/**
* @author canxiusi.yan
* @description WeatherSubject 主题接口
* @date 2022/2/6 18:02
*/
public interface WeatherSubject {
/**
* 注册观察者
* @param o
*/
void registerObserver(Observer o);
/**
* 移除观察者
* @param o
*/
void removeObserver(Observer o);
/**
* 通知观察者
*/
void notifyObservers();
}
package com.demo03.observer;
import com.demo03.Observer;
import com.demo03.WeatherSubject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author canxiusi.yan
* @description WeatherData 具体主题总是实现主题接口
* @date 2022/2/6 18:41
*/
public class WeatherData implements WeatherSubject {
private List<Observer> observerList;
private float temp;
private float humidity;
private float pressure;
public WeatherData() {
observerList = new CopyOnWriteArrayList<>();
}
/**
* 注册观察者
* @param o
*/
@Override
public void registerObserver(Observer o) {
if (!observerList.contains(o)) {
observerList.add(o);
}
}
@Override
public void removeObserver(Observer o) {
observerList.remove(o);
}
/**
* 通知观察者
*/
@Override
public void notifyObservers() {
for (Observer observer : observerList) {
// 调用子类的方法
observer.update(temp, humidity, pressure);
}
}
public void setMeasurements(float temp, float humidity, float pressure) {
this.temp = temp;
this.humidity = humidity;
this.pressure = pressure;
// 调用方法
measurementsChanged();
}
private void measurementsChanged() {
notifyObservers();
}
}
2.2 定义气象面板接口以及实现类,? 他们都可以定义为观察者
package com.demo03;
/**
* @author canxiusi.yan
* @description Observer 观察者父接口
* @date 2022/2/6 18:06
*/
public interface Observer {
/**
* 更新天气
* @param temp 温度
* @param humidity 湿度
* @param pressure 气压
*/
void update(float temp, float humidity, float pressure);
}
package com.demo03.observer;
import com.demo03.DisplayElement;
import com.demo03.Observer;
import com.demo03.WeatherSubject;
/**
* @author canxiusi.yan
* @description CurrentConditionsDisplay 具体观察者实例
* @date 2022/2/6 19:15
*/
public class CurrentConditionsDisplay implements Observer, DisplayElement {
private float temp;
private float humidity;
private WeatherSubject weatherSubject;
@Override
public void display() {
System.out.println("当前状况 : " + "CurrentConditionsDisplay{" +
"temp=" + temp +
", humidity=" + humidity
+ "}");
}
@Override
public void update(float temp, float humidity, float pressure) {
this.temp = temp;
this.humidity = humidity;
display();
}
/**
* 注册观察者
* @param weatherSubject
*/
public CurrentConditionsDisplay(WeatherSubject weatherSubject) {
this.weatherSubject = weatherSubject;
weatherSubject.registerObserver(this);
}
}
2.3 定义mini气象站并输出演示
package com.demo03;
import com.demo03.observer.CurrentConditionsDisplay;
import com.demo03.observer.WeatherData;
/**
* @author canxiusi.yan
* @description WeatherStation
* @date 2022/2/6 19:18
*/
public class WeatherStation {
public static void main(String[] args) {
WeatherData weatherData = new WeatherData();
CurrentConditionsDisplay conditionsDisplay = new CurrentConditionsDisplay(weatherData);
// 被观察者控制数据, 观察者打印输出
weatherData.setMeasurements(30, 60, 1000);
weatherData.setMeasurements(36, 50, 1000);
weatherData.setMeasurements(33, 55, 1000);
}
}
/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home/bin/java
当前状况 : CurrentConditionsDisplay{temp=30.0, humidity=60.0}
当前状况 : CurrentConditionsDisplay{temp=36.0, humidity=50.0}
当前状况 : CurrentConditionsDisplay{temp=33.0, humidity=55.0}
Process finished with exit code 0
3.分析实现过程
在main方法中, 显示创建被观察者实例WeatherData, 以及观察者实例CurrentConditionDisplay
在CurrentConditionDisplay的构造器中, 传递被观察者实例声明自己要观察的对象, 并调用registerObserver()把它注册到被观察者集合中
在main方法中使用被观察者对象调用了setMeasurements()方法, 更新天气信息, 也就是推送数据
之后会依次调用, 直至调用观察者对象的display()方法, 打印输出刚才被观察者推送的数据
总的来看其实就是一种发布订阅模型, 每当气象站更新天气数据, 气象面板都会打印出最新的数据, 并且气象站不用关心都有哪些气象面板, 气象面板也不用知道是谁推送的消息, 做到了对象之间的松耦合
也就是上面所说的观察者模式定义了对象之间的一对多依赖, 这样一来, 当一个对象的状态发生改变, 他的所有依赖者都会收到通知并自动更新
实际上,观察者模式是一个比较抽象的模式,根据不同的应用场景和需求,有完全不同的实现方式,等会我会做一个具体的场景。现在,我们先来看其中最经典的一种实现方式。这也是在说到这种模式的时候,很多书籍或资料给出的最常见的实现方式。具体的代码如下所示:
package com.result;
import java.util.ArrayList;
import java.util.List;
public interface Observable {
void registerObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers(String message);
}
public interface Observer {
void update(String message);
}
public class ConcreteObservable implements Observable {
private List<Observer> observers = new ArrayList<Observer>();
@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(String message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}
public class ConcreteObserverOne implements Observer {
@Override
public void update(String message) {
// 消息推送
}
}
public class ConcreteObserverTwo implements Observer {
@Override
public void update(String message) {
// 消息推送
}
}
public class Demo {
public static void main(String[] args) {
Observable subject = new ConcreteObservable();
subject.registerObserver(new ConcreteObserverOne());
subject.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers("aaa");
}
}
其实上面的代码算是观察者模式的“模板代码”,具体设计思路,? 我们可以根据自己的业务场景重新设计代码, 现在就可以改造文章一开始的注册, 发送通知的功能了!
4.改造用户注册代码
4.1 首先是被观察者, 无例外, 我们需要接口和实现类(图片依次列出), 以及具体的业务场景消息发送器
package com.demo09.observable;
import java.util.Collection;
/**
* @author canxiusi.yan
* @description Observable 被观察者父接口
* @date 2022/2/15 10:32
*/
public interface Observable<E> {
/**
* 添加观察者
* @param e
*/
void addObserver(E e);
/**
* 添加全部观察者
*/
void addObserverAll(Collection<? extends E> collection);
/**
* 移除观察者
* @param e
*/
void deleteObserver(E e);
/**
* 通知观察者
*/
void notifyObservers();
/**
* 通知观察者
* @param arg
*/
void notifyObservers(Object arg);
/**
* 移除所有观察者
*/
void deleteObservers();
/**
* 观察者数量
* @return
*/
int countObservers();
}
package com.demo09.observable;
import com.demo09.observer.Observer;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author canxiusi.yan
* @description MsgObservable 被观察者抽象实现
* @date 2022/2/15 11:29
*/
public abstract class MsgObservable<E> implements Observable<E> {
/**
* 观察者集合
*/
private final List<E> observerList = new ArrayList<>();
@Autowired
ThreadPoolExecutor threadPoolExecutor;
@Override
public void addObserver(E e) {
observerList.add(e);
}
@Override
public void addObserverAll(Collection<? extends E> collection) {
observerList.addAll(collection);
}
@Override
public void deleteAllObserver(Collection<? extends E> collection) {
observerList.removeAll(collection);
}
@Override
public void notifyObservers() {
notifyObservers(null);
}
@Override
public void notifyObservers(Object arg) {
for (E e : observerList) {
if (e instanceof Observer) {
// 考虑到该场景发送消息没有固定的顺序, 可以使用开启多线程实现进程间的异步
threadPoolExecutor.execute(() -> {
Observer observer = (Observer) e;
observer.send(arg);
});
}
}
}
@Override
public int countObservers() {
return observerList.size();
}
@Override
public void removeObserver(Observer observer){
observerList.remove(observer);
}
}
package com.demo09.observable.impl;
import com.demo09.observable.MsgObservable;
import com.demo09.observer.EmailSendObserver;
import com.demo09.observer.MsgSendObserver;
import com.demo09.observer.Observer;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @author canxiusi.yan
* @description MsgSender 注册消息发送器
* @date 2022/2/15 13:29
*/
@Component
public class RegisterSender extends MsgObservable {
@Autowired
EmailSendObserver emailSendObserver;
@Autowired
MsgSendObserver msgSendObserver;
/**
* 初始化注册观察者集合
*/
@PostConstruct
@SuppressWarnings("unchecked")
public void initRegisterSender() {
List<Observer> registerSender = Lists.newArrayList(emailSendObserver, msgSendObserver);
super.addObserverAll(registerSender);
}
@Override
public void notifyObservers(Object arg) {
super.notifyObservers(arg);
}
}
package com.demo09.observable.impl;
import com.demo09.observable.MsgObservable;
import com.demo09.observer.*;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @author canxiusi.yan
* @description OtherSender 根据业务可以定义别的发送器, 这里模拟微信小程序和公共号推送消息
* @date 2022/2/16 10:22
*/
@Component
public class WxSender extends MsgObservable {
@Autowired
WxMiniProgramSendObserver wxMiniProgramSendObserver;
@Autowired
WxPublicAccountSendObserver wxPublicAccountSendObserver;
@PostConstruct
@SuppressWarnings("unchecked")
public void initWxSender() {
List<Observer> registerSender = Lists.newArrayList(wxPublicAccountSendObserver, wxMiniProgramSendObserver);
super.addObserverAll(registerSender);
}
@Override
public void notifyObservers(Object arg) {
super.notifyObservers(arg);
}
}
先说明一下引入发送器的目的, 第一是初始化注册观察者集合, 第二是推送消息, 也是为了开闭原则和解耦合, 比如设计初期只需要注册业务发送消息, 可能后续的增加了用户注销账户也要发送消息, 这样就可以定义新的DestorySender, 去继承MsgObservable类即可
4.2 接下来是观察者接口, 以及具体的观察者实现类
package com.demo09.observer;
/**
* @author canxiusi.yan
* @description MsgObserver
* @date 2022/2/15 10:08
*/
public interface Observer {
/**
* 发送消息
* @param arg
*/
void send(Object arg);
}
package com.demo09.observer;
import com.demo09.service.MsgSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author canxiusi.yan
* @description MsgSendObserver 短信发送观察者
* @date 2022/2/15 10:30
*/
@Component
public class MsgSendObserver implements Observer {
@Autowired
MsgSendService msgSendService;
@Autowired
ThreadPoolExecutor threadPoolExecutor;
@Override
public void send(Object arg) {
threadPoolExecutor.execute(() -> msgSendService.sendMsg(arg.toString()));
}
}
package com.demo09.observer;
import com.demo09.service.EmailSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author canxiusi.yan
* @description EmailSendObserver 邮件观察者
* @date 2022/2/15 10:26
*/
@Component
public class EmailSendObserver implements Observer {
@Autowired
ThreadPoolExecutor threadPoolExecutor;
@Autowired
private EmailSendService emailSendService;
@Override
public void send(Object arg) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
// 调用业务service 发送邮件
emailSendService.sendEmail(arg.toString());
}
});
}
}
4.3 控制层注入依赖, 并调用
package com.demo09.controller;
import com.demo09.info.RegisterInfo;
import com.demo09.observable.impl.RegisterSender;
import com.demo09.observable.impl.WxSender;
import com.demo09.service.UserRegisterService;
import com.result.BaseRspVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author canxiusi.yan
* @description Controller
* @date 2022/2/15 14:10
*/
@RestController
@RequestMapping("/user")
public class RegisterController {
@Autowired
private RegisterSender msgSender;
@Autowired
private UserRegisterService userRegisterService;
@Autowired
private WxSender wxSender;
@PostMapping("/register")
public BaseRspVo doRegister(@RequestBody RegisterInfo registerInfo) {
String userId = userRegisterService.doRegister(registerInfo);
// 注册成功发送短信邮件...
msgSender.notifyObservers(userId);
return BaseRspVo.ok();
}
@PostMapping("/wxSend")
public BaseRspVo wxSend() {
// 发送之前的业务逻辑
// 发送微信消息
wxSender.notifyObservers("你有新消息");
return BaseRspVo.ok();
}
}
4.4 运行效果
?由于只是模拟调用, 所以异步效果不是很直观
再来测试一下微信消息推送
至此, 使用观察者模式改造了原有的注册-发消息代码, 并且使用线程池实现了进程内的非阻塞, 后续拓展业务只需要增加新的Sender去继承MsgObservable即可, 便于拓展, 且高内聚低耦合, 如果想实现一个跨进程的, 可以使用消息队列MQ
|