From 56177a70996777587afd3caec21de2413f917155 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 14:51:30 -0500 Subject: [PATCH] feat: enforce filestore durability and recovery invariants --- .../JetStream/Storage/FileStore.cs | 5 +++ .../JetStreamFileStoreInvariantTests.cs | 35 ++++++++++++++++++ ...treamFileStoreRecoveryStrictParityTests.cs | 37 +++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs create mode 100644 tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 7c12199..8252350 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -91,7 +91,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable { var removed = _messages.Remove(sequence); if (removed) + { + if (sequence == _last) + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); RewriteDataFile(); + } return ValueTask.FromResult(removed); } @@ -227,6 +231,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable _blockCount = 0; _activeBlockBytes = 0; _writeOffset = 0; + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read); using var writer = new StreamWriter(stream, Encoding.UTF8); diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs new file mode 100644 index 0000000..d4fb4d7 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs @@ -0,0 +1,35 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamFileStoreInvariantTests +{ + [Fact] + public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-invariants-{Guid.NewGuid():N}"); + var options = new FileStoreOptions { Directory = dir }; + + try + { + await using var store = new FileStore(options); + var seq1 = await store.AppendAsync("orders.created", "1"u8.ToArray(), default); + var seq2 = await store.AppendAsync("orders.updated", "2"u8.ToArray(), default); + seq1.ShouldBe((ulong)1); + seq2.ShouldBe((ulong)2); + + (await store.RemoveAsync(seq2, default)).ShouldBeTrue(); + var state = await store.GetStateAsync(default); + + state.Messages.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)1); + state.FirstSeq.ShouldBe((ulong)1); + (await store.LoadLastBySubjectAsync("orders.updated", default)).ShouldBeNull(); + } + finally + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs new file mode 100644 index 0000000..6829bdc --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs @@ -0,0 +1,37 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamFileStoreRecoveryStrictParityTests +{ + [Fact] + public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-recovery-{Guid.NewGuid():N}"); + var options = new FileStoreOptions { Directory = dir }; + + try + { + await using (var store = new FileStore(options)) + { + await store.AppendAsync("orders.created", "a"u8.ToArray(), default); + await store.AppendAsync("orders.created", "b"u8.ToArray(), default); + await store.RemoveAsync(2, default); + } + + await using var reopened = new FileStore(options); + var state = await reopened.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)1); + + var msg1 = await reopened.LoadAsync(1, default); + msg1.ShouldNotBeNull(); + msg1.Subject.ShouldBe("orders.created"); + } + finally + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } + } +}