Common Issues
Solutions to frequently encountered problems when using RocketMQ-Rust.
Connection Issues
Connection Refused
Problem: Connection refused error when connecting to broker.
Solutions:
-
Verify broker is running:
# Check broker status
ps aux | grep rocketmq
# Check if port is listening
netstat -an | grep 10911 -
Verify name server address:
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
let producer = DefaultMQProducer::builder()
.producer_group("check_group")
.name_server_addr("localhost:9876")
.build(); -
Check firewall settings:
# Allow ports 9876 and 10911
sudo ufw allow 9876
sudo ufw allow 10911
Cannot Find Brokers
Problem: Producer or consumer cannot discover brokers.
Solutions:
-
Verify name server connectivity:
# Test name server connection
telnet localhost 9876 -
Check broker registration:
# Use RocketMQ admin tools
sh mqadmin clusterList -n localhost:9876 -
Verify topic exists:
# List topics
sh mqadmin topicList -n localhost:9876
Message Issues
Message Not Received
Problem: Consumer not receiving messages.
Solutions:
-
Check subscription:
// Verify topic and subscription expression match
consumer.subscribe("TopicTest", "*").await?; -
Check consumer group:
// Ensure consumer group is correct
consumer.set_consumer_group("correct_consumer_group"); -
Check message model:
// Verify clustering vs broadcasting
consumer.set_message_model(MessageModel::Clustering); -
Check consumer position:
// May be consuming from last offset
consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
Duplicate Messages
Problem: Receiving duplicate messages.
Explanation: This is expected behavior. RocketMQ guarantees at-least-once delivery.
Solution: Implement idempotency:
use std::collections::HashSet;
struct IdempotentProcessor {
processed: HashSet<String>,
}
impl IdempotentProcessor {
fn process(&mut self, msg_id: &str) -> bool {
if self.processed.contains(msg_id) {
return false; // Already processed
}
self.processed.insert(msg_id.to_string());
true // Process new message
}
}
Performance Issues
Low Throughput
Problem: Send or consume rate is low.
Solutions:
-
Increase batch size:
producer.set_max_message_size(4 * 1024 * 1024);
consumer.set_pull_batch_size(64); -
Use compression:
producer.set_compress_msg_body_over_howmuch(4 * 1024); -
Optimize thread pools:
consumer.set_consume_thread_min(10);
consumer.set_consume_thread_max(20); -
Use async sending:
producer.send_with_callback(message, |result, error| {
if let Some(send_result) = result {
println!("Sent: {:?}", send_result);
}
if let Some(err) = error {
eprintln!("Send failed: {}", err);
}
}).await?;
High Latency
Problem: Messages take too long to be delivered.
Solutions:
-
Check broker flush settings:
# broker.conf
flushDiskType = ASYNC_FLUSH
flushCommitLogLeastPages = 4 -
Reduce message size:
// Compress large messages
producer.set_compress_msg_body_over_howmuch(4 * 1024); -
Check network latency:
# Test network latency
ping broker_hostname -
Monitor system resources:
# Check CPU usage
top
# Check disk I/O
iostat -x 1
Memory Issues
Out of Memory
Problem: Consumer crashes with out of memory error.
Solutions:
-
Limit process queue size:
consumer.set_pull_threshold_for_topic(10000);
consumer.set_pull_threshold_for_queue(1000); -
Reduce pull batch size:
consumer.set_pull_batch_size(32); -
Process messages faster:
// Optimize message processing logic
// Use async processing
Memory Leak
Problem: Memory usage keeps increasing.
Solutions:
-
Check for unbounded collections:
// Use bounded channels
let (tx, rx) = mpsc::channel(1000);
// Periodically clean up processed data
if processed.len() > 10000 {
processed.clear();
} -
Release message references:
// Don't hold onto messages after processing
consumer.register_message_listener_concurrently(|messages, _ctx| {
for msg in messages {
process_message(&msg);
// Don't store msg in long-lived structures
}
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
});use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
use rocketmq_common::common::message::message_ext::MessageExt;
fn process_message(_msg: &&MessageExt) {
// business handling
}
Build Issues
Compilation Errors
Problem: cargo build fails.
Solutions:
-
Check Rust version:
# Requires Rust 1.70+
rustc --version -
Update dependencies:
cargo update -
Clean build:
cargo clean
cargo build
Link Errors
Problem: Link errors on Windows.
Solutions:
-
Install C++ build tools:
# Install Visual Studio Build Tools
# Or use: rustup component add llvm-tools-preview -
Use MSVC toolchain:
rustup default stable-x86_64-pc-windows-msvc
Troubleshooting Checklist
Before asking for help, check:
- Broker is running and accessible
- Name server is running and accessible
- Client configuration is correct
- Topic exists on broker
- Consumer group is correct
- Network connectivity is working
- Firewall rules are configured
- Sufficient disk space
- Sufficient memory
- Latest version of RocketMQ-Rust
Getting Help
If you're still stuck:
- Check GitHub Issues
- Read Architecture Documentation
- Ask on Stack Overflow
- Join the mailing list
Next Steps
- Performance FAQ - Performance-related issues
- Troubleshooting - Advanced debugging
- Broker Configuration - Configuration options