微服务网关服务
认识 gateway 微服务网关组件
Spring Cloud GateWay 是 spring 官方推出的一款 基于 springframework5,Project Reactor和 spring boot2 之上开发的网关,其性能,高吞吐量,将代替zuul称为新一代的网关,用于给微服务提供 统一的api管理方式
与第一代的区别
和第一代网关zuul 相比 不同的事 gateway 是异步非阻塞的 (netty+webflux实现); zuul是同步阻塞请求的,性能上有这很大的差异
Gateway 组成部分
工作模型
- 请求发送到网关,由分发器将请求匹配到响应的 handlerMapping(这里的handlermapping不是MVC的那个,可以理解为匹配url的网关处理器)
- 请求和处理器之间有一个映射,路由到网关处理程序, web Handler他最用是把请求放入过滤器链路中,
- 执行特定的请求和过滤器链路,(我们自定义的)依次执行过滤器
- 最终到达代理微服务
思考
可以看到我们这个模型图 都是双向剪头的 那么找到了对应的 服务 返回的结果是如何回来的呢?
首先网关有相关的代理服务,然后把请求交给对应的代理服务处理,处理完后,将结果返回到Gateway客户端。
这里 filter 可以看到时 用虚线隔开的
pre filter : 请求必须要执行完pre filter并且执行完毕之后才会到对应的代理服务中处理,
post filter :对应的代理服务执行完处理完之后,才会执行 psot filter中的过滤器
模块搭建 三部曲
我们创建 gateway 服务项目
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.hyc.ecommerce</groupId>
<artifactId>e-commerce-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
编写配置
server:
port: 9001
servlet:
context-path: /imooc
spring:
application:
name: e-commerce-gateway
cloud:
nacos:
discovery:
enabled: true
server-addr: 127.0.0.1:8848
namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
retries: 3
consumer:
auto-offset-reset: latest
zipkin:
sender:
type: kafka
base-url: http://localhost:9411/
main:
allow-bean-definition-overriding: true
nacos:
gateway:
route:
config:
data-id: e-commerce-gateway-router
group: e-commerce
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
谓词 Predicate 的原理与应用
想要 理解gateway 的Predicate ,首先我们要 理解 java 8 中的Predicate
我们先去看看 java8 的predicate吧
java8 predicate
由 java 8 引入位于 package java.util.function; 包中 是一个 函数式接口
@FunctionalInterface
public interface Predicate<T>
? 来看这个 test 方法
需要输入一个的参数 返回 boolean 类型 通常用于 stream的filter 中 表示是否满足过滤条件
可能到这里,都是比较懵的。上手编写 java8 predicate 的效果
理解一下
Java 8 Predicate 常用的一些方法 执行测试看到效果就会 理解很多,这里提供 Test code
@SpringBootTest
@Slf4j
@RunWith(SpringRunner.class)
public class PredicateTest {
public static List<String> MICRO_SERVER = Arrays.asList(
"nacos", "authority", "gateway", "ribbon", "feign", "Hystrix", "e-comerce"
);
@Test
public void testPredicateTest() {
Predicate<String> letterlengthLimit = s -> s.length() > 5;
MICRO_SERVER.stream().filter(letterlengthLimit).forEach(System.out::println);
}
@Test
public void testPredicateAnd() {
Predicate<String> letterlengthLimit = s -> s.length() > 5;
Predicate<String> letterStartWith = s -> s.startsWith("gate");
MICRO_SERVER.stream().filter(
letterlengthLimit.and(letterStartWith)
).forEach(System.out::println);
}
@Test
public void testPredicateOr() {
Predicate<String> letterlengthLimit = s -> s.length() > 5;
Predicate<String> letterStartWith = s -> s.startsWith("gate");
MICRO_SERVER.stream().filter(
letterlengthLimit.or(letterStartWith)
).forEach(System.out::println);
}
@Test
public void testPredicateNegate() {
Predicate<String> letterStartWith = s -> s.startsWith("gate");
MICRO_SERVER.stream().filter(
letterStartWith.negate()
).forEach(System.out::println);
}
@Test
public void testPredicateIsEqual() {
Predicate<String> equalGateway = s -> Predicate.isEqual("gateway").test(s);
MICRO_SERVER.stream().filter(
equalGateway
).forEach(System.out::println);
}
}
简单了解了一下,Predicate 这里我们去查看一下,gateway的 路径匹配路由工厂PathRoutePredicateFactory
从名字我们可以看出,这个工厂是负责 路径匹配的
看到 apply 方法
他其实就是集成了 java8 的predicate
这里我们看到返回的GatewayPredicate ,这里其实就是对请求的url
- 首先这个方法先获得了 path方法获取到当前请求的路径信息
- 之后和我们的配置进行一个匹配(正则表达式)返回匹配,否则就在去寻找
Tips:
- 这里我们首先要理解 Predicate 的几个方法
- 之后去分析一个 Gateway 的一个 Predicate 实现 查看一下 Gateway是如何实现的
@Override
public Predicate<ServerWebExchange> apply(Config config) {
final ArrayList<PathPattern> pathPatterns = new ArrayList<>();
synchronized (this.pathPatternParser) {
pathPatternParser.setMatchOptionalTrailingSeparator(
config.isMatchOptionalTrailingSeparator());
config.getPatterns().forEach(pattern -> {
PathPattern pathPattern = this.pathPatternParser.parse(pattern);
pathPatterns.add(pathPattern);
});
}
return new GatewayPredicate() {
@Override
public boolean test(ServerWebExchange exchange) {
PathContainer path = parsePath(
exchange.getRequest().getURI().getRawPath());
Optional<PathPattern> optionalPathPattern = pathPatterns.stream()
.filter(pattern -> pattern.matches(path)).findFirst();
if (optionalPathPattern.isPresent()) {
PathPattern pathPattern = optionalPathPattern.get();
traceMatch("Pattern", pathPattern.getPatternString(), path, true);
PathMatchInfo pathMatchInfo = pathPattern.matchAndExtract(path);
putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
return true;
}
else {
traceMatch("Pattern", config.getPatterns(), path, false);
return false;
}
}
@Override
public String toString() {
return String.format("Paths: %s, match trailing slash: %b",
config.getPatterns(), config.isMatchOptionalTrailingSeparator());
}
};
}
alibaba nacos 实现动态路由配置
这里其实动态静态的配置,就是是否放到nacos上的区别
- 静态路由 配置写在配置文件中 (yml 或者 proprieties文件中),端点,是
spring.cloud,gateway ,缺点是每次更改都系要网关重新部署 - 动态其实就是,从nacos上获取到配置,我们需要创建配置在nacos web端,之后相关服务启动的时候,我们需要配置 config 定义要去哪里获取到配置,我们在gateway解析配置,监听变化,如果有变化就刷新配置就好了
我们打开nacos的web 页面
[
{
"id": "e-commerce-nacos-client",
"predicates": [
{
"args": {
"pattern": "/imooc/ecommerce-nacos-client/**"
},
"name": "Path"
}
],
"uri": "lb://e-commerce-nacos-client",
"filters": [
{
"name": "HeaderToken"
},
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
}
]
},
{
"id": "e-commerce-account-service",
"predicates": [
{
"args": {
"pattern": "/imooc/ecommerce-account-service/**"
},
"name": "Path"
}
],
"uri": "lb://e-commerce-account-service",
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
}
]
},
{
"id": "e-commerce-goods-service",
"predicates": [
{
"args": {
"pattern": "/imooc/ecommerce-goods-service/**"
},
"name": "Path"
}
],
"uri": "lb://e-commerce-goods-service",
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
}
]
}
]
动态路由网关的配置
GatewayConfig 创建 config 设置一些 需要的参数 ,比如
- 超时时间
- nacos 服务器的地址
- 命名空间
- data-id
- Group id
@Configuration
public class GatewayConfig {
public static final long DEFAULT_TIMEOUT = 30000;
public static String NACOS_SERVER_ADDR;
public static String NACOS_NAMESPACE;
public static String NACOS_ROUTE_DATA_ID;
public static String NACOS_ROUTE_GROUP;
@Value("${spring.cloud.nacos.discovery.server-addr}")
public void setNacosServerAddr(String nacosServerAddr) {
NACOS_SERVER_ADDR = nacosServerAddr;
}
@Value("${spring.cloud.nacos.discovery.namespace}")
public void setNacosNamespace(String nacosNamespace) {
NACOS_NAMESPACE = nacosNamespace;
}
@Value("${nacos.gateway.route.config.data-id}")
public void setNacosRouteDataId(String nacosRouteDataId) {
NACOS_ROUTE_DATA_ID = nacosRouteDataId;
}
@Value("${nacos.gateway.route.config.group}")
public void setNacosRouteGroup(String nacosRouteGroup) {
NACOS_ROUTE_GROUP = nacosRouteGroup;
}
}
这里我们再次梳理一下思路 这里我们保存的这些配置信息,这里我们做的是保存当前配置的,之后发生改变了,我们先监听,再去获取配置信息之后刷新配置。 接下来我们编写注册网关事件更新操作
编写注册网关事件更新
@Slf4j
@Service
@SuppressWarnings("all")
public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware {
private final RouteDefinitionWriter routeDefinitionWriter;
private final RouteDefinitionLocator routeDefinitionLocator;
private ApplicationEventPublisher publisher;
public DynamicRouteServiceImpl(RouteDefinitionWriter routeDefinitionWriter,
RouteDefinitionLocator routeDefinitionLocator) {
this.routeDefinitionWriter = routeDefinitionWriter;
this.routeDefinitionLocator = routeDefinitionLocator;
}
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher) {
this. = applicationEventPublisher;
}
public String addRouteDefinition(RouteDefinition definition) {
log.info("gateway add route: [{}]", definition);
routeDefinitionWriter.save(Mono.just(definition)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return "success";
}
public String updateList(List<RouteDefinition> definitions) {
log.info("gateway update route: [{}]", definitions);
List<RouteDefinition> routeDefinitionsExits =
routeDefinitionLocator.getRouteDefinitions().buffer().blockFirst();
if (!CollectionUtils.isEmpty(routeDefinitionsExits)) {
routeDefinitionsExits.forEach(rd -> {
log.info("delete route definition: [{}]", rd);
deleteById(rd.getId());
});
}
definitions.forEach(definition -> updateByRouteDefinition(definition));
return "success";
}
private String deleteById(String id) {
try {
log.info("gateway delete route id: [{}]", id);
this.routeDefinitionWriter.delete(Mono.just(id)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return "delete success";
} catch (Exception ex) {
log.error("gateway delete route fail: [{}]", ex.getMessage(), ex);
return "delete fail";
}
}
private String updateByRouteDefinition(RouteDefinition definition) {
try {
log.info("gateway update route: [{}]", definition);
this.routeDefinitionWriter.delete(Mono.just(definition.getId()));
} catch (Exception ex) {
return "update fail, not find route routeId: " + definition.getId();
}
try {
this.routeDefinitionWriter.save(Mono.just(definition)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return "success";
} catch (Exception ex) {
return "update route fail";
}
}
}
这里我们进行了 事件推送器的操作,更新配置和删除,新增操作,可能没接触过的小伙伴会比较的蒙圈,这里可以去 补充学习一下 Spring5 reactor编程
编写完对应的操作,我们就需要去连接 nacos 之后通过 nacos 的api 获取配置和初始化进行一些操作了
编写 连接 nacos 获取配置
@Slf4j
@Component
@DependsOn({"gatewayConfig"})
public class DynamicRouteServiceImplByNacos {
private ConfigService configService;
private final DynamicRouteServiceImpl dynamicRouteService;
public DynamicRouteServiceImplByNacos(DynamicRouteServiceImpl dynamicRouteService) {
this.dynamicRouteService = dynamicRouteService;
}
@PostConstruct
public void init() {
log.info("gateway route init....");
try {
configService = initConfigService();
if (null == configService) {
log.error("init config service fail");
return;
}
String configInfo = configService.getConfig(
GatewayConfig.NACOS_ROUTE_DATA_ID,
GatewayConfig.NACOS_ROUTE_GROUP,
GatewayConfig.DEFAULT_TIMEOUT
);
log.info("get current gateway config: [{}]", configInfo);
List<RouteDefinition> definitionList =
JSON.parseArray(configInfo, RouteDefinition.class);
if (CollectionUtils.isNotEmpty(definitionList)) {
for (RouteDefinition definition : definitionList) {
log.info("init gateway config: [{}]", definition.toString());
dynamicRouteService.addRouteDefinition(definition);
}
}
} catch (Exception ex) {
log.error("gateway route init has some error: [{}]", ex.getMessage(), ex);
}
dynamicRouteByNacosListener(GatewayConfig.NACOS_ROUTE_DATA_ID,
GatewayConfig.NACOS_ROUTE_GROUP);
}
private ConfigService initConfigService() {
try {
Properties properties = new Properties();
properties.setProperty("serverAddr", GatewayConfig.NACOS_SERVER_ADDR);
properties.setProperty("namespace", GatewayConfig.NACOS_NAMESPACE);
return configService = NacosFactory.createConfigService(properties);
} catch (Exception ex) {
log.error("init gateway nacos config error: [{}]", ex.getMessage(), ex);
return null;
}
}
private void dynamicRouteByNacosListener(String dataId, String group) {
try {
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
log.info("start to update config: [{}]", configInfo);
List<RouteDefinition> definitionList =
JSON.parseArray(configInfo, RouteDefinition.class);
log.info("update route: [{}]", definitionList.toString());
dynamicRouteService.updateList(definitionList);
}
});
} catch (NacosException ex) {
log.error("dynamic update gateway config error: [{}]", ex.getMessage(), ex);
}
}
}
验证动态配置的可用性
2021-12-08 14:15:58.335 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : gateway route init....
2021-12-08 14:15:58.687 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : get current gateway config: [[
{
"id": "e-commerce-nacos-client",
"predicates": [
{
"args": {
"pattern": "/imooc/ecommerce-nacos-client/**"
},
"name": "Path"
}
],
"uri": "lb://e-commerce-nacos-client"
}]]
2021-12-08 14:15:58.776 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.c.DynamicRouteServiceImplByNacos : init gateway config: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]
2021-12-08 14:15:58.776 INFO [e-commerce-gateway,,,] 32948 --- [ main] c.i.e.config.DynamicRouteServiceImpl : gateway add route: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]
可以看到 我们成功的连接到了 nacos 并且拿到了配置
这个时候我们 修改配置 id 变动 这个时候查看我们的控制台
2021-12-08 14:53:35.641 WARN [e-commerce-gateway,,,] 32948 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos : start to update config: [[
{
"id": "e-commerce-nacos-client1",
"predicates": [
{
"args": {
"pattern": "/imooc/ecommerce-nacos-client/**"
},
"name": "Path"
}
],
"uri": "lb://e-commerce-nacos-client"
}]]
2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos : update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]]
2021-12-08 14:53:37.788 INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.config.DynamicRouteServiceImpl : gateway update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]]
就可以看到我们配置更新 日志打出 证明 我们可以在项目启动的时候动态的修改路由配置,网关随着负责增加,需要频繁的变更,所以我们这里才会使用动态配置。
SpringCloud Gateway Filter
认识过滤器 , SpringCloud Gateway Filter
基于过滤器的思想实现,与 zuul 类似 。有 pre 和 post 两种方式都filter,分别处理前置逻辑和后置逻辑
前置 : 客户端请求会经过pre类型的filter 然后将请求转发到具体的业务服务,
**后置:**收到服务端响应后 经过 post 类型的filter 处理 最后返回给客户端
**Filter有两大类别:**全局过滤器和局部过滤器
这里我们查看一下Gateway给我们提供的 局部和全局过滤器的各别思路
全局的过滤器
这里我们可以看到,每一个全局过滤器都需要实现 全局过滤器接口和对应的 filter方法,下面我们来看一下其中一个实现类
RouteToRequestUrlFilter
这个类的核心方法,我们来解读一下这个方法的作用 (以对应代码部分的注释的方式解读)
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
局部过滤器
局部过滤器主要方法 都是返回一个 GatewayFilter对象
PrefixPathGatewayFilterFactory 局部 前置过滤器
public class PrefixPathGatewayFilterFactory
extends AbstractGatewayFilterFactory<PrefixPathGatewayFilterFactory.Config> {
public static final String PREFIX_KEY = "prefix";
private static final Log log = LogFactory
.getLog(PrefixPathGatewayFilterFactory.class);
public PrefixPathGatewayFilterFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList(PREFIX_KEY);
}
@Override
public GatewayFilter apply(Config config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange,
GatewayFilterChain chain) {
boolean alreadyPrefixed = exchange
.getAttributeOrDefault(GATEWAY_ALREADY_PREFIXED_ATTR, false);
if (alreadyPrefixed) {
return chain.filter(exchange);
}
exchange.getAttributes().put(GATEWAY_ALREADY_PREFIXED_ATTR, true);
ServerHttpRequest req = exchange.getRequest();
addOriginalRequestUrl(exchange, req.getURI());
String newPath = config.prefix + req.getURI().getRawPath();
ServerHttpRequest request = req.mutate().path(newPath).build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());
if (log.isTraceEnabled()) {
log.trace("Prefixed URI with: " + config.prefix + " -> "
+ request.getURI());
}
return chain.filter(exchange.mutate().request(request).build());
}
@Override
public String toString() {
return filterToStringCreator(PrefixPathGatewayFilterFactory.this)
.append("prefix", config.getPrefix()).toString();
}
};
}
public static class Config {
private String prefix;
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
}
StripPrefixGatewayFilterFactory 后置过滤器
这里 StripPrefix的意思就是 去掉前缀,
public class StripPrefixGatewayFilterFactory
extends AbstractGatewayFilterFactory<StripPrefixGatewayFilterFactory.Config> {
public static final String PARTS_KEY = "parts";
public StripPrefixGatewayFilterFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList(PARTS_KEY);
}
@Override
public GatewayFilter apply(Config config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange,
GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
addOriginalRequestUrl(exchange, request.getURI());
String path = request.getURI().getRawPath();
String newPath = "/"
+ Arrays.stream(StringUtils.tokenizeToStringArray(path, "/"))
.skip(config.parts).collect(Collectors.joining("/"));
newPath += (newPath.length() > 1 && path.endsWith("/") ? "/" : "");
ServerHttpRequest newRequest = request.mutate().path(newPath).build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR,
newRequest.getURI());
return chain.filter(exchange.mutate().request(newRequest).build());
}
@Override
public String toString() {
return filterToStringCreator(StripPrefixGatewayFilterFactory.this)
.append("parts", config.getParts()).toString();
}
};
}
public static class Config {
private int parts;
public int getParts() {
return parts;
}
public void setParts(int parts) {
this.parts = parts;
}
}
}
过滤器的执行流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J2DcdAbW-1639027996653)(springcloudalibaba项目.assets/image-20211209003355832.png)]
- 过滤器有优先级之分,Order越大 优先级越来越低,越晚被执行
- 全局过滤器 所有的请求都会执行
- 局部过滤器只有配置了对应请求才会执行
|