讲交换机之前,先上图,复习一下rabbitmq的架构原理

WechatIMG817.png

如图所示,rabbitmq 发消息,都是先经过交换机来分发消息,再到队列的。所以了解了交换机模式,更有利于在不同场景使用不同的模式。

交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机和队列是通过路由键来绑定的。

所以通过 rabbitmq 发消息,必备的三要素
交换机名称,路由键,消息内容

RabbitMQ有四种交换机类型,分别是

Direct exchange 直连交换机 (完全匹配)

WechatIMG815.png

发送消息使用的routing-key,要与交换机使用的routing-key完全相同。

比如exchange使用的routing-key是"my_routing_key",那发送消息使用的routing_key也要是"my_routing_key"

Fanout exchange 扇形交换机(模糊匹配,可以使用通配符)

WechatIMG814.png

*只能匹配一个词,#可以匹配一个或多个词。

注意是词,不是字符。"my_routing_key.key1",my_routing_key是一个词,key1是一个词,词之间用点号分隔。

比如exchange的routing-key是"my_routing_key.#" ,则发送消息使用的routing-key以"my_routing_key."开头即可

我在示例中用的就是这种。这种方式非常适合把一个消息投递到多个queue(应用)

Topic exchange 主题交换机(广播模式)

WechatIMG816.png

不使用routing-key,一般把routing-key都设置为空串,当然设置为什么字符串都行,反正都不用。

exchange会把消息投递(广播)到此exchange绑定的所有的queue。

这种模式效率很高,因为不进行routing-key的匹配,大大减小了时间开销。

Headers exchange 首部模式

不使用routing-key(路由键),根据header将消息投递到匹配的queue。


Map<String, Object> exchange_headers = new Hashtable<String, Object>();
headers.put("x-match", "any"); //指定键值对匹配模式any、all
headers.put("key1", "value1");  //放入一些键值对
headers.put("key2", "value2");

//绑定queue时指定指定map。至于routing-key,设置为什么串都行,反正不使用
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers);


//发送消息时也要使用map
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("key1", "value1");  //放入一些键值对


Builder properties = new BasicProperties.Builder();
properties.headers(headers);


String msg="hello";
//指定消息要使用的header。要使用properties的形式,不能直接发送map。会放在http请求头中
channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());

x-match指定匹配模式,all:发送消息的header(map)中的键值对要和exchange的header中的所有的键值对都要相同,exchange的header有2个键值对key1、key2,发送消息的header中也要有这2个键值对(需要完全相同)。any:发送消息的header中只要有一个键值对和exchange中的键值对完全相同就行,比如key1、key2都行,只要有一个就行了。

header这种方式不常用,因为有点复杂。都要匹配queue,首部交换机的优势是匹配的规则不被限定为字符串(string)。

名司解释:properties

一个消息由properties(消息的一些属性)、body(消息内容)组成,也就是basicPublish()的后2个参数。

properties可以设置此消息的一些参数,比如延时投递、优先级,这些参数写成键值对放在map中,将map转换为properties,再将properties作为basicPublish()的参数。

demo

生产者:

public class Producer {
    private final static String QUEUE_NAME = "my_queue"; //队列名称
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void send() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机

        //由连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过连接创建信道
        Channel channel = connection.createChannel();

        //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
        //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
        //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //将queue绑定至某个exchange。一个exchange可以绑定多个queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //发送消息
        String msg = "hello";  //消息内容
        String routing_key = "my_routing_key.key1";  //发送消息使用的routing-key
        channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以传递所有类型(转换为byte[]),不局限于字符串
        System.out.println("send message:"+msg);

        //关闭连接
        channel.close();
        connection.close();
    }

}

消费者:


public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //队列名称
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名称
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名称
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void receive() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机

        //由连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过连接创建信道
        Channel channel = connection.createChannel();

        //通过信道声明一个exchange,若已存在则直接使用,不存在会自动创建
        //参数:name、type、是否支持持久化、此交换机没有绑定一个queue时是否自动删除、是否只在rabbitmq内部使用此交换机、此交换机的其它参数(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通过信道声明一个queue,如果此队列已存在,则直接使用;如果不存在,会自动创建
        //参数:name、是否支持持久化、是否是排它的、是否支持自动删除、其他参数(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //将queue绑定至某个exchange。一个exchange可以绑定多个queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);  //处理完了,应答|签收
                //channel.basicReject(envelope.getDeliveryTag(), true); //拒收
            }
        };

        //监听指定的queue。会一直监听。
        //参数:要监听的queue、是否自动确认消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

就算消费者处理消息时宕机,只要不应答,queue中的这条消息就一直存在,消费者再次启动时还会投递此消息。
basicAck()的第一个参数是DeliveryTag,在一个queue中唯一标识一条消息,相当于一条消息的id。

第二个参数是multiple,多个、批处理,是否将多个消息的应答放在一起、一次性发给queue,设置为true可减少网络流量、防止网络阻塞,但是之前消息的应答有时延。

如果处理消息时发生了异常(代码执行出了问题),在catch中拒收就是了:

catch (...){ 
  channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入队
  //.....     //记录日志
}  

第二个参数是requeue,是否重新入队,设置为fasle,不再重新入队,queue会删除此消息;设置为true,重新入队,queue会将此消息重新投递给消费者。

没必要把消息的整个处理流程都放在try中,只把可能出现异常的代码块放在try中即可,在catch中拒收。
这就是rabbitmq提供的可靠投递机制。再加上消息的持久化,做到了rabbitmq的高可靠性。

但重新入队有一个问题:如果大量的消息重新入队,重新投递这些消息会占用资源,使其它消息的投递变慢。

开发过程中可能遇到的问题

1、exchange的name唯一标识一个exchange,调试时可能修改了exchange的类型,如果之前存在一个同名的exchange,会报错。如果之前的同名的exchange不要了,到rabbitmq控制台删除同名的exchange即可;如果之前的同名的exchange还要用,就把现在的exchange改下name。


2、消费者要一直监听queue,所以消费者的channel、connection都没关闭,再次启动时可能连接不上,会报错,因为rabbitmq上还保持着这个连接。等几分钟再运行,等连接超时被删除即可。

3、可以在rabbitmq控制台设置queue的绑定、发送消息到queue

1685101-20200317211627684-150372580.jpg

delivery 投递、交付

persistent 持续的、持久的、坚持不懈的。表示此消息会持久化到硬盘。

payload即消息的body。

点击publish message会将此消息投递到当前queue。

4、rabbitmq控制台的queue:

8181630393806_.pic_hd.jpg

ready是此queue中待投递的消息数,unacked是已投递、但消费者尚未确认的消息数(和快递已签收、未收货差不多),total是消息总数,即前面2个之和。

incoming是一个消息从exchange进入queue花的时间

deliver/get是一个消息从queue投递到消费者花的时间,

ack是一条消息投递给消费者后,过了多长时间queue才收到消费者的应答。

/s表示单位是秒

ack 确认、应答。

消费者收到queue投递的消息,然后处理消息,处理后发送一个数据包给queue作为确认、应答(相当于拿到包裹,试了下没问题,收货),

queue将消息投递给消费者后,queue中仍然保留此消息,要消费者应答后才会删除此消息。


5、消息的有效期

有些消息对即时性要求很高,过了一些时间,如果这条消息还积压在queue中,这条消息可能就没有使用价值了,没必要再投递,需要删除这条消息。

可以设置消息的有效期,如果指定的时间内没有投递此消息,queue会自动删除此消息,不再投递。

文章目录