Performance Tuning
Optimize RocketMQ-Rust for maximum throughput and minimal latency.
Broker Tuning
Thread Pool Configuration
# For high-throughput scenarios
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32
# For low-latency scenarios
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
Flush Strategy
# Maximum performance (may lose data on failure)
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 0
# Balanced performance
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 4
# Maximum reliability
flushDiskType = SYNC_FLUSH
flushCommitLogLeastPages = 0
Memory Configuration
# Increase OS page cache (Linux)
# sysctl -w vm.dirty_bytes=4194304
# sysctl -w vm.dirty_background_bytes=2097152
Producer Tuning
Batch Size
// Increase batch size for higher throughput
producer_option.set_max_message_size(4 * 1024 * 1024); // 4MB
// Send multiple messages
let messages: Vec<Message> = /* ... */;
producer.send_batch(messages).await?;
Compression
// Enable compression for large messages
producer_option.set_compress_msg_body_over_threshold(4 * 1024);
Connection Pool
// Increase connection pool size
producer_option.set_client_channel_max_idle_time_seconds(300);
Consumer Tuning
Thread Pool
// For CPU-intensive processing
consumer_option.set_consume_thread_min(num_cpus::get() as i32);
consumer_option.set_consume_thread_max(num_cpus::get() as i32 * 2);
// For I/O-intensive processing
consumer_option.set_consume_thread_min(num_cpus::get() as i32 * 2);
consumer_option.set_consume_thread_max(num_cpus::get() as i32 * 4);
Pull Batch Size
// Increase for higher throughput
consumer_option.set_pull_batch_size(64);
consumer_option.set_pull_interval(0);
Process Queue
// Limit memory usage
consumer_option.set_pull_threshold_for_all(10000);
consumer_option.set_pull_threshold_for_queue(1000);
Network Tuning
TCP Buffer Sizes
# broker.conf
clientSocketRcvBufSize = 262144
clientSocketSndBufSize = 262144
Kernel Parameters
# Linux kernel optimization
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728
sysctl -w net.ipv4.tcp_rmem="4096 87380 67108864"
sysctl -w net.ipv4.tcp_wmem="4096 65536 67108864"
JVM Tuning (for Java brokers)
# Heap size
JAVA_OPT="${JAVA_OPT} -Xms8g -Xmx8g"
# GC settings
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"
# Metaspace size
JAVA_OPT="${JAVA_OPT} -XX:MetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -XX:MaxMetaspaceSize=256m"
Disk I/O Tuning
Use SSDs
Commit logs on SSDs provide 5-10x better performance:
Sequential Write (HDD): ~100 MB/s
Sequential Write (SSD): ~500 MB/s
Random Read (HDD): ~1 MB/s
Random Read (SSD): ~200 MB/s
Separate Storage
# Commit log on fast disk
storePathCommitLog = /fast_disk/commitlog
# Consume queue on separate disk
storePathConsumeQueue = /separate_disk/consumequeue
Filesystem
# Use XFS or ext4
mkfs.xfs /dev/sdb
mount -t xfs /dev/sdb /data/rocketmq
# Mount options
mount -t xfs -o noatime,nodiratime /dev/sdb /data/rocketmq
Monitoring
Key Metrics
- Send TPS: Messages sent per second
- Consume TPS: Messages consumed per second
- Send Latency: Time to send message
- Consume Lag: Messages waiting to be consumed
- Disk Usage: Commit log and consume queue size
- CPU Usage: Broker and client CPU utilization
- Memory Usage: JVM and OS memory usage
Monitoring Tools
// RocketMQ-Rust metrics
use rocketmq::metrics::Metrics;
let metrics = producer.get_metrics();
println!("Send TPS: {}", metrics.send_tps);
println!("Avg Latency: {} ms", metrics.avg_latency);
println!("Success Rate: {:.2}%", metrics.success_rate * 100.0);
Performance Benchmarks
Expected Performance
| Scenario | Expected Throughput | Expected Latency |
|---|---|---|
| Single Producer | 50K-100K msg/s | < 5ms |
| Multiple Producers | 500K+ msg/s | < 10ms |
| Single Consumer | 50K-100K msg/s | < 10ms |
| Multiple Consumers | 500K+ msg/s | < 20ms |
Benchmark Script
use std::time::Instant;
async fn benchmark_producer(producer: &Producer, num_messages: usize) -> Result<(), Error> {
let start = Instant::now();
for i in 0..num_messages {
let body = format!("Message {}", i).into_bytes();
let message = Message::new("BenchmarkTopic".to_string(), body);
producer.send(message).await?;
}
let elapsed = start.elapsed();
let tps = num_messages as f64 / elapsed.as_secs_f64();
let avg_latency = elapsed.as_millis() as f64 / num_messages as f64;
println!("Sent {} messages in {:.2}s", num_messages, elapsed.as_secs_f64());
println!("TPS: {:.2}", tps);
println!("Avg Latency: {:.2} ms", avg_latency);
Ok(())
}
Best Practices
- Use appropriate hardware: SSDs for commit logs
- Tune thread pools: Match to your workload
- Optimize network settings: Increase buffer sizes
- Monitor metrics: Track performance continuously
- Use compression: For larger messages
- Batch when possible: Reduce round trips
- Test before production: Benchmark your workload
Troubleshooting
Low Throughput
- Check disk I/O performance
- Increase thread pool sizes
- Reduce flush frequency
- Check network bandwidth
High Latency
- Check CPU usage
- Reduce flush frequency
- Optimize message processing
- Check GC pauses (for Java brokers)
Memory Issues
- Limit process queue size
- Increase JVM heap (if applicable)
- Check for memory leaks
- Monitor OS page cache
Next Steps
- Broker Configuration - Broker settings
- Client Configuration - Client settings
- FAQ - Common issues