feat: add MaxSubs enforcement, delivery-time deny filtering, auto-unsub cleanup
This commit is contained in:
@@ -45,8 +45,18 @@ public sealed class ClientPermissions : IDisposable
|
|||||||
{
|
{
|
||||||
if (_subscribe == null)
|
if (_subscribe == null)
|
||||||
return true;
|
return true;
|
||||||
|
if (!_subscribe.IsAllowed(subject))
|
||||||
|
return false;
|
||||||
|
if (queue != null && _subscribe.IsDenied(queue))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return _subscribe.IsAllowed(subject);
|
public bool IsDeliveryAllowed(string subject)
|
||||||
|
{
|
||||||
|
if (_subscribe == null)
|
||||||
|
return true;
|
||||||
|
return _subscribe.IsDeliveryAllowed(subject);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
@@ -117,6 +127,21 @@ public sealed class PermissionSet : IDisposable
|
|||||||
return allowed;
|
return allowed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public bool IsDenied(string subject)
|
||||||
|
{
|
||||||
|
if (_deny == null) return false;
|
||||||
|
var result = _deny.Match(subject);
|
||||||
|
return result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsDeliveryAllowed(string subject)
|
||||||
|
{
|
||||||
|
if (_deny == null)
|
||||||
|
return true;
|
||||||
|
var result = _deny.Match(subject);
|
||||||
|
return result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0;
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
_allow?.Dispose();
|
_allow?.Dispose();
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ public enum ClientClosedReason
|
|||||||
ServerShutdown,
|
ServerShutdown,
|
||||||
MsgHeaderViolation,
|
MsgHeaderViolation,
|
||||||
NoRespondersRequiresHeaders,
|
NoRespondersRequiresHeaders,
|
||||||
|
AuthenticationExpired,
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ClientClosedReasonExtensions
|
public static class ClientClosedReasonExtensions
|
||||||
@@ -46,6 +47,7 @@ public static class ClientClosedReasonExtensions
|
|||||||
ClientClosedReason.ServerShutdown => "Server Shutdown",
|
ClientClosedReason.ServerShutdown => "Server Shutdown",
|
||||||
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
|
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
|
||||||
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
|
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
|
||||||
|
ClientClosedReason.AuthenticationExpired => "Authentication Expired",
|
||||||
_ => reason.ToString(),
|
_ => reason.ToString(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -523,7 +523,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
private void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
||||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
||||||
{
|
{
|
||||||
var client = sub.Client;
|
var client = sub.Client;
|
||||||
@@ -532,6 +532,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
// Check auto-unsub
|
// Check auto-unsub
|
||||||
var count = Interlocked.Increment(ref sub.MessageCount);
|
var count = Interlocked.Increment(ref sub.MessageCount);
|
||||||
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
|
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
|
||||||
|
{
|
||||||
|
// Clean up exhausted subscription from trie and client tracking
|
||||||
|
var subList = client.Account?.SubList ?? _globalAccount.SubList;
|
||||||
|
subList.Remove(sub);
|
||||||
|
client.RemoveSubscription(sub.Sid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deny-list delivery filter
|
||||||
|
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
|
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ public static class NatsProtocol
|
|||||||
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
|
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
|
||||||
public const string ErrSlowConsumer = "Slow Consumer";
|
public const string ErrSlowConsumer = "Slow Consumer";
|
||||||
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
|
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
|
||||||
|
public const string ErrMaxSubscriptionsExceeded = "Maximum Subscriptions Exceeded";
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class ServerInfo
|
public sealed class ServerInfo
|
||||||
|
|||||||
@@ -5,10 +5,10 @@ public class ClientClosedReasonTests
|
|||||||
[Fact]
|
[Fact]
|
||||||
public void All_expected_close_reasons_exist()
|
public void All_expected_close_reasons_exist()
|
||||||
{
|
{
|
||||||
// Verify all 17 enum values exist and are distinct (None + 16 named reasons)
|
// Verify all 18 enum values exist and are distinct (None + 17 named reasons)
|
||||||
var values = Enum.GetValues<ClientClosedReason>();
|
var values = Enum.GetValues<ClientClosedReason>();
|
||||||
values.Length.ShouldBe(17);
|
values.Length.ShouldBe(18);
|
||||||
values.Distinct().Count().ShouldBe(17);
|
values.Distinct().Count().ShouldBe(18);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Theory]
|
[Theory]
|
||||||
|
|||||||
Reference in New Issue
Block a user