Elastic-Job
1.1简介
Elastic-Job:Elastic-Job(项目开源地址)是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架,去掉了dd-job中的监控和ddframe接入规范部分。该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。Elastic-Job-Cloud基于mesos运行,是mesos的Framework。这里介绍的是Elastic-Job-Lite。
1.2基本概念
- 分片概念 任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
- 分片项与业务处理解耦 Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
- 个性化参数的使用场景 个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
1.3快速入门
引入maven依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
1.3.1作业开发
作业开发:Elastic-Job提供Simple、Dataflow和Script(文中没介绍)3种作业类型。
a.Simple类型作业
public class MySimpleJob implements SimpleJob{
/**
* 执行作业.
* @param shardingContext 分片上下文
*/
@Override
public void execute(ShardingContext context) {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())
+ " 分片项 : "+context.getShardingItem()
+ " 总片数 : " + context.getShardingTotalCount());
}
}
b.Dataflow类型作业
public class MyDataflowJob implements DataflowJob{
/**
* 获取待处理数据.
* @param shardingContext 分片上下文
* @return 待处理的数据集合
*/
@Override
public List fetchData(ShardingContext shardingContext) {
return Arrays.asList("1","2","3");
}
/**
* 处理数据.
* @param shardingContext 分片上下文
* @param data 待处理数据集合
*/
@Override
public void processData(ShardingContext shardingContext, List data) {
System.out.println("处理数据:" + data.toString());
}
}
流式处理,可通过DataflowJobConfiguration配置是否为流式处理。流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
c.启动作业
public class JobDemo {
public static void main(String[] args) {
//作业调度器初始化作业
new JobScheduler(createRegistryCenter(),createJobConfiguration()).init();
//启动DataflowJob
setUpDataflowJob(createRegistryCenter());
}
//注册中心
private static CoordinatorRegistryCenter createRegistryCenter(){
//ZookeeperConfiguration构造方法两个参数,serverLists(连接Zookeeper服务器的列表,包括IP地址和端口号,,多个地址用逗号分隔)和namespace(命名空间)
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("0.0.0.0:2181","elastic-job-demo"));
registryCenter.init();
return registryCenter;
}
//配置SimpleJob
private static LiteJobConfiguration createJobConfiguration(){
//创建简单作业配置构建器,三个参数为:jobName(作业名称),cron(作业启动时间的cron表达式),shardingTotalCount(作业分片总数)
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("mySimpleJob","0/10 * * * * ?",12).build();
//简单作业配置,第二个参数为jobClass
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
//创建Lite作业配置构建器,参数jobConfig(作业配置)
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
//配置DataflowJob
private static void setUpDataflowJob(final CoordinatorRegistryCenter registryCenter){
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("myDataflowJob","0/10 * * * * ?",2).build();
//数据流作业配置,第三个参数为streamingProcess(是否为流式处理)
DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(coreConfiguration,MyDataflowJob.class.getCanonicalName(),true);
new JobScheduler(registryCenter,LiteJobConfiguration.newBuilder(dataflowJobConfiguration).build()).init();
}
}
1.3.2作业配置
从JobDemo类中可以看出配置分为3个层级,分别是Core,Type和Root,每个层级使用相似于装饰者模式的方式装配。
Core对应JobCoreConfiguration,用于提供作业核心配置信息。
Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE,DATAFLOW,SCRIPT类型作业,提供3种作业需要的不同配置。
Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置。