分布式服务链路追踪-SkyWorking
作者:田超凡
原创博文,仿冒必究,部分素材转载自每特教育蚂蚁课堂
1 服务链路追踪的意义
在微服务系统中,随着业务的发展,系统会变得越来越大,那么各个服务之间的调用关系也就变得越来越复杂。一个 HTTP 请求会调用多个不同的微服务来处理返回最后的结果,在这个调用过程中,可能会因为某个服务出现网络延迟过高或发送错误导致请求失败,这个时候,对请求调用的监控就显得尤为重要了。Spring Cloud Sleuth 提供了分布式服务链路监控的解决方案。下面介绍 Spring Cloud Sleuth 整合 Zipkin 的解决方案。
2 分布式服务追踪系统优点
1.RPC远程调用过程中,服务与服务依赖的关系非常复杂,A调用B服务,B服务C服务 如果某个服务出现问题可能会导致整个链响应延迟;
2.如果服务的依赖关系非常复杂,建议改成MQ异步的形式。
3 服务追踪常用框架
1. Sleuth +Zipkin SpringCloud封装了 Sleuth +Zipkin
2. 阿里的鹰眼系统
3. SkyWalking(非常推荐大家使用 功能非常强大)---Skywalking
4 Sleuth+Zipkin 的使用
1.Spring Cloud Sleuth实现了一种分布式的服务链路跟踪解决方案,通过使用Sleuth可以让我们快速定位某个服务的问题。
2.官方文档地址如下:https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/2.0.1.RELEASE/single/spring-cloud-sleuth.html
服务追踪常见名词:
1.Sleuth可以结合Zipkin可以实现界面化的形式管理我们接口依赖信息;
2.Sleuth 每一次RPC远程调用请求都会生成一个spanid记录每一次rpc请求的信息,还有一个traceid 全局唯一id;
3.spanid为请求单元ID、traceid 为请求链全局id
5 Sleuth+Zipkin环境搭建
在 Spring Boot 2.0 版本之后,官方已不推荐自己搭建定制了,而是直接提供了编译好的 jar 包。详情可以查看官网:Quickstart · OpenZipkin
注意:zipkin官网已经提供定制了,使用官方jar运行即可。
默认端口号启动zipkin服务
java –jar zipkin.jar
默认端口号:9411
http://127.0.0.1:9411
指定端口号启动
java -jar zipkin.jar --server.port=8080
http://127.0.0.1:8080
6 SpringBoot整合Sleuth+Zipkin
Maven依赖
<dependency>
??? <groupId>org.springframework.cloud</groupId>
??? <artifactId>spring-cloud-starter-zipkin</artifactId>
??? <version>2.0.0.RELEASE</version>
</dependency>
<dependency>
??? <groupId>org.springframework.cloud</groupId>
??? <artifactId>spring-cloud-starter-sleuth</artifactId>
??? <version>2.0.0.RELEASE</version>
</dependency>
application.yml配置
zipkin:
? base-url: http://127.0.0.1:9411/
###全部采集
sleuth:
?? sampler:
??? probability: 1.0
从请求头中获取TraceID的方法
log.info("<X-B3-TraceId>:{}", request.getHeader("X-B3-TraceId")); log.info("<X-B3-SpanId>:{}", request.getHeader("X-B3-SpanId")); log.info("<X-B3-ParentSpanId>:{}", request.getHeader("X-B3-ParentSpanId"));
X-B3-TraceId:对应TraceID;
X-B3-SpanId:对应SpanID;
X-B3-ParentSpanId:前面一环的SpanID;
X-B3-Sampled:是否被选中抽样输出;
X-Span-Name:工作单元名称。
7 微信公众号环境搭建
1.导入开源的微信公众号框架项目 weixin-java-mp-demo-springboot-master
2. 申请一个测试的微信公众号
微信公众平台
3.免费为每个用户分配一个测试的appid和appSecret
openid: 微信公众号开放的唯一userId
微信公众号创建模板信息
微信公众平台
服务名称:{{first.DATA}}
IP和端口:{{keyword1.DATA}}
错误内容:{{keyword2.DATA}}
错误时间:{{keyword3.DATA}}
2.模板创建成功之后 会返回一个 模板id
3.发送微信模板代码实现
@Autowired
private WxMpProperties wxMpProperties;
@Value("${tcf.errorTemplateId}")
private String errorTemplateId;
@RequestMapping("sendWechatTemplateError")
public String sendWechatTemplateError(@RequestBody ServiceErrorDto serviceErrorDto) {
WxMpTemplateMsgService wxMpTemplateMsgService = WxMpConfiguration.getMpServices().get(wxMpProperties.getConfigs().get(0).getAppId()).getTemplateMsgService();
WxMpTemplateMessage wxMpTemplateMessage = new WxMpTemplateMessage();
wxMpTemplateMessage.setTemplateId(errorTemplateId);
wxMpTemplateMessage.setToUser(serviceErrorDto.getOpenId());
List<WxMpTemplateData> data = new ArrayList<>();
data.add(new WxMpTemplateData("first", serviceErrorDto.getServiceName()));
data.add(new WxMpTemplateData("keyword1", serviceErrorDto.getIp() + ":" + serviceErrorDto.getPort()));
data.add(new WxMpTemplateData("keyword2", serviceErrorDto.getErrorMsg()));
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(serviceErrorDto.getErrorTime());
data.add(new WxMpTemplateData("keyword3", format));
wxMpTemplateMessage.setData(data);
wxMpTemplateMessage.setUrl("http://www.tcf.com");
try {
wxMpTemplateMsgService.sendTemplateMsg(wxMpTemplateMessage);
} catch (Exception e) {
log.error("<e:>", e);
}
return "ok";
}
@Data
public class ServiceErrorDto {
/**
* ip地址
*/
private String ip;
/**
* 端口号
*/
private Integer port;
/**
* 服务的名称
*/
private String serviceName;
/**
* 错误内容
*/
private String errorMsg;
/**
* 错误时间
*/
private Date errorTime;
/**
* 用户的关注微信公众号的 openId
*/
private String openId;
}
5 测试JSON请求参数
{
"ip":"127.0.0.1",
"port":8080,
"serviceName":"tcf-member",
"errorMsg":"null",
"errorTime":"2022-07-11 00:21:35",
"openId":"okYSmtzp4wWCrDCncMfGSRECVSeM"
}
127.0.0.1:8080/sendWechatTemplateError
6.定义一段测试代码:
@RestController
public class TcfService {
??? @RequestMapping("/tcf")
??? public String tcf(Long id) {
??????? return "YES" + 10 % id;
??? }
}
7.定义全局捕获异常
import com.alibaba.fastjson.JSONObject;
import com.tcf.service.asyn.AsynSendWechatTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.client.RestTemplate;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Enumeration;
/**
* 全局捕获异常
*/
@ControllerAdvice
@Slf4j
public class MyExceptionHandler {
??? @Value("${server.port}")
??? private String serverPort;
??? @Value("${spring.application.name}")
??? private String serviceName;
??? @Autowired
??? private AsynSendWechatTemplate asynSendWechatTemplate;
??? @ExceptionHandler(value = Exception.class)
??? public String exceptionHandler(Exception e) {
??????? log.error("未知异常!原因是:" + e);
??????? JSONObject data = new JSONObject();
??????? data.put("ip", getIpAddress());
??????? data.put("port", serverPort);
??????? data.put("serviceName", serviceName);
??????? data.put("errorMsg", e.getLocalizedMessage());
??????? SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
??????? String format = formatter.format(new Date());
??????? data.put("errorTime", format);
??????? data.put("openId", "okYSmtzp4wWCrDCncMfGSRECVSeM");
//??????? HttpHeaders headers = new HttpHeaders();
//??????? MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
//??????? headers.setContentType(type);
//??????? headers.add("Accept", MediaType.APPLICATION_JSON.toString());
//??????? HttpEntity<String> formEntity = new HttpEntity<String>(data.toString(), headers);
//??????? String result = restTemplate.postForEntity("http://127.0.0.1:9099/sendWechatTemplateError", formEntity, String.class).getBody();
//??????? log.info("result:" + result);
??????? asynSendWechatTemplate.sendWechatTemplate(data);
??????? return "系统出现错误!";
??? }
??? public static String getIpAddress() {
??????? String hostAddress = "";
??????? try {
??????????? InetAddress address = InetAddress.getLocalHost();
??????????? hostAddress = address.getHostAddress();
??????? } catch (UnknownHostException e) {
??????? }
??????? return hostAddress;
??? }
}
8.注入restTemplate
??? @Bean
??? public RestTemplate restTemplate() {
??????? return new RestTemplate();
??? }
9.优化代码 改成异步实现
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@Component
@Slf4j
public class AsynSendWechatTemplate {
??? private static ArrayBlockingQueue<JSONObject> arrayBlockingQueue =
??????????? new ArrayBlockingQueue<JSONObject>(1024);
??? @Autowired
??? private RestTemplate restTemplate;
??? public AsynSendWechatTemplate() {
??????? new Thread(new ThreadAsynSendWechatTemplate()).start();
??? }
??? /**
???? * 存入到 队列中
???? *
???? * @param data
???? */
??? public void sendWechatTemplate(JSONObject data) {
??????? arrayBlockingQueue.offer(data);
??? }
??? class ThreadAsynSendWechatTemplate implements Runnable {
??????? @Override
??????? public void run() {
??????????? while (true) {
??????????????? JSONObject data = arrayBlockingQueue.poll();
??????????????? if (data != null) {
??????????????????? HttpHeaders headers = new HttpHeaders();
??????????????????? MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
??????????????????? headers.setContentType(type);
??????????????????? headers.add("Accept", MediaType.APPLICATION_JSON.toString());
??????????????????? HttpEntity<String> formEntity = new HttpEntity<String>(data.toString(), headers);
??????????????????? String result = restTemplate.postForEntity("http://127.0.0.1:9099/sendWechatTemplateError", formEntity, String.class).getBody();
??????????????????? log.info("result:" + result);
??????????????? }
??????????????? try {
??????????? ????????// 避免cpu飙高的问题
??????????????????? Thread.sleep(100);
??????????????? } catch (Exception e) {
??????????????? }
??????????? }
??????? }
??? }
??? public static String getIpAddress() {
??????? String hostAddress = "";
??????? try {
??????????? InetAddress address = InetAddress.getLocalHost();
??????????? hostAddress = address.getHostAddress();
??????? } catch (UnknownHostException e) {
??????? }
??????? return hostAddress;
??? }
}
8 SkyWorking简介
SkyWalking是一个开源的观测平台,用于从服务和云原生等基础设施中收集、分析、聚合以及可视化数据,SkyWalking 提供了一种简便的方式来清晰地观测分布式系统,甚至可以观测横跨不同云的系统,SkyWalking 更像是一种现代的应用程序性能监控(Application Performance Monitoring,即APM)工具,专为云原生,基于容器以及分布式系统而设计。
监控--分布式追踪调用链 、jvm内存变化、智能报警、cpu飙高、服务器基本配置信息
(图片摘自:Apache SkyWalking )
9 SkyWalking架构原理
在整个skywalking的系统中,有四个角色:
1.skywalking agent和业务系统关联在一起,负责收集各种监控数据;
2.oapservice是负责处理监控数据的,比如接受skywalking agent的监控数据,并存储在数据库中(例如elasticsearch、mysql中等);接受skywalking webapp的前端请求,从数据库查询数据,并返回数据给前端。,Skywalking oapservice通常以集群的形式存在;
3.skywalking webapp,前端界面,用于展示数据;
4.用于存储监控数据的数据库,比如mysql、elasticsearch等;
10 SkyWalking 环境搭建
1. 下载apache-skywalking-apm-6.5.0.tar安装包
2. 进入到bin目录(直接双击启动startup.bat)
3. 启动启动oapService和我们的webappService
4. 查看管理界面http://127.0.0.1:8080/
5 IDEA启动类相关配置,新增VM Options配置参数
Idea 相关的配置:
-javaagent:D:\apache-skywalking-apm-bin\agent\skywalking-agent.jar
-Dskywalking.agent.service_name=user-service
-Dskywalking.collector.backend_service=127.0.0.1:11800
?
?
11 SkyWalking获取全局追踪id
maven依赖:
??????? <dependency>
??????????? <groupId>org.apache.skywalking</groupId>
??????????? <artifactId>apm-toolkit-trace</artifactId>
??????????? <version>6.5.0</version>
??????? </dependency>
String traceId = TraceContext.traceId();
12 SkyWalking告警功能
SkyWalking 告警功能是在6.x版本新增的,其核心由一组规则驱动,这些规则定义在config/alarm-settings.yml文件中
SkyWalking 的发行版都会默认提供config/alarm-settings.yml文件,里面预先定义了一些常用的告警规则。如下:
1.过去3分钟内服务平均响应时间超过1秒
2.服务成功率在过去2分钟内低于80%
3.服务90%响应时间在过去3分钟内低于1000毫秒
4.服务实例在过去2分钟内的平均响应时间超过1秒
5.端点平均响应时间过去2分钟超过1秒
1.SkyWalking 调用开发者自己定义 接受报警接口;
2.开发者接受报警内容之后 在去调用微信公众号接口发送模板提醒
接受数据的格式
[{
??? "scopeId": 1,
??? "scope": "SERVICE",
??? "name": "serviceA",
??? "id0": 12,
??? "id1": 0,
??? "ruleName": "service_resp_time_rule",
??? "alarmMessage": "alarmMessage xxxx",
??? "startTime": 1560524171000
}, {
??? "scopeId": 1,
??? "scope": "SERVICE",
??? "name": "serviceB",
??? "id0": 23,
??? "id1": 0,
??? "ruleName": "service_resp_time_rule",
??? "alarmMessage": "alarmMessage yyy",
??? "startTime": 1560524171000
}]
scopeId、scope:所有可用的 Scope 详见
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine
name:目标 Scope 的实体名称
id0:Scope 实体的 ID
id1:保留字段,目前暂未使用
ruleName:告警规则名称
alarmMessage:告警消息内容
startTime:告警时间,格式为时间戳
提供报警接口,发送微信模板、封装报警参数代码实现:
/**
* 封装报警参数的对象
*/
public class AlarmMessageDto {
private int scopeId;
private String name;
private int id0;
private int id1;
private String alarmMessage;
private long startTime;
public int getScopeId() {
return scopeId;
}
public String getName() {
return name;
}
public int getId0() {
return id0;
}
public int getId1() {
return id1;
}
public String getAlarmMessage() {
return alarmMessage;
}
public long getStartTime() {
return startTime;
}
public void setScopeId(int scopeId) {
this.scopeId = scopeId;
}
public void setName(String name) {
this.name = name;
}
public void setId0(int id0) {
this.id0 = id0;
}
public void setId1(int id1) {
this.id1 = id1;
}
public void setAlarmMessage(String alarmMessage) {
this.alarmMessage = alarmMessage;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
import com.tcf.entity.AlarmMessageDto;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* 报警接口
*/
@RestController
public class PoliceService {
private List<List<AlarmMessageDto>> listAlarmMessage = new ArrayList<>();
/**
* 存放告警信息
*
* @param alarmMessageList
*/
@RequestMapping(value = "/police", method = RequestMethod.POST)
public void alarm(@RequestBody List<AlarmMessageDto> alarmMessageList) {
listAlarmMessage.add(alarmMessageList);
WxMpTemplateMsgService wxMpTemplateMsgService = WxMpConfiguration.getMpServices().get(wxMpProperties.getConfigs().get(0).getAppId()).getTemplateMsgService();
WxMpTemplateMessage wxMpTemplateMessage = new WxMpTemplateMessage();
wxMpTemplateMessage.setTemplateId(errorTemplateId);
wxMpTemplateMessage.setToUser(serviceErrorDto.getOpenId());
List<WxMpTemplateData> data = new ArrayList<>();
data.add(new WxMpTemplateData("first", serviceErrorDto.getServiceName()));
data.add(new WxMpTemplateData("keyword1", serviceErrorDto.getIp() + ":" + serviceErrorDto.getPort()));
data.add(new WxMpTemplateData("keyword2", serviceErrorDto.getErrorMsg()));
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(serviceErrorDto.getErrorTime());
data.add(new WxMpTemplateData("keyword3", format));
wxMpTemplateMessage.setData(data);
wxMpTemplateMessage.setUrl("http://supertian.blog.csdn.net");
try {
wxMpTemplateMsgService.sendTemplateMsg(wxMpTemplateMessage);
} catch (Exception e) {
log.error("<e:>", e);
}
}
/**
* 打印告警信息
*
* @return
*/
@RequestMapping("/getListAlarmMessageDto")
public List<List<AlarmMessageDto>> getListAlarmMessageDto() {
return listAlarmMessage;
}
}
13 SkyWalking数据持久化
1 需要在oap-libs/ 放入 mysql-connector-java-8.0.16.jar
mysql-connector-java-8.0.16.jar 下载:
2 重启SkyWalking
自动创建表结构
|