RabbitMQ系列(四)Rabbitmq的四种交接机模式和常见问题
讲交换机之前,先上图,复习一下rabbitmq的架构原理
如图所示,rabbitmq 发消息,都是先经过交换机来分发消息,再到队列的。所以了解了交换机模式,更有利于在不同场景使用不同的模式。
交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机和队列是通过路由键来绑定的。
所以通过 rabbitmq 发消息,必备的三要素
交换机名称,路由键,消息内容
RabbitMQ有四种交换机类型,分别是
Direct exchange 直连交换机 (完全匹配)
发送消息使用的routing-key,要与交换机使用的routing-key完全相同。
比如exchange使用的routing-key是"my_routing_key",那发送消息使用的routing_key也要是"my_routing_key"
Fanout exchange 扇形交换机(模糊匹配,可以使用通配符)
*只能匹配一个词,#可以匹配一个或多个词。
注意是词,不是字符。"my_routing_key.key1",my_routing_key是一个词,key1是一个词,词之间用点号分隔。
比如exchange的routing-key是"my_routing_key.#" ,则发送消息使用的routing-key以"my_routing_key."开头即可
我在示例中用的就是这种。这种方式非常适合把一个消息投递到多个queue(应用)
Topic exchange 主题交换机(广播模式)
不使用routing-key,一般把routing-key都设置为空串,当然设置为什么字符串都行,反正都不用。
exchange会把消息投递(广播)到此exchange绑定的所有的queue。
这种模式效率很高,因为不进行routing-key的匹配,大大减小了时间开销。
Headers exchange 首部模式
不使用routing-key(路由键),根据header将消息投递到匹配的queue。
Map<String, Object> exchange_headers = new Hashtable<String, Object>();
headers.put("x-match", "any"); //指定键值对匹配模式any、all
headers.put("key1", "value1"); //放入一些键值对
headers.put("key2", "value2");
//绑定queue时指定指定map。至于routing-key,设置为什么串都行,反正不使用
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers);
//发送消息时也要使用map
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("key1", "value1"); //放入一些键值对
Builder properties = new BasicProperties.Builder();
properties.headers(headers);
String msg="hello";
//指定消息要使用的header。要使用properties的形式,不能直接发送map。会放在http请求头中
channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());
x-match指定匹配模式,all:发送消息的header(map)中的键值对要和exchange的header中的所有的键值对都要相同,exchange的header有2个键值对key1、key2,发送消息的header中也要有这2个键值对(需要完全相同)。any:发送消息的header中只要有一个键值对和exchange中的键值对完全相同就行,比如key1、key2都行,只要有一个就行了。
header这种方式不常用,因为有点复杂。都要匹配queue,首部交换机的优势是匹配的规则不被限定为字符串(string)。
名司解释:properties
一个消息由properties(消息的一些属性)、body(消息内容)组成,也就是basicPublish()的后2个参数。
properties可以设置此消息的一些参数,比如延时投递、优先级,这些参数写成键值对放在map中,将map转换为properties,再将properties作为basicPublish()的参数。
demo
生产者:
public class Producer {
private final static String QUEUE_NAME = "my_queue"; //队列名称
private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
public static void send() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
connectionFactory.setPort(5672); //使用的端口号
connectionFactory.setVirtualHost("/"); //使用的虚拟主机
//由连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建信道
Channel channel = connection.createChannel();
//通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
//参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
//通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
//参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将queue绑定至某个exchange。一个exchange可以绑定多个queue
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
//发送消息
String msg = "hello"; //消息内容
String routing_key = "my_routing_key.key1"; //发送消息使用的routing-key
channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以传递所有类型(转换为byte[]),不局限于字符串
System.out.println("send message:"+msg);
//关闭连接
channel.close();
connection.close();
}
}
消费者:
public class Consumer {
private final static String QUEUE_NAME = "my_queue"; //队列名称
private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
public static void receive() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
connectionFactory.setPort(5672); //使用的端口号
connectionFactory.setVirtualHost("/"); //使用的虚拟主机
//由连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建信道
Channel channel = connection.createChannel();
//通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
//参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
//通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
//参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将queue绑定至某个exchange。一个exchange可以绑定多个queue
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
//创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
DefaultConsumer consumer = new DefaultConsumer(channel){
//监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
java.lang.String msg = new java.lang.String(body);
System.out.println("received msg: " + msg);
channel.basicAck(envelope.getDeliveryTag(), false); //处理完了,应答|签收
//channel.basicReject(envelope.getDeliveryTag(), true); //拒收
}
};
//监听指定的queue。会一直监听。
//参数:要监听的queue、是否自动确认消息、使用的Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
就算消费者处理消息时宕机,只要不应答,queue中的这条消息就一直存在,消费者再次启动时还会投递此消息。
basicAck()的第一个参数是DeliveryTag,在一个queue中唯一标识一条消息,相当于一条消息的id。
第二个参数是multiple,多个、批处理,是否将多个消息的应答放在一起、一次性发给queue,设置为true可减少网络流量、防止网络阻塞,但是之前消息的应答有时延。
如果处理消息时发生了异常(代码执行出了问题),在catch中拒收就是了:
catch (...){
channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入队
//..... //记录日志
}
第二个参数是requeue,是否重新入队,设置为fasle,不再重新入队,queue会删除此消息;设置为true,重新入队,queue会将此消息重新投递给消费者。
没必要把消息的整个处理流程都放在try中,只把可能出现异常的代码块放在try中即可,在catch中拒收。
这就是rabbitmq提供的可靠投递机制。再加上消息的持久化,做到了rabbitmq的高可靠性。
但重新入队有一个问题:如果大量的消息重新入队,重新投递这些消息会占用资源,使其它消息的投递变慢。
开发过程中可能遇到的问题
1、exchange的name唯一标识一个exchange,调试时可能修改了exchange的类型,如果之前存在一个同名的exchange,会报错。如果之前的同名的exchange不要了,到rabbitmq控制台删除同名的exchange即可;如果之前的同名的exchange还要用,就把现在的exchange改下name。
2、消费者要一直监听queue,所以消费者的channel、connection都没关闭,再次启动时可能连接不上,会报错,因为rabbitmq上还保持着这个连接。等几分钟再运行,等连接超时被删除即可。
3、可以在rabbitmq控制台设置queue的绑定、发送消息到queue
delivery 投递、交付
persistent 持续的、持久的、坚持不懈的。表示此消息会持久化到硬盘。
payload即消息的body。
点击publish message会将此消息投递到当前queue。
4、rabbitmq控制台的queue:
ready是此queue中待投递的消息数,unacked是已投递、但消费者尚未确认的消息数(和快递已签收、未收货差不多),total是消息总数,即前面2个之和。
incoming是一个消息从exchange进入queue花的时间
deliver/get是一个消息从queue投递到消费者花的时间,
ack是一条消息投递给消费者后,过了多长时间queue才收到消费者的应答。
/s表示单位是秒
ack 确认、应答。
消费者收到queue投递的消息,然后处理消息,处理后发送一个数据包给queue作为确认、应答(相当于拿到包裹,试了下没问题,收货),
queue将消息投递给消费者后,queue中仍然保留此消息,要消费者应答后才会删除此消息。
5、消息的有效期
有些消息对即时性要求很高,过了一些时间,如果这条消息还积压在queue中,这条消息可能就没有使用价值了,没必要再投递,需要删除这条消息。
可以设置消息的有效期,如果指定的时间内没有投递此消息,queue会自动删除此消息,不再投递。
- 扫码关注“火龙果编程”公众号,早日成为编程大神
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。