Message Model
Understanding RocketMQ's message model is crucial for designing effective messaging applications.
Message Structure
Basic Message
A message in RocketMQ consists of:
use std::collections::HashMap;

pub struct Message {
    // Topic name
    topic: String,
    // Message body (byte array)
    body: Vec<u8>,
    // Optional tags for filtering
    tags: Option<String>,
    // Optional keys for indexing
    keys: Option<String>,
    // Optional properties
    properties: HashMap<String, String>,
}
Message Example
use rocketmq_common::common::message::message_single::Message;

let message = Message::builder()
    .topic("OrderEvents")
    .body_slice(b"{\"order_id\": \"12345\", \"amount\": 99.99}")
    .tags("order_created")
    .key("order_12345")
    .raw_property("region", "us-west")?
    .raw_property("priority", "high")?
    .build()?;
Topics and Queues
Topic
A topic is a logical channel for categorizing messages:
- Hierarchical naming: e.g., orders, payments, logs
- Multi-tenant: Different applications use different topics
- Logical isolation: Messages in different topics are completely separate
Queue
Topics are divided into multiple queues for parallel processing:
Topic: OrderEvents (4 queues)
┌───────────────────────────────────────┐
│ Queue 0 │ Queue 1 │ Queue 2 │ Queue 3 │
├───────────────────────────────────────┤
│ Msg 0 │ Msg 1 │ Msg 2 │ Msg 3 │
│ Msg 4 │ Msg 5 │ Msg 6 │ Msg 7 │
│ Msg 8 │ Msg 9 │ Msg 10 │ Msg 11 │
└───────────────────────────────────────┘
Purpose of Multiple Queues:
- Parallel processing by multiple consumers
- Load distribution
- Improved throughput
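The layout in the diagram corresponds to a plain round-robin assignment, which can be sketched as follows. The functions round_robin_queue and distribute are hypothetical names for illustration; the real producer's queue selection may differ (for example, in where it starts or how it handles unavailable brokers).

```rust
/// Hypothetical helper: picks a queue index in round-robin fashion.
/// `counter` is the number of messages already sent to the topic.
fn round_robin_queue(counter: usize, queue_count: usize) -> Option<usize> {
    if queue_count == 0 {
        None
    } else {
        Some(counter % queue_count)
    }
}

/// Distributes message numbers 0..message_count across `queue_count` queues
/// and returns the message numbers held by each queue.
fn distribute(message_count: usize, queue_count: usize) -> Vec<Vec<usize>> {
    let mut queues = vec![Vec::new(); queue_count];
    for msg in 0..message_count {
        if let Some(q) = round_robin_queue(msg, queue_count) {
            queues[q].push(msg);
        }
    }
    queues
}
```

With 12 messages and 4 queues, distribute(12, 4) places messages 0, 4, 8 in Queue 0, messages 1, 5, 9 in Queue 1, and so on, matching the diagram.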
Message Types
Normal Messages
Regular messages with no ordering, delay, or transactional guarantees:
let message = Message::builder()
    .topic("NormalTopic")
    .body(body)
    .build()?;
producer.send(message).await?;
Ordered Messages
Messages that must be consumed in order. Ordering holds only within a single queue, so the selector routes every message that shares a key (here, the order ID) to the same queue:
let message = Message::builder()
    .topic("OrderEvents")
    .body("ordered payload")
    .build()?;
let order_id = "order_123".to_string();
producer
    .send_with_selector(
        message,
        |queues: &[MessageQueue], _msg: &Message, id: &String| {
            // compute_hash is a user-supplied hash function (not shown here)
            let hash = compute_hash(id);
            let index = (hash % queues.len() as u64) as usize;
            queues.get(index).cloned()
        },
        order_id,
    )
    .await?;
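The selector above calls compute_hash, which the snippet does not define. A minimal sketch of one possible implementation follows, using the FNV-1a hash only because it is short and deterministic; the choice of hash and the helper select_queue_index are assumptions for illustration, not part of the RocketMQ client API.

```rust
/// FNV-1a 64-bit hash of the sharding key.
/// Assumption: any deterministic hash would work here; FNV-1a is used
/// only because it fits in a few lines.
fn compute_hash(key: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    let mut hash = OFFSET_BASIS;
    for byte in key.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Hypothetical helper: maps a key to a queue index.
/// Returns None when the topic has no queues.
fn select_queue_index(key: &str, queue_count: usize) -> Option<usize> {
    if queue_count == 0 {
        return None;
    }
    Some((compute_hash(key) % queue_count as u64) as usize)
}
```

The same key always maps to the same index as long as the number of queues stays the same. If the queue count changes, keys may map to different queues, so ordering across the change is not preserved by this scheme.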
Transactional Messages
Messages whose delivery is tied to the outcome of a local transaction (for example, a database write). The message becomes visible to consumers only if the local transaction commits:
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::producer::transaction_mq_producer::TransactionMQProducer;

// OrderTransactionListener is your own transaction listener implementation;
// `message` is built as in the earlier examples.
let mut transaction_producer = TransactionMQProducer::builder()
    .producer_group("tx_group")
    .name_server_addr("localhost:9876")
    .topics(vec!["OrderEvents"])
    .transaction_listener(OrderTransactionListener::default())
    .build();
let tx_result = transaction_producer
    .send_message_in_transaction(message, Some("order_123".to_string()))
    .await?;
println!("tx_result = {}", tx_result);
Delayed Messages
Messages that are delivered after a specified delay:
let message = Message::builder()
    .topic("DelayedTopic")
    .body(body)
    .delay_level(3) // Delay level 3 (e.g., 10 seconds)
    .build()?;
producer.send(message).await?;
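Delay levels are indexes into a table of durations rather than durations themselves. The sketch below assumes the default delay-level table of the Apache RocketMQ broker (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h); a broker can be configured with a different table, so treat the values as an assumption. The function delay_for_level is a hypothetical helper, not a client API.

```rust
use std::time::Duration;

/// Assumed default delay table, in seconds. Level 1 is the first entry.
const DEFAULT_DELAY_LEVELS_SECS: [u64; 18] = [
    1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200,
];

/// Hypothetical helper: looks up the delay for a 1-based level.
/// Returns None for level 0 and for levels beyond the table.
fn delay_for_level(level: usize) -> Option<Duration> {
    if level == 0 {
        return None;
    }
    DEFAULT_DELAY_LEVELS_SECS
        .get(level - 1)
        .map(|secs| Duration::from_secs(*secs))
}
```

Under this table, delay_for_level(3) yields 10 seconds, which agrees with the comment in the example above.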
Message Filtering
Tag-based Filtering
Filter messages by tags at the broker side:
let message = Message::builder()
    .topic("OrderEvents")
    .body("payload")
    .tags("order_paid")
    .build()?;

// Consumer subscribes to specific tags
consumer.subscribe("OrderEvents", "order_paid || order_shipped").await?;
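To make the meaning of the subscription expression concrete, here is a minimal sketch of how a tag expression such as "order_paid || order_shipped" could be matched against a message tag. The function tag_matches is hypothetical and only models the semantics; it is not how the broker implements filtering.

```rust
/// Hypothetical helper: returns true if `tag` satisfies `expression`.
/// An empty expression or "*" is treated as "match everything";
/// otherwise the expression is a list of tags separated by "||".
fn tag_matches(expression: &str, tag: Option<&str>) -> bool {
    let expression = expression.trim();
    if expression.is_empty() || expression == "*" {
        return true;
    }
    match tag {
        Some(tag) => expression
            .split("||")
            .any(|candidate| candidate.trim() == tag),
        // A message without a tag cannot match a specific tag list.
        None => false,
    }
}
```

With the subscription above, a message tagged order_paid is delivered and a message tagged order_created is not.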
SQL92 Filtering
Advanced filtering using SQL92 syntax:
let message = Message::builder()
    .topic("OrderEvents")
    .body("payload")
    .raw_property("region", "us-west")?
    .raw_property("amount", "100")?
    .build()?;

// Consumer uses SQL expression
consumer.subscribe("OrderEvents", "region = 'us-west' AND amount > 50").await?;
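Properties are stored as strings, so a numeric comparison such as amount > 50 implies parsing the value first. The sketch below spells out the predicate from the example as an ordinary check over a property map. The function matches_filter is hypothetical and hard-codes this one expression; it illustrates the intended semantics and is not a SQL92 evaluator.

```rust
use std::collections::HashMap;

/// Hypothetical helper: evaluates `region = 'us-west' AND amount > 50`
/// against a message's user properties. A missing or non-numeric
/// property makes the corresponding condition false.
fn matches_filter(properties: &HashMap<String, String>) -> bool {
    let region_ok = properties
        .get("region")
        .map_or(false, |region| region.as_str() == "us-west");
    let amount_ok = properties
        .get("amount")
        .and_then(|amount| amount.parse::<f64>().ok())
        .map_or(false, |amount| amount > 50.0);
    region_ok && amount_ok
}
```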
Message Properties
System Properties
RocketMQ automatically adds system properties to each message:
- MSG_ID: Unique message ID
- TOPIC: Topic name
- QUEUE_ID: Queue ID
- QUEUE_OFFSET: Message position in queue
- STORE_SIZE: Message storage size
- BORN_TIMESTAMP: Message creation timestamp
- STORE_TIMESTAMP: Message storage timestamp
User Properties
You can add custom properties:
let message = Message::builder()
    .topic("OrderEvents")
    .body("payload")
    .raw_property("source", "mobile_app")?
    .raw_property("version", "2.1.0")?
    .raw_property("user_id", "user_12345")?
    .build()?;
Message Lifecycle
Send Flow
1. Create Message
2. Set topic, body, tags, keys, properties
3. Select queue (load balancing or custom)
4. Send to broker
5. Broker stores in CommitLog
6. Broker updates ConsumeQueue
7. Return result to producer
Consume Flow
1. Consumer pulls messages from queue
2. Deserialize message
3. Process message (user logic)
4. Acknowledge message
5. Update consumer offset
6. Continue to next batch
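The consume steps above can be sketched with an in-memory stand-in for a single queue. QueueConsumer and consume_batch are hypothetical names; the sketch assumes the offset advances only past messages whose handler succeeded, and it does not model the client's actual retry handling.

```rust
/// Hypothetical in-memory stand-in for one queue plus the
/// consumer's committed offset into it.
struct QueueConsumer {
    messages: Vec<String>,
    offset: usize,
}

impl QueueConsumer {
    /// Pulls up to `batch_size` messages starting at the committed offset,
    /// runs `handler` on each, and advances the offset past the messages
    /// that were handled successfully. Stops at the first failure, so the
    /// failed message is seen again on the next pull.
    /// Returns the number of messages acknowledged.
    fn consume_batch<F>(&mut self, batch_size: usize, mut handler: F) -> usize
    where
        F: FnMut(&str) -> Result<(), String>,
    {
        let end = (self.offset + batch_size).min(self.messages.len());
        let mut handled = 0;
        for msg in &self.messages[self.offset..end] {
            if handler(msg).is_err() {
                break;
            }
            handled += 1;
        }
        self.offset += handled;
        handled
    }
}
```

Because a message whose handler fails is pulled again later, the handler can see the same message more than once, which is why consumers should be idempotent.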
Message Persistence
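RocketMQ persists every message to disk. All messages, regardless of topic or queue, are appended sequentially to a shared CommitLog, and each queue keeps a ConsumeQueue index that points into it: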
┌─────────────────────────────────────┐
│ CommitLog │
│ (Sequential storage of all msgs) │
├─────────────────────────────────────┤
│ [Msg 1][Msg 2][Msg 3][Msg 4]... │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ ConsumeQueue per Queue │
│ (Index structure for fast access) │
├─────────────────────────────────────┤
│ Queue 0: [Offset 0][Offset 8]... │
│ Queue 1: [Offset 16][Offset 24]... │
└─────────────────────────────────────┘
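The relationship between the two structures can be sketched in memory as follows. Store, IndexEntry, append, and read are hypothetical names for illustration; the real storage engine works on files and records more metadata per entry.

```rust
/// Index entry: where one message lives inside the commit log.
#[derive(Debug, Clone, Copy, PartialEq)]
struct IndexEntry {
    commit_log_offset: usize,
    size: usize,
}

/// Hypothetical in-memory model: one shared log plus one index per queue.
struct Store {
    commit_log: Vec<u8>,
    consume_queues: Vec<Vec<IndexEntry>>,
}

impl Store {
    fn new(queue_count: usize) -> Self {
        Store {
            commit_log: Vec::new(),
            consume_queues: vec![Vec::new(); queue_count],
        }
    }

    /// Appends `body` to the shared log, then records an index entry in
    /// the target queue. Returns the message's position within that queue,
    /// or None if the queue does not exist.
    fn append(&mut self, queue_id: usize, body: &[u8]) -> Option<usize> {
        let queue = self.consume_queues.get_mut(queue_id)?;
        let entry = IndexEntry {
            commit_log_offset: self.commit_log.len(),
            size: body.len(),
        };
        self.commit_log.extend_from_slice(body);
        queue.push(entry);
        Some(queue.len() - 1)
    }

    /// Reads a message by queue and queue offset via the index.
    fn read(&self, queue_id: usize, queue_offset: usize) -> Option<&[u8]> {
        let entry = self.consume_queues.get(queue_id)?.get(queue_offset)?;
        self.commit_log
            .get(entry.commit_log_offset..entry.commit_log_offset + entry.size)
    }
}
```

Writes stay sequential because every message goes to the end of one log, while reads for a given queue only need to walk that queue's small index.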
Best Practices
- Use meaningful topic names: Follow a clear naming convention
- Set appropriate tags: Use tags for message categorization
- Add message keys: Enable message tracing and querying
- Keep message size reasonable: Typically < 256KB
- Use properties for metadata: Don't encode metadata in message body
- Consider ordering requirements: Choose appropriate message type
- Handle idempotency: Design consumers to handle duplicate messages
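As an illustration of the last point, a consumer can deduplicate on the message key before applying side effects. This is a minimal sketch: IdempotentHandler is a hypothetical type, and it keeps the set of seen keys in memory, whereas a production consumer would more likely rely on durable storage such as a database uniqueness constraint.

```rust
use std::collections::HashSet;

/// Hypothetical handler that applies each message key at most once.
struct IdempotentHandler {
    seen: HashSet<String>,
    applied: Vec<String>,
}

impl IdempotentHandler {
    fn new() -> Self {
        IdempotentHandler {
            seen: HashSet::new(),
            applied: Vec::new(),
        }
    }

    /// Returns true if the message was processed, false if it was a
    /// duplicate of an already-processed key and therefore skipped.
    fn handle(&mut self, key: &str, body: &str) -> bool {
        // HashSet::insert returns false when the key was already present.
        if !self.seen.insert(key.to_string()) {
            return false;
        }
        self.applied.push(body.to_string());
        true
    }
}
```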