一、业务要求
将http请求参数和返回值部分进行封装,发送给MQ,由于该项目是一个社区项目,需要将用户点赞、评论等信息发送给消费者,使用接口较多,所以采用AOP注解方式进行生产者的消息发送。
二、AOP几个概念:
(1)Target:目标类,要被代理的类,例如,UserService;
(2)JoinPoint(连接点):所谓的连接点,是指那些被拦截到的方法;
(3)PointCut(切入点):被增强的连接点(所谓的增强其实就是添加的新功能);
(4)Advice(通知、增强),增强代码;
(5)Weaving(织入):是指把增强的advice应用到目标对象target来创建新的代理对象proxy的 过程。
(6)proxy:代理类;
(7)Aspect(切面):是切入点pointcut和通知advice的结合。
(8)前置通知(@Before):在我们执行目标方法之前运行;
(9)后置通知(@After):在目标方法执行结束之后运行,不管有没有抛出异常都会执行;
(10)返回通知(@AfterReturning):在我们的目标方法正常返回值后运行;
(11)异常通知(@AfterThrowing):在我们的目标方法出现异常后运行;
(12)环绕通知(@Around):基于动态代理,需要手动调用 joinPoint.proceed() 来执行目标方法;proceed() 之前的逻辑相当于前置通知,proceed() 之后的逻辑相当于后置通知。
二、自定义注解@ActivityMqProducer
任何带有该注解的方法,在正常返回之后都会发送MQ消息。
package com.community.annotations;
import java.lang.annotation.*;
/**
 * Marker annotation: a method carrying it is selected for sending an
 * activity MQ message.
 *
 * <p>The annotation has no members; its presence alone is the signal.
 *
 * @author kanlina
 * @version 1.0
 * @date 2022/9/1 2:10 PM
 */
@Documented
@Retention(RetentionPolicy.RUNTIME) // retained so it is visible at run time
@Target({ElementType.METHOD, ElementType.PARAMETER}) // may be placed on methods and on parameters
public @interface ActivityMqProducer {
}
三、AOP设置
package com.community.aop;
import com.alibaba.fastjson.JSONObject;
import com.community.mq.producer.activityPush.ActivityPushLocal;
import com.marketingCommunityApi.dto.response.CommonModel;
import com.ruubypay.miss.common.response.CommonResponse;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author kanlina
* @version 1.0
* @description: 活动mq切入点
* @date 2022/9/1 下午2:15
*/
@Aspect
@Component
@Slf4j
public class ActivityMqProducerAspect {
@Resource
ActivityPushLocal activityPushLocal;
@Pointcut(value = "@annotation(com.community.annotations.ActivityMqProducer)")
public void serviceAspect() {
}
/**
* 正常返回通知,拦截service层记录用户正常的日志
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@AfterReturning(returning = "returnValue", pointcut = "serviceAspect()")
public void doAfter(JoinPoint joinPoint, Object returnValue) throws Exception {
log.info("aop处理activityMq开始");
String name = joinPoint.getSignature().getName();
System.out.println(name);
//发送mq
//获取返回值
CommonResponse response = (CommonResponse) returnValue;
CommonModel commonModel = JSONObject.parseObject(JSONObject.parseObject(response.getResData().toString()).getString("CommonModel"), CommonModel.class);
activityPushLocal.pushMessageLocal(commonModel);
}
}
四、MQ配置
- 配置消费者和topic:在阿里云「消息队列RocketMQ版」控制台创建Topic和Group
- 生产者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Producer for pushing community activity operations. -->
<!-- ONS ProducerBean: Spring calls start() on initialisation and shutdown() on destruction (see init-method / destroy-method). -->
<!-- All #{messagePushGroup[...]} values are looked up from a bean named messagePushGroup, which is not defined in this file. -->
<bean id="activityPushProducerRef" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
<!-- The Spring integration accepts every configuration item supported by the Java SDK. -->
<property name="properties" >
<map>
<!-- Producer channel id, i.e. a name given to this producer; this kind of setting may be kept in cloud-hosted configuration. -->
<entry key="ProducerId" value="#{messagePushGroup['activityPushProducerId']}"/>
<!-- The two keys that prove your identity to Alibaba Cloud; producer and consumer must use the same pair. Keeping them in a local configuration file is recommended (they are not changed once live). -->
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
</bean>
<!-- Application-level wrapper that binds the configured topic to the ONS producer bean declared above. -->
<bean id="activityPushProducer" class="com.community.mq.producer.PushCenterProducer">
<property name="topic" value="#{messagePushGroup['activityPushTopicId']}"/>
<property name="producer" ref="activityPushProducerRef"/>
</bean>
</beans>
- 生产者配置
package com.community.mq.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
/**
 * Message-push producer: wraps an ONS {@link Producer} and publishes text
 * messages to one configured topic.
 *
 * <p>{@code topic} and {@code producer} are injected through the setters.
 *
 * @author kanlina
 * @version 1.0
 * @date 2022/8/30 10:51 AM
 */
public class PushCenterProducer {

    private static final Logger logger = LoggerFactory.getLogger(PushCenterProducer.class);

    /**
     * Message channel (topic). Producer and consumer must use the same topic
     * and must be able to reach each other over the network.
     */
    private String topic;

    private Producer producer;

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    /**
     * Publishes one message to the configured topic.
     *
     * <p>Sending is best-effort: any exception raised by the send call is
     * logged and not rethrown, so the caller is not told about a failed send.
     *
     * @param message message content; encoded as UTF-8 bytes to form the message body
     * @param tag     tag attached to the message. A consumer can consume only the
     *                tags it asks for; a consumer whose tag expression is {@code *}
     *                receives every tag of the topic. Tags are normally used for
     *                message filtering.
     * @param key     business key set on the message; should be globally unique
     *                where possible (for example a UUID, a UUID plus timestamp, or
     *                a snowflake id) so the message can be looked up and re-sent
     *                from the MQ console, and so consumers can use it for
     *                de-duplication. Per the original author's note, leaving it
     *                unset does not affect normal delivery.
     * @throws UnsupportedEncodingException never thrown by the current implementation;
     *         retained in the signature so existing callers keep compiling
     */
    public void pushMessage(String message, String tag, String key) throws UnsupportedEncodingException {
        // The body may be arbitrary binary data that MQ does not interpret; producer and
        // consumer must agree on the serialisation. Here the text is always encoded as
        // UTF-8, using the charset constant instead of a charset name looked up at run time.
        byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message msg = new Message(topic, tag, body);
        msg.setKey(key);
        // Original author's note: the default API sends synchronously, and a send that
        // does not throw is considered successful.
        try {
            SendResult sendResult = producer.send(msg);
            logger.info("消息管理 推送消息成功 返回参数:{}", sendResult);
        } catch (Exception e) {
            // Deliberate best-effort boundary: log and continue.
            logger.error("消息管理 推送消息异常:", e);
        }
    }
}
- 生产者信息发送
package com.community.mq.producer.activityPush;
import com.alibaba.fastjson.JSON;
import com.community.mq.producer.PushCenterProducer;
import com.marketingCommunityApi.dto.response.CommonModel;
import com.ruubypay.miss.cmsg.constants.PushType;
import com.ruubypay.miss.cmsg.dto.MqMsgDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
/**
 * Community activity push: serialises a model to JSON and publishes it through
 * the {@code activityPushProducer} bean.
 *
 * @author kanlina
 * @version 1.0
 * @date 2022/8/30 10:18 AM
 */
@Slf4j
@Component
public class ActivityPushLocal {

    @Resource(name = "activityPushProducer")
    private PushCenterProducer activityPushProducer;

    /**
     * Sends the given text as an MQ message tagged {@code activityPush}, keyed by
     * a freshly generated UUID with the dashes removed. Any exception is logged
     * and not rethrown.
     *
     * @param smsString the message content to send
     */
    @Async
    public void smsMessage(String smsString) {
        try {
            String messageKey = UUID.randomUUID().toString().replace("-", "");
            activityPushProducer.pushMessage(smsString, "activityPush", messageKey);
        } catch (Exception e) {
            log.error("调用活动推送异常", e);
        }
    }

    /**
     * Message-centre push (per the original note, shown only on the message page):
     * converts the model to a JSON string and passes it to
     * {@link #smsMessage(String)}, called directly on this instance.
     *
     * @param model the model to publish
     */
    @Async
    public void pushMessageLocal(CommonModel model) {
        log.debug("社区消息页,推送消息,推送内容:{}", model);
        String payload = JSON.toJSONString(model);
        smsMessage(payload);
    }
}
- 消费者消费
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="msgListener" class="com.ruubypay.marketing.mq.message.consumer.MqMessageListener"></bean> <!--Listener 配置-->
<!--行程记录消息监听器-->
<bean id="transRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqTransRecordListener"/>
<bean id="matchedListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatchedListener"/>
<bean id="matched4CMBListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatched4CMBListener"/>
<bean id="shopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqShopOrderListener"/>
<bean id="digitalShopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqDigitalShopOrderListener"/>
<bean id="anniversaryListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryListener"/>
<bean id="anniversaryOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryOrderListener"/>
<bean id="anniversaryBusListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryBusListener"/>
<bean id="cmbEquityListener" class="com.ruubypay.marketing.mq.message.consumer.MqCmbEquityListener"/>
<bean id="groundPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqGroundPushListener"/>
<bean id="iCBCActivityListener" class="com.ruubypay.marketing.mq.message.consumer.MqICBCActivityListener"/>
<bean id="couponUsingRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqCouponUsingRecordListener"/>
<bean id="payFissionListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayFissionListener"/>
<bean id="payChannelOperateListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayChannelListener"/>
<bean id="communityActivityPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqCommunityActivityPushListener"/>
<!-- When several CIDs (consumer group ids) subscribe to the same topic, several ConsumerBean instances can be created. -->
<!-- This consumer routes every message of the consumerTopic topic to msgListener, using up to 50 consume threads (ConsumeThreadNums). -->
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!-- consumer configuration -->
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['mq_groupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
<entry key="ConsumeThreadNums" value="50"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="msgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['consumerTopic']}"/>
<property name="expression" value="*"/>
<!-- expression is the tag filter: it can list specific tags, e.g. taga||tagb||tagc, or be set to *. The * only means "subscribe to all tags"; wildcard patterns are not supported. -->
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="transRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['transRecordGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="transRecordListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="shopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['shopOrderGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="shopOrderListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="digitalShopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['digitalShopOrderGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="digitalShopOrderListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="matchedTripConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['matchedTripGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="matchedListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="matchedTrip4CMBConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['matchedTrip4CMBGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="matched4CMBListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="anniversaryConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['anniversary2GroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="anniversaryListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="anniversaryOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['anniversary2OrderGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="anniversaryOrderListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="anniversaryBusConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['anniversary2BusGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="anniversaryBusListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="cmbEquityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['cmbEquityGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="cmbEquityListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['cmbEquityTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="groundPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['groundPushGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="groundPushListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<!-- 工行信用卡活动-监听办卡-->
<bean id="iCBCActivityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['iCBCActivityGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="iCBCActivityListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['iCBCActivityTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<!-- 工行信用卡活动-监听使用卡券-->
<bean id="couponUsingRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['couponUsingRecordGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="couponUsingRecordListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['couponUsingRecordTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<!-- pay裂变活动-->
<bean id="payFissionConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['payFissionGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="payFissionListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<bean id="payChannelOperateConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<entry key="GROUP_ID" value="#{messagePushGroup['payChannelOperateGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="payChannelOperateListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['payChannelOperateTopic']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
<!-- Consumer for community activity push messages: subscribes to the communityActivityPushTopicId topic with tag expression * and hands each message to communityActivityPushListener. -->
<!-- Started and stopped by Spring through init-method="start" / destroy-method="shutdown". -->
<bean id="communityActivityPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" >
<map>
<!-- Consumer group id and the Alibaba Cloud credentials, all read from the messagePushGroup bean (defined outside this file). -->
<entry key="GROUP_ID" value="#{messagePushGroup['communityActivityPushGroupId']}"/>
<entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
<entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
</map>
</property>
<property name="subscriptionTable">
<map>
<!-- Map key = the subscription (topic + tag expression); map value = the listener that receives matching messages. -->
<entry value-ref="communityActivityPushListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="#{messagePushGroup['communityActivityPushTopicId']}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
</map>
</property>
</bean>
</beans>
- 消费者业务监听
package com.ruubypay.marketing.mq.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.marketingCommunityApi.dto.response.CommonModel;
import com.ruubypay.log.annotation.LogMarker;
import com.ruubypay.marketing.mq.model.PayChannelInfo;
import com.ruubypay.marketing.mq.service.ActivityCommunityPushService;
import com.ruubypay.marketing.mq.service.ActivityPayChannelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * MQ listener for community operation notifications.
 *
 * <p>Each message body is parsed as a {@code CommonModel} and passed to
 * {@code ActivityCommunityPushService.dealRequest}. A message is committed
 * when handling completes and scheduled for re-consumption when any exception
 * is thrown while parsing or handling it.
 *
 * @author KanLina
 * @date 2022/9/01 2:50 PM
 */
@Slf4j
public class MqCommunityActivityPushListener implements MessageListener {

    @Autowired
    private ActivityCommunityPushService activityCommunityPushService;

    /**
     * Consumes one message.
     *
     * @param message the received message
     * @param context the consume context (not used here)
     * @return {@code Action.CommitMessage} on success, {@code Action.ReconsumeLater} on any exception
     */
    @LogMarker(businessDescription = "社区活动操作通知")
    @Override
    public Action consume(Message message, ConsumeContext context) {
        log.debug("收到消息,Topic:{},MsgID:{},开始执行[{}]", message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
        if (!handle(message)) {
            return Action.ReconsumeLater;
        }
        log.debug("收到消息,Topic:{},MsgID:{},处理成功[{}]", message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
        return Action.CommitMessage;
    }

    /**
     * Parses the message body and delegates it to the service.
     *
     * @param message the received message
     * @return {@code true} when handling finished without an exception, {@code false} otherwise
     */
    private boolean handle(Message message) {
        try {
            byte[] body = message.getBody();
            CommonModel request = JSONObject.parseObject(body, CommonModel.class);
            activityCommunityPushService.dealRequest(request);
            return true;
        } catch (Exception e) {
            log.error("执行异常", e);
            return false;
        }
    }
}
?
|