为什么要是用MQ

  • 核心:解耦、异步、削峰
    1. 解耦:A系统发送数据到BCD三个系统,通过接口调用发送。如果此时E系统也想要这个数据呢?C系统不想要这个数据了呢?A系统的负责人直接崩溃,因为现在A系统跟其他各种乱七八糟的系统严重耦合在了一起。A系统产生了一条比较关键的数据,很多系统都需要A系统将这个数据发送过来。如果使用MQ,A系统将这条数据发送到MQ里去,哪个系统需要数据自己就去MQ里消费,哪个系统不需要这条数据了,就取消对MQ消息的消费。这样下来,A系统就不用去考虑给谁发送数据了,不需要维护这个代码,也不用考虑别的系统是否调用成功、失败超时等情况。核心思想就是舍弃同步调用其他接口,使用MQ异步化解耦。
    2. 异步:A系统接收到一个请求,需要在自己本地写库,还需要在BCD三个系统写库。自己本地写库要3ms,BCD三个系统分别写库要300ms、400ms、500ms。最终总延时接近1.2s,给用户的体验极差。用户通过浏览器发起请求,如果使用MQ,假如A系统连续发送3条消息到MQ队列中耗时5ms,那么A系统从接受一个请求到返回响应给用户,总时长是3 + 5 = 8ms
    3. 削峰:减少高峰时期对服务器的压力

MQ有什么优缺点

  • 优点上面已经说了:解耦、异步、削峰
  • 缺点如下:
    1. 系统可用性降低:系统引入的外部依赖越多,越容易挂掉,万一MQ挂了,那么整套系统都崩溃了。
    2. 系统复杂度提高:硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
    3. 一致性问题:A系统处理完了直接返回成功了,用户就真的以为你这个请求成功了。但是问题是:要是BCD三个系统里,BC两个系统写库成功了,D系统写库失败了,那么此时数据就会出现不一致性问题。

Kafka、ActiveMQ、RabbitMQ、RocketMQ都有什么区别?

  1. 从吞吐量来说,Kafka和RocketMQ支持高吞吐,ActiveMQ和RabbitMQ比它们低一个数量级。对于延迟量来说,RabbitMQ是最低的
  2. 从社区活跃度来说,RabbitMQ是首选
  3. 持久化消息比较:ActiveMQ和RabbitMQ都支持。持久化消息主要是指我们的机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。
  4. 综合技术实现:可靠性、灵活的路由、集群、事务、高可用队列、消息排序、问题追踪、可视化管理工具、插件系统等。RabbitMQ和Kafka最好
  5. 高并发:毋庸置疑,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言

如何保证高可用

  • RabbitMQ是基于主从做高可用性的,RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。
    1. 单机模式:就是Demo级别的,一般就是本地启动玩玩儿的,生产环境没人用单机模式
    2. 普通集群模式:在多台机器上启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据可以找到queue所在实例)。在消费消息的时候,如果连接到了别的RabbitMQ实例,那么这个示例会从queue所在的实例上拉取数据过来。这个方案主要是为了提高吞吐量的,让集群中的多个节点来服务某个queue的读写操作。
    3. 镜像集群模式:这种模式才是所谓的RabbitMQ高可用模式。跟普通集群不一样的是,在镜像集群模式下,你创建的queue,无论是元数据还是queue里的消息,都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的完整镜像,包含queue的全部数据。每次写消息到queue的时候,都会自动把消息同步到多个实例的queue上。RabbitMQ的管理控制台可以在后台增加一个集群模式的策略,可以要求数据同步到所有节点,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的好处在于,任何一个机器宕机了,其他机器还有包含了这个queue的完整数据,别的消费者可以到其他的节点上去消费数据。坏处在于,1.性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重。2. 扩展性差:如果某个 Queue 负载很重,即便加机器,新增的机器也包含了这个 Queue 的所有数据,并没有办法线性扩展你的 Queue。

如何保证消息传输的可靠传输?消息丢失怎么办?

  • 首先这个消息丢失,有三处都可能发生
    1. 生产者丢失:生产者将数据发送到RabbitMQ的时候,由于网络的原因,可能数据半路就丢了。此时可以选择使用RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务channel.txSelect,然后发送消息,如果消息没有被RabbitMQ接收到,那么生产者会收到异常报错,此时可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保写RabbitMQ的消息别丢,在生产者那里设置开启confirm模式,每次写消息都会分配一个唯一id,如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间没有接收到这个消息的回调,那么你可以重发。
    2. MQ中丢失:这种情况必须开启RabbitMQ的持久化,就是消息写入之后会持久化到硬盘,哪怕RabbitMQ自己挂了,恢复之后也会自动读取之前存储的数据,一般数据不会丢失。设置持久化有两个步骤:第一个是创建queue的时候将其设置为持久化,这样可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据。第二个是发送消息的时候,将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须同时设置这两个持久化才行,
    3. 消费端丢失:消费的时候,刚消费到,还没处理,此时服务挂了,服务重启了,那么就尴尬了,RabbitMQ人为你消费了,这数据就丢了。这个时候需要使用RabbitMQ提供的ack机制,简单来说就是,关闭RabbitMQ的自动ack,然后在服务处理消息完毕后再手动ack,这样的话,如果消息还没处理完,那么就不会ack,消息就不会丢了。

如何保证消息的顺序性

  • 拆分多个 Queue,每个 Queue一个 Consumer;或者就一个 Queue 但是对应一个 Consumer,然后这个 Consumer 内部用内存队列做排队,然后分发给底层不同的 Worker 来处理。

大量消息在MQ里长时间积压,该如何解决?

  • 一般这个时候,只能临时紧急扩容了,具体操作步骤思路如下
    1. 先修复consumer的问题,确保其恢复消费速度,然后将现有的consumer都停掉。
    2. 新建一个topic,partition是原来的10倍,临时建立好原先10倍的queue数量。
    3. 写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
    4. 接着临时用10倍的机器来部署consumer,每一批consumer消费一个临时queue数据,这样的做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
    5. 等快速消费完积压数据后,得回复原先部署的架构,重新用原先的consumer机器来消费消息。

MQ中规定消息过期失效了怎么办?

  • 如果使用的是RabbitMQ,RabbitMQ是可以设置过期时间的(TTL)。如果消息在queue中积压超过一定时间就会被RabbitMQ给清理掉,这个数据就没了。此时的问题就不是数据大量积压在MQ里,而是大量的数据会直接搞丢。这个情况下,解决方案就不是增加consumer消费积压的信息了,而是需要批量重导,当大量积压的时候,直接将数据写到数据库,等过了高峰期之后在将这批数据一点一点查出来,重新写入到MQ中,将丢的数据补回来。