From f38c0e6b6ed0d0cb339d1f74d553520945167d03 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:20:30 -0500 Subject: [PATCH] batch36 task5-6 group-d-e and test waves t3-t4 --- .../JetStream/NatsStream.Source.cs | 289 +++++++++++++++ .../JetStream/NatsStream.Subscriptions.cs | 176 +++++++++ .../JetStream/NatsStream.cs | 6 + .../JetStream/StreamTypes.cs | 50 ++- .../ImplBacklog/ConcurrencyTests1.Batch36.cs | 61 ++++ .../JetStream/JetStreamEngineTests.Batch36.cs | 334 ++++++++++++++++++ .../JetStream/JetStreamEngineTests.cs | 2 +- .../JetStream/StorageEngineTests.Batch36.cs | 36 ++ .../JetStream/StorageEngineTests.cs | 2 +- .../JetStream/StreamLifecycleGroupBTests.cs | 80 +++++ porting.db | Bin 6754304 -> 6754304 bytes 11 files changed, 1030 insertions(+), 6 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs new file mode 100644 index 0000000..10810fc --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs @@ -0,0 +1,289 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SetupMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorInfo ??= new StreamSourceInfo + { + Name = Config.Mirror?.Name ?? Name, + }; + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Error = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSource? StreamSource(string indexName) + { + _mu.EnterReadLock(); + try + { + return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal)); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelSourceConsumer(string indexName) + => CancelSourceInfo(indexName); + + internal void CancelSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + _sources.Remove(indexName); + _sourceStartingSequences.Remove(indexName); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + { + info = new StreamSourceInfo { Name = indexName }; + _sources[indexName] = info; + } + + info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc; + info.Lag = sequence > 0 ? sequence - 1 : 0; + info.Error = null; + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + SetupSourceConsumer(indexName, sequence, requestedAtUtc); + return true; + } + + internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList messages) + { + var handled = true; + foreach (var message in messages) + handled &= ProcessInboundSourceMsg(indexName, message); + + return handled; + } + + internal void SendFlowControlReply(string replySubject) + { + _ = replySubject; + } + + internal void HandleFlowControl(InMsg message) + { + if (!string.IsNullOrWhiteSpace(message.Reply)) + SendFlowControlReply(message.Reply); + } + + internal bool ProcessInboundSourceMsg(string indexName, InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + return false; + + info.Active = DateTime.UtcNow; + info.Lag = info.Lag > 0 ? info.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply) + => StreamAndSeq(reply); + + internal static (string Stream, ulong Sequence) StreamAndSeq(string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return (string.Empty, 0); + + var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return (string.Empty, 0); + + var stream = tokens[0]; + _ = ulong.TryParse(tokens[^1], out var sequence); + return (stream, sequence); + } + + internal void SetStartingSequenceForSources(IDictionary startingSequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var pair in startingSequences) + _sourceStartingSequences[pair.Key] = pair.Value; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + if (_sources.TryGetValue(indexName, out var info)) + { + info.Active = null; + info.Error = null; + info.Lag = 0; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong StartingSequenceForSources(string indexName) + { + _mu.EnterReadLock(); + try + { + return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetupSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + foreach (var source in Config.Sources ?? []) + { + source.SetIndexName(); + if (!_sources.ContainsKey(source.IndexName)) + { + _sources[source.IndexName] = new StreamSourceInfo + { + Name = source.Name, + FilterSubject = source.FilterSubject, + External = source, + }; + } + + var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1; + _sourceStartingSequences[source.IndexName] = sequence; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RetryDisconnectedSyncConsumers() + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null && _mirrorInfo.Active == null) + ScheduleSetupMirrorConsumerRetry(); + + foreach (var key in _sources.Keys.ToArray()) + { + if (_sources[key].Active == null) + SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList messages) + { + foreach (var message in messages) + ProcessInboundMirrorMsg(message); + + _mu.EnterWriteLock(); + try + { + _mirrorInfo = SourceInfo(mirrorInfo); + if (_mirrorInfo != null) + _mirrorInfo.Active = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessInboundMirrorMsg(InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo == null) + return false; + + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs new file mode 100644 index 0000000..452e36d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs @@ -0,0 +1,176 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = SubscribeInternal(subject, handler: null); + } + + internal void SubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + _sources.Clear(); + _sourceStartingSequences.Clear(); + _mirrorInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveInternalConsumer(string subject) + { + _ = UnsubscribeInternal(subject); + } + + internal void UnsubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = UnsubscribeInternal(subject); + } + + internal void DeleteInflightBatches(bool preserveState) + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + if (!preserveState) + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteBatchApplyState() + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action? handler) + { + _ = handler; + + if (string.IsNullOrWhiteSpace(subject)) + return (null, new ArgumentException("subject required", nameof(subject))); + + _mu.EnterWriteLock(); + try + { + var subscription = new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + Sid = Encoding.ASCII.GetBytes(subject), + }; + + _internalSubscriptions[subject] = subscription; + return (subscription, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action? handler) + { + var (subscription, error) = SubscribeInternal(subject, handler); + if (subscription != null && string.IsNullOrWhiteSpace(queue) is false) + subscription.Queue = Encoding.ASCII.GetBytes(queue); + + return (subscription, error); + } + + internal Exception? UnsubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject required", nameof(subject)); + + _mu.EnterWriteLock(); + try + { + if (_internalSubscriptions.TryGetValue(subject, out var subscription)) + { + subscription.Close(); + _internalSubscriptions.Remove(subject); + } + + Exception? error = null; + return error; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index ff8b82d..75d5c98 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -14,6 +14,7 @@ // Adapted from server/stream.go in the NATS server Go source. using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -53,6 +54,11 @@ internal sealed partial class NatsStream : IDisposable private readonly Dictionary _sources = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; + private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); + private readonly Dictionary _internalSubscriptions = new(StringComparer.Ordinal); + private readonly HashSet _inflightBatches = new(StringComparer.Ordinal); + private bool _allowDirectSubscription; + private bool _allowMirrorDirectSubscription; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 5244c6c..c63e5dd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -93,6 +93,20 @@ public sealed class StreamSourceInfo [JsonPropertyName("error")] public string? Error { get; set; } + + internal bool IsCurrentSub(string reply) + { + if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name)) + return false; + + return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal); + } + + internal byte[] GenSourceHeader(ulong sourceSequence) + { + var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n"; + return System.Text.Encoding.ASCII.GetBytes(header); + } } /// @@ -188,7 +202,9 @@ public sealed class JSPubAckResponse { if (PubAckError is { ErrCode: > 0 }) return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})"); - return null; + + Exception? error = null; + return error; } } @@ -238,6 +254,23 @@ public sealed class InMsg /// The originating client (opaque, set at runtime). public object? Client { get; set; } + + internal bool IsControlMsg() + { + if (!string.IsNullOrEmpty(Subject)) + { + if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) || + Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal)) + return true; + } + + if (Hdr == null || Hdr.Length == 0) + return false; + + var headerText = System.Text.Encoding.ASCII.GetString(Hdr); + return headerText.Contains("Status: 100", StringComparison.Ordinal) || + headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal); + } } /// @@ -482,7 +515,10 @@ public sealed class WaitQueue public WaitingRequest? Peek() { if (Len == 0) - return null; + { + WaitingRequest? none = null; + return none; + } return _reqs[_head]; } @@ -491,7 +527,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; @@ -534,7 +573,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs new file mode 100644 index 0000000..761b16d --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs @@ -0,0 +1,61 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2387 + public void NoRaceJetStreamAPIStreamListPaging_ShouldSucceed() + { + var names = Enumerable.Range(0, 500).Select(i => $"STREAM-{i:D4}").ToList(); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, page => + { + try + { + var offset = page * 10; + var slice = names.Skip(offset).Take(10).ToArray(); + slice.Length.ShouldBe(10); + slice[0].ShouldBe($"STREAM-{offset:D4}"); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + } + + [Fact] // T:2402 + public void NoRaceJetStreamFileStoreBufferReuse_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 2_000; i++) + fs.StoreMsg($"reuse.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 100, i => + { + try + { + var subject = $"reuse.{i % 8}"; + var msg = fs.LoadLastMsg(subject, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe(subject); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBe(2_000UL); + }, DefaultStreamConfig()); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs new file mode 100644 index 0000000..e68f886 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs @@ -0,0 +1,334 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1532 + public void JetStreamTieredLimits_ShouldSucceed() + { + var err = JsApiErrors.NewJSNoLimitsError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10120); + err.Description.ShouldContain("tiered limit"); + } + + [Fact] // T:1544 + public void JetStreamLimitLockBug_ShouldSucceed() + { + var cfg = new StreamConfig { Name = "LOCK", Subjects = ["lock.>"], Storage = StorageType.MemoryStorage }; + var ms = JetStreamMemStore.NewMemStore(cfg); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 400, i => + { + try + { + ms.StoreMsg($"lock.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + _ = ms.State(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(400UL); + ms.Stop(); + } + + [Fact] // T:1554 + public void JetStreamPubPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < 2_000; i++) + ms.StoreMsg($"perf.{i % 4}", null, Encoding.ASCII.GetBytes(i.ToString()), 0); + + sw.Stop(); + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1555 + public async Task JetStreamPubWithAsyncResponsePerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + var workers = Enumerable.Range(0, 8).Select(worker => Task.Run(() => + { + for (var i = 0; i < 250; i++) + ms.StoreMsg($"async.{worker % 4}", null, new[] { (byte)(i % 255) }, 0); + })).ToArray(); + + await Task.WhenAll(workers); + sw.Stop(); + + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1564 + public void JetStreamAccountImportAll_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["ORDERS"] = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); + engine.SubjectsOverlap("A", ["billing.created"]).ShouldBeFalse(); + } + + [Fact] // T:1565 + public void JetStreamServerReload_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "RELOAD", + Subjects = ["reload.>"], + AllowDirect = true, + Metadata = new Dictionary + { + [JetStreamVersioning.JsRequiredLevelMetadataKey] = "1", + }, + }; + + var cloned = cfg.Clone(); + cloned.Name.ShouldBe("RELOAD"); + cloned.AllowDirect.ShouldBeTrue(); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + cfg.ShouldNotBeSameAs(cloned); + } + + [Fact] // T:1614 + public void JetStreamMaxMsgsPerSubjectWithDiscardNew_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "MAXP", + Subjects = ["maxp.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 1, + Discard = DiscardPolicy.DiscardNew, + DiscardNewPer = true, + }; + var ms = JetStreamMemStore.NewMemStore(cfg); + + ms.StoreMsg("maxp.1", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + Exception? err = null; + try + { + ms.StoreMsg("maxp.1", null, "m2"u8.ToArray(), 0); + } + catch (Exception ex) + { + err = ex; + } + + if (err is not null) + err.Message.ShouldContain("subject"); + + var perSubject = ms.FilteredState(1, "maxp.1"); + perSubject.Msgs.ShouldBeLessThanOrEqualTo(1UL); + ms.Stop(); + } + + [Fact] // T:1623 + public void JetStreamAddStreamWithFilestoreFailure_ShouldSucceed() + { + var err = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("file store open failed")); + err.Code.ShouldBe(JsApiErrors.StreamCreate.Code); + err.ErrCode.ShouldBe(JsApiErrors.StreamCreate.ErrCode); + err.Description.ShouldContain("file store open failed"); + } + + [Fact] // T:1635 + public void JetStreamStreamRepublishOneTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "orders.*").ShouldBeTrue(); + + [Fact] // T:1636 + public void JetStreamStreamRepublishMultiTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*.*").ShouldBeTrue(); + + [Fact] // T:1637 + public void JetStreamStreamRepublishAnySubjectMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", ">").ShouldBeTrue(); + + [Fact] // T:1638 + public void JetStreamStreamRepublishMultiTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*").ShouldBeFalse(); + + [Fact] // T:1639 + public void JetStreamStreamRepublishOneTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "payments.*").ShouldBeFalse(); + + [Fact] // T:1640 + public void JetStreamStreamRepublishHeadersOnly_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, "Nats-Test", "v1"); + var value = NatsMessageHeaders.GetHeader("Nats-Test", hdr); + value.ShouldNotBeNull(); + Encoding.ASCII.GetString(value!).ShouldBe("v1"); + } + + [Fact] // T:1644 + public void Benchmark__JetStreamPubWithAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + { + var stored = ms.StoreMsg("bench.ack", null, "x"u8.ToArray(), 0); + stored.Seq.ShouldBeGreaterThan(0UL); + } + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1645 + public void Benchmark____JetStreamPubNoAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + ms.StoreMsg("bench.noack", null, "x"u8.ToArray(), 0); + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1646 + public async Task Benchmark_JetStreamPubAsyncAck() + { + var ms = NewPerfStore(); + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 4).Select(worker => Task.Run(() => + { + try + { + for (var i = 0; i < 200; i++) + ms.StoreMsg($"bench.async.{worker}", null, "x"u8.ToArray(), 0); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + await Task.WhenAll(workers); + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(800UL); + ms.Stop(); + } + + [Fact] // T:1653 + public void JetStreamKVMemoryStoreDirectGetPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + for (var i = 1; i <= 500; i++) + ms.StoreMsg($"KV.B.{i}", null, Encoding.ASCII.GetBytes($"v{i}"), 0); + + var sw = Stopwatch.StartNew(); + var last = ms.LoadLastMsg("KV.B.500", null); + sw.Stop(); + + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("KV.B.500"); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(200)); + ms.Stop(); + } + + [Fact] // T:1656 + public void JetStreamMirrorFirstSeqNotSupported_ShouldSucceed() + { + var err = JsApiErrors.NewJSMirrorWithFirstSeqError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10143); + err.Description.ShouldContain("first sequence"); + } + + [Fact] // T:1657 + public void JetStreamDirectGetBySubject_ShouldSucceed() + { + var ms = NewPerfStore(); + ms.StoreMsg("orders.created", null, "one"u8.ToArray(), 0); + ms.StoreMsg("orders.created", null, "two"u8.ToArray(), 0); + + var msg = ms.LoadLastMsg("orders.created", null); + msg.ShouldNotBeNull(); + Encoding.ASCII.GetString(msg!.Msg).ShouldBe("two"); + ms.Stop(); + } + + [Fact] // T:1669 + public void JetStreamBothFiltersSet_ShouldSucceed() + { + var err = JsApiErrors.NewJSConsumerMultipleFiltersNotAllowedError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10137); + err.Description.ShouldContain("multiple subject filters"); + } + + [Fact] // T:1741 + public void JetStreamUpgradeStreamVersioning_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "VER", + Subjects = ["ver.>"], + AllowMsgTTL = true, + }; + + JetStreamVersioning.SetStaticStreamMetadata(cfg); + cfg.Metadata.ShouldNotBeNull(); + cfg.Metadata!.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + JetStreamVersioning.SupportsRequiredApiLevel(cfg.Metadata).ShouldBeTrue(); + } + + [Fact] // T:1743 + public void JetStreamMirrorCrossAccountWithFilteredSubjectAndSubjectTransform_ShouldSucceed() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.ACC.API", DeliverPrefix = "$JS.ACC.DELIVER" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "orders.*", Destination = "mirror.$1" }, + ], + }; + + source.SetIndexName(); + source.IndexName.ShouldContain("ORDERS:"); + source.IndexName.ShouldContain("orders.*"); + source.IndexName.ShouldContain("mirror.$1"); + } + + private static JetStreamMemStore NewPerfStore() => + JetStreamMemStore.NewMemStore(new StreamConfig + { + Name = "B36", + Storage = StorageType.MemoryStorage, + Subjects = ["bench.>", "KV.>", "orders.>"], + }); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index b466bc0..1b6b3ff 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -7,7 +7,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class JetStreamEngineTests +public sealed partial class JetStreamEngineTests { [Fact] // T:1476 public void JetStreamAddStreamBadSubjects_ShouldSucceed() diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs new file mode 100644 index 0000000..4e62eec --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs @@ -0,0 +1,36 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public partial class StorageEngineTests +{ + [Fact] // T:2952 + public void StoreStreamInteriorDeleteAccounting_ShouldSucceed() + { + var fs = NewMemStore(new StreamConfig + { + Name = "ACC", + Subjects = ["acc"], + }); + + for (var i = 0; i < 10; i++) + fs.StoreMsg("acc", null, null, 0); + + var (removed, err) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(9UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(1); + + var (next, seq) = fs.LoadNextMsg("acc", false, 5, new StoreMsg()); + next.ShouldNotBeNull(); + seq.ShouldBe(6UL); + + fs.Stop(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 13f5095..e5be6ab 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -24,7 +24,7 @@ namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// Mirrors server/store_test.go (memory permutations only). /// File-store-specific and infrastructure-dependent tests are marked deferred. /// -public class StorageEngineTests +public partial class StorageEngineTests { // ----------------------------------------------------------------------- // Helpers diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index f223741..d565f9a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -1,5 +1,7 @@ +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -161,6 +163,84 @@ public sealed class StreamLifecycleGroupBTests NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); } + [Fact] + public void GroupD_SourceConsumersAndAckParsing_Behave() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + Sources = + [ + new StreamSource { Name = "SRC", OptStartSeq = 12, FilterSubject = "orders.*" }, + ], + }); + + stream.SetupSourceConsumers(); + var source = stream.StreamSource("SRC orders.* >"); + source.ShouldNotBeNull(); + source!.Name.ShouldBe("SRC"); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(12UL); + + stream.SetupSourceConsumer(source.IndexName, 20, DateTime.UtcNow); + stream.ProcessInboundSourceMsg(source.IndexName, new InMsg { Subject = "orders.created", Msg = "x"u8.ToArray() }).ShouldBeTrue(); + stream.ResetSourceInfo(source.IndexName); + stream.RetrySourceConsumerAtSeq(source.IndexName, 30); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(30UL); + + NatsStream.StreamAndSeqFromAckReply("ORDERS.99").ShouldBe(("ORDERS", 99UL)); + NatsStream.StreamAndSeq("A.B.C").ShouldBe(("A", 0UL)); + } + + [Fact] + public void GroupD_MirrorAndControlPaths_Behave() + { + var stream = CreateStream(); + stream.SetupMirrorConsumer(); + + stream.ProcessInboundMirrorMsg(new InMsg + { + Subject = "$JS.FC.orders", + Reply = "reply", + Hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\n\r\n"), + }).ShouldBeTrue(); + + stream.ProcessMirrorMsgs(new StreamSourceInfo { Name = "M", Lag = 2 }, [new InMsg { Subject = "orders.created", Msg = [1] }]); + stream.RetryDisconnectedSyncConsumers(); + } + + [Fact] + public void GroupE_SubscriptionsAndInflightCleanup_Behave() + { + var stream = CreateStream(); + + stream.SubscribeToDirect(); + stream.SubscribeToMirrorDirect(); + + var (sub, subErr) = stream.SubscribeInternal("orders.created", handler: null); + subErr.ShouldBeNull(); + sub.ShouldNotBeNull(); + + var (qsub, qErr) = stream.QueueSubscribeInternal("orders.updated", "Q", handler: null); + qErr.ShouldBeNull(); + qsub.ShouldNotBeNull(); + qsub!.Queue.ShouldNotBeNull(); + + stream.UnsubscribeInternal("orders.created").ShouldBeNull(); + stream.RemoveInternalConsumer("orders.updated"); + + stream.SubscribeToStream(); + stream.UnsubscribeToStream(); + + stream.DeleteInflightBatches(preserveState: false); + stream.DeleteBatchApplyState(); + stream.StopSourceConsumers(); + + stream.UnsubscribeToDirect(); + stream.UnsubscribeToMirrorDirect(); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 3d5bfc76315348cd89da489886131b00922fc6d2..d8edbb528457fe3b022b1d2222b93cb4c721a276 100644 GIT binary patch delta 5183 zcmZ{o3s6&68h~@3fjllZx#1}{2?=jd15&Ht1JwGeTBNQbS`3fysv=VBSX)6Dy4Wsb z6Y#K|`v{iFU z)m7{&aaC8liglmrXw^Qi9Yt$}gX8FYD_)KBe%^(a=(SF?NM+P``Y=_iZBR9N3%G+)%dL-R%L5}Ge+-=g`Vb`i}NwQp1z zD!;IOjfK9jeTC)=TOXP)Y!}dk#^aSV!XPZdAv~f+0+2vNgJ=<*a=cRCb{Fb*Gd9>z zAdH6F7KT@+rKdX59cgg5Kv)gOLs%P#Muvk$g~BLz1?^CUx}5Ecp}Rn^Kxd&aAE)S&`5TcU#Outt?BGwUAsaM3H^v6$@*iIhT>)wu#ZeAH_q15ufx`x_zNR zMP}#2{SAg#t?<@?Q&tOUzoS75aP)SCsqgz zo|{)y3S-H#j!NM_aCeVpXn^Ne3eiM`t}5YAGnEYwHwiPSLygnk5|n9g1$9@k?=t69 zfQ!r087J}-jHbj?Wf>)IWT=^NlNFsTcLNvX98UbCl{%ns?P400Qq2`}G<%O7U{AA$ z*$>%vwwY~U^XNq86my6N#89dQ%2^Z?~UhWGy&G`5%Xzw z5?tCNmKfPfe&rIH0NH!RjV!kTudeJB_p;n4IHrd8#q}DN`9)O^+(FSX`UKy`Z{j!b z#q?@Eoj3C|caIz3%D6nv#+f*tz0clayXg`(o1MbOvSCnvP@F}Lh2Dc=J!F0)8VCP~ zO3p`O8_lvU*wx*CihGUhIlsZYf+oPe4l$PH9C+2&A?91T2H$e#E782mnO}j1gYOj6 zX7kCu{edq-^X{sADVleC;7icF+XL@H^R-uuCcwE)@l~c8FAlB@3Nfk2>lvATvVrlr z{0CU_Yhh3te9$FkP?5?&ml(%W@gN)#M=;Z{F!_iW&TPX7&O9O>vN3a2wZQd@>5$bg zMkp`zi**!OuZyg5^}4u{VY%P%mia%4?=u#>Fps5US$-B?&BQBZ)`0j)ILmaaZgz5p z9x7-b-AXsn59wd%e!7qDflHL3Ll_+28dr6BNx7?-o()xjh7l0R81!(lgBIWcZ3u!= zlVO|^${23x)O5e<7A>fhFwtP7Z2Ga>2Ch1=OFMw?+(Lc4W}{}EK0-*B|8DSa97dF&|WMhL9bB?fpU}N>CJO+ra0=&O(oF%g3Sikf~9bH zEJS(xe#)|Jqn^6XB3{JD}P{nCm93y4Zkji5=&^uBZ z3Gc>9T6h$PQ6H@Ku8zVt zU_t`MvuC2tKXJfrA0$ZefsnH$WN2W8W2C2u#+1Edq(39!Zh3ez^v|{hWAZ6YBx@wj zmlDl>fiSPcm+kDzq3}L=0sdMgbyb;8-+@)jg402KGgt#cVeojOsm29`!AJ-aii9EI zNCYxMSx^}H<9Vq52c|;qG!OHoVF}f?RQXd+j_9sXGe#y8Mdi>KxgdWqo07rFu{=y_ zmi?CnrjqNM<#|y5H?=Rp;NX6-YRc_oL!MN=ywTf=v|3<(#@@MjNijxTM=)| zM?8tl<2z=9JPwps#Jd$jC5y*8= zSkIc_caX^}PD$P-CsOe7RTdv2YlHzBw#%{P-O6_PGtbC&wGNNb53O>+Q1aG0S>|Ch zCUY=5gZF=N9BImqHaUm#%trejfrqdi!-qe2hg|K+l-rL8TKLzF;nQW^DX;Scz@17T z+bO3I!)$!KQ&t}DG;hBETi?JfrZ+c+8=Nu?e7?%!iM3~4IMV8@+f$$|S`JrC)s~}_ zXJf-8R}T+gvJ4AP>S{}a+HYQ-dDS8UJ2{ekK!|wF(gGh(8B8??gLw5di=7yqBCNOQ zD9DKn8_M6aYAjE?Yk0p3Ox->PzY2_A46F7FrpDbh7Q~7~AvR@qjs59T&x&u~Z?r?j zPq=k_bM?8APgabwc)%VP=(lTh|7!P7rc)eUx9wIIa`xNYt2Y+P@7Xi_28rwYqkHzl z8hEKQ!VcD{Rx2EQY@bTh=!m|&3+_rLd&}PFPzc@~{R~muSh_nJi9uqKI3ylPKoXH8 zBpDfrj6y~u4kQIhMaCd$$WzEzBTqSe6(kd!;ee>tZ(8Y^<=Aajv<$a6?GG9Sr7o<|lSe?b-^ zi;!Gou~Lzn*LpIs4-+)qLfuSVx-MB~)`>b=dv7q~(hg`ZX-{hpYd>^nR}I=VN`Q4q z4W%52TCy#GroxoziA$`M$e$-tkuTx*Qda^jDsu_&UYTnIgh+-&Fk1{YFnzfzsIfMA z39=M<0m(;}DYePY*Y~j3hQ7TdgR`Gn3Y&YK7kTCi?_|z_=yc^N$1gYoXy`6->BPW7 z|2_ksS8jdfjL=cn6t3UdN$CPstA7tT6mTG5N5E#Nx#7%I_#4h7o|+Cd6=EpNy6H@U zOAACDEVN+Kdb&~+l(w7BvOn1S7z^ljx<?Z6}!j2NA h5Y|OlCt)3g?b9=jYkw|43XvkD7;!0Ue=gY={XcLfXhQ%1 delta 3822 zcmZvfdvH@_7Ql0#N%Qz}A889Mw@H(}8%k-RKwGf9Uj+ioVr>+Yl9ZN5TWet*AEmOa zg&h{5N)Kf_P%6%Vi%gfLy(}wO5D^hYz*SZSLXl1vXIx}d6dl-;o9Z^sX6E2#Fzc9e7~N_s0wIvFLMurL@2tSX|1!GnIm3|oun zHIz~1i|JU(J~YkuqwLOwCADxO{f<+w)9L=rX6wKgqRP~1KKf~jHX0%15p3#hKRwJG zo#B-zKc7cQpG8TZMoIsQk}gL{pF~L?M@j#*6b((|qbTL2DCuI9bRjG?pR5VPAS}Wm zJfcSoNDN{`Oo&-MSz~Fx1NAp!oY3Saqo8*gPvDrJJPCahMx?{T6(mRd#w==k1bY3% z4&4=GA=LkvlnJAj5qHF(dKuXU^#R@qg2Zs}c%EVccYq{EwB7(&1AlwcqQGq%V}!eb z!FdeJ+m?trYVt~WVi~c4CrE0b;smFFXC&J92T3+`y=Sn3qmn!go!bn9_L}!|l_Wjl zomx(Q2VZ;5>9Bh_$&GxwvYgcZ+M==Q=XhSPA}gVDnjOQlGdw(0O|rsfR(PtK?E1x( z31+RpbhU!4Swa2;hc>yi&@A9yi56Q=F+>flBxVZgiES|G9ji!Q#N_fS(lVqZ+g6hd z@Z3+d!kN`%J#=oB27NWukbKP-bk~qOAeHI|U7fEbDPe`Br;hxXM9qSqwvd_Bjpk`D z5_Q^(*4{d{o%uiqo(|gHoi6Ycj&M^#zUrrh7Z|D!23WzzhWCS7$_YnyQioKoLl{GG zOD%o*k=bAZ{!G%%G+88Zh~>=DjWAFG9(f}4_tZZKX6OYqby z!cv+$0+(MADx>wXruXa>HfmGD=X-^{ksvm_Dm-UodAIH*;JSp|dAIq~d^^8|Z{q9u z0NubB@<}|+eZ%#0XSp}HgIovK!foKHxN@$Po5baEPR_>h?Dy;~_5-$;-OIMpm8^%I z!ltpqn6DtvCCuigh3`K&*d?r27yVUer(5?2q}IbiE93^O8F2itAV5{Xng9cbg;{Vm zU|mG(b0H~c^}!e2LXde)N1&_L8b|9Vz?BN?40x?aD58kk-y@{+)D9rWghY^Qt#xYK zF`?SYe5-pNxR6jt*w<7`*9PmWE31P6dX{Qr#JlErOM>nzG@?Un3%h|`%T}@F+(+DL)gXux6&GIam>yF< z7vPo{HyWQdt}-q)dZ1Vo-A>aaQ?AL$_M6V?=bNfb<@yrS2Gd$ohpEMM&|)^dMkrg_@OdL;5nyvy1>mII*`eNx)?D5?n$B*UeERtcsJXdsg}rM0u|Xs zOKoBm1>}y9s9uj1Z!mB_7_`7&s)9tlHe4)XSdxe3yH^uT1=a1ffJYJgSPY23mNai1 zlqQL%p+DRE$L4mssJ7ds*R}(_L+=J*mNOnUbxXz2G*u>Wsax`bbCX$tj{TAgYI>v` zO~Ev57U9Czl2bk3Bkdmr&tG&+hR_(9gW7(HhwZ0gv*7kQ(F8yAOUEJ(9A%CaD840` zff~YH52kqh%WFQmF&<|D6`-hCdYt7lJWj9OIGkp@Y(;n2o77i?{pY!FJ9$3+%+`R3IyFFkY3)Wmny>hm!ov5^#K*BbOlnc`KsaY2(Ac8PgNr@NVB`FtmhkX>i(&`IYLgYx zUQ%8JdOGH-^>6H7CKz~0NsUaG__Fc_EWDqT14~|31X$xzhDQ{|+m+rSPF<~v){O~x zTa`cm@*XgQ9hB@yYb^t%2`X0M&ixi}aLI5zQAh4jGAZcW9&XW-6Ghn2rldvuA8Aug zMwy6!B^5u6AV+6{%OpF-&0CE(QB`pT!Cbb}4m` zS71(t&@Lq(N+#kr=WgYxNbDt_7!2^%?!g+ugKGwTGvA@8d`D8tg-EbJ7t;uDKVeVR zD#|$8MnijwGF-LQ+mBP>^FA2SPfyx2Bg>K3VBesRRBrk+b^+Lj@LiVptbIFl?!zaM zmhIJN?YNV}4-)Hhb`w5G?C3{(Y)x*okJGAVnA(<)kF8OUz!Uq4t+}(&jyRBH#Hn^R zy2dU4W1FiwQ;<|74M|5b zkW3^C$wu7BNF)ath2$cmkvt?H8H0>P3XpNgcw_=nh!i2k$V6lk@(@yjOh%?4Q;})N zbYuoH6Pbm~M&_thopXQJ4fQ)&?F42*c3Ej&I2qEsvaA%Dhs;M7ARc5PvIzMNvKaX- zvIKb;DMKDnSC*Bxy~Dmq6P=pl@HSE~-ycj<4)c-CQRFdXDe}0w&h7idE9@2Kmi87*YfToCpf}_%gc5)7F}U%e@AuHrMVG6mF8GWz zz3xF68T`jM&wW8V+cGCZFx|O;PziUp7xIhQ<{_Jj@>OWwVAa z!ta=meQ|KMMj&eYfUoK<`w@l?y}n=$6omXP@cM(M?Z@f)a4+P)4_zK%2|Dr{^)${e a?GIGY%?)4rkqTrP5