第四章 (1-3)RabbitMQ整合Spring AMQP实战

1年前 ⋅ 883 阅读

核心组件

  • RabbitAdmin
  • SpringAMQP声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

RabbitAdmin

  • RabbitAdmin类可以很好地操作RabbitMQ,在Spring中直接进行注入即可
/**
 * Registers a {@link RabbitAdmin} bean built from the given connection factory.
 *
 * @param connectionFactory connection factory handed to the RabbitAdmin constructor
 * @return the RabbitAdmin instance with auto-startup explicitly set to {@code true}
 */
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    admin.setAutoStartup(true);
    return admin;
}
  • 注意:autoStartup需要保持为true(RabbitAdmin的默认值即为true);如果设置为false,RabbitAdmin不会在容器启动时自动声明Spring容器中的Exchange、Queue、Binding
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
  • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
  • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等

代码实现

  • 增加pom依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置相关Bean
package club.yunqiang.rabbitmqspring;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the RabbitMQ {@link ConnectionFactory}
 * and a {@link RabbitAdmin} built on top of it.
 *
 * @author 张云强
 * @date 2019/12/9
 */
@Configuration
@ComponentScan({"club.yunqiang.rabbitmqspring"})
public class RammitMQConfig {

    /**
     * Builds a {@link CachingConnectionFactory} pointing at
     * {@code localhost:5672}, virtual host {@code /}, with the
     * {@code guest}/{@code guest} credentials.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        // Broker location
        factory.setVirtualHost("/");
        factory.setAddresses("localhost:5672");
        // Credentials
        factory.setPassword("guest");
        factory.setUsername("guest");
        return factory;
    }

    /**
     * Registers a {@link RabbitAdmin} bean built from the given connection factory.
     *
     * @param connectionFactory connection factory handed to the RabbitAdmin constructor
     * @return the RabbitAdmin instance with auto-startup explicitly set to {@code true}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }
}
  • 声明交换机、声明队列、建立绑定关系
package club.yunqiang.rabbitmqspring;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Integration test that drives {@link RabbitAdmin} to declare exchanges,
 * queues and bindings, and to purge a queue.
 */
@SpringBootTest
class RabbitmqSpringApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /** Smoke test: passes as long as the Spring context starts. */
    @Test
    void contextLoads() {
    }

    /**
     * Declares one direct, one topic and one fanout exchange, a queue for
     * each, binds every queue to its exchange, and finally purges the topic
     * queue.
     */
    @Test
    public void testAdmin(){
        // Declare the exchanges
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        // Declare the queues
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        // Bind each queue to its exchange using the Binding constructor
        rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", null));
        rabbitAdmin.declareBinding(new Binding("test.topic.queue", Binding.DestinationType.QUEUE, "test.topic", "topic", null));
        rabbitAdmin.declareBinding(new Binding("test.fanout.queue", Binding.DestinationType.QUEUE, "test.fanout", "fanout", null));

        // Alternative style: build the same topic binding with BindingBuilder
        rabbitAdmin.declareBinding(BindingBuilder
                // Queue to bind
                .bind(new Queue("test.topic.queue", false))
                // Exchange to bind it to
                .to(new TopicExchange("test.topic", false, false))
                // Routing key
                .with("topic"));

        // Fanout binding via BindingBuilder (no routing key step).
        // The queue is "test.fanout.queue" and the exchange is "test.fanout",
        // matching the names declared above; the two were previously swapped.
        rabbitAdmin.declareBinding(BindingBuilder
                // Queue to bind
                .bind(new Queue("test.fanout.queue", false))
                // Exchange to bind it to
                .to(new FanoutExchange("test.fanout", false, false)));

        // Purge all messages from the given queue
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }
}

全部评论: 0

    我有话说: