feat: add jetstream api router and error envelope

This commit is contained in:
Joseph Doherty
2026-02-23 05:58:34 -05:00
parent 7fe15d7ce1
commit 6d23e89fe8
6 changed files with 90 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiError
{
public int Code { get; init; }
public string Description { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,46 @@
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiResponse
{
public JetStreamApiError? Error { get; init; }
public JetStreamStreamInfo? StreamInfo { get; init; }
public JetStreamConsumerInfo? ConsumerInfo { get; init; }
public static JetStreamApiResponse NotFound(string subject) => new()
{
Error = new JetStreamApiError
{
Code = 404,
Description = $"unknown api subject '{subject}'",
},
};
public static JetStreamApiResponse Ok() => new();
}
public sealed class JetStreamStreamInfo
{
public required JetStreamStreamConfig Config { get; init; }
public required JetStreamStreamState State { get; init; }
}
public sealed class JetStreamConsumerInfo
{
public required JetStreamConsumerConfig Config { get; init; }
}
public sealed class JetStreamStreamConfig
{
public string Name { get; init; } = string.Empty;
}
public sealed class JetStreamStreamState
{
public ulong Messages { get; init; }
public ulong FirstSeq { get; init; }
}
public sealed class JetStreamConsumerConfig
{
public string DurableName { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiRouter
{
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
=> JetStreamApiResponse.NotFound(subject);
}

View File

@@ -11,6 +11,7 @@ using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Gateways;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.LeafNodes;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
@@ -47,6 +48,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly JetStreamService? _jetStreamService;
private readonly JetStreamApiRouter? _jetStreamApiRouter;
private Socket? _listener;
private Socket? _wsListener;
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -84,6 +86,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
public string? ClusterListen => _routeManager?.ListenEndpoint;
public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter;
public Action? ReOpenLogFile { get; set; }
public IEnumerable<NatsClient> GetClients() => _clients.Values;
@@ -327,6 +330,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (options.JetStream != null)
{
_jetStreamService = new JetStreamService(options.JetStream);
_jetStreamApiRouter = new JetStreamApiRouter();
}
if (options.HasTls)

View File

@@ -0,0 +1,14 @@
using System.Text;
using NATS.Server.JetStream.Api;
namespace NATS.Server.Tests;
internal static class JetStreamApiFixture
{
private static readonly JetStreamApiRouter Router = new();
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
{
return Task.FromResult(Router.Route(subject, Encoding.UTF8.GetBytes(payload)));
}
}

View File

@@ -0,0 +1,12 @@
namespace NATS.Server.Tests;
public class JetStreamApiRouterTests
{
[Fact]
public async Task Unknown_js_api_subject_returns_structured_error()
{
var response = await JetStreamApiFixture.RequestAsync("$JS.API.BAD", "{}");
response.Error.ShouldNotBeNull();
response.Error!.Code.ShouldBe(404);
}
}