写在最前

任务调度策略简介

  • 在传统的任务执行环境中,任务通常只需要按序执行,只考虑最基本的依赖关系,采用线性执行。
  • 但是在复杂系统中,任务往往依赖于其他任务的完成,从而形成复杂的依赖链。为了确保任务能够按正确的顺序执行,有效管理这些依赖关系至关重要。这样不仅可以避免系统瓶颈,还能够优化资源利用率,确保所有任务按计划完成。
  • 我这里实现任务调度的逻辑是基于图结构的思想,其中每个任务都被表示为有向无环图(DAG)中的一个节点,节点之间的边表示任务之间的依赖关系,这种方法是我们能够轻松的建模出复杂的以来结构,为复杂的任务调度场景提供了一种可行的解决方案。

调度器实现概述

  • 在本部分中,我将详细介绍一下使用Java实现的任务调度器,该调度器采用了基于有向无环图(DAG)的任务调度逻辑。为了提升系统性能和任务执行效率,调度器通过线程池和异步操作来并行处理多个任务。
    • 线程池:调度器利用ExecutorService创建了一个线程池,线程池的大小根据系统的处理器核心数量动态调整。这样一来,多个任务可以同时执行,从而最大限度地利用系统资源,提升任务的处理速度。
    • 异步操作:在任务的调度过程中,调度器将任务提交到线程池中执行,利用Future对象来管理异步任务的执行结果。每个任务在执行完成后,会更新其依赖的子任务的状态,并将已准备好的子任务添加到队列中进行进一步的处理。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;

      import javax.annotation.Resource;
      import java.util.*;
      import java.util.concurrent.*;

      @Slf4j
      @Component
      public class MyTaskScheduler {

      // 这里是根据我自身业务需求,自定义的任务执行器
      @Resource
      private MyTaskExecutor myTaskExecutor;
      // 图结构,key是任务ID,value是其下属子任务
      private final Map<String, List<String>> graph = new ConcurrentHashMap<>();
      // 节点入度map,key是任务ID,value是节点的度
      private final Map<String, Integer> inDegree = new ConcurrentHashMap<>();
      // 任务函数map,key是任务ID,value是该任务的执行函数
      private final Map<String, TaskFunction> taskFunctions = new ConcurrentHashMap<>();

      // 函数式接口
      public interface TaskFunction {
      void execute(MyTaskExecutor myTaskExecutor) throws Exception;
      }

      // 添加任务,需指定任务ID和任务执行函数
      public void addTask(String taskId, TaskFunction taskFunction) {
      // 向图结构中加入任务节点
      graph.putIfAbsent(taskId, new ArrayList<>());
      // 初始度为0,即没有依赖
      inDegree.putIfAbsent(taskId, 0);
      // 存储该任务的执行函数
      taskFunctions.put(taskId, taskFunction);
      }

      // 添加依赖,taskId 任务依赖于 dependencyTaskId 任务
      public void addDependency(String taskId, String dependencyTaskId) {
      graph.putIfAbsent(dependencyTaskId, new ArrayList<>());
      graph.get(dependencyTaskId).add(taskId);
      // 增加子任务的入度
      inDegree.put(taskId, inDegree.getOrDefault(taskId, 0) + 1);
      }

      // 删除任务
      public void deleteTask(String taskId) {
      graph.remove(taskId);
      inDegree.remove(taskId);
      taskFunctions.remove(taskId);
      }


      // BFS遍历
      public void executeTasksInOrder() {
      Queue<String> queue = new LinkedList<>();

      // 向队列中添加所有度为0的任务,这些任务无需依赖,可直接执行
      for (String task : inDegree.keySet()) {
      if (inDegree.get(task) == 0) {
      queue.add(task);
      }
      }

      // 开个线程池,这里按需调整你的线程池大小
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

      try {
      // 处理队列中的任务
      while (!queue.isEmpty()) {
      List<Future<String>> futures = new ArrayList<>();
      int size = queue.size();

      for (int i = 0; i < size; i++) {
      // 取出任务
      String task = queue.poll();
      Future<String> future = executorService.submit(() -> {
      // 异步执行
      TaskFunction taskFunction = taskFunctions.get(task);
      if (taskFunction != null) {
      taskFunction.execute(myTaskExecutor);
      }
      return task;
      });
      futures.add(future);
      }

      // 等待所有任务结束并处理依赖关系
      for (Future<String> future : futures) {
      try {
      // 遍历所有已完成的任务
      String completedTask = future.get();
      for (String dependent : graph.get(completedTask)) {
      // 将依赖任务入度 -1
      inDegree.put(dependent, inDegree.get(dependent) - 1);
      // 如果入度为0,添加到队列
      if (inDegree.get(dependent) == 0) {
      queue.add(dependent);
      }
      }
      } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      }
      }
      }
      } finally {
      executorService.shutdown();
      }
      }
      }

自定义任务执行器

  • 这里需要根据自身的业务需求,来自定义任务执行器。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Slf4j
    @Component
    public class MyTaskExecutor {

    public interface CreateTaskExecutor {
    String executor() throws Exception;
    }

    public interface LoopTaskExecutor {
    Wrapper<ReportVO> executor() throws Exception;
    }

    // 暴露给任务调度器的函数,这里面写你自己的业务逻辑即可
    public void executeAndMonitorTask(CreateTaskExecutor createTask, LoopTaskExecutor monitorTask, SubOrder subOrder, String taskName) throws Exception {
    createTaskExecutor4SubOrder(createTask, subOrder, taskName);
    loopTaskExecutor4SubOrder(monitorTask, subOrder, taskName);
    }

    // 创建任务
    public void createTaskExecutor4SubOrder(CreateTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {

    }

    // 轮询任务
    public void loopTaskExecutor4SubOrder(LoopTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {

    }
    }