diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 0f961da..7bf5fe2 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core; using GrpcStatus = Grpc.Core.Status; @@ -365,12 +366,41 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services try { - while (await reader.WaitToReadAsync(context.CancellationToken)) + // Use a combined approach: check both the gRPC cancellation token AND + // periodic session validity. This works around Grpc.Core not reliably + // firing CancellationToken on client disconnect. + while (true) { - while (reader.TryRead(out var item)) + // Wait for data with a timeout so we can periodically check session validity + using (var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) + using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( + context.CancellationToken, timeoutCts.Token)) { - var protoVtq = ConvertToProtoVtq(item.address, item.vtq); - await responseStream.WriteAsync(protoVtq); + bool hasData; + try + { + hasData = await reader.WaitToReadAsync(linkedCts.Token); + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested + && !context.CancellationToken.IsCancellationRequested) + { + // Timeout expired, not a client disconnect — check if session is still valid + if (!_sessionManager.ValidateSession(request.SessionId)) + { + Log.Information("Subscribe stream ending — session {SessionId} no longer valid", + request.SessionId); + break; + } + continue; // Session still valid, keep waiting + } + + if (!hasData) break; // Channel completed + + while (reader.TryRead(out var item)) + { + var protoVtq = ConvertToProtoVtq(item.address, item.vtq); + await responseStream.WriteAsync(protoVtq); + } } } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs index 2e5beed..dd8cf43 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs @@ -112,6 +112,11 @@ namespace ZB.MOM.WW.LmxProxy.Host // 8. Create SessionManager _sessionManager = new SessionManager(inactivityTimeoutMinutes: 5); + _sessionManager.OnSessionScavenged(sessionId => + { + Log.Information("Cleaning up subscriptions for scavenged session {SessionId}", sessionId); + _subscriptionManager.UnsubscribeClient(sessionId); + }); // 9. Create performance metrics _performanceMetrics = new PerformanceMetrics(); diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs index c0ef555..4e80b7c 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs @@ -20,6 +20,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions private readonly Timer? _scavengingTimer; private readonly TimeSpan _inactivityTimeout; + private Action? _onSessionScavenged; /// /// Creates a SessionManager with optional inactivity scavenging. @@ -40,6 +41,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions } } + /// + /// Register a callback invoked when a session is scavenged due to inactivity. + /// The callback receives the session ID. + /// + public void OnSessionScavenged(Action callback) + { + _onSessionScavenged = callback; + } + /// Gets the count of active sessions. public int ActiveSessionCount => _sessions.Count; @@ -113,6 +123,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Sessions { Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})", kvp.Key, kvp.Value.LastActivity); + + try + { + _onSessionScavenged?.Invoke(kvp.Key); + } + catch (Exception ex) + { + Log.Warning(ex, "Error in session scavenge callback for {SessionId}", kvp.Key); + } } } }