| @Slf4j
@Component
public class MyTaskScheduler {

    @Resource
    private TasksMapper tasksMapper;

    @Resource
    private TaskDependencyMapper taskDependencyMapper;

    @Resource
    private MyTaskExecutor myTaskExecutor;

    @Resource
    private SubOrderMapper subOrderMapper;

    // Maps each task ID to the function that runs it. Worker threads read this map,
    // hence the concurrent implementation.
    private final Map<String, TaskFunction> taskFunctionStrategies = new ConcurrentHashMap<>();

    // A unit of work that the scheduler runs with the shared executor.
    public interface TaskFunction {
        void execute(MyTaskExecutor myTaskExecutor) throws Exception;
    }
    // Registers a task. If a sub-order with this ID already exists, a finished one is
    // skipped; an unfinished one has its status reset and is registered again.
    public void addTask(String taskId, TaskFunction taskFunction) {
        SubOrder existingSubOrder = subOrderMapper.selectOne(
                Wrappers.lambdaQuery(SubOrder.class).eq(SubOrder::getOrderId, taskId));
        if (existingSubOrder != null) {
            if (existingSubOrder.getStatus() == Common.ORDER_FINISH_STATUS) {
                log.warn("Task {} already exists; skipping.", taskId);
            } else {
                // Unfinished task: reset it to ORDER_SUCCESS_PAY_STATUS and register it.
                log.info("Task: {}, status: {}; adding to the scheduler.",
                        taskId, existingSubOrder.getStatus());
                existingSubOrder.setStatus(Common.ORDER_SUCCESS_PAY_STATUS);
                subOrderMapper.updateById(existingSubOrder);
                taskFunctionStrategies.put(taskId, taskFunction);
            }
            return;
        }
        taskFunctionStrategies.put(taskId, taskFunction);
    }
    // Records that taskId depends on dependencyTaskId, unless that edge is already stored.
    public void addDependency(String taskId, String dependencyTaskId) {
        TaskDependency existingDependency = taskDependencyMapper.selectOne(
                Wrappers.lambdaQuery(TaskDependency.class)
                        .eq(TaskDependency::getTaskId, taskId)
                        .eq(TaskDependency::getDependencyTaskId, dependencyTaskId));
        if (existingDependency != null) {
            log.warn("Task {} already depends on {}; skipping.", taskId, dependencyTaskId);
            return;
        }

        LocalDateTime now = LocalDateTime.now();
        TaskDependency dependency = new TaskDependency()
                .setTaskId(taskId)
                .setDependencyTaskId(dependencyTaskId)
                .setCreateTime(now)
                .setUpdateTime(now);
        taskDependencyMapper.insert(dependency);
    }
    // Runs the task graph breadth-first from rootTaskId: each wave of ready tasks runs
    // in parallel, and a dependent is queued once all of its dependencies are finished.
    public void executeTasks(String rootTaskId) throws ExecutionException, InterruptedException {
        if (!isRootTask(rootTaskId)) {
            log.error("Task is not a root node: {}", rootTaskId);
            throw new IllegalArgumentException("Root task ID is not valid or has dependencies.");
        }

        Queue<String> queue = new LinkedList<>();
        // Tracks every task already queued, so a task with several dependencies
        // is not queued once per dependency.
        Set<String> scheduled = new HashSet<>();
        queue.add(rootTaskId);
        scheduled.add(rootTaskId);

        ExecutorService executorService = Executors.newFixedThreadPool(20);
        try {
            while (!queue.isEmpty()) {
                List<Future<String>> futures = new ArrayList<>();
                int size = queue.size();

                // Submit every task in the current wave.
                for (int i = 0; i < size; i++) {
                    String task = queue.poll();
                    Future<String> future = executorService.submit(() -> {
                        TaskFunction taskFunction = taskFunctionStrategies.get(task);
                        if (taskFunction != null) {
                            taskFunction.execute(myTaskExecutor);
                        }
                        return task;
                    });
                    futures.add(future);
                }

                // Wait for each task, then queue the dependents that are now unblocked.
                for (Future<String> future : futures) {
                    String completedTask = future.get();
                    for (String dependent : getDependentTasks(completedTask)) {
                        if (allDependenciesMet(dependent) && scheduled.add(dependent)) {
                            queue.add(dependent);
                        }
                    }
                }
            }
        } finally {
            executorService.shutdown();
        }
    }
    // A root task is one with no recorded dependencies.
    private boolean isRootTask(String taskId) {
        return taskDependencyMapper.selectCount(
                Wrappers.lambdaQuery(TaskDependency.class)
                        .eq(TaskDependency::getTaskId, taskId)) == 0;
    }

    // Returns the IDs of the tasks that depend directly on taskId.
    private List<String> getDependentTasks(String taskId) {
        return taskDependencyMapper.selectList(
                        Wrappers.lambdaQuery(TaskDependency.class)
                                .eq(TaskDependency::getDependencyTaskId, taskId))
                .stream()
                .map(TaskDependency::getTaskId)
                .collect(Collectors.toList());
    }

    // True when every dependency of taskId has a sub-order in ORDER_FINISH_STATUS.
    private boolean allDependenciesMet(String taskId) {
        List<String> dependencyTaskIds = taskDependencyMapper.selectList(
                        Wrappers.lambdaQuery(TaskDependency.class)
                                .eq(TaskDependency::getTaskId, taskId))
                .stream()
                .map(TaskDependency::getDependencyTaskId)
                .collect(Collectors.toList());

        List<String> completedTaskIds = subOrderMapper.selectList(
                        Wrappers.lambdaQuery(SubOrder.class)
                                .eq(SubOrder::getStatus, Common.ORDER_FINISH_STATUS))
                .stream()
                .map(SubOrder::getOrderId)
                .collect(Collectors.toList());

        return dependencyTaskIds.stream().allMatch(completedTaskIds::contains);
    }
}
|
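The wave-by-wave traversal in `executeTasks` can be tried without Spring or a database. The following is a minimal sketch, not the scheduler above: `LevelScheduler`, its nested `Task` interface, and the in-memory maps are hypothetical stand-ins for `MyTaskScheduler`, `TaskFunction`, and the mapper queries, and completion is tracked in a local set instead of the sub-order status column. It assumes the dependency graph is acyclic.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical in-memory stand-in for the scheduler; not the original class.
class LevelScheduler {
    /** Stand-in for TaskFunction: a unit of work that may throw. */
    public interface Task { void run() throws Exception; }

    private final Map<String, Task> tasks = new HashMap<>();
    // taskId -> IDs it depends on (replaces the task-dependency table)
    private final Map<String, Set<String>> dependencies = new HashMap<>();
    // taskId -> IDs that depend on it (reverse index of the same edges)
    private final Map<String, List<String>> dependents = new HashMap<>();

    public void addTask(String id, Task task) { tasks.put(id, task); }

    public void addDependency(String id, String dependsOn) {
        dependencies.computeIfAbsent(id, k -> new HashSet<>()).add(dependsOn);
        dependents.computeIfAbsent(dependsOn, k -> new ArrayList<>()).add(id);
    }

    /** Runs the graph wave by wave from rootId and returns the IDs in the order they were run. */
    public List<String> execute(String rootId) throws ExecutionException, InterruptedException {
        if (dependencies.containsKey(rootId)) {
            throw new IllegalArgumentException("Not a root task: " + rootId);
        }
        Set<String> done = new HashSet<>();      // finished tasks
        Set<String> scheduled = new HashSet<>(); // tasks already queued, to avoid duplicates
        List<String> order = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(rootId);
        scheduled.add(rootId);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            while (!queue.isEmpty()) {
                // Submit the whole current wave so its tasks run in parallel.
                List<Future<String>> wave = new ArrayList<>();
                int size = queue.size();
                for (int i = 0; i < size; i++) {
                    String id = queue.poll();
                    Future<String> future = pool.submit(() -> {
                        Task task = tasks.get(id);
                        if (task != null) {
                            task.run();
                        }
                        return id;
                    });
                    wave.add(future);
                }
                // Wait for the full wave before deciding what is unblocked.
                List<String> finished = new ArrayList<>();
                for (Future<String> future : wave) {
                    finished.add(future.get());
                }
                done.addAll(finished);
                order.addAll(finished);
                for (String id : finished) {
                    for (String next : dependents.getOrDefault(id, Collections.emptyList())) {
                        // Queue a dependent only once, and only when all its dependencies are done.
                        if (done.containsAll(dependencies.get(next)) && scheduled.add(next)) {
                            queue.add(next);
                        }
                    }
                }
            }
        } finally {
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) throws Exception {
        LevelScheduler scheduler = new LevelScheduler();
        for (String id : Arrays.asList("A", "B", "C", "D")) {
            scheduler.addTask(id, () -> System.out.println("running " + id));
        }
        // Diamond: B and C depend on A; D depends on both B and C.
        scheduler.addDependency("B", "A");
        scheduler.addDependency("C", "A");
        scheduler.addDependency("D", "B");
        scheduler.addDependency("D", "C");
        System.out.println(scheduler.execute("A")); // prints [A, B, C, D]
    }
}
```

In the diamond example, `D` is reachable from both `B` and `C`, so the `scheduled` set is what keeps it from being queued twice. Waiting for the whole wave before checking dependents is a simplification chosen for this sketch; it trades a little parallelism for a deterministic run order.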