From 4b2875141c56a7a986c1310322d59cfb4a2aa386 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 12:51:02 -0500 Subject: [PATCH] feat(batch10): task2 implement jetstream advisory publish --- .../NatsServer.JetStreamEvents.cs | 86 +++++++++++ .../NatsServerJetStreamEventsTests.cs | 146 ++++++++++++++++++ porting.db | Bin 6541312 -> 6541312 bytes reports/current.md | 8 +- 4 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs new file mode 100644 index 0000000..7b7f10b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamEvents.cs @@ -0,0 +1,86 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool PublishAdvisory(Account? acc, string subject, object advisory) => + PublishAdvisory(acc, subject, advisory, SendInternalAccountMsg); + + internal bool PublishAdvisory( + Account? acc, + string subject, + object advisory, + Func sendInternalAccountMessage, + Func? hasGatewayInterest = null) + { + ArgumentNullException.ThrowIfNull(sendInternalAccountMessage); + + var account = acc ?? SystemAccount(); + if (account is null) + { + return false; + } + + var sublist = account.Sublist; + var gatewayInterestCheck = hasGatewayInterest ?? HasGatewayInterest; + var hasLocalInterest = sublist != null && sublist.HasInterest(subject); + if (!hasLocalInterest && !gatewayInterestCheck(account, subject)) + { + return false; + } + + byte[] payload; + try + { + payload = JsonSerializer.SerializeToUtf8Bytes(advisory); + } + catch (Exception ex) + { + Warnf("Advisory could not be serialized for account {0}: {1}", account.Name, ex); + return false; + } + + Exception? err; + try + { + err = sendInternalAccountMessage(account, subject, payload); + } + catch (Exception ex) + { + err = ex; + } + + if (err != null) + { + Warnf("Advisory could not be sent for account {0}: {1}", account.Name, err); + return false; + } + + return true; + } + + internal Exception? SendInternalAccountMsg(Account account, string subject, byte[] message) + { + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, string.Empty, [], message); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal bool HasGatewayInterest(Account account, string subject) + { + _ = account; + _ = subject; + return false; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs new file mode 100644 index 0000000..68294cc --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/NatsServerJetStreamEventsTests.cs @@ -0,0 +1,146 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed class NatsServerJetStreamEventsTests +{ + [Fact] + public void PublishAdvisory_NullAccount_UsesSystemAccount() + { + var server = NewServer(new ServerOptions()); + server.SetDefaultSystemAccount().ShouldBeNull(); + var systemAccount = server.SystemAccount(); + systemAccount.ShouldNotBeNull(); + + const string subject = "$JS.EVENT.ADVISORY.API"; + AddInterest(systemAccount!, subject); + + Account? capturedAccount = null; + + var result = server.PublishAdvisory( + acc: null, + subject, + new { Event = "ok" }, + sendInternalAccountMessage: (account, _, _) => + { + capturedAccount = account; + return null; + }); + + result.ShouldBeTrue(); + capturedAccount.ShouldBe(systemAccount); + } + + [Fact] + public void PublishAdvisory_NoInterest_ReturnsFalseWithoutSend() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + var sendCalls = 0; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "no-interest" }, + sendInternalAccountMessage: (_, _, _) => + { + sendCalls++; + return null; + }, + hasGatewayInterest: (_, _) => false); + + result.ShouldBeFalse(); + sendCalls.ShouldBe(0); + } + + [Fact] + public void PublishAdvisory_MarshalFailure_ReturnsFalse() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + var sendCalls = 0; + var advisory = new CyclicAdvisory(); + advisory.Self = advisory; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + advisory, + sendInternalAccountMessage: (_, _, _) => + { + sendCalls++; + return null; + }); + + result.ShouldBeFalse(); + sendCalls.ShouldBe(0); + } + + [Fact] + public void PublishAdvisory_SendFailure_ReturnsFalse() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "send-error" }, + sendInternalAccountMessage: (_, _, _) => new InvalidOperationException("send failed")); + + result.ShouldBeFalse(); + } + + [Fact] + public void PublishAdvisory_SendSucceeds_ReturnsTrue() + { + var server = NewServer(new ServerOptions { NoSystemAccount = true }); + var account = new Account { Name = "A" }; + AddInterest(account, "$JS.EVENT.ADVISORY.API"); + + byte[]? payload = null; + + var result = server.PublishAdvisory( + account, + "$JS.EVENT.ADVISORY.API", + new { Event = "sent" }, + sendInternalAccountMessage: (_, _, bytes) => + { + payload = bytes; + return null; + }); + + result.ShouldBeTrue(); + payload.ShouldNotBeNull(); + Encoding.UTF8.GetString(payload!).ShouldContain("\"Event\":\"sent\""); + } + + private static NatsServer NewServer(ServerOptions options) + { + var (server, err) = NatsServer.NewServer(options); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + return server!; + } + + private static void AddInterest(Account account, string subject) + { + account.Sublist ??= SubscriptionIndex.NewSublistWithCache(); + var err = account.Sublist.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + }); + err.ShouldBeNull(); + } + + private sealed class CyclicAdvisory + { + public CyclicAdvisory? Self { get; set; } + } +} diff --git a/porting.db b/porting.db index bca7a4b2282cc69a2bde36c23032553040d7e196..e798f92dd354e9aefc4a96685ff70b4084d840b5 100644 GIT binary patch delta 902 zcmZY7%}*0S9LDkPl(MwYx>ZU+s4n0O0)2&2sQ7|_0xFgkMMYk=+l6i{ZL_&>M%eE(I#%tX+@lf;T_0k7 z)O*KdGPQPugTcA^7z-s}1`Cuz8Cb!l&&Rf0ddYm%O&r(5Fb3LZ=kDuY?A&BA^PJwU z8*QPFom>sQSIK2C$egW;13PSmZBPylr~oHaLKSR>YN!Dh?0}uXLoL)nJ?w(rum|=+ z1MGuFXo6;Ng9p6e13v_y1zI5pZ4iQX=zuWnhfX*E2jLJLhAub)-OvNQa1@R~AN0cj z9ETHd5(eQEL|_P_5QAYj4QF5k&cZo352J7a#$X&KU=jp~>$WQK$3tT|p_F|k))yOJ z8yg7KYu6mp#96Q|*vtkOarL{dwoFNamQy5^U7g`Ts$SK$*x3RRx`?FJ}xDNyqr-2zCfGX7jOsKd4DMU_kR?e3s%cUr(oY4 zdJ+{q^hJ$e9as55K~rOrl9v=z6ipUVyqw;&>lu)=n5Ia=OmALFYpO?7|4;dI3uPM< zIYu^z3KL>ur#zEQNi#Srr>FU0LA}Ta_$+d9K@*d1zmLzRg!DgmhQfXyo~e`yZ$$_F O^-k>7@4gYIOMe3$tOhm! delta 647 zcmWN{yH8V50KnmUZXdS~TJA+!+EUSis64A)5fL9$idyl3w#7&BQL1rLqhO4K35`Ps zLnJZrU`*52!4V0Gp2fidV~qX*?l`H7n~96R;oH82`ZLD@R_jUe^hqPB8c8wqPWRT- z_aCCaPdt_`Q~&F@;hKJL>0ce-{2=}) zmuaSjoV3i>BCLK}X3;Iy3Fi1zV9oC%Ui=-Pn#D*ohwWq7VHTz#x*?h27W#8+$Q?eHg}mjNkx9aS(@a7)LOM z6pmsXX-wc4j^hL}IEhJ|!fBkrSxn&^&LfK)@|ea2T*M_@MgcRpf~&ZO>nLJY#iFI3 mZyc6$R2m;kT`I8}vYgIx;JIHGKb1oA-