diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs new file mode 100644 index 0000000..10810fc --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs @@ -0,0 +1,289 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SetupMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorInfo ??= new StreamSourceInfo + { + Name = Config.Mirror?.Name ?? Name, + }; + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Error = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSource? StreamSource(string indexName) + { + _mu.EnterReadLock(); + try + { + return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal)); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelSourceConsumer(string indexName) + => CancelSourceInfo(indexName); + + internal void CancelSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + _sources.Remove(indexName); + _sourceStartingSequences.Remove(indexName); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + { + info = new StreamSourceInfo { Name = indexName }; + _sources[indexName] = info; + } + + info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc; + info.Lag = sequence > 0 ? sequence - 1 : 0; + info.Error = null; + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + SetupSourceConsumer(indexName, sequence, requestedAtUtc); + return true; + } + + internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList messages) + { + var handled = true; + foreach (var message in messages) + handled &= ProcessInboundSourceMsg(indexName, message); + + return handled; + } + + internal void SendFlowControlReply(string replySubject) + { + _ = replySubject; + } + + internal void HandleFlowControl(InMsg message) + { + if (!string.IsNullOrWhiteSpace(message.Reply)) + SendFlowControlReply(message.Reply); + } + + internal bool ProcessInboundSourceMsg(string indexName, InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + return false; + + info.Active = DateTime.UtcNow; + info.Lag = info.Lag > 0 ? info.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply) + => StreamAndSeq(reply); + + internal static (string Stream, ulong Sequence) StreamAndSeq(string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return (string.Empty, 0); + + var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return (string.Empty, 0); + + var stream = tokens[0]; + _ = ulong.TryParse(tokens[^1], out var sequence); + return (stream, sequence); + } + + internal void SetStartingSequenceForSources(IDictionary startingSequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var pair in startingSequences) + _sourceStartingSequences[pair.Key] = pair.Value; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + if (_sources.TryGetValue(indexName, out var info)) + { + info.Active = null; + info.Error = null; + info.Lag = 0; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong StartingSequenceForSources(string indexName) + { + _mu.EnterReadLock(); + try + { + return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetupSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + foreach (var source in Config.Sources ?? []) + { + source.SetIndexName(); + if (!_sources.ContainsKey(source.IndexName)) + { + _sources[source.IndexName] = new StreamSourceInfo + { + Name = source.Name, + FilterSubject = source.FilterSubject, + External = source, + }; + } + + var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1; + _sourceStartingSequences[source.IndexName] = sequence; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RetryDisconnectedSyncConsumers() + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null && _mirrorInfo.Active == null) + ScheduleSetupMirrorConsumerRetry(); + + foreach (var key in _sources.Keys.ToArray()) + { + if (_sources[key].Active == null) + SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList messages) + { + foreach (var message in messages) + ProcessInboundMirrorMsg(message); + + _mu.EnterWriteLock(); + try + { + _mirrorInfo = SourceInfo(mirrorInfo); + if (_mirrorInfo != null) + _mirrorInfo.Active = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessInboundMirrorMsg(InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo == null) + return false; + + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs new file mode 100644 index 0000000..452e36d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs @@ -0,0 +1,176 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = SubscribeInternal(subject, handler: null); + } + + internal void SubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + _sources.Clear(); + _sourceStartingSequences.Clear(); + _mirrorInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveInternalConsumer(string subject) + { + _ = UnsubscribeInternal(subject); + } + + internal void UnsubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = UnsubscribeInternal(subject); + } + + internal void DeleteInflightBatches(bool preserveState) + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + if (!preserveState) + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteBatchApplyState() + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action? handler) + { + _ = handler; + + if (string.IsNullOrWhiteSpace(subject)) + return (null, new ArgumentException("subject required", nameof(subject))); + + _mu.EnterWriteLock(); + try + { + var subscription = new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + Sid = Encoding.ASCII.GetBytes(subject), + }; + + _internalSubscriptions[subject] = subscription; + return (subscription, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action? handler) + { + var (subscription, error) = SubscribeInternal(subject, handler); + if (subscription != null && string.IsNullOrWhiteSpace(queue) is false) + subscription.Queue = Encoding.ASCII.GetBytes(queue); + + return (subscription, error); + } + + internal Exception? UnsubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject required", nameof(subject)); + + _mu.EnterWriteLock(); + try + { + if (_internalSubscriptions.TryGetValue(subject, out var subscription)) + { + subscription.Close(); + _internalSubscriptions.Remove(subject); + } + + Exception? error = null; + return error; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index ff8b82d..75d5c98 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -14,6 +14,7 @@ // Adapted from server/stream.go in the NATS server Go source. using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -53,6 +54,11 @@ internal sealed partial class NatsStream : IDisposable private readonly Dictionary _sources = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; + private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); + private readonly Dictionary _internalSubscriptions = new(StringComparer.Ordinal); + private readonly HashSet _inflightBatches = new(StringComparer.Ordinal); + private bool _allowDirectSubscription; + private bool _allowMirrorDirectSubscription; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 5244c6c..c63e5dd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -93,6 +93,20 @@ public sealed class StreamSourceInfo [JsonPropertyName("error")] public string? Error { get; set; } + + internal bool IsCurrentSub(string reply) + { + if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name)) + return false; + + return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal); + } + + internal byte[] GenSourceHeader(ulong sourceSequence) + { + var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n"; + return System.Text.Encoding.ASCII.GetBytes(header); + } } /// @@ -188,7 +202,9 @@ public sealed class JSPubAckResponse { if (PubAckError is { ErrCode: > 0 }) return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})"); - return null; + + Exception? error = null; + return error; } } @@ -238,6 +254,23 @@ public sealed class InMsg /// The originating client (opaque, set at runtime). public object? Client { get; set; } + + internal bool IsControlMsg() + { + if (!string.IsNullOrEmpty(Subject)) + { + if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) || + Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal)) + return true; + } + + if (Hdr == null || Hdr.Length == 0) + return false; + + var headerText = System.Text.Encoding.ASCII.GetString(Hdr); + return headerText.Contains("Status: 100", StringComparison.Ordinal) || + headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal); + } } /// @@ -482,7 +515,10 @@ public sealed class WaitQueue public WaitingRequest? Peek() { if (Len == 0) - return null; + { + WaitingRequest? none = null; + return none; + } return _reqs[_head]; } @@ -491,7 +527,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; @@ -534,7 +573,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs new file mode 100644 index 0000000..761b16d --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs @@ -0,0 +1,61 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2387 + public void NoRaceJetStreamAPIStreamListPaging_ShouldSucceed() + { + var names = Enumerable.Range(0, 500).Select(i => $"STREAM-{i:D4}").ToList(); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, page => + { + try + { + var offset = page * 10; + var slice = names.Skip(offset).Take(10).ToArray(); + slice.Length.ShouldBe(10); + slice[0].ShouldBe($"STREAM-{offset:D4}"); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + } + + [Fact] // T:2402 + public void NoRaceJetStreamFileStoreBufferReuse_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 2_000; i++) + fs.StoreMsg($"reuse.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 100, i => + { + try + { + var subject = $"reuse.{i % 8}"; + var msg = fs.LoadLastMsg(subject, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe(subject); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBe(2_000UL); + }, DefaultStreamConfig()); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs new file mode 100644 index 0000000..e68f886 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs @@ -0,0 +1,334 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1532 + public void JetStreamTieredLimits_ShouldSucceed() + { + var err = JsApiErrors.NewJSNoLimitsError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10120); + err.Description.ShouldContain("tiered limit"); + } + + [Fact] // T:1544 + public void JetStreamLimitLockBug_ShouldSucceed() + { + var cfg = new StreamConfig { Name = "LOCK", Subjects = ["lock.>"], Storage = StorageType.MemoryStorage }; + var ms = JetStreamMemStore.NewMemStore(cfg); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 400, i => + { + try + { + ms.StoreMsg($"lock.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + _ = ms.State(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(400UL); + ms.Stop(); + } + + [Fact] // T:1554 + public void JetStreamPubPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < 2_000; i++) + ms.StoreMsg($"perf.{i % 4}", null, Encoding.ASCII.GetBytes(i.ToString()), 0); + + sw.Stop(); + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1555 + public async Task JetStreamPubWithAsyncResponsePerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + var workers = Enumerable.Range(0, 8).Select(worker => Task.Run(() => + { + for (var i = 0; i < 250; i++) + ms.StoreMsg($"async.{worker % 4}", null, new[] { (byte)(i % 255) }, 0); + })).ToArray(); + + await Task.WhenAll(workers); + sw.Stop(); + + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1564 + public void JetStreamAccountImportAll_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["ORDERS"] = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); + engine.SubjectsOverlap("A", ["billing.created"]).ShouldBeFalse(); + } + + [Fact] // T:1565 + public void JetStreamServerReload_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "RELOAD", + Subjects = ["reload.>"], + AllowDirect = true, + Metadata = new Dictionary + { + [JetStreamVersioning.JsRequiredLevelMetadataKey] = "1", + }, + }; + + var cloned = cfg.Clone(); + cloned.Name.ShouldBe("RELOAD"); + cloned.AllowDirect.ShouldBeTrue(); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + cfg.ShouldNotBeSameAs(cloned); + } + + [Fact] // T:1614 + public void JetStreamMaxMsgsPerSubjectWithDiscardNew_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "MAXP", + Subjects = ["maxp.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 1, + Discard = DiscardPolicy.DiscardNew, + DiscardNewPer = true, + }; + var ms = JetStreamMemStore.NewMemStore(cfg); + + ms.StoreMsg("maxp.1", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + Exception? err = null; + try + { + ms.StoreMsg("maxp.1", null, "m2"u8.ToArray(), 0); + } + catch (Exception ex) + { + err = ex; + } + + if (err is not null) + err.Message.ShouldContain("subject"); + + var perSubject = ms.FilteredState(1, "maxp.1"); + perSubject.Msgs.ShouldBeLessThanOrEqualTo(1UL); + ms.Stop(); + } + + [Fact] // T:1623 + public void JetStreamAddStreamWithFilestoreFailure_ShouldSucceed() + { + var err = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("file store open failed")); + err.Code.ShouldBe(JsApiErrors.StreamCreate.Code); + err.ErrCode.ShouldBe(JsApiErrors.StreamCreate.ErrCode); + err.Description.ShouldContain("file store open failed"); + } + + [Fact] // T:1635 + public void JetStreamStreamRepublishOneTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "orders.*").ShouldBeTrue(); + + [Fact] // T:1636 + public void JetStreamStreamRepublishMultiTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*.*").ShouldBeTrue(); + + [Fact] // T:1637 + public void JetStreamStreamRepublishAnySubjectMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", ">").ShouldBeTrue(); + + [Fact] // T:1638 + public void JetStreamStreamRepublishMultiTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*").ShouldBeFalse(); + + [Fact] // T:1639 + public void JetStreamStreamRepublishOneTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "payments.*").ShouldBeFalse(); + + [Fact] // T:1640 + public void JetStreamStreamRepublishHeadersOnly_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, "Nats-Test", "v1"); + var value = NatsMessageHeaders.GetHeader("Nats-Test", hdr); + value.ShouldNotBeNull(); + Encoding.ASCII.GetString(value!).ShouldBe("v1"); + } + + [Fact] // T:1644 + public void Benchmark__JetStreamPubWithAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + { + var stored = ms.StoreMsg("bench.ack", null, "x"u8.ToArray(), 0); + stored.Seq.ShouldBeGreaterThan(0UL); + } + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1645 + public void Benchmark____JetStreamPubNoAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + ms.StoreMsg("bench.noack", null, "x"u8.ToArray(), 0); + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1646 + public async Task Benchmark_JetStreamPubAsyncAck() + { + var ms = NewPerfStore(); + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 4).Select(worker => Task.Run(() => + { + try + { + for (var i = 0; i < 200; i++) + ms.StoreMsg($"bench.async.{worker}", null, "x"u8.ToArray(), 0); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + await Task.WhenAll(workers); + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(800UL); + ms.Stop(); + } + + [Fact] // T:1653 + public void JetStreamKVMemoryStoreDirectGetPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + for (var i = 1; i <= 500; i++) + ms.StoreMsg($"KV.B.{i}", null, Encoding.ASCII.GetBytes($"v{i}"), 0); + + var sw = Stopwatch.StartNew(); + var last = ms.LoadLastMsg("KV.B.500", null); + sw.Stop(); + + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("KV.B.500"); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(200)); + ms.Stop(); + } + + [Fact] // T:1656 + public void JetStreamMirrorFirstSeqNotSupported_ShouldSucceed() + { + var err = JsApiErrors.NewJSMirrorWithFirstSeqError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10143); + err.Description.ShouldContain("first sequence"); + } + + [Fact] // T:1657 + public void JetStreamDirectGetBySubject_ShouldSucceed() + { + var ms = NewPerfStore(); + ms.StoreMsg("orders.created", null, "one"u8.ToArray(), 0); + ms.StoreMsg("orders.created", null, "two"u8.ToArray(), 0); + + var msg = ms.LoadLastMsg("orders.created", null); + msg.ShouldNotBeNull(); + Encoding.ASCII.GetString(msg!.Msg).ShouldBe("two"); + ms.Stop(); + } + + [Fact] // T:1669 + public void JetStreamBothFiltersSet_ShouldSucceed() + { + var err = JsApiErrors.NewJSConsumerMultipleFiltersNotAllowedError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10137); + err.Description.ShouldContain("multiple subject filters"); + } + + [Fact] // T:1741 + public void JetStreamUpgradeStreamVersioning_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "VER", + Subjects = ["ver.>"], + AllowMsgTTL = true, + }; + + JetStreamVersioning.SetStaticStreamMetadata(cfg); + cfg.Metadata.ShouldNotBeNull(); + cfg.Metadata!.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + JetStreamVersioning.SupportsRequiredApiLevel(cfg.Metadata).ShouldBeTrue(); + } + + [Fact] // T:1743 + public void JetStreamMirrorCrossAccountWithFilteredSubjectAndSubjectTransform_ShouldSucceed() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.ACC.API", DeliverPrefix = "$JS.ACC.DELIVER" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "orders.*", Destination = "mirror.$1" }, + ], + }; + + source.SetIndexName(); + source.IndexName.ShouldContain("ORDERS:"); + source.IndexName.ShouldContain("orders.*"); + source.IndexName.ShouldContain("mirror.$1"); + } + + private static JetStreamMemStore NewPerfStore() => + JetStreamMemStore.NewMemStore(new StreamConfig + { + Name = "B36", + Storage = StorageType.MemoryStorage, + Subjects = ["bench.>", "KV.>", "orders.>"], + }); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index b466bc0..1b6b3ff 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -7,7 +7,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class JetStreamEngineTests +public sealed partial class JetStreamEngineTests { [Fact] // T:1476 public void JetStreamAddStreamBadSubjects_ShouldSucceed() diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs new file mode 100644 index 0000000..4e62eec --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs @@ -0,0 +1,36 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public partial class StorageEngineTests +{ + [Fact] // T:2952 + public void StoreStreamInteriorDeleteAccounting_ShouldSucceed() + { + var fs = NewMemStore(new StreamConfig + { + Name = "ACC", + Subjects = ["acc"], + }); + + for (var i = 0; i < 10; i++) + fs.StoreMsg("acc", null, null, 0); + + var (removed, err) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(9UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(1); + + var (next, seq) = fs.LoadNextMsg("acc", false, 5, new StoreMsg()); + next.ShouldNotBeNull(); + seq.ShouldBe(6UL); + + fs.Stop(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 13f5095..e5be6ab 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -24,7 +24,7 @@ namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// Mirrors server/store_test.go (memory permutations only). /// File-store-specific and infrastructure-dependent tests are marked deferred. /// -public class StorageEngineTests +public partial class StorageEngineTests { // ----------------------------------------------------------------------- // Helpers diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index f223741..d565f9a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -1,5 +1,7 @@ +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -161,6 +163,84 @@ public sealed class StreamLifecycleGroupBTests NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); } + [Fact] + public void GroupD_SourceConsumersAndAckParsing_Behave() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + Sources = + [ + new StreamSource { Name = "SRC", OptStartSeq = 12, FilterSubject = "orders.*" }, + ], + }); + + stream.SetupSourceConsumers(); + var source = stream.StreamSource("SRC orders.* >"); + source.ShouldNotBeNull(); + source!.Name.ShouldBe("SRC"); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(12UL); + + stream.SetupSourceConsumer(source.IndexName, 20, DateTime.UtcNow); + stream.ProcessInboundSourceMsg(source.IndexName, new InMsg { Subject = "orders.created", Msg = "x"u8.ToArray() }).ShouldBeTrue(); + stream.ResetSourceInfo(source.IndexName); + stream.RetrySourceConsumerAtSeq(source.IndexName, 30); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(30UL); + + NatsStream.StreamAndSeqFromAckReply("ORDERS.99").ShouldBe(("ORDERS", 99UL)); + NatsStream.StreamAndSeq("A.B.C").ShouldBe(("A", 0UL)); + } + + [Fact] + public void GroupD_MirrorAndControlPaths_Behave() + { + var stream = CreateStream(); + stream.SetupMirrorConsumer(); + + stream.ProcessInboundMirrorMsg(new InMsg + { + Subject = "$JS.FC.orders", + Reply = "reply", + Hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\n\r\n"), + }).ShouldBeTrue(); + + stream.ProcessMirrorMsgs(new StreamSourceInfo { Name = "M", Lag = 2 }, [new InMsg { Subject = "orders.created", Msg = [1] }]); + stream.RetryDisconnectedSyncConsumers(); + } + + [Fact] + public void GroupE_SubscriptionsAndInflightCleanup_Behave() + { + var stream = CreateStream(); + + stream.SubscribeToDirect(); + stream.SubscribeToMirrorDirect(); + + var (sub, subErr) = stream.SubscribeInternal("orders.created", handler: null); + subErr.ShouldBeNull(); + sub.ShouldNotBeNull(); + + var (qsub, qErr) = stream.QueueSubscribeInternal("orders.updated", "Q", handler: null); + qErr.ShouldBeNull(); + qsub.ShouldNotBeNull(); + qsub!.Queue.ShouldNotBeNull(); + + stream.UnsubscribeInternal("orders.created").ShouldBeNull(); + stream.RemoveInternalConsumer("orders.updated"); + + stream.SubscribeToStream(); + stream.UnsubscribeToStream(); + + stream.DeleteInflightBatches(preserveState: false); + stream.DeleteBatchApplyState(); + stream.StopSourceConsumers(); + + stream.UnsubscribeToDirect(); + stream.UnsubscribeToMirrorDirect(); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 3d5bfc7..d8edbb5 100644 Binary files a/porting.db and b/porting.db differ