消息队列实现(消息队列实现原理)
本篇文章给大家谈谈消息队列实现,以及消息队列实现原理对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
Redis实现简单消息队列
打开浏览器,输入地址,按下回车,打开了页面。于是一个 HTTP 请求( request )就由客户端发送到服务器,服务器处理请求,返回响应( response )内容。
我们每天都在浏览网页,发送大大小小的请求给服务器。有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情。
更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。从事异步任务的工具有很多。主要原理还是处理通知消息,针对通知消息通常采取是队列结构。生产和消费消息进行通信和业务实现。
上述异步任务的实现,可以抽象为生产者消费模型。如同一个餐馆,厨师在做饭,吃货在吃饭。如果厨师做了很多,暂时卖不完,厨师就会休息;如果客户很多,厨师马不停蹄的忙碌,客户则需要慢慢等待。实现生产者和消费者的方式用很多,下面使用 Python 标准库 Queue 写个小例子:
大概输出如下:
Python内置了一个好用的队列结构。我们也可以是用redis实现类似的操作。并做一个简单的异步任务。
Redis提供了两种方式来作消息队列。一个是使用 生产者消费模式 模式,另外一个方法就是 发布订阅者模式 。前者会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。后者也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。
主要使用了redis提供的blpop获取队列数据,如果队列没有数据则阻塞等待,也就是监听。
使用redis的pubsub功能,订阅者订阅频道,发布者发布消息到频道了,频道就是一个消息队列。
我们分别实现了两种异步任务的后端服务,直接启动他们,就能监听redis队列或频道的消息了。简单的测试如下:
启动脚本,使用
可以分别在监听的脚本输入中看到异步消息。在异步的任务中,可以执行一些耗时间的操作,当然目前这些做法并不知道异步的执行结果,如果需要知道异步的执行结果,可以考虑设计协程任务或者使用一些工具如 RQ 或者 celery 等。
通过Redis消息队列实现大文件处理
一、故事背景
1、读取离线文件数据,再通过【离线数据】作为条件,查询第三方接口,返回最终的结果,再入库。
2、 业务逻辑是很简单, 读取文件、查询接口、返回数据集、入库 四步。
3、业务特性:第三方接口调用400毫秒(ms) 。
如果用普通单线程去跑算500毫秒一个请求,一天也就跑8W多数据量,20多亿的数据不知道跑到猴年马月了。
二、处理方案
A) 初步方案采用ganymed-ssh2(文件都存储在Linux服务器上) 来读文件,Redis来存储消息、多线程来提升处理能力。
B) 流程图:
三、呈现问题
四、优化问题
最终流程图:
1、 通过Redis做一个计数器 每读取一行记录数值,即使服务终止后,先从Redis读取这个数值
再通过cat指定行数开始读数据即可。
2、 通过取模拆Key 分片到不同小Key存储 ,降低单个节点存储压力,也充分利用了存储资源。
3、Redis Push 提供了批量方式(leftPushAll) ,可以指定读取行数再批量入库,而pop并没有提供批量 只能一个一个pop。
4、消费者通过多线程pop、再分发到线程去处理。
五、总结问题
Redis 实现 消息队列
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。
发送消息日志
消费消息日志
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
XREAD:用于读取消息,可以按 ID 读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,
XACK:XACK 命令用于向消息队列确认消息处理已完成。
监听器
监听器容器
初始化消息队列(integral-queue)和消费者组(integral-group),执行RedisMqTest.initMessageQueue()即可
可参考List实现的处理方式
可使用RedisMqTest.getPendingQueue()获取未成功消费的消息队列进行处理
.NetCore利用BlockingCollection实现简易消息队列
消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。
我们用BlockingCollection来实现简单的消息队列。
用Vs2017创建一个控制台应用程序。创建 DemoQueueBlock 类,封装一些常用判断。
为了不把BlockingCollection直接暴漏给使用者,我们封装一个 DemoQueueBlock 类
通过控制台,添加元素
通过判断IsComleted,来确定是否获取队列
这样我们就实现了简易的消息队列。
简易队列
BlockingCollection
Orleans源码分析
消息队列实现的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于消息队列实现原理、消息队列实现的信息别忘了在本站进行查找喔。