1.mc-competence-mq模块分析
初始化延迟队列:
package com.hst.mc.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for delayed course messages.
 *
 * <p>Two durable queues and two durable direct exchanges are declared. The delay
 * queue is created with dead-letter arguments that point at the immediate
 * exchange and routing key, so messages dead-lettered from the delay queue are
 * re-routed towards the immediate queue.
 *
 * <p>NOTE(review): no queue-level TTL is configured here; presumably publishers
 * set a per-message expiration — confirm against the sending code.
 */
@Configuration
@Slf4j
public class RabbitDelayConfiguration {

    /** Name of the queue bound to {@link #IMMEDIATE_EXCHANGE}. */
    public static final String IMMEDIATE_QUEUE = "course_immediate_queue";

    /** Name of the exchange that dead letters from the delay queue are sent to. */
    public static final String IMMEDIATE_EXCHANGE = "exchange.course.immediate";

    /** Routing key binding {@link #IMMEDIATE_QUEUE} to {@link #IMMEDIATE_EXCHANGE}. */
    public static final String IMMEDIATE_ROUTING_KEY = "routingkey.course.immediate";

    /** Name of the queue that carries the dead-letter arguments. */
    public static final String DELAY_QUEUE = "course_delay_queue";

    /** Name of the exchange that {@link #DELAY_QUEUE} is bound to. */
    public static final String DEAD_LETTER_EXCHANGE = "exchange.course.delay";

    /** Routing key binding {@link #DELAY_QUEUE} to {@link #DEAD_LETTER_EXCHANGE}. */
    public static final String DELAY_ROUTING_KEY = "routingkey.course.delay";

    /**
     * Declares the durable immediate queue.
     *
     * @return the immediate queue definition
     */
    @Bean
    public Queue immediateQueue() {
        final boolean durable = true;
        return new Queue(IMMEDIATE_QUEUE, durable);
    }

    /**
     * Declares the durable, non-exclusive, non-auto-delete delay queue whose
     * dead letters go to the immediate exchange with the immediate routing key.
     *
     * @return the delay queue definition
     */
    @Bean
    public Queue delayQueue() {
        final Map<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put("x-dead-letter-exchange", IMMEDIATE_EXCHANGE);
        deadLetterArgs.put("x-dead-letter-routing-key", IMMEDIATE_ROUTING_KEY);

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(DELAY_QUEUE, durable, exclusive, autoDelete, deadLetterArgs);
    }

    /**
     * Declares the durable, non-auto-delete immediate exchange.
     *
     * @return the immediate exchange definition
     */
    @Bean
    public DirectExchange immediateExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(IMMEDIATE_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the durable, non-auto-delete exchange the delay queue is bound to.
     *
     * @return the delay-side exchange definition
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(DEAD_LETTER_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the immediate queue to the immediate exchange.
     *
     * @return the binding keyed by {@link #IMMEDIATE_ROUTING_KEY}
     */
    @Bean
    public Binding immediateBinding() {
        final Queue queue = immediateQueue();
        final DirectExchange exchange = immediateExchange();
        return BindingBuilder.bind(queue).to(exchange).with(IMMEDIATE_ROUTING_KEY);
    }

    /**
     * Binds the delay queue to the delay-side exchange.
     *
     * @return the binding keyed by {@link #DELAY_ROUTING_KEY}
     */
    @Bean
    public Binding delayBinding() {
        final Queue queue = delayQueue();
        final DirectExchange exchange = deadLetterExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY);
    }
}
初始化队列:
package com.hst.mc.mq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hst.mc.mq.constant.ChannelConstants;
/**
 * Declares one {@link Queue} bean per channel name listed in
 * {@link ChannelConstants}.
 *
 * <p>Every queue is created with the single-argument {@code Queue} constructor,
 * so whatever defaults Spring AMQP applies to that constructor are in effect.
 */
@Configuration(proxyBeanMethods = false)
public class RabbitQueueConfiguration {

    /** @return queue named {@link ChannelConstants#CONFERENCE_QUEUE} */
    @Bean
    public Queue conferenceQueue() {
        return queueNamed(ChannelConstants.CONFERENCE_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#TERMINAL_QUEUE} */
    @Bean
    public Queue terminalQueue() {
        return queueNamed(ChannelConstants.TERMINAL_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#FSP_QUEUE} */
    @Bean
    public Queue fspQueue() {
        return queueNamed(ChannelConstants.FSP_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#PAAS_QUEUE} */
    @Bean
    public Queue paasQueue() {
        return queueNamed(ChannelConstants.PAAS_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#RESCENTER_QUEUE} */
    @Bean
    public Queue rescQueue() {
        return queueNamed(ChannelConstants.RESCENTER_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#COMPETENCE_QUEUE} */
    @Bean
    public Queue competenceQueue() {
        return queueNamed(ChannelConstants.COMPETENCE_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#FSP_COMPETENCE_QUEUE} */
    @Bean
    public Queue fspCompetenceQueue() {
        return queueNamed(ChannelConstants.FSP_COMPETENCE_QUEUE);
    }

    /** @return queue named {@link ChannelConstants#NOTIFY_QUEUE} */
    @Bean
    public Queue notifyQueue() {
        return queueNamed(ChannelConstants.NOTIFY_QUEUE);
    }

    /** Single place where the queue definitions are instantiated. */
    private static Queue queueNamed(String queueName) {
        return new Queue(queueName);
    }
}
接口定义常量:
package com.hst.mc.mq.constant;
/**
 * Names of the message queues used by the mq module.
 *
 * <p>Kept as an interface so existing references (and any implementors) keep
 * compiling; all members are implicitly {@code public static final}.
 */
public interface ChannelConstants {

    /** Queue name {@code "competence_queue"}. */
    String COMPETENCE_QUEUE = "competence_queue";

    /** Queue name {@code "conference_queue"}. */
    String CONFERENCE_QUEUE = "conference_queue";

    /** Queue name {@code "fsp_queue"}. */
    String FSP_QUEUE = "fsp_queue";

    /** Queue name {@code "fsp_competence_queue"}. */
    String FSP_COMPETENCE_QUEUE = "fsp_competence_queue";

    /** Queue name {@code "notify_queue"}. */
    String NOTIFY_QUEUE = "notify_queue";

    /** Queue name {@code "paas_queue"}. */
    String PAAS_QUEUE = "paas_queue";

    /** Queue name {@code "rescenter_queue"}. */
    String RESCENTER_QUEUE = "rescenter_queue";

    /** Queue name {@code "terminal_queue"}. */
    String TERMINAL_QUEUE = "terminal_queue";
}
package com.hst.mc.mqConsumer;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.XmlUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hst.mc.competence.api.consts.CompetenceCloudCode;
import com.hst.mc.competence.api.exception.CompetenceCloudException;
import com.hst.mc.mqConsumer.Bo.MsgBo;
import com.hst.mc.mqConsumer.Bo.XmlTransBo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
@Component
@Slf4j
public class QueueMsgRecevierHandler {
private static final String CONFIG_BEAN_XML = "mqmsg-bean-handler.xml";
private static final String EXECUTOR_XML_ELEMENT = "excutor";
private static final String ATTR_MSGID = "msgId";
private static final String ATTR_BEANNAME = "beanName";
private static final String ATTR_METHOD = "method";
private static final String ATTR_PARAMCLASS = "paramClass";
private static HashMap<Integer, XmlTransBo> xmlTransBoHashMap;
static {
xmlTransBoHashMap = geneXmlTransBoHashMap();
}
private static HashMap<Integer, XmlTransBo> geneXmlTransBoHashMap() {
Document document = null;
try {
SAXReader saxReader = new SAXReader();
document = saxReader.read(QueueMsgRecevierHandler.class.getClassLoader()
.getResourceAsStream(CONFIG_BEAN_XML));
List<Element> elements = document.getRootElement().elements(EXECUTOR_XML_ELEMENT);
HashMap<Integer, XmlTransBo> xmlTransBoHashMap = elements.stream().map(element -> {
XmlTransBo bo = new XmlTransBo();
bo.setBeanName(element.attribute(ATTR_BEANNAME).getValue());
bo.setParamClass(element.attribute(ATTR_PARAMCLASS).getValue());
bo.setMethod(element.attribute(ATTR_METHOD).getValue());
bo.setMsgId(Integer.valueOf(element.attribute(ATTR_MSGID).getValue()));
return bo;
}).collect(Collectors.toMap(XmlTransBo::getMsgId, Function.identity(), (n1, n2) -> n1, HashMap::new));
log.info("init {} to hashMap success", CONFIG_BEAN_XML);
return xmlTransBoHashMap;
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
@SneakyThrows
public Object handlerMsg(MsgBo bo) {
try {
log.info("处理收到消息bo.msgId:{}", bo.getMsgId());
Integer msgId = bo.getMsgId();
JSONObject dataObject = bo.getData();
String token = bo.getToken();
dataObject.set("token", token);
XmlTransBo xmlTransBo = xmlTransBoHashMap.get(msgId);
Class paramClass = Class.forName(xmlTransBo.getParamClass());
Object reflectBean = SpringUtil.getBean(xmlTransBo.getBeanName());
Object resObj = ReflectUtil.invoke(reflectBean, xmlTransBo.getMethod(),
new Object[]{dataObject.toBean(paramClass)});
log.info("处理收到消息bo.msgId:{}, 反射返回的消息体:{}", msgId, JSONUtil.toJsonStr(resObj));
return resObj;
} catch (Exception ex) {
log.error("反射处理消息异常", ex);
throw new CompetenceCloudException(CompetenceCloudCode.HANDLE_MSG_ERROR);
}
}
}
下面的配置指定各执行器(excutor)从哪个队列(worker)上获取对应 msgId 的消息。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Executor/worker routing table.

  Each <excutor-worker> entry carries three attributes:
    excutor : identifier of the executor that handles the message
              (presumably a bean or handler name; confirm against the loader).
    worker  : queue name; the values used here (competence_queue, notify_queue,
              terminal_queue, conference_queue) match constants in ChannelConstants.
    msgId   : numeric id of the message the entry applies to.

  NOTE(review): the spelling "excutor" is presumably what the loading code reads;
  do not correct it here without changing the reader as well.
-->
<excutor-workers>
<excutor-worker excutor="cloudListExcutor" worker="competence_queue" msgId="1004"/>
<excutor-worker excutor="cloudOrgExcutor" worker="competence_queue" msgId="1006"/>
<excutor-worker excutor="cloudDevListExcutor" worker="competence_queue" msgId="1008"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1010"/>
<excutor-worker excutor="terRegisterExcutor" worker="competence_queue" msgId="1102"/>
<excutor-worker excutor="terLogoutExcutor" worker="competence_queue" msgId="1104"/>
<excutor-worker excutor="terStatusReportExcutor" worker="competence_queue" msgId="1106"/>
<excutor-worker excutor="queryDevMsgExcutor" worker="competence_queue" msgId="1108"/>
<excutor-worker excutor="terUpdateExcutor" worker="competence_queue" msgId="1110"/>
<excutor-worker excutor="call" worker="competence_queue" msgId="1601"/>
<excutor-worker excutor="call" worker="notify_queue" msgId="1409"/>
<excutor-worker excutor="call" worker="notify_queue" msgId="1211"/>
<excutor-worker excutor="record" worker="competence_queue" msgId="1402"/>
<excutor-worker excutor="record" worker="competence_queue" msgId="1404"/>
<excutor-worker excutor="record" worker="competence_queue" msgId="1408"/>
<excutor-worker excutor="record" worker="competence_queue" msgId="1406"/>
<excutor-worker excutor="demand" worker="competence_queue" msgId="1702"/>
<excutor-worker excutor="demand" worker="competence_queue" msgId="1704"/>
<excutor-worker excutor="demand" worker="competence_queue" msgId="1706"/>
<excutor-worker excutor="demand" worker="competence_queue" msgId="1708"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1302"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1304"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1306"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1308"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1310"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1312"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1314"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1316"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1318"/>
<excutor-worker excutor="live" worker="competence_queue" msgId="1324"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2001"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2003"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2103"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2105"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2107"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2201"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2203"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2205"/>
<excutor-worker excutor="assistant" worker="terminal_queue" msgId="2207"/>
<excutor-worker excutor="syncAddRoom" worker="conference_queue" msgId="1202"/>
<excutor-worker excutor="grantRoomTerminals" worker="conference_queue" msgId="1204"/>
<excutor-worker excutor="grantTerminalRooms" worker="conference_queue" msgId="1206"/>
<excutor-worker excutor="syncDeleteRoom" worker="conference_queue" msgId="1208"/>
<excutor-worker excutor="findRoomInfo" worker="conference_queue" msgId="1210"/>
<excutor-worker excutor="updateRoomInfo" worker="conference_queue" msgId="1214"/>
<excutor-worker excutor="closeRoom" worker="conference_queue" msgId="1216"/>
</excutor-workers>
2.mc-conference模块分析
package com.hst.mc.mcu.api.enums;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Kinds of MCU data, each with a numeric index, a short code, a display name and
 * the name of the table it is associated with.
 */
@Getter
@AllArgsConstructor
public enum DataTypeEnum {
    /** Organisation / department. */
    DEPT(0, "dept", "机构", "mcu_dept"),
    /** User. */
    USER(1, "user", "用户", "mcu_user"),
    /** Meeting room. */
    ROOM(2, "room", "会议室", "mcu_room"),
    /** Meeting. */
    MEETING(3, "meeting", "会议", "mcu_meeting");

    /** Numeric identifier used by {@link #getByIndex(Integer)}. */
    private final Integer index;
    /** Short code. */
    private final String code;
    /** Display name. */
    private final String name;
    /** Associated table name. */
    private final String tableName;

    /**
     * Finds the constant whose index equals the given value.
     *
     * @param index the index to look up; may be {@code null}
     * @return the matching constant, or {@code null} when none matches
     */
    public static DataTypeEnum getByIndex(Integer index) {
        // Compare by value: '==' on two Integer objects compares references and
        // only happens to work for instances coming from the boxing cache.
        return Stream.of(DataTypeEnum.values())
                .filter(x -> x.getIndex().equals(index))
                .findAny()
                .orElse(null);
    }
}
package com.hst.mc.mcu.job;
import java.time.LocalDateTime;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.Objects;
import com.hst.mc.mcu.McuLiveConfig;
import com.hst.mc.mcu.api.entity.McuLive;
import com.hst.mc.mcu.api.entity.McuRoom;
import com.hst.mc.mcu.api.enums.RoomTypeEnum;
import com.hst.mc.mcu.mapper.McuRoomMapper;
import com.hst.mc.mcu.service.McuLiveService;
import com.hst.mc.mcu.service.McuRoomService;
import com.pig4cloud.pig.admin.api.entity.AbilityCloudConfig;
import com.pig4cloud.pig.common.core.constant.CacheConstants;
import com.pig4cloud.pig.common.core.constant.enums.LiveTypeEnum;
import com.pig4cloud.pig.common.core.util.RedisUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
/**
 * XXL-JOB task that creates live entries for rooms that do not have one yet,
 * according to the live configuration stored in Redis.
 */
@Component
public class McuRoomLiveJob {
    /**
     * Divisor used to bound the number of batch iterations in {@link #initLive}.
     * NOTE(review): presumably the page size of {@code findNoLiveRoomByLiveType} — confirm.
     */
    private static final int BATCH_SIZE = 1000;

    @Resource
    private McuRoomService mcuRoomService;
    @Resource
    private McuRoomMapper mcuRoomMapper;
    @Resource
    private McuLiveService mcuLiveService;
    @Resource
    private McuLiveConfig mcuLiveConfig;
    @Resource
    private RedisUtils redisUtils;

    /**
     * Job entry point. Reads the live configuration from Redis and, depending on
     * its vender id, initialises public lives, private lives, or both. Public
     * lives are skipped (with a job-log message) when no vender secret is set.
     */
    @XxlJob("initRoomLive")
    public void initRoomLive() {
        XxlJobHelper.log("init room live start");
        AbilityCloudConfig config = (AbilityCloudConfig) redisUtils.hget(CacheConstants.ABILITY_CLOUD,
                CacheConstants.LIVE_CONFIG);
        if (Objects.isNull(config)) {
            XxlJobHelper.log("no live config !!!");
            return;
        }
        XxlJobHelper.log("current live config is {}", JSONUtil.toJsonStr(config));
        if (StrUtil.equals(config.getVenderId(), LiveTypeEnum.UNSUPPORTED.getRemark())) {
            XxlJobHelper.log("live Type is none");
        } else if (StrUtil.equals(config.getVenderId(), LiveTypeEnum.PUB.getRemark())) {
            if (missingAccessToken(config)) {
                return;
            }
            initLive(LiveTypeEnum.PUB);
        } else if (StrUtil.equals(config.getVenderId(), LiveTypeEnum.PRI.getRemark())) {
            initLive(LiveTypeEnum.PRI);
        } else if (StrUtil.equals(config.getVenderId(), LiveTypeEnum.PUB_PRI.getRemark())) {
            // Private lives need no secret, so they are initialised before the check.
            initLive(LiveTypeEnum.PRI);
            if (missingAccessToken(config)) {
                return;
            }
            initLive(LiveTypeEnum.PUB);
        }
        XxlJobHelper.log("init room live end");
    }

    /** Returns true, after writing a job-log message, when the vender secret is blank. */
    private boolean missingAccessToken(AbilityCloudConfig config) {
        if (StrUtil.isBlank(config.getVenderSecret())) {
            XxlJobHelper.log("open live type do not configuration access token, please config first");
            return true;
        }
        return false;
    }

    /**
     * Processes the rooms without a live of the given type in batches. The loop
     * stops as soon as a query returns no rooms, and runs at most
     * {@code count / BATCH_SIZE + 1} times so that rooms which stay without a live
     * (skipped or failed) cannot keep it running forever.
     *
     * @param type the live type to initialise
     */
    private void initLive(LiveTypeEnum type) {
        String liveType = type.getRemark();
        XxlJobHelper.log("live type is {}", liveType);
        Integer count = mcuRoomService.findNoLiveRoomByLiveTypeCount(liveType);
        XxlJobHelper.log("to int live num is {}", count);
        // Guard against a null count instead of failing on auto-unboxing.
        int total = Objects.isNull(count) ? 0 : count;
        // Same bound the original int division produced; Math.ceil on it was a no-op.
        int maxBatches = total / BATCH_SIZE + 1;
        for (int idx = 0; idx < maxBatches; idx++) {
            List<McuRoom> roomList = mcuRoomService.findNoLiveRoomByLiveType(liveType);
            if (CollectionUtils.isEmpty(roomList)) {
                break;
            }
            XxlJobHelper.log("batch index is {}", idx);
            // NOTE(review): processRoom writes to the XXL-JOB log from parallel-stream
            // worker threads; confirm the job log context is visible on those threads,
            // otherwise the failure messages may not reach the job log.
            roomList.parallelStream().forEach(mcuRoom -> processRoom(mcuRoom, liveType));
        }
    }

    /**
     * Adds a live for one room and refreshes the room's update time. Rooms of type
     * USER are left untouched when the live type is PUB. A failure while adding the
     * live is written to the job log and does not prevent the update-time refresh.
     */
    private void processRoom(McuRoom mcuRoom, String liveType) {
        boolean pubLiveOnUserRoom = liveType.equals(LiveTypeEnum.PUB.getRemark())
                && mcuRoom.getRoomType().equals(RoomTypeEnum.USER.getValue());
        if (pubLiveOnUserRoom) {
            return;
        }
        try {
            addRoomLive(mcuRoom, liveType);
        } catch (Exception ex) {
            XxlJobHelper.log("room{} add {} live fail: {}", mcuRoom.getMcuRoomId(), liveType,
                    ex.getMessage());
        }
        mcuRoom.setUpdateTime(LocalDateTime.now());
        mcuRoomMapper.updateById(mcuRoom);
    }

    /**
     * Creates the live for a room, or — when one already exists for the room and
     * live type — fills in its room name if that is blank and saves it.
     *
     * @param mcuRoom  the room to create the live for
     * @param liveType the live type remark
     */
    private void addRoomLive(McuRoom mcuRoom, String liveType) {
        McuLive mcuLiveExists = mcuLiveService.findByRoomId(mcuRoom.getMcuRoomId().intValue(), liveType);
        if (Objects.nonNull(mcuLiveExists)) {
            mcuLiveExists.setRoomName(StrUtil.isNotBlank(mcuLiveExists.getRoomName())
                    ? mcuLiveExists.getRoomName() : mcuRoom.getRoomName());
            mcuLiveService.saveOrUpdate(mcuLiveExists);
            return;
        }
        McuLive mcuLive = new McuLive();
        mcuLive.setConfRoomId(mcuRoom.getMcuRoomId().intValue());
        mcuLive.setLiveName(mcuRoom.getRoomName());
        mcuLive.setRoomName(mcuRoom.getRoomName());
        mcuLive.setAnchorName(mcuRoom.getRoomName());
        mcuLive.setLiveType(liveType);
        mcuLiveService.addLive(mcuLive);
    }
}
2.响应实体类
package com.pig4cloud.pig.common.core.util;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.pig4cloud.pig.common.core.constant.CommonConstants;
import com.pig4cloud.pig.common.core.exception.McRuntimeException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Objects;
/**
 * Generic response envelope carrying a status code, a message and an optional
 * payload. Instances are normally obtained through the static factory methods.
 *
 * @param <T> type of the payload
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class R<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Status code of the response. */
    private int code;

    /** Message accompanying the status code. */
    private String msg;

    /** Payload; may be {@code null}. */
    private T data;

    /** @return a success response without payload and with the default success message */
    public static <T> R<T> ok() {
        return restResult(null, CommonConstants.SUCCESS, CommonConstants.SUCCESS_MSG);
    }

    /** @return a success response with the given payload and the default success message */
    public static <T> R<T> ok(T data) {
        return restResult(data, CommonConstants.SUCCESS, CommonConstants.SUCCESS_MSG);
    }

    /** @return a success response with the given payload and message */
    public static <T> R<T> ok(T data, String msg) {
        return restResult(data, CommonConstants.SUCCESS, msg);
    }

    /** @return a failure response without payload and with the default failure message */
    public static <T> R<T> failed() {
        return restResult(null, CommonConstants.FAIL, CommonConstants.FAIL_MSG);
    }

    /** @return a failure response without payload and with the given message */
    public static <T> R<T> failed(String msg) {
        return restResult(null, CommonConstants.FAIL, msg);
    }

    /** @return a failure response with the given payload and the default failure message */
    public static <T> R<T> failed(T data) {
        return restResult(data, CommonConstants.FAIL, CommonConstants.FAIL_MSG);
    }

    /** @return a failure response with the given payload and message */
    public static <T> R<T> failed(T data, String msg) {
        return restResult(data, CommonConstants.FAIL, msg);
    }

    /** @return a failure response using the exception's own error code and error message */
    public static <T> R<T> failedWithException(McRuntimeException mcException) {
        return restResult(null, mcException.getErrCode(), mcException.getErrMsg());
    }

    /** @return a failure response with the generic failure code and the exception's message */
    public static <T> R<T> failedWithException(Exception exception) {
        return restResult(null, CommonConstants.FAIL, exception.getMessage());
    }

    /** Single construction point shared by all factory methods. */
    private static <T> R<T> restResult(T data, int code, String msg) {
        final R<T> result = new R<>();
        result.code = code;
        result.data = data;
        result.msg = msg;
        return result;
    }

    /**
     * Tells whether the code equals the success constant. Excluded from JSON
     * output via {@code @JsonIgnore}.
     *
     * @return {@code true} when this response carries the success code
     */
    @JsonIgnore
    public boolean isSuccess() {
        final int current = this.getCode();
        return Objects.equals(CommonConstants.SUCCESS, current);
    }
}
5.异步调用
package com.hst.mc.terminal.async;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Spring component grouping terminal operations that are marked for
 * asynchronous execution.
 */
@Component
public class TerminalAsync {
/**
 * Asynchronous hook for a terminal state change. The body is empty in this
 * excerpt, so the call currently has no effect.
 * NOTE(review): {@code @Async} presumably relies on async support being enabled
 * elsewhere in the application — confirm.
 *
 * @param terminalId identifier of the terminal
 * @param status     new status value
 */
@Async
public void changeTerminalState(String terminalId, int status) {
}
}
异步调用
|