前言

  • 本文的灵感来源于
  • 基于该文中的任务调度思想,我设计并实现了一套基于图结构的任务调度器,主要应用于复杂系统中的任务管理与执行。
  • 下文将详细介绍该调度器的设计理念、核心实现以及如何将任务依赖关系存储到数据库中,以提升系统的稳定性和可维护性。

任务调度策略简介

  • 在传统的任务执行环境中,任务通常按序执行,依赖关系较为简单,因此可以采用线性执行的方式。但在复杂系统中,任务之间往往存在多层次的依赖关系,形成复杂的依赖链。如果没有合理的任务调度策略,系统容易出现瓶颈,资源也难以得到充分利用。因此,设计一个能够有效管理任务依赖关系的调度器至关重要,这不仅可以避免资源浪费,还能保证所有任务按计划顺利完成。
  • 本文实现的任务调度逻辑基于有向无环图(DAG),将每个任务抽象为图中的节点,节点之间的边表示任务之间的依赖关系。通过这种方式,可以很好地建模复杂的任务依赖结构,提供了处理复杂调度场景的解决方案。

调度器的设计与实现

线程池与异步执行

  • 为了提升任务调度器的执行效率,采用了Java的线程池和异步操作来实现并发任务的执行
    • 线程池:调度器通过ExecutorService创建了一个线程池,线程池的大小根据系统的处理器核心数量动态调整(根据自身业务需求调整)。确保任务能并行执行,从而最大限度利用系统资源,提升任务处理效率。
    • 异步操作:调度过程中,任务被提交到线程池中异步执行,使用Future对象管理每个任务的执行结果。当任务完成时,会自动更新依赖其的子任务状态,并将准备就绪的子任务加入队列,等待进一步处理。
  • 以下是任务调度器的核心实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

@Slf4j
@Component
public class MyTaskScheduler {

// 这里是根据我自身业务需求,自定义的任务执行器
@Resource
private MyTaskExecutor myTaskExecutor;
// 图结构,key是任务ID,value是其下属子任务
private final Map<String, List<String>> graph = new ConcurrentHashMap<>();
// 节点入度map,key是任务ID,value是节点的度
private final Map<String, Integer> inDegree = new ConcurrentHashMap<>();
// 任务函数map,key是任务ID,value是该任务的执行函数
private final Map<String, TaskFunction> taskFunctions = new ConcurrentHashMap<>();

// 函数式接口
public interface TaskFunction {
void execute(MyTaskExecutor myTaskExecutor) throws Exception;
}

// 添加任务,需指定任务ID和任务执行函数
public void addTask(String taskId, TaskFunction taskFunction) {
// 向图结构中加入任务节点
graph.putIfAbsent(taskId, new ArrayList<>());
// 初始度为0,即没有依赖
inDegree.putIfAbsent(taskId, 0);
// 存储该任务的执行函数
taskFunctions.put(taskId, taskFunction);
}

// 添加依赖,taskId 任务依赖于 dependencyTaskId 任务
public void addDependency(String taskId, String dependencyTaskId) {
graph.putIfAbsent(dependencyTaskId, new ArrayList<>());
graph.get(dependencyTaskId).add(taskId);
// 增加子任务的入度
inDegree.put(taskId, inDegree.getOrDefault(taskId, 0) + 1);
}

// 删除任务
public void deleteTask(String taskId) {
graph.remove(taskId);
inDegree.remove(taskId);
taskFunctions.remove(taskId);
}


// BFS遍历
public void executeTasksInOrder() {
Queue<String> queue = new LinkedList<>();

// 向队列中添加所有度为0的任务,这些任务无需依赖,可直接执行
for (String task : inDegree.keySet()) {
if (inDegree.get(task) == 0) {
queue.add(task);
}
}

// 开个线程池,这里按需调整你的线程池大小
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

try {
// 处理队列中的任务
while (!queue.isEmpty()) {
List<Future<String>> futures = new ArrayList<>();
int size = queue.size();

for (int i = 0; i < size; i++) {
// 取出任务
String task = queue.poll();
Future<String> future = executorService.submit(() -> {
// 异步执行
TaskFunction taskFunction = taskFunctions.get(task);
if (taskFunction != null) {
taskFunction.execute(myTaskExecutor);
}
return task;
});
futures.add(future);
}

// 等待所有任务结束并处理依赖关系
for (Future<String> future : futures) {
try {
// 遍历所有已完成的任务
String completedTask = future.get();
for (String dependent : graph.get(completedTask)) {
// 将依赖任务入度 -1
inDegree.put(dependent, inDegree.get(dependent) - 1);
// 如果入度为0,添加到队列
if (inDegree.get(dependent) == 0) {
queue.add(dependent);
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
} finally {
executorService.shutdown();
}
}
}

自定义任务执行器

  • 这里需要根据自身的业务需求,来自定义任务执行器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@Component
public class MyTaskExecutor {

public interface CreateTaskExecutor {
String executor() throws Exception;
}

public interface LoopTaskExecutor {
Wrapper<ReportVO> executor() throws Exception;
}

// 暴露给任务调度器的函数,这里面写你自己的业务逻辑即可
public void executeAndMonitorTask(CreateTaskExecutor createTask, LoopTaskExecutor monitorTask, SubOrder subOrder, String taskName) throws Exception {
createTaskExecutor4SubOrder(createTask, subOrder, taskName);
loopTaskExecutor4SubOrder(monitorTask, subOrder, taskName);
}

public void createTaskExecutor4SubOrder(CreateTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {
// 执行创建任务逻辑
}

public void loopTaskExecutor4SubOrder(LoopTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {
// 执行轮询任务逻辑
}
}

数据库存储依赖关系

  • 在实际应用中,为了确保任务依赖关系在系统重启后不丢失,我们最好将依赖关系存储到数据库中。在这部分代码中,依赖关系和任务信息被持久化到数据库。通过这样的设计,当系统重新启动时,任务调度器可以从数据库中恢复任务状态,并继续执行未完成的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@Slf4j
@Component
public class MyTaskScheduler {
@Resource
private TasksMapper tasksMapper;

@Resource
private TaskDependencyMapper taskDependencyMapper;

@Resource
private MyTaskExecutor myTaskExecutor;
@Resource
private SubOrderMapper subOrderMapper;

private final Map<String, TaskFunction> taskFunctionStrategies = new ConcurrentHashMap<>();

public interface TaskFunction {
void execute(MyTaskExecutor myTaskExecutor) throws Exception;
}

// 添加任务
public void addTask(String taskId, TaskFunction taskFunction) {
// 检查任务是否已经存在
SubOrder existingSubOrder = subOrderMapper.selectOne(Wrappers.lambdaQuery(SubOrder.class).eq(SubOrder::getOrderId, taskId));
if (existingSubOrder != null) {
if (existingSubOrder.getStatus() == Common.ORDER_FINISH_STATUS) {
log.warn("任务 {} 已存在,跳过添加。", taskId);
}
if (existingSubOrder.getStatus() != Common.ORDER_FINISH_STATUS) {
log.error("任务:{},状态:{},添加到调度系统", taskId, existingSubOrder.getStatus());
existingSubOrder.setStatus(Common.ORDER_SUCCESS_PAY_STATUS);
subOrderMapper.updateById(existingSubOrder);
taskFunctionStrategies.put(taskId, taskFunction);
}
return; // 任务已存在,直接返回
}

taskFunctionStrategies.put(taskId, taskFunction);
}


// 添加节点依赖
public void addDependency(String taskId, String dependencyTaskId) {
// 检查依赖是否已经存在
TaskDependency existingDependency = taskDependencyMapper.selectOne(Wrappers.lambdaQuery(TaskDependency.class)
.eq(TaskDependency::getTaskId, taskId)
.eq(TaskDependency::getDependencyTaskId, dependencyTaskId));

if (existingDependency != null) {
log.warn("任务 {} 的依赖 {} 已存在,跳过添加。", taskId, dependencyTaskId);
return; // 依赖已存在,直接返回
}

// 依赖不存在,添加新依赖
TaskDependency dependency = new TaskDependency()
.setTaskId(taskId)
.setDependencyTaskId(dependencyTaskId)
.setCreateTime(LocalDateTime.now())
.setUpdateTime(LocalDateTime.now());

taskDependencyMapper.insert(dependency);
}


// 执行任务
public void executeTasks(String rootTaskId) throws ExecutionException, InterruptedException {
Queue<String> queue = new LinkedList<>();
if (isRootTask(rootTaskId)) {
queue.add(rootTaskId);
} else {
log.error("当前任务节点不是根节点:{}", rootTaskId);
throw new IllegalArgumentException("Root task ID is not valid or has dependencies.");
}

ExecutorService executorService = Executors.newFixedThreadPool(20);

try {
while (!queue.isEmpty()) {
List<Future<String>> futures = new ArrayList<>();
int size = queue.size();

for (int i = 0; i < size; i++) {
String task = queue.poll();
Future<String> future = executorService.submit(() -> {
TaskFunction taskFunction = taskFunctionStrategies.get(task);
if (taskFunction != null) {
taskFunction.execute(myTaskExecutor);
}
return task;
});
futures.add(future);
}

for (Future<String> future : futures) {
String completedTask = future.get();
List<String> dependents = getDependentTasks(completedTask);
for (String dependent : dependents) {
if (allDependenciesMet(dependent)) {
queue.add(dependent);
}
}
}
}
} finally {
executorService.shutdown();
}
}

private boolean isRootTask(String taskId) {
return taskDependencyMapper.selectCount(Wrappers.lambdaQuery(TaskDependency.class)
.eq(TaskDependency::getTaskId, taskId)) == 0;
}

private List<String> getDependentTasks(String taskId) {
return taskDependencyMapper.selectList(Wrappers.lambdaQuery(TaskDependency.class)
.eq(TaskDependency::getDependencyTaskId, taskId))
.stream().map(TaskDependency::getTaskId).collect(Collectors.toList());
}

private boolean allDependenciesMet(String taskId) {
List<String> dependencyTaskIds = taskDependencyMapper.selectList(Wrappers.lambdaQuery(TaskDependency.class)
.eq(TaskDependency::getTaskId, taskId))
.stream().map(TaskDependency::getDependencyTaskId).collect(Collectors.toList());

List<String> completedTaskIds = subOrderMapper.selectList(Wrappers.lambdaQuery(SubOrder.class)
.eq(SubOrder::getStatus, Common.ORDER_FINISH_STATUS))
.stream().map(SubOrder::getOrderId).collect(Collectors.toList());

return dependencyTaskIds.stream().allMatch(completedTaskIds::contains);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
@Accessors(chain = true)
public class Tasks {

private Integer id;

private String taskId;

private String taskName;

private Integer status;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
@Accessors(chain = true)
public class TaskDependency {

private Integer id;

private String taskId;

private String dependencyTaskId;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}