项目中经常遇到海量数据批量处理的问题,第一时间想到通过多线程来处理,要想海量数据快速的处理完成, 就需要使用多台服务器并行处理。本文演示如何在分布式环境下使用 ElasticJob 处理海量数据, 通过 Demo 展示多机任务如何分片,作业如何拆分,如何水平扩容等问题。
1?概述
Elastic Job 是面向互联网生态和海量任务的分布式调度解决方案。最初由当当基于 ZooKeeper、?Quartz 进行二次开发的分布式解决方案。2020年 6月,经过 Apache ShardingSphere 社区投票,接纳 ElasticJob 为其子项目。 目前 ElasticJob 的四个子项目已经正式迁入 Apache 仓库。
2?演示目标
- leader 节点选举;
- 任务分片, 分片是否均匀;
- 水平扩容;
3?演示源码
3.1?相关依赖包引入
- 演示代码基于 3.0.0-RC1 版本编写,环境要求: Java?8及以上版本、Maven 3.5.0?及以上版本、 ZooKeeper 3.6.0?及以上版本;
- 本文采用 Java 8、 Maven 3.6.3、 ZooKeeper 3.7.0、SpringBoot 2.3.4.RELEASE;
- ZooKeeper 3.7.0 下载地址:?https://apache.claz.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
- 项目启动前需要下载并启动 ZooKeeper:?./zkServer.sh start
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>3.0.0-RC1</version>
</dependency>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
3.2?配置文件
elasticjob:
zookeeper:
server-lists: localhost:2181
namespace: note-elasticjob
3.3 ElasticJob 配置类
@Configuration
@ConditionalOnExpression("'${elasticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobConfig {
@Value("${elasticjob.zookeeper.server-lists}")
private String serverList;
@Value("${elasticjob.zookeeper.namespace}")
private String namespace;
@Bean(initMethod = "init")
public CoordinatorRegistryCenter coordinatorRegistryCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
// 失败重试次数
zookeeperConfiguration.setMaxRetries(3);
// 20 分钟
zookeeperConfiguration.setSessionTimeoutMilliseconds(20 * 60 * 1000);
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
registryCenter.init();
return registryCenter;
}
}
3.4?构建 Job 生成器
@Component
public class ElasticJobGenerator {
protected final Logger logger = LoggerFactory.getLogger(ElasticJobGenerator.class);
@Autowired
protected CoordinatorRegistryCenter coordinatorRegistryCenter;
public void build(String jobName, String jobParams, SimpleJob jobInstance, String cronStr, int shardingTotalCnt) {
JobConfiguration jobConfiguration = JobConfiguration.newBuilder(jobName, shardingTotalCnt)
.cron(cronStr)
.jobParameter(jobParams)
.misfire(false)
.overwrite(true).build();
ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter,
jobInstance, jobConfiguration);
try {
logger.info("开始创建任务");
scheduleJobBootstrap.schedule();
} catch (Exception e) {
logger.error("任务创建失败", e);
}
}
}
3.5?Job处理类
@Component
public class MyJob implements SimpleJob {
private static Logger logger = LoggerFactory.getLogger(MyJob.class);
@Override
public void execute(ShardingContext shardingContext) {
int shardingTotalCnt = shardingContext.getShardingTotalCount();
int shardingItem = shardingContext.getShardingItem();
String params = shardingContext.getJobParameter();
MDC.put("shardingItem", "sharding_" + shardingItem);
logger.debug("当前作业分片:{}", shardingItem);
logger.debug("作业参数: {}", params);
long startTime = System.currentTimeMillis();
int dataLen = 100;
logger.info("当前分片:{}, 开始执行任务", shardingItem);
for (int i = 0; i < dataLen; i++) {
if (i % shardingTotalCnt != shardingItem) {
logger.debug("当前分片:{}, 其他分片的任务: {}", shardingItem, i);
continue;
}
logger.debug("当前分片:{}, 任务: {}, 开始工作", shardingItem, i);
try {
// 模拟任务执行 1s
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
}
logger.debug("当前分片:{}, 任务: {}, 工作完成", shardingItem, i);
}
logger.info("当前分片:{},任务执行完成, 耗时: {}秒", shardingItem, (System.currentTimeMillis() - startTime) / 1000);
MDC.remove("shardingItem");
}
}
3.6 写个测试接口
@RestController
public class TestJobController {
@Autowired
protected ElasticJobGenerator elasticJobGenerator;
@Autowired
protected MyJob myJob;
@PostMapping("/test_job")
public String testJob(@RequestParam("jobName") String jobName, @RequestParam("shardingTotalCount") int shardingTotalCount) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ss mm HH dd MM ? yyyy");
Date startTime = new Date();
// 延迟 5秒后执行
startTime.setTime(startTime.getTime() + 5 * 1000);
elasticJobGenerator.build(jobName,
"",
myJob,
simpleDateFormat.format(startTime),
shardingTotalCount);
return "finished";
}
}
4?单机启动
- 启动参数配置:?-Dserver.port=8081
- 查看 ZooKeeper , 可以发现 Leader 选举情况, 以及分片情况
- 验证是否拆分了 10个线程执行, 输出日志中的分片个数;
# 分片拆分情况
grep -r -n 'test1' ~/logs/note-elastic-job/app/note-elastic-job.log | awk '{ for(i=1;i<=NF;i++) { if (match($i, /sharding_/)) {print $i} }}' | sort | uniq
输出结果 =>
[sharding_0]
[sharding_1]
[sharding_2]
[sharding_3]
[sharding_4]
[sharding_5]
[sharding_6]
[sharding_7]
[sharding_8]
[sharding_9]
# 查询任务是否分布均匀
grep -r -n 'elasticjob-test1' ~/logs/note-elastic-job/app/note-elastic-job.log | grep "sharding_[[:digit:]]\{1,2\}" | grep '工作完成' | awk '{count[$6]++;} END { for(i in count) {print "分片:" i " 任务个数: " count[i]} }'|sort
分片:[sharding_0] 任务个数: 10
分片:[sharding_1] 任务个数: 10
分片:[sharding_2] 任务个数: 10
分片:[sharding_3] 任务个数: 10
分片:[sharding_4] 任务个数: 10
分片:[sharding_5] 任务个数: 10
分片:[sharding_6] 任务个数: 10
分片:[sharding_7] 任务个数: 10
分片:[sharding_8] 任务个数: 10
分片:[sharding_9] 任务个数: 10
- 可以看出每个分片上任务拆分很均匀, 每个分片上平均都拆分 10个任务进行处理, 默认采用平均分片算法;?
# 任务执行时长
grep -r -n 'elasticjob-test1' ~/logs/note-elastic-job/app/note-elastic-job.log | grep '任务执行完成'
[elasticjob-test1-10] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_9] - 当前分片:9,任务执行完成, 耗时: 10秒 [elasticjob-test1-7] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_6] - 当前分片:6,任务执行完成, 耗时: 10秒 [elasticjob-test1-8] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_7] - 当前分片:7,任务执行完成, 耗时: 10秒 [elasticjob-test1-9] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_8] - 当前分片:8,任务执行完成, 耗时: 10秒 [elasticjob-test1-6] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_5] - 当前分片:5,任务执行完成, 耗时: 10秒 [elasticjob-test1-5] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_4] - 当前分片:4,任务执行完成, 耗时: 10秒 [elasticjob-test1-2] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_1] - 当前分片:1,任务执行完成, 耗时: 10秒 [elasticjob-test1-3] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_2] - 当前分片:2,任务执行完成, 耗时: 10秒 [elasticjob-test1-4] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_3] - 当前分片:3,任务执行完成, 耗时: 10秒 [elasticjob-test1-1] [INFO ] [cn.yuerbest.note.spring.boot.elasticjob.job.MyJob:44] [sharding_0] - 当前分片:0,任务执行完成, 耗时: 10秒
- 任务执行耗时均匀, 每个任务耗时 10秒完成;
- 原本单线程执行需要 100秒完成的任务, 现在进行 10个分片后, 只需要 10秒就可以完成任务了。
5?验证水平扩容效果
- 打包项目, 发布项目到两台测试服务器分别是 192.170.3.129 和 192.170.3.139;
# 打包项目
mvn clean package -Dmaven.test.skip=true -e
# 上传到服务器
scp target/note-spring-boot-elasticjob.jar vagrant@192.170.3.129:/home/vagrant/jar/note-elasticjob
# 启动项目
java -jar -Xmx1024M -Xss100M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heap.hprof -XX:+UseG1GC -Xloggc:g1-gc.log -Dspring.profiles.active=test note-spring-boot-elasticjob.jar &
# 查询任务是否分布均匀
grep -r -n 'elasticjob-test1' ~/logs/note-elastic-job/app/note-elastic-job.log | grep "sharding_[[:digit:]]\{1,2\}" | grep '工作完成' | awk '{count[$6]++;} END { for(i in count) {print "分片:" i " 任务个数: " count[i]} }'|sort
192.170.3.129 服务器的信息 =>
分片:[sharding_0] 任务个数: 10
分片:[sharding_1] 任务个数: 10
分片:[sharding_2] 任务个数: 10
分片:[sharding_3] 任务个数: 10
分片:[sharding_4] 任务个数: 10
192.170.3.139 服务器的信息 =>
分片:[sharding_5] 任务个数: 10
分片:[sharding_6] 任务个数: 10
分片:[sharding_7] 任务个数: 10
分片:[sharding_8] 任务个数: 10
分片:[sharding_9] 任务个数: 10
- 任务已经很均匀的分配到两台服务器, 水平扩容成功;
- 为了充分利用服务器的性能, 可以尝试增加分片数。
5 总结
- Elastic Job 可以用于处理分布式任务调度, 使用较为简单, 也可以很轻松的进行水平扩容, 不用再担心单机性能瓶颈;
- 根据实际服务器的配置进行合理的调整分片个数, 充分利用服务器资源, 不是分片个数越多越好, 根据实际业务逻辑进行合理调配, 达到最佳效果;
- 不同分片线程需要对根据分片信息合理拆分任务, 避免重复执行, 造成性能或数据问题;
- 如果有数据库操作, 增加分片的同时, 也要注意数据库连接池的大小, 避免连接池不够,大量任务处理失败;
- 根据实际业务场景,调整 JVM 参数, 充分利用内存, CPU 算力,避免频繁 Full GC;
- Demo 采用最新的?3.0.0-RC1 版本, 该版本对 JDK、 ZooKeeper、 Maven 都有版本要求, 为了达到更好的效果, 建议使用稳定版本,同时注意相关依赖的版本要求。
|