From fce6bd7dcace4b60e2cd793be9257caa129e09be Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:11:47 -0500 Subject: [PATCH] task2: implement batch38 group A consumer lifecycle features --- .../Auth/JwtProcessor.cs | 9 +- .../Internal/SubjectTokens.ConsumerFilters.cs | 10 + .../JetStream/NatsConsumer.Config.cs | 268 ++++++++++++++++++ .../JetStream/NatsConsumer.cs | 7 +- .../JetStream/NatsStream.Consumers.cs | 80 ++++++ .../JetStream/NatsStream.cs | 1 + .../JetStream/StoreTypes.ConsumerPolicies.cs | 82 ++++++ .../JetStream/StoreTypes.cs | 1 + .../StreamTypes.ConsumerLifecycle.cs | 51 ++++ .../JetStream/StreamTypes.cs | 1 + .../JetStream/ConsumerPoliciesTests.cs | 133 +++++++++ .../JetStream/NatsConsumerTests.cs | 85 ++++++ porting.db | Bin 6758400 -> 6758400 bytes 13 files changed, 725 insertions(+), 3 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs index de4ef4e..69d5ab2 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs @@ -128,7 +128,14 @@ public static class JwtProcessor // If start > end, end is on the next day (overnight range). if (startTime > endTime) { - end = end.AddDays(1); + if (now.TimeOfDay < endTime) + { + start = start.AddDays(-1); + } + else + { + end = end.AddDays(1); + } } if (start <= now && now < end) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs new file mode 100644 index 0000000..711d466 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs @@ -0,0 +1,10 @@ +namespace ZB.MOM.NatsNet.Server; + +internal static class SubjectTokens +{ + internal static string[] Subjects(IEnumerable filters) + { + ArgumentNullException.ThrowIfNull(filters); + return filters.Where(static filter => !string.IsNullOrWhiteSpace(filter)).ToArray(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs new file mode 100644 index 0000000..cb90083 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs @@ -0,0 +1,268 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal const int DefaultMaxAckPending = 1000; + internal static readonly TimeSpan DefaultAckWait = TimeSpan.FromSeconds(30); + internal static readonly TimeSpan DefaultDeleteWait = TimeSpan.FromSeconds(5); + internal static readonly TimeSpan DefaultPinnedTtl = TimeSpan.FromMinutes(2); + + internal static JsApiError? SetConsumerConfigDefaults( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool pedantic) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (config.MaxDeliver is 0 or < -1) + { + if (pedantic && config.MaxDeliver < -1) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_deliver must be set to -1")); + config.MaxDeliver = -1; + } + + if (config.MaxWaiting < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_waiting must not be negative")); + config.MaxWaiting = 0; + } + + if (config.MaxAckPending < -1) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_ack_pending must be set to -1")); + config.MaxAckPending = -1; + } + + if (config.MaxRequestBatch < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_batch must not be negative")); + config.MaxRequestBatch = 0; + } + + if (config.MaxRequestExpires < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_expires must not be negative")); + config.MaxRequestExpires = TimeSpan.Zero; + } + + if (config.MaxRequestMaxBytes < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_bytes must not be negative")); + config.MaxRequestMaxBytes = 0; + } + + if (config.Heartbeat < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("idle_heartbeat must not be negative")); + config.Heartbeat = TimeSpan.Zero; + } + + if (config.InactiveThreshold < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("inactive_threshold must not be negative")); + config.InactiveThreshold = TimeSpan.Zero; + } + + if (config.PinnedTTL < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("priority_timeout must not be negative")); + config.PinnedTTL = TimeSpan.Zero; + } + + if (config.AckWait == TimeSpan.Zero) + config.AckWait = DefaultAckWait; + + if (config.MaxAckPending == 0 && config.AckPolicy != AckPolicy.AckNone) + { + config.MaxAckPending = selectedLimits?.MaxAckPending > 0 + ? selectedLimits.MaxAckPending + : DefaultMaxAckPending; + } + + if (config.InactiveThreshold == TimeSpan.Zero && string.IsNullOrWhiteSpace(config.Durable)) + config.InactiveThreshold = DefaultDeleteWait; + + if (config.PinnedTTL == TimeSpan.Zero && config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + config.PinnedTTL = DefaultPinnedTtl; + + if (config.Replicas == 0 || config.Replicas > streamReplicas) + config.Replicas = streamReplicas; + + if (!string.IsNullOrWhiteSpace(config.Name) && string.IsNullOrWhiteSpace(config.Durable)) + config.Durable = config.Name; + + return null; + } + + internal static JsApiError? CheckConsumerCfg( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool isRecovering) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (!string.IsNullOrWhiteSpace(config.Durable) && + !string.IsNullOrWhiteSpace(config.Name) && + !string.Equals(config.Durable, config.Name, StringComparison.Ordinal)) + { + return JsApiErrors.NewJSConsumerCreateDurableAndNameMismatchError(); + } + + if (HasPathSeparators(config.Durable) || HasPathSeparators(config.Name)) + return JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError(); + + if (config.Replicas > streamReplicas) + return JsApiErrors.NewJSConsumerReplicasExceedsStreamError(); + + if (!Enum.IsDefined(config.AckPolicy)) + return JsApiErrors.NewJSConsumerAckPolicyInvalidError(); + + if (!Enum.IsDefined(config.ReplayPolicy)) + return JsApiErrors.NewJSConsumerReplayPolicyInvalidError(); + + if (!Enum.IsDefined(config.DeliverPolicy)) + return JsApiErrors.NewJSConsumerInvalidPolicyError(new InvalidOperationException("deliver policy invalid")); + + if (config.FilterSubjects is { Length: > 0 } && !string.IsNullOrWhiteSpace(config.FilterSubject)) + return JsApiErrors.NewJSConsumerDuplicateFilterSubjectsError(); + + var filters = config.FilterSubjects is { Length: > 0 } + ? SubjectTokens.Subjects(config.FilterSubjects) + : (string.IsNullOrWhiteSpace(config.FilterSubject) ? [] : [config.FilterSubject]); + + for (var i = 0; i < filters.Length; i++) + { + if (string.IsNullOrWhiteSpace(filters[i])) + return JsApiErrors.NewJSConsumerEmptyFilterError(); + if (!SubscriptionIndex.IsValidSubject(filters[i])) + return JsApiErrors.NewJSConsumerFilterNotSubsetError(); + + for (var j = i + 1; j < filters.Length; j++) + { + if (SubscriptionIndex.SubjectsCollide(filters[i], filters[j])) + return JsApiErrors.NewJSConsumerOverlappingSubjectFiltersError(); + } + } + + var isPush = !string.IsNullOrWhiteSpace(config.DeliverSubject); + if (isPush) + { + if (!SubscriptionIndex.IsValidSubject(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerInvalidDeliverSubjectError(); + + if (SubscriptionIndex.SubjectHasWildcard(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerDeliverToWildcardsError(); + + if (config.MaxWaiting > 0) + return JsApiErrors.NewJSConsumerPushMaxWaitingError(); + } + else + { + if (config.RateLimit > 0) + return JsApiErrors.NewJSConsumerPullWithRateLimitError(); + } + + if (config.MaxAckPending > 0 && selectedLimits?.MaxAckPending > 0 && config.MaxAckPending > selectedLimits.MaxAckPending) + return JsApiErrors.NewJSConsumerMaxPendingAckExcessError(selectedLimits.MaxAckPending); + + if (streamConfig.Retention == RetentionPolicy.WorkQueuePolicy && config.AckPolicy != AckPolicy.AckExplicit) + return JsApiErrors.NewJSConsumerWQRequiresExplicitAckError(); + + if (config.Direct) + { + if (isPush) + return JsApiErrors.NewJSConsumerDirectRequiresPushError(); + if (!string.IsNullOrWhiteSpace(config.Durable)) + return JsApiErrors.NewJSConsumerDirectRequiresEphemeralError(); + } + + _ = isRecovering; + return null; + } + + internal void UpdateInactiveThreshold(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + _deleteThreshold = config.InactiveThreshold > TimeSpan.Zero + ? config.InactiveThreshold + : DefaultDeleteWait; + + Config.InactiveThreshold = _deleteThreshold; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdatePauseState(ConsumerConfig config, DateTime? nowUtc = null) + { + ArgumentNullException.ThrowIfNull(config); + var now = nowUtc ?? DateTime.UtcNow; + + _mu.EnterWriteLock(); + try + { + Config.PauseUntil = config.PauseUntil; + _isPaused = config.PauseUntil.HasValue && config.PauseUntil.Value > now; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerAssignment? ConsumerAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetConsumerAssignment(ConsumerAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private static bool HasPathSeparators(string? value) + { + if (string.IsNullOrWhiteSpace(value)) + return false; + + return value.Contains('/') || value.Contains('\\'); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 77b9b92..32062b9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -19,7 +19,7 @@ namespace ZB.MOM.NatsNet.Server; /// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle. /// Mirrors the consumer struct in server/consumer.go. /// -internal sealed class NatsConsumer : IDisposable +internal sealed partial class NatsConsumer : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -41,6 +41,8 @@ internal sealed class NatsConsumer : IDisposable private NatsStream? _streamRef; private ConsumerAssignment? _assignment; private DateTime _lostQuorumSent; + private TimeSpan _deleteThreshold; + private bool _isPaused; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -320,7 +322,8 @@ internal sealed class NatsConsumer : IDisposable _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq); _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq); Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream); - return null; + Exception? noError = null; + return noError; } finally { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..a1816d4 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,80 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction( + ConsumerConfig config, + string oname, + ConsumerAction action, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumer( + ConsumerConfig config, + string oname, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment( + ConsumerConfig config, + string oname, + ConsumerAssignment? assignment, + bool isRecovering, + ConsumerAction action, + bool pedantic = false) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, new InvalidOperationException("stream closed")); + + var name = !string.IsNullOrWhiteSpace(oname) + ? oname + : (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty)); + if (string.IsNullOrWhiteSpace(name)) + return (null, new InvalidOperationException("consumer name required")); + + config.Name = name; + config.Durable ??= name; + + var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic); + if (defaultsErr is not null) + return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid")); + + var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering); + if (cfgErr is not null) + return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid")); + + if (_consumers.TryGetValue(name, out var existing)) + { + if (action == ConsumerAction.Create) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists")); + + existing.UpdateConfig(config); + if (assignment is not null) + existing.SetConsumerAssignment(assignment); + return (existing, null); + } + + if (action == ConsumerAction.Update) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist")); + + var consumer = NatsConsumer.Create(this, config, action, assignment); + if (consumer is null) + return (null, new InvalidOperationException("consumer create failed")); + + consumer.SetConsumerAssignment(assignment); + consumer.UpdateInactiveThreshold(config); + consumer.UpdatePauseState(config); + _consumers[name] = consumer; + return (consumer, null); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 5ad6a98..d75a776 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -52,6 +52,7 @@ internal sealed partial class NatsStream : IDisposable private ulong _clseq; private ulong _clfs; private readonly Dictionary _sources = new(StringComparer.Ordinal); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs new file mode 100644 index 0000000..62ab93e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs @@ -0,0 +1,82 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerPolicyExtensions +{ + internal static string String(this PriorityPolicy policy) => + policy switch + { + PriorityPolicy.PriorityOverflow => "\"overflow\"", + PriorityPolicy.PriorityPinnedClient => "\"pinned_client\"", + PriorityPolicy.PriorityPrioritized => "\"prioritized\"", + _ => "\"none\"", + }; + + internal static string String(this DeliverPolicy policy) => + policy switch + { + DeliverPolicy.DeliverAll => "all", + DeliverPolicy.DeliverLast => "last", + DeliverPolicy.DeliverNew => "new", + DeliverPolicy.DeliverByStartSequence => "by_start_sequence", + DeliverPolicy.DeliverByStartTime => "by_start_time", + DeliverPolicy.DeliverLastPerSubject => "last_per_subject", + _ => "undefined", + }; + + internal static string String(this AckPolicy policy) => + policy switch + { + AckPolicy.AckNone => "none", + AckPolicy.AckAll => "all", + _ => "explicit", + }; + + internal static string String(this ReplayPolicy policy) => + policy switch + { + ReplayPolicy.ReplayInstant => "instant", + _ => "original", + }; +} + +public sealed class PriorityPolicyJsonConverter : JsonConverter +{ + public override PriorityPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "none" => PriorityPolicy.PriorityNone, + "overflow" => PriorityPolicy.PriorityOverflow, + "pinned_client" => PriorityPolicy.PriorityPinnedClient, + "prioritized" => PriorityPolicy.PriorityPrioritized, + var value => throw new JsonException($"unknown priority policy: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, PriorityPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case PriorityPolicy.PriorityNone: + writer.WriteStringValue("none"); + break; + case PriorityPolicy.PriorityOverflow: + writer.WriteStringValue("overflow"); + break; + case PriorityPolicy.PriorityPinnedClient: + writer.WriteStringValue("pinned_client"); + break; + case PriorityPolicy.PriorityPrioritized: + writer.WriteStringValue("prioritized"); + break; + default: + throw new JsonException($"unknown priority policy: {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 03eb1d7..439ad7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -615,6 +615,7 @@ public enum DeliverPolicy // --------------------------------------------------------------------------- /// Policy for selecting messages based on priority. +[JsonConverter(typeof(PriorityPolicyJsonConverter))] public enum PriorityPolicy { PriorityNone = 0, diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs new file mode 100644 index 0000000..e3c40cd --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs @@ -0,0 +1,51 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerActionExtensions +{ + internal static string String(this ConsumerAction action) => + action switch + { + ConsumerAction.CreateOrUpdate => "\"create_or_update\"", + ConsumerAction.Create => "\"create\"", + ConsumerAction.Update => "\"update\"", + _ => "\"create_or_update\"", + }; +} + +public sealed class ConsumerActionJsonConverter : JsonConverter +{ + public override ConsumerAction Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "create" => ConsumerAction.Create, + "update" => ConsumerAction.Update, + "create_or_update" => ConsumerAction.CreateOrUpdate, + var value => throw new JsonException($"unknown consumer action: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, ConsumerAction value, JsonSerializerOptions options) + { + switch (value) + { + case ConsumerAction.Create: + writer.WriteStringValue("create"); + break; + case ConsumerAction.Update: + writer.WriteStringValue("update"); + break; + case ConsumerAction.CreateOrUpdate: + writer.WriteStringValue("create_or_update"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c63e5dd..d3b1952 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -395,6 +395,7 @@ public sealed class CreateConsumerRequest /// Specifies the intended action when creating a consumer. /// Mirrors ConsumerAction in server/consumer.go. /// +[JsonConverter(typeof(ConsumerActionJsonConverter))] public enum ConsumerAction { /// Create a new consumer or update if it already exists. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs new file mode 100644 index 0000000..7a3e2f1 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs @@ -0,0 +1,133 @@ +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerPoliciesTests +{ + [Fact] + public void ConsumerAction_StringAndJsonParity_ShouldMatchGo() + { + ConsumerAction.CreateOrUpdate.String().ShouldBe("\"create_or_update\""); + ConsumerAction.Create.String().ShouldBe("\"create\""); + ConsumerAction.Update.String().ShouldBe("\"update\""); + + JsonSerializer.Serialize(ConsumerAction.Create).ShouldBe("\"create\""); + JsonSerializer.Deserialize("\"update\"").ShouldBe(ConsumerAction.Update); + Should.Throw(() => JsonSerializer.Deserialize("\"bogus\"")); + } + + [Fact] + public void PriorityPolicy_StringAndJsonParity_ShouldMatchGo() + { + PriorityPolicy.PriorityNone.String().ShouldBe("\"none\""); + PriorityPolicy.PriorityOverflow.String().ShouldBe("\"overflow\""); + PriorityPolicy.PriorityPinnedClient.String().ShouldBe("\"pinned_client\""); + PriorityPolicy.PriorityPrioritized.String().ShouldBe("\"prioritized\""); + + JsonSerializer.Serialize(PriorityPolicy.PriorityPinnedClient).ShouldBe("\"pinned_client\""); + JsonSerializer.Deserialize("\"prioritized\"").ShouldBe(PriorityPolicy.PriorityPrioritized); + Should.Throw(() => JsonSerializer.Deserialize("\"none-ish\"")); + } + + [Fact] + public void ConsumerPolicies_StringParity_ShouldMatchGo() + { + DeliverPolicy.DeliverByStartSequence.String().ShouldBe("by_start_sequence"); + AckPolicy.AckExplicit.String().ShouldBe("explicit"); + ReplayPolicy.ReplayInstant.String().ShouldBe("instant"); + } + + [Fact] + public void SubjectTokens_Subjects_RemovesEmptyValues() + { + var subjects = SubjectTokens.Subjects(new[] { "foo.*", string.Empty, " ", "bar.>" }); + subjects.ShouldBe(["foo.*", "bar.>"]); + } + + [Fact] + public void SetConsumerConfigDefaults_InvalidNegativesInPedanticMode_ReturnsError() + { + var cfg = new ConsumerConfig { MaxDeliver = -2 }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: true); + + err.ShouldNotBeNull(); + cfg.MaxDeliver.ShouldBe(-2); + } + + [Fact] + public void SetConsumerConfigDefaults_AppliesGoDefaults_ShouldPopulateExpectedValues() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxDeliver = 0, + AckPolicy = AckPolicy.AckExplicit, + Replicas = 0, + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + var limits = new JetStreamAccountLimits { MaxAckPending = 2500 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, limits, pedantic: false); + + err.ShouldBeNull(); + cfg.MaxDeliver.ShouldBe(-1); + cfg.AckWait.ShouldBe(TimeSpan.FromSeconds(30)); + cfg.MaxAckPending.ShouldBe(2500); + cfg.Replicas.ShouldBe(3); + } + + [Fact] + public void CheckConsumerCfg_DurableNameMismatch_ReturnsError() + { + var cfg = new ConsumerConfig { Name = "A", Durable = "B", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerCreateDurableAndNameMismatch.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_OverlappingFilterSubjects_ReturnsError() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.*", "orders.created"], + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerOverlappingSubjectFilters.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_WithValidPullConfig_ReturnsNull() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubject = "orders.created", + }; + var streamCfg = new StreamConfig + { + Name = "ORDERS", + Replicas = 1, + Retention = RetentionPolicy.LimitsPolicy, + Subjects = ["orders.>"], + }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 611e398..c7ccf81 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -127,4 +127,89 @@ public sealed class NatsConsumerTests q.Peek()!.Reply.ShouldBe("2a"); q.Peek()!.N.ShouldBe(3); } + + [Fact] + public void AddConsumerWithAction_CreateThenUpdate_ShouldRespectActions() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var (created, createErr) = stream!.AddConsumerWithAction(cfg, "D", ConsumerAction.Create, pedantic: false); + createErr.ShouldBeNull(); + created.ShouldNotBeNull(); + + var updateCfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll }; + var (updated, updateErr) = stream.AddConsumerWithAction(updateCfg, "D", ConsumerAction.Update, pedantic: false); + updateErr.ShouldBeNull(); + updated.ShouldNotBeNull(); + updated!.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll); + } + + [Fact] + public void AddConsumer_WithAssignment_ShouldAttachAssignment() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + var (consumer, err) = stream!.AddConsumerWithAssignment(cfg, "D", assignment, isRecovering: false, ConsumerAction.Create, pedantic: false); + err.ShouldBeNull(); + consumer.ShouldNotBeNull(); + consumer!.ConsumerAssignment().ShouldBeSameAs(assignment); + } + + [Fact] + public void UpdateInactiveThreshold_AndPauseState_ShouldTrackConfigValues() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromSeconds(30) }); + consumer.GetConfig().InactiveThreshold.ShouldBe(TimeSpan.FromSeconds(30)); + + var pauseUntil = DateTime.UtcNow.AddMinutes(1); + consumer.UpdatePauseState(new ConsumerConfig { PauseUntil = pauseUntil }); + consumer.GetConfig().PauseUntil.ShouldBe(pauseUntil); + } + + [Fact] + public void ConsumerAssignment_GetSet_ShouldRoundTrip() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + + consumer!.SetConsumerAssignment(assignment); + consumer.ConsumerAssignment().ShouldBeSameAs(assignment); + } } diff --git a/porting.db b/porting.db index 7a4e86edcdb0fdedb7c1b7863a9cbaa49ad7ea35..94b683634876f2f4a07e0dade43ef79afc4041ad 100644 GIT binary patch delta 4460 zcmc(heN0=|8OHCq{=nBhz9t_u1Y%+!KpIN0A(V{JL9#aG8^?T`Ph+l$gKY?gsEb-9 zs8OOxk(N|bPu5u*WSUk@ZIwuAZl!*0U0Q8gyQSGwY0K7bKdj%V(vM|e;(5v2WQf)FL0c3l;gO3K+bM2HN3#>Jo(_of$~BUeR`@; zo+>1L7d9vTmU~O@F0$hkMkhX(Q!X1GO4<^Ee_ zH(_Vyh*|x^BtJJtUQN=?a@Sbh4EZ(-&ywA;aEJVb=xGMGgWdd!ln|5bSRkL0W43v6 zbU+G7qodLQ-w78|oENS~aUF1O#kIrjQ(PO|F2%LN&^OaAw6d*^bR~3ySsF z=DE)m=Z2eBTqE2y#WlcPQCvOTWyRIOJ+HW0xaSmC19y7Kk!h=r8NVif*W-&K%3pp> z%;w8d1y+0NRs#BS?3`g~G`F{T<_nu;zPt8vZ8I=s?Y4QoEu$Vy|?ulZQRD~(mR=3_-x8mo5A z#|p1BR?V7^6d4hf4d)W(#GVnKPK zvIFl(JD6jWPawtYeV=~=d+L%z?ImT-qB3V`C>B)KC`-I-D%n@GJf8Hw_~kxc6psnI z6h1mO-p_a9MemI(FO)m3I-vH)RXbF9T(v??(7NF2c#CNq(Wwbip!% z8kZ~6t9%dPEi>$JNotY&!}s~e%|={AJGpjvqGWiJ4vq?bM!Ci{-MjZlnVc3E+=ubwr=WXnLdM^UFoLz&u-8e z@r>g~5L@V`HHlJhbB@45J#?$8ey)dl67?S($g#4wTXgDrWiRbd)Xxl2k-gVTb5-lZ z9vWch8_jlh%R|c+OO~b6?~xN;=K7;0owYX6z3iDxdnR*yv_)W_+%T(Y?& z?IxN^SXO3ICQIIhT`F2=_VQii;TBpz*!+*N&GAR+diI-Ex?_2OrF!WQI~>(zvE=nS zBkS#?X7;j|=BYw|^U?;kZ<{@j^>os0s?v8m@iCW57M9(0-vCG5^b5jtZS>sYF7Xdy zWxsB_ugje_`u#+`@3Ki?ueVc|YL~Ct>D!DS%*kOh9qQ*OH+4}xdA5(5(yXbL_k^D% zoi;vXDA(87wuwK|P4R!_^T?CTF>Rh?DMPeJ`HK{(`u}c-HYZAzAzGj+9Sz|Fo;{K7 zV9qbo4eXr|-M~VgoXx6GSs1PFgsT-l z#P!!Bf4pETC1gKKyK8G;PYUa7?5X)=F~36o{} zAO$=CHi1;I8Ki-9um#vb2G|N5AQNPPY_JXFfLxFVwu1-34v-HX0tH|vC;ffValTTp8+-$lfjyucJOaK6DnKRJ3-*C;fk(k(U_W>qRDo(x18PAXs0R(85xBtt z-~mmb8MJ^_&<5H;2k?SU&;`0d59kGbzz6z)9}LK|ccqNE$_#Fhv-OCtiD}kxdX?r| zPMU9+%LQhdH`OOSpHyzVY8)`!Hk>v%^q2HixJ0%o5 zs69T|#Sd_UpPup?mLDPaPMksQl6K9Bvqd?e0@^s6yBS#ROp~-rS~WY|8)!fqZ&@Yo z*T$PyiT7*cP5(C@jQQg6YMkN9dF){mz5c?L9%eAsr;WQ;iT7&bjjP0awDE>j;@#SK z{VMS;ZM<%kc&9dAyGq=vjn`=6OJB2KtV0`LylUpN3(l3l$iY~FyIz<0Tn?3d8iR2J`fXvMh&n$RR}B#t-MQ(HN*%Q zi_|t~Nb7GCswO}U=jPRX`&Hzm7FKwy+*=~k!$3X=uj^Rj+&OH*2dZ`wU&G|Jz&ongV)IoiS!r)x5+Hx z#tGbkWGPxvz)H|^16GWd8L%R>#DKZbf)f^^1t%;p;{Qo5o5;Ih8YLOB0h50`?3>VeA2!Prf!{C&+CA%{feWSS6?E zsjaJRYpeB$8#pV6BRQ-EX0Kw!q9DNdB)Kp93XHG+N6h(lvvfh&CFKaxkaR+NOWH5> zNbORiR0*v%o4tcZo3c#F%63z<$z~Ged-6?rSoy7TMEWAq$-mma14v^VS{ z+DY4}hnCPhnnjb@7`x6sX6M--7;N;4GkhD93p>-r<*+YZq~P^>;-NEL%=W3-rzwzq zT+};69rqYtp_i{7;_M)2Kj*BUvo|?=gR`G;cHp6POzpl3Tyj5WKjrKvoW0IjpJW#@ zADyVS8Z9wkE6^MP%Rtk8w%m94S~}O}mw39tfeX}6LIzIPe&ruHH7*jq)4=9I(>a?H zvYT11Zt4$aRRfxv*zcgYktM>bjm$x=z(^yjhJq&MfTTvIlN*rL$Y#K|8<-AG3?$uA zq6GJ&RjNn0W&MJ)w_weATU4|3OTjO*HZ7b>GEC-zd!e`pQt*om zjZOFrSYOX-;Mudbd2qX)*?dpt;N$E)(EemgH1cX$tO!S2SP7in%y#^~O4M~o%NI|< z&_a@)Ut8{A52e+h7sW~fe_w20WE?DD>&(0Fd@3RmA|ob5K~zLT z!Vt4@=TnPw%Pf|yAFxhXOQ}nHpx#$S%U<&rX1l3H8kaJ}CUQz>hx9a(Z{C@`t0G(= zD|b$ZmTxSP#h0F>6i)LO*Gvf zyRr#mr>|QQ;c$gNW`!#i^aD8alHvs0Y^K47l{DU8e6p0*Lwc8%0F!07%HlGb07KoH z4tvVzI&e3J;Um6`M!*+OniC+coK^>7QE&x%G5&<75c`is;3}6@QyoSNaRm0a z?ysVnBdk|jrNqjjI6zJaXIPwm$a>G3McXZZvqbpL;-InCLyOGteif|%@oC&eY_h=L z_~mNqg1$^_{AD$r?~l!O<79nbI-+4{rCEhzZkp^*+;(Gw6RwzGLV?yA8t2ar)L^SK zXJec&w8E_UhA9y1d3Xi8z0DM{BD&#iX4PFX-tb@HKR31z>e58r*f7ctk!jnc&*%uf zBwdh(^m^YP9d*DMzso)|>w2W1Z>??HT(`N_BX2iGCRhd`XCZ2mtp_biWTXRnqQnd+ zwTlkei*LJcO)|uWJcfiL(~!rJ2qY4jj@XfJAyLQ-#5c}~L?basEHV?Bh0I3ckU2;^ zl7P%b<{^nl5|WI}M^cbfBn?SN79b0eMaUD#Vk85}M3x{~$dkxYWErv?S%G9DE0G*z z74j7FG?I%vgXAHrku}J($Xa9_vL1O3$wvy1Ld1m>A;m}uQi_xz<;VtPBeDspKq`^v zkt(DbaU(T|$GA0F>%1FnRo@m^r9P-fSzn+RX_BSee9gQx474$Ao%*&hGhggh<*u3i RrI7wfU4(J^zjZ6-{vS)xaf$!{