简介
- Return Listener 用于处理一些不可路由的消息!
- 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
- 但是在某些情况下,如果我们在发消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们监听这种不可达的消息,就要用Return Listener!
关键属性
- Mandatory:如果为true,则监听器会接收到路由不可达的消息然后进行后续处理,如果为false,那么broker端自动删除该消息!
Return消息机制流程
代码实现
生产端
package club.yunqiang.rabbitmqapi.returnlistener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Producer side of the RabbitMQ Return Listener demo.
 *
 * <p>Registers a {@link ReturnListener} on the channel and publishes a single message with the
 * {@code mandatory} flag set to {@code true}, using the routing key {@code abc.save}. That key
 * does not match the {@code return.#} binding declared by the {@code Consumer} class of this
 * demo, so the message is expected to come back through the listener.
 *
 * @author 张云强
 * @date 2019/12/7
 */
public class Producer {

    /**
     * Connects to the broker at {@code localhost:5672} (virtual host {@code "/"}), registers the
     * return listener and publishes one mandatory message.
     *
     * @param args command-line arguments; not used
     * @throws IOException      if connecting, creating the channel or publishing fails
     * @throws TimeoutException if the connection attempt times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");

        // NOTE(review): the connection and channel are never closed, exactly as in the original
        // demo. Presumably this keeps the process alive so the asynchronous return can be
        // delivered; confirm before adding cleanup.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_return_exchange";
        // Kept for comparison only: pass this key to basicPublish instead of routingKeyError
        // to publish with a key that matches the Consumer's "return.#" binding.
        String routingKey = "return.save";
        String routingKeyError = "abc.save";

        channel.addReturnListener(new ReturnListener() {
            /**
             * Prints every field of the returned message to standard error.
             * The body is decoded as UTF-8, matching the encoding used when publishing.
             */
            @Override
            public void handleReturn(int replyCode,
                                     String replyText,
                                     String returnedExchange,
                                     String returnedRoutingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {
                System.err.println("----- handle return -----");
                System.err.println("replyCode:" + replyCode);
                System.err.println("replyText:" + replyText);
                System.err.println("exchange:" + returnedExchange);
                System.err.println("routingKey:" + returnedRoutingKey);
                System.err.println("properties:" + properties);
                System.err.println("body:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });

        String msg = "Hello Rabbit MQ Return Message";
        // Third argument is the mandatory flag; the payload is encoded explicitly as UTF-8
        // so the bytes on the wire do not depend on the JVM's default charset.
        channel.basicPublish(
                exchange,
                routingKeyError,
                true,
                null,
                msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
消费端
package club.yunqiang.rabbitmqapi.returnlistener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumer side of the RabbitMQ Return Listener demo.
 *
 * <p>Declares the topic exchange {@code test_return_exchange} and the queue
 * {@code test_return_queue}, binds them with the pattern {@code return.#}, and prints every
 * message delivered to the queue.
 *
 * @author 张云强
 * @date 2019/12/7
 */
public class Consumer {

    /**
     * Connects to the broker at {@code localhost:5672} (virtual host {@code "/"}), sets up the
     * exchange, queue and binding, and starts consuming.
     *
     * @param args command-line arguments; not used
     * @throws IOException      if connecting, declaring, binding or subscribing fails
     * @throws TimeoutException if the connection attempt times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");

        // The connection and channel stay open for the lifetime of the process, as in the
        // original demo; nothing in this class closes them.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";

        // exchangeDeclare(name, type, durable, autoDelete, arguments)
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        // queueDeclare(name, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * Prints the delivered message body to standard output.
             * The body is decoded explicitly as UTF-8 instead of the platform default charset.
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("消费端:" + msg);
            }
        };

        // Second argument is autoAck: deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, consumer);
    }
}
注意:本文归作者所有,未经作者允许,不得转载