feat: add jetstream consumer api lifecycle
This commit is contained in:
@@ -0,0 +1,72 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Api.Handlers;
|
||||||
|
|
||||||
|
public static class ConsumerApiHandlers
|
||||||
|
{
|
||||||
|
private const string CreatePrefix = "$JS.API.CONSUMER.CREATE.";
|
||||||
|
private const string InfoPrefix = "$JS.API.CONSUMER.INFO.";
|
||||||
|
|
||||||
|
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
|
||||||
|
{
|
||||||
|
var parsed = ParseSubject(subject, CreatePrefix);
|
||||||
|
if (parsed == null)
|
||||||
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
|
|
||||||
|
var (stream, durableName) = parsed.Value;
|
||||||
|
var config = ParseConfig(payload);
|
||||||
|
if (string.IsNullOrWhiteSpace(config.DurableName))
|
||||||
|
config.DurableName = durableName;
|
||||||
|
|
||||||
|
return consumerManager.CreateOrUpdate(stream, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JetStreamApiResponse HandleInfo(string subject, ConsumerManager consumerManager)
|
||||||
|
{
|
||||||
|
var parsed = ParseSubject(subject, InfoPrefix);
|
||||||
|
if (parsed == null)
|
||||||
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
|
|
||||||
|
var (stream, durableName) = parsed.Value;
|
||||||
|
return consumerManager.GetInfo(stream, durableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (string Stream, string Durable)? ParseSubject(string subject, string prefix)
|
||||||
|
{
|
||||||
|
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
var remainder = subject[prefix.Length..];
|
||||||
|
var split = remainder.Split('.', 2, StringSplitOptions.RemoveEmptyEntries);
|
||||||
|
if (split.Length != 2)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return (split[0], split[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ConsumerConfig ParseConfig(ReadOnlySpan<byte> payload)
|
||||||
|
{
|
||||||
|
if (payload.IsEmpty)
|
||||||
|
return new ConsumerConfig();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var doc = JsonDocument.Parse(payload.ToArray());
|
||||||
|
var root = doc.RootElement;
|
||||||
|
var config = new ConsumerConfig();
|
||||||
|
|
||||||
|
if (root.TryGetProperty("durable_name", out var durableEl))
|
||||||
|
config.DurableName = durableEl.GetString() ?? string.Empty;
|
||||||
|
|
||||||
|
if (root.TryGetProperty("filter_subject", out var filterEl))
|
||||||
|
config.FilterSubject = filterEl.GetString();
|
||||||
|
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
catch (JsonException)
|
||||||
|
{
|
||||||
|
return new ConsumerConfig();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,15 +5,17 @@ namespace NATS.Server.JetStream.Api;
|
|||||||
public sealed class JetStreamApiRouter
|
public sealed class JetStreamApiRouter
|
||||||
{
|
{
|
||||||
private readonly StreamManager _streamManager;
|
private readonly StreamManager _streamManager;
|
||||||
|
private readonly ConsumerManager _consumerManager;
|
||||||
|
|
||||||
public JetStreamApiRouter()
|
public JetStreamApiRouter()
|
||||||
: this(new StreamManager())
|
: this(new StreamManager(), new ConsumerManager())
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public JetStreamApiRouter(StreamManager streamManager)
|
public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager)
|
||||||
{
|
{
|
||||||
_streamManager = streamManager;
|
_streamManager = streamManager;
|
||||||
|
_consumerManager = consumerManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
|
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
|
||||||
@@ -24,6 +26,12 @@ public sealed class JetStreamApiRouter
|
|||||||
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
|
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
|
||||||
return StreamApiHandlers.HandleInfo(subject, _streamManager);
|
return StreamApiHandlers.HandleInfo(subject, _streamManager);
|
||||||
|
|
||||||
|
if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
|
||||||
|
return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
|
||||||
|
|
||||||
|
if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
|
||||||
|
return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
|
||||||
|
|
||||||
return JetStreamApiResponse.NotFound(subject);
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
57
src/NATS.Server/JetStream/ConsumerManager.cs
Normal file
57
src/NATS.Server/JetStream/ConsumerManager.cs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream;
|
||||||
|
|
||||||
|
public sealed class ConsumerManager
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||||
|
|
||||||
|
public int ConsumerCount => _consumers.Count;
|
||||||
|
|
||||||
|
public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(config.DurableName))
|
||||||
|
return JetStreamApiResponse.ErrorResponse(400, "durable name required");
|
||||||
|
|
||||||
|
var key = (stream, config.DurableName);
|
||||||
|
var handle = _consumers.AddOrUpdate(key,
|
||||||
|
_ => new ConsumerHandle(stream, config),
|
||||||
|
(_, existing) => existing with { Config = config });
|
||||||
|
|
||||||
|
return new JetStreamApiResponse
|
||||||
|
{
|
||||||
|
ConsumerInfo = new JetStreamConsumerInfo
|
||||||
|
{
|
||||||
|
Config = handle.Config,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public JetStreamApiResponse GetInfo(string stream, string durableName)
|
||||||
|
{
|
||||||
|
if (_consumers.TryGetValue((stream, durableName), out var handle))
|
||||||
|
{
|
||||||
|
return new JetStreamApiResponse
|
||||||
|
{
|
||||||
|
ConsumerInfo = new JetStreamConsumerInfo
|
||||||
|
{
|
||||||
|
Config = handle.Config,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
|
||||||
|
=> _consumers.TryGetValue((stream, durableName), out handle!);
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||||
|
{
|
||||||
|
public ulong NextSequence { get; set; } = 1;
|
||||||
|
public Queue<StoredMessage> Pending { get; } = new();
|
||||||
|
}
|
||||||
@@ -51,6 +51,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
private readonly JetStreamService? _jetStreamService;
|
private readonly JetStreamService? _jetStreamService;
|
||||||
private readonly JetStreamApiRouter? _jetStreamApiRouter;
|
private readonly JetStreamApiRouter? _jetStreamApiRouter;
|
||||||
private readonly StreamManager? _jetStreamStreamManager;
|
private readonly StreamManager? _jetStreamStreamManager;
|
||||||
|
private readonly ConsumerManager? _jetStreamConsumerManager;
|
||||||
private readonly JetStreamPublisher? _jetStreamPublisher;
|
private readonly JetStreamPublisher? _jetStreamPublisher;
|
||||||
private Socket? _listener;
|
private Socket? _listener;
|
||||||
private Socket? _wsListener;
|
private Socket? _wsListener;
|
||||||
@@ -341,8 +342,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
if (options.JetStream != null)
|
if (options.JetStream != null)
|
||||||
{
|
{
|
||||||
_jetStreamStreamManager = new StreamManager();
|
_jetStreamStreamManager = new StreamManager();
|
||||||
|
_jetStreamConsumerManager = new ConsumerManager();
|
||||||
_jetStreamService = new JetStreamService(options.JetStream);
|
_jetStreamService = new JetStreamService(options.JetStream);
|
||||||
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager);
|
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager);
|
||||||
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
|
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,16 +9,19 @@ namespace NATS.Server.Tests;
|
|||||||
internal sealed class JetStreamApiFixture : IAsyncDisposable
|
internal sealed class JetStreamApiFixture : IAsyncDisposable
|
||||||
{
|
{
|
||||||
private static readonly StreamManager SharedStreamManager = new();
|
private static readonly StreamManager SharedStreamManager = new();
|
||||||
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager);
|
private static readonly ConsumerManager SharedConsumerManager = new();
|
||||||
|
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);
|
||||||
|
|
||||||
private readonly StreamManager _streamManager;
|
private readonly StreamManager _streamManager;
|
||||||
|
private readonly ConsumerManager _consumerManager;
|
||||||
private readonly JetStreamApiRouter _router;
|
private readonly JetStreamApiRouter _router;
|
||||||
private readonly JetStreamPublisher _publisher;
|
private readonly JetStreamPublisher _publisher;
|
||||||
|
|
||||||
private JetStreamApiFixture()
|
private JetStreamApiFixture()
|
||||||
{
|
{
|
||||||
_streamManager = new StreamManager();
|
_streamManager = new StreamManager();
|
||||||
_router = new JetStreamApiRouter(_streamManager);
|
_consumerManager = new ConsumerManager();
|
||||||
|
_router = new JetStreamApiRouter(_streamManager, _consumerManager);
|
||||||
_publisher = new JetStreamPublisher(_streamManager);
|
_publisher = new JetStreamPublisher(_streamManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,5 +59,17 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject)
|
||||||
|
{
|
||||||
|
var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}""}}";
|
||||||
|
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
|
||||||
|
{
|
||||||
|
var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
|
||||||
|
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
|
||||||
|
}
|
||||||
|
|
||||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|||||||
16
tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs
Normal file
16
tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamConsumerApiTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Create_consumer_and_fetch_info_roundtrip()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
|
||||||
|
|
||||||
|
var create = await fixture.CreateConsumerAsync("ORDERS", "DUR", "orders.created");
|
||||||
|
create.Error.ShouldBeNull();
|
||||||
|
|
||||||
|
var info = await fixture.GetConsumerInfoAsync("ORDERS", "DUR");
|
||||||
|
info.Config.DurableName.ShouldBe("DUR");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user