博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot + RabbitMQ 使用示例
阅读量:5250 次
发布时间:2019-06-14

本文共 8135 字,大约阅读时间需要 27 分钟。

基础知识

  1. 虚拟主机 (Virtual Host): 每个 virtual host 拥有自己的 exchanges, queues 等 (类似 MySQL 中的库)
  2. 交换器 (Exchange): 生产者产生的消息并不是直接发送给 queue 的,而是要经过 exchange 路由, exchange 类型如下:
    1. fanout: 把所有发送到该 exchange 的消息路由到所有与它绑定的 queue 中
    2. direct: 把消息路由到 binding key 与 routing key 完全匹配的 queue 中
    3. topic: 模糊匹配 (单词间使用 "." 分隔, "*" 匹配一个单词, "#" 匹配零个或多个单词)
    4. headers: 根据发送的消息内容中的 headers 属性进行匹配
  3. 信道 (Channel): 建立在真实的 TCP 连接之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过 channel 完成的

使用示例

RabbitMQ 安装参考:

新建 Spring Boot 项目,添加配置:

# Connection settings for the RabbitMQ broker used by the examples below.
spring:
  rabbitmq:
    host: 192.168.30.101
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

# Log level for loggers under the "com" prefix.
logging:
  level:
    com: INFO

1.

Queue

/**
 * Declares the single queue used by the basic producer/consumer example.
 */
@Configuration
public class RabbitmqConfig {

    /**
     * Declares the queue named {@code hello}.
     *
     * @return the {@code hello} queue definition
     */
    @Bean
    public Queue hello() {
        return new Queue("hello");
    }
}

Producer

/**
 * Producer for the basic example: sends the current timestamp to the injected
 * queue on a one-second cron schedule.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is
 * enabled somewhere in the application (e.g. {@code @EnableScheduling}); that
 * configuration is not visible here - confirm it exists.
 */
@Component
@EnableAsync
public class SenderTask {

    private static final Logger logger = LoggerFactory.getLogger(SenderTask.class);

    /** Payload timestamp layout; built once instead of on every scheduled run. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    /**
     * Formats the current local time and sends it as a text message, using the
     * queue's name as the routing key.
     */
    @Async
    @Scheduled(cron = "0/1 * * * * ? ")
    public void send() {
        String message = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        rabbitTemplate.convertAndSend(queue.getName(), message);
        logger.info(" [x] Sent '{}'", message);
    }
}

Consumer

/**
 * Consumer for the basic example: listens on the {@code hello} queue and logs
 * every text message it receives.
 */
@Component
@RabbitListener(queues = "hello")
public class ReceiverTask {

    private static final Logger LOG = LoggerFactory.getLogger(ReceiverTask.class);

    /**
     * Handles one message taken from the {@code hello} queue.
     *
     * @param in the message payload
     */
    @RabbitHandler
    public void receive(String in) {
        final String line = " [x] Received '" + in + "'";
        LOG.info(line);
    }
}

2.

Exchange, Queue, Binding

@Configurationpublic class RabbitmqConfig {    @Bean    public FanoutExchange fanout() {        return new FanoutExchange("fanoutExchangeTest");    }    @Bean    public Queue autoDeleteQueue1() {        return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列    }    @Bean    public Queue autoDeleteQueue2() {        return new AnonymousQueue();    }    @Bean    public Binding binding1(FanoutExchange fanout,                            Queue autoDeleteQueue1) {        return BindingBuilder.bind(autoDeleteQueue1).to(fanout);    }    @Bean    public Binding binding2(FanoutExchange fanout,                            Queue autoDeleteQueue2) {        return BindingBuilder.bind(autoDeleteQueue2).to(fanout);    }}

Producer

/**
 * Producer for the fanout example: publishes the current timestamp to the
 * injected fanout exchange on a one-second cron schedule.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is
 * enabled somewhere in the application (e.g. {@code @EnableScheduling}); that
 * configuration is not visible here - confirm it exists.
 */
@Component
@EnableAsync
public class SenderTask {

    private static final Logger logger = LoggerFactory.getLogger(SenderTask.class);

    /** Payload timestamp layout; built once instead of on every scheduled run. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private FanoutExchange fanoutExchange;

    /**
     * Formats the current local time and publishes it to the fanout exchange
     * with an empty routing key.
     */
    @Async
    @Scheduled(cron = "0/1 * * * * ? ")
    public void send() {
        String message = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
        logger.info(" [x] Sent '{}'", message);
    }
}

Consumer

/**
 * Consumers for the fanout example: one listener per anonymous queue, both
 * delegating to a shared logging routine.
 */
@Component
public class ReceiverTask {

    private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class);

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue1} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue2} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    /**
     * Logs a received message together with the listener that handled it.
     *
     * @param in       the message payload
     * @param receiver the number of the listener that received the message
     */
    public void receive(String in, int receiver) {
        logger.info("instance {} [x] Received '{}'", receiver, in);
    }
}

3.

Exchange, Queue, Binding

@Configurationpublic class RabbitmqConfig {    @Bean    public DirectExchange direct() {        return new DirectExchange("directExchangeTest");    }    @Bean    public Queue autoDeleteQueue1() {        return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列    }    @Bean    public Queue autoDeleteQueue2() {        return new AnonymousQueue();    }    @Bean    public Binding binding1a(DirectExchange direct,                            Queue autoDeleteQueue1) {        return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange");    }    @Bean    public Binding binding1b(DirectExchange direct,                             Queue autoDeleteQueue1) {        return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("green");    }    @Bean    public Binding binding2a(DirectExchange direct,                             Queue autoDeleteQueue2) {        return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("green");    }    @Bean    public Binding binding2b(DirectExchange direct,                             Queue autoDeleteQueue2) {        return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("black");    }}

Producer

/**
 * Producer for the direct-exchange example: once per second it picks one of
 * the known routing keys at random and publishes a timestamped message with it.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is
 * enabled somewhere in the application (e.g. {@code @EnableScheduling}); that
 * configuration is not visible here - confirm it exists.
 */
@Component
@EnableAsync
public class SenderTask {

    private static final Logger logger = LoggerFactory.getLogger(SenderTask.class);

    /** Payload timestamp layout; built once instead of on every scheduled run. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange directExchange;

    /** Routing keys the producer chooses from. */
    private final String[] keys = {"orange", "black", "green"};

    /** Shared generator; avoids allocating a new {@link Random} on every tick. */
    private final Random random = new Random();

    /**
     * Publishes {@code "<timestamp> to: <key>"} to the direct exchange using a
     * randomly chosen routing key.
     */
    @Async
    @Scheduled(cron = "0/1 * * * * ? ")
    public void send() {
        String key = keys[random.nextInt(keys.length)];
        String message = LocalDateTime.now().format(TIMESTAMP_FORMAT) + " to: " + key;
        rabbitTemplate.convertAndSend(directExchange.getName(), key, message);
        logger.info(" [x] Sent '{}'", message);
    }
}

Consumer

/**
 * Consumers for the direct-exchange example: one listener per anonymous
 * queue, both delegating to a shared logging routine.
 */
@Component
public class ReceiverTask {

    private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class);

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue1} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue2} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    /**
     * Logs a received message together with the listener that handled it.
     *
     * @param in       the message payload
     * @param receiver the number of the listener that received the message
     */
    public void receive(String in, int receiver) {
        logger.info("instance {} [x] Received '{}'", receiver, in);
    }
}

4.

Exchange, Queue, Binding

@Configurationpublic class RabbitmqConfig {    @Bean    public TopicExchange topic() {        return new TopicExchange("topicExchangeTest");    }    @Bean    public Queue autoDeleteQueue1() {        return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列    }    @Bean    public Queue autoDeleteQueue2() {        return new AnonymousQueue();    }    @Bean    public Binding binding1a(TopicExchange topic,                            Queue autoDeleteQueue1) {        return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.orange.*");    }    @Bean    public Binding binding1b(TopicExchange topic,                             Queue autoDeleteQueue1) {        return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.*.rabbit");    }    @Bean    public Binding binding2a(TopicExchange topic,                             Queue autoDeleteQueue2) {        return BindingBuilder.bind(autoDeleteQueue2).to(topic).with("lazy.#");    }}

Producer

/**
 * Producer for the topic-exchange example: once per second it picks one of
 * the known dotted routing keys at random and publishes a timestamped message
 * with it.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is
 * enabled somewhere in the application (e.g. {@code @EnableScheduling}); that
 * configuration is not visible here - confirm it exists.
 */
@Component
@EnableAsync
public class SenderTask {

    private static final Logger logger = LoggerFactory.getLogger(SenderTask.class);

    /** Payload timestamp layout; built once instead of on every scheduled run. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    /** Routing keys the producer chooses from. */
    private final String[] keys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    /** Shared generator; avoids allocating a new {@link Random} on every tick. */
    private final Random random = new Random();

    /**
     * Publishes {@code "<timestamp> to: <key>"} to the topic exchange using a
     * randomly chosen routing key.
     */
    @Async
    @Scheduled(cron = "0/1 * * * * ? ")
    public void send() {
        String key = keys[random.nextInt(keys.length)];
        String message = LocalDateTime.now().format(TIMESTAMP_FORMAT) + " to: " + key;
        rabbitTemplate.convertAndSend(topicExchange.getName(), key, message);
        logger.info(" [x] Sent '{}'", message);
    }
}

Consumer

/**
 * Consumers for the topic-exchange example: one listener per anonymous queue,
 * both delegating to a shared logging routine.
 */
@Component
public class ReceiverTask {

    private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class);

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue1} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    /**
     * Listens on the queue whose name is read from the {@code autoDeleteQueue2} bean.
     *
     * @param in the message payload
     */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    /**
     * Logs a received message together with the listener that handled it.
     *
     * @param in       the message payload
     * @param receiver the number of the listener that received the message
     */
    public void receive(String in, int receiver) {
        logger.info("instance {} [x] Received '{}'", receiver, in);
    }
}

完整代码:

转载于:https://www.cnblogs.com/victorbu/p/11181397.html

你可能感兴趣的文章
如莲开发平台(MIS基础框架、Java技术、B/S结构)
查看>>
根据图片url地址获取图片的宽高
查看>>
JIRA地址
查看>>
centos下安装lanmp
查看>>
Bytom资产发行与部署合约教程
查看>>
Python3 yield使用总结
查看>>
hdu 2147 kiki's game
查看>>
java 面试题总结
查看>>
BZOJ3746 : [POI2015]Czarnoksiężnicy okrągłego stołu
查看>>
BZOJ4310 : 跳蚤
查看>>
NSNotificationCenter详解
查看>>
4、Spring Boot 2.x 自动配置原理
查看>>
让Windows Server 2008 + IIS 7+ ASP.NET 支持10万个同时请求
查看>>
Oracle中字符串连接的实现方法
查看>>
Memcache笔记04-Memcached机制深入了解
查看>>
基于c++的日志文件实现
查看>>
JDBC_PreparedStatement用法_占位符_参数处理
查看>>
ASP.NET MVC 3 Beta 发布了
查看>>
1分钟了解协同过滤,pm都懂了
查看>>
SpringMVC-2-(Controller)
查看>>