德必信生活网

您现在的位置是:首页 > 生活资讯 > 正文

生活资讯

消息队列实现(消息队列实现最终一致性)

阿信2023-04-13生活资讯86

今天给各位分享消息队列实现的知识,其中也会对消息队列实现最终一致性进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

Redis 实现 消息队列

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

消费者在处理消息的时候,还猛耐可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,大侍消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

发送消息日志

消费消息日志

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

XADD:插入消息,保证有序,可以自动生成全局唯一 ID;

XREAD:用于读取消息,可以按 ID 读取数据;

XREADGROUP:按消费组形式读取消息;

XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,

XACK:XACK 命令用于向消息队列确认消息处理已完成。

监听器

监听器容器

初始化消息队列(integral-queue)和消费者组(integral-group),执行RedisMqTest.initMessageQueue()即可

可参考List实现的处理方式

可使用RedisMqTest.getPendingQueue()获取未滚知吵成功消费的消息队列进行处理

.NetCore利用BlockingCollection实现简易消息队列

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。

我们用兄答凳BlockingCollection来实现简单的消息队列。

用Vs2017创建一个控制台应用程序。创建 DemoQueueBlock 类,封装一些常用判断。

为了不把BlockingCollection直接暴漏给使用者,我们封装一个 DemoQueueBlock 类

通过控制台,添加元素

通过判举野断IsComleted,来确定是否羡旅获取队列

这样我们就实现了简易的消息队列。

简易队列

BlockingCollection

Orleans源码分析

Redis实现简单消息队列

打开浏览器,输入地址,按下回车,打开凯芹了页面。于是一个 HTTP 请求( request )就由客户端发送到服务器,服务器处理请求,返回响应( response )内容。

我们每天都在浏览网页,发送大大小小的请求给服务器。有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情。

更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。从事异步任务的工具有很多。主要原理还是处理通知消息,针对通知消息通常采取是队列结构。生产和消费消息进行通信和业务实现。

上述异步任务的实现,知纤可以抽象为生产者消费模型。如同一个餐馆,厨师在做饭,吃货在吃饭。如果厨师做了很多,暂时卖不完,厨师就会休息;如果客户很多,厨师马不停蹄的忙碌,客户则需要慢慢等待。实现生产者和消费者的方式用很多,下面使用 Python 标准库 Queue 写个小例子:

大概输出如下:

Python内置了一个好用的队列结构。我们也可以是用redis实现类似的操作。并做一个简单的异步任务。

Redis提供了两种方式来作消息队列。一个是使用 生产者消费模式 模式,另外一个方法就是 发布订阅者模式 。前者会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。后者也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

主要使用了redis提供的blpop获取队列数据,如果队列没有数据则阻塞盯猛毕等待,也就是监听。

使用redis的pubsub功能,订阅者订阅频道,发布者发布消息到频道了,频道就是一个消息队列。

我们分别实现了两种异步任务的后端服务,直接启动他们,就能监听redis队列或频道的消息了。简单的测试如下:

启动脚本,使用

可以分别在监听的脚本输入中看到异步消息。在异步的任务中,可以执行一些耗时间的操作,当然目前这些做法并不知道异步的执行结果,如果需要知道异步的执行结果,可以考虑设计协程任务或者使用一些工具如 RQ 或者 celery 等。

消息队列实现的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于消息队列实现最终一致性、消息队列实现的信息别忘了在本站进行查找喔。