前言

  • 本文涉及到了大量JDK 8的新特性,对这部分不太了解的,可以参考我之前写的这篇文章

背景

  • 项目里原本是使用Kafka当消息队列的,但是组内大师觉得Kafka太重了,只要确保订单信息都持久化到数据库里,每次重启的时候重新加载任务,那么其实是可以自己写一个消息队列来替代Kafka的
  • 下面是我自己写的一个任务队列执行器,使用了ArrayBlockingQueue来存储任务,并创建一个单独的线程来处理队列中的任务。并且允许指定处理任务的处理器(Processor),可以根据需要执行不同的任务。
  • 关于TaskQueueExecutor的构造函数,我这里自定义了一个函数式接口,但是其实也可以直接用Java内置的函数式接口:Function,写法类似于public TaskQueueExecutor(String threadName, int queueSize, Function<T, R> processor) {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Slf4j
public class TaskQueueExecutor<T> {
// 为记录消息和错误信息定义一个日志记录器

// 用于存储类型为T的任务的阻塞队列
private final ArrayBlockingQueue<T> queue;

// 用于处理任务的独立线程
private final Thread msgLooper;

// 用于处理任务的处理器接口
private final Processor<T> processor;

// 用于跟踪队列中任务的数量的AtomicInteger是原子整数类型,可以避免并发环境下的线程安全问题
private final AtomicInteger taskNum;

// 定义一个处理任务的接口
public interface Processor<T> {
void process(T task);
}

// 构造函数用于初始化TaskQueueExecutor
public TaskQueueExecutor(String threadName, int queueSize, Processor<T> processor) {
// 使用给定的大小初始化阻塞队列
this.queue = new ArrayBlockingQueue<T>(queueSize);

// 存储用于处理任务的处理器
this.processor = processor;

// 创建一个新线程用于消息处理
this.msgLooper = new Thread(this::looper);

// 设置线程名称,我这里是根据业务名称来设置的,这样排查问题会比较方便
this.msgLooper.setName(threadName);

// 启动消息处理线程
this.msgLooper.start();

// 初始化taskNum
taskNum = new AtomicInteger(0);
}

// 方法用于将消息(任务)发送到队列
public void sendMessage(T task) {
try {
// 将任务放入队列
this.queue.put(task);

// 增加任务计数
taskNum.incrementAndGet();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// 方法用于获取队列中的任务数量
public int getTaskNum() {
return taskNum.get();
}

// 主要的消息处理循环
private void looper() {
while (true) {
try {
// 从队列中获取任务(如果队列为空,则阻塞)
T task = queue.take();

// 使用提供的处理器处理任务
processor.process(task);
} catch (InterruptedException e) {
// 如果线程被中断,则中断循环
break;
} catch (Exception e) {
// 处理任务处理过程中出现的任何异常
log.error(String.format("TaskQueueExecutor %s run task exception",
this.msgLooper.getName()), e);
} finally {
// 即使发生异常,也会在finally块中减少任务计数
taskNum.decrementAndGet();
}
}
}
}
  • QueueService:是用于管理不同类型的任务队列。(我这里目前就俩任务队列)
  • @PostConstruct init():使用注解来标记初始化方法,该方法在类实例化后自动调用,主要是初始化了两个ArrayList,分别用于存储不同类型的任务队列执行器,同时可以通过配置项来判断是否需要从数据库加载未完成任务。
  • sendMessage():用于将订单添加到适当的队列执行器,根据订单的类型选择合适的执行器列表,计算任务最少的分区并添加订单。
  • getQueueSize(): 用于获取特定订单的队列大小,根据订单类型选择相应的队列执行器列表并返回队列大小。
  • getTaskQueueExecutors():根据订单类型返回相应的任务队列执行器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Service
@Slf4j
public class QueueService {

// 引入依赖的服务和组件
@Resource
private ThirdManager thirdManager;

@Resource
private QueueConfig queueConfig;

@Resource
private OrderMapper orderMapper;

@Resource
private OrderConsumerService orderConsumerService;

// 定义两个ArrayList,用于存储不同类型的任务队列执行器,这里可以根据自身业务需求来定义多个。
private ArrayList<TaskQueueExecutor<Torder>> paperTaskExecutors;

private ArrayList<TaskQueueExecutor<Torder>> textTaskExecutors;

// 初始化方法,在类实例化后自动调用,创建信息从配置文件中读取。
@PostConstruct
public void init() {
// 加载配置,设置队列大小和处理器
for (int idx = 0; idx < queueConfig.getPaperTopicPartitionsNum(); idx++) {
int partition = idx;
paperTaskExecutors.add(new TaskQueueExecutor<>(
String.format("%s-%d", queueConfig.getPaperTopic(), partition),
1000,
torder -> {
orderConsumerService.paperConsumeMessage(torder, partition);
}
));
}

// 加载配置,设置队列大小和处理器
for (int idx = 0; idx < queueConfig.getTextTopicPartitionsNum(); idx++) {
int partition = idx;
textTaskExecutors.add(new TaskQueueExecutor<>(
String.format("%s-%d", queueConfig.getTextTopic(), partition),
1000,
torder -> {
orderConsumerService.textConsumeMessage(torder, partition);
}
));
}

// 每次重启的时候,从数据库加载未完成订单,可以通过配置项配置,生产环境加载未完成任务,开发环境不加载
if (queueConfig.isNeedLoadFromDB()) {
// 查询数据库中未处理的订单。
List<Torder> orders = orderMapper.selectList(Wrappers.lambdaQuery(Torder.class).eq(Torder::getStatus, 1));
for (Torder torder : orders) {
log.info("添加未处理订单:{}", torder);
// 将这些订单添加到适当的消息队列,以便后续处理。
sendMessage(torder);
}
}
}

// 用于将订单消息发送到适当的队列执行器。
public void sendMessage(Torder torder) {
var executorOptional = getTaskQueueExecutors(torder);
executorOptional.ifPresent(executors -> {
// stream操作,用于寻找任务数最少的队列,默认值为0
int partition = IntStream.range(0, executors.size())
.reduce((i, j) -> executors.get(i).getTaskNum() > executors.get(j).getTaskNum() ? j : i)
.orElse(0);
int taskNum = executors.get(partition).getTaskNum();

// 这里是我的业务逻辑
torder.setZone(partition);
orderMapper.updateById(torder);
log.info("当前消息入队列,所在分区{}, 当前订单号为{}, 前面还有{}人在排队",
partition, torder.getOrderId(), executors.get(partition).getTaskNum());
if (taskNum > 100) {
thirdManager.sendExceptionToFeiShu("报告降重,分区:【" + partition + "】当前排队人数已达:【" + taskNum + "】人次");
}

// 添加订单到选定的队列执行器。
executors.get(partition).sendMessage(torder);
});
}

// 方法用于获取特定订单的队列大小(即队列中等待处理的订单数量)。
public int getQueueSize(Torder torder) {
var executorOptional = getTaskQueueExecutors(torder);
return executorOptional.map(taskQueueExecutors ->
taskQueueExecutors.get(torder.getZone()).getTaskNum())
.orElse(0);
}

// 方法用于根据订单类型获取相应的任务队列执行器列表。
private Optional<ArrayList<TaskQueueExecutor<Torder>>> getTaskQueueExecutors(Torder torder) {
ArrayList<TaskQueueExecutor<Torder>> executors;
if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_PAPER)) {
executors = paperTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_TEXT)) {
executors = textTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_EXPAND)) {
executors = textTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_RELINE)) {
executors = textTaskExecutors;
} else {
log.error("Error PolishType: {}", torder.getPolishType());
return Optional.empty();
}
return Optional.of(executors);
}
}