Producer Overview
Producers are responsible for sending messages to RocketMQ brokers. RocketMQ-Rust provides a powerful, async producer implementation with advanced features.
Creating a Producer
use rocketmq::producer::Producer;
use rocketmq::conf::ProducerOption;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create producer configuration
let mut producer_option = ProducerOption::default();
producer_option.set_name_server_addr("localhost:9876");
producer_option.set_group_name("my_producer_group");
// Create and start producer
let producer = Producer::new(producer_option);
producer.start().await?;
// Use producer...
Ok(())
}
Producer Configuration
Basic Configuration
let mut producer_option = ProducerOption::default();
// Required
producer_option.set_name_server_addr("localhost:9876");
producer_option.set_group_name("producer_group");
// Optional
producer_option.set_send_msg_timeout(3000); // milliseconds
producer_option.set_retry_times_when_send_failed(2);
producer_option.set_max_message_size(4 * 1024 * 1024); // 4MB
producer_option.set_compress_msg_body_over_threshold(4 * 1024); // 4KB
Advanced Configuration
// Enable message compression
producer_option.set_compress_msg_body_over_threshold(4 * 1024);
// Set retry policy
producer_option.set_retry_times_when_send_failed(3);
producer_option.set_retry_next_server(true);
// Configure TCP settings
producer_option.set_tcp_transport_try_lock_timeout(1000);
producer_option.set_tcp_transport_connect_timeout(3000);
Message Sending Modes
Synchronous Send
Blocks until the broker acknowledges:
use rocketmq::model::Message;
let message = Message::new("TopicTest".to_string(), b"Hello".to_vec());
let result = producer.send(message).await?;
println!("Send result: {:?}", result);
Asynchronous Send
Non-blocking, returns immediately:
use rocketmq::model::Message;
let message = Message::new("TopicTest".to_string(), b"Hello".to_vec());
producer.send_async(message, |result| {
match result {
Ok(send_result) => println!("Message sent: {:?}", send_result),
Err(e) => println!("Send failed: {:?}", e),
}
}).await?;
One-way Send
Fire-and-forget, no result:
let message = Message::new("TopicTest".to_string(), b"Hello".to_vec());
producer.send_oneway(message).await?;
// No result, best effort delivery
Queue Selection
Automatic Load Balancing
By default, messages are distributed across queues:
Round-robin queue selection:
Message 1 → Queue 0
Message 2 → Queue 1
Message 3 → Queue 2
Message 4 → Queue 3
Message 5 → Queue 0
...
Custom Queue Selector
Route specific messages to specific queues using a closure:
use rocketmq::common::message::message_queue::MessageQueue;
// Send with selector closure
let order_id = 12345i64;
producer.send_with_selector(
message,
|queues, _msg, order_id: &i64| {
// Route by order_id to maintain ordering
let index = (*order_id % queues.len() as i64) as usize;
queues.get(index).cloned()
},
order_id,
).await?;
You can also create reusable selector functions:
// Define a reusable selector function
fn order_hash_selector(
queues: &[MessageQueue],
_msg: &dyn MessageTrait,
order_id: &i64,
) -> Option<MessageQueue> {
let index = (*order_id % queues.len() as i64) as usize;
queues.get(index).cloned()
}
// Use the selector function
let order_id = 12345i64;
producer.send_with_selector(message, order_hash_selector, order_id).await?;
Error Handling
Automatic Retry
RocketMQ-Rust automatically retries failed sends:
// Configure retry behavior
producer_option.set_retry_times_when_send_failed(3);
producer_option.set_retry_next_server(true);
// Automatic retry on failure
match producer.send(message).await {
Ok(result) => println!("Sent: {:?}", result),
Err(e) => {
// Only returned after all retries exhausted
eprintln!("Failed after retries: {:?}", e);
}
}
Handling Errors
use rocketmq::error::MQClientError;
match producer.send(message).await {
Ok(result) => {
println!("Message sent successfully");
println!("Message ID: {}", result.msg_id);
println!("Queue: {}", result.message_queue);
}
Err(MQClientError::BrokerNotFound) => {
eprintln!("No broker available for topic");
}
Err(MQClientError::ServiceNotAvailable) => {
eprintln!("Broker service not available");
}
Err(e) => {
eprintln!("Send failed: {:?}", e);
}
}
Performance Tuning
Batch Sending
Send multiple messages in one request:
let messages = vec![
Message::new("TopicTest".to_string(), b"Msg1".to_vec()),
Message::new("TopicTest".to_string(), b"Msg2".to_vec()),
Message::new("TopicTest".to_string(), b"Msg3".to_vec()),
];
producer.send_batch(messages).await?;
Message Compression
Automatically compress large messages:
// Compress messages larger than 4KB
producer_option.set_compress_msg_body_over_threshold(4 * 1024);
// Compression is automatic for large messages
let large_body = vec![0u8; 100 * 1024]; // 100KB
let message = Message::new("TopicTest".to_string(), large_body);
producer.send(message).await?; // Compressed automatically
Connection Pooling
// Configure connection pool
producer_option.set_client_channel_max_idle_time_seconds(120);
producer_option.set_client_channel_expire_timeout_seconds(180);
Monitoring
Send Statistics
// Producer tracks statistics automatically
let stats = producer.get_stats()?;
println!("Messages sent: {}", stats.send_success_count);
println!("Messages failed: {}", stats.send_failure_count);
println!("Average RT: {} ms", stats.average_rt);
Metrics
RocketMQ-Rust exposes metrics for monitoring:
rocketmq_producer_send_success_total: Total successful sendsrocketmq_producer_send_failure_total: Total failed sendsrocketmq_producer_send_latency_ms: Send latency in millisecondsrocketmq_producer_queue_size: Current queue size
Best Practices
- Use producer groups: Group related producers
- Handle errors gracefully: Implement proper error handling
- Monitor performance: Track send success rate and latency
- Use appropriate send mode: Choose sync/async/one-way based on needs
- Implement idempotency: Handle potential duplicate sends
- Configure retries: Set appropriate retry count and timeout
- Use batching: Batch messages for higher throughput
- Set appropriate timeouts: Balance between reliability and performance
Next Steps
- Sending Messages - Advanced message sending techniques
- Transaction Messages - Implement transactional messaging
- Configuration - Detailed configuration options