BophGuan 1 년 전
커밋
d5af0bdcdf

+ 76 - 0
springboot-rabbitmq/pom.xml

@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.7.5</version>
+		<relativePath/> <!-- lookup parent from repository -->
+	</parent>
+	<groupId>com.gjz</groupId>
+	<artifactId>springboot-rabbitmq</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<name>springboot-rabbitmq</name>
+	<description>springboot-rabbitmq</description>
+	<properties>
+		<java.version>11</java.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter</artifactId>
+		</dependency>
+
+		<!--RabbitMQ 依赖-->
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-amqp</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>fastjson</artifactId>
+			<version>1.2.47</version>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+		</dependency>
+		<!--swagger-->
+		<dependency>
+			<groupId>io.springfox</groupId>
+			<artifactId>springfox-swagger2</artifactId>
+			<version>2.9.2</version>
+		</dependency>
+		<dependency>
+			<groupId>io.springfox</groupId>
+			<artifactId>springfox-swagger-ui</artifactId>
+			<version>2.9.2</version>
+		</dependency>
+		<!--RabbitMQ 测试依赖-->
+		<dependency>
+			<groupId>org.springframework.amqp</groupId>
+			<artifactId>spring-rabbit-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

+ 13 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/SpringbootRabbitmqApplication.java

@@ -0,0 +1,13 @@
+package com.gjz.rabbitmq;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SpringbootRabbitmqApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(SpringbootRabbitmqApplication.class, args);
+    }
+
+}

+ 75 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/config/ConfirmConfig.java

@@ -0,0 +1,75 @@
+package com.gjz.rabbitmq.config;
+
+import com.rabbitmq.client.AMQP;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author Guan
+ * @date 2023/3/12
+ * discription
+ * 配置类 发布确认(高级)
+ */
+@Configuration
+public class ConfirmConfig {
+    //交换机
+    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
+    //队列
+    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
+    //RoutingKey
+    public static final String CONFIRM_ROUTING_KEY = "key1";
+    //备份交换机
+    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
+    //备份队列
+    public static final String BACK_QUEUE_NAME = "back_queue";
+    //报警队列
+    public static final String WARNING_QUEUE_NAME = "warning_queue";
+
+    //声明交换机
+    @Bean
+    public DirectExchange confirmExchange(){
+        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).
+                withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
+    }
+
+    @Bean
+    public Queue confirmQueue(){
+        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
+    }
+
+    @Bean
+    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
+                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
+        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
+    }
+
+    //备份交换机
+    @Bean
+    public FanoutExchange backupExchange(){
+        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
+    }
+    //备份队列
+    @Bean
+    public Queue backupQueue(){
+        return QueueBuilder.durable(BACK_QUEUE_NAME).build();
+    }
+    //报警队列
+    @Bean
+    public Queue warningQueue(){
+        return new Queue(WARNING_QUEUE_NAME,true);
+    }
+    @Bean
+    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
+                                                    @Qualifier("backupExchange") FanoutExchange backupExchange){
+        return BindingBuilder.bind(backupQueue).to(backupExchange);
+    }
+    @Bean
+    public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
+                                                    @Qualifier("backupExchange") FanoutExchange backupExchange){
+        return BindingBuilder.bind(warningQueue).to(backupExchange);
+    }
+
+}

+ 56 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/config/DelayedQueueConfig.java

@@ -0,0 +1,56 @@
+package com.gjz.rabbitmq.config;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.CustomExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Guan
+ * @date 2023/3/12
+ * discription
+ */
+@Configuration
+public class DelayedQueueConfig {
+    //交换机
+    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
+    //队列
+    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
+    //routingKey
+    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
+
+    @Bean
+    public Queue delayedQueue(){
+        return new Queue(DELAYED_QUEUE_NAME);
+    }
+    //声明交换机
+    @Bean
+    public CustomExchange delayedExchange(){
+        Map<String,Object> arguments = new HashMap<>();
+        arguments.put("x-delayed-type", "direct");
+        /**
+         * 1.交换机的名称
+         * 2.交换机的类型
+         * 3.是否需要持久化
+         * 4.是否需要自动删除
+         * 5.其他的参数
+         * 6.
+         */
+        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
+    }
+
+    //绑定
+    @Bean
+    public Binding delayedQueueBindingDelayedExchange(
+            @Qualifier("delayedQueue") Queue delayedQueue,
+            @Qualifier("delayedExchange") CustomExchange delayedExchange
+        ){
+        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
+    }
+}

+ 57 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/config/MyCallBack.java

@@ -0,0 +1,57 @@
+package com.gjz.rabbitmq.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.ReturnedMessage;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author Guan
+ * @date 2023/3/13
+ * discription
+ */
+@Slf4j
+@Component
+public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
+
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @PostConstruct
+    //注入
+    public void init(){
+        rabbitTemplate.setConfirmCallback(this);
+        rabbitTemplate.setReturnsCallback(this);
+    }
+    /**
+     * 交换机确认回调方法
+     * 1.发消息 交换机接收到了 回调
+     *  1.1 correlationData 保存回调消息的ID及相关信息
+     *  1.2 交换机收到消息 ack=true
+     *  1.3 cause null
+     * 2.发消息 交换机接收失败 回调
+     *  2.1 correlationData 保存回调消息的ID及相关信息
+     *  2.2 交换机收到消息 ack=false
+     *  2.3 cause 失败的原因
+     *
+     */
+    @Override
+    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+        String id = correlationData != null ? correlationData.getId() : "";
+        if (ack){
+            log.info("交换机已经收到了消息ID为:{}的消息",id);
+        }else {
+            log.info("交换机还未收到ID为:{}的消息,由于:{}",id,cause);
+        }
+    }
+
+    @Override
+    public void returnedMessage(ReturnedMessage returnedMessage) {
+        log.error("消息{},被交换机{}退回,退回的原因:{},路由Key:{}",new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
+    }
+}

+ 31 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/config/SwaggerConfig.java

@@ -0,0 +1,31 @@
+//package com.gjz.rabbitmq.config;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import springfox.documentation.builders.ApiInfoBuilder;
+//import springfox.documentation.service.ApiInfo;
+//import springfox.documentation.service.Contact;
+//import springfox.documentation.spi.DocumentationType;
+//import springfox.documentation.spring.web.plugins.Docket;
+//import springfox.documentation.swagger2.annotations.EnableSwagger2;
+//
+//@Configuration
+//@EnableSwagger2
+//public class SwaggerConfig {
+//    @Bean
+//    public Docket webApiConfig(){
+//        return new Docket(DocumentationType.SWAGGER_2)
+//                .groupName("webApi")
+//                .apiInfo(webApiInfo())
+//                .select()
+//                .build();
+//    }
+//    private ApiInfo webApiInfo(){
+//        return new ApiInfoBuilder()
+//                .title("rabbitmq 接口文档")
+//                .description("本文档描述了 rabbitmq 微服务接口定义")
+//                .version("1.0")
+//                .contact(new Contact("gjz", "http://gjz.com",
+//                        "123456789@qq.com"))
+//                .build();
+//    }
+//}

+ 113 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/config/TtlQueueConfig.java

@@ -0,0 +1,113 @@
+package com.gjz.rabbitmq.config;
+
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Controller;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Guan
+ * @date 2023/3/9
+ * discription
+ * TTL队列 配置文件类代码
+ */
+@Configuration
+public class TtlQueueConfig {
+
+    //普通交换机的名称
+    public static final String X_EXCHANGE = "X";
+    //死信交换机名称
+    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
+    //普通队列的名称
+    public static final String QUEUE_A = "QA";
+    public static final String QUEUE_B = "QB";
+
+    public static final String QUEUE_C = "QC";
+    //死信队列名称
+    public static final String DEAD_LETTER_QUEUE = "QD";
+
+    //声明X交换机
+    @Bean("xExchange")
+    public DirectExchange xExchange(){
+        return new DirectExchange(X_EXCHANGE);
+    }
+    //声明Y交换机
+    @Bean("yExchange")
+    public DirectExchange yExchange(){
+        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
+    }
+
+    //声明普通队列 TTL为10s
+    @Bean("queueA")
+    public Queue queueA(){
+        Map<String,Object> arguments = new HashMap<>(3);
+        //设置死信交换机
+        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
+        //设置死信RoutingKey
+        arguments.put("x-dead-letter-routing-key", "YD");
+        //设置TTL 单位ms
+        arguments.put("x-message-ttl", 10*1000);
+        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
+    }
+
+    //声明普通队列 TTL为10s
+    @Bean("queueB")
+    public Queue queueB(){
+        Map<String,Object> arguments = new HashMap<>(3);
+        //设置死信交换机
+        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
+        //设置死信RoutingKey
+        arguments.put("x-dead-letter-routing-key", "YD");
+        //设置TTL 单位ms
+        arguments.put("x-message-ttl", 40*1000);
+        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
+    }
+
+    //声明QC
+    @Bean("queueC")
+    public Queue queueC(){
+        Map<String,Object> arguments = new HashMap<>(3);
+        //设置死信交换机
+        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
+        //设置死信RoutingKey
+        arguments.put("x-dead-letter-routing-key", "YD");
+        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
+    }
+
+
+    //死信队列
+    @Bean("queueD")
+    public Queue queueD(){
+        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
+    }
+
+    //绑定
+    @Bean
+    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
+                                  @Qualifier("xExchange") DirectExchange xExchange){
+        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
+
+    }
+    @Bean
+    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
+                                  @Qualifier("xExchange") DirectExchange xExchange){
+        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
+
+    }
+    @Bean
+    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
+                                  @Qualifier("xExchange") DirectExchange xExchange){
+        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
+    }
+    @Bean
+    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
+                                  @Qualifier("yExchange") DirectExchange yExchange){
+        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
+
+    }
+
+}

+ 22 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/consumer/Consumer.java

@@ -0,0 +1,22 @@
+package com.gjz.rabbitmq.consumer;
+
+import com.gjz.rabbitmq.config.ConfirmConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Guan
+ * @date 2023/3/13
+ * discription
+ */
+@Slf4j
+@Component
+public class Consumer {
+    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
+    public void receiveConfirmMessage(Message message){
+        String msg = new String(message.getBody());
+        log.info("接收到的队列confirm.queue消息:{}",msg);
+    }
+}

+ 29 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/consumer/DeadLetterQueueConsumer.java

@@ -0,0 +1,29 @@
+package com.gjz.rabbitmq.consumer;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * @author Guan
+ * @date 2023/3/10
+ * discription
+ * 队列TTL 消费者
+ */
+@Slf4j
+@Component
+public class DeadLetterQueueConsumer {
+
+    //接收消息
+    @RabbitListener(queues = "QD")
+    public void receiveD(Message message, Channel channel) throws Exception{
+        String msg = new String(message.getBody());
+        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
+
+    }
+
+}

+ 26 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/consumer/DelayQueueConsumer.java

@@ -0,0 +1,26 @@
+package com.gjz.rabbitmq.consumer;
+
+import com.gjz.rabbitmq.config.DelayedQueueConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * @author Guan
+ * @date 2023/3/12
+ * discription
+ * 消费者 基于插件的延迟消息
+ */
+@Slf4j
+@Component
+public class DelayQueueConsumer {
+    //监听消息
+    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
+    public void receiverDelayQueue(Message message){
+        String msg = new String(message.getBody());
+        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
+    }
+}

+ 25 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/consumer/WarningConsumer.java

@@ -0,0 +1,25 @@
+package com.gjz.rabbitmq.consumer;
+
+import com.gjz.rabbitmq.config.ConfirmConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Guan
+ * @date 2023/3/14
+ * discription
+ * 报警消费者
+ */
+@Slf4j
+@Component
+public class WarningConsumer {
+    //接收报警消息
+    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
+    public void receiveWarningMsg(Message message){
+        String msg = new String(message.getBody());
+        log.error("报警发现不可路由消息:{}",msg);
+    }
+
+}

+ 42 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/controller/ProducerController.java

@@ -0,0 +1,42 @@
+package com.gjz.rabbitmq.controller;
+
+import com.gjz.rabbitmq.config.ConfirmConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author Guan
+ * @date 2023/3/13
+ * discription
+ * 测速确认
+ */
+@RestController
+@RequestMapping("/confirm")
+@Slf4j
+public class ProducerController {
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+    //发消息
+    @GetMapping("/sendMessage/{message}")
+    public void sendMessage(@PathVariable String message){
+        CorrelationData correlationData1 = new CorrelationData("1");
+
+        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
+                ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData1);
+        log.info("1发送消息内容:{}",message);
+
+        CorrelationData correlationData2 = new CorrelationData("2");
+
+        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
+                ConfirmConfig.CONFIRM_ROUTING_KEY + "2", message,correlationData2);
+        log.info("2发送消息内容:{}",message);
+
+    }
+
+}

+ 67 - 0
springboot-rabbitmq/src/main/java/com/gjz/rabbitmq/controller/SendMsgController.java

@@ -0,0 +1,67 @@
+package com.gjz.rabbitmq.controller;
+
+import com.gjz.rabbitmq.config.DelayedQueueConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Date;
+
+/**
+ * @author Guan
+ * @date 2023/3/10
+ * discription
+ * 发送延迟消息
+ */
+@Slf4j
+@RestController
+@RequestMapping("/ttl")
+public class SendMsgController {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    //开始发消息
+    @GetMapping("/sendMsg/{message}")
+    public void sendMsg(@PathVariable String message){
+        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
+        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s得队列:" + message);
+        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:" + message);
+
+    }
+
+    //开始发消息 消息+ttl
+    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
+    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime ){
+        log.info("当前时间:{},发送一条时长--{}毫秒TTL消息给队列:{}",new Date().toString(),ttlTime,message);
+        rabbitTemplate.convertAndSend("X", "XC","消息来自自定义ttl的队列:" + message , msg -> {
+            //发送消息的时候 延迟时长
+            msg.getMessageProperties().setExpiration(ttlTime);
+            return msg;
+        });
+
+
+    }
+    //开始发消息 基于插件的
+    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
+    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime ){
+        log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);
+
+        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,
+                message , msg -> {
+            //发送消息的时候 延迟时长
+            msg.getMessageProperties().setDelay(delayTime);
+            return msg;
+        });
+
+    }
+
+
+
+
+}

+ 6 - 0
springboot-rabbitmq/src/main/resources/application.properties

@@ -0,0 +1,6 @@
+spring.rabbitmq.host=192.168.52.121
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=root
+spring.rabbitmq.password=abc_123
+spring.rabbitmq.publisher-confirm-type=correlated
+spring.rabbitmq.publisher-returns=true