Merge branch 'codex/filestore-payload-index-optimization'
This commit is contained in:
@@ -286,43 +286,31 @@ public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload
|
|||||||
|
|
||||||
### FileStore
|
### FileStore
|
||||||
|
|
||||||
`FileStore` appends messages to a JSONL file (`messages.jsonl`) and keeps a full in-memory index (`Dictionary<ulong, StoredMessage>`) identical in structure to `MemStore`. It is not production-safe for several reasons:
|
`FileStore` now persists messages into block files via `MsgBlock`, keeps a live in-memory message cache for load paths, and maintains a compact metadata index (`Dictionary<ulong, StoredMessageIndex>`) plus a per-subject last-sequence map for hot-path lookups such as `LoadLastBySubjectAsync`. Headers and payloads are stored separately and remain separate across snapshot, restore, block rewrite, and crash recovery. On startup, any legacy `messages.jsonl` file is migrated into block storage before recovery continues.
|
||||||
|
|
||||||
- **No locking**: `AppendAsync`, `LoadAsync`, `GetStateAsync`, and `TrimToMaxMessages` are not synchronized. Concurrent access from `StreamManager.Capture` and `PullConsumerEngine.FetchAsync` is unsafe.
|
|
||||||
- **Per-write file I/O**: Each `AppendAsync` calls `File.AppendAllTextAsync`, issuing a separate file open/write/close per message.
|
|
||||||
- **Full rewrite on trim**: `TrimToMaxMessages` calls `RewriteDataFile()`, which rewrites the entire file from the in-memory index. This is O(n) in message count and blocking.
|
|
||||||
- **Full in-memory index**: The in-memory dictionary holds every undeleted message payload; there is no paging or streaming read path.
|
|
||||||
|
|
||||||
```csharp
|
```csharp
|
||||||
// FileStore.cs
|
// FileStore.cs
|
||||||
public void TrimToMaxMessages(ulong maxMessages)
|
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
||||||
|
private readonly Dictionary<ulong, StoredMessageIndex> _messageIndexes = new();
|
||||||
|
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
|
||||||
{
|
{
|
||||||
while ((ulong)_messages.Count > maxMessages)
|
if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
|
||||||
|
&& _messages.TryGetValue(sequence, out var match))
|
||||||
{
|
{
|
||||||
var first = _messages.Keys.Min();
|
return ValueTask.FromResult<StoredMessage?>(match);
|
||||||
_messages.Remove(first);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RewriteDataFile();
|
return ValueTask.FromResult<StoredMessage?>(null);
|
||||||
}
|
|
||||||
|
|
||||||
private void RewriteDataFile()
|
|
||||||
{
|
|
||||||
var lines = new List<string>(_messages.Count);
|
|
||||||
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
|
|
||||||
{
|
|
||||||
lines.Add(JsonSerializer.Serialize(new FileRecord
|
|
||||||
{
|
|
||||||
Sequence = message.Sequence,
|
|
||||||
Subject = message.Subject,
|
|
||||||
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
File.WriteAllLines(_dataFilePath, lines);
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
The Go reference (`filestore.go`) uses block-based binary storage with S2 compression, per-block indexes, and memory-mapped I/O. This implementation shares none of those properties.
|
The current implementation is still materially simpler than Go `filestore.go`:
|
||||||
|
|
||||||
|
- **No synchronization**: `FileStore` still exposes unsynchronized mutation and read paths. It is safe only under the current test and single-process usage assumptions.
|
||||||
|
- **Payloads still stay resident**: the compact index removes duplicate payload ownership for metadata-heavy operations, but `_messages` still retains live payload bytes in memory for direct load paths.
|
||||||
|
- **No Go-equivalent block index stack**: there is no per-block subject tree, mmap-backed read path, or Go-style cache/compaction parity. Deletes and trims rely on tombstones plus later block maintenance rather than Go's full production filestore behavior.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -445,7 +433,7 @@ The following features are present in the Go reference (`golang/nats-server/serv
|
|||||||
- **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers.
|
- **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers.
|
||||||
- **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket.
|
- **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket.
|
||||||
- **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter.
|
- **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter.
|
||||||
- **FileStore production safety**: No locking, per-write file I/O, full-rewrite-on-trim, and full in-memory index make `FileStore` unsuitable for production use.
|
- **FileStore production safety**: `FileStore` now uses block files and compact metadata indexes, but it still lacks synchronization and Go-level block indexing, so it remains unsuitable for production use.
|
||||||
- **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus.
|
- **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus.
|
||||||
- **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated.
|
- **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated.
|
||||||
- **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs.
|
- **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs.
|
||||||
@@ -460,4 +448,4 @@ The following features are present in the Go reference (`golang/nats-server/serv
|
|||||||
- [Configuration Overview](../Configuration/Overview.md)
|
- [Configuration Overview](../Configuration/Overview.md)
|
||||||
- [Protocol Overview](../Protocol/Overview.md)
|
- [Protocol Overview](../Protocol/Overview.md)
|
||||||
|
|
||||||
<!-- Last verified against codebase: 2026-02-23 -->
|
<!-- Last verified against codebase: 2026-03-13 -->
|
||||||
|
|||||||
@@ -95,9 +95,6 @@ public sealed class CompiledFilter
|
|||||||
|
|
||||||
public sealed class PullConsumerEngine
|
public sealed class PullConsumerEngine
|
||||||
{
|
{
|
||||||
// Reusable fetch buffer to avoid per-fetch List allocation.
|
|
||||||
// Go reference: consumer.go — reuses slice backing array across fetch calls.
|
|
||||||
[ThreadStatic] private static List<StoredMessage>? t_fetchBuf;
|
|
||||||
// Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject.
|
// Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject.
|
||||||
// Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest
|
// Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest
|
||||||
private readonly ConcurrentDictionary<string, PullWaitingRequest> _clusterPending =
|
private readonly ConcurrentDictionary<string, PullWaitingRequest> _clusterPending =
|
||||||
@@ -158,11 +155,7 @@ public sealed class PullConsumerEngine
|
|||||||
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct)
|
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct)
|
||||||
{
|
{
|
||||||
var batch = Math.Max(request.Batch, 1);
|
var batch = Math.Max(request.Batch, 1);
|
||||||
// Use thread-static buffer to avoid per-fetch List allocation.
|
var messages = new List<StoredMessage>(batch);
|
||||||
// Results are snapshot'd into PullFetchBatch before the buffer is reused.
|
|
||||||
var messages = t_fetchBuf ??= new List<StoredMessage>();
|
|
||||||
messages.Clear();
|
|
||||||
if (messages.Capacity < batch) messages.Capacity = batch;
|
|
||||||
|
|
||||||
// Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests.
|
// Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests.
|
||||||
// When ExpiresMs > 0, create a linked CancellationTokenSource that fires
|
// When ExpiresMs > 0, create a linked CancellationTokenSource that fires
|
||||||
@@ -313,6 +306,7 @@ public sealed class PullConsumerEngine
|
|||||||
await Task.Delay(5, ct).ConfigureAwait(false);
|
await Task.Delay(5, ct).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ct.ThrowIfCancellationRequested();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
// In-memory cache: keyed by sequence number. This is the primary data structure
|
// In-memory cache: keyed by sequence number. This is the primary data structure
|
||||||
// for reads and queries. The blocks are the on-disk persistence layer.
|
// for reads and queries. The blocks are the on-disk persistence layer.
|
||||||
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
||||||
|
private readonly Dictionary<ulong, StoredMessageIndex> _messageIndexes = new();
|
||||||
|
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
// Block-based storage: the active (writable) block and sealed blocks.
|
// Block-based storage: the active (writable) block and sealed blocks.
|
||||||
private readonly List<MsgBlock> _blocks = [];
|
private readonly List<MsgBlock> _blocks = [];
|
||||||
@@ -141,20 +143,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
// We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray().
|
// We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray().
|
||||||
var persistedPayload = TransformForPersist(payload.Span);
|
var persistedPayload = TransformForPersist(payload.Span);
|
||||||
var storedPayload = _noTransform ? persistedPayload : payload.ToArray();
|
var storedPayload = _noTransform ? persistedPayload : payload.ToArray();
|
||||||
_messages[_last] = new StoredMessage
|
TrackMessage(new StoredMessage
|
||||||
{
|
{
|
||||||
Sequence = _last,
|
Sequence = _last,
|
||||||
Subject = subject,
|
Subject = subject,
|
||||||
Payload = storedPayload,
|
Payload = storedPayload,
|
||||||
TimestampUtc = now,
|
TimestampUtc = now,
|
||||||
};
|
});
|
||||||
_generation++;
|
_generation++;
|
||||||
|
|
||||||
_messageCount++;
|
|
||||||
_totalBytes += (ulong)payload.Length;
|
|
||||||
if (_messageCount == 1)
|
|
||||||
_firstSeq = _last;
|
|
||||||
|
|
||||||
// Go: register TTL only when TTL > 0.
|
// Go: register TTL only when TTL > 0.
|
||||||
if (_options.MaxAgeMs > 0)
|
if (_options.MaxAgeMs > 0)
|
||||||
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
|
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
|
||||||
@@ -195,11 +192,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
|
|
||||||
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
|
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
|
||||||
{
|
{
|
||||||
var match = _messages.Values
|
if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
|
||||||
.Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal))
|
&& _messages.TryGetValue(sequence, out var match))
|
||||||
.OrderByDescending(m => m.Sequence)
|
{
|
||||||
.FirstOrDefault();
|
return ValueTask.FromResult<StoredMessage?>(match);
|
||||||
return ValueTask.FromResult(match);
|
}
|
||||||
|
|
||||||
|
return ValueTask.FromResult<StoredMessage?>(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
|
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
|
||||||
@@ -212,21 +211,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
|
|
||||||
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
|
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
|
||||||
{
|
{
|
||||||
if (!_messages.TryGetValue(sequence, out var msg))
|
if (!RemoveTrackedMessage(sequence, preserveHighWaterMark: false))
|
||||||
return ValueTask.FromResult(false);
|
return ValueTask.FromResult(false);
|
||||||
|
|
||||||
_messages.Remove(sequence);
|
|
||||||
_generation++;
|
_generation++;
|
||||||
|
|
||||||
// Incremental state tracking.
|
|
||||||
_messageCount--;
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
if (sequence == _firstSeq)
|
|
||||||
_firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min();
|
|
||||||
|
|
||||||
if (sequence == _last)
|
|
||||||
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
|
|
||||||
|
|
||||||
// Soft-delete in the block that contains this sequence.
|
// Soft-delete in the block that contains this sequence.
|
||||||
DeleteInBlock(sequence);
|
DeleteInBlock(sequence);
|
||||||
|
|
||||||
@@ -266,6 +255,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
Sequence = x.Sequence,
|
Sequence = x.Sequence,
|
||||||
Subject = x.Subject,
|
Subject = x.Subject,
|
||||||
|
HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span),
|
||||||
PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)),
|
PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)),
|
||||||
TimestampUtc = x.TimestampUtc,
|
TimestampUtc = x.TimestampUtc,
|
||||||
})
|
})
|
||||||
@@ -276,10 +266,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
|
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
|
||||||
{
|
{
|
||||||
_messages.Clear();
|
_messages.Clear();
|
||||||
|
_messageIndexes.Clear();
|
||||||
|
_lastSequenceBySubject.Clear();
|
||||||
_last = 0;
|
_last = 0;
|
||||||
_messageCount = 0;
|
_messageCount = 0;
|
||||||
_totalBytes = 0;
|
_totalBytes = 0;
|
||||||
_firstSeq = 0;
|
_firstSeq = 0;
|
||||||
|
_first = 0;
|
||||||
|
|
||||||
// Dispose existing blocks and clean files.
|
// Dispose existing blocks and clean files.
|
||||||
DisposeAllBlocks();
|
DisposeAllBlocks();
|
||||||
@@ -292,25 +285,26 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
foreach (var record in records)
|
foreach (var record in records)
|
||||||
{
|
{
|
||||||
|
var restoredHeaders = string.IsNullOrEmpty(record.HeadersBase64)
|
||||||
|
? ReadOnlyMemory<byte>.Empty
|
||||||
|
: Convert.FromBase64String(record.HeadersBase64);
|
||||||
var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
|
var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
|
||||||
var message = new StoredMessage
|
var message = new StoredMessage
|
||||||
{
|
{
|
||||||
Sequence = record.Sequence,
|
Sequence = record.Sequence,
|
||||||
Subject = record.Subject ?? string.Empty,
|
Subject = record.Subject ?? string.Empty,
|
||||||
|
RawHeaders = restoredHeaders,
|
||||||
Payload = restoredPayload,
|
Payload = restoredPayload,
|
||||||
TimestampUtc = record.TimestampUtc,
|
TimestampUtc = record.TimestampUtc,
|
||||||
};
|
};
|
||||||
_messages[record.Sequence] = message;
|
_messages[record.Sequence] = message;
|
||||||
_last = Math.Max(_last, record.Sequence);
|
_last = Math.Max(_last, record.Sequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recompute incremental state from restored messages.
|
|
||||||
_messageCount = (ulong)_messages.Count;
|
|
||||||
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
|
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RebuildIndexesFromMessages();
|
||||||
|
|
||||||
// Write all messages to fresh blocks.
|
// Write all messages to fresh blocks.
|
||||||
RewriteBlocks();
|
RewriteBlocks();
|
||||||
return ValueTask.CompletedTask;
|
return ValueTask.CompletedTask;
|
||||||
@@ -332,14 +326,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
var trimmed = false;
|
var trimmed = false;
|
||||||
while ((ulong)_messages.Count > maxMessages)
|
while ((ulong)_messages.Count > maxMessages)
|
||||||
{
|
{
|
||||||
var first = _messages.Keys.Min();
|
var first = _firstSeq;
|
||||||
if (_messages.TryGetValue(first, out var msg))
|
if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true))
|
||||||
{
|
break;
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
_messageCount--;
|
|
||||||
}
|
|
||||||
|
|
||||||
_messages.Remove(first);
|
|
||||||
DeleteInBlock(first);
|
DeleteInBlock(first);
|
||||||
trimmed = true;
|
trimmed = true;
|
||||||
}
|
}
|
||||||
@@ -347,7 +337,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
if (!trimmed)
|
if (!trimmed)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
|
||||||
_generation++;
|
_generation++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -377,36 +366,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
|
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
|
|
||||||
// Combine headers and payload (headers precede the body in NATS wire format).
|
var headers = hdr is { Length: > 0 } ? hdr : [];
|
||||||
byte[] combined;
|
var payload = msg ?? [];
|
||||||
if (hdr is { Length: > 0 })
|
var persistedPayload = TransformForPersist(payload);
|
||||||
{
|
TrackMessage(new StoredMessage
|
||||||
combined = new byte[hdr.Length + (msg?.Length ?? 0)];
|
|
||||||
hdr.CopyTo(combined, 0);
|
|
||||||
msg?.CopyTo(combined, hdr.Length);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
combined = msg ?? [];
|
|
||||||
}
|
|
||||||
|
|
||||||
var persistedPayload = TransformForPersist(combined.AsSpan());
|
|
||||||
var stored = new StoredMessage
|
|
||||||
{
|
{
|
||||||
Sequence = _last,
|
Sequence = _last,
|
||||||
Subject = subject,
|
Subject = subject,
|
||||||
Payload = combined,
|
RawHeaders = headers,
|
||||||
|
Payload = payload,
|
||||||
TimestampUtc = now,
|
TimestampUtc = now,
|
||||||
};
|
});
|
||||||
_messages[_last] = stored;
|
|
||||||
_generation++;
|
_generation++;
|
||||||
|
|
||||||
// Incremental state tracking.
|
|
||||||
_messageCount++;
|
|
||||||
_totalBytes += (ulong)combined.Length;
|
|
||||||
if (_messageCount == 1)
|
|
||||||
_firstSeq = _last;
|
|
||||||
|
|
||||||
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
|
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
|
||||||
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
|
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
|
||||||
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
|
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
|
||||||
@@ -415,16 +387,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
EnsureActiveBlock();
|
EnsureActiveBlock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
|
_activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp);
|
||||||
}
|
}
|
||||||
catch (InvalidOperationException)
|
catch (InvalidOperationException)
|
||||||
{
|
{
|
||||||
RotateBlock();
|
RotateBlock();
|
||||||
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
|
_activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
|
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
|
||||||
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
|
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
|
||||||
|
|
||||||
// Signal the background flush loop to coalesce and flush pending writes.
|
// Signal the background flush loop to coalesce and flush pending writes.
|
||||||
_flushSignal.Writer.TryWrite(0);
|
_flushSignal.Writer.TryWrite(0);
|
||||||
@@ -443,11 +415,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
var count = (ulong)_messages.Count;
|
var count = (ulong)_messages.Count;
|
||||||
_messages.Clear();
|
_messages.Clear();
|
||||||
|
_messageIndexes.Clear();
|
||||||
|
_lastSequenceBySubject.Clear();
|
||||||
_generation++;
|
_generation++;
|
||||||
_last = 0;
|
_last = 0;
|
||||||
_messageCount = 0;
|
_messageCount = 0;
|
||||||
_totalBytes = 0;
|
_totalBytes = 0;
|
||||||
_firstSeq = 0;
|
_firstSeq = 0;
|
||||||
|
_first = 0;
|
||||||
|
|
||||||
DisposeAllBlocks();
|
DisposeAllBlocks();
|
||||||
CleanBlockFiles();
|
CleanBlockFiles();
|
||||||
@@ -470,42 +445,53 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
if (string.IsNullOrEmpty(subject) && keep == 0 && seq == 0)
|
if (string.IsNullOrEmpty(subject) && keep == 0 && seq == 0)
|
||||||
return Purge();
|
return Purge();
|
||||||
|
|
||||||
// Collect all messages matching the subject (with wildcard support) at or below seq, ordered by sequence.
|
var upperBound = seq == 0 ? _last : Math.Min(seq, _last);
|
||||||
var candidates = _messages.Values
|
if (upperBound == 0)
|
||||||
.Where(m => SubjectMatchesFilter(m.Subject, subject))
|
|
||||||
.Where(m => seq == 0 || m.Sequence <= seq)
|
|
||||||
.OrderBy(m => m.Sequence)
|
|
||||||
.ToList();
|
|
||||||
|
|
||||||
if (candidates.Count == 0)
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// Keep the newest `keep` messages; purge the rest.
|
ulong candidateCount = 0;
|
||||||
var toRemove = keep > 0 && (ulong)candidates.Count > keep
|
for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++)
|
||||||
? candidates.Take(candidates.Count - (int)keep).ToList()
|
|
||||||
: (keep == 0 ? candidates : []);
|
|
||||||
|
|
||||||
if (toRemove.Count == 0)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
foreach (var msg in toRemove)
|
|
||||||
{
|
{
|
||||||
_messages.Remove(msg.Sequence);
|
if (!_messages.TryGetValue(current, out var message))
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
continue;
|
||||||
_messageCount--;
|
|
||||||
DeleteInBlock(msg.Sequence);
|
if (!SubjectMatchesFilter(message.Subject, subject))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
candidateCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (candidateCount == 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
var targetRemoveCount = keep > 0
|
||||||
|
? (candidateCount > keep ? candidateCount - keep : 0UL)
|
||||||
|
: candidateCount;
|
||||||
|
|
||||||
|
if (targetRemoveCount == 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
ulong removed = 0;
|
||||||
|
for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++)
|
||||||
|
{
|
||||||
|
if (!_messages.TryGetValue(current, out var message))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (!SubjectMatchesFilter(message.Subject, subject))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (RemoveTrackedMessage(current, preserveHighWaterMark: true))
|
||||||
|
{
|
||||||
|
DeleteInBlock(current);
|
||||||
|
removed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (removed == 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
_generation++;
|
_generation++;
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
return removed;
|
||||||
|
|
||||||
// Update _last if required.
|
|
||||||
if (_messages.Count == 0)
|
|
||||||
_last = 0;
|
|
||||||
else if (!_messages.ContainsKey(_last))
|
|
||||||
_last = _messages.Keys.Max();
|
|
||||||
|
|
||||||
return (ulong)toRemove.Count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -524,13 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
|
|
||||||
foreach (var s in toRemove)
|
foreach (var s in toRemove)
|
||||||
{
|
{
|
||||||
if (_messages.TryGetValue(s, out var msg))
|
RemoveTrackedMessage(s, preserveHighWaterMark: true);
|
||||||
{
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
_messageCount--;
|
|
||||||
}
|
|
||||||
|
|
||||||
_messages.Remove(s);
|
|
||||||
DeleteInBlock(s);
|
DeleteInBlock(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -545,11 +525,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!_messages.ContainsKey(_last))
|
_first = _firstSeq;
|
||||||
_last = _messages.Keys.Max();
|
|
||||||
// Update _first to reflect the real first message.
|
|
||||||
_first = _messages.Keys.Min();
|
|
||||||
_firstSeq = _first;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (ulong)toRemove.Length;
|
return (ulong)toRemove.Length;
|
||||||
@@ -566,11 +542,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
// Truncate to nothing.
|
// Truncate to nothing.
|
||||||
_messages.Clear();
|
_messages.Clear();
|
||||||
|
_messageIndexes.Clear();
|
||||||
|
_lastSequenceBySubject.Clear();
|
||||||
_generation++;
|
_generation++;
|
||||||
_last = 0;
|
_last = 0;
|
||||||
_messageCount = 0;
|
_messageCount = 0;
|
||||||
_totalBytes = 0;
|
_totalBytes = 0;
|
||||||
_firstSeq = 0;
|
_firstSeq = 0;
|
||||||
|
_first = 0;
|
||||||
DisposeAllBlocks();
|
DisposeAllBlocks();
|
||||||
CleanBlockFiles();
|
CleanBlockFiles();
|
||||||
return;
|
return;
|
||||||
@@ -579,13 +558,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
var toRemove = _messages.Keys.Where(k => k > seq).ToArray();
|
var toRemove = _messages.Keys.Where(k => k > seq).ToArray();
|
||||||
foreach (var s in toRemove)
|
foreach (var s in toRemove)
|
||||||
{
|
{
|
||||||
if (_messages.TryGetValue(s, out var msg))
|
RemoveTrackedMessage(s, preserveHighWaterMark: false);
|
||||||
{
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
_messageCount--;
|
|
||||||
}
|
|
||||||
|
|
||||||
_messages.Remove(s);
|
|
||||||
DeleteInBlock(s);
|
DeleteInBlock(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -594,8 +567,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
|
|
||||||
// Update _last to the new highest existing sequence (or seq if it exists,
|
// Update _last to the new highest existing sequence (or seq if it exists,
|
||||||
// or the highest below seq).
|
// or the highest below seq).
|
||||||
_last = _messages.Count == 0 ? 0 : _messages.Keys.Max();
|
if (_messageCount == 0)
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
{
|
||||||
|
_last = 0;
|
||||||
|
_first = 0;
|
||||||
|
_firstSeq = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -865,6 +842,119 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void TrackMessage(StoredMessage message)
|
||||||
|
{
|
||||||
|
_messages[message.Sequence] = message;
|
||||||
|
_messageIndexes[message.Sequence] = message.ToIndex();
|
||||||
|
_lastSequenceBySubject[message.Subject] = message.Sequence;
|
||||||
|
_messageCount++;
|
||||||
|
_totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length);
|
||||||
|
|
||||||
|
if (_messageCount == 1)
|
||||||
|
{
|
||||||
|
_first = message.Sequence;
|
||||||
|
_firstSeq = message.Sequence;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark)
|
||||||
|
{
|
||||||
|
if (!_messages.Remove(sequence, out var message))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
_messageIndexes.Remove(sequence);
|
||||||
|
_messageCount--;
|
||||||
|
_totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length);
|
||||||
|
UpdateLastSequenceForSubject(message.Subject, sequence);
|
||||||
|
|
||||||
|
if (_messageCount == 0)
|
||||||
|
{
|
||||||
|
if (preserveHighWaterMark)
|
||||||
|
_first = _last + 1;
|
||||||
|
else
|
||||||
|
_last = 0;
|
||||||
|
|
||||||
|
_firstSeq = 0;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sequence == _firstSeq)
|
||||||
|
AdvanceFirstSequence(sequence + 1);
|
||||||
|
|
||||||
|
if (!preserveHighWaterMark && sequence == _last)
|
||||||
|
_last = FindPreviousLiveSequence(sequence);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void AdvanceFirstSequence(ulong start)
|
||||||
|
{
|
||||||
|
var candidate = start;
|
||||||
|
while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last)
|
||||||
|
candidate++;
|
||||||
|
|
||||||
|
if (candidate <= _last)
|
||||||
|
{
|
||||||
|
_first = candidate;
|
||||||
|
_firstSeq = candidate;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_first = _last + 1;
|
||||||
|
_firstSeq = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ulong FindPreviousLiveSequence(ulong startExclusive)
|
||||||
|
{
|
||||||
|
if (_messageCount == 0 || startExclusive == 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
for (var seq = startExclusive - 1; ; seq--)
|
||||||
|
{
|
||||||
|
if (_messageIndexes.ContainsKey(seq))
|
||||||
|
return seq;
|
||||||
|
|
||||||
|
if (seq == 0)
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void UpdateLastSequenceForSubject(string subject, ulong removedSequence)
|
||||||
|
{
|
||||||
|
if (!_lastSequenceBySubject.TryGetValue(subject, out var currentLast) || currentLast != removedSequence)
|
||||||
|
return;
|
||||||
|
|
||||||
|
for (var seq = removedSequence - 1; ; seq--)
|
||||||
|
{
|
||||||
|
if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
_lastSequenceBySubject[subject] = seq;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (seq == 0)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_lastSequenceBySubject.Remove(subject);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void RebuildIndexesFromMessages()
|
||||||
|
{
|
||||||
|
_messageIndexes.Clear();
|
||||||
|
_lastSequenceBySubject.Clear();
|
||||||
|
_messageCount = 0;
|
||||||
|
_totalBytes = 0;
|
||||||
|
_firstSeq = 0;
|
||||||
|
_first = 0;
|
||||||
|
|
||||||
|
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
|
||||||
|
TrackMessage(message);
|
||||||
|
|
||||||
|
if (_messageCount == 0 && _last > 0)
|
||||||
|
_first = _last + 1;
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Subject matching helper
|
// Subject matching helper
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
@@ -1056,9 +1146,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
CleanBlockFiles();
|
CleanBlockFiles();
|
||||||
|
|
||||||
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
|
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
RebuildIndexesFromMessages();
|
||||||
_messageCount = (ulong)_messages.Count;
|
|
||||||
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
|
|
||||||
|
|
||||||
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
|
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
|
||||||
{
|
{
|
||||||
@@ -1068,12 +1156,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
EnsureActiveBlock();
|
EnsureActiveBlock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
|
_activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp);
|
||||||
}
|
}
|
||||||
catch (InvalidOperationException)
|
catch (InvalidOperationException)
|
||||||
{
|
{
|
||||||
RotateBlock();
|
RotateBlock();
|
||||||
_activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
|
_activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_activeBlock!.IsSealed)
|
if (_activeBlock!.IsSealed)
|
||||||
@@ -1168,15 +1256,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Sync _first from _messages; if empty, set to _last+1 (watermark).
|
// Sync _first from _messages; if empty, set to _last+1 (watermark).
|
||||||
if (_messages.Count > 0)
|
RebuildIndexesFromMessages();
|
||||||
_first = _messages.Keys.Min();
|
|
||||||
else if (_last > 0)
|
|
||||||
_first = _last + 1;
|
|
||||||
|
|
||||||
// Recompute incremental state from recovered messages.
|
|
||||||
_messageCount = (ulong)_messages.Count;
|
|
||||||
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
|
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -1207,6 +1287,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
Sequence = record.Sequence,
|
Sequence = record.Sequence,
|
||||||
Subject = record.Subject,
|
Subject = record.Subject,
|
||||||
|
RawHeaders = record.Headers,
|
||||||
Payload = originalPayload,
|
Payload = originalPayload,
|
||||||
TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime,
|
TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime,
|
||||||
};
|
};
|
||||||
@@ -1370,20 +1451,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
// Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal.
|
// Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal.
|
||||||
foreach (var seq in expired)
|
foreach (var seq in expired)
|
||||||
{
|
{
|
||||||
if (_messages.Remove(seq, out var msg))
|
RemoveTrackedMessage(seq, preserveHighWaterMark: true);
|
||||||
{
|
|
||||||
_messageCount--;
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
}
|
|
||||||
|
|
||||||
DeleteInBlock(seq);
|
DeleteInBlock(seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_messages.Count > 0)
|
|
||||||
_firstSeq = _messages.Keys.Min();
|
|
||||||
else
|
|
||||||
_firstSeq = 0;
|
|
||||||
|
|
||||||
_generation++;
|
_generation++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1407,14 +1478,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
|
|
||||||
foreach (var sequence in expired)
|
foreach (var sequence in expired)
|
||||||
{
|
{
|
||||||
if (_messages.Remove(sequence, out var msg))
|
RemoveTrackedMessage(sequence, preserveHighWaterMark: true);
|
||||||
{
|
|
||||||
_messageCount--;
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
|
|
||||||
RewriteBlocks();
|
RewriteBlocks();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1676,27 +1741,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public bool RemoveMsg(ulong seq)
|
public bool RemoveMsg(ulong seq)
|
||||||
{
|
{
|
||||||
if (!_messages.Remove(seq, out var msg))
|
if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
_generation++;
|
_generation++;
|
||||||
_messageCount--;
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
|
|
||||||
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
|
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
|
||||||
// never decremented on removal. Only FirstSeq advances when the first
|
// never decremented on removal. Only FirstSeq advances when the first
|
||||||
// live message is removed.
|
// live message is removed.
|
||||||
if (_messages.Count == 0)
|
|
||||||
{
|
|
||||||
_first = _last + 1; // All gone — next first would be after last
|
|
||||||
_firstSeq = 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_first = _messages.Keys.Min();
|
|
||||||
_firstSeq = _first;
|
|
||||||
}
|
|
||||||
|
|
||||||
DeleteInBlock(seq);
|
DeleteInBlock(seq);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -1709,23 +1760,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public bool EraseMsg(ulong seq)
|
public bool EraseMsg(ulong seq)
|
||||||
{
|
{
|
||||||
if (!_messages.Remove(seq, out var msg))
|
if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
_generation++;
|
_generation++;
|
||||||
_messageCount--;
|
|
||||||
_totalBytes -= (ulong)msg.Payload.Length;
|
|
||||||
|
|
||||||
if (_messages.Count == 0)
|
|
||||||
{
|
|
||||||
_first = _last + 1;
|
|
||||||
_firstSeq = 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_first = _messages.Keys.Min();
|
|
||||||
_firstSeq = _first;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Secure erase: overwrite payload bytes with random data before marking deleted.
|
// Secure erase: overwrite payload bytes with random data before marking deleted.
|
||||||
// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).
|
// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).
|
||||||
@@ -1813,6 +1851,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
sm ??= new StoreMsg();
|
sm ??= new StoreMsg();
|
||||||
sm.Clear();
|
sm.Clear();
|
||||||
sm.Subject = stored.Subject;
|
sm.Subject = stored.Subject;
|
||||||
|
sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
|
||||||
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
|
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
|
||||||
sm.Sequence = stored.Sequence;
|
sm.Sequence = stored.Sequence;
|
||||||
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
@@ -1833,7 +1872,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
sm ??= new StoreMsg();
|
sm ??= new StoreMsg();
|
||||||
sm.Clear();
|
sm.Clear();
|
||||||
sm.Subject = record.Subject;
|
sm.Subject = record.Subject;
|
||||||
sm.Data = record.Payload.Length > 0 ? record.Payload.ToArray() : null;
|
sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null;
|
||||||
|
var originalPayload = RestorePayload(record.Payload.Span);
|
||||||
|
sm.Data = originalPayload.Length > 0 ? originalPayload : null;
|
||||||
sm.Sequence = record.Sequence;
|
sm.Sequence = record.Sequence;
|
||||||
sm.Timestamp = record.Timestamp;
|
sm.Timestamp = record.Timestamp;
|
||||||
return sm;
|
return sm;
|
||||||
@@ -1887,6 +1928,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
sm ??= new StoreMsg();
|
sm ??= new StoreMsg();
|
||||||
sm.Clear();
|
sm.Clear();
|
||||||
sm.Subject = match.Subject;
|
sm.Subject = match.Subject;
|
||||||
|
sm.Header = match.RawHeaders.Length > 0 ? match.RawHeaders.ToArray() : null;
|
||||||
sm.Data = match.Payload.Length > 0 ? match.Payload.ToArray() : null;
|
sm.Data = match.Payload.Length > 0 ? match.Payload.ToArray() : null;
|
||||||
sm.Sequence = match.Sequence;
|
sm.Sequence = match.Sequence;
|
||||||
sm.Timestamp = new DateTimeOffset(match.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
sm.Timestamp = new DateTimeOffset(match.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
@@ -1917,6 +1959,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
sm ??= new StoreMsg();
|
sm ??= new StoreMsg();
|
||||||
sm.Clear();
|
sm.Clear();
|
||||||
sm.Subject = found.Value.Subject;
|
sm.Subject = found.Value.Subject;
|
||||||
|
sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null;
|
||||||
sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null;
|
sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null;
|
||||||
sm.Sequence = found.Key;
|
sm.Sequence = found.Key;
|
||||||
sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
@@ -2041,20 +2084,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
if (_stopped)
|
if (_stopped)
|
||||||
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
|
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
|
||||||
|
|
||||||
// Combine headers and payload, same as StoreMsg.
|
var headers = hdr is { Length: > 0 } ? hdr : [];
|
||||||
byte[] combined;
|
var payload = msg ?? [];
|
||||||
if (hdr is { Length: > 0 })
|
var persistedPayload = TransformForPersist(payload);
|
||||||
{
|
|
||||||
combined = new byte[hdr.Length + msg.Length];
|
|
||||||
hdr.CopyTo(combined, 0);
|
|
||||||
msg.CopyTo(combined, hdr.Length);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
combined = msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
var persistedPayload = TransformForPersist(combined.AsSpan());
|
|
||||||
// Recover UTC DateTime from caller-supplied Unix nanosecond timestamp.
|
// Recover UTC DateTime from caller-supplied Unix nanosecond timestamp.
|
||||||
var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
|
var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
|
||||||
|
|
||||||
@@ -2062,10 +2094,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
Sequence = seq,
|
Sequence = seq,
|
||||||
Subject = subject,
|
Subject = subject,
|
||||||
Payload = combined,
|
RawHeaders = headers,
|
||||||
|
Payload = payload,
|
||||||
TimestampUtc = storedUtc,
|
TimestampUtc = storedUtc,
|
||||||
};
|
};
|
||||||
_messages[seq] = stored;
|
TrackMessage(stored);
|
||||||
_generation++;
|
_generation++;
|
||||||
|
|
||||||
// Go: update _last to the high-water mark — do not decrement.
|
// Go: update _last to the high-water mark — do not decrement.
|
||||||
@@ -2078,16 +2111,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
EnsureActiveBlock();
|
EnsureActiveBlock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, ts);
|
_activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts);
|
||||||
}
|
}
|
||||||
catch (InvalidOperationException)
|
catch (InvalidOperationException)
|
||||||
{
|
{
|
||||||
RotateBlock();
|
RotateBlock();
|
||||||
_activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, ts);
|
_activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
|
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
|
||||||
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
|
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
|
||||||
|
|
||||||
if (_activeBlock!.IsSealed)
|
if (_activeBlock!.IsSealed)
|
||||||
RotateBlock();
|
RotateBlock();
|
||||||
@@ -2113,6 +2146,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
sm ??= new StoreMsg();
|
sm ??= new StoreMsg();
|
||||||
sm.Clear();
|
sm.Clear();
|
||||||
sm.Subject = stored.Subject;
|
sm.Subject = stored.Subject;
|
||||||
|
sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
|
||||||
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
|
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
|
||||||
sm.Sequence = stored.Sequence;
|
sm.Sequence = stored.Sequence;
|
||||||
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
@@ -2365,6 +2399,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
|||||||
{
|
{
|
||||||
public ulong Sequence { get; init; }
|
public ulong Sequence { get; init; }
|
||||||
public string? Subject { get; init; }
|
public string? Subject { get; init; }
|
||||||
|
public string? HeadersBase64 { get; init; }
|
||||||
public string? PayloadBase64 { get; init; }
|
public string? PayloadBase64 { get; init; }
|
||||||
public DateTime TimestampUtc { get; init; }
|
public DateTime TimestampUtc { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ public sealed class StoredMessage
|
|||||||
public ulong Sequence { get; init; }
|
public ulong Sequence { get; init; }
|
||||||
public string Subject { get; init; } = string.Empty;
|
public string Subject { get; init; } = string.Empty;
|
||||||
public ReadOnlyMemory<byte> Payload { get; init; }
|
public ReadOnlyMemory<byte> Payload { get; init; }
|
||||||
|
internal ReadOnlyMemory<byte> RawHeaders { get; init; }
|
||||||
public DateTime TimestampUtc { get; init; } = DateTime.UtcNow;
|
public DateTime TimestampUtc { get; init; } = DateTime.UtcNow;
|
||||||
public string? Account { get; init; }
|
public string? Account { get; init; }
|
||||||
public bool Redelivered { get; init; }
|
public bool Redelivered { get; init; }
|
||||||
@@ -18,4 +19,7 @@ public sealed class StoredMessage
|
|||||||
/// Convenience accessor for the Nats-Msg-Id header value, used by source deduplication.
|
/// Convenience accessor for the Nats-Msg-Id header value, used by source deduplication.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string? MsgId => Headers is not null && Headers.TryGetValue("Nats-Msg-Id", out var id) ? id : null;
|
public string? MsgId => Headers is not null && Headers.TryGetValue("Nats-Msg-Id", out var id) ? id : null;
|
||||||
|
|
||||||
|
internal StoredMessageIndex ToIndex()
|
||||||
|
=> new(Sequence, Subject, Payload.Length, TimestampUtc);
|
||||||
}
|
}
|
||||||
|
|||||||
7
src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs
Normal file
7
src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
namespace NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
public readonly record struct StoredMessageIndex(
|
||||||
|
ulong Sequence,
|
||||||
|
string Subject,
|
||||||
|
int PayloadLength,
|
||||||
|
DateTime TimestampUtc);
|
||||||
@@ -0,0 +1,151 @@
|
|||||||
|
using System.Diagnostics;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
using Xunit.Abstractions;
|
||||||
|
|
||||||
|
namespace NATS.Server.Benchmark.Tests.JetStream;
|
||||||
|
|
||||||
|
[Collection("Benchmark-JetStream")]
|
||||||
|
public class FileStoreAppendBenchmarks(ITestOutputHelper output)
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
[Trait("Category", "Benchmark")]
|
||||||
|
public async Task FileStore_AppendAsync_128B_Throughput()
|
||||||
|
{
|
||||||
|
var payload = new byte[128];
|
||||||
|
var dir = CreateDirectory("append");
|
||||||
|
var opts = CreateOptions(dir);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var store = new FileStore(opts);
|
||||||
|
await MeasureAsync("FileStore AppendAsync (128B)", operations: 20_000, payload.Length,
|
||||||
|
i => store.AppendAsync($"bench.append.{i % 8}", payload, default).AsTask());
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
DeleteDirectory(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
[Trait("Category", "Benchmark")]
|
||||||
|
public void FileStore_LoadLastBySubject_Throughput()
|
||||||
|
{
|
||||||
|
var payload = new byte[64];
|
||||||
|
var dir = CreateDirectory("load-last");
|
||||||
|
var opts = CreateOptions(dir);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var store = new FileStore(opts);
|
||||||
|
for (var i = 0; i < 25_000; i++)
|
||||||
|
store.StoreMsg($"bench.subject.{i % 16}", null, payload, 0L);
|
||||||
|
|
||||||
|
Measure("FileStore LoadLastBySubject (hot)", operations: 50_000, payload.Length,
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
var loaded = store.LoadLastBySubjectAsync("bench.subject.7", default).GetAwaiter().GetResult();
|
||||||
|
if (loaded is null || loaded.Payload.Length != payload.Length)
|
||||||
|
throw new InvalidOperationException("LoadLastBySubjectAsync returned an unexpected result.");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
DeleteDirectory(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
[Trait("Category", "Benchmark")]
|
||||||
|
public void FileStore_PurgeEx_Trim_Overhead()
|
||||||
|
{
|
||||||
|
var payload = new byte[96];
|
||||||
|
var dir = CreateDirectory("purge-trim");
|
||||||
|
var opts = CreateOptions(dir);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var store = new FileStore(opts);
|
||||||
|
for (var i = 0; i < 12_000; i++)
|
||||||
|
store.StoreMsg($"bench.purge.{i % 6}", null, payload, 0L);
|
||||||
|
|
||||||
|
Measure("FileStore PurgeEx+Trim", operations: 2_000, payload.Length,
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
store.PurgeEx("bench.purge.1", 0, 8);
|
||||||
|
store.TrimToMaxMessages(10_000);
|
||||||
|
store.StoreMsg("bench.purge.1", null, payload, 0L);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
DeleteDirectory(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task MeasureAsync(string name, int operations, int payloadSize, Func<int, Task> action)
|
||||||
|
{
|
||||||
|
GC.Collect();
|
||||||
|
GC.WaitForPendingFinalizers();
|
||||||
|
GC.Collect();
|
||||||
|
|
||||||
|
var beforeAlloc = GC.GetAllocatedBytesForCurrentThread();
|
||||||
|
var sw = Stopwatch.StartNew();
|
||||||
|
|
||||||
|
for (var i = 0; i < operations; i++)
|
||||||
|
await action(i);
|
||||||
|
|
||||||
|
sw.Stop();
|
||||||
|
WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Measure(string name, int operations, int payloadSize, Action action)
|
||||||
|
{
|
||||||
|
GC.Collect();
|
||||||
|
GC.WaitForPendingFinalizers();
|
||||||
|
GC.Collect();
|
||||||
|
|
||||||
|
var beforeAlloc = GC.GetAllocatedBytesForCurrentThread();
|
||||||
|
var sw = Stopwatch.StartNew();
|
||||||
|
|
||||||
|
for (var i = 0; i < operations; i++)
|
||||||
|
action();
|
||||||
|
|
||||||
|
sw.Stop();
|
||||||
|
WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void WriteResult(string name, int operations, long totalBytes, TimeSpan elapsed, long allocatedBytes)
|
||||||
|
{
|
||||||
|
var opsPerSecond = operations / elapsed.TotalSeconds;
|
||||||
|
var megabytesPerSecond = totalBytes / elapsed.TotalSeconds / (1024.0 * 1024.0);
|
||||||
|
var bytesPerOperation = allocatedBytes / (double)operations;
|
||||||
|
|
||||||
|
output.WriteLine($"=== {name} ===");
|
||||||
|
output.WriteLine($"Ops: {opsPerSecond:N0} ops/s");
|
||||||
|
output.WriteLine($"Data: {megabytesPerSecond:F1} MB/s");
|
||||||
|
output.WriteLine($"Alloc: {bytesPerOperation:F1} B/op");
|
||||||
|
output.WriteLine($"Elapsed: {elapsed.TotalMilliseconds:F0} ms");
|
||||||
|
output.WriteLine("");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string CreateDirectory(string suffix)
|
||||||
|
=> Path.Combine(Path.GetTempPath(), $"nats-js-filestore-bench-{suffix}-{Guid.NewGuid():N}");
|
||||||
|
|
||||||
|
private static FileStoreOptions CreateOptions(string dir)
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(dir);
|
||||||
|
|
||||||
|
return new FileStoreOptions
|
||||||
|
{
|
||||||
|
Directory = dir,
|
||||||
|
BlockSizeBytes = 256 * 1024,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void DeleteDirectory(string dir)
|
||||||
|
{
|
||||||
|
if (Directory.Exists(dir))
|
||||||
|
Directory.Delete(dir, recursive: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -45,6 +45,9 @@ Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison
|
|||||||
| `MultiClientLatencyTests` | `RequestReply_10Clients2Services_16B` | Request/reply latency, 10 concurrent clients, 2 queue-group services |
|
| `MultiClientLatencyTests` | `RequestReply_10Clients2Services_16B` | Request/reply latency, 10 concurrent clients, 2 queue-group services |
|
||||||
| `SyncPublishTests` | `JSSyncPublish_16B_MemoryStore` | JetStream synchronous publish, memory-backed stream |
|
| `SyncPublishTests` | `JSSyncPublish_16B_MemoryStore` | JetStream synchronous publish, memory-backed stream |
|
||||||
| `AsyncPublishTests` | `JSAsyncPublish_128B_FileStore` | JetStream async batch publish, file-backed stream |
|
| `AsyncPublishTests` | `JSAsyncPublish_128B_FileStore` | JetStream async batch publish, file-backed stream |
|
||||||
|
| `FileStoreAppendBenchmarks` | `FileStore_AppendAsync_128B_Throughput` | FileStore direct append throughput, 128-byte payload |
|
||||||
|
| `FileStoreAppendBenchmarks` | `FileStore_LoadLastBySubject_Throughput` | FileStore hot-path subject index lookup throughput |
|
||||||
|
| `FileStoreAppendBenchmarks` | `FileStore_PurgeEx_Trim_Overhead` | FileStore purge/trim maintenance overhead under repeated updates |
|
||||||
| `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput |
|
| `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput |
|
||||||
| `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput |
|
| `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput |
|
||||||
|
|
||||||
|
|||||||
@@ -15,4 +15,27 @@ public class FileStoreTests
|
|||||||
await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||||
(await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
|
(await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Snapshot_round_trip_preserves_headers_and_payload_separately()
|
||||||
|
{
|
||||||
|
var srcDir = Directory.CreateTempSubdirectory();
|
||||||
|
var dstDir = Directory.CreateTempSubdirectory();
|
||||||
|
|
||||||
|
await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName });
|
||||||
|
var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray();
|
||||||
|
var msg = "payload-two"u8.ToArray();
|
||||||
|
|
||||||
|
var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L);
|
||||||
|
var snapshot = await src.CreateSnapshotAsync(default);
|
||||||
|
|
||||||
|
await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName });
|
||||||
|
await dst.RestoreSnapshotAsync(snapshot, default);
|
||||||
|
|
||||||
|
var loaded = dst.LoadMsg(seq, null);
|
||||||
|
loaded.Header.ShouldNotBeNull();
|
||||||
|
loaded.Header.ShouldBe(hdr);
|
||||||
|
loaded.Data.ShouldNotBeNull();
|
||||||
|
loaded.Data.ShouldBe(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,28 @@
|
|||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
|
||||||
|
|
||||||
|
public sealed class FileStoreOptimizationGuardTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task PurgeEx_updates_last_by_subject_after_recovery()
|
||||||
|
{
|
||||||
|
var dir = Directory.CreateTempSubdirectory();
|
||||||
|
|
||||||
|
await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }))
|
||||||
|
{
|
||||||
|
store.StoreMsg("events.a", null, "one"u8.ToArray(), 0L);
|
||||||
|
store.StoreMsg("events.a", null, "two"u8.ToArray(), 0L);
|
||||||
|
store.StoreMsg("events.b", null, "other"u8.ToArray(), 0L);
|
||||||
|
store.PurgeEx("events.a", 0, 1);
|
||||||
|
await store.FlushAllPending();
|
||||||
|
}
|
||||||
|
|
||||||
|
await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||||
|
var last = await recovered.LoadLastBySubjectAsync("events.a", default);
|
||||||
|
|
||||||
|
last.ShouldNotBeNull();
|
||||||
|
last.Sequence.ShouldBe(2UL);
|
||||||
|
last.Payload.ToArray().ShouldBe("two"u8.ToArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -181,7 +181,7 @@ public sealed class FileStoreTtlTests : IDisposable
|
|||||||
|
|
||||||
// Go: TestFileStoreStoreMsg — filestore.go storeMsg with headers
|
// Go: TestFileStoreStoreMsg — filestore.go storeMsg with headers
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task StoreMsg_WithHeaders_CombinesHeadersAndPayload()
|
public async Task StoreMsg_WithHeaders_KeepsPayloadSeparateFromHeaders()
|
||||||
{
|
{
|
||||||
await using var store = CreateStore(sub: "storemsg-headers");
|
await using var store = CreateStore(sub: "storemsg-headers");
|
||||||
|
|
||||||
@@ -192,10 +192,10 @@ public sealed class FileStoreTtlTests : IDisposable
|
|||||||
seq.ShouldBe(1UL);
|
seq.ShouldBe(1UL);
|
||||||
ts.ShouldBeGreaterThan(0L);
|
ts.ShouldBeGreaterThan(0L);
|
||||||
|
|
||||||
// The stored payload should be the combination of headers + body.
|
// The stored payload should remain the message body only.
|
||||||
var loaded = await store.LoadAsync(seq, default);
|
var loaded = await store.LoadAsync(seq, default);
|
||||||
loaded.ShouldNotBeNull();
|
loaded.ShouldNotBeNull();
|
||||||
loaded!.Payload.Length.ShouldBe(hdr.Length + body.Length);
|
loaded!.Payload.ToArray().ShouldBe(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Go: TestFileStoreStoreMsgPerMsgTtl — filestore.go per-message TTL override
|
// Go: TestFileStoreStoreMsgPerMsgTtl — filestore.go per-message TTL override
|
||||||
|
|||||||
@@ -532,4 +532,22 @@ public sealed class StoreInterfaceTests
|
|||||||
lastMsg = s.LoadLastMsg("foo", null);
|
lastMsg = s.LoadLastMsg("foo", null);
|
||||||
lastMsg.Sequence.ShouldBe(2UL);
|
lastMsg.Sequence.ShouldBe(2UL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void FileStore_LoadMsg_preserves_headers_separately_from_payload()
|
||||||
|
{
|
||||||
|
var dir = Directory.CreateTempSubdirectory();
|
||||||
|
using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||||
|
|
||||||
|
var hdr = "NATS/1.0\r\nX-Test: one\r\n\r\n"u8.ToArray();
|
||||||
|
var msg = "payload"u8.ToArray();
|
||||||
|
|
||||||
|
var (seq, _) = store.StoreMsg("foo", hdr, msg, 0L);
|
||||||
|
var loaded = store.LoadMsg(seq, null);
|
||||||
|
|
||||||
|
loaded.Header.ShouldNotBeNull();
|
||||||
|
loaded.Header.ShouldBe(hdr);
|
||||||
|
loaded.Data.ShouldNotBeNull();
|
||||||
|
loaded.Data.ShouldBe(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,4 +15,22 @@ public class JetStreamStoreIndexTests
|
|||||||
var last = await store.LoadLastBySubjectAsync("orders.created", default);
|
var last = await store.LoadLastBySubjectAsync("orders.created", default);
|
||||||
last!.Payload.Span.SequenceEqual("3"u8).ShouldBeTrue();
|
last!.Payload.Span.SequenceEqual("3"u8).ShouldBeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FileStore_trim_to_zero_preserves_high_water_mark_for_empty_state()
|
||||||
|
{
|
||||||
|
var dir = Directory.CreateTempSubdirectory();
|
||||||
|
await using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||||
|
|
||||||
|
await store.AppendAsync("orders.created", "1"u8.ToArray(), default);
|
||||||
|
await store.AppendAsync("orders.updated", "2"u8.ToArray(), default);
|
||||||
|
await store.AppendAsync("orders.created", "3"u8.ToArray(), default);
|
||||||
|
|
||||||
|
store.TrimToMaxMessages(0);
|
||||||
|
|
||||||
|
var state = await store.GetStateAsync(default);
|
||||||
|
state.Messages.ShouldBe(0UL);
|
||||||
|
state.LastSeq.ShouldBe(3UL);
|
||||||
|
state.FirstSeq.ShouldBe(4UL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user