基于图的任务调度算法
写在最前
- 灵感来源是下面这篇文章
任务调度策略简介
- 在传统的任务执行环境中,任务通常只需要按序执行,只考虑最基本的依赖关系,采用线性执行。
- 但是在复杂系统中,任务往往依赖于其他任务的完成,从而形成复杂的依赖链。为了确保任务能够按正确的顺序执行,有效管理这些依赖关系至关重要。这样不仅可以避免系统瓶颈,还能够优化资源利用率,确保所有任务按计划完成。
- 我这里实现任务调度的逻辑是基于图结构的思想,其中每个任务都被表示为有向无环图(DAG)中的一个节点,节点之间的边表示任务之间的依赖关系,这种方法是我们能够轻松的建模出复杂的以来结构,为复杂的任务调度场景提供了一种可行的解决方案。
调度器实现概述
- 在本部分中,我将详细介绍一下使用
Java
实现的任务调度器,该调度器采用了基于有向无环图(DAG)的任务调度逻辑。为了提升系统性能和任务执行效率,调度器通过线程池和异步操作来并行处理多个任务。线程池
:调度器利用ExecutorService
创建了一个线程池,线程池的大小根据系统的处理器核心数量动态调整。这样一来,多个任务可以同时执行,从而最大限度地利用系统资源,提升任务的处理速度。异步操作
:在任务的调度过程中,调度器将任务提交到线程池中执行,利用Future
对象来管理异步任务的执行结果。每个任务在执行完成后,会更新其依赖的子任务的状态,并将已准备好的子任务添加到队列中进行进一步的处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
public class MyTaskScheduler {
// 这里是根据我自身业务需求,自定义的任务执行器
private MyTaskExecutor myTaskExecutor;
// 图结构,key是任务ID,value是其下属子任务
private final Map<String, List<String>> graph = new ConcurrentHashMap<>();
// 节点入度map,key是任务ID,value是节点的度
private final Map<String, Integer> inDegree = new ConcurrentHashMap<>();
// 任务函数map,key是任务ID,value是该任务的执行函数
private final Map<String, TaskFunction> taskFunctions = new ConcurrentHashMap<>();
// 函数式接口
public interface TaskFunction {
void execute(MyTaskExecutor myTaskExecutor) throws Exception;
}
// 添加任务,需指定任务ID和任务执行函数
public void addTask(String taskId, TaskFunction taskFunction) {
// 向图结构中加入任务节点
graph.putIfAbsent(taskId, new ArrayList<>());
// 初始度为0,即没有依赖
inDegree.putIfAbsent(taskId, 0);
// 存储该任务的执行函数
taskFunctions.put(taskId, taskFunction);
}
// 添加依赖,taskId 任务依赖于 dependencyTaskId 任务
public void addDependency(String taskId, String dependencyTaskId) {
graph.putIfAbsent(dependencyTaskId, new ArrayList<>());
graph.get(dependencyTaskId).add(taskId);
// 增加子任务的入度
inDegree.put(taskId, inDegree.getOrDefault(taskId, 0) + 1);
}
// 删除任务
public void deleteTask(String taskId) {
graph.remove(taskId);
inDegree.remove(taskId);
taskFunctions.remove(taskId);
}
// BFS遍历
public void executeTasksInOrder() {
Queue<String> queue = new LinkedList<>();
// 向队列中添加所有度为0的任务,这些任务无需依赖,可直接执行
for (String task : inDegree.keySet()) {
if (inDegree.get(task) == 0) {
queue.add(task);
}
}
// 开个线程池,这里按需调整你的线程池大小
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
// 处理队列中的任务
while (!queue.isEmpty()) {
List<Future<String>> futures = new ArrayList<>();
int size = queue.size();
for (int i = 0; i < size; i++) {
// 取出任务
String task = queue.poll();
Future<String> future = executorService.submit(() -> {
// 异步执行
TaskFunction taskFunction = taskFunctions.get(task);
if (taskFunction != null) {
taskFunction.execute(myTaskExecutor);
}
return task;
});
futures.add(future);
}
// 等待所有任务结束并处理依赖关系
for (Future<String> future : futures) {
try {
// 遍历所有已完成的任务
String completedTask = future.get();
for (String dependent : graph.get(completedTask)) {
// 将依赖任务入度 -1
inDegree.put(dependent, inDegree.get(dependent) - 1);
// 如果入度为0,添加到队列
if (inDegree.get(dependent) == 0) {
queue.add(dependent);
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
} finally {
executorService.shutdown();
}
}
}
自定义任务执行器
- 这里需要根据自身的业务需求,来自定义任务执行器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyTaskExecutor {
public interface CreateTaskExecutor {
String executor() throws Exception;
}
public interface LoopTaskExecutor {
Wrapper<ReportVO> executor() throws Exception;
}
// 暴露给任务调度器的函数,这里面写你自己的业务逻辑即可
public void executeAndMonitorTask(CreateTaskExecutor createTask, LoopTaskExecutor monitorTask, SubOrder subOrder, String taskName) throws Exception {
createTaskExecutor4SubOrder(createTask, subOrder, taskName);
loopTaskExecutor4SubOrder(monitorTask, subOrder, taskName);
}
// 创建任务
public void createTaskExecutor4SubOrder(CreateTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {
}
// 轮询任务
public void loopTaskExecutor4SubOrder(LoopTaskExecutor taskExecutor, SubOrder subOrder, String taskName) throws Exception {
}
}
评论