From eecd82b7870e5d59709f2ce05cc8f8d9e61d0c86 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Mar 2026 15:21:06 -0400 Subject: [PATCH] fix(lmxproxy): clean up stale session subscriptions on scavenge and add stream timeout Grpc.Core doesn't reliably fire CancellationToken on client disconnect, so Subscribe RPCs can hang forever and leak session subscriptions. Bridge SessionManager scavenging to SubscriptionManager cleanup, and add a 30-second periodic session validity check in the Subscribe loop so stale streams exit within 30s of session scavenge rather than hanging until process restart. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Grpc/Services/ScadaGrpcService.cs | 38 +++++++++++++++++-- .../LmxProxyService.cs | 5 +++ .../Sessions/SessionManager.cs | 19 ++++++++++ 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 0f961da..7bf5fe2 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core; using GrpcStatus = Grpc.Core.Status; @@ -365,12 +366,41 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services try { - while (await reader.WaitToReadAsync(context.CancellationToken)) + // Use a combined approach: check both the gRPC cancellation token AND + // periodic session validity. This works around Grpc.Core not reliably + // firing CancellationToken on client disconnect. + while (true) { - while (reader.TryRead(out var item)) + // Wait for data with a timeout so we can periodically check session validity + using (var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) + using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( + context.CancellationToken, timeoutCts.Token)) { - var protoVtq = ConvertToProtoVtq(item.address, item.vtq); - await responseStream.WriteAsync(protoVtq); + bool hasData; + try + { + hasData = await reader.WaitToReadAsync(linkedCts.Token); + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested + && !context.CancellationToken.IsCancellationRequested) + { + // Timeout expired, not a client disconnect — check if session is still valid + if (!_sessionManager.ValidateSession(request.SessionId)) + { + Log.Information("Subscribe stream ending — session {SessionId} no longer valid", + request.SessionId); + break; + } + continue; // Session still valid, keep waiting + } + + if (!hasData) break; // Channel completed + + while (reader.TryRead(out var item)) + { + var protoVtq = ConvertToProtoVtq(item.address, item.vtq); + await responseStream.WriteAsync(protoVtq); + } } } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs index 2e5beed..dd8cf43 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs @@ -112,6 +112,11 @@ namespace ZB.MOM.WW.LmxProxy.Host // 8. Create SessionManager _sessionManager = new SessionManager(inactivityTimeoutMinutes: 5); + _sessionManager.OnSessionScavenged(sessionId => + { + Log.Information("Cleaning up subscriptions for scavenged session {SessionId}", sessionId); + _subscriptionManager.UnsubscribeClient(sessionId); + }); // 9. Create performance metrics _performanceMetrics = new PerformanceMetrics(); diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs index c0ef555..4e80b7c 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs @@ -20,6 +20,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions private readonly Timer? _scavengingTimer; private readonly TimeSpan _inactivityTimeout; + private Action? _onSessionScavenged; /// /// Creates a SessionManager with optional inactivity scavenging. @@ -40,6 +41,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions } } + /// + /// Register a callback invoked when a session is scavenged due to inactivity. + /// The callback receives the session ID. + /// + public void OnSessionScavenged(Action callback) + { + _onSessionScavenged = callback; + } + /// Gets the count of active sessions. public int ActiveSessionCount => _sessions.Count; @@ -113,6 +123,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions { Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})", kvp.Key, kvp.Value.LastActivity); + + try + { + _onSessionScavenged?.Invoke(kvp.Key); + } + catch (Exception ex) + { + Log.Warning(ex, "Error in session scavenge callback for {SessionId}", kvp.Key); + } } } }