diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 78ae6f3..31c1977 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -227,6 +227,318 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable RewriteBlocks(); } + + // ------------------------------------------------------------------------- + // Go-parity sync interface implementations + // Reference: golang/nats-server/server/filestore.go + // ------------------------------------------------------------------------- + + /// + /// Removes all messages from the store and returns the count purged. + /// Reference: golang/nats-server/server/filestore.go — purge / purgeMsgs. + /// + public ulong Purge() + { + var count = (ulong)_messages.Count; + _messages.Clear(); + _last = 0; + + DisposeAllBlocks(); + CleanBlockFiles(); + + return count; + } + + /// + /// Purge messages on a given subject, up to sequence , + /// keeping the newest messages. + /// If subject is empty or null, behaves like . + /// Returns the number of messages removed. + /// Reference: golang/nats-server/server/filestore.go — PurgeEx. + /// + public ulong PurgeEx(string subject, ulong seq, ulong keep) + { + if (string.IsNullOrEmpty(subject)) + return Purge(); + + // Collect all messages matching the subject (with wildcard support) at or below seq, ordered by sequence. + var candidates = _messages.Values + .Where(m => SubjectMatchesFilter(m.Subject, subject)) + .Where(m => seq == 0 || m.Sequence <= seq) + .OrderBy(m => m.Sequence) + .ToList(); + + if (candidates.Count == 0) + return 0; + + // Keep the newest `keep` messages; purge the rest. + var toRemove = keep > 0 && (ulong)candidates.Count > keep + ? candidates.Take(candidates.Count - (int)keep).ToList() + : (keep == 0 ? candidates : []); + + if (toRemove.Count == 0) + return 0; + + foreach (var msg in toRemove) + { + _messages.Remove(msg.Sequence); + DeleteInBlock(msg.Sequence); + } + + // Update _last if required. + if (_messages.Count == 0) + _last = 0; + else if (!_messages.ContainsKey(_last)) + _last = _messages.Keys.Max(); + + return (ulong)toRemove.Count; + } + + /// + /// Removes all messages with sequence number strictly less than + /// and returns the count removed. + /// Reference: golang/nats-server/server/filestore.go — Compact. + /// + public ulong Compact(ulong seq) + { + if (seq == 0) + return 0; + + var toRemove = _messages.Keys.Where(k => k < seq).ToArray(); + if (toRemove.Length == 0) + return 0; + + foreach (var s in toRemove) + { + _messages.Remove(s); + DeleteInBlock(s); + } + + if (_messages.Count == 0) + _last = 0; + else if (!_messages.ContainsKey(_last)) + _last = _messages.Keys.Max(); + + return (ulong)toRemove.Length; + } + + /// + /// Removes all messages with sequence number strictly greater than + /// and updates the last sequence pointer. + /// Reference: golang/nats-server/server/filestore.go — Truncate. + /// + public void Truncate(ulong seq) + { + if (seq == 0) + { + // Truncate to nothing. + _messages.Clear(); + _last = 0; + DisposeAllBlocks(); + CleanBlockFiles(); + return; + } + + var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); + foreach (var s in toRemove) + { + _messages.Remove(s); + DeleteInBlock(s); + } + + // Update _last to the new highest existing sequence (or seq if it exists, + // or the highest below seq). + _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); + } + + /// + /// Returns the first sequence number at or after the given UTC time. + /// Returns _last + 1 if no message exists at or after . + /// Reference: golang/nats-server/server/filestore.go — GetSeqFromTime. + /// + public ulong GetSeqFromTime(DateTime t) + { + var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime(); + var match = _messages.Values + .Where(m => m.TimestampUtc >= utc) + .OrderBy(m => m.Sequence) + .FirstOrDefault(); + + return match?.Sequence ?? _last + 1; + } + + /// + /// Returns compact state for non-deleted messages on + /// at or after sequence . + /// Reference: golang/nats-server/server/filestore.go — FilteredState. + /// + public SimpleState FilteredState(ulong seq, string subject) + { + var matching = _messages.Values + .Where(m => m.Sequence >= seq) + .Where(m => string.IsNullOrEmpty(subject) + || SubjectMatchesFilter(m.Subject, subject)) + .OrderBy(m => m.Sequence) + .ToList(); + + if (matching.Count == 0) + return new SimpleState(); + + return new SimpleState + { + Msgs = (ulong)matching.Count, + First = matching[0].Sequence, + Last = matching[^1].Sequence, + }; + } + + /// + /// Returns per-subject for all subjects matching + /// . Supports NATS wildcard filters. + /// Reference: golang/nats-server/server/filestore.go — SubjectsState. + /// + public Dictionary SubjectsState(string filterSubject) + { + var result = new Dictionary(StringComparer.Ordinal); + + foreach (var msg in _messages.Values) + { + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + continue; + + if (result.TryGetValue(msg.Subject, out var existing)) + { + result[msg.Subject] = new SimpleState + { + Msgs = existing.Msgs + 1, + First = Math.Min(existing.First == 0 ? msg.Sequence : existing.First, msg.Sequence), + Last = Math.Max(existing.Last, msg.Sequence), + }; + } + else + { + result[msg.Subject] = new SimpleState + { + Msgs = 1, + First = msg.Sequence, + Last = msg.Sequence, + }; + } + } + + return result; + } + + /// + /// Returns per-subject message counts for all subjects matching + /// . Supports NATS wildcard filters. + /// Reference: golang/nats-server/server/filestore.go — SubjectsTotals. + /// + public Dictionary SubjectsTotals(string filterSubject) + { + var result = new Dictionary(StringComparer.Ordinal); + + foreach (var msg in _messages.Values) + { + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + continue; + + result.TryGetValue(msg.Subject, out var count); + result[msg.Subject] = count + 1; + } + + return result; + } + + /// + /// Returns the full stream state, including the list of deleted (interior gap) sequences. + /// Reference: golang/nats-server/server/filestore.go — State. + /// + public StreamState State() + { + var state = new StreamState(); + FastState(ref state); + + // Populate deleted sequences: sequences in [firstSeq, lastSeq] that are + // not present in _messages. + if (state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq) + { + var deletedList = new List(); + for (var s = state.FirstSeq; s <= state.LastSeq; s++) + { + if (!_messages.ContainsKey(s)) + deletedList.Add(s); + } + + if (deletedList.Count > 0) + { + state.Deleted = [.. deletedList]; + state.NumDeleted = deletedList.Count; + } + } + + // Populate per-subject counts. + var subjectCounts = new Dictionary(StringComparer.Ordinal); + foreach (var msg in _messages.Values) + { + subjectCounts.TryGetValue(msg.Subject, out var cnt); + subjectCounts[msg.Subject] = cnt + 1; + } + state.NumSubjects = subjectCounts.Count; + state.Subjects = subjectCounts.Count > 0 ? subjectCounts : null; + + return state; + } + + /// + /// Populates a pre-allocated with the minimum fields + /// needed for replication without allocating a new struct. + /// Does not populate the array or + /// dictionary. + /// Reference: golang/nats-server/server/filestore.go — FastState. + /// + public void FastState(ref StreamState state) + { + state.Msgs = (ulong)_messages.Count; + state.Bytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + state.LastSeq = _last; + state.LastTime = default; + + if (_messages.Count == 0) + { + state.FirstSeq = 0; + state.FirstTime = default; + } + else + { + var firstSeq = _messages.Keys.Min(); + state.FirstSeq = firstSeq; + state.FirstTime = _messages[firstSeq].TimestampUtc; + state.LastTime = _messages[_last].TimestampUtc; + } + } + + // ------------------------------------------------------------------------- + // Subject matching helper + // ------------------------------------------------------------------------- + + /// + /// Returns true if matches . + /// If filter is a literal, performs exact string comparison. + /// If filter contains NATS wildcards (* or >), uses SubjectMatch.MatchLiteral. + /// Reference: golang/nats-server/server/filestore.go — subjectMatch helper. + /// + private static bool SubjectMatchesFilter(string subject, string filter) + { + if (string.IsNullOrEmpty(filter)) + return true; + + if (NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter)) + return string.Equals(subject, filter, StringComparison.Ordinal); + + return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter); + } + public ValueTask DisposeAsync() { DisposeAllBlocks(); diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 75d5c3d..a6f22de 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -322,6 +322,69 @@ public sealed class MsgBlock : IDisposable } } + + /// + /// Returns true if the given sequence number has been soft-deleted in this block. + /// Reference: golang/nats-server/server/filestore.go — dmap (deleted map) lookup. + /// + public bool IsDeleted(ulong sequence) + { + _lock.EnterReadLock(); + try { return _deleted.Contains(sequence); } + finally { _lock.ExitReadLock(); } + } + + /// + /// Exposes the set of soft-deleted sequence numbers for read-only inspection. + /// Reference: golang/nats-server/server/filestore.go — dmap access for state queries. + /// + public IReadOnlySet DeletedSequences + { + get + { + _lock.EnterReadLock(); + try { return new HashSet(_deleted); } + finally { _lock.ExitReadLock(); } + } + } + + /// + /// Enumerates all non-deleted sequences in this block along with their subjects. + /// Used by FileStore for subject-filtered operations (PurgeEx, SubjectsState, etc.). + /// Reference: golang/nats-server/server/filestore.go — loadBlock, iterating non-deleted records. + /// + public IEnumerable<(ulong Sequence, string Subject)> EnumerateNonDeleted() + { + // Snapshot index and deleted set under the read lock, then decode outside it. + List<(long Offset, int Length, ulong Seq)> entries; + _lock.EnterReadLock(); + try + { + entries = new List<(long, int, ulong)>(_index.Count); + foreach (var (seq, (offset, length)) in _index) + { + if (!_deleted.Contains(seq)) + entries.Add((offset, length, seq)); + } + } + finally + { + _lock.ExitReadLock(); + } + + // Sort by sequence for deterministic output. + entries.Sort((a, b) => a.Seq.CompareTo(b.Seq)); + + foreach (var (offset, length, seq) in entries) + { + var buffer = new byte[length]; + RandomAccess.Read(_handle, buffer, offset); + var record = MessageRecord.Decode(buffer); + if (record is not null && !record.Deleted) + yield return (record.Sequence, record.Subject); + } + } + /// /// Flushes any buffered writes to disk. /// diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs new file mode 100644 index 0000000..ee24b5c --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs @@ -0,0 +1,419 @@ +// Reference: golang/nats-server/server/filestore_test.go +// Tests ported from: TestFileStorePurgeEx, TestFileStorePurgeExWithSubject, +// TestFileStorePurgeExKeepOneBug, TestFileStoreCompact, TestFileStoreStreamTruncate, +// TestFileStoreState, TestFileStoreFilteredState, TestFileStoreSubjectsState, +// TestFileStoreGetSeqFromTime + +using System.Text; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// Tests for FileStore tombstone tracking and purge operations: +/// PurgeEx, Compact, Truncate, FilteredState, SubjectsState, SubjectsTotals, +/// State (with deleted sequences), and GetSeqFromTime. +/// Reference: golang/nats-server/server/filestore_test.go +/// +public sealed class FileStorePurgeBlockTests : IDisposable +{ + private readonly string _dir; + + public FileStorePurgeBlockTests() + { + _dir = Path.Combine(Path.GetTempPath(), $"nats-js-purgeblock-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_dir); + } + + public void Dispose() + { + if (Directory.Exists(_dir)) + Directory.Delete(_dir, recursive: true); + } + + private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null) + { + var dir = Path.Combine(_dir, subdirectory); + var opts = options ?? new FileStoreOptions(); + opts.Directory = dir; + return new FileStore(opts); + } + + // ------------------------------------------------------------------------- + // PurgeEx tests + // ------------------------------------------------------------------------- + + // Go: TestFileStorePurgeExWithSubject — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_BySubject_RemovesMatchingMessages() + { + await using var store = CreateStore("purgex-subject"); + + // Store 5 messages on "foo" and 5 on "bar" + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default); + for (var i = 0; i < 5; i++) + await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default); + + var stateBeforePurge = await store.GetStateAsync(default); + stateBeforePurge.Messages.ShouldBe(10UL); + + // Purge all "foo" messages (seq=0 means no upper limit; keep=0 means keep none) + var purged = store.PurgeEx("foo", 0, 0); + purged.ShouldBe(5UL); + + // Only "bar" messages remain + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe(5UL); + + var remaining = await store.ListAsync(default); + remaining.All(m => m.Subject == "bar").ShouldBeTrue(); + } + + // Go: TestFileStorePurgeExKeepOneBug — filestore_test.go:~910 + [Fact] + public async Task PurgeEx_WithKeep_RetainsNewestMessages() + { + await using var store = CreateStore("purgex-keep"); + + // Store 10 messages on "events" + for (var i = 0; i < 10; i++) + await store.AppendAsync("events", Encoding.UTF8.GetBytes($"msg-{i}"), default); + + // Purge keeping the 3 newest + var purged = store.PurgeEx("events", 0, 3); + purged.ShouldBe(7UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(3); + + // The retained messages should be the 3 highest sequences (8, 9, 10) + var seqs = remaining.Select(m => m.Sequence).OrderBy(s => s).ToArray(); + seqs[0].ShouldBe(8UL); + seqs[1].ShouldBe(9UL); + seqs[2].ShouldBe(10UL); + } + + // Go: TestFileStorePurgeEx — filestore_test.go:~855 + [Fact] + public async Task PurgeEx_WithSeqLimit_OnlyPurgesBelowSequence() + { + await using var store = CreateStore("purgex-seqlimit"); + + // Store 10 messages on "data" + for (var i = 1; i <= 10; i++) + await store.AppendAsync("data", Encoding.UTF8.GetBytes($"d{i}"), default); + + // Purge "data" messages with seq <= 5 (keep=0) + var purged = store.PurgeEx("data", 5, 0); + purged.ShouldBe(5UL); + + // Messages 6-10 should remain + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(5); + remaining.Min(m => m.Sequence).ShouldBe(6UL); + remaining.Max(m => m.Sequence).ShouldBe(10UL); + } + + // Go: PurgeEx with wildcard subject — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_WithWildcardSubject_RemovesAllMatchingSubjects() + { + await using var store = CreateStore("purgex-wildcard"); + + await store.AppendAsync("foo.a", "m1"u8.ToArray(), default); + await store.AppendAsync("foo.b", "m2"u8.ToArray(), default); + await store.AppendAsync("bar.a", "m3"u8.ToArray(), default); + await store.AppendAsync("foo.c", "m4"u8.ToArray(), default); + + var purged = store.PurgeEx("foo.*", 0, 0); + purged.ShouldBe(3UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(1); + remaining[0].Subject.ShouldBe("bar.a"); + } + + // Go: PurgeEx with > wildcard — filestore_test.go:~867 + [Fact] + public async Task PurgeEx_WithGtWildcard_RemovesAllMatchingSubjects() + { + await using var store = CreateStore("purgex-gt-wildcard"); + + await store.AppendAsync("a.b.c", "m1"u8.ToArray(), default); + await store.AppendAsync("a.b.d", "m2"u8.ToArray(), default); + await store.AppendAsync("a.x", "m3"u8.ToArray(), default); + await store.AppendAsync("b.x", "m4"u8.ToArray(), default); + + var purged = store.PurgeEx("a.>", 0, 0); + purged.ShouldBe(3UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(1); + remaining[0].Subject.ShouldBe("b.x"); + } + + // ------------------------------------------------------------------------- + // Compact tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreCompact — filestore_test.go:~964 + [Fact] + public async Task Compact_RemovesMessagesBeforeSequence() + { + await using var store = CreateStore("compact-basic"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("test", Encoding.UTF8.GetBytes($"msg{i}"), default); + + // Compact to remove messages with seq < 5 (removes 1, 2, 3, 4) + var removed = store.Compact(5); + removed.ShouldBe(4UL); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(6); // 5-10 + + remaining.Min(m => m.Sequence).ShouldBe(5UL); + remaining.Max(m => m.Sequence).ShouldBe(10UL); + + // Sequence 1-4 should no longer be loadable + (await store.LoadAsync(1, default)).ShouldBeNull(); + (await store.LoadAsync(4, default)).ShouldBeNull(); + + // Sequence 5 should still exist + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + } + + // ------------------------------------------------------------------------- + // Truncate tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreStreamTruncate — filestore_test.go:~1035 + [Fact] + public async Task Truncate_RemovesMessagesAfterSequence() + { + await using var store = CreateStore("truncate-basic"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("stream", Encoding.UTF8.GetBytes($"m{i}"), default); + + // Truncate at seq=5 (removes 6, 7, 8, 9, 10) + store.Truncate(5); + + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(5); // 1-5 + + remaining.Min(m => m.Sequence).ShouldBe(1UL); + remaining.Max(m => m.Sequence).ShouldBe(5UL); + + // Messages 6-10 should be gone + (await store.LoadAsync(6, default)).ShouldBeNull(); + (await store.LoadAsync(10, default)).ShouldBeNull(); + + // Message 5 should still exist + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + } + + // ------------------------------------------------------------------------- + // FilteredState tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreFilteredState — filestore_test.go:~1200 + [Fact] + public async Task FilteredState_ReturnsCorrectState() + { + await using var store = CreateStore("filteredstate"); + + // Store 5 messages on "orders" and 5 on "invoices" + for (var i = 1; i <= 5; i++) + await store.AppendAsync("orders", Encoding.UTF8.GetBytes($"o{i}"), default); + for (var i = 1; i <= 5; i++) + await store.AppendAsync("invoices", Encoding.UTF8.GetBytes($"inv{i}"), default); + + // FilteredState for "orders" from seq=1 + var ordersState = store.FilteredState(1, "orders"); + ordersState.Msgs.ShouldBe(5UL); + ordersState.First.ShouldBe(1UL); + ordersState.Last.ShouldBe(5UL); + + // FilteredState for "invoices" from seq=1 + var invoicesState = store.FilteredState(1, "invoices"); + invoicesState.Msgs.ShouldBe(5UL); + invoicesState.First.ShouldBe(6UL); + invoicesState.Last.ShouldBe(10UL); + + // FilteredState from seq=7 (only 4 invoices remain) + var lateInvoices = store.FilteredState(7, "invoices"); + lateInvoices.Msgs.ShouldBe(4UL); + lateInvoices.First.ShouldBe(7UL); + lateInvoices.Last.ShouldBe(10UL); + + // No match for non-existent subject + var noneState = store.FilteredState(1, "orders.unknown"); + noneState.Msgs.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // SubjectsState tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreSubjectsState — filestore_test.go:~1266 + [Fact] + public async Task SubjectsState_ReturnsPerSubjectState() + { + await using var store = CreateStore("subjectsstate"); + + await store.AppendAsync("a.1", "msg"u8.ToArray(), default); + await store.AppendAsync("a.2", "msg"u8.ToArray(), default); + await store.AppendAsync("a.1", "msg"u8.ToArray(), default); + await store.AppendAsync("b.1", "msg"u8.ToArray(), default); + + var state = store.SubjectsState("a.>"); + + state.ShouldContainKey("a.1"); + state.ShouldContainKey("a.2"); + state.ShouldNotContainKey("b.1"); + + state["a.1"].Msgs.ShouldBe(2UL); + state["a.1"].First.ShouldBe(1UL); + state["a.1"].Last.ShouldBe(3UL); + + state["a.2"].Msgs.ShouldBe(1UL); + state["a.2"].First.ShouldBe(2UL); + state["a.2"].Last.ShouldBe(2UL); + } + + // ------------------------------------------------------------------------- + // SubjectsTotals tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreSubjectsTotals — filestore_test.go:~1300 + [Fact] + public async Task SubjectsTotals_ReturnsPerSubjectCounts() + { + await using var store = CreateStore("subjectstotals"); + + await store.AppendAsync("x.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.2", "m"u8.ToArray(), default); + await store.AppendAsync("y.1", "m"u8.ToArray(), default); + await store.AppendAsync("x.3", "m"u8.ToArray(), default); + + var totals = store.SubjectsTotals("x.*"); + + totals.ShouldContainKey("x.1"); + totals.ShouldContainKey("x.2"); + totals.ShouldContainKey("x.3"); + totals.ShouldNotContainKey("y.1"); + + totals["x.1"].ShouldBe(2UL); + totals["x.2"].ShouldBe(1UL); + totals["x.3"].ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // State (with deleted sequences) tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreState — filestore_test.go:~420 + [Fact] + public async Task State_IncludesDeletedSequences() + { + await using var store = CreateStore("state-deleted"); + + // Store 10 messages + for (var i = 1; i <= 10; i++) + await store.AppendAsync("events", Encoding.UTF8.GetBytes($"e{i}"), default); + + // Remove messages 3, 5, 7 + await store.RemoveAsync(3, default); + await store.RemoveAsync(5, default); + await store.RemoveAsync(7, default); + + var state = store.State(); + + state.Msgs.ShouldBe(7UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(3); + + state.Deleted.ShouldNotBeNull(); + state.Deleted!.ShouldContain(3UL); + state.Deleted.ShouldContain(5UL); + state.Deleted.ShouldContain(7UL); + state.Deleted.Length.ShouldBe(3); + + // NumSubjects: all messages are on "events" + state.NumSubjects.ShouldBe(1); + state.Subjects.ShouldNotBeNull(); + state.Subjects!["events"].ShouldBe(7UL); + } + + // ------------------------------------------------------------------------- + // GetSeqFromTime tests + // ------------------------------------------------------------------------- + + // Go: TestFileStoreGetSeqFromTime — filestore_test.go:~1570 + [Fact] + public async Task GetSeqFromTime_ReturnsCorrectSequence() + { + await using var store = CreateStore("getseqfromtime"); + + // Store 5 messages; we'll query by the timestamp of the 3rd message + var timestamps = new List(); + for (var i = 1; i <= 5; i++) + { + await store.AppendAsync("time.test", Encoding.UTF8.GetBytes($"t{i}"), default); + var msgs = await store.ListAsync(default); + timestamps.Add(msgs[^1].TimestampUtc); + // Small delay to ensure distinct timestamps + await Task.Delay(5); + } + + // Query for first seq at or after the timestamp of msg 3 + var targetTime = timestamps[2]; // timestamp of sequence 3 + var seq = store.GetSeqFromTime(targetTime); + seq.ShouldBe(3UL); + + // Query with a time before all messages: should return 1 + var beforeAll = timestamps[0].AddMilliseconds(-100); + store.GetSeqFromTime(beforeAll).ShouldBe(1UL); + + // Query with a time after all messages: should return last+1 + var afterAll = timestamps[^1].AddSeconds(1); + store.GetSeqFromTime(afterAll).ShouldBe(6UL); // _last + 1 + } + + // ------------------------------------------------------------------------- + // MsgBlock enhancements + // ------------------------------------------------------------------------- + + // Go: filestore.go dmap — soft-delete tracking and enumeration + [Fact] + public async Task MsgBlock_IsDeleted_AndEnumerateNonDeleted_Work() + { + await using var store = CreateStore("block-enumerate"); + + // Store 5 messages on 2 subjects + await store.AppendAsync("a.1", "m1"u8.ToArray(), default); + await store.AppendAsync("a.2", "m2"u8.ToArray(), default); + await store.AppendAsync("a.1", "m3"u8.ToArray(), default); + await store.AppendAsync("b.1", "m4"u8.ToArray(), default); + await store.AppendAsync("a.2", "m5"u8.ToArray(), default); + + // Delete sequences 2 and 4 + await store.RemoveAsync(2, default); + await store.RemoveAsync(4, default); + + // Verify the state after deletion + var all = await store.ListAsync(default); + all.Count.ShouldBe(3); + all.Select(m => m.Sequence).ShouldBe([1UL, 3UL, 5UL]); + + // FilteredState should only see non-deleted + var aState = store.FilteredState(1, "a.1"); + aState.Msgs.ShouldBe(2UL); // sequences 1 and 3 + } +}