feat: implement jetstream filestore recovery baseline
This commit is contained in:
100
src/NATS.Server/JetStream/Storage/FileStore.cs
Normal file
100
src/NATS.Server/JetStream/Storage/FileStore.cs
Normal file
@@ -0,0 +1,100 @@
|
||||
using System.Text.Json;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStore : IStreamStore, IAsyncDisposable
|
||||
{
|
||||
private readonly string _dataFilePath;
|
||||
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
||||
private ulong _last;
|
||||
|
||||
public FileStore(FileStoreOptions options)
|
||||
{
|
||||
Directory.CreateDirectory(options.Directory);
|
||||
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
|
||||
LoadExisting();
|
||||
}
|
||||
|
||||
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
_last++;
|
||||
var stored = new StoredMessage
|
||||
{
|
||||
Sequence = _last,
|
||||
Subject = subject,
|
||||
Payload = payload.ToArray(),
|
||||
};
|
||||
_messages[_last] = stored;
|
||||
|
||||
var line = JsonSerializer.Serialize(new FileRecord
|
||||
{
|
||||
Sequence = stored.Sequence,
|
||||
Subject = stored.Subject,
|
||||
PayloadBase64 = Convert.ToBase64String(stored.Payload.ToArray()),
|
||||
});
|
||||
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
|
||||
return _last;
|
||||
}
|
||||
|
||||
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
|
||||
{
|
||||
_messages.TryGetValue(sequence, out var msg);
|
||||
return ValueTask.FromResult(msg);
|
||||
}
|
||||
|
||||
public ValueTask PurgeAsync(CancellationToken ct)
|
||||
{
|
||||
_messages.Clear();
|
||||
_last = 0;
|
||||
if (File.Exists(_dataFilePath))
|
||||
File.Delete(_dataFilePath);
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
|
||||
{
|
||||
return ValueTask.FromResult(new StreamState
|
||||
{
|
||||
Messages = (ulong)_messages.Count,
|
||||
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
|
||||
LastSeq = _last,
|
||||
});
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
|
||||
private void LoadExisting()
|
||||
{
|
||||
if (!File.Exists(_dataFilePath))
|
||||
return;
|
||||
|
||||
foreach (var line in File.ReadLines(_dataFilePath))
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(line))
|
||||
continue;
|
||||
|
||||
var record = JsonSerializer.Deserialize<FileRecord>(line);
|
||||
if (record == null)
|
||||
continue;
|
||||
|
||||
var message = new StoredMessage
|
||||
{
|
||||
Sequence = record.Sequence,
|
||||
Subject = record.Subject ?? string.Empty,
|
||||
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
|
||||
};
|
||||
|
||||
_messages[message.Sequence] = message;
|
||||
if (message.Sequence > _last)
|
||||
_last = message.Sequence;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class FileRecord
|
||||
{
|
||||
public ulong Sequence { get; init; }
|
||||
public string? Subject { get; init; }
|
||||
public string? PayloadBase64 { get; init; }
|
||||
}
|
||||
}
|
||||
6
src/NATS.Server/JetStream/Storage/FileStoreBlock.cs
Normal file
6
src/NATS.Server/JetStream/Storage/FileStoreBlock.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStoreBlock
|
||||
{
|
||||
public required string Path { get; init; }
|
||||
}
|
||||
6
src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
Normal file
6
src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStoreOptions
|
||||
{
|
||||
public string Directory { get; set; } = string.Empty;
|
||||
}
|
||||
18
tests/NATS.Server.Tests/FileStoreTests.cs
Normal file
18
tests/NATS.Server.Tests/FileStoreTests.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class FileStoreTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task FileStore_recovers_messages_after_restart()
|
||||
{
|
||||
var dir = Directory.CreateTempSubdirectory();
|
||||
|
||||
await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }))
|
||||
await store.AppendAsync("foo", "payload"u8.ToArray(), default);
|
||||
|
||||
await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||
(await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user