1、sentinel-dashboard如何将配置推送到我们的微服务内存?
以dashboard新增流控规则为例,看下代码,会调用/v1/flow/rule POST方法。
/**
 * Handles POST "/rule": validates the submitted flow rule, stores it in the repository and
 * publishes the rules of the target machine, waiting at most 5000 ms for the publish to finish.
 *
 * @param entity flow rule taken from the request body
 * @return the validation result when validation rejects the entity; otherwise the saved entity
 *         on success, or a failure result with code -1 and the error message when saving or
 *         publishing throws
 */
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
    Result<FlowRuleEntity> validation = checkEntityInternal(entity);
    if (validation != null) {
        return validation;
    }

    // Discard any id sent by the caller and stamp both timestamps with the same instant.
    Date now = new Date();
    entity.setId(null);
    entity.setGmtCreate(now);
    entity.setGmtModified(now);
    entity.setLimitApp(entity.getLimitApp().trim());
    entity.setResource(entity.getResource().trim());

    try {
        entity = repository.save(entity);
        publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
        return Result.ofSuccess(entity);
    } catch (Throwable t) {
        // Report the underlying failure rather than the ExecutionException wrapper.
        Throwable cause = t;
        if (t instanceof ExecutionException) {
            cause = t.getCause();
        }
        logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), cause);
        return Result.ofFail(-1, cause.getMessage());
    }
}
InMemoryRuleRepositoryAdapter.save()方法,将FlowRuleEntity对象保存到三个map中。
// Rules grouped by machine; each inner map is keyed by rule id.
private Map<MachineInfo, Map<Long, T>> machineRules = new ConcurrentHashMap<>(16);
// Every stored rule, keyed by rule id.
private Map<Long, T> allRules = new ConcurrentHashMap<>(16);
// Rules grouped by application name; each inner map is keyed by rule id.
private Map<String, Map<Long, T>> appRules = new ConcurrentHashMap<>(16);
/**
 * Stores the entity in the all-rules, per-machine and per-application maps, assigning a new
 * id first when the entity has none.
 *
 * @param entity rule entity to store
 * @return the entity produced by {@code preProcess}; {@code null} when {@code preProcess}
 *         returns {@code null}, in which case nothing is stored
 */
public T save(T entity) {
    if (entity.getId() == null) {
        entity.setId(nextId());
    }

    T processed = preProcess(entity);
    if (processed == null) {
        return null;
    }

    Long id = processed.getId();
    allRules.put(id, processed);

    MachineInfo machine = MachineInfo.of(processed.getApp(), processed.getIp(), processed.getPort());
    machineRules.computeIfAbsent(machine, key -> new ConcurrentHashMap<>(32)).put(id, processed);

    appRules.computeIfAbsent(processed.getApp(), key -> new ConcurrentHashMap<>(32)).put(id, processed);
    return processed;
}
再看下发布规则代码,publishRules方法
/**
 * Looks up every flow rule stored for the given machine and sends the list to that machine
 * through the Sentinel API client.
 *
 * @param app  application name
 * @param ip   machine IP
 * @param port machine port
 * @return the future returned by the API client for the asynchronous push
 */
private CompletableFuture<Void> publishRules(String app, String ip, Integer port) {
    MachineInfo target = MachineInfo.of(app, ip, port);
    return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, repository.findAllByMachine(target));
}
setFlowRuleOfMachineAsync继续跟下代码,会调用setRulesAsync方法
/**
 * Serializes the given rule entities to JSON and posts them to the machine's set-rules command.
 *
 * @param app      application name, must not be empty
 * @param ip       machine IP, must not be empty
 * @param port     machine port, must be positive
 * @param type     rule type sent as the "type" parameter
 * @param entities rule entities to push, must not be null
 * @return a future that completes normally when the machine replies "success" (case-insensitive,
 *         trimmed), fails with a CommandFailedException carrying the reply otherwise, or fails
 *         with the exception raised while validating or building the request
 */
private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
    try {
        AssertUtil.notNull(entities, "rules cannot be null");
        AssertUtil.notEmpty(app, "Bad app name");
        AssertUtil.notEmpty(ip, "Bad machine IP");
        AssertUtil.isTrue(port > 0, "Bad machine port");

        String rulesJson = JSON.toJSONString(
            entities.stream().map(RuleEntity::toRule).collect(Collectors.toList()));

        Map<String, String> params = new HashMap<>(2);
        params.put("type", type);
        params.put("data", rulesJson);

        CompletableFuture<String> commandResult = executeCommand(app, ip, port, SET_RULES_PATH, params, true);
        return commandResult.thenCompose(reply -> {
            if (!"success".equalsIgnoreCase(reply.trim())) {
                return AsyncUtils.newFailedFuture(new CommandFailedException(reply));
            }
            return CompletableFuture.completedFuture(null);
        });
    } catch (Exception e) {
        logger.error("setRulesAsync API failed, type={}", type, e);
        return AsyncUtils.newFailedFuture(e);
    }
}
executeCommand方法,组织http请求信息
/**
 * Builds the HTTP request for a command on the given machine and executes it.
 *
 * <p>The request is sent as POST only when {@code useHttpPost} is set and
 * {@code isSupportPost} accepts the machine; otherwise the parameters are appended to the
 * URL as a query string and the request is sent as GET.
 *
 * @param app         application name
 * @param ip          machine IP; a blank value fails the returned future
 * @param port        machine port
 * @param api         command path appended after the port; a blank value fails the returned future
 * @param params      request parameters; {@code null} is treated as empty
 * @param useHttpPost whether POST should be attempted
 * @return future produced by the single-argument {@code executeCommand}, or a future failed
 *         with IllegalArgumentException when {@code ip} or {@code api} is blank
 */
private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
    if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
        return failed;
    }

    StringBuilder url = new StringBuilder();
    url.append("http://").append(ip).append(':').append(port).append('/').append(api);

    Map<String, String> effectiveParams = (params == null) ? Collections.<String, String>emptyMap() : params;

    // isSupportPost is consulted only when the caller asked for POST.
    boolean viaPost = useHttpPost && isSupportPost(app, ip, port);
    if (viaPost) {
        return executeCommand(
            postRequest(url.toString(), effectiveParams, isSupportEnhancedContentType(app, ip, port)));
    }

    if (!effectiveParams.isEmpty()) {
        // Start the query string, or extend one already present in the URL.
        url.append(url.indexOf("?") == -1 ? '?' : '&');
        url.append(queryString(effectiveParams));
    }
    return executeCommand(new HttpGet(url.toString()));
}
2、我们的微服务如何接收处理 http://192.168.163.144:8720/setRules?
Env类加载时会执行静态代码块,调用InitExecutor.doInit();
/**
 * Holds the shared {@code Sph} instance; initializing this class also runs
 * {@code InitExecutor.doInit()}.
 */
public class Env {
// Shared Sph instance, created as a CtSph.
public static final Sph sph = new CtSph();
// Static initializer: executed once during class initialization, after sph is assigned.
static {
InitExecutor.doInit();
}
}
InitExecutor.doInit()代码
/**
 * Discovers every {@code InitFunc} through {@code ServiceLoaderUtil}, places each one into an
 * ordered list via {@code insertSorted}, and then calls {@code init()} on them in list order.
 *
 * <p>Only the first invocation does any work; later invocations return immediately. Any
 * Exception or Error raised during loading or initialization is logged and printed, not rethrown.
 */
public static void doInit() {
    // compareAndSet lets exactly one caller flip the flag and proceed.
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        ServiceLoader<InitFunc> discovered = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
        List<OrderWrapper> ordered = new ArrayList<OrderWrapper>();
        for (InitFunc candidate : discovered) {
            String candidateName = candidate.getClass().getCanonicalName();
            RecordLog.info("[InitExecutor] Found init func: " + candidateName);
            insertSorted(ordered, candidate);
        }
        for (OrderWrapper wrapper : ordered) {
            wrapper.func.init();
            // Logged after init() has returned.
            String executedMessage = String.format("[InitExecutor] Executing %s with order %d",
                wrapper.func.getClass().getCanonicalName(), wrapper.order);
            RecordLog.info(executedMessage);
        }
    } catch (Exception failure) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", failure);
        failure.printStackTrace();
    } catch (Error fatal) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", fatal);
        fatal.printStackTrace();
    }
}
CommandCenterInitFunc 类实现 InitFunc,init方法
/**
 * Resolves the command center from {@code CommandCenterProvider} and, when one is available,
 * runs its {@code beforeStart()} and {@code start()} hooks; otherwise only logs a warning.
 *
 * @throws Exception propagated from the command center's hooks
 */
@Override
public void init() throws Exception {
    CommandCenter center = CommandCenterProvider.getCommandCenter();
    if (center != null) {
        center.beforeStart();
        center.start();
        RecordLog.info("[CommandCenterInit] Starting command center: "
            + center.getClass().getCanonicalName());
    } else {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
    }
}
start()方法
/**
 * Creates the business thread pool and starts a new thread that opens the command-center
 * server socket and records the port actually in use.
 *
 * @throws Exception declared by the overridden method
 */
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
// Pool with core size == max size == number of processors and a queue bounded to 10 tasks;
// a rejected task is logged and then reported via RejectedExecutionException.
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
// Instance initializer: parse the port from TransportConfig, falling back to DEFAULT_PORT
// when the value is missing or cannot be parsed.
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
// A null result means no server socket could be obtained starting from the base port.
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
// Record the port the socket is actually bound to.
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
// Publish the final port (or PORT_UNINITIALIZED on failure) as the runtime port.
TransportConfig.setRuntimePort(port);
// NOTE(review): presumably an ExecutorService whose shutdown() lets the already-submitted
// ServerThread keep running while refusing new tasks — confirm against its declaration.
executor.shutdown();
}
};
// The socket setup runs on its own thread, so start() itself does not wait for it.
new Thread(serverInitTask).start();
}
ServerThread 的run方法
/**
 * Accept loop of the command-center server thread: every accepted connection is wrapped in an
 * {@code HttpEventTask} and handed to {@code bizExecutor}.
 *
 * <p>When accepting or submitting fails, the error is logged, the socket (if any) is closed and
 * the loop pauses for 10 ms before trying again. The loop ends only when the thread is
 * interrupted during that pause; the interrupt status is restored before leaving so the owner
 * of the thread can still observe the interruption.
 */
@Override
public void run() {
    while (true) {
        Socket socket = null;
        try {
            socket = this.serverSocket.accept();
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket);
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            CommandCenterLog.info("Server error", e);
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e1) {
                    CommandCenterLog.info("Error when closing an opened socket", e1);
                }
            }
            try {
                // Short pause so a persistently failing accept() does not spin the CPU.
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                // sleep() clears the interrupt flag when it throws; set it again before exiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
HttpEventTask的run方法
/**
 * Serves one command request on the socket: reads the request line, resolves the command
 * name and its handler, writes the handler's response, and always closes the streams and the
 * socket at the end.
 */
@Override
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
// Responses are written using the charset configured in SentinelConfig.
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
+ ", addr: " + socket.getInetAddress());
CommandRequest request = processQueryString(firstLine);
// A request line starting with "POST" (case-insensitive) gets additional processing from the input stream.
// NOTE(review): if readLine can return null, firstLine.length() throws here and the
// Throwable branch below handles it — confirm that is intended.
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
processPostRequest(inputStream, request);
}
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Look up the handler registered under the command name; unknown names get a BAD_REQUEST.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
} catch (RequestException e) {
// Request-level failures carry their own status code and message.
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
// Send a full error response only when writtenHead is false; otherwise just append the message.
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.warn("Failed to write error response", e1);
}
} finally {
// Release the streams and the socket regardless of how the request ended.
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
到此我们找到了ModifyRulesCommandHandler类的handle方法
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {
private static final int FASTJSON_MINIMAL_VER = 0x01020C00;
/**
 * Applies a rule update received through the "setRules" command.
 *
 * <p>Reads the "type" and "data" request parameters, URL-decodes the data when it is not
 * empty, parses it as a JSON array of the rule class matching the type, loads the rules into
 * the corresponding rule manager and then tries to write them to the matching data source.
 *
 * @param request command request carrying the "type" and "data" parameters
 * @return success with "success", or with WRITE_DS_FAILURE_MSG when writing to the data source
 *         fails; failure when the fastjson version is below the required minimum, the data
 *         cannot be decoded, or the type matches none of the known rule types
 */
@Override
public CommandResponse<String> handle(CommandRequest request) {
    if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
        return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
            + "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
    }

    String type = request.getParam("type");
    String data = request.getParam("data");
    if (StringUtil.isNotEmpty(data)) {
        try {
            data = URLDecoder.decode(data, "utf-8");
        } catch (Exception e) {
            RecordLog.info("Decode rule data error", e);
            return CommandResponse.ofFailure(e, "decode rule data error");
        }
    }
    RecordLog.info("Receiving rule change (type: {}): {}", type, data);

    // Each branch: parse, load into the in-memory manager, then attempt the data-source write.
    if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
        List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
        FlowRuleManager.loadRules(flowRules);
        boolean persisted = writeToDataSource(getFlowDataSource(), flowRules);
        String outcome = persisted ? "success" : WRITE_DS_FAILURE_MSG;
        return CommandResponse.ofSuccess(outcome);
    }
    if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
        List<AuthorityRule> authorityRules = JSONArray.parseArray(data, AuthorityRule.class);
        AuthorityRuleManager.loadRules(authorityRules);
        boolean persisted = writeToDataSource(getAuthorityDataSource(), authorityRules);
        String outcome = persisted ? "success" : WRITE_DS_FAILURE_MSG;
        return CommandResponse.ofSuccess(outcome);
    }
    if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
        List<DegradeRule> degradeRules = JSONArray.parseArray(data, DegradeRule.class);
        DegradeRuleManager.loadRules(degradeRules);
        boolean persisted = writeToDataSource(getDegradeDataSource(), degradeRules);
        String outcome = persisted ? "success" : WRITE_DS_FAILURE_MSG;
        return CommandResponse.ofSuccess(outcome);
    }
    if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
        List<SystemRule> systemRules = JSONArray.parseArray(data, SystemRule.class);
        SystemRuleManager.loadRules(systemRules);
        boolean persisted = writeToDataSource(getSystemSource(), systemRules);
        String outcome = persisted ? "success" : WRITE_DS_FAILURE_MSG;
        return CommandResponse.ofSuccess(outcome);
    }
    return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
...
}
FlowRuleManager.loadRules(flowRules)代码
/**
 * Replaces the current flow rules by pushing the given list into {@code currentProperty}.
 *
 * @param rules flow rules to load
 */
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
再调用DynamicSentinelProperty的updateValue(rules)方法
/**
 * Stores the new value and notifies every registered listener, unless {@code isEqual} reports
 * the new value as equal to the current one.
 *
 * @param newValue value to store
 * @return {@code true} when the value was replaced and listeners were notified,
 *         {@code false} when nothing changed
 */
@Override
public boolean updateValue(T newValue) {
    boolean changed = !isEqual(value, newValue);
    if (changed) {
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
        value = newValue;
        for (PropertyListener<T> subscriber : listeners) {
            subscriber.configUpdate(newValue);
        }
    }
    return changed;
}
FlowPropertyListener的configUpdate(newValue)方法:
/**
 * Rebuilds the flow-rule map from the updated rule list and, when a map is produced, swaps
 * the contents of {@code flowRules} for it. The resulting rules are logged in either case.
 *
 * @param value updated list of flow rules
 */
@Override
public void configUpdate(List<FlowRule> value) {
    Map<String, List<FlowRule>> rebuilt = FlowRuleUtil.buildFlowRuleMap(value);
    if (rebuilt != null) {
        // Replace the contents in two steps: empty the map, then copy in the rebuilt entries.
        flowRules.clear();
        flowRules.putAll(rebuilt);
    }
    String received = "[FlowRuleManager] Flow rules received: " + flowRules;
    RecordLog.info(received);
}
总结:
1、sentinel-dashboard创建流控规则,提交到dashboard后端。 2、后端先将规则保存到内存中,再通过http的方式,调用我们的微服务客户端,发布规则。地址:(http://192.168.163.144:8720/setRules) 3、微服务启动时,由实现了InitFunc接口的CommandCenterInitFunc启动CommandCenter,开启一个socket监听,端口默认是8720;接收到数据后,根据url解析出command,如:setRules。 4、根据command找到对应的CommandHandler(ModifyRulesCommandHandler),调用它的handle方法。 5、根据传参的type,找到规则管理器,加载相应的规则。如: FlowRuleManager.loadRules(flowRules); 6、监听器处理配置更新,将其设置到内存中。
|