feat: add atomic file writer with SemaphoreSlim for crash-safe state writes (Gap 1.6)
- Add AtomicFileWriter static helper: writes to {path}.{random}.tmp, flushes,
then File.Move(overwrite:true) — concurrent-safe via unique temp path per call
- Add _stateWriteLock (SemaphoreSlim 1,1) to FileStore; dispose in both Dispose
and DisposeAsync paths
- Promote WriteStreamState to async WriteStreamStateAsync using AtomicFileWriter
under the write lock; FlushAllPending now returns Task
- Update IStreamStore.FlushAllPending signature to Task; fix MemStore no-op impl
- Fix FileStoreCrashRecoveryTests to await FlushAllPending (3 sync→async tests)
- Add 9 AtomicFileWriterTests covering create, no-tmp-remains, overwrite,
concurrent safety, memory overload, empty data, and large payload
This commit is contained in:
49
src/NATS.Server/JetStream/Storage/AtomicFileWriter.cs
Normal file
49
src/NATS.Server/JetStream/Storage/AtomicFileWriter.cs
Normal file
@@ -0,0 +1,49 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
/// <summary>
|
||||
/// Provides crash-safe atomic file writes using the write-to-temp-then-rename pattern.
|
||||
/// Each call writes to a uniquely-named sibling temp file (<c>{path}.{random}.tmp</c>),
|
||||
/// flushes to disk, then atomically renames over the target path so that:
|
||||
/// <list type="bullet">
|
||||
/// <item>A partial write never leaves a corrupt target file.</item>
|
||||
/// <item>Concurrent callers to different paths never collide on temp files.</item>
|
||||
/// </list>
|
||||
///
|
||||
/// Reference: golang/nats-server/server/filestore.go:10599 (_writeFullState)
|
||||
/// </summary>
|
||||
public static class AtomicFileWriter
|
||||
{
|
||||
/// <summary>
|
||||
/// Writes <paramref name="data"/> atomically to <paramref name="path"/>.
|
||||
/// The data is written to a unique <c>{path}.{random}.tmp</c> sibling, flushed,
|
||||
/// then renamed over <paramref name="path"/> with overwrite semantics.
|
||||
/// </summary>
|
||||
public static async Task WriteAtomicallyAsync(string path, byte[] data)
|
||||
{
|
||||
var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp";
|
||||
await using (var fs = new FileStream(tmpPath, FileMode.CreateNew, FileAccess.Write, FileShare.None,
|
||||
bufferSize: 4096, useAsync: true))
|
||||
{
|
||||
await fs.WriteAsync(data);
|
||||
await fs.FlushAsync();
|
||||
}
|
||||
File.Move(tmpPath, path, overwrite: true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Writes <paramref name="data"/> atomically to <paramref name="path"/>.
|
||||
/// The data is written to a unique <c>{path}.{random}.tmp</c> sibling, flushed,
|
||||
/// then renamed over <paramref name="path"/> with overwrite semantics.
|
||||
/// </summary>
|
||||
public static async Task WriteAtomicallyAsync(string path, ReadOnlyMemory<byte> data)
|
||||
{
|
||||
var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp";
|
||||
await using (var fs = new FileStream(tmpPath, FileMode.CreateNew, FileAccess.Write, FileShare.None,
|
||||
bufferSize: 4096, useAsync: true))
|
||||
{
|
||||
await fs.WriteAsync(data);
|
||||
await fs.FlushAsync();
|
||||
}
|
||||
File.Move(tmpPath, path, overwrite: true);
|
||||
}
|
||||
}
|
||||
@@ -47,6 +47,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
// Reference: golang/nats-server/server/filestore.go:290 (fss/ttl fields).
|
||||
private HashWheel? _ttlWheel;
|
||||
|
||||
// Mutual-exclusion lock for state file writes. Ensures that concurrent
|
||||
// FlushAllPending calls (e.g. from a flush timer and a shutdown path) do not
|
||||
// race on the stream.state / stream.state.tmp files.
|
||||
// Reference: golang/nats-server/server/filestore.go — fsMsgBlock.mu (write lock).
|
||||
private readonly SemaphoreSlim _stateWriteLock = new(1, 1);
|
||||
|
||||
public int BlockCount => _blocks.Count;
|
||||
public bool UsedIndexManifestOnStartup { get; private set; }
|
||||
|
||||
@@ -662,16 +668,18 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
DisposeAllBlocks();
|
||||
_stateWriteLock.Dispose();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Synchronous dispose — releases all block file handles.
|
||||
/// Synchronous dispose — releases all block file handles and the state-write semaphore.
|
||||
/// Allows the store to be used in synchronous test contexts with <c>using</c> blocks.
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
DisposeAllBlocks();
|
||||
_stateWriteLock.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1811,23 +1819,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
/// good sequence without re-scanning every block.
|
||||
/// Reference: golang/nats-server/server/filestore.go:5783 (flushPendingWritesUnlocked).
|
||||
/// </summary>
|
||||
public void FlushAllPending()
|
||||
public async Task FlushAllPending()
|
||||
{
|
||||
_activeBlock?.Flush();
|
||||
WriteStreamState();
|
||||
await WriteStreamStateAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Atomically persists a compact stream state snapshot to disk using the
|
||||
/// write-to-temp-then-rename pattern so that a partial write never leaves
|
||||
/// a corrupt state file.
|
||||
/// Atomically persists a compact stream state snapshot to disk using
|
||||
/// <see cref="AtomicFileWriter"/> (write-to-temp-then-rename) so that a
|
||||
/// partial write never leaves a corrupt state file. The <see cref="_stateWriteLock"/>
|
||||
/// semaphore serialises concurrent flush calls so that only one write is
|
||||
/// in-flight at a time.
|
||||
/// The file is written as JSON to <c>{Directory}/stream.state</c>.
|
||||
/// Reference: golang/nats-server/server/filestore.go:5820 (writeFullState).
|
||||
/// </summary>
|
||||
private void WriteStreamState()
|
||||
private async Task WriteStreamStateAsync()
|
||||
{
|
||||
var statePath = Path.Combine(_options.Directory, "stream.state");
|
||||
var tmpPath = statePath + ".tmp";
|
||||
|
||||
var snapshot = new StreamStateSnapshot
|
||||
{
|
||||
@@ -1837,9 +1846,17 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
Bytes = (ulong)_blocks.Sum(b => b.BytesUsed),
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(snapshot);
|
||||
File.WriteAllText(tmpPath, json);
|
||||
File.Move(tmpPath, statePath, overwrite: true);
|
||||
var json = JsonSerializer.SerializeToUtf8Bytes(snapshot);
|
||||
|
||||
await _stateWriteLock.WaitAsync();
|
||||
try
|
||||
{
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(statePath, json);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_stateWriteLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -56,7 +56,7 @@ public interface IStreamStore
|
||||
=> throw new NotSupportedException("Block-engine SkipMsgs not yet implemented.");
|
||||
|
||||
// Go: StreamStore.FlushAllPending — flush any buffered writes to backing storage
|
||||
void FlushAllPending()
|
||||
Task FlushAllPending()
|
||||
=> throw new NotSupportedException("Block-engine FlushAllPending not yet implemented.");
|
||||
|
||||
// Go: StreamStore.LoadMsg — load message by exact sequence; sm is an optional reusable buffer
|
||||
|
||||
@@ -355,7 +355,7 @@ public sealed class MemStore : IStreamStore
|
||||
}
|
||||
|
||||
// Go: memStore.FlushAllPending server/memstore.go:423 — no-op for in-memory store
|
||||
void IStreamStore.FlushAllPending() { }
|
||||
Task IStreamStore.FlushAllPending() => Task.CompletedTask;
|
||||
|
||||
// Go: memStore.LoadMsg server/memstore.go:1692
|
||||
StoreMsg IStreamStore.LoadMsg(ulong seq, StoreMsg? sm)
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
// Go ref: filestore.go:10599 (_writeFullState)
|
||||
// AtomicFileWriter wraps the write-to-temp-then-rename pattern used by
|
||||
// Go's fileStore._writeFullState to guarantee crash-safe state persistence.
|
||||
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Storage;
|
||||
|
||||
public sealed class AtomicFileWriterTests : IDisposable
|
||||
{
|
||||
private readonly DirectoryInfo _dir;
|
||||
|
||||
public AtomicFileWriterTests()
|
||||
{
|
||||
_dir = Directory.CreateTempSubdirectory("atomic_writer_tests_");
|
||||
}
|
||||
|
||||
public void Dispose() => _dir.Delete(recursive: true);
|
||||
|
||||
private string TempPath(string name) => Path.Combine(_dir.FullName, name);
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// byte[] overload
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_creates_file()
|
||||
{
|
||||
var path = TempPath("state.json");
|
||||
var data = "{ \"seq\": 1 }"u8.ToArray();
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, data);
|
||||
|
||||
File.Exists(path).ShouldBeTrue();
|
||||
var written = await File.ReadAllBytesAsync(path);
|
||||
written.ShouldBe(data);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_no_temp_file_remains()
|
||||
{
|
||||
var path = TempPath("state.json");
|
||||
var data = "hello world"u8.ToArray();
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, data);
|
||||
|
||||
// No .tmp file should remain after a successful write.
|
||||
// The temp file uses a random component ({path}.{random}.tmp) so check by extension.
|
||||
_dir.EnumerateFiles("*.tmp").ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_overwrites_existing()
|
||||
{
|
||||
var path = TempPath("state.json");
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, "first"u8.ToArray());
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, "second"u8.ToArray());
|
||||
|
||||
var written = await File.ReadAllTextAsync(path);
|
||||
written.ShouldBe("second");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_concurrent_writes_are_safe()
|
||||
{
|
||||
// Multiple concurrent writes to the same file must not corrupt it.
|
||||
// Each write uses a unique payload; after all writes complete the
|
||||
// file must contain exactly one of the payloads (no partial data).
|
||||
var path = TempPath("concurrent.json");
|
||||
const int concurrency = 20;
|
||||
|
||||
var tasks = Enumerable.Range(0, concurrency).Select(i =>
|
||||
AtomicFileWriter.WriteAtomicallyAsync(path, System.Text.Encoding.UTF8.GetBytes($"payload-{i:D3}")));
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
// File must exist and contain exactly one complete payload.
|
||||
File.Exists(path).ShouldBeTrue();
|
||||
var content = await File.ReadAllTextAsync(path);
|
||||
content.ShouldMatch(@"^payload-\d{3}$");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// ReadOnlyMemory<byte> overload
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_memory_overload_creates_file()
|
||||
{
|
||||
var path = TempPath("state_mem.json");
|
||||
ReadOnlyMemory<byte> data = "{ \"seq\": 42 }"u8.ToArray();
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, data);
|
||||
|
||||
File.Exists(path).ShouldBeTrue();
|
||||
var written = await File.ReadAllBytesAsync(path);
|
||||
written.ShouldBe(data.ToArray());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_memory_overload_no_temp_file_remains()
|
||||
{
|
||||
var path = TempPath("state_mem.json");
|
||||
ReadOnlyMemory<byte> data = "memory data"u8.ToArray();
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, data);
|
||||
|
||||
// The temp file uses a random component ({path}.{random}.tmp) so check by extension.
|
||||
_dir.EnumerateFiles("*.tmp").ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_memory_overload_overwrites_existing()
|
||||
{
|
||||
var path = TempPath("state_mem.json");
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, (ReadOnlyMemory<byte>)"first"u8.ToArray());
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, (ReadOnlyMemory<byte>)"second"u8.ToArray());
|
||||
|
||||
var written = await File.ReadAllTextAsync(path);
|
||||
written.ShouldBe("second");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_writes_empty_data()
|
||||
{
|
||||
var path = TempPath("empty.json");
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, Array.Empty<byte>());
|
||||
|
||||
File.Exists(path).ShouldBeTrue();
|
||||
var written = await File.ReadAllBytesAsync(path);
|
||||
written.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAtomicallyAsync_writes_large_payload()
|
||||
{
|
||||
var path = TempPath("large.bin");
|
||||
var data = new byte[256 * 1024]; // 256 KB
|
||||
Random.Shared.NextBytes(data);
|
||||
|
||||
await AtomicFileWriter.WriteAtomicallyAsync(path, data);
|
||||
|
||||
var written = await File.ReadAllBytesAsync(path);
|
||||
written.ShouldBe(data);
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
// Go: TestFileStoreSyncIntervals (filestore_test.go) — verifies that pending writes
|
||||
// are flushed to disk and the .blk file is non-empty after FlushAllPending.
|
||||
[Fact]
|
||||
public void FlushAllPending_flushes_active_block()
|
||||
public async Task FlushAllPending_flushes_active_block()
|
||||
{
|
||||
// Arrange: write a message to the store.
|
||||
const string sub = "flush-block";
|
||||
@@ -64,7 +64,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
store.StoreMsg("events.a", null, "payload-for-flush"u8.ToArray(), 0L);
|
||||
|
||||
// Act: flush all pending writes.
|
||||
store.FlushAllPending();
|
||||
await store.FlushAllPending();
|
||||
|
||||
// Assert: at least one .blk file exists and it is non-empty, proving the
|
||||
// active block was flushed to disk.
|
||||
@@ -81,7 +81,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
// Go: TestFileStoreWriteFullStateBasics (filestore_test.go:5461) — verifies that
|
||||
// WriteStreamState creates a valid, atomic stream.state checkpoint file.
|
||||
[Fact]
|
||||
public void FlushAllPending_writes_stream_state_file()
|
||||
public async Task FlushAllPending_writes_stream_state_file()
|
||||
{
|
||||
// Arrange: store several messages across subjects.
|
||||
const string sub = "state-file";
|
||||
@@ -93,13 +93,13 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
store.StoreMsg("events.a", null, "event-1"u8.ToArray(), 0L);
|
||||
|
||||
// Act: flush — this should write stream.state atomically.
|
||||
store.FlushAllPending();
|
||||
await store.FlushAllPending();
|
||||
|
||||
// Assert: stream.state exists and no leftover .tmp file.
|
||||
// AtomicFileWriter uses {path}.{random}.tmp so check by extension, not exact name.
|
||||
var statePath = Path.Combine(dir, "stream.state");
|
||||
var tmpPath = statePath + ".tmp";
|
||||
File.Exists(statePath).ShouldBeTrue("stream.state checkpoint must exist after FlushAllPending");
|
||||
File.Exists(tmpPath).ShouldBeFalse("stream.state.tmp must be renamed away after atomic write");
|
||||
Directory.GetFiles(dir, "*.tmp").ShouldBeEmpty("all .tmp staging files must be renamed away after atomic write");
|
||||
|
||||
// Assert: the file is valid JSON with the expected fields.
|
||||
var json = File.ReadAllText(statePath);
|
||||
@@ -118,17 +118,17 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
// Go: FlushAllPending is idempotent — calling it twice must not throw and must
|
||||
// overwrite the previous state file with the latest state.
|
||||
[Fact]
|
||||
public void FlushAllPending_is_idempotent()
|
||||
public async Task FlushAllPending_is_idempotent()
|
||||
{
|
||||
const string sub = "flush-idempotent";
|
||||
var dir = StoreDir(sub);
|
||||
using var store = CreateStore(sub);
|
||||
|
||||
store.StoreMsg("foo", null, "msg-1"u8.ToArray(), 0L);
|
||||
store.FlushAllPending();
|
||||
await store.FlushAllPending();
|
||||
|
||||
store.StoreMsg("foo", null, "msg-2"u8.ToArray(), 0L);
|
||||
store.FlushAllPending();
|
||||
await store.FlushAllPending();
|
||||
|
||||
// The state file should reflect the second flush (2 messages, seq 1..2).
|
||||
var statePath = Path.Combine(dir, "stream.state");
|
||||
@@ -197,7 +197,7 @@ public sealed class FileStoreCrashRecoveryTests : IDisposable
|
||||
store.StoreMsg("events", null, Encoding.UTF8.GetBytes($"msg-{i}"), 0L);
|
||||
|
||||
// Flush to ensure data is on disk before we close.
|
||||
store.FlushAllPending();
|
||||
await store.FlushAllPending();
|
||||
}
|
||||
|
||||
// Simulate a crash mid-write by truncating the .blk file by a few bytes at
|
||||
|
||||
Reference in New Issue
Block a user