Spring Boot RabbitMQ 延迟消息实现完整版示例
作者: / 2019-11-18 / 浏览次数:

能够用行列,订单下完后,发送一个音讯到行列里,并指定过期时刻,时刻一到,履行回调接口。

面试官听完后,就不再问了。其实我其时的思路是对的,只不过讲的不是很专业罢了。专业说法是运用 推迟音讯 。

其有用守时使命,的确有点问题,本来事务体系期望10分钟后,假如订单未付出,就立刻撤销订单,并开释产品库存。可是一旦数据量大的话,就会加长获取未付出订单数据的时刻,部分订单就做不到10分钟后撤销了,可能是15分钟,20分钟之类的。这样的话,库存就无法及时得到开释,也就会影响成单数。而运用推迟音讯,则理论上是能够做到依照设定的时刻,进行订单撤销操作的。

现在网上关于运用rabbitmq完成推迟音讯的文章,大多都是讲怎么运用rabbitmq的死信行列来完成,完成计划看起来都很繁琐杂乱,并且仍是运用原始的rabbitmq client api来完成的,愈加显得烦琐。

spring boot 现已对rabbitmq client api进行了包装,运用起来简练许多,下面具体介绍一下怎么运用 rabbitmq_delayed_message_exchange 插件和spring boot来完成推迟音讯。

软件预备

erlang


rabbitmq

本文运用的是 window 版别的rabbitmq,版别号是:3.7.4

rabbitmq_delayed_message_exchange插件

插件下载地址:

翻开网址后,ctrl + f,查找 rabbitmq_delayed_message_exchange 。

千万记住,必定选好版别号,由于我运用的是rabbitmq 3.7.4,因而对应的 rabbitmq_delayed_message_exchange 插件也有必要挑选3.7.x的。

假如没有选对版别,在运用推迟音讯的时分,会遇到各式各样的奇葩问题,并且网上还找不到解决计划。我由于这个问题,折腾了整整一个晚上。请紧记,要选对插件版别。

下载完插件后,将其放置到rabbitmq装置目录下的 plugins 目录下,并运用如下指令发动这个插件:


假如发动成功会呈现如下信息:


发动插件成功后,记住重启一下rabbitmq,让其收效。

集成rabbitmq

这个就十分简略了,直接在maven工程的pom.xml文件中参加

 dependency 
 groupid org.springframework.boot /groupid 
 artifactid spring-boot-starter-amqp /artifactid 
 /dependency 

spring boot的版别我运用的是 2.0.1.release .

接下来在 application.properties 文件中参加redis装备:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

界说connectionfactory和rabbittemplate

也很简略,代码如下:

package com.mq.rabbitmq;
凯发体育网址import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
@configurationproperties
public class rabbitmqconfig {
 private string host;
 private int port;
 private string username;
 private string password;
 @bean
 public connectionfactory connectionfactory {
 cachingconnectionfactory cachingconnectionfactory = new cachingconnectionfactory;
 cachingconnectionfactory.setusername;
 cachingconnectionfactory.setpassword;
 cachingconnectionfactory.setvirtualhost;
 cachingconnectionfactory.setpublisherconfirms;
 return cachingconnectionfactory;
 @bean
 public rabbittemplate rabbittemplate {
 rabbittemplate rabbittemplate = new rabbittemplate);
 return rabbittemplate;
 public string gethost {
 return host;
 public void sethost {
 this.host = host;
 public int getport {
 return port;
 public void setport {
 this.port = port;
 public string getusername {
 return username;
 public void setusername {
 this.username = username;
 public string getpassword {
 return password;
 public void setpassword {
 this.password = password;

exchange和queue装备

package com.mq.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;
@configuration
public class queueconfig {
 @bean
 public customexchange delayexchange {
 map string, object args = new hashmap ;
 args.put;
 return new customexchange;
 @bean
 public queue queue {
 queue queue = new queue;
 return queue;
 @bean
 public binding binding {
 return bindingbuilder.bind).to).with.noargs;

这儿要特别注意的是,运用的是 customexchange ,不是 directexchange ,别的 customexchange 的类型有必要是 x-delayed-message 。

完成音讯发送

package com.mq.rabbitmq;
import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import java.text.simpledateformat;
import java.util.date;
@service
public class messageserviceimpl {
 @autowired
 private rabbittemplate rabbittemplate;
 public void sendmsg {
 simpledateformat sdf = new simpledateformat;
 system.out.println));
 rabbittemplate.convertandsend {
 @override
 public message postprocessmessage throws amqpexception {
 message.getmessageproperties.setheader;
 return message;

注意在发送的时分,有必要加上一个header

x-delay

在这儿我设置的推迟时刻是3秒。

音讯顾客

package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.text.simpledateformat;
import java.util.date;
@component
public class messagereceiver {
 @rabbitlistener
 public void receive {
 simpledateformat sdf = new simpledateformat;
 system.out.println));
 system.out.println;

运转spring boot程序和发送音讯

直接在main办法里运转spring boot程序,spring boot会主动解析 messagereceiver 类的。

接下来只需要用junit运转一下发送音讯的接口即可。

package com.mq.rabbitmq;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;
@runwith
@springboottest
public class rabbitmqapplicationtests {
 @autowired
 private messageserviceimpl messageservice;
 @test
 public void send {
 messageservice.sendmsg;





以上便是本文的全部内容,期望对我们的学习有所协助,也期望我们多多支撑萬仟网。

【某某业务】网站建设、网站设计、服务器空间租售、网站维护、网站托管、网站优化、百度推广、自媒体营销、微信公众号
如有意向---联系我们
热门栏目
热门资讯

网站建设 网站托管 成功案例 新闻动态 关于我们 联系我们 服务器空间 加盟合作 网站优化

备案号: 

公司地址:江苏省南京市玄武区玄武湖 咨询QQ:9490489 手机: 电话: