feat(batch38): merge consumer-lifecycle

This commit is contained in:
Joseph Doherty
2026-03-01 00:57:36 -05:00
29 changed files with 3235 additions and 5 deletions

View File

@@ -0,0 +1,47 @@
namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
internal Exception? CheckNewConsumerConfig(ConsumerConfig current, ConsumerConfig next)
{
ArgumentNullException.ThrowIfNull(current);
ArgumentNullException.ThrowIfNull(next);
if (NatsConsumer.ConfigsEqualSansDelivery(current, next) &&
string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal))
{
return null;
}
if (current.DeliverPolicy != next.DeliverPolicy)
return new InvalidOperationException("deliver policy can not be updated");
if (current.OptStartSeq != next.OptStartSeq)
return new InvalidOperationException("start sequence can not be updated");
if (current.OptStartTime != next.OptStartTime)
return new InvalidOperationException("start time can not be updated");
if (current.AckPolicy != next.AckPolicy)
return new InvalidOperationException("ack policy can not be updated");
if (current.ReplayPolicy != next.ReplayPolicy)
return new InvalidOperationException("replay policy can not be updated");
if (current.Heartbeat != next.Heartbeat)
return new InvalidOperationException("heart beats can not be updated");
if (current.FlowControl != next.FlowControl)
return new InvalidOperationException("flow control can not be updated");
if (!string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal))
{
if (string.IsNullOrWhiteSpace(current.DeliverSubject))
return new InvalidOperationException("can not update pull consumer to push based");
if (string.IsNullOrWhiteSpace(next.DeliverSubject))
return new InvalidOperationException("can not update push consumer to pull based");
}
if (current.MaxWaiting != next.MaxWaiting)
return new InvalidOperationException("max waiting can not be updated");
if (next.BackOff is { Length: > 0 } && next.MaxDeliver != -1 && next.BackOff.Length > next.MaxDeliver)
return new InvalidOperationException(JsApiErrors.NewJSConsumerMaxDeliverBackoffError().Description ?? "max deliver backoff invalid");
return null;
}
}

View File

@@ -0,0 +1,10 @@
namespace ZB.MOM.NatsNet.Server;
internal static class SubjectTokens
{
internal static string[] Subjects(IEnumerable<string> filters)
{
ArgumentNullException.ThrowIfNull(filters);
return filters.Where(static filter => !string.IsNullOrWhiteSpace(filter)).ToArray();
}
}

View File

@@ -0,0 +1,262 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private static readonly byte[] AckAck = "+ACK"u8.ToArray();
private static readonly byte[] AckOk = "+OK"u8.ToArray();
private static readonly byte[] AckNak = "-NAK"u8.ToArray();
private static readonly byte[] AckProgress = "+WPI"u8.ToArray();
private static readonly byte[] AckNext = "+NXT"u8.ToArray();
private readonly Queue<JsAckMsg> _ackQueue = new();
private string? _lastAckReplySubject;
internal void SendAckReply(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return;
_mu.EnterWriteLock();
try
{
_lastAckReplySubject = subject;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static JsAckMsg NewJSAckMsg(string subject, string reply, int headerBytes, byte[] msg)
=> new()
{
Subject = subject,
Reply = reply,
HeaderBytes = headerBytes,
Msg = msg,
};
internal void PushAck(string subject, string reply, int headerBytes, byte[] rawMessage)
{
ArgumentNullException.ThrowIfNull(rawMessage);
_mu.EnterWriteLock();
try
{
_ackQueue.Enqueue(NewJSAckMsg(subject, reply, headerBytes, (byte[])rawMessage.Clone()));
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ProcessAck(string subject, string reply, int headerBytes, byte[] rawMessage)
{
ArgumentNullException.ThrowIfNull(rawMessage);
var msg = headerBytes > 0 && headerBytes <= rawMessage.Length
? rawMessage[headerBytes..]
: rawMessage;
var (streamSeq, deliverySeq, deliveryCount) = AckReplyInfo(subject);
var skipAckReply = streamSeq == 0;
if (msg.Length == 0 || msg.SequenceEqual(AckAck) || msg.SequenceEqual(AckOk))
{
ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, reply);
}
else if (StartsWith(msg, AckNext))
{
ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, string.Empty);
skipAckReply = true;
}
else if (StartsWith(msg, AckNak))
{
_state.Redelivered ??= new Dictionary<ulong, ulong>();
_state.Redelivered[streamSeq] = _state.Redelivered.TryGetValue(streamSeq, out var redeliveries)
? redeliveries + 1
: 1UL;
skipAckReply = true;
}
else if (msg.SequenceEqual(AckProgress))
{
ProgressUpdate(streamSeq);
}
if (!string.IsNullOrWhiteSpace(reply) && !skipAckReply)
SendAckReply(reply);
}
internal bool ProcessAckMsg(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reply, bool doSample)
{
ProcessAckMessage(streamSequence, deliverySequence, deliveryCount, reply);
if (doSample && NeedAck())
SampleAck(reply);
return true;
}
internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message)
{
ArgumentNullException.ThrowIfNull(message);
_mu.EnterWriteLock();
try
{
_state.Redelivered ??= [];
_state.Redelivered[streamSequence] = Math.Max(deliveryCount + 1, _state.Redelivered.GetValueOrDefault(streamSequence));
_state.Pending ??= [];
_state.Pending[streamSequence] = new Pending
{
Sequence = deliverySequence,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ProcessTerm(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reason, string reply)
{
_mu.EnterWriteLock();
try
{
_state.Pending?.Remove(streamSequence);
_state.Redelivered ??= [];
_state.Redelivered[streamSequence] = Math.Max(deliveryCount, _state.Redelivered.GetValueOrDefault(streamSequence));
if (!string.IsNullOrWhiteSpace(reply))
_lastAckReplySubject = reply;
_ = reason;
_ = deliverySequence;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal TimeSpan AckWait(TimeSpan backOff)
{
if (backOff > TimeSpan.Zero)
return backOff;
return Config.AckWait > TimeSpan.Zero ? Config.AckWait : DefaultAckWait;
}
internal bool CheckRedelivered(ulong streamSequence)
{
_mu.EnterReadLock();
try
{
return _state.Redelivered?.TryGetValue(streamSequence, out var count) == true && count > 1;
}
finally
{
_mu.ExitReadLock();
}
}
internal void ProgressUpdate(ulong sequence)
{
_mu.EnterWriteLock();
try
{
_state.Pending ??= new Dictionary<ulong, Pending>();
if (_state.Pending.TryGetValue(sequence, out var pending))
{
pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_state.Pending[sequence] = pending;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UpdateSkipped(ulong sequence)
{
_mu.EnterWriteLock();
try
{
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sequence > 0 ? sequence - 1 : 0);
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, sequence > 0 ? sequence - 1 : 0);
_updateChannel.Writer.TryWrite(true);
}
finally
{
_mu.ExitWriteLock();
}
}
private void ProcessAckMessage(ulong streamSeq, ulong deliverySeq, ulong deliveryCount, string reply)
{
_mu.EnterWriteLock();
try
{
_state.Pending ??= new Dictionary<ulong, Pending>();
_state.Pending.Remove(streamSeq);
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSeq);
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, deliverySeq);
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSeq);
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, deliverySeq);
_state.Redelivered ??= new Dictionary<ulong, ulong>();
if (deliveryCount > 1)
_state.Redelivered[streamSeq] = deliveryCount;
if (!string.IsNullOrWhiteSpace(reply))
_lastAckReplySubject = reply;
}
finally
{
_mu.ExitWriteLock();
}
}
private static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount) AckReplyInfo(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return (0, 0, 0);
var numbers = subject
.Split('.', StringSplitOptions.RemoveEmptyEntries)
.Select(static token => ulong.TryParse(token, out var value) ? (ulong?)value : null)
.Where(static value => value.HasValue)
.Select(static value => value!.Value)
.ToArray();
if (numbers.Length >= 3)
return (numbers[^2], numbers[^3], numbers[^1]);
if (numbers.Length == 2)
return (numbers[1], numbers[0], 1);
if (numbers.Length == 1)
return (numbers[0], numbers[0], 1);
return (0, 0, 0);
}
private static bool StartsWith(byte[] message, byte[] prefix)
{
if (message.Length < prefix.Length)
return false;
for (var i = 0; i < prefix.Length; i++)
{
if (message[i] != prefix[i])
return false;
}
return true;
}
}
internal sealed class JsAckMsg
{
internal string Subject { get; set; } = string.Empty;
internal string Reply { get; set; } = string.Empty;
internal int HeaderBytes { get; set; }
internal byte[] Msg { get; set; } = [];
}

View File

@@ -0,0 +1,44 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private string? _lastAdvisorySubject;
private byte[]? _lastAdvisoryPayload;
private DateTime _lastAdvisorySent;
internal bool SendAdvisory(string subject, object advisory)
{
if (string.IsNullOrWhiteSpace(subject) || advisory is null)
return false;
_mu.EnterWriteLock();
try
{
_lastAdvisorySubject = subject;
_lastAdvisoryPayload = Encoding.UTF8.GetBytes(advisory.ToString() ?? string.Empty);
_lastAdvisorySent = DateTime.UtcNow;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool SendDeleteAdvisoryLocked() =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerDeleted}.{Stream}.{Name}", new { action = "delete" });
internal bool SendPinnedAdvisoryLocked(string pinId) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPinned}.{Stream}.{Name}", new { pin = pinId });
internal bool SendUnpinnedAdvisoryLocked(string pinId) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerUnpinned}.{Stream}.{Name}", new { pin = pinId });
internal bool SendCreateAdvisory() =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerCreated}.{Stream}.{Name}", new { action = "create" });
internal bool SendPauseAdvisoryLocked(DateTime pauseUntil) =>
SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPause}.{Stream}.{Name}", new { pauseUntil });
}

View File

@@ -0,0 +1,268 @@
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal const int DefaultMaxAckPending = 1000;
internal static readonly TimeSpan DefaultAckWait = TimeSpan.FromSeconds(30);
internal static readonly TimeSpan DefaultDeleteWait = TimeSpan.FromSeconds(5);
internal static readonly TimeSpan DefaultPinnedTtl = TimeSpan.FromMinutes(2);
internal static JsApiError? SetConsumerConfigDefaults(
ConsumerConfig config,
StreamConfig streamConfig,
JetStreamAccountLimits? selectedLimits,
bool pedantic)
{
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(streamConfig);
var streamReplicas = Math.Max(1, streamConfig.Replicas);
if (config.MaxDeliver is 0 or < -1)
{
if (pedantic && config.MaxDeliver < -1)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_deliver must be set to -1"));
config.MaxDeliver = -1;
}
if (config.MaxWaiting < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_waiting must not be negative"));
config.MaxWaiting = 0;
}
if (config.MaxAckPending < -1)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_ack_pending must be set to -1"));
config.MaxAckPending = -1;
}
if (config.MaxRequestBatch < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_batch must not be negative"));
config.MaxRequestBatch = 0;
}
if (config.MaxRequestExpires < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_expires must not be negative"));
config.MaxRequestExpires = TimeSpan.Zero;
}
if (config.MaxRequestMaxBytes < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_bytes must not be negative"));
config.MaxRequestMaxBytes = 0;
}
if (config.Heartbeat < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("idle_heartbeat must not be negative"));
config.Heartbeat = TimeSpan.Zero;
}
if (config.InactiveThreshold < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("inactive_threshold must not be negative"));
config.InactiveThreshold = TimeSpan.Zero;
}
if (config.PinnedTTL < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("priority_timeout must not be negative"));
config.PinnedTTL = TimeSpan.Zero;
}
if (config.AckWait == TimeSpan.Zero)
config.AckWait = DefaultAckWait;
if (config.MaxAckPending == 0 && config.AckPolicy != AckPolicy.AckNone)
{
config.MaxAckPending = selectedLimits?.MaxAckPending > 0
? selectedLimits.MaxAckPending
: DefaultMaxAckPending;
}
if (config.InactiveThreshold == TimeSpan.Zero && string.IsNullOrWhiteSpace(config.Durable))
config.InactiveThreshold = DefaultDeleteWait;
if (config.PinnedTTL == TimeSpan.Zero && config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient)
config.PinnedTTL = DefaultPinnedTtl;
if (config.Replicas == 0 || config.Replicas > streamReplicas)
config.Replicas = streamReplicas;
if (!string.IsNullOrWhiteSpace(config.Name) && string.IsNullOrWhiteSpace(config.Durable))
config.Durable = config.Name;
return null;
}
internal static JsApiError? CheckConsumerCfg(
ConsumerConfig config,
StreamConfig streamConfig,
JetStreamAccountLimits? selectedLimits,
bool isRecovering)
{
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(streamConfig);
var streamReplicas = Math.Max(1, streamConfig.Replicas);
if (!string.IsNullOrWhiteSpace(config.Durable) &&
!string.IsNullOrWhiteSpace(config.Name) &&
!string.Equals(config.Durable, config.Name, StringComparison.Ordinal))
{
return JsApiErrors.NewJSConsumerCreateDurableAndNameMismatchError();
}
if (HasPathSeparators(config.Durable) || HasPathSeparators(config.Name))
return JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError();
if (config.Replicas > streamReplicas)
return JsApiErrors.NewJSConsumerReplicasExceedsStreamError();
if (!Enum.IsDefined(config.AckPolicy))
return JsApiErrors.NewJSConsumerAckPolicyInvalidError();
if (!Enum.IsDefined(config.ReplayPolicy))
return JsApiErrors.NewJSConsumerReplayPolicyInvalidError();
if (!Enum.IsDefined(config.DeliverPolicy))
return JsApiErrors.NewJSConsumerInvalidPolicyError(new InvalidOperationException("deliver policy invalid"));
if (config.FilterSubjects is { Length: > 0 } && !string.IsNullOrWhiteSpace(config.FilterSubject))
return JsApiErrors.NewJSConsumerDuplicateFilterSubjectsError();
var filters = config.FilterSubjects is { Length: > 0 }
? SubjectTokens.Subjects(config.FilterSubjects)
: (string.IsNullOrWhiteSpace(config.FilterSubject) ? [] : [config.FilterSubject]);
for (var i = 0; i < filters.Length; i++)
{
if (string.IsNullOrWhiteSpace(filters[i]))
return JsApiErrors.NewJSConsumerEmptyFilterError();
if (!SubscriptionIndex.IsValidSubject(filters[i]))
return JsApiErrors.NewJSConsumerFilterNotSubsetError();
for (var j = i + 1; j < filters.Length; j++)
{
if (SubscriptionIndex.SubjectsCollide(filters[i], filters[j]))
return JsApiErrors.NewJSConsumerOverlappingSubjectFiltersError();
}
}
var isPush = !string.IsNullOrWhiteSpace(config.DeliverSubject);
if (isPush)
{
if (!SubscriptionIndex.IsValidSubject(config.DeliverSubject!))
return JsApiErrors.NewJSConsumerInvalidDeliverSubjectError();
if (SubscriptionIndex.SubjectHasWildcard(config.DeliverSubject!))
return JsApiErrors.NewJSConsumerDeliverToWildcardsError();
if (config.MaxWaiting > 0)
return JsApiErrors.NewJSConsumerPushMaxWaitingError();
}
else
{
if (config.RateLimit > 0)
return JsApiErrors.NewJSConsumerPullWithRateLimitError();
}
if (config.MaxAckPending > 0 && selectedLimits?.MaxAckPending > 0 && config.MaxAckPending > selectedLimits.MaxAckPending)
return JsApiErrors.NewJSConsumerMaxPendingAckExcessError(selectedLimits.MaxAckPending);
if (streamConfig.Retention == RetentionPolicy.WorkQueuePolicy && config.AckPolicy != AckPolicy.AckExplicit)
return JsApiErrors.NewJSConsumerWQRequiresExplicitAckError();
if (config.Direct)
{
if (isPush)
return JsApiErrors.NewJSConsumerDirectRequiresPushError();
if (!string.IsNullOrWhiteSpace(config.Durable))
return JsApiErrors.NewJSConsumerDirectRequiresEphemeralError();
}
_ = isRecovering;
return null;
}
internal void UpdateInactiveThreshold(ConsumerConfig config)
{
ArgumentNullException.ThrowIfNull(config);
_mu.EnterWriteLock();
try
{
_deleteThreshold = config.InactiveThreshold > TimeSpan.Zero
? config.InactiveThreshold
: DefaultDeleteWait;
Config.InactiveThreshold = _deleteThreshold;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UpdatePauseState(ConsumerConfig config, DateTime? nowUtc = null)
{
ArgumentNullException.ThrowIfNull(config);
var now = nowUtc ?? DateTime.UtcNow;
_mu.EnterWriteLock();
try
{
Config.PauseUntil = config.PauseUntil;
_isPaused = config.PauseUntil.HasValue && config.PauseUntil.Value > now;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ConsumerAssignment? ConsumerAssignment()
{
_mu.EnterReadLock();
try
{
return _assignment;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetConsumerAssignment(ConsumerAssignment? assignment)
{
_mu.EnterWriteLock();
try
{
_assignment = assignment;
}
finally
{
_mu.ExitWriteLock();
}
}
private static bool HasPathSeparators(string? value)
{
if (string.IsNullOrWhiteSpace(value))
return false;
return value.Contains('/') || value.Contains('\\');
}
}

View File

@@ -0,0 +1,139 @@
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private readonly HashSet<string> _internalSubscriptions = new(StringComparer.Ordinal);
private readonly Channel<bool> _updateChannel = Channel.CreateBounded<bool>(4);
private Channel<bool>? _monitorQuitChannel = Channel.CreateBounded<bool>(1);
internal ChannelReader<bool>? MonitorQuitC()
{
_mu.EnterReadLock();
try
{
return _monitorQuitChannel?.Reader;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SignalMonitorQuit()
{
_mu.EnterWriteLock();
try
{
var channel = _monitorQuitChannel;
if (channel is null)
return;
channel.Writer.TryWrite(true);
channel.Writer.TryComplete();
_monitorQuitChannel = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool> UpdateC() => _updateChannel.Reader;
internal bool CheckQueueInterest(string? queue = null)
{
_mu.EnterReadLock();
try
{
if (_closed)
return false;
if (_internalSubscriptions.Count > 0)
return true;
return !string.IsNullOrWhiteSpace(queue) && _internalSubscriptions.Contains(queue);
}
finally
{
_mu.ExitReadLock();
}
}
internal void ClearNode() => ClearRaftNode();
internal bool IsLeaderInternal() => IsLeader();
internal ConsumerInfo? HandleClusterConsumerInfoRequest() =>
IsLeader() && !_closed ? GetInfo() : null;
internal bool SubscribeInternal(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return false;
_mu.EnterWriteLock();
try
{
var added = _internalSubscriptions.Add(subject);
if (added)
_updateChannel.Writer.TryWrite(true);
return added;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool Unsubscribe(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return false;
_mu.EnterWriteLock();
try
{
var removed = _internalSubscriptions.Remove(subject);
if (removed)
_updateChannel.Writer.TryWrite(true);
return removed;
}
finally
{
_mu.ExitWriteLock();
}
}
internal DateTime CreatedTime()
{
_mu.EnterReadLock();
try
{
return Created;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetCreatedTime(DateTime created)
{
_mu.EnterWriteLock();
try
{
Created = created;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool HasDeliveryInterest()
{
return HasDeliveryInterest(_hasLocalDeliveryInterest);
}
}

View File

@@ -0,0 +1,741 @@
using System.Text.Json;
using System.Text;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1);
private ConsumerInfo? _initialInfo;
internal bool UpdateDeliveryInterest(bool localInterest)
{
var interest = HasDeliveryInterest(localInterest);
_mu.EnterWriteLock();
try
{
_hasLocalDeliveryInterest = localInterest;
if (_closed || IsPullMode())
return false;
var wasActive = !IsPullMode() && _isLeader;
if (interest && !wasActive)
_updateChannel.Writer.TryWrite(true);
if (!interest)
_isPaused = false;
if (_deleteTimer != null && _deleteThreshold > TimeSpan.Zero && !interest)
return true;
_deleteTimer?.Dispose();
_deleteTimer = null;
if (!interest && _deleteThreshold > TimeSpan.Zero)
{
_deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan);
return true;
}
return false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void DeleteNotActive()
{
_mu.EnterReadLock();
try
{
if (_closed)
return;
if (IsPushMode())
{
if (HasDeliveryInterest())
return;
}
else
{
if (_state.Pending is { Count: > 0 })
return;
}
}
finally
{
_mu.ExitReadLock();
}
Delete();
}
internal void WatchGWinterest()
{
var wasActive = HasDeliveryInterest();
if (!_hasLocalDeliveryInterest)
{
UpdateDeliveryInterest(localInterest: false);
if (!wasActive && HasDeliveryInterest())
_updateChannel.Writer.TryWrite(true);
}
_mu.EnterWriteLock();
try
{
_gatewayWatchTimer?.Dispose();
_gatewayWatchTimer = new Timer(static s => ((NatsConsumer)s!).WatchGWinterest(), this, DefaultGatewayInterestInterval, Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool HasMaxDeliveries(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (Config.MaxDeliver <= 0)
return false;
_state.Redelivered ??= new Dictionary<ulong, ulong>();
_state.Pending ??= new Dictionary<ulong, Pending>();
var deliveryCount = _state.Redelivered.TryGetValue(sequence, out var redeliveries)
? redeliveries + 1
: 1UL;
if (deliveryCount < (ulong)Config.MaxDeliver)
{
_state.Redelivered[sequence] = deliveryCount;
return false;
}
_state.Redelivered[sequence] = deliveryCount;
_state.Pending.Remove(sequence);
_updateChannel.Writer.TryWrite(true);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ForceExpirePending()
{
_mu.EnterWriteLock();
try
{
if (_state.Pending is not { Count: > 0 })
return;
_state.Redelivered ??= new Dictionary<ulong, ulong>();
foreach (var seq in _state.Pending.Keys.ToArray())
{
if (HasMaxDeliveries(seq))
continue;
_state.Redelivered[seq] = _state.Redelivered.TryGetValue(seq, out var current)
? current + 1
: 1UL;
_state.Pending[seq].Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
_updateChannel.Writer.TryWrite(true);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetRateLimitNeedsLocks()
{
_mu.EnterWriteLock();
try
{
SetRateLimit(Config.RateLimit);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetRateLimit(ulong bitsPerSecond)
{
if (bitsPerSecond == 0)
{
Interlocked.Exchange(ref _rateLimitBitsPerSecond, 0);
_rateLimitBurstBytes = 0;
return;
}
Interlocked.Exchange(ref _rateLimitBitsPerSecond, (long)bitsPerSecond);
var configuredMax = _streamRef?.Config.MaxMsgSize ?? 0;
_rateLimitBurstBytes = configuredMax > 0
? configuredMax
: (int)Math.Max(1024UL, bitsPerSecond / 8UL);
}
internal bool UpdateDeliverSubject(string? newDeliver)
{
_mu.EnterWriteLock();
try
{
return UpdateDeliverSubjectLocked(newDeliver);
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool UpdateDeliverSubjectLocked(string? newDeliver)
{
if (_closed || IsPullMode() || string.Equals(Config.DeliverSubject, newDeliver, StringComparison.Ordinal))
return false;
if (_state.Pending is { Count: > 0 })
ForceExpirePending();
if (!string.IsNullOrWhiteSpace(Config.DeliverSubject))
_internalSubscriptions.Remove(Config.DeliverSubject!);
Config.DeliverSubject = string.IsNullOrWhiteSpace(newDeliver) ? null : newDeliver;
if (!string.IsNullOrWhiteSpace(Config.DeliverSubject))
_internalSubscriptions.Add(Config.DeliverSubject!);
_updateChannel.Writer.TryWrite(true);
return true;
}
internal static bool ConfigsEqualSansDelivery(ConsumerConfig left, ConsumerConfig right)
{
var l = CloneConfig(left);
var r = CloneConfig(right);
l.DeliverSubject = null;
r.DeliverSubject = null;
return JsonSerializer.Serialize(l) == JsonSerializer.Serialize(r);
}
internal (ulong Sequence, bool CanRespond, Exception? Error) ResetStartingSeq(ulong sequence, string? reply)
{
_mu.EnterWriteLock();
try
{
if (sequence == 0)
{
sequence = _state.AckFloor.Stream + 1;
}
else
{
switch (Config.DeliverPolicy)
{
case DeliverPolicy.DeliverAll:
break;
case DeliverPolicy.DeliverByStartSequence when sequence < Config.OptStartSeq:
return (0, false, new InvalidOperationException("below start seq"));
case DeliverPolicy.DeliverByStartTime when Config.OptStartTime.HasValue:
if (sequence == 0)
return (0, false, new InvalidOperationException("below start time"));
break;
default:
return (0, false, new InvalidOperationException("not allowed"));
}
}
if (sequence == 0)
sequence = 1;
_state.Delivered.Stream = sequence;
_state.Delivered.Consumer = sequence;
_state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0;
_state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0;
_state.Pending = new Dictionary<ulong, Pending>();
_state.Redelivered = new Dictionary<ulong, ulong>();
_updateChannel.Writer.TryWrite(true);
_ = reply;
return (sequence, true, null);
}
finally
{
_mu.ExitWriteLock();
}
}
private bool HasDeliveryInterest(bool localInterest)
{
_mu.EnterReadLock();
try
{
if (string.IsNullOrWhiteSpace(Config.DeliverSubject))
return false;
if (localInterest)
return true;
if (_streamRef?.Account is not { } account)
return false;
if (account.Server is NatsServer server && server.HasGatewayInterest(account, Config.DeliverSubject!))
return true;
return _internalSubscriptions.Contains(Config.DeliverSubject!);
}
finally
{
_mu.ExitReadLock();
}
}
private bool IsPullMode() => string.IsNullOrWhiteSpace(Config.DeliverSubject);
private bool IsPushMode() => !IsPullMode();
private static ConsumerConfig CloneConfig(ConsumerConfig cfg) =>
new()
{
Durable = cfg.Durable,
Name = cfg.Name,
Description = cfg.Description,
DeliverPolicy = cfg.DeliverPolicy,
OptStartSeq = cfg.OptStartSeq,
OptStartTime = cfg.OptStartTime,
AckPolicy = cfg.AckPolicy,
AckWait = cfg.AckWait,
MaxDeliver = cfg.MaxDeliver,
BackOff = cfg.BackOff?.ToArray(),
FilterSubject = cfg.FilterSubject,
FilterSubjects = cfg.FilterSubjects?.ToArray(),
ReplayPolicy = cfg.ReplayPolicy,
RateLimit = cfg.RateLimit,
SampleFrequency = cfg.SampleFrequency,
MaxWaiting = cfg.MaxWaiting,
MaxAckPending = cfg.MaxAckPending,
FlowControl = cfg.FlowControl,
HeadersOnly = cfg.HeadersOnly,
MaxRequestBatch = cfg.MaxRequestBatch,
MaxRequestExpires = cfg.MaxRequestExpires,
MaxRequestMaxBytes = cfg.MaxRequestMaxBytes,
DeliverSubject = cfg.DeliverSubject,
DeliverGroup = cfg.DeliverGroup,
Heartbeat = cfg.Heartbeat,
InactiveThreshold = cfg.InactiveThreshold,
Replicas = cfg.Replicas,
MemoryStorage = cfg.MemoryStorage,
Direct = cfg.Direct,
Metadata = cfg.Metadata is null ? null : new Dictionary<string, string>(cfg.Metadata),
PauseUntil = cfg.PauseUntil,
PriorityGroups = cfg.PriorityGroups?.ToArray(),
PriorityPolicy = cfg.PriorityPolicy,
PinnedTTL = cfg.PinnedTTL,
};
internal void ResetLocalStartingSeq(ulong sequence)
{
_mu.EnterWriteLock();
try
{
_state.Delivered.Stream = sequence;
_state.Delivered.Consumer = sequence;
_state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0;
_state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0;
_state.Pending = [];
_state.Redelivered = [];
}
finally
{
_mu.ExitWriteLock();
}
}
internal int LoopAndForwardProposals()
{
_mu.EnterWriteLock();
try
{
var count = 0;
while (_proposalQueue.TryDequeue(out _))
{
count++;
}
return count;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void Propose(byte[] proposal)
{
ArgumentNullException.ThrowIfNull(proposal);
_mu.EnterWriteLock();
try
{
_proposalQueue.Enqueue((byte[])proposal.Clone());
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp)
{
_mu.EnterWriteLock();
try
{
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence);
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence);
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0);
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0);
_state.Pending ??= [];
_state.Pending[streamSequence] = new Pending
{
Sequence = consumerSequence,
Timestamp = timestamp,
};
_state.Redelivered ??= [];
if (deliveryCount > 1)
_state.Redelivered[streamSequence] = deliveryCount;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void AddAckReply(ulong streamSequence, string reply)
{
if (string.IsNullOrWhiteSpace(reply))
return;
_mu.EnterWriteLock();
try
{
_ackReplies[streamSequence] = reply;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message)
{
ArgumentNullException.ThrowIfNull(message);
_mu.EnterWriteLock();
try
{
_state.Pending ??= [];
_state.Pending[sequence] = new Pending
{
Sequence = sequence,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
}
finally
{
_mu.ExitWriteLock();
}
}
internal int UpdateAcks()
{
_mu.EnterWriteLock();
try
{
var count = _ackReplies.Count;
_ackReplies.Clear();
return count;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void AddClusterPendingRequest(string requestId)
{
if (string.IsNullOrWhiteSpace(requestId))
return;
_mu.EnterWriteLock();
try
{
_clusterPendingRequests[requestId] = DateTime.UtcNow;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RemoveClusterPendingRequest(string requestId)
{
if (string.IsNullOrWhiteSpace(requestId))
return;
_mu.EnterWriteLock();
try
{
_clusterPendingRequests.Remove(requestId);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetPendingRequestsOk(bool ok)
{
_mu.EnterWriteLock();
try
{
_pendingRequestsOk = ok;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool PendingRequestsOk()
{
_mu.EnterReadLock();
try
{
return _pendingRequestsOk;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool CheckAndSetPendingRequestsOk(bool ok)
{
_mu.EnterWriteLock();
try
{
var previous = _pendingRequestsOk;
_pendingRequestsOk = ok;
return previous;
}
finally
{
_mu.ExitWriteLock();
}
}
internal int CheckPendingRequests(TimeSpan maxAge)
{
_mu.EnterReadLock();
try
{
var cutoff = DateTime.UtcNow - maxAge;
return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff);
}
finally
{
_mu.ExitReadLock();
}
}
internal int ReleaseAnyPendingRequests()
{
_mu.EnterWriteLock();
try
{
var released = _clusterPendingRequests.Count;
_clusterPendingRequests.Clear();
_pendingRequestsOk = true;
return released;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ConsumerState ReadStoredState()
{
_mu.EnterReadLock();
try
{
return GetConsumerState();
}
finally
{
_mu.ExitReadLock();
}
}
internal void ApplyState(ConsumerState state)
{
ArgumentNullException.ThrowIfNull(state);
_mu.EnterWriteLock();
try
{
_state = new ConsumerState
{
Delivered = new SequencePair
{
Consumer = state.Delivered.Consumer,
Stream = state.Delivered.Stream,
},
AckFloor = new SequencePair
{
Consumer = state.AckFloor.Consumer,
Stream = state.AckFloor.Stream,
},
Pending = state.Pending is null ? null : new Dictionary<ulong, Pending>(state.Pending),
Redelivered = state.Redelivered is null ? null : new Dictionary<ulong, ulong>(state.Redelivered),
};
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetStoreState(ConsumerState state) => ApplyState(state);
internal ConsumerState WriteStoreState()
{
_mu.EnterWriteLock();
try
{
return WriteStoreStateUnlocked();
}
finally
{
_mu.ExitWriteLock();
}
}
internal ConsumerState WriteStoreStateUnlocked() => GetConsumerState();
internal ConsumerInfo InitialInfo()
{
_mu.EnterWriteLock();
try
{
_initialInfo ??= GetInfo();
return _initialInfo;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ClearInitialInfo()
{
_mu.EnterWriteLock();
try
{
_initialInfo = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ConsumerInfo Info() => GetInfo();
internal ConsumerInfo InfoWithSnap(ConsumerState? snapshot = null)
{
if (snapshot is null)
return GetInfo();
var info = GetInfo();
info.Delivered = new SequenceInfo { Consumer = snapshot.Delivered.Consumer, Stream = snapshot.Delivered.Stream };
info.AckFloor = new SequenceInfo { Consumer = snapshot.AckFloor.Consumer, Stream = snapshot.AckFloor.Stream };
return info;
}
internal (ConsumerInfo Info, string ReplySubject) InfoWithSnapAndReply(string replySubject, ConsumerState? snapshot = null) =>
(InfoWithSnap(snapshot), replySubject);
internal void SignalNewMessages() => _updateChannel.Writer.TryWrite(true);
internal bool ShouldSample()
{
if (string.IsNullOrWhiteSpace(Config.SampleFrequency))
return false;
var token = Config.SampleFrequency!.Trim().TrimEnd('%');
if (!int.TryParse(token, out var percent))
return false;
return percent > 0;
}
internal bool SampleAck(string ackReply)
{
if (!ShouldSample())
return false;
if (string.IsNullOrWhiteSpace(ackReply))
return false;
AddAckReply((ulong)ackReply.Length, ackReply);
return true;
}
internal bool IsFiltered(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return false;
if (!string.IsNullOrWhiteSpace(Config.FilterSubject))
return Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, Config.FilterSubject!);
if (Config.FilterSubjects is not { Length: > 0 })
return false;
return Config.FilterSubjects.Any(filter => Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, filter));
}
internal bool NeedAck() => Config.AckPolicy != AckPolicy.AckNone;
internal static (JsApiConsumerGetNextRequest? Request, Exception? Error) NextReqFromMsg(ReadOnlySpan<byte> message)
{
if (message.Length == 0)
return (new JsApiConsumerGetNextRequest { Batch = 1 }, null);
try
{
var text = Encoding.UTF8.GetString(message);
if (int.TryParse(text, out var batch))
return (new JsApiConsumerGetNextRequest { Batch = Math.Max(1, batch) }, null);
var req = JsonSerializer.Deserialize<JsApiConsumerGetNextRequest>(text);
if (req is null)
return (null, new InvalidOperationException("invalid request"));
if (req.Batch <= 0)
req.Batch = 1;
return (req, null);
}
catch (Exception ex)
{
return (null, ex);
}
}
}

View File

@@ -0,0 +1,6 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static WaitQueue NewWaitQueue(int max = 0) => WaitQueue.NewWaitQueue(max);
}

View File

@@ -19,7 +19,7 @@ namespace ZB.MOM.NatsNet.Server;
/// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle.
/// Mirrors the <c>consumer</c> struct in server/consumer.go.
/// </summary>
internal sealed class NatsConsumer : IDisposable
internal sealed partial class NatsConsumer : IDisposable
{
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
@@ -41,6 +41,17 @@ internal sealed class NatsConsumer : IDisposable
private NatsStream? _streamRef;
private ConsumerAssignment? _assignment;
private DateTime _lostQuorumSent;
private TimeSpan _deleteThreshold;
private bool _isPaused;
private Timer? _deleteTimer;
private Timer? _gatewayWatchTimer;
private bool _hasLocalDeliveryInterest;
private long _rateLimitBitsPerSecond;
private int _rateLimitBurstBytes;
private readonly Queue<byte[]> _proposalQueue = new();
private readonly Dictionary<ulong, string> _ackReplies = new();
private readonly Dictionary<string, DateTime> _clusterPendingRequests = new(StringComparer.Ordinal);
private bool _pendingRequestsOk;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -320,7 +331,8 @@ internal sealed class NatsConsumer : IDisposable
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq);
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq);
Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream);
return null;
Exception? noError = null;
return noError;
}
finally
{

View File

@@ -310,4 +310,80 @@ internal sealed partial class NatsStream
return _consumerList.All(c => ReferenceEquals(c, observingConsumer));
}
}
internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction(
ConsumerConfig config,
string oname,
ConsumerAction action,
bool pedantic = false) =>
AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic);
internal (NatsConsumer? Consumer, Exception? Error) AddConsumer(
ConsumerConfig config,
string oname,
bool pedantic = false) =>
AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic);
internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment(
ConsumerConfig config,
string oname,
ConsumerAssignment? assignment,
bool isRecovering,
ConsumerAction action,
bool pedantic = false)
{
ArgumentNullException.ThrowIfNull(config);
_mu.EnterWriteLock();
try
{
if (_closed)
return (null, new InvalidOperationException("stream closed"));
var name = !string.IsNullOrWhiteSpace(oname)
? oname
: (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty));
if (string.IsNullOrWhiteSpace(name))
return (null, new InvalidOperationException("consumer name required"));
config.Name = name;
config.Durable ??= name;
var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic);
if (defaultsErr is not null)
return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid"));
var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering);
if (cfgErr is not null)
return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid"));
if (_consumers.TryGetValue(name, out var existing))
{
if (action == ConsumerAction.Create)
return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists"));
existing.UpdateConfig(config);
if (assignment is not null)
existing.SetConsumerAssignment(assignment);
return (existing, null);
}
if (action == ConsumerAction.Update)
return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist"));
var consumer = NatsConsumer.Create(this, config, action, assignment);
if (consumer is null)
return (null, new InvalidOperationException("consumer create failed"));
consumer.SetConsumerAssignment(assignment);
consumer.UpdateInactiveThreshold(config);
consumer.UpdatePauseState(config);
_consumers[name] = consumer;
return (consumer, null);
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -0,0 +1,82 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace ZB.MOM.NatsNet.Server;
internal static class ConsumerPolicyExtensions
{
internal static string String(this PriorityPolicy policy) =>
policy switch
{
PriorityPolicy.PriorityOverflow => "\"overflow\"",
PriorityPolicy.PriorityPinnedClient => "\"pinned_client\"",
PriorityPolicy.PriorityPrioritized => "\"prioritized\"",
_ => "\"none\"",
};
internal static string String(this DeliverPolicy policy) =>
policy switch
{
DeliverPolicy.DeliverAll => "all",
DeliverPolicy.DeliverLast => "last",
DeliverPolicy.DeliverNew => "new",
DeliverPolicy.DeliverByStartSequence => "by_start_sequence",
DeliverPolicy.DeliverByStartTime => "by_start_time",
DeliverPolicy.DeliverLastPerSubject => "last_per_subject",
_ => "undefined",
};
internal static string String(this AckPolicy policy) =>
policy switch
{
AckPolicy.AckNone => "none",
AckPolicy.AckAll => "all",
_ => "explicit",
};
internal static string String(this ReplayPolicy policy) =>
policy switch
{
ReplayPolicy.ReplayInstant => "instant",
_ => "original",
};
}
public sealed class PriorityPolicyJsonConverter : JsonConverter<PriorityPolicy>
{
public override PriorityPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"none" => PriorityPolicy.PriorityNone,
"overflow" => PriorityPolicy.PriorityOverflow,
"pinned_client" => PriorityPolicy.PriorityPinnedClient,
"prioritized" => PriorityPolicy.PriorityPrioritized,
var value => throw new JsonException($"unknown priority policy: {value}"),
};
}
public override void Write(Utf8JsonWriter writer, PriorityPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case PriorityPolicy.PriorityNone:
writer.WriteStringValue("none");
break;
case PriorityPolicy.PriorityOverflow:
writer.WriteStringValue("overflow");
break;
case PriorityPolicy.PriorityPinnedClient:
writer.WriteStringValue("pinned_client");
break;
case PriorityPolicy.PriorityPrioritized:
writer.WriteStringValue("prioritized");
break;
default:
throw new JsonException($"unknown priority policy: {value}");
}
}
}

View File

@@ -615,6 +615,7 @@ public enum DeliverPolicy
// ---------------------------------------------------------------------------
/// <summary>Policy for selecting messages based on priority.</summary>
[JsonConverter(typeof(PriorityPolicyJsonConverter))]
public enum PriorityPolicy
{
PriorityNone = 0,

View File

@@ -0,0 +1,51 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace ZB.MOM.NatsNet.Server;
internal static class ConsumerActionExtensions
{
internal static string String(this ConsumerAction action) =>
action switch
{
ConsumerAction.CreateOrUpdate => "\"create_or_update\"",
ConsumerAction.Create => "\"create\"",
ConsumerAction.Update => "\"update\"",
_ => "\"create_or_update\"",
};
}
public sealed class ConsumerActionJsonConverter : JsonConverter<ConsumerAction>
{
public override ConsumerAction Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"create" => ConsumerAction.Create,
"update" => ConsumerAction.Update,
"create_or_update" => ConsumerAction.CreateOrUpdate,
var value => throw new JsonException($"unknown consumer action: {value}"),
};
}
public override void Write(Utf8JsonWriter writer, ConsumerAction value, JsonSerializerOptions options)
{
switch (value)
{
case ConsumerAction.Create:
writer.WriteStringValue("create");
break;
case ConsumerAction.Update:
writer.WriteStringValue("update");
break;
case ConsumerAction.CreateOrUpdate:
writer.WriteStringValue("create_or_update");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}

View File

@@ -395,6 +395,7 @@ public sealed class CreateConsumerRequest
/// Specifies the intended action when creating a consumer.
/// Mirrors <c>ConsumerAction</c> in server/consumer.go.
/// </summary>
[JsonConverter(typeof(ConsumerActionJsonConverter))]
public enum ConsumerAction
{
/// <summary>Create a new consumer or update if it already exists.</summary>
@@ -445,6 +446,42 @@ public sealed class WaitingRequest
/// <summary>Optional pull request priority group metadata.</summary>
public PriorityGroup? PriorityGroup { get; set; }
public bool RecycleIfDone()
{
if (N > 0 || MaxBytes > 0 && B < MaxBytes)
return false;
Recycle();
return true;
}
public void Recycle()
{
Subject = string.Empty;
Reply = null;
N = 0;
D = 0;
NoWait = 0;
Expires = null;
MaxBytes = 0;
B = 0;
PriorityGroup = null;
}
}
public sealed class WaitingDelivery
{
public string Reply { get; set; } = string.Empty;
public ulong Sequence { get; set; }
public DateTime Created { get; set; } = DateTime.UtcNow;
public void Recycle()
{
Reply = string.Empty;
Sequence = 0;
Created = DateTime.UnixEpoch;
}
}
/// <summary>
@@ -670,6 +707,8 @@ public sealed class WaitQueue
}
private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue;
public static WaitQueue NewWaitQueue(int max = 0) => new(max);
}
/// <summary>