task3: implement batch38 group B lifecycle and advisories

This commit is contained in:
Joseph Doherty
2026-03-01 00:14:58 -05:00
parent fce6bd7dca
commit 804bc89246
4 changed files with 278 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private string? _lastAdvisorySubject;
private byte[]? _lastAdvisoryPayload;
private DateTime _lastAdvisorySent;
internal bool SendAdvisory(string subject, object advisory)
{
if (string.IsNullOrWhiteSpace(subject) || advisory is null)
return false;
_mu.EnterWriteLock();
try
{
_lastAdvisorySubject = subject;
_lastAdvisoryPayload = Encoding.UTF8.GetBytes(advisory.ToString() ?? string.Empty);
_lastAdvisorySent = DateTime.UtcNow;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool SendDeleteAdvisoryLocked() =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerDeleted}.{Stream}.{Name}", new { action = "delete" });
internal bool SendPinnedAdvisoryLocked(string pinId) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPinned}.{Stream}.{Name}", new { pin = pinId });
internal bool SendUnpinnedAdvisoryLocked(string pinId) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerUnpinned}.{Stream}.{Name}", new { pin = pinId });
internal bool SendCreateAdvisory() =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerCreated}.{Stream}.{Name}", new { action = "create" });
internal bool SendPauseAdvisoryLocked(DateTime pauseUntil) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPause}.{Stream}.{Name}", new { pauseUntil });
}

View File

@@ -0,0 +1,150 @@
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private readonly HashSet<string> _internalSubscriptions = new(StringComparer.Ordinal);
private readonly Channel<bool> _updateChannel = Channel.CreateBounded<bool>(4);
private Channel<bool>? _monitorQuitChannel = Channel.CreateBounded<bool>(1);
internal ChannelReader<bool>? MonitorQuitC()
{
_mu.EnterReadLock();
try
{
return _monitorQuitChannel?.Reader;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SignalMonitorQuit()
{
_mu.EnterWriteLock();
try
{
var channel = _monitorQuitChannel;
if (channel is null)
return;
channel.Writer.TryWrite(true);
channel.Writer.TryComplete();
_monitorQuitChannel = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool> UpdateC() => _updateChannel.Reader;
internal bool CheckQueueInterest(string? queue = null)
{
_mu.EnterReadLock();
try
{
if (_closed)
return false;
if (_internalSubscriptions.Count > 0)
return true;
return !string.IsNullOrWhiteSpace(queue) && _internalSubscriptions.Contains(queue);
}
finally
{
_mu.ExitReadLock();
}
}
internal void ClearNode() => ClearRaftNode();
internal bool IsLeaderInternal() => IsLeader();
internal ConsumerInfo? HandleClusterConsumerInfoRequest() =>
IsLeader() && !_closed ? GetInfo() : null;
internal bool SubscribeInternal(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return false;
_mu.EnterWriteLock();
try
{
var added = _internalSubscriptions.Add(subject);
if (added)
_updateChannel.Writer.TryWrite(true);
return added;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool Unsubscribe(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return false;
_mu.EnterWriteLock();
try
{
var removed = _internalSubscriptions.Remove(subject);
if (removed)
_updateChannel.Writer.TryWrite(true);
return removed;
}
finally
{
_mu.ExitWriteLock();
}
}
internal DateTime CreatedTime()
{
_mu.EnterReadLock();
try
{
return Created;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetCreatedTime(DateTime created)
{
_mu.EnterWriteLock();
try
{
Created = created;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool HasDeliveryInterest()
{
_mu.EnterReadLock();
try
{
if (_closed || string.IsNullOrWhiteSpace(Config.DeliverSubject))
return false;
return _internalSubscriptions.Contains(Config.DeliverSubject!);
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -212,4 +212,88 @@ public sealed class NatsConsumerTests
consumer!.SetConsumerAssignment(assignment);
consumer.ConsumerAssignment().ShouldBeSameAs(assignment);
}
[Fact]
public async Task MonitorQuitC_AndSignalMonitorQuit_ShouldPublishQuitSignal()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null);
consumer.ShouldNotBeNull();
var monitor = consumer!.MonitorQuitC();
monitor.ShouldNotBeNull();
monitor!.TryRead(out _).ShouldBeFalse();
consumer.SignalMonitorQuit();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
var signal = await monitor.ReadAsync(cts.Token);
signal.ShouldBeTrue();
}
[Fact]
public void SubscribeInternal_Unsubscribe_AndHasDeliveryInterest_ShouldTrackState()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var consumer = NatsConsumer.Create(
stream!,
new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" },
ConsumerAction.Create,
null);
consumer.ShouldNotBeNull();
consumer!.HasDeliveryInterest().ShouldBeFalse();
consumer.SubscribeInternal("deliver.foo").ShouldBeTrue();
consumer.CheckQueueInterest("deliver.foo").ShouldBeTrue();
consumer.HasDeliveryInterest().ShouldBeTrue();
consumer.Unsubscribe("deliver.foo").ShouldBeTrue();
consumer.HasDeliveryInterest().ShouldBeFalse();
}
[Fact]
public void AdvisoryHelpers_AndCreatedTime_ShouldBehave()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null);
consumer.ShouldNotBeNull();
consumer!.SendCreateAdvisory().ShouldBeTrue();
consumer.SendDeleteAdvisoryLocked().ShouldBeTrue();
consumer.SendPinnedAdvisoryLocked("pin-1").ShouldBeTrue();
consumer.SendUnpinnedAdvisoryLocked("pin-1").ShouldBeTrue();
consumer.SendPauseAdvisoryLocked(DateTime.UtcNow.AddMinutes(1)).ShouldBeTrue();
var created = DateTime.UtcNow.AddHours(-1);
consumer.SetCreatedTime(created);
consumer.CreatedTime().ShouldBe(created);
}
[Fact]
public void HandleClusterConsumerInfoRequest_WhenLeader_ReturnsInfo()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null);
consumer.ShouldNotBeNull();
consumer!.HandleClusterConsumerInfoRequest().ShouldBeNull();
consumer.SetLeader(true, 1);
consumer.IsLeaderInternal().ShouldBeTrue();
consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull();
consumer.ClearNode();
}
}