// File: src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs
//
// Go reference: server/jetstream_api.go — jsStreamSnapshotT / jsStreamRestoreT
// TAR-based snapshot with S2 (Snappy) compression. Each snapshot is a Snappy-compressed
// TAR archive containing stream.json (the stream config) and messages/NNNNNN.json files
// (one per stored message, in sequence order).

using System.Formats.Tar;
using System.Text;
using System.Text.Json;
using IronSnappy;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Snapshots;

// ─────────────────────────────────────────────────────────────────────────────
// Result type
// ─────────────────────────────────────────────────────────────────────────────

/// <summary>
/// Statistics returned after a successful TAR snapshot restore.
/// </summary>
/// <param name="StreamName">Name taken from the config of the stream that was restored into.</param>
/// <param name="MessagesRestored">Number of messages appended to the store.</param>
/// <param name="BytesRestored">Sum of the decoded payload lengths of the appended messages.</param>
public sealed record SnapshotRestoreResult(
    string StreamName,
    int MessagesRestored,
    long BytesRestored);

// ─────────────────────────────────────────────────────────────────────────────
// Wire-format DTO (serialised inside the TAR entries)
// ─────────────────────────────────────────────────────────────────────────────

/// <summary>
/// JSON shape of a single <c>messages/NNNNNN.json</c> archive entry.
/// </summary>
file sealed class TarMessageEntry
{
    public ulong Sequence { get; init; }
    public string Subject { get; init; } = string.Empty;
    public string Payload { get; init; } = string.Empty;   // base-64
    public string Timestamp { get; init; } = string.Empty; // ISO-8601 UTC ("O" format)
}

// ─────────────────────────────────────────────────────────────────────────────
// Service
// ─────────────────────────────────────────────────────────────────────────────

/// <summary>
/// Creates and restores stream snapshots, either by delegating to the store's own
/// snapshot format or by building a Snappy-compressed TAR archive.
/// </summary>
public sealed class StreamSnapshotService
{
    // Archive entry names shared by the writer and the reader.
    private const string ConfigEntryName = "stream.json";
    private const string MessageEntryPrefix = "messages/";
    private const string MessageEntrySuffix = ".json";

    // ──────────────────────────────────────────────────────────────────────
    // Existing thin wrappers (kept for API compatibility)
    // ──────────────────────────────────────────────────────────────────────

    /// <summary>Delegates to the store's <c>CreateSnapshotAsync</c>.</summary>
    // NOTE(review): the generic argument of the return type was lost in the extracted
    // source; byte[] is presumed — confirm against IStreamStore.CreateSnapshotAsync.
    public ValueTask<byte[]> SnapshotAsync(StreamHandle stream, CancellationToken ct)
        => stream.Store.CreateSnapshotAsync(ct);

    /// <summary>Delegates to the store's <c>RestoreSnapshotAsync</c>.</summary>
    public ValueTask RestoreAsync(StreamHandle stream, ReadOnlyMemory<byte> snapshot, CancellationToken ct)
        => stream.Store.RestoreSnapshotAsync(snapshot, ct);

    // ──────────────────────────────────────────────────────────────────────
    // TAR + S2 snapshot
    // ──────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Serialise the stream config and every stored message into a TAR archive,
    /// then Snappy-compress the result and return the bytes.
    /// <para>
    /// Archive layout:
    /// <code>
    ///   stream.json            — JSON-serialised StreamConfig
    ///   messages/000001.json   — one file per message, sequence padded to at least 6 digits
    ///   messages/000002.json
    ///   …
    /// </code>
    /// </para>
    /// </summary>
    /// <param name="stream">Stream whose config and stored messages are archived.</param>
    /// <param name="ct">Token passed to the store listing and to every TAR write.</param>
    /// <returns>The Snappy-compressed TAR archive.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public async Task<byte[]> CreateTarSnapshotAsync(StreamHandle stream, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(stream);

        // Fetch the messages up front, before the TAR buffer is created.
        var messages = await stream.Store.ListAsync(ct);

        using var tarBuffer = new MemoryStream();

        // The writer is disposed before the buffer is read so the archive is
        // finalised; leaveOpen keeps the buffer usable afterwards.
        using (var writer = new TarWriter(tarBuffer, TarEntryFormat.Pax, leaveOpen: true))
        {
            // ── stream.json ──────────────────────────────────────────────
            var configJson = JsonSerializer.SerializeToUtf8Bytes(stream.Config);
            await WriteEntryAsync(writer, ConfigEntryName, configJson, ct);

            // ── messages/NNNNNN.json ─────────────────────────────────────
            foreach (var msg in messages.OrderBy(m => m.Sequence))
            {
                var entry = new TarMessageEntry
                {
                    Sequence = msg.Sequence,
                    Subject = msg.Subject,
                    Payload = Convert.ToBase64String(msg.Payload.ToArray()),
                    Timestamp = msg.TimestampUtc.ToString("O"),
                };
                var entryJson = JsonSerializer.SerializeToUtf8Bytes(entry);
                var entryName = $"{MessageEntryPrefix}{msg.Sequence:D6}{MessageEntrySuffix}";
                await WriteEntryAsync(writer, entryName, entryJson, ct);
            }
        }

        // Snappy-compress the TAR bytes.
        return Snappy.Encode(tarBuffer.ToArray());
    }

    /// <summary>
    /// Decompress a Snappy-compressed TAR archive, validate stream.json, and
    /// replay all message entries back into the store.
    /// </summary>
    /// <remarks>
    /// The whole archive is read and every payload is base-64 decoded before the
    /// store is purged, so an archive that fails validation or decoding leaves the
    /// existing store contents untouched. Messages are re-appended in ascending
    /// snapshot sequence order; only the subject and payload are handed to
    /// <c>AppendAsync</c>, so the archived sequence numbers and timestamps are not
    /// passed back to the store.
    /// </remarks>
    /// <param name="stream">Stream whose store is purged and repopulated.</param>
    /// <param name="snapshot">Bytes produced by <see cref="CreateTarSnapshotAsync"/>.</param>
    /// <param name="ct">Token passed to the TAR reads and to the store operations.</param>
    /// <returns>Counts of the messages and payload bytes that were appended.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// stream.json is missing from the archive or deserialises to null.
    /// </exception>
    /// <exception cref="FormatException">A message payload is not valid base-64.</exception>
    public async Task<SnapshotRestoreResult> RestoreTarSnapshotAsync(
        StreamHandle stream,
        ReadOnlyMemory<byte> snapshot,
        CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(stream);

        // Decompress.
        var tarBytes = Snappy.Decode(snapshot.ToArray());

        using var tarBuffer = new MemoryStream(tarBytes);
        using var reader = new TarReader(tarBuffer, leaveOpen: false);

        var foundConfig = false;

        // Fully decoded messages, collected so they can be replayed in sequence
        // order once the archive has been validated end to end.
        var pending = new List<(ulong Sequence, string Subject, byte[] Payload)>();

        TarEntry? entry;
        while ((entry = await reader.GetNextEntryAsync(copyData: true, ct)) is not null)
        {
            if (entry.DataStream is null)
                continue;

            if (entry.Name == ConfigEntryName)
            {
                // Validate that the config is parseable.
                var cfgBytes = await ReadStreamAsync(entry.DataStream, ct);
                if (JsonSerializer.Deserialize<StreamConfig>(cfgBytes) is null)
                    throw new InvalidOperationException(
                        "Snapshot validation failed: stream.json could not be parsed.");
                foundConfig = true;
                continue;
            }

            if (entry.Name.StartsWith(MessageEntryPrefix, StringComparison.Ordinal)
                && entry.Name.EndsWith(MessageEntrySuffix, StringComparison.Ordinal))
            {
                var msgBytes = await ReadStreamAsync(entry.DataStream, ct);
                var msg = JsonSerializer.Deserialize<TarMessageEntry>(msgBytes);
                if (msg is null)
                    continue;

                // Decode here, while the store is still intact: a malformed payload
                // must fail the restore before anything is purged.
                pending.Add((msg.Sequence, msg.Subject, Convert.FromBase64String(msg.Payload)));
            }
        }

        if (!foundConfig)
            throw new InvalidOperationException(
                "Snapshot validation failed: stream.json not found in archive.");

        // Everything parsed and decoded — only now discard the existing contents.
        await stream.Store.PurgeAsync(ct);

        var messagesRestored = 0;
        long bytesRestored = 0;

        foreach (var (_, subject, payload) in pending.OrderBy(m => m.Sequence))
        {
            await stream.Store.AppendAsync(subject, payload, ct);
            messagesRestored++;
            bytesRestored += payload.Length;
        }

        return new SnapshotRestoreResult(
            StreamName: stream.Config.Name,
            MessagesRestored: messagesRestored,
            BytesRestored: bytesRestored);
    }

    /// <summary>
    /// Same as <see cref="CreateTarSnapshotAsync"/> but cancels automatically if
    /// the operation has not completed within <paramref name="deadline"/>.
    /// </summary>
    /// <param name="stream">Stream to snapshot.</param>
    /// <param name="deadline">Delay after which the linked token is cancelled.</param>
    /// <param name="ct">Caller token; cancelling it also cancels the snapshot.</param>
    /// <returns>The Snappy-compressed TAR archive.</returns>
    public async Task<byte[]> CreateTarSnapshotWithDeadlineAsync(
        StreamHandle stream,
        TimeSpan deadline,
        CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(stream);

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(deadline);
        return await CreateTarSnapshotAsync(stream, cts.Token);
    }

    // ──────────────────────────────────────────────────────────────────────
    // Private helpers
    // ──────────────────────────────────────────────────────────────────────

    /// <summary>Writes <paramref name="data"/> as a regular-file PAX entry called <paramref name="name"/>.</summary>
    private static async Task WriteEntryAsync(
        TarWriter writer,
        string name,
        byte[] data,
        CancellationToken ct)
    {
        // The entry only borrows the stream for the duration of the write, so it
        // is disposed here once the write has completed.
        using var dataStream = new MemoryStream(data, writable: false);
        var tarEntry = new PaxTarEntry(TarEntryType.RegularFile, name)
        {
            DataStream = dataStream,
        };
        await writer.WriteEntryAsync(tarEntry, ct);
    }

    /// <summary>Copies the remainder of <paramref name="src"/> into a new byte array.</summary>
    private static async Task<byte[]> ReadStreamAsync(Stream src, CancellationToken ct)
    {
        using var ms = new MemoryStream();
        await src.CopyToAsync(ms, ct);
        return ms.ToArray();
    }
}

// File: tests/NATS.Server.Tests/JetStream/Snapshots/StreamSnapshotTests.cs
//
// Go reference: server/jetstream_api.go — jsStreamSnapshotT / jsStreamRestoreT
// Tests for TAR-based stream snapshot/restore with Snappy (S2) compression.
using System.Formats.Tar;
using System.Text;
using System.Text.Json;
using IronSnappy;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using Shouldly;

namespace NATS.Server.Tests.JetStream.Snapshots;

/// <summary>
/// Tests for <see cref="StreamSnapshotService"/>'s TAR + Snappy snapshot and restore
/// methods, run against an in-memory store.
/// </summary>
public sealed class StreamSnapshotTests
{
    // ──────────────────────────────────────────────────────────────────────────
    // Helpers
    // ──────────────────────────────────────────────────────────────────────────

    /// <summary>Builds a stream handle backed by a new <see cref="MemStore"/>.</summary>
    private static StreamHandle MakeHandle(string name = "TEST")
    {
        var config = new StreamConfig { Name = name };
        IStreamStore store = new MemStore(config);
        return new StreamHandle(config, store);
    }

    /// <summary>Appends a UTF-8 encoded payload to the handle's store.</summary>
    private static async Task StoreAsync(StreamHandle h, string subject, string payload)
        => await h.Store.AppendAsync(subject, Encoding.UTF8.GetBytes(payload), default);

    /// <summary>
    /// Snappy-decodes <paramref name="snappyBytes"/> and returns the raw TAR bytes
    /// together with the entry names in archive order.
    /// </summary>
    private static async Task<(byte[] tarBytes, List<string> entryNames)> DecompressAndListEntries(
        byte[] snappyBytes)
    {
        var tarBytes = Snappy.Decode(snappyBytes);
        using var ms = new MemoryStream(tarBytes);
        using var reader = new TarReader(ms, leaveOpen: false);

        var names = new List<string>();
        TarEntry? entry;
        while ((entry = await reader.GetNextEntryAsync()) is not null)
            names.Add(entry.Name);

        return (tarBytes, names);
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 1 — stream.json is present in the TAR
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamSnapshotT — snapshot includes config JSON.
    [Fact]
    public async Task CreateTarSnapshot_includes_stream_config()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle("MYSTREAM");

        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        var (_, names) = await DecompressAndListEntries(compressed);
        names.ShouldContain("stream.json");
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 2 — all messages appear in the TAR
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamSnapshotT — snapshot contains all stored messages.
    [Fact]
    public async Task CreateTarSnapshot_includes_all_messages()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle();
        for (var i = 1; i <= 5; i++)
            await StoreAsync(handle, $"foo.{i}", $"payload-{i}");

        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        var (_, names) = await DecompressAndListEntries(compressed);
        var messageEntries = names
            .Where(n => n.StartsWith("messages/", StringComparison.Ordinal))
            .ToList();
        messageEntries.Count.ShouldBe(5);
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 3 — the snapshot bytes are Snappy-compressed
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/filestore.go — S2 / Snappy compression is used for snapshots.
    [Fact]
    public async Task CreateTarSnapshot_compresses_with_snappy()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle();
        await StoreAsync(handle, "test.subject", "test payload data");

        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        // Round-trip decode must succeed — this verifies it is valid Snappy data.
        var decoded = Should.NotThrow(() => Snappy.Decode(compressed));
        decoded.ShouldNotBeEmpty();
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 4 — restore replays messages back into the store
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamRestoreT — restore re-populates the store.
    [Fact]
    public async Task RestoreTarSnapshot_replays_messages()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle();
        await StoreAsync(handle, "a.1", "first");
        await StoreAsync(handle, "a.2", "second");
        await StoreAsync(handle, "a.3", "third");
        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        // Purge the store to simulate data loss.
        await handle.Store.PurgeAsync(default);
        var stateAfterPurge = await handle.Store.GetStateAsync(default);
        stateAfterPurge.Messages.ShouldBe(0UL);

        // Restore from the snapshot.
        await svc.RestoreTarSnapshotAsync(handle, compressed, default);

        var stateAfterRestore = await handle.Store.GetStateAsync(default);
        stateAfterRestore.Messages.ShouldBe(3UL);
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 5 — restore returns correct stats
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamRestoreT — response reports bytes/messages restored.
    [Fact]
    public async Task RestoreTarSnapshot_returns_correct_stats()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle("STATSSTREAM");
        await StoreAsync(handle, "s.1", "hello");
        await StoreAsync(handle, "s.2", "world");
        var compressed = await svc.CreateTarSnapshotAsync(handle, default);
        await handle.Store.PurgeAsync(default);

        var result = await svc.RestoreTarSnapshotAsync(handle, compressed, default);

        result.StreamName.ShouldBe("STATSSTREAM");
        result.MessagesRestored.ShouldBe(2);
        // "hello" and "world" are 5 UTF-8 bytes each; the restore sums decoded payload lengths.
        result.BytesRestored.ShouldBe(10L);
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 6 — restore rejects a TAR that is missing stream.json
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamRestoreT — invalid snapshot returns an error.
    [Fact]
    public async Task RestoreTarSnapshot_validates_stream_json()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle();

        // Build a TAR that contains no stream.json entry.
        using var tarBuffer = new MemoryStream();
        using (var writer = new TarWriter(tarBuffer, TarEntryFormat.Pax, leaveOpen: true))
        {
            var bogus = new PaxTarEntry(TarEntryType.RegularFile, "other/file.txt")
            {
                DataStream = new MemoryStream("not a config"u8.ToArray()),
            };
            await writer.WriteEntryAsync(bogus);
        }

        var badSnappy = Snappy.Encode(tarBuffer.ToArray());

        await Should.ThrowAsync<InvalidOperationException>(
            () => svc.RestoreTarSnapshotAsync(handle, badSnappy, default));
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 7 — snapshot with deadline completes when deadline is generous
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/filestore.go — snapshot creation must respect deadlines.
    [Fact]
    public async Task CreateTarSnapshotWithDeadline_completes_within_deadline()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle();
        await StoreAsync(handle, "d.1", "data");

        var compressed = await svc.CreateTarSnapshotWithDeadlineAsync(
            handle,
            TimeSpan.FromSeconds(30),
            default);

        compressed.ShouldNotBeEmpty();
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 8 — snapshot of an empty stream produces a valid archive
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go:jsStreamSnapshotT — empty stream is a valid snapshot.
    [Fact]
    public async Task CreateTarSnapshot_empty_stream()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle("EMPTY");

        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        compressed.ShouldNotBeEmpty();
        var (_, names) = await DecompressAndListEntries(compressed);
        names.ShouldContain("stream.json");
        names.Where(n => n.StartsWith("messages/", StringComparison.Ordinal)).ShouldBeEmpty();
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 9 — round-trip preserves message subjects
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go — snapshot/restore is lossless for subjects.
    [Fact]
    public async Task Roundtrip_snapshot_preserves_message_subjects()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle("SUBJECTS");
        var subjects = new[] { "alpha.1", "beta.2", "gamma.3" };
        foreach (var s in subjects)
            await StoreAsync(handle, s, $"payload for {s}");
        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        // Restore to a fresh store.
        var newConfig = new StreamConfig { Name = "SUBJECTS" };
        IStreamStore newStore = new MemStore(newConfig);
        var newHandle = new StreamHandle(newConfig, newStore);

        await svc.RestoreTarSnapshotAsync(newHandle, compressed, default);

        var restored = await newStore.ListAsync(default);
        var restoredSubjects = restored.Select(m => m.Subject).OrderBy(x => x).ToArray();
        var expectedSubjects = subjects.OrderBy(x => x).ToArray();
        restoredSubjects.ShouldBe(expectedSubjects);
    }

    // ──────────────────────────────────────────────────────────────────────────
    // Test 10 — round-trip preserves message payloads
    // ──────────────────────────────────────────────────────────────────────────

    // Go ref: server/jetstream_api.go — snapshot/restore is lossless for payload bytes.
    [Fact]
    public async Task Roundtrip_snapshot_preserves_message_payloads()
    {
        var svc = new StreamSnapshotService();
        var handle = MakeHandle("PAYLOADS");
        // The second payload is deliberately not valid text, to exercise the base-64 path.
        var payloads = new[] { "hello world"u8.ToArray(), new byte[] { 0, 1, 2, 3, 255 } };
        await handle.Store.AppendAsync("p.1", payloads[0], default);
        await handle.Store.AppendAsync("p.2", payloads[1], default);
        var compressed = await svc.CreateTarSnapshotAsync(handle, default);

        // Restore to a fresh store.
        var newConfig = new StreamConfig { Name = "PAYLOADS" };
        IStreamStore newStore = new MemStore(newConfig);
        var newHandle = new StreamHandle(newConfig, newStore);

        await svc.RestoreTarSnapshotAsync(newHandle, compressed, default);

        var restored = (await newStore.ListAsync(default)).OrderBy(m => m.Sequence).ToList();
        restored.Count.ShouldBe(2);
        restored[0].Payload.ToArray().ShouldBe(payloads[0]);
        restored[1].Payload.ToArray().ShouldBe(payloads[1]);
    }
}