Producer Overview
RocketMQ-Rust producers are built with DefaultMQProducer and the MQProducer trait. They support synchronous, asynchronous, one-way, selector-based, batch, and transactional send patterns.
Creating a Producer
use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_error::RocketMQResult;
#[tokio::main]
async fn main() -> RocketMQResult<()> {
    let mut producer = DefaultMQProducer::builder()
        .producer_group("my_producer_group")
        .name_server_addr("localhost:9876")
        .build();
    producer.start().await?;
    // Use producer...
    producer.shutdown().await;
    Ok(())
}
Producer Configuration
Basic Configuration
let mut producer = DefaultMQProducer::builder()
    .producer_group("producer_group")
    .name_server_addr("localhost:9876")
    .send_msg_timeout(3_000)
    .retry_times_when_send_failed(2)
    .max_message_size(4 * 1024 * 1024)
    .compress_msg_body_over_howmuch(4 * 1024)
    .build();
Advanced Configuration
let mut producer = DefaultMQProducer::builder()
    .producer_group("producer_group")
    .name_server_addr("localhost:9876")
    .retry_times_when_send_failed(3)
    .retry_times_when_send_async_failed(3)
    .retry_another_broker_when_not_store_ok(true)
    .send_msg_max_timeout_per_request(5_000)
    .batch_max_delay_ms(10)
    .batch_max_bytes(512 * 1024)
    .total_batch_max_bytes(4 * 1024 * 1024)
    .enable_backpressure_for_async_mode(true)
    .back_pressure_for_async_send_num(10_000)
    .back_pressure_for_async_send_size(64 * 1024 * 1024)
    .build();
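The backpressure settings above bound asynchronous sends by count and by total bytes. The following is a minimal, std-only sketch of how a gate with those two limits can behave. `BackpressureGate`, `try_acquire`, and `release` are hypothetical names chosen for illustration. They are not part of the RocketMQ-Rust API, and the client's internal mechanism may differ.

```rust
use std::sync::Mutex;

/// Hypothetical illustration of a two-limit backpressure gate.
/// Not the RocketMQ-Rust implementation.
pub struct BackpressureGate {
    max_in_flight: usize,
    max_bytes: usize,
    // (in-flight request count, in-flight bytes)
    state: Mutex<(usize, usize)>,
}

impl BackpressureGate {
    pub fn new(max_in_flight: usize, max_bytes: usize) -> Self {
        Self {
            max_in_flight,
            max_bytes,
            state: Mutex::new((0, 0)),
        }
    }

    /// Reserves one slot and `size` bytes.
    /// Returns false if either limit would be exceeded.
    pub fn try_acquire(&self, size: usize) -> bool {
        let mut state = self.state.lock().unwrap();
        let (count, bytes) = *state;
        if count + 1 > self.max_in_flight || bytes.saturating_add(size) > self.max_bytes {
            return false;
        }
        *state = (count + 1, bytes + size);
        true
    }

    /// Returns a slot and its bytes once the send has completed.
    pub fn release(&self, size: usize) {
        let mut state = self.state.lock().unwrap();
        state.0 = state.0.saturating_sub(1);
        state.1 = state.1.saturating_sub(size);
    }
}
```

A caller would acquire before dispatching an async send and release in the completion callback, so a slow broker throttles the producer instead of growing memory without bound.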
Message Sending Modes
Synchronous Send
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
let message = Message::builder()
    .topic("TopicTest")
    .tags("TagA")
    .body("Hello")
    .build()?;
let result = producer.send(message).await?;
println!("Send result: {:?}", result);
Asynchronous Send
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
let message = Message::builder()
    .topic("TopicTest")
    .body("Hello")
    .build()?;
producer
    .send_with_callback(message, |result, error| {
        if let Some(send_result) = result {
            println!("Message sent: {:?}", send_result);
        }
        if let Some(err) = error {
            eprintln!("Send failed: {}", err);
        }
    })
    .await?;
One-way Send
let message = Message::builder().topic("TopicTest").body("Fire and forget").build()?;
producer.send_oneway(message).await?;
Queue Selection
Use selector functions to route related messages to the same queue.
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
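let message = Message::builder().topic("OrderEvents").body("order-123").build()?;
let order_id = 123_i64;
producer
    .send_with_selector(
        message,
        |queues: &[MessageQueue], _msg: &Message, id: &i64| {
            // Avoid a modulo-by-zero panic when no queues are available.
            if queues.is_empty() {
                return None;
            }
            // rem_euclid keeps the index non-negative even for negative ids.
            let index = id.rem_euclid(queues.len() as i64) as usize;
            queues.get(index).cloned()
        },
        order_id,
    )
    .await?;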
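Business keys are often strings rather than integers. As a sketch of how such a key could be mapped to a queue index inside a selector, the helper below uses a hand-written 64-bit FNV-1a hash so the mapping does not depend on the standard library's hasher. `fnv1a_64` and `queue_index_for_key` are hypothetical helpers, not part of RocketMQ-Rust.

```rust
/// 64-bit FNV-1a hash. Deterministic for a given byte sequence.
pub fn fnv1a_64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for byte in data {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Hypothetical helper: maps a business key to an index in `0..queue_count`.
/// Returns None when there are no queues to choose from.
pub fn queue_index_for_key(key: &str, queue_count: usize) -> Option<usize> {
    if queue_count == 0 {
        return None;
    }
    Some((fnv1a_64(key.as_bytes()) % queue_count as u64) as usize)
}
```

The same key maps to the same index only while the queue count stays the same. If the topic's queue count changes, keys may move to different queues.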
Error Handling
match producer.send(message).await {
    Ok(result) => println!("Sent: {:?}", result),
    Err(e) => {
        // Returned after retries are exhausted.
        eprintln!("Send failed: {}", e);
    }
}
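Once the client's own retries are exhausted, the application decides what happens next. One option is a bounded application-level retry with capped exponential backoff. The sketch below is std-only and synchronous. `backoff_delays` and `retry` are hypothetical helpers, and the `sleep` parameter is injected so the logic can be exercised without waiting. In async code, the same loop would await a timer such as `tokio::time::sleep` instead.

```rust
use std::time::Duration;

/// Hypothetical helper: `base * 2^i` for each attempt, capped at `max`.
pub fn backoff_delays(base: Duration, max: Duration, attempts: u32) -> Vec<Duration> {
    (0..attempts)
        .map(|i| base.saturating_mul(2u32.saturating_pow(i)).min(max))
        .collect()
}

/// Hypothetical helper: runs `op` once, then retries after each delay
/// until it succeeds or the delays run out. Returns the last outcome.
pub fn retry<T, E>(
    delays: &[Duration],
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut last = op();
    for delay in delays {
        if last.is_ok() {
            break;
        }
        sleep(*delay);
        last = op();
    }
    last
}
```

Retrying a send at this level can produce duplicates if an earlier attempt actually reached the broker, so it pairs naturally with idempotent consumers.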
Performance Tuning
Batch Sending
let messages = vec![
    Message::builder().topic("TopicTest").body("Msg1").build()?,
    Message::builder().topic("TopicTest").body("Msg2").build()?,
    Message::builder().topic("TopicTest").body("Msg3").build()?,
];
producer.send_batch(messages).await?;
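A batch that exceeds the configured size limits may need to be split before sending. The sketch below groups raw message bodies into chunks under a byte budget. `split_by_bytes` is a hypothetical helper, and it counts body bytes only. The size the client actually enforces presumably also includes topic, properties, and encoding overhead, so leave headroom when choosing the budget.

```rust
/// Hypothetical helper: groups bodies into batches whose combined body
/// size stays within `max_batch_bytes`. A single body larger than the
/// budget is placed in a batch of its own rather than dropped.
pub fn split_by_bytes(bodies: Vec<Vec<u8>>, max_batch_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0usize;

    for body in bodies {
        let len = body.len();
        // Flush the current batch if adding this body would exceed the budget.
        if !current.is_empty() && current_bytes + len > max_batch_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += len;
        current.push(body);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

Each resulting chunk can then be turned into messages for the same topic and passed to a batch send.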
Compression
let mut producer = DefaultMQProducer::builder()
    .producer_group("producer_group")
    .name_server_addr("localhost:9876")
    .compress_msg_body_over_howmuch(4 * 1024)
    .build();
Monitoring
RocketMQ-Rust does not expose a single get_stats() facade on DefaultMQProducer. In production, use:
- Send results and callback outcomes.
- Structured logs for latency and failures.
- Application metrics (for example, counters and histograms around send* calls).
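As one way to collect application metrics without a built-in stats facade, the sketch below wraps an operation and records success and failure counts plus cumulative latency. `SendMetrics` and its methods are hypothetical names. The wrapped closure here is synchronous for simplicity. For the async send methods, the same timing would be taken around the `.await`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Hypothetical application-side counters around send calls.
#[derive(Default)]
pub struct SendMetrics {
    success: AtomicU64,
    failure: AtomicU64,
    total_latency_micros: AtomicU64,
}

impl SendMetrics {
    /// Runs `op`, records its latency and outcome, and returns the result unchanged.
    pub fn observe<T, E>(&self, op: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
        let started = Instant::now();
        let outcome = op();
        let micros = started.elapsed().as_micros() as u64;
        self.total_latency_micros.fetch_add(micros, Ordering::Relaxed);
        match &outcome {
            Ok(_) => self.success.fetch_add(1, Ordering::Relaxed),
            Err(_) => self.failure.fetch_add(1, Ordering::Relaxed),
        };
        outcome
    }

    /// Returns (success count, failure count).
    pub fn counts(&self) -> (u64, u64) {
        (
            self.success.load(Ordering::Relaxed),
            self.failure.load(Ordering::Relaxed),
        )
    }

    /// Returns the cumulative latency of all observed calls, in microseconds.
    pub fn total_latency_micros(&self) -> u64 {
        self.total_latency_micros.load(Ordering::Relaxed)
    }
}
```

These counters can be exported periodically to whichever metrics system the service already uses.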
Best Practices
- Use dedicated producer groups per service boundary.
- Set retry and timeout values per business SLA.
- Use callback-based sending for high-throughput paths.
- Keep message payloads small and compress large bodies.
- Use queue selectors for ordered business keys.
- Add idempotency on the consumer side for at-least-once delivery.
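To illustrate the consumer-side idempotency recommended above, the sketch below tracks recently seen business keys and reports whether a key is new. `Deduplicator` is a hypothetical type, and the choice of key (for example an order ID carried in the message) is an assumption. It keeps state in memory only, so a production system would more likely use a durable store that survives restarts.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical bounded in-memory deduplicator for message keys.
pub struct Deduplicator {
    capacity: usize,
    seen: HashSet<String>,
    order: VecDeque<String>, // insertion order, oldest first
}

impl Deduplicator {
    /// Remembers at most `capacity` keys (at least one).
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1),
            seen: HashSet::new(),
            order: VecDeque::new(),
        }
    }

    /// Returns true the first time a key is seen and false for duplicates
    /// that are still remembered. Evicts the oldest key when full.
    pub fn first_seen(&mut self, key: &str) -> bool {
        if self.seen.contains(key) {
            return false;
        }
        if self.order.len() >= self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.seen.insert(key.to_owned());
        self.order.push_back(key.to_owned());
        true
    }
}
```

A consumer would call `first_seen` before applying a message's side effects and skip the message when it returns false. Because old keys are evicted, duplicates that arrive after eviction are not detected, so the capacity should cover the expected redelivery window.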
Next Steps
- Sending Messages - Advanced message sending techniques
- Transaction Messages - Implement transactional messaging
- Client Configuration - Detailed configuration options