task3(batch39): implement dispatch core and ack-floor processing

This commit is contained in:
Joseph Doherty
2026-03-01 01:16:27 -05:00
parent 0760c550b4
commit f537612d7c
4 changed files with 478 additions and 0 deletions

View File

@@ -0,0 +1,313 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal (JsPubMsg? Message, ulong DeliveryCount, Exception? Error) GetNextMsg()
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (null, 0, new InvalidOperationException("consumer not valid"));
if (_state.Pending is { Count: > 0 })
{
var sequence = _state.Pending.Keys.Min();
var deliveryCount = IncDeliveryCount(sequence);
var message = new JsPubMsg
{
Subject = Config.DeliverSubject ?? string.Empty,
Reply = AckReply(sequence, _state.Delivered.Consumer + 1, deliveryCount, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), NumPending()),
Msg = [],
};
return (message, deliveryCount, null);
}
return (null, 0, null);
}
finally
{
_mu.ExitWriteLock();
}
}
internal (int Expired, int Waiting, int BatchRequestsPending, DateTime FirstExpiration) ProcessWaiting(bool endOfStream)
{
_mu.EnterWriteLock();
try
{
var firstExpiration = DateTime.MinValue;
if (_waiting is null || _waiting.IsEmpty())
return (0, 0, 0, firstExpiration);
var expired = 0;
var batchRequestsPending = 0;
var now = DateTime.UtcNow;
var toRemove = new List<WaitingRequest>();
foreach (var waitingRequest in _waiting.Snapshot())
{
var isExpired = waitingRequest.Expires is DateTime expiresAt && now >= expiresAt;
if ((endOfStream && waitingRequest.NoWait == 1) || isExpired)
{
toRemove.Add(waitingRequest);
expired++;
continue;
}
batchRequestsPending += Math.Max(0, waitingRequest.N);
if (waitingRequest.Expires is DateTime wrExpires &&
(firstExpiration == DateTime.MinValue || wrExpires < firstExpiration))
{
firstExpiration = wrExpires;
}
}
foreach (var waitingRequest in toRemove)
_waiting.Remove(null, waitingRequest);
return (expired, _waiting.Len, batchRequestsPending, firstExpiration);
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool CheckWaitingForInterest()
{
var (_, waiting, _, _) = ProcessWaiting(endOfStream: true);
return waiting > 0;
}
internal (TimeSpan Duration, Timer? Timer) HbTimer()
{
if (Config.Heartbeat <= TimeSpan.Zero)
return (TimeSpan.Zero, null);
return (Config.Heartbeat, new Timer(static _ => { }, null, Config.Heartbeat, Timeout.InfiniteTimeSpan));
}
internal void CheckAckFloor()
{
_mu.EnterWriteLock();
try
{
if (_closed || _state.Pending is not { Count: > 0 })
return;
var minPending = _state.Pending.OrderBy(static pair => pair.Key).First();
var pendingStream = minPending.Key;
var pendingConsumer = minPending.Value?.Sequence ?? pendingStream;
var desiredStreamFloor = pendingStream > 0 ? pendingStream - 1 : 0;
var desiredConsumerFloor = pendingConsumer > 0 ? pendingConsumer - 1 : 0;
if (_state.AckFloor.Stream < desiredStreamFloor)
_state.AckFloor.Stream = desiredStreamFloor;
if (_state.AckFloor.Consumer < desiredConsumerFloor)
_state.AckFloor.Consumer = desiredConsumerFloor;
}
finally
{
_mu.ExitWriteLock();
}
}
internal int ProcessInboundAcks(CancellationToken cancellationToken)
{
var processed = 0;
while (!cancellationToken.IsCancellationRequested)
{
JsAckMsg? ack;
_mu.EnterWriteLock();
try
{
if (_ackQueue.Count == 0)
break;
ack = _ackQueue.Dequeue();
}
finally
{
_mu.ExitWriteLock();
}
ProcessAck(ack.Subject, ack.Reply, ack.HeaderBytes, ack.Msg);
processed++;
}
return processed;
}
internal int ProcessInboundNextMsgReqs(CancellationToken cancellationToken)
{
var processed = 0;
while (!cancellationToken.IsCancellationRequested)
{
NextMsgReq? request;
_mu.EnterWriteLock();
try
{
if (_nextMsgReqs is null || _nextMsgReqs.Count == 0)
break;
request = _nextMsgReqs.Dequeue();
}
finally
{
_mu.ExitWriteLock();
}
_ = ProcessNextMsgRequest(request.Reply, request.Message);
request.ReturnToPool();
processed++;
}
return processed;
}
internal void SuppressDeletion()
{
_mu.EnterWriteLock();
try
{
if (_closed || _deleteTimer is null || _deleteThreshold <= TimeSpan.Zero)
return;
_deleteTimer.Change(_deleteThreshold, Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
internal int LoopAndGatherMsgs(int maxIterations, CancellationToken cancellationToken)
{
var delivered = 0;
for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++)
{
var (message, _, error) = GetNextMsg();
if (error is not null || message is null)
break;
delivered++;
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, (ulong)delivered);
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, (ulong)delivered);
var expired = ProcessWaiting(endOfStream: false);
if (expired.Waiting == 0)
break;
}
return delivered;
}
internal string SendIdleHeartbeat(string subject)
{
var streamSequence = _state.Delivered.Stream;
var consumerSequence = _state.Delivered.Consumer;
var heartbeat = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: {consumerSequence}\r\nNats-Last-Stream: {streamSequence}\r\n\r\n";
_ = SendAdvisory(subject, heartbeat);
return heartbeat;
}
internal string AckReply(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, long timestamp, ulong pending) =>
$"$JS.ACK.{deliveryCount}.{streamSequence}.{deliverySequence}.{timestamp}.{pending}";
internal void SetMaxPendingBytes(int limit)
{
_mu.EnterWriteLock();
try
{
_maxPendingBytesLimit = Math.Max(0, limit);
_maxPendingBytesThreshold = Math.Max(1, _maxPendingBytesLimit / 16);
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) CheckNumPending()
{
_mu.EnterWriteLock();
try
{
if (_npc < 0)
_npc = 0;
var (pending, floor, error) = CalculateNumPending();
if (error is not null)
return (0, error);
_npc = (long)pending;
_npf = floor;
return (NumPending(), null);
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong NumPending() => _npc < 0 ? 0UL : (ulong)_npc;
internal void CheckNumPendingOnEOF()
{
_mu.EnterWriteLock();
try
{
if (_npc < 0)
_npc = 0;
if (_state.Delivered.Stream <= _state.AckFloor.Stream)
{
_npc = 0;
_npf = _state.Delivered.Stream;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) StreamNumPendingLocked()
{
_mu.EnterWriteLock();
try
{
return StreamNumPending();
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) StreamNumPending()
{
var (pending, floor, error) = CalculateNumPending();
if (error is not null)
return (0, error);
_npc = (long)pending;
_npf = floor;
return (NumPending(), null);
}
internal (ulong Pending, ulong Floor, Exception? Error) CalculateNumPending()
{
if (_closed)
return (0, 0, null);
var delivered = _state.Delivered.Stream;
var acked = _state.AckFloor.Stream;
if (delivered <= acked)
return (0, delivered, null);
return (delivered - acked, delivered, null);
}
}

View File

@@ -58,6 +58,10 @@ internal sealed partial class NatsConsumer : IDisposable
private string _currentPinId = string.Empty;
private DateTime _pinnedTs;
private Dictionary<ulong, ulong>? _rdc;
private long _npc;
private ulong _npf;
private int _maxPendingBytesLimit;
private int _maxPendingBytesThreshold;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;