From 5f530de2e42fcd78e59656eb7e92ca3a813867ba Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:02:07 -0500 Subject: [PATCH] feat: add jetstream stream lifecycle api --- .../Api/Handlers/StreamApiHandlers.cs | 91 +++++++++++++++++++ .../JetStream/Api/JetStreamApiResponse.cs | 33 +++---- .../JetStream/Api/JetStreamApiRouter.cs | 24 ++++- src/NATS.Server/JetStream/StreamManager.cs | 78 ++++++++++++++++ .../JetStreamStreamApiTests.cs | 15 +++ 5 files changed, 221 insertions(+), 20 deletions(-) create mode 100644 src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs create mode 100644 src/NATS.Server/JetStream/StreamManager.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamApiTests.cs diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs new file mode 100644 index 0000000..b269ab9 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -0,0 +1,91 @@ +using System.Text.Json; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Api.Handlers; + +public static class StreamApiHandlers +{ + private const string CreatePrefix = "$JS.API.STREAM.CREATE."; + private const string InfoPrefix = "$JS.API.STREAM.INFO."; + + public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, CreatePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.Name)) + config.Name = streamName; + + if (config.Subjects.Count == 0) + config.Subjects.Add(streamName.ToLowerInvariant() + ".>"); + + return streamManager.CreateOrUpdate(config); + } + + public static JetStreamApiResponse HandleInfo(string subject, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, InfoPrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + return streamManager.GetInfo(streamName); + } + + private static string? ExtractTrailingToken(string subject, string prefix) + { + if (!subject.StartsWith(prefix, StringComparison.Ordinal)) + return null; + + var token = subject[prefix.Length..].Trim(); + return token.Length == 0 ? null : token; + } + + private static StreamConfig ParseConfig(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return new StreamConfig(); + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + var root = doc.RootElement; + var config = new StreamConfig(); + + if (root.TryGetProperty("name", out var nameEl)) + config.Name = nameEl.GetString() ?? string.Empty; + + if (root.TryGetProperty("subjects", out var subjectsEl)) + { + if (subjectsEl.ValueKind == JsonValueKind.Array) + { + foreach (var item in subjectsEl.EnumerateArray()) + { + var value = item.GetString(); + if (!string.IsNullOrWhiteSpace(value)) + config.Subjects.Add(value); + } + } + else if (subjectsEl.ValueKind == JsonValueKind.String) + { + var value = subjectsEl.GetString(); + if (!string.IsNullOrWhiteSpace(value)) + config.Subjects.Add(value); + } + } + + if (root.TryGetProperty("max_msgs", out var maxMsgsEl) && maxMsgsEl.TryGetInt32(out var maxMsgs)) + config.MaxMsgs = maxMsgs; + + if (root.TryGetProperty("replicas", out var replicasEl) && replicasEl.TryGetInt32(out var replicas)) + config.Replicas = replicas; + + return config; + } + catch (JsonException) + { + return new StreamConfig(); + } + } +} diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index aba69c7..ea070e6 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -1,3 +1,5 @@ +using NATS.Server.JetStream.Models; + namespace NATS.Server.JetStream.Api; public sealed class JetStreamApiResponse @@ -16,31 +18,24 @@ public sealed class JetStreamApiResponse }; public static JetStreamApiResponse Ok() => new(); + + public static JetStreamApiResponse ErrorResponse(int code, string description) => new() + { + Error = new JetStreamApiError + { + Code = code, + Description = description, + }, + }; } public sealed class JetStreamStreamInfo { - public required JetStreamStreamConfig Config { get; init; } - public required JetStreamStreamState State { get; init; } + public required StreamConfig Config { get; init; } + public required StreamState State { get; init; } } public sealed class JetStreamConsumerInfo { - public required JetStreamConsumerConfig Config { get; init; } -} - -public sealed class JetStreamStreamConfig -{ - public string Name { get; init; } = string.Empty; -} - -public sealed class JetStreamStreamState -{ - public ulong Messages { get; init; } - public ulong FirstSeq { get; init; } -} - -public sealed class JetStreamConsumerConfig -{ - public string DurableName { get; init; } = string.Empty; + public required ConsumerConfig Config { get; init; } } diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index f5445a8..b277bab 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -1,7 +1,29 @@ +using NATS.Server.JetStream.Api.Handlers; + namespace NATS.Server.JetStream.Api; public sealed class JetStreamApiRouter { + private readonly StreamManager _streamManager; + + public JetStreamApiRouter() + : this(new StreamManager()) + { + } + + public JetStreamApiRouter(StreamManager streamManager) + { + _streamManager = streamManager; + } + public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) - => JetStreamApiResponse.NotFound(subject); + { + if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal)) + return StreamApiHandlers.HandleCreate(subject, payload, _streamManager); + + if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal)) + return StreamApiHandlers.HandleInfo(subject, _streamManager); + + return JetStreamApiResponse.NotFound(subject); + } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs new file mode 100644 index 0000000..020ec6e --- /dev/null +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -0,0 +1,78 @@ +using System.Collections.Concurrent; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using NATS.Server.Subscriptions; + +namespace NATS.Server.JetStream; + +public sealed class StreamManager +{ + private readonly ConcurrentDictionary _streams = + new(StringComparer.Ordinal); + + public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); + + public JetStreamApiResponse CreateOrUpdate(StreamConfig config) + { + if (string.IsNullOrWhiteSpace(config.Name)) + return JetStreamApiResponse.ErrorResponse(400, "stream name required"); + + var normalized = NormalizeConfig(config); + var handle = _streams.AddOrUpdate( + normalized.Name, + _ => new StreamHandle(normalized, new MemStore()), + (_, existing) => existing with { Config = normalized }); + + return BuildStreamInfoResponse(handle); + } + + public JetStreamApiResponse GetInfo(string name) + { + if (_streams.TryGetValue(name, out var stream)) + return BuildStreamInfoResponse(stream); + + return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}"); + } + + public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!); + + public StreamHandle? FindBySubject(string subject) + { + foreach (var stream in _streams.Values) + { + if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p))) + return stream; + } + + return null; + } + + private static StreamConfig NormalizeConfig(StreamConfig config) + { + var copy = new StreamConfig + { + Name = config.Name, + Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects], + MaxMsgs = config.MaxMsgs, + Replicas = config.Replicas, + }; + + return copy; + } + + private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle) + { + var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult(); + return new JetStreamApiResponse + { + StreamInfo = new JetStreamStreamInfo + { + Config = handle.Config, + State = state, + }, + }; + } +} + +public sealed record StreamHandle(StreamConfig Config, IStreamStore Store); diff --git a/tests/NATS.Server.Tests/JetStreamStreamApiTests.cs b/tests/NATS.Server.Tests/JetStreamStreamApiTests.cs new file mode 100644 index 0000000..8c3a430 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamApiTests.cs @@ -0,0 +1,15 @@ +namespace NATS.Server.Tests; + +public class JetStreamStreamApiTests +{ + [Fact] + public async Task Stream_create_and_info_roundtrip() + { + var create = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.CREATE.ORDERS", "{\"name\":\"ORDERS\",\"subjects\":[\"orders.*\"]}"); + create.Error.ShouldBeNull(); + + var info = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.INFO.ORDERS", "{}"); + info.Error.ShouldBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("ORDERS"); + } +}