diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 23e7c7d..2eb0764 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -35,6 +35,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private ulong _last; private ulong _first; // Go: first.seq — watermark for the first live or expected-first sequence + // Set to true after Stop() is called. Prevents further writes. + private bool _stopped; + // Resolved at construction time: which format family to use. private readonly bool _useS2; // true -> S2Codec (FSV2 compression path) private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path) @@ -66,6 +69,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) { + if (_stopped) + throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); + // Go: check and remove expired messages before each append. // Reference: golang/nats-server/server/filestore.go — storeMsg, expire check. ExpireFromWheel(); @@ -255,6 +261,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl) { + if (_stopped) + throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); + // Go: expire check before each store (same as AppendAsync). // Reference: golang/nats-server/server/filestore.go:6793 (expireMsgs call). ExpireFromWheel(); @@ -1614,6 +1623,135 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } + // ------------------------------------------------------------------------- + // Go-parity IStreamStore methods: StoreRawMsg, LoadPrevMsg, Type, Stop + // Reference: golang/nats-server/server/filestore.go + // ------------------------------------------------------------------------- + + /// + /// Stores a message at a caller-specified sequence number and timestamp. + /// Used for replication and mirroring — the caller (NRG, mirror source) controls + /// the sequence/timestamp rather than the store auto-incrementing them. + /// Unlike , this does NOT call ExpireFromWheel + /// or auto-increment _last. It updates _last via + /// Math.Max(_last, seq) so the watermark reflects the highest stored + /// sequence. + /// Reference: golang/nats-server/server/filestore.go:6756 (storeRawMsg). + /// + public void StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + if (_stopped) + throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); + + // Combine headers and payload, same as StoreMsg. + byte[] combined; + if (hdr is { Length: > 0 }) + { + combined = new byte[hdr.Length + msg.Length]; + hdr.CopyTo(combined, 0); + msg.CopyTo(combined, hdr.Length); + } + else + { + combined = msg; + } + + var persistedPayload = TransformForPersist(combined.AsSpan()); + // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp. + var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; + + var stored = new StoredMessage + { + Sequence = seq, + Subject = subject, + Payload = combined, + TimestampUtc = storedUtc, + }; + _messages[seq] = stored; + + // Go: update _last to the high-water mark — do not decrement. + _last = Math.Max(_last, seq); + + // Register TTL using the caller-supplied timestamp and TTL. + var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); + RegisterTtl(seq, ts, effectiveTtlNs); + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + } + + if (_activeBlock!.IsSealed) + RotateBlock(); + } + + /// + /// Loads the message immediately before by walking + /// backward from start - 1 to _first. + /// Throws if no such message exists. + /// Reference: golang/nats-server/server/filestore.go — LoadPrevMsg. + /// + public StoreMsg LoadPrevMsg(ulong start, StoreMsg? sm) + { + if (start == 0) + throw new KeyNotFoundException("No message found before seq 0."); + + var first = _messages.Count > 0 ? _messages.Keys.Min() : 1UL; + + for (var seq = start - 1; seq >= first && seq <= _last; seq--) + { + if (_messages.TryGetValue(seq, out var stored)) + { + sm ??= new StoreMsg(); + sm.Clear(); + sm.Subject = stored.Subject; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + return sm; + } + + // Prevent underflow on ulong subtraction. + if (seq == 0) + break; + } + + throw new KeyNotFoundException($"No message found before seq {start}."); + } + + /// + /// Returns the storage backend type for this store instance. + /// Reference: golang/nats-server/server/filestore.go — fileStore.Type. + /// + public StorageType Type() => StorageType.File; + + /// + /// Flushes the active block to disk and marks the store as stopped. + /// After Stop() returns, calls to or + /// will throw . + /// Blocks are NOT deleted — use if data removal is needed. + /// Reference: golang/nats-server/server/filestore.go — fileStore.Stop. + /// + public void Stop() + { + if (_stopped) + return; + + _stopped = true; + + // Flush the active block to ensure all buffered writes reach disk. + _activeBlock?.Flush(); + + // Dispose all blocks to release OS file handles. The files remain on disk. + DisposeAllBlocks(); + } + // ------------------------------------------------------------------------- // ConsumerStore factory // Reference: golang/nats-server/server/filestore.go — fileStore.ConsumerStore diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreStreamStoreTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreStreamStoreTests.cs new file mode 100644 index 0000000..751d1ef --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreStreamStoreTests.cs @@ -0,0 +1,294 @@ +// Reference: golang/nats-server/server/filestore.go +// Tests in this file: +// StoreRawMsg_stores_at_specified_sequence — IStreamStore.StoreRawMsg preserves caller seq/ts +// LoadPrevMsg_returns_message_before_seq — IStreamStore.LoadPrevMsg backward scan +// Type_returns_file — IStreamStore.Type() returns StorageType.File +// Stop_prevents_further_writes — IStreamStore.Stop() sets _stopped flag + +using NATS.Server.JetStream.Storage; +using StorageType = NATS.Server.JetStream.Models.StorageType; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// Tests for IStreamStore methods added to FileStore in Batch 1: +/// StoreRawMsg, LoadPrevMsg, Type, and Stop. +/// +public sealed class FileStoreStreamStoreTests : IDisposable +{ + private readonly string _root; + + public FileStoreStreamStoreTests() + { + _root = Path.Combine(Path.GetTempPath(), $"nats-js-sstest-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_root); + } + + public void Dispose() + { + // Best-effort cleanup of temp directory. If it fails (e.g. open handles on CI), + // the OS will clean it up on the next reboot. Letting it throw would suppress + // the real test failure so we absorb IO errors explicitly. + if (!Directory.Exists(_root)) + return; + + try + { + Directory.Delete(_root, recursive: true); + } + catch (IOException ex) + { + // Open file handles (common on Windows CI) — log and continue. + Console.Error.WriteLine($"[FileStoreStreamStoreTests] Dispose: {ex.Message}"); + } + catch (UnauthorizedAccessException ex) + { + // Read-only files left by the test — log and continue. + Console.Error.WriteLine($"[FileStoreStreamStoreTests] Dispose: {ex.Message}"); + } + } + + private FileStore CreateStore(string subDir, FileStoreOptions? opts = null) + { + var dir = Path.Combine(_root, subDir); + Directory.CreateDirectory(dir); + var o = opts ?? new FileStoreOptions(); + o.Directory = dir; + return new FileStore(o); + } + + // ------------------------------------------------------------------------- + // StoreRawMsg + // ------------------------------------------------------------------------- + + // Go: filestore.go storeRawMsg — caller specifies seq and ts; store must not + // auto-increment, and _last must be updated to Math.Max(_last, seq). + [Fact] + public void StoreRawMsg_stores_at_specified_sequence() + { + using var store = CreateStore("raw-seq"); + IStreamStore ss = store; + + var subject = "events.raw"; + var data = "hello raw"u8.ToArray(); + // Use a specific Unix nanosecond timestamp. + var tsNs = new DateTimeOffset(2024, 6, 1, 12, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds() * 1_000_000L; + const ulong targetSeq = 42UL; + + ss.StoreRawMsg(subject, null, data, targetSeq, tsNs, 0, false); + + // Verify by loading the message back via LoadMsg. + var sm = ss.LoadMsg(targetSeq, null); + sm.Subject.ShouldBe(subject); + sm.Sequence.ShouldBe(targetSeq); + sm.Timestamp.ShouldBe(tsNs); + } + + [Fact] + public void StoreRawMsg_updates_last_watermark() + { + using var store = CreateStore("raw-wm"); + IStreamStore ss = store; + + // Store a message at seq 100 — _last should become 100. + ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 100UL, 1_000_000L, 0, false); + + var state = new StreamState(); + ss.FastState(ref state); + state.LastSeq.ShouldBe(100UL); + } + + [Fact] + public void StoreRawMsg_does_not_decrement_last_for_lower_seq() + { + using var store = CreateStore("raw-order"); + IStreamStore ss = store; + + // Write seq 50 first, then seq 30 (out-of-order replication scenario). + ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 50UL, 1_000_000L, 0, false); + ss.StoreRawMsg("bar", null, "y"u8.ToArray(), 30UL, 2_000_000L, 0, false); + + var state = new StreamState(); + ss.FastState(ref state); + // _last should remain 50, not go down to 30. + state.LastSeq.ShouldBe(50UL); + } + + [Fact] + public void StoreRawMsg_preserves_caller_timestamp() + { + using var store = CreateStore("raw-ts"); + IStreamStore ss = store; + + var subject = "ts.test"; + var data = "payload"u8.ToArray(); + // A deterministic Unix nanosecond timestamp. + var tsNs = 1_717_238_400_000_000_000L; // 2024-06-01 00:00:00 UTC in ns + + ss.StoreRawMsg(subject, null, data, 7UL, tsNs, 0, false); + + var sm = ss.LoadMsg(7UL, null); + sm.Timestamp.ShouldBe(tsNs); + } + + [Fact] + public void StoreRawMsg_throws_after_stop() + { + using var store = CreateStore("raw-stop"); + IStreamStore ss = store; + + ss.Stop(); + + Should.Throw(() => + ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 1UL, 1_000_000L, 0, false)); + } + + // ------------------------------------------------------------------------- + // LoadPrevMsg + // ------------------------------------------------------------------------- + + // Go: filestore.go LoadPrevMsg — walks backward from start-1 to first. + [Fact] + public void LoadPrevMsg_returns_message_before_seq() + { + using var store = CreateStore("prev-basic"); + IStreamStore ss = store; + + // Write 3 messages at seqs 1, 2, 3. + ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0); + ss.StoreMsg("b", null, "msg2"u8.ToArray(), 0); + ss.StoreMsg("c", null, "msg3"u8.ToArray(), 0); + + // LoadPrevMsg(3) should return seq 2. + var sm = ss.LoadPrevMsg(3UL, null); + sm.Sequence.ShouldBe(2UL); + sm.Subject.ShouldBe("b"); + } + + [Fact] + public void LoadPrevMsg_skips_deleted_message() + { + using var store = CreateStore("prev-skip"); + IStreamStore ss = store; + + ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0); // seq 1 + ss.StoreMsg("b", null, "msg2"u8.ToArray(), 0); // seq 2 + ss.StoreMsg("c", null, "msg3"u8.ToArray(), 0); // seq 3 + + // Delete seq 2 — LoadPrevMsg(3) must skip it and return seq 1. + ss.RemoveMsg(2UL); + + var sm = ss.LoadPrevMsg(3UL, null); + sm.Sequence.ShouldBe(1UL); + sm.Subject.ShouldBe("a"); + } + + [Fact] + public void LoadPrevMsg_throws_when_no_message_before_seq() + { + using var store = CreateStore("prev-none"); + IStreamStore ss = store; + + ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0); // seq 1 + + // LoadPrevMsg(1) — nothing before seq 1. + Should.Throw(() => ss.LoadPrevMsg(1UL, null)); + } + + [Fact] + public void LoadPrevMsg_reuses_provided_container() + { + using var store = CreateStore("prev-reuse"); + IStreamStore ss = store; + + ss.StoreMsg("x", null, "d1"u8.ToArray(), 0); // seq 1 + ss.StoreMsg("y", null, "d2"u8.ToArray(), 0); // seq 2 + + var container = new StoreMsg(); + var result = ss.LoadPrevMsg(2UL, container); + + // Should return the same object reference. + result.ShouldBeSameAs(container); + container.Sequence.ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // Type + // ------------------------------------------------------------------------- + + // Go: filestore.go fileStore.Type — returns StorageType.File. + [Fact] + public void Type_returns_file() + { + using var store = CreateStore("type"); + IStreamStore ss = store; + + ss.Type().ShouldBe(StorageType.File); + } + + // ------------------------------------------------------------------------- + // Stop + // ------------------------------------------------------------------------- + + // Go: filestore.go fileStore.Stop — flushes and marks as stopped. + [Fact] + public void Stop_prevents_further_writes_via_StoreMsg() + { + using var store = CreateStore("stop-storemsg"); + IStreamStore ss = store; + + ss.StoreMsg("ok", null, "before"u8.ToArray(), 0); + + ss.Stop(); + + Should.Throw(() => + ss.StoreMsg("fail", null, "after"u8.ToArray(), 0)); + } + + [Fact] + public async Task Stop_prevents_further_writes_via_AppendAsync() + { + using var store = CreateStore("stop-append"); + + await store.AppendAsync("ok", "before"u8.ToArray(), CancellationToken.None); + + ((IStreamStore)store).Stop(); + + await Should.ThrowAsync(() => + store.AppendAsync("fail", "after"u8.ToArray(), CancellationToken.None).AsTask()); + } + + [Fact] + public void Stop_is_idempotent() + { + using var store = CreateStore("stop-idem"); + IStreamStore ss = store; + + ss.Stop(); + + // Second Stop() must not throw. + var ex = Record.Exception(() => ss.Stop()); + ex.ShouldBeNull(); + } + + [Fact] + public void Stop_preserves_messages_on_disk() + { + var dir = Path.Combine(_root, "stop-persist"); + Directory.CreateDirectory(dir); + + FileStore CreateWithDir() => new FileStore(new FileStoreOptions { Directory = dir }); + + // Write a message, stop the store. + using (var store = CreateWithDir()) + { + ((IStreamStore)store).StoreMsg("saved", null, "payload"u8.ToArray(), 0); + ((IStreamStore)store).Stop(); + } + + // Re-open and verify the message survived. + using var recovered = CreateWithDir(); + var sm = ((IStreamStore)recovered).LoadMsg(1UL, null); + sm.Subject.ShouldBe("saved"); + } +}