feat: add response permission tracking for dynamic reply subject authorization
This commit is contained in:
@@ -6,12 +6,14 @@ public sealed class ClientPermissions : IDisposable
|
||||
{
|
||||
private readonly PermissionSet? _publish;
|
||||
private readonly PermissionSet? _subscribe;
|
||||
private readonly ResponseTracker? _responseTracker;
|
||||
private readonly PermissionLruCache _pubCache = new(128);
|
||||
|
||||
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe)
|
||||
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe, ResponseTracker? responseTracker)
|
||||
{
|
||||
_publish = publish;
|
||||
_subscribe = subscribe;
|
||||
_responseTracker = responseTracker;
|
||||
}
|
||||
|
||||
public static ClientPermissions? Build(Permissions? permissions)
|
||||
@@ -21,13 +23,18 @@ public sealed class ClientPermissions : IDisposable
|
||||
|
||||
var pub = PermissionSet.Build(permissions.Publish);
|
||||
var sub = PermissionSet.Build(permissions.Subscribe);
|
||||
ResponseTracker? responseTracker = null;
|
||||
if (permissions.Response != null)
|
||||
responseTracker = new ResponseTracker(permissions.Response.MaxMsgs, permissions.Response.Expires);
|
||||
|
||||
if (pub == null && sub == null)
|
||||
if (pub == null && sub == null && responseTracker == null)
|
||||
return null;
|
||||
|
||||
return new ClientPermissions(pub, sub);
|
||||
return new ClientPermissions(pub, sub, responseTracker);
|
||||
}
|
||||
|
||||
public ResponseTracker? ResponseTracker => _responseTracker;
|
||||
|
||||
public bool IsPublishAllowed(string subject)
|
||||
{
|
||||
if (_publish == null)
|
||||
@@ -37,6 +44,14 @@ public sealed class ClientPermissions : IDisposable
|
||||
return cached;
|
||||
|
||||
var allowed = _publish.IsAllowed(subject);
|
||||
|
||||
// If denied but response tracking is enabled, check reply table
|
||||
if (!allowed && _responseTracker != null)
|
||||
{
|
||||
if (_responseTracker.IsReplyAllowed(subject))
|
||||
return true; // Don't cache dynamic reply permissions
|
||||
}
|
||||
|
||||
_pubCache.Set(subject, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
78
src/NATS.Server/Auth/ResponseTracker.cs
Normal file
78
src/NATS.Server/Auth/ResponseTracker.cs
Normal file
@@ -0,0 +1,78 @@
|
||||
namespace NATS.Server.Auth;
|
||||
|
||||
/// <summary>
|
||||
/// Tracks reply subjects that a client is temporarily allowed to publish to.
|
||||
/// Reference: Go client.go resp struct, setResponsePermissionIfNeeded.
|
||||
/// </summary>
|
||||
public sealed class ResponseTracker
|
||||
{
|
||||
private readonly int _maxMsgs; // 0 = unlimited
|
||||
private readonly TimeSpan _expires; // TimeSpan.Zero = no TTL
|
||||
private readonly Dictionary<string, (DateTime RegisteredAt, int Count)> _replies = new(StringComparer.Ordinal);
|
||||
private readonly object _lock = new();
|
||||
|
||||
public ResponseTracker(int maxMsgs, TimeSpan expires)
|
||||
{
|
||||
_maxMsgs = maxMsgs;
|
||||
_expires = expires;
|
||||
}
|
||||
|
||||
public int Count
|
||||
{
|
||||
get { lock (_lock) return _replies.Count; }
|
||||
}
|
||||
|
||||
public void RegisterReply(string replySubject)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_replies[replySubject] = (DateTime.UtcNow, 0);
|
||||
}
|
||||
}
|
||||
|
||||
public bool IsReplyAllowed(string subject)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_replies.TryGetValue(subject, out var entry))
|
||||
return false;
|
||||
|
||||
if (_expires > TimeSpan.Zero && DateTime.UtcNow - entry.RegisteredAt > _expires)
|
||||
{
|
||||
_replies.Remove(subject);
|
||||
return false;
|
||||
}
|
||||
|
||||
var newCount = entry.Count + 1;
|
||||
if (_maxMsgs > 0 && newCount > _maxMsgs)
|
||||
{
|
||||
_replies.Remove(subject);
|
||||
return false;
|
||||
}
|
||||
|
||||
_replies[subject] = (entry.RegisteredAt, newCount);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void Prune()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_expires <= TimeSpan.Zero && _maxMsgs <= 0)
|
||||
return;
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var toRemove = new List<string>();
|
||||
foreach (var (key, entry) in _replies)
|
||||
{
|
||||
if (_expires > TimeSpan.Zero && now - entry.RegisteredAt > _expires)
|
||||
toRemove.Add(key);
|
||||
else if (_maxMsgs > 0 && entry.Count >= _maxMsgs)
|
||||
toRemove.Add(key);
|
||||
}
|
||||
foreach (var key in toRemove)
|
||||
_replies.Remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -545,6 +545,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
return;
|
||||
|
||||
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
|
||||
|
||||
// Track reply subject for response permissions
|
||||
if (replyTo != null && client.Permissions?.ResponseTracker != null)
|
||||
{
|
||||
if (client.Permissions.IsPublishAllowed(replyTo) == false)
|
||||
client.Permissions.ResponseTracker.RegisterReply(replyTo);
|
||||
}
|
||||
}
|
||||
|
||||
private static void SendNoResponders(NatsClient sender, string replyTo)
|
||||
|
||||
51
tests/NATS.Server.Tests/ResponseTrackerTests.cs
Normal file
51
tests/NATS.Server.Tests/ResponseTrackerTests.cs
Normal file
@@ -0,0 +1,51 @@
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ResponseTrackerTests
|
||||
{
|
||||
[Fact]
|
||||
public void Allows_reply_subject_after_registration()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.RegisterReply("_INBOX.abc123");
|
||||
tracker.IsReplyAllowed("_INBOX.abc123").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Denies_unknown_reply_subject()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.IsReplyAllowed("_INBOX.unknown").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Enforces_max_messages()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 2, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.RegisterReply("_INBOX.abc");
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); // exceeded
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Enforces_expiry()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
|
||||
tracker.RegisterReply("_INBOX.abc");
|
||||
Thread.Sleep(50);
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Prune_removes_expired()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
|
||||
tracker.RegisterReply("_INBOX.a");
|
||||
tracker.RegisterReply("_INBOX.b");
|
||||
Thread.Sleep(50);
|
||||
tracker.Prune();
|
||||
tracker.Count.ShouldBe(0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user