Consumer Overview
Consumers receive and process messages from RocketMQ brokers. RocketMQ-Rust provides both push and pull consumer implementations.
Consumer Types
Push Consumer
Messages are automatically pushed from the broker to the consumer:
use rocketmq::consumer::PushConsumer;
let consumer = PushConsumer::new(consumer_option);
consumer.subscribe("TopicTest", "*").await?;
consumer.start().await?;
Benefits:
- Event-driven architecture
- Automatic message delivery (broker-driven, implemented internally via long polling)
- Built-in thread pool for concurrent processing
- Automatic offset management
Pull Consumer
Consumer actively pulls messages from the broker:
use rocketmq::consumer::PullConsumer;
let consumer = PullConsumer::new(consumer_option);
consumer.start().await?;
loop {
let messages = consumer.pull("TopicTest", "*", 32).await?;
for msg in messages {
process_message(msg);
}
}
Benefits:
- Full control over message pulling
- Custom batch size
- Explicit control over processing flow
- Suitable for batch processing
Creating a Push Consumer
use rocketmq::consumer::PushConsumer;
use rocketmq::conf::ConsumerOption;
use rocketmq::listener::MessageListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure consumer
let mut consumer_option = ConsumerOption::default();
consumer_option.set_name_server_addr("localhost:9876");
consumer_option.set_group_name("my_consumer_group");
consumer_option.set_consume_thread_min(1);
consumer_option.set_consume_thread_max(10);
// Create consumer
let consumer = PushConsumer::new(consumer_option);
// Subscribe to topic
consumer.subscribe("TopicTest", "*").await?;
// Register message listener
consumer.register_message_listener(Box::new(MyListener));
// Start consumer
consumer.start().await?;
// Keep running
tokio::signal::ctrl_c().await?;
Ok(())
}
Message Listener
Implement the MessageListener trait to handle messages:
use rocketmq::listener::MessageListener;
use rocketmq::error::ConsumeResult;
struct MyListener;
impl MessageListener for MyListener {
fn consume_message(
&self,
messages: Vec<rocketmq::model::MessageExt>,
) -> ConsumeResult {
for msg in messages {
match process_message(&msg) {
Ok(_) => continue,
Err(e) => {
eprintln!("Failed to process message: {:?}", e);
return ConsumeResult::ReconsumeLater;
}
}
}
ConsumeResult::Success
}
}
fn process_message(msg: &MessageExt) -> Result<(), Error> {
println!("Received message: {}", String::from_utf8_lossy(msg.get_body()));
Ok(())
}
Consumer Configuration
Basic Configuration
let mut consumer_option = ConsumerOption::default();
// Required
consumer_option.set_name_server_addr("localhost:9876");
consumer_option.set_group_name("my_consumer_group");
// Thread pool
consumer_option.set_consume_thread_min(2);
consumer_option.set_consume_thread_max(10);
// Message batch size
consumer_option.set_pull_batch_size(32);
consumer_option.set_pull_interval(0);
Advanced Configuration
// Offset management
consumer_option.set_enable_msg_trace(true);
consumer_option.set_consume_from_where(ConsumeFromWhere::ConsumeFromLastOffset);
// Retry settings
consumer_option.set_max_reconsume_times(3);
// Message model
consumer_option.set_message_model(MessageModel::Clustering);
Message Consumption Models
Clustering (Default)
Messages are distributed among consumers in a group:
consumer_option.set_message_model(MessageModel::Clustering);
Each message is consumed by only one consumer.
Broadcasting
Each consumer receives all messages:
consumer_option.set_message_model(MessageModel::Broadcasting);
Every consumer in the group receives all messages.
Message Filtering
Tag-based Filtering
// Subscribe to specific tags
consumer.subscribe("OrderEvents", "order_created || order_paid").await?;
// Subscribe to all tags
consumer.subscribe("OrderEvents", "*").await?;
// Note: tag expressions do not support exclusion syntax. To exclude a
// specific tag, use SQL92 filtering instead, e.g. "TAGS <> 'order_cancelled'"
SQL92 Filtering
// Subscribe using SQL expression
consumer.subscribe(
"OrderEvents",
"region = 'us-west' AND amount > 100"
).await?;
Offset Management
Starting Position
// Start from latest offset
consumer_option.set_consume_from_where(ConsumeFromWhere::ConsumeFromLastOffset);
// Start from earliest offset
consumer_option.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
// Start from specific timestamp
consumer_option.set_consume_from_where(ConsumeFromWhere::ConsumeFromTimestamp);
consumer_option.set_consume_timestamp("20230101000000"); // format: yyyyMMddHHmmss
Manual Offset Commit
// Disable auto commit
consumer_option.set_enable_auto_commit(false);
// Process messages
for msg in messages {
process_message(&msg); // borrow, so msg can still be used below
consumer.commit_sync(msg.get_queue_offset(), msg.get_queue_id())?;
}
Error Handling
Consume Results
impl MessageListener for MyListener {
fn consume_message(&self, messages: Vec<MessageExt>) -> ConsumeResult {
for msg in messages {
match process_message(&msg) {
Ok(_) => continue,
Err(e) => {
eprintln!("Error: {:?}", e);
return ConsumeResult::ReconsumeLater;
}
}
}
ConsumeResult::Success
}
}
Retry Handling
// Configure max retry attempts
consumer_option.set_max_reconsume_times(3);
// Check retry count
impl MessageListener for MyListener {
fn consume_message(&self, messages: Vec<MessageExt>) -> ConsumeResult {
for msg in messages {
let retry_count = msg.get_reconsume_times();
if retry_count >= 3 {
eprintln!("Max retries exceeded for message: {:?}", msg.get_msg_id());
// Send to dead letter queue or log
continue;
}
// Process message
}
ConsumeResult::Success
}
}
Performance Tuning
Thread Pool Configuration
// Minimum threads (always active)
consumer_option.set_consume_thread_min(2);
// Maximum threads (scale up under load)
consumer_option.set_consume_thread_max(20);
// Process queue size
consumer_option.set_pull_threshold_for_all(10000);
Batch Processing
// Increase pull batch size
consumer_option.set_pull_batch_size(64);
// Reduce pull interval
consumer_option.set_pull_interval(0);
Concurrency Control
// Limit messages per queue
consumer_option.set_pull_threshold_for_queue(1000);
// Limit messages per consumer
consumer_option.set_pull_threshold_for_all(10000);
Best Practices
- Use appropriate consumer model: Choose clustering vs broadcasting based on requirements
- Handle errors gracefully: Return appropriate consume results
- Implement idempotency: Handle duplicate message processing
- Configure thread pool: Balance between throughput and resource usage
- Monitor consumer lag: Track message consumption backlog
- Use filtering: Reduce unnecessary message processing
- Set appropriate retry limits: Prevent infinite retry loops
- Implement dead letter queue: Handle messages that fail after max retries
Next Steps
- Push Consumer - Deep dive into push consumer
- Pull Consumer - Deep dive into pull consumer
- Message Filtering - Advanced filtering techniques