@Service
@Slf4j
public class QueueService {

    @Resource
    private ThirdManager thirdManager;

    @Resource
    private QueueConfig queueConfig;

    @Resource
    private OrderMapper orderMapper;

    @Resource
    private OrderConsumerService orderConsumerService;

    // One executor per topic partition. Initialized eagerly so that init()
    // can add to the lists without a NullPointerException.
    private ArrayList<TaskQueueExecutor<Torder>> paperTaskExecutors = new ArrayList<>();
    private ArrayList<TaskQueueExecutor<Torder>> textTaskExecutors = new ArrayList<>();

    @PostConstruct
    public void init() {
        // Create one executor per partition of the paper topic.
        for (int idx = 0; idx < queueConfig.getPaperTopicPartitionsNum(); idx++) {
            int partition = idx; // effectively final copy for the lambda
            paperTaskExecutors.add(new TaskQueueExecutor<>(
                    String.format("%s-%d", queueConfig.getPaperTopic(), partition),
                    1000,
                    torder -> {
                        orderConsumerService.paperConsumeMessage(torder, partition);
                    }
            ));
        }

        // Create one executor per partition of the text topic.
        for (int idx = 0; idx < queueConfig.getTextTopicPartitionsNum(); idx++) {
            int partition = idx; // effectively final copy for the lambda
            textTaskExecutors.add(new TaskQueueExecutor<>(
                    String.format("%s-%d", queueConfig.getTextTopic(), partition),
                    1000,
                    torder -> {
                        orderConsumerService.textConsumeMessage(torder, partition);
                    }
            ));
        }

        // Optionally re-enqueue orders that were still pending (status == 1) at startup.
        if (queueConfig.isNeedLoadFromDB()) {
            List<Torder> orders = orderMapper.selectList(
                    Wrappers.lambdaQuery(Torder.class).eq(Torder::getStatus, 1));
            for (Torder torder : orders) {
                log.info("Adding unprocessed order: {}", torder);
                sendMessage(torder);
            }
        }
    }

    public void sendMessage(Torder torder) {
        var executorOptional = getTaskQueueExecutors(torder);
        executorOptional.ifPresent(executors -> {
            // Pick the partition with the fewest queued tasks; ties go to the lower index.
            int partition = IntStream.range(0, executors.size())
                    .reduce((i, j) -> executors.get(i).getTaskNum() > executors.get(j).getTaskNum() ? j : i)
                    .orElse(0);
            int taskNum = executors.get(partition).getTaskNum();

            // Persist the chosen partition so getQueueSize() can find the queue later.
            torder.setZone(partition);
            orderMapper.updateById(torder);
            log.info("Message enqueued: partition {}, order ID {}, {} orders ahead in the queue",
                    partition, torder.getOrderId(), taskNum);

            // Alert via Feishu when the queue on this partition grows past 100 tasks.
            if (taskNum > 100) {
                thirdManager.sendExceptionToFeiShu("Report de-duplication, partition [" + partition
                        + "]: current queue length has reached [" + taskNum + "]");
            }

            executors.get(partition).sendMessage(torder);
        });
    }

    // Returns the number of tasks queued on the order's partition, or 0 for an unknown polish type.
    public int getQueueSize(Torder torder) {
        var executorOptional = getTaskQueueExecutors(torder);
        return executorOptional
                .map(taskQueueExecutors -> taskQueueExecutors.get(torder.getZone()).getTaskNum())
                .orElse(0);
    }

    // Maps an order's polish type to its executor list: paper orders use the paper
    // executors, while text, expand, and reline orders share the text executors.
    private Optional<ArrayList<TaskQueueExecutor<Torder>>> getTaskQueueExecutors(Torder torder) {
        ArrayList<TaskQueueExecutor<Torder>> executors;
        if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_PAPER)) {
            executors = paperTaskExecutors;
        } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_TEXT)) {
            executors = textTaskExecutors;
        } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_EXPAND)) {
            executors = textTaskExecutors;
        } else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_RELINE)) {
            executors = textTaskExecutors;
        } else {
            log.error("Error PolishType: {}", torder.getPolishType());
            return Optional.empty();
        }
        return Optional.of(executors);
    }
}