前言

背景

  • 项目里原本是使用Kafka当消息队列的,但是组内大师觉得Kafka太重了,只要确保订单信息都持久化到数据库里,每次重启的时候重新加载任务,那么其实是可以自己写一个消息队列来替代Kafka的
  • 下面是我自己写的一个任务队列执行器,使用了ArrayBlockingQueue来存储任务,并创建一个单独的线程来处理队列中的任务。并且允许指定处理任务的处理器(Processor),可以根据需要执行不同的任务。
  • 关于TaskQueueExecutor的构造函数,我这里自定义了一个函数式接口,但是其实也可以直接用Java内置的函数式接口:Function,写法类似于public TaskQueueExecutor(String threadName, int queueSize, Function<T, R> processor) {}
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    @Slf4j
    public class TaskQueueExecutor<T> {
    // 为记录消息和错误信息定义一个日志记录器

    // 用于存储类型为T的任务的阻塞队列
    private final ArrayBlockingQueue<T> queue;

    // 用于处理任务的独立线程
    private final Thread msgLooper;

    // 用于处理任务的处理器接口
    private final Processor<T> processor;

    // 用于跟踪队列中任务的数量的AtomicInteger是原子整数类型,可以避免并发环境下的线程安全问题
    private final AtomicInteger taskNum;

    // 定义一个处理任务的接口
    public interface Processor<T> {
    void process(T task);
    }

    // 构造函数用于初始化TaskQueueExecutor
    public TaskQueueExecutor(String threadName, int queueSize, Processor<T> processor) {
    // 使用给定的大小初始化阻塞队列
    this.queue = new ArrayBlockingQueue<T>(queueSize);

    // 存储用于处理任务的处理器
    this.processor = processor;

    // 创建一个新线程用于消息处理
    this.msgLooper = new Thread(this::looper);

    // 设置线程名称,我这里是根据业务名称来设置的,这样排查问题会比较方便
    this.msgLooper.setName(threadName);

    // 启动消息处理线程
    this.msgLooper.start();

    // 初始化taskNum
    taskNum = new AtomicInteger(0);
    }

    // 方法用于将消息(任务)发送到队列
    public void sendMessage(T task) {
    try {
    // 将任务放入队列
    this.queue.put(task);

    // 增加任务计数
    taskNum.incrementAndGet();
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }

    // 方法用于获取队列中的任务数量
    public int getTaskNum() {
    return taskNum.get();
    }

    // 主要的消息处理循环
    private void looper() {
    while (true) {
    try {
    // 从队列中获取任务(如果队列为空,则阻塞)
    T task = queue.take();

    // 使用提供的处理器处理任务
    processor.process(task);
    } catch (InterruptedException e) {
    // 如果线程被中断,则中断循环
    break;
    } catch (Exception e) {
    // 处理任务处理过程中出现的任何异常
    log.error(String.format("TaskQueueExecutor %s run task exception",
    this.msgLooper.getName()), e);
    } finally {
    // 即使发生异常,也会在finally块中减少任务计数
    taskNum.decrementAndGet();
    }
    }
    }
    }
  • QueueService:是用于管理不同类型的任务队列。(我这里目前就俩任务队列)
  • @PostConstruct init():使用注解来标记初始化方法,该方法在类实例化后自动调用,主要是初始化了两个ArrayList,分别用于存储不同类型的任务队列执行器,同时可以通过配置项来判断是否需要从数据库加载未完成任务。
  • sendMessage():用于将订单添加到适当的队列执行器,根据订单的类型选择合适的执行器列表,计算任务最少的分区并添加订单。
  • getQueueSize(): 用于获取特定订单的队列大小,根据订单类型选择相应的队列执行器列表并返回队列大小。
  • getTaskQueueExecutors():根据订单类型返回相应的任务队列执行器。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    @Service
    @Slf4j
    public class QueueService {

    // 引入依赖的服务和组件
    @Resource
    private ThirdManager thirdManager;

    @Resource
    private QueueConfig queueConfig;

    @Resource
    private OrderMapper orderMapper;

    @Resource
    private OrderConsumerService orderConsumerService;

    // 定义两个ArrayList,用于存储不同类型的任务队列执行器,这里可以根据自身业务需求来定义多个。
    private ArrayList<TaskQueueExecutor<Torder>> paperTaskExecutors;

    private ArrayList<TaskQueueExecutor<Torder>> textTaskExecutors;

    // 初始化方法,在类实例化后自动调用,创建信息从配置文件中读取。
    @PostConstruct
    public void init() {
    // 加载配置,设置队列大小和处理器
    for (int idx = 0; idx < queueConfig.getPaperTopicPartitionsNum(); idx++) {
    int partition = idx;
    paperTaskExecutors.add(new TaskQueueExecutor<>(
    String.format("%s-%d", queueConfig.getPaperTopic(), partition),
    1000,
    torder -> {
    orderConsumerService.paperConsumeMessage(torder, partition);
    }
    ));
    }

    // 加载配置,设置队列大小和处理器
    for (int idx = 0; idx < queueConfig.getTextTopicPartitionsNum(); idx++) {
    int partition = idx;
    textTaskExecutors.add(new TaskQueueExecutor<>(
    String.format("%s-%d", queueConfig.getTextTopic(), partition),
    1000,
    torder -> {
    orderConsumerService.textConsumeMessage(torder, partition);
    }
    ));
    }

    // 每次重启的时候,从数据库加载未完成订单,可以通过配置项配置,生产环境加载未完成任务,开发环境不加载
    if (queueConfig.isNeedLoadFromDB()) {
    // 查询数据库中未处理的订单。
    List<Torder> orders = orderMapper.selectList(Wrappers.lambdaQuery(Torder.class).eq(Torder::getStatus, 1));
    for (Torder torder : orders) {
    log.info("添加未处理订单:{}", torder);
    // 将这些订单添加到适当的消息队列,以便后续处理。
    sendMessage(torder);
    }
    }
    }

    // 用于将订单消息发送到适当的队列执行器。
    public void sendMessage(Torder torder) {
    var executorOptional = getTaskQueueExecutors(torder);
    executorOptional.ifPresent(executors -> {
    // stream操作,用于寻找任务数最少的队列,默认值为0
    int partition = IntStream.range(0, executors.size())
    .reduce((i, j) -> executors.get(i).getTaskNum() > executors.get(j).getTaskNum() ? j : i)
    .orElse(0);
    int taskNum = executors.get(partition).getTaskNum();

    // 这里是我的业务逻辑
    torder.setZone(partition);
    orderMapper.updateById(torder);
    log.info("当前消息入队列,所在分区{}, 当前订单号为{}, 前面还有{}人在排队",
    partition, torder.getOrderId(), executors.get(partition).getTaskNum());
    if (taskNum > 100) {
    thirdManager.sendExceptionToFeiShu("报告降重,分区:【" + partition + "】当前排队人数已达:【" + taskNum + "】人次");
    }

    // 添加订单到选定的队列执行器。
    executors.get(partition).sendMessage(torder);
    });
    }

    // 方法用于获取特定订单的队列大小(即队列中等待处理的订单数量)。
    public int getQueueSize(Torder torder) {
    var executorOptional = getTaskQueueExecutors(torder);
    return executorOptional.map(taskQueueExecutors ->
    taskQueueExecutors.get(torder.getZone()).getTaskNum())
    .orElse(0);
    }

    // 方法用于根据订单类型获取相应的任务队列执行器列表。
    private Optional<ArrayList<TaskQueueExecutor<Torder>>> getTaskQueueExecutors(Torder torder) {
    ArrayList<TaskQueueExecutor<Torder>> executors;
    if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_PAPER)) {
    executors = paperTaskExecutors;
    } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_TEXT)) {
    executors = textTaskExecutors;
    } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_EXPAND)) {
    executors = textTaskExecutors;
    } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_RELINE)) {
    executors = textTaskExecutors;
    } else {
    log.error("Error PolishType: {}", torder.getPolishType());
    return Optional.empty();
    }
    return Optional.of(executors);
    }
    }