IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Elastic-Job的使用、进行动态 CRUD,以及使用过程中遇到的一些问题 -> 正文阅读

[大数据]Elastic-Job的使用、进行动态 CRUD,以及使用过程中遇到的一些问题

Elastic-Job的使用、进行动态 CRUD,以及使用过程中遇到的一些问题

目录:

1、elastic-job 能干什么

2、elastic-job 怎么用

1. 引入依赖
2. yml配置文件
3. 自定义注解
4. zookeeper配置类
5. elasticjob配置类
6. 动态新增、删除定时任务job Handler类
7. Demo

3、遇到过的一些问题


1、elastic-job 能干什么

Elastic-job:当当网基于quartz 二次开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分
布式协调,实现任务高可用以及分片。

官网介绍:https://shardingsphere.apache.org/elasticjob/index.html

ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。
ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务;
ElasticJob-Cloud 使用 Mesos 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。

官网已更新到3.x版本,这里就不做介绍了。

新的技术不一定是最适合的。


2、elastic-job 怎么用

SpringBoot项目引入elasticjob

2.1引入依赖:

        <!-- elasticJob -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
          <!--由于我的项目,google存在版本冲突,故排除 用的时候大家注意下-->
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 引入elastic-job-lite核心模块 -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2.2 yml配置文件

spring:
	#自定义配置
  elastic-job:
    url: jdbc:mysql:你的数据库地址?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
    username: xxx
    password: xxxx
    poolName: elasticPool
    connectionTimeout: 60000
    maxLifetime: 1800000
    maximumPoolSize: 15
    minimumIdle: 5
  zk:
    maxRetries: 9
    namespace: elastic-job
    serverList: 127.0.0.1:2181

2.3 自定义注解
/**
 * 自定义任务定时注解
 *
 * @author pengwei
 * @date 2021/3/29
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {

    /**
     * cron表达式,用于控制作业触发时间
     *
     * @return
     */
    @AliasFor("cron")
    String value() default "";

    /**
     * cron表达式,用于控制作业触发时间
     *
     * @return
     */
    @AliasFor("value")
    String cron() default "";

    /**
     * 任务名称
     *
     * @return
     */
    String jobName() default "";

    /**
     * 总分片数
     *
     * @return
     */
    int shardingTotalCount() default 1;

    /**
     * 分片参数
     *
     * @return
     */
    String shardingItemParameters() default "";

    String jobParameter() default "";


    /**
     * 任务描述
     *
     * @return
     */
    String description() default "";

    /**
     * 是否自动失效转移
     *
     * @return
     */
    boolean misfire() default false;

    /**
     * 错过是否重执行
     *
     * @return
     */
    boolean failover() default false;

    /**
     * 作业是否启动时禁止
     *
     * @return
     */
    boolean disabled() default false;

}

2.4 配置类
/**
 * elastic job 配置类
 * @author pengwei
 * @date 2021/3/29
 */
@Configuration
@RequiredArgsConstructor
public class ElasticJobAutoConfiguration {

    private final ApplicationContext applicationContext;


    @Value("${spring.elastic-job.url}")
    private String url;

    @Value("${spring.elastic-job.username}")
    private String username;

    @Value("${spring.elastic-job.password}")
    private String password;

    @Value("${spring.elastic-job.connectionTimeout}")
    private Long connectionTimeout;

    @Value("${spring.elastic-job.maxLifetime}")
    private Long maxLifetime;

    @Value("${spring.elastic-job.minimumIdle}")
    private Integer minimumIdle;

    @Value("${spring.elastic-job.maximumPoolSize}")
    private Integer maximumPoolSize;

    @Value("${spring.elastic-job.poolName}")
    private String poolName;


    private final ZookeeperRegistryCenter zookeeperRegistryCenter;

    @Bean
    public void initElasticJob() {

        Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);

        for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
            SimpleJob simpleJob = entry.getValue();
            ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
            if (elasticSimpleJobAnnotation == null) {
                continue;
            }
            String jobName = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.jobName(), simpleJob.getClass().getSimpleName());
            String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());

            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobName, cron,
                    elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(),
                    simpleJob.getClass().getCanonicalName());
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                    // jobShardingStrategyClass:分片策略
                    .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
                    .build();

            HikariDataSource dataSource = new HikariDataSource();
            dataSource.setJdbcUrl(url);
            dataSource.setUsername(username);
            dataSource.setPassword(password);
            dataSource.setConnectionTimeout(connectionTimeout);
            dataSource.setMaxLifetime(maxLifetime);
            dataSource.setMinimumIdle(minimumIdle);
            dataSource.setMaximumPoolSize(maximumPoolSize);
            dataSource.setPoolName(poolName);
            JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
            SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, liteJobConfiguration, jobEventRdbConfiguration);
            jobScheduler.init();
        }
    }
}
2.5 zookeeper配置类
/**
 * @author pengwei
 * @date 2021/3/29
 */
@Getter
@Configuration
public class ZkRegistryConfiguration {

    /**
     * zookeeper地址信息(zookeeper要部署集群)
     */
    @Value("${spring.zk.serverList}")
    private String serverList;

    /**
     * 命名空间
     */
    @Value("${spring.zk.namespace}")
    private String namespace;

    @Value("${spring.zk.maxRetries}")
    private int maxRetries;

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter() {
        ZookeeperConfiguration configuration = new ZookeeperConfiguration(serverList, namespace);
        configuration.setMaxRetries(maxRetries);
        return new ZookeeperRegistryCenter(configuration);
    }

}

2.6 动态新增、删除定时任务job
传参可以封装成一个对象
/**
 * 新增、更新、删除任务
 *
 * @author pengwei
 * @date 2021/3/31
 */
@Component
@RequiredArgsConstructor
public class ElasticJobHandler {

    private final ZookeeperRegistryCenter zookeeperRegistryCenter;


    /**
     * 添加、更新任务  
     * 
     * 传参可以封装成一个对象
     * @param jobName:任务名称,注意不要带#特殊字符。
     * @param simpleJob:实现simpleJob自己的任务类
     * @param cron
     * @param shardingTotalCount:分片总数
     * @param shardingItemParameters:个性化参数,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
     */
    public void addJob(final String jobName, final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                       final String shardingItemParameters) {
        simpleJobScheduler(jobName, simpleJob, cron, shardingTotalCount, shardingItemParameters).init();
    }


    private JobScheduler simpleJobScheduler(final String jobName, final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                                            final String shardingItemParameters) {
        LiteJobConfiguration liteJobConfiguration = getLiteJobConfiguration(jobName, simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);
        return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, liteJobConfiguration);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final String jobName, final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {

        /**
         * JobCoreConfiguration.newBuilder(String jobName, String cron, int shardingTotalCount)
         * 其中jobName是任务名称
         * cron是cron表达式
         * shardingTotalCount是分片总数
         * 这样就可以把jobClass.getName()变成我们自己命名的jobName
         */
//        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
//                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
                jobName, cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                //开启失效转移
                .misfire(true)
                .build();


        JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());

        return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                // overwrite:本地配置是否可覆盖注册中心配置
                .overwrite(true)
                // jobShardingStrategyClass:分片策略
                .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
                .build();


    }

    /**
     * 删除定时任务
     *
     * @param jobName
     */
    public void removeJob(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            //暂停任务
            jobScheduleController.pauseJob();
            //关闭调度器
            jobScheduleController.shutdown();
            //删除节点
            zookeeperRegistryCenter.remove("/" + jobName);

        }

    }

    /**
     * 暂停任务
     *
     * @param jobName
     */
    public void pauseJob(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            jobScheduleController.pauseJob();
        }
    }


    /**
     * 添加、更新任务,封装对象
     *
     * @param elasticJobPO
     * @param simpleJob
     */
    public void addJobToBean(ElasticJobPO elasticJobPO, final SimpleJob simpleJob) {
        simpleJobScheduler(elasticJobPO.getJobName(), simpleJob, elasticJobPO.getCron(), elasticJobPO.getShardingTotalCount(), elasticJobPO.getShardingItemParameters()).init();
    }

    /**
     * 立刻启动作业
     *
     * @param jobName
     * @param
     */
    public boolean start(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            jobScheduleController.triggerJob();
            return true;
        }
        return false;
    }

2.7 示例
@Test	
public void add(ElasticJobPO defaultJob) {
        elasticJobHandler.addJob(defaultJob.getJobName(), defaultTaskJob, defaultJob.getCron(),
                defaultJob.getShardingTotalCount(), defaultJob.getShardingItemParameters());
    }

3、问题描述

问题1:

暂停、和删除任务有的时候会无效; (可以通过运维平台进行调试)。

研究了源码和它是采用zookeeper实现分布式协调的本质,采用删除节点的方式可以解决。

  			   //删除节点
            zookeeperRegistryCenter.remove("/" + jobName);

问题2:

官网3.0X下载的运维监控平台的版本不能兼容2.X的

elasticjob-console-2.1.6 链接: https://pan.baidu.com/s/1FKP7AmO58GwIsRD2dfRa6w 密码: l60p

账号密码:root root
在这里插入图片描述


以上就是在项目开发过程使用遇到的一些问题,如果大家有其他发现,欢迎留言一起讨论。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 11:43:03  更:2021-07-29 11:45:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/2 23:02:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码