fix(lmxproxy): clean up stale session subscriptions on scavenge and add stream timeout

Grpc.Core doesn't reliably fire CancellationToken on client disconnect,
so Subscribe RPCs can hang forever and leak session subscriptions. Bridge
SessionManager scavenging to SubscriptionManager cleanup, and add a
30-second periodic session validity check in the Subscribe loop so stale
streams exit within 30s of session scavenge rather than hanging until
process restart.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-24 15:21:06 -04:00
parent b74e139a85
commit eecd82b787
3 changed files with 58 additions and 4 deletions

View File

@@ -1,6 +1,7 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using GrpcStatus = Grpc.Core.Status;
@@ -365,12 +366,41 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
try
{
while (await reader.WaitToReadAsync(context.CancellationToken))
// Use a combined approach: check both the gRPC cancellation token AND
// periodic session validity. This works around Grpc.Core not reliably
// firing CancellationToken on client disconnect.
while (true)
{
while (reader.TryRead(out var item))
// Wait for data with a timeout so we can periodically check session validity
using (var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
context.CancellationToken, timeoutCts.Token))
{
var protoVtq = ConvertToProtoVtq(item.address, item.vtq);
await responseStream.WriteAsync(protoVtq);
bool hasData;
try
{
hasData = await reader.WaitToReadAsync(linkedCts.Token);
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested
&& !context.CancellationToken.IsCancellationRequested)
{
// Timeout expired, not a client disconnect — check if session is still valid
if (!_sessionManager.ValidateSession(request.SessionId))
{
Log.Information("Subscribe stream ending — session {SessionId} no longer valid",
request.SessionId);
break;
}
continue; // Session still valid, keep waiting
}
if (!hasData) break; // Channel completed
while (reader.TryRead(out var item))
{
var protoVtq = ConvertToProtoVtq(item.address, item.vtq);
await responseStream.WriteAsync(protoVtq);
}
}
}
}

View File

@@ -112,6 +112,11 @@ namespace ZB.MOM.WW.LmxProxy.Host
// 8. Create SessionManager
_sessionManager = new SessionManager(inactivityTimeoutMinutes: 5);
_sessionManager.OnSessionScavenged(sessionId =>
{
Log.Information("Cleaning up subscriptions for scavenged session {SessionId}", sessionId);
_subscriptionManager.UnsubscribeClient(sessionId);
});
// 9. Create performance metrics
_performanceMetrics = new PerformanceMetrics();

View File

@@ -20,6 +20,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions
private readonly Timer? _scavengingTimer;
private readonly TimeSpan _inactivityTimeout;
private Action<string>? _onSessionScavenged;
/// <summary>
/// Creates a SessionManager with optional inactivity scavenging.
@@ -40,6 +41,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions
}
}
/// <summary>
/// Register a callback invoked when a session is scavenged due to inactivity.
/// The callback receives the session ID.
/// </summary>
public void OnSessionScavenged(Action<string> callback)
{
_onSessionScavenged = callback;
}
/// <summary>Gets the count of active sessions.</summary>
public int ActiveSessionCount => _sessions.Count;
@@ -113,6 +123,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions
{
Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})",
kvp.Key, kvp.Value.LastActivity);
try
{
_onSessionScavenged?.Invoke(kvp.Key);
}
catch (Exception ex)
{
Log.Warning(ex, "Error in session scavenge callback for {SessionId}", kvp.Key);
}
}
}
}