task2: implement batch38 group A consumer lifecycle features

This commit is contained in:
Joseph Doherty
2026-03-01 00:11:47 -05:00
parent ad3a1bbb38
commit fce6bd7dca
13 changed files with 725 additions and 3 deletions

View File

@@ -128,7 +128,14 @@ public static class JwtProcessor
// If start > end, end is on the next day (overnight range).
if (startTime > endTime)
{
end = end.AddDays(1);
if (now.TimeOfDay < endTime)
{
start = start.AddDays(-1);
}
else
{
end = end.AddDays(1);
}
}
if (start <= now && now < end)

View File

@@ -0,0 +1,10 @@
namespace ZB.MOM.NatsNet.Server;
internal static class SubjectTokens
{
internal static string[] Subjects(IEnumerable<string> filters)
{
ArgumentNullException.ThrowIfNull(filters);
return filters.Where(static filter => !string.IsNullOrWhiteSpace(filter)).ToArray();
}
}

View File

@@ -0,0 +1,268 @@
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal const int DefaultMaxAckPending = 1000;
internal static readonly TimeSpan DefaultAckWait = TimeSpan.FromSeconds(30);
internal static readonly TimeSpan DefaultDeleteWait = TimeSpan.FromSeconds(5);
internal static readonly TimeSpan DefaultPinnedTtl = TimeSpan.FromMinutes(2);
internal static JsApiError? SetConsumerConfigDefaults(
ConsumerConfig config,
StreamConfig streamConfig,
JetStreamAccountLimits? selectedLimits,
bool pedantic)
{
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(streamConfig);
var streamReplicas = Math.Max(1, streamConfig.Replicas);
if (config.MaxDeliver is 0 or < -1)
{
if (pedantic && config.MaxDeliver < -1)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_deliver must be set to -1"));
config.MaxDeliver = -1;
}
if (config.MaxWaiting < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_waiting must not be negative"));
config.MaxWaiting = 0;
}
if (config.MaxAckPending < -1)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_ack_pending must be set to -1"));
config.MaxAckPending = -1;
}
if (config.MaxRequestBatch < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_batch must not be negative"));
config.MaxRequestBatch = 0;
}
if (config.MaxRequestExpires < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_expires must not be negative"));
config.MaxRequestExpires = TimeSpan.Zero;
}
if (config.MaxRequestMaxBytes < 0)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_bytes must not be negative"));
config.MaxRequestMaxBytes = 0;
}
if (config.Heartbeat < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("idle_heartbeat must not be negative"));
config.Heartbeat = TimeSpan.Zero;
}
if (config.InactiveThreshold < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("inactive_threshold must not be negative"));
config.InactiveThreshold = TimeSpan.Zero;
}
if (config.PinnedTTL < TimeSpan.Zero)
{
if (pedantic)
return JsApiErrors.NewJSPedanticError(new InvalidOperationException("priority_timeout must not be negative"));
config.PinnedTTL = TimeSpan.Zero;
}
if (config.AckWait == TimeSpan.Zero)
config.AckWait = DefaultAckWait;
if (config.MaxAckPending == 0 && config.AckPolicy != AckPolicy.AckNone)
{
config.MaxAckPending = selectedLimits?.MaxAckPending > 0
? selectedLimits.MaxAckPending
: DefaultMaxAckPending;
}
if (config.InactiveThreshold == TimeSpan.Zero && string.IsNullOrWhiteSpace(config.Durable))
config.InactiveThreshold = DefaultDeleteWait;
if (config.PinnedTTL == TimeSpan.Zero && config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient)
config.PinnedTTL = DefaultPinnedTtl;
if (config.Replicas == 0 || config.Replicas > streamReplicas)
config.Replicas = streamReplicas;
if (!string.IsNullOrWhiteSpace(config.Name) && string.IsNullOrWhiteSpace(config.Durable))
config.Durable = config.Name;
return null;
}
internal static JsApiError? CheckConsumerCfg(
ConsumerConfig config,
StreamConfig streamConfig,
JetStreamAccountLimits? selectedLimits,
bool isRecovering)
{
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(streamConfig);
var streamReplicas = Math.Max(1, streamConfig.Replicas);
if (!string.IsNullOrWhiteSpace(config.Durable) &&
!string.IsNullOrWhiteSpace(config.Name) &&
!string.Equals(config.Durable, config.Name, StringComparison.Ordinal))
{
return JsApiErrors.NewJSConsumerCreateDurableAndNameMismatchError();
}
if (HasPathSeparators(config.Durable) || HasPathSeparators(config.Name))
return JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError();
if (config.Replicas > streamReplicas)
return JsApiErrors.NewJSConsumerReplicasExceedsStreamError();
if (!Enum.IsDefined(config.AckPolicy))
return JsApiErrors.NewJSConsumerAckPolicyInvalidError();
if (!Enum.IsDefined(config.ReplayPolicy))
return JsApiErrors.NewJSConsumerReplayPolicyInvalidError();
if (!Enum.IsDefined(config.DeliverPolicy))
return JsApiErrors.NewJSConsumerInvalidPolicyError(new InvalidOperationException("deliver policy invalid"));
if (config.FilterSubjects is { Length: > 0 } && !string.IsNullOrWhiteSpace(config.FilterSubject))
return JsApiErrors.NewJSConsumerDuplicateFilterSubjectsError();
var filters = config.FilterSubjects is { Length: > 0 }
? SubjectTokens.Subjects(config.FilterSubjects)
: (string.IsNullOrWhiteSpace(config.FilterSubject) ? [] : [config.FilterSubject]);
for (var i = 0; i < filters.Length; i++)
{
if (string.IsNullOrWhiteSpace(filters[i]))
return JsApiErrors.NewJSConsumerEmptyFilterError();
if (!SubscriptionIndex.IsValidSubject(filters[i]))
return JsApiErrors.NewJSConsumerFilterNotSubsetError();
for (var j = i + 1; j < filters.Length; j++)
{
if (SubscriptionIndex.SubjectsCollide(filters[i], filters[j]))
return JsApiErrors.NewJSConsumerOverlappingSubjectFiltersError();
}
}
var isPush = !string.IsNullOrWhiteSpace(config.DeliverSubject);
if (isPush)
{
if (!SubscriptionIndex.IsValidSubject(config.DeliverSubject!))
return JsApiErrors.NewJSConsumerInvalidDeliverSubjectError();
if (SubscriptionIndex.SubjectHasWildcard(config.DeliverSubject!))
return JsApiErrors.NewJSConsumerDeliverToWildcardsError();
if (config.MaxWaiting > 0)
return JsApiErrors.NewJSConsumerPushMaxWaitingError();
}
else
{
if (config.RateLimit > 0)
return JsApiErrors.NewJSConsumerPullWithRateLimitError();
}
if (config.MaxAckPending > 0 && selectedLimits?.MaxAckPending > 0 && config.MaxAckPending > selectedLimits.MaxAckPending)
return JsApiErrors.NewJSConsumerMaxPendingAckExcessError(selectedLimits.MaxAckPending);
if (streamConfig.Retention == RetentionPolicy.WorkQueuePolicy && config.AckPolicy != AckPolicy.AckExplicit)
return JsApiErrors.NewJSConsumerWQRequiresExplicitAckError();
if (config.Direct)
{
if (isPush)
return JsApiErrors.NewJSConsumerDirectRequiresPushError();
if (!string.IsNullOrWhiteSpace(config.Durable))
return JsApiErrors.NewJSConsumerDirectRequiresEphemeralError();
}
_ = isRecovering;
return null;
}
internal void UpdateInactiveThreshold(ConsumerConfig config)
{
ArgumentNullException.ThrowIfNull(config);
_mu.EnterWriteLock();
try
{
_deleteThreshold = config.InactiveThreshold > TimeSpan.Zero
? config.InactiveThreshold
: DefaultDeleteWait;
Config.InactiveThreshold = _deleteThreshold;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UpdatePauseState(ConsumerConfig config, DateTime? nowUtc = null)
{
ArgumentNullException.ThrowIfNull(config);
var now = nowUtc ?? DateTime.UtcNow;
_mu.EnterWriteLock();
try
{
Config.PauseUntil = config.PauseUntil;
_isPaused = config.PauseUntil.HasValue && config.PauseUntil.Value > now;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ConsumerAssignment? ConsumerAssignment()
{
_mu.EnterReadLock();
try
{
return _assignment;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetConsumerAssignment(ConsumerAssignment? assignment)
{
_mu.EnterWriteLock();
try
{
_assignment = assignment;
}
finally
{
_mu.ExitWriteLock();
}
}
private static bool HasPathSeparators(string? value)
{
if (string.IsNullOrWhiteSpace(value))
return false;
return value.Contains('/') || value.Contains('\\');
}
}

View File

@@ -19,7 +19,7 @@ namespace ZB.MOM.NatsNet.Server;
/// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle.
/// Mirrors the <c>consumer</c> struct in server/consumer.go.
/// </summary>
internal sealed class NatsConsumer : IDisposable
internal sealed partial class NatsConsumer : IDisposable
{
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
@@ -41,6 +41,8 @@ internal sealed class NatsConsumer : IDisposable
private NatsStream? _streamRef;
private ConsumerAssignment? _assignment;
private DateTime _lostQuorumSent;
private TimeSpan _deleteThreshold;
private bool _isPaused;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -320,7 +322,8 @@ internal sealed class NatsConsumer : IDisposable
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq);
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq);
Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream);
return null;
Exception? noError = null;
return noError;
}
finally
{

View File

@@ -0,0 +1,80 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction(
ConsumerConfig config,
string oname,
ConsumerAction action,
bool pedantic = false) =>
AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic);
internal (NatsConsumer? Consumer, Exception? Error) AddConsumer(
ConsumerConfig config,
string oname,
bool pedantic = false) =>
AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic);
internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment(
ConsumerConfig config,
string oname,
ConsumerAssignment? assignment,
bool isRecovering,
ConsumerAction action,
bool pedantic = false)
{
ArgumentNullException.ThrowIfNull(config);
_mu.EnterWriteLock();
try
{
if (_closed)
return (null, new InvalidOperationException("stream closed"));
var name = !string.IsNullOrWhiteSpace(oname)
? oname
: (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty));
if (string.IsNullOrWhiteSpace(name))
return (null, new InvalidOperationException("consumer name required"));
config.Name = name;
config.Durable ??= name;
var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic);
if (defaultsErr is not null)
return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid"));
var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering);
if (cfgErr is not null)
return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid"));
if (_consumers.TryGetValue(name, out var existing))
{
if (action == ConsumerAction.Create)
return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists"));
existing.UpdateConfig(config);
if (assignment is not null)
existing.SetConsumerAssignment(assignment);
return (existing, null);
}
if (action == ConsumerAction.Update)
return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist"));
var consumer = NatsConsumer.Create(this, config, action, assignment);
if (consumer is null)
return (null, new InvalidOperationException("consumer create failed"));
consumer.SetConsumerAssignment(assignment);
consumer.UpdateInactiveThreshold(config);
consumer.UpdatePauseState(config);
_consumers[name] = consumer;
return (consumer, null);
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -52,6 +52,7 @@ internal sealed partial class NatsStream : IDisposable
private ulong _clseq;
private ulong _clfs;
private readonly Dictionary<string, StreamSourceInfo> _sources = new(StringComparer.Ordinal);
private readonly Dictionary<string, NatsConsumer> _consumers = new(StringComparer.Ordinal);
private StreamSourceInfo? _mirrorInfo;
private Timer? _mirrorConsumerSetupTimer;
private readonly Dictionary<string, ulong> _sourceStartingSequences = new(StringComparer.Ordinal);

View File

@@ -0,0 +1,82 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace ZB.MOM.NatsNet.Server;
internal static class ConsumerPolicyExtensions
{
internal static string String(this PriorityPolicy policy) =>
policy switch
{
PriorityPolicy.PriorityOverflow => "\"overflow\"",
PriorityPolicy.PriorityPinnedClient => "\"pinned_client\"",
PriorityPolicy.PriorityPrioritized => "\"prioritized\"",
_ => "\"none\"",
};
internal static string String(this DeliverPolicy policy) =>
policy switch
{
DeliverPolicy.DeliverAll => "all",
DeliverPolicy.DeliverLast => "last",
DeliverPolicy.DeliverNew => "new",
DeliverPolicy.DeliverByStartSequence => "by_start_sequence",
DeliverPolicy.DeliverByStartTime => "by_start_time",
DeliverPolicy.DeliverLastPerSubject => "last_per_subject",
_ => "undefined",
};
internal static string String(this AckPolicy policy) =>
policy switch
{
AckPolicy.AckNone => "none",
AckPolicy.AckAll => "all",
_ => "explicit",
};
internal static string String(this ReplayPolicy policy) =>
policy switch
{
ReplayPolicy.ReplayInstant => "instant",
_ => "original",
};
}
public sealed class PriorityPolicyJsonConverter : JsonConverter<PriorityPolicy>
{
public override PriorityPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"none" => PriorityPolicy.PriorityNone,
"overflow" => PriorityPolicy.PriorityOverflow,
"pinned_client" => PriorityPolicy.PriorityPinnedClient,
"prioritized" => PriorityPolicy.PriorityPrioritized,
var value => throw new JsonException($"unknown priority policy: {value}"),
};
}
public override void Write(Utf8JsonWriter writer, PriorityPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case PriorityPolicy.PriorityNone:
writer.WriteStringValue("none");
break;
case PriorityPolicy.PriorityOverflow:
writer.WriteStringValue("overflow");
break;
case PriorityPolicy.PriorityPinnedClient:
writer.WriteStringValue("pinned_client");
break;
case PriorityPolicy.PriorityPrioritized:
writer.WriteStringValue("prioritized");
break;
default:
throw new JsonException($"unknown priority policy: {value}");
}
}
}

View File

@@ -615,6 +615,7 @@ public enum DeliverPolicy
// ---------------------------------------------------------------------------
/// <summary>Policy for selecting messages based on priority.</summary>
[JsonConverter(typeof(PriorityPolicyJsonConverter))]
public enum PriorityPolicy
{
PriorityNone = 0,

View File

@@ -0,0 +1,51 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace ZB.MOM.NatsNet.Server;
internal static class ConsumerActionExtensions
{
internal static string String(this ConsumerAction action) =>
action switch
{
ConsumerAction.CreateOrUpdate => "\"create_or_update\"",
ConsumerAction.Create => "\"create\"",
ConsumerAction.Update => "\"update\"",
_ => "\"create_or_update\"",
};
}
public sealed class ConsumerActionJsonConverter : JsonConverter<ConsumerAction>
{
public override ConsumerAction Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"create" => ConsumerAction.Create,
"update" => ConsumerAction.Update,
"create_or_update" => ConsumerAction.CreateOrUpdate,
var value => throw new JsonException($"unknown consumer action: {value}"),
};
}
public override void Write(Utf8JsonWriter writer, ConsumerAction value, JsonSerializerOptions options)
{
switch (value)
{
case ConsumerAction.Create:
writer.WriteStringValue("create");
break;
case ConsumerAction.Update:
writer.WriteStringValue("update");
break;
case ConsumerAction.CreateOrUpdate:
writer.WriteStringValue("create_or_update");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}

View File

@@ -395,6 +395,7 @@ public sealed class CreateConsumerRequest
/// Specifies the intended action when creating a consumer.
/// Mirrors <c>ConsumerAction</c> in server/consumer.go.
/// </summary>
[JsonConverter(typeof(ConsumerActionJsonConverter))]
public enum ConsumerAction
{
/// <summary>Create a new consumer or update if it already exists.</summary>

View File

@@ -0,0 +1,133 @@
using System.Text.Json;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class ConsumerPoliciesTests
{
[Fact]
public void ConsumerAction_StringAndJsonParity_ShouldMatchGo()
{
ConsumerAction.CreateOrUpdate.String().ShouldBe("\"create_or_update\"");
ConsumerAction.Create.String().ShouldBe("\"create\"");
ConsumerAction.Update.String().ShouldBe("\"update\"");
JsonSerializer.Serialize(ConsumerAction.Create).ShouldBe("\"create\"");
JsonSerializer.Deserialize<ConsumerAction>("\"update\"").ShouldBe(ConsumerAction.Update);
Should.Throw<JsonException>(() => JsonSerializer.Deserialize<ConsumerAction>("\"bogus\""));
}
[Fact]
public void PriorityPolicy_StringAndJsonParity_ShouldMatchGo()
{
PriorityPolicy.PriorityNone.String().ShouldBe("\"none\"");
PriorityPolicy.PriorityOverflow.String().ShouldBe("\"overflow\"");
PriorityPolicy.PriorityPinnedClient.String().ShouldBe("\"pinned_client\"");
PriorityPolicy.PriorityPrioritized.String().ShouldBe("\"prioritized\"");
JsonSerializer.Serialize(PriorityPolicy.PriorityPinnedClient).ShouldBe("\"pinned_client\"");
JsonSerializer.Deserialize<PriorityPolicy>("\"prioritized\"").ShouldBe(PriorityPolicy.PriorityPrioritized);
Should.Throw<JsonException>(() => JsonSerializer.Deserialize<PriorityPolicy>("\"none-ish\""));
}
[Fact]
public void ConsumerPolicies_StringParity_ShouldMatchGo()
{
DeliverPolicy.DeliverByStartSequence.String().ShouldBe("by_start_sequence");
AckPolicy.AckExplicit.String().ShouldBe("explicit");
ReplayPolicy.ReplayInstant.String().ShouldBe("instant");
}
[Fact]
public void SubjectTokens_Subjects_RemovesEmptyValues()
{
var subjects = SubjectTokens.Subjects(new[] { "foo.*", string.Empty, " ", "bar.>" });
subjects.ShouldBe(["foo.*", "bar.>"]);
}
[Fact]
public void SetConsumerConfigDefaults_InvalidNegativesInPedanticMode_ReturnsError()
{
var cfg = new ConsumerConfig { MaxDeliver = -2 };
var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 };
var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: true);
err.ShouldNotBeNull();
cfg.MaxDeliver.ShouldBe(-2);
}
[Fact]
public void SetConsumerConfigDefaults_AppliesGoDefaults_ShouldPopulateExpectedValues()
{
var cfg = new ConsumerConfig
{
Durable = "D",
MaxDeliver = 0,
AckPolicy = AckPolicy.AckExplicit,
Replicas = 0,
};
var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 };
var limits = new JetStreamAccountLimits { MaxAckPending = 2500 };
var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, limits, pedantic: false);
err.ShouldBeNull();
cfg.MaxDeliver.ShouldBe(-1);
cfg.AckWait.ShouldBe(TimeSpan.FromSeconds(30));
cfg.MaxAckPending.ShouldBe(2500);
cfg.Replicas.ShouldBe(3);
}
[Fact]
public void CheckConsumerCfg_DurableNameMismatch_ReturnsError()
{
var cfg = new ConsumerConfig { Name = "A", Durable = "B", AckPolicy = AckPolicy.AckExplicit };
var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 };
var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);
err.ShouldNotBeNull();
err.ErrCode.ShouldBe(JsApiErrors.ConsumerCreateDurableAndNameMismatch.ErrCode);
}
[Fact]
public void CheckConsumerCfg_OverlappingFilterSubjects_ReturnsError()
{
var cfg = new ConsumerConfig
{
Durable = "D",
AckPolicy = AckPolicy.AckExplicit,
FilterSubjects = ["orders.*", "orders.created"],
};
var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 };
var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);
err.ShouldNotBeNull();
err.ErrCode.ShouldBe(JsApiErrors.ConsumerOverlappingSubjectFilters.ErrCode);
}
[Fact]
public void CheckConsumerCfg_WithValidPullConfig_ReturnsNull()
{
var cfg = new ConsumerConfig
{
Durable = "D",
AckPolicy = AckPolicy.AckExplicit,
FilterSubject = "orders.created",
};
var streamCfg = new StreamConfig
{
Name = "ORDERS",
Replicas = 1,
Retention = RetentionPolicy.LimitsPolicy,
Subjects = ["orders.>"],
};
var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);
err.ShouldBeNull();
}
}

View File

@@ -127,4 +127,89 @@ public sealed class NatsConsumerTests
q.Peek()!.Reply.ShouldBe("2a");
q.Peek()!.N.ShouldBe(3);
}
[Fact]
public void AddConsumerWithAction_CreateThenUpdate_ShouldRespectActions()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
var (created, createErr) = stream!.AddConsumerWithAction(cfg, "D", ConsumerAction.Create, pedantic: false);
createErr.ShouldBeNull();
created.ShouldNotBeNull();
var updateCfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll };
var (updated, updateErr) = stream.AddConsumerWithAction(updateCfg, "D", ConsumerAction.Update, pedantic: false);
updateErr.ShouldBeNull();
updated.ShouldNotBeNull();
updated!.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll);
}
[Fact]
public void AddConsumer_WithAssignment_ShouldAttachAssignment()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var assignment = new ConsumerAssignment
{
Name = "D",
Stream = "S",
Group = new RaftGroup { Name = "RG", Peers = ["N1"] },
};
var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
var (consumer, err) = stream!.AddConsumerWithAssignment(cfg, "D", assignment, isRecovering: false, ConsumerAction.Create, pedantic: false);
err.ShouldBeNull();
consumer.ShouldNotBeNull();
consumer!.ConsumerAssignment().ShouldBeSameAs(assignment);
}
[Fact]
public void UpdateInactiveThreshold_AndPauseState_ShouldTrackConfigValues()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null);
consumer.ShouldNotBeNull();
consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromSeconds(30) });
consumer.GetConfig().InactiveThreshold.ShouldBe(TimeSpan.FromSeconds(30));
var pauseUntil = DateTime.UtcNow.AddMinutes(1);
consumer.UpdatePauseState(new ConsumerConfig { PauseUntil = pauseUntil });
consumer.GetConfig().PauseUntil.ShouldBe(pauseUntil);
}
[Fact]
public void ConsumerAssignment_GetSet_ShouldRoundTrip()
{
var account = new Account { Name = "A" };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy };
var stream = NatsStream.Create(account, streamCfg, null, null, null, null);
stream.ShouldNotBeNull();
var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null);
consumer.ShouldNotBeNull();
var assignment = new ConsumerAssignment
{
Name = "D",
Stream = "S",
Group = new RaftGroup { Name = "RG", Peers = ["N1"] },
};
consumer!.SetConsumerAssignment(assignment);
consumer.ConsumerAssignment().ShouldBeSameAs(assignment);
}
}

Binary file not shown.