From b0b64292b383be64838d27a2248608ee7569d7f1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 13:42:17 -0500 Subject: [PATCH] feat(storage): add tombstone tracking and purge operations (Go parity) Implement PurgeEx, Compact, Truncate, FilteredState, SubjectsState, SubjectsTotals, State, FastState, GetSeqFromTime on FileStore. Add MsgBlock.IsDeleted, DeletedSequences, EnumerateNonDeleted. Includes wildcard subject support via SubjectMatch for all filtered operations. --- .../JetStream/Storage/FileStore.cs | 312 +++++++++++++ src/NATS.Server/JetStream/Storage/MsgBlock.cs | 63 +++ .../Storage/FileStorePurgeBlockTests.cs | 419 ++++++++++++++++++ 3 files changed, 794 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 78ae6f3..31c1977 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -227,6 +227,318 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable RewriteBlocks(); } + + // ------------------------------------------------------------------------- + // Go-parity sync interface implementations + // Reference: golang/nats-server/server/filestore.go + // ------------------------------------------------------------------------- + + /// + /// Removes all messages from the store and returns the count purged. + /// Reference: golang/nats-server/server/filestore.go — purge / purgeMsgs. + /// + public ulong Purge() + { + var count = (ulong)_messages.Count; + _messages.Clear(); + _last = 0; + + DisposeAllBlocks(); + CleanBlockFiles(); + + return count; + } + + /// + /// Purge messages on a given subject, up to sequence , + /// keeping the newest messages. + /// If subject is empty or null, behaves like . + /// Returns the number of messages removed. + /// Reference: golang/nats-server/server/filestore.go — PurgeEx. + /// + public ulong PurgeEx(string subject, ulong seq, ulong keep) + { + if (string.IsNullOrEmpty(subject)) + return Purge(); + + // Collect all messages matching the subject (with wildcard support) at or below seq, ordered by sequence. + var candidates = _messages.Values + .Where(m => SubjectMatchesFilter(m.Subject, subject)) + .Where(m => seq == 0 || m.Sequence <= seq) + .OrderBy(m => m.Sequence) + .ToList(); + + if (candidates.Count == 0) + return 0; + + // Keep the newest `keep` messages; purge the rest. + var toRemove = keep > 0 && (ulong)candidates.Count > keep + ? candidates.Take(candidates.Count - (int)keep).ToList() + : (keep == 0 ? candidates : []); + + if (toRemove.Count == 0) + return 0; + + foreach (var msg in toRemove) + { + _messages.Remove(msg.Sequence); + DeleteInBlock(msg.Sequence); + } + + // Update _last if required. + if (_messages.Count == 0) + _last = 0; + else if (!_messages.ContainsKey(_last)) + _last = _messages.Keys.Max(); + + return (ulong)toRemove.Count; + } + + /// + /// Removes all messages with sequence number strictly less than + /// and returns the count removed. + /// Reference: golang/nats-server/server/filestore.go — Compact. + /// + public ulong Compact(ulong seq) + { + if (seq == 0) + return 0; + + var toRemove = _messages.Keys.Where(k => k < seq).ToArray(); + if (toRemove.Length == 0) + return 0; + + foreach (var s in toRemove) + { + _messages.Remove(s); + DeleteInBlock(s); + } + + if (_messages.Count == 0) + _last = 0; + else if (!_messages.ContainsKey(_last)) + _last = _messages.Keys.Max(); + + return (ulong)toRemove.Length; + } + + /// + /// Removes all messages with sequence number strictly greater than + /// and updates the last sequence pointer. + /// Reference: golang/nats-server/server/filestore.go — Truncate. + /// + public void Truncate(ulong seq) + { + if (seq == 0) + { + // Truncate to nothing. + _messages.Clear(); + _last = 0; + DisposeAllBlocks(); + CleanBlockFiles(); + return; + } + + var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); + foreach (var s in toRemove) + { + _messages.Remove(s); + DeleteInBlock(s); + } + + // Update _last to the new highest existing sequence (or seq if it exists, + // or the highest below seq). + _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); + } + + /// + /// Returns the first sequence number at or after the given UTC time. + /// Returns _last + 1 if no message exists at or after . + /// Reference: golang/nats-server/server/filestore.go — GetSeqFromTime. + /// + public ulong GetSeqFromTime(DateTime t) + { + var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime(); + var match = _messages.Values + .Where(m => m.TimestampUtc >= utc) + .OrderBy(m => m.Sequence) + .FirstOrDefault(); + + return match?.Sequence ?? _last + 1; + } + + /// + /// Returns compact state for non-deleted messages on + /// at or after sequence . + /// Reference: golang/nats-server/server/filestore.go — FilteredState. + /// + public SimpleState FilteredState(ulong seq, string subject) + { + var matching = _messages.Values + .Where(m => m.Sequence >= seq) + .Where(m => string.IsNullOrEmpty(subject) + || SubjectMatchesFilter(m.Subject, subject)) + .OrderBy(m => m.Sequence) + .ToList(); + + if (matching.Count == 0) + return new SimpleState(); + + return new SimpleState + { + Msgs = (ulong)matching.Count, + First = matching[0].Sequence, + Last = matching[^1].Sequence, + }; + } + + /// + /// Returns per-subject for all subjects matching + /// . Supports NATS wildcard filters. + /// Reference: golang/nats-server/server/filestore.go — SubjectsState. + /// + public Dictionary SubjectsState(string filterSubject) + { + var result = new Dictionary(StringComparer.Ordinal); + + foreach (var msg in _messages.Values) + { + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + continue; + + if (result.TryGetValue(msg.Subject, out var existing)) + { + result[msg.Subject] = new SimpleState + { + Msgs = existing.Msgs + 1, + First = Math.Min(existing.First == 0 ? msg.Sequence : existing.First, msg.Sequence), + Last = Math.Max(existing.Last, msg.Sequence), + }; + } + else + { + result[msg.Subject] = new SimpleState + { + Msgs = 1, + First = msg.Sequence, + Last = msg.Sequence, + }; + } + } + + return result; + } + + /// + /// Returns per-subject message counts for all subjects matching + /// . Supports NATS wildcard filters. + /// Reference: golang/nats-server/server/filestore.go — SubjectsTotals. + /// + public Dictionary SubjectsTotals(string filterSubject) + { + var result = new Dictionary(StringComparer.Ordinal); + + foreach (var msg in _messages.Values) + { + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + continue; + + result.TryGetValue(msg.Subject, out var count); + result[msg.Subject] = count + 1; + } + + return result; + } + + /// + /// Returns the full stream state, including the list of deleted (interior gap) sequences. + /// Reference: golang/nats-server/server/filestore.go — State. + /// + public StreamState State() + { + var state = new StreamState(); + FastState(ref state); + + // Populate deleted sequences: sequences in [firstSeq, lastSeq] that are + // not present in _messages. + if (state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq) + { + var deletedList = new List(); + for (var s = state.FirstSeq; s <= state.LastSeq; s++) + { + if (!_messages.ContainsKey(s)) + deletedList.Add(s); + } + + if (deletedList.Count > 0) + { + state.Deleted = [.. deletedList]; + state.NumDeleted = deletedList.Count; + } + } + + // Populate per-subject counts. + var subjectCounts = new Dictionary(StringComparer.Ordinal); + foreach (var msg in _messages.Values) + { + subjectCounts.TryGetValue(msg.Subject, out var cnt); + subjectCounts[msg.Subject] = cnt + 1; + } + state.NumSubjects = subjectCounts.Count; + state.Subjects = subjectCounts.Count > 0 ? subjectCounts : null; + + return state; + } + + /// + /// Populates a pre-allocated with the minimum fields + /// needed for replication without allocating a new struct. + /// Does not populate the array or + /// dictionary. + /// Reference: golang/nats-server/server/filestore.go — FastState. + /// + public void FastState(ref StreamState state) + { + state.Msgs = (ulong)_messages.Count; + state.Bytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + state.LastSeq = _last; + state.LastTime = default; + + if (_messages.Count == 0) + { + state.FirstSeq = 0; + state.FirstTime = default; + } + else + { + var firstSeq = _messages.Keys.Min(); + state.FirstSeq = firstSeq; + state.FirstTime = _messages[firstSeq].TimestampUtc; + state.LastTime = _messages[_last].TimestampUtc; + } + } + + // ------------------------------------------------------------------------- + // Subject matching helper + // ------------------------------------------------------------------------- + + /// + /// Returns true if matches . + /// If filter is a literal, performs exact string comparison. + /// If filter contains NATS wildcards (* or >), uses SubjectMatch.MatchLiteral. + /// Reference: golang/nats-server/server/filestore.go — subjectMatch helper. + /// + private static bool SubjectMatchesFilter(string subject, string filter) + { + if (string.IsNullOrEmpty(filter)) + return true; + + if (NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter)) + return string.Equals(subject, filter, StringComparison.Ordinal); + + return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter); + } + public ValueTask DisposeAsync() { DisposeAllBlocks(); diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 75d5c3d..a6f22de 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -322,6 +322,69 @@ public sealed class MsgBlock : IDisposable } } + + /// + /// Returns true if the given sequence number has been soft-deleted in this block. + /// Reference: golang/nats-server/server/filestore.go — dmap (deleted map) lookup. + /// + public bool IsDeleted(ulong sequence) + { + _lock.EnterReadLock(); + try { return _deleted.Contains(sequence); } + finally { _lock.ExitReadLock(); } + } + + /// + /// Exposes the set of soft-deleted sequence numbers for read-only inspection. + /// Reference: golang/nats-server/server/filestore.go — dmap access for state queries. + /// + public IReadOnlySet DeletedSequences + { + get + { + _lock.EnterReadLock(); + try { return new HashSet(_deleted); } + finally { _lock.ExitReadLock(); } + } + } + + /// + /// Enumerates all non-deleted sequences in this block along with their subjects. + /// Used by FileStore for subject-filtered operations (PurgeEx, SubjectsState, etc.). + /// Reference: golang/nats-server/server/filestore.go — loadBlock, iterating non-deleted records. + /// + public IEnumerable<(ulong Sequence, string Subject)> EnumerateNonDeleted() + { + // Snapshot index and deleted set under the read lock, then decode outside it. + List<(long Offset, int Length, ulong Seq)> entries; + _lock.EnterReadLock(); + try + { + entries = new List<(long, int, ulong)>(_index.Count); + foreach (var (seq, (offset, length)) in _index) + { + if (!_deleted.Contains(seq)) + entries.Add((offset, length, seq)); + } + } + finally + { + _lock.ExitReadLock(); + } + + // Sort by sequence for deterministic output. + entries.Sort((a, b) => a.Seq.CompareTo(b.Seq)); + + foreach (var (offset, length, seq) in entries) + { + var buffer = new byte[length]; + RandomAccess.Read(_handle, buffer, offset); + var record = MessageRecord.Decode(buffer); + if (record is not null && !record.Deleted) + yield return (record.Sequence, record.Subject); + } + } + /// /// Flushes any buffered writes to disk. /// diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs new file mode 100644 index 0000000..ee24b5c --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs @@ -0,0 +1,419 @@ +// Reference: golang/nats-server/server/filestore_test.go +// Tests ported from: TestFileStorePurgeEx, TestFileStorePurgeExWithSubject, +// TestFileStorePurgeExKeepOneBug, TestFileStoreCompact, TestFileStoreStreamTruncate, +// TestFileStoreState, TestFileStoreFilteredState, TestFileStoreSubjectsState, +// TestFileStoreGetSeqFromTime + +using System.Text; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// Tests for FileStore tombstone tracking and purge operations: +/// PurgeEx, Compact, Truncate, FilteredState, SubjectsState, SubjectsTotals, +/// State (with deleted sequences), and GetSeqFromTime. +/// Reference: golang/nats-server/server/filestore_test.go +/// +public sealed class FileStorePurgeBlockTests : IDisposable +{ + private readonly string _dir; + + public FileStorePurgeBlockTests() + { + _dir = Path.Combine(Path.GetTempPath(), $"nats-js-purgeblock-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_dir); + } + + public void Dispose() + { + if (Directory.Exists(_dir)) + Directory.Delete(_dir, recursive: true); + } + + private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null) + { + var dir = Path.Combine(_dir, subdirectory); + var opts = options ?? new FileStoreOptions(); + opts.Directory = dir; + return new FileStore(opts); + } + + // ------------------------------------------------------------------------- + // PurgeEx tests + // ------------------------------------------------------------------------- + + // Go: TestFileStorePurgeExWithSubject — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_BySubject_RemovesMatchingMessages() + { + await using var store = CreateStore("purgex-subject"); + + // Store 5 messages on "foo" and 5 on "bar" + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default); + for (var i = 0; i < 5; i++) + await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default); + + var stateBeforePurge = await store.GetStateAsync(default); + stateBeforePurge.Messages.ShouldBe(10UL); + + // Purge all "foo" messages (seq=0 means no upper limit; keep=0 means keep none) + var purged = store.PurgeEx("foo", 0, 0); + purged.ShouldBe(5UL); + + // Only "bar" messages remain + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe(5UL); + + var remaining = await store.ListAsync(default); + remaining.All(m => m.Subject == "bar").ShouldBeTrue(); + } + + // Go: TestFileStorePurgeExKeepOneBug — filestore_test.go:~910 + [Fact] + public async Task PurgeEx_WithKeep_RetainsNewestMessages() + { + await using var store = CreateStore("purgex-keep"); + + // Store 10 messages on "events" + for (var i = 0; i < 10; i++) + await store.AppendAsync("events", Encoding.UTF8.GetBytes($"msg-{i}"), default); + + // Purge keeping the 3 newest + var purged = store.PurgeEx("events", 0, 3); + purged.ShouldBe(7UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(3); + + // The retained messages should be the 3 highest sequences (8, 9, 10) + var seqs = remaining.Select(m => m.Sequence).OrderBy(s => s).ToArray(); + seqs[0].ShouldBe(8UL); + seqs[1].ShouldBe(9UL); + seqs[2].ShouldBe(10UL); + } + + // Go: TestFileStorePurgeEx — filestore_test.go:~855 + [Fact] + public async Task PurgeEx_WithSeqLimit_OnlyPurgesBelowSequence() + { + await using var store = CreateStore("purgex-seqlimit"); + + // Store 10 messages on "data" + for (var i = 1; i <= 10; i++) + await store.AppendAsync("data", Encoding.UTF8.GetBytes($"d{i}"), default); + + // Purge "data" messages with seq <= 5 (keep=0) + var purged = store.PurgeEx("data", 5, 0); + purged.ShouldBe(5UL); + + // Messages 6-10 should remain + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(5); + remaining.Min(m => m.Sequence).ShouldBe(6UL); + remaining.Max(m => m.Sequence).ShouldBe(10UL); + } + + // Go: PurgeEx with wildcard subject — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_WithWildcardSubject_RemovesAllMatchingSubjects() + { + await using var store = CreateStore("purgex-wildcard"); + + await store.AppendAsync("foo.a", "m1"u8.ToArray(), default); + await store.AppendAsync("foo.b", "m2"u8.ToArray(), default); + await store.AppendAsync("bar.a", "m3"u8.ToArray(), default); + await store.AppendAsync("foo.c", "m4"u8.ToArray(), default); + + var purged = store.PurgeEx("foo.*", 0, 0); + purged.ShouldBe(3UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(1); + remaining[0].Subject.ShouldBe("bar.a"); + } + + // Go: PurgeEx with > wildcard — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_WithGtWildcard_RemovesAllMatchingSubjects() + { + await using var store = CreateStore("purgex-gt-wildcard"); + + await store.AppendAsync("a.b.c", "m1"u8.ToArray(), default); + await store.AppendAsync("a.b.d", "m2"u8.ToArray(), default); + await store.AppendAsync("a.x", "m3"u8.ToArray(), default); + await store.AppendAsync("b.x", "m4"u8.ToArray(), default); + + var purged = store.PurgeEx("a.>", 0, 0); + purged.ShouldBe(3UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(1); + remaining[0].Subject.ShouldBe("b.x"); + } + + // ------------------------------------------------------------------------- + // Compact tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreCompact — filestore_test.go:~964 + [Fact] + public async Task Compact_RemovesMessagesBeforeSequence() + { + await using var store = CreateStore("compact-basic"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("test", Encoding.UTF8.GetBytes($"msg{i}"), default); + + // Compact to remove messages with seq < 5 (removes 1, 2, 3, 4) + var removed = store.Compact(5); + removed.ShouldBe(4UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(6); // 5-10 + + remaining.Min(m => m.Sequence).ShouldBe(5UL); + remaining.Max(m => m.Sequence).ShouldBe(10UL); + + // Sequence 1-4 should no longer be loadable + (await store.LoadAsync(1, default)).ShouldBeNull(); + (await store.LoadAsync(4, default)).ShouldBeNull(); + + // Sequence 5 should still exist + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + } + + // ------------------------------------------------------------------------- + // Truncate tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreStreamTruncate — filestore_test.go:~1035 + [Fact] + public async Task Truncate_RemovesMessagesAfterSequence() + { + await using var store = CreateStore("truncate-basic"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("stream", Encoding.UTF8.GetBytes($"m{i}"), default); + + // Truncate at seq=5 (removes 6, 7, 8, 9, 10) + store.Truncate(5); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(5); // 1-5 + + remaining.Min(m => m.Sequence).ShouldBe(1UL); + remaining.Max(m => m.Sequence).ShouldBe(5UL); + + // Messages 6-10 should be gone + (await store.LoadAsync(6, default)).ShouldBeNull(); + (await store.LoadAsync(10, default)).ShouldBeNull(); + + // Message 5 should still exist + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + } + + // ------------------------------------------------------------------------- + // FilteredState tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreFilteredState — filestore_test.go:~1200 + [Fact] + public async Task FilteredState_ReturnsCorrectState() + { + await using var store = CreateStore("filteredstate"); + + // Store 5 messages on "orders" and 5 on "invoices" + for (var i = 1; i <= 5; i++) + await store.AppendAsync("orders", Encoding.UTF8.GetBytes($"o{i}"), default); + for (var i = 1; i <= 5; i++) + await store.AppendAsync("invoices", Encoding.UTF8.GetBytes($"inv{i}"), default); + + // FilteredState for "orders" from seq=1 + var ordersState = store.FilteredState(1, "orders"); + ordersState.Msgs.ShouldBe(5UL); + ordersState.First.ShouldBe(1UL); + ordersState.Last.ShouldBe(5UL); + + // FilteredState for "invoices" from seq=1 + var invoicesState = store.FilteredState(1, "invoices"); + invoicesState.Msgs.ShouldBe(5UL); + invoicesState.First.ShouldBe(6UL); + invoicesState.Last.ShouldBe(10UL); + + // FilteredState from seq=7 (only 4 invoices remain) + var lateInvoices = store.FilteredState(7, "invoices"); + lateInvoices.Msgs.ShouldBe(4UL); + lateInvoices.First.ShouldBe(7UL); + lateInvoices.Last.ShouldBe(10UL); + + // No match for non-existent subject + var noneState = store.FilteredState(1, "orders.unknown"); + noneState.Msgs.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // SubjectsState tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreSubjectsState — filestore_test.go:~1266 + [Fact] + public async Task SubjectsState_ReturnsPerSubjectState() + { + await using var store = CreateStore("subjectsstate"); + + await store.AppendAsync("a.1", "msg"u8.ToArray(), default); + await store.AppendAsync("a.2", "msg"u8.ToArray(), default); + await store.AppendAsync("a.1", "msg"u8.ToArray(), default); + await store.AppendAsync("b.1", "msg"u8.ToArray(), default); + + var state = store.SubjectsState("a.>"); + + state.ShouldContainKey("a.1"); + state.ShouldContainKey("a.2"); + state.ShouldNotContainKey("b.1"); + + state["a.1"].Msgs.ShouldBe(2UL); + state["a.1"].First.ShouldBe(1UL); + state["a.1"].Last.ShouldBe(3UL); + + state["a.2"].Msgs.ShouldBe(1UL); + state["a.2"].First.ShouldBe(2UL); + state["a.2"].Last.ShouldBe(2UL); + } + + // ------------------------------------------------------------------------- + // SubjectsTotals tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreSubjectsTotals — filestore_test.go:~1300 + [Fact] + public async Task SubjectsTotals_ReturnsPerSubjectCounts() + { + await using var store = CreateStore("subjectstotals"); + + await store.AppendAsync("x.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.2", "m"u8.ToArray(), default); + await store.AppendAsync("y.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.3", "m"u8.ToArray(), default); + + var totals = store.SubjectsTotals("x.*"); + + totals.ShouldContainKey("x.1"); + totals.ShouldContainKey("x.2"); + totals.ShouldContainKey("x.3"); + totals.ShouldNotContainKey("y.1"); + + totals["x.1"].ShouldBe(2UL); + totals["x.2"].ShouldBe(1UL); + totals["x.3"].ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // State (with deleted sequences) tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreState — filestore_test.go:~420 + [Fact] + public async Task State_IncludesDeletedSequences() + { + await using var store = CreateStore("state-deleted"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("events", Encoding.UTF8.GetBytes($"e{i}"), default); + + // Remove messages 3, 5, 7 + await store.RemoveAsync(3, default); + await store.RemoveAsync(5, default); + await store.RemoveAsync(7, default); + + var state = store.State(); + + state.Msgs.ShouldBe(7UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(3); + + state.Deleted.ShouldNotBeNull(); + state.Deleted!.ShouldContain(3UL); + state.Deleted.ShouldContain(5UL); + state.Deleted.ShouldContain(7UL); + state.Deleted.Length.ShouldBe(3); + + // NumSubjects: all messages are on "events" + state.NumSubjects.ShouldBe(1); + state.Subjects.ShouldNotBeNull(); + state.Subjects!["events"].ShouldBe(7UL); + } + + // ------------------------------------------------------------------------- + // GetSeqFromTime tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreGetSeqFromTime — filestore_test.go:~1570 + [Fact] + public async Task GetSeqFromTime_ReturnsCorrectSequence() + { + await using var store = CreateStore("getseqfromtime"); + + // Store 5 messages; we'll query by the timestamp of the 3rd message + var timestamps = new List(); + for (var i = 1; i <= 5; i++) + { + await store.AppendAsync("time.test", Encoding.UTF8.GetBytes($"t{i}"), default); + var msgs = await store.ListAsync(default); + timestamps.Add(msgs[^1].TimestampUtc); + // Small delay to ensure distinct timestamps + await Task.Delay(5); + } + + // Query for first seq at or after the timestamp of msg 3 + var targetTime = timestamps[2]; // timestamp of sequence 3 + var seq = store.GetSeqFromTime(targetTime); + seq.ShouldBe(3UL); + + // Query with a time before all messages: should return 1 + var beforeAll = timestamps[0].AddMilliseconds(-100); + store.GetSeqFromTime(beforeAll).ShouldBe(1UL); + + // Query with a time after all messages: should return last+1 + var afterAll = timestamps[^1].AddSeconds(1); + store.GetSeqFromTime(afterAll).ShouldBe(6UL); // _last + 1 + } + + // ------------------------------------------------------------------------- + // MsgBlock enhancements + // ------------------------------------------------------------------------- + + // Go: filestore.go dmap — soft-delete tracking and enumeration + [Fact] + public async Task MsgBlock_IsDeleted_AndEnumerateNonDeleted_Work() + { + await using var store = CreateStore("block-enumerate"); + + // Store 5 messages on 2 subjects + await store.AppendAsync("a.1", "m1"u8.ToArray(), default); + await store.AppendAsync("a.2", "m2"u8.ToArray(), default); + await store.AppendAsync("a.1", "m3"u8.ToArray(), default); + await store.AppendAsync("b.1", "m4"u8.ToArray(), default); + await store.AppendAsync("a.2", "m5"u8.ToArray(), default); + + // Delete sequences 2 and 4 + await store.RemoveAsync(2, default); + await store.RemoveAsync(4, default); + + // Verify the state after deletion + var all = await store.ListAsync(default); + all.Count.ShouldBe(3); + all.Select(m => m.Sequence).ShouldBe([1UL, 3UL, 5UL]); + + // FilteredState should only see non-deleted + var aState = store.FilteredState(1, "a.1"); + aState.Msgs.ShouldBe(2UL); // sequences 1 and 3 + } +}