Storage
RocketMQ-Rust employs a high-performance storage mechanism designed for reliable message persistence and fast retrieval.
Storage Architecture
CommitLog
The CommitLog is the core storage file that stores all messages sequentially.
Characteristics
- Sequential writes: All messages are written in append-only mode
- Fixed size: Each CommitLog file has a fixed size (default 1GB)
- Rolling: When full, a new CommitLog file is created
- No in-place deletes: Individual messages are never removed; whole files are deleted once they expire
Structure
CommitLog File (1GB each)
┌────────────────────────────────────────────────────┐
│ [Message 1][Message 2][Message 3]...[Message N] │
│ ↑ │
│ Sequential append writes │
└────────────────────────────────────────────────────┘
File naming: each file is named after its starting byte offset, zero-padded to 20 digits: 00000000000000000000, 00000000001073741824, ...
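The relationship between a global byte offset and a CommitLog file can be sketched in a few lines. This is a minimal illustration, assuming files are named after their starting offset (zero-padded to 20 digits) and use the 1 GB default size; the function names `commit_log_file_name` and `locate` are hypothetical and not part of the RocketMQ-Rust API.

```rust
/// Size of one CommitLog file, using the 1 GB default mentioned above.
const COMMIT_LOG_FILE_SIZE: u64 = 1024 * 1024 * 1024;

/// Name of the CommitLog file that begins at `start_offset`:
/// the offset as a 20-digit, zero-padded decimal string (assumed convention).
fn commit_log_file_name(start_offset: u64) -> String {
    format!("{:020}", start_offset)
}

/// Splits a global byte offset into (starting offset of the containing file,
/// position inside that file).
fn locate(physical_offset: u64) -> (u64, u64) {
    let position = physical_offset % COMMIT_LOG_FILE_SIZE;
    (physical_offset - position, position)
}
```

Calling `locate` on an offset and passing the first element of the result to `commit_log_file_name` yields the file to open; the second element is where to seek inside it.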
Message Format in CommitLog
pub struct CommitLogMessage {
// Total message size (4 bytes)
total_size: u32,
// Magic code (4 bytes) - for file integrity check
magic_code: u32,
// Message body CRC32 (4 bytes)
body_crc: u32,
// Queue ID (4 bytes)
queue_id: u32,
// Message flag (4 bytes)
flag: u32,
// Message properties (variable length)
properties: Vec<u8>,
// Message body (variable length)
body: Vec<u8>,
}
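To show how the magic code and body CRC protect a record, here is a hedged sketch that encodes and decodes the simplified fields listed above. The byte layout is an assumption made for illustration (big-endian integers, a 4-byte length prefix before the properties and before the body), `MAGIC_CODE` is a placeholder value, and `Record`, `encode`, and `decode` are hypothetical names rather than the broker's real types.

```rust
/// Hypothetical magic code for this sketch; not the broker's real constant.
const MAGIC_CODE: u32 = 0xCAFE_F00D;
/// Five fixed u32 fields: total_size, magic_code, body_crc, queue_id, flag.
const FIXED_HEADER: usize = 20;

#[derive(Debug, PartialEq)]
struct Record {
    queue_id: u32,
    flag: u32,
    properties: Vec<u8>,
    body: Vec<u8>,
}

/// Bitwise CRC-32 (IEEE polynomial, reflected form).
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Serializes a record using the assumed layout described in the lead-in.
fn encode(record: &Record) -> Vec<u8> {
    let total = FIXED_HEADER + 4 + record.properties.len() + 4 + record.body.len();
    let mut buf = Vec::with_capacity(total);
    buf.extend_from_slice(&(total as u32).to_be_bytes());
    buf.extend_from_slice(&MAGIC_CODE.to_be_bytes());
    buf.extend_from_slice(&crc32(&record.body).to_be_bytes());
    buf.extend_from_slice(&record.queue_id.to_be_bytes());
    buf.extend_from_slice(&record.flag.to_be_bytes());
    buf.extend_from_slice(&(record.properties.len() as u32).to_be_bytes());
    buf.extend_from_slice(&record.properties);
    buf.extend_from_slice(&(record.body.len() as u32).to_be_bytes());
    buf.extend_from_slice(&record.body);
    buf
}

/// Reads a big-endian u32 at `pos`, or `None` if the buffer is too short.
fn read_u32(buf: &[u8], pos: usize) -> Option<u32> {
    let b = buf.get(pos..pos + 4)?;
    Some(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}

/// Parses one record, rejecting bad magic codes and corrupted bodies.
fn decode(buf: &[u8]) -> Result<Record, &'static str> {
    let total = read_u32(buf, 0).ok_or("truncated header")? as usize;
    if total < FIXED_HEADER + 8 || buf.len() < total {
        return Err("truncated record");
    }
    let buf = &buf[..total];
    if read_u32(buf, 4) != Some(MAGIC_CODE) {
        return Err("bad magic code");
    }
    let body_crc = read_u32(buf, 8).ok_or("truncated header")?;
    let queue_id = read_u32(buf, 12).ok_or("truncated header")?;
    let flag = read_u32(buf, 16).ok_or("truncated header")?;
    let props_len = read_u32(buf, 20).ok_or("truncated header")? as usize;
    let props_end = 24 + props_len;
    let properties = buf.get(24..props_end).ok_or("truncated properties")?.to_vec();
    let body_len = read_u32(buf, props_end).ok_or("truncated body length")? as usize;
    let body_start = props_end + 4;
    let body = buf
        .get(body_start..body_start + body_len)
        .ok_or("truncated body")?
        .to_vec();
    if crc32(&body) != body_crc {
        return Err("body CRC mismatch");
    }
    Ok(Record { queue_id, flag, properties, body })
}
```

The magic code is checked before anything else is trusted, and the CRC is verified last, once the body bytes have been located.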
Sequential Write Performance
Because the CommitLog is append-only, disk access stays sequential, which is much faster than random I/O. Approximate, hardware-dependent figures:
Traditional random I/O: ~10,000 ops/sec
Sequential I/O (SSD): ~100,000+ ops/sec
Sequential I/O (HDD): ~50,000+ ops/sec
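The append-and-roll behaviour of the CommitLog can be modelled in memory. The sketch below is an illustration only: `RollingLog` is a hypothetical type, files are plain byte vectors, and the unused tail of a full file is simply skipped, which may differ from how the real store pads a file before rolling.

```rust
/// In-memory stand-in for a directory of fixed-size CommitLog files.
struct RollingLog {
    file_size: usize,
    files: Vec<Vec<u8>>,
}

impl RollingLog {
    fn new(file_size: usize) -> Self {
        RollingLog { file_size, files: Vec::new() }
    }

    /// Appends `message` and returns its global offset, or `None` if the
    /// message can never fit into a single file.
    fn append(&mut self, message: &[u8]) -> Option<u64> {
        if message.len() > self.file_size {
            return None;
        }
        // Roll to a new file when the current one cannot hold the message.
        let needs_roll = match self.files.last() {
            Some(current) => current.len() + message.len() > self.file_size,
            None => true,
        };
        if needs_roll {
            self.files.push(Vec::new());
        }
        let index = self.files.len() - 1;
        // Global offset = start of the current file + bytes already written to it.
        let offset = index * self.file_size + self.files[index].len();
        self.files[index].extend_from_slice(message);
        Some(offset as u64)
    }
}
```

Every write lands at the end of the newest file, so the access pattern stays sequential no matter which topic or queue a message belongs to.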
ConsumeQueue
ConsumeQueue is an index structure for fast message consumption.
Structure
Each topic queue has its own ConsumeQueue:
ConsumeQueue for Topic:OrderEvents, Queue:0
┌─────────────────────────────────────────────┐
│ Entry Size: 20 bytes │
├─────────────────────────────────────────────┤
│ [CommitLog Offset][Size][Tags Hash] │
│ [8 bytes ][4B ][8 bytes ] │
│ │
│ Example: │
│ [0x00000000][0x0064][0x12345678] │
│ [0x00000064][0x0080][0x87654321] │
│ [0x000000E4][0x0050][0xABCDEF12] │
└─────────────────────────────────────────────┘
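A 20-byte entry like the ones in the diagram can be packed and unpacked as shown in this sketch. It assumes big-endian encoding and the field order from the diagram; `ConsumeQueueEntry` is a hypothetical type used for illustration, not the library's own.

```rust
/// Size of one ConsumeQueue entry: 8 + 4 + 8 bytes.
const ENTRY_SIZE: usize = 20;

#[derive(Debug, Clone, Copy, PartialEq)]
struct ConsumeQueueEntry {
    commit_log_offset: u64,
    size: u32,
    tags_hash: u64,
}

impl ConsumeQueueEntry {
    /// Packs the entry into its fixed 20-byte form (big-endian, assumed).
    fn encode(&self) -> [u8; ENTRY_SIZE] {
        let mut buf = [0u8; ENTRY_SIZE];
        buf[0..8].copy_from_slice(&self.commit_log_offset.to_be_bytes());
        buf[8..12].copy_from_slice(&self.size.to_be_bytes());
        buf[12..20].copy_from_slice(&self.tags_hash.to_be_bytes());
        buf
    }

    /// Unpacks an entry from its fixed 20-byte form.
    fn decode(buf: &[u8; ENTRY_SIZE]) -> Self {
        let mut offset = [0u8; 8];
        let mut size = [0u8; 4];
        let mut tags = [0u8; 8];
        offset.copy_from_slice(&buf[0..8]);
        size.copy_from_slice(&buf[8..12]);
        tags.copy_from_slice(&buf[12..20]);
        ConsumeQueueEntry {
            commit_log_offset: u64::from_be_bytes(offset),
            size: u32::from_be_bytes(size),
            tags_hash: u64::from_be_bytes(tags),
        }
    }
}
```

Because every entry has the same size, the entry for logical queue offset `n` always starts at byte `n * 20`, so no search is needed.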
Purpose
- Fast lookup: Quickly locate messages by offset
- Memory mapped: Can be memory-mapped for fast access
- Small size: Each entry is only 20 bytes
- Filtering: Supports tag-based filtering
Reading Messages
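Reading a message is a two-step lookup: the consumer's logical queue offset selects a 20-byte ConsumeQueue entry, and that entry's CommitLog offset and size select the message bytes. The following is a minimal in-memory sketch of that path, assuming big-endian entries and using byte slices in place of real files; `read_message` and `index_entry` are hypothetical helpers.

```rust
/// Size of one ConsumeQueue entry: offset (8) + size (4) + tags hash (8).
const ENTRY_SIZE: usize = 20;

/// Builds one 20-byte index entry (helper for trying out the lookup).
fn index_entry(commit_log_offset: u64, size: u32, tags_hash: u64) -> [u8; ENTRY_SIZE] {
    let mut buf = [0u8; ENTRY_SIZE];
    buf[0..8].copy_from_slice(&commit_log_offset.to_be_bytes());
    buf[8..12].copy_from_slice(&size.to_be_bytes());
    buf[12..20].copy_from_slice(&tags_hash.to_be_bytes());
    buf
}

/// Returns the bytes of the message at logical position `queue_offset`,
/// or `None` if the index or the log does not contain it.
fn read_message<'a>(
    consume_queue: &[u8],
    commit_log: &'a [u8],
    queue_offset: usize,
) -> Option<&'a [u8]> {
    // Step 1: locate the fixed-size index entry.
    let start = queue_offset.checked_mul(ENTRY_SIZE)?;
    let entry = consume_queue.get(start..start.checked_add(ENTRY_SIZE)?)?;
    let mut offset = [0u8; 8];
    let mut size = [0u8; 4];
    offset.copy_from_slice(&entry[0..8]);
    size.copy_from_slice(&entry[8..12]);
    // Step 2: follow the entry into the CommitLog.
    let begin = u64::from_be_bytes(offset) as usize;
    let end = begin.checked_add(u32::from_be_bytes(size) as usize)?;
    commit_log.get(begin..end)
}
```

With a log containing `helloworld!` and entries `(0, 5)` and `(5, 6)`, queue offset 0 resolves to `hello` and queue offset 1 to `world!`.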
IndexFile
IndexFile provides fast message lookup by key.
Structure
IndexFile
┌─────────────────────────────────────────────┐
│ Hash Slots (5 million slots) │
│ ↓ │
│ [Slot 0] → [Head Index] → ... │
│ [Slot 1] → [Head Index] → ... │
│ [Slot 2] → [Head Index] → ... │
│ ... │
│ │
│ Each Index Entry (20 bytes): │
│ - Key Hash (4 bytes) │
│ - CommitLog Offset (8 bytes) │
│ - Time Diff (4 bytes) │
│ - Next Index Offset (4 bytes) │
└─────────────────────────────────────────────┘
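The slot-plus-chain layout in the diagram behaves like a hash table whose buckets are linked lists of index entries. The sketch below models it in memory under a few assumptions: the string hash is a simple 31-based polynomial chosen for illustration, positions are 1-based with 0 meaning "empty", and `IndexFile`, `put`, and `get` are hypothetical names.

```rust
#[allow(dead_code)]
struct IndexEntry {
    key_hash: u32,
    commit_log_offset: u64,
    time_diff: u32,
    /// 1-based position of the previous entry in the same slot; 0 ends the chain.
    next: u32,
}

struct IndexFile {
    /// Each slot holds the 1-based position of its newest entry, or 0 if empty.
    slots: Vec<u32>,
    entries: Vec<IndexEntry>,
}

/// Hypothetical string hash (31-based polynomial), for illustration only.
fn key_hash(key: &str) -> u32 {
    key.bytes()
        .fold(0u32, |h, b| h.wrapping_mul(31).wrapping_add(b as u32))
}

impl IndexFile {
    /// `slot_count` must be greater than zero.
    fn new(slot_count: usize) -> Self {
        IndexFile { slots: vec![0; slot_count], entries: Vec::new() }
    }

    /// Appends an entry and makes it the new head of its slot's chain.
    fn put(&mut self, key: &str, commit_log_offset: u64, time_diff: u32) {
        let hash = key_hash(key);
        let slot = hash as usize % self.slots.len();
        let entry = IndexEntry {
            key_hash: hash,
            commit_log_offset,
            time_diff,
            next: self.slots[slot],
        };
        self.entries.push(entry);
        self.slots[slot] = self.entries.len() as u32;
    }

    /// Walks the chain for the key's slot and returns matching offsets, newest first.
    fn get(&self, key: &str) -> Vec<u64> {
        let hash = key_hash(key);
        let mut cursor = self.slots[hash as usize % self.slots.len()];
        let mut offsets = Vec::new();
        while cursor != 0 {
            let entry = &self.entries[cursor as usize - 1];
            if entry.key_hash == hash {
                offsets.push(entry.commit_log_offset);
            }
            cursor = entry.next;
        }
        offsets
    }
}
```

Only the key's hash is stored, so two different keys with the same hash would both match; a caller would need to compare the key in the fetched message before trusting a result.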
Usage
// Query messages by key
let messages = broker.query_message_by_key("OrderEvents", "order_12345")?;
// Returns all messages with key "order_12345"
Flush Strategies
RocketMQ supports different flush strategies to balance performance and reliability.
ASYNC_FLUSH (Default)
- Messages are written to OS page cache
- Returns immediately
- Background thread flushes to disk
- Performance: Highest
- Reliability: Messages still in the page cache may be lost if the OS crashes or power fails
SYNC_FLUSH
- Messages are written to OS page cache
- Forces flush to disk before returning
- Performance: Lower
- Reliability: Acknowledged messages are already on disk, so they survive a crash or power failure on that broker
// Configure flush mode
let mut broker_config = BrokerConfig::default();
broker_config.set_flush_disk_type(FlushDiskType::SYNC_FLUSH);
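The difference between the two modes comes down to whether the write path waits for the disk. Here is a minimal sketch using only the standard library; `FlushMode`, `open_log`, and `append` are hypothetical names, and the background flusher used in asynchronous mode is omitted.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical stand-in for the broker's flush setting.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FlushMode {
    Async,
    Sync,
}

/// Opens (or creates) a log file in append mode.
fn open_log(path: &Path) -> io::Result<File> {
    OpenOptions::new().create(true).append(true).open(path)
}

/// Appends one message. In `Sync` mode the call returns only after the data
/// has been flushed to the storage device; in `Async` mode it returns as soon
/// as the bytes are in the OS page cache.
fn append(file: &mut File, message: &[u8], mode: FlushMode) -> io::Result<()> {
    file.write_all(message)?;
    if mode == FlushMode::Sync {
        file.sync_data()?;
    }
    Ok(())
}
```

`File::sync_data` is what makes the synchronous path slower: each acknowledged write pays for a round trip to the device instead of a memory copy.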
File Deletion
RocketMQ automatically deletes expired files to free disk space.
Deletion Policy
Files are deleted when any of these conditions are met:
- Disk space low: Disk usage exceeds threshold (default 85%)
- Time-based: Files are older than the retention time (default 72 hours)
- Manual: Triggered by admin command
// Configure retention policy
let mut broker_config = BrokerConfig::default();
broker_config.set_delete_when(DeleteWhen::DiskFull);
broker_config.set_file_reserved_time(72); // hours
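The "any of these conditions" rule can be expressed as a single predicate. This is a sketch of the decision only, assuming strict comparisons ("exceeds", "older than"); `RetentionPolicy` and `should_delete` are hypothetical names, not part of `BrokerConfig`.

```rust
use std::time::Duration;

/// Hypothetical retention settings, using the defaults quoted above.
struct RetentionPolicy {
    file_reserved_hours: u64,
    disk_max_used_ratio: u8,
}

impl Default for RetentionPolicy {
    fn default() -> Self {
        RetentionPolicy { file_reserved_hours: 72, disk_max_used_ratio: 85 }
    }
}

/// Returns true when a file qualifies for deletion under any of the three
/// conditions: manual trigger, disk usage above the threshold, or file age
/// beyond the retention time.
fn should_delete(
    policy: &RetentionPolicy,
    file_age: Duration,
    disk_used_ratio: u8,
    manual_trigger: bool,
) -> bool {
    let max_age = Duration::from_secs(policy.file_reserved_hours * 3600);
    manual_trigger || disk_used_ratio > policy.disk_max_used_ratio || file_age > max_age
}
```

For example, with the default policy a one-hour-old file is kept at 50% disk usage but becomes eligible once usage climbs to 90%.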
Memory Mapping
ConsumeQueue and IndexFile use memory-mapped files for fast access:
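use memmap2::MmapOptions;
// Map the file into memory (unsafe: the file must not be truncated while mapped)
let mmap = unsafe { MmapOptions::new().map(&file)? };
// Read fields straight out of the mapped bytes (big-endian assumed)
let offset = u64::from_be_bytes(mmap[offset_position..offset_position + 8].try_into()?);
let size = u32::from_be_bytes(mmap[size_position..size_position + 4].try_into()?);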
Benefits
- Zero-copy I/O
- Fast access to frequently read data
- OS manages paging
Storage Performance
Write Performance
Type | Throughput | Latency
--------------------|---------------|--------------
Single Thread | 100K+ msg/s | < 1ms
Multi Thread | 500K+ msg/s | < 5ms
Batch Send | 1M+ msg/s | < 10ms
Read Performance
Operation | Latency
--------------------|--------------
Sequential Read | < 1ms
Random Read (mmap) | < 1ms
Index Lookup | < 1ms
Storage Configuration
[broker]
# CommitLog file size (1GB default)
commit_log_file_size = 1073741824
# ConsumeQueue file size (30MB default)
consume_queue_file_size = 31457280
# Flush disk type: ASYNC_FLUSH or SYNC_FLUSH
flush_disk_type = "ASYNC_FLUSH"
# Delete policy
delete_when = "DiskFull"
file_reserved_time = 72 # hours
# Maximum disk usage ratio (%)
disk_max_used_space_ratio = 85
# Disk usage ratio (%) treated as the warning level
disk_space_warning_level_ratio = 90
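For readers who want to sanity-check these values in code, the settings can be mirrored in a plain struct. This is a hypothetical sketch: `StorageConfig` is not the library's configuration type, the field names simply copy the keys above, and the validation rules (percentages at most 100, ConsumeQueue file size a multiple of the 20-byte entry size) are assumptions.

```rust
/// Size of one ConsumeQueue entry in bytes.
const CONSUME_QUEUE_ENTRY_SIZE: u64 = 20;

/// Hypothetical mirror of the storage settings shown above.
#[derive(Debug, Clone, PartialEq)]
struct StorageConfig {
    commit_log_file_size: u64,
    consume_queue_file_size: u64,
    file_reserved_time_hours: u32,
    disk_max_used_space_ratio: u8,
    disk_space_warning_level_ratio: u8,
}

impl Default for StorageConfig {
    fn default() -> Self {
        StorageConfig {
            commit_log_file_size: 1_073_741_824,
            consume_queue_file_size: 31_457_280,
            file_reserved_time_hours: 72,
            disk_max_used_space_ratio: 85,
            disk_space_warning_level_ratio: 90,
        }
    }
}

impl StorageConfig {
    /// Number of 20-byte entries that fit in one ConsumeQueue file.
    fn consume_queue_entries_per_file(&self) -> u64 {
        self.consume_queue_file_size / CONSUME_QUEUE_ENTRY_SIZE
    }

    /// Basic sanity checks (assumed rules, not the broker's own validation).
    fn validate(&self) -> Result<(), String> {
        if self.commit_log_file_size == 0 {
            return Err("commit_log_file_size must be positive".to_string());
        }
        if self.consume_queue_file_size % CONSUME_QUEUE_ENTRY_SIZE != 0 {
            return Err("consume_queue_file_size must be a multiple of 20".to_string());
        }
        if self.disk_max_used_space_ratio > 100 || self.disk_space_warning_level_ratio > 100 {
            return Err("disk ratios are percentages and must not exceed 100".to_string());
        }
        Ok(())
    }
}
```

With the values above, one ConsumeQueue file holds 1,572,864 entries.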
Best Practices
- Use SSDs: Significantly improves random read performance
- Monitor disk usage: Set up alerts for disk space
- Choose appropriate flush mode: Balance performance vs reliability
- Separate CommitLog and ConsumeQueue: Use different disks if possible
- Regular backups: Implement backup strategy for critical data
- Configure retention: Set appropriate file retention time
Next Steps
- Producer - Learn about sending messages
- Consumer - Learn about consuming messages
- Configuration - Configure storage settings