From 788f4254b0e24725588f6c5df22673d78d8041f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:00:42 -0500 Subject: [PATCH] feat: implement jetstream filestore recovery baseline --- .../JetStream/Storage/FileStore.cs | 100 ++++++++++++++++++ .../JetStream/Storage/FileStoreBlock.cs | 6 ++ .../JetStream/Storage/FileStoreOptions.cs | 6 ++ tests/NATS.Server.Tests/FileStoreTests.cs | 18 ++++ 4 files changed, 130 insertions(+) create mode 100644 src/NATS.Server/JetStream/Storage/FileStore.cs create mode 100644 src/NATS.Server/JetStream/Storage/FileStoreBlock.cs create mode 100644 src/NATS.Server/JetStream/Storage/FileStoreOptions.cs create mode 100644 tests/NATS.Server.Tests/FileStoreTests.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs new file mode 100644 index 0000000..04f05e7 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -0,0 +1,100 @@ +using System.Text.Json; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Storage; + +public sealed class FileStore : IStreamStore, IAsyncDisposable +{ + private readonly string _dataFilePath; + private readonly Dictionary _messages = new(); + private ulong _last; + + public FileStore(FileStoreOptions options) + { + Directory.CreateDirectory(options.Directory); + _dataFilePath = Path.Combine(options.Directory, "messages.jsonl"); + LoadExisting(); + } + + public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) + { + _last++; + var stored = new StoredMessage + { + Sequence = _last, + Subject = subject, + Payload = payload.ToArray(), + }; + _messages[_last] = stored; + + var line = JsonSerializer.Serialize(new FileRecord + { + Sequence = stored.Sequence, + Subject = stored.Subject, + PayloadBase64 = Convert.ToBase64String(stored.Payload.ToArray()), + }); + await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct); + return _last; + } + + public ValueTask LoadAsync(ulong sequence, CancellationToken ct) + { + _messages.TryGetValue(sequence, out var msg); + return ValueTask.FromResult(msg); + } + + public ValueTask PurgeAsync(CancellationToken ct) + { + _messages.Clear(); + _last = 0; + if (File.Exists(_dataFilePath)) + File.Delete(_dataFilePath); + return ValueTask.CompletedTask; + } + + public ValueTask GetStateAsync(CancellationToken ct) + { + return ValueTask.FromResult(new StreamState + { + Messages = (ulong)_messages.Count, + FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), + LastSeq = _last, + }); + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + + private void LoadExisting() + { + if (!File.Exists(_dataFilePath)) + return; + + foreach (var line in File.ReadLines(_dataFilePath)) + { + if (string.IsNullOrWhiteSpace(line)) + continue; + + var record = JsonSerializer.Deserialize(line); + if (record == null) + continue; + + var message = new StoredMessage + { + Sequence = record.Sequence, + Subject = record.Subject ?? string.Empty, + Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty), + }; + + _messages[message.Sequence] = message; + if (message.Sequence > _last) + _last = message.Sequence; + } + } + + private sealed class FileRecord + { + public ulong Sequence { get; init; } + public string? Subject { get; init; } + public string? PayloadBase64 { get; init; } + } +} diff --git a/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs b/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs new file mode 100644 index 0000000..4d4689f --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs @@ -0,0 +1,6 @@ +namespace NATS.Server.JetStream.Storage; + +public sealed class FileStoreBlock +{ + public required string Path { get; init; } +} diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs new file mode 100644 index 0000000..17bf85d --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs @@ -0,0 +1,6 @@ +namespace NATS.Server.JetStream.Storage; + +public sealed class FileStoreOptions +{ + public string Directory { get; set; } = string.Empty; +} diff --git a/tests/NATS.Server.Tests/FileStoreTests.cs b/tests/NATS.Server.Tests/FileStoreTests.cs new file mode 100644 index 0000000..625be3d --- /dev/null +++ b/tests/NATS.Server.Tests/FileStoreTests.cs @@ -0,0 +1,18 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreTests +{ + [Fact] + public async Task FileStore_recovers_messages_after_restart() + { + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName })) + await store.AppendAsync("foo", "payload"u8.ToArray(), default); + + await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + (await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1); + } +}