diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs new file mode 100644 index 0000000..af5fd0e --- /dev/null +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -0,0 +1,72 @@ +using System.Text.Json; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Api.Handlers; + +public static class ConsumerApiHandlers +{ + private const string CreatePrefix = "$JS.API.CONSUMER.CREATE."; + private const string InfoPrefix = "$JS.API.CONSUMER.INFO."; + + public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan payload, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, CreatePrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.DurableName)) + config.DurableName = durableName; + + return consumerManager.CreateOrUpdate(stream, config); + } + + public static JetStreamApiResponse HandleInfo(string subject, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, InfoPrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + return consumerManager.GetInfo(stream, durableName); + } + + private static (string Stream, string Durable)? ParseSubject(string subject, string prefix) + { + if (!subject.StartsWith(prefix, StringComparison.Ordinal)) + return null; + + var remainder = subject[prefix.Length..]; + var split = remainder.Split('.', 2, StringSplitOptions.RemoveEmptyEntries); + if (split.Length != 2) + return null; + + return (split[0], split[1]); + } + + private static ConsumerConfig ParseConfig(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return new ConsumerConfig(); + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + var root = doc.RootElement; + var config = new ConsumerConfig(); + + if (root.TryGetProperty("durable_name", out var durableEl)) + config.DurableName = durableEl.GetString() ?? string.Empty; + + if (root.TryGetProperty("filter_subject", out var filterEl)) + config.FilterSubject = filterEl.GetString(); + + return config; + } + catch (JsonException) + { + return new ConsumerConfig(); + } + } +} diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index b277bab..7b7b971 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -5,15 +5,17 @@ namespace NATS.Server.JetStream.Api; public sealed class JetStreamApiRouter { private readonly StreamManager _streamManager; + private readonly ConsumerManager _consumerManager; public JetStreamApiRouter() - : this(new StreamManager()) + : this(new StreamManager(), new ConsumerManager()) { } - public JetStreamApiRouter(StreamManager streamManager) + public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager) { _streamManager = streamManager; + _consumerManager = consumerManager; } public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) @@ -24,6 +26,12 @@ public sealed class JetStreamApiRouter if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal)) return StreamApiHandlers.HandleInfo(subject, _streamManager); + if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager); + + if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleInfo(subject, _consumerManager); + return JetStreamApiResponse.NotFound(subject); } } diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs new file mode 100644 index 0000000..816069b --- /dev/null +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -0,0 +1,57 @@ +using System.Collections.Concurrent; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream; + +public sealed class ConsumerManager +{ + private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); + + public int ConsumerCount => _consumers.Count; + + public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config) + { + if (string.IsNullOrWhiteSpace(config.DurableName)) + return JetStreamApiResponse.ErrorResponse(400, "durable name required"); + + var key = (stream, config.DurableName); + var handle = _consumers.AddOrUpdate(key, + _ => new ConsumerHandle(stream, config), + (_, existing) => existing with { Config = config }); + + return new JetStreamApiResponse + { + ConsumerInfo = new JetStreamConsumerInfo + { + Config = handle.Config, + }, + }; + } + + public JetStreamApiResponse GetInfo(string stream, string durableName) + { + if (_consumers.TryGetValue((stream, durableName), out var handle)) + { + return new JetStreamApiResponse + { + ConsumerInfo = new JetStreamConsumerInfo + { + Config = handle.Config, + }, + }; + } + + return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}"); + } + + public bool TryGet(string stream, string durableName, out ConsumerHandle handle) + => _consumers.TryGetValue((stream, durableName), out handle!); +} + +public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) +{ + public ulong NextSequence { get; set; } = 1; + public Queue Pending { get; } = new(); +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 928aff5..7feb887 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -51,6 +51,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly JetStreamService? _jetStreamService; private readonly JetStreamApiRouter? _jetStreamApiRouter; private readonly StreamManager? _jetStreamStreamManager; + private readonly ConsumerManager? _jetStreamConsumerManager; private readonly JetStreamPublisher? _jetStreamPublisher; private Socket? _listener; private Socket? _wsListener; @@ -341,8 +342,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.JetStream != null) { _jetStreamStreamManager = new StreamManager(); + _jetStreamConsumerManager = new ConsumerManager(); _jetStreamService = new JetStreamService(options.JetStream); - _jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager); + _jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager); _jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager); } diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 7ae5588..966fc5d 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -9,16 +9,19 @@ namespace NATS.Server.Tests; internal sealed class JetStreamApiFixture : IAsyncDisposable { private static readonly StreamManager SharedStreamManager = new(); - private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager); + private static readonly ConsumerManager SharedConsumerManager = new(); + private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager); private readonly StreamManager _streamManager; + private readonly ConsumerManager _consumerManager; private readonly JetStreamApiRouter _router; private readonly JetStreamPublisher _publisher; private JetStreamApiFixture() { _streamManager = new StreamManager(); - _router = new JetStreamApiRouter(_streamManager); + _consumerManager = new ConsumerManager(); + _router = new JetStreamApiRouter(_streamManager, _consumerManager); _publisher = new JetStreamPublisher(_streamManager); } @@ -56,5 +59,17 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return _streamManager.GetStateAsync(streamName, default).AsTask(); } + public Task CreateConsumerAsync(string stream, string durableName, string filterSubject) + { + var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}""}}"; + return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload); + } + + public async Task GetConsumerInfoAsync(string stream, string durableName) + { + var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}"); + return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found."); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs new file mode 100644 index 0000000..cd71302 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Tests; + +public class JetStreamConsumerApiTests +{ + [Fact] + public async Task Create_consumer_and_fetch_info_roundtrip() + { + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + + var create = await fixture.CreateConsumerAsync("ORDERS", "DUR", "orders.created"); + create.Error.ShouldBeNull(); + + var info = await fixture.GetConsumerInfoAsync("ORDERS", "DUR"); + info.Config.DurableName.ShouldBe("DUR"); + } +}