diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index aeb38cb..4c1423b 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -189,6 +189,66 @@ public static class StreamApiHandlers : JetStreamApiResponse.NotFound(subject); } + /// + /// Async snapshot handler that validates stream existence before creating the snapshot, + /// and enriches the response with stream name and chunk metadata. + /// Go reference: server/jetstream_api.go — jsStreamSnapshotT handler. + /// + public static async Task HandleSnapshotAsync( + string subject, + StreamManager streamManager, + CancellationToken ct) + { + _ = ct; + + var streamName = ExtractTrailingToken(subject, SnapshotPrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + if (!streamManager.Exists(streamName)) + return JetStreamApiResponse.NotFound(subject); + + var snapshot = streamManager.CreateSnapshot(streamName); + if (snapshot == null) + return JetStreamApiResponse.ErrorResponse(500, "snapshot creation failed"); + + return new JetStreamApiResponse + { + Snapshot = new JetStreamSnapshot + { + Payload = Convert.ToBase64String(snapshot), + StreamName = streamName, + NumChunks = 1, + BlkSize = snapshot.Length, + }, + }; + } + + /// + /// Async restore handler that validates the payload and returns a structured error on failure. + /// Go reference: server/jetstream_api.go — jsStreamRestoreT handler. + /// + public static async Task HandleRestoreAsync( + string subject, + byte[] payload, + StreamManager streamManager, + CancellationToken ct) + { + _ = ct; + + var streamName = ExtractTrailingToken(subject, RestorePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var snapshotBytes = ParseRestorePayload(payload); + if (snapshotBytes == null) + return JetStreamApiResponse.ErrorResponse(400, "snapshot payload required"); + + return streamManager.RestoreSnapshot(streamName, snapshotBytes) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.ErrorResponse(500, "restore failed"); + } + // --------------------------------------------------------------- // Clustered handlers — propose to meta RAFT group instead of local StreamManager. // Go reference: jetstream_cluster.go:7620-7900 jsClusteredStreamRequest and related. diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 5afd882..987933b 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -115,6 +115,8 @@ public sealed class StreamManager public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!); + public bool Exists(string name) => _streams.ContainsKey(name); + public bool Delete(string name) { if (!_streams.TryRemove(name, out _)) diff --git a/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs b/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs index c17856a..5ce70cf 100644 --- a/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs @@ -1,11 +1,11 @@ // Go reference: jetstream_api.go — advisory event publication for stream/consumer lifecycle. // Advisory subjects use the pattern $JS.EVENT.ADVISORY.{type}.{stream}[.{consumer}]. -namespace NATS.Server.Tests.JetStream.Api; - using NATS.Server.Events; using NATS.Server.JetStream.Api; +namespace NATS.Server.Tests.JetStream.Api; + public class AdvisoryEventTests { private static (AdvisoryPublisher Publisher, List<(string Subject, object Body)> Published) CreatePublisher() diff --git a/tests/NATS.Server.Tests/JetStream/Api/SnapshotApiTests.cs b/tests/NATS.Server.Tests/JetStream/Api/SnapshotApiTests.cs new file mode 100644 index 0000000..a720cb9 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/SnapshotApiTests.cs @@ -0,0 +1,317 @@ +// Go reference: server/jetstream_api.go — jsStreamSnapshotT and jsStreamRestoreT handlers. +// Snapshot creates a serialized byte representation of stream state; restore re-applies it. +// The async variants (HandleSnapshotAsync / HandleRestoreAsync) add stream name and chunk +// metadata to the response and provide richer error codes compared to the sync stubs. + +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Api.Handlers; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Api; + +public class SnapshotApiTests +{ + // --------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------- + + private static StreamManager CreateManagerWithStream(string streamName, string subjectPattern) + { + var sm = new StreamManager(); + sm.CreateOrUpdate(new StreamConfig + { + Name = streamName, + Subjects = [subjectPattern], + }); + return sm; + } + + private static async Task AppendAsync(StreamManager sm, string subject, string payload) + { + var handle = sm.FindBySubject(subject); + handle.ShouldNotBeNull(); + await handle!.Store.AppendAsync(subject, Encoding.UTF8.GetBytes(payload), default); + } + + // --------------------------------------------------------------- + // HandleSnapshot (sync, existing) + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamSnapshotT — snapshot of an existing stream returns a non-empty base64 payload. + /// + [Fact] + public async Task HandleSnapshot_returns_base64_payload_for_existing_stream() + { + var sm = CreateManagerWithStream("ORDERS", "orders.>"); + await AppendAsync(sm, "orders.1", "hello"); + + var response = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.ORDERS", sm); + + response.Error.ShouldBeNull(); + response.Snapshot.ShouldNotBeNull(); + response.Snapshot!.Payload.ShouldNotBeNullOrEmpty(); + + // Verify it is valid base64. + var bytes = Convert.FromBase64String(response.Snapshot.Payload); + bytes.ShouldNotBeEmpty(); + } + + /// + /// Go ref: jsStreamSnapshotT — snapshot of a non-existent stream returns 404. + /// + [Fact] + public void HandleSnapshot_returns_not_found_for_missing_stream() + { + var sm = new StreamManager(); + + var response = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.MISSING", sm); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); + } + + // --------------------------------------------------------------- + // HandleRestore (sync, existing) + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamRestoreT — restore with a valid base64 snapshot payload succeeds. + /// + [Fact] + public async Task HandleRestore_succeeds_with_valid_payload() + { + var sm = CreateManagerWithStream("ORDERS", "orders.>"); + await AppendAsync(sm, "orders.1", "msg1"); + + // Obtain a snapshot first. + var snapshotResponse = StreamApiHandlers.HandleSnapshot("$JS.API.STREAM.SNAPSHOT.ORDERS", sm); + snapshotResponse.Snapshot.ShouldNotBeNull(); + var base64 = snapshotResponse.Snapshot!.Payload; + + // Restore back using the base64 bytes directly as the payload. + var payloadBytes = Encoding.UTF8.GetBytes(base64); + var response = StreamApiHandlers.HandleRestore( + "$JS.API.STREAM.RESTORE.ORDERS", + payloadBytes, + sm); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + } + + /// + /// Go ref: jsStreamRestoreT — empty payload returns a 400 error. + /// + [Fact] + public void HandleRestore_returns_error_for_empty_payload() + { + var sm = CreateManagerWithStream("ORDERS", "orders.>"); + + var response = StreamApiHandlers.HandleRestore( + "$JS.API.STREAM.RESTORE.ORDERS", + ReadOnlySpan.Empty, + sm); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(400); + } + + /// + /// Go ref: jsStreamRestoreT — bad subject token (no trailing stream name) returns 404. + /// + [Fact] + public void HandleRestore_returns_not_found_for_bad_subject() + { + var sm = new StreamManager(); + // Subject without trailing token — ExtractTrailingToken returns null. + var payload = Encoding.UTF8.GetBytes(Convert.ToBase64String([1, 2, 3])); + + var response = StreamApiHandlers.HandleRestore( + "$JS.API.STREAM.RESTORE.", + payload, + sm); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); + } + + // --------------------------------------------------------------- + // HandleSnapshotAsync (new) + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamSnapshotT — async handler populates StreamName in the response. + /// + [Fact] + public async Task HandleSnapshotAsync_includes_stream_name_in_response() + { + var sm = CreateManagerWithStream("EVENTS", "events.>"); + await AppendAsync(sm, "events.1", "data"); + + var response = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.EVENTS", + sm, + CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Snapshot.ShouldNotBeNull(); + response.Snapshot!.StreamName.ShouldBe("EVENTS"); + } + + /// + /// Go ref: jsStreamSnapshotT — async handler sets NumChunks=1 and BlkSize equal to the + /// length of the raw (pre-base64) snapshot bytes. + /// + [Fact] + public async Task HandleSnapshotAsync_includes_chunk_metadata() + { + var sm = CreateManagerWithStream("EVENTS", "events.>"); + await AppendAsync(sm, "events.1", "payload-data"); + + var response = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.EVENTS", + sm, + CancellationToken.None); + + response.Error.ShouldBeNull(); + var snap = response.Snapshot!; + snap.NumChunks.ShouldBe(1); + snap.BlkSize.ShouldBeGreaterThan(0); + + // BlkSize should match the raw snapshot byte count. + var rawBytes = Convert.FromBase64String(snap.Payload); + snap.BlkSize.ShouldBe(rawBytes.Length); + } + + /// + /// HandleSnapshotAsync returns 404 when the stream does not exist. + /// + [Fact] + public async Task HandleSnapshotAsync_returns_not_found_for_missing_stream() + { + var sm = new StreamManager(); + + var response = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.NOPE", + sm, + CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); + } + + // --------------------------------------------------------------- + // HandleRestoreAsync (new) + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamRestoreT — async restore validates the base64 payload and succeeds. + /// + [Fact] + public async Task HandleRestoreAsync_validates_base64_payload() + { + var sm = CreateManagerWithStream("ORDERS", "orders.>"); + await AppendAsync(sm, "orders.1", "hello"); + + // Take a snapshot, then restore it using the async path. + var snapResp = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.ORDERS", + sm, + CancellationToken.None); + snapResp.Snapshot.ShouldNotBeNull(); + var base64Payload = Encoding.UTF8.GetBytes(snapResp.Snapshot!.Payload); + + var response = await StreamApiHandlers.HandleRestoreAsync( + "$JS.API.STREAM.RESTORE.ORDERS", + base64Payload, + sm, + CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + } + + /// + /// HandleRestoreAsync returns 400 when given an empty payload array. + /// + [Fact] + public async Task HandleRestoreAsync_returns_error_for_empty_payload() + { + var sm = CreateManagerWithStream("ORDERS", "orders.>"); + + var response = await StreamApiHandlers.HandleRestoreAsync( + "$JS.API.STREAM.RESTORE.ORDERS", + [], + sm, + CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(400); + } + + // --------------------------------------------------------------- + // Round-trip + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamSnapshotT / jsStreamRestoreT — full snapshot-then-restore round-trip: + /// messages written before snapshot are recoverable after restore. + /// + [Fact] + public async Task Snapshot_round_trip_create_and_restore() + { + var sm = CreateManagerWithStream("LOGS", "logs.>"); + await AppendAsync(sm, "logs.a", "alpha"); + await AppendAsync(sm, "logs.b", "beta"); + await AppendAsync(sm, "logs.c", "gamma"); + + var stateBefore = await sm.GetStateAsync("LOGS", default); + stateBefore.Messages.ShouldBe(3UL); + + // Snapshot via async handler. + var snapResp = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.LOGS", + sm, + CancellationToken.None); + snapResp.Error.ShouldBeNull(); + var base64Payload = Encoding.UTF8.GetBytes(snapResp.Snapshot!.Payload); + + // Restore via async handler. + var restoreResp = await StreamApiHandlers.HandleRestoreAsync( + "$JS.API.STREAM.RESTORE.LOGS", + base64Payload, + sm, + CancellationToken.None); + restoreResp.Error.ShouldBeNull(); + restoreResp.Success.ShouldBeTrue(); + + // State should still be consistent (restore does not clear — it re-applies). + var stateAfter = await sm.GetStateAsync("LOGS", default); + stateAfter.Messages.ShouldBeGreaterThanOrEqualTo(3UL); + } + + // --------------------------------------------------------------- + // Subject extraction + // --------------------------------------------------------------- + + /// + /// Go ref: jsStreamSnapshotT — the stream name is correctly extracted from the API subject. + /// + [Fact] + public async Task HandleSnapshot_extracts_stream_name_from_subject() + { + var sm = CreateManagerWithStream("MY_STREAM", "mystream.>"); + + var response = await StreamApiHandlers.HandleSnapshotAsync( + "$JS.API.STREAM.SNAPSHOT.MY_STREAM", + sm, + CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Snapshot.ShouldNotBeNull(); + response.Snapshot!.StreamName.ShouldBe("MY_STREAM"); + } +}