核心组件
- RabbitAdmin
- SpringAMQP声明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
RabbitAdmin
- RabbitAdmin类可以很好地操作RabbitMQ,在Spring中直接进行注入即可
/**
 * Exposes a {@link RabbitAdmin} bean built on the supplied connection factory.
 *
 * @param connectionFactory connection factory the admin uses to reach the broker
 * @return the admin instance with auto-startup explicitly switched on
 */
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    admin.setAutoStartup(true);
    return admin;
}
- 注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
- RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
- 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
- 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等
代码实现
- 增加pom依赖
<!-- RabbitMQ Java client, pinned explicitly to version 5.7.1. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<!-- Spring Boot AMQP starter. No version is given here; presumably it is
     managed by the Spring Boot parent/BOM of the project (TODO confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置相关Bean
package club.yunqiang.rabbitmqspring;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that wires the RabbitMQ infrastructure beans:
 * a caching {@link ConnectionFactory} and a {@link RabbitAdmin} built on top of it.
 *
 * @author 张云强
 * @date 2019/12/9
 */
@Configuration
@ComponentScan("club.yunqiang.rabbitmqspring")
public class RammitMQConfig {

    /**
     * Builds the connection factory pointing at the broker on
     * {@code localhost:5672}, using the {@code guest} account and the
     * {@code /} virtual host.
     *
     * @return the configured caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        // Broker location
        factory.setAddresses("localhost:5672");
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /**
     * Builds the {@link RabbitAdmin} used to declare exchanges, queues and bindings.
     *
     * @param connectionFactory connection factory the admin uses to reach the broker
     * @return the admin instance with auto-startup explicitly switched on
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }
}
- 声明交换机、声明队列、建立绑定关系
package club.yunqiang.rabbitmqspring;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Integration tests demonstrating how {@link RabbitAdmin} declares exchanges,
 * queues and bindings, and how it purges a queue.
 *
 * <p>NOTE(review): {@link #testAdmin()} talks to a real broker through the
 * injected admin, so it presumably needs a reachable RabbitMQ instance.
 */
@SpringBootTest
class RabbitmqSpringApplicationTests {

    // Exchange and queue names are kept in one place so the declarations and
    // the bindings below cannot drift apart (e.g. by swapping queue/exchange).
    private static final String DIRECT_EXCHANGE = "test.direct";
    private static final String TOPIC_EXCHANGE = "test.topic";
    private static final String FANOUT_EXCHANGE = "test.fanout";

    private static final String DIRECT_QUEUE = "test.direct.queue";
    private static final String TOPIC_QUEUE = "test.topic.queue";
    private static final String FANOUT_QUEUE = "test.fanout.queue";

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /** Verifies that the Spring application context starts. */
    @Test
    void contextLoads() {
    }

    /**
     * Declares one direct, one topic and one fanout exchange, a queue for each,
     * binds every queue to its exchange, and finally purges the topic queue.
     */
    @Test
    public void testAdmin() {
        // Declare the exchanges (durable = false, autoDelete = false)
        rabbitAdmin.declareExchange(new DirectExchange(DIRECT_EXCHANGE, false, false));
        rabbitAdmin.declareExchange(new TopicExchange(TOPIC_EXCHANGE, false, false));
        rabbitAdmin.declareExchange(new FanoutExchange(FANOUT_EXCHANGE, false, false));

        // Declare the queues (durable = false)
        rabbitAdmin.declareQueue(new Queue(DIRECT_QUEUE, false));
        rabbitAdmin.declareQueue(new Queue(TOPIC_QUEUE, false));
        rabbitAdmin.declareQueue(new Queue(FANOUT_QUEUE, false));

        // Bind each queue to its exchange using the explicit Binding constructor:
        // (destination, destination type, exchange, routing key, arguments)
        rabbitAdmin.declareBinding(new Binding(
                DIRECT_QUEUE, Binding.DestinationType.QUEUE, DIRECT_EXCHANGE, "direct", null));
        rabbitAdmin.declareBinding(new Binding(
                TOPIC_QUEUE, Binding.DestinationType.QUEUE, TOPIC_EXCHANGE, "topic", null));
        rabbitAdmin.declareBinding(new Binding(
                FANOUT_QUEUE, Binding.DestinationType.QUEUE, FANOUT_EXCHANGE, "fanout", null));

        // Alternative style: the fluent BindingBuilder API
        rabbitAdmin.declareBinding(BindingBuilder
                // queue to bind
                .bind(new Queue(TOPIC_QUEUE, false))
                // exchange to bind it to
                .to(new TopicExchange(TOPIC_EXCHANGE, false, false))
                // routing key
                .with("topic"));

        // Fanout binding via BindingBuilder: no routing key is supplied.
        // The queue and exchange names must match the ones declared above;
        // the original snippet had them swapped.
        rabbitAdmin.declareBinding(BindingBuilder
                .bind(new Queue(FANOUT_QUEUE, false))
                .to(new FanoutExchange(FANOUT_EXCHANGE, false, false)));

        // Remove all messages from the given queue (noWait = false)
        rabbitAdmin.purgeQueue(TOPIC_QUEUE, false);
    }
}
注意:本文归作者所有,未经作者允许,不得转载