IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMq消息消费通用模板 -> 正文阅读

[大数据]RabbitMq消息消费通用模板

通过配置通用消费代码模板、扫描指定的方法注解 、反射,与具体的service进行剥离减少代码变更的次数,提供简单的配置,即可完成消息的消费

具体的注解定义?

package com.xes.customer.annotation;

import com.xes.enums.BusinessTypeEnum;

import java.lang.annotation.*;

/**
 * @author lzl
 * @date: 2021-07-29 14:53
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqBusinessProcessing {

    /**
     * 实体对象处理的业务类型
     * 不指定默认值
     * 指定默认值
     * @return
     */
    BusinessTypeEnum businessType();

}

枚举对象定义?

package com.xes.enums;

import org.apache.commons.lang3.StringUtils;

/**
 * @author lzl
 * @date: 2021-04-06 15:17
 */
public enum BusinessTypeEnum {

    TEST_MQ("TEST_MQ","测试mq通道畅通"),
    TEST_MQ_NEW("TEST_MQ_NEW","测试mq通道畅通"),

    ;

    private String type;

    private String desc;

    BusinessTypeEnum(String type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    public String getType() {
        return type;
    }

    public String getDesc() {
        return desc;
    }

    public static BusinessTypeEnum getByType(String type){
        if(StringUtils.isEmpty(type)){
            return null;
        }
        for(BusinessTypeEnum typeEnum : BusinessTypeEnum.values()){
            if(typeEnum.getType().equals(type)){
                return typeEnum;
            }
        }
        return null;
    }
}

1、监听器配置?

package com.xes.consumer;

import com.rabbitmq.client.Channel;
import com.xes.config.mq.MqConstantConfig;
import com.xes.web.service.inter.ConsumerServiceAbstractTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * MQ消息消费监控者
 * @author lzl
 * @date: 2021-04-06 13:28
 */
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MessageConsumer {

    /**
     * 处理流程模板对象
     */
    private final ConsumerServiceAbstractTemplate consumerServiceAbstractTemplate;

    @RabbitHandler
    @RabbitListener(queues = {MqConstantConfig.OTHER})
    public void otherInfoMessageConsumer(Channel channel, Message message){
        consumerServiceAbstractTemplate.consumer(channel,message);
    }

    @RabbitHandler
    @RabbitListener(queues = {MqConstantConfig.BURYING_POINT})
    public void buryingPointMessageConsumer(Channel channel, Message message){
        consumerServiceAbstractTemplate.consumer(channel,message);
    }

}

2、模板对象配置?

package com.xes.web.service.inter;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xes.common.ConstantStr;
import com.xes.web.entity.MqMessageRecord;
import com.xes.web.entity.mq.MqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import java.util.List;

/**
 * 定义消费的处理流程模板
 * @author lzl
 * @date: 2021-04-19 10:54
 */
@Slf4j
public abstract class ConsumerServiceAbstractTemplate {

    /**
     * 公共的消费消费
     * 正常情况处理流程
     * consumer() -> handleMessage()-> doConsumer()
     * 异常情况处理流程
     * consumer() -> handleMessage()-> doConsumer()-> handleException->()
     * @param channel
     * @param message
     */
    public void consumer(Channel channel, Message message){
        handleMessage(channel,message);
    }

    /**
     * 消息消费的处理流程
     * @param message
     * @param channel
     * @return
     */
    private void handleMessage (Channel channel,Message message){
        MessageProperties properties = message.getMessageProperties();
        long deliveryTagLong = properties.getDeliveryTag();
        String encoding = properties.getContentEncoding();
        if(StringUtils.isEmpty(encoding)){
            encoding = ConstantStr.UTF_EIGHT;
        }
        MqMessage mqMessage = null;
        String jsonStr = "";
        try {
            byte [] bodyByte = message.getBody();
            if(null == bodyByte || bodyByte.length == 0){
                channel.basicAck(deliveryTagLong,false);
                log.error("消息实体异常。message->{}",message);
                return;
            }
            jsonStr = new String(bodyByte, encoding);
            mqMessage = JSONObject.parseObject(jsonStr,MqMessage.class);
            log.info("mqMessage->{}",mqMessage);
            doConsumer(mqMessage);
            channel.basicAck(deliveryTagLong,false);
        }catch (Exception e){
            log.error("ConsumerServiceAbstractTemplate.jsonStr-{},mqMessage->{}",jsonStr,mqMessage);
            handleException(channel,deliveryTagLong,message,mqMessage,e);
        }
    }

    /**
     * 实际处理逻辑
     * 子类实现
     * @param mqMessage
     * @throws Exception
     */
    protected abstract void doConsumer(MqMessage mqMessage) throws Exception;

    /**
     * 异常处理模块
     * 统一实现 子类可以进行方法重写
     * @param channel
     * @param deliveryTagLong
     * @param mqMessage
     * @param e
     * @param message
     */
    protected abstract void handleException(Channel channel,Long deliveryTagLong,
                                            Message message,MqMessage mqMessage,Exception e);

    /**
     * DB消息处理
     * @param recordList
     */
    public void dbConsumer(List<MqMessageRecord> recordList){
        doDbConsumer(recordList);
    }

    /**
     * 实际处理DB数据
     * @param recordList
     */
    protected abstract void doDbConsumer(List<MqMessageRecord> recordList);

}

3、具体的消费消息、反射对象配置

package com.xes.web.service.impl;

import com.rabbitmq.client.Channel;
import com.xes.customer.annotation.MqBusinessProcessing;
import com.xes.enums.BusinessTypeEnum;
import com.xes.web.entity.MethodClass;
import com.xes.web.entity.MqMessageRecord;
import com.xes.web.entity.mq.MqMessage;
import com.xes.web.service.inter.ConsumerServiceAbstractTemplate;
import com.xes.web.service.inter.MqMessageRecordService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import org.springframework.amqp.core.Message;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Method;
import java.util.*;

/**
 * @author lzl
 * @date: 2021-04-19 10:58
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ConsumerServiceFactory extends ConsumerServiceAbstractTemplate {

    /**
     * 从SpringBean中获取指定对象
     */
    private final ApplicationContext applicationContext;

    private final MqMessageRecordService messageRecordService;

    /**
     * 接口定义以及实现类的路径
     */
    private static final String SERVICE_CLASS_PATH = "com.xes.web.service.impl";

    /**
     * 缓存已经加载的类
     */
    private static Set<Class> SERVICE_OBJ_CLASS = new HashSet<>();

    /**
     * 业务类型-方法-class的缓存
     */
    private static Map<String, MethodClass> BUSINESS_TYPE_METHOD_CLA = new HashMap<>();

    /**
     * 方法调用
     * @param mqMessage
     * @throws Exception
     */
    private void invokeMethod(MqMessage mqMessage) throws Exception{
        BusinessTypeEnum typeEnum = mqMessage.getType();
        String businessType = typeEnum.getType();
        if(BUSINESS_TYPE_METHOD_CLA.containsKey(businessType)){
            MethodClass methodClass = BUSINESS_TYPE_METHOD_CLA.get(businessType);
            Method method = methodClass.getMethod();
            Class cla = methodClass.getCla();
            Object result = method.invoke(applicationContext.getBean(cla),mqMessage);
            log.info("缓存反射调用结果。result->{}",result);
            return;
        }else {
            if(CollectionUtils.isEmpty(SERVICE_OBJ_CLASS)){
                // 扫描路径 加载指定对象 只执行一次
                Reflections reflections = new Reflections(SERVICE_CLASS_PATH);
                Set<Class<?>> setReflections =  reflections.getTypesAnnotatedWith(Service.class);
                SERVICE_OBJ_CLASS.addAll(setReflections);
            }
            for(Class<?> cla : SERVICE_OBJ_CLASS){
                Method[] methods = cla.getDeclaredMethods();
                for(Method method : methods){
                    MqBusinessProcessing declaredAnnotation = method.getDeclaredAnnotation(MqBusinessProcessing.class);
                    if(null != declaredAnnotation){
                        String type = declaredAnnotation.businessType().getType();
                        if(type.equals(typeEnum.getType())){
                            MethodClass methodClass = new MethodClass();
                            methodClass.setMethod(method);
                            methodClass.setCla(cla);
                            // 数据缓存
                            BUSINESS_TYPE_METHOD_CLA.put(type,methodClass);

                            // 方法执行
                            Object bean = applicationContext.getBean(cla);
                            Object result = method.invoke(bean,mqMessage);
                            log.info("首次 反射调用结果。result->{}",result);
                            // 匹配之后 循环结束
                            return;
                        }
                    }
                }
            }
        }
    }

    /**
     * 消费消息
     * @param mqMessage
     */
    @Override
    public void doConsumer(MqMessage mqMessage) throws Exception{
        invokeMethod(mqMessage);
    }

    @Override
    protected void handleException(Channel channel,Long deliveryTagLong,Message message,MqMessage mqMessage,Exception e) {
        try {
            log.error("消息消费异常。message->{},异常信息->",message,e);
            if(mqMessage != null){
                long messageId = mqMessage.getMessageId();
                // 检查未消费消息 在DB中是否存在
                List<MqMessageRecord> list = messageRecordService.listByMessageId(messageId);
                if(CollectionUtils.isEmpty(list)){
                    MqMessageRecord record = new MqMessageRecord();
                    record.setMessageId(messageId);
                    record.setMessageType(mqMessage.getType().getType());
                    record.setMessageBody(record.getMessageBody());
                    record.setCreatedTime(new Date());
                    messageRecordService.insert(record);
                }else {
                    log.info("消息已经存在DB中。message->{}",message);
                }
            }
            // 防止消息队列消息堆积
            channel.basicAck(deliveryTagLong,false);
        }catch (Exception e1){
            log.error("消息消费异常message->{},补偿异常,异常信息-> ",message,e1);
        }
    }

    /**
     * 处理DB数据
     * @param recordList
     */
    @Override
    protected void doDbConsumer(List<MqMessageRecord> recordList){
        List<Integer> idList = new ArrayList<>();
        for(MqMessageRecord record : recordList) {
            handleDbMessage(record);
            idList.add(record.getId());
        }
        // 批量删除数据
        if(!CollectionUtils.isEmpty(idList)){
            messageRecordService.deleteByIds(idList);
        }
    }

    /**
     * 对DB的实际操作
     * @param record
     */
    private void handleDbMessage(MqMessageRecord record){
        BusinessTypeEnum typeEnum = BusinessTypeEnum.getByType(record.getMessageType());
        if(null == typeEnum){
            log.error("定时任务执行数据补偿操作-数据异常。record->{}",record);
            return;
        }
        try {
            switch (typeEnum){
                default:break;
            }
        }catch (Exception e){
            log.error("定时任务执行数据补偿操作-执行异常。record->{},异常信息:",record,e);
        }
    }
}

4、具体service 被扫描类

package com.xes.web.service.impl;

import com.xes.customer.annotation.MqBusinessProcessing;
import com.xes.enums.BusinessTypeEnum;
import com.xes.web.entity.mq.MqMessage;
import com.xes.web.service.inter.TestCustomerInterfaceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author lzl
 * @date: 2021-07-29 15:03
 */
@Slf4j
@Service
public class TestCustomerInterfaceServiceImpl implements TestCustomerInterfaceService {

    @Override
    @MqBusinessProcessing(businessType = BusinessTypeEnum.TEST_MQ)
    public Object handler(MqMessage mqMessage) {
        return mqMessage;
    }

    @Override
    @MqBusinessProcessing(businessType = BusinessTypeEnum.TEST_MQ_NEW)
    public Object handlerTest(MqMessage mqMessage) {
        return mqMessage;
    }

}

5、代码中使用到的Reflections

 <dependency>
      <groupId>org.reflections</groupId>
      <artifactId>reflections</artifactId>
      <version>0.9.12</version>
 </dependency>

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-02 10:53:03  更:2021-08-02 10:54:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/22 4:29:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码