feat(batch19): implement account latency and import-cycle methods

This commit is contained in:
Joseph Doherty
2026-02-28 19:53:59 -05:00
parent dcf1df44d1
commit 50f6b69fda
3 changed files with 577 additions and 0 deletions

View File

@@ -17,6 +17,7 @@ using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
@@ -1750,6 +1751,486 @@ public sealed class Account : INatsAccount
}
}
/// <summary>
/// Publishes a service-latency metric for an import.
/// Mirrors Go <c>(a *Account) sendLatencyResult(...)</c>.
/// </summary>
internal void SendLatencyResult(ServiceImportEntry si, ServiceLatency sl)
{
sl.Type = AccountEventConstants.ServiceLatencyType;
sl.Id = NextEventId();
sl.Time = DateTime.UtcNow;
string? latencySubject;
_mu.EnterWriteLock();
try
{
latencySubject = si.Latency?.Subject;
si.RequestingClient = null;
}
finally
{
_mu.ExitWriteLock();
}
if (string.IsNullOrWhiteSpace(latencySubject) || Server is not NatsServer server)
return;
var payload = JsonSerializer.SerializeToUtf8Bytes(sl);
_ = server.SendInternalAccountMsg(this, latencySubject, payload);
}
/// <summary>
/// Publishes a bad-request latency metric (missing or invalid request shape).
/// Mirrors Go <c>(a *Account) sendBadRequestTrackingLatency(...)</c>.
/// </summary>
internal void SendBadRequestTrackingLatency(ServiceImportEntry si, ClientConnection requestor, Dictionary<string, string[]>? header)
{
var sl = new ServiceLatency
{
Status = 400,
Error = "Bad Request",
Requestor = CreateClientInfo(requestor, si.Share),
RequestHeader = header,
RequestStart = DateTime.UtcNow.Subtract(requestor.GetRttValue()),
};
SendLatencyResult(si, sl);
}
/// <summary>
/// Publishes timeout latency when requestor interest is lost before response delivery.
/// Mirrors Go <c>(a *Account) sendReplyInterestLostTrackLatency(...)</c>.
/// </summary>
internal void SendReplyInterestLostTrackLatency(ServiceImportEntry si)
{
var sl = new ServiceLatency
{
Status = 408,
Error = "Request Timeout",
};
ClientConnection? requestor;
bool share;
long timestamp;
_mu.EnterReadLock();
try
{
requestor = si.RequestingClient;
share = si.Share;
timestamp = si.Timestamp;
sl.RequestHeader = si.TrackingHeader;
}
finally
{
_mu.ExitReadLock();
}
if (requestor != null)
sl.Requestor = CreateClientInfo(requestor, share);
var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero;
sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt));
SendLatencyResult(si, sl);
}
/// <summary>
/// Publishes backend failure latency for response-service imports.
/// Mirrors Go <c>(a *Account) sendBackendErrorTrackingLatency(...)</c>.
/// </summary>
internal void SendBackendErrorTrackingLatency(ServiceImportEntry si, RsiReason reason)
{
var sl = new ServiceLatency();
ClientConnection? requestor;
bool share;
long timestamp;
_mu.EnterReadLock();
try
{
requestor = si.RequestingClient;
share = si.Share;
timestamp = si.Timestamp;
sl.RequestHeader = si.TrackingHeader;
}
finally
{
_mu.ExitReadLock();
}
if (requestor != null)
sl.Requestor = CreateClientInfo(requestor, share);
var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero;
sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt));
if (reason == RsiReason.NoDelivery)
{
sl.Status = 503;
sl.Error = "Service Unavailable";
}
else if (reason == RsiReason.Timeout)
{
sl.Status = 504;
sl.Error = "Service Timeout";
}
SendLatencyResult(si, sl);
}
/// <summary>
/// Sends request/response latency metrics. Returns true when complete, false when waiting for remote-half merge.
/// Mirrors Go <c>(a *Account) sendTrackingLatency(...)</c>.
/// </summary>
internal bool SendTrackingLatency(ServiceImportEntry si, ClientConnection? responder)
{
_mu.EnterReadLock();
var requestor = si.RequestingClient;
_mu.ExitReadLock();
if (requestor == null)
return true;
var nowUnixNanos = UtcNowUnixNanos();
var serviceRtt = UnixNanosToTimeSpan(Math.Max(0, nowUnixNanos - si.Timestamp));
var sl = new ServiceLatency
{
Status = 200,
Requestor = CreateClientInfo(requestor, si.Share),
Responder = responder == null ? null : CreateClientInfo(responder, true),
RequestHeader = si.TrackingHeader,
};
var respRtt = sl.Responder?.Rtt ?? TimeSpan.Zero;
var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero;
sl.RequestStart = UnixNanoToDateTime(si.Timestamp - TimeSpanToUnixNanos(reqRtt));
sl.ServiceLatencyDuration = serviceRtt > respRtt ? serviceRtt - respRtt : TimeSpan.Zero;
sl.TotalLatency = reqRtt + serviceRtt;
if (respRtt > TimeSpan.Zero)
{
sl.SystemLatency = DateTime.UtcNow - UnixNanoToDateTime(nowUnixNanos);
if (sl.SystemLatency < TimeSpan.Zero)
sl.SystemLatency = TimeSpan.Zero;
sl.TotalLatency += sl.SystemLatency;
}
if (responder != null && responder.Kind != ClientKind.Client)
{
if (si.M1 != null)
{
SendLatencyResult(si, sl);
return true;
}
_mu.EnterWriteLock();
try
{
si.M1 = sl;
}
finally
{
_mu.ExitWriteLock();
}
return false;
}
SendLatencyResult(si, sl);
return true;
}
/// <summary>
/// Returns the lowest response threshold configured across all service exports.
/// Mirrors Go <c>(a *Account) lowestServiceExportResponseTime() time.Duration</c>.
/// </summary>
internal TimeSpan LowestServiceExportResponseTime()
{
var lowest = TimeSpan.FromMinutes(5);
_mu.EnterReadLock();
try
{
if (Exports.Services == null)
return lowest;
foreach (var export in Exports.Services.Values)
{
if (export != null && export.ResponseThreshold < lowest)
lowest = export.ResponseThreshold;
}
return lowest;
}
finally
{
_mu.ExitReadLock();
}
}
/// <summary>
/// Adds a service import with claim authorization context.
/// Mirrors Go <c>(a *Account) AddServiceImportWithClaim(...)</c>.
/// </summary>
public Exception? AddServiceImportWithClaim(Account destination, string from, string to, object? imClaim) =>
AddServiceImportWithClaimInternal(destination, from, to, imClaim, false);
/// <summary>
/// Internal service-import add path with optional authorization bypass.
/// Mirrors Go <c>(a *Account) addServiceImportWithClaim(..., internal bool)</c>.
/// </summary>
internal Exception? AddServiceImportWithClaimInternal(Account destination, string from, string to, object? imClaim, bool internalRequest)
{
if (destination == null)
return ServerErrors.ErrMissingAccount;
if (string.IsNullOrEmpty(to))
to = from;
if (!SubscriptionIndex.IsValidSubject(from) || !SubscriptionIndex.IsValidSubject(to))
return SubscriptionIndex.ErrInvalidSubject;
if (!internalRequest && !destination.CheckServiceExportApproved(this, to, imClaim))
return ServerErrors.ErrServiceImportAuthorization;
var cycleErr = ServiceImportFormsCycle(destination, from);
if (cycleErr != null)
return cycleErr;
var (_, addErr) = AddServiceImportInternal(destination, from, to, imClaim);
return addErr;
}
/// <summary>
/// Checks whether adding a service import forms an account cycle.
/// Mirrors Go <c>(a *Account) serviceImportFormsCycle(...)</c>.
/// </summary>
internal Exception? ServiceImportFormsCycle(Account destination, string from)
{
var visited = new HashSet<string>(StringComparer.Ordinal) { Name };
return destination.CheckServiceImportsForCycles(from, visited);
}
/// <summary>
/// Recursively checks service-import graph for cycles.
/// Mirrors Go <c>(a *Account) checkServiceImportsForCycles(...)</c>.
/// </summary>
internal Exception? CheckServiceImportsForCycles(string from, HashSet<string> visited)
{
if (visited.Count >= AccountConstants.MaxCycleSearchDepth)
return ServerErrors.ErrCycleSearchDepth;
List<ServiceImportEntry>? snapshot = null;
_mu.EnterReadLock();
try
{
if (Imports.Services == null || Imports.Services.Count == 0)
return null;
snapshot = [];
foreach (var entries in Imports.Services.Values)
snapshot.AddRange(entries);
}
finally
{
_mu.ExitReadLock();
}
foreach (var import in snapshot)
{
if (import?.Account == null)
continue;
if (!SubscriptionIndex.SubjectsCollide(from, import.To))
continue;
if (visited.Contains(import.Account.Name))
return ServerErrors.ErrImportFormsCycle;
visited.Add(Name);
var nextFrom = SubscriptionIndex.SubjectIsSubsetMatch(import.From, from) ? import.From : from;
var err = import.Account.CheckServiceImportsForCycles(nextFrom, visited);
if (err != null)
return err;
}
return null;
}
/// <summary>
/// Checks whether adding a stream import forms an account cycle.
/// Mirrors Go <c>(a *Account) streamImportFormsCycle(...)</c>.
/// </summary>
internal Exception? StreamImportFormsCycle(Account destination, string to)
{
var visited = new HashSet<string>(StringComparer.Ordinal) { Name };
return destination.CheckStreamImportsForCycles(to, visited);
}
/// <summary>
/// Returns true when any service export subject can match <paramref name="to"/>.
/// Mirrors Go <c>(a *Account) hasServiceExportMatching(to string) bool</c>.
/// </summary>
internal bool HasServiceExportMatching(string to)
{
if (Exports.Services == null)
return false;
foreach (var subject in Exports.Services.Keys)
{
if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject))
return true;
}
return false;
}
/// <summary>
/// Returns true when any stream export subject can match <paramref name="to"/>.
/// Mirrors Go <c>(a *Account) hasStreamExportMatching(to string) bool</c>.
/// </summary>
internal bool HasStreamExportMatching(string to)
{
if (Exports.Streams == null)
return false;
foreach (var subject in Exports.Streams.Keys)
{
if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject))
return true;
}
return false;
}
/// <summary>
/// Recursively checks stream-import graph for cycles.
/// Mirrors Go <c>(a *Account) checkStreamImportsForCycles(...)</c>.
/// </summary>
internal Exception? CheckStreamImportsForCycles(string to, HashSet<string> visited)
{
if (visited.Count >= AccountConstants.MaxCycleSearchDepth)
return ServerErrors.ErrCycleSearchDepth;
_mu.EnterReadLock();
var hasMatchingExport = HasStreamExportMatching(to);
var streams = Imports.Streams == null ? null : new List<StreamImportEntry>(Imports.Streams);
_mu.ExitReadLock();
if (!hasMatchingExport || streams == null || streams.Count == 0)
return null;
foreach (var stream in streams)
{
if (stream?.Account == null)
continue;
if (!SubscriptionIndex.SubjectsCollide(to, stream.To))
continue;
if (visited.Contains(stream.Account.Name))
return ServerErrors.ErrImportFormsCycle;
visited.Add(Name);
var nextTo = SubscriptionIndex.SubjectIsSubsetMatch(stream.To, to) ? stream.To : to;
var err = stream.Account.CheckStreamImportsForCycles(nextTo, visited);
if (err != null)
return err;
}
return null;
}
/// <summary>
/// Allows or disallows request metadata sharing for a service import.
/// Mirrors Go <c>(a *Account) SetServiceImportSharing(...)</c>.
/// </summary>
public Exception? SetServiceImportSharing(Account destination, string to, bool allow) =>
SetServiceImportSharingInternal(destination, to, true, allow);
/// <summary>
/// Internal service-import sharing setter with optional claim-account check bypass.
/// Mirrors Go <c>(a *Account) setServiceImportSharing(...)</c>.
/// </summary>
internal Exception? SetServiceImportSharingInternal(Account destination, string to, bool check, bool allow)
{
_mu.EnterWriteLock();
try
{
if (check && IsClaimAccount())
return new InvalidOperationException("claim based accounts can not be updated directly");
if (Imports.Services == null)
return new InvalidOperationException("service import not found");
foreach (var imports in Imports.Services.Values)
{
foreach (var import in imports)
{
if (import?.Account?.Name == destination.Name && import.To == to)
{
import.Share = allow;
return null;
}
}
}
return new InvalidOperationException("service import not found");
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Adds a service import from this account to <paramref name="destination"/>.
/// Mirrors Go <c>(a *Account) AddServiceImport(destination, from, to string) error</c>.
/// </summary>
public Exception? AddServiceImport(Account destination, string from, string to) =>
AddServiceImportWithClaim(destination, from, to, null);
/// <summary>
/// Number of pending reverse-response map entries.
/// Mirrors Go <c>(a *Account) NumPendingReverseResponses() int</c>.
/// </summary>
public int NumPendingReverseResponses()
{
_mu.EnterReadLock();
try { return Imports.ReverseResponseMap?.Count ?? 0; }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Total number of pending response imports across all service exports.
/// Mirrors Go <c>(a *Account) NumPendingAllResponses() int</c>.
/// </summary>
public int NumPendingAllResponses() => NumPendingResponses(string.Empty);
/// <summary>
/// Number of pending response imports, optionally filtered by exported service subject.
/// Mirrors Go <c>(a *Account) NumPendingResponses(filter string) int</c>.
/// </summary>
public int NumPendingResponses(string filter)
{
_mu.EnterReadLock();
try
{
if (string.IsNullOrEmpty(filter))
return Exports.Responses?.Count ?? 0;
var export = GetServiceExport(filter);
if (export == null || Exports.Responses == null)
return 0;
var count = 0;
foreach (var import in Exports.Responses.Values)
{
if (ReferenceEquals(import.ServiceExport, export))
count++;
}
return count;
}
finally
{
_mu.ExitReadLock();
}
}
// -------------------------------------------------------------------------
// Export checks
// -------------------------------------------------------------------------
@@ -2359,6 +2840,92 @@ public sealed class Account : INatsAccount
return null;
}
/// <summary>
/// Adds a service import entry to the import map.
/// Mirrors Go <c>(a *Account) addServiceImport(...)</c>.
/// </summary>
private (ServiceImportEntry? Import, Exception? Error) AddServiceImportInternal(Account destination, string from, string to, object? claim)
{
_mu.EnterWriteLock();
try
{
Imports.Services ??= new Dictionary<string, List<ServiceImportEntry>>(StringComparer.Ordinal);
var serviceImport = new ServiceImportEntry
{
Account = destination,
Claim = claim,
From = from,
To = to,
};
if (!Imports.Services.TryGetValue(from, out var entries))
{
entries = [];
Imports.Services[from] = entries;
}
entries.Add(serviceImport);
return (serviceImport, null);
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Resolves a service export by exact or wildcard subject match.
/// Mirrors Go <c>(a *Account) getServiceExport(service string) *serviceExport</c>.
/// </summary>
private ServiceExportEntry? GetServiceExport(string service)
{
if (Exports.Services == null)
return null;
if (Exports.Services.TryGetValue(service, out var serviceExport))
return serviceExport;
var tokens = SubjectTransform.TokenizeSubject(service);
foreach (var (subject, export) in Exports.Services)
{
if (SubjectTransform.IsSubsetMatch(tokens, subject))
return export;
}
return null;
}
private static ClientInfo? CreateClientInfo(ClientConnection? client, bool _)
{
if (client == null)
return null;
return new ClientInfo
{
Id = client.Cid,
Account = client.Account?.Name ?? string.Empty,
Name = client.Opts.Name ?? string.Empty,
Rtt = client.GetRttValue(),
Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"),
Kind = client.Kind.ToString(),
ClientType = client.ClientType().ToString(),
};
}
private static long UtcNowUnixNanos() => TimeSpanToUnixNanos(DateTime.UtcNow - DateTime.UnixEpoch);
private static long TimeSpanToUnixNanos(TimeSpan value) => value.Ticks * 100L;
private static TimeSpan UnixNanosToTimeSpan(long unixNanos) => TimeSpan.FromTicks(unixNanos / 100L);
private static DateTime UnixNanoToDateTime(long unixNanos)
{
if (unixNanos <= 0)
return DateTime.UnixEpoch;
return DateTime.UnixEpoch.AddTicks(unixNanos / 100L);
}
/// <summary>
/// Tokenises a subject string into an array, using the same split logic
/// as <c>btsep</c>-based tokenisation in the Go source.