Skip to content

Commit df2bf91

Browse files
author
minibear2333
committed
[+]消息队列积压,丢失,优先级队列,延迟队列,队列顺序性保证等问题
1 parent 678c6cf commit df2bf91

File tree

5 files changed

+130
-13
lines changed

5 files changed

+130
-13
lines changed

SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [Java](interview/Java.md)
1616
* [Redis](interview/redis.md)
1717
* [Mysql](interview/mysql.md)
18+
* [消息队列](interview/消息队列.md)
1819
* [网络](interview/网络.md)
1920
* [Docker](interview/docker.md)
2021
* [k8s](interview/k8s.md)

interview/mysql.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,22 @@ issue
254254

255255
来自 [胡慢慢滚雪球](https://www.zhihu.com/question/34781415/answer/767552025)
256256

257+
### 什么是乐观锁和悲观锁
258+
259+
通常mysql中使用的都是悲观锁,分为共享锁和排他锁两种,共享锁也就是读锁,可以多个线程同时读,不能修改;排他锁是写锁,未获得锁的线程需要阻塞
260+
261+
乐观锁:假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。一般有两种实现方式`CAS`和基于版本号
262+
263+
引用: [什么是乐观锁,什么是悲观锁](https://www.jianshu.com/p/d2ac26ca6525)
264+
265+
### CAS与ABA问题
266+
267+
`CAS`是乐观锁的实现方式之一,`CAS`操作包含三个操作数—— 内存位置的值(V)、预期原值(A)和新值(B),在更新的时候做检查,内存值必须与期望值相同。举个例子,内存值V、期望值A、更新值B,当V == A的时候将V更新为B。
268+
269+
ABA问题,某个值中间被改动又被改回来,`A-B-A`,用`CAS`是无法识别的,考虑通过控制变量值的版本号来保证`CAS`的正确性。具体解决思路就是在变量前追加上版本号,每次变量更新的时候把版本号加一,那么`A - B - A`就会变成`1A - 2B - 3A`
270+
271+
引用:[深入理解CAS](https://www.jianshu.com/p/db5c964a61ee)
272+
257273
### 最后
258274

259275
如果文中有误,欢迎提pr或者issue,**一旦合并或采纳作为贡献奖励可以联系我直接无门槛**加入[技术交流群](https://mp.weixin.qq.com/s/ErQFjJbIsMVGjIRWbQCD1Q)

interview/其他.md

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,18 @@ openstack是一个虚拟化平台,在云计算中承担iaas层的角色,具
3535
* 不是,大部分都是在host指定的机器上在执行
3636
* 如果使用委托,则在委托主机上执行任务,其他机器不会执行
3737

38-
## 服务治理
38+
### 服务治理
3939

40-
### 熔断
40+
以下方式都属于有损方式,需要在架构设计层面对各服务做分流、压缩、缓存等处理
41+
42+
#### 熔断
4143

4244
被调用方故障,调用方会主动停止调用
4345

44-
### 限流
46+
#### 限流
4547

4648
请求数量超出服务的处理能力时,会自动丢弃新来的请求。
4749

48-
### 降级
49-
50-
通过开关配置将某些不重要的业务功能屏蔽掉,以提高服务处理能力
51-
52-
## rabbitmq
53-
54-
### 如何保证rabbitmq集群数据一致性
50+
#### 降级
5551

52+
通过开关配置将某些不重要的业务功能屏蔽掉,以提高服务处理能力

interview/消息队列.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
2+
### 如何保证队列数据一致性,防止重复消费,幂等性
3+
4+
所有的消息中间件在实际的业务场景中都逃脱不了保证消息的一致性的问题
5+
6+
`kafka`实际上有个`offset`的概念,就是每个消息写进去,都有一个`offset`,可以理解成一个自增持的序号,一个个排下来
7+
8+
然后消费者`consumer`,每隔一段时间,就把自己消费过的消息提交一下,如果说出现宕机或者重启,则会继续从上次消费的序号接着往下排,继续消费
9+
10+
但是有的时候,消费者`consumer`消费的消息,由于各种原因,比如网络、宕机、停电。。。都没来得及写`offset`,这个时候少数消息会再次消费一次
11+
12+
这个时候,我们可以**用一个唯一的id标识来区分**,这不是消息中间件做的事,而是开发者要做的,比如你消费一个就往数据库插入一条记录,然后下次再去消费的时候,你去查一下,看看这个消息是否被消费了,消费了那就不要重复消费了。
13+
14+
(补充一下:确认一条数据在百万级别海量数据里是否存在?--可以用**布隆过滤器**
15+
16+
* 根据主键查一下,如果这数据都有了,就别插入了,update一下(虽然重复插入会因为唯一键约束而报错,我觉得我们还是应该避免报错)
17+
* 如果是写redis,反正每次都是set,天然幂等性
18+
* 如果不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,就别处理了,保证别重复处理相同的消息即可。
19+
20+
21+
引用:
22+
* [rabbitMq-kafka消息高可用,一致性](https://blog.csdn.net/lanshen110119/article/details/89399084)
23+
* [如何保证消息队列的高可用和幂等性以及数据丢失,顺序一致性](https://www.bilibili.com/read/cv1923046/)
24+
25+
### 如何预防rabbitmq消息的丢失
26+
27+
**情况1、生产者自己丢失了消息(网络故障/发送失败)**
28+
29+
解决方案:
30+
31+
rabbitmq:一般这种都是采用回调接口的方案(confirm模式),就是说你扔一个消息过去了,对方给你一个回调接口,告诉你成功了或者失败了,失败了你可以选择继续扔消息, (重试机制等),来保证消息一定送达
32+
33+
开启confirm模式(异步的)之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
34+
35+
引用: [如何保证消息队列的高可用和幂等性以及数据丢失,顺序一致性](https://www.bilibili.com/read/cv1923046/)
36+
37+
**情况2、消息中间件弄丢了消息**
38+
39+
解决方案:以rabbitmq来说,开启**持久化**就好了,当发生宕机的时候,queue会自动从磁盘恢复数据,除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小
40+
41+
**情况3、消费者弄丢了消息**
42+
43+
rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,rabbitmq认为你都消费了,这数据就丢了。
44+
45+
关闭rabbitmq自动ack机制,可以通过一个api来调用,每次代码里确保处理完的时候,再程序里ack。这样的话,如果还没处理完,就没有ack,那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的
46+
47+
### 如何预防kafak消息丢失
48+
49+
[TODO](https://github.com/minibear2333/interview-leetcode/issues/35)
50+
51+
### 如何处理消息积压
52+
53+
出现场景:消费能力被阻塞(消费者挂掉或者处理速度慢),生产者还不停的往队列里扔消息
54+
55+
解决方法:
56+
* 快速恢复`consumer`服务,慢慢消费,如果积压的数据量太大的话恢复较慢
57+
* 临时写脚本快速的把这批消息给消费掉,或者增加消费者数量/消费速度,要避免负载把其他服务打挂
58+
* 扩容:提高相同消息的队列数量,出现问题时写脚本分发到不同队列里,再给每个队列指定消费者,消费结束后再恢复
59+
60+
预防:
61+
* 提前准备多个队列在投递时随机投递,存储同类型无顺序要求的消息
62+
* 使用多个消费者
63+
64+
### 消息过期或者队列满了怎么办
65+
66+
消息队列TTL超时或者队列满了数据会丢失,这个时候可以自己再去找消息,然后临时写个代码,自己再手动的去把这些消息重新推送到队列里去。
67+
68+
另一种解决方案
69+
* 可以在 rabbitmq 中声明死信队列,死信队列为处理过期或不能正确路由的消息提供了驻留场所,可以防止消息丢失,便于分析无法消费的原因
70+
* 写程序处理死信队列里的数据,并接入告警分析
71+
72+
如果投递不成功,需要把数据暂存内存或者暂存redis之类的数据库中,等待恢复时重试
73+
74+
### 怎么实现一个延时队列?
75+
76+
rabbitmq 可以配置[延时队列插件](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)
77+
78+
消费者是无感知的,可以正常消费,生产者设置延迟时间,到了时间后队列里才会出现消息
79+
80+
如果一定要手动实现,可以维护不同的队列指代不同的延迟时间,程序根据相应队列来保证最新消息的时间戳与当前时间延迟转发到实际的消费队列
81+
82+
引用:[过期时间,死信队列,延迟队列,优先队列,持久化](http://www.gxitsky.com/article/1604455229805099)
83+
84+
### 什么是优先队列,怎么实现
85+
86+
优先级高的消息先消费
87+
88+
在rabbitmq中,手动开启设置队列支持的最大优先级(建议不要设置太大,会消耗资源),在投递消息的时间设置优先级数值,队列会自动排序,但是如果消费速度太快,没有任何数据积压的时候,不存在排序也不存在优先级的问题
89+
90+
### 如何保证消息队列的顺序性
91+
92+
同一订单不同消息投递到不同位置,不同消费者消费了同一订单的不同消息,一般出现在
93+
* 程序设计问题投递到不同队列
94+
* 同一队列多个消费者并发消费,无法保证消费顺序
95+
96+
解决方法:
97+
* 队列都是有顺序性保证的,在投递时,创建多个队列,hash投递,hash相同订单号投递同一个队列
98+
* 消费时,保证同一个队列只允许一个消费者消费
99+
* 多线程并发处理时,避免多进程,而是增加线程数,维护多个内存队列把消息归类
100+
101+
引用: [如何保证消息的顺序性](https://xie.infoq.cn/article/c84491a814f99c7b9965732b1)

项目问什么.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646

4747
引用:[程序员如何快速上手一个自己不太熟悉的新项目?有什么技巧?](https://www.zhihu.com/question/38865497)
4848

49-
### 如何实现一个分布式id分发器?
49+
### 分布式锁如何实现
5050

51-
### 如何实现秒杀系统?
51+
[redis](interview/redis.md)
5252

53-
### 怎么实现一个延时队列?
53+
### 如何实现一个分布式id生成器
54+
55+
### 如何实现秒杀系统

0 commit comments

Comments
 (0)