一、业务背景
工作中遇到了一个需求如下:
1、给项目中的所有定时任务添加自检监控,确保能及时知道定时任务是否成功执行 2、有写定时任务中的方法并没有业务操作,只是通过线程池提交任务方式,开启多个子线程去执行任务,这些子线程的执行情况也需要记录。 3、多个执行任务的子线程中,若有其中一个失败,则整个监控结果定义为失败;全部成功,则整个监控结果定义为成功。 4、含有子线程失败时,只需要记录其中一个子线程的失败信息即可(因为错误信息可能都一样);所有子线程成功时,则说明定时任务成功,但得确保设置成功结果只操作了数据库一次(防止每个线程都操作设置一次成功)。 5、尽量做到代码简洁,因为这个自检监控并非主要业务逻辑,尽可能减少代码入侵。
二、分析实现
ThreadPoolExecutor pool = new ThreadPoolExecutor (100,100,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200))
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void test(){
List<App> appList = appService.list();
for(App app : appList){
pool.execute(()->{
try{
int i = 10/0;
}catch (Exception e){
}
});
}
}
public void setCheckResult(String id,String flag,String erro){
}
如上述简化后得代码,已经提供了定时任务和设置检查结果得方法,要求实现: 1、实现监控功能 2、代码简洁 3、性能尽量最优,也就是尽可能的降低对数据库操作,也不推荐用锁
2.1 方案一:将setCheckResult至于在方法结尾
ThreadPoolExecutor pool = new ThreadPoolExecutor (100,100,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200))
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void test(){
List<App> appList = appService.list();
for(App app : appList){
pool.execute(()->{
try{
int i = 10/0;
}catch (Exception e){
}
});
}
setCheckResult(...);
}
public void setCheckResult(...){
}
这是最初的监控方案,但是对于含子线程的定时任务,这种方案是达不到监控效果的。因为主线程执行完了,子线程未必执行完,也无法监控到子线程的执行状况
2.2 方案二:将setCheckResult至于run方法中
ThreadPoolExecutor pool = new ThreadPoolExecutor (100,100,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200))
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void test(){
List<App> appList = appService.list();
for(App app : appList){
pool.execute(()->{
try{
int i = 10/0;
setCheckResult(...);
}catch (Exception e){
setCheckResult(...);
}
});
}
}
public void setCheckResult(...){
}
这个方案看似达到监控效果了,但实际不仅没达到,而且还会造成多次重复写数据库,耗费资源。 假设一:所有线程失败,所有都重复执行了setCheckResult方法 假设二:所有线程都成功,setCheckResult也重复执行了 假设三:部分线程成功,部分失败。setCheckResult属于共享资源,若最后一个抢到资源的线程,决定了定时任务的成功与否。
因此,方案二也不可取。
2.3 方案三:设置两个标志位
ThreadPoolExecutor pool = new ThreadPoolExecutor (100,100,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200))
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void test(){
Map<String, String> errMap = new ConcurrentHashMap<>(2);
List<App> appList = appService.list();
for(App app : appList){
pool.execute(()->{
try{
int i = 10/0;
} catch (Exception e) {
if (errMap.get(ERRMSG) == null) {
errMap.put(ERRMSG, ERRMSG);
setCheckResult(...);
}
} finally {
if (errMap.get(ERRMSG) == null && errMap.get(SUCCESS) == null) {
errMap.put(SUCCESS, SUCCESS);
setCheckResult(...);
}
}
});
}
}
public void setCheckResult(...){
}
该方案,可以实现当多个线程出现错误时,成功限制操作数据库次数;当所有线程执行成功时,也成功限制了无效操作数据库次数。若决定map不好用,还可以采用AtomicReference,如下:
ThreadPoolExecutor pool = new ThreadPoolExecutor (100,100,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(200))
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void test(){
AtomicReference<Boolean> error = new AtomicReference<>(false);
AtomicReference<Boolean> success = new AtomicReference<>(false);
List<App> appList = appService.list();
for(App app : appList){
pool.execute(()->{
try {
int i = 10/0;
} catch (Exception e) {
if (!error.get()) {
error.set(true);
}
} finally {
if (!error.get() && !success.get()) {
success.set(true);
}
}
});
}
}
public void setCheckResult(...){
}
三、小结
记录下某些特定场景的解决方案,备用。
|