diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs index de4ef4e..69d5ab2 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs @@ -128,7 +128,14 @@ public static class JwtProcessor // If start > end, end is on the next day (overnight range). if (startTime > endTime) { - end = end.AddDays(1); + if (now.TimeOfDay < endTime) + { + start = start.AddDays(-1); + } + else + { + end = end.AddDays(1); + } } if (start <= now && now < end) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs new file mode 100644 index 0000000..711d466 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs @@ -0,0 +1,10 @@ +namespace ZB.MOM.NatsNet.Server; + +internal static class SubjectTokens +{ + internal static string[] Subjects(IEnumerable filters) + { + ArgumentNullException.ThrowIfNull(filters); + return filters.Where(static filter => !string.IsNullOrWhiteSpace(filter)).ToArray(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs new file mode 100644 index 0000000..cb90083 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs @@ -0,0 +1,268 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal const int DefaultMaxAckPending = 1000; + internal static readonly TimeSpan DefaultAckWait = TimeSpan.FromSeconds(30); + internal static readonly TimeSpan DefaultDeleteWait = TimeSpan.FromSeconds(5); + internal static readonly TimeSpan DefaultPinnedTtl = TimeSpan.FromMinutes(2); + + internal static JsApiError? SetConsumerConfigDefaults( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool pedantic) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (config.MaxDeliver is 0 or < -1) + { + if (pedantic && config.MaxDeliver < -1) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_deliver must be set to -1")); + config.MaxDeliver = -1; + } + + if (config.MaxWaiting < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_waiting must not be negative")); + config.MaxWaiting = 0; + } + + if (config.MaxAckPending < -1) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_ack_pending must be set to -1")); + config.MaxAckPending = -1; + } + + if (config.MaxRequestBatch < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_batch must not be negative")); + config.MaxRequestBatch = 0; + } + + if (config.MaxRequestExpires < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_expires must not be negative")); + config.MaxRequestExpires = TimeSpan.Zero; + } + + if (config.MaxRequestMaxBytes < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_bytes must not be negative")); + config.MaxRequestMaxBytes = 0; + } + + if (config.Heartbeat < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("idle_heartbeat must not be negative")); + config.Heartbeat = TimeSpan.Zero; + } + + if (config.InactiveThreshold < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("inactive_threshold must not be negative")); + config.InactiveThreshold = TimeSpan.Zero; + } + + if (config.PinnedTTL < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("priority_timeout must not be negative")); + config.PinnedTTL = TimeSpan.Zero; + } + + if (config.AckWait == TimeSpan.Zero) + config.AckWait = DefaultAckWait; + + if (config.MaxAckPending == 0 && config.AckPolicy != AckPolicy.AckNone) + { + config.MaxAckPending = selectedLimits?.MaxAckPending > 0 + ? selectedLimits.MaxAckPending + : DefaultMaxAckPending; + } + + if (config.InactiveThreshold == TimeSpan.Zero && string.IsNullOrWhiteSpace(config.Durable)) + config.InactiveThreshold = DefaultDeleteWait; + + if (config.PinnedTTL == TimeSpan.Zero && config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + config.PinnedTTL = DefaultPinnedTtl; + + if (config.Replicas == 0 || config.Replicas > streamReplicas) + config.Replicas = streamReplicas; + + if (!string.IsNullOrWhiteSpace(config.Name) && string.IsNullOrWhiteSpace(config.Durable)) + config.Durable = config.Name; + + return null; + } + + internal static JsApiError? CheckConsumerCfg( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool isRecovering) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (!string.IsNullOrWhiteSpace(config.Durable) && + !string.IsNullOrWhiteSpace(config.Name) && + !string.Equals(config.Durable, config.Name, StringComparison.Ordinal)) + { + return JsApiErrors.NewJSConsumerCreateDurableAndNameMismatchError(); + } + + if (HasPathSeparators(config.Durable) || HasPathSeparators(config.Name)) + return JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError(); + + if (config.Replicas > streamReplicas) + return JsApiErrors.NewJSConsumerReplicasExceedsStreamError(); + + if (!Enum.IsDefined(config.AckPolicy)) + return JsApiErrors.NewJSConsumerAckPolicyInvalidError(); + + if (!Enum.IsDefined(config.ReplayPolicy)) + return JsApiErrors.NewJSConsumerReplayPolicyInvalidError(); + + if (!Enum.IsDefined(config.DeliverPolicy)) + return JsApiErrors.NewJSConsumerInvalidPolicyError(new InvalidOperationException("deliver policy invalid")); + + if (config.FilterSubjects is { Length: > 0 } && !string.IsNullOrWhiteSpace(config.FilterSubject)) + return JsApiErrors.NewJSConsumerDuplicateFilterSubjectsError(); + + var filters = config.FilterSubjects is { Length: > 0 } + ? SubjectTokens.Subjects(config.FilterSubjects) + : (string.IsNullOrWhiteSpace(config.FilterSubject) ? [] : [config.FilterSubject]); + + for (var i = 0; i < filters.Length; i++) + { + if (string.IsNullOrWhiteSpace(filters[i])) + return JsApiErrors.NewJSConsumerEmptyFilterError(); + if (!SubscriptionIndex.IsValidSubject(filters[i])) + return JsApiErrors.NewJSConsumerFilterNotSubsetError(); + + for (var j = i + 1; j < filters.Length; j++) + { + if (SubscriptionIndex.SubjectsCollide(filters[i], filters[j])) + return JsApiErrors.NewJSConsumerOverlappingSubjectFiltersError(); + } + } + + var isPush = !string.IsNullOrWhiteSpace(config.DeliverSubject); + if (isPush) + { + if (!SubscriptionIndex.IsValidSubject(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerInvalidDeliverSubjectError(); + + if (SubscriptionIndex.SubjectHasWildcard(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerDeliverToWildcardsError(); + + if (config.MaxWaiting > 0) + return JsApiErrors.NewJSConsumerPushMaxWaitingError(); + } + else + { + if (config.RateLimit > 0) + return JsApiErrors.NewJSConsumerPullWithRateLimitError(); + } + + if (config.MaxAckPending > 0 && selectedLimits?.MaxAckPending > 0 && config.MaxAckPending > selectedLimits.MaxAckPending) + return JsApiErrors.NewJSConsumerMaxPendingAckExcessError(selectedLimits.MaxAckPending); + + if (streamConfig.Retention == RetentionPolicy.WorkQueuePolicy && config.AckPolicy != AckPolicy.AckExplicit) + return JsApiErrors.NewJSConsumerWQRequiresExplicitAckError(); + + if (config.Direct) + { + if (isPush) + return JsApiErrors.NewJSConsumerDirectRequiresPushError(); + if (!string.IsNullOrWhiteSpace(config.Durable)) + return JsApiErrors.NewJSConsumerDirectRequiresEphemeralError(); + } + + _ = isRecovering; + return null; + } + + internal void UpdateInactiveThreshold(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + _deleteThreshold = config.InactiveThreshold > TimeSpan.Zero + ? config.InactiveThreshold + : DefaultDeleteWait; + + Config.InactiveThreshold = _deleteThreshold; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdatePauseState(ConsumerConfig config, DateTime? nowUtc = null) + { + ArgumentNullException.ThrowIfNull(config); + var now = nowUtc ?? DateTime.UtcNow; + + _mu.EnterWriteLock(); + try + { + Config.PauseUntil = config.PauseUntil; + _isPaused = config.PauseUntil.HasValue && config.PauseUntil.Value > now; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerAssignment? ConsumerAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetConsumerAssignment(ConsumerAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private static bool HasPathSeparators(string? value) + { + if (string.IsNullOrWhiteSpace(value)) + return false; + + return value.Contains('/') || value.Contains('\\'); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 77b9b92..32062b9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -19,7 +19,7 @@ namespace ZB.MOM.NatsNet.Server; /// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle. /// Mirrors the consumer struct in server/consumer.go. /// -internal sealed class NatsConsumer : IDisposable +internal sealed partial class NatsConsumer : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -41,6 +41,8 @@ internal sealed class NatsConsumer : IDisposable private NatsStream? _streamRef; private ConsumerAssignment? _assignment; private DateTime _lostQuorumSent; + private TimeSpan _deleteThreshold; + private bool _isPaused; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -320,7 +322,8 @@ internal sealed class NatsConsumer : IDisposable _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq); _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq); Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream); - return null; + Exception? noError = null; + return noError; } finally { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..a1816d4 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,80 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction( + ConsumerConfig config, + string oname, + ConsumerAction action, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumer( + ConsumerConfig config, + string oname, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment( + ConsumerConfig config, + string oname, + ConsumerAssignment? assignment, + bool isRecovering, + ConsumerAction action, + bool pedantic = false) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, new InvalidOperationException("stream closed")); + + var name = !string.IsNullOrWhiteSpace(oname) + ? oname + : (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty)); + if (string.IsNullOrWhiteSpace(name)) + return (null, new InvalidOperationException("consumer name required")); + + config.Name = name; + config.Durable ??= name; + + var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic); + if (defaultsErr is not null) + return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid")); + + var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering); + if (cfgErr is not null) + return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid")); + + if (_consumers.TryGetValue(name, out var existing)) + { + if (action == ConsumerAction.Create) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists")); + + existing.UpdateConfig(config); + if (assignment is not null) + existing.SetConsumerAssignment(assignment); + return (existing, null); + } + + if (action == ConsumerAction.Update) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist")); + + var consumer = NatsConsumer.Create(this, config, action, assignment); + if (consumer is null) + return (null, new InvalidOperationException("consumer create failed")); + + consumer.SetConsumerAssignment(assignment); + consumer.UpdateInactiveThreshold(config); + consumer.UpdatePauseState(config); + _consumers[name] = consumer; + return (consumer, null); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 5ad6a98..d75a776 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -52,6 +52,7 @@ internal sealed partial class NatsStream : IDisposable private ulong _clseq; private ulong _clfs; private readonly Dictionary _sources = new(StringComparer.Ordinal); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs new file mode 100644 index 0000000..62ab93e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs @@ -0,0 +1,82 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerPolicyExtensions +{ + internal static string String(this PriorityPolicy policy) => + policy switch + { + PriorityPolicy.PriorityOverflow => "\"overflow\"", + PriorityPolicy.PriorityPinnedClient => "\"pinned_client\"", + PriorityPolicy.PriorityPrioritized => "\"prioritized\"", + _ => "\"none\"", + }; + + internal static string String(this DeliverPolicy policy) => + policy switch + { + DeliverPolicy.DeliverAll => "all", + DeliverPolicy.DeliverLast => "last", + DeliverPolicy.DeliverNew => "new", + DeliverPolicy.DeliverByStartSequence => "by_start_sequence", + DeliverPolicy.DeliverByStartTime => "by_start_time", + DeliverPolicy.DeliverLastPerSubject => "last_per_subject", + _ => "undefined", + }; + + internal static string String(this AckPolicy policy) => + policy switch + { + AckPolicy.AckNone => "none", + AckPolicy.AckAll => "all", + _ => "explicit", + }; + + internal static string String(this ReplayPolicy policy) => + policy switch + { + ReplayPolicy.ReplayInstant => "instant", + _ => "original", + }; +} + +public sealed class PriorityPolicyJsonConverter : JsonConverter +{ + public override PriorityPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "none" => PriorityPolicy.PriorityNone, + "overflow" => PriorityPolicy.PriorityOverflow, + "pinned_client" => PriorityPolicy.PriorityPinnedClient, + "prioritized" => PriorityPolicy.PriorityPrioritized, + var value => throw new JsonException($"unknown priority policy: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, PriorityPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case PriorityPolicy.PriorityNone: + writer.WriteStringValue("none"); + break; + case PriorityPolicy.PriorityOverflow: + writer.WriteStringValue("overflow"); + break; + case PriorityPolicy.PriorityPinnedClient: + writer.WriteStringValue("pinned_client"); + break; + case PriorityPolicy.PriorityPrioritized: + writer.WriteStringValue("prioritized"); + break; + default: + throw new JsonException($"unknown priority policy: {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 03eb1d7..439ad7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -615,6 +615,7 @@ public enum DeliverPolicy // --------------------------------------------------------------------------- /// Policy for selecting messages based on priority. +[JsonConverter(typeof(PriorityPolicyJsonConverter))] public enum PriorityPolicy { PriorityNone = 0, diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs new file mode 100644 index 0000000..e3c40cd --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs @@ -0,0 +1,51 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerActionExtensions +{ + internal static string String(this ConsumerAction action) => + action switch + { + ConsumerAction.CreateOrUpdate => "\"create_or_update\"", + ConsumerAction.Create => "\"create\"", + ConsumerAction.Update => "\"update\"", + _ => "\"create_or_update\"", + }; +} + +public sealed class ConsumerActionJsonConverter : JsonConverter +{ + public override ConsumerAction Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "create" => ConsumerAction.Create, + "update" => ConsumerAction.Update, + "create_or_update" => ConsumerAction.CreateOrUpdate, + var value => throw new JsonException($"unknown consumer action: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, ConsumerAction value, JsonSerializerOptions options) + { + switch (value) + { + case ConsumerAction.Create: + writer.WriteStringValue("create"); + break; + case ConsumerAction.Update: + writer.WriteStringValue("update"); + break; + case ConsumerAction.CreateOrUpdate: + writer.WriteStringValue("create_or_update"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c63e5dd..d3b1952 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -395,6 +395,7 @@ public sealed class CreateConsumerRequest /// Specifies the intended action when creating a consumer. /// Mirrors ConsumerAction in server/consumer.go. /// +[JsonConverter(typeof(ConsumerActionJsonConverter))] public enum ConsumerAction { /// Create a new consumer or update if it already exists. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs new file mode 100644 index 0000000..7a3e2f1 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs @@ -0,0 +1,133 @@ +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerPoliciesTests +{ + [Fact] + public void ConsumerAction_StringAndJsonParity_ShouldMatchGo() + { + ConsumerAction.CreateOrUpdate.String().ShouldBe("\"create_or_update\""); + ConsumerAction.Create.String().ShouldBe("\"create\""); + ConsumerAction.Update.String().ShouldBe("\"update\""); + + JsonSerializer.Serialize(ConsumerAction.Create).ShouldBe("\"create\""); + JsonSerializer.Deserialize("\"update\"").ShouldBe(ConsumerAction.Update); + Should.Throw(() => JsonSerializer.Deserialize("\"bogus\"")); + } + + [Fact] + public void PriorityPolicy_StringAndJsonParity_ShouldMatchGo() + { + PriorityPolicy.PriorityNone.String().ShouldBe("\"none\""); + PriorityPolicy.PriorityOverflow.String().ShouldBe("\"overflow\""); + PriorityPolicy.PriorityPinnedClient.String().ShouldBe("\"pinned_client\""); + PriorityPolicy.PriorityPrioritized.String().ShouldBe("\"prioritized\""); + + JsonSerializer.Serialize(PriorityPolicy.PriorityPinnedClient).ShouldBe("\"pinned_client\""); + JsonSerializer.Deserialize("\"prioritized\"").ShouldBe(PriorityPolicy.PriorityPrioritized); + Should.Throw(() => JsonSerializer.Deserialize("\"none-ish\"")); + } + + [Fact] + public void ConsumerPolicies_StringParity_ShouldMatchGo() + { + DeliverPolicy.DeliverByStartSequence.String().ShouldBe("by_start_sequence"); + AckPolicy.AckExplicit.String().ShouldBe("explicit"); + ReplayPolicy.ReplayInstant.String().ShouldBe("instant"); + } + + [Fact] + public void SubjectTokens_Subjects_RemovesEmptyValues() + { + var subjects = SubjectTokens.Subjects(new[] { "foo.*", string.Empty, " ", "bar.>" }); + subjects.ShouldBe(["foo.*", "bar.>"]); + } + + [Fact] + public void SetConsumerConfigDefaults_InvalidNegativesInPedanticMode_ReturnsError() + { + var cfg = new ConsumerConfig { MaxDeliver = -2 }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: true); + + err.ShouldNotBeNull(); + cfg.MaxDeliver.ShouldBe(-2); + } + + [Fact] + public void SetConsumerConfigDefaults_AppliesGoDefaults_ShouldPopulateExpectedValues() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxDeliver = 0, + AckPolicy = AckPolicy.AckExplicit, + Replicas = 0, + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + var limits = new JetStreamAccountLimits { MaxAckPending = 2500 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, limits, pedantic: false); + + err.ShouldBeNull(); + cfg.MaxDeliver.ShouldBe(-1); + cfg.AckWait.ShouldBe(TimeSpan.FromSeconds(30)); + cfg.MaxAckPending.ShouldBe(2500); + cfg.Replicas.ShouldBe(3); + } + + [Fact] + public void CheckConsumerCfg_DurableNameMismatch_ReturnsError() + { + var cfg = new ConsumerConfig { Name = "A", Durable = "B", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerCreateDurableAndNameMismatch.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_OverlappingFilterSubjects_ReturnsError() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.*", "orders.created"], + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerOverlappingSubjectFilters.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_WithValidPullConfig_ReturnsNull() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubject = "orders.created", + }; + var streamCfg = new StreamConfig + { + Name = "ORDERS", + Replicas = 1, + Retention = RetentionPolicy.LimitsPolicy, + Subjects = ["orders.>"], + }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 611e398..c7ccf81 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -127,4 +127,89 @@ public sealed class NatsConsumerTests q.Peek()!.Reply.ShouldBe("2a"); q.Peek()!.N.ShouldBe(3); } + + [Fact] + public void AddConsumerWithAction_CreateThenUpdate_ShouldRespectActions() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var (created, createErr) = stream!.AddConsumerWithAction(cfg, "D", ConsumerAction.Create, pedantic: false); + createErr.ShouldBeNull(); + created.ShouldNotBeNull(); + + var updateCfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll }; + var (updated, updateErr) = stream.AddConsumerWithAction(updateCfg, "D", ConsumerAction.Update, pedantic: false); + updateErr.ShouldBeNull(); + updated.ShouldNotBeNull(); + updated!.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll); + } + + [Fact] + public void AddConsumer_WithAssignment_ShouldAttachAssignment() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + var (consumer, err) = stream!.AddConsumerWithAssignment(cfg, "D", assignment, isRecovering: false, ConsumerAction.Create, pedantic: false); + err.ShouldBeNull(); + consumer.ShouldNotBeNull(); + consumer!.ConsumerAssignment().ShouldBeSameAs(assignment); + } + + [Fact] + public void UpdateInactiveThreshold_AndPauseState_ShouldTrackConfigValues() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromSeconds(30) }); + consumer.GetConfig().InactiveThreshold.ShouldBe(TimeSpan.FromSeconds(30)); + + var pauseUntil = DateTime.UtcNow.AddMinutes(1); + consumer.UpdatePauseState(new ConsumerConfig { PauseUntil = pauseUntil }); + consumer.GetConfig().PauseUntil.ShouldBe(pauseUntil); + } + + [Fact] + public void ConsumerAssignment_GetSet_ShouldRoundTrip() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + + consumer!.SetConsumerAssignment(assignment); + consumer.ConsumerAssignment().ShouldBeSameAs(assignment); + } } diff --git a/porting.db b/porting.db index 7a4e86e..94b6836 100644 Binary files a/porting.db and b/porting.db differ