diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs new file mode 100644 index 0000000..71e6678 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs @@ -0,0 +1,44 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private string? _lastAdvisorySubject; + private byte[]? _lastAdvisoryPayload; + private DateTime _lastAdvisorySent; + + internal bool SendAdvisory(string subject, object advisory) + { + if (string.IsNullOrWhiteSpace(subject) || advisory is null) + return false; + + _mu.EnterWriteLock(); + try + { + _lastAdvisorySubject = subject; + _lastAdvisoryPayload = Encoding.UTF8.GetBytes(advisory.ToString() ?? string.Empty); + _lastAdvisorySent = DateTime.UtcNow; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool SendDeleteAdvisoryLocked() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerDeleted}.{Stream}.{Name}", new { action = "delete" }); + + internal bool SendPinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendUnpinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerUnpinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendCreateAdvisory() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerCreated}.{Stream}.{Name}", new { action = "create" }); + + internal bool SendPauseAdvisoryLocked(DateTime pauseUntil) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPause}.{Stream}.{Name}", new { pauseUntil }); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs new file mode 100644 index 0000000..75256d5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs @@ -0,0 +1,150 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private readonly HashSet _internalSubscriptions = new(StringComparer.Ordinal); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterReadLock(); + try + { + return _monitorQuitChannel?.Reader; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + var channel = _monitorQuitChannel; + if (channel is null) + return; + + channel.Writer.TryWrite(true); + channel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool CheckQueueInterest(string? queue = null) + { + _mu.EnterReadLock(); + try + { + if (_closed) + return false; + + if (_internalSubscriptions.Count > 0) + return true; + + return !string.IsNullOrWhiteSpace(queue) && _internalSubscriptions.Contains(queue); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearNode() => ClearRaftNode(); + + internal bool IsLeaderInternal() => IsLeader(); + + internal ConsumerInfo? HandleClusterConsumerInfoRequest() => + IsLeader() && !_closed ? GetInfo() : null; + + internal bool SubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var added = _internalSubscriptions.Add(subject); + if (added) + _updateChannel.Writer.TryWrite(true); + return added; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool Unsubscribe(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var removed = _internalSubscriptions.Remove(subject); + if (removed) + _updateChannel.Writer.TryWrite(true); + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime created) + { + _mu.EnterWriteLock(); + try + { + Created = created; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasDeliveryInterest() + { + _mu.EnterReadLock(); + try + { + if (_closed || string.IsNullOrWhiteSpace(Config.DeliverSubject)) + return false; + + return _internalSubscriptions.Contains(Config.DeliverSubject!); + } + finally + { + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index c7ccf81..198002e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -212,4 +212,88 @@ public sealed class NatsConsumerTests consumer!.SetConsumerAssignment(assignment); consumer.ConsumerAssignment().ShouldBeSameAs(assignment); } + + [Fact] + public async Task MonitorQuitC_AndSignalMonitorQuit_ShouldPublishQuitSignal() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var monitor = consumer!.MonitorQuitC(); + monitor.ShouldNotBeNull(); + monitor!.TryRead(out _).ShouldBeFalse(); + + consumer.SignalMonitorQuit(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var signal = await monitor.ReadAsync(cts.Token); + signal.ShouldBeTrue(); + } + + [Fact] + public void SubscribeInternal_Unsubscribe_AndHasDeliveryInterest_ShouldTrackState() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.HasDeliveryInterest().ShouldBeFalse(); + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.CheckQueueInterest("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + consumer.Unsubscribe("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeFalse(); + } + + [Fact] + public void AdvisoryHelpers_AndCreatedTime_ShouldBehave() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.SendCreateAdvisory().ShouldBeTrue(); + consumer.SendDeleteAdvisoryLocked().ShouldBeTrue(); + consumer.SendPinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendPauseAdvisoryLocked(DateTime.UtcNow.AddMinutes(1)).ShouldBeTrue(); + + var created = DateTime.UtcNow.AddHours(-1); + consumer.SetCreatedTime(created); + consumer.CreatedTime().ShouldBe(created); + } + + [Fact] + public void HandleClusterConsumerInfoRequest_WhenLeader_ReturnsInfo() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.HandleClusterConsumerInfoRequest().ShouldBeNull(); + consumer.SetLeader(true, 1); + consumer.IsLeaderInternal().ShouldBeTrue(); + consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull(); + consumer.ClearNode(); + } } diff --git a/porting.db b/porting.db index 94b6836..e3a156e 100644 Binary files a/porting.db and b/porting.db differ