个人分析理解,项目到底需不需要用到MQ技术
订单项 -> 订单规格-> 履行集 -> 履行流程-> OPU
上面是公司根据订单业务抽象成的订单框架流程结构。
这里的OPU是最小执行单元,它可以是同步也可以是异步。
由于该项目没有秒杀方面的需求,MQ就是用来跨系统的异步通信。
这里的异步用到过两种技术,一开始是多线程(线程池) + 定时任务(Quartz),后来改为了 RabbitMQ。
由于涉及保密条例,这里用伪代码来表示下不用MQ的情况下,是如何实现与外围系统的异步通信:
有几点要先注意下。
- 使用线程池需要注意线程池的数量设置,套用公式 U * U *(1 + w/c):CPU内核数 * 期待CPU利用率 * (1+ 单条线程等待时间/单条线程计算时间)
- 定时任务的循环周期需要确定好,曾经项目中设置为1分钟,后来根据每天的订单数量和订单里异步类型的通信数量,改周期为5分钟
- 消息发送失败的重试机制要想好,项目中一开始为了配套这块业务,主要是准备了两个重要的表:(1)订单号、存储报文、接口信息和重试次数等字段的quartz_info_request表 (2) 记录所有内调外或者外调内接口的日志信息表quartz_info_log ,里面含有订单号、接口信息等字段
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor{
int corePoolSize = 50;
int maximumPoolSize = 100;
long keepAliveTime =2;
TimeUnit unit = TimeUnit.SECONDS;
BlokckingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(40);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
new ThreadPoolExecutor.AbortPolicy());
);
return threadPoolExecutor;
}
}
public class SendService{
@Autowired
ThreadPoolExecutor threadPoolExecutor;
public void asyncSend(){
threadPoolExecutor.submit(()->{
})
}
}
public class Sendjob{
public void retry(){
}
}
这样线程池 + 定时任务,也可以做到异步通信的结果,至于订单的对账和修复,全在于设计消息发送失败机制的多方面考虑。
明显不那么容易把控
接下来再看看用了RabbitMQ后,带来的影响。
首先是代码:
@Configuration
public class RabbitConfig {
@Bean
public Queue DirectQueue(){
return new Queue("DirectQueue",true);
}
@Bean
public DirectExchange MyDirectExchange(){
return new DirectExchange("MyDirectExchange",true,false);
}
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(DirectQueue()).to(MyDirectExchange()).with("MyDirectRouting");
}
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"correlationData:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+correlationData);
System.out.println("ConfirmCallback: "+"cause:"+correlationData);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
接下来是消费者手动确认的消息确认机制代码
@Configuration
public class RabbitConfig2 {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private AckReceiver ackReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("DirectQueue");
container.setMessageListener(ackReceiver);
return container;
}
}
消费者代码
@Component
public class AckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = message.toString();
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
生产者代码
public class SendService{
@Autowired
RabbitTemplate rabbitTemplate;
public String asyncSend(){
Map<String,Object> map=new HashMap<>();
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
总结一下:
使用MQ后,明显看到对于消息确认机制更加完善了,相当于用MQ代替了基于数据库的定时任务,效率更高了不说,对于开发人员,一旦出现消息异常或丢失,更容易排查。
只看分布式环境下的异步通信功能,MQ的引入还是很有必要的。
个人基于公司项目的理解,如有更好的看法,欢迎沟通~
|