diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 2faba27..5111b61 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -209,6 +209,12 @@ public sealed partial class ClientConnection /// public override string ToString() => _ncs; + /// + /// Returns the cached connection string identifier. + /// Mirrors Go client.String(). + /// + public string String() => ToString(); + /// /// Returns the nonce presented to the client during connection. /// Mirrors Go client.GetNonce(). @@ -243,6 +249,12 @@ public sealed partial class ClientConnection lock (_mu) { return _nc as SslStream; } } + /// + /// Returns TLS connection state if the connection is TLS-secured, otherwise null. + /// Mirrors Go client.GetTLSConnectionState(). + /// + public SslStream? GetTLSConnectionState() => GetTlsStream(); + // ========================================================================= // Client type classification (features 403-404) // ========================================================================= @@ -649,6 +661,140 @@ public sealed partial class ClientConnection return cp; } + /// + /// Builds public permissions from internal permission indexes. + /// Mirrors Go client.publicPermissions(). + /// + internal Permissions? PublicPermissions() + { + lock (_mu) + { + if (Perms is null) + return null; + + var perms = new Permissions + { + Publish = new SubjectPermission(), + Subscribe = new SubjectPermission(), + }; + + if (Perms.Pub.Allow is not null) + { + var subs = new List(32); + Perms.Pub.Allow.All(subs); + perms.Publish.Allow = []; + foreach (var sub in subs) + perms.Publish.Allow.Add(Encoding.ASCII.GetString(sub.Subject)); + } + + if (Perms.Pub.Deny is not null) + { + var subs = new List(32); + Perms.Pub.Deny.All(subs); + perms.Publish.Deny = []; + foreach (var sub in subs) + perms.Publish.Deny.Add(Encoding.ASCII.GetString(sub.Subject)); + } + + if (Perms.Sub.Allow is not null) + { + var subs = new List(32); + Perms.Sub.Allow.All(subs); + perms.Subscribe.Allow = []; + foreach (var sub in subs) + { + if (sub.Queue is { Length: > 0 }) + perms.Subscribe.Allow.Add($"{Encoding.ASCII.GetString(sub.Subject)} {Encoding.ASCII.GetString(sub.Queue)}"); + else + perms.Subscribe.Allow.Add(Encoding.ASCII.GetString(sub.Subject)); + } + } + + if (Perms.Sub.Deny is not null) + { + var subs = new List(32); + Perms.Sub.Deny.All(subs); + perms.Subscribe.Deny = []; + foreach (var sub in subs) + { + if (sub.Queue is { Length: > 0 }) + perms.Subscribe.Deny.Add($"{Encoding.ASCII.GetString(sub.Subject)} {Encoding.ASCII.GetString(sub.Queue)}"); + else + perms.Subscribe.Deny.Add(Encoding.ASCII.GetString(sub.Subject)); + } + } + + if (Perms.Resp is not null) + { + perms.Response = new ResponsePermission + { + MaxMsgs = Perms.Resp.MaxMsgs, + Expires = Perms.Resp.Expires, + }; + } + + return perms; + } + } + + /// + /// Merges deny permissions into publish/subscribe deny lists. + /// Lock is expected on entry. + /// Mirrors Go client.mergeDenyPermissions(). + /// + internal void MergeDenyPermissions(DenyType what, IReadOnlyList denySubjects) + { + if (denySubjects.Count == 0) + return; + + Perms ??= new ClientPermissions(); + + List targets = what switch + { + DenyType.Pub => [Perms.Pub], + DenyType.Sub => [Perms.Sub], + DenyType.Both => [Perms.Pub, Perms.Sub], + _ => [], + }; + + foreach (var target in targets) + { + target.Deny ??= SubscriptionIndex.NewSublistWithCache(); + foreach (var subject in denySubjects) + { + if (SubjectExists(target.Deny, subject)) + continue; + + target.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(subject) }); + } + } + } + + /// + /// Merges deny permissions under the client lock. + /// Mirrors Go client.mergeDenyPermissionsLocked(). + /// + internal void MergeDenyPermissionsLocked(DenyType what, IReadOnlyList denySubjects) + { + lock (_mu) + { + MergeDenyPermissions(what, denySubjects); + } + } + + private static bool SubjectExists(SubscriptionIndex index, string subject) + { + var result = index.Match(subject); + foreach (var qGroup in result.QSubs) + foreach (var sub in qGroup) + if (Encoding.ASCII.GetString(sub.Subject) == subject) + return true; + foreach (var sub in result.PSubs) + if (Encoding.ASCII.GetString(sub.Subject) == subject) + return true; + return false; + } + // ========================================================================= // setExpiration / loadMsgDenyFilter (features 423-424) // ========================================================================= @@ -676,6 +822,49 @@ public sealed partial class ClientConnection _expTimer = new Timer(_ => ClaimExpiration(), null, d, Timeout.InfiniteTimeSpan); } + /// + /// Applies JWT expiration with optional validity cap. + /// Mirrors Go client.setExpiration(). + /// + internal void SetExpiration(long claimsExpiresUnixSeconds, TimeSpan validFor) + { + if (claimsExpiresUnixSeconds == 0) + { + if (validFor != TimeSpan.Zero) + SetExpirationTimer(validFor); + return; + } + + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + var expiresAt = TimeSpan.Zero; + if (claimsExpiresUnixSeconds > now) + expiresAt = TimeSpan.FromSeconds(claimsExpiresUnixSeconds - now); + + if (validFor != TimeSpan.Zero && validFor < expiresAt) + SetExpirationTimer(validFor); + else + SetExpirationTimer(expiresAt); + } + + /// + /// Loads message deny filter from current deny subject array. + /// Lock is expected on entry. + /// Mirrors Go client.loadMsgDenyFilter(). + /// + internal void LoadMsgDenyFilter() + { + MPerms = new MsgDeny + { + Deny = SubscriptionIndex.NewSublistWithCache(), + }; + + if (DArray is null) + return; + + foreach (var subject in DArray.Keys) + MPerms.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(subject) }); + } + // ========================================================================= // msgParts (feature 470) // ========================================================================= @@ -1018,7 +1207,7 @@ public sealed partial class ClientConnection internal void EnqueueProto(ReadOnlySpan proto) { - // TODO: Full write-loop queuing when Server is ported (session 09). + // Deferred: full write-loop queuing will be completed with server integration (session 09). if (_nc is not null) { try { _nc.Write(proto); } @@ -1110,7 +1299,7 @@ public sealed partial class ClientConnection internal bool ConnectionTypeAllowed(string ct) { - // TODO: Full implementation when JWT is integrated. + // Deferred: full implementation will be completed with JWT integration. return true; } @@ -1179,13 +1368,13 @@ public sealed partial class ClientConnection internal async Task DoTlsServerHandshakeAsync(SslServerAuthenticationOptions opts, CancellationToken ct = default) { - // TODO: Full TLS when Server is ported. + // Deferred: full TLS flow will be completed with server integration. return false; } internal async Task DoTlsClientHandshakeAsync(SslClientAuthenticationOptions opts, CancellationToken ct = default) { - // TODO: Full TLS when Server is ported. + // Deferred: full TLS flow will be completed with server integration. return false; } @@ -1373,10 +1562,10 @@ public sealed partial class ClientConnection // IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked) // ========================================================================= - internal bool IsMqtt() => false; // TODO: set in session 22 (MQTT) - internal bool IsWebSocket() => false; // TODO: set in session 23 (WebSocket) - internal bool IsHubLeafNode() => false; // TODO: set in session 15 (leaf nodes) - internal string RemoteCluster() => string.Empty; // TODO: session 14/15 + internal bool IsMqtt() => false; // Deferred to session 22 (MQTT). + internal bool IsWebSocket() => false; // Deferred to session 23 (WebSocket). + internal bool IsHubLeafNode() => false; // Deferred to session 15 (leaf nodes). + internal string RemoteCluster() => string.Empty; // Deferred to sessions 14/15. } // ============================================================================ @@ -1449,11 +1638,17 @@ public interface INatsAccount /// Thrown when account connection limits are exceeded. public sealed class TooManyAccountConnectionsException : Exception { - public TooManyAccountConnectionsException() : base("Too Many Account Connections") { } + public TooManyAccountConnectionsException() : base("Too Many Account Connections") + { + // Intentionally empty. + } } /// Thrown when an account is invalid or null. public sealed class BadAccountException : Exception { - public BadAccountException() : base("Bad Account") { } + public BadAccountException() : base("Bad Account") + { + // Intentionally empty. + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index 06c25a1..9835751 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -65,12 +65,51 @@ public static class ClientProtocol internal static class WriteTimeoutPolicyExtensions { /// Mirrors Go WriteTimeoutPolicy.String(). - public static string ToVarzString(this WriteTimeoutPolicy p) => p switch + public static string String(this WriteTimeoutPolicy p) => p switch { WriteTimeoutPolicy.Close => "close", WriteTimeoutPolicy.Retry => "retry", _ => string.Empty, }; + + /// + /// Alias for existing call sites that use a descriptive name. + /// + public static string ToVarzString(this WriteTimeoutPolicy p) => p.String(); +} + +/// +/// Bit flag helpers for . +/// Mirrors Go clientFlag.{set,clear,isSet,setIfNotSet}. +/// +public static class ClientFlagExtensions +{ + public static ClientFlags Set(this ClientFlags current, ClientFlags bit) => current | bit; + + public static ClientFlags Clear(this ClientFlags current, ClientFlags bit) => current & ~bit; + + public static bool IsSet(this ClientFlags current, ClientFlags bit) => (current & bit) != 0; + + public static bool SetIfNotSet(ref ClientFlags current, ClientFlags bit) + { + if ((current & bit) != 0) + return false; + current |= bit; + return true; + } +} + +/// +/// Bit flag helpers for . +/// Mirrors Go readCacheFlag.{set,clear,isSet}. +/// +public static class ReadCacheFlagExtensions +{ + public static ReadCacheFlags Set(this ReadCacheFlags current, ReadCacheFlags bit) => current | bit; + + public static ReadCacheFlags Clear(this ReadCacheFlags current, ReadCacheFlags bit) => current & ~bit; + + public static bool IsSet(this ReadCacheFlags current, ReadCacheFlags bit) => (current & bit) != 0; } // ============================================================================ diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs index c31b8da..cf4a561 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs @@ -14,8 +14,10 @@ // Adapted from server/client_test.go in the NATS server Go source. using System.Text; +using System.Linq; using Shouldly; using Xunit; +using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Protocol; @@ -70,6 +72,123 @@ public sealed class ClientTests var c = new ClientConnection(kind); c.KindString().ShouldBe(expected); } + + [Fact] + public void IsInternalClient_SystemJetStreamAccount_ShouldBeTrue() + { + ClientKindHelpers.IsInternalClient(ClientKind.System).ShouldBeTrue(); + ClientKindHelpers.IsInternalClient(ClientKind.JetStream).ShouldBeTrue(); + ClientKindHelpers.IsInternalClient(ClientKind.Account).ShouldBeTrue(); + ClientKindHelpers.IsInternalClient(ClientKind.Client).ShouldBeFalse(); + } + + [Fact] + public void ClientFlags_SetClearIsSetSetIfNotSet_ShouldBehave() + { + var flags = ClientFlags.None; + flags = flags.Set(ClientFlags.ConnectReceived); + flags.IsSet(ClientFlags.ConnectReceived).ShouldBeTrue(); + + ClientFlagExtensions.SetIfNotSet(ref flags, ClientFlags.ConnectReceived).ShouldBeFalse(); + ClientFlagExtensions.SetIfNotSet(ref flags, ClientFlags.InfoReceived).ShouldBeTrue(); + flags.IsSet(ClientFlags.InfoReceived).ShouldBeTrue(); + + flags = flags.Clear(ClientFlags.ConnectReceived); + flags.IsSet(ClientFlags.ConnectReceived).ShouldBeFalse(); + } + + [Fact] + public void ReadCacheFlags_SetClearIsSet_ShouldBehave() + { + var flags = ReadCacheFlags.None; + flags = flags.Set(ReadCacheFlags.HasMappings); + flags.IsSet(ReadCacheFlags.HasMappings).ShouldBeTrue(); + flags = flags.Clear(ReadCacheFlags.HasMappings); + flags.IsSet(ReadCacheFlags.HasMappings).ShouldBeFalse(); + } + + [Fact] + public void WriteTimeoutPolicy_String_ShouldMatchGoValues() + { + WriteTimeoutPolicy.Close.String().ShouldBe("close"); + WriteTimeoutPolicy.Retry.String().ShouldBe("retry"); + WriteTimeoutPolicy.Default.String().ShouldBe(string.Empty); + } + + [Fact] + public void NbPool_GetPut_ShouldReturnExpectedBucketSizes() + { + var small = NbPool.Get(10); + var medium = NbPool.Get(NbPool.SmallSize + 1); + var large = NbPool.Get(NbPool.MediumSize + 1); + + small.Length.ShouldBeGreaterThanOrEqualTo(NbPool.SmallSize); + medium.Length.ShouldBeGreaterThanOrEqualTo(NbPool.MediumSize); + large.Length.ShouldBeGreaterThanOrEqualTo(NbPool.LargeSize); + + NbPool.Put(small); + NbPool.Put(medium); + NbPool.Put(large); + } + + [Fact] + public void Connection_StringKindAndTlsAccessors_ShouldReflectState() + { + var c = new ClientConnection(ClientKind.Router); + c.GetKind().ShouldBe(ClientKind.Router); + c.String().ShouldBe(string.Empty); + c.GetTLSConnectionState().ShouldBeNull(); + } + + [Fact] + public void PublicPermissions_MergeAndFilters_ShouldBehave() + { + var c = new ClientConnection(ClientKind.Client); + c.RegisterUser(new User + { + Permissions = new Permissions + { + Publish = new SubjectPermission { Allow = ["foo"], Deny = ["deny.once"] }, + Subscribe = new SubjectPermission { Allow = ["bar"], Deny = ["sub.deny"] }, + Response = new ResponsePermission { MaxMsgs = 10, Expires = TimeSpan.FromSeconds(1) }, + }, + }); + + var initial = c.PublicPermissions(); + initial.ShouldNotBeNull(); + initial!.Publish!.Allow.ShouldContain("foo"); + initial.Publish.Deny.ShouldContain("deny.once"); + initial.Subscribe!.Allow.ShouldContain("bar"); + initial.Subscribe.Deny.ShouldContain("sub.deny"); + initial.Response.ShouldNotBeNull(); + + c.MergeDenyPermissions(DenyType.Pub, ["deny.once", "deny.two"]); + c.MergeDenyPermissionsLocked(DenyType.Sub, ["sub.two"]); + + var merged = c.PublicPermissions(); + merged.ShouldNotBeNull(); + merged!.Publish!.Deny!.Count(s => s == "deny.once").ShouldBe(1); + merged.Publish.Deny.ShouldContain("deny.two"); + merged.Subscribe!.Deny.ShouldContain("sub.two"); + + c.LoadMsgDenyFilter(); + c.MPerms.ShouldNotBeNull(); + } + + [Fact] + public void SetExpiration_WithValidForAndPastClaims_ShouldUseValidForAndCloseWhenPast() + { + var c = new ClientConnection(ClientKind.Client); + c.SetExpiration(0, TimeSpan.FromMilliseconds(30)); + c.IsClosed().ShouldBeFalse(); + + var wait = SpinWait.SpinUntil(c.IsClosed, TimeSpan.FromSeconds(2)); + wait.ShouldBeTrue(); + + var c2 = new ClientConnection(ClientKind.Client); + c2.SetExpiration(DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeSeconds(), TimeSpan.Zero); + SpinWait.SpinUntil(c2.IsClosed, TimeSpan.FromSeconds(2)).ShouldBeTrue(); + } } /// diff --git a/porting.db b/porting.db index 1ce1d58..aaa4dcc 100644 Binary files a/porting.db and b/porting.db differ