【消息队列的基本介绍】 【RabbitMQ的基本介绍】
一、引入包
除 spring-boot-starter-amqp 外,为了方便通过 HTTP 接口触发消息发送,还引入了 spring-boot-starter-web。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
<!-- Maven dependencies for this demo: the AMQP starter plus the web starter. -->
<dependencies>
    <!-- Spring AMQP / RabbitMQ support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.13.RELEASE</version>
    </dependency>
    <!-- Spring MVC, used by the REST controllers that publish messages -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.13.RELEASE</version>
    </dependency>
</dependencies>
|
二、配置rabbitmq访问地址
使用application.yml
1 2 3 4 5 6
# RabbitMQ connection settings (application.yml)
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
|
三、RabbitMQ 的 3 种 Exchange 的配置
1、广播模式 fanout
1.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package net.zuze.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the fanout topology used by the demo: one queue, one fanout
 * exchange, and the binding that connects them.
 *
 * <p>Both the queue and the exchange are declared durable, non-exclusive and
 * non-auto-delete, which matches the defaults used by the direct and topic
 * configurations. A fixed-name queue must not be exclusive: an exclusive
 * queue is usable only by the connection that declared it, so a second
 * application instance could neither declare nor consume from it, and the
 * {@code durable} flag would have no practical effect.
 */
@Configuration
public class FanoutRabbitConfig {

    private static final String QUEUE_NAME = "fanout_queue";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Declares the queue that receives every message published to the fanout exchange.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named {@code fanout_queue}
     */
    @Bean
    public Queue fanoutQueue() {
        // Arguments: name, durable, exclusive, autoDelete.
        return new Queue(QUEUE_NAME, true, false, false);
    }

    /**
     * Declares the fanout exchange.
     *
     * @return a durable, non-auto-delete exchange named {@code fanout_exchange}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // Arguments: name, durable, autoDelete.
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * Binds {@link #fanoutQueue()} to {@link #fanoutExchange()}; no routing key is specified.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    Binding bindingFanout() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}
|
1.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package net.zuze.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
/**
 * REST controller that publishes a demo message to the fanout exchange.
 */
@RestController
@RequestMapping("/")
public class ProduceController {

    /** Template used to convert and publish messages. */
    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST /fanout}: builds a small map payload (current
     * timestamp plus a fixed text) and sends it to {@code fanout_exchange}
     * with an empty routing key.
     */
    @PostMapping("fanout")
    public void fanout() {
        final Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Fanout模式的消息");

        final String exchange = "fanout_exchange";
        final String routingKey = "";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
|
1.3、消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package net.zuze.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer for {@code fanout_queue}. The listener is declared at class level
 * and dispatches to the {@link RabbitHandler} method below.
 */
@Component
@RabbitListener(queues = "fanout_queue")
public class FanoutReceiver {

    /**
     * Prints the received map payload to standard output.
     *
     * @param message the converted message body
     */
    @RabbitHandler
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("FanoutReceiver消费者收到的消息:" + body);
    }
}
|
2、直连模式 direct
2.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package net.zuze.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the direct topology used by the demo: one queue, one direct
 * exchange, and a binding keyed on {@code direct_routing}.
 */
@Configuration
public class DirectRabbitConfig {

    /**
     * @return the queue named {@code direct_queue}
     */
    @Bean
    public Queue directQueue() {
        return new Queue("direct_queue");
    }

    /**
     * @return the direct exchange named {@code direct_exchange}
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("direct_exchange");
    }

    /**
     * Binds the queue to the exchange with routing key {@code direct_routing}.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    Binding bindingDirect() {
        final Queue queue = directQueue();
        final DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("direct_routing");
    }
}
|
2.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package net.zuze.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
/**
 * REST controller that publishes a demo message to the direct exchange.
 */
@RestController
@RequestMapping("/")
public class ProduceController {

    /** Template used to convert and publish messages. */
    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST /direct}: builds a small map payload (current
     * timestamp plus a fixed text) and sends it to {@code direct_exchange}
     * with routing key {@code direct_routing}.
     */
    @PostMapping("direct")
    public void direct() {
        final Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Direct模式的消息");

        final String exchange = "direct_exchange";
        final String routingKey = "direct_routing";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
|
2.3、消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package net.zuze.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer for {@code direct_queue}.
 */
@Component
public class DirectReceiver {

    /**
     * Prints the received map payload to standard output.
     *
     * @param message the converted message body
     */
    @RabbitListener(queues = "direct_queue")
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("DirectReceiver消费者收到的消息:" + body);
    }
}
|
3、主题模式 Topic
3.1、使用@Configuration注解,定义Exchange和Queue以及它们之间的绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package net.zuze.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the topic topology used by the demo: one queue, one topic
 * exchange, and a binding with the pattern {@code *.a.*}.
 */
@Configuration
public class TopicRabbitConfig {

    /**
     * @return the queue named {@code topic_queue}
     */
    @Bean
    public Queue topicQueue() {
        return new Queue("topic_queue");
    }

    /**
     * @return the topic exchange named {@code topic_exchange}
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topic_exchange");
    }

    /**
     * Binds the queue to the exchange using the routing pattern {@code *.a.*}.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    Binding bindingTopic() {
        final Queue queue = topicQueue();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("*.a.*");
    }
}
|
3.2、生产者 使用RabbitTemplate模板发送消息到RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package net.zuze.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
/**
 * REST controller that publishes a demo message to the topic exchange.
 */
@RestController
@RequestMapping("/")
public class ProduceController {

    /** Template used to convert and publish messages. */
    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code POST /topic}: builds a small map payload (current
     * timestamp plus a fixed text) and sends it to {@code topic_exchange}
     * with routing key {@code you.a.me}.
     */
    @PostMapping("topic")
    public void topic() {
        final Map<String, Object> payload = new HashMap<>();
        payload.put("ts", System.currentTimeMillis());
        payload.put("message", "这是一个Topic模式的消息");

        final String exchange = "topic_exchange";
        final String routingKey = "you.a.me";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
|
3.3、消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package net.zuze.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer for {@code topic_queue}.
 */
@Component
public class TopicReceiver {

    /**
     * Prints the received map payload to standard output.
     *
     * @param message the converted message body
     */
    @RabbitListener(queues = "topic_queue")
    public void process(Map<Object, Object> message) {
        final String body = message.toString();
        System.out.println("TopicReceiver消费者收到的消息:" + body);
    }
}
|