diff --git a/src/NATS.Server/JetStream/Storage/AtomicFileWriter.cs b/src/NATS.Server/JetStream/Storage/AtomicFileWriter.cs new file mode 100644 index 0000000..9949e96 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/AtomicFileWriter.cs @@ -0,0 +1,49 @@ +namespace NATS.Server.JetStream.Storage; + +/// +/// Provides crash-safe atomic file writes using the write-to-temp-then-rename pattern. +/// Each call writes to a uniquely-named sibling temp file ({path}.{random}.tmp), +/// flushes to disk, then atomically renames over the target path so that: +/// +/// A partial write never leaves a corrupt target file. +/// Concurrent callers to different paths never collide on temp files. +/// +/// +/// Reference: golang/nats-server/server/filestore.go:10599 (_writeFullState) +/// +public static class AtomicFileWriter +{ + /// + /// Writes atomically to . + /// The data is written to a unique {path}.{random}.tmp sibling, flushed, + /// then renamed over with overwrite semantics. + /// + public static async Task WriteAtomicallyAsync(string path, byte[] data) + { + var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp"; + await using (var fs = new FileStream(tmpPath, FileMode.CreateNew, FileAccess.Write, FileShare.None, + bufferSize: 4096, useAsync: true)) + { + await fs.WriteAsync(data); + await fs.FlushAsync(); + } + File.Move(tmpPath, path, overwrite: true); + } + + /// + /// Writes atomically to . + /// The data is written to a unique {path}.{random}.tmp sibling, flushed, + /// then renamed over with overwrite semantics. + /// + public static async Task WriteAtomicallyAsync(string path, ReadOnlyMemory data) + { + var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp"; + await using (var fs = new FileStream(tmpPath, FileMode.CreateNew, FileAccess.Write, FileShare.None, + bufferSize: 4096, useAsync: true)) + { + await fs.WriteAsync(data); + await fs.FlushAsync(); + } + File.Move(tmpPath, path, overwrite: true); + } +} diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index fb0d76e..ffade76 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -47,6 +47,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go:290 (fss/ttl fields). private HashWheel? _ttlWheel; + // Mutual-exclusion lock for state file writes. Ensures that concurrent + // FlushAllPending calls (e.g. from a flush timer and a shutdown path) do not + // race on the stream.state / stream.state.tmp files. + // Reference: golang/nats-server/server/filestore.go — fsMsgBlock.mu (write lock). + private readonly SemaphoreSlim _stateWriteLock = new(1, 1); + public int BlockCount => _blocks.Count; public bool UsedIndexManifestOnStartup { get; private set; } @@ -662,16 +668,18 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask DisposeAsync() { DisposeAllBlocks(); + _stateWriteLock.Dispose(); return ValueTask.CompletedTask; } /// - /// Synchronous dispose — releases all block file handles. + /// Synchronous dispose — releases all block file handles and the state-write semaphore. /// Allows the store to be used in synchronous test contexts with using blocks. /// public void Dispose() { DisposeAllBlocks(); + _stateWriteLock.Dispose(); } /// @@ -1811,23 +1819,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// good sequence without re-scanning every block. /// Reference: golang/nats-server/server/filestore.go:5783 (flushPendingWritesUnlocked). /// - public void FlushAllPending() + public async Task FlushAllPending() { _activeBlock?.Flush(); - WriteStreamState(); + await WriteStreamStateAsync(); } /// - /// Atomically persists a compact stream state snapshot to disk using the - /// write-to-temp-then-rename pattern so that a partial write never leaves - /// a corrupt state file. + /// Atomically persists a compact stream state snapshot to disk using + /// (write-to-temp-then-rename) so that a + /// partial write never leaves a corrupt state file. The + /// semaphore serialises concurrent flush calls so that only one write is + /// in-flight at a time. /// The file is written as JSON to {Directory}/stream.state. /// Reference: golang/nats-server/server/filestore.go:5820 (writeFullState). /// - private void WriteStreamState() + private async Task WriteStreamStateAsync() { var statePath = Path.Combine(_options.Directory, "stream.state"); - var tmpPath = statePath + ".tmp"; var snapshot = new StreamStateSnapshot { @@ -1837,9 +1846,17 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable Bytes = (ulong)_blocks.Sum(b => b.BytesUsed), }; - var json = JsonSerializer.Serialize(snapshot); - File.WriteAllText(tmpPath, json); - File.Move(tmpPath, statePath, overwrite: true); + var json = JsonSerializer.SerializeToUtf8Bytes(snapshot); + + await _stateWriteLock.WaitAsync(); + try + { + await AtomicFileWriter.WriteAtomicallyAsync(statePath, json); + } + finally + { + _stateWriteLock.Release(); + } } // ------------------------------------------------------------------------- diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index f9ad283..a034a6f 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -56,7 +56,7 @@ public interface IStreamStore => throw new NotSupportedException("Block-engine SkipMsgs not yet implemented."); // Go: StreamStore.FlushAllPending — flush any buffered writes to backing storage - void FlushAllPending() + Task FlushAllPending() => throw new NotSupportedException("Block-engine FlushAllPending not yet implemented."); // Go: StreamStore.LoadMsg — load message by exact sequence; sm is an optional reusable buffer diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index e80d1d5..71c8c3f 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -355,7 +355,7 @@ public sealed class MemStore : IStreamStore } // Go: memStore.FlushAllPending server/memstore.go:423 — no-op for in-memory store - void IStreamStore.FlushAllPending() { } + Task IStreamStore.FlushAllPending() => Task.CompletedTask; // Go: memStore.LoadMsg server/memstore.go:1692 StoreMsg IStreamStore.LoadMsg(ulong seq, StoreMsg? sm) diff --git a/tests/NATS.Server.Tests/JetStream/Storage/AtomicFileWriterTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/AtomicFileWriterTests.cs new file mode 100644 index 0000000..8cb7735 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/AtomicFileWriterTests.cs @@ -0,0 +1,149 @@ +// Go ref: filestore.go:10599 (_writeFullState) +// AtomicFileWriter wraps the write-to-temp-then-rename pattern used by +// Go's fileStore._writeFullState to guarantee crash-safe state persistence. + +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public sealed class AtomicFileWriterTests : IDisposable +{ + private readonly DirectoryInfo _dir; + + public AtomicFileWriterTests() + { + _dir = Directory.CreateTempSubdirectory("atomic_writer_tests_"); + } + + public void Dispose() => _dir.Delete(recursive: true); + + private string TempPath(string name) => Path.Combine(_dir.FullName, name); + + // ------------------------------------------------------------------------- + // byte[] overload + // ------------------------------------------------------------------------- + + [Fact] + public async Task WriteAtomicallyAsync_creates_file() + { + var path = TempPath("state.json"); + var data = "{ \"seq\": 1 }"u8.ToArray(); + + await AtomicFileWriter.WriteAtomicallyAsync(path, data); + + File.Exists(path).ShouldBeTrue(); + var written = await File.ReadAllBytesAsync(path); + written.ShouldBe(data); + } + + [Fact] + public async Task WriteAtomicallyAsync_no_temp_file_remains() + { + var path = TempPath("state.json"); + var data = "hello world"u8.ToArray(); + + await AtomicFileWriter.WriteAtomicallyAsync(path, data); + + // No .tmp file should remain after a successful write. + // The temp file uses a random component ({path}.{random}.tmp) so check by extension. + _dir.EnumerateFiles("*.tmp").ShouldBeEmpty(); + } + + [Fact] + public async Task WriteAtomicallyAsync_overwrites_existing() + { + var path = TempPath("state.json"); + + await AtomicFileWriter.WriteAtomicallyAsync(path, "first"u8.ToArray()); + await AtomicFileWriter.WriteAtomicallyAsync(path, "second"u8.ToArray()); + + var written = await File.ReadAllTextAsync(path); + written.ShouldBe("second"); + } + + [Fact] + public async Task WriteAtomicallyAsync_concurrent_writes_are_safe() + { + // Multiple concurrent writes to the same file must not corrupt it. + // Each write uses a unique payload; after all writes complete the + // file must contain exactly one of the payloads (no partial data). + var path = TempPath("concurrent.json"); + const int concurrency = 20; + + var tasks = Enumerable.Range(0, concurrency).Select(i => + AtomicFileWriter.WriteAtomicallyAsync(path, System.Text.Encoding.UTF8.GetBytes($"payload-{i:D3}"))); + + await Task.WhenAll(tasks); + + // File must exist and contain exactly one complete payload. + File.Exists(path).ShouldBeTrue(); + var content = await File.ReadAllTextAsync(path); + content.ShouldMatch(@"^payload-\d{3}$"); + } + + // ------------------------------------------------------------------------- + // ReadOnlyMemory overload + // ------------------------------------------------------------------------- + + [Fact] + public async Task WriteAtomicallyAsync_memory_overload_creates_file() + { + var path = TempPath("state_mem.json"); + ReadOnlyMemory data = "{ \"seq\": 42 }"u8.ToArray(); + + await AtomicFileWriter.WriteAtomicallyAsync(path, data); + + File.Exists(path).ShouldBeTrue(); + var written = await File.ReadAllBytesAsync(path); + written.ShouldBe(data.ToArray()); + } + + [Fact] + public async Task WriteAtomicallyAsync_memory_overload_no_temp_file_remains() + { + var path = TempPath("state_mem.json"); + ReadOnlyMemory data = "memory data"u8.ToArray(); + + await AtomicFileWriter.WriteAtomicallyAsync(path, data); + + // The temp file uses a random component ({path}.{random}.tmp) so check by extension. + _dir.EnumerateFiles("*.tmp").ShouldBeEmpty(); + } + + [Fact] + public async Task WriteAtomicallyAsync_memory_overload_overwrites_existing() + { + var path = TempPath("state_mem.json"); + + await AtomicFileWriter.WriteAtomicallyAsync(path, (ReadOnlyMemory)"first"u8.ToArray()); + await AtomicFileWriter.WriteAtomicallyAsync(path, (ReadOnlyMemory)"second"u8.ToArray()); + + var written = await File.ReadAllTextAsync(path); + written.ShouldBe("second"); + } + + [Fact] + public async Task WriteAtomicallyAsync_writes_empty_data() + { + var path = TempPath("empty.json"); + + await AtomicFileWriter.WriteAtomicallyAsync(path, Array.Empty()); + + File.Exists(path).ShouldBeTrue(); + var written = await File.ReadAllBytesAsync(path); + written.ShouldBeEmpty(); + } + + [Fact] + public async Task WriteAtomicallyAsync_writes_large_payload() + { + var path = TempPath("large.bin"); + var data = new byte[256 * 1024]; // 256 KB + Random.Shared.NextBytes(data); + + await AtomicFileWriter.WriteAtomicallyAsync(path, data); + + var written = await File.ReadAllBytesAsync(path); + written.ShouldBe(data); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreCrashRecoveryTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreCrashRecoveryTests.cs index ea5e7f8..d1e6cc0 100644 --- a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreCrashRecoveryTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreCrashRecoveryTests.cs @@ -54,7 +54,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable // Go: TestFileStoreSyncIntervals (filestore_test.go) — verifies that pending writes // are flushed to disk and the .blk file is non-empty after FlushAllPending. [Fact] - public void FlushAllPending_flushes_active_block() + public async Task FlushAllPending_flushes_active_block() { // Arrange: write a message to the store. const string sub = "flush-block"; @@ -64,7 +64,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable store.StoreMsg("events.a", null, "payload-for-flush"u8.ToArray(), 0L); // Act: flush all pending writes. - store.FlushAllPending(); + await store.FlushAllPending(); // Assert: at least one .blk file exists and it is non-empty, proving the // active block was flushed to disk. @@ -81,7 +81,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable // Go: TestFileStoreWriteFullStateBasics (filestore_test.go:5461) — verifies that // WriteStreamState creates a valid, atomic stream.state checkpoint file. [Fact] - public void FlushAllPending_writes_stream_state_file() + public async Task FlushAllPending_writes_stream_state_file() { // Arrange: store several messages across subjects. const string sub = "state-file"; @@ -93,13 +93,13 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable store.StoreMsg("events.a", null, "event-1"u8.ToArray(), 0L); // Act: flush — this should write stream.state atomically. - store.FlushAllPending(); + await store.FlushAllPending(); // Assert: stream.state exists and no leftover .tmp file. + // AtomicFileWriter uses {path}.{random}.tmp so check by extension, not exact name. var statePath = Path.Combine(dir, "stream.state"); - var tmpPath = statePath + ".tmp"; File.Exists(statePath).ShouldBeTrue("stream.state checkpoint must exist after FlushAllPending"); - File.Exists(tmpPath).ShouldBeFalse("stream.state.tmp must be renamed away after atomic write"); + Directory.GetFiles(dir, "*.tmp").ShouldBeEmpty("all .tmp staging files must be renamed away after atomic write"); // Assert: the file is valid JSON with the expected fields. var json = File.ReadAllText(statePath); @@ -118,17 +118,17 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable // Go: FlushAllPending is idempotent — calling it twice must not throw and must // overwrite the previous state file with the latest state. [Fact] - public void FlushAllPending_is_idempotent() + public async Task FlushAllPending_is_idempotent() { const string sub = "flush-idempotent"; var dir = StoreDir(sub); using var store = CreateStore(sub); store.StoreMsg("foo", null, "msg-1"u8.ToArray(), 0L); - store.FlushAllPending(); + await store.FlushAllPending(); store.StoreMsg("foo", null, "msg-2"u8.ToArray(), 0L); - store.FlushAllPending(); + await store.FlushAllPending(); // The state file should reflect the second flush (2 messages, seq 1..2). var statePath = Path.Combine(dir, "stream.state"); @@ -197,7 +197,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable store.StoreMsg("events", null, Encoding.UTF8.GetBytes($"msg-{i}"), 0L); // Flush to ensure data is on disk before we close. - store.FlushAllPending(); + await store.FlushAllPending(); } // Simulate a crash mid-write by truncating the .blk file by a few bytes at