0%

springboot系列(012)之接入rabbitmq

【消息队列的基本介绍】 【RabbitMQ的基本介绍】

一、引入包

为了方便通过 HTTP 接口触发消息发送,这里除了 spring-boot-starter-amqp 之外,还额外引入了 spring-boot-starter-web。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- Maven dependencies for the RabbitMQ examples; both starters are pinned to 2.1.13.RELEASE. -->
<dependencies>
<!-- Spring Boot AMQP starter: the example classes import org.springframework.amqp.* types. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.13.RELEASE</version>
</dependency>

<!-- Spring Boot web starter: the producer examples expose HTTP endpoints via @RestController / @PostMapping. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.13.RELEASE</version>
</dependency>
</dependencies>

二、配置rabbitmq访问地址

使用application.yml

1
2
3
4
5
6
# RabbitMQ connection settings. The keys must be nested under spring.rabbitmq
# so that they resolve to the spring.rabbitmq.* property names.
spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: admin
    port: 5672

三、RabbitMQ 的 3 种 Exchange 配置

1、广播模式 fanout

1.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

package net.zuze.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the objects used by the fanout (broadcast) example: one queue,
 * one fanout exchange, and the binding that attaches the queue to the exchange.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Configuration
public class FanoutRabbitConfig {

    /**
     * Defines the queue named {@code fanout_queue}.
     *
     * <p>NOTE(review): the queue is declared durable, exclusive and auto-delete
     * all at once; confirm that this combination is intended.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueue() {
        // Constructor arguments, in order: queue name, durable, exclusive,
        // auto-delete, extra arguments (a Map<String, Object>, none here).
        final String queueName = "fanout_queue";
        final boolean durable = true;
        final boolean exclusive = true;
        final boolean autoDelete = true;
        return new Queue(queueName, durable, exclusive, autoDelete, null);
    }

    /**
     * Defines the fanout exchange named {@code fanout_exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // Constructor arguments, in order: exchange name, durable, auto-delete,
        // extra arguments (a Map<String, Object>, none here).
        final String exchangeName = "fanout_exchange";
        final boolean durable = true;
        final boolean autoDelete = true;
        return new FanoutExchange(exchangeName, durable, autoDelete, null);
    }

    /**
     * Binds {@link #fanoutQueue()} to {@link #fanoutExchange()}. No routing key
     * is supplied for this binding.
     *
     * @return the binding definition
     */
    @Bean
    Binding bindingFanout() {
        Queue queue = fanoutQueue();               // the queue being bound
        FanoutExchange exchange = fanoutExchange(); // the exchange it is bound to
        return BindingBuilder.bind(queue).to(exchange);
    }
}

1.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package net.zuze.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP entry point that publishes a sample message to the fanout exchange.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@RequestMapping("/")
@RestController
public class ProduceController {

    /** Exchange the sample message is published to. */
    private static final String FANOUT_EXCHANGE = "fanout_exchange";

    /** Routing key passed along with the message; empty for this example. */
    private static final String NO_ROUTING_KEY = "";

    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST fanout}: builds a map holding the current timestamp
     * ({@code ts}) and a fixed text ({@code message}) and sends it through
     * {@link RabbitTemplate#convertAndSend(String, String, Object)}.
     */
    @PostMapping("fanout")
    public void fanout() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Fanout模式的消息");

        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, NO_ROUTING_KEY, payload);
    }
}

1.3、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package net.zuze.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumer for the fanout example: listens on {@code fanout_queue} and prints
 * every received map payload to standard output.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Component
@RabbitListener(queues = "fanout_queue")
public class FanoutReceiver {

    /**
     * Handler for messages whose payload is a {@link Map}.
     *
     * @param message the received payload
     */
    @RabbitHandler
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("FanoutReceiver消费者收到的消息:" + body);
    }
}

2、直连模式 direct

2.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package net.zuze.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares the objects used by the direct example: one queue, one direct
 * exchange, and a binding between them keyed by {@code direct_routing}.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Configuration
public class DirectRabbitConfig {

    private static final String QUEUE_NAME = "direct_queue";
    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String ROUTING_KEY = "direct_routing";

    /**
     * Defines the queue named {@code direct_queue}, using the single-argument
     * {@link Queue} constructor.
     *
     * @return the queue definition
     */
    @Bean
    public Queue directQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Defines the direct exchange named {@code direct_exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * Binds {@link #directQueue()} to {@link #directExchange()} with the
     * routing key {@code direct_routing}.
     *
     * @return the binding definition
     */
    @Bean
    Binding bindingDirect() {
        Queue queue = directQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_KEY);
    }
}

2.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package net.zuze.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP entry point that publishes a sample message to the direct exchange.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@RequestMapping("/")
@RestController
public class ProduceController {

    /** Exchange the sample message is published to. */
    private static final String DIRECT_EXCHANGE = "direct_exchange";

    /** Routing key sent with the message. */
    private static final String DIRECT_ROUTING_KEY = "direct_routing";

    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST direct}: builds a map holding the current timestamp
     * ({@code ts}) and a fixed text ({@code message}) and sends it through
     * {@link RabbitTemplate#convertAndSend(String, String, Object)}.
     */
    @PostMapping("direct")
    public void direct() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Direct模式的消息");

        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, payload);
    }
}

2.3、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package net.zuze.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumer for the direct example: prints every map payload received from
 * {@code direct_queue} to standard output.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Component
public class DirectReceiver {

    /**
     * Listener method for the queue {@code direct_queue}.
     *
     * @param message the received payload
     */
    @RabbitListener(queues = "direct_queue")
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("DirectReceiver消费者收到的消息:" + body);
    }
}

3、主题模式 Topic

3.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package net.zuze.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the objects used by the topic example: one queue, one topic
 * exchange, and a binding between them using the pattern {@code *.a.*}.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Configuration
public class TopicRabbitConfig {

    private static final String QUEUE_NAME = "topic_queue";
    private static final String EXCHANGE_NAME = "topic_exchange";

    /**
     * Binding pattern. In a RabbitMQ topic exchange, {@code *} stands for
     * exactly one dot-separated word of the routing key.
     */
    private static final String ROUTING_PATTERN = "*.a.*";

    /**
     * Defines the queue named {@code topic_queue}, using the single-argument
     * {@link Queue} constructor.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Defines the topic exchange named {@code topic_exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds {@link #topicQueue()} to {@link #topicExchange()} with the
     * pattern {@code *.a.*}.
     *
     * @return the binding definition
     */
    @Bean
    Binding bindingTopic() {
        Queue queue = topicQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_PATTERN);
    }
}

3.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package net.zuze.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP entry point that publishes a sample message to the topic exchange.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@RequestMapping("/")
@RestController
public class ProduceController {

    /** Exchange the sample message is published to. */
    private static final String TOPIC_EXCHANGE = "topic_exchange";

    /** Routing key sent with the message. */
    private static final String TOPIC_ROUTING_KEY = "you.a.me";

    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST topic}: builds a map holding the current timestamp
     * ({@code ts}) and a fixed text ({@code message}) and sends it through
     * {@link RabbitTemplate#convertAndSend(String, String, Object)}.
     */
    @PostMapping("topic")
    public void topic() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Topic模式的消息");

        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, TOPIC_ROUTING_KEY, payload);
    }
}

3.3、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package net.zuze.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumer for the topic example: prints every map payload received from
 * {@code topic_queue} to standard output.
 *
 * @author Administrator
 * @date 2022/12/6
 */
@Component
public class TopicReceiver {

    /**
     * Listener method for the queue {@code topic_queue}.
     *
     * @param message the received payload
     */
    @RabbitListener(queues = "topic_queue")
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("TopicReceiver消费者收到的消息:" + body);
    }
}