之前做异步的时候通常都是用消息队列来做,今天使用spring的异步任务
首先异步任务需要定义线程池,spring异步提供了默认线程池,但是线程池通常都是自己定义,这样更能符合业务需求。
将自定义线程池注入到sprin中,并设置异步任务异常捕获处理器
package com.imooc.ecommerce.config;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Created By 丛梓祺 on 2021/11/9
* Write this code and change the world
* 自定义异步任务线程池 异步任务异常捕获处理器
*/
@Slf4j
@EnableAsync //开启spring 异步任务支持
@Configuration
public class AsyncPoolConfig implements AsyncConfigurer {
/**
* 将自定义线程池注入到spring容器中
*
* @return
*/
@Bean
@Override
public Executor getAsyncExecutor() {
//spring提供的自定义线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(20);
executor.setKeepAliveSeconds(60);
//给这个线程池里的线程起一个名字,这个很重要
executor.setThreadNamePrefix("czq-async-");
//等待所有任务结果后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//线程池的等待时间
executor.setAwaitTerminationSeconds(60);
// 定于拒绝策略
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.initialize();
return executor;
}
/**
* 指定系统中的异步任务在出现异常的时候使用到的处理器
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
/**
* 异步任务异常捕获处理器
*/
class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
ex.printStackTrace();
log.error("Async Error:[{}], Method:[{}], Param:[{}]", ex.getMessage(), method.getName(), JSON.toJSONString(params));
//todo 发送邮件或者是短信
}
}
}
商品的异步任务入库,将数据存储到redis和db中,这里主要是面向运营使用,所以入库的时候要保证三点
? ? ? ? 1.判断商品的属性是否符合正常规范
? ? ? ? 2.新入库商品中没有重复的数据,如果有重复的数据直接返回,入库不成功,让运营同学进行判断商品去重,这里要保证商品唯一,在数据库中添加唯一键。
? ? ? ? 实现方式,将保证唯一的属性进行拼接存放到set当中,进行判断是否存在。
? ? ? ? 3.保证添加的商品在数据库中没有重复
? ? ? ? 实现方式,只需要每次从数据中根据唯一键去查询,判断是否有重复数据。如果有重复数据就去除重复数据不添加
存入redis中的数据应该是简化的商品详情,如果将商品的全部详情都存入,一是没有必要的,用户访问数据得时候并不是都会查看所有详情,二是,数据量过大占用缓存过多
package com.imooc.ecommerce.service.async;
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.constant.GoodsConstant;
import com.imooc.ecommerce.dao.EcommerceGoodsDao;
import com.imooc.ecommerce.entity.EcommerceGoods;
import com.imooc.ecommerce.goods.GoodsInfo;
import com.imooc.ecommerce.goods.SimpleGoodsInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IterableUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Created By 丛梓祺 on 2021/11/9
* Write this code and change the world
* <>异步服务接口实现</>
*/
@Service
@Transactional
@Slf4j
public class AsyncServiceImpl implements IAsyncService {
@Autowired
private EcommerceGoodsDao ecommerceGoodsDao;
@Autowired
private RedisTemplate redisTemplate;
/**
* 异步任务处理两件事,将商品信息保存到数据表
* 更新商品缓存
*
* @param goodsInfos
* @param taskId 异步任务管理器的id
*/
//指定自定义线程池,不指定就是使用spring默认线程池
@Async("getAsyncExecutor")
@Override
public void asyncImportGoods(List<GoodsInfo> goodsInfos, String taskId) {
log.info("async task running taskId :[{}]", taskId);
StopWatch stopWatch = StopWatch.createStarted();
//1.如果是goodsInfo中存在重复商品,不保存,直接返回,记录错误日志
//请求数据是否合法的标记
boolean isIllegal = false;
//将商品信息字段 joint 在一起,用来判断是否存在重复
Set<String> goodsJointInfos = new HashSet<>(goodsInfos.size());
//过滤出来的可以入库的商品信息 按照自己的业务需求定义
List<GoodsInfo> filteredGoodsInfo = new ArrayList<>(goodsInfos.size());
//走一遍循环,过滤非法参数与判定当前请求是否合法
for (GoodsInfo goods : goodsInfos) {
//基本条件不满足的直接过滤
if (goods.getPrice() <= 0 || goods.getSupply() <= 0) {
log.info("goods info is invalid:[{}]", JSON.toJSONString(goods));
continue;
}
//组合每一个商品信息
String jointInfo = String.format("%s,%s,%s", goods.getGoodsCategory(), goods.getBrandCategory(), goods.getGoodsName());
if (goodsJointInfos.contains(jointInfo)) {
isIllegal = true;
}
//加入到两个容器中
goodsJointInfos.add(jointInfo);
filteredGoodsInfo.add(goods);
}
//如果存在重复商品或者没有需要入库的商品直接返回
if (isIllegal || CollectionUtils.isEmpty(filteredGoodsInfo)) {
stopWatch.stop();
log.warn("import nothing: [{}]", JSON.toJSONString(filteredGoodsInfo));
log.info("check and import goods done:[{}]", stopWatch.getTime(TimeUnit.SECONDS));
return;
}
List<EcommerceGoods> ecommerceGoods = filteredGoodsInfo.stream().map(EcommerceGoods::to)
.collect(Collectors.toList());
List<EcommerceGoods> targetGoods = new ArrayList<>(goodsInfos.size());
//2. 保存goodsInfo之前先判断下是否有存在重复的商品
ecommerceGoods.forEach(
//limit 1
g -> {
if (null != ecommerceGoodsDao.findFirst1ByGoodsCategoryAndBrandCategoryAndGoodsName(
g.getGoodsCategory(), g.getBrandCategory(), g.getGoodsName()
).orElse(null)) {
return;
}
targetGoods.add(g);
});
List<EcommerceGoods> savedGoods = IterableUtils.toList(
ecommerceGoodsDao.saveAll(targetGoods)
);
saveNewGoodsInfoToRedis(savedGoods);
log.info("save goods info to db and redis :[{}]", savedGoods.size());
stopWatch.stop();
log.info("check and import goods success:[{}ms]", stopWatch.getTime(TimeUnit.MINUTES));
}
/**
*将保存呆数据表中的数据保存到redis当中
* dict : key -> <id,simpleGoodsInfo(Json)>
* @param ecommerceGoods
*/
private void saveNewGoodsInfoToRedis(List<EcommerceGoods> ecommerceGoods) {
//由于redis是内存存储不应该保存大量的商品信息
List<SimpleGoodsInfo> simpleGoodsInfos = ecommerceGoods.stream()
.map(EcommerceGoods::toSimple)
.collect(Collectors.toList());
Map<String,String> id2JsonObject = new HashMap<>(simpleGoodsInfos.size());
simpleGoodsInfos.forEach(g->id2JsonObject.put(g.getId().toString(),JSON.toJSONString(g)));
//保存到redis中
redisTemplate.opsForHash().putAll(
GoodsConstant.ECOMMERCE_GOODS_DICT_KEY,
id2JsonObject
);
}
}
|