feat: route publishes to jetstream with puback
This commit is contained in:
30
src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
Normal file
30
src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
namespace NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
|
public sealed class JetStreamPublisher
|
||||||
|
{
|
||||||
|
private readonly StreamManager _streamManager;
|
||||||
|
|
||||||
|
public JetStreamPublisher(StreamManager streamManager)
|
||||||
|
{
|
||||||
|
_streamManager = streamManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||||
|
{
|
||||||
|
var stream = _streamManager.FindBySubject(subject);
|
||||||
|
if (stream == null)
|
||||||
|
{
|
||||||
|
ack = new PubAck();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
||||||
|
ack = new PubAck
|
||||||
|
{
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
Seq = seq,
|
||||||
|
};
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
8
src/NATS.Server/JetStream/Publish/PubAck.cs
Normal file
8
src/NATS.Server/JetStream/Publish/PubAck.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
|
public sealed class PubAck
|
||||||
|
{
|
||||||
|
public string Stream { get; init; } = string.Empty;
|
||||||
|
public ulong Seq { get; init; }
|
||||||
|
public int? ErrorCode { get; init; }
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ using System.Text.Json;
|
|||||||
using System.Threading.Channels;
|
using System.Threading.Channels;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using NATS.Server.Auth;
|
using NATS.Server.Auth;
|
||||||
|
using NATS.Server.JetStream.Publish;
|
||||||
using NATS.Server.Protocol;
|
using NATS.Server.Protocol;
|
||||||
using NATS.Server.Subscriptions;
|
using NATS.Server.Subscriptions;
|
||||||
using NATS.Server.Tls;
|
using NATS.Server.Tls;
|
||||||
@@ -103,6 +104,7 @@ public sealed class NatsClient : IDisposable
|
|||||||
public bool InfoAlreadySent { get; set; }
|
public bool InfoAlreadySent { get; set; }
|
||||||
|
|
||||||
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
||||||
|
public PubAck? LastJetStreamPubAck { get; private set; }
|
||||||
|
|
||||||
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
|
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
|
||||||
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
|
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
|
||||||
@@ -593,6 +595,11 @@ public sealed class NatsClient : IDisposable
|
|||||||
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void RecordJetStreamPubAck(PubAck ack)
|
||||||
|
{
|
||||||
|
LastJetStreamPubAck = ack;
|
||||||
|
}
|
||||||
|
|
||||||
private void SendInfo()
|
private void SendInfo()
|
||||||
{
|
{
|
||||||
// Use the cached INFO bytes from the server when there is no per-connection
|
// Use the cached INFO bytes from the server when there is no per-connection
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ using NATS.Server.Configuration;
|
|||||||
using NATS.Server.Gateways;
|
using NATS.Server.Gateways;
|
||||||
using NATS.Server.JetStream;
|
using NATS.Server.JetStream;
|
||||||
using NATS.Server.JetStream.Api;
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Publish;
|
||||||
using NATS.Server.LeafNodes;
|
using NATS.Server.LeafNodes;
|
||||||
using NATS.Server.Monitoring;
|
using NATS.Server.Monitoring;
|
||||||
using NATS.Server.Protocol;
|
using NATS.Server.Protocol;
|
||||||
@@ -49,6 +50,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
private readonly LeafNodeManager? _leafNodeManager;
|
private readonly LeafNodeManager? _leafNodeManager;
|
||||||
private readonly JetStreamService? _jetStreamService;
|
private readonly JetStreamService? _jetStreamService;
|
||||||
private readonly JetStreamApiRouter? _jetStreamApiRouter;
|
private readonly JetStreamApiRouter? _jetStreamApiRouter;
|
||||||
|
private readonly StreamManager? _jetStreamStreamManager;
|
||||||
|
private readonly JetStreamPublisher? _jetStreamPublisher;
|
||||||
private Socket? _listener;
|
private Socket? _listener;
|
||||||
private Socket? _wsListener;
|
private Socket? _wsListener;
|
||||||
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
@@ -94,6 +97,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
|
|
||||||
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
|
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
|
||||||
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
|
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
|
||||||
|
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||||
|
{
|
||||||
|
if (_jetStreamPublisher != null)
|
||||||
|
return _jetStreamPublisher.TryCapture(subject, payload, out ack);
|
||||||
|
|
||||||
|
ack = new PubAck();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||||
|
|
||||||
@@ -329,8 +340,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
|
|
||||||
if (options.JetStream != null)
|
if (options.JetStream != null)
|
||||||
{
|
{
|
||||||
|
_jetStreamStreamManager = new StreamManager();
|
||||||
_jetStreamService = new JetStreamService(options.JetStream);
|
_jetStreamService = new JetStreamService(options.JetStream);
|
||||||
_jetStreamApiRouter = new JetStreamApiRouter();
|
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager);
|
||||||
|
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options.HasTls)
|
if (options.HasTls)
|
||||||
@@ -746,6 +759,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||||
ReadOnlyMemory<byte> payload, NatsClient sender)
|
ReadOnlyMemory<byte> payload, NatsClient sender)
|
||||||
{
|
{
|
||||||
|
if (TryCaptureJetStreamPublish(subject, payload, out var pubAck))
|
||||||
|
sender.RecordJetStreamPubAck(pubAck);
|
||||||
|
|
||||||
// Apply subject transforms
|
// Apply subject transforms
|
||||||
if (_subjectTransforms.Length > 0)
|
if (_subjectTransforms.Length > 0)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,14 +1,51 @@
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
|
using NATS.Server.JetStream;
|
||||||
using NATS.Server.JetStream.Api;
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
namespace NATS.Server.Tests;
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
internal static class JetStreamApiFixture
|
internal sealed class JetStreamApiFixture : IAsyncDisposable
|
||||||
{
|
{
|
||||||
private static readonly JetStreamApiRouter Router = new();
|
private static readonly StreamManager SharedStreamManager = new();
|
||||||
|
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager);
|
||||||
|
|
||||||
|
private readonly StreamManager _streamManager;
|
||||||
|
private readonly JetStreamApiRouter _router;
|
||||||
|
private readonly JetStreamPublisher _publisher;
|
||||||
|
|
||||||
|
private JetStreamApiFixture()
|
||||||
|
{
|
||||||
|
_streamManager = new StreamManager();
|
||||||
|
_router = new JetStreamApiRouter(_streamManager);
|
||||||
|
_publisher = new JetStreamPublisher(_streamManager);
|
||||||
|
}
|
||||||
|
|
||||||
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
||||||
{
|
{
|
||||||
return Task.FromResult(Router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
|
||||||
|
{
|
||||||
|
var fixture = new JetStreamApiFixture();
|
||||||
|
var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}";
|
||||||
|
_ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
|
||||||
|
return fixture;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload)
|
||||||
|
{
|
||||||
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), out var ack))
|
||||||
|
return Task.FromResult(ack);
|
||||||
|
|
||||||
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
||||||
|
{
|
||||||
|
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|||||||
14
tests/NATS.Server.Tests/JetStreamPublishTests.cs
Normal file
14
tests/NATS.Server.Tests/JetStreamPublishTests.cs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamPublishTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Publish_to_stream_subject_returns_puback()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
|
||||||
|
var ack = await fixture.PublishAndGetAckAsync("orders.created", "{\"id\":1}");
|
||||||
|
|
||||||
|
ack.Stream.ShouldBe("ORDERS");
|
||||||
|
ack.Seq.ShouldBe((ulong)1);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user