diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs new file mode 100644 index 0000000..927c304 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs @@ -0,0 +1,482 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private const string MqttPrefix = "$MQTT."; + private const string ReplyPrefix = "_R_."; + private const int MsgScratchSize = 1024; + private const int MaxDenyPermCacheSize = 256; + private const int MaxPermCacheSize = 128; + private const int PruneSize = 32; + private static readonly TimeSpan StallMin = TimeSpan.FromMilliseconds(2); + private static readonly TimeSpan StallMax = TimeSpan.FromMilliseconds(5); + private static readonly TimeSpan StallTotal = TimeSpan.FromMilliseconds(10); + + internal Exception? AddShadowSubscriptions(INatsAccount? acc, Subscription sub) + { + if (acc is null) + return new InvalidOperationException("missing account"); + _ = sub; + return null; + } + + internal (Subscription? shadow, Exception? err) AddShadowSub(Subscription sub, object? ime) + { + _ = ime; + var copy = new Subscription + { + Subject = sub.Subject.ToArray(), + Queue = sub.Queue?.ToArray(), + Sid = sub.Sid?.ToArray(), + Qw = sub.Qw, + Client = sub.Client, + }; + return (copy, null); + } + + internal bool CanSubscribe(string subject, string? queue = null) + { + if (Perms is null) + return true; + + var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal)); + var allowed = true; + + if (checkAllow && Perms.Sub.Allow is not null) + { + var result = Perms.Sub.Allow.Match(subject); + allowed = result.PSubs.Count > 0; + if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0) + allowed = QueueMatches(queue, result.QSubs); + + if (!allowed && Kind == ClientKind.Leaf && SubscriptionIndex.SubjectHasWildcard(subject)) + { + var reverse = Perms.Sub.Allow.ReverseMatch(subject); + allowed = reverse.PSubs.Count != 0; + } + } + + if (allowed && Perms.Sub.Deny is not null) + { + var result = Perms.Sub.Deny.Match(subject); + allowed = result.PSubs.Count == 0; + if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0) + allowed = !QueueMatches(queue, result.QSubs); + + if (allowed && MPerms is null && SubscriptionIndex.SubjectHasWildcard(subject) && DArray is not null) + { + foreach (var deny in DArray.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(deny, subject)) + { + LoadMsgDenyFilter(); + break; + } + } + } + } + + return allowed; + } + + internal static bool QueueMatches(string queue, IReadOnlyList> qsubs) + { + if (qsubs.Count == 0) + return true; + + foreach (var qsub in qsubs) + { + if (qsub.Count == 0 || qsub[0].Queue is not { Length: > 0 } q) + continue; + + var qname = Encoding.ASCII.GetString(q); + if (queue == qname) + return true; + if (SubscriptionIndex.SubjectHasWildcard(qname) && SubscriptionIndex.SubjectIsSubsetMatch(queue, qname)) + return true; + } + return false; + } + + internal void Unsubscribe(INatsAccount? acc, Subscription sub, bool force, bool remove) + { + if (!force && sub.IsClosed()) + return; + + lock (_mu) + { + if (remove && sub.Sid is { Length: > 0 } sid) + Subs.Remove(Encoding.ASCII.GetString(sid)); + sub.Close(); + } + + _ = acc; + } + + internal Exception? ProcessUnsub(byte[] arg) + { + var args = SplitArg(arg); + if (args.Count is < 1 or > 2) + return new FormatException($"processUnsub Parse Error: {Encoding.ASCII.GetString(arg)}"); + + var sid = Encoding.ASCII.GetString(args[0]); + lock (_mu) + { + _in.Subs++; + if (!Subs.TryGetValue(sid, out var sub)) + return null; + + // Max-delivery based deferred unsub is not modeled yet, so unsubscribe immediately. + Unsubscribe(Account, sub, force: true, remove: true); + } + + if (Opts.Verbose) + SendOK(); + + return null; + } + + internal bool CheckDenySub(string subject) + { + if (MPerms is null || MPerms.Deny is null) + return false; + + if (MPerms.DCache.TryGetValue(subject, out var denied)) + return denied; + + var (np, _) = MPerms.Deny.NumInterest(subject); + denied = np != 0; + MPerms.DCache[subject] = denied; + if (MPerms.DCache.Count > MaxDenyPermCacheSize) + PruneDenyCache(); + return denied; + } + + internal byte[] MsgHeaderForRouteOrLeaf(byte[] subj, byte[]? reply, RouteTarget rt, INatsAccount? acc) + { + var msg = new List(MsgScratchSize) + { + (byte)(rt.Sub?.Client?.Kind == ClientKind.Leaf ? 'L' : 'R'), + (byte)'M', + (byte)'S', + (byte)'G', + (byte)' ', + }; + + if (acc is not null && rt.Sub?.Client?.Kind == ClientKind.Router) + { + msg.AddRange(Encoding.ASCII.GetBytes(acc.Name)); + msg.Add((byte)' '); + } + + msg.AddRange(subj); + msg.Add((byte)' '); + + if (rt.Qs.Length > 0) + { + if (reply is { Length: > 0 }) + { + msg.Add((byte)'+'); + msg.Add((byte)' '); + msg.AddRange(reply); + msg.Add((byte)' '); + } + else + { + msg.Add((byte)'|'); + msg.Add((byte)' '); + } + msg.AddRange(rt.Qs); + msg.Add((byte)' '); + } + else if (reply is { Length: > 0 }) + { + msg.AddRange(reply); + msg.Add((byte)' '); + } + + var pa = ParseCtx.Pa; + if (pa.HeaderSize > 0) + { + msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString())); + msg.Add((byte)' '); + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + else + { + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + + msg.Add((byte)'\r'); + msg.Add((byte)'\n'); + return msg.ToArray(); + } + + internal byte[] MsgHeader(byte[] subj, byte[]? reply, Subscription sub) + { + var pa = ParseCtx.Pa; + var hasHeader = pa.HeaderSize > 0; + var msg = new List(MsgScratchSize); + if (hasHeader) + msg.Add((byte)'H'); + msg.Add((byte)'M'); + msg.Add((byte)'S'); + msg.Add((byte)'G'); + msg.Add((byte)' '); + msg.AddRange(subj); + msg.Add((byte)' '); + + if (sub.Sid is { Length: > 0 }) + { + msg.AddRange(sub.Sid); + msg.Add((byte)' '); + } + + if (reply is { Length: > 0 }) + { + msg.AddRange(reply); + msg.Add((byte)' '); + } + + if (hasHeader) + { + msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString())); + msg.Add((byte)' '); + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + else + { + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + + msg.Add((byte)'\r'); + msg.Add((byte)'\n'); + return msg.ToArray(); + } + + internal void StalledWait(ClientConnection producer) + { + if (producer._in.Tst > StallTotal) + return; + + var ttl = OutPb >= OutMp && OutMp > 0 ? StallMax : StallMin; + if (producer._in.Tst + ttl > StallTotal) + ttl = StallTotal - producer._in.Tst; + if (ttl <= TimeSpan.Zero) + return; + + var start = DateTime.UtcNow; + Thread.Sleep(ttl); + producer._in.Tst += DateTime.UtcNow - start; + } + + internal bool DeliverMsg(bool prodIsMqtt, Subscription sub, INatsAccount? acc, byte[] subject, byte[] reply, byte[] mh, byte[] msg, bool gwReply) + { + _ = acc; + _ = subject; + _ = reply; + _ = gwReply; + + if (sub.IsClosed()) + return false; + + QueueOutbound(mh); + QueueOutbound(msg); + if (prodIsMqtt) + QueueOutbound("\r\n"u8.ToArray()); + + AddToPCD(this); + return true; + } + + internal void AddToPCD(ClientConnection client) + { + Pcd ??= new Dictionary(); + if (Pcd.TryAdd(client, true)) + client.OutPb += 0; + } + + internal void TrackRemoteReply(string subject, string reply) + { + _ = subject; + _rrTracking ??= new RrTracking + { + RMap = new Dictionary(StringComparer.Ordinal), + Lrt = TimeSpan.FromSeconds(1), + }; + _rrTracking.RMap ??= new Dictionary(StringComparer.Ordinal); + _rrTracking.RMap[reply] = new RespEntry { Time = DateTime.UtcNow }; + } + + internal void PruneRemoteTracking() + { + lock (_mu) + { + if (_rrTracking?.RMap is null || _rrTracking.RMap.Count == 0) + { + _rrTracking = null; + return; + } + + var now = DateTime.UtcNow; + var ttl = _rrTracking.Lrt <= TimeSpan.Zero ? TimeSpan.FromSeconds(1) : _rrTracking.Lrt; + foreach (var key in _rrTracking.RMap.Keys.ToArray()) + { + if (_rrTracking.RMap[key] is RespEntry re && now - re.Time > ttl) + _rrTracking.RMap.Remove(key); + } + + if (_rrTracking.RMap.Count == 0) + { + _rrTracking.Ptmr?.Dispose(); + _rrTracking = null; + } + } + } + + internal void PruneReplyPerms() + { + var resp = Perms?.Resp; + if (resp is null || Replies is null) + return; + + var maxMsgs = resp.MaxMsgs; + var ttl = resp.Expires; + var now = DateTime.UtcNow; + + foreach (var k in Replies.Keys.ToArray()) + { + var r = Replies[k]; + if ((maxMsgs > 0 && r.N >= maxMsgs) || (ttl > TimeSpan.Zero && now - r.Time > ttl)) + Replies.Remove(k); + } + + RepliesSincePrune = 0; + LastReplyPrune = now; + } + + internal void PruneDenyCache() + { + if (MPerms is null) + return; + + var removed = 0; + foreach (var subject in MPerms.DCache.Keys.ToArray()) + { + MPerms.DCache.Remove(subject); + if (++removed >= PruneSize) + break; + } + } + + internal void PrunePubPermsCache() + { + if (Perms is null) + return; + + for (var i = 0; i < 5; i++) + { + if (Interlocked.CompareExchange(ref Perms.PRun, 1, 0) != 0) + return; + + var removed = 0; + foreach (var key in Perms.PCache.Keys.ToArray()) + { + if (Perms.PCache.Remove(key)) + removed++; + if (removed > PruneSize && Perms.PCache.Count <= MaxPermCacheSize) + break; + } + + Interlocked.Add(ref Perms.PcsZ, -removed); + Interlocked.Exchange(ref Perms.PRun, 0); + if (Perms.PCache.Count <= MaxPermCacheSize) + return; + } + } + + internal bool PubAllowed(string subject) => PubAllowedFullCheck(subject, fullCheck: true, hasLock: false); + + internal bool PubAllowedFullCheck(string subject, bool fullCheck, bool hasLock) + { + if (Perms is null || (Perms.Pub.Allow is null && Perms.Pub.Deny is null)) + return true; + + if (Perms.PCache.TryGetValue(subject, out var cached)) + return cached; + + var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal)); + var allowed = true; + + if (checkAllow && Perms.Pub.Allow is not null) + { + var (np, _) = Perms.Pub.Allow.NumInterest(subject); + allowed = np != 0; + } + + if (allowed && Perms.Pub.Deny is not null) + { + var (np, _) = Perms.Pub.Deny.NumInterest(subject); + allowed = np == 0; + } + + if (!allowed && fullCheck && Perms.Resp is not null && Replies is not null) + { + if (hasLock) + { + if (Replies.TryGetValue(subject, out var resp)) + { + resp.N++; + if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs) + || (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires)) + { + Replies.Remove(subject); + } + else + { + Replies[subject] = resp; + allowed = true; + } + } + } + else + { + lock (_mu) + { + if (Replies.TryGetValue(subject, out var resp)) + { + resp.N++; + if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs) + || (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires)) + { + Replies.Remove(subject); + } + else + { + Replies[subject] = resp; + allowed = true; + } + } + } + } + } + else + { + Perms.PCache[subject] = allowed; + if (Interlocked.Increment(ref Perms.PcsZ) > MaxPermCacheSize) + PrunePubPermsCache(); + } + + return allowed; + } + + internal static bool IsServiceReply(byte[] reply) => + reply.Length > 3 && Encoding.ASCII.GetString(reply, 0, 4) == ReplyPrefix; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 60bc7d8..2bb31fc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1756,7 +1756,7 @@ public sealed partial class ClientConnection // features 471-486: processPub variants, parseSub, processSub, etc. // Implemented in full when Server+Account sessions complete. - // features 487-503: deliverMsg, addToPCD, trackRemoteReply, pruning, pubAllowed, etc. + // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs // features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 57b3e75..592898c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -6,7 +6,9 @@ using System.Text; using System.Linq; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Tests; @@ -152,4 +154,89 @@ public sealed class ClientConnectionStubFeaturesTests result.sub.ShouldNotBeNull(); c.Subs.Count.ShouldBe(2); } + + [Fact] + public void CanSubscribe_WithAllowAndDenyQueues_ShouldMatchExpected() + { + var c = new ClientConnection(ClientKind.Client) + { + Perms = new ClientPermissions(), + }; + c.Perms.Sub.Allow = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Allow.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.*"), + Queue = Encoding.ASCII.GetBytes("q"), + }); + c.Perms.Sub.Deny.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.blocked"), + }); + + c.CanSubscribe("foo.bar", "q").ShouldBeTrue(); + c.CanSubscribe("foo.bar", "other").ShouldBeFalse(); + c.CanSubscribe("foo.blocked").ShouldBeFalse(); + } + + [Fact] + public void ProcessUnsub_WithKnownSid_ShouldRemoveSubscription() + { + var c = new ClientConnection(ClientKind.Client); + c.ParseSub(Encoding.ASCII.GetBytes("foo sid1"), noForward: false).ShouldBeNull(); + c.Subs.Count.ShouldBe(1); + + c.ProcessUnsub(Encoding.ASCII.GetBytes("sid1")).ShouldBeNull(); + c.Subs.ShouldNotContainKey("sid1"); + } + + [Fact] + public void MsgHeaderAndRouteHeader_ShouldIncludeSubjectsAndSizes() + { + var c = new ClientConnection(ClientKind.Client); + c.ParseCtx.Pa.HeaderSize = 10; + c.ParseCtx.Pa.Size = 30; + c.ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes("10"); + c.ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes("30"); + + var sub = new Subscription { Sid = Encoding.ASCII.GetBytes("22") }; + var mh = c.MsgHeader(Encoding.ASCII.GetBytes("foo.bar"), Encoding.ASCII.GetBytes("_R_.x"), sub); + Encoding.ASCII.GetString(mh).ShouldContain("foo.bar 22 _R_.x"); + Encoding.ASCII.GetString(mh).ShouldContain("30"); + + var routeTarget = new RouteTarget { Sub = sub, Qs = Encoding.ASCII.GetBytes("q1 q2") }; + var rmh = c.MsgHeaderForRouteOrLeaf( + Encoding.ASCII.GetBytes("foo.bar"), + Encoding.ASCII.GetBytes("_R_.x"), + routeTarget, + null); + Encoding.ASCII.GetString(rmh).ShouldContain("foo.bar"); + Encoding.ASCII.GetString(rmh).ShouldContain("q1 q2"); + } + + [Fact] + public void PubAllowedFullCheck_ShouldHonorResponseReplyCache() + { + var c = new ClientConnection(ClientKind.Client) + { + Perms = new ClientPermissions + { + Resp = new ResponsePermission + { + MaxMsgs = 2, + Expires = TimeSpan.FromMinutes(1), + }, + }, + Replies = new Dictionary(StringComparer.Ordinal) + { + ["_R_.x"] = new RespEntry { Time = DateTime.UtcNow, N = 0 }, + }, + }; + c.Perms.Pub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Pub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(">") }); + + c.PubAllowed("_R_.x").ShouldBeTrue(); + c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue(); + c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs index cf4a561..0486941 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs @@ -189,6 +189,13 @@ public sealed class ClientTests c2.SetExpiration(DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeSeconds(), TimeSpan.Zero); SpinWait.SpinUntil(c2.IsClosed, TimeSpan.FromSeconds(2)).ShouldBeTrue(); } + + [Fact] + public void ReplyHelpers_ServiceAndReserved_ShouldClassifyPrefixes() + { + ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue(); + ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse(); + } } /// diff --git a/porting.db b/porting.db index baf14ed..a228dc6 100644 Binary files a/porting.db and b/porting.db differ