第3章-深入RabbitMQ高级特性

3-2 消息如何保障 100% 的投递成功方案

image

image

image

image

image

3-4 幂等性概念及业界主流解决方案

image

image

image

3-5 Confirm确认消息详解

image

image

image

3-6 Return返回消息详解

image

image

image

3-7 自定义消费者使用

image

3-8 消费端的限流策略

image

image

image

image

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {


public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_qos_exchange";
String routingKey = "qos.save";

String msg = "Hello RabbitMQ QOS Message";

for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}

}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {


public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();


String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//1 限流方式 第一件事就是 autoAck设置为 false

channel.basicQos(0, 1, false);

channel.basicConsume(queueName, false, new MyConsumer(channel));


}
}

MyConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.bfxy.rabbitmq.api.limit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


private Channel channel ;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));

channel.basicAck(envelope.getDeliveryTag(), false);

}


}

3-10 消费端ACK与重回队列机制

image

image

3-11 TTL消息详解

image

3-12 死信队列详解

image

image

image

image

image

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.bfxy.rabbitmq.api.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {


public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";

String msg = "Hello RabbitMQ DLX Message";

for(int i =0; i<1; i ++){

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}

}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.bfxy.rabbitmq.api.dlx;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {


public static void main(String[] args) throws Exception {


ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);

//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");

channel.basicConsume(queueName, true, new MyConsumer(channel));


}
}

MyConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.bfxy.rabbitmq.api.dlx;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


public MyConsumer(Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}


}


第3章-深入RabbitMQ高级特性
http://yoursite.com/2022/07/11/数据库/RabbitMQ/RabbitMQ消息中间件技术精讲/第3章-深入RabbitMQ高级特性/
作者
mohuani
发布于
2022年7月11日
许可协议