IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Skywalking原理篇(二):Agent 与 OAP 的通信原理 -> 正文阅读

[Java知识库]Skywalking原理篇(二):Agent 与 OAP 的通信原理

GRPC 基础知识

基本介绍

gRPC 是一个高性能、开源和通用的 RPC 框架,面向服务端和移动端,基于 HTTP/2 协议标准而设计,默认使用 ProtoBuf(Protocol Buffers) 序列化协议进行开发,当前支持C、Java、Go等多种语言
gRPC提供了一种简单的方法来精确的定义服务,并且为客户端和服务端自动生成可靠的功能库。与很多RPC系统一样,服务端负责实现定义好的接口并处理客户端的请求,客户端根据接口描述直接本地调用需要的服务而不用去关心具体底层通信细节和调用过程。客户端和服务端可以分别使用gRPC支持的不同语言进行实现

基本通信流程

在这里插入图片描述

  1. gRPC 通信的第一步是定义 IDL(Interface Definition Language)proto 文件
  2. 第二步是编译 proto 文件,得到存根 Stub 文件。Stub 中集成了服务调用、数据序列化等底层功能,客户端使用它与服务端进行交互。
  3. 第三步是服务端实现第一步定义的接口并启动,这些接口的定义也在存根 Stub 文件里面
  4. 最后一步是客户端借助 Stub 文件调用服务端的函数,虽然客户端调用的函数是有服务端实现的,但是调用起来就像是本地函数一样

多语言 Hello World 案例

案例获取地址

准备工作

IDEA 安装 Protobuf SupportProtobuf Editor 插件用于识别 proto 文件

Protobuf Support 插件官方已弃用,不支持IDEA19+的版本,由于我的电脑IDEA版本较低,所以用的这个
Protobuf Editor 可以支持最新版本的IDEA,但不支持部分老版本的

准备好 python 环境并安装 grpc 相关插件

pip3 install grpcio
pip3 install grpcio-tools

项目搭建

项目结构如下

grpc-sample
├── src
│   └── main
│        ├── java
│        │   └── io
│        │       └── grpc
│        │           └── sample
│        │               └── helloworld
│        ├── proto
│        ├── python
│        └── resources
└── pom.xml        

创建一个 Maven 工程,配置 pom 文件,导入 gRPC 的依赖和编译插件

<properties>
  <grpc-version>1.20.0</grpc-version>
</properties>

<dependencies>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-core</artifactId>
    <version>${grpc-version}</version>
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>${grpc-version}</version>
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>${grpc-version}</version>
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc-version}</version>
  </dependency>
</dependencies>

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.5.0.Final</version>
    </extension>
  </extensions>

  <plugins>
    <!-- protobuf插件 -->
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.5.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.9.1:exe:${os.detected.classifier}</pluginArtifact>
        <!-- proto文件的所在路径 -->
        <protoSourceRoot>src/main/proto</protoSourceRoot>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.6.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>

在 main/proto 目录下创建 helloworld.proto 文件,定义服务和序列化数据结构

// 指定使用proto3的语法,如果不指定的话,编译器会使用proto2去编译
syntax = "proto3";

// option选项: 影响 特定环境下 的处理方式

// 不同的类会分散为多个java文件(默认false代表所有内容集中在同一个java文件)
option java_multiple_files = true;
// 指定生成的类应该放在什么Java包名下(默认采用包名)
option java_package = "io.grpc.sample.helloworld";
// 指定生成的java文件的类名(默认根据文件名采用驼峰式生成)
option java_outer_classname = "HelloWorldProto";

// 声明包名, 防止项目间命名冲突
package helloworld;

// 定义服务接口
service Greeter {
  // 定义一个简单rpc方法 sayHello(除此之外还有3种流式rpc方法)
  // 接收一个 HelloRequest 消息体, 返回一个 HelloReply 消息体
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 定义请求消息体
message HelloRequest {
  // 消息字段
  string name = 1;
}

// 定义响应消息体
message HelloReply {
  string message = 1;
}

使用 protobuf-maven-plugin 插件编译 proto 文件生成 Java 文件

前面在 pom 文件中加入了 protobuf 插件后,可以在 maven projects 视图界面看到以下几个编译指令

Plugins
└── protobuf
    ├── protobuf:compile # 编译消息对象, 生成java文件
    ├── protobuf:compile-cpp # 编译消息对象, 生成c文件
    ├── protobuf:compile-custom # 依赖消息对象, 生成接口服务文件
    ├── protobuf:compile-javanano # 编译消息对象, 生成javanano文件
    └── protobuf:compile-python # 编译消息对象, 生成python文件

我们在使用时,只需要执行以下两个指令

  • protobuf:compile
    默认在 target/generated-sources/protobuf/java 目录下生成消息文件
  • protobuf:compile-custom
    默认在 target/generated-sources/protobuf/grpc-java 目录下生成接口服务文件

执行完成后会生成如下文件

target/generated-sources/
├── annotations
└── protobuf
    ├── grpc-java
    │   └── io/grpc/sample/helloworld
    │       └── GreeterGrpc.java
    └── java
        └── io/grpc/sample/helloworld
            ├── HelloReply.java
            ├── HelloReplyOrBuilder.java
            ├── HelloRequest.java
            ├── HelloRequestOrBuilder.java
            └── HelloWorldProto.java

使用 grpcio-tools 工具编译 proto 文件生成 Python 文件

# 进入proto目录
cd proto
# 编译proto文件并将对应的python文件输出到python目录
python -m grpc_tools.protoc --python_out=../python --grpc_python_out=../python -I. helloworld.proto

执行完成后会在 python 目录下生成如下文件

python
├── helloworld_pb2.py # 消息文件
└── helloworld_pb2_grpc.py # 接口服务文件

创建 Java 服务端

  • 首通过监听端口创建 server 实例
  • 然后重载 GreeterGrpc.GreeterImplBase 中定义的 sayHello 方法来提供服务的具体实现,并将该服务注册到 server 实例中
  • 最后再启动 server 服务
package io.grpc.sample.helloworld;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * HelloWorld 服务端
 * Server that manages startup/shutdown of a {@code Greeter} server.
 */
public class HelloWorldServer {
    private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());

    private Server server;

    private void start() throws IOException {
        int port = 50051;
        // 创建server对象, 监听50051端口
        server = ServerBuilder.forPort(port)
                // 注册服务
                .addService(new GreeterImpl())
                .build()
                .start();
        logger.info("Server started, listening on " + port);
        // 添加JVM停止时的回调函数
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                try {
                    HelloWorldServer.this.stop();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() throws InterruptedException {
        if (server != null) {
            // 等待所有提交的任务执行结束关闭服务
            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * Main launches the server from the command line.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final HelloWorldServer server = new HelloWorldServer();
        // 启动服务
        server.start();
        // 阻塞当前主线程
        server.blockUntilShutdown();
    }

    // 扩展grpc自动生成的服务接口抽象, 实现业务功能
    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
            // 构建响应消息, 从请求消息中获取姓名并在前面拼接上"Hello "
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
            // 在流关闭或抛出异常前可以调用多次
            responseObserver.onNext(reply);
            // 关闭流
            responseObserver.onCompleted();
        }
    }
}

创建 Java 客户端

客户端的代码主要分为三个步骤

  • 首先用 hostport 生成 channel 连接
  • 然后用 channel 连接和前面生成的 GreeterGrpc 接口服务类创建 Stub
  • 最后使用 StubsayHello 方法发起真正的 Rpc 调用,至于后续的其他通信细节我们就感知不到了
package io.grpc.sample.helloworld;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A simple client that requests a greeting from the {@link HelloWorldServer}.
 * @author wangbo
 * @since 2021/7/6
 */
public class HelloWorldClient {
    private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());

    private final GreeterGrpc.GreeterBlockingStub blockingStub;

    /** Construct client for accessing HelloWorld server using the existing channel. */
    public HelloWorldClient(Channel channel) {
        // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
        // shut it down.

        // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
        // 使用Channel创建阻塞调用的 Stub 存根对象(也可以创建非阻塞的 FutureStub)
        blockingStub = GreeterGrpc.newBlockingStub(channel);
    }

    /** Say hello to server. */
    public void greet(String name) {
        logger.info("Will try to greet " + name + " ...");
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
        HelloReply response;
        try {
            // 使用 Stub 发起 rpc 调用
            response = blockingStub.sayHello(request);
        } catch (StatusRuntimeException e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("Greeter client received: " + response.getMessage());
    }

    /**
     * Greet server. If provided, the first element of {@code args} is the name to use in the
     * greeting. The second argument is the target server.
     */
    public static void main(String[] args) throws Exception {
        String name = "Java";
        // Access a service running on the local machine on port 50051
        String target = "localhost:50051";
        // Allow passing in the user and target strings as command line arguments
        if (args.length > 0) {
            if ("--help".equals(args[0])) {
                System.err.println("Usage: [name [target]]");
                System.err.println("");
                System.err.println("  name    The name you wish to be greeted by. Defaults to " + name);
                System.err.println("  target  The server to connect to. Defaults to " + target);
                System.exit(1);
            }
            name = args[0];
        }
        if (args.length > 1) {
            target = args[1];
        }

        // Create a communication channel to the server, known as a Channel. Channels are thread-safe
        // and reusable. It is common to create channels at the beginning of your application and reuse
        // them until the application shuts down.
        // 使用 Plaintext 协商模式创建 ManagedChannel
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
                // needing certificates.
                .usePlaintext()
                .build();
        try {
            HelloWorldClient client = new HelloWorldClient(channel);
            client.greet(name);
        } finally {
            // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
            // resources the channel should be shut down when it will no longer be used. If it may be used
            // again leave it running.
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}

创建 Python 客户端

from __future__ import print_function
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel('127.0.0.1:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='Python'))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

结果验证

启动服务端并分别使用 Java 客户端和 Python 客户端进行调用

在这里插入图片描述

以 ServiceManagementClient 服务为例分析 Agent 与 OAP 的网络通信

我们知道,通过 Agent 代理的 Web 服务在启动之后不经过任何访问就能在 UI 控制台看到服务和实例信息(端点信息需要访问具体的接口后通过日志切面插件进行上报),这些信息是如何注册到 OAP 的呢?

Agent客户端

类图

在这里插入图片描述

网络连接管理

Skywalking Agent 采集到各种 Metric 信息后,就是通过 GRPC 的方式上报到 OAP
那么 Agent 作为客户端首先需要创建一个 Channel 连接。对于客户端来说创建和销毁
Channel 代价是昂贵的,但是建立一个 Stub 是很简单的,就像创建一个普通对象。因此
Channel 就需要复用,进而提高交互效率。

GRPCChannel

GRPC 中有两个抽象类

  • ManagedChannel 它逻辑上表示一个 Channel,底层持有一个 TCP 链接
  • ManagedChannelBuilder 它负责创建客户端 Channel,常用的实现有三种
    • NettyChannelBuilder 底层采用 Netty 创建 Channel
    • OkHttpChannelBuilder 底层采用 OkHttp 创建 Channel
    • InProcessChannelBuilder 用于创建进程内通信使用的Channel

Skywalking 对 原生连接 ManagedChannel 进行了一些封装
org.skywalking.apm.agent.core.remote.GRPCChannel

// 原始 channel
private final ManagedChannel originChannel;
// 增加了装饰器的 channel
private final Channel channelWithDecorators;

private GRPCChannel(
    // OAP 的 ip 和 端口
    String host, int port,
    // ChannelBuilder 的构造器集合,默认有两种实现分别用于设置明文传输和密文传输
    List<ChannelBuilder> channelBuilders, 
    // Channel 的装饰器集合
    List<ChannelDecorator> decorators
) throws Exception {
    // 使用 NettyChannelBuilder 创建 ChannelBuilder
    ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
	// 为 ChannelBuilder 注入外部实现
    for (ChannelBuilder builder : channelBuilders) {
        channelBuilder = builder.build(channelBuilder);
    }
	// 使用 ChannelBuilder 创建原始 channel
    this.originChannel = channelBuilder.build();

    Channel channel = originChannel;
    // 使用装饰器集合扩展原始 channel 的功能
    for (ChannelDecorator decorator : decorators) {
        channel = decorator.build(channel);
    }

    channelWithDecorators = channel;
}

public static Builder newBuilder(String host, int port) {
	return new Builder(host, port);
}

public static class Builder {
    private final String host;
    private final int port;
    private final List<ChannelBuilder> channelBuilders;
    private final List<ChannelDecorator> decorators;

    private Builder(String host, int port) {
        this.host = host;
        this.port = port;
        this.channelBuilders = new LinkedList<>();
        this.decorators = new LinkedList<>();
    }

    public Builder addChannelDecorator(ChannelDecorator interceptor) {
        this.decorators.add(interceptor);
        return this;
    }

    public GRPCChannel build() throws Exception {
        return new GRPCChannel(host, port, channelBuilders, decorators);
    }

    public Builder addManagedChannelBuilder(ChannelBuilder builder) {
        channelBuilders.add(builder);
        return this;
    }
}

可以看到这里使用到了 装饰器模式 对原始连接进行了扩展,并用 建造者模式 生成装饰后的连接

GRPCChannelManager

该类是 Channel 的连接管理器,它主要负责生成 Channel 连接,提供给其他需要跟 OAP 交互的服务使用,并启动一个定时任务监控连接的活性,如果检测到底层连接处于关闭状态,将会尝试重建连接。

先来介绍下该类中的核心字段
org.skywalking.apm.agent.core.remote.GRPCChannelManager

// 在 ManagedChannel 的基础上进行了封装的 Channel 连接
private volatile GRPCChannel managedChannel = null;
// 连接状态定时检测任务
private volatile ScheduledFuture<?> connectCheckFuture;
// 重连标识符
private volatile boolean reconnect = true;
// 重连次数
private volatile int reconnectCount = 0;
// 监听器列表, 用于在 channel 连接状态发生变化时通知其他使用该连接的服务
private final List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<>());
// 下面三个参数 用于指定当前连接的 OAP 实例
private volatile List<String> grpcServers;
private final Random random = new Random();
private volatile int selectedIdx = -1;

从上一篇文章 Agent 启动流程解析 可以得知实现 BootService 接口的服务会依次调用
preparebootonComplete 方法。
GRPCChannelManagerprepareonComplete 都是空实现,我们直接来看下 boot 方法

public void boot() {
    if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
        LOGGER.error("Collector server addresses are not set.");
        LOGGER.error("Agent will not uplink any data.");
        return;
    }
    // 解析 OAP 实例地址
    grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
    // 开启一个检测 Channel 连接状态并适时重启的定时任务, 默认30s
    connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
        // 自定义线程工厂,用于为线程命名并开启守护线程
        new DefaultNamedThreadFactory("GRPCChannelManager")
    ).scheduleAtFixedRate(
        new RunnableWithExceptionProtection(
            this,
            // 线程异常时的回调
            t -> LOGGER.error("unexpected exception.", t)
        ), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
    );
}

再来看下线程任务的具体实现

public void run() {
    LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
    // 如果开启了 DNS 定期解析功能并且需要重连时,重新解析配置类中的 OAP 实例地址信息赋值给 grpcServers
    if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
        String backendService = Config.Collector.BACKEND_SERVICE.split(",")[0];
        try {
            String[] domainAndPort = backendService.split(":");

            List<String> newGrpcServers = Arrays
                    .stream(InetAddress.getAllByName(domainAndPort[0]))
                    .map(InetAddress::getHostAddress)
                    .map(ip -> String.format("%s:%s", ip, domainAndPort[1]))
                    .collect(Collectors.toList());

            grpcServers = newGrpcServers;
        } catch (Throwable t) {
            LOGGER.error(t, "Failed to resolve {} of backend service.", backendService);
        }
    }
	// 判断是否需要重连
    if (reconnect) {
        // 判断是否 OAP 地址列表是否为空
        if (grpcServers.size() > 0) {
            String server = "";
            try {
                // 使用随机数对 OAP 实例数取模,相当于一个随机的负载均衡
                int index = Math.abs(random.nextInt()) % grpcServers.size();
                // 判断当前选中的 OAP 实例是否为上次选中的实例
                if (index != selectedIdx) {
                    // 如果不是,则需要重新初始化 channel 连接
                    // 赋值新选中的OAP Id
                    selectedIdx = index;
					
                    // 取出 OAP 的 ip和端口
                    server = grpcServers.get(index);
                    String[] ipAndPort = server.split(":");

                    // 关闭当前已不可用的 channel 连接
                    if (managedChannel != null) {
                        managedChannel.shutdownNow();
                    }

                    // 重新构建 channel 连接
                    managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                        						// 设置明文传输模式、传输消息大小限制
                                                .addManagedChannelBuilder(new StandardChannelBuilder())
                                                // 设置密文传输模式
                        						.addManagedChannelBuilder(new TLSChannelBuilder())
                        						// AgentID装饰器,用于在请求头上加一个 key 为 Agent-Version 的版本号
                                                .addChannelDecorator(new AgentIDDecorator())
                        						// 认证装饰器,用于在请求头上加一个 key 为 AUTHENTICATION 的 token
                                                .addChannelDecorator(new AuthenticationDecorator())
                                                .build();
                    // notify方法会循环调用所有注册在当前连接上的 GRPCChannelListener 实例的 statusChanged 方法,通知它们连接创建成功的事件
                    notify(GRPCChannelStatus.CONNECTED);
                    reconnectCount = 0;
                    // 设置重连状态为false
                    reconnect = false;
                // 如果是上次连接的服务,由于该连接已不可用,所以需要尝试重连    
                } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
                    // Reconnect to the same server is automatically done by GRPC,
                    // therefore we are responsible to check the connectivity and
                    // set the state and notify listeners
                    reconnectCount = 0;
                    notify(GRPCChannelStatus.CONNECTED);
                    reconnect = false;
                }

                return;
            } catch (Throwable t) {
                LOGGER.error(t, "Create channel to {} fail.", server);
            }
        }

        LOGGER.debug(
            "Selected collector grpc service is not available. Wait {} seconds to retry",
            Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL
        );
    }
}

private void notify(GRPCChannelStatus status) {
    // 循环遍历所有使用到 GRPCChannel 的服务
    for (GRPCChannelListener listener : listeners) {
        try {
            // 同步连接状态变更信息
            listener.statusChanged(status);
        } catch (Throwable t) {
            LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
        }
    }
}

核心流程图归纳如下
在这里插入图片描述

实例信息注册与心跳服务定义

Management.proto

首先来介绍服务的协议文件定义
apm-protocol/apm-network/src/main/proto/management/Management.proto
其中定义了2个方法
reportInstanceProperties 负责上报服务实例信息与OS信息
keepAlive 负责上报服务心跳包

syntax = "proto3";

package skywalking.v3;

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.management.v3";
option csharp_namespace = "SkyWalking.NetworkProtocol.V3";
option go_package = "skywalking/network/management/v3";

import "common/Common.proto";

// 定义服务接口
service ManagementService {
    // 实例信息上报方法
    rpc reportInstanceProperties (InstanceProperties) returns (Commands) {
    }

    // 服务心跳检测方法
    rpc keepAlive (InstancePingPkg) returns (Commands) {

    }
}

// 实例信息上报请求体
message InstanceProperties {
    // 服务名
    string service = 1;
    // 服务实例名
    string serviceInstance = 2;
    // 服务实例额外KV信息
    repeated KeyStringValuePair properties = 3;
}

// 心跳检测上报请求体
message InstancePingPkg {
    // 服务名
    string service = 1;
    // 服务实例名
    string serviceInstance = 2;
}

对应生成的文件结构如下

apm-protocol/apm-network/target/generated-sources/
└── protobuf
    ├── grpc-java
    │   └── org/apache/skywalking/apm/network/management.v3
    │       └── ManagementServiceGrpc.java
    └── java
        └── org/apache/skywalking/apm/network/management.v3
            ├── InstancePingPkg.java
            ├── InstancePingPkgOrBuilder.java
            ├── InstanceProperties.java
            ├── InstancePropertiesOrBuilder.java
            └── Management.java

ServiceManagementClient

客户端具体实现

@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private static final ILog LOGGER = LogManager.getLogger(ServiceManagementClient.class);
    private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;

    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
    private volatile ScheduledFuture<?> heartbeatFuture;
    private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
        } else {
            managementServiceBlockingStub = null;
        }
        this.status = status;
    }

    @Override
    public void prepare() {
        // 注册连接状态监听器
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();

        // 解析 INSTANCE_PROPERTIES 配置并转为 KeyStringValuePair 上报格式
        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                    .setKey(key)
                    .setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
                    .build());
        }

        // 设置实例名称, 如果配置文件中已配置则直接使用, 否则使用UUID + ip信息生成
        Config.Agent.INSTANCE_NAME = StringUtil.isEmpty(Config.Agent.INSTANCE_NAME)
                ? UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4()
                : Config.Agent.INSTANCE_NAME;
    }

    @Override
    public void boot() {
        // 开启一个定时任务, 每隔一段时间上报服务实例信息
        heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("ServiceManagementClient")
        ).scheduleAtFixedRate(
                new RunnableWithExceptionProtection(
                        this,
                        t -> LOGGER.error("unexpected exception.", t)
                ), 0, Config.Collector.HEARTBEAT_PERIOD,
                TimeUnit.SECONDS
        );
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void shutdown() {
        heartbeatFuture.cancel(true);
    }

    @Override
    public void run() {
        LOGGER.debug("ServiceManagementClient running, status:{}.", status);

        // 如果连接可用
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            try {
                if (managementServiceBlockingStub != null) {
                    // 服务实例上报周期为 Config.Collector.HEARTBEAT_PERIOD * Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR 默认每隔5min一次
                    if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {

                        // 发起rpc调用上报实例信息
                        managementServiceBlockingStub
                                // 设置GRPC请求超时时间 默认30s
                                .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                .reportInstanceProperties(InstanceProperties.newBuilder()
                                        // 设置服务名称
                                        .setService(Config.Agent.SERVICE_NAME)
                                        // 设置服务实例名称
                                        .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                        // 设置操作系统相关OS信息
                                        .addAllProperties(OSUtil.buildOSInfo(
                                                Config.OsInfo.IPV4_LIST_SIZE))
                                        .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                        .build());
                    } else {
                        // 服务心跳上报周期为 Config.Collector.HEARTBEAT_PERIOD 默认30s
                        final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
                                GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                        ).keepAlive(InstancePingPkg.newBuilder()
                                // 设置服务名称
                                .setService(Config.Agent.SERVICE_NAME)
                                // 设置服务实例名称
                                .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                .build());

                        // 处理心跳请求响应
                        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                    }
                }
            } catch (Throwable t) {
                LOGGER.error(t, "ServiceManagementClient execute fail.");
                // 通知 GRPCChannelManager 连接已断开
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
}

可以看到客户端发起了2个 RPC 调用

  • reportInstanceProperties 服务实例信息上报默认3分钟发起一次
  • keepAlive 心跳检测默认30秒发起一次

OAP 服务端

类图

在这里插入图片描述

Server

Skywalking OAP 将对外提供的服务抽象为 Server 接口,它有2个实现

  • GRPCServer 主要负责接收 Agent 发送的 GRPC 请求,默认监听 11800 端口
  • JettyServer 主要负责接收 UI 界面发送的 HTTP 请求,默认监听 12800 端口

GRPCServer

封装了 GRPC 服务参数的初始化、服务注册接口、启动方法

public class GRPCServer implements Server {

    // 服务ip与端口
    private final String host;
    private final int port;
    // 每个连接的最大并发请求数
    private int maxConcurrentCallsPerConnection;
    // 单个消息请求体的最大值
    private int maxMessageSize;
    // server实例
    private io.grpc.Server server;
    private NettyServerBuilder nettyServerBuilder;
    // ssl密文相关
    private String certChainFile;
    private String privateKeyFile;
    private DynamicSslContext sslContext;
    // 自定义线程池参数
    private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4;
    private int threadPoolQueueSize = 10000;

    public GRPCServer(String host, int port) {
        this.host = host;
        this.port = port;
        this.maxConcurrentCallsPerConnection = 4;
        this.maxMessageSize = Integer.MAX_VALUE;
    }

    public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
        this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    public void setThreadPoolQueueSize(int threadPoolQueueSize) {
        this.threadPoolQueueSize = threadPoolQueueSize;
    }

    /**
     * Require for `server.crt` and `server.pem` for open ssl at server side.
     *
     * @param certChainFile  `server.crt` file
     * @param privateKeyFile `server.pem` file
     */
    public GRPCServer(String host, int port, String certChainFile, String privateKeyFile) {
        this(host, port);
        this.certChainFile = certChainFile;
        this.privateKeyFile = privateKeyFile;
    }

    @Override
    public String hostPort() {
        return host + ":" + port;
    }

    @Override
    public String serverClassify() {
        return "Google-RPC";
    }

    @Override
    public void initialize() {
        InetSocketAddress address = new InetSocketAddress(host, port);
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize);
        // grpc默认使用 newCachedThreadPool 线程池的最大线程数是无限大, 大流量下容易线程爆炸, 因此这里自定义了一个线程池
        ExecutorService executor = new ThreadPoolExecutor(
            threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue,
            new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()
        );
        // 构建 nettyServerBuilder 传入各种初始化参数
        nettyServerBuilder = NettyServerBuilder.forAddress(address);
                                                // 设置每个连接的最大并发调用数
        nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
                                                // 设置传入消息体的最大值
                                               .maxInboundMessageSize(maxMessageSize)
                                                // 传入服务端线程池
                                               .executor(executor);
        if (!Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile)) {
            // 密文传输文件设置
            sslContext = DynamicSslContext.forServer(privateKeyFile, certChainFile);
            nettyServerBuilder.sslContext(sslContext);
        }
        log.info("Server started, host {} listening on {}", host, port);
    }

    static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.warn("Grpc server thread pool is full, rejecting the task");
        }
    }

    @Override
    public void start() throws ServerException {
        try {
            Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
            server = nettyServerBuilder.build();
            // 启动service服务
            server.start();
        } catch (IOException e) {
            throw new GRPCServerException(e.getMessage(), e);
        }
    }

    // 注册实现了proto协议中定义的接口的服务
    public void addHandler(BindableService handler) {
        log.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
        nettyServerBuilder.addService(handler);
    }

    // 下面两个 addHandler 方法是结合起来为已注册的服务添加拦截器
    public void addHandler(ServerServiceDefinition definition) {
        log.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
        nettyServerBuilder.addService(definition);
    }

    public void addHandler(ServerInterceptor serverInterceptor) {
        log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port);
        nettyServerBuilder.intercept(serverInterceptor);
    }

    @Override
    public boolean isSSLOpen() {
        return !Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile);
    }

    @Override
    public boolean isStatusEqual(Server target) {
        if (this == target)
            return true;
        if (target == null || getClass() != target.getClass())
            return false;
        GRPCServer that = (GRPCServer) target;
        return port == that.port && Objects.equals(host, that.host) && Objects.equals(
            certChainFile, that.certChainFile) && Objects
            .equals(privateKeyFile, that.privateKeyFile);
    }
}

ServerHandler

Server 处理客户端请求逻辑的顶层抽象,具体处理请求的逻辑都会封装在该接口的实现之中
?

GRPCHandler

GRPCServer 处理 GRPC 请求的逻辑都封装在了该接口的实现类中

ManagementServiceHandler

GRPCHandler 的其中一个实现,该类就是实例信息注册与心跳服务的具体实现

public class ManagementServiceHandler extends ManagementServiceGrpc.ManagementServiceImplBase implements GRPCHandler {
    private final SourceReceiver sourceReceiver;
    private final NamingControl namingControl;

    public ManagementServiceHandler(ModuleManager moduleManager) {
        // 通过模块管理器获取Source信息接收器
        this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
        // 获取名称管理实例校验并格式化上报的服务名称信息
        this.namingControl = moduleManager.find(CoreModule.NAME)
                                          .provider()
                                          .getService(NamingControl.class);
    }

    @Override
    public void reportInstanceProperties(final InstanceProperties request,
                                         final StreamObserver<Commands> responseObserver) {
        // 创建实例信息 Source 类
        ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
        // 校验客户端上报的服务名称
        final String serviceName = namingControl.formatServiceName(request.getService());
        // 校验客户端上报的服务实例名称
        final String instanceName = namingControl.formatInstanceName(request.getServiceInstance());
        // 注入根据服务名称获取的服务ID
        serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
        // 注入实例名称
        serviceInstanceUpdate.setName(instanceName);

        JsonObject properties = new JsonObject();
        List<String> ipv4List = new ArrayList<>();
        // 将OS信息的键值对写入到 Source 对象中
        request.getPropertiesList().forEach(prop -> {
            if (InstanceTraffic.PropertyUtil.IPV4.equals(prop.getKey())) {
                ipv4List.add(prop.getValue());
            } else {
                properties.addProperty(prop.getKey(), prop.getValue());
            }
        });
        properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, ipv4List.stream().collect(Collectors.joining(",")));
        serviceInstanceUpdate.setProperties(properties);
        // 写入采样时间
        serviceInstanceUpdate.setTimeBucket(
            TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
        // 上报实例信息, 将实例信息传给SourceReceiver, SourceReceiver会使用Source类指定的Dispatcher转发给worker后写入到DataCarrier的缓存队列中等待存储模块进行消费
        sourceReceiver.receive(serviceInstanceUpdate);

        // 返回响应体
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }

    @Override
    public void keepAlive(final InstancePingPkg request, final StreamObserver<Commands> responseObserver) {
        final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
        final String serviceName = namingControl.formatServiceName(request.getService());
        final String instanceName = namingControl.formatInstanceName(request.getServiceInstance());

        // 创建实例信息 Source 类
        ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
        // 注入根据服务名称获取的服务ID
        serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
        // 注入实例名称
        serviceInstanceUpdate.setName(instanceName);
        // 注入采样时间
        serviceInstanceUpdate.setTimeBucket(timeBucket);
        // 上报实例信息
        sourceReceiver.receive(serviceInstanceUpdate);

        // 创建服务信息 Source 类
        ServiceMeta serviceMeta = new ServiceMeta();
        // 注入服务名称
        serviceMeta.setName(serviceName);
        // 注入服务类型
        serviceMeta.setNodeType(NodeType.Normal);
        // 注入采样时间
        serviceMeta.setTimeBucket(timeBucket);
        // 上报服务信息
        sourceReceiver.receive(serviceMeta);

        // 返回响应体
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }
}

InstanceUpdateDispatcher & ServiceMetaDispatcher

从上一步服务的具体实现类中我们可以看到,上报的信息被转换成了 Source 类,然后传入了
SourceReceiver 中,SourceReceiver 会根据 Source 类注解上指定的 Dispatcher 调度器再将
Source 类转换为指标类
实例信息和服务信息的调度器如下
对应转换的指标类分别是 InstanceTrafficServiceTraffic

public class InstanceUpdateDispatcher implements SourceDispatcher<ServiceInstanceUpdate> {
    @Override
    public void dispatch(final ServiceInstanceUpdate source) {
        InstanceTraffic traffic = new InstanceTraffic();
        traffic.setTimeBucket(source.getTimeBucket());
        traffic.setName(source.getName());
        traffic.setServiceId(source.getServiceId());
        traffic.setLastPingTimestamp(source.getTimeBucket());
        traffic.setProperties(source.getProperties());
        MetricsStreamProcessor.getInstance().in(traffic);
    }
}
public class ServiceMetaDispatcher implements SourceDispatcher<ServiceMeta> {
    @Override
    public void dispatch(final ServiceMeta source) {
        ServiceTraffic traffic = new ServiceTraffic();
        traffic.setTimeBucket(source.getTimeBucket());
        traffic.setName(source.getName());
        traffic.setNodeType(source.getNodeType());
        MetricsStreamProcessor.getInstance().in(traffic);
    }
}

InstanceTraffic & ServiceTraffic

再来看一下这两个指标类
其中 Stream 注解的 name 字段就是指标名称,存储模块例如 ES 在写入的时候会根据它进行索引的创建

@Stream(name = InstanceTraffic.INDEX_NAME, scopeId = SERVICE_INSTANCE,
    builder = InstanceTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
@EqualsAndHashCode(of = {
    "serviceId",
    "name"
})
public class InstanceTraffic extends Metrics {
    public static final String INDEX_NAME = "instance_traffic";
    public static final String SERVICE_ID = "service_id";
    public static final String NAME = "name";
    public static final String LAST_PING_TIME_BUCKET = "last_ping";
    public static final String PROPERTIES = "properties";

    private static final Gson GSON = new Gson();
    
    // ......省略
}        
@Stream(name = ServiceTraffic.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE,
    builder = ServiceTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false)
@EqualsAndHashCode(of = {
    "name",
    "nodeType"
})
public class ServiceTraffic extends Metrics {
    public static final String INDEX_NAME = "service_traffic";

    public static final String NAME = "name";
    public static final String NODE_TYPE = "node_type";
    public static final String GROUP = "service_group";
    
    // ......省略
}

之后调度器会将指标类传入到流处理器中,流处理器再将指标类传入到各种 Worker 中进行聚合、持久化存储等操作,具体的原理就不再赘述了,我们用 ES 作为存储模块验证一下实例信息和服务信息上报的结果

结果验证

Live-Demo 项目中的 projectB 为例
使用 business-zone::projectB 作为服务名称启动 ProjectB 项目,在
ServiceManagementClientprepare 方法结尾打上断点,查看生成的实例名称
在这里插入图片描述

这里可以看到生成的实例名称为 e4143e44d8ff448583bef96d77b57efc@192.168.100.48

启动 UI 服务,可以看到当前服务和当前实例的名称和预期的一样
在这里插入图片描述
再到 ES 中查看对应指标索引的结果
实例信息指标
在这里插入图片描述
服务信息指标
在这里插入图片描述
以上就是实例信息和服务信息上报的整个流程验证


总结

Skywalking AgentOAP 分别实现了 GRPC 的客户端与服务端并进行远程通信。
GRPC 本身是一个基于HTTP/2 标准设计高性能的 RPC 框架,默认使用的 Protocol Buffers 序列化协议性能相较于 RESTful Json 好很多。并且Skywalking 在使用 GRPC 的长连接时,也保证了多个服务复用一个连接,减少了网络带宽,提高了交互效率,这种思想也值得我们借鉴。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 11:30:08  更:2021-07-29 11:31:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/28 23:06:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码