feat: wire snapshot/restore API endpoints (Gap 7.4 stub)
Add HandleSnapshotAsync and HandleRestoreAsync with stream-name validation, chunk metadata (NumChunks, BlkSize) in the response, and richer error codes. Add StreamManager.Exists helper. Add JetStreamSnapshot.StreamName/NumChunks/BlkSize fields. Fix AdvisoryEventTests.cs using-directive ordering. Add 12 SnapshotApiTests.
This commit is contained in:
@@ -189,6 +189,66 @@ public static class StreamApiHandlers
|
||||
: JetStreamApiResponse.NotFound(subject);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Async snapshot handler that validates stream existence before creating the snapshot,
|
||||
/// and enriches the response with stream name and chunk metadata.
|
||||
/// Go reference: server/jetstream_api.go — jsStreamSnapshotT handler.
|
||||
/// </summary>
|
||||
public static async Task<JetStreamApiResponse> HandleSnapshotAsync(
|
||||
string subject,
|
||||
StreamManager streamManager,
|
||||
CancellationToken ct)
|
||||
{
|
||||
_ = ct;
|
||||
|
||||
var streamName = ExtractTrailingToken(subject, SnapshotPrefix);
|
||||
if (streamName == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
if (!streamManager.Exists(streamName))
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
var snapshot = streamManager.CreateSnapshot(streamName);
|
||||
if (snapshot == null)
|
||||
return JetStreamApiResponse.ErrorResponse(500, "snapshot creation failed");
|
||||
|
||||
return new JetStreamApiResponse
|
||||
{
|
||||
Snapshot = new JetStreamSnapshot
|
||||
{
|
||||
Payload = Convert.ToBase64String(snapshot),
|
||||
StreamName = streamName,
|
||||
NumChunks = 1,
|
||||
BlkSize = snapshot.Length,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Async restore handler that validates the payload and returns a structured error on failure.
|
||||
/// Go reference: server/jetstream_api.go — jsStreamRestoreT handler.
|
||||
/// </summary>
|
||||
public static async Task<JetStreamApiResponse> HandleRestoreAsync(
|
||||
string subject,
|
||||
byte[] payload,
|
||||
StreamManager streamManager,
|
||||
CancellationToken ct)
|
||||
{
|
||||
_ = ct;
|
||||
|
||||
var streamName = ExtractTrailingToken(subject, RestorePrefix);
|
||||
if (streamName == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
var snapshotBytes = ParseRestorePayload(payload);
|
||||
if (snapshotBytes == null)
|
||||
return JetStreamApiResponse.ErrorResponse(400, "snapshot payload required");
|
||||
|
||||
return streamManager.RestoreSnapshot(streamName, snapshotBytes)
|
||||
? JetStreamApiResponse.SuccessResponse()
|
||||
: JetStreamApiResponse.ErrorResponse(500, "restore failed");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Clustered handlers — propose to meta RAFT group instead of local StreamManager.
|
||||
// Go reference: jetstream_cluster.go:7620-7900 jsClusteredStreamRequest and related.
|
||||
|
||||
@@ -115,6 +115,8 @@ public sealed class StreamManager
|
||||
|
||||
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
|
||||
|
||||
public bool Exists(string name) => _streams.ContainsKey(name);
|
||||
|
||||
public bool Delete(string name)
|
||||
{
|
||||
if (!_streams.TryRemove(name, out _))
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
// Go reference: jetstream_api.go — advisory event publication for stream/consumer lifecycle.
|
||||
// Advisory subjects use the pattern $JS.EVENT.ADVISORY.{type}.{stream}[.{consumer}].
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Api;
|
||||
|
||||
using NATS.Server.Events;
|
||||
using NATS.Server.JetStream.Api;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Api;
|
||||
|
||||
public class AdvisoryEventTests
|
||||
{
|
||||
private static (AdvisoryPublisher Publisher, List<(string Subject, object Body)> Published) CreatePublisher()
|
||||
|
||||
317
tests/NATS.Server.Tests/JetStream/Api/SnapshotApiTests.cs
Normal file
317
tests/NATS.Server.Tests/JetStream/Api/SnapshotApiTests.cs
Normal file
@@ -0,0 +1,317 @@
|
||||
// Go reference: server/jetstream_api.go — jsStreamSnapshotT and jsStreamRestoreT handlers.
|
||||
// Snapshot creates a serialized byte representation of stream state; restore re-applies it.
|
||||
// The async variants (HandleSnapshotAsync / HandleRestoreAsync) add stream name and chunk
|
||||
// metadata to the response and provide richer error codes compared to the sync stubs.
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Api.Handlers;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Api;
|
||||
|
||||
public class SnapshotApiTests
|
||||
{
|
||||
// ---------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
private static StreamManager CreateManagerWithStream(string streamName, string subjectPattern)
|
||||
{
|
||||
var sm = new StreamManager();
|
||||
sm.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = streamName,
|
||||
Subjects = [subjectPattern],
|
||||
});
|
||||
return sm;
|
||||
}
|
||||
|
||||
private static async Task AppendAsync(StreamManager sm, string subject, string payload)
|
||||
{
|
||||
var handle = sm.FindBySubject(subject);
|
||||
handle.ShouldNotBeNull();
|
||||
await handle!.Store.AppendAsync(subject, Encoding.UTF8.GetBytes(payload), default);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// HandleSnapshot (sync, existing)
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT — snapshot of an existing stream returns a non-empty base64 payload.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleSnapshot_returns_base64_payload_for_existing_stream()
|
||||
{
|
||||
var sm = CreateManagerWithStream("ORDERS", "orders.>");
|
||||
await AppendAsync(sm, "orders.1", "hello");
|
||||
|
||||
var response = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.ORDERS", sm);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
response.Snapshot.ShouldNotBeNull();
|
||||
response.Snapshot!.Payload.ShouldNotBeNullOrEmpty();
|
||||
|
||||
// Verify it is valid base64.
|
||||
var bytes = Convert.FromBase64String(response.Snapshot.Payload);
|
||||
bytes.ShouldNotBeEmpty();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT — snapshot of a non-existent stream returns 404.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void HandleSnapshot_returns_not_found_for_missing_stream()
|
||||
{
|
||||
var sm = new StreamManager();
|
||||
|
||||
var response = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.MISSING", sm);
|
||||
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// HandleRestore (sync, existing)
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamRestoreT — restore with a valid base64 snapshot payload succeeds.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleRestore_succeeds_with_valid_payload()
|
||||
{
|
||||
var sm = CreateManagerWithStream("ORDERS", "orders.>");
|
||||
await AppendAsync(sm, "orders.1", "msg1");
|
||||
|
||||
// Obtain a snapshot first.
|
||||
var snapshotResponse = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.ORDERS", sm);
|
||||
snapshotResponse.Snapshot.ShouldNotBeNull();
|
||||
var base64 = snapshotResponse.Snapshot!.Payload;
|
||||
|
||||
// Restore back using the base64 bytes directly as the payload.
|
||||
var payloadBytes = Encoding.UTF8.GetBytes(base64);
|
||||
var response = StreamApiHandlers.HandleRestore(
|
||||
"$JS.API.STREAM.RESTORE.ORDERS",
|
||||
payloadBytes,
|
||||
sm);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
response.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamRestoreT — empty payload returns a 400 error.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void HandleRestore_returns_error_for_empty_payload()
|
||||
{
|
||||
var sm = CreateManagerWithStream("ORDERS", "orders.>");
|
||||
|
||||
var response = StreamApiHandlers.HandleRestore(
|
||||
"$JS.API.STREAM.RESTORE.ORDERS",
|
||||
ReadOnlySpan<byte>.Empty,
|
||||
sm);
|
||||
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(400);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamRestoreT — bad subject token (no trailing stream name) returns 404.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void HandleRestore_returns_not_found_for_bad_subject()
|
||||
{
|
||||
var sm = new StreamManager();
|
||||
// Subject without trailing token — ExtractTrailingToken returns null.
|
||||
var payload = Encoding.UTF8.GetBytes(Convert.ToBase64String([1, 2, 3]));
|
||||
|
||||
var response = StreamApiHandlers.HandleRestore(
|
||||
"$JS.API.STREAM.RESTORE.",
|
||||
payload,
|
||||
sm);
|
||||
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// HandleSnapshotAsync (new)
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT — async handler populates StreamName in the response.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleSnapshotAsync_includes_stream_name_in_response()
|
||||
{
|
||||
var sm = CreateManagerWithStream("EVENTS", "events.>");
|
||||
await AppendAsync(sm, "events.1", "data");
|
||||
|
||||
var response = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.EVENTS",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
response.Snapshot.ShouldNotBeNull();
|
||||
response.Snapshot!.StreamName.ShouldBe("EVENTS");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT — async handler sets NumChunks=1 and BlkSize equal to the
|
||||
/// length of the raw (pre-base64) snapshot bytes.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleSnapshotAsync_includes_chunk_metadata()
|
||||
{
|
||||
var sm = CreateManagerWithStream("EVENTS", "events.>");
|
||||
await AppendAsync(sm, "events.1", "payload-data");
|
||||
|
||||
var response = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.EVENTS",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
var snap = response.Snapshot!;
|
||||
snap.NumChunks.ShouldBe(1);
|
||||
snap.BlkSize.ShouldBeGreaterThan(0);
|
||||
|
||||
// BlkSize should match the raw snapshot byte count.
|
||||
var rawBytes = Convert.FromBase64String(snap.Payload);
|
||||
snap.BlkSize.ShouldBe(rawBytes.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HandleSnapshotAsync returns 404 when the stream does not exist.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleSnapshotAsync_returns_not_found_for_missing_stream()
|
||||
{
|
||||
var sm = new StreamManager();
|
||||
|
||||
var response = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.NOPE",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// HandleRestoreAsync (new)
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamRestoreT — async restore validates the base64 payload and succeeds.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleRestoreAsync_validates_base64_payload()
|
||||
{
|
||||
var sm = CreateManagerWithStream("ORDERS", "orders.>");
|
||||
await AppendAsync(sm, "orders.1", "hello");
|
||||
|
||||
// Take a snapshot, then restore it using the async path.
|
||||
var snapResp = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.ORDERS",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
snapResp.Snapshot.ShouldNotBeNull();
|
||||
var base64Payload = Encoding.UTF8.GetBytes(snapResp.Snapshot!.Payload);
|
||||
|
||||
var response = await StreamApiHandlers.HandleRestoreAsync(
|
||||
"$JS.API.STREAM.RESTORE.ORDERS",
|
||||
base64Payload,
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
response.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HandleRestoreAsync returns 400 when given an empty payload array.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleRestoreAsync_returns_error_for_empty_payload()
|
||||
{
|
||||
var sm = CreateManagerWithStream("ORDERS", "orders.>");
|
||||
|
||||
var response = await StreamApiHandlers.HandleRestoreAsync(
|
||||
"$JS.API.STREAM.RESTORE.ORDERS",
|
||||
[],
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(400);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Round-trip
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT / jsStreamRestoreT — full snapshot-then-restore round-trip:
|
||||
/// messages written before snapshot are recoverable after restore.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Snapshot_round_trip_create_and_restore()
|
||||
{
|
||||
var sm = CreateManagerWithStream("LOGS", "logs.>");
|
||||
await AppendAsync(sm, "logs.a", "alpha");
|
||||
await AppendAsync(sm, "logs.b", "beta");
|
||||
await AppendAsync(sm, "logs.c", "gamma");
|
||||
|
||||
var stateBefore = await sm.GetStateAsync("LOGS", default);
|
||||
stateBefore.Messages.ShouldBe(3UL);
|
||||
|
||||
// Snapshot via async handler.
|
||||
var snapResp = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.LOGS",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
snapResp.Error.ShouldBeNull();
|
||||
var base64Payload = Encoding.UTF8.GetBytes(snapResp.Snapshot!.Payload);
|
||||
|
||||
// Restore via async handler.
|
||||
var restoreResp = await StreamApiHandlers.HandleRestoreAsync(
|
||||
"$JS.API.STREAM.RESTORE.LOGS",
|
||||
base64Payload,
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
restoreResp.Error.ShouldBeNull();
|
||||
restoreResp.Success.ShouldBeTrue();
|
||||
|
||||
// State should still be consistent (restore does not clear — it re-applies).
|
||||
var stateAfter = await sm.GetStateAsync("LOGS", default);
|
||||
stateAfter.Messages.ShouldBeGreaterThanOrEqualTo(3UL);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Subject extraction
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Go ref: jsStreamSnapshotT — the stream name is correctly extracted from the API subject.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HandleSnapshot_extracts_stream_name_from_subject()
|
||||
{
|
||||
var sm = CreateManagerWithStream("MY_STREAM", "mystream.>");
|
||||
|
||||
var response = await StreamApiHandlers.HandleSnapshotAsync(
|
||||
"$JS.API.STREAM.SNAPSHOT.MY_STREAM",
|
||||
sm,
|
||||
CancellationToken.None);
|
||||
|
||||
response.Error.ShouldBeNull();
|
||||
response.Snapshot.ShouldNotBeNull();
|
||||
response.Snapshot!.StreamName.ShouldBe("MY_STREAM");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user