# Performance Tuning

Optimize RocketMQ-Rust for maximum throughput and minimal latency.
## Broker Tuning

### Thread Pool Configuration

```properties
# For high-throughput scenarios
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32

# For low-latency scenarios
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
```
### Flush Strategy

```properties
# Maximum performance (may lose data on failure)
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 0

# Balanced performance
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 4

# Maximum reliability
flushDiskType = SYNC_FLUSH
flushCommitLogLeastPages = 0
```
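The effect of `flushCommitLogLeastPages` can be illustrated with a minimal model (plain Rust, no RocketMQ dependency; this is a sketch of the idea, not the broker's actual flush service): the async flush only writes once at least N dirty pages have accumulated, so a higher threshold means fewer, larger disk writes.

```rust
// Minimal model of the least-pages flush threshold. Each loop iteration
// represents one appended page; a flush fires only once `least_pages`
// dirty pages have accumulated (0 is treated as "flush every cycle").
fn count_flushes(total_pages: usize, least_pages: usize) -> usize {
    let mut dirty = 0;
    let mut flushes = 0;
    for _ in 0..total_pages {
        dirty += 1;
        if dirty >= least_pages.max(1) {
            flushes += 1;
            dirty = 0;
        }
    }
    flushes
}

fn main() {
    let pages = 1024;
    println!("leastPages=0 -> {} flushes", count_flushes(pages, 0));
    println!("leastPages=4 -> {} flushes", count_flushes(pages, 4));
}
```

With a threshold of 4, the model performs a quarter as many flushes for the same workload, which is why the "balanced" profile trades a little persistence latency for much less I/O.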
### Memory Configuration

```bash
# Tune writeback thresholds so dirty pages are flushed to disk in
# smaller, more frequent batches (Linux)
sysctl -w vm.dirty_bytes=4194304
sysctl -w vm.dirty_background_bytes=2097152
```
## Producer Tuning

### Batch Size

```rust
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;

// Increase the message-size cap to allow larger batches
let mut producer = DefaultMQProducer::builder()
    .producer_group("perf_group")
    .name_server_addr("localhost:9876")
    .max_message_size(4 * 1024 * 1024) // 4 MB
    .build();

// Send multiple messages in one round trip
let messages: Vec<Message> = /* ... */;
producer.send_batch(messages).await?;
```
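Why batching raises throughput: one batch pays a single network round trip for N messages, so the per-message overhead shrinks roughly linearly with batch size. A back-of-the-envelope sketch (the round-trip time is an assumed figure for illustration):

```rust
// Amortized per-message share of one broker round trip across a batch.
fn per_message_overhead_us(rtt_us: f64, batch_size: u32) -> f64 {
    rtt_us / batch_size as f64
}

fn main() {
    let rtt_us = 500.0; // assumed 0.5 ms broker round trip
    for batch in [1u32, 16, 64] {
        println!(
            "batch={batch:3} -> {:.1} us/msg of network overhead",
            per_message_overhead_us(rtt_us, batch)
        );
    }
}
```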
### Compression

```rust
// Enable compression for large messages (threshold in bytes)
producer.set_compress_msg_body_over_howmuch(4 * 1024);
```
### Request Timeouts and Backpressure

```rust
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;

// Tune producer-side request and async backpressure controls
let mut producer = DefaultMQProducer::builder()
    .producer_group("perf_group")
    .name_server_addr("localhost:9876")
    .send_msg_max_timeout_per_request(5_000)
    .enable_backpressure_for_async_mode(true)
    .back_pressure_for_async_send_num(10_000)
    .back_pressure_for_async_send_size(64 * 1024 * 1024)
    .build();
```
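Conceptually, the backpressure settings cap how many sends (and how many bytes) may be in flight at once; a caller that exceeds the cap blocks until earlier sends complete. The mechanism can be sketched with a standard-library counting semaphore (no tokio or RocketMQ dependency; the limit and task counts are illustrative, not the client's internals):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// A tiny counting semaphore: `acquire` blocks once `limit` permits are
// taken, mirroring how back_pressure_for_async_send_num caps in-flight
// requests (the byte-based limit works the same way with sizes).
struct Semaphore {
    count: Mutex<usize>,
    cv: Condvar,
    limit: usize,
}

impl Semaphore {
    fn new(limit: usize) -> Self {
        Self { count: Mutex::new(0), cv: Condvar::new(), limit }
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c >= self.limit {
            c = self.cv.wait(c).unwrap();
        }
        *c += 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() -= 1;
        self.cv.notify_one();
    }
}

// Run `tasks` simulated async sends under a cap of `limit` in flight;
// returns the peak number observed in flight at once.
fn run(limit: usize, tasks: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let peak = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let (sem, peak) = (Arc::clone(&sem), Arc::clone(&peak));
            thread::spawn(move || {
                sem.acquire();
                let in_flight = *sem.count.lock().unwrap();
                let mut p = peak.lock().unwrap();
                if in_flight > *p {
                    *p = in_flight;
                }
                drop(p);
                thread::sleep(Duration::from_millis(2)); // simulated send
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let p = *peak.lock().unwrap();
    p
}

fn main() {
    println!("peak in-flight with limit 4: {}", run(4, 32));
}
```

However many threads try to send at once, the in-flight count never exceeds the permit limit, which is exactly the memory-protection behaviour backpressure is meant to provide.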
## Consumer Tuning

### Thread Pool

```rust
use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;

// For CPU-intensive processing
let mut cpu_consumer = DefaultMQPushConsumer::builder()
    .consumer_group("cpu_group")
    .name_server_addr("localhost:9876")
    .consume_thread_min(num_cpus::get() as u32)
    .consume_thread_max((num_cpus::get() as u32) * 2)
    .build();

// For I/O-intensive processing
let mut io_consumer = DefaultMQPushConsumer::builder()
    .consumer_group("io_group")
    .name_server_addr("localhost:9876")
    .consume_thread_min((num_cpus::get() as u32) * 2)
    .consume_thread_max((num_cpus::get() as u32) * 4)
    .build();
```
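If you prefer to avoid the external `num_cpus` crate, the standard library exposes the same information via `std::thread::available_parallelism`. A small sketch deriving the I/O-bound pool sizes from the guideline above:

```rust
use std::thread;

// Derive consumer thread-pool bounds from detected CPU parallelism,
// using the 2x/4x multipliers suggested for I/O-bound workloads.
fn io_pool_bounds() -> (u32, u32) {
    let cores = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1) as u32;
    (cores * 2, cores * 4)
}

fn main() {
    let (min, max) = io_pool_bounds();
    println!("consume_thread_min={min}, consume_thread_max={max}");
}
```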
### Pull Batch Size

```rust
// Increase for higher throughput
cpu_consumer.set_pull_batch_size(64);
cpu_consumer.set_pull_interval(0);
```
### Process Queue

```rust
// Limit memory usage
cpu_consumer.set_pull_threshold_for_topic(10000);
cpu_consumer.set_pull_threshold_for_queue(1000);
```
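The per-queue threshold bounds consumer memory roughly as queues × threshold × average message size. A quick worst-case estimate (the queue count and 2 KiB average message size are assumed figures for illustration):

```rust
// Rough upper bound on memory held by cached, not-yet-consumed messages.
fn max_buffered_bytes(queues: u64, threshold_per_queue: u64, avg_msg_bytes: u64) -> u64 {
    queues * threshold_per_queue * avg_msg_bytes
}

fn main() {
    // 8 queues, 1000-message cap per queue, assumed 2 KiB average message
    let bytes = max_buffered_bytes(8, 1000, 2 * 1024);
    println!("worst-case buffered messages: ~{} MiB", bytes / (1024 * 1024));
}
```

If this bound is larger than the memory you can spare, lower the thresholds before raising thread counts.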
## Network Tuning

### TCP Buffer Sizes

```properties
# broker.conf
clientSocketRcvBufSize = 262144
clientSocketSndBufSize = 262144
```
### Kernel Parameters

```bash
# Linux kernel optimization
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728
sysctl -w net.ipv4.tcp_rmem="4096 87380 67108864"
sysctl -w net.ipv4.tcp_wmem="4096 65536 67108864"
```
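`sysctl -w` changes are lost on reboot. To persist them, the same settings can go in a drop-in file (the filename here is an arbitrary choice), applied with `sysctl --system`:

```ini
# /etc/sysctl.d/99-rocketmq.conf
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 87380 67108864
net.ipv4.tcp_wmem = 4096 65536 67108864
```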
## JVM Tuning (for Java brokers)

```bash
# Heap size
JAVA_OPT="${JAVA_OPT} -Xms8g -Xmx8g"

# GC settings
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"

# Metaspace size
JAVA_OPT="${JAVA_OPT} -XX:MetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -XX:MaxMetaspaceSize=256m"
```
## Disk I/O Tuning

### Use SSDs

Commit logs on SSDs provide 5-10x better performance:

- Sequential Write (HDD): ~100 MB/s
- Sequential Write (SSD): ~500 MB/s
- Random Read (HDD): ~1 MB/s
- Random Read (SSD): ~200 MB/s
### Separate Storage

```properties
# Commit log on fast disk
storePathCommitLog = /fast_disk/commitlog

# Consume queue on separate disk
storePathConsumeQueue = /separate_disk/consumequeue
```
### Filesystem

```bash
# Use XFS (or ext4) for the store directories
mkfs.xfs /dev/sdb

# Mount with noatime/nodiratime to avoid access-time writes on every read
mount -t xfs -o noatime,nodiratime /dev/sdb /data/rocketmq
```
## Monitoring

### Key Metrics
- Send TPS: Messages sent per second
- Consume TPS: Messages consumed per second
- Send Latency: Time to send message
- Consume Lag: Messages waiting to be consumed
- Disk Usage: Commit log and consume queue size
- CPU Usage: Broker and client CPU utilization
- Memory Usage: JVM and OS memory usage
### Measuring Send Latency

```rust
use std::time::Instant;

// Time a single send; `producer` and `message` are set up as in the
// earlier examples
let start = Instant::now();
let result = producer.send(message).await?;
let elapsed = start.elapsed();

println!("Send result: {:?}", result);
println!("Latency: {} ms", elapsed.as_millis());
```
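A single timing is noisy; recording many samples and reporting percentiles gives a truer picture of tail latency. A self-contained helper (plain Rust, no client dependency; in real use you would push `start.elapsed()` after each send):

```rust
use std::time::Duration;

// p-th percentile (0.0..=1.0) of latency samples, nearest-rank method.
// Sorts in place; assumes `samples` is non-empty.
fn percentile(samples: &mut Vec<Duration>, p: f64) -> Duration {
    samples.sort();
    let rank = ((p * samples.len() as f64).ceil() as usize).clamp(1, samples.len());
    samples[rank - 1]
}

fn main() {
    // Synthetic samples: 1 ms .. 100 ms
    let mut samples: Vec<Duration> = (1..=100).map(Duration::from_millis).collect();
    println!("p50 = {:?}", percentile(&mut samples, 0.50));
    println!("p99 = {:?}", percentile(&mut samples, 0.99));
}
```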
## Performance Benchmarks

### Expected Performance

| Scenario | Expected Throughput | Expected Latency |
|---|---|---|
| Single Producer | 50K-100K msg/s | < 5 ms |
| Multiple Producers | 500K+ msg/s | < 10 ms |
| Single Consumer | 50K-100K msg/s | < 10 ms |
| Multiple Consumers | 500K+ msg/s | < 20 ms |

These figures are indicative; actual results depend on hardware, network, and flush settings.
### Benchmark Script

```rust
use std::time::Instant;

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;

async fn benchmark_producer(
    producer: &mut DefaultMQProducer,
    num_messages: usize,
) -> rocketmq_error::RocketMQResult<()> {
    let start = Instant::now();

    for i in 0..num_messages {
        let message = Message::builder()
            .topic("BenchmarkTopic")
            .body(format!("Message {}", i))
            .build()?;
        producer.send(message).await?;
    }

    let elapsed = start.elapsed();
    let tps = num_messages as f64 / elapsed.as_secs_f64();
    let avg_latency = elapsed.as_millis() as f64 / num_messages as f64;

    println!("Sent {} messages in {:.2}s", num_messages, elapsed.as_secs_f64());
    println!("TPS: {:.2}", tps);
    println!("Avg Latency: {:.2} ms", avg_latency);
    Ok(())
}
```
## Best Practices
- Use appropriate hardware: SSDs for commit logs
- Tune thread pools: Match to your workload
- Optimize network settings: Increase buffer sizes
- Monitor metrics: Track performance continuously
- Use compression: For larger messages
- Batch when possible: Reduce round trips
- Test before production: Benchmark your workload
## Troubleshooting

### Low Throughput
- Check disk I/O performance
- Increase thread pool sizes
- Reduce flush frequency
- Check network bandwidth
### High Latency
- Check CPU usage
- Reduce flush frequency
- Optimize message processing
- Check GC pauses (for Java brokers)
### Memory Issues
- Limit process queue size
- Increase JVM heap (if applicable)
- Check for memory leaks
- Monitor OS page cache
## Next Steps

- Broker Configuration - broker-side settings
- Client Configuration - producer and consumer settings
- Common Issues - troubleshooting frequent problems