前面介绍了 xxl-job admin 的源码,现在我们介绍 executor 的源码。
admin 和 executor 的交互主要包括:
- executor 向 admin 发起 注册、心跳、下线 请求;
- admin 服务向 executor 手动或自动调度任务;
- executor 接收到 admin 的调度会,会调用 jobHandler 来执行 job;
- executor 执行完 job 后,会给 admin 发起一个回调,返回 job 的执行结果。
下面,我们通过源码来学习,executor 实例是如何实现这些功能的。
在官方自带的 springboot executor 代码中,主要有两个类:XxlJobConfig、SampleXxlJob,如下图所示。前者是 executor 的配置类,后者是向 Spring 容器注入 Job 和对应的 JobHandler。
和 admin 服务一样,executor 实例的功能也是在配置类中启动完成的,下面我们从配置类 XxlJobConfig 开始学习 executor 实例的源码。
XxlJobConfig
配置类,创建 executor Bean
XxlJobConfig 的作用就是读取配置文件中的参数,创建一个 XxlJobSpringExecutor Bean,代码如下:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
executor 实例会向 admin 服务的 xxl_job_registry 表注册数据,就是把这里配置的 XxlJobSpringExecutor 注册过去,它代表了 executor 实例本身,所以 executor 实例的所有功能也都是在这个类中实现的。
XxlJobSpringExecutor
executor 代码中有两个主要的 excutor 类:XxlJobSimpleExecutor、XxlJobSpringExecutor,两者的功能大同小异(后面只介绍 XxlJobSpringExecutor),都继承自 XxlJobExecutor,三者的 UML 图如下所示:
容器接口
从上图可以看出,XxlJobSpringExecutor 实现了 Spring 框架的三个容器接口,三个容器接口会在 Spring 容器初始化和销毁过程中,提供回调接口,功能如下:
1、ApplicationContextAware
提供 setApplicationContext 方法,Spring 容器初始化后,回调该方法,把应用上下文对象 ApplicationContext 注入到实现该接口的 Bean 中,从而可以通过 ApplicationContext 的对象引用获得 Spring 容器中的 Bean。
2、SmartInitializingSingleton
提供 afterSingletonsInstantiated 方法,所有单例 Bean 初始化完成后, 回调该方法,执行自定义的初始化操作。
3、DisposableBean
提供 destroy 方法,Spring 容器销毁时,回调该方法,执行自定义的销毁操作。
从上面三个接口的作用可以看出,XxlJobSpringExecutor 是在 afterSingletonsInstantiated 中执行初始化操作,在 destroy 方法中执行销毁操作:
初始化和启动
afterSingletonsInstantiated 方法主要执行了以下逻辑:
1、初始化 JobHandler 仓库;
2、刷新 GlueFactory 实例;
3、调用父类 XxlJobExecutor 的 start 方法;
代码如下:
@Override
public void afterSingletonsInstantiated() {
initJobHandlerMethodRepository(applicationContext);
GlueFactory.refreshInstance(1);
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
destroy 方法主要执行了父类的 destroy 方法,代码如下:
@Override
public void destroy() {
super.destroy();
}
下面我们依次介绍调用的方法。
1.1 initJobHandlerMethodRepository
初始化 jobHandler 仓库
executor 最重要的功能之一:接收 admin 服务的调度请求,选择对应的 jobHandler 来执行 job,所以初始化的第一件事就是注册 jobHandler 到仓库中,也就是把所有 JobHandler 保存到一个 map 对象中,代码如下:
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<XxlJob>) method -> AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class));
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods == null || annotatedMethods.isEmpty()) {
continue;
}
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
因为 executor 实例中使用注解 @XxlJob 来标记 jobHandler 方法,所以要先从 applicationContext 中取出 Bean,再从 Bean 中找出添加了注解的方法,代码的主要步骤如下:
1、从 applicationContext 中取出所有 Bean 的名称,再根据名称遍历每一个 Bean;
2、从 Bean 中取出添加了注解 @XxlJob 的方法;
3、调用父类 XxlJobExecutor 的方法 registJobHandler ,把每一个添加了注解 @XxlJob 的方法,注册到 map 中。
问题:这里是从 applicationContext 中取出所有 Bean,然后再遍历每个 Bean 的方法,这样做会有什么问题?
上述操作是在 afterSingletonsInstantiated 方法中执行的,只能从 applicationContext 取出 Spring 容器里已经初始化的单例 Bean,但对于非单例的 Bean(比如作用域是 request scope),在这里就无法取出。
为了解决这个问题,可以让 XxlJobSpringExecutor 实现 BeanPostProcessor 接口的 postProcessAfterInitialization 方法,该方法会在Bean的初始化方法(init-method)被容器调用之后、afterSingletonsInstantiated 方法之前执行,传入的参数 bean 就是刚刚由Spring容器初始化完成的(调用过 init-method)的 Bean。
XxlJobExecutor
核心类,XxlJobSpringExecutor 的父类
XxlJobExecutor 是 xxl-job 框架 executor 服务的核心类,executor 的主要功能都与该类有关。”初始化 JobHandler 仓库" 的后半部分调用了该类的registJobHandler 方法。
1.2 registJobHandler
把所有 JobHandler 注册到一个 map 中,需要关注的是:是否存在并发安全?JobHandler 是否存在重名?可以使用一个全局的 map 来保存 JobHandler 对象,并且 map 要求是并发安全的。
xx-job 使用一个 static 修饰的 ConcurrentMap 来保存 JobHandler,同时满足了以上两个要求。
JobHandler 仓库 map
该方法的参数有:注解 xxlJob、所在 Bean 对象、添加了该注解的方法 executeMethod;作用是:把传入的参数,封装成一个 IJobHandler 对象,保存进一个 map 中,该 map 的声明及相关方法如下:
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<>();
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
return jobHandlerRepository.put(name, jobHandler);
}
问题:为什么使用 ConcurrentMap 来保存 JobHandler?
ConcurrentMap 很显然是为了保证线程安全,那么在什么情况下会有线程安全呢?
Spring 装配 Bean 都是线程安全的,但是这里的 ConcurrentMap 对象(jobHandlerRepository)是一个 static 变量,每个 executor Bean 都可以向它添加 IJobHandler 对象,用户可以自己手动多线程调用 executor Bean 来注册 IJobHandler 或者装配多个 executor Bean,会导致该 map 并发安全,所以需要使用 ConcurrentMap 来保证线程安全。
JobHandler 注册过程
该方法会执行以下操作:
- 检查 name 是否全局唯一、取出所在类的 init 方法、取出所在类的 destroy 方法;
- 检查都通过后,封装成 IJobHandler 对象,保存进 map 中;
代码如下:
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
if (xxlJob == null) {
return;
}
String name = xxlJob.value();
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxx");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException();
}
executeMethod.setAccessible(true);
Method initMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException();
}
}
Method destroyMethod = null;
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException();
}
}
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
从这个方法可以看出,所有添加了 @XxlJob 注解的方法,被封装成 MethodJobHandler(IJobHandler 的子类)后保存进 map 中,因为 map 是 static 变量(全局唯一),这样即使 Spring 容器中有多个 executor Bean,也都会把 xxlJob 保存进该 map,保证了 xxlJob 全局唯一性。
问题:代码中的 method.setAccessible(true) 作用是什么?
在 Java 反射中,把对象(Field、Method)的 accessible 标志设置为 true,表示该对象在使用时取消访问安全检查(不是说 true 代表能使用)。因为JDK的安全检查耗时较多,所以通过 setAccessible(true) 可以达到提升反射速度的目的。
吐槽一下,个人觉得的在 registJobHandler 方法里设置 “executeMethod.setAccessible(true)” 很奇怪,因为在该方法的后续并没有调用这些方法(它们是在 MethodJobHandler 中被调用),别人看到没有被调用会很容易误删,个人觉得应该封装在 MethodJobHandler 的构造方法里,避免误操作。
问题:猜测 MethodJobHandler 中属性的可见性应该怎么设置?
从构造器方法可以看出,MethodJobHandler 的字段属性来自于 @XxlJob 的配置,应该不支持在运行时修改。所以,它的属性应该是 private 修饰的,且不提供 setter 方法,如果有查询需要,可以提供 getter 方法。
从 MethodJobHandler 的代码可以看到,猜测基本符合。
看完 registJobHandler 方法,我们回到 XxlJobSpringExecutor 类的 afterSingletonsInstantiated 方法继续向后看,一共执行了两个方法:
- 执行方法:GlueFactory.refreshInstance(1),生成了一个新的 GlueFactory 实例,暂时跳过;
- 调用了 XxlJobExecutor 的
start 方法。
很显然,初始化完所有 JobHandler 之后,就要在 start 方法中初始化和启动 executor,下面我就详细查看该方法执行的操作。
2. start
首选我们需要知道 executor 要具备的功能:
1、记录日志;
2、注册自己到 admin 服务;
3、清理过期日志;
4、接收 admin 服务的调度请求;
5、调度请求执行结束后,要给 admin 服务一个回调响应。
这些功能都是在 start 方法中启动,代码如下:
public void start() throws Exception {
XxlJobFileAppender.initLogPath(logPath);
initAdminBizList(adminAddresses, accessToken);
JobLogFileCleanThread.getInstance().start(logRetentionDays);
TriggerCallbackThread.getInstance().start();
initEmbedServer(address, ip, port, appname, accessToken);
}
2.1 initLogPath
初始化日志路径,位于 XxlJobFileAppender 类中
该方法位于 XxlJobFileAppender 类中,该类的主要方法如下图所示:
从图中圈出的方法可以看出,该类的主要功能是读写日志。要调用该类的日志读写方法,前提是初始化日志文件路径,也就是调用 initLogPath ,代码如下:
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath) {
if (logPath != null && logPath.trim().length() > 0) {
logBasePath = logPath;
}
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
该方法的作用就是创建了基础日志文件、glue日志文件所在的文件夹。
2.2 initAdminBizList
位于 XxlJobExecutor 类中,初始化 admin 服务列表,但没有执行注册操作
因为 admin 服务是可以集群部署的(一个 admin 服务部署多个实例),所以需要初始化 amin 服务列表。
这个方法就是初始化 admin 服务的代理对象 adminBizClient,添加到列表中,代码如下:
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) {
if (adminAddresses != null && adminAddresses.trim().length() > 0) {
for (String address : adminAddresses.trim().split(",")) {
if (address != null && address.trim().length() > 0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<>();
}
adminBizList.add(adminBiz);
}
}
}
}
即使 admin 服务是多实例部署,也是共享 accessToken 的,所以每个 adminBizClient 对象即使 IP 不同,使用相同的 accessToken。
这里有两个注意事项:
1、该方法只是初始化 adminClient,并没有向 admin 服务执行注册操作;
2、在 admin 服务的 JobRegistryHelper 类中,admin 服务接收到注册请求后,会把 executor 的数据添加到 xxl_job_registry 表中,所以这里的 admin ip 不能存在重复值,否则在 admin 服务的 xxl_job_registry 表中会出现重复数据。
在后面 adminClient 的 registry 方法部分,我们会看到,只要有一个 adminClient 注册成功,就不会继续注册剩下的 adminClient,避免了在多实例部署时的重复问题。
问题:这个方法有什么问题?
这个方法是修改静态变量 adminBizList,如果在一台机器上装配了多个 executor Bean,会重复添加 adminBiz 对象到 adminBizList 里,所以要避免装配多个 executor bean。
问题:为什么这里的 adminBizList 不是并发安全的,而前面的 jobHandlerRepository 是并发安全的?它们是在同一个方法里初始化的,为什么设计上会有差别?
因为我们要保证 JobHandler 是全局唯一的,所以使用了并发安全的 map。但是我们不需要保证 adminClient 是唯一的,因为在后面的注册过程会看到,只要有一个 adminClient 注册成功,就不会注册其他 adminClient。
尽管两者都是在同一个方法里初始化的,但是要满足的业务功能不同,所以设计上有差异。
这里涉及到了 admin 服务的代理类 “AdminBizClient”,它实现了 AdminBiz 接口,提供了三个基本方法 ”callback、registry、registryRemove“,如下图所示:
这些方法又都是调用 XxlJobRemotingUtil.postBody 来实现 http 请求,三者的 UML 如下图所示:
2.3 JobLogFileCleanThread
日志文件清理线程,每天执行一次,设置的有效天数要大于 3
与 admin 服务相似,executor 也要定时清理过期的日志。与 admin 不同的是,executor 清理的是日志文件夹,admin 服务清理的是数据库中的日志记录。
同样,这种“自动任务”类的功能,可以和 admin 服务的实现方式类似,使用线程定时执行。
该类持有一个线程,用来定时清理日志文件;在线程的 run 方法中,先调用 XxlJobFileAppender 类返回日志文件路径(在 2.1 initLogPath 中初始化),然后遍历它的子文件夹。文件夹的名称就是文件夹创建日期,如果超过了过期天数,就递归地删除文件夹。
主要代码如下:
private Thread localThread;
if (logRetentionDays < 3) {
return;
}
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {
FileUtil.deleteRecursively(childFile);
}
try {
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
}
线程会每天执行一次,直到启动标志 toStop 被设置为 true。
2.4 TriggerCallbackThread
启动调度请求的回调线程、回调重试线程
executor 会接收到 admin 服务执行 job 的请求,在 admin 服务中,接口类的功能都是通过多线程异步执行的,executor 也不例外。
在后面我们看到,executor 会开启线程来处理 job 执行请求;而 job 执行结束之后,会把回调给 admin 的参数 push 进一个列表中,再由一个回调线程来消费列表中的回调参数,TriggerCallbackThread 就负责持有这个回调线程。通过异步执行 job、异步回调参数,executor 降低了阻塞,提升了处理请求的性能。
回调
在回调过程中还需要考虑的是:执行 job 是多线程并行的,那么保存回调参数的列表就需要是并发安全的。executor 使用一个 LinkedBlockingQueue 对象来保存回调参数。当列表中有数据时,回调线程会取出列表中的参数,依次执行回调。
代码如下:
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<>();
private Thread triggerCallbackThread;
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
List<HandleCallbackParam> callbackParamList = new ArrayList<>();
getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
当线程关闭时,为了消费调已有的回调参数,避免数据丢失,执行相同的操作,代码如下:
List<HandleCallbackParam> callbackParamList = new ArrayList<>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
当执行回调时,会把队列中当前时间之前的所有参数全部处理完毕,这样做的目的有两个:
1、清空 callBackQueue 队列,让它能继续接受新的回调参数,防止队列过长;
2、把该时间点之前的 JobHandler 结果全部执行回调,减少延时;
上面的回调线程,通过调用 doCallback 方法来执行回调,该方法会遍历前面初始化的 adminBizList(在 initAdminBizList 方法中初始化),来执行回调方法,代码如下:
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}
在 doCallback 方法中,只要有一个 AdminBizClient 回调成功,就退出循环。这里要注意的是:
- xxl-job 支持 admin 集群部署,并要求 admin 多个服务共用同一个数据库,所以这里只需要向一个 admin 服务回调就可以了;
- 遍历 adminBiz list 是为了保证“高可用”,只要有一个能成功即可;
- 在 admin 服务的 JobCompleteHelper 类中,接收的参数是 callbackParamList,它会依次更新每个 job 的完成状态。
在方法的最后,如果所有 AdminBizClient 都回调失败,会保存失败日志文件,等待重试。
重试
前面如果回调失败,会把失败的参数保存到失败日志文件中,有一个重试线程来进行重试,代码如下:
private Thread triggerRetryCallbackThread;
private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);
private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");
重试的相关方法:
1、回调方法最后,如果回调失败,会执行方法 appendFailCallbackFile 把回调参数写入失败日志文件;
2、重试线程每 90 秒执行一次 retryFailCallbackFile 方法,读取前面的失败文件内容,然后再次调用 doCallback 方法;
2.5 initEmbedServer
初始化服务
在前面的工作中,初始化了 jobHandler 仓库、初始化了 admin client 列表、日志清理线程、回调线程,但是 executor 还有两个重要的功能没有启动:
- 把自己注册到 admin 中,并定时发送心跳;
- 接收 admin 的调度请求,并执行请求;
前一个功能属于 ”自动任务“ 类,可以使用一个线程来定时执行;后一个功能属于“接口请求“类,可以像 admin 服务一样,使用线程池 + 线程 的方式异步执行。
xxl-job 对后一个功能做了进一步优化,开启了一个 netty 进程来监听端口,再使用线程池 + 线程的方式来处理。
initEmbedServer 方法先使用配置的 IP、端口、应用名、accessToken 来启动 EmbedServer 对象,然后由该对象来启动 netty 进程和线程池,代码如下:
private EmbedServer embedServer = null;
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
if (address == null || address.trim().length() == 0) {
String ip_port_address = IpUtil.getIpPort(ip, port);
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
下面我们就详细介绍 embedServer 类。
EmbedServer
2.5.1 注册注销
在 EmbedServer.start() 方法中,调用了注册方法 startRegistry(appname, address) ,代码如下:
startRegistry(appname, address);
public void startRegistry(final String appname, final String address) {
ExecutorRegistryThread.getInstance().start(appname, address);
}
该方法启动了一个注册线程 ExecutorRegistryThread,它的功能有两个:
-
30秒一次的频率,向 admin 服务注册自己; -
关闭时,从 admin 注销自己;
与前面的回调线程类似,只要 adminBizList 中有一个 adminClient 注册成功,就结束注册,代码如下:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
break;
} else {
}
} catch (Exception e) {
}
}
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
当 executor 实例销毁时,该线程也会结束,在结束之前,会向 admin 服务注销自己的信息,代码如下:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
break;
} else {
}
} catch (Exception e) {
}
}
2.5.2 监听请求
启动 netty 进程
EmbedServer.start 方法启动了一个 Netty 进程来监听我们配置的端口,首先是启动了一个 nettyServer,代码如下:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
finally {
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
这段代码的作用是:
- 设置了管理连接的 bossGroup、处理管理请求的 workerGroup、channel;
- 启动一个 netty 进程,用来监控端口;
- 在 channel.pipeline() 设置了 ChannelHandler;
- 设置了一个自定义的 EmbedHttpServerHandler;
前面几步都是 Netty 官方配置,只有最后一步 EmbedHttpServerHandler 是 xxl-job 自定义的。当 netty 监听到端口有请求时,会调用这个类来处理请求。
关于 netty 的设置,可以阅读 《参考阅读》中的《netty的引导程序ServerBootStrap》
2.5.3 EmbedHttpServerHandler
EmbedHttpServerHandler 是一个实现了 ChannelHandler 接口的自定义类,如下图所示:
如同我们前面所说的,该类执行请求的步骤是:
- 在一个线程池中执行创建线程来执行 admin 的调度请求;
- 线程自身没有执行对 jobHandler 的调度,而是通过调用 ExecutorBiz 来调度 jobHandler;
channelRead0 + process
当调度中心发起请求的时候,EmbedHttpServerHandler 调用 channelRead0 方法来处理请求。该方法解析请求参数后,在线程池中创建一个线程,使用 process 方法来处理请求,最后把结果写入响应。代码如下:
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
String responseJson = GsonTool.toJson(responseObj);
writeResponse(ctx, keepAlive, responseJson);
}
});
}
我们继续查看 process 方法,该方法调用 ExecutorBiz 来执行不同的方法。代码如下:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
总结一下 EmbedServer 类的主要功能:
1、在 XxlJobExecutor 初始化和启动的最后一步,会启动 EmbedServer;
2、EmbedServer 会启动一个线程定时注册和注销 executor;
3、EmbedServer 会启动一个 netty 进程监听端口,接收 admin 服务的请求;
4、EmbedServer 接收到 admin 服务的请求后,会在线程池中开启一个线程,调用 executorBiz 的方法。
注意:在第4步之后,并没有立即执行 job,而是又使用了一个线程来执行 job。
ExecutorBiz、JobThread
如果每接收到 admin 服务的一个调度请求,executor 就在线程池中开启一个线程来调用 executorBiz 的 run 方法,那么在请求高峰期或者 job 比较耗时时,会导致线程池阻塞。
为了避免这个问题,xxl-job 在这里做了一个优化:它给每个 jobHandler 创建了一个线程 jobThread,executorBiz 的 run 方法先取出(或注册) jobThread,然后把调度参数添加进 jobThread 的参数队列中,启动 jobThread 之后让 jobThread 去消费队列中的调度参数。
实际上是每个 job 对应一个 jobThread,一般一个 job 对应一个 jobHandler,所以我们这里认为一个 jobHandler 对应一个 jobThread
这样做的好处是:
- 每个 jobHandler 都有一个对应的线程 jobThread ,避免了 jobHandler 之间竞争线程;
- 避免开启的线程数过多,阻塞线程池;
- jobThread 从队列中消费调度参数,异步非阻塞地方式执行 job,提升了系统的性能和稳定性,避免了任务阻塞。
从这里也可以看出,在 EmbedServer 中初始化的线程池的目的不是执行 job,而是把调度参数添加到对应的 jobThread,这样线程池就不会因为大量的请求而被阻塞。
3.1 注册 jobThread
在前面我们没有看到 jobThread 有关的代码,所以在使用 jobThread 之前要先注册它。另外,在 admin 服务中,我们可以给一个 job 指定一个已存在的 jobHandler 或者配置一个 glue 类型、script 类型的 jobHandler,以及我们可以指定 jobHandler 的阻塞策略等等,这些配置都会在这里起作用。
代码如下:
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
String removeOldReason = null;
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobThread != null && jobHandler != newJobHandler) {
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
jobHandler = newJobHandler;
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
} else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
}
}
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
return jobThread.pushTriggerQueue(triggerParam);
}
这个方法的主要逻辑如下:
1、根据 jobId 取出一个 jobThread 和 jobHandler,如果是第一次调度 job,那么 jobThread 是 null,会新建 jobThread;
2、比 jobHandler 和 jobHandlerRepository 中的 newJobHandler:
- jobHandlerRepository 就是在 《1.1 initJobHandlerMethodRepository》部分初始化的 jobHandler 仓库;
- 如果两者不一样,说明 jobThread 中的 jobHandler 不是最新版本,注意,所有 jobHandler 都是先保存在 jobHandlerRepository 中,且是全局唯一的,然后才封装到 jobThread 中,所以两者不一致时,是把 jobThread 中的 jobHandler 指向 jobHandlerRepository 中 jobHandler;
3、根据 阻塞策略 和 jobThread 的状态做处理;
4、如果 jobThread 为 null,原因有三个:
那么需要到 XxlJobExecutor 中注册一个新 jobThread,并立刻启动和停止旧的 jobThread,代码如下所示:
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
5、把请求中的触发参数 triggerParam 推入 jobThread 的触发队列;
6、如果 jobHandler 是 glue、script 类型,会创建对应的 IJobHandler;
执行完这些操作后,jobThread 就会从队列取出调度参数执行 job。
3.2 构造 JobThread
在构造 JobThread 的时候,需要考虑一个问题:我们怎么避免一个 job 被重复调用?
因为 admin 是可以调用一个 job 多次的,所以无法根据 job id 来去重。
在 admin 服务中,每次调度都创建一条 log,executor 利用这个特性,通过对 job log id 去重,实现对 job 的去重,避免 admin 服务一个请求被重复发送。所以,在下面代码的 pushTriggerQueue 方法中,logId 添加到 triggerLogIdSet 成功后,才会继续添调度参数加到 triggerQueue。
private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private Set<Long> triggerLogIdSet;
private boolean running = false;
private int idleTimes = 0;
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<>());
this.setName("xxl-job, JobThread-" + jobId + "-" + System.currentTimeMillis());
}
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
问题:如果一个 job 在不同的 admin 服务有不同的 id 值,它在 executor 实例中会有多个 jobThread 吗?
如果 executor 注册到了不同的 admin 服务(不同的 IP 地址、不同的数据库),那么同一个 job 在两个 admin 服务上有两个不同的 job id,在 executor 实例中会有两个 jobThread。但是,在前面的“注册逻辑、回调逻辑”中我们已经知道,即使 admin 是集群部署、多实例部署,也共用一个数据库,所以一个 job 只会有一个 job id,只有一个 jobThread。
问题:一个 job 可以有多个 jobThread 吗?
可以,如果一个 job (一个 job 的含义是,使用同一个 jobHandler 执行的任务)在 admin 服务上创建了多条记录,那么就有多个 job id,在 executor 上就有多个 jobThread。
一般而言,一个 job 只会创建一条记录,所以我们前面说,一个 job 对应一个 jobHandler,对应一个 jobThread。
问题:为什么 triggerQueue 要是线程安全的?
从前面知道,该方法是在一个线程池中创建线程后执行的,所以要把参数队列设置为线程安全的。
3.3 执行调度
run 方法是 JobThread 执行 jobHandler 调度的地方,该方法的主要逻辑如下:
1、执行 JobHandler 的 init 方法,进行初始化;
2、取出调度参数;
3、记录调度日志;
4、执行 JobHandler 的 execute 方法,执行 Job;
5、移除长时间不用的 jobThread;
6、封装成回调参数,添加到回调队列;
7、关闭线程时,把剩余参数添加到回调队列;
8、关闭线程时,执行 JobHandler 的 destroy 方法;
主要逻辑的代码如下:
1、执行 JobHandler 的 init 方法,进行初始化
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
2、取出调度参数
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
这里为了避免阻塞,使用的是 poll 而不是 take。如果使用 take,队列中没有参数时会一直阻塞在这个步骤,这时手动关闭线程,也不会跳出循环。
3、记录调度日志
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
XxlJobContext.setXxlJobContext(xxlJobContext);
if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
XxlJobHelper.handleFail("job handle result lost.");
} else {
String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
? tempHandleMsg.substring(0, 50000).concat("...")
: tempHandleMsg;
XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
}
XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
+ XxlJobContext.getXxlJobContext().getHandleCode()
+ ", handleMsg = "
+ XxlJobContext.getXxlJobContext().getHandleMsg()
);
4、执行 JobHandler 的 execute 方法
if (triggerParam.getExecutorTimeout() > 0) {
Thread futureThread = null;
try {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
XxlJobHelper.log(e);
XxlJobHelper.handleTimeout("job execute timeout ");
} finally {
futureThread.interrupt();
}
} else {
handler.execute();
}
5、移除长时间不用的 jobThread
running = false;
idleTimes++;
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
if (idleTimes > 30) {
if (triggerQueue.size() == 0) {
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
每次执行 job前,idleTimes 参数会加1,如果从 triggerQueue 中取出了触发参数,能够正常执行 job 了,就把该参数值设置为0。如果超过 30 次,都没有执行,说明该 job 长时间没有被触发,就移除该 jobThread(从 triggerQueue 中取出参数超时时间是3秒,一个 JobThread 如果 90 秒没有执行过,就会被移除)。
通过这种方式,减少无用线程的个数,提升线程的利用率。
6、封装成回调参数,添加到回调队列
finally {
if (triggerParam != null) {
if (!toStop) {
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg())
);
} else {
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job running, killed]")
);
}
}
}
这里把回调参数添加到会回调队列,然后被 《2.4 TriggerCallbackThread》的回调线程消费。
7、关闭线程时,把剩余参数添加到回调队列
while (triggerQueue != null && triggerQueue.size() > 0) {
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam != null) {
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job not executed, in the job queue, killed.]")
);
}
}
5、关闭线程时,执行 JobHandler 的 destroy 方法
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
这样,一个完整的 job 调度过程就结束了,它的调度结果会被回调线程处理。
执行 job 的过程可以简单表示为:embedServer --> netty --> threadPool --> thread --> executorBiz --> jobThread --> jobHandler。
总结
从前面介绍可以总结出以下内容:
1、在初始化 executor 时,初始化了 executor 自身 Bean 信息、adminBiz 对象列表、日志文件线程、回调线程;
2、初始化 EmbedServer 服务时,会初使用 注册线程注册 executor、始化 netty 进程来监听端口、线程池 + 线程、JobThread 来处理请求;
可以看出,大部分的操作(日志、回调、注册、请求处理)都是多线程异步执行的,提高了系统的并发性能。特别是在处理请求的过程中,使用 netty 接收请求后,再交给一个线程池把调度参数 push 进队列,再由对应的 jobThread 来异步处理,这样可以解耦 netty 监听和处理请求之间的逻辑,避免相互阻塞。
参考阅读
1、xxl-job学习(二)——执行器的启动过程源码分析
2、netty的引导程序ServerBootStrap
附录
@XxlJob
注解 @XxlJob 的源码如下,一共有三个参数,各个参数功能如下:
- name:该 xxlJob 的唯一标识名;
- init:Bean 中该 job 的初始化方法;
- destroy:Bean 中该 job 的销毁方法;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlJob {
String value();
String init() default "";
String destroy() default "";
}
IJobHandler
IJobHandler 是 JobHandler 的抽象类,主要提供三个抽象方法:execute、init、destroy,分别用于:
- jobHandler 的执行逻辑
- jobHandler 的初始化逻辑
- jobHandler 的销毁逻辑
源码如下:
public abstract class IJobHandler {
public abstract void execute() throws Exception;
public void init() throws Exception {
}
public void destroy() throws Exception {
}
}
它的子类必须要实现 execute() 方法,它一共有三个子类,UML 图如下所示,我们使用的主要是 MethodJobHandler:
MethodJobHandler
MethodJobHandler 是在把 @XxlJob 注解所在的方法封装而成的对象,它持有注解 @XxlJob 所在的 Bean 对象、执行方法、初始化方法、销毁方法,方法实际执行的是 Bean 的对应方法,代码如下:
public class MethodJobHandler extends IJobHandler {
private final Object target;
private final Method method;
private Method initMethod;
private Method destroyMethod;
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod = initMethod;
this.destroyMethod = destroyMethod;
}
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]);
} else {
method.invoke(target);
}
}
@Override
public void init() throws Exception {
if(initMethod != null) {
initMethod.invoke(target);
}
}
@Override
public void destroy() throws Exception {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}
}
UML 图如下所示:
参数配置
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
|