task4: implement batch38 group C ack and delivery state
This commit is contained in:
@@ -0,0 +1,47 @@
|
|||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
public sealed partial class Account
|
||||||
|
{
|
||||||
|
internal Exception? CheckNewConsumerConfig(ConsumerConfig current, ConsumerConfig next)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(current);
|
||||||
|
ArgumentNullException.ThrowIfNull(next);
|
||||||
|
|
||||||
|
if (NatsConsumer.ConfigsEqualSansDelivery(current, next) &&
|
||||||
|
string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (current.DeliverPolicy != next.DeliverPolicy)
|
||||||
|
return new InvalidOperationException("deliver policy can not be updated");
|
||||||
|
if (current.OptStartSeq != next.OptStartSeq)
|
||||||
|
return new InvalidOperationException("start sequence can not be updated");
|
||||||
|
if (current.OptStartTime != next.OptStartTime)
|
||||||
|
return new InvalidOperationException("start time can not be updated");
|
||||||
|
if (current.AckPolicy != next.AckPolicy)
|
||||||
|
return new InvalidOperationException("ack policy can not be updated");
|
||||||
|
if (current.ReplayPolicy != next.ReplayPolicy)
|
||||||
|
return new InvalidOperationException("replay policy can not be updated");
|
||||||
|
if (current.Heartbeat != next.Heartbeat)
|
||||||
|
return new InvalidOperationException("heart beats can not be updated");
|
||||||
|
if (current.FlowControl != next.FlowControl)
|
||||||
|
return new InvalidOperationException("flow control can not be updated");
|
||||||
|
|
||||||
|
if (!string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(current.DeliverSubject))
|
||||||
|
return new InvalidOperationException("can not update pull consumer to push based");
|
||||||
|
if (string.IsNullOrWhiteSpace(next.DeliverSubject))
|
||||||
|
return new InvalidOperationException("can not update push consumer to pull based");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (current.MaxWaiting != next.MaxWaiting)
|
||||||
|
return new InvalidOperationException("max waiting can not be updated");
|
||||||
|
|
||||||
|
if (next.BackOff is { Length: > 0 } && next.MaxDeliver != -1 && next.BackOff.Length > next.MaxDeliver)
|
||||||
|
return new InvalidOperationException(JsApiErrors.NewJSConsumerMaxDeliverBackoffError().Description ?? "max deliver backoff invalid");
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
191
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs
Normal file
191
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
internal sealed partial class NatsConsumer
|
||||||
|
{
|
||||||
|
private static readonly byte[] AckAck = "+ACK"u8.ToArray();
|
||||||
|
private static readonly byte[] AckOk = "+OK"u8.ToArray();
|
||||||
|
private static readonly byte[] AckNak = "-NAK"u8.ToArray();
|
||||||
|
private static readonly byte[] AckProgress = "+WPI"u8.ToArray();
|
||||||
|
private static readonly byte[] AckNext = "+NXT"u8.ToArray();
|
||||||
|
|
||||||
|
private readonly Queue<JsAckMsg> _ackQueue = new();
|
||||||
|
private string? _lastAckReplySubject;
|
||||||
|
|
||||||
|
internal void SendAckReply(string subject)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(subject))
|
||||||
|
return;
|
||||||
|
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_lastAckReplySubject = subject;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static JsAckMsg NewJSAckMsg(string subject, string reply, int headerBytes, byte[] msg)
|
||||||
|
=> new()
|
||||||
|
{
|
||||||
|
Subject = subject,
|
||||||
|
Reply = reply,
|
||||||
|
HeaderBytes = headerBytes,
|
||||||
|
Msg = msg,
|
||||||
|
};
|
||||||
|
|
||||||
|
internal void PushAck(string subject, string reply, int headerBytes, byte[] rawMessage)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(rawMessage);
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_ackQueue.Enqueue(NewJSAckMsg(subject, reply, headerBytes, (byte[])rawMessage.Clone()));
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void ProcessAck(string subject, string reply, int headerBytes, byte[] rawMessage)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(rawMessage);
|
||||||
|
|
||||||
|
var msg = headerBytes > 0 && headerBytes <= rawMessage.Length
|
||||||
|
? rawMessage[headerBytes..]
|
||||||
|
: rawMessage;
|
||||||
|
|
||||||
|
var (streamSeq, deliverySeq, deliveryCount) = AckReplyInfo(subject);
|
||||||
|
var skipAckReply = streamSeq == 0;
|
||||||
|
|
||||||
|
if (msg.Length == 0 || msg.SequenceEqual(AckAck) || msg.SequenceEqual(AckOk))
|
||||||
|
{
|
||||||
|
ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, reply);
|
||||||
|
}
|
||||||
|
else if (StartsWith(msg, AckNext))
|
||||||
|
{
|
||||||
|
ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, string.Empty);
|
||||||
|
skipAckReply = true;
|
||||||
|
}
|
||||||
|
else if (StartsWith(msg, AckNak))
|
||||||
|
{
|
||||||
|
_state.Redelivered ??= new Dictionary<ulong, ulong>();
|
||||||
|
_state.Redelivered[streamSeq] = _state.Redelivered.TryGetValue(streamSeq, out var redeliveries)
|
||||||
|
? redeliveries + 1
|
||||||
|
: 1UL;
|
||||||
|
skipAckReply = true;
|
||||||
|
}
|
||||||
|
else if (msg.SequenceEqual(AckProgress))
|
||||||
|
{
|
||||||
|
ProgressUpdate(streamSeq);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(reply) && !skipAckReply)
|
||||||
|
SendAckReply(reply);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void ProgressUpdate(ulong sequence)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_state.Pending ??= new Dictionary<ulong, Pending>();
|
||||||
|
if (_state.Pending.TryGetValue(sequence, out var pending))
|
||||||
|
{
|
||||||
|
pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||||
|
_state.Pending[sequence] = pending;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void UpdateSkipped(ulong sequence)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sequence > 0 ? sequence - 1 : 0);
|
||||||
|
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, sequence > 0 ? sequence - 1 : 0);
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ProcessAckMessage(ulong streamSeq, ulong deliverySeq, ulong deliveryCount, string reply)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_state.Pending ??= new Dictionary<ulong, Pending>();
|
||||||
|
_state.Pending.Remove(streamSeq);
|
||||||
|
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSeq);
|
||||||
|
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, deliverySeq);
|
||||||
|
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSeq);
|
||||||
|
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, deliverySeq);
|
||||||
|
|
||||||
|
_state.Redelivered ??= new Dictionary<ulong, ulong>();
|
||||||
|
if (deliveryCount > 1)
|
||||||
|
_state.Redelivered[streamSeq] = deliveryCount;
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(reply))
|
||||||
|
_lastAckReplySubject = reply;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount) AckReplyInfo(string subject)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(subject))
|
||||||
|
return (0, 0, 0);
|
||||||
|
|
||||||
|
var numbers = subject
|
||||||
|
.Split('.', StringSplitOptions.RemoveEmptyEntries)
|
||||||
|
.Select(static token => ulong.TryParse(token, out var value) ? (ulong?)value : null)
|
||||||
|
.Where(static value => value.HasValue)
|
||||||
|
.Select(static value => value!.Value)
|
||||||
|
.ToArray();
|
||||||
|
|
||||||
|
if (numbers.Length >= 3)
|
||||||
|
return (numbers[^2], numbers[^3], numbers[^1]);
|
||||||
|
if (numbers.Length == 2)
|
||||||
|
return (numbers[1], numbers[0], 1);
|
||||||
|
if (numbers.Length == 1)
|
||||||
|
return (numbers[0], numbers[0], 1);
|
||||||
|
|
||||||
|
return (0, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool StartsWith(byte[] message, byte[] prefix)
|
||||||
|
{
|
||||||
|
if (message.Length < prefix.Length)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
for (var i = 0; i < prefix.Length; i++)
|
||||||
|
{
|
||||||
|
if (message[i] != prefix[i])
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class JsAckMsg
|
||||||
|
{
|
||||||
|
internal string Subject { get; set; } = string.Empty;
|
||||||
|
internal string Reply { get; set; } = string.Empty;
|
||||||
|
internal int HeaderBytes { get; set; }
|
||||||
|
internal byte[] Msg { get; set; } = [];
|
||||||
|
}
|
||||||
@@ -134,17 +134,6 @@ internal sealed partial class NatsConsumer
|
|||||||
|
|
||||||
internal bool HasDeliveryInterest()
|
internal bool HasDeliveryInterest()
|
||||||
{
|
{
|
||||||
_mu.EnterReadLock();
|
return HasDeliveryInterest(_hasLocalDeliveryInterest);
|
||||||
try
|
|
||||||
{
|
|
||||||
if (_closed || string.IsNullOrWhiteSpace(Config.DeliverSubject))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
return _internalSubscriptions.Contains(Config.DeliverSubject!);
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_mu.ExitReadLock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
342
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs
Normal file
342
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs
Normal file
@@ -0,0 +1,342 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
internal sealed partial class NatsConsumer
|
||||||
|
{
|
||||||
|
private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1);
|
||||||
|
|
||||||
|
internal bool UpdateDeliveryInterest(bool localInterest)
|
||||||
|
{
|
||||||
|
var interest = HasDeliveryInterest(localInterest);
|
||||||
|
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_hasLocalDeliveryInterest = localInterest;
|
||||||
|
if (_closed || IsPullMode())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
var wasActive = !IsPullMode() && _isLeader;
|
||||||
|
if (interest && !wasActive)
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
|
||||||
|
if (!interest)
|
||||||
|
_isPaused = false;
|
||||||
|
|
||||||
|
if (_deleteTimer != null && _deleteThreshold > TimeSpan.Zero && !interest)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
_deleteTimer?.Dispose();
|
||||||
|
_deleteTimer = null;
|
||||||
|
|
||||||
|
if (!interest && _deleteThreshold > TimeSpan.Zero)
|
||||||
|
{
|
||||||
|
_deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void DeleteNotActive()
|
||||||
|
{
|
||||||
|
_mu.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_closed)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (IsPushMode())
|
||||||
|
{
|
||||||
|
if (HasDeliveryInterest())
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (_state.Pending is { Count: > 0 })
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitReadLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
Delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void WatchGWinterest()
|
||||||
|
{
|
||||||
|
var wasActive = HasDeliveryInterest();
|
||||||
|
if (!_hasLocalDeliveryInterest)
|
||||||
|
{
|
||||||
|
UpdateDeliveryInterest(localInterest: false);
|
||||||
|
if (!wasActive && HasDeliveryInterest())
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_gatewayWatchTimer?.Dispose();
|
||||||
|
_gatewayWatchTimer = new Timer(static s => ((NatsConsumer)s!).WatchGWinterest(), this, DefaultGatewayInterestInterval, Timeout.InfiniteTimeSpan);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal bool HasMaxDeliveries(ulong sequence)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (Config.MaxDeliver <= 0)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
_state.Redelivered ??= new Dictionary<ulong, ulong>();
|
||||||
|
_state.Pending ??= new Dictionary<ulong, Pending>();
|
||||||
|
|
||||||
|
var deliveryCount = _state.Redelivered.TryGetValue(sequence, out var redeliveries)
|
||||||
|
? redeliveries + 1
|
||||||
|
: 1UL;
|
||||||
|
|
||||||
|
if (deliveryCount < (ulong)Config.MaxDeliver)
|
||||||
|
{
|
||||||
|
_state.Redelivered[sequence] = deliveryCount;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_state.Redelivered[sequence] = deliveryCount;
|
||||||
|
_state.Pending.Remove(sequence);
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void ForceExpirePending()
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_state.Pending is not { Count: > 0 })
|
||||||
|
return;
|
||||||
|
|
||||||
|
_state.Redelivered ??= new Dictionary<ulong, ulong>();
|
||||||
|
foreach (var seq in _state.Pending.Keys.ToArray())
|
||||||
|
{
|
||||||
|
if (HasMaxDeliveries(seq))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
_state.Redelivered[seq] = _state.Redelivered.TryGetValue(seq, out var current)
|
||||||
|
? current + 1
|
||||||
|
: 1UL;
|
||||||
|
|
||||||
|
_state.Pending[seq].Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||||
|
}
|
||||||
|
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void SetRateLimitNeedsLocks()
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
SetRateLimit(Config.RateLimit);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void SetRateLimit(ulong bitsPerSecond)
|
||||||
|
{
|
||||||
|
if (bitsPerSecond == 0)
|
||||||
|
{
|
||||||
|
Interlocked.Exchange(ref _rateLimitBitsPerSecond, 0);
|
||||||
|
_rateLimitBurstBytes = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Interlocked.Exchange(ref _rateLimitBitsPerSecond, (long)bitsPerSecond);
|
||||||
|
var configuredMax = _streamRef?.Config.MaxMsgSize ?? 0;
|
||||||
|
_rateLimitBurstBytes = configuredMax > 0
|
||||||
|
? configuredMax
|
||||||
|
: (int)Math.Max(1024UL, bitsPerSecond / 8UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal bool UpdateDeliverSubject(string? newDeliver)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return UpdateDeliverSubjectLocked(newDeliver);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal bool UpdateDeliverSubjectLocked(string? newDeliver)
|
||||||
|
{
|
||||||
|
if (_closed || IsPullMode() || string.Equals(Config.DeliverSubject, newDeliver, StringComparison.Ordinal))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (_state.Pending is { Count: > 0 })
|
||||||
|
ForceExpirePending();
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(Config.DeliverSubject))
|
||||||
|
_internalSubscriptions.Remove(Config.DeliverSubject!);
|
||||||
|
|
||||||
|
Config.DeliverSubject = string.IsNullOrWhiteSpace(newDeliver) ? null : newDeliver;
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(Config.DeliverSubject))
|
||||||
|
_internalSubscriptions.Add(Config.DeliverSubject!);
|
||||||
|
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static bool ConfigsEqualSansDelivery(ConsumerConfig left, ConsumerConfig right)
|
||||||
|
{
|
||||||
|
var l = CloneConfig(left);
|
||||||
|
var r = CloneConfig(right);
|
||||||
|
l.DeliverSubject = null;
|
||||||
|
r.DeliverSubject = null;
|
||||||
|
|
||||||
|
return JsonSerializer.Serialize(l) == JsonSerializer.Serialize(r);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal (ulong Sequence, bool CanRespond, Exception? Error) ResetStartingSeq(ulong sequence, string? reply)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (sequence == 0)
|
||||||
|
{
|
||||||
|
sequence = _state.AckFloor.Stream + 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
switch (Config.DeliverPolicy)
|
||||||
|
{
|
||||||
|
case DeliverPolicy.DeliverAll:
|
||||||
|
break;
|
||||||
|
case DeliverPolicy.DeliverByStartSequence when sequence < Config.OptStartSeq:
|
||||||
|
return (0, false, new InvalidOperationException("below start seq"));
|
||||||
|
case DeliverPolicy.DeliverByStartTime when Config.OptStartTime.HasValue:
|
||||||
|
if (sequence == 0)
|
||||||
|
return (0, false, new InvalidOperationException("below start time"));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return (0, false, new InvalidOperationException("not allowed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sequence == 0)
|
||||||
|
sequence = 1;
|
||||||
|
|
||||||
|
_state.Delivered.Stream = sequence;
|
||||||
|
_state.Delivered.Consumer = sequence;
|
||||||
|
_state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0;
|
||||||
|
_state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0;
|
||||||
|
_state.Pending = new Dictionary<ulong, Pending>();
|
||||||
|
_state.Redelivered = new Dictionary<ulong, ulong>();
|
||||||
|
_updateChannel.Writer.TryWrite(true);
|
||||||
|
|
||||||
|
_ = reply;
|
||||||
|
return (sequence, true, null);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool HasDeliveryInterest(bool localInterest)
|
||||||
|
{
|
||||||
|
_mu.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(Config.DeliverSubject))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (localInterest)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (_streamRef?.Account is not { } account)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (account.Server is NatsServer server && server.HasGatewayInterest(account, Config.DeliverSubject!))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return _internalSubscriptions.Contains(Config.DeliverSubject!);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool IsPullMode() => string.IsNullOrWhiteSpace(Config.DeliverSubject);
|
||||||
|
|
||||||
|
private bool IsPushMode() => !IsPullMode();
|
||||||
|
|
||||||
|
private static ConsumerConfig CloneConfig(ConsumerConfig cfg) =>
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
Durable = cfg.Durable,
|
||||||
|
Name = cfg.Name,
|
||||||
|
Description = cfg.Description,
|
||||||
|
DeliverPolicy = cfg.DeliverPolicy,
|
||||||
|
OptStartSeq = cfg.OptStartSeq,
|
||||||
|
OptStartTime = cfg.OptStartTime,
|
||||||
|
AckPolicy = cfg.AckPolicy,
|
||||||
|
AckWait = cfg.AckWait,
|
||||||
|
MaxDeliver = cfg.MaxDeliver,
|
||||||
|
BackOff = cfg.BackOff?.ToArray(),
|
||||||
|
FilterSubject = cfg.FilterSubject,
|
||||||
|
FilterSubjects = cfg.FilterSubjects?.ToArray(),
|
||||||
|
ReplayPolicy = cfg.ReplayPolicy,
|
||||||
|
RateLimit = cfg.RateLimit,
|
||||||
|
SampleFrequency = cfg.SampleFrequency,
|
||||||
|
MaxWaiting = cfg.MaxWaiting,
|
||||||
|
MaxAckPending = cfg.MaxAckPending,
|
||||||
|
FlowControl = cfg.FlowControl,
|
||||||
|
HeadersOnly = cfg.HeadersOnly,
|
||||||
|
MaxRequestBatch = cfg.MaxRequestBatch,
|
||||||
|
MaxRequestExpires = cfg.MaxRequestExpires,
|
||||||
|
MaxRequestMaxBytes = cfg.MaxRequestMaxBytes,
|
||||||
|
DeliverSubject = cfg.DeliverSubject,
|
||||||
|
DeliverGroup = cfg.DeliverGroup,
|
||||||
|
Heartbeat = cfg.Heartbeat,
|
||||||
|
InactiveThreshold = cfg.InactiveThreshold,
|
||||||
|
Replicas = cfg.Replicas,
|
||||||
|
MemoryStorage = cfg.MemoryStorage,
|
||||||
|
Direct = cfg.Direct,
|
||||||
|
Metadata = cfg.Metadata is null ? null : new Dictionary<string, string>(cfg.Metadata),
|
||||||
|
PauseUntil = cfg.PauseUntil,
|
||||||
|
PriorityGroups = cfg.PriorityGroups?.ToArray(),
|
||||||
|
PriorityPolicy = cfg.PriorityPolicy,
|
||||||
|
PinnedTTL = cfg.PinnedTTL,
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -43,6 +43,11 @@ internal sealed partial class NatsConsumer : IDisposable
|
|||||||
private DateTime _lostQuorumSent;
|
private DateTime _lostQuorumSent;
|
||||||
private TimeSpan _deleteThreshold;
|
private TimeSpan _deleteThreshold;
|
||||||
private bool _isPaused;
|
private bool _isPaused;
|
||||||
|
private Timer? _deleteTimer;
|
||||||
|
private Timer? _gatewayWatchTimer;
|
||||||
|
private bool _hasLocalDeliveryInterest;
|
||||||
|
private long _rateLimitBitsPerSecond;
|
||||||
|
private int _rateLimitBurstBytes;
|
||||||
|
|
||||||
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
|
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
|
||||||
private object? _node;
|
private object? _node;
|
||||||
|
|||||||
@@ -239,6 +239,16 @@ public sealed class JsPubMsg
|
|||||||
|
|
||||||
/// <summary>Sync/ack channel (opaque, set at runtime).</summary>
|
/// <summary>Sync/ack channel (opaque, set at runtime).</summary>
|
||||||
public object? Sync { get; set; }
|
public object? Sync { get; set; }
|
||||||
|
|
||||||
|
public void ReturnToPool()
|
||||||
|
{
|
||||||
|
Subject = string.Empty;
|
||||||
|
Reply = null;
|
||||||
|
Hdr = null;
|
||||||
|
Msg = null;
|
||||||
|
Pa = null;
|
||||||
|
Sync = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -296,4 +296,149 @@ public sealed class NatsConsumerTests
|
|||||||
consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull();
|
consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull();
|
||||||
consumer.ClearNode();
|
consumer.ClearNode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void UpdateDeliveryInterest_AndDeleteNotActive_ShouldReflectInterestState()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null);
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var consumer = NatsConsumer.Create(
|
||||||
|
stream!,
|
||||||
|
new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", InactiveThreshold = TimeSpan.FromMilliseconds(20) },
|
||||||
|
ConsumerAction.Create,
|
||||||
|
null);
|
||||||
|
consumer.ShouldNotBeNull();
|
||||||
|
|
||||||
|
consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(20) });
|
||||||
|
consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue();
|
||||||
|
Thread.Sleep(40);
|
||||||
|
consumer.IsClosed().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void WatchGWinterest_AndRateLimit_ShouldExecuteWithoutErrors()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"], MaxMsgSize = 4096 }, null, null, null, null);
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var consumer = NatsConsumer.Create(
|
||||||
|
stream!,
|
||||||
|
new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", RateLimit = 8_000 },
|
||||||
|
ConsumerAction.Create,
|
||||||
|
null);
|
||||||
|
consumer.ShouldNotBeNull();
|
||||||
|
|
||||||
|
consumer!.SetRateLimitNeedsLocks();
|
||||||
|
consumer.WatchGWinterest();
|
||||||
|
consumer.SubscribeInternal("deliver.foo").ShouldBeTrue();
|
||||||
|
consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse();
|
||||||
|
consumer.HasDeliveryInterest().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AccountCheckNewConsumerConfig_InvalidPolicyChanges_ShouldFail()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var current = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, DeliverPolicy = DeliverPolicy.DeliverAll };
|
||||||
|
var next = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll, DeliverPolicy = DeliverPolicy.DeliverAll };
|
||||||
|
|
||||||
|
var err = account.CheckNewConsumerConfig(current, next);
|
||||||
|
|
||||||
|
err.ShouldNotBeNull();
|
||||||
|
err.Message.ShouldContain("ack policy");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void UpdateDeliverSubject_AndConfigsEqualSansDelivery_ShouldBehave()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null);
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var cfg = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit };
|
||||||
|
var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null);
|
||||||
|
consumer.ShouldNotBeNull();
|
||||||
|
|
||||||
|
consumer!.SubscribeInternal("deliver.a");
|
||||||
|
consumer.UpdateDeliverSubject("deliver.b").ShouldBeTrue();
|
||||||
|
consumer.SubscribeInternal("deliver.b");
|
||||||
|
consumer.HasDeliveryInterest().ShouldBeTrue();
|
||||||
|
|
||||||
|
var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit };
|
||||||
|
var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit };
|
||||||
|
NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AckFlow_NewMessage_Push_Process_Progress_UpdateSkipped_ShouldBehave()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null);
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null);
|
||||||
|
consumer.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var ack = NatsConsumer.NewJSAckMsg("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray());
|
||||||
|
ack.Subject.ShouldBe("a.b.10.20.1");
|
||||||
|
|
||||||
|
consumer!.PushAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray());
|
||||||
|
consumer.ProcessAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray());
|
||||||
|
consumer.ProgressUpdate(10);
|
||||||
|
consumer.UpdateSkipped(25);
|
||||||
|
|
||||||
|
var state = consumer.GetConsumerState();
|
||||||
|
state.AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(10UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void HasMaxDeliveries_ForceExpirePending_AndResetStartingSeq_ShouldBehave()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null);
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var consumer = NatsConsumer.Create(
|
||||||
|
stream!,
|
||||||
|
new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, MaxDeliver = 2, DeliverPolicy = DeliverPolicy.DeliverAll },
|
||||||
|
ConsumerAction.Create,
|
||||||
|
null);
|
||||||
|
consumer.ShouldNotBeNull();
|
||||||
|
|
||||||
|
consumer!.ProcessAck("a.b.5.7.1", "reply", 0, "-NAK"u8.ToArray());
|
||||||
|
consumer.HasMaxDeliveries(5).ShouldBeFalse();
|
||||||
|
consumer.HasMaxDeliveries(5).ShouldBeTrue();
|
||||||
|
consumer.ForceExpirePending();
|
||||||
|
|
||||||
|
var (seq, canRespond, err) = consumer.ResetStartingSeq(10, "reply");
|
||||||
|
err.ShouldBeNull();
|
||||||
|
seq.ShouldBe(10UL);
|
||||||
|
canRespond.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void JsPubMsg_ReturnToPool_ShouldResetState()
|
||||||
|
{
|
||||||
|
var msg = new JsPubMsg
|
||||||
|
{
|
||||||
|
Subject = "foo",
|
||||||
|
Reply = "bar",
|
||||||
|
Hdr = [1, 2],
|
||||||
|
Msg = [3, 4],
|
||||||
|
Pa = new object(),
|
||||||
|
Sync = new object(),
|
||||||
|
};
|
||||||
|
|
||||||
|
msg.ReturnToPool();
|
||||||
|
|
||||||
|
msg.Subject.ShouldBe(string.Empty);
|
||||||
|
msg.Reply.ShouldBeNull();
|
||||||
|
msg.Hdr.ShouldBeNull();
|
||||||
|
msg.Msg.ShouldBeNull();
|
||||||
|
msg.Pa.ShouldBeNull();
|
||||||
|
msg.Sync.ShouldBeNull();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user