task4(batch39): add delivery and redelivery dispatch behavior

This commit is contained in:
Joseph Doherty
2026-03-01 01:21:16 -05:00
parent f537612d7c
commit 519ee6ad49
6 changed files with 533 additions and 0 deletions

View File

@@ -0,0 +1,161 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static void ConvertToHeadersOnly(JsPubMsg message)
{
ArgumentNullException.ThrowIfNull(message);
var payloadSize = message.Msg?.Length ?? 0;
var builder = new StringBuilder();
builder.Append("NATS/1.0\r\n");
builder.Append("Nats-Msg-Size: ");
builder.Append(payloadSize);
builder.Append("\r\n\r\n");
message.Hdr = Encoding.ASCII.GetBytes(builder.ToString());
message.Msg = [];
}
internal void DeliverMsg(string deliverySubject, string ackReply, JsPubMsg message, ulong deliveryCount, RetentionPolicy retentionPolicy)
{
ArgumentNullException.ThrowIfNull(message);
_mu.EnterWriteLock();
try
{
var nextDeliverySequence = _state.Delivered.Consumer + 1;
var streamSequence = ParseAckReplyNum(ackReply.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault() ?? string.Empty);
var streamSeq = streamSequence > 0 ? (ulong)streamSequence : _state.Delivered.Stream + 1;
_state.Delivered.Consumer = nextDeliverySequence;
_state.Delivered.Stream = streamSeq;
if (Config.AckPolicy is AckPolicy.AckExplicit or AckPolicy.AckAll)
TrackPending(streamSeq, nextDeliverySequence);
else
_state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq };
message.Subject = deliverySubject;
message.Reply = ackReply;
Interlocked.Add(ref _pendingBytes, message.Size());
if (NeedFlowControl(message.Size()))
SendFlowControl();
if (retentionPolicy != RetentionPolicy.LimitsPolicy && Config.AckPolicy == AckPolicy.AckNone)
_state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq };
_ = deliveryCount;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ReplicateDeliveries() =>
Config.AckPolicy != AckPolicy.AckNone && Config.FlowControl == false && IsLeader();
internal bool NeedFlowControl(int size)
{
if (_flowControlWindow <= 0)
return false;
if (string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes > _flowControlWindow / 2)
return true;
if (!string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes - _flowControlSentBytes >= _flowControlWindow)
_flowControlSentBytes += size;
return false;
}
internal void ProcessFlowControl(string subject)
{
_mu.EnterWriteLock();
try
{
if (!string.Equals(subject, _flowControlReplyId, StringComparison.Ordinal))
return;
if (_flowControlWindow > 0 && _flowControlWindow < _maxPendingBytesLimit)
_flowControlWindow = Math.Min(_flowControlWindow * 2, Math.Max(1, _maxPendingBytesLimit));
_pendingBytes = Math.Max(0, _pendingBytes - _flowControlSentBytes);
_flowControlSentBytes = 0;
_flowControlReplyId = string.Empty;
SignalNewMessages();
}
finally
{
_mu.ExitWriteLock();
}
}
internal string FcReply() =>
$"$JS.FC.{Stream}.{Name}.{Random.Shared.Next(1000, 9999)}";
internal bool SendFlowControl()
{
_mu.EnterWriteLock();
try
{
if (!IsPushMode())
return false;
var reply = FcReply();
_flowControlReplyId = reply;
_flowControlSentBytes = (int)Math.Max(0, _pendingBytes);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void TrackPending(ulong streamSequence, ulong deliverySequence)
{
_state.Pending ??= [];
if (_state.Pending.TryGetValue(streamSequence, out var pending))
{
pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_state.Pending[streamSequence] = pending;
}
else
{
_state.Pending[streamSequence] = new Pending
{
Sequence = deliverySequence,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
}
}
internal void CreditWaitingRequest(string reply)
{
_mu.EnterWriteLock();
try
{
if (_waiting is null)
return;
foreach (var waitingRequest in _waiting.Snapshot())
{
if (!string.Equals(waitingRequest.Reply, reply, StringComparison.Ordinal))
continue;
waitingRequest.N++;
waitingRequest.D = Math.Max(0, waitingRequest.D - 1);
return;
}
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -0,0 +1,156 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal void DidNotDeliver(ulong sequence, string subject)
{
_mu.EnterWriteLock();
try
{
DecDeliveryCount(sequence);
if (IsPushMode())
_hasLocalDeliveryInterest = false;
else
CreditWaitingRequest(subject);
if (_state.Pending is { } pending && pending.ContainsKey(sequence) && !OnRedeliverQueue(sequence))
{
AddToRedeliverQueue(sequence);
if (_waiting is { } waiting && !waiting.IsEmpty())
SignalNewMessages();
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal void AddToRedeliverQueue(params ulong[] sequences)
{
_mu.EnterWriteLock();
try
{
foreach (var sequence in sequences)
{
_redeliveryQueue.Enqueue(sequence);
_redeliveryIndex.Add(sequence);
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool HasRedeliveries() => _redeliveryQueue.Count > 0;
internal ulong GetNextToRedeliver()
{
_mu.EnterWriteLock();
try
{
if (_redeliveryQueue.Count == 0)
return 0;
var sequence = _redeliveryQueue.Dequeue();
_redeliveryIndex.Remove(sequence);
return sequence;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool OnRedeliverQueue(ulong sequence) => _redeliveryIndex.Contains(sequence);
internal bool RemoveFromRedeliverQueue(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (!_redeliveryIndex.Remove(sequence))
return false;
if (_redeliveryQueue.Count == 0)
return true;
var retained = _redeliveryQueue.Where(s => s != sequence).ToArray();
_redeliveryQueue.Clear();
foreach (var s in retained)
_redeliveryQueue.Enqueue(s);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal int CheckPending()
{
_mu.EnterWriteLock();
try
{
if (_state.Pending is not { Count: > 0 })
return 0;
var expired = 0;
var cutoff = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)Math.Max(1, Config.AckWait.TotalMilliseconds);
var toRedeliver = new List<ulong>();
foreach (var (sequence, pending) in _state.Pending.ToArray())
{
if (pending.Timestamp < cutoff)
{
toRedeliver.Add(sequence);
expired++;
}
}
if (toRedeliver.Count > 0)
{
toRedeliver.Sort();
AddToRedeliverQueue(toRedeliver.ToArray());
SignalNewMessages();
}
return expired;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong SeqFromReply(string reply)
{
var (_, deliverySequence, _) = AckReplyInfo(reply);
return deliverySequence;
}
internal ulong StreamSeqFromReply(string reply)
{
var (streamSequence, _, _) = AckReplyInfo(reply);
return streamSequence;
}
internal static long ParseAckReplyNum(string token)
{
if (string.IsNullOrWhiteSpace(token))
return -1;
long number = 0;
foreach (var character in token)
{
if (!char.IsAsciiDigit(character))
return -1;
number = (number * 10) + (character - '0');
}
return number;
}
}

View File

@@ -38,6 +38,8 @@ internal sealed partial class NatsConsumer
? TimeSpan.FromMilliseconds(1)
: _deleteThreshold;
_deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan);
if (due <= TimeSpan.FromMilliseconds(1))
DeleteNotActive();
return true;
}

View File

@@ -62,6 +62,12 @@ internal sealed partial class NatsConsumer : IDisposable
private ulong _npf;
private int _maxPendingBytesLimit;
private int _maxPendingBytesThreshold;
private long _pendingBytes;
private int _flowControlWindow;
private int _flowControlSentBytes;
private string _flowControlReplyId = string.Empty;
private readonly Queue<ulong> _redeliveryQueue = new();
private readonly HashSet<ulong> _redeliveryIndex = new();
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;

View File

@@ -0,0 +1,208 @@
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed partial class JetStreamEngineTests
{
[Fact] // T:1469
public void JetStreamAddStreamDiscardNew_ShouldSucceed()
{
var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("payload") };
NatsConsumer.ConvertToHeadersOnly(msg);
msg.Hdr.ShouldNotBeNull();
msg.Msg.ShouldBeEmpty();
}
[Fact] // T:1484
public void JetStreamBasicDeliverSubject_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s");
var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("p") };
consumer.DeliverMsg("deliver.s", "$JS.ACK.1.10.1.1.0", msg, 1, RetentionPolicy.LimitsPolicy);
consumer.GetConsumerState().Delivered.Consumer.ShouldBeGreaterThan(0UL);
}
[Fact] // T:1485
public void JetStreamBasicWorkQueue_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.AddToRedeliverQueue(11, 12);
consumer.HasRedeliveries().ShouldBeTrue();
consumer.GetNextToRedeliver().ShouldBe(11UL);
}
[Fact] // T:1486
public void JetStreamWorkQueueMaxWaiting_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue();
consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeFalse();
}
[Fact] // T:1487
public void JetStreamWorkQueueWrapWaiting_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(maxWaiting: 4);
consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2}")).ShouldBeTrue();
var wr = consumer.NextWaiting(1);
wr.ShouldNotBeNull();
wr!.N.ShouldBe(1);
}
[Fact] // T:1488
public void JetStreamWorkQueueRequest_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(maxWaiting: 4);
consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3,\"max_bytes\":10}")).ShouldBeTrue();
var pending = consumer.PendingRequests();
pending["_INBOX.1"].N.ShouldBe(3);
pending["_INBOX.1"].MaxBytes.ShouldBe(10);
}
[Fact] // T:1489
public void JetStreamSubjectFiltering_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(filterSubjects: ["orders.*"]);
consumer.IsFilteredMatch("orders.created").ShouldBeTrue();
consumer.IsFilteredMatch("payments.created").ShouldBeFalse();
}
[Fact] // T:1490
public void JetStreamWorkQueueSubjectFiltering_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(filterSubjects: ["orders.created", "orders.*"]);
consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue();
consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse();
}
[Fact] // T:1492
public void JetStreamWorkQueueAckAndNext_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.ProcessAck("$JS.ACK.1.15.1", "r", 0, Encoding.ASCII.GetBytes("+ACK"));
consumer.ProcessNextMsgReq("_INBOX.n", Encoding.UTF8.GetBytes("{\"batch\":1}"));
consumer.ProcessInboundNextMsgReqs(CancellationToken.None).ShouldBe(1);
}
[Fact] // T:1493
public void JetStreamWorkQueueRequestBatch_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(maxWaiting: 8);
consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":5}")).ShouldBeTrue();
consumer.PendingRequests()["_INBOX.1"].N.ShouldBe(5);
}
[Fact] // T:1495
public void JetStreamAckAllRedelivery_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(ackPolicy: AckPolicy.AckAll);
consumer.ApplyState(new ConsumerState
{
Pending = new Dictionary<ulong, Pending> { [21] = new Pending { Sequence = 2, Timestamp = DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds() } },
Delivered = new SequencePair { Consumer = 2, Stream = 21 },
AckFloor = new SequencePair { Consumer = 0, Stream = 0 },
});
consumer.CheckPending().ShouldBe(1);
consumer.HasRedeliveries().ShouldBeTrue();
}
[Fact] // T:1496
public void JetStreamAckReplyStreamPending_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.AckReply(20, 3, 1, 123, 9).ShouldContain(".20.3.123.9");
}
[Fact] // T:1498
public void JetStreamWorkQueueAckWaitRedelivery_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(ackWait: TimeSpan.FromMilliseconds(1));
consumer.ApplyState(new ConsumerState
{
Pending = new Dictionary<ulong, Pending> { [30] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds() } },
Delivered = new SequencePair { Consumer = 4, Stream = 30 },
AckFloor = new SequencePair { Consumer = 0, Stream = 0 },
});
consumer.CheckPending().ShouldBe(1);
consumer.GetNextToRedeliver().ShouldBe(30UL);
}
[Fact] // T:1499
public void JetStreamWorkQueueNakRedelivery_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.ApplyState(new ConsumerState
{
Pending = new Dictionary<ulong, Pending> { [31] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } },
});
consumer.DidNotDeliver(31, "_INBOX.reply");
consumer.OnRedeliverQueue(31).ShouldBeTrue();
}
[Fact] // T:1500
public void JetStreamWorkQueueWorkingIndicator_ShouldSucceed()
{
var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s");
consumer.SetMaxPendingBytes(128);
consumer.SendFlowControl().ShouldBeTrue();
consumer.NeedFlowControl(10).ShouldBeFalse();
}
[Fact] // T:1501
public void JetStreamWorkQueueTerminateDelivery_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.ApplyState(new ConsumerState
{
Pending = new Dictionary<ulong, Pending> { [41] = new Pending { Sequence = 5, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } },
});
consumer.DidNotDeliver(41, "_INBOX.reply");
consumer.RemoveFromRedeliverQueue(41).ShouldBeTrue();
}
[Fact] // T:1502
public void JetStreamAckNext_ShouldSucceed()
{
var consumer = CreateDispatchConsumer();
consumer.SeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(3UL);
consumer.StreamSeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(12345UL);
NatsConsumer.ParseAckReplyNum("123").ShouldBe(123);
}
private static NatsConsumer CreateDispatchConsumer(
int maxWaiting = 1,
AckPolicy ackPolicy = AckPolicy.AckExplicit,
string? deliverSubject = null,
TimeSpan? ackWait = null,
string[]? filterSubjects = null)
{
var stream = NatsStream.Create(
new Account { Name = "A" },
new StreamConfig { Name = "S", Subjects = ["orders.>", "foo"] },
null,
null,
null,
null);
stream.ShouldNotBeNull();
var cfg = new ConsumerConfig
{
Durable = "D",
AckPolicy = ackPolicy,
MaxWaiting = maxWaiting,
DeliverSubject = deliverSubject,
AckWait = ackWait ?? TimeSpan.FromMilliseconds(100),
FilterSubjects = filterSubjects,
};
var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null);
consumer.ShouldNotBeNull();
consumer!.SetLeader(true, 1);
return consumer;
}
}

Binary file not shown.