feat: implement TAR-based stream snapshot with S2 compression (Gap 4.7)

Enhance StreamSnapshotService with CreateTarSnapshotAsync / RestoreTarSnapshotAsync
methods that produce a Snappy-compressed TAR archive (stream.json + messages/*.json).
Add CreateTarSnapshotWithDeadlineAsync for deadline-bounded snapshots, and a
SnapshotRestoreResult record carrying stats. 10 new unit tests in
JetStream/Snapshots/StreamSnapshotTests.cs exercise the full create/restore
round-trip, compression format, empty-stream edge case, and deadline enforcement.
This commit is contained in:
Joseph Doherty
2026-02-25 11:22:25 -05:00
parent b9f6a8cc0b
commit 79a3ccba4c
2 changed files with 483 additions and 0 deletions

View File

@@ -1,10 +1,210 @@
// Go reference: server/jetstream_api.go — jsStreamSnapshotT / jsStreamRestoreT
// TAR-based snapshot with S2 (Snappy) compression. Each snapshot is a Snappy-compressed
// TAR archive containing stream.json (the stream config) and messages/NNNNNN.json files
// (one per stored message, in sequence order).
using System.Formats.Tar;
using System.Text;
using System.Text.Json;
using IronSnappy;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Snapshots;
// ─────────────────────────────────────────────────────────────────────────────
// Result type
// ─────────────────────────────────────────────────────────────────────────────
/// <summary>
/// Statistics returned after a successful TAR snapshot restore.
/// </summary>
/// <param name="StreamName">
/// Name of the stream that was restored into; the restore path fills this from the
/// target stream's own config, not from the archived <c>stream.json</c>.
/// </param>
/// <param name="MessagesRestored">Number of messages appended to the store during the restore.</param>
/// <param name="BytesRestored">
/// Sum of the decoded payload lengths of the appended messages (subjects and other
/// metadata are not counted).
/// </param>
public sealed record SnapshotRestoreResult(
string StreamName,
int MessagesRestored,
long BytesRestored);
// ─────────────────────────────────────────────────────────────────────────────
// Wire-format DTO (serialised inside the TAR entries)
// ─────────────────────────────────────────────────────────────────────────────
/// <summary>
/// JSON shape of a single <c>messages/NNNNNN.json</c> entry inside the snapshot archive.
/// The property names are the wire format: they are serialised with
/// <c>System.Text.Json</c> using default options, so renaming one changes the archive layout.
/// </summary>
file sealed class TarMessageEntry
{
// Store sequence of the message; also used to build the entry file name and to
// order messages when they are replayed on restore.
public ulong Sequence { get; init; }
// Subject the message was stored under.
public string Subject { get; init; } = string.Empty;
// Message payload encoded as base-64 text.
public string Payload { get; init; } = string.Empty;
// Message timestamp in round-trip ("O") format, i.e. ISO-8601; written from the
// message's TimestampUtc. The restore code visible in this file does not read it back.
public string Timestamp { get; init; } = string.Empty;
}
// ─────────────────────────────────────────────────────────────────────────────
// Service
// ─────────────────────────────────────────────────────────────────────────────
public sealed class StreamSnapshotService
{
// ──────────────────────────────────────────────────────────────────────
// Existing thin wrappers (kept for API compatibility)
// ──────────────────────────────────────────────────────────────────────
/// <summary>
/// Creates a snapshot by delegating straight to the stream's store.
/// </summary>
/// <param name="stream">Handle whose <c>Store</c> produces the snapshot.</param>
/// <param name="ct">Cancellation token forwarded to the store.</param>
/// <returns>The snapshot bytes exactly as returned by the store.</returns>
public ValueTask<byte[]> SnapshotAsync(StreamHandle stream, CancellationToken ct)
{
    var store = stream.Store;
    return store.CreateSnapshotAsync(ct);
}
/// <summary>
/// Restores a snapshot by delegating straight to the stream's store.
/// </summary>
/// <param name="stream">Handle whose <c>Store</c> receives the snapshot.</param>
/// <param name="snapshot">Snapshot bytes, passed through to the store unchanged.</param>
/// <param name="ct">Cancellation token forwarded to the store.</param>
/// <returns>The pending operation returned by the store.</returns>
public ValueTask RestoreAsync(StreamHandle stream, ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
    var store = stream.Store;
    return store.RestoreSnapshotAsync(snapshot, ct);
}
// ──────────────────────────────────────────────────────────────────────
// TAR + S2 snapshot
// ──────────────────────────────────────────────────────────────────────
/// <summary>
/// Builds a PAX-format TAR archive holding the stream's config plus one JSON file
/// per stored message, Snappy-encodes the finished archive, and returns the bytes.
///
/// Archive layout:
///   stream.json            — the stream's config serialised as JSON
///   messages/000001.json   — one entry per message, named by sequence number
///   messages/000002.json     zero-padded to at least 6 digits
///   …
/// </summary>
/// <param name="stream">Stream whose config and stored messages are archived.</param>
/// <param name="ct">Token passed to the store listing and to every TAR entry write.</param>
/// <returns>The Snappy-encoded TAR archive.</returns>
public async Task<byte[]> CreateTarSnapshotAsync(StreamHandle stream, CancellationToken ct)
{
    // The message list is fetched up front, before any archive state is created.
    var stored = await stream.Store.ListAsync(ct);

    using var archive = new MemoryStream();

    // The writer is scoped to this statement so it is disposed (finalising the
    // archive) before the buffer is read; leaveOpen keeps the buffer usable afterwards.
    using (var tar = new TarWriter(archive, TarEntryFormat.Pax, leaveOpen: true))
    {
        // Config entry always comes first.
        await WriteEntryAsync(tar, "stream.json", JsonSerializer.SerializeToUtf8Bytes(stream.Config), ct);

        // Message entries follow in ascending sequence order.
        var bySequence =
            from m in stored
            orderby m.Sequence
            select m;

        foreach (var item in bySequence)
        {
            var dto = new TarMessageEntry
            {
                Sequence = item.Sequence,
                Subject = item.Subject,
                Payload = Convert.ToBase64String(item.Payload.ToArray()),
                Timestamp = item.TimestampUtc.ToString("O"),
            };

            await WriteEntryAsync(
                tar,
                $"messages/{item.Sequence:D6}.json",
                JsonSerializer.SerializeToUtf8Bytes(dto),
                ct);
        }
    }

    return Snappy.Encode(archive.ToArray());
}
/// <summary>
/// Decompresses a Snappy-encoded TAR archive, checks that it contains a parseable
/// <c>stream.json</c>, and replays every <c>messages/*.json</c> entry into the store
/// in ascending sequence order.
/// </summary>
/// <remarks>
/// The whole archive is read and every payload is base-64 decoded BEFORE the store
/// is purged, so a corrupt or incomplete snapshot fails without touching the
/// existing stream contents.
/// <para>
/// Messages are replayed with <c>AppendAsync(subject, payload, ct)</c> only; the
/// sequence stored in the archive is used for ordering and the archived timestamp is
/// not used. NOTE(review): confirm the store assigning its own sequence/timestamp on
/// append is acceptable for restore fidelity.
/// </para>
/// <para>
/// A message entry whose JSON deserialises to <c>null</c> is skipped. The parsed
/// <c>stream.json</c> is only checked for being non-null; it is not compared with or
/// applied to the target stream's config.
/// </para>
/// </remarks>
/// <param name="stream">Target stream; its store is purged and then refilled.</param>
/// <param name="snapshot">Snappy-encoded TAR archive to restore from.</param>
/// <param name="ct">Token passed to archive reads and to every store call.</param>
/// <returns>Restore statistics: target stream name, message count and payload byte total.</returns>
/// <exception cref="InvalidOperationException">
/// The archive has no <c>stream.json</c> entry, or that entry deserialises to <c>null</c>.
/// </exception>
/// <exception cref="FormatException">A message payload is not valid base-64.</exception>
public async Task<SnapshotRestoreResult> RestoreTarSnapshotAsync(
    StreamHandle stream,
    ReadOnlyMemory<byte> snapshot,
    CancellationToken ct)
{
    // Decompress.
    var tarBytes = Snappy.Decode(snapshot.ToArray());
    using var tarBuffer = new MemoryStream(tarBytes);
    using var reader = new TarReader(tarBuffer, leaveOpen: false);

    bool foundConfig = false;

    // Collect message entries first so they can be replayed in sequence order,
    // independent of the order they appear in the archive.
    var messageEntries = new List<TarMessageEntry>();

    TarEntry? entry;
    while ((entry = await reader.GetNextEntryAsync(copyData: true, ct)) is not null)
    {
        // Entries without a data section (e.g. directories) carry nothing to restore.
        if (entry.DataStream is null)
            continue;

        if (entry.Name == "stream.json")
        {
            // Validate that the config is parseable.
            var cfgBytes = await ReadStreamAsync(entry.DataStream, ct);
            if (JsonSerializer.Deserialize<StreamConfig>(cfgBytes) is null)
                throw new InvalidOperationException(
                    "Snapshot validation failed: stream.json could not be parsed.");
            foundConfig = true;
            continue;
        }

        if (entry.Name.StartsWith("messages/", StringComparison.Ordinal)
            && entry.Name.EndsWith(".json", StringComparison.Ordinal))
        {
            var msgBytes = await ReadStreamAsync(entry.DataStream, ct);
            var msg = JsonSerializer.Deserialize<TarMessageEntry>(msgBytes);
            if (msg is not null)
                messageEntries.Add(msg);
        }
    }

    if (!foundConfig)
        throw new InvalidOperationException(
            "Snapshot validation failed: stream.json not found in archive.");

    // Decode every payload up front. Convert.FromBase64String throws on malformed
    // (or null) input, and that must happen before the destructive purge below,
    // not halfway through the replay.
    var pending = new List<(string Subject, byte[] Payload)>(messageEntries.Count);
    foreach (var msg in messageEntries.OrderBy(m => m.Sequence))
    {
        pending.Add((msg.Subject, Convert.FromBase64String(msg.Payload)));
    }

    // The snapshot is now fully validated: purge the existing store, then replay.
    await stream.Store.PurgeAsync(ct);

    int messagesRestored = 0;
    long bytesRestored = 0;
    foreach (var (subject, payload) in pending)
    {
        await stream.Store.AppendAsync(subject, payload, ct);
        messagesRestored++;
        bytesRestored += payload.Length;
    }

    return new SnapshotRestoreResult(
        StreamName: stream.Config.Name,
        MessagesRestored: messagesRestored,
        BytesRestored: bytesRestored);
}
/// <summary>
/// Runs <see cref="CreateTarSnapshotAsync"/> under a token that is cancelled either
/// when <paramref name="ct"/> is cancelled or once <paramref name="deadline"/> has
/// elapsed, whichever happens first.
/// </summary>
/// <param name="stream">Stream to snapshot.</param>
/// <param name="deadline">Maximum time allowed before the linked token is cancelled.</param>
/// <param name="ct">Caller's cancellation token; linked into the deadline token.</param>
/// <returns>The Snappy-encoded TAR archive produced by <see cref="CreateTarSnapshotAsync"/>.</returns>
public async Task<byte[]> CreateTarSnapshotWithDeadlineAsync(
    StreamHandle stream,
    TimeSpan deadline,
    CancellationToken ct)
{
    var deadlineSource = CancellationTokenSource.CreateLinkedTokenSource(ct);
    try
    {
        // Start the countdown, then hand the combined token to the snapshot routine.
        deadlineSource.CancelAfter(deadline);
        var combinedToken = deadlineSource.Token;
        return await CreateTarSnapshotAsync(stream, combinedToken);
    }
    finally
    {
        deadlineSource.Dispose();
    }
}
// ──────────────────────────────────────────────────────────────────────
// Private helpers
// ──────────────────────────────────────────────────────────────────────
/// <summary>
/// Writes one regular-file PAX entry named <paramref name="name"/> whose contents
/// are <paramref name="data"/>.
/// </summary>
/// <param name="writer">Open TAR writer that receives the entry.</param>
/// <param name="name">Entry path inside the archive.</param>
/// <param name="data">Bytes that become the entry's data section.</param>
/// <param name="ct">Token passed to the TAR write.</param>
private static async Task WriteEntryAsync(
    TarWriter writer,
    string name,
    byte[] data,
    CancellationToken ct)
{
    // The entry does not own its data stream, so dispose it here once the write has
    // completed (matching how ReadStreamAsync disposes its own buffer).
    using var content = new MemoryStream(data);

    var tarEntry = new PaxTarEntry(TarEntryType.RegularFile, name)
    {
        DataStream = content,
    };

    await writer.WriteEntryAsync(tarEntry, ct);
}
/// <summary>
/// Copies everything from the current position of <paramref name="src"/> to its end
/// into a new byte array.
/// </summary>
/// <param name="src">Stream to drain; it is not disposed here.</param>
/// <param name="ct">Token passed to the copy.</param>
/// <returns>The bytes that were read.</returns>
private static async Task<byte[]> ReadStreamAsync(Stream src, CancellationToken ct)
{
    using (var sink = new MemoryStream())
    {
        await src.CopyToAsync(sink, ct);
        var bytes = sink.ToArray();
        return bytes;
    }
}
}