feat(batch10): task2 implement jetstream advisory publish

This commit is contained in:
Joseph Doherty
2026-02-28 12:51:02 -05:00
parent 03a3ff3910
commit 4b2875141c
4 changed files with 236 additions and 4 deletions

View File

@@ -0,0 +1,86 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal bool PublishAdvisory(Account? acc, string subject, object advisory) =>
PublishAdvisory(acc, subject, advisory, SendInternalAccountMsg);
internal bool PublishAdvisory(
Account? acc,
string subject,
object advisory,
Func<Account, string, byte[], Exception?> sendInternalAccountMessage,
Func<Account, string, bool>? hasGatewayInterest = null)
{
ArgumentNullException.ThrowIfNull(sendInternalAccountMessage);
var account = acc ?? SystemAccount();
if (account is null)
{
return false;
}
var sublist = account.Sublist;
var gatewayInterestCheck = hasGatewayInterest ?? HasGatewayInterest;
var hasLocalInterest = sublist != null && sublist.HasInterest(subject);
if (!hasLocalInterest && !gatewayInterestCheck(account, subject))
{
return false;
}
byte[] payload;
try
{
payload = JsonSerializer.SerializeToUtf8Bytes(advisory);
}
catch (Exception ex)
{
Warnf("Advisory could not be serialized for account {0}: {1}", account.Name, ex);
return false;
}
Exception? err;
try
{
err = sendInternalAccountMessage(account, subject, payload);
}
catch (Exception ex)
{
err = ex;
}
if (err != null)
{
Warnf("Advisory could not be sent for account {0}: {1}", account.Name, err);
return false;
}
return true;
}
internal Exception? SendInternalAccountMsg(Account account, string subject, byte[] message)
{
try
{
var sendQueue = account.GetSendQueue() ?? NewSendQueue(account);
SendQueue.Send(sendQueue, subject, string.Empty, [], message);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal bool HasGatewayInterest(Account account, string subject)
{
_ = account;
_ = subject;
return false;
}
}

View File

@@ -0,0 +1,146 @@
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server.Tests;
public sealed class NatsServerJetStreamEventsTests
{
[Fact]
public void PublishAdvisory_NullAccount_UsesSystemAccount()
{
var server = NewServer(new ServerOptions());
server.SetDefaultSystemAccount().ShouldBeNull();
var systemAccount = server.SystemAccount();
systemAccount.ShouldNotBeNull();
const string subject = "$JS.EVENT.ADVISORY.API";
AddInterest(systemAccount!, subject);
Account? capturedAccount = null;
var result = server.PublishAdvisory(
acc: null,
subject,
new { Event = "ok" },
sendInternalAccountMessage: (account, _, _) =>
{
capturedAccount = account;
return null;
});
result.ShouldBeTrue();
capturedAccount.ShouldBe(systemAccount);
}
[Fact]
public void PublishAdvisory_NoInterest_ReturnsFalseWithoutSend()
{
var server = NewServer(new ServerOptions { NoSystemAccount = true });
var account = new Account { Name = "A" };
var sendCalls = 0;
var result = server.PublishAdvisory(
account,
"$JS.EVENT.ADVISORY.API",
new { Event = "no-interest" },
sendInternalAccountMessage: (_, _, _) =>
{
sendCalls++;
return null;
},
hasGatewayInterest: (_, _) => false);
result.ShouldBeFalse();
sendCalls.ShouldBe(0);
}
[Fact]
public void PublishAdvisory_MarshalFailure_ReturnsFalse()
{
var server = NewServer(new ServerOptions { NoSystemAccount = true });
var account = new Account { Name = "A" };
AddInterest(account, "$JS.EVENT.ADVISORY.API");
var sendCalls = 0;
var advisory = new CyclicAdvisory();
advisory.Self = advisory;
var result = server.PublishAdvisory(
account,
"$JS.EVENT.ADVISORY.API",
advisory,
sendInternalAccountMessage: (_, _, _) =>
{
sendCalls++;
return null;
});
result.ShouldBeFalse();
sendCalls.ShouldBe(0);
}
[Fact]
public void PublishAdvisory_SendFailure_ReturnsFalse()
{
var server = NewServer(new ServerOptions { NoSystemAccount = true });
var account = new Account { Name = "A" };
AddInterest(account, "$JS.EVENT.ADVISORY.API");
var result = server.PublishAdvisory(
account,
"$JS.EVENT.ADVISORY.API",
new { Event = "send-error" },
sendInternalAccountMessage: (_, _, _) => new InvalidOperationException("send failed"));
result.ShouldBeFalse();
}
[Fact]
public void PublishAdvisory_SendSucceeds_ReturnsTrue()
{
var server = NewServer(new ServerOptions { NoSystemAccount = true });
var account = new Account { Name = "A" };
AddInterest(account, "$JS.EVENT.ADVISORY.API");
byte[]? payload = null;
var result = server.PublishAdvisory(
account,
"$JS.EVENT.ADVISORY.API",
new { Event = "sent" },
sendInternalAccountMessage: (_, _, bytes) =>
{
payload = bytes;
return null;
});
result.ShouldBeTrue();
payload.ShouldNotBeNull();
Encoding.UTF8.GetString(payload!).ShouldContain("\"Event\":\"sent\"");
}
private static NatsServer NewServer(ServerOptions options)
{
var (server, err) = NatsServer.NewServer(options);
err.ShouldBeNull();
server.ShouldNotBeNull();
return server!;
}
private static void AddInterest(Account account, string subject)
{
account.Sublist ??= SubscriptionIndex.NewSublistWithCache();
var err = account.Sublist.Insert(new Subscription
{
Subject = Encoding.ASCII.GetBytes(subject),
});
err.ShouldBeNull();
}
private sealed class CyclicAdvisory
{
public CyclicAdvisory? Self { get; set; }
}
}

Binary file not shown.

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-02-28 17:44:45 UTC
Generated: 2026-02-28 17:51:02 UTC
## Modules (12 total)
@@ -13,10 +13,10 @@ Generated: 2026-02-28 17:44:45 UTC
| Status | Count |
|--------|-------|
| complete | 14 |
| deferred | 1969 |
| deferred | 1968 |
| n_a | 24 |
| stub | 1 |
| verified | 1665 |
| verified | 1666 |
## Unit Tests (3257 total)
@@ -35,4 +35,4 @@ Generated: 2026-02-28 17:44:45 UTC
## Overall Progress
**3016/6942 items complete (43.4%)**
**3017/6942 items complete (43.5%)**