
10. Source code analysis: how sentinel-dashboard pushes rules to the client

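1. How does sentinel-dashboard push configuration into our microservice's memory?

Take adding a flow rule in the dashboard as an example. The request goes to the POST method mapped to /v1/flow/rule: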

@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
	Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
	if (checkResult != null) {
		return checkResult;
	}
	entity.setId(null);
	Date date = new Date();
	entity.setGmtCreate(date);
	entity.setGmtModified(date);
	entity.setLimitApp(entity.getLimitApp().trim());
	entity.setResource(entity.getResource().trim());
	try {
		// 1. Save the FlowRuleEntity in the dashboard's own memory
		entity = repository.save(entity);
		// 2. Publish the rules to the client
		publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
		return Result.ofSuccess(entity);
	} catch (Throwable t) {
		Throwable e = t instanceof ExecutionException ? t.getCause() : t;
		logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
		return Result.ofFail(-1, e.getMessage());
	}
}
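
The handler above blocks on the publish future for at most 5 seconds and, on failure, unwraps ExecutionException so that the root cause is reported. Below is a minimal sketch of that wait-and-unwrap pattern using only the JDK. The class PublishWaitSketch and the method await are hypothetical names for illustration, not dashboard code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the wait-and-unwrap pattern; not dashboard code.
class PublishWaitSketch {

    // Waits up to timeoutMillis for the publish future and reports the outcome as text.
    static String await(CompletableFuture<Void> publish, long timeoutMillis) {
        try {
            publish.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (Throwable t) {
            // get() wraps an asynchronous failure in ExecutionException, so report its cause.
            Throwable e = t instanceof ExecutionException ? t.getCause() : t;
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("client unreachable"));
        System.out.println(await(CompletableFuture.<Void>completedFuture(null), 100));
        System.out.println(await(failed, 100));
    }
}
```

A timeout surfaces as TimeoutException rather than ExecutionException, so it passes through the same catch branch without unwrapping.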

The InMemoryRuleRepositoryAdapter.save() method stores the FlowRuleEntity object in three maps.

// Three in-memory maps that hold the rules, keyed by machine, by rule id, and by app
private Map<MachineInfo, Map<Long, T>> machineRules = new ConcurrentHashMap<>(16);
private Map<Long, T> allRules = new ConcurrentHashMap<>(16);
private Map<String, Map<Long, T>> appRules = new ConcurrentHashMap<>(16);
// Save the entity into all three maps
public T save(T entity) {
	if (entity.getId() == null) {
		entity.setId(nextId());
	}
	T processedEntity = preProcess(entity);
	if (processedEntity != null) {
		allRules.put(processedEntity.getId(), processedEntity);
		machineRules.computeIfAbsent(MachineInfo.of(processedEntity.getApp(), processedEntity.getIp(),
			processedEntity.getPort()), e -> new ConcurrentHashMap<>(32))
			.put(processedEntity.getId(), processedEntity);
		appRules.computeIfAbsent(processedEntity.getApp(), v -> new ConcurrentHashMap<>(32))
			.put(processedEntity.getId(), processedEntity);
	}

	return processedEntity;
}
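
To make the three-index layout concrete, here is a simplified, self-contained sketch. RuleStoreSketch, its Rule type, and the string machine key are hypothetical stand-ins (the real code uses MachineInfo and FlowRuleEntity); only the idea of writing the same entity into several maps is taken from the method above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the dashboard's in-memory repository.
class RuleStoreSketch {

    // Hypothetical rule type; the real FlowRuleEntity carries many more fields.
    static final class Rule {
        Long id;
        final String app;
        final String ip;
        final int port;
        final String resource;

        Rule(String app, String ip, int port, String resource) {
            this.app = app;
            this.ip = ip;
            this.port = port;
            this.resource = resource;
        }
    }

    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, Rule> allRules = new ConcurrentHashMap<>();
    private final Map<String, Map<Long, Rule>> machineRules = new ConcurrentHashMap<>();
    private final Map<String, Map<Long, Rule>> appRules = new ConcurrentHashMap<>();

    // Assumption: a plain string key replaces the MachineInfo object of the real code.
    private static String machineKey(String app, String ip, int port) {
        return app + "@" + ip + ":" + port;
    }

    // Assigns an id if needed and indexes the rule by id, by machine, and by app.
    Rule save(Rule r) {
        if (r.id == null) {
            r.id = ids.incrementAndGet();
        }
        allRules.put(r.id, r);
        machineRules.computeIfAbsent(machineKey(r.app, r.ip, r.port), k -> new ConcurrentHashMap<>())
            .put(r.id, r);
        appRules.computeIfAbsent(r.app, k -> new ConcurrentHashMap<>()).put(r.id, r);
        return r;
    }

    // Returns a copy of the rules stored for one machine (empty if none).
    List<Rule> findAllByMachine(String app, String ip, int port) {
        Map<Long, Rule> rules = machineRules.get(machineKey(app, ip, port));
        return rules == null ? new ArrayList<>() : new ArrayList<>(rules.values());
    }

    public static void main(String[] args) {
        RuleStoreSketch store = new RuleStoreSketch();
        store.save(new Rule("order-service", "10.0.0.1", 8720, "/orders"));
        store.save(new Rule("order-service", "10.0.0.2", 8720, "/orders"));
        System.out.println(store.findAllByMachine("order-service", "10.0.0.1", 8720).size());
    }
}
```

The per-machine index is what makes the later publish step cheap: all rules for one client can be fetched with a single lookup.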

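Next, look at the code that publishes the rules, the publishRules method:

private CompletableFuture<Void> publishRules(String app, String ip, Integer port) {
	// Build a MachineInfo and look up that machine's flow rules in the machineRules map
	List<FlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
	// Asynchronously push the flow rules to the host described by the MachineInfo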
	return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules);
}

Following setFlowRuleOfMachineAsync further, it calls the setRulesAsync method:

private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
	try {
		AssertUtil.notNull(entities, "rules cannot be null");
		AssertUtil.notEmpty(app, "Bad app name");
		AssertUtil.notEmpty(ip, "Bad machine IP");
		AssertUtil.isTrue(port > 0, "Bad machine port");
		// r.toRule() converts each FlowRuleEntity into a Rule
		String data = JSON.toJSONString(
			entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
		Map<String, String> params = new HashMap<>(2);
		// type is "flow"
		params.put("type", type);
		// data is the flow rule list serialized as JSON
		params.put("data", data);
		// Execute the command; SET_RULES_PATH is "setRules"
		return executeCommand(app, ip, port, SET_RULES_PATH, params, true)
			.thenCompose(r -> {
				if ("success".equalsIgnoreCase(r.trim())) {
					return CompletableFuture.completedFuture(null);
				}
				return AsyncUtils.newFailedFuture(new CommandFailedException(r));
			});
	} catch (Exception e) {
		logger.error("setRulesAsync API failed, type={}", type, e);
		return AsyncUtils.newFailedFuture(e);
	}
}
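
The thenCompose step above turns the client's plain-text reply into a future that succeeds only when the body is "success". A minimal sketch of that mapping follows. ResponseMappingSketch and toResult are hypothetical names, and IllegalStateException stands in for the dashboard's CommandFailedException.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of mapping a response body to a success/failure future.
class ResponseMappingSketch {

    // Completes normally for "success" (case-insensitive, trimmed), exceptionally otherwise.
    static CompletableFuture<Void> toResult(CompletableFuture<String> response) {
        return response.thenCompose(body -> {
            if ("success".equalsIgnoreCase(body.trim())) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            CompletableFuture<Void> failed = new CompletableFuture<>();
            // Assumption: a generic exception replaces CommandFailedException here.
            failed.completeExceptionally(new IllegalStateException(body));
            return failed;
        });
    }

    public static void main(String[] args) {
        System.out.println(
            toResult(CompletableFuture.completedFuture("success")).isCompletedExceptionally());
        System.out.println(
            toResult(CompletableFuture.completedFuture("invalid type")).isCompletedExceptionally());
    }
}
```

Because any other body is treated as a failure, an error message returned by the client ends up as the exception message that the controller logs and returns.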

The executeCommand method assembles the HTTP request:

private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
	CompletableFuture<String> future = new CompletableFuture<>();
	if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
		future.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
		return future;
	}
	StringBuilder urlBuilder = new StringBuilder();
	urlBuilder.append("http://");
	urlBuilder.append(ip).append(':').append(port).append('/').append(api);
	if (params == null) {
		params = Collections.emptyMap();
	}
	// In my environment, urlBuilder is http://192.168.163.144:8720/setRules
	if (!useHttpPost || !isSupportPost(app, ip, port)) {
		// Send an HTTP GET through the HTTP client, with the params in the query string
		if (!params.isEmpty()) {
			if (urlBuilder.indexOf("?") == -1) {
				urlBuilder.append('?');
			} else {
				urlBuilder.append('&');
			}
			urlBuilder.append(queryString(params));
		}
		return executeCommand(new HttpGet(urlBuilder.toString()));
	} else {
		// Send an HTTP POST through the HTTP client
		return executeCommand(
				postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
	}
}
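
As a rough illustration of what ends up on the wire, the sketch below builds the command URL and URL-encodes the type and data parameters. CommandUrlSketch and its methods are hypothetical. The real queryString and postRequest helpers are not shown in this article, so the encoding details here are an assumption.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of assembling the command URL and its encoded parameters.
class CommandUrlSketch {

    // Builds http://{ip}:{port}/{api}, the same shape as the URL in the method above.
    static String buildUrl(String ip, int port, String api) {
        return "http://" + ip + ':' + port + '/' + api;
    }

    // Joins the parameters as key=value pairs separated by '&', URL-encoding both sides.
    static String encodeParams(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex); // UTF-8 is always available
        }
    }

    static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("type", "flow");
        params.put("data", "[{\"resource\":\"/orders\",\"count\":5.0}]"); // made-up rule JSON
        System.out.println(buildUrl("192.168.163.144", 8720, "setRules"));
        System.out.println(encodeParams(params));
    }
}
```

The JSON in data contains characters such as brackets and quotes that are not safe in a query string or form body, which is why encoding on the sending side has to be matched by decoding on the receiving side.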

2. How does our microservice receive and handle http://192.168.163.144:8720/setRules?

When the Env class is loaded, its static initializer runs and calls InitExecutor.doInit():

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // Core code: triggers initialization
        InitExecutor.doInit();
    }

}
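
The trick here is plain Java class initialization: a static block runs once, the first time the class is actually used. A small sketch shows the behavior. StaticInitSketch and its nested Env are hypothetical and only mimic the shape of the real class.

```java
// Hypothetical sketch of "initialize on first use" via a static initializer.
class StaticInitSketch {
    static int initCount = 0;

    // Stand-in for Env: the static block runs once, when the class is first used.
    static final class Env {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final Object sph = new Object();

        static {
            initCount++; // the real code calls InitExecutor.doInit() here
        }
    }

    // Any access to Env.sph forces Env to initialize if it has not done so yet.
    static Object touch() {
        return Env.sph;
    }

    public static void main(String[] args) {
        System.out.println(initCount); // Env has not been used yet
        touch();
        touch();
        System.out.println(initCount); // incremented once, despite two accesses
    }
}
```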

The code of InitExecutor.doInit():

public static void doInit() {
	if (!initialized.compareAndSet(false, true)) {
		return;
	}
	try {
		// Load the InitFunc implementations via SPI
		ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
		List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
		for (InitFunc initFunc : loader) {
			RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
			// Insert in sorted order
			insertSorted(initList, initFunc);
		}
		for (OrderWrapper w : initList) {
			// Call the init method
			w.func.init();
			RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
				w.func.getClass().getCanonicalName(), w.order));
		}
	} catch (Exception ex) {
		RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
		ex.printStackTrace();
	} catch (Error error) {
		RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
		error.printStackTrace();
	}
}
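
Two ideas in doInit() are worth isolating: the compareAndSet guard that makes initialization run at most once, and the sorted insertion that fixes the execution order. The sketch below shows both without ServiceLoader, which would need META-INF/services files. OrderedInitSketch, Ordered, and the explicit order numbers are hypothetical, and the way the real code derives each order value is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a run-once, ordered initializer.
class OrderedInitSketch {

    interface InitFunc {
        void init();
    }

    // Pairs an init function with its order; lower numbers run first.
    static final class Ordered {
        final InitFunc func;
        final int order;

        Ordered(InitFunc func, int order) {
            this.func = func;
            this.order = order;
        }
    }

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    final List<String> log = new ArrayList<>();

    // Inserts w before the first entry with a larger order, keeping the list sorted.
    static void insertSorted(List<Ordered> list, Ordered w) {
        int idx = 0;
        while (idx < list.size() && list.get(idx).order <= w.order) {
            idx++;
        }
        list.add(idx, w);
    }

    // Runs every function once, in order. Returns false if initialization already happened.
    boolean doInit(List<Ordered> discovered) {
        if (!initialized.compareAndSet(false, true)) {
            return false;
        }
        List<Ordered> sorted = new ArrayList<>();
        for (Ordered w : discovered) {
            insertSorted(sorted, w);
        }
        for (Ordered w : sorted) {
            w.func.init();
        }
        return true;
    }

    public static void main(String[] args) {
        OrderedInitSketch sketch = new OrderedInitSketch();
        List<Ordered> found = new ArrayList<>();
        found.add(new Ordered(() -> sketch.log.add("second"), 2));
        found.add(new Ordered(() -> sketch.log.add("first"), 1));
        System.out.println(sketch.doInit(found) + " " + sketch.log);
        System.out.println(sketch.doInit(found) + " " + sketch.log);
    }
}
```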

The CommandCenterInitFunc class implements InitFunc. Its init method:

@Override
public void init() throws Exception {
	// Load the CommandCenter via SPI
	CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

	if (commandCenter == null) {
		RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
		return;
	}

	commandCenter.beforeStart();
	// Core code: call start()
	commandCenter.start();
	RecordLog.info("[CommandCenterInit] Starting command center: "
			+ commandCenter.getClass().getCanonicalName());
}

The start() method of the command center (SimpleHttpCommandCenter here):

@Override
public void start() throws Exception {
	int nThreads = Runtime.getRuntime().availableProcessors();
	this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
		new ArrayBlockingQueue<Runnable>(10),
		new NamedThreadFactory("sentinel-command-center-service-executor"),
		new RejectedExecutionHandler() {
			@Override
			public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
				CommandCenterLog.info("EventTask rejected");
				throw new RejectedExecutionException();
			}
		});

	Runnable serverInitTask = new Runnable() {
		int port;

		{
			try {
				port = Integer.parseInt(TransportConfig.getPort());
			} catch (Exception e) {
				port = DEFAULT_PORT;
			}
		}

		@Override
		public void run() {
			boolean success = false;
			// Create a ServerSocket, starting from the configured port
			ServerSocket serverSocket = getServerSocketFromBasePort(port);

			if (serverSocket != null) {
				CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
				socketReference = serverSocket;
				// Run a ServerThread on the executor; the ServerThread is bound to the ServerSocket
				executor.submit(new ServerThread(serverSocket));
				success = true;
				port = serverSocket.getLocalPort();
			} else {
				CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
			}

			if (!success) {
				port = PORT_UNINITIALIZED;
			}

			TransportConfig.setRuntimePort(port);
			executor.shutdown();
		}

	};

	new Thread(serverInitTask).start();
}
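
The body of getServerSocketFromBasePort is not shown in this article. The sketch below is only a guess at the general idea its name suggests: start at a base port and move upward until a bind succeeds. PortProbeSketch, bindFrom, and the maxTries limit are hypothetical.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch of probing for a free port starting from a base port.
class PortProbeSketch {

    // Tries basePort, basePort + 1, ... for up to maxTries ports; returns null if none binds.
    static ServerSocket bindFrom(int basePort, int maxTries) {
        for (int i = 0; i < maxTries && basePort + i <= 65535; i++) {
            try {
                return new ServerSocket(basePort + i);
            } catch (IOException e) {
                // The port is taken or cannot be bound; try the next one.
            }
        }
        return null;
    }

    // Occupies one port, then probes from it. Returns true if a higher port was bound.
    static boolean demo() {
        try (ServerSocket occupied = new ServerSocket(0)) {
            int base = occupied.getLocalPort();
            try (ServerSocket next = bindFrom(base, 20)) {
                return next != null && next.getLocalPort() > base;
            }
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Probing like this would explain how several Sentinel clients on one host can each end up with their own command port, and why the code above writes the actual port back with TransportConfig.setRuntimePort(port).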

The run method of ServerThread:

@Override
public void run() {
	while (true) {
		Socket socket = null;
		try {
			socket = this.serverSocket.accept();
			setSocketSoTimeout(socket);
			// Wrap the accepted socket in an HttpEventTask
			HttpEventTask eventTask = new HttpEventTask(socket);
			// Submit the task to bizExecutor, which runs eventTask.run()
			bizExecutor.submit(eventTask);
		} catch (Exception e) {
			CommandCenterLog.info("Server error", e);
			if (socket != null) {
				try {
					socket.close();
				} catch (Exception e1) {
					CommandCenterLog.info("Error when closing an opened socket", e1);
				}
			}
			try {
				// In case of infinite log.
				Thread.sleep(10);
			} catch (InterruptedException e1) {
				// Indicates the task should stop.
				break;
			}
		}
	}
}
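
The structure above is the classic "one acceptor thread, a pool of workers" server. Here is a compact sketch of it that can be run locally. AcceptLoopSketch and its echo handler are hypothetical. Unlike the real loop, this one simply stops when the server socket is closed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of an accept loop that hands each connection to a worker pool.
class AcceptLoopSketch {

    // Accepts connections until the server socket is closed.
    static void serve(ServerSocket server, ExecutorService workers) {
        while (!server.isClosed()) {
            try {
                Socket socket = server.accept();
                workers.submit(() -> handle(socket));
            } catch (IOException e) {
                // accept() fails once the socket is closed; the loop condition then ends the loop.
            }
        }
    }

    // Hypothetical handler: replies with the first received line in upper case.
    static void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line = in.readLine();
            if (line != null) {
                out.println(line.toUpperCase());
            }
        } catch (IOException e) {
            // A broken connection is ignored in this sketch.
        }
    }

    // Starts a server on a free loopback port, sends one line, and returns the reply.
    static String demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            new Thread(() -> serve(server, workers)).start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), "UTF-8"))) {
                out.println("ping");
                return in.readLine();
            }
        } catch (IOException e) {
            return null;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the acceptor thread free of request handling means a slow command cannot block new connections. The bounded queue in the real bizExecutor then caps how much work can pile up.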

The run method of HttpEventTask:

@Override
public void run() {
	if (socket == null) {
		return;
	}

	PrintWriter printWriter = null;
	InputStream inputStream = null;
	try {
		long start = System.currentTimeMillis();
		inputStream = new BufferedInputStream(socket.getInputStream());
		OutputStream outputStream = socket.getOutputStream();

		printWriter = new PrintWriter(
			new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

		String firstLine = readLine(inputStream);
		CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
			+ ", addr: " + socket.getInetAddress());
		CommandRequest request = processQueryString(firstLine);

		if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
			// Deal with post method
			processPostRequest(inputStream, request);
		}

		// Get the commandName, i.e. the setRules part of the URL above
		String commandName = HttpCommandUtils.getTarget(request);
		if (StringUtil.isBlank(commandName)) {
			writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
			return;
		}

		// Look up the CommandHandler by commandName; here it is ModifyRulesCommandHandler
		CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
		if (commandHandler != null) {
			// Call the handle method
			CommandResponse<?> response = commandHandler.handle(request);
			handleResponse(response, printWriter);
		} else {
			// No matching command handler.
			writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
		}

		long cost = System.currentTimeMillis() - start;
		CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
			+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
	} catch (RequestException e) {
		writeResponse(printWriter, e.getStatusCode(), e.getMessage());
	} catch (Throwable e) {
		CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
		try {
			if (printWriter != null) {
				String errorMessage = SERVER_ERROR_MESSAGE;
				e.printStackTrace();
				if (!writtenHead) {
					writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
				} else {
					printWriter.println(errorMessage);
				}
				printWriter.flush();
			}
		} catch (Exception e1) {
			CommandCenterLog.warn("Failed to write error response", e1);
		}
	} finally {
		closeResource(inputStream);
		closeResource(printWriter);
		closeResource(socket);
	}
}
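
The task above derives the command name and parameters from the first line of the HTTP request. The sketch below shows one way such a line can be taken apart. RequestLineSketch is hypothetical and is not the logic of processQueryString or HttpCommandUtils.getTarget, whose bodies are not shown in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of parsing an HTTP request line into a command name and parameters.
class RequestLineSketch {
    final String command;
    final Map<String, String> params = new LinkedHashMap<>();

    // Parses a line such as "POST /setRules?type=flow HTTP/1.1".
    RequestLineSketch(String firstLine) {
        String[] parts = firstLine.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("bad request line: " + firstLine);
        }
        String target = parts[1];
        int q = target.indexOf('?');
        String path = q < 0 ? target : target.substring(0, q);
        // The command name is the path without its leading slash.
        command = path.startsWith("/") ? path.substring(1) : path;
        if (q >= 0) {
            for (String pair : target.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    // Values are kept as sent; URL-decoding is left to the handler.
                    params.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
    }

    public static void main(String[] args) {
        RequestLineSketch request = new RequestLineSketch("POST /setRules?type=flow HTTP/1.1");
        System.out.println(request.command + " " + request.params);
    }
}
```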

This brings us to the handle method of the ModifyRulesCommandHandler class:

// setRules is the command we are looking for
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {
    private static final int FASTJSON_MINIMAL_VER = 0x01020C00;

    @Override
    public CommandResponse<String> handle(CommandRequest request) {
        // XXX from 1.7.2, force to fail when fastjson is older than 1.2.12
        // We may need a better solution on this.
        if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
            // fastjson too old
            return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
                    + "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
        }
        // Rule type; "flow" in our case
        String type = request.getParam("type");
        // Rule data as JSON
        String data = request.getParam("data");
        if (StringUtil.isNotEmpty(data)) {
            try {
                data = URLDecoder.decode(data, "utf-8");
            } catch (Exception e) {
                RecordLog.info("Decode rule data error", e);
                return CommandResponse.ofFailure(e, "decode rule data error");
            }
        }

        RecordLog.info("Receiving rule change (type: {}): {}", type, data);

        String result = "success";
        // Flow rules
        if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
            // Parse the flow rules
            List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
            // Key step: load the flow rules into our microservice's memory
            FlowRuleManager.loadRules(flowRules);
            // Extension point: if a FlowDataSource is registered, also write the rules to it
            if (!writeToDataSource(getFlowDataSource(), flowRules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
            List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
            AuthorityRuleManager.loadRules(rules);
            if (!writeToDataSource(getAuthorityDataSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
            List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
            DegradeRuleManager.loadRules(rules);
            if (!writeToDataSource(getDegradeDataSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
            List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
            SystemRuleManager.loadRules(rules);
            if (!writeToDataSource(getSystemSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        }
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }
	...
}
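
The handler is found through the name in its @CommandMapping annotation. The sketch below shows how an annotation can drive such a lookup using reflection. HandlerRegistrySketch, @Mapping, and the explicit register call are hypothetical. The article does not show how the real command center discovers and registers its handlers.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an annotation-driven command handler registry.
class HandlerRegistrySketch {

    // Stand-in for @CommandMapping; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Mapping {
        String name();
    }

    interface Handler {
        String handle(Map<String, String> params);
    }

    @Mapping(name = "setRules")
    static final class SetRulesHandler implements Handler {
        @Override
        public String handle(Map<String, String> params) {
            // Only the "flow" branch is sketched; the real handler covers four rule types.
            return "flow".equalsIgnoreCase(params.get("type")) ? "success" : "invalid type";
        }
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    // Registers the handler under the name declared in its @Mapping annotation.
    void register(Handler handler) {
        Mapping mapping = handler.getClass().getAnnotation(Mapping.class);
        if (mapping == null) {
            throw new IllegalArgumentException("missing @Mapping on " + handler.getClass());
        }
        handlers.put(mapping.name(), handler);
    }

    // Looks up the handler by command name and invokes it.
    String dispatch(String command, Map<String, String> params) {
        Handler handler = handlers.get(command);
        return handler == null ? "Unknown command `" + command + '`' : handler.handle(params);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(new SetRulesHandler());
        Map<String, String> params = new HashMap<>();
        params.put("type", "flow");
        System.out.println(registry.dispatch("setRules", params));
        System.out.println(registry.dispatch("unknown", params));
    }
}
```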

The code of FlowRuleManager.loadRules(flowRules):

public static void loadRules(List<FlowRule> rules) {
	currentProperty.updateValue(rules);
}

This in turn calls the updateValue(rules) method of DynamicSentinelProperty:

@Override
public boolean updateValue(T newValue) {
	if (isEqual(value, newValue)) {
		return false;
	}
	RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

	value = newValue;
	for (PropertyListener<T> listener : listeners) {
		// Each listener handles the configuration change; here it is FlowPropertyListener
		listener.configUpdate(newValue);
	}
	return true;
}

The configUpdate(newValue) method of FlowPropertyListener:

@Override
public void configUpdate(List<FlowRule> value) {
	Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
	if (rules != null) {
		flowRules.clear();
		// Replace the in-memory flow rules
		flowRules.putAll(rules);
	}
	RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
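
Taken together, updateValue and configUpdate form a small observer pattern: a property notifies its listeners only when the value changes, and the listener rebuilds a map from resource name to rules. The following sketch shows the pattern in isolation. RulePropertySketch and its Rule type are hypothetical. The body of FlowRuleUtil.buildFlowRuleMap is not shown in this article, so grouping purely by resource is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of a property that pushes rule changes to listeners.
class RulePropertySketch {

    // Hypothetical minimal rule: a resource name and a threshold.
    static final class Rule {
        final String resource;
        final double count;

        Rule(String resource, double count) {
            this.resource = resource;
            this.count = count;
        }
    }

    private final List<Consumer<List<Rule>>> listeners = new CopyOnWriteArrayList<>();
    private List<Rule> value;
    final Map<String, List<Rule>> flowRules = new ConcurrentHashMap<>();

    RulePropertySketch() {
        listeners.add(this::configUpdate);
    }

    // Notifies listeners only when the value actually changes; returns whether it did.
    boolean updateValue(List<Rule> newValue) {
        if (Objects.equals(value, newValue)) {
            return false;
        }
        value = newValue;
        for (Consumer<List<Rule>> listener : listeners) {
            listener.accept(newValue);
        }
        return true;
    }

    // Rebuilds the resource -> rules map from scratch, replacing the old contents.
    private void configUpdate(List<Rule> rules) {
        List<Rule> safe = rules == null ? Collections.<Rule>emptyList() : rules;
        Map<String, List<Rule>> grouped = new HashMap<>();
        for (Rule r : safe) {
            grouped.computeIfAbsent(r.resource, k -> new ArrayList<>()).add(r);
        }
        flowRules.clear();
        flowRules.putAll(grouped);
    }

    public static void main(String[] args) {
        RulePropertySketch property = new RulePropertySketch();
        List<Rule> rules = Arrays.asList(new Rule("/orders", 5), new Rule("/pay", 1));
        System.out.println(property.updateValue(rules) + " " + property.flowRules.keySet());
        System.out.println(property.updateValue(rules));
    }
}
```

Each push replaces the whole rule set rather than merging into it, which matches the dashboard sending the complete rule list for a machine on every change.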

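Summary:

1. A flow rule is created in sentinel-dashboard and submitted to the dashboard back end.
2. The back end first saves the rule in its own memory, then publishes the rules by calling our microservice client over HTTP. Address: http://192.168.163.144:8720/setRules
3. On startup the microservice opens a server socket (CommandCenterInitFunc implements the InitFunc interface and starts the CommandCenter). The port is 8720 in this example; it is read from TransportConfig.getPort() and falls back to DEFAULT_PORT (8719 in Sentinel's documentation). When data arrives, the command is parsed from the URL, e.g. setRules.
4. The CommandHandler is looked up by command (ModifyRulesCommandHandler here) and its handle method is called.
5. Based on the type parameter, the matching rule manager loads the rules, e.g. FlowRuleManager.loadRules(flowRules);
6. A listener handles the configuration update and stores the rules in memory.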

Added: 2022-04-22 18:47:45  Updated: 2022-04-22 18:51:36
 