feat(filestore): implement StoreRawMsg, LoadPrevMsg, Type, Stop on FileStore
Complete IStreamStore Batch 1 — all core operations now have FileStore implementations instead of throwing NotSupportedException: - StoreRawMsg: caller-specified seq/ts for replication/mirroring - LoadPrevMsg: backward scan for message before given sequence - Type: returns StorageType.File - Stop: flush + dispose blocks, reject further writes 14 new tests in FileStoreStreamStoreTests.
This commit is contained in:
@@ -35,6 +35,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
private ulong _last;
|
||||
private ulong _first; // Go: first.seq — watermark for the first live or expected-first sequence
|
||||
|
||||
// Set to true after Stop() is called. Prevents further writes.
|
||||
private bool _stopped;
|
||||
|
||||
// Resolved at construction time: which format family to use.
|
||||
private readonly bool _useS2; // true -> S2Codec (FSV2 compression path)
|
||||
private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path)
|
||||
@@ -66,6 +69,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
|
||||
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
if (_stopped)
|
||||
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
|
||||
|
||||
// Go: check and remove expired messages before each append.
|
||||
// Reference: golang/nats-server/server/filestore.go — storeMsg, expire check.
|
||||
ExpireFromWheel();
|
||||
@@ -255,6 +261,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
/// </summary>
|
||||
public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl)
|
||||
{
|
||||
if (_stopped)
|
||||
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
|
||||
|
||||
// Go: expire check before each store (same as AppendAsync).
|
||||
// Reference: golang/nats-server/server/filestore.go:6793 (expireMsgs call).
|
||||
ExpireFromWheel();
|
||||
@@ -1614,6 +1623,135 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
|
||||
}
|
||||
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Go-parity IStreamStore methods: StoreRawMsg, LoadPrevMsg, Type, Stop
|
||||
// Reference: golang/nats-server/server/filestore.go
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Stores a message at a caller-specified sequence number and timestamp.
|
||||
/// Used for replication and mirroring — the caller (NRG, mirror source) controls
|
||||
/// the sequence/timestamp rather than the store auto-incrementing them.
|
||||
/// <para>Unlike <see cref="StoreMsg"/>, this does NOT call <c>ExpireFromWheel</c>
|
||||
/// or auto-increment <c>_last</c>. It updates <c>_last</c> via
|
||||
/// <c>Math.Max(_last, seq)</c> so the watermark reflects the highest stored
|
||||
/// sequence.</para>
|
||||
/// Reference: golang/nats-server/server/filestore.go:6756 (storeRawMsg).
|
||||
/// </summary>
|
||||
public void StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck)
|
||||
{
|
||||
if (_stopped)
|
||||
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
|
||||
|
||||
// Combine headers and payload, same as StoreMsg.
|
||||
byte[] combined;
|
||||
if (hdr is { Length: > 0 })
|
||||
{
|
||||
combined = new byte[hdr.Length + msg.Length];
|
||||
hdr.CopyTo(combined, 0);
|
||||
msg.CopyTo(combined, hdr.Length);
|
||||
}
|
||||
else
|
||||
{
|
||||
combined = msg;
|
||||
}
|
||||
|
||||
var persistedPayload = TransformForPersist(combined.AsSpan());
|
||||
// Recover UTC DateTime from caller-supplied Unix nanosecond timestamp.
|
||||
var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
|
||||
|
||||
var stored = new StoredMessage
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = combined,
|
||||
TimestampUtc = storedUtc,
|
||||
};
|
||||
_messages[seq] = stored;
|
||||
|
||||
// Go: update _last to the high-water mark — do not decrement.
|
||||
_last = Math.Max(_last, seq);
|
||||
|
||||
// Register TTL using the caller-supplied timestamp and TTL.
|
||||
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
|
||||
RegisterTtl(seq, ts, effectiveTtlNs);
|
||||
|
||||
EnsureActiveBlock();
|
||||
try
|
||||
{
|
||||
_activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, ts);
|
||||
}
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
RotateBlock();
|
||||
_activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, ts);
|
||||
}
|
||||
|
||||
if (_activeBlock!.IsSealed)
|
||||
RotateBlock();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Loads the message immediately before <paramref name="start"/> by walking
|
||||
/// backward from <c>start - 1</c> to <c>_first</c>.
|
||||
/// Throws <see cref="KeyNotFoundException"/> if no such message exists.
|
||||
/// Reference: golang/nats-server/server/filestore.go — LoadPrevMsg.
|
||||
/// </summary>
|
||||
public StoreMsg LoadPrevMsg(ulong start, StoreMsg? sm)
|
||||
{
|
||||
if (start == 0)
|
||||
throw new KeyNotFoundException("No message found before seq 0.");
|
||||
|
||||
var first = _messages.Count > 0 ? _messages.Keys.Min() : 1UL;
|
||||
|
||||
for (var seq = start - 1; seq >= first && seq <= _last; seq--)
|
||||
{
|
||||
if (_messages.TryGetValue(seq, out var stored))
|
||||
{
|
||||
sm ??= new StoreMsg();
|
||||
sm.Clear();
|
||||
sm.Subject = stored.Subject;
|
||||
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
|
||||
sm.Sequence = stored.Sequence;
|
||||
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
return sm;
|
||||
}
|
||||
|
||||
// Prevent underflow on ulong subtraction.
|
||||
if (seq == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
throw new KeyNotFoundException($"No message found before seq {start}.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the storage backend type for this store instance.
|
||||
/// Reference: golang/nats-server/server/filestore.go — fileStore.Type.
|
||||
/// </summary>
|
||||
public StorageType Type() => StorageType.File;
|
||||
|
||||
/// <summary>
|
||||
/// Flushes the active block to disk and marks the store as stopped.
|
||||
/// After <c>Stop()</c> returns, calls to <see cref="StoreMsg"/> or
|
||||
/// <see cref="AppendAsync"/> will throw <see cref="ObjectDisposedException"/>.
|
||||
/// Blocks are NOT deleted — use <see cref="Delete"/> if data removal is needed.
|
||||
/// Reference: golang/nats-server/server/filestore.go — fileStore.Stop.
|
||||
/// </summary>
|
||||
public void Stop()
|
||||
{
|
||||
if (_stopped)
|
||||
return;
|
||||
|
||||
_stopped = true;
|
||||
|
||||
// Flush the active block to ensure all buffered writes reach disk.
|
||||
_activeBlock?.Flush();
|
||||
|
||||
// Dispose all blocks to release OS file handles. The files remain on disk.
|
||||
DisposeAllBlocks();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// ConsumerStore factory
|
||||
// Reference: golang/nats-server/server/filestore.go — fileStore.ConsumerStore
|
||||
|
||||
@@ -0,0 +1,294 @@
|
||||
// Reference: golang/nats-server/server/filestore.go
|
||||
// Tests in this file:
|
||||
// StoreRawMsg_stores_at_specified_sequence — IStreamStore.StoreRawMsg preserves caller seq/ts
|
||||
// LoadPrevMsg_returns_message_before_seq — IStreamStore.LoadPrevMsg backward scan
|
||||
// Type_returns_file — IStreamStore.Type() returns StorageType.File
|
||||
// Stop_prevents_further_writes — IStreamStore.Stop() sets _stopped flag
|
||||
|
||||
using NATS.Server.JetStream.Storage;
|
||||
using StorageType = NATS.Server.JetStream.Models.StorageType;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Storage;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for IStreamStore methods added to FileStore in Batch 1:
|
||||
/// StoreRawMsg, LoadPrevMsg, Type, and Stop.
|
||||
/// </summary>
|
||||
public sealed class FileStoreStreamStoreTests : IDisposable
|
||||
{
|
||||
private readonly string _root;
|
||||
|
||||
public FileStoreStreamStoreTests()
|
||||
{
|
||||
_root = Path.Combine(Path.GetTempPath(), $"nats-js-sstest-{Guid.NewGuid():N}");
|
||||
Directory.CreateDirectory(_root);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
// Best-effort cleanup of temp directory. If it fails (e.g. open handles on CI),
|
||||
// the OS will clean it up on the next reboot. Letting it throw would suppress
|
||||
// the real test failure so we absorb IO errors explicitly.
|
||||
if (!Directory.Exists(_root))
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
Directory.Delete(_root, recursive: true);
|
||||
}
|
||||
catch (IOException ex)
|
||||
{
|
||||
// Open file handles (common on Windows CI) — log and continue.
|
||||
Console.Error.WriteLine($"[FileStoreStreamStoreTests] Dispose: {ex.Message}");
|
||||
}
|
||||
catch (UnauthorizedAccessException ex)
|
||||
{
|
||||
// Read-only files left by the test — log and continue.
|
||||
Console.Error.WriteLine($"[FileStoreStreamStoreTests] Dispose: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
private FileStore CreateStore(string subDir, FileStoreOptions? opts = null)
|
||||
{
|
||||
var dir = Path.Combine(_root, subDir);
|
||||
Directory.CreateDirectory(dir);
|
||||
var o = opts ?? new FileStoreOptions();
|
||||
o.Directory = dir;
|
||||
return new FileStore(o);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// StoreRawMsg
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
// Go: filestore.go storeRawMsg — caller specifies seq and ts; store must not
|
||||
// auto-increment, and _last must be updated to Math.Max(_last, seq).
|
||||
[Fact]
|
||||
public void StoreRawMsg_stores_at_specified_sequence()
|
||||
{
|
||||
using var store = CreateStore("raw-seq");
|
||||
IStreamStore ss = store;
|
||||
|
||||
var subject = "events.raw";
|
||||
var data = "hello raw"u8.ToArray();
|
||||
// Use a specific Unix nanosecond timestamp.
|
||||
var tsNs = new DateTimeOffset(2024, 6, 1, 12, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
const ulong targetSeq = 42UL;
|
||||
|
||||
ss.StoreRawMsg(subject, null, data, targetSeq, tsNs, 0, false);
|
||||
|
||||
// Verify by loading the message back via LoadMsg.
|
||||
var sm = ss.LoadMsg(targetSeq, null);
|
||||
sm.Subject.ShouldBe(subject);
|
||||
sm.Sequence.ShouldBe(targetSeq);
|
||||
sm.Timestamp.ShouldBe(tsNs);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void StoreRawMsg_updates_last_watermark()
|
||||
{
|
||||
using var store = CreateStore("raw-wm");
|
||||
IStreamStore ss = store;
|
||||
|
||||
// Store a message at seq 100 — _last should become 100.
|
||||
ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 100UL, 1_000_000L, 0, false);
|
||||
|
||||
var state = new StreamState();
|
||||
ss.FastState(ref state);
|
||||
state.LastSeq.ShouldBe(100UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void StoreRawMsg_does_not_decrement_last_for_lower_seq()
|
||||
{
|
||||
using var store = CreateStore("raw-order");
|
||||
IStreamStore ss = store;
|
||||
|
||||
// Write seq 50 first, then seq 30 (out-of-order replication scenario).
|
||||
ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 50UL, 1_000_000L, 0, false);
|
||||
ss.StoreRawMsg("bar", null, "y"u8.ToArray(), 30UL, 2_000_000L, 0, false);
|
||||
|
||||
var state = new StreamState();
|
||||
ss.FastState(ref state);
|
||||
// _last should remain 50, not go down to 30.
|
||||
state.LastSeq.ShouldBe(50UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void StoreRawMsg_preserves_caller_timestamp()
|
||||
{
|
||||
using var store = CreateStore("raw-ts");
|
||||
IStreamStore ss = store;
|
||||
|
||||
var subject = "ts.test";
|
||||
var data = "payload"u8.ToArray();
|
||||
// A deterministic Unix nanosecond timestamp.
|
||||
var tsNs = 1_717_238_400_000_000_000L; // 2024-06-01 00:00:00 UTC in ns
|
||||
|
||||
ss.StoreRawMsg(subject, null, data, 7UL, tsNs, 0, false);
|
||||
|
||||
var sm = ss.LoadMsg(7UL, null);
|
||||
sm.Timestamp.ShouldBe(tsNs);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void StoreRawMsg_throws_after_stop()
|
||||
{
|
||||
using var store = CreateStore("raw-stop");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.Stop();
|
||||
|
||||
Should.Throw<ObjectDisposedException>(() =>
|
||||
ss.StoreRawMsg("foo", null, "x"u8.ToArray(), 1UL, 1_000_000L, 0, false));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// LoadPrevMsg
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
// Go: filestore.go LoadPrevMsg — walks backward from start-1 to first.
|
||||
[Fact]
|
||||
public void LoadPrevMsg_returns_message_before_seq()
|
||||
{
|
||||
using var store = CreateStore("prev-basic");
|
||||
IStreamStore ss = store;
|
||||
|
||||
// Write 3 messages at seqs 1, 2, 3.
|
||||
ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0);
|
||||
ss.StoreMsg("b", null, "msg2"u8.ToArray(), 0);
|
||||
ss.StoreMsg("c", null, "msg3"u8.ToArray(), 0);
|
||||
|
||||
// LoadPrevMsg(3) should return seq 2.
|
||||
var sm = ss.LoadPrevMsg(3UL, null);
|
||||
sm.Sequence.ShouldBe(2UL);
|
||||
sm.Subject.ShouldBe("b");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LoadPrevMsg_skips_deleted_message()
|
||||
{
|
||||
using var store = CreateStore("prev-skip");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0); // seq 1
|
||||
ss.StoreMsg("b", null, "msg2"u8.ToArray(), 0); // seq 2
|
||||
ss.StoreMsg("c", null, "msg3"u8.ToArray(), 0); // seq 3
|
||||
|
||||
// Delete seq 2 — LoadPrevMsg(3) must skip it and return seq 1.
|
||||
ss.RemoveMsg(2UL);
|
||||
|
||||
var sm = ss.LoadPrevMsg(3UL, null);
|
||||
sm.Sequence.ShouldBe(1UL);
|
||||
sm.Subject.ShouldBe("a");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LoadPrevMsg_throws_when_no_message_before_seq()
|
||||
{
|
||||
using var store = CreateStore("prev-none");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.StoreMsg("a", null, "msg1"u8.ToArray(), 0); // seq 1
|
||||
|
||||
// LoadPrevMsg(1) — nothing before seq 1.
|
||||
Should.Throw<KeyNotFoundException>(() => ss.LoadPrevMsg(1UL, null));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LoadPrevMsg_reuses_provided_container()
|
||||
{
|
||||
using var store = CreateStore("prev-reuse");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.StoreMsg("x", null, "d1"u8.ToArray(), 0); // seq 1
|
||||
ss.StoreMsg("y", null, "d2"u8.ToArray(), 0); // seq 2
|
||||
|
||||
var container = new StoreMsg();
|
||||
var result = ss.LoadPrevMsg(2UL, container);
|
||||
|
||||
// Should return the same object reference.
|
||||
result.ShouldBeSameAs(container);
|
||||
container.Sequence.ShouldBe(1UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Type
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
// Go: filestore.go fileStore.Type — returns StorageType.File.
|
||||
[Fact]
|
||||
public void Type_returns_file()
|
||||
{
|
||||
using var store = CreateStore("type");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.Type().ShouldBe(StorageType.File);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Stop
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
// Go: filestore.go fileStore.Stop — flushes and marks as stopped.
|
||||
[Fact]
|
||||
public void Stop_prevents_further_writes_via_StoreMsg()
|
||||
{
|
||||
using var store = CreateStore("stop-storemsg");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.StoreMsg("ok", null, "before"u8.ToArray(), 0);
|
||||
|
||||
ss.Stop();
|
||||
|
||||
Should.Throw<ObjectDisposedException>(() =>
|
||||
ss.StoreMsg("fail", null, "after"u8.ToArray(), 0));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Stop_prevents_further_writes_via_AppendAsync()
|
||||
{
|
||||
using var store = CreateStore("stop-append");
|
||||
|
||||
await store.AppendAsync("ok", "before"u8.ToArray(), CancellationToken.None);
|
||||
|
||||
((IStreamStore)store).Stop();
|
||||
|
||||
await Should.ThrowAsync<ObjectDisposedException>(() =>
|
||||
store.AppendAsync("fail", "after"u8.ToArray(), CancellationToken.None).AsTask());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Stop_is_idempotent()
|
||||
{
|
||||
using var store = CreateStore("stop-idem");
|
||||
IStreamStore ss = store;
|
||||
|
||||
ss.Stop();
|
||||
|
||||
// Second Stop() must not throw.
|
||||
var ex = Record.Exception(() => ss.Stop());
|
||||
ex.ShouldBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Stop_preserves_messages_on_disk()
|
||||
{
|
||||
var dir = Path.Combine(_root, "stop-persist");
|
||||
Directory.CreateDirectory(dir);
|
||||
|
||||
FileStore CreateWithDir() => new FileStore(new FileStoreOptions { Directory = dir });
|
||||
|
||||
// Write a message, stop the store.
|
||||
using (var store = CreateWithDir())
|
||||
{
|
||||
((IStreamStore)store).StoreMsg("saved", null, "payload"u8.ToArray(), 0);
|
||||
((IStreamStore)store).Stop();
|
||||
}
|
||||
|
||||
// Re-open and verify the message survived.
|
||||
using var recovered = CreateWithDir();
|
||||
var sm = ((IStreamStore)recovered).LoadMsg(1UL, null);
|
||||
sm.Subject.ShouldBe("saved");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user