diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index fa0308b..3fe30ed 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -17,6 +17,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; using System.Text; +using System.Text.Json; namespace ZB.MOM.NatsNet.Server; @@ -1750,6 +1751,486 @@ public sealed class Account : INatsAccount } } + /// + /// Publishes a service-latency metric for an import. + /// Mirrors Go (a *Account) sendLatencyResult(...). + /// + internal void SendLatencyResult(ServiceImportEntry si, ServiceLatency sl) + { + sl.Type = AccountEventConstants.ServiceLatencyType; + sl.Id = NextEventId(); + sl.Time = DateTime.UtcNow; + + string? latencySubject; + _mu.EnterWriteLock(); + try + { + latencySubject = si.Latency?.Subject; + si.RequestingClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (string.IsNullOrWhiteSpace(latencySubject) || Server is not NatsServer server) + return; + + var payload = JsonSerializer.SerializeToUtf8Bytes(sl); + _ = server.SendInternalAccountMsg(this, latencySubject, payload); + } + + /// + /// Publishes a bad-request latency metric (missing or invalid request shape). + /// Mirrors Go (a *Account) sendBadRequestTrackingLatency(...). + /// + internal void SendBadRequestTrackingLatency(ServiceImportEntry si, ClientConnection requestor, Dictionary? header) + { + var sl = new ServiceLatency + { + Status = 400, + Error = "Bad Request", + Requestor = CreateClientInfo(requestor, si.Share), + RequestHeader = header, + RequestStart = DateTime.UtcNow.Subtract(requestor.GetRttValue()), + }; + SendLatencyResult(si, sl); + } + + /// + /// Publishes timeout latency when requestor interest is lost before response delivery. + /// Mirrors Go (a *Account) sendReplyInterestLostTrackLatency(...). + /// + internal void SendReplyInterestLostTrackLatency(ServiceImportEntry si) + { + var sl = new ServiceLatency + { + Status = 408, + Error = "Request Timeout", + }; + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + SendLatencyResult(si, sl); + } + + /// + /// Publishes backend failure latency for response-service imports. + /// Mirrors Go (a *Account) sendBackendErrorTrackingLatency(...). + /// + internal void SendBackendErrorTrackingLatency(ServiceImportEntry si, RsiReason reason) + { + var sl = new ServiceLatency(); + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + + if (reason == RsiReason.NoDelivery) + { + sl.Status = 503; + sl.Error = "Service Unavailable"; + } + else if (reason == RsiReason.Timeout) + { + sl.Status = 504; + sl.Error = "Service Timeout"; + } + + SendLatencyResult(si, sl); + } + + /// + /// Sends request/response latency metrics. Returns true when complete, false when waiting for remote-half merge. + /// Mirrors Go (a *Account) sendTrackingLatency(...). + /// + internal bool SendTrackingLatency(ServiceImportEntry si, ClientConnection? responder) + { + _mu.EnterReadLock(); + var requestor = si.RequestingClient; + _mu.ExitReadLock(); + + if (requestor == null) + return true; + + var nowUnixNanos = UtcNowUnixNanos(); + var serviceRtt = UnixNanosToTimeSpan(Math.Max(0, nowUnixNanos - si.Timestamp)); + var sl = new ServiceLatency + { + Status = 200, + Requestor = CreateClientInfo(requestor, si.Share), + Responder = responder == null ? null : CreateClientInfo(responder, true), + RequestHeader = si.TrackingHeader, + }; + + var respRtt = sl.Responder?.Rtt ?? TimeSpan.Zero; + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(si.Timestamp - TimeSpanToUnixNanos(reqRtt)); + sl.ServiceLatencyDuration = serviceRtt > respRtt ? serviceRtt - respRtt : TimeSpan.Zero; + sl.TotalLatency = reqRtt + serviceRtt; + if (respRtt > TimeSpan.Zero) + { + sl.SystemLatency = DateTime.UtcNow - UnixNanoToDateTime(nowUnixNanos); + if (sl.SystemLatency < TimeSpan.Zero) + sl.SystemLatency = TimeSpan.Zero; + sl.TotalLatency += sl.SystemLatency; + } + + if (responder != null && responder.Kind != ClientKind.Client) + { + if (si.M1 != null) + { + SendLatencyResult(si, sl); + return true; + } + + _mu.EnterWriteLock(); + try + { + si.M1 = sl; + } + finally + { + _mu.ExitWriteLock(); + } + return false; + } + + SendLatencyResult(si, sl); + return true; + } + + /// + /// Returns the lowest response threshold configured across all service exports. + /// Mirrors Go (a *Account) lowestServiceExportResponseTime() time.Duration. + /// + internal TimeSpan LowestServiceExportResponseTime() + { + var lowest = TimeSpan.FromMinutes(5); + + _mu.EnterReadLock(); + try + { + if (Exports.Services == null) + return lowest; + + foreach (var export in Exports.Services.Values) + { + if (export != null && export.ResponseThreshold < lowest) + lowest = export.ResponseThreshold; + } + + return lowest; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Adds a service import with claim authorization context. + /// Mirrors Go (a *Account) AddServiceImportWithClaim(...). + /// + public Exception? AddServiceImportWithClaim(Account destination, string from, string to, object? imClaim) => + AddServiceImportWithClaimInternal(destination, from, to, imClaim, false); + + /// + /// Internal service-import add path with optional authorization bypass. + /// Mirrors Go (a *Account) addServiceImportWithClaim(..., internal bool). + /// + internal Exception? AddServiceImportWithClaimInternal(Account destination, string from, string to, object? imClaim, bool internalRequest) + { + if (destination == null) + return ServerErrors.ErrMissingAccount; + + if (string.IsNullOrEmpty(to)) + to = from; + if (!SubscriptionIndex.IsValidSubject(from) || !SubscriptionIndex.IsValidSubject(to)) + return SubscriptionIndex.ErrInvalidSubject; + + if (!internalRequest && !destination.CheckServiceExportApproved(this, to, imClaim)) + return ServerErrors.ErrServiceImportAuthorization; + + var cycleErr = ServiceImportFormsCycle(destination, from); + if (cycleErr != null) + return cycleErr; + + var (_, addErr) = AddServiceImportInternal(destination, from, to, imClaim); + return addErr; + } + + /// + /// Checks whether adding a service import forms an account cycle. + /// Mirrors Go (a *Account) serviceImportFormsCycle(...). + /// + internal Exception? ServiceImportFormsCycle(Account destination, string from) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckServiceImportsForCycles(from, visited); + } + + /// + /// Recursively checks service-import graph for cycles. + /// Mirrors Go (a *Account) checkServiceImportsForCycles(...). + /// + internal Exception? CheckServiceImportsForCycles(string from, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + List? snapshot = null; + _mu.EnterReadLock(); + try + { + if (Imports.Services == null || Imports.Services.Count == 0) + return null; + + snapshot = []; + foreach (var entries in Imports.Services.Values) + snapshot.AddRange(entries); + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var import in snapshot) + { + if (import?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(from, import.To)) + continue; + + if (visited.Contains(import.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextFrom = SubscriptionIndex.SubjectIsSubsetMatch(import.From, from) ? import.From : from; + var err = import.Account.CheckServiceImportsForCycles(nextFrom, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Checks whether adding a stream import forms an account cycle. + /// Mirrors Go (a *Account) streamImportFormsCycle(...). + /// + internal Exception? StreamImportFormsCycle(Account destination, string to) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckStreamImportsForCycles(to, visited); + } + + /// + /// Returns true when any service export subject can match . + /// Mirrors Go (a *Account) hasServiceExportMatching(to string) bool. + /// + internal bool HasServiceExportMatching(string to) + { + if (Exports.Services == null) + return false; + + foreach (var subject in Exports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Returns true when any stream export subject can match . + /// Mirrors Go (a *Account) hasStreamExportMatching(to string) bool. + /// + internal bool HasStreamExportMatching(string to) + { + if (Exports.Streams == null) + return false; + + foreach (var subject in Exports.Streams.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Recursively checks stream-import graph for cycles. + /// Mirrors Go (a *Account) checkStreamImportsForCycles(...). + /// + internal Exception? CheckStreamImportsForCycles(string to, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + _mu.EnterReadLock(); + var hasMatchingExport = HasStreamExportMatching(to); + var streams = Imports.Streams == null ? null : new List(Imports.Streams); + _mu.ExitReadLock(); + + if (!hasMatchingExport || streams == null || streams.Count == 0) + return null; + + foreach (var stream in streams) + { + if (stream?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(to, stream.To)) + continue; + + if (visited.Contains(stream.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextTo = SubscriptionIndex.SubjectIsSubsetMatch(stream.To, to) ? stream.To : to; + var err = stream.Account.CheckStreamImportsForCycles(nextTo, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Allows or disallows request metadata sharing for a service import. + /// Mirrors Go (a *Account) SetServiceImportSharing(...). + /// + public Exception? SetServiceImportSharing(Account destination, string to, bool allow) => + SetServiceImportSharingInternal(destination, to, true, allow); + + /// + /// Internal service-import sharing setter with optional claim-account check bypass. + /// Mirrors Go (a *Account) setServiceImportSharing(...). + /// + internal Exception? SetServiceImportSharingInternal(Account destination, string to, bool check, bool allow) + { + _mu.EnterWriteLock(); + try + { + if (check && IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + if (Imports.Services == null) + return new InvalidOperationException("service import not found"); + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name == destination.Name && import.To == to) + { + import.Share = allow; + return null; + } + } + } + + return new InvalidOperationException("service import not found"); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Adds a service import from this account to . + /// Mirrors Go (a *Account) AddServiceImport(destination, from, to string) error. + /// + public Exception? AddServiceImport(Account destination, string from, string to) => + AddServiceImportWithClaim(destination, from, to, null); + + /// + /// Number of pending reverse-response map entries. + /// Mirrors Go (a *Account) NumPendingReverseResponses() int. + /// + public int NumPendingReverseResponses() + { + _mu.EnterReadLock(); + try { return Imports.ReverseResponseMap?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Total number of pending response imports across all service exports. + /// Mirrors Go (a *Account) NumPendingAllResponses() int. + /// + public int NumPendingAllResponses() => NumPendingResponses(string.Empty); + + /// + /// Number of pending response imports, optionally filtered by exported service subject. + /// Mirrors Go (a *Account) NumPendingResponses(filter string) int. + /// + public int NumPendingResponses(string filter) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(filter)) + return Exports.Responses?.Count ?? 0; + + var export = GetServiceExport(filter); + if (export == null || Exports.Responses == null) + return 0; + + var count = 0; + foreach (var import in Exports.Responses.Values) + { + if (ReferenceEquals(import.ServiceExport, export)) + count++; + } + return count; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -2359,6 +2840,92 @@ public sealed class Account : INatsAccount return null; } + /// + /// Adds a service import entry to the import map. + /// Mirrors Go (a *Account) addServiceImport(...). + /// + private (ServiceImportEntry? Import, Exception? Error) AddServiceImportInternal(Account destination, string from, string to, object? claim) + { + _mu.EnterWriteLock(); + try + { + Imports.Services ??= new Dictionary>(StringComparer.Ordinal); + + var serviceImport = new ServiceImportEntry + { + Account = destination, + Claim = claim, + From = from, + To = to, + }; + + if (!Imports.Services.TryGetValue(from, out var entries)) + { + entries = []; + Imports.Services[from] = entries; + } + + entries.Add(serviceImport); + return (serviceImport, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Resolves a service export by exact or wildcard subject match. + /// Mirrors Go (a *Account) getServiceExport(service string) *serviceExport. + /// + private ServiceExportEntry? GetServiceExport(string service) + { + if (Exports.Services == null) + return null; + + if (Exports.Services.TryGetValue(service, out var serviceExport)) + return serviceExport; + + var tokens = SubjectTransform.TokenizeSubject(service); + foreach (var (subject, export) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return export; + } + + return null; + } + + private static ClientInfo? CreateClientInfo(ClientConnection? client, bool _) + { + if (client == null) + return null; + + return new ClientInfo + { + Id = client.Cid, + Account = client.Account?.Name ?? string.Empty, + Name = client.Opts.Name ?? string.Empty, + Rtt = client.GetRttValue(), + Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"), + Kind = client.Kind.ToString(), + ClientType = client.ClientType().ToString(), + }; + } + + private static long UtcNowUnixNanos() => TimeSpanToUnixNanos(DateTime.UtcNow - DateTime.UnixEpoch); + + private static long TimeSpanToUnixNanos(TimeSpan value) => value.Ticks * 100L; + + private static TimeSpan UnixNanosToTimeSpan(long unixNanos) => TimeSpan.FromTicks(unixNanos / 100L); + + private static DateTime UnixNanoToDateTime(long unixNanos) + { + if (unixNanos <= 0) + return DateTime.UnixEpoch; + return DateTime.UnixEpoch.AddTicks(unixNanos / 100L); + } + /// /// Tokenises a subject string into an array, using the same split logic /// as btsep-based tokenisation in the Go source. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs index 216fc86..1bb3634 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs @@ -45,6 +45,11 @@ public sealed class AccessTimeServiceTests : IDisposable // Background timer should update the time. await Task.Delay(AccessTimeService.TickInterval * 3); var atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); // Unregister; timer should stop. @@ -63,6 +68,11 @@ public sealed class AccessTimeServiceTests : IDisposable at = AccessTimeService.AccessTime(); await Task.Delay(AccessTimeService.TickInterval * 3); atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); } finally diff --git a/porting.db b/porting.db index 244b04e..6ec2d78 100644 Binary files a/porting.db and b/porting.db differ