using System.Text.Json;
using NATS.Server.JetStream.Models;

namespace NATS.Server.JetStream.Storage;

/// <summary>
/// Stream store that keeps every message in an in-memory dictionary keyed by
/// sequence number and mirrors it to a JSON-lines file ("messages.jsonl") in
/// the configured directory.
/// </summary>
/// <remarks>
/// No synchronization is performed in this class; callers that share an
/// instance across threads presumably serialize access themselves (TODO confirm).
/// </remarks>
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
    private readonly string _dataFilePath;
    private readonly Dictionary<ulong, StoredMessage> _messages = new();
    private ulong _last;

    /// <summary>
    /// Creates the store, ensuring the target directory exists and loading any
    /// messages already present in the data file.
    /// </summary>
    /// <param name="options">Options supplying the storage directory.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public FileStore(FileStoreOptions options)
    {
        ArgumentNullException.ThrowIfNull(options);

        Directory.CreateDirectory(options.Directory);
        _dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
        LoadExisting();
    }

    /// <summary>
    /// Assigns the next sequence number to a message, stores it in memory and
    /// appends it as one JSON line to the data file.
    /// </summary>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="payload">Message body; it is copied before being stored.</param>
    /// <param name="ct">Token passed through to the file write.</param>
    /// <returns>The sequence number assigned to the message.</returns>
    public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
    {
        // Reserve the sequence before awaiting so interleaved callers never
        // observe the same number.
        var sequence = ++_last;

        // Copy once; the same array backs the stored message and the file record.
        var bytes = payload.ToArray();

        _messages[sequence] = new StoredMessage
        {
            Sequence = sequence,
            Subject = subject,
            Payload = bytes,
        };

        var line = SerializeRecord(sequence, subject, bytes);

        try
        {
            await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
        }
        catch
        {
            // The write failed or was cancelled: undo the in-memory changes so
            // the store does not report a message the caller was told failed.
            _messages.Remove(sequence);
            if (_last == sequence)
            {
                _last = sequence - 1;
            }

            throw;
        }

        return sequence;
    }

    /// <summary>Looks up a message by sequence number in the in-memory index.</summary>
    /// <param name="sequence">Sequence number to look up.</param>
    /// <param name="ct">Unused; the lookup completes synchronously.</param>
    /// <returns>The stored message, or <c>null</c> when the sequence is not present.</returns>
    public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
    {
        _messages.TryGetValue(sequence, out var message);
        return ValueTask.FromResult(message);
    }

    /// <summary>
    /// Removes every message, resets the last sequence to zero and deletes the
    /// data file if it exists.
    /// </summary>
    /// <param name="ct">Unused; the purge completes synchronously.</param>
    public ValueTask PurgeAsync(CancellationToken ct)
    {
        _messages.Clear();
        _last = 0;

        if (File.Exists(_dataFilePath))
        {
            File.Delete(_dataFilePath);
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>Reports the message count and the first and last sequence numbers.</summary>
    /// <param name="ct">Unused; the state is computed synchronously.</param>
    /// <returns>
    /// A state whose <c>FirstSeq</c> is 0 when the store is empty, otherwise the
    /// smallest stored sequence; <c>LastSeq</c> is the last assigned sequence.
    /// </returns>
    public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
    {
        var firstSequence = _messages.Count == 0 ? 0UL : _messages.Keys.Min();

        return ValueTask.FromResult(new StreamState
        {
            Messages = (ulong)_messages.Count,
            FirstSeq = firstSequence,
            LastSeq = _last,
        });
    }

    /// <summary>
    /// Evicts the lowest-sequence messages until at most
    /// <paramref name="maxMessages"/> remain, then rewrites the data file.
    /// Does nothing when the store is already within the limit.
    /// </summary>
    /// <param name="maxMessages">Maximum number of messages to retain.</param>
    public void TrimToMaxMessages(ulong maxMessages)
    {
        var count = (ulong)_messages.Count;
        if (count <= maxMessages)
        {
            return;
        }

        // count originates from an int, so the difference always fits in an int.
        var excess = (int)(count - maxMessages);

        // Sort the keys once instead of scanning for the minimum per eviction.
        var evicted = _messages.Keys.OrderBy(sequence => sequence).Take(excess).ToList();
        foreach (var sequence in evicted)
        {
            _messages.Remove(sequence);
        }

        RewriteDataFile();
    }

    /// <summary>Completes immediately; this type performs no work on disposal.</summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;

    /// <summary>
    /// Reads the data file, if present, and rebuilds the in-memory index and the
    /// last sequence number from its records. Blank lines and records that
    /// deserialize to <c>null</c> are skipped.
    /// </summary>
    private void LoadExisting()
    {
        if (!File.Exists(_dataFilePath))
        {
            return;
        }

        foreach (var line in File.ReadLines(_dataFilePath))
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            var record = JsonSerializer.Deserialize<FileRecord>(line);
            if (record is null)
            {
                continue;
            }

            var message = new StoredMessage
            {
                Sequence = record.Sequence,
                Subject = record.Subject ?? string.Empty,
                Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
            };

            _messages[message.Sequence] = message;
            if (message.Sequence > _last)
            {
                _last = message.Sequence;
            }
        }
    }

    /// <summary>
    /// Replaces the data file with one JSON line per in-memory message, in
    /// ascending sequence order.
    /// </summary>
    private void RewriteDataFile()
    {
        var lines = new List<string>(_messages.Count);

        foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
        {
            lines.Add(SerializeRecord(message.Sequence, message.Subject, message.Payload.ToArray()));
        }

        File.WriteAllLines(_dataFilePath, lines);
    }

    /// <summary>Serializes one message into the JSON form used for a data-file line.</summary>
    private static string SerializeRecord(ulong sequence, string? subject, byte[] payload)
    {
        return JsonSerializer.Serialize(new FileRecord
        {
            Sequence = sequence,
            Subject = subject,
            PayloadBase64 = Convert.ToBase64String(payload),
        });
    }

    /// <summary>On-disk shape of a message: the payload is stored as Base64 text.</summary>
    private sealed class FileRecord
    {
        public ulong Sequence { get; init; }

        public string? Subject { get; init; }

        public string? PayloadBase64 { get; init; }
    }
}