From 804bc89246ed504c8c4fa771cbc5e29edd4ffe23 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:14:58 -0500 Subject: [PATCH] task3: implement batch38 group B lifecycle and advisories --- .../JetStream/NatsConsumer.Advisories.cs | 44 +++++ .../JetStream/NatsConsumer.Lifecycle.cs | 150 ++++++++++++++++++ .../JetStream/NatsConsumerTests.cs | 84 ++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 278 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs new file mode 100644 index 0000000..71e6678 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs @@ -0,0 +1,44 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private string? _lastAdvisorySubject; + private byte[]? _lastAdvisoryPayload; + private DateTime _lastAdvisorySent; + + internal bool SendAdvisory(string subject, object advisory) + { + if (string.IsNullOrWhiteSpace(subject) || advisory is null) + return false; + + _mu.EnterWriteLock(); + try + { + _lastAdvisorySubject = subject; + _lastAdvisoryPayload = Encoding.UTF8.GetBytes(advisory.ToString() ?? string.Empty); + _lastAdvisorySent = DateTime.UtcNow; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool SendDeleteAdvisoryLocked() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerDeleted}.{Stream}.{Name}", new { action = "delete" }); + + internal bool SendPinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendUnpinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerUnpinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendCreateAdvisory() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerCreated}.{Stream}.{Name}", new { action = "create" }); + + internal bool SendPauseAdvisoryLocked(DateTime pauseUntil) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPause}.{Stream}.{Name}", new { pauseUntil }); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs new file mode 100644 index 0000000..75256d5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs @@ -0,0 +1,150 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private readonly HashSet _internalSubscriptions = new(StringComparer.Ordinal); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterReadLock(); + try + { + return _monitorQuitChannel?.Reader; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + var channel = _monitorQuitChannel; + if (channel is null) + return; + + channel.Writer.TryWrite(true); + channel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool CheckQueueInterest(string? queue = null) + { + _mu.EnterReadLock(); + try + { + if (_closed) + return false; + + if (_internalSubscriptions.Count > 0) + return true; + + return !string.IsNullOrWhiteSpace(queue) && _internalSubscriptions.Contains(queue); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearNode() => ClearRaftNode(); + + internal bool IsLeaderInternal() => IsLeader(); + + internal ConsumerInfo? HandleClusterConsumerInfoRequest() => + IsLeader() && !_closed ? GetInfo() : null; + + internal bool SubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var added = _internalSubscriptions.Add(subject); + if (added) + _updateChannel.Writer.TryWrite(true); + return added; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool Unsubscribe(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var removed = _internalSubscriptions.Remove(subject); + if (removed) + _updateChannel.Writer.TryWrite(true); + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime created) + { + _mu.EnterWriteLock(); + try + { + Created = created; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasDeliveryInterest() + { + _mu.EnterReadLock(); + try + { + if (_closed || string.IsNullOrWhiteSpace(Config.DeliverSubject)) + return false; + + return _internalSubscriptions.Contains(Config.DeliverSubject!); + } + finally + { + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index c7ccf81..198002e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -212,4 +212,88 @@ public sealed class NatsConsumerTests consumer!.SetConsumerAssignment(assignment); consumer.ConsumerAssignment().ShouldBeSameAs(assignment); } + + [Fact] + public async Task MonitorQuitC_AndSignalMonitorQuit_ShouldPublishQuitSignal() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var monitor = consumer!.MonitorQuitC(); + monitor.ShouldNotBeNull(); + monitor!.TryRead(out _).ShouldBeFalse(); + + consumer.SignalMonitorQuit(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var signal = await monitor.ReadAsync(cts.Token); + signal.ShouldBeTrue(); + } + + [Fact] + public void SubscribeInternal_Unsubscribe_AndHasDeliveryInterest_ShouldTrackState() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.HasDeliveryInterest().ShouldBeFalse(); + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.CheckQueueInterest("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + consumer.Unsubscribe("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeFalse(); + } + + [Fact] + public void AdvisoryHelpers_AndCreatedTime_ShouldBehave() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.SendCreateAdvisory().ShouldBeTrue(); + consumer.SendDeleteAdvisoryLocked().ShouldBeTrue(); + consumer.SendPinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendPauseAdvisoryLocked(DateTime.UtcNow.AddMinutes(1)).ShouldBeTrue(); + + var created = DateTime.UtcNow.AddHours(-1); + consumer.SetCreatedTime(created); + consumer.CreatedTime().ShouldBe(created); + } + + [Fact] + public void HandleClusterConsumerInfoRequest_WhenLeader_ReturnsInfo() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.HandleClusterConsumerInfoRequest().ShouldBeNull(); + consumer.SetLeader(true, 1); + consumer.IsLeaderInternal().ShouldBeTrue(); + consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull(); + consumer.ClearNode(); + } } diff --git a/porting.db b/porting.db index 94b683634876f2f4a07e0dade43ef79afc4041ad..e3a156e7a4fe0befaf58a32653e0b6a0c55d5c09 100644 GIT binary patch delta 3184 zcmZ`)3s6+&6~6z!ci(sKT^4!YML;Ax=6(=fD3n&s}{$0qsk2-9+eOz{f zv(eer#j-atCMx6i(I7fxtl;WAj$@3;9@fE_=wa>d8a%9x`4T;>)hn*W9S0*jr4Z%A z|Db9uFD+;qVCbJH6XxcStUUV=m7|@~0vh)ua4`11haF+;Z4awqY{V&hSr+Zs{ zcTRR%chB*;tRE~%)6p%E1|Gc=^4CWc;B(NTu_ zRn*APNfkL6>QGSwLk%jbW9YDo91NAI=maJ%@>GArCM-!sRUUNF962-S^hsWuq>f$N3zG*o8l&vRAVIF#QX0>>VigCX#+ z@F(d1iQYbR%`6O63W@II?!Sop;JzeR6--135pbYV$b$a!nrN6GGZ^4XrLfhj!rn6B zIpf)Tl>##hGf}x$DMh{!`Clw@VwJc@|D||RY{6Hx$=XP*Rg3xi{5}2_OpG)HE0I=w z4Z%_X{taZ?0xU4IC|Q&V8#Z8Zee3z%@Wn%EKm5(@E)2rwF?2qV!WA5fLv&C+=g5aw z9!Pd*U6e$WtB^?Ci(FP$fXn6zbZv44xq@9Gu25H)E8G=vMMe|x{L8F!UvrW}EVMKU zKQn(~PC*}XUz>`I#fFpm3%ct%vo=>Vjv@NJu-^}qnW6V%xeLzkHL1OVph;P6*6Ng? zWK#YW6vW!qDVkt1(74OvJNk)y76zl4&vDgYgwUIE-1`1O@lE*{3?9-fr~b!HIo9nC zUYP@%@%sTBDz&8Bb1@2nEmLwOEHs6MxrYX4{$=Ljohf-U-&f*l18X*$SLT#l3|*Jm zywvk9PRXfWgWct7h4g821ZX~$^I_jXAz?YqV4x<&Q}l&T<<51!f?LZ;JpNO+r&;@%x_y-En;q3PILPMR=4YU*HN~LJNlBK{JImEHE`AZ}IXkO~@DF(rx+3@*vb@E%{hBK+fkrnJ(Orr=avlne=~+2^eGWopLhXb`61thtC_X0+@ zpYz+@@B8oL+wJasG0?5_T!DQ;(~K@n(`G5L=VvTrCtho*E@OLs_)?koQW>Xg+Vob9GalWsuYkT|lwpM*9Rt&ODTzHro zWW^#^+28Sbe;QwU&6#tX(%#-m1u z(PA`l=~WH03Wi5+upi*!`Jwlw>7%WAUd6C@o8|M5Iq~!SkGv;1 zJ6ITu1-bJ*Uc)Q+OdP!--q7_v&B5LO-kRs7M?{$Gp?_&fOw47?IJqchc{R7h&$#a7 zeHFw%6B7-#I}@?fk2o|cI#|zSuFuUk!{wTIUQBg1+b>ubEXQ0Pbt9+4pNGpm_CB8& z)}L7X;vU@E%*T24>-k?MJibwtX~Ukq-%aujiW;)?kfNF_tyff)r3V$w%hCgiW@Krd zqQWe#RWv?JYZOJZ)D#I!;8|)=eZKQ*MZWVYMap@Bns}u`-@0CrZ(XOzw_c&hw_dKu zw_fJ`wS(WAh^me+KQDTe9~3xk2jA5b<9h5DdM}TKPny>=<0YPz<}D06%O}nAe%dH% zqX)0|bESZit`t-BOccV+u~bV4ApgrB#ZtPTldIUTNiVJakIl zjb~R|vyk=%>{NHyC_bR^huLpfk#@kjZ4X)l=0784;U9V;MNuyPqodHibT5GWDdSnU?zZMQ-Ws@~MJ8*Dlqorgk zIo{Al5kJg6!{A9?t4B+sle~Q=`Lk?B=*;&0);Q6X^un|uvc+!7Ag7kSs- z*QK2yd?_aOU5|1j%l^w?nq7net#t9;FG`SkL#*J5-qB2gq%QkwaPV)J<*qvh9(h+L{M6O% zc6_u?7U9`<Av-)t)lPht+?O4sN-ooIA>6n z_`zET)%N_uUTWpyl|h-G%>a@>aiBt=BB04YQ-G!dO#>Hq!9?*QCdx6S<762^-ssOqV=zgF|pemq6K;HoRCQvodVxVsUEdg2z z^lhLTpjx11K+Azv0M!B21FZyF1+*Hd0jLqE31|(_TA+164*)#~v>xaopbbD9fi?kc Z25JV{0@MPu6(|YxFi