⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 作业监听器

涉及到主要类的类图如下( 打开大图 ):

  • 绿色监听器接口 ElasticJobListener,每台作业节点均执行。
  • 粉色监听器接口 AbstractDistributeOnceElasticJobListener,分布式场景中仅单一节点执行。
  • 蓝色类在 com.dangdang.ddframe.job.lite.internal.guarantee 里,保证分布式任务全部开始和结束状态。 AbstractDistributeOnceElasticJobListener 通过 guarantee 功能,实现分布式场景中仅单一节点执行。

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. ElasticJobListener

ElasticJobListener,作业监听器接口,每台作业节点均执行

若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。

接口代码如下:

public interface ElasticJobListener {

/**
* 作业执行前的执行的方法.
*
* @param shardingContexts 分片上下文
*/
void beforeJobExecuted(final ShardingContexts shardingContexts);

/**
* 作业执行后的执行的方法.
*
* @param shardingContexts 分片上下文
*/
void afterJobExecuted(final ShardingContexts shardingContexts);
}

调用执行如下:

// AbstractElasticJobExecutor.java
public final void execute() {
// ...省略无关代码

// 执行 作业执行前的方法
try {
jobFacade.beforeJobExecuted(shardingContexts);
} catch (final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// ...省略无关代码(执行 普通触发的作业)
// ...省略无关代码(执行 被跳过触发的作业)
// ...省略无关代码(执行 作业失效转移)

// ...执行 作业执行后的方法
try {
jobFacade.afterJobExecuted(shardingContexts);
} catch (final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
}

  • JobFacade 对作业监听器简单封装进行调用。

    // LiteJobFacade.java
    @Override
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
    each.beforeJobExecuted(shardingContexts);
    }
    }

    @Override
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
    each.afterJobExecuted(shardingContexts);
    }
    }

  • 下文提到的 AbstractDistributeOnceElasticJobListener,也是这么调用。

3. AbstractDistributeOnceElasticJobListener

AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次的监听器。

若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

创建 AbstractDistributeOnceElasticJobListener 代码如下:

public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

/**
* 开始超时时间
*/
private final long startedTimeoutMilliseconds;
/**
* 开始等待对象
*/
private final Object startedWait = new Object();
/**
* 完成超时时间
*/
private final long completedTimeoutMilliseconds;
/**
* 完成等待对象
*/
private final Object completedWait = new Object();
/**
* 保证分布式任务全部开始和结束状态的服务
*/
@Setter
private GuaranteeService guaranteeService;

private TimeService timeService = new TimeService();

public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
if (startedTimeoutMilliseconds <= 0L) {
this.startedTimeoutMilliseconds = Long.MAX_VALUE;
} else {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
}
if (completedTimeoutMilliseconds <= 0L) {
this.completedTimeoutMilliseconds = Long.MAX_VALUE;
} else {
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
}
}
}

  • 超时参数 startedTimeoutMillisecondscompletedTimeoutMilliseconds 务必传递,避免作业不同步导致的死锁。

👇下面,我们来看本文的重点:AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次:

@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
// 注册作业分片项开始运行
guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
// 判断是否所有的分片项开始运行
if (guaranteeService.isAllStarted()) {
// 执行
doBeforeJobExecutedAtLastStarted(shardingContexts);
// 清理启动信息
guaranteeService.clearAllStartedInfo();
return;
}
// 等待
long before = timeService.getCurrentMillis();
try {
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
} catch (final InterruptedException ex) {
Thread.interrupted();
}
// 等待超时
if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
// 清理启动信息
guaranteeService.clearAllStartedInfo();
handleTimeout(startedTimeoutMilliseconds);
}
}

  • 调用 GuaranteeService#registerStart(...) 方法,注册作业分片项开始运行。

    // GuaranteeService.java
    public void registerStart(final Collection<Integer> shardingItems) {
    for (int each : shardingItems) {
    jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
    }
    }

    // GuaranteeNode.java
    public final class GuaranteeNode {
    static final String ROOT = "guarantee";
    static final String STARTED_ROOT = ROOT + "/started";
    }

    static String getStartedNode(final int shardingItem) {
    return Joiner.on("/").join(STARTED_ROOT, shardingItem);
    }

    • Zookeeper 数据节点 /${JOB_NAME}/guarantee/started/${ITEM_INDEX}永久节点,存储空串( "" )。为什么是永久节点呢?在 GuaranteeService#isAllStarted() 见分晓。
  • 调用 GuaranteeService#isAllStarted() 方法,判断是否所有的分片项开始运行。

    /**
    * 判断是否所有的任务均启动完毕.
    *
    * @return 是否所有的任务均启动完毕
    */
    public boolean isAllStarted() {
    return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT)
    && configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
    }

    • /${JOB_NAME}/guarantee/started/ 目录下,所有作业分片项都开始运行,即运行总数等于作业分片总数( JobCoreConfiguration.ShardingTotalCount ),代表所有的任务均启动完毕
    • 等待所有任务启动过程中,不排除有作业节点会挂掉,如果 /${JOB_NAME}/guarantee/started/${ITEM_INDEX} 存储临时节点,会导致不能满足所有的分片项开始运行的条件。
    • 等待过程中,如果调整作业分片总数( JobCoreConfiguration.ShardingTotalCount ),会导致异常。
  • 当不满足所有的分片项开始运行时,作业节点调用 Object#wait(...) 方法进行等待。该等待怎么结束等待?当满足所有的分片项开始运行的作业节点调用 GuaranteeService#clearAllStartedInfo() 时,StartedNodeRemovedJobListener 会监听到 /${JOB_NAME}/guarantee/started/ 被删除,调用 Object#notifyAll(...) 方法进行唤醒全部。

    // GuaranteeService.java
    /**
    * 清理所有任务启动信息.
    */
    public void clearAllStartedInfo() {
    jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
    }

    // StartedNodeRemovedJobListener.java
    class StartedNodeRemovedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
    if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
    for (ElasticJobListener each : elasticJobListeners) {
    if (each instanceof AbstractDistributeOnceElasticJobListener) {
    ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
    }
    }
    }
    }
    }

  • 调用 #doBeforeJobExecutedAtLastStarted(...) 方法,执行最后一个作业执行前的执行的方法,实现该抽象方法,完成自定义逻辑。#doAfterJobExecutedAtLastCompleted(...) 实现的方式一样,就不重复解析了。

    // AbstractDistributeOnceElasticJobListener.java
    /**
    * 分布式环境中最后一个作业执行前的执行的方法.
    *
    * @param shardingContexts 分片上下文
    */
    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);

    /**
    * 分布式环境中最后一个作业执行后的执行的方法.
    *
    * @param shardingContexts 分片上下文
    */
    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);

  • 整体流程如下图:

666. 彩蛋

知识星球

旁白君:哎哟喂,AbstractDistributeOnceElasticJobListener 还不错哟。
芋道君:那必须必的。

道友,赶紧上车,分享一波朋友圈!

文章目录
  1. 1. 1. 概述
  2. 2. 2. ElasticJobListener
  3. 3. 3. AbstractDistributeOnceElasticJobListener
  4. 4. 666. 彩蛋