LTS 分布式任务调度文档
LTS 介绍
LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务,定时任务,Cron 任务,Repeat 任务。有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用,同时也希望开源爱好者一起贡献。
主要功能
- 支持分布式,解决多点故障,支持动态扩容,容错重试等
- Spring 扩展支持,SpringBoot 支持,Spring Quartz Cron 任务的无缝接入支持
- 节点监控支持,任务执行监控支持,JVM 监控支持
- 后台运维操作支持, 可以动态提交,更改,停止 任务
项目地址
LTS 技术架构
LTS 着力于解决分布式任务调度问题,将任务的提交者和执行者解耦,解决任务执行的单点故障,支持动态扩容,出错重试等机制。代码程序设计上,参考了优秀开源项目 Dubbo,Hadoop 的部分思想。
LTS 目前支持四种任务
- 实时任务:提交了之后立即就要执行的任务。
- 定时任务:在指定时间点执行的任务,譬如 今天 3 点执行(单次)。
- Cron 任务:CronExpression,和 quartz 类似(但是不是使用 quartz 实现的)譬如 0 0/1 * ?
- Repeat 任务:譬如每隔 5 分钟执行一次,重复 50 次就停止。
架构设计上,LTS 框架中包含以下五种类型的节点
- JobClient :主要负责提交任务, 并接收任务执行反馈结果。
- JobTracker :负责任务调度,接收并分配任务。
- TaskTracker :负责执行任务,执行完反馈给 JobTracker。
- LTS-Monitor :主要负责收集各个节点的监控信息,包括任务监控信息,节点 JVM 监控信息
- LTS-Admin :管理后台)主要负责节点管理,任务队列管理,监控管理等。
架构图

- Registry: 注册中心,LTS 提供多种实现,目前支持 zookeeper(推荐)和 redis, 主要用于 LTS 的节点信息暴露和 master 节点选举。
- FailStore: 失败存储,主要用于在部分场景远程 RPC 调用失败的情况,采取现存储本地 KV 文件系统,待远程通信恢复的时候再进行数据补偿。目前 FailStore 场景,主要有 RetryJobClient 提交**任务失败的时候,存储 FailStore;TaskTracker 返回任务执行结果给 JobTracker 的失败 时候,FailStore;TaskTracker 提交 BizLogger 的失败的时候,存储 FailStore. 目前 FailStore 有四种实现:leveldb,rocksdb,berkeleydb,mapdb(当然用户也可以实现扩展接口实现自己的 FailStore)
- QueueManager: 任务队列,目前提供 mysql(推荐)和 mongodb 两种实现(同样的用户可以自己扩容展示其他的,譬如 oracle 等),主要存储任务数据和任务执行日志等。
- RPC: 远程 RPC 通信框架,目前也支持多种实现,LTS 自带有 netty 和 mina,用户可以自行选择,或者自己 SPI 扩展实现其他的。
- NodeGroup: 节点组,同一个节点组中的任何节点都是对等的,等效的,对外提供相同的服务。譬如 TaskTracker 中有 10 个 nodeGroup 都是 send_msg 的节点组,专门执行发送短信的任务。每个节点组中都有一个 master 节点,这个 master 节点是由 LTS 动态选出来的,当一个 master 节点挂掉之后,LTS 会立马选出另外一个 master 节点,框架提供 API 监听接口给用户。
- ClusterName: LTS 集群,就如上图所示,整个图就是一个集群,包含 LTS 的五种节点。
节点图

- 一个节点组等同于一个集群,同一个节点组中的各个节点是对等的,外界无论连接节点组中的任务一个节点都是可以的。
- 每个节点组中都有一个 master 节点,采用 zookeeper 进行 master 选举(master 宕机,会自动选举出新的 master 节点),框架会提供接口 API 来监听 master 节点的变化,用户可以自己使用 master 节点做自己想做的事情。
- JobClient 和 TaskTracker 都可以存在多个节点组。譬如 JobClient 可以存在多个节点组。 譬如:JobClient 节点组为 ‘lts_WEB’ 中的一个节点提交提交一个 只有节点组为’lts_TRADE’的 TaskTracker 才能执行的任务。
- (每个集群中)JobTacker 只有一个节点组。
- 多个 JobClient 节点组和多个 TaskTracker 节点组再加上一个 JobTacker 节点组, 组成一个大的集群。
工作流程
- JobClient 提交一个 任务 给 JobTracker, 这里我提供了两种客户端 API, 一种是如果 JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地 leveldb(可以使用 NFS 来达到同个节点组共享 leveldb 文件的目的,多线程访问,做了文件锁处理),返回给客户端提交成功的信息,待 JobTracker 可用的时候,再将任务提交。
- JobTracker 收到 JobClient 提交来的任务,先生成一个唯一的 JobID。然后将任务储存在 Mongo 集群中。JobTracker 发现有(任务执行的)可用的 TaskTracker 节点(组) 之后,将优先级最大,最先提交的任务分发给 TaskTracker。这里 JobTracker 会优先分配给比较空闲的 TaskTracker 节点,达到负载均衡。
- TaskTracker 收到 JobTracker 分发来的任务之后,执行。执行完毕之后,再反馈任务执行结果给 JobTracker(成功 or 失败[失败有失败错误信息]),如果发现 JobTacker 不可用,那么存储本地 leveldb,等待 TaskTracker 可用的时候再反馈。反馈结果的同时,询问 JobTacker 有没有新的任务要执行。
- JobTacker 收到 TaskTracker 节点的任务结果信息,生成并插入(mongo)任务执行日志。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除, 需要反馈的(同样 JobClient 不可用存储文件,等待可用重发)。
- JobClient 收到任务执行结果,进行自己想要的逻辑处理。
特性
LTS 可以完全不用 Spring 框架,但是考虑到很用用户项目中都是用了 Spring 框架,所以 LTS 也提供了对 Spring 的支持,包括 Xml 和注解,引入 lts-spring.jar 即可。
在 TaskTracker 端提供了业务日志记录器,供应用程序使用,通过这个业务日志器,可以将业务日志提交到 JobTracker,这些业务日志可以通过任务 ID 串联起来,可以在 LTS-Admin 中实时查看任务的执行进度。
SPI 扩展可以达到零侵入,只需要实现相应的接口,并实现即可被 LTS 使用,目前开放出来的扩展接口有
对任务队列的扩展,用户可以不选择使用 mysql 或者 mongo 作为队列存储,也可以自己实现。
对业务日志记录器的扩展,目前主要支持 console,mysql,mongo,用户也可以通过扩展选择往其他地方输送日志。
可以对 JobTracker,TaskTracker 节点进行资源监控,任务监控等,可以实时的在 LTS-Admin 管理后台查看,进而进行合理的资源调配。
LTS 框架提供四种执行结果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,并对每种结果采取相应的处理机制,譬如重试。
EXECUTE_SUCCESS: 执行成功,这种情况,直接反馈客户端(如果任务被设置了要反馈给客户端)。
EXECUTE_FAILED:执行失败,这种情况,直接反馈给客户端,不进行重试。
EXECUTE_LATER:稍后执行(需要重试),这种情况,不反馈客户端,重试策略采用 30s 的策略,默认最大重试次数为 10 次,用户可以通过参数设置修改这些参数。
EXECUTE_EXCEPTION:执行异常, 这中情况也会重试(重试策略,同上)
采用 FailStore 机制来进行节点容错,Fail And Store,不会因为远程通信的不稳定性而影响当前应用的运行。
负载均衡
JobClient 和 TaskTracker 会随机连接 JobTracker 节点组中的一个节点,实现 JobTracker 负载均衡。当连接上后,将一直保持连接这个节点,保持连接通道,知道这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
JobTracker 分发任务时,是优先分配给最空闲的一个 TaskTracker 节点,实现 TaskTracker 节点的负载均衡。
健壮性(故障转移)
当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。
当执行任务的 TaskTracker 节点当机之后,JobTracker 会将这个 TaskTracker 上的未完成的任务(死任务),重新分配给节点组中其他节点执行。
因为 LTS 各个节点都是无状态的,所以支持动态增加删除节点,达到负载均衡的目的
环境准备
1. Java JDK
因为 LTS 是使用 Java 语言编写的,所以必须要有个 Java 编译运行环境,目前 LTS 支持 JDK1.6 及以上版本。
2. Maven
LTS 项目是基于 Maven 做项目依赖管理的,所以用户机器上需要配置 Maven 环境
因 LTS 目前支持 Zookeeper 和 Redis 作为注册中心,主要用于节点信息暴露、监听、master 节点选举。用于选择其一即可,建议 zookeeper。
LTS 目前支持 Mysql 和 mongodb 作为任务队列的存储引擎。用户同样的选择其中一个即可。
部署
部署建议
- Admin 后台: 建议 Admin 后台单独部署,默认会嵌入一个 Monitor
- Monitor:默认在 Admin 后台进程中有一个,如果一个不够,也可以单独启动多个
- JobTracker: 建议单独部署
- JobClient:,这个是提交任务的工程,一般是和业务相关的,所以会放在业 务工程中, 当然也要看业务场景
- TaskTracker,这个因为是跑任务的,具体看业务场景, 一般情况下也可以是独立部署
1. JobTracker 和 LTS-Admin 部署
提供(cmd)windows 和(shell)linux 两种版本脚本来进行编译和部署:
- 运行根目录下的 sh build.sh 或 build.cmd 脚本,会在 dist 目录下生成 lts-{version}-bin 文件夹
- 下面是其目录结构,其中 bin 目录主要是 JobTracker 和 LTS-Admin 的启动脚本。jobtracker 中是 JobTracker 的配置文件和需要使用到的 jar 包,lts-admin 是 LTS-Admin 相关的 war 包和配置文件。 lts-{version}-bin 的文件结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| -- lts-${version}-bin |-- bin | |-- jobtracker.cmd | |-- jobtracker.sh | |-- lts-admin.cmd | |-- lts-admin.sh | |-- lts-monitor.cmd | |-- lts-monitor.sh | |-- tasktracker.sh |-- conf | |-- log4j.properties | |-- lts-admin.cfg | |-- lts-monitor.cfg | |-- readme.txt | |-- tasktracker.cfg | |-- zoo | |-- jobtracker.cfg | |-- log4j.properties | |-- lts-monitor.cfg |-- lib | |-- *.jar |-- war |-- jetty | |-- lib | |-- *.jar |-- lts-admin.war
|
JobTracker 启动
如果你想启动一个节点,直接修改下 conf/zoo 下的配置文件,然后运行 sh jobtracker.sh zoo start 即可,如果你想启动两个 JobTracker 节点,那么你需要拷贝一份 zoo,譬如命名为 zoo2,修改下 zoo2 下的配置文件,然后运行 sh jobtracker.sh zoo2 start 即可。logs 文件夹下生成 jobtracker-zoo.out 日志。
LTS-Admin 启动
修改 conf/lts-monitor.cfg 和 conf/lts-admin.cfg 下的配置,然后运行 bin 下的 sh lts-admin.sh 或 lts-admin.cmd 脚本即可。logs 文件夹下会生成 lts-admin.out 日志,启动成功在日志中会打印出访问地址,用户可以通过这个访问地址访问了。

2. JobClient(部署)使用
需要引入 lts 的 jar 包有 lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依赖 jar。
API 方式启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setClusterName("test_cluster"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start();
// 提交任务 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式 // job.setTriggerTime(new Date()); // 支持指定时间执行 Response response = jobClient.submitJob(job);
|
Spring XML 方式启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_jobClient"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="jobFinishedHandler"> <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/> </property> <property name="configs"> <props> <!-- 参数 --> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
|
Spring 全注解方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration public class LTSSpringConfig {
@Bean(name = "jobClient") public JobClient getJobClient() throws Exception { JobClientFactoryBean factoryBean = new JobClientFactoryBean(); factoryBean.setClusterName("test_cluster"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setNodeGroup("test_jobClient"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); return factoryBean.getObject(); } }
|
3. TaskTracker(部署使用)
需要引入 lts 的 jar 包有 lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依赖 jar。
定义自己的任务执行类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class MyJobRunner implements JobRunner { private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger(); @Override public Result run(Job job) throws Throwable { try { bizLogger.info("测试,业务日志啊啊啊啊啊");
} catch (Exception e) { return new Result(Action.EXECUTE_FAILED, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "执行成功了,哈哈"); } }
|
API 方式启动
1 2 3 4 5 6 7
| TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setClusterName("test_cluster"); taskTracker.setWorkThreads(20); taskTracker.start();
|
Spring XML 方式启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start"> <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/> <property name="bizLoggerLevel" value="INFO"/> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_trade_TaskTracker"/> <property name="workThreads" value="20"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="configs"> <props> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
|
Spring 注解方式启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Configuration public class LTSSpringConfig implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = "taskTracker") public TaskTracker getTaskTracker() throws Exception { TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean(); factoryBean.setApplicationContext(applicationContext); factoryBean.setClusterName("test_cluster"); factoryBean.setJobRunnerClass(MyJobRunner.class); factoryBean.setNodeGroup("test_trade_TaskTracker"); factoryBean.setBizLoggerLevel("INFO"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); factoryBean.setWorkThreads(20); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs);
factoryBean.afterPropertiesSet();
return factoryBean.getObject(); } }
|
包引入说明
1. JobTracker,JobClient,TaskTracker 都需要引入的包
1.1 lts-core
1 2 3 4 5
| <dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-core</artifactId> <version>${lts版本号}</version> </dependency>
|
1.2 zk 客户端包
二选一, 通过 addConfig(“zk.client”, “可选值: curator, zkclient, lts”) 设置, 如果用 lts,可以不用引入包
zkclient
1 2 3 4 5
| <dependency> <groupId>com.github.sgroschupf</groupId> <artifactId>zkclient</artifactId> <version>0.1</version> </dependency>
|
curator
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.9.1</version> </dependency>
|
zookeeper 包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zk.version}</version> <exclusions> <exclusion> <groupId>org.jboss.netty</groupId> <artifactId>netty</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
|
1.3 通讯包
netty 或者 mina, 二选一, 通过 addConfig(“lts.remoting”, “可选值: netty, mina”) 设置
netty
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.20.Final</version> </dependency>
|
mina
1 2 3 4 5
| <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.9</version> </dependency>
|
1.4 json 包
fastjson 或者 jackson, 二选一, 通过 addConfig(“lts.json”, “可选值: fastjson, jackson”) 设置
fastjson
1 2 3 4 5
| <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
|
jackson
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.6.3</version> </dependency>
|
1.5 日志包
可以选用 slf4j, jcl, log4j, 或者使用 jdk 原生 logger
LoggerFactory.setLoggerAdapter(“可选值: slf4j, jcl, log4j, jdk”), 不手动设置, 默认按这个顺序加载
log4j
1 2 3 4 5
| <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
|
slf4j
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency>
|
jcl
1 2 3 4 5
| <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging-api</artifactId> <version>1.1</version> </dependency>
|
1.6
如果需要 spring 的话,需要引入 lts-spring 及 spring 的相关包
1 2 3 4 5
| <dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-spring</artifactId> <version>${lts版本号}</version> </dependency>
|
2. 对于 JobTracker 端
2.1 必须引入的包:
1 2 3 4 5
| <dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-jobtracker</artifactId> <version>${lts版本号}</version> </dependency>
|
2.2 除了基础包之外还需要引入任务队列的包(可以是 mongo 或者 mysql)
mysql
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.26</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.14</version> </dependency>
|
mongo
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.mongodb.morphia</groupId> <artifactId>morphia</artifactId> <version>1.0.0-rc1</version> </dependency> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> <version>3.0.2</version> </dependency>
|
3. JobClient 需要引入的包
必须引入的包
1 2 3 4 5
| <dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-jobclient</artifactId> <version>${project.version}</version> </dependency>
|
FailStore 存储包(四选一)
通过 jobClient.addConfig(“job.fail.store”, “可选值: leveldb, mapdb, berkeleydb, rocksdb”) 设置
mapdb
1 2 3 4 5
| <dependency> <groupId>org.mapdb</groupId> <artifactId>mapdb</artifactId> <version>2.0-beta10</version> </dependency>
|
leveldb
1 2 3 4 5
| <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.2.7<version> </dependency>
|
berkeleydb
1 2 3 4 5
| <dependency> <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> <version>5.0.73</version> </dependency>
|
rocksdb
1 2 3 4 5
| <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>3.10.1</version> </dependency>
|
3. TaskTracker 需要引入的包
必须引入的包
1 2 3 4 5
| <dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-tasktracker</artifactId> <version>${project.version}</version> </dependency>
|
FailStore 存储包(四选一)
通过 taskTracker.addConfig(“job.fail.store”, “可选值: leveldb, mapdb, berkeleydb, rocksdb”) 设置
mapdb
1 2 3 4 5
| <dependency> <groupId>org.mapdb</groupId> <artifactId>mapdb</artifactId> <version>2.0-beta10</version> </dependency>
|
leveldb
1 2 3 4 5
| <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.2.7<version> </dependency>
|
berkeleydb
1 2 3 4 5
| <dependency> <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> <version>5.0.73</version> </dependency>
|
rocksdb
1 2 3 4 5
| <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>3.10.1</version> </dependency>
|