RabbitMQ Client Parameter Performance Testing (1)

I recently rolled RabbitMQ out at my company, replacing the RocketMQ we had from Alibaba (don't say I'm bashing Alibaba's stuff, that thing really is all pitfalls). I'm not going to walk you through installing RabbitMQ; the official site covers it, click here.

The internet says RabbitMQ is extremely fast, yet however I tested it I only got about 15,000 Qps, and that was with the Go client and no processing logic in between at all. Sadder still, the native Java client only managed 11,000 Qps (fine, I admit I'm bashing Java too). That looks mostly acceptable, but I believed there was still plenty of room for optimization, so I ran a series of tests. Code first:

package com.enniu.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by coffee on 15/10/28.
 */
public class Main {
    static final String exchangeName = "testblock";
    static final String routingKey = "testblock";
    static final String queueName = "testblock";
    private static int producterConnection_size = 1; // number of producer connections
    private static int consumerConnection_size = 1; // number of consumer connections
    private static final int consumer_size = 1; // consumers opened on each consumer connection
    private static int qos = 1; // Qos (prefetch) setting
    private static long sleep_time = 0; // simulated per-message processing time
    private static boolean autoAck = true; // whether to auto-ack
    private static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        final AtomicLong count = new AtomicLong(10000000000L);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");
        factory.setHost("10.0.88.88");
        factory.setPort(5672);
        // start the monitoring thread: sample the counter once a second
        // and log how far it moved
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                long c = count.get();
                while (c != 0) {
                    try {
                        Thread.sleep(1000);
                        long c1 = count.get();
                        logger.debug("consumed this second: {} Qps", c - c1);
                        c = c1;
                    } catch (Exception e) {
                    }
                }
            }
        });
        t.start();
        // start the producers
        for (int i = 0; i < producterConnection_size; i++) {
            Connection conn1 = factory.newConnection();
            Thread t1 = producter(conn1, count.get());
            t1.start();
        }
        // start the consumers
        for (int i = 0; i < consumerConnection_size; i++) {
            Connection conn1 = factory.newConnection();
            Thread t2 = consumer(conn1, count);
            t2.start();
        }
    }

    public static Thread consumer(final Connection conn, final AtomicLong count) throws Exception {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                logger.debug("start consumer");
                try {
                    // released once the shared message budget reaches zero
                    final CountDownLatch cdl = new CountDownLatch(1);
                    for (int i = 0; i < consumer_size; i++) {
                        final Channel channel = conn.createChannel();
                        // prefetchSize=0 means no byte limit; prefetchCount is the knob under test
                        channel.basicQos(0, qos, false);
                        Consumer consumer = new DefaultConsumer(channel) {
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                if (count.decrementAndGet() == 0) {
                                    channel.basicCancel(consumerTag);
                                    cdl.countDown();
                                    try {
                                        channel.close();
                                    } catch (TimeoutException e) {
                                        e.printStackTrace();
                                    }
                                }
                                try {
                                    // simulate per-message processing time
                                    Thread.sleep(sleep_time);
                                } catch (InterruptedException e) {
                                }
                                if (!autoAck) {
                                    // multiple=true acks every delivery up to and including this tag
                                    getChannel().basicAck(envelope.getDeliveryTag(), true);
                                }
                            }
                        };
                        String consumerTag = channel.basicConsume(queueName, autoAck, "testConsumer" + i, consumer);
                        logger.debug("consumerTag is {}", consumerTag);
                    }
                    cdl.await();
                } catch (Exception e) {
                }
            }
        });
    }

    public static Thread producter(final Connection conn, final long count) throws Exception {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                logger.debug("start send Message");
                try {
                    Channel channel = conn.createChannel();
                    channel.exchangeDeclare(exchangeName, "direct", true);
                    channel.queueDeclare(queueName, true, false, false, null);
                    channel.queueBind(queueName, exchangeName, routingKey);
                    // deliveryMode(2) marks messages persistent
                    BasicProperties properties = new BasicProperties.Builder().deliveryMode(2).build();
                    for (long i = 0; i < count; i++) {
                        byte[] messageBodyBytes = ("{\"merchantsId\":13}").getBytes();
                        channel.basicPublish(exchangeName, routingKey, properties, messageBodyBytes);
                        // logger.debug("add message {}", i);
                    }
                    channel.close();
                    conn.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

A word about my test environment: two VMs running RabbitMQ as a cluster, each with 2 CPU cores, 2 GB of memory, and a 187 GB SATA disk.
The client is my MacBook Pro, attached to the test network through a Thunderbolt-to-Gigabit-Ethernet adapter, but supposedly one of the switches in between is only 100 Mbps (sigh), so throughput has to be judged against a 100 Mbps link.

Test 1:

Scenario:

One producer, no consumers.

Configuration:

private static int producterConnection_size = 1; // number of producer connections
private static int consumerConnection_size = 0; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 1; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

Just look at the chart; it hardly needs explanation from me.

Test 2:

Scenario:

Three producers, no consumers.

Configuration:

private static int producterConnection_size = 3; // number of producer connections
private static int consumerConnection_size = 0; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 1; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

I also noticed that the broker had already kicked in flow control; apparently it decided I was publishing too fast.


So the number of producers has little effect on overall publish throughput, at least with a single client machine. The publish rate also jitters constantly rather than holding inside a band, which should be the flow-control mechanism at work. I don't have a good idea yet for how to tune flow control itself, so pointers from the experts are welcome.
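One client-side knob that might help, for what it's worth, is publisher confirms: instead of publishing in an open loop, the producer periodically waits for the broker to confirm what it has sent, which paces publishing at whatever rate the broker can actually absorb. A minimal sketch, reusing the "testblock" names from the benchmark above (the batch size of 1000 is an arbitrary figure of mine, not something I measured):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ConfirmPublisher {
    // Publish `total` copies of the test payload, pausing every `batch`
    // messages until the broker has confirmed them all.
    static void publishWithConfirms(Connection conn, long total, int batch) throws Exception {
        Channel channel = conn.createChannel();
        channel.confirmSelect(); // switch the channel into confirm mode
        byte[] body = "{\"merchantsId\":13}".getBytes();
        for (long i = 1; i <= total; i++) {
            channel.basicPublish("testblock", "testblock", null, body);
            if (i % batch == 0) {
                // blocks until everything published so far is confirmed;
                // under flow control this is where the back-pressure lands
                channel.waitForConfirms();
            }
        }
        channel.waitForConfirms();
        channel.close();
    }
}

Whether this actually smooths out the jitter I haven't verified; at minimum it turns silent throttling into an explicit wait you can measure.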


Test 3:

Scenario:

No producers, one consumer, Qos set to 0 (i.e. no prefetch limit), auto-ack, return immediately on receipt, no sleep.

Configuration:

private static int producterConnection_size = 0; // number of producer connections
private static int consumerConnection_size = 1; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 0; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

Test 4:

Scenario:

No producers, one consumer, Qos set to 1, auto-ack, return immediately on receipt, no sleep.

Configuration:

private static int producterConnection_size = 0; // number of producer connections
private static int consumerConnection_size = 1; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 1; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

There seems to be no difference here; consumption capacity sits around 10k/s either way, so whether Qos is left unlimited or set to 1 doesn't change the consumption rate. (With auto-ack enabled the broker doesn't apply the prefetch limit at all, which would explain the identical numbers.) Let's crank Qos up and see what happens.

Test 5:

Scenario:

No producers, one consumer, Qos set to 10, auto-ack, return immediately on receipt, no simulated processing time.

Configuration:

private static int producterConnection_size = 0; // number of producer connections
private static int consumerConnection_size = 1; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 10; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

Raising Qos made no difference to consumption efficiency here either, and that actually makes sense: Qos in essence controls how many unprocessed messages the consumer is allowed to buffer.

==On a poor network, though, Qos=1 and Qos=10 behave very differently.==
For example, suppose delivering a message from the broker to the consumer takes 50ms and processing it takes only 5ms. With Qos=1, the next message cannot be dispatched until the current one has been consumed, so each message costs 50*2+5=105ms end to end. With Qos=10, the broker can dispatch the next message before the current one finishes, so there is essentially always a message already at hand and no waiting on the network; with 10 messages buffered (10 x 5ms of work), the 50ms transfer time is exactly offset.
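To put rough numbers on that reasoning, here is the same arithmetic as a tiny model; the 50ms latency and 5ms processing time are the assumed figures from the example above, not measurements:

public class PrefetchModel {
    public static void main(String[] args) {
        double latencyMs = 50; // assumed broker-to-consumer delivery latency
        double processMs = 5;  // assumed per-message processing time
        // Qos=1: each message waits out a full round trip plus its own processing
        double qos1Qps = 1000 / (2 * latencyMs + processMs); // 1000/105, about 9.5 msg/s
        // A prefetch deep enough that buffered work covers the transfer time
        // (50ms / 5ms = 10 messages) keeps the consumer permanently busy
        double qos10Qps = 1000 / processMs; // 1000/5 = 200 msg/s
        System.out.printf("Qos=1: %.1f msg/s, Qos=10: %.0f msg/s%n", qos1Qps, qos10Qps);
    }
}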

The very last test for today: just how much does publishing affect the consumer?

Test 6:

Scenario:

One producer, one consumer, Qos set to 10, auto-ack, return immediately on receipt, no simulated processing time.

Configuration:

private static int producterConnection_size = 1; // number of producer connections
private static int consumerConnection_size = 1; // number of consumer connections
private static final int consumer_size = 1; // consumers opened on each consumer connection
private static int qos = 10; // Qos (prefetch) setting
private static long sleep_time = 0; // simulated per-message processing time
private static boolean autoAck = true; // whether to auto-ack

Results:

Flow control was of course triggered again, as the chart below shows:

The flow-control mechanism itself you can google; I won't describe it here.
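What the Java client does offer is a callback that fires when the broker blocks a connection (for example on a memory or disk alarm), so you can at least watch connection-level flow control happen instead of guessing from the charts. A minimal sketch:

import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class FlowControlWatcher {
    // Log when the broker starts and stops blocking this connection.
    // Note: this only surfaces connection-level blocks from resource alarms;
    // the broker's internal per-channel credit flow is not visible here.
    static void watch(Connection conn) {
        conn.addBlockedListener(new BlockedListener() {
            @Override
            public void handleBlocked(String reason) throws IOException {
                System.out.println("connection blocked by broker: " + reason);
            }

            @Override
            public void handleUnblocked() throws IOException {
                System.out.println("connection unblocked");
            }
        });
    }
}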

So the consumer and the publisher working at the same time do affect each other. How big the effect is and which factors drive it obviously can't be settled by this single test, and it's getting late today, so I'll continue tomorrow.
