IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 美团Cat监控集成Hystrix线程池链路分析 -> 正文阅读

[Java知识库]美团Cat监控集成Hystrix线程池链路分析

目录

? ? ??一、背景

? ? ??二、集成Hystrix线程池

? ? ? 三、福尔摩斯监控冲突

? ????四、透传Cat线程本地Context


一、背景

CAT(Central Application Tracking)是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为业务线提供系统的性能指标、健康状况、监控告警等。

CAT整体主要分为三个模块:CAT-client、CAT-consumer、CAT-home。

  • Cat-client 提供给业务以及中间层埋点的底层SDK。

  • Cat-consumer 用于实时分析从客户端提供的数据。

  • Cat-home 作为用户给用户提供展示的控制端。

在实际开发和部署中,Cat-consumer和Cat-home是部署在一个JVM内部,每个CAT服务端都可以作为consumer也可以作为home,这样既能减少整个层级结构,也可以增加系统稳定性。

Cat除了全面、实时的监控,还有一个比较重要的功能就是能够提供一个类似全链路监控的功能(并非完全意义上的全链路监控),如下所示:

链路监控的重要性不言而喻,他可以帮助我们遇到问题时快速的定位问题,也能帮助我们找到系统的优化点。

在未集成Hystrix线程池的时候一切都是没有问题的,但是使用了Hystrix线程池之后, Cat的链路就断了。

只有一个最终调用的结果耗时,这绝对不是我们想要的。

所以基于这个起因,本文就来介绍Cat在集成Hystrix线程池实现链路监控上的一些的尝试。

二、集成Hystrix线程池

首先,我们从Hystrix上的一些扩展说起,因为,Hystrix集成Cat监控实现链路监控必然要涉及到从Hystrix上的扩展作为切入点进行实现我们自定义的业务逻辑。

而这里所说的Hystrix扩展其实就是Hystrix并发策略的扩展,如下我列出了一个扩展Hystrix并发策略的例子。

public class KapiHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private HystrixConcurrencyStrategy delegate;

    public RequestAttributeHystrixConcurrencyStrategy() {
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
            HystrixPlugins.reset();
            //注册新的并发策略
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
        } catch (Exception e) {
            LoggerUtil.logger().error("构建hystrix并发策略异常:", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {
    }

    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }

    public <T> WrappedCallable<T> wrapCallable(Callable<T> callable) {
       return this.delegate.wrapCallable(callable);
    }
}

在这个Hystrix扩展当中,我们通过重写抽象类HystrixConcurrencyStrategy一系列方法可以实现我们需要的逻辑功能,在这个方法当中,有一个方法wrapCallable(Callable<T> callable)正是对Hystrix线程池执行任务的时候线程的包装,这也正是我们透传线程上下文需要找到的切入点。

三、福尔摩斯监控冲突

为了验证我的这个猜想,我在接入的过长当中,首先在wrapCallable(Callable<T> callable)打了个断点,试图程序在运行期能够进入到我的断点,但是却发现,使用postman进行接口测试的时候,断点并没有执行到wrapCallable(Callable<T> callable)断点处,这令我非常不解,难道猜想是错的,

我并没有立刻对我的猜想产生怀疑,而是又做了如下猜想:

Hystrix线程池执行线程之所以没有走这个对线程进行包装的地方,难道是有其他实现也重写了这个方法进行覆盖掉了?

于是,我找到抽象类HystrixConcurrencyStrategy的wrapCallable(Callable<T> callable)处,查看该方法被哪些实现类实现了,果不其然,有一个实现类是我们引入的三方包福尔摩斯进行监控上报接口调用情况的类也重写了抽象类HystrixConcurrencyStrategy的wrapCallable(Callable<T> callable)方法。

此刻,我需要重新进行以下验证,是不是程序运行的时候真的执行到了这个三方包的实现类,当我重新在这个三方包实现类打断点时,确实是执行了这个wrapCallable(Callable<T> callable)方法,而这个实现类里面的HystrixConcurrencyStrategy delegate变量就是我自己的抽象类HystrixConcurrencyStrategy的实现类。也就是说在我的实现类之上进行了一次包装,把我的实现方法给覆盖掉了。

四、透传Cat线程本地Context

找到问题之后,我将两者进行了合并兼容。

到这里,我们只是找到了Cat集成Hystrix线程池实现链路的切入点,然后下面就需要分析查看线程上下文之间传什么?怎么进行传?

这点非常关键,也是实现链路监控的核心。

从我们项目的业务逻辑来看,从Controller进来的请求,调用下游接口我们统一使用的都是spring的resttemplate进行调用的,在使用resttemplate进行调用的时候,我们已经添加了Cat埋点监控,那么其实也就是需要搞清楚在使用resttemplate进行调用的时候他的线程上下文使用到的是什么东西保证进行的该请求的信息上报的。

于是,我看了resttemplate接入Cat埋点的地方。

public static <T> T newTransaction(Callable<T> function, String type, String name, Map<String, Object> data) throws Exception {
        Transaction transaction = Cat.newTransaction(type, name);
        if (data != null && !data.isEmpty()) {
            data.forEach(transaction::addData);
        }
        try {
            T result = function.call();
            transaction.setStatus(Message.SUCCESS);
            return result;
        } catch (Exception e) {
            Cat.logError(e);
            if (e.getMessage() != null) {
                Cat.logEvent(type + "_Error", name + e.getMessage());
            }
            transaction.setStatus(e);
            throw e;
        } finally {
            transaction.complete();
        }
    }

也就是上面Cat.newTransaction(type,?name)之处。

继续看Cat.newTransaction(type,?name)源码。

public static Transaction newTransaction(String type, String name) {
        if (isEnabled()) {
            try {
                return getProducer().newTransaction(type, name);
            } catch (Exception var3) {
                errorHandler(var3);
                return NullMessage.TRANSACTION;
            }
        } else {
            return NullMessage.TRANSACTION;
        }
    }
    
 public Transaction newTransaction(String type, String name) {
        if (!this.manager.hasContext()) {
            this.manager.setup();
        }

        DefaultTransaction transaction = new DefaultTransaction(type, name, this.manager);
        this.manager.start(transaction, false);
        return transaction;
  }

直到进入到DefaultMessageProducer的newTransaction(String type, String name)方法。

在这里可以看到,判断manager(DefaultMessageManager)的hasContext()这个方法是否为空,如果为空,就调用setup()方法。

进入hasContext()这个方法。

 private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal();
 
 public boolean hasContext() {
        DefaultMessageManager.Context context = (DefaultMessageManager.Context)this.context.get();
        return context != null;
}

可以看到就是在DefaultMessageManager这个类中的ThreadLocal<DefaultMessageManager.Context> context中获取当前线程的私有线程副本变量。

我们回来再看setup()方法。

setup()方法的实现同样是在DefaultMessageManager这个类中。

public void setup() {
        DefaultMessageManager.Context ctx = new DefaultMessageManager.Context(this.domain, this.hostName, this.ip);
        double samplingRate = this.configService.getSamplingRate();
        if (samplingRate < 1.0D && this.hitSample(samplingRate)) {
            ctx.tree.setHitSample(true);
        }
        this.context.set(ctx);
    }

核心逻辑就是创建一个DefaultMessageManager.Context ctx?,然后调用context.set()方法,这个context.set()方法其实就是上面ThreadLocal的set()方法,进行线程绑定。

看到这里就明白了,原来是resttemplate进行调用的时候,在newTransaction(String type, String name)的时候,通过ThreadLocal获取当前线程绑定的对象Context,如果有的话,?才进行后续信息采集,如果没有就新创建一个Context了。

所以,我们在进行线程上下文传递的时候,需要传递的正是这个Context。

?然后我们再回到Hystrix扩展点,包装线程的地方,也就是在这里需要把这个Context从主线程传递到子线程中去。

传递的前提有两步:

①、我们获取到这个Context

②、我们通过一定的方法将Context设置到子线程的本地变量中

首先, 获取这个Context并不麻烦,DefaultMessageManager直接提供了getContext()方法,但是第二部分,进行设置到子线程的时候却发现了问题,在DefaultMessageManager这个类中,并没有提供setContext()方法,或者其他可以往线程本地变量赋值的api,而Context的传递还必须使用这个DefaultMessageManager类中的instance。不能使用其他的DefaultMessageManager实例或者ThreadLocal。

public class DefaultMessageManager implements MessageManager {
    ...... 
    private ThreadLocal<DefaultMessageManager.Context> context = new ThreadLocal();
    private static MessageManager INSTANCE = new DefaultMessageManager();
}

而且,我在DefaultMessageManager的接口处也发现了这么一句话。

/**
 * Message manager to help build CAT message.
 * <p>
 * <p>
 * Notes: This method is reserved for internal usage only. Application developer should never call this method directly.
 */
public interface MessageManager {

}

看这个注释,翻译成中文就是此方法仅供内部使用。应用程序开发人员不应该直接调用此方法。

这就emoji了,人家不建议外部使用,也就是说人家不让你扩展自己的业务逻辑。

但是,既然DefaultMessageManager实例是通过这个类中的全局static实例进行使用的,也就是说我们可以获取到这个实例,既然我们能获取到这个实例,那么我们就可以通过反射获取这个实例中的任何字段和方法。

有了这个思路,于是我企图通过反射获取DefaultMessageManager实例中的ThreadLocal,这样不就能给子线程赋值主线程的Context了吗。

于是,使用反射给子线程透传Context这段代码是这样的。

完了之后,进行测试,确实是可以实现的,子线程获取到了Context,从而整个链路就串起来了。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:27:33  更:2022-04-18 17:29:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 4:29:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码