任务调度之Elastic-Job1

任务调度之Elastic-Job1目标1、了解Elastic-Job的基本特性2、掌握Elastic-Job开发与配置方式(包括Java开发和SpringBoot开发),掌握任务类型和任务分片策略3、了解Elastic-Job运维平台的使用4、掌握Elastic-Job运行原理内容定位适合了解了Quartz的调度模型之后,想要知道如何基于ZK配置Quartz和如何实现任务分片的同学Quartz-Misfire什么情况下错过触发?错过触发怎么办?线程池只有5个线程,当有5个任务都在执

大家好,欢迎来到IT知识分享网。

目标

1、了解Elastic-Job 的基本特性

2、掌握Elastic-Job 开发与配置方式(包括Java 开发和Spring Boot 开发),掌握任务类型和任务分片策略

3、了解Elastic-Job 运维平台的使用

4、掌握Elastic-Job 运行原理

内容定位

适合了解了Quartz 的调度模型之后,想要知道如何基于ZK 配置Quartz 和如何实现任务分片的同学

Quartz-Misfire

什么情况下错过触发?错过触发怎么办?

线程池只有5 个线程,当有5 个任务都在执行的时候,第六个任务即将触发,这个时候任务就不能得到执行。在quartz.properties 有一个属性misfireThreshold,用来定义触发器超时的”临界值”,也就是超过了这个时间,就算错过触发了。

例如,如果misfireThreshold 是60000(60 秒),9 点整应该执行的任务,9 点零1 分还没有可用线程执行它,就会超时(misfires)。

下面这些原因可能造成misfired job:

1、没有可用线程

2、Trigger 被暂停

3、系统重启

4、禁止并发执行的任务在到达触发时间时,上次执行还没有结束。

错过触发怎么办?Misfire 策略设置

每一种Trigger 都定义了自己的Misfire 策略,不同的策略通过不同的方法来设置。
standalone 工程MisfireTest

Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
	.withSchedule(SimpleScheduleBuilder.simpleSchedule().
		withMisfireHandlingInstructionNowWithExistingCount().
		withIntervalInSeconds(1).
		repeatForever()).build();

大体上来说有3 种:

1、忽略

2、立即跑一次

3、下次跑

详细内容参考:

https://gper.club/articles/7e7e7f7ff7g59gc5g69

怎么避免任务错过触发?

合理地设置线程池数量,以及任务触发间隔。

认识E-Job

任务调度高级需求

Quartz 的不足:

1、作业只能通过DB 抢占随机负载,无法协调

2、任务不能分片——单个任务数据太多了跑不完,消耗线程,负载不均

3、作业日志可视化监控、统计

发展历史

E-Job 是怎么来的?

在当当的ddframe 框架中,需要一个任务调度系统(作业系统)

任务调度之Elastic-Job1

实现的话有两种思路,一个是修改开源产品,一种是基于开源产品搭建(封装),当当选择了后者,最开始这个调度系统叫做dd-job。它是一个无中心化的分布式调度框架。因为数据库缺少分布式协调功能(比如选主),替换为Zookeeper 后,增加了弹性
扩容和数据分片的功能。

Elastic-Job 是ddframe 中的dd-job 作业模块分离出来的作业框架,基于Quartz和Curator 开发,在2015 年开源。

轻量级,无中心化解决方案。

为什么说是去中心化呢?因为没有统一的调度中心。集群的每个节点都是对等的,节点之间通过注册中心进行分布式协调。E-Job 存在主节点的概念,但是主节点没有调度的功能,而是用于处理一些集中式任务,如分片,清理运行时信息等。

思考:如果ZK 挂了怎么办?

每个任务有独立的线程池。

从官网开始

http://elasticjob.io/docs/elastic-job-lite/00-overview/

https://github.com/elasticjob

Elastic-Job 最开始只有一个elastic-job-core 的项目,在2.X 版本以后主要分为Elastic-Job-Lite 和Elastic-Job-Cloud 两个子项目。其中,Elastic-Job-Lite 定位为轻量级无中心化解决方案, 使用jar 包的形式提供分布式任务的协调服务。而Elastic-Job-Cloud 使用Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟Lite 的区别只是部署方式不同,他们使用相同的API,只要开发一次)。

功能特性

分布式调度协调:用ZK 实现注册中心

错过执行作业重触发(Misfire)

支持并行调度(任务分片)

作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

弹性扩容缩容:将任务拆分为n 个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job 将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。

失效转移failover:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。

支持作业生命周期操作(Listener)

丰富的作业类型(Simple、DataFlow、Script)

Spring 整合以及命名空间提供

运维平台

项目架构

应用在各自的节点执行任务,通过ZK 注册中心协调。节点注册、节点选举、任务分片、监听都在E-Job 的代码中完成。

任务调度之Elastic-Job1

Java 开发

工程:ejob-standalone

pom 依赖

<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>elastic-job-lite-core</artifactId>
	<version>2.1.5</version>
</dependency>

任务类型

standalone 工程

任务类型有三种:

SimpleJob

SimpleJob: 简单实现,未经任何封装的类型。需实现SimpleJob 接口。

ejob-standalone MySimpleJob.java

public class MyElasticJob implements SimpleJob {
	public void execute(ShardingContext context) {
		System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",
		context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),
		Thread.currentThread().getId()));
	}
}

DataFlowJob

DataFlowJob:Dataflow 类型用于处理数据流,必须实现fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据。

ejob-standalone MyDataFlowJob.java

public class MyDataFlowJob implements DataflowJob<String> {
	@Override
	public List<String> fetchData(ShardingContext shardingContext) {
		// 获取到了数据
		return Arrays.asList("leon","jack","seven");
	}
	@Override
	public void processData(ShardingContext shardingContext, List<String> data) {
		data.forEach(x-> System.out.println("开始处理数据:"+x));
	}
}

ScriptJob

Script:Script 类型作业意为脚本类型作业,支持shell,python,perl 等所有类型脚本。D 盘下新建1.bat,内容:

@echo ------【脚本任务】Sharding Context: %*

ejob-standalone script.ScriptJobTest

package script;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import dataflow.MyDataFlowJob;

public class ScriptJobTest {
    // 如果修改了代码,跑之前清空ZK
    public static void main(String[] args) {
        // ZK注册中心
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "ejob-standalone"));
        regCenter.init();

        // 定义作业核心配置
        JobCoreConfiguration scriptJobCoreConfig = JobCoreConfiguration.newBuilder("MyScriptJob", "0/4 * * * * ?", 2).build();
        // 定义SCRIPT类型配置
        ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptJobCoreConfig,"D:/1.bat");

        // 作业分片策略
        // 基于平均分配算法的分片策略
        String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();

        // 定义Lite作业根配置
        // LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();
        LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).build();

        // 构建Job
        new JobScheduler(regCenter, scriptJobRootConfig).init();
        // new JobScheduler(regCenter, scriptJobRootConfig, jobEventConfig).init();
    }

}

只要指定脚本的内容或者位置

E-Job 配置

配置步骤

配置手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/

1、ZK 注册中心配置(后面继续分析)

2、作业配置(从底层往上层:Core——Type——Lite)

配置级别 配置类 配置内容
Core JobCoreConfiguration 用于提供作业核心配置信息,如:作业名称、CRON 表达式、分片总数等。
Type JobTypeConfiguration 有3 个子类分别对应SIMPLE, DATAFLOW 和SCRIPT 类型作业,提供3 种作
业需要的不同配置,如:DATAFLOW 类型是否流式处理或SCRIPT 类型的命
令行等。Simple 和DataFlow 需要指定任务类的路径。
Root JobRootConfiguration 有2 个子类分别对应Lite 和Cloud 部署类型,提供不同部署类型所需的配
置,如:Lite 类型的是否需要覆盖本地配置或Cloud 占用CPU 或Memory
数量等。
可以定义分片策略。
http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/
public class SimpleJobTest {
	public static void main(String[] args) {
		// ZK 注册中心
		CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new
		ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
		regCenter.init();
		// 定义作业核心配置
		JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("MyElasticJob", "0/2 * * * * ?",
		1).build();
		// 定义SIMPLE 类型配置
		SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
		MyElasticJob.class.getCanonicalName());
		// 定义Lite 作业根配置
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
		// 构建Job
		new JobScheduler(regCenter, simpleJobRootConfig).init();
	}
}

作业配置分为3 级,分别是JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。LiteJobConfiguration 使用JobTypeConfiguration ,JobTypeConfiguration 使用JobCoreConfiguration,层层嵌套。

任务调度之Elastic-Job1

JobTypeConfiguration 根据不同实现类型分为SimpleJobConfiguration ,DataflowJobConfiguration 和ScriptJobConfiguration。

E-Job 使用ZK 来做分布式协调,所有的配置都会写入到ZK 节点。

ZK 注册中心数据结构

一个任务一个二级节点。

这里面有些节点是临时节点,只有任务运行的时候才能看到。

注意:修改了任务重新运行任务不生效,是因为ZK 的信息不会更新, 除非把overwrite 修改成true。

任务调度之Elastic-Job1

config 节点

JSON 格式存储。

存储任务的配置信息,包含执行类,cron 表达式,分片算法类,分片数量,分片参数等等。

{
    "jobName": "MySimpleJob",
    "jobClass": "job.MySimpleJob",
    "jobType": "SIMPLE",
    "cron": "0/2 * * * * ?",
    "shardingTotalCount": 1,
    "shardingItemParameters": "",
    "jobParameter": "",
    "failover": false,
    "misfire": true,
    "description": "",
    "jobProperties": {
        "job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",
        "executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"
    },
    "monitorExecution": true,
    "maxTimeDiffSeconds": -1,
    "monitorPort": -1,
    "jobShardingStrategyClass": "",
    "reconcileIntervalMinutes": 10,
    "disabled": false,
    "overwrite": false
}

config 节点的数据是通过ConfigService 持久化到zookeeper 中去的。默认状态下,如果你修改了Job 的配置比如cron 表达式、分片数量等是不会更新到zookeeper 上去的,除非你在Lite 级别的配置把参数overwrite 修改成true。

LiteJobConfiguration simpleJobRootConfig =
LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();

instances 节点

同一个Job 下的elastic-job 的部署实例。一台机器上可以启动多个Job 实例,也就是Jar 包。instances 的命名是IP+@-@+PID。只有在运行的时候能看到。

任务调度之Elastic-Job1

leader 节点

任务调度之Elastic-Job1

任务实例的主节点信息,通过zookeeper 的主节点选举,选出来的主节点信息。在elastic job 中,任务的执行可以分布在不同的实例(节点)中,但任务分片等核心控制,需要由主节点完成。因此,任务执行前,需要选举出主节点。

下面有三个子节点:

election:主节点选举

sharding:分片

failover:失效转移

election 下面的instance 节点显示了当前主节点的实例ID:jobInstanceId。

election 下面的latch 节点也是一个永久节点用于选举时候的实现分布式锁。

sharding 节点下面有一个临时节点,necessary,是否需要重新分片的标记。如果分片总数变化,或任务实例节点上下线或启用/禁用,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。

servers 节点

任务调度之Elastic-Job1

任务实例的信息,主要是IP 地址,任务实例的IP 地址。跟instances 不同,如果多个任务实例在同一台机器上运行则只会出现一个IP 子节点。可在IP 地址节点写入DISABLED 表示该任务实例禁用。

sharding 节点

任务调度之Elastic-Job1

任务的分片信息,子节点是分片项序号,从0 开始。分片个数是在任务配置中设置的。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。最主要的子节点就是instance。

子节点名 是否临时节点 描述
instance 执行该分片项的作业运行实例主键
running 分片项正在运行的状态
仅配置monitorExecution 时有效
failover 如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分
片的作业服务器IP
misfire 是否开启错过任务重新执行
disabled 是否禁用此分片项

 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/12709.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信