feat(batch17): port subscription and delivery client core features

This commit is contained in:
Joseph Doherty
2026-02-28 19:12:58 -05:00
parent aeeb73f699
commit 1baba5ac0e
5 changed files with 577 additions and 1 deletions

View File

@@ -0,0 +1,482 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
private const string MqttPrefix = "$MQTT.";
private const string ReplyPrefix = "_R_.";
private const int MsgScratchSize = 1024;
private const int MaxDenyPermCacheSize = 256;
private const int MaxPermCacheSize = 128;
private const int PruneSize = 32;
private static readonly TimeSpan StallMin = TimeSpan.FromMilliseconds(2);
private static readonly TimeSpan StallMax = TimeSpan.FromMilliseconds(5);
private static readonly TimeSpan StallTotal = TimeSpan.FromMilliseconds(10);
internal Exception? AddShadowSubscriptions(INatsAccount? acc, Subscription sub)
{
if (acc is null)
return new InvalidOperationException("missing account");
_ = sub;
return null;
}
internal (Subscription? shadow, Exception? err) AddShadowSub(Subscription sub, object? ime)
{
_ = ime;
var copy = new Subscription
{
Subject = sub.Subject.ToArray(),
Queue = sub.Queue?.ToArray(),
Sid = sub.Sid?.ToArray(),
Qw = sub.Qw,
Client = sub.Client,
};
return (copy, null);
}
internal bool CanSubscribe(string subject, string? queue = null)
{
if (Perms is null)
return true;
var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal));
var allowed = true;
if (checkAllow && Perms.Sub.Allow is not null)
{
var result = Perms.Sub.Allow.Match(subject);
allowed = result.PSubs.Count > 0;
if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0)
allowed = QueueMatches(queue, result.QSubs);
if (!allowed && Kind == ClientKind.Leaf && SubscriptionIndex.SubjectHasWildcard(subject))
{
var reverse = Perms.Sub.Allow.ReverseMatch(subject);
allowed = reverse.PSubs.Count != 0;
}
}
if (allowed && Perms.Sub.Deny is not null)
{
var result = Perms.Sub.Deny.Match(subject);
allowed = result.PSubs.Count == 0;
if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0)
allowed = !QueueMatches(queue, result.QSubs);
if (allowed && MPerms is null && SubscriptionIndex.SubjectHasWildcard(subject) && DArray is not null)
{
foreach (var deny in DArray.Keys)
{
if (SubscriptionIndex.SubjectIsSubsetMatch(deny, subject))
{
LoadMsgDenyFilter();
break;
}
}
}
}
return allowed;
}
internal static bool QueueMatches(string queue, IReadOnlyList<List<Subscription>> qsubs)
{
if (qsubs.Count == 0)
return true;
foreach (var qsub in qsubs)
{
if (qsub.Count == 0 || qsub[0].Queue is not { Length: > 0 } q)
continue;
var qname = Encoding.ASCII.GetString(q);
if (queue == qname)
return true;
if (SubscriptionIndex.SubjectHasWildcard(qname) && SubscriptionIndex.SubjectIsSubsetMatch(queue, qname))
return true;
}
return false;
}
internal void Unsubscribe(INatsAccount? acc, Subscription sub, bool force, bool remove)
{
if (!force && sub.IsClosed())
return;
lock (_mu)
{
if (remove && sub.Sid is { Length: > 0 } sid)
Subs.Remove(Encoding.ASCII.GetString(sid));
sub.Close();
}
_ = acc;
}
internal Exception? ProcessUnsub(byte[] arg)
{
var args = SplitArg(arg);
if (args.Count is < 1 or > 2)
return new FormatException($"processUnsub Parse Error: {Encoding.ASCII.GetString(arg)}");
var sid = Encoding.ASCII.GetString(args[0]);
lock (_mu)
{
_in.Subs++;
if (!Subs.TryGetValue(sid, out var sub))
return null;
// Max-delivery based deferred unsub is not modeled yet, so unsubscribe immediately.
Unsubscribe(Account, sub, force: true, remove: true);
}
if (Opts.Verbose)
SendOK();
return null;
}
internal bool CheckDenySub(string subject)
{
if (MPerms is null || MPerms.Deny is null)
return false;
if (MPerms.DCache.TryGetValue(subject, out var denied))
return denied;
var (np, _) = MPerms.Deny.NumInterest(subject);
denied = np != 0;
MPerms.DCache[subject] = denied;
if (MPerms.DCache.Count > MaxDenyPermCacheSize)
PruneDenyCache();
return denied;
}
internal byte[] MsgHeaderForRouteOrLeaf(byte[] subj, byte[]? reply, RouteTarget rt, INatsAccount? acc)
{
var msg = new List<byte>(MsgScratchSize)
{
(byte)(rt.Sub?.Client?.Kind == ClientKind.Leaf ? 'L' : 'R'),
(byte)'M',
(byte)'S',
(byte)'G',
(byte)' ',
};
if (acc is not null && rt.Sub?.Client?.Kind == ClientKind.Router)
{
msg.AddRange(Encoding.ASCII.GetBytes(acc.Name));
msg.Add((byte)' ');
}
msg.AddRange(subj);
msg.Add((byte)' ');
if (rt.Qs.Length > 0)
{
if (reply is { Length: > 0 })
{
msg.Add((byte)'+');
msg.Add((byte)' ');
msg.AddRange(reply);
msg.Add((byte)' ');
}
else
{
msg.Add((byte)'|');
msg.Add((byte)' ');
}
msg.AddRange(rt.Qs);
msg.Add((byte)' ');
}
else if (reply is { Length: > 0 })
{
msg.AddRange(reply);
msg.Add((byte)' ');
}
var pa = ParseCtx.Pa;
if (pa.HeaderSize > 0)
{
msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString()));
msg.Add((byte)' ');
msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString()));
}
else
{
msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString()));
}
msg.Add((byte)'\r');
msg.Add((byte)'\n');
return msg.ToArray();
}
internal byte[] MsgHeader(byte[] subj, byte[]? reply, Subscription sub)
{
var pa = ParseCtx.Pa;
var hasHeader = pa.HeaderSize > 0;
var msg = new List<byte>(MsgScratchSize);
if (hasHeader)
msg.Add((byte)'H');
msg.Add((byte)'M');
msg.Add((byte)'S');
msg.Add((byte)'G');
msg.Add((byte)' ');
msg.AddRange(subj);
msg.Add((byte)' ');
if (sub.Sid is { Length: > 0 })
{
msg.AddRange(sub.Sid);
msg.Add((byte)' ');
}
if (reply is { Length: > 0 })
{
msg.AddRange(reply);
msg.Add((byte)' ');
}
if (hasHeader)
{
msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString()));
msg.Add((byte)' ');
msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString()));
}
else
{
msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString()));
}
msg.Add((byte)'\r');
msg.Add((byte)'\n');
return msg.ToArray();
}
internal void StalledWait(ClientConnection producer)
{
if (producer._in.Tst > StallTotal)
return;
var ttl = OutPb >= OutMp && OutMp > 0 ? StallMax : StallMin;
if (producer._in.Tst + ttl > StallTotal)
ttl = StallTotal - producer._in.Tst;
if (ttl <= TimeSpan.Zero)
return;
var start = DateTime.UtcNow;
Thread.Sleep(ttl);
producer._in.Tst += DateTime.UtcNow - start;
}
internal bool DeliverMsg(bool prodIsMqtt, Subscription sub, INatsAccount? acc, byte[] subject, byte[] reply, byte[] mh, byte[] msg, bool gwReply)
{
_ = acc;
_ = subject;
_ = reply;
_ = gwReply;
if (sub.IsClosed())
return false;
QueueOutbound(mh);
QueueOutbound(msg);
if (prodIsMqtt)
QueueOutbound("\r\n"u8.ToArray());
AddToPCD(this);
return true;
}
internal void AddToPCD(ClientConnection client)
{
Pcd ??= new Dictionary<ClientConnection, bool>();
if (Pcd.TryAdd(client, true))
client.OutPb += 0;
}
internal void TrackRemoteReply(string subject, string reply)
{
_ = subject;
_rrTracking ??= new RrTracking
{
RMap = new Dictionary<string, object>(StringComparer.Ordinal),
Lrt = TimeSpan.FromSeconds(1),
};
_rrTracking.RMap ??= new Dictionary<string, object>(StringComparer.Ordinal);
_rrTracking.RMap[reply] = new RespEntry { Time = DateTime.UtcNow };
}
internal void PruneRemoteTracking()
{
lock (_mu)
{
if (_rrTracking?.RMap is null || _rrTracking.RMap.Count == 0)
{
_rrTracking = null;
return;
}
var now = DateTime.UtcNow;
var ttl = _rrTracking.Lrt <= TimeSpan.Zero ? TimeSpan.FromSeconds(1) : _rrTracking.Lrt;
foreach (var key in _rrTracking.RMap.Keys.ToArray())
{
if (_rrTracking.RMap[key] is RespEntry re && now - re.Time > ttl)
_rrTracking.RMap.Remove(key);
}
if (_rrTracking.RMap.Count == 0)
{
_rrTracking.Ptmr?.Dispose();
_rrTracking = null;
}
}
}
internal void PruneReplyPerms()
{
var resp = Perms?.Resp;
if (resp is null || Replies is null)
return;
var maxMsgs = resp.MaxMsgs;
var ttl = resp.Expires;
var now = DateTime.UtcNow;
foreach (var k in Replies.Keys.ToArray())
{
var r = Replies[k];
if ((maxMsgs > 0 && r.N >= maxMsgs) || (ttl > TimeSpan.Zero && now - r.Time > ttl))
Replies.Remove(k);
}
RepliesSincePrune = 0;
LastReplyPrune = now;
}
internal void PruneDenyCache()
{
if (MPerms is null)
return;
var removed = 0;
foreach (var subject in MPerms.DCache.Keys.ToArray())
{
MPerms.DCache.Remove(subject);
if (++removed >= PruneSize)
break;
}
}
internal void PrunePubPermsCache()
{
if (Perms is null)
return;
for (var i = 0; i < 5; i++)
{
if (Interlocked.CompareExchange(ref Perms.PRun, 1, 0) != 0)
return;
var removed = 0;
foreach (var key in Perms.PCache.Keys.ToArray())
{
if (Perms.PCache.Remove(key))
removed++;
if (removed > PruneSize && Perms.PCache.Count <= MaxPermCacheSize)
break;
}
Interlocked.Add(ref Perms.PcsZ, -removed);
Interlocked.Exchange(ref Perms.PRun, 0);
if (Perms.PCache.Count <= MaxPermCacheSize)
return;
}
}
internal bool PubAllowed(string subject) => PubAllowedFullCheck(subject, fullCheck: true, hasLock: false);
internal bool PubAllowedFullCheck(string subject, bool fullCheck, bool hasLock)
{
if (Perms is null || (Perms.Pub.Allow is null && Perms.Pub.Deny is null))
return true;
if (Perms.PCache.TryGetValue(subject, out var cached))
return cached;
var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal));
var allowed = true;
if (checkAllow && Perms.Pub.Allow is not null)
{
var (np, _) = Perms.Pub.Allow.NumInterest(subject);
allowed = np != 0;
}
if (allowed && Perms.Pub.Deny is not null)
{
var (np, _) = Perms.Pub.Deny.NumInterest(subject);
allowed = np == 0;
}
if (!allowed && fullCheck && Perms.Resp is not null && Replies is not null)
{
if (hasLock)
{
if (Replies.TryGetValue(subject, out var resp))
{
resp.N++;
if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs)
|| (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires))
{
Replies.Remove(subject);
}
else
{
Replies[subject] = resp;
allowed = true;
}
}
}
else
{
lock (_mu)
{
if (Replies.TryGetValue(subject, out var resp))
{
resp.N++;
if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs)
|| (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires))
{
Replies.Remove(subject);
}
else
{
Replies[subject] = resp;
allowed = true;
}
}
}
}
}
else
{
Perms.PCache[subject] = allowed;
if (Interlocked.Increment(ref Perms.PcsZ) > MaxPermCacheSize)
PrunePubPermsCache();
}
return allowed;
}
internal static bool IsServiceReply(byte[] reply) =>
reply.Length > 3 && Encoding.ASCII.GetString(reply, 0, 4) == ReplyPrefix;
}

View File

@@ -1756,7 +1756,7 @@ public sealed partial class ClientConnection
// features 471-486: processPub variants, parseSub, processSub, etc.
// Implemented in full when Server+Account sessions complete.
// features 487-503: deliverMsg, addToPCD, trackRemoteReply, pruning, pubAllowed, etc.
// features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs
// features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults

View File

@@ -6,7 +6,9 @@ using System.Text;
using System.Linq;
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server.Tests;
@@ -152,4 +154,89 @@ public sealed class ClientConnectionStubFeaturesTests
result.sub.ShouldNotBeNull();
c.Subs.Count.ShouldBe(2);
}
[Fact]
public void CanSubscribe_WithAllowAndDenyQueues_ShouldMatchExpected()
{
var c = new ClientConnection(ClientKind.Client)
{
Perms = new ClientPermissions(),
};
c.Perms.Sub.Allow = SubscriptionIndex.NewSublistWithCache();
c.Perms.Sub.Deny = SubscriptionIndex.NewSublistWithCache();
c.Perms.Sub.Allow.Insert(new Subscription
{
Subject = Encoding.ASCII.GetBytes("foo.*"),
Queue = Encoding.ASCII.GetBytes("q"),
});
c.Perms.Sub.Deny.Insert(new Subscription
{
Subject = Encoding.ASCII.GetBytes("foo.blocked"),
});
c.CanSubscribe("foo.bar", "q").ShouldBeTrue();
c.CanSubscribe("foo.bar", "other").ShouldBeFalse();
c.CanSubscribe("foo.blocked").ShouldBeFalse();
}
[Fact]
public void ProcessUnsub_WithKnownSid_ShouldRemoveSubscription()
{
var c = new ClientConnection(ClientKind.Client);
c.ParseSub(Encoding.ASCII.GetBytes("foo sid1"), noForward: false).ShouldBeNull();
c.Subs.Count.ShouldBe(1);
c.ProcessUnsub(Encoding.ASCII.GetBytes("sid1")).ShouldBeNull();
c.Subs.ShouldNotContainKey("sid1");
}
[Fact]
public void MsgHeaderAndRouteHeader_ShouldIncludeSubjectsAndSizes()
{
var c = new ClientConnection(ClientKind.Client);
c.ParseCtx.Pa.HeaderSize = 10;
c.ParseCtx.Pa.Size = 30;
c.ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes("10");
c.ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes("30");
var sub = new Subscription { Sid = Encoding.ASCII.GetBytes("22") };
var mh = c.MsgHeader(Encoding.ASCII.GetBytes("foo.bar"), Encoding.ASCII.GetBytes("_R_.x"), sub);
Encoding.ASCII.GetString(mh).ShouldContain("foo.bar 22 _R_.x");
Encoding.ASCII.GetString(mh).ShouldContain("30");
var routeTarget = new RouteTarget { Sub = sub, Qs = Encoding.ASCII.GetBytes("q1 q2") };
var rmh = c.MsgHeaderForRouteOrLeaf(
Encoding.ASCII.GetBytes("foo.bar"),
Encoding.ASCII.GetBytes("_R_.x"),
routeTarget,
null);
Encoding.ASCII.GetString(rmh).ShouldContain("foo.bar");
Encoding.ASCII.GetString(rmh).ShouldContain("q1 q2");
}
[Fact]
public void PubAllowedFullCheck_ShouldHonorResponseReplyCache()
{
var c = new ClientConnection(ClientKind.Client)
{
Perms = new ClientPermissions
{
Resp = new ResponsePermission
{
MaxMsgs = 2,
Expires = TimeSpan.FromMinutes(1),
},
},
Replies = new Dictionary<string, RespEntry>(StringComparer.Ordinal)
{
["_R_.x"] = new RespEntry { Time = DateTime.UtcNow, N = 0 },
},
};
c.Perms.Pub.Deny = SubscriptionIndex.NewSublistWithCache();
c.Perms.Pub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(">") });
c.PubAllowed("_R_.x").ShouldBeTrue();
c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue();
c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse();
}
}

View File

@@ -189,6 +189,13 @@ public sealed class ClientTests
c2.SetExpiration(DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeSeconds(), TimeSpan.Zero);
SpinWait.SpinUntil(c2.IsClosed, TimeSpan.FromSeconds(2)).ShouldBeTrue();
}
[Fact]
public void ReplyHelpers_ServiceAndReserved_ShouldClassifyPrefixes()
{
ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue();
ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse();
}
}
/// <summary>

Binary file not shown.