之前已经通过Nacos实现了Gateway的动态路由(参见《SpringCloud Gateway基于Nacos配置中心动态路由》),同时也抽取了Nacos的操作方法(参见《NacosConfig操作》)。但是每次新增Gateway项目时,仍然需要重复编写ApplicationEventPublisher相关的代码,于是考虑将这部分功能也抽取出来。
common-nacos-listener
pom
<!-- Dependencies of the common-nacos-listener module. Both are declared with the
     "provided" scope, so they are needed for compilation but are not passed on
     transitively; a project using this module must declare them itself. -->
<dependencies>
<!-- Spring Cloud Alibaba Nacos config starter. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<scope>provided</scope>
</dependency>
<!-- Core Spring Boot starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
定义Nacos监听接口
定义两个接口:NacosManualListener用于只需要监听配置变更的场景;NacosManualInitAndListener用于需要先拉取配置完成初始化、再监听后续变更的场景。
import java.util.Properties;
import java.util.function.Consumer;
/**
 * Describes one Nacos configuration entry that should be watched for changes.
 *
 * <p>An implementation names the entry (dataId and group) and supplies the callback
 * that receives the configuration text.
 */
public interface NacosManualListener {

    /**
     * @return the dataId of the configuration entry to watch
     */
    String getDataId();

    /**
     * @return the group of the configuration entry to watch
     */
    String getGroup();

    /**
     * @return the callback that is handed the configuration text
     */
    Consumer<String> refreshConsumer();

    /**
     * Connection properties to use for this entry.
     *
     * @return {@code null} by default, meaning no specific properties are supplied
     */
    default Properties getProperties() {
        return null;
    }

    /**
     * Whether the received configuration text should be logged.
     *
     * @return {@code true} by default
     */
    default boolean dataLog() {
        return true;
    }
}
/**
 * A {@link NacosManualListener} that additionally wants the current configuration
 * delivered once, through {@link #initConsumer()}, and declares how long fetching
 * that configuration may take.
 */
public interface NacosManualInitAndListener extends NacosManualListener {

    /** Timeout, in milliseconds, used when an implementation does not override {@link #getTimeout()}. */
    long DEFAULT_TIMEOUT = 5_000L;

    /**
     * @return the callback that is handed the configuration text during initialization
     */
    Consumer<String> initConsumer();

    /**
     * @return the timeout for fetching the configuration; {@link #DEFAULT_TIMEOUT} unless overridden
     */
    default long getTimeout() {
        return DEFAULT_TIMEOUT;
    }
}
增加Nacos处理类对接口的支持
改造NacosConfigListenerService,使其支持上述两个接口。
/**
 * Convenience wrapper around the Nacos {@link ConfigService}: reading a configuration,
 * registering change listeners, and publishing a configuration.
 *
 * <p>Most operations come in two flavours: one taking explicit connection
 * {@link Properties}, and one that uses the properties assembled from the injected
 * {@link NacosConfigProperties}.
 *
 * <p>{@link ConfigService} instances are created once per distinct set of properties and
 * then reused, instead of building a new client on every call. The instances handed out by
 * {@link #getConfigService(Properties)} are therefore shared and should not be shut down
 * by callers.
 */
@Slf4j
@ConditionalOnBean(NacosConfigProperties.class)
@Service
public class NacosConfigListenerService {

    /** Timeout in milliseconds used when the caller gives none, or a non-positive one. */
    private static final long DEFAULT_TIMEOUT = 5000L;

    /** Clients created so far, keyed by a private snapshot of the properties they were built from. */
    private final java.util.Map<Properties, ConfigService> configServiceCache =
        new java.util.concurrent.ConcurrentHashMap<>();

    @Autowired
    private NacosConfigProperties nacosConfigProperties;

    /**
     * Fetches the configuration described by {@code config}, passes it to its init consumer,
     * then registers its refresh consumer as a listener.
     *
     * @param config description of the entry; when its properties are {@code null} the
     *               properties of this application are used
     * @throws NacosException if the Nacos client reports a failure
     */
    public void initAndAddListener(NacosManualInitAndListener config) throws NacosException {
        initAndAddListener(propertiesOrDefault(config.getProperties()), config.getDataId(), config.getGroup(),
            config.getTimeout(), config.initConsumer(), config.refreshConsumer(), config.dataLog());
    }

    /**
     * Registers the refresh consumer of {@code config} as a listener.
     *
     * @param config description of the entry; when its properties are {@code null} the
     *               properties of this application are used
     * @throws NacosException if the Nacos client reports a failure
     */
    public void addListener(NacosManualListener config) throws NacosException {
        addListener(propertiesOrDefault(config.getProperties()), config.getDataId(), config.getGroup(),
            config.refreshConsumer(), config.dataLog());
    }

    /** Same consumer for init and refresh, default properties and timeout, no data logging. */
    public void initAndAddListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
        initAndAddListener(dataId, group, consumer, false);
    }

    /** Same consumer for init and refresh, default properties and timeout. */
    public void initAndAddListener(String dataId, String group, Consumer<String> consumer, boolean dataLog)
        throws NacosException {
        initAndAddListener(dataId, group, consumer, consumer, dataLog);
    }

    /** Separate init and refresh consumers, default properties and timeout, no data logging. */
    public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
        Consumer<String> listenerConsumer) throws NacosException {
        initAndAddListener(dataId, group, initConsumer, listenerConsumer, false);
    }

    /** Separate init and refresh consumers, default properties and timeout. */
    public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
        Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
        initAndAddListener(dataId, group, DEFAULT_TIMEOUT, initConsumer, listenerConsumer, dataLog);
    }

    /** Explicit properties, same consumer for init and refresh, default timeout, no data logging. */
    public void initAndAddListener(Properties properties, String dataId, String group, Consumer<String> consumer)
        throws NacosException {
        initAndAddListener(properties, dataId, group, DEFAULT_TIMEOUT, consumer, consumer, false);
    }

    /** Default properties, explicit timeout. */
    public void initAndAddListener(String dataId, String group, long timeout, Consumer<String> initConsumer,
        Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
        initAndAddListener(getNacosProperties(), dataId, group, timeout, initConsumer, listenerConsumer, dataLog);
    }

    /**
     * Fetches the current configuration, hands it to {@code initConsumer}, then registers
     * {@code listenerConsumer} for later changes.
     *
     * @param properties       connection properties for the Nacos client
     * @param dataId           dataId of the entry; must not be empty
     * @param group            group of the entry; must not be empty
     * @param timeout          fetch timeout in milliseconds; non-positive values use the default
     * @param initConsumer     receives the configuration fetched now
     * @param listenerConsumer receives the configuration on later changes
     * @param dataLog          whether the configuration text is logged before each consumer runs
     * @throws NacosException if the Nacos client reports a failure
     */
    public void initAndAddListener(Properties properties, String dataId, String group, long timeout,
        Consumer<String> initConsumer, Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
        String config = getConfig(properties, dataId, group, timeout);
        log.info("初始化数据,dataId:{},group:{}", dataId, group);
        Consumer<String> effectiveInitConsumer =
            dataLog ? logConsumer(dataId, group).andThen(initConsumer) : initConsumer;
        effectiveInitConsumer.accept(config);
        addListener(properties, dataId, group, listenerConsumer, dataLog);
    }

    /**
     * Returns the client for the given properties, creating it on first use.
     *
     * @param properties connection properties; a {@code null} argument is passed to the
     *                   factory unchanged and the result is not cached
     * @return the shared client for these properties
     * @throws NacosException if the client cannot be created
     */
    public ConfigService getConfigService(Properties properties) throws NacosException {
        if (properties == null) {
            // Nothing to key the cache on; leave the handling of a missing argument to the factory.
            return NacosFactory.createConfigService(properties);
        }
        ConfigService cached = configServiceCache.get(properties);
        if (cached != null) {
            return cached;
        }
        synchronized (configServiceCache) {
            cached = configServiceCache.get(properties);
            if (cached == null) {
                cached = NacosFactory.createConfigService(properties);
                // Key on a private copy so that later mutation of the caller's object cannot
                // corrupt the map.
                Properties key = new Properties();
                key.putAll(properties);
                configServiceCache.put(key, cached);
            }
            return cached;
        }
    }

    /** Returns the client built from the properties of this application. */
    public ConfigService getConfigService() throws NacosException {
        return getConfigService(getNacosProperties());
    }

    /** Reads a configuration with the default properties and timeout. */
    public String getConfig(String dataId, String group) throws NacosException {
        return getConfig(dataId, group, DEFAULT_TIMEOUT);
    }

    /** Reads a configuration with the default properties. */
    public String getConfig(String dataId, String group, Long timeout) throws NacosException {
        return getConfig(getNacosProperties(), dataId, group, timeout);
    }

    /**
     * Reads a configuration.
     *
     * @param properties connection properties for the Nacos client
     * @param dataId     dataId of the entry; must not be empty
     * @param group      group of the entry; must not be empty
     * @param timeout    timeout in milliseconds; {@code null} or non-positive values use the default
     * @return the configuration text as returned by the Nacos client
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is empty
     * @throws NacosException           if the Nacos client reports a failure
     */
    public String getConfig(Properties properties, String dataId, String group, Long timeout) throws NacosException {
        checkDataIdAndGroupBlank(dataId, group);
        log.info("获取配置,dataId:{},group:{}", dataId, group);
        // A null Long would otherwise fail with a NullPointerException when unboxed for the comparison.
        long effectiveTimeout = (timeout == null || timeout <= 0) ? DEFAULT_TIMEOUT : timeout;
        return getConfigService(properties).getConfig(dataId, group, effectiveTimeout);
    }

    /** Reads a configuration with the default properties and timeout, then parses it. */
    public <T> T getConfigAndParse(String dataId, String group, Function<String, T> parseFunction)
        throws NacosException {
        return getConfigAndParse(dataId, group, DEFAULT_TIMEOUT, parseFunction);
    }

    /** Reads a configuration with the default properties, then parses it. */
    public <T> T getConfigAndParse(String dataId, String group, Long timeout, Function<String, T> parseFunction)
        throws NacosException {
        return getConfigAndParse(getNacosProperties(), dataId, group, timeout, parseFunction);
    }

    /**
     * Reads a configuration and converts it with {@code parseFunction}.
     *
     * @return whatever {@code parseFunction} returns for the configuration text
     */
    public <T> T getConfigAndParse(Properties properties, String dataId, String group, Long timeout,
        Function<String, T> parseFunction) throws NacosException {
        return parseFunction.apply(getConfig(properties, dataId, group, timeout));
    }

    /** Registers a ready-made Nacos {@link Listener} using the given properties. */
    public void addListener(Properties properties, String dataId, String group, Listener listener)
        throws NacosException {
        log.info("添加dataId:{},group:{}的监听", dataId, group);
        getConfigService(properties).addListener(dataId, group, listener);
    }

    /**
     * Registers {@code consumer} as a listener for the given entry.
     *
     * @param dataLog whether the configuration text is logged before the consumer runs
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is empty
     * @throws NacosException           if the Nacos client reports a failure
     */
    public void addListener(Properties properties, String dataId, String group, Consumer<String> consumer,
        boolean dataLog) throws NacosException {
        checkDataIdAndGroupBlank(dataId, group);
        log.info("添加dataId:{},group:{}的监听", dataId, group);
        Consumer<String> effectiveConsumer = dataLog ? logConsumer(dataId, group).andThen(consumer) : consumer;
        getConfigService(properties).addListener(dataId, group, new NacosListener(effectiveConsumer));
    }

    /** Registers a ready-made Nacos {@link Listener} using the default properties. */
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        addListener(getNacosProperties(), dataId, group, listener);
    }

    /** Registers {@code consumer} using the default properties, with data logging enabled. */
    public void addListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
        addListener(getNacosProperties(), dataId, group, consumer, true);
    }

    /**
     * Publishes {@code config} to the given entry.
     *
     * @return the result reported by the Nacos client
     */
    public boolean pushConfig(Properties properties, String dataId, String group, String config) throws NacosException {
        log.info("向dataId:{},group:{}推送配置", dataId, group);
        return getConfigService(properties).publishConfig(dataId, group, config);
    }

    /** Publishes {@code config} using the default properties. */
    public boolean pushConfig(String dataId, String group, String config) throws NacosException {
        return pushConfig(getNacosProperties(), dataId, group, config);
    }

    /** Returns the connection properties assembled from the injected {@link NacosConfigProperties}. */
    public Properties getNacosProperties() {
        return nacosConfigProperties.assembleConfigServiceProperties();
    }

    /** Returns {@code candidate}, or the properties of this application when it is {@code null}. */
    private Properties propertiesOrDefault(Properties candidate) {
        return candidate != null ? candidate : getNacosProperties();
    }

    /** Rejects an empty dataId or group before any call reaches the Nacos client. */
    private void checkDataIdAndGroupBlank(String dataId, String group) {
        if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(group)) {
            throw new IllegalArgumentException("dataId or group is blank");
        }
    }

    /** Builds a consumer that logs the configuration text it receives. */
    private Consumer<String> logConsumer(String dataId, String group) {
        return conf -> log.info("获取到dataId:{},group:{}的数据 config:{}", dataId, group, conf);
    }

    /**
     * Adapts a {@link Consumer} to the Nacos {@link Listener} interface.
     */
    public static class NacosListener implements Listener {

        private final Consumer<String> doReceiveConfigInfoConsumer;

        public NacosListener(Consumer<String> doReceiveConfigInfoConsumer) {
            this.doReceiveConfigInfoConsumer = doReceiveConfigInfoConsumer;
        }

        /** Supplies no executor of its own. */
        @Override
        public Executor getExecutor() {
            return null;
        }

        /** Forwards the received configuration text to the wrapped consumer. */
        @Override
        public void receiveConfigInfo(String config) {
            doReceiveConfigInfoConsumer.accept(config);
        }
    }
}
创建自动触发监听操作的处理类
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
/**
 * Registers every {@link NacosManualListener} bean with
 * {@link NacosConfigListenerService} once this bean has been constructed.
 */
@Slf4j
@ConditionalOnBean({NacosConfigListenerService.class})
@Service
public class ManualListenerProcessor {

    /** All listener beans found in the context; stays an empty list when there are none. */
    @Autowired(required = false)
    private List<NacosManualListener> nacosManualListenerList = Collections.emptyList();

    @Autowired
    private NacosConfigListenerService nacosConfigListenerService;

    /**
     * Walks the collected listeners and registers each one. With no listeners this does nothing.
     *
     * @throws RuntimeException wrapping the {@link NacosException} of the first registration that fails
     */
    @PostConstruct
    public void initAndAddListener() {
        nacosManualListenerList.forEach(this::register);
    }

    /**
     * Registers a single listener: plain listeners are only subscribed, while
     * {@link NacosManualInitAndListener} instances are initialized first.
     */
    private void register(NacosManualListener candidate) {
        try {
            if (!(candidate instanceof NacosManualInitAndListener)) {
                nacosConfigListenerService.addListener(candidate);
                return;
            }
            nacosConfigListenerService.initAndAddListener((NacosManualInitAndListener) candidate);
        } catch (NacosException e) {
            log.error("手动添加监听失败", e);
            throw new RuntimeException("手动添加监听失败", e);
        }
    }
}
common-gateway
pom
<!-- Dependencies of the common-gateway module. -->
<!-- Spring Cloud Alibaba Nacos config starter: provided and optional, so a project
     using this module has to declare it itself. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!-- The common-nacos-listener module described earlier; no version is given here,
     so one is presumably managed elsewhere (confirm against the parent pom). -->
<dependency>
<groupId>xxx.xxx.xxx</groupId>
<artifactId>common-nacos-listener</artifactId>
</dependency>
<!-- Spring Cloud Gateway starter: provided and optional, same as the Nacos starter above. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
创建动态路由的抽象类
@Slf4j
public abstract class AbstractDynamicRoute implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Autowired
private RouteDefinitionLocator routeDefinitionLocator;
@Autowired
private RouteDefinitionWriter routeDefinitionWriter;
public final String addOrModifyRoute(RouteDefinition definition) {
try {
routeDefinitionWriter.save(Mono.just(definition)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return "success";
} catch (Exception e) {
log.error("update route fail", e);
return "update route fail";
}
}
public final String deleteRoute(String id) {
try {
log.info("delete route : {}", id);
this.routeDefinitionWriter.delete(Mono.just(id)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return "delete success";
} catch (Exception e) {
log.error("delete fail", e);
return "delete fail";
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
protected abstract void initConfigAndListenerConfigRefresh();
protected final List<RouteDefinition> getRunningRouteInfo() {
List<RouteDefinition> runningDefinitionList = Optional.ofNullable(routeDefinitionLocator.getRouteDefinitions())
.map(Flux::collectList).map(Mono::block).orElse(Collections.emptyList());
return runningDefinitionList;
}
}
创建基于Nacos的动态路由处理类
/**
 * Dynamic routing driven by a Nacos configuration entry whose content is a JSON array of
 * {@link RouteDefinition}s.
 *
 * <p>The class is itself a {@link NacosManualInitAndListener}: the init consumer saves all
 * routes of the configuration, the refresh consumer additionally removes running routes
 * that are no longer listed. Subclasses supply the dataId and group.
 */
@Slf4j
public abstract class AbstractDynamicRouteByNacos extends AbstractDynamicRoute implements NacosManualInitAndListener {

    @Autowired
    protected NacosConfigProperties nacosConfigProperties;

    @Autowired
    protected Environment environment;

    @Autowired
    protected NacosConfigListenerService nacosConfigListenerService;

    @Autowired
    protected ObjectMapper objectMapper;

    /**
     * Builds the callback for configuration changes: saves every listed route and deletes
     * running routes that are no longer listed. When the configuration cannot be parsed,
     * the running routes are left untouched.
     */
    @Override
    public Consumer<String> refreshConsumer() {
        return conf -> {
            log.info("refresh gateway dynamic route begin ====================>>");
            // Snapshot the running routes before anything is saved.
            List<RouteDefinition> runningDefinitionList = getRunningRouteInfo();
            List<RouteDefinition> definitionList = getRouteDefinitions(conf);
            if (definitionList == null) {
                // An unreadable configuration must not be treated as "no routes", which would delete everything.
                log.warn("route config is empty or invalid, keep running routes unchanged");
            } else {
                saveOrUpdateRoute(definitionList);
                removeRoute(runningDefinitionList, definitionList);
            }
            log.info("refresh gateway dynamic route end <<====================");
        };
    }

    /**
     * Builds the callback for the initial configuration: saves every listed route. When the
     * configuration cannot be parsed, nothing is saved.
     */
    @Override
    public Consumer<String> initConsumer() {
        return conf -> {
            log.info("init gateway dynamic route begin====================>>");
            List<RouteDefinition> definitionList = getRouteDefinitions(conf);
            if (definitionList == null) {
                log.warn("route config is empty or invalid, no route initialized");
            } else {
                saveOrUpdateRoute(definitionList);
            }
            log.info("init gateway dynamic route end <<====================");
        };
    }

    @Override
    public boolean dataLog() {
        return true;
    }

    /**
     * Loads the routes once and registers the refresh listener. Going through the
     * listener-object overload lets the service substitute its own properties when
     * {@link #getProperties()} returns {@code null}. A {@link NacosException} is logged,
     * not rethrown.
     */
    @Override
    public void initConfigAndListenerConfigRefresh() {
        try {
            nacosConfigListenerService.initAndAddListener(this);
        } catch (NacosException e) {
            log.error("添加监听失败", e);
        }
    }

    /**
     * Expands a predicate that has exactly one argument holding a comma-separated value
     * into one generated-name argument per token. Predicates with no arguments, or with
     * more than one, are left as they are.
     *
     * @param definition the route whose predicates are rewritten in place
     */
    protected void modifyMultiArgs(RouteDefinition definition) {
        List<PredicateDefinition> predicates = definition.getPredicates();
        if (predicates == null) {
            return;
        }
        for (PredicateDefinition predicate : predicates) {
            Map<String, String> predicateArgs = predicate.getArgs();
            if (predicateArgs == null || predicateArgs.size() != 1) {
                continue;
            }
            String[] args = StringUtils.tokenizeToStringArray(predicateArgs.values().iterator().next(), ",");
            if (args == null) {
                // Nothing to expand; leave the argument untouched.
                continue;
            }
            predicateArgs.clear();
            for (int i = 0; i < args.length; i++) {
                predicateArgs.put(NameUtils.generateName(i), args[i]);
            }
        }
    }

    /**
     * Deletes every running route whose id is not among the new definitions.
     *
     * @param runningDefinitionList routes that were active before the update
     * @param definitionList        routes listed in the new configuration; {@code null} deletes nothing
     */
    protected void removeRoute(List<RouteDefinition> runningDefinitionList, List<RouteDefinition> definitionList) {
        if (runningDefinitionList == null || definitionList == null) {
            return;
        }
        // A set keeps the membership test constant-time per running route.
        java.util.Set<String> wantedIds =
            definitionList.stream().map(RouteDefinition::getId).collect(Collectors.toSet());
        runningDefinitionList.stream().map(RouteDefinition::getId)
            .filter(routeId -> !wantedIds.contains(routeId)).forEach(this::deleteRoute);
    }

    /**
     * Normalizes and saves each of the given routes.
     *
     * @param definitionList routes to save; {@code null} saves nothing
     */
    protected void saveOrUpdateRoute(List<RouteDefinition> definitionList) {
        if (definitionList == null) {
            return;
        }
        for (RouteDefinition definition : definitionList) {
            modifyMultiArgs(definition);
            log.info("update route : {}", definition);
            addOrModifyRoute(definition);
        }
    }

    /**
     * Parses the configuration text as a JSON array of route definitions.
     *
     * @param conf configuration text
     * @return the parsed routes, or {@code null} when the text is empty or cannot be parsed
     */
    protected List<RouteDefinition> getRouteDefinitions(String conf) {
        if (!StringUtils.hasText(conf)) {
            // The JSON parser rejects a null argument with an unchecked exception, so stop here.
            log.error("反序列化路由失败: route config is empty");
            return null;
        }
        try {
            return objectMapper.readValue(conf, new TypeReference<List<RouteDefinition>>() {});
        } catch (JsonProcessingException e) {
            log.error("反序列化路由失败", e);
            return null;
        }
    }
}
创建基于Nacos的动态路由默认实现
/**
 * Default Nacos-backed dynamic route: the dataId and group are read from the environment,
 * and the connection properties come from {@code nacosConfigListenerService}.
 */
public class DefaultDynamicRouteByNacos extends AbstractDynamicRouteByNacos {

    /** Environment key holding the dataId of the route configuration. */
    private static final String DATA_ID_KEY = "nacos.gateway.route.config.data-id";

    /** Environment key holding the group of the route configuration. */
    private static final String GROUP_KEY = "nacos.gateway.route.config.group";

    /** @return the connection properties supplied by the listener service */
    @Override
    public Properties getProperties() {
        return nacosConfigListenerService.getNacosProperties();
    }

    /** @return the value configured under {@code nacos.gateway.route.config.data-id} */
    @Override
    public String getDataId() {
        final String dataId = environment.getProperty(DATA_ID_KEY);
        return dataId;
    }

    /** @return the value configured under {@code nacos.gateway.route.config.group} */
    @Override
    public String getGroup() {
        final String group = environment.getProperty(GROUP_KEY);
        return group;
    }
}
注入默认实现
/**
 * Registers {@link DefaultDynamicRouteByNacos} as a fallback when the application defines
 * no {@link AbstractDynamicRoute} bean of its own.
 */
@Configuration
public class DynamicRouteConditionConfig {

    /**
     * Creates the default dynamic route bean. All three conditions must hold: the Nacos
     * auto-configuration class is on the classpath, the required Nacos beans exist, and no
     * other {@link AbstractDynamicRoute} bean is present.
     */
    @Bean
    @ConditionalOnClass(NacosConfigAutoConfiguration.class)
    @ConditionalOnBean({NacosConfigProperties.class, NacosConfigListenerService.class})
    @ConditionalOnMissingBean(AbstractDynamicRoute.class)
    public DefaultDynamicRouteByNacos defaultDynamicRouteByNacos() {
        final DefaultDynamicRouteByNacos fallbackRoute = new DefaultDynamicRouteByNacos();
        return fallbackRoute;
    }
}
如此一来,新的Gateway项目只需要引入相关依赖,继承AbstractDynamicRouteByNacos(它已经实现了NacosManualInitAndListener接口)并完成相关配置即可。如果直接使用默认实现,只需配置nacos.gateway.route.config.data-id与nacos.gateway.route.config.group,无需编写任何代码就能让Gateway项目具备动态路由的功能。
pom中Nacos、Spring Boot、Gateway等依赖的scope都使用了provided,因此引入这些模块的项目还需要显式地声明相关依赖。
同时,其他需要手动监听Nacos配置的项目,只要引入common-nacos-listener,也可以更方便地处理监听。
|