From 95691fa9e713a4d2999bed8d2d8105d82be9559d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:03:24 -0500 Subject: [PATCH] feat: route publishes to jetstream with puback --- .../JetStream/Publish/JetStreamPublisher.cs | 30 +++++++++++++ src/NATS.Server/JetStream/Publish/PubAck.cs | 8 ++++ src/NATS.Server/NatsClient.cs | 7 +++ src/NATS.Server/NatsServer.cs | 18 +++++++- .../NATS.Server.Tests/JetStreamApiFixture.cs | 43 +++++++++++++++++-- .../JetStreamPublishTests.cs | 14 ++++++ 6 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs create mode 100644 src/NATS.Server/JetStream/Publish/PubAck.cs create mode 100644 tests/NATS.Server.Tests/JetStreamPublishTests.cs diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs new file mode 100644 index 0000000..c1fe76e --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -0,0 +1,30 @@ +namespace NATS.Server.JetStream.Publish; + +public sealed class JetStreamPublisher +{ + private readonly StreamManager _streamManager; + + public JetStreamPublisher(StreamManager streamManager) + { + _streamManager = streamManager; + } + + public bool TryCapture(string subject, ReadOnlyMemory payload, out PubAck ack) + { + var stream = _streamManager.FindBySubject(subject); + if (stream == null) + { + ack = new PubAck(); + return false; + } + + var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult(); + ack = new PubAck + { + Stream = stream.Config.Name, + Seq = seq, + }; + + return true; + } +} diff --git a/src/NATS.Server/JetStream/Publish/PubAck.cs b/src/NATS.Server/JetStream/Publish/PubAck.cs new file mode 100644 index 0000000..ef7fbf8 --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/PubAck.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.JetStream.Publish; + +public sealed class PubAck +{ + public string Stream { get; init; } = string.Empty; + public ulong Seq { get; init; } + public int? ErrorCode { get; init; } +} diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 0380e14..49017ec 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -8,6 +8,7 @@ using System.Text.Json; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Server.Auth; +using NATS.Server.JetStream.Publish; using NATS.Server.Protocol; using NATS.Server.Subscriptions; using NATS.Server.Tls; @@ -103,6 +104,7 @@ public sealed class NatsClient : IDisposable public bool InfoAlreadySent { get; set; } public IReadOnlyDictionary Subscriptions => _subs; + public PubAck? LastJetStreamPubAck { get; private set; } public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo, AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats, @@ -593,6 +595,11 @@ public sealed class NatsClient : IDisposable Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); } + public void RecordJetStreamPubAck(PubAck ack) + { + LastJetStreamPubAck = ack; + } + private void SendInfo() { // Use the cached INFO bytes from the server when there is no per-connection diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 537acb6..928aff5 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -12,6 +12,7 @@ using NATS.Server.Configuration; using NATS.Server.Gateways; using NATS.Server.JetStream; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Publish; using NATS.Server.LeafNodes; using NATS.Server.Monitoring; using NATS.Server.Protocol; @@ -49,6 +50,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly LeafNodeManager? _leafNodeManager; private readonly JetStreamService? _jetStreamService; private readonly JetStreamApiRouter? _jetStreamApiRouter; + private readonly StreamManager? _jetStreamStreamManager; + private readonly JetStreamPublisher? _jetStreamPublisher; private Socket? _listener; private Socket? _wsListener; private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously); @@ -94,6 +97,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public IEnumerable GetAccounts() => _accounts.Values; public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); + public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory payload, out PubAck ack) + { + if (_jetStreamPublisher != null) + return _jetStreamPublisher.TryCapture(subject, payload, out ack); + + ack = new PubAck(); + return false; + } public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -329,8 +340,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.JetStream != null) { + _jetStreamStreamManager = new StreamManager(); _jetStreamService = new JetStreamService(options.JetStream); - _jetStreamApiRouter = new JetStreamApiRouter(); + _jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager); + _jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager); } if (options.HasTls) @@ -746,6 +759,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender) { + if (TryCaptureJetStreamPublish(subject, payload, out var pubAck)) + sender.RecordJetStreamPubAck(pubAck); + // Apply subject transforms if (_subjectTransforms.Length > 0) { diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 3547905..7da48ac 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -1,14 +1,51 @@ using System.Text; +using NATS.Server.JetStream; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Publish; namespace NATS.Server.Tests; -internal static class JetStreamApiFixture +internal sealed class JetStreamApiFixture : IAsyncDisposable { - private static readonly JetStreamApiRouter Router = new(); + private static readonly StreamManager SharedStreamManager = new(); + private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager); + + private readonly StreamManager _streamManager; + private readonly JetStreamApiRouter _router; + private readonly JetStreamPublisher _publisher; + + private JetStreamApiFixture() + { + _streamManager = new StreamManager(); + _router = new JetStreamApiRouter(_streamManager); + _publisher = new JetStreamPublisher(_streamManager); + } public static Task RequestAsync(string subject, string payload) { - return Task.FromResult(Router.Route(subject, Encoding.UTF8.GetBytes(payload))); + return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload))); } + + public static async Task StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0) + { + var fixture = new JetStreamApiFixture(); + var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}"; + _ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload); + return fixture; + } + + public Task PublishAndGetAckAsync(string subject, string payload) + { + if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), out var ack)) + return Task.FromResult(ack); + + return Task.FromResult(new PubAck { ErrorCode = 404 }); + } + + public Task RequestLocalAsync(string subject, string payload) + { + return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload))); + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamPublishTests.cs b/tests/NATS.Server.Tests/JetStreamPublishTests.cs new file mode 100644 index 0000000..63e62a1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPublishTests.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Tests; + +public class JetStreamPublishTests +{ + [Fact] + public async Task Publish_to_stream_subject_returns_puback() + { + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fixture.PublishAndGetAckAsync("orders.created", "{\"id\":1}"); + + ack.Stream.ShouldBe("ORDERS"); + ack.Seq.ShouldBe((ulong)1); + } +}