Push Consumer
Push Consumer provides an event-driven approach to message consumption where messages are automatically delivered from the broker.
Creating a Push Consumer
use rocketmq::consumer::PushConsumer;
use rocketmq::conf::ConsumerOption;
let mut consumer_option = ConsumerOption::default();
consumer_option.set_name_server_addr("localhost:9876");
consumer_option.set_group_name("my_consumer_group");
let consumer = PushConsumer::new(consumer_option);
consumer.subscribe("TopicTest", "*").await?;
consumer.start().await?;
Message Listeners
Sequential Message Listener
Messages are processed sequentially within a queue:
use rocketmq::listener::MessageListenerConcurrently;
struct MyListener;
impl MessageListenerConcurrently for MyListener {
fn consume_message(
&self,
messages: Vec<MessageExt>,
) -> ConsumeResult {
for msg in messages {
println!("Processing: {:?}", msg.get_msg_id());
// Process message
}
ConsumeResult::Success
}
}
consumer.register_message_listener(Box::new(MyListener));
Ordered Message Listener
Maintain strict ordering within a queue:
use rocketmq::listener::MessageListenerOrderly;
struct OrderListener;
impl MessageListenerOrderly for OrderListener {
fn consume_message(
&self,
messages: Vec<MessageExt>,
) -> ConsumeResult {
// Messages are processed one by one in order
for msg in messages {
process_in_order(msg);
}
ConsumeResult::Success
}
}
consumer.register_message_listener(Box::new(OrderListener));
Subscription Patterns
Single Topic
consumer.subscribe("TopicTest", "*").await?;
Multiple Topics
consumer.subscribe("TopicA", "*").await?;
consumer.subscribe("TopicB", "tag1 || tag2").await?;
consumer.subscribe("TopicC", "region = 'us-west'").await?;
Tag Filtering
// Subscribe to specific tags
consumer.subscribe("OrderEvents", "order_created || order_paid").await?;
// Subscribe to all tags
consumer.subscribe("OrderEvents", "*").await?;
// Exclude tags
consumer.subscribe("OrderEvents", "!(order_cancelled)").await?;
Concurrency Configuration
// Thread pool configuration
consumer_option.set_consume_thread_min(2);
consumer_option.set_consume_thread_max(20);
// Process queue size
consumer_option.set_process_queue_size(64);
// Pull batch size
consumer_option.set_pull_batch_size(32);
// Pull interval (milliseconds)
consumer_option.set_pull_interval(0);
Advanced Features
Pause and Resume
// Pause consumption
consumer.suspend();
// Resume consumption
consumer.resume();
Message Selectors
// Use message selector to filter at broker side
use rocketmq::filter::MessageSelector;
let selector = MessageSelector::by_sql("amount > 100 AND region = 'us-west'");
consumer.subscribe_with_selector("OrderEvents", selector).await?;
Offset Management
// Set starting position
consumer_option.set_consume_from_where(ConsumeFromWhere::ConsumeFromLastOffset);
// Reset offset to specific timestamp
consumer.seek_by_timestamp("TopicTest", 1699200000000).await?;
// Reset offset to specific offset
consumer.seek_to_offset("TopicTest", 0, 100).await?;
Best Practices
- Use appropriate thread pool size: Match to your message processing complexity
- Handle exceptions: Return appropriate consume results
- Monitor consumer lag: Track how far behind you are
- Implement idempotency: Handle duplicate message processing
- Use appropriate subscription filters: Reduce unnecessary message delivery
Next Steps
- Pull Consumer - Learn about pull consumer
- Message Filtering - Advanced filtering techniques
- Configuration - Consumer configuration options