diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index 72e48d9..3a7a965 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -7,6 +7,36 @@ using NATS.Server.Subscriptions; namespace NATS.Server.Gateways; +/// +/// Lifecycle states for a registered gateway connection. +/// Go reference: server/gateway.go gwConnState / gwReplyMapping. +/// +public enum GatewayConnectionState +{ + Connecting, + Connected, + Disconnected, + Draining, +} + +/// +/// Tracks the registration and runtime metrics for a single named gateway connection. +/// Go reference: server/gateway.go srvGateway / outboundGateway structs. +/// +public sealed class GatewayRegistration +{ + internal long _messagesSent; + internal long _messagesReceived; + + public required string Name { get; init; } + public GatewayConnectionState State { get; set; } = GatewayConnectionState.Connecting; + public DateTime ConnectedAtUtc { get; set; } + public DateTime? DisconnectedAtUtc { get; set; } + public string? RemoteAddress { get; set; } + public long MessagesSent { get => Interlocked.Read(ref _messagesSent); set => Interlocked.Exchange(ref _messagesSent, value); } + public long MessagesReceived { get => Interlocked.Read(ref _messagesReceived); set => Interlocked.Exchange(ref _messagesReceived, value); } +} + /// /// Controls retry timing for outbound gateway reconnections using exponential backoff with jitter. /// Go reference: server/gateway.go solicitGateway / reconnectGateway delay logic. @@ -44,6 +74,7 @@ public sealed class GatewayManager : IAsyncDisposable private readonly ConcurrentDictionary _connections = new(StringComparer.Ordinal); private readonly HashSet _discoveredGateways = new(StringComparer.OrdinalIgnoreCase); private readonly ConcurrentDictionary _reconnectAttempts = new(StringComparer.OrdinalIgnoreCase); + private readonly ConcurrentDictionary _registrations = new(StringComparer.OrdinalIgnoreCase); private long _forwardedJetStreamClusterMessages; private CancellationTokenSource? _cts; @@ -194,6 +225,85 @@ public sealed class GatewayManager : IAsyncDisposable return conn.GetAccountSubscriptions(account); } + // ── Gateway connection registry (Gap 11.7) ──────────────────────────────── + + /// + /// Registers a new gateway by name, starting in the Connecting state. + /// Go reference: server/gateway.go solicitGateway creates outbound entry before dialling. + /// + public void RegisterGateway(string name, string? remoteAddress = null) + { + var reg = new GatewayRegistration + { + Name = name, + State = GatewayConnectionState.Connecting, + RemoteAddress = remoteAddress, + }; + _registrations[name] = reg; + } + + /// + /// Updates the connection state of a registered gateway. + /// Setting Connected stamps ConnectedAtUtc; setting Disconnected stamps DisconnectedAtUtc. + /// Go reference: server/gateway.go gwConnState transitions. + /// + public void UpdateState(string name, GatewayConnectionState state) + { + if (!_registrations.TryGetValue(name, out var reg)) return; + reg.State = state; + if (state == GatewayConnectionState.Connected) + reg.ConnectedAtUtc = DateTime.UtcNow; + else if (state == GatewayConnectionState.Disconnected) + reg.DisconnectedAtUtc = DateTime.UtcNow; + } + + /// + /// Returns the registration for the named gateway, or null if not registered. + /// Go reference: server/gateway.go server.getOutboundGatewayConnection. + /// + public GatewayRegistration? GetRegistration(string name) + => _registrations.TryGetValue(name, out var reg) ? reg : null; + + /// + /// Returns a snapshot of all current gateway registrations. + /// + public IReadOnlyCollection GetAllRegistrations() + => [.. _registrations.Values]; + + /// + /// Removes the named gateway registration. + /// Go reference: server/gateway.go outboundGateway teardown. + /// + public void UnregisterGateway(string name) + => _registrations.TryRemove(name, out _); + + /// + /// Returns the number of gateways currently in the Connected state. + /// Go reference: server/gateway.go numOutboundGatewayConnections. + /// + public int GetConnectedGatewayCount() + => _registrations.Values.Count(r => r.State == GatewayConnectionState.Connected); + + /// + /// Atomically increments the messages-sent counter for the named gateway. + /// Go reference: server/gateway.go outboundGateway.msgs. + /// + public void IncrementMessagesSent(string name) + { + if (_registrations.TryGetValue(name, out var reg)) + Interlocked.Increment(ref reg._messagesSent); + } + + /// + /// Atomically increments the messages-received counter for the named gateway. + /// Go reference: server/gateway.go inboundGateway.msgs. + /// + public void IncrementMessagesReceived(string name) + { + if (_registrations.TryGetValue(name, out var reg)) + Interlocked.Increment(ref reg._messagesReceived); + } + public async ValueTask DisposeAsync() { if (_cts == null) diff --git a/tests/NATS.Server.Tests/Gateways/GatewayRegistrationTests.cs b/tests/NATS.Server.Tests/Gateways/GatewayRegistrationTests.cs new file mode 100644 index 0000000..f7bc065 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/GatewayRegistrationTests.cs @@ -0,0 +1,163 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.Gateways; +using Shouldly; + +namespace NATS.Server.Tests.Gateways; + +/// +/// Tests for gateway connection registration and state tracking (Gap 11.7). +/// Go reference: server/gateway.go srvGateway / outboundGateway struct and gwConnState transitions. +/// +public class GatewayRegistrationTests +{ + // Go: server/gateway.go solicitGateway creates an outbound entry before dialling. + [Fact] + public void RegisterGateway_creates_registration() + { + var manager = BuildManager(); + + manager.RegisterGateway("gw-east"); + + manager.GetRegistration("gw-east").ShouldNotBeNull(); + } + + // Go: server/gateway.go initial gwConnState is gwConnecting before the dial completes. + [Fact] + public void RegisterGateway_starts_in_connecting_state() + { + var manager = BuildManager(); + + manager.RegisterGateway("gw-east"); + var reg = manager.GetRegistration("gw-east")!; + + reg.State.ShouldBe(GatewayConnectionState.Connecting); + } + + // Go: server/gateway.go gwConnState transitions (Connecting → Connected, Connected → Disconnected, etc.). + [Fact] + public void UpdateState_changes_state() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + + manager.UpdateState("gw-east", GatewayConnectionState.Connected); + + manager.GetRegistration("gw-east")!.State.ShouldBe(GatewayConnectionState.Connected); + } + + // Go: server/gateway.go getOutboundGatewayConnection returns nil for unknown names. + [Fact] + public void GetRegistration_returns_null_for_unknown() + { + var manager = BuildManager(); + + manager.GetRegistration("does-not-exist").ShouldBeNull(); + } + + // Go: server/gateway.go server.gateways map stores all configured outbound gateways. + [Fact] + public void GetAllRegistrations_returns_all() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + manager.RegisterGateway("gw-west"); + + var all = manager.GetAllRegistrations(); + + all.Count.ShouldBe(2); + all.Select(r => r.Name).ShouldContain("gw-east"); + all.Select(r => r.Name).ShouldContain("gw-west"); + } + + // Go: server/gateway.go outboundGateway teardown removes entry from server.gateways. + [Fact] + public void UnregisterGateway_removes_registration() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + + manager.UnregisterGateway("gw-east"); + + manager.GetRegistration("gw-east").ShouldBeNull(); + } + + // Go: server/gateway.go numOutboundGatewayConnections counts only fully-connected entries. + [Fact] + public void GetConnectedGatewayCount_counts_connected_only() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); // Connecting + manager.RegisterGateway("gw-west"); // Connecting + manager.RegisterGateway("gw-south"); // → Connected + + manager.UpdateState("gw-south", GatewayConnectionState.Connected); + + manager.GetConnectedGatewayCount().ShouldBe(1); + } + + // Go: server/gateway.go outboundGateway.msgs.outMsgs incremented per forwarded message. + [Fact] + public void IncrementMessagesSent_increments() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + + manager.IncrementMessagesSent("gw-east"); + manager.IncrementMessagesSent("gw-east"); + manager.IncrementMessagesSent("gw-east"); + + manager.GetRegistration("gw-east")!.MessagesSent.ShouldBe(3L); + } + + // Go: server/gateway.go inboundGateway.msgs.inMsgs incremented per received message. + [Fact] + public void IncrementMessagesReceived_increments() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + + manager.IncrementMessagesReceived("gw-east"); + manager.IncrementMessagesReceived("gw-east"); + + manager.GetRegistration("gw-east")!.MessagesReceived.ShouldBe(2L); + } + + // Go: server/gateway.go outboundGateway.remoteName / remoteIP stored for monitoring. + [Fact] + public void Registration_stores_remote_address() + { + var manager = BuildManager(); + + manager.RegisterGateway("gw-east", remoteAddress: "10.0.0.1:7222"); + + manager.GetRegistration("gw-east")!.RemoteAddress.ShouldBe("10.0.0.1:7222"); + } + + // Go: server/gateway.go gwConnState Connected transition stamps connected-at time. + [Fact] + public void UpdateState_to_connected_stamps_ConnectedAtUtc() + { + var manager = BuildManager(); + manager.RegisterGateway("gw-east"); + var before = DateTime.UtcNow; + + manager.UpdateState("gw-east", GatewayConnectionState.Connected); + + var after = DateTime.UtcNow; + var stamp = manager.GetRegistration("gw-east")!.ConnectedAtUtc; + stamp.ShouldBeGreaterThanOrEqualTo(before); + stamp.ShouldBeLessThanOrEqualTo(after); + } + + // ── Helpers ────────────────────────────────────────────────────────────── + + private static GatewayManager BuildManager() => + new GatewayManager( + new GatewayOptions { Name = "TEST", Host = "127.0.0.1", Port = 0 }, + new ServerStats(), + "S1", + _ => { }, + _ => { }, + NullLogger.Instance); +}