消息队列的设计方案

阅读: 评论:0

消息队列的设计⽅案
构建消息队列的整体思路
设计消息队列的整体思路是先创建⼀个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确
认,broker删除/备份消息等。 利⽤RPC将数据流串起来。然后考虑RPC的⾼可⽤性,尽量做到⽆状
态,⽅便⽔平扩展。 之后考虑如何承载消息堆积,然后在合适的时机投递消息,⽽处理堆积的最佳⽅式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。 为了实现⼴播功能,我们必须要维护消费关系,可以利⽤zookeeper/config server等保存消费关系。
当然也并⾮每个消息队列的设计都是有broker的,broker(消息队列的服务端)的作⽤是对消息进⾏转存,以便在更合适的时间进⾏投递。
消息队列的基本功能的实现
1.RPC通信协议
其实消息队列⽤接地府⼀点的话来讲,就是将⽣产者producer产⽣发送给consumer消费者的⼀次RPC,先⾏发送到了消息队列中做暂存,再由消息队列在⼀个合适的时间发送给消费者进⾏消费,这样,⼀次RPC便被转为了两次的RPC,所以我们必须使⽤/⾃⼰实现⼀个RPC框架,⾃⼰实现RPC框架如果并⾮是对性能的极致追求,属实是没有必要的(⽽且个⼈实现的效果估计很难达到已有的成熟实现的⽔平),⽽已经有的RPC框架可以使⽤:Dubbo或者Thrift等成熟的实现。
2.对消息的存储(存储⼦系统的选择)
为了满⾜我们错峰/流控/最终可达等⼀系列需求,把消息存储下来,然后选择时机投递就是我们broker
的意义。⽽这个存储⼜可以分为持久化和⾮持久化两种⽅式,持久化能更⼤程度地保证消息的可靠性(不易失)并且⼀般情况下可⽤存储空间都⽐较⼤(外存显然会⽐内存的存储容量⼤)。但是很多消息对于投递性能的要求⼤于可靠性的要求,且数量极⼤(如⽇志)。此时我们也可⽤⾮持久化的⽅式,将其缓存在内存,然后进⾏投递。
如果我们选择持久化的⽅案,从速度⽅⾯来抉择,⽂件系统>分布式KV(持久化)>分布式⽂件系统>数据库,但是可靠性却恰好相反。例如Kafka使⽤磁盘⽂件的持久化⽅式,没有提供不持久化的选择。采⽤数据⽂件+索引⽂件的⽅式处理,这块的设计⽐较复杂,我个机会写篇博客,当然我们也可以选择 分布式KV(如MongoDB,HBase)等,或者持久化的Redis进⾏存储。
3.消费关系的保存(⼴播、单播关系)
在我们的消息队列现在已经初步具有了消息的转发能⼒(RPC)和存储能⼒(持久化/⾮持久化)的现在,我们需要更进⼀步的完善我们的消息队列,我们就得对发送与接收关系进⾏解析,这样才能实现正确的消息投递。例如Kafka定义的Topic主题、Partition分区、ConsumerGroup消费者组,消费者组对主题进⾏订阅,其中的消费者对各个(可能⼀个消费者对应多个分区)分区进⾏消费。这就是消息通知到⼀个业务集,⽽⼀个业务集内可以有很多台机器,⼀个消息只要⼀台机器消费就可以了。
⼀般⽐较通⽤的设计是⽀持组间⼴播,不同的组注册不同的订阅。组内的不同机器,如果注册⼀个相
同的ID,则单播;如果注册不同的
ID(如IP地址+端⼝),则⼴播。 ⾄于⼴播关系的维护,⼀般由于消息队列本⾝都是集,所以都维护在公共存储上,如config server、zookeeper等。维护⼴播关系所要做的事情基本是⼀致的:1.维护发送关系  2.对发送关系的改变进⾏通知
消息队列⾼级特性的实现
并⾮每个消息队列都会兼顾到所有的⾼级特性,我们需要依照业务的需求,来衡量各种特性实现的利弊,最终做出最为合理的设计。
1.可靠投递(最终⼀致性)
徐山泉
⼀种简单的实现⽅案就是当每次发⽣⼀次可能会发⽣不可靠投递的情况的时候(RPC),先将消息持久化于本地,待发送成功并返回ack确认的时候,再删除本地的持久化消息。(如⽣产者持久化⽣成的数据在本地后,再将数据发往broker,待broker确保持久化了/接收到数据了,再返回确认ack到⽣产者,待⽣产者收到后,才会删除持久化的数据)。
上述⽅案会产⽣⼀个问题,就是消息的重复。对于broker未收到消息进⾏不断的重推送,可能部分消息只是因为⽹络阻塞较迟抵达broker,此时就会产⽣重复的消息。所以,消息重复和消息丢失,我们
总会⾯临⼀个,但是对于消息的重复我们有⽐较多的解决⽅案(见下⽂),但是消息的丢失,想要恢复可能会⾮常困难。
注意,并⾮所有系统都要求实现可靠投递。⽐如⼀个论坛系统。⼀个重复的话题,可能⽐丢失了⼀个发布显得更让⽤户⽆法接受。
2.重复消息的解决⽅案(尽量减少重复消息,毕竟保证重复消息下业务的正确是业务⽅进⾏的,我们消息队列只是⼀个中间件,不应该也⽆法定义业务⽅的解决⽅案)
运行网我们必须尽可能减少重复的消息。曲柄销
⼀种解决⽅案是:broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产⽣重复消息。对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问⽆果,再重发。
如果仍然有重复的消息,我们就像对调⽤的接⼝做幂等性处理⼀样,重复的消息我们同样可以对消息做幂等性处理
我们可以从⼀下三种⽅案来解决消息的重复(摘⾃)。
1.消息id判重
⼀个消息应该有它的唯⼀⾝份。不管是业务⽅⾃定义的,还是根据IP/PID/时间戳⽣成的MessageId,如果有地⽅记录这个MessageId,消息到来是能够进⾏⽐对就 能完成重复的鉴定。数据库的唯⼀键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。但这种事情毕竟是少数情况。
2.版本号
举个简单的例⼦,⼀个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第⼆次发⽣在2之后,如果不做重复性判断,显然最终状态是错误的。 但是,如果每个消息⾃带⼀个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护⼀个版本号。 每次只接受⽐当前版本号⼤的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另⼀条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。 如果业务⽅只关⼼消息重复不重复,那么问题就已经解决了。但很多时候另⼀个头疼的问题来了,就是消息顺序如果和想象的顺序不⼀致。⽐如应该的顺序是12,
到来的顺序是21。则最后会发⽣状态错误。 参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收⽐当前版本号⼤⼀的消息。并且在⼀个session周期内要⼀直保存各个消息的版本号。 如果到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,这样重复性和顺序性要求就都达到了。
3.状态机
基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使⽤版本号的最⼤问题是:
1. 对发送⽅必须要求消息带业务版本号。
2. 下游必须存储消息的版本号,对于要严格保证顺序的。
还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。⽽且必须要对此做出处理。试想⼀个永不过期的”session”,⽐如⼀个物品的状态,会不停流转于上下线。那么中间环节的所有存储 就必须保留,直到在某个版本号之前的版本⼀个不丢的到来,成本太⾼。 就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务⽅只需要⾃⼰维护⼀个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或
者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过⼀段时间重发即可。⽽且重发⼀定要有次数限制,⽐如5次,避免死循环,就解决了。 举例⼦说明,假设产品本⾝状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。 那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。 此时⽆论重发的消息1或者3到来,还是可以接收。另外的重发,在⼀定次数拒绝后停⽌重发,业务正确。
3.消息确认
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。例如:当broker把消息投递给消费者后,消费者可以⽴即响应我收到了这个消息。但收到了这个消息只是第⼀步,我能不能处理这个消息却不⼀定。或许因为消费能⼒的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机⾥⾯提到的消息不是我想要接收的消息,主动要求重发。
对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但⼀定要允许消费⽅主动进⾏消费确认ack,并与broker约定下次投递时间。例如:收到⼀个消息开始build索引,可能这个消息要处理半个⼩时,但
消息量却是⾮常的⼩。所以reject这块建议做成滑动窗⼝/线程池类似的模型来控制, 消费能⼒不匹配的时候,直接拒绝,过⼀段时间重发,减少业务的负担。 但业务出错这件事情是只有业务⽅⾃⼰知道的,就像上⽂提到的状态机等等。
4.事务
只满⾜持久性不⼀定能满⾜事务的特性。 就⽐如经典的存取款问题,你持久化了存款或者取款都不⾏,必须满⾜事务的⼀致性特征,必须要么都不进⾏,要么都能成功。解决⽅案有两种:
1. 2PC,分布式事务。无纺布折叠机
2. 本地事务,本地持久化,补偿发送。
⼀般对于交易密集型或者I/O密集型的应⽤,采⽤⽅案2。但是⽅案2容易产⽣误解,这个事务并⾮是对RPC成功的事务,⽽是对消息持久化的事务,不然事务嵌套RPC,长事务锁死可以轻松⼲碎你的系统。具体例⼦就像可靠传输所说的⼀样,先将消息持久化于本地(⽤事务,业务⽅可以直接使⽤Spring的@Transactional),待发送成功并返回ack确认的时候,再删除本地的持久化消息。(如⽣产者持久化⽣成的数据在本地后,再将数据发往broker,待broker确保持久化了/接收到数据了,再返回确认ack到⽣产者,待⽣产者收到后,才会删除持久化的数据)。但⽅案2同样也有缺点:配置较为复杂,“”业务⽅,必须本地数据库实例提供⼀个库表来进⾏持久化。
但是,也并⾮所有的业务都是需要进⾏事务操作的,显然银⾏存取款是需要事务保证,但是例如购买业务和发送感谢短信业务,我们不能因为的业务失败⽽要求购买业务跟着回滚,这是不应该的,所以,消息队列应该实现多种类型的消息,例如事务型消息,本地⾮持久型消息,以及服务端不落地的⾮可靠消息。
电动黄包车5.同步/异步
同步能够保证结果,异步能够保证效率,任何的RPC都是存在客户端异步与服务端异步的,⽽且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
⼀种设计思想是:在消息队列中,我们当然不希望消息的发送阻塞主流程(服务端如果使⽤异步模型,则可能因消息合并带来⼀定程度上的消息延迟),所以可以先使⽤线程池(当然线程池也并⾮是唯⼀的异步⼿段如NIO、事件)提交⼀个发送请求,主流程继续往下⾛。 但是线程池中的请求关⼼结果吗?这是当然,必须等待服务端消息成功落地,才算是消息发送成功。所以这⾥的模型,准确地说事客户端半同步半异步(使⽤线程池不阻塞主流程,但线程池中的任务需要等待服务端的返回),服务端是纯异步。客户端的线程池wait在服务端吐回的future上,直到服务端处理完毕,才解除阻塞继续进⾏。
快门式3d
6.批处理
谈到批处理就得谈到什么时候进⾏消费动作,有三种情况:
1. 攒够了⼀定数量。
2. 到达了⼀定时间。
3. 队列⾥有新的数据到来。
对于及时性要求⾼的数据,可以采⽤⽅式3,⽐如客户端向服务端投递数据。这样在每次将积攒的数据往外刷的过程中,还在等待回复的延迟时,⼜可以积攒⼀部分数据,待新数据⼀来,再次刷出,依次往复,但可能会因为发送过快⽽造成批量⼤⼩国⼩产⽣性能上限。
7.数据消费⽅式(摘⾃)
push还是pull?以下就是这两种模型各⾃的利弊。
慢消费
慢消费⽆疑是push模型最⼤的致命伤,穿成流⽔线来看,如果消费者的速度⽐发送者的速度慢很多,
势必造成消息在broker的堆积。假设这些消息都是有⽤的⽆法丢弃的,消息就要⼀直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送⼀堆consumer⽆法处理的消息,consumer不是reject就是error,然后来回踢⽪球。 反观pull模式,consumer可以按需消费,不⽤担⼼⾃⼰处理不了的消息来骚扰⾃⼰,⽽broker堆积消息也会相对简单,⽆需记录每⼀个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建⽴索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式⽐较合适。
消息延迟与忙等
这是pull模式最⼤的短板。由于主动权在消费⽅,消费⽅⽆法准确地决定何时去拉取最新的消息。如果⼀次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待⼀段时间重新pull。 但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费⽅。也许1分钟内连续来了1000条消息,然后半个⼩时没有新消息产⽣, 可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让⼈沮丧? 当然也不是说延迟就没有解决⽅案了,业界较成熟的做法是从短时间开始(不会对broker有太⼤负担),然后指数级增长等待。⽐如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。 即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,⽽且对于半个⼩时来⼀次的消息,这些开销就是
⽩⽩浪费的。 在阿⾥的RocketMq⾥,有⼀种优化的做法-长轮询,来平衡推拉模型各⾃的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,⽽是把连接挂在那⾥wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容⼩觑的,还是要合理的评估时间间隔,给wait加⼀个时间上限⽐较好~
顺序消息
如果push模式的消息队列,⽀持分区,单分区只⽀持⼀个消费者消费,并且消费者只有确认⼀个消息消费后才能push送另外⼀个消息,还要发送者保证全局顺序唯⼀,听起来也能做顺序消息,但成本太⾼了,尤其是必须每个消息消费确认后才能发下⼀条消息,这对于本⾝堆积能⼒和慢消费就是瓶颈的push模式的消息队列,简直是⼀场灾难。 反观pull模式,如果想做到全局顺序消息,就相对容易很多:
1. producer对应partition,并且单线程。
2. consumer对应partition,消费确认(或批量确认),继续消费即可。
所以对于⽇志push送这种最好全局有序,但允许出现⼩误差的场景,pull模式⾮常合适。如果你不想看到通篇乱套的⽇志~~ Anyway,需要顺序消息的场景还是⽐较有限的⽽且成本太⾼,请慎重考虑。

本文发布于:2023-07-18 22:26:57,感谢您对本站的认可!

本文链接:https://patent.en369.cn/patent/3/183294.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   投递   业务
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 369专利查询检索平台 豫ICP备2021025688号-20 网站地图