task5: implement batch38 group D proposal and ack flow
This commit is contained in:
@@ -87,6 +87,69 @@ internal sealed partial class NatsConsumer
|
||||
SendAckReply(reply);
|
||||
}
|
||||
|
||||
internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_state.Redelivered ??= [];
|
||||
_state.Redelivered[streamSequence] = Math.Max(deliveryCount + 1, _state.Redelivered.GetValueOrDefault(streamSequence));
|
||||
_state.Pending ??= [];
|
||||
_state.Pending[streamSequence] = new Pending
|
||||
{
|
||||
Sequence = deliverySequence,
|
||||
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool ProcessTerm(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reason, string reply)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_state.Pending?.Remove(streamSequence);
|
||||
_state.Redelivered ??= [];
|
||||
_state.Redelivered[streamSequence] = Math.Max(deliveryCount, _state.Redelivered.GetValueOrDefault(streamSequence));
|
||||
if (!string.IsNullOrWhiteSpace(reply))
|
||||
_lastAckReplySubject = reply;
|
||||
_ = reason;
|
||||
_ = deliverySequence;
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal TimeSpan AckWait(TimeSpan backOff)
|
||||
{
|
||||
if (backOff > TimeSpan.Zero)
|
||||
return backOff;
|
||||
|
||||
return Config.AckWait > TimeSpan.Zero ? Config.AckWait : DefaultAckWait;
|
||||
}
|
||||
|
||||
internal bool CheckRedelivered(ulong streamSequence)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return _state.Redelivered?.TryGetValue(streamSequence, out var count) == true && count > 1;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void ProgressUpdate(ulong sequence)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
|
||||
@@ -339,4 +339,247 @@ internal sealed partial class NatsConsumer
|
||||
PriorityPolicy = cfg.PriorityPolicy,
|
||||
PinnedTTL = cfg.PinnedTTL,
|
||||
};
|
||||
|
||||
internal void ResetLocalStartingSeq(ulong sequence)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_state.Delivered.Stream = sequence;
|
||||
_state.Delivered.Consumer = sequence;
|
||||
_state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0;
|
||||
_state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0;
|
||||
_state.Pending = [];
|
||||
_state.Redelivered = [];
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal int LoopAndForwardProposals()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var count = 0;
|
||||
while (_proposalQueue.TryDequeue(out _))
|
||||
{
|
||||
count++;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void Propose(byte[] proposal)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(proposal);
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_proposalQueue.Enqueue((byte[])proposal.Clone());
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence);
|
||||
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence);
|
||||
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0);
|
||||
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0);
|
||||
_state.Pending ??= [];
|
||||
_state.Pending[streamSequence] = new Pending
|
||||
{
|
||||
Sequence = consumerSequence,
|
||||
Timestamp = timestamp,
|
||||
};
|
||||
|
||||
_state.Redelivered ??= [];
|
||||
if (deliveryCount > 1)
|
||||
_state.Redelivered[streamSequence] = deliveryCount;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void AddAckReply(ulong streamSequence, string reply)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(reply))
|
||||
return;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_ackReplies[streamSequence] = reply;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_state.Pending ??= [];
|
||||
_state.Pending[sequence] = new Pending
|
||||
{
|
||||
Sequence = sequence,
|
||||
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal int UpdateAcks()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var count = _ackReplies.Count;
|
||||
_ackReplies.Clear();
|
||||
return count;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void AddClusterPendingRequest(string requestId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(requestId))
|
||||
return;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_clusterPendingRequests[requestId] = DateTime.UtcNow;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void RemoveClusterPendingRequest(string requestId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(requestId))
|
||||
return;
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_clusterPendingRequests.Remove(requestId);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void SetPendingRequestsOk(bool ok)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_pendingRequestsOk = ok;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool PendingRequestsOk()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return _pendingRequestsOk;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool CheckAndSetPendingRequestsOk(bool ok)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var previous = _pendingRequestsOk;
|
||||
_pendingRequestsOk = ok;
|
||||
return previous;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal int CheckPendingRequests(TimeSpan maxAge)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
var cutoff = DateTime.UtcNow - maxAge;
|
||||
return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal int ReleaseAnyPendingRequests()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var released = _clusterPendingRequests.Count;
|
||||
_clusterPendingRequests.Clear();
|
||||
_pendingRequestsOk = true;
|
||||
return released;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal ConsumerState ReadStoredState()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return GetConsumerState();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +48,10 @@ internal sealed partial class NatsConsumer : IDisposable
|
||||
private bool _hasLocalDeliveryInterest;
|
||||
private long _rateLimitBitsPerSecond;
|
||||
private int _rateLimitBurstBytes;
|
||||
private readonly Queue<byte[]> _proposalQueue = new();
|
||||
private readonly Dictionary<ulong, string> _ackReplies = new();
|
||||
private readonly Dictionary<string, DateTime> _clusterPendingRequests = new(StringComparer.Ordinal);
|
||||
private bool _pendingRequestsOk;
|
||||
|
||||
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
|
||||
private object? _node;
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
using Shouldly;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
||||
|
||||
public sealed class ConsumerStateTests
|
||||
{
|
||||
private static NatsConsumer CreateConsumer()
|
||||
{
|
||||
var account = new Account { Name = "A" };
|
||||
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!;
|
||||
return NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null)!;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProposalAndPendingRequestFlow_ShouldBehave()
|
||||
{
|
||||
var consumer = CreateConsumer();
|
||||
|
||||
consumer.Propose([1, 2, 3]);
|
||||
consumer.Propose([4]);
|
||||
consumer.LoopAndForwardProposals().ShouldBe(2);
|
||||
|
||||
consumer.AddClusterPendingRequest("r1");
|
||||
consumer.AddClusterPendingRequest("r2");
|
||||
consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(2);
|
||||
consumer.RemoveClusterPendingRequest("r2");
|
||||
consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(1);
|
||||
|
||||
consumer.SetPendingRequestsOk(false);
|
||||
consumer.PendingRequestsOk().ShouldBeFalse();
|
||||
consumer.CheckAndSetPendingRequestsOk(true).ShouldBeFalse();
|
||||
consumer.PendingRequestsOk().ShouldBeTrue();
|
||||
consumer.ReleaseAnyPendingRequests().ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DeliveredAckReplyAndAcks_ShouldBehave()
|
||||
{
|
||||
var consumer = CreateConsumer();
|
||||
|
||||
consumer.UpdateDelivered(10, 20, 2, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
|
||||
consumer.AddAckReply(20, "reply");
|
||||
consumer.UpdateAcks().ShouldBe(1);
|
||||
|
||||
var state = consumer.ReadStoredState();
|
||||
state.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL);
|
||||
state.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL);
|
||||
state.Redelivered.ShouldNotBeNull();
|
||||
state.Redelivered!.ShouldContainKey(20UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ReplicatedQueueAndNakTermFlow_ShouldBehave()
|
||||
{
|
||||
var consumer = CreateConsumer();
|
||||
|
||||
consumer.AddReplicatedQueuedMsg(33, new JsPubMsg { Subject = "foo" });
|
||||
consumer.ProcessNak(33, 2, 1, "-NAK"u8.ToArray()).ShouldBeTrue();
|
||||
consumer.CheckRedelivered(33).ShouldBeTrue();
|
||||
consumer.ProcessNak(33, 2, 2, "-NAK"u8.ToArray()).ShouldBeTrue();
|
||||
consumer.CheckRedelivered(33).ShouldBeTrue();
|
||||
|
||||
consumer.ProcessTerm(33, 2, 2, "done", "reply").ShouldBeTrue();
|
||||
consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(30));
|
||||
consumer.AckWait(TimeSpan.FromSeconds(5)).ShouldBe(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ResetLocalStartingSeq_ShouldResetState()
|
||||
{
|
||||
var consumer = CreateConsumer();
|
||||
|
||||
consumer.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
|
||||
consumer.ResetLocalStartingSeq(100);
|
||||
|
||||
var state = consumer.GetConsumerState();
|
||||
state.Delivered.Stream.ShouldBe(100UL);
|
||||
state.AckFloor.Stream.ShouldBe(99UL);
|
||||
}
|
||||
}
|
||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user