diff --git a/lmxproxy/docs/plans/lmxproxy-stale-session-fix.md b/lmxproxy/docs/plans/lmxproxy-stale-session-fix.md
new file mode 100644
index 0000000..0d17c7f
--- /dev/null
+++ b/lmxproxy/docs/plans/lmxproxy-stale-session-fix.md
@@ -0,0 +1,185 @@
+# LmxProxy Stale Session Subscription Leak Fix
+
+## Problem
+
+When a gRPC client disconnects abruptly, Grpc.Core (the C-core library used by the .NET Framework 4.8 server) does not reliably cancel `ServerCallContext.CancellationToken`. As a result:
+
+1. The `Subscribe` RPC in `ScadaGrpcService` blocks forever on `reader.WaitToReadAsync(context.CancellationToken)` (line 368)
+2. The `finally` block with `_subscriptionManager.UnsubscribeClient(request.SessionId)` never runs
+3. The `ct.Register(() => UnsubscribeClient(clientId))` in `SubscriptionManager.SubscribeAsync` also never fires (same token)
+4. The old session's subscriptions leak in `SubscriptionManager._clientSubscriptions` and `_tagSubscriptions`
+
+When the client reconnects with a new session ID, it creates duplicate subscriptions. Tags aren't cleaned up because they still have a ref-count from the leaked old session. Over time, the client count grows and tag subscriptions accumulate.
+
+The `SessionManager` does scavenge inactive sessions after 5 minutes, but it only removes the session from its own dictionary; it does not notify `SubscriptionManager` to clean up subscriptions.
+
+## Fix
+
+Bridge `SessionManager` scavenging to `SubscriptionManager` cleanup. When a session is scavenged due to inactivity, also call `SubscriptionManager.UnsubscribeClient()`.
+
+### Step 1: Add cleanup callback to SessionManager
+
+File: `src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs`
+
+Add a callback field and expose it:
+
+```csharp
+// Add after the _inactivityTimeout field (line 22)
+private Action<string>? _onSessionScavenged;
+
+/// <summary>
+/// Register a callback invoked when a session is scavenged due to inactivity.
+/// The callback receives the session ID.
+/// </summary>
+public void OnSessionScavenged(Action<string> callback)
+{
+    _onSessionScavenged = callback;
+}
+```
+
+Then in `ScavengeInactiveSessions`, invoke the callback for each scavenged session:
+
+```csharp
+// In ScavengeInactiveSessions (lines 103-118), change the foreach to:
+foreach (var kvp in expired)
+{
+    if (_sessions.TryRemove(kvp.Key, out _))
+    {
+        Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})",
+            kvp.Key, kvp.Value.LastActivity);
+
+        // Notify subscriber cleanup; a failing callback must not stop the scavenge loop
+        try
+        {
+            _onSessionScavenged?.Invoke(kvp.Key);
+        }
+        catch (Exception ex)
+        {
+            Log.Warning(ex, "Error in session scavenge callback for {SessionId}", kvp.Key);
+        }
+    }
+}
+```
+
+### Step 2: Wire up the callback in LmxProxyService
+
+File: `src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs`
+
+After both `SessionManager` and `SubscriptionManager` are created, register the callback:
+
+```csharp
+// Add after SubscriptionManager creation:
+_sessionManager.OnSessionScavenged(sessionId =>
+{
+    Log.Information("Cleaning up subscriptions for scavenged session {SessionId}", sessionId);
+    _subscriptionManager.UnsubscribeClient(sessionId);
+});
+```
+
+Find where `_sessionManager` and `_subscriptionManager` are both initialized and add this call right after.
+
+### Step 3: Also clean up on explicit Disconnect
+
+This is already handled: `ScadaGrpcService.Disconnect()` (line 86) calls `_subscriptionManager.UnsubscribeClient(request.SessionId)` before terminating the session. No change needed.
+
+### Step 4: Add proactive stream timeout (belt-and-suspenders)
+
+The scavenger runs every 60 seconds with a 5-minute timeout. This means a leaked session could take up to 6 minutes to clean up. For faster detection, add a secondary timeout in the Subscribe RPC itself.
+
+File: `src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs`
+
+In the `Subscribe` method, replace the plain wait on `context.CancellationToken` with a wait on a combined token that also times out, so the loop can periodically check whether the session is still valid:
+
+```csharp
+// Replace the Subscribe method (lines 353-390) with:
+public override async Task Subscribe(
+    Scada.SubscribeRequest request,
+    IServerStreamWriter responseStream,
+    ServerCallContext context)
+{
+    if (!_sessionManager.ValidateSession(request.SessionId))
+    {
+        throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session"));
+    }
+
+    var reader = await _subscriptionManager.SubscribeAsync(
+        request.SessionId, request.Tags, context.CancellationToken);
+
+    try
+    {
+        // Use a combined approach: check both the gRPC cancellation token AND
+        // periodic session validity. This works around Grpc.Core not reliably
+        // firing CancellationToken on client disconnect.
+        while (true)
+        {
+            // Wait for data with a timeout so we can periodically check session validity
+            using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
+                context.CancellationToken, timeoutCts.Token);
+
+            bool hasData;
+            try
+            {
+                hasData = await reader.WaitToReadAsync(linkedCts.Token);
+            }
+            catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested
+                && !context.CancellationToken.IsCancellationRequested)
+            {
+                // Timeout expired, not a client disconnect; check if session is still valid
+                if (!_sessionManager.ValidateSession(request.SessionId))
+                {
+                    Log.Information("Subscribe stream ending: session {SessionId} no longer valid",
+                        request.SessionId);
+                    break;
+                }
+                continue; // Session still valid, keep waiting
+            }
+
+            if (!hasData) break; // Channel completed
+
+            while (reader.TryRead(out var item))
+            {
+                var protoVtq = ConvertToProtoVtq(item.address, item.vtq);
+                await responseStream.WriteAsync(protoVtq);
+            }
+        }
+    }
+    catch (OperationCanceledException)
+    {
+        // Client disconnected -- normal
+    }
+    catch (Exception ex)
+    {
+        Log.Error(ex, "Subscribe stream error for session {SessionId}", request.SessionId);
+        throw new RpcException(new GrpcStatus(StatusCode.Internal, ex.Message));
+    }
+    finally
+    {
+        _subscriptionManager.UnsubscribeClient(request.SessionId);
+    }
+}
+```
+
+This adds a 30-second periodic check: if no data arrives for 30 seconds, it checks whether the session is still valid. If the session was scavenged (client disconnected and the 5-minute inactivity timeout elapsed), the stream exits cleanly and runs the `finally` cleanup.
+
+## Summary of Changes
+
+| File | Change |
+|------|--------|
+| `Sessions/SessionManager.cs` | Add `_onSessionScavenged` callback, invoke during `ScavengeInactiveSessions` |
+| `LmxProxyService.cs` | Wire `_sessionManager.OnSessionScavenged` to `_subscriptionManager.UnsubscribeClient` |
+| `Grpc/Services/ScadaGrpcService.cs` | Add 30-second periodic session validity check in `Subscribe` loop |
+
+## Testing
+
+1. Start the LmxProxy server
+2. Connect a client and subscribe to tags
+3. Kill the client process abruptly (not a clean disconnect)
+4. Check the status page: the client count should still include the old session
+5. Wait up to 6 minutes (5-minute inactivity timeout plus the 60-second scavenge interval): the session should be scavenged and the subscription count should drop
+6. Reconnect the client: it should get a clean new session with no duplicate subscriptions
+7. Verify tag subscription counts match expected (no leaked refs)
+
+## Optional: Reduce scavenge timeout for faster cleanup
+
+In `LmxProxyService.cs` where `SessionManager` is constructed, consider reducing `inactivityTimeoutMinutes` from 5 to 2, since the Subscribe RPC now has its own 30-second validity check. The 5-minute timeout was the only cleanup path before; now it's belt-and-suspenders.
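The 30-second validity check from Step 4 can also be exercised without gRPC. The following is a minimal sketch, assuming the reader returned by `SubscribeAsync` behaves like a `System.Threading.Channels.ChannelReader<T>` (the plan only shows `WaitToReadAsync` and `TryRead` being called on it). `PollingReadSketch`, `DrainAsync`, and `isSessionValid` are hypothetical names, the `int` payload stands in for the real item type, and a 50 ms poll interval replaces the plan's 30 seconds so the demo finishes quickly.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the Subscribe loop; not the project's actual code.
public static class PollingReadSketch
{
    // Reads until the channel completes or isSessionValid() returns false at a
    // poll boundary. Returns the number of items read. If callToken is cancelled,
    // the OperationCanceledException propagates to the caller.
    public static async Task<int> DrainAsync(
        ChannelReader<int> reader,
        Func<bool> isSessionValid,
        TimeSpan pollInterval,
        CancellationToken callToken)
    {
        var delivered = 0;
        while (true)
        {
            // A fresh timeout per wait, linked to the caller's token.
            using var timeoutCts = new CancellationTokenSource(pollInterval);
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
                callToken, timeoutCts.Token);

            bool hasData;
            try
            {
                hasData = await reader.WaitToReadAsync(linkedCts.Token);
            }
            catch (OperationCanceledException) when (!callToken.IsCancellationRequested)
            {
                // Only the poll timeout fired: re-check the session, then keep waiting.
                if (!isSessionValid()) break;
                continue;
            }

            if (!hasData) break; // channel completed

            while (reader.TryRead(out _)) delivered++;
        }
        return delivered;
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);

        var valid = true;
        var drain = DrainAsync(channel.Reader, () => valid,
            TimeSpan.FromMilliseconds(50), CancellationToken.None);

        await Task.Delay(200); // several idle polls pass while the session is valid
        valid = false;         // simulate the scavenger removing the session
        Console.WriteLine(await drain); // prints 2
    }
}
```

The writer is never completed and the call token is never cancelled here, which mirrors the leaked-stream case: the loop ends only because the validity callback starts returning false.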