Skip to main content

Performance Tuning

Optimize RocketMQ-Rust for maximum throughput and minimal latency.

Broker Tuning

Thread Pool Configuration

# For high-throughput scenarios
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32

# For low-latency scenarios
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16

Flush Strategy

# Maximum performance (may lose data on failure)
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 0

# Balanced performance
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 4

# Maximum reliability
flushDiskType = SYNC_FLUSH
flushCommitLogLeastPages = 0

Memory Configuration

# Tune dirty-page writeback thresholds (Linux)
# Note: these sysctls do not grow the page cache; they control how much dirty
# data may accumulate before the kernel starts writing it back. Small values
# force earlier, smoother writeback and avoid large flush stalls.
# sysctl -w vm.dirty_bytes=4194304
# sysctl -w vm.dirty_background_bytes=2097152

Producer Tuning

Batch Size

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;

// Increase batch size for higher throughput
// max_message_size raises the client-side cap on a single (possibly batched)
// message payload to 4 MiB. NOTE(review): the broker enforces its own limit —
// confirm maxMessageSize in broker.conf matches or exceeds this value.
let mut producer = DefaultMQProducer::builder()
.producer_group("perf_group")
.name_server_addr("localhost:9876")
.max_message_size(4 * 1024 * 1024) // 4MB
.build();

// Send multiple messages
// Placeholder: build the batch here. NOTE(review): `Message` also needs to be
// imported (see the benchmark example below for the full path), and messages in
// one batch presumably must target the same topic — verify against the API docs.
let messages: Vec<Message> = /* ... */;
producer.send_batch(messages).await?;

Compression

// Enable compression for large messages
// Bodies above this byte threshold (here 4 KiB) are compressed before sending,
// trading CPU for network bandwidth. Small messages are sent uncompressed.
producer.set_compress_msg_body_over_howmuch(4 * 1024);

Connection Pool

// Tune producer-side request and async backpressure controls
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;

// Backpressure bounds the number/volume of outstanding async sends so a slow
// broker cannot cause unbounded client-side memory growth.
let mut producer = DefaultMQProducer::builder()
.producer_group("perf_group")
.name_server_addr("localhost:9876")
// Per-request send timeout — presumably milliseconds; confirm the unit in the API docs.
.send_msg_max_timeout_per_request(5_000)
.enable_backpressure_for_async_mode(true)
// Cap on in-flight async messages (count) before sends are throttled.
.back_pressure_for_async_send_num(10_000)
// Cap on in-flight async payload bytes: 64 * 1024 * 1024 = 64 MiB.
.back_pressure_for_async_send_size(64 * 1024 * 1024)
.build();

Consumer Tuning

Thread Pool

use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;

// For CPU-intensive processing
// Keep the consume pool near core count: extra threads only add context
// switching when each message saturates a core.
// NOTE(review): relies on the `num_cpus` crate being a project dependency.
let mut cpu_consumer = DefaultMQPushConsumer::builder()
.consumer_group("cpu_group")
.name_server_addr("localhost:9876")
.consume_thread_min(num_cpus::get() as u32)
.consume_thread_max((num_cpus::get() as u32) * 2)
.build();

// For I/O-intensive processing
// Oversubscribe (2-4x cores) so threads blocked on I/O don't idle the CPU.
let mut io_consumer = DefaultMQPushConsumer::builder()
.consumer_group("io_group")
.name_server_addr("localhost:9876")
.consume_thread_min((num_cpus::get() as u32) * 2)
.consume_thread_max((num_cpus::get() as u32) * 4)
.build();

Pull Batch Size

// Increase for higher throughput
// Larger pull batches amortize network round trips across more messages.
// An interval of 0 pulls continuously — presumably the unit is milliseconds
// between pulls; confirm against the consumer API docs.
cpu_consumer.set_pull_batch_size(64);
cpu_consumer.set_pull_interval(0);

Process Queue

// Limit memory usage
// Caps on locally cached (pulled-but-unconsumed) messages; pulling pauses when
// a threshold is hit — the topic-wide cap and the per-queue cap apply together.
cpu_consumer.set_pull_threshold_for_topic(10000);
cpu_consumer.set_pull_threshold_for_queue(1000);

Network Tuning

TCP Buffer Sizes

# broker.conf
clientSocketRcvBufSize = 262144
clientSocketSndBufSize = 262144

Kernel Parameters

# Linux kernel optimization
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728
sysctl -w net.ipv4.tcp_rmem="4096 87380 67108864"
sysctl -w net.ipv4.tcp_wmem="4096 65536 67108864"

JVM Tuning (for Java brokers)

# Heap size
JAVA_OPT="${JAVA_OPT} -Xms8g -Xmx8g"

# GC settings
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"

# Metaspace size
JAVA_OPT="${JAVA_OPT} -XX:MetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -XX:MaxMetaspaceSize=256m"

Disk I/O Tuning

Use SSDs

Commit logs on SSDs provide 5-10x better performance:

Sequential Write (HDD):  ~100 MB/s
Sequential Write (SSD): ~500 MB/s

Random Read (HDD): ~1 MB/s
Random Read (SSD): ~200 MB/s

Separate Storage

# Commit log on fast disk
storePathCommitLog = /fast_disk/commitlog

# Consume queue on separate disk
storePathConsumeQueue = /separate_disk/consumequeue

Filesystem

# Use XFS or ext4
mkfs.xfs /dev/sdb
mount -t xfs /dev/sdb /data/rocketmq

# Mount options
mount -t xfs -o noatime,nodiratime /dev/sdb /data/rocketmq

Monitoring

Key Metrics

  • Send TPS: Messages sent per second
  • Consume TPS: Messages consumed per second
  • Send Latency: Time to send message
  • Consume Lag: Messages waiting to be consumed
  • Disk Usage: Commit log and consume queue size
  • CPU Usage: Broker and client CPU utilization
  • Memory Usage: JVM and OS memory usage

Monitoring Tools

use std::time::Instant;

// Measure one send's round-trip latency with a monotonic clock.
// NOTE(review): `producer` and `message` must already be in scope (see the
// producer setup examples above).
let start = Instant::now();
let result = producer.send(message).await?;
let elapsed = start.elapsed();

println!("Send result: {:?}", result);
// as_millis() truncates to whole milliseconds; use as_secs_f64() for sub-ms resolution.
println!("Latency: {} ms", elapsed.as_millis());

Performance Benchmarks

Expected Performance

| Scenario           | Expected Throughput | Expected Latency |
|--------------------|---------------------|------------------|
| Single Producer    | 50K-100K msg/s      | < 5ms            |
| Multiple Producers | 500K+ msg/s         | < 10ms           |
| Single Consumer    | 50K-100K msg/s      | < 10ms           |
| Multiple Consumers | 500K+ msg/s         | < 20ms           |

Benchmark Script

use std::time::Instant;
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;

/// Benchmarks sequential send throughput for `producer`.
///
/// Sends `num_messages` messages to `BenchmarkTopic` one at a time, then
/// prints the total wall time, messages per second (TPS), and the mean
/// per-message latency in milliseconds.
///
/// # Errors
///
/// Propagates any error from building or sending a message.
async fn benchmark_producer(
    producer: &mut DefaultMQProducer,
    num_messages: usize,
) -> rocketmq_error::RocketMQResult<()> {
    let started_at = Instant::now();

    for seq in 0..num_messages {
        // Each body embeds its sequence number so payloads are distinguishable.
        let msg = Message::builder()
            .topic("BenchmarkTopic")
            .body(format!("Message {}", seq))
            .build()?;
        producer.send(msg).await?;
    }

    let elapsed = started_at.elapsed();
    // Throughput and mean latency over the whole run (build + send time included).
    let tps = num_messages as f64 / elapsed.as_secs_f64();
    let avg_latency = elapsed.as_millis() as f64 / num_messages as f64;

    println!("Sent {} messages in {:.2}s", num_messages, elapsed.as_secs_f64());
    println!("TPS: {:.2}", tps);
    println!("Avg Latency: {:.2} ms", avg_latency);

    Ok(())
}

Best Practices

  1. Use appropriate hardware: SSDs for commit logs
  2. Tune thread pools: Match to your workload
  3. Optimize network settings: Increase buffer sizes
  4. Monitor metrics: Track performance continuously
  5. Use compression: For larger messages
  6. Batch when possible: Reduce round trips
  7. Test before production: Benchmark your workload

Troubleshooting

Low Throughput

  • Check disk I/O performance
  • Increase thread pool sizes
  • Reduce flush frequency
  • Check network bandwidth

High Latency

  • Check CPU usage
  • Reduce flush frequency
  • Optimize message processing
  • Check GC pauses (for Java brokers)

Memory Issues

  • Limit process queue size
  • Increase JVM heap (if applicable)
  • Check for memory leaks
  • Monitor OS page cache

Next Steps