diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs new file mode 100644 index 0000000..7b7f10b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs @@ -0,0 +1,86 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool PublishAdvisory(Account? acc, string subject, object advisory) => + PublishAdvisory(acc, subject, advisory, SendInternalAccountMsg); + + internal bool PublishAdvisory( + Account? acc, + string subject, + object advisory, + Func sendInternalAccountMessage, + Func? hasGatewayInterest = null) + { + ArgumentNullException.ThrowIfNull(sendInternalAccountMessage); + + var account = acc ?? SystemAccount(); + if (account is null) + { + return false; + } + + var sublist = account.Sublist; + var gatewayInterestCheck = hasGatewayInterest ?? HasGatewayInterest; + var hasLocalInterest = sublist != null && sublist.HasInterest(subject); + if (!hasLocalInterest && !gatewayInterestCheck(account, subject)) + { + return false; + } + + byte[] payload; + try + { + payload = JsonSerializer.SerializeToUtf8Bytes(advisory); + } + catch (Exception ex) + { + Warnf("Advisory could not be serialized for account {0}: {1}", account.Name, ex); + return false; + } + + Exception? err; + try + { + err = sendInternalAccountMessage(account, subject, payload); + } + catch (Exception ex) + { + err = ex; + } + + if (err != null) + { + Warnf("Advisory could not be sent for account {0}: {1}", account.Name, err); + return false; + } + + return true; + } + + internal Exception? SendInternalAccountMsg(Account account, string subject, byte[] message) + { + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, string.Empty, [], message); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal bool HasGatewayInterest(Account account, string subject) + { + _ = account; + _ = subject; + return false; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs new file mode 100644 index 0000000..68294cc --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs @@ -0,0 +1,146 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed class NatsServerJetStreamEventsTests +{ + [Fact] + public void PublishAdvisory_NullAccount_UsesSystemAccount() + { + var server = NewServer(new ServerOptions()); + server.SetDefaultSystemAccount().ShouldBeNull(); + var systemAccount = server.SystemAccount(); + systemAccount.ShouldNotBeNull(); + + const string subject = "$JS.EVENT.ADVISORY.API"; + AddInterest(systemAccount!, subject); + + Account? capturedAccount = null; + + var result = server.PublishAdvisory( + acc: null, + subject, + new { Event = "ok" }, + sendInternalAccountMessage: (account, _, _) => + { + capturedAccount = account; + return null; + }); + + result.ShouldBeTrue(); + capturedAccount.ShouldBe(systemAccount); + } + + [Fact] + public void PublishAdvisory_NoInterest_ReturnsFalseWithoutSend() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + var sendCalls = 0; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "no-interest" }, + sendInternalAccountMessage: (_, _, _) => + { + sendCalls++; + return null; + }, + hasGatewayInterest: (_, _) => false); + + result.ShouldBeFalse(); + sendCalls.ShouldBe(0); + } + + [Fact] + public void PublishAdvisory_MarshalFailure_ReturnsFalse() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + var sendCalls = 0; + var advisory = new CyclicAdvisory(); + advisory.Self = advisory; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + advisory, + sendInternalAccountMessage: (_, _, _) => + { + sendCalls++; + return null; + }); + + result.ShouldBeFalse(); + sendCalls.ShouldBe(0); + } + + [Fact] + public void PublishAdvisory_SendFailure_ReturnsFalse() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "send-error" }, + sendInternalAccountMessage: (_, _, _) => new InvalidOperationException("send failed")); + + result.ShouldBeFalse(); + } + + [Fact] + public void PublishAdvisory_SendSucceeds_ReturnsTrue() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + byte[]? payload = null; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "sent" }, + sendInternalAccountMessage: (_, _, bytes) => + { + payload = bytes; + return null; + }); + + result.ShouldBeTrue(); + payload.ShouldNotBeNull(); + Encoding.UTF8.GetString(payload!).ShouldContain("\"Event\":\"sent\""); + } + + private static NatsServer NewServer(ServerOptions options) + { + var (server, err) = NatsServer.NewServer(options); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + return server!; + } + + private static void AddInterest(Account account, string subject) + { + account.Sublist ??= SubscriptionIndex.NewSublistWithCache(); + var err = account.Sublist.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + }); + err.ShouldBeNull(); + } + + private sealed class CyclicAdvisory + { + public CyclicAdvisory? Self { get; set; } + } +} diff --git a/porting.db b/porting.db index bca7a4b..e798f92 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index ca710ca..421bc0b 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 17:44:45 UTC +Generated: 2026-02-28 17:51:02 UTC ## Modules (12 total) @@ -13,10 +13,10 @@ Generated: 2026-02-28 17:44:45 UTC | Status | Count | |--------|-------| | complete | 14 | -| deferred | 1969 | +| deferred | 1968 | | n_a | 24 | | stub | 1 | -| verified | 1665 | +| verified | 1666 | ## Unit Tests (3257 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 17:44:45 UTC ## Overall Progress -**3016/6942 items complete (43.4%)** +**3017/6942 items complete (43.5%)**