Efficiently Processing Integrated Data
Data integrated from external systems usually needs some post-processing before it can be written into the production tables. A typical implementation has to satisfy the following requirements:
- Post-processing of the integrated data is triggered by a scheduled task
- Frequent firing of the scheduled task must not corrupt the data
- Processing has to be fast enough
- Handling large batches concurrently must not cause an OOM
- The result is reported back to the task-scheduling platform via callback
Only when all of these are met at once will the integrated data be accurate. In practice, however, much of the code our team wrote for this turned out to be repetitive, so the template method pattern is a good fit for streamlining the integration flow.
1. Algorithm structure
As shown in the figure, the integrate method of the abstract template class AbstractIntegrateTemplate defines the skeleton of the algorithm. It calls three abstract methods: countRawData, fetchRawData, and insertUpdateData, which respectively count the pending raw rows, fetch them in batches, and insert them into the production table while updating their processing status. The concrete implementations are left to subclasses; here, the IntegrateBookData subclass implements all three.
2. Algorithm implementation
First, define the abstract template class AbstractIntegrateTemplate. Its integrate method lays out the overall flow. To prevent the data corruption that repeated firings of the scheduled task could cause, a distributed lock guarantees that one run must finish before the next can start. To improve throughput, a thread pool processes the inserts and updates concurrently:
@Slf4j
public abstract class AbstractIntegrateTemplate {

    private static final int BATCH_SIZE = 100;
    private static final int MAX_THREAD_NUM = 10;

    @Autowired
    private RedissonClient redissonClient;

    private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);

    // count the pending raw rows
    public abstract int countRawData();

    // fetch up to batchSize pending raw rows
    public abstract List<?> fetchRawData(int batchSize);

    // insert the rows into the production table and mark them handled; returns the handled count
    public abstract Integer insertUpdateData(List<?> list);

    public BasicResponse integrate(String bizKey) {
        log.info("begin to integrate data, bizKey={}", bizKey);
        RLock lock = redissonClient.getLock(bizKey);
        int rawDataCount = 0;
        int handledCount = 0;
        long timeCost = 0;
        try {
            // wait up to 5s for the lock; the 3600s lease releases it even if this node dies
            if (!lock.tryLock(5, 3600, TimeUnit.SECONDS)) {
                return new BasicResponse("current task is blocked by previous one, please wait for a while");
            }
            try {
                long startTime = System.currentTimeMillis();
                rawDataCount = countRawData();
                if (rawDataCount <= 0) {
                    return new BasicResponse("no new integration data need to handle");
                }
                // one round covers MAX_THREAD_NUM * BATCH_SIZE rows; round up for the tail
                int roundSize = MAX_THREAD_NUM * BATCH_SIZE;
                int times = (rawDataCount + roundSize - 1) / roundSize;
                for (int i = 0; i < times; i++) {
                    List<?> rawData = fetchRawData(roundSize);
                    List<Future<Integer>> futureList = new ArrayList<>();
                    // split the round into per-thread slices of at most BATCH_SIZE rows
                    int threadNum = (rawData.size() + BATCH_SIZE - 1) / BATCH_SIZE;
                    for (int j = 0; j < threadNum; j++) {
                        List<?> list = rawData.subList(j * BATCH_SIZE,
                                Math.min((j + 1) * BATCH_SIZE, rawData.size()));
                        int finalI = i;
                        int finalJ = j;
                        futureList.add(executorService.submit(() -> {
                            Integer result = insertUpdateData(list);
                            int startIdx = finalI * roundSize + finalJ * BATCH_SIZE;
                            log.info("success to handle [{}-{}] for bizKey:{}",
                                    startIdx, startIdx + list.size(), bizKey);
                            return result;
                        }));
                    }
                    // block until the whole round is done before fetching the next one
                    for (Future<Integer> futureTask : futureList) {
                        try {
                            Integer taskResult = futureTask.get();
                            handledCount += taskResult;
                            log.info("Integration [{}] futureTask result={}, current handledCount={}",
                                    bizKey, taskResult, handledCount);
                        } catch (ExecutionException e) {
                            log.error("get future task result error:", e);
                        }
                    }
                    rawData.clear();
                }
                timeCost = System.currentTimeMillis() - startTime;
            } finally {
                // always release the lock, even if a batch throws
                lock.unlock();
                log.info("unlock [{}] success", bizKey);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("get redisson lock error:", e);
        }
        return new BasicResponse(String.format("[%s] integration result -> rawDataCount:%d, handledCount:%d, " +
                "cost time:%d s", bizKey, rawDataCount, handledCount, timeCost / 1000));
    }
}
The code first counts the total number of pending rows, then derives the number of rounds from the per-round size (thread count × per-thread batch size). Each round fetches one round's worth of rows, splits them into slices of BATCH_SIZE, submits each slice to the thread pool, and blocks until every future of the round completes before moving on to the next round.
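To make the arithmetic concrete, here is a worked example (the row count is made up purely for illustration):

    // Illustration only: suppose countRawData() returns 2530,
    // with BATCH_SIZE = 100 and MAX_THREAD_NUM = 10, so one round covers 1000 rows.
    int rawDataCount = 2530;
    int roundSize = 10 * 100;                               // MAX_THREAD_NUM * BATCH_SIZE
    int times = (rawDataCount + roundSize - 1) / roundSize; // ceil(2530 / 1000) = 3 rounds
    // Rounds 1 and 2 each fetch 1000 rows and submit 10 tasks of 100 rows.
    // Round 3 fetches the remaining 530 rows and submits ceil(530 / 100) = 6 tasks:
    // five slices of 100 rows plus a final slice of 30 rows.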
Suppose we now need to integrate book data: rows first land in the staging table book_ti_t and are then moved into book_t. The IntegrateBookData class extends AbstractIntegrateTemplate and implements its three abstract methods:
@Component
@Slf4j
public class IntegrateBookData extends AbstractIntegrateTemplate {

    @Autowired
    private BookTiMapper bookTiMapper;
    @Autowired
    private BookMapper bookMapper;

    @Override
    public int countRawData() {
        QueryWrapper<BookTiPo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("handle_flag", HandleFlagEnum.NOT_HANDLE.getValue());
        return bookTiMapper.selectCount(queryWrapper);
    }

    @Override
    public List<?> fetchRawData(int batchSize) {
        return bookTiMapper.selectByOffset(batchSize);
    }

    // note: the template invokes this via "this" from a pooled thread, not through the
    // Spring proxy, so @Transactional does not actually take effect on this call path
    @Override
    @Transactional
    public Integer insertUpdateData(List<?> list) {
        log.info("insertUpdateData list.size={}", list.size());
        if (CollectionUtils.isEmpty(list)) {
            return 0;
        }
        @SuppressWarnings("unchecked")
        List<BookTiPo> bookTiPos = (List<BookTiPo>) list;
        // upsert into the production table book_t
        bookMapper.insertUpdateBook(bookTiPos);
        // mark the staging rows in book_ti_t as handled
        UpdateWrapper<BookTiPo> updateWrapper = new UpdateWrapper<>();
        List<Long> ids = bookTiPos.stream().map(BookTiPo::getId).collect(Collectors.toList());
        updateWrapper.in("id", ids);
        BookTiPo bookTiPo = BookTiPo.builder().handleFlag(HandleFlagEnum.HANDLED.getValue()).build();
        bookTiMapper.update(bookTiPo, updateWrapper);
        return list.size();
    }
}
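HandleFlagEnum and BasicResponse are used throughout but are not part of the original listing; minimal sketches consistent with how they are used above (the HANDLED value of 1 is an assumption) could look like:

    @Getter
    @AllArgsConstructor
    public enum HandleFlagEnum {
        NOT_HANDLE(0),  // matches the WHERE t.handle_flag = 0 filter in the mapper XML
        HANDLED(1);     // assumed value; any flag distinct from NOT_HANDLE works

        private final Integer value;
    }

    @Data
    @AllArgsConstructor
    public class BasicResponse {
        private String message;
    }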
The BookTiMapper and BookMapper used above are built on mybatis-plus:
public interface BookTiMapper extends BaseMapper<BookTiPo> {
    List<BookTiPo> selectByOffset(@Param("pageSize") int pageSize);
}

public interface BookMapper extends BaseMapper<BookPo> {
    int insertUpdateBook(@Param("list") List<BookTiPo> list);
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("book_t")
public class BookPo {
    @TableId("id")
    private Long id;
    @TableField("number")
    private String number;
    @TableField("name")
    private String name;
    @TableField("author")
    private String author;
    @TableField("price")
    private BigDecimal price;
}

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("book_ti_t")
public class BookTiPo {
    @TableId("id")
    private Long id;
    @TableField("number")
    private String number;
    @TableField("name")
    private String name;
    @TableField("author")
    private String author;
    @TableField("price")
    private BigDecimal price;
    @TableField("handle_flag")
    private Integer handleFlag;
}
The mapper XML is as follows:
<mapper namespace="com.xxx.xlt.mybatis.mapper.BookTiMapper">
    <select id="selectByOffset" resultType="com.xxx.xlt.mybatis.model.po.BookTiPo">
        SELECT
            `id` AS id,
            `name` AS name,
            `number` AS number,
            `price` AS price,
            `author` AS author,
            `handle_flag` AS handleFlag
        FROM
            book_ti_t t
        WHERE
            t.handle_flag = 0
        LIMIT #{pageSize}
    </select>
</mapper>
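Note that despite its name, selectByOffset needs no OFFSET clause: each processed batch is flagged as handled before the next round's fetch, so the rows matching handle_flag = 0 always start from the head of the result set and a plain LIMIT suffices.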
<mapper namespace="com.xxx.xlt.mybatis.mapper.BookMapper">
    <insert id="insertUpdateBook">
        <if test="list != null and list.size() > 0">
            <foreach collection="list" item="po" index="idx" separator=";">
                INSERT INTO book_t (
                    name,
                    number,
                    author,
                    price
                ) VALUES (
                    #{po.name},
                    #{po.number},
                    #{po.author},
                    #{po.price}
                )
                ON DUPLICATE KEY UPDATE
                    name = VALUES(name),
                    author = VALUES(author),
                    price = VALUES(price)
            </foreach>
        </if>
    </insert>
</mapper>
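Two environmental assumptions are baked into this mapper. First, ON DUPLICATE KEY UPDATE only deduplicates if book_t carries a unique key on a business column, presumably number here, since the original DDL is not shown. Second, the semicolon-separated foreach sends several statements in one JDBC call, which the MySQL driver rejects unless allowMultiQueries is enabled. A sketch under those assumptions:

    -- assumed unique key on book_t; the actual schema is not given above
    ALTER TABLE book_t ADD UNIQUE KEY uk_number (`number`);

    jdbc:mysql://localhost:3306/demo?allowMultiQueries=true

Alternatively, a single multi-row INSERT ... VALUES (...), (...) ... ON DUPLICATE KEY UPDATE statement would avoid the multi-statement requirement entirely.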
Finally, create a service class to invoke it:
@Service
@Slf4j
public class BookService implements IBookService {

    @Autowired
    private BookTiMapper bookTiMapper;
    @Autowired
    private IntegrateBookData integrateBookData;

    @Override
    public BasicResponse integrateData() {
        return integrateBookData.integrate("IntegrateBookData");
    }

    @Override
    public BasicResponse generateTestData(int total) {
        if (total <= 0) {
            return null;
        }
        Random random = new Random();
        for (int i = 0; i < total; i++) {
            BookTiPo tiPo = BookTiPo.builder()
                    .name(RandomStringUtils.randomAlphabetic(10))
                    .author(RandomStringUtils.randomAlphabetic(8))
                    .number(RandomStringUtils.randomAlphabetic(8))
                    .price(new BigDecimal(random.nextInt(100)))
                    .handleFlag(HandleFlagEnum.NOT_HANDLE.getValue())
                    .build();
            bookTiMapper.insert(tiPo);
        }
        return new BasicResponse("success to generate:" + total);
    }
}
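In production the entry point is the scheduled-task platform, which also receives the result via callback. As a minimal local stand-in (the class name and cron expression are illustrative, and @EnableScheduling is assumed on the application class), a plain Spring @Scheduled job could drive it:

    @Component
    @Slf4j
    public class BookIntegrationJob {

        @Autowired
        private IBookService bookService;

        // Illustrative schedule: every 5 minutes. If a fire overlaps the previous run,
        // the 5s tryLock wait in the template makes it return a "blocked" response
        // instead of corrupting data.
        @Scheduled(cron = "0 */5 * * * ?")
        public void run() {
            BasicResponse response = bookService.integrateData();
            log.info("book integration finished: {}", response);
        }
    }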