diff --git a/src/NATS.Server/Auth/ClientPermissions.cs b/src/NATS.Server/Auth/ClientPermissions.cs index 266cece..b4d7dc7 100644 --- a/src/NATS.Server/Auth/ClientPermissions.cs +++ b/src/NATS.Server/Auth/ClientPermissions.cs @@ -6,12 +6,14 @@ public sealed class ClientPermissions : IDisposable { private readonly PermissionSet? _publish; private readonly PermissionSet? _subscribe; + private readonly ResponseTracker? _responseTracker; private readonly PermissionLruCache _pubCache = new(128); - private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe) + private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe, ResponseTracker? responseTracker) { _publish = publish; _subscribe = subscribe; + _responseTracker = responseTracker; } public static ClientPermissions? Build(Permissions? permissions) @@ -21,13 +23,18 @@ public sealed class ClientPermissions : IDisposable var pub = PermissionSet.Build(permissions.Publish); var sub = PermissionSet.Build(permissions.Subscribe); + ResponseTracker? responseTracker = null; + if (permissions.Response != null) + responseTracker = new ResponseTracker(permissions.Response.MaxMsgs, permissions.Response.Expires); - if (pub == null && sub == null) + if (pub == null && sub == null && responseTracker == null) return null; - return new ClientPermissions(pub, sub); + return new ClientPermissions(pub, sub, responseTracker); } + public ResponseTracker? ResponseTracker => _responseTracker; + public bool IsPublishAllowed(string subject) { if (_publish == null) @@ -37,6 +44,14 @@ public sealed class ClientPermissions : IDisposable return cached; var allowed = _publish.IsAllowed(subject); + + // If denied but response tracking is enabled, check reply table + if (!allowed && _responseTracker != null) + { + if (_responseTracker.IsReplyAllowed(subject)) + return true; // Don't cache dynamic reply permissions + } + _pubCache.Set(subject, allowed); return allowed; } diff --git a/src/NATS.Server/Auth/ResponseTracker.cs b/src/NATS.Server/Auth/ResponseTracker.cs new file mode 100644 index 0000000..e4e62db --- /dev/null +++ b/src/NATS.Server/Auth/ResponseTracker.cs @@ -0,0 +1,78 @@ +namespace NATS.Server.Auth; + +/// +/// Tracks reply subjects that a client is temporarily allowed to publish to. +/// Reference: Go client.go resp struct, setResponsePermissionIfNeeded. +/// +public sealed class ResponseTracker +{ + private readonly int _maxMsgs; // 0 = unlimited + private readonly TimeSpan _expires; // TimeSpan.Zero = no TTL + private readonly Dictionary _replies = new(StringComparer.Ordinal); + private readonly object _lock = new(); + + public ResponseTracker(int maxMsgs, TimeSpan expires) + { + _maxMsgs = maxMsgs; + _expires = expires; + } + + public int Count + { + get { lock (_lock) return _replies.Count; } + } + + public void RegisterReply(string replySubject) + { + lock (_lock) + { + _replies[replySubject] = (DateTime.UtcNow, 0); + } + } + + public bool IsReplyAllowed(string subject) + { + lock (_lock) + { + if (!_replies.TryGetValue(subject, out var entry)) + return false; + + if (_expires > TimeSpan.Zero && DateTime.UtcNow - entry.RegisteredAt > _expires) + { + _replies.Remove(subject); + return false; + } + + var newCount = entry.Count + 1; + if (_maxMsgs > 0 && newCount > _maxMsgs) + { + _replies.Remove(subject); + return false; + } + + _replies[subject] = (entry.RegisteredAt, newCount); + return true; + } + } + + public void Prune() + { + lock (_lock) + { + if (_expires <= TimeSpan.Zero && _maxMsgs <= 0) + return; + + var now = DateTime.UtcNow; + var toRemove = new List(); + foreach (var (key, entry) in _replies) + { + if (_expires > TimeSpan.Zero && now - entry.RegisteredAt > _expires) + toRemove.Add(key); + else if (_maxMsgs > 0 && entry.Count >= _maxMsgs) + toRemove.Add(key); + } + foreach (var key in toRemove) + _replies.Remove(key); + } + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 49b88b1..270892f 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -545,6 +545,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable return; client.SendMessage(subject, sub.Sid, replyTo, headers, payload); + + // Track reply subject for response permissions + if (replyTo != null && client.Permissions?.ResponseTracker != null) + { + if (client.Permissions.IsPublishAllowed(replyTo) == false) + client.Permissions.ResponseTracker.RegisterReply(replyTo); + } } private static void SendNoResponders(NatsClient sender, string replyTo) diff --git a/tests/NATS.Server.Tests/ResponseTrackerTests.cs b/tests/NATS.Server.Tests/ResponseTrackerTests.cs new file mode 100644 index 0000000..b5758dc --- /dev/null +++ b/tests/NATS.Server.Tests/ResponseTrackerTests.cs @@ -0,0 +1,51 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class ResponseTrackerTests +{ + [Fact] + public void Allows_reply_subject_after_registration() + { + var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5)); + tracker.RegisterReply("_INBOX.abc123"); + tracker.IsReplyAllowed("_INBOX.abc123").ShouldBeTrue(); + } + + [Fact] + public void Denies_unknown_reply_subject() + { + var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5)); + tracker.IsReplyAllowed("_INBOX.unknown").ShouldBeFalse(); + } + + [Fact] + public void Enforces_max_messages() + { + var tracker = new ResponseTracker(maxMsgs: 2, expires: TimeSpan.FromMinutes(5)); + tracker.RegisterReply("_INBOX.abc"); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue(); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue(); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); // exceeded + } + + [Fact] + public void Enforces_expiry() + { + var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1)); + tracker.RegisterReply("_INBOX.abc"); + Thread.Sleep(50); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); + } + + [Fact] + public void Prune_removes_expired() + { + var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1)); + tracker.RegisterReply("_INBOX.a"); + tracker.RegisterReply("_INBOX.b"); + Thread.Sleep(50); + tracker.Prune(); + tracker.Count.ShouldBe(0); + } +}