Fix runtime review findings

This commit is contained in:
Joseph Doherty
2026-04-29 10:39:49 -04:00
parent 133c83029b
commit d543679044
69 changed files with 2233 additions and 409 deletions
@@ -1,3 +1,5 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveProtocolConfiguration(uint WorkerProtocolVersion);
public sealed record EffectiveProtocolConfiguration(
uint WorkerProtocolVersion,
int MaxGrpcMessageBytes);
@@ -3,4 +3,7 @@ namespace MxGateway.Server.Configuration;
public sealed record EffectiveSessionConfiguration(
int DefaultCommandTimeoutSeconds,
int MaxSessions,
int MaxPendingCommandsPerSession,
int DefaultLeaseSeconds,
int LeaseSweepIntervalSeconds,
bool AllowMultipleEventSubscribers);
@@ -28,6 +28,9 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
Sessions: new EffectiveSessionConfiguration(
DefaultCommandTimeoutSeconds: value.Sessions.DefaultCommandTimeoutSeconds,
MaxSessions: value.Sessions.MaxSessions,
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
@@ -41,6 +44,8 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
RecentFaultLimit: value.Dashboard.RecentFaultLimit,
RecentSessionLimit: value.Dashboard.RecentSessionLimit,
ShowTagValues: value.Dashboard.ShowTagValues),
Protocol: new EffectiveProtocolConfiguration(value.Protocol.WorkerProtocolVersion));
Protocol: new EffectiveProtocolConfiguration(
value.Protocol.WorkerProtocolVersion,
value.Protocol.MaxGrpcMessageBytes));
}
}
@@ -129,6 +129,14 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
options.MaxPendingCommandsPerSession,
"MxGateway:Sessions:MaxPendingCommandsPerSession must be greater than zero.",
failures);
AddIfNotPositive(
options.DefaultLeaseSeconds,
"MxGateway:Sessions:DefaultLeaseSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.LeaseSweepIntervalSeconds,
"MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.",
failures);
if (options.AllowMultipleEventSubscribers)
{
@@ -179,6 +187,12 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
failures.Add(
$"MxGateway:Protocol:WorkerProtocolVersion must be {GatewayContractInfo.WorkerProtocolVersion}.");
}
if (options.MaxGrpcMessageBytes is < MinimumMaxMessageBytes or > MaximumMaxMessageBytes)
{
failures.Add(
$"MxGateway:Protocol:MaxGrpcMessageBytes must be between {MinimumMaxMessageBytes} and {MaximumMaxMessageBytes}.");
}
}
private static void AddIfBlank(string? value, string message, List<string> failures)
@@ -5,4 +5,6 @@ namespace MxGateway.Server.Configuration;
public sealed class ProtocolOptions
{
public uint WorkerProtocolVersion { get; init; } = GatewayContractInfo.WorkerProtocolVersion;
public int MaxGrpcMessageBytes { get; init; } = 16 * 1024 * 1024;
}
@@ -8,5 +8,9 @@ public sealed class SessionOptions
public int MaxPendingCommandsPerSession { get; init; } = 128;
public int DefaultLeaseSeconds { get; init; } = 1800;
public int LeaseSweepIntervalSeconds { get; init; } = 30;
public bool AllowMultipleEventSubscribers { get; init; }
}
@@ -190,6 +190,8 @@ else
private int CommandTimeoutSeconds() => GalaxyOptions.Value.CommandTimeoutSeconds;
private string? GalaxyConnectionStringDisplay() =>
DashboardRedactor.Redact(GalaxyOptions.Value.ConnectionString);
private string GalaxyConnectionStringDisplay()
{
return DashboardConnectionStringDisplay.GalaxyRepositoryConnectionString(GalaxyOptions.Value.ConnectionString);
}
}
@@ -0,0 +1,28 @@
using Microsoft.Data.SqlClient;
namespace MxGateway.Server.Dashboard;
public static class DashboardConnectionStringDisplay
{
public static string GalaxyRepositoryConnectionString(string connectionString)
{
try
{
SqlConnectionStringBuilder builder = new(connectionString);
SqlConnectionStringBuilder display = new()
{
DataSource = builder.DataSource,
InitialCatalog = builder.InitialCatalog,
IntegratedSecurity = builder.IntegratedSecurity,
Encrypt = builder.Encrypt,
TrustServerCertificate = builder.TrustServerCertificate,
};
return display.ConnectionString;
}
catch (ArgumentException)
{
return "[invalid connection string]";
}
}
}
@@ -2,97 +2,11 @@ using MxGateway.Server.Galaxy;
namespace MxGateway.Server.Dashboard;
/// <summary>
/// Projects a <see cref="GalaxyHierarchyCacheEntry"/> into a
/// <see cref="DashboardGalaxySummary"/> for the Blazor pages. Top-templates and
/// per-category breakdowns are computed here rather than stored on the cache so the
/// Galaxy namespace stays free of dashboard-presentation concepts.
/// </summary>
/// <summary>Projects the precomputed Galaxy cache dashboard summary.</summary>
internal static class DashboardGalaxyProjector
{
private const int TopTemplatesLimit = 10;
private static readonly IReadOnlyDictionary<int, string> CategoryNamesById = new Dictionary<int, string>
{
[1] = "WinPlatform",
[3] = "AppEngine",
[4] = "InTouchViewApp",
[10] = "UserDefined",
[11] = "FieldReference",
[13] = "Area",
[17] = "DIObject",
[24] = "DDESuiteLinkClient",
[26] = "OPCClient",
};
public static DashboardGalaxySummary Project(GalaxyHierarchyCacheEntry entry)
{
DashboardGalaxyStatus status = entry.Status switch
{
GalaxyCacheStatus.Healthy => DashboardGalaxyStatus.Healthy,
GalaxyCacheStatus.Stale => DashboardGalaxyStatus.Stale,
GalaxyCacheStatus.Unavailable => DashboardGalaxyStatus.Unavailable,
_ => DashboardGalaxyStatus.Unknown,
};
IReadOnlyList<DashboardGalaxyTemplateUsage> topTemplates;
IReadOnlyList<DashboardGalaxyCategoryCount> objectCategories;
if (entry.Hierarchy.Count == 0)
{
topTemplates = Array.Empty<DashboardGalaxyTemplateUsage>();
objectCategories = Array.Empty<DashboardGalaxyCategoryCount>();
}
else
{
Dictionary<int, int> objectsByCategory = new();
Dictionary<string, int> templateUsage = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyHierarchyRow row in entry.Hierarchy)
{
objectsByCategory.TryGetValue(row.CategoryId, out int categoryCount);
objectsByCategory[row.CategoryId] = categoryCount + 1;
if (row.TemplateChain.Count > 0)
{
string immediate = row.TemplateChain[0];
if (!string.IsNullOrWhiteSpace(immediate))
{
templateUsage.TryGetValue(immediate, out int templateCount);
templateUsage[immediate] = templateCount + 1;
}
}
}
topTemplates = templateUsage
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Take(TopTemplatesLimit)
.Select(entry => new DashboardGalaxyTemplateUsage(entry.Key, entry.Value))
.ToArray();
objectCategories = objectsByCategory
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key)
.Select(entry => new DashboardGalaxyCategoryCount(
entry.Key,
CategoryNamesById.TryGetValue(entry.Key, out string? name) ? name : $"Category {entry.Key}",
entry.Value))
.ToArray();
}
return new DashboardGalaxySummary(
Status: status,
LastQueriedAt: entry.LastQueriedAt,
LastSuccessAt: entry.LastSuccessAt,
LastDeployTime: entry.LastDeployTime,
LastError: entry.LastError,
ObjectCount: entry.ObjectCount,
AreaCount: entry.AreaCount,
AttributeCount: entry.AttributeCount,
HistorizedAttributeCount: entry.HistorizedAttributeCount,
AlarmAttributeCount: entry.AlarmAttributeCount,
TopTemplates: topTemplates,
ObjectCategories: objectCategories);
return entry.DashboardSummary;
}
}
@@ -2,6 +2,7 @@ using Google.Protobuf.WellKnownTypes;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Grpc;
namespace MxGateway.Server.Galaxy;
@@ -43,7 +44,16 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
{
GalaxyHierarchyCacheEntry snapshot = Volatile.Read(ref _current);
GalaxyCacheStatus projected = ProjectStatus(snapshot);
return projected == snapshot.Status ? snapshot : snapshot with { Status = projected };
return projected == snapshot.Status
? snapshot
: snapshot with
{
Status = projected,
DashboardSummary = snapshot.DashboardSummary with
{
Status = MapDashboardStatus(projected),
},
};
}
}
@@ -101,11 +111,23 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
List<GalaxyHierarchyRow> hierarchy = hierarchyTask.Result;
List<GalaxyAttributeRow> attributes = attributesTask.Result;
DiscoverHierarchyReply reply = BuildReply(hierarchy, attributes);
IReadOnlyList<GalaxyObject> objects = BuildObjects(hierarchy, attributes);
int areaCount = hierarchy.Count(row => row.IsArea);
int historized = attributes.Count(row => row.IsHistorized);
int alarms = attributes.Count(row => row.IsAlarm);
DashboardGalaxySummary dashboardSummary = BuildDashboardSummary(
status: GalaxyCacheStatus.Healthy,
lastQueriedAt: queriedAt,
lastSuccessAt: queriedAt,
lastDeployTime: deployTime,
lastError: null,
hierarchy: hierarchy,
objectCount: hierarchy.Count,
areaCount: areaCount,
attributeCount: attributes.Count,
historizedAttributeCount: historized,
alarmAttributeCount: alarms);
long nextSequence = previous.Sequence + 1;
GalaxyHierarchyCacheEntry next = new(
@@ -115,9 +137,8 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
LastSuccessAt: queriedAt,
LastDeployTime: deployTime,
LastError: null,
Hierarchy: hierarchy,
Attributes: attributes,
Reply: reply,
Objects: objects,
DashboardSummary: dashboardSummary,
ObjectCount: hierarchy.Count,
AreaCount: areaCount,
AttributeCount: attributes.Count,
@@ -146,13 +167,19 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
Status = previous.HasData ? GalaxyCacheStatus.Stale : GalaxyCacheStatus.Unavailable,
LastQueriedAt = queriedAt,
LastError = exception.Message,
DashboardSummary = previous.DashboardSummary with
{
Status = MapDashboardStatus(previous.HasData ? GalaxyCacheStatus.Stale : GalaxyCacheStatus.Unavailable),
LastQueriedAt = queriedAt,
LastError = exception.Message,
},
};
Volatile.Write(ref _current, failed);
_firstLoad.TrySetResult();
}
}
private static DiscoverHierarchyReply BuildReply(
private static IReadOnlyList<GalaxyObject> BuildObjects(
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
IReadOnlyList<GalaxyAttributeRow> attributes)
{
@@ -160,14 +187,110 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
.GroupBy(a => a.GobjectId)
.ToDictionary(g => g.Key, g => g.ToList());
DiscoverHierarchyReply reply = new();
List<GalaxyObject> objects = new(hierarchy.Count);
foreach (GalaxyHierarchyRow row in hierarchy)
{
reply.Objects.Add(GalaxyProtoMapper.MapObject(row, attributesByGobjectId));
objects.Add(GalaxyProtoMapper.MapObject(row, attributesByGobjectId));
}
return reply;
return objects;
}
private static DashboardGalaxySummary BuildDashboardSummary(
GalaxyCacheStatus status,
DateTimeOffset? lastQueriedAt,
DateTimeOffset? lastSuccessAt,
DateTimeOffset? lastDeployTime,
string? lastError,
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
int objectCount,
int areaCount,
int attributeCount,
int historizedAttributeCount,
int alarmAttributeCount)
{
IReadOnlyList<DashboardGalaxyTemplateUsage> topTemplates;
IReadOnlyList<DashboardGalaxyCategoryCount> objectCategories;
if (hierarchy.Count == 0)
{
topTemplates = Array.Empty<DashboardGalaxyTemplateUsage>();
objectCategories = Array.Empty<DashboardGalaxyCategoryCount>();
}
else
{
Dictionary<int, int> objectsByCategory = new();
Dictionary<string, int> templateUsage = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyHierarchyRow row in hierarchy)
{
objectsByCategory.TryGetValue(row.CategoryId, out int categoryCount);
objectsByCategory[row.CategoryId] = categoryCount + 1;
if (row.TemplateChain.Count > 0)
{
string immediate = row.TemplateChain[0];
if (!string.IsNullOrWhiteSpace(immediate))
{
templateUsage.TryGetValue(immediate, out int templateCount);
templateUsage[immediate] = templateCount + 1;
}
}
}
topTemplates = templateUsage
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Take(10)
.Select(entry => new DashboardGalaxyTemplateUsage(entry.Key, entry.Value))
.ToArray();
objectCategories = objectsByCategory
.OrderByDescending(entry => entry.Value)
.ThenBy(entry => entry.Key)
.Select(entry => new DashboardGalaxyCategoryCount(
entry.Key,
ResolveCategoryName(entry.Key),
entry.Value))
.ToArray();
}
return new DashboardGalaxySummary(
Status: MapDashboardStatus(status),
LastQueriedAt: lastQueriedAt,
LastSuccessAt: lastSuccessAt,
LastDeployTime: lastDeployTime,
LastError: lastError,
ObjectCount: objectCount,
AreaCount: areaCount,
AttributeCount: attributeCount,
HistorizedAttributeCount: historizedAttributeCount,
AlarmAttributeCount: alarmAttributeCount,
TopTemplates: topTemplates,
ObjectCategories: objectCategories);
}
private static DashboardGalaxyStatus MapDashboardStatus(GalaxyCacheStatus status) => status switch
{
GalaxyCacheStatus.Healthy => DashboardGalaxyStatus.Healthy,
GalaxyCacheStatus.Stale => DashboardGalaxyStatus.Stale,
GalaxyCacheStatus.Unavailable => DashboardGalaxyStatus.Unavailable,
_ => DashboardGalaxyStatus.Unknown,
};
private static string ResolveCategoryName(int categoryId) => categoryId switch
{
1 => "WinPlatform",
3 => "AppEngine",
4 => "InTouchViewApp",
10 => "UserDefined",
11 => "FieldReference",
13 => "Area",
17 => "DIObject",
24 => "DDESuiteLinkClient",
26 => "OPCClient",
_ => $"Category {categoryId}",
};
private GalaxyCacheStatus ProjectStatus(GalaxyHierarchyCacheEntry snapshot)
{
if (snapshot.Status is GalaxyCacheStatus.Unknown or GalaxyCacheStatus.Unavailable)
@@ -1,11 +1,12 @@
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Server.Dashboard;
namespace MxGateway.Server.Galaxy;
/// <summary>
/// Immutable snapshot of the Galaxy Repository browse data held by
/// <see cref="GalaxyHierarchyCache"/>. Multiple gRPC clients share the same instance —
/// the materialized <see cref="Reply"/> is produced once per refresh and reused.
/// <see cref="GalaxyHierarchyCache"/>. Multiple gRPC clients share the same
/// materialized object list and precomputed dashboard projection.
/// </summary>
public sealed record GalaxyHierarchyCacheEntry(
GalaxyCacheStatus Status,
@@ -14,9 +15,8 @@ public sealed record GalaxyHierarchyCacheEntry(
DateTimeOffset? LastSuccessAt,
DateTimeOffset? LastDeployTime,
string? LastError,
IReadOnlyList<GalaxyHierarchyRow> Hierarchy,
IReadOnlyList<GalaxyAttributeRow> Attributes,
DiscoverHierarchyReply? Reply,
IReadOnlyList<GalaxyObject> Objects,
DashboardGalaxySummary DashboardSummary,
int ObjectCount,
int AreaCount,
int AttributeCount,
@@ -30,9 +30,8 @@ public sealed record GalaxyHierarchyCacheEntry(
LastSuccessAt: null,
LastDeployTime: null,
LastError: null,
Hierarchy: Array.Empty<GalaxyHierarchyRow>(),
Attributes: Array.Empty<GalaxyAttributeRow>(),
Reply: null,
Objects: Array.Empty<GalaxyObject>(),
DashboardSummary: DashboardGalaxySummary.Unknown,
ObjectCount: 0,
AreaCount: 0,
AttributeCount: 0,
@@ -21,6 +21,8 @@ public sealed class GalaxyRepositoryGrpcService(
ILogger<GalaxyRepositoryGrpcService> logger) : ProtoGalaxyRepository.GalaxyRepositoryBase
{
private static readonly TimeSpan FirstLoadWaitBudget = TimeSpan.FromSeconds(5);
private const int DefaultDiscoverPageSize = 1000;
private const int MaxDiscoverPageSize = 5000;
public override async Task<TestConnectionReply> TestConnection(
TestConnectionRequest request,
@@ -59,16 +61,39 @@ public sealed class GalaxyRepositoryGrpcService(
await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false);
GalaxyDb.GalaxyHierarchyCacheEntry entry = cache.Current;
if (!entry.HasData || entry.Reply is null)
if (!entry.HasData)
{
throw new RpcException(new Status(
StatusCode.Unavailable,
ResolveUnavailableMessage(entry)));
}
// Same materialized reply is shared across all clients — gRPC serialization is
// read-only and the entry is replaced atomically on the next refresh.
return entry.Reply;
int offset = ParsePageToken(request.PageToken);
if (offset > entry.Objects.Count)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is outside the current hierarchy."));
}
int pageSize = ResolvePageSize(request.PageSize);
int take = Math.Min(pageSize, entry.Objects.Count - offset);
DiscoverHierarchyReply reply = new()
{
TotalObjectCount = entry.Objects.Count,
};
for (int index = offset; index < offset + take; index++)
{
reply.Objects.Add(entry.Objects[index].Clone());
}
int nextOffset = offset + take;
if (nextOffset < entry.Objects.Count)
{
reply.NextPageToken = nextOffset.ToString(System.Globalization.CultureInfo.InvariantCulture);
}
return reply;
}
public override async Task WatchDeployEvents(
@@ -144,6 +169,41 @@ public sealed class GalaxyRepositoryGrpcService(
_ => "Galaxy cache has no data available.",
};
private static int ResolvePageSize(int requestedPageSize)
{
if (requestedPageSize < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_size must be greater than zero when provided."));
}
int pageSize = requestedPageSize == 0 ? DefaultDiscoverPageSize : requestedPageSize;
return Math.Min(pageSize, MaxDiscoverPageSize);
}
private static int ParsePageToken(string pageToken)
{
if (string.IsNullOrWhiteSpace(pageToken))
{
return 0;
}
if (!int.TryParse(
pageToken,
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out int offset)
|| offset < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is invalid."));
}
return offset;
}
[System.Diagnostics.CodeAnalysis.SuppressMessage(
"Style",
"IDE0051:Remove unused private members",
@@ -1,4 +1,6 @@
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Configuration;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authorization;
@@ -9,6 +11,15 @@ public static class GrpcAuthorizationServiceCollectionExtensions
services.AddSingleton<GatewayGrpcScopeResolver>();
services.AddSingleton<IGatewayRequestIdentityAccessor, GatewayRequestIdentityAccessor>();
services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services
.AddOptions<global::Grpc.AspNetCore.Server.GrpcServiceOptions>()
.Configure<IConfiguration>((grpcOptions, configuration) =>
{
ProtocolOptions protocolOptions = new();
configuration.GetSection("MxGateway:Protocol").Bind(protocolOptions);
grpcOptions.MaxReceiveMessageSize = protocolOptions.MaxGrpcMessageBytes;
grpcOptions.MaxSendMessageSize = protocolOptions.MaxGrpcMessageBytes;
});
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());
return services;
@@ -27,6 +27,35 @@ public sealed class GatewaySession
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
: this(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
clientSessionName,
clientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
TimeSpan.FromMinutes(30),
openedAt)
{
}
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
TimeSpan leaseDuration,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -58,8 +87,10 @@ public sealed class GatewaySession
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
LeaseDuration = leaseDuration;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
_leaseExpiresAt = openedAt + leaseDuration;
}
public string SessionId { get; }
@@ -82,6 +113,8 @@ public sealed class GatewaySession
public TimeSpan ShutdownTimeout { get; }
public TimeSpan LeaseDuration { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
@@ -195,6 +228,7 @@ public sealed class GatewaySession
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
_leaseExpiresAt = activityAt + LeaseDuration;
}
}
@@ -210,7 +244,9 @@ public sealed class GatewaySession
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
return _activeEventSubscriberCount == 0
&& _leaseExpiresAt is not null
&& _leaseExpiresAt <= now;
}
}
@@ -0,0 +1,45 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Sessions;
public sealed class SessionLeaseMonitorHostedService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
ILogger<SessionLeaseMonitorHostedService> logger,
TimeProvider? timeProvider = null) : BackgroundService
{
private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
TimeSpan interval = TimeSpan.FromSeconds(Math.Max(1, options.Value.Sessions.LeaseSweepIntervalSeconds));
using PeriodicTimer timer = new(interval, _timeProvider);
try
{
while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
{
try
{
await sessionManager
.CloseExpiredLeasesAsync(_timeProvider.GetUtcNow(), stoppingToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception exception)
{
logger.LogWarning(exception, "Session lease sweep failed.");
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
}
}
@@ -287,6 +287,7 @@ public sealed class SessionManager : ISessionManager
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
TimeSpan leaseDuration = TimeSpan.FromSeconds(_options.Sessions.DefaultLeaseSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
@@ -303,6 +304,7 @@ public sealed class SessionManager : ISessionManager
commandTimeout,
startupTimeout,
shutdownTimeout,
leaseDuration,
openedAt);
}
@@ -7,6 +7,7 @@ public static class SessionServiceCollectionExtensions
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
services.AddHostedService<SessionLeaseMonitorHostedService>();
services.AddHostedService<SessionShutdownHostedService>();
return services;
+36 -3
View File
@@ -231,11 +231,17 @@ public sealed class WorkerClient : IWorkerClient
}
WorkerClientState state = State;
if (state is WorkerClientState.Closed or WorkerClientState.Faulted)
if (state == WorkerClientState.Closed)
{
return;
}
if (state == WorkerClientState.Faulted)
{
KillOwnedProcess("ShutdownFaulted");
return;
}
MarkClosing();
await EnqueueAsync(CreateShutdownEnvelope(timeout, "gateway-shutdown"), cancellationToken).ConfigureAwait(false);
_outboundEnvelopes.Writer.TryComplete();
@@ -263,8 +269,7 @@ public sealed class WorkerClient : IWorkerClient
public void Kill(string reason)
{
ThrowIfDisposed();
_connection.ProcessHandle?.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
KillOwnedProcess(reason);
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
$"Worker was killed by the gateway: {reason}.",
@@ -279,6 +284,7 @@ public sealed class WorkerClient : IWorkerClient
}
_disposed = true;
KillOwnedProcess("Dispose");
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
@@ -607,12 +613,39 @@ public sealed class WorkerClient : IWorkerClient
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete(fault);
_events.Writer.TryComplete(fault);
KillOwnedProcess(errorCode.ToString());
CompletePendingCommands(fault);
RecordWorkerStoppedOnce(errorCode.ToString());
_metrics?.Fault(errorCode.ToString());
_logger.LogWarning(exception, "Worker client faulted for session {SessionId}: {Message}", SessionId, message);
}
private void KillOwnedProcess(string reason)
{
WorkerProcessHandle? processHandle = _connection.ProcessHandle;
if (processHandle is null)
{
return;
}
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
}
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Failed to kill worker process {ProcessId} for session {SessionId}.",
processHandle.ProcessId,
SessionId);
}
}
private void RecordWorkerStoppedOnce(string reason)
{
bool shouldRecord;
+5 -1
View File
@@ -25,6 +25,9 @@
"Sessions": {
"DefaultCommandTimeoutSeconds": 30,
"MaxSessions": 64,
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
},
"Events": {
@@ -42,7 +45,8 @@
"ShowTagValues": false
},
"Protocol": {
"WorkerProtocolVersion": 1
"WorkerProtocolVersion": 1,
"MaxGrpcMessageBytes": 16777216
},
"Galaxy": {
"ConnectionString": "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;",