docs(lmxproxy): add stale session subscription fix plan
This commit is contained in:
185
lmxproxy/docs/plans/lmxproxy-stale-session-fix.md
Normal file
185
lmxproxy/docs/plans/lmxproxy-stale-session-fix.md
Normal file
@@ -0,0 +1,185 @@
|
||||
# LmxProxy Stale Session Subscription Leak Fix
|
||||
|
||||
## Problem
|
||||
|
||||
When a gRPC client disconnects abruptly, Grpc.Core (the C-core library used by the .NET Framework 4.8 server) does not reliably fire the `ServerCallContext.CancellationToken`. This means:
|
||||
|
||||
1. The `Subscribe` RPC in `ScadaGrpcService` blocks forever on `reader.WaitToReadAsync(context.CancellationToken)` (line 368)
|
||||
2. The `finally` block with `_subscriptionManager.UnsubscribeClient(request.SessionId)` never runs
|
||||
3. The `ct.Register(() => UnsubscribeClient(clientId))` in `SubscriptionManager.SubscribeAsync` also never fires (same token)
|
||||
4. The old session's subscriptions leak in `SubscriptionManager._clientSubscriptions` and `_tagSubscriptions`
|
||||
|
||||
When the client reconnects with a new session ID, it creates duplicate subscriptions. Tags aren't cleaned up because they still have a ref-count from the leaked old session. Over time, client count grows and tag subscriptions accumulate.
|
||||
|
||||
The `SessionManager` does scavenge inactive sessions after 5 minutes, but it only removes the session from its own dictionary — it doesn't notify `SubscriptionManager` to clean up subscriptions.
|
||||
|
||||
## Fix
|
||||
|
||||
Bridge `SessionManager` scavenging to `SubscriptionManager` cleanup. When a session is scavenged due to inactivity, also call `SubscriptionManager.UnsubscribeClient()`.
|
||||
|
||||
### Step 1: Add cleanup callback to SessionManager
|
||||
|
||||
File: `src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs`
|
||||
|
||||
Add a callback field and expose it:
|
||||
|
||||
```csharp
|
||||
// Add after the _inactivityTimeout field (line 22)
|
||||
private Action<string>? _onSessionScavenged;
|
||||
|
||||
/// <summary>
|
||||
/// Register a callback invoked when a session is scavenged due to inactivity.
|
||||
/// The callback receives the session ID.
|
||||
/// </summary>
|
||||
public void OnSessionScavenged(Action<string> callback)
|
||||
{
|
||||
_onSessionScavenged = callback;
|
||||
}
|
||||
```
|
||||
|
||||
Then in `ScavengeInactiveSessions`, invoke the callback for each scavenged session:
|
||||
|
||||
```csharp
|
||||
// In ScavengeInactiveSessions (line 103-118), change the foreach to:
|
||||
foreach (var kvp in expired)
|
||||
{
|
||||
if (_sessions.TryRemove(kvp.Key, out _))
|
||||
{
|
||||
Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})",
|
||||
kvp.Key, kvp.Value.LastActivity);
|
||||
|
||||
// Notify subscriber cleanup
|
||||
try
|
||||
{
|
||||
_onSessionScavenged?.Invoke(kvp.Key);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error in session scavenge callback for {SessionId}", kvp.Key);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Step 2: Wire up the callback in LmxProxyService
|
||||
|
||||
File: `src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs`
|
||||
|
||||
After both `SessionManager` and `SubscriptionManager` are created, register the callback:
|
||||
|
||||
```csharp
|
||||
// Add after SubscriptionManager creation:
|
||||
_sessionManager.OnSessionScavenged(sessionId =>
|
||||
{
|
||||
Log.Information("Cleaning up subscriptions for scavenged session {SessionId}", sessionId);
|
||||
_subscriptionManager.UnsubscribeClient(sessionId);
|
||||
});
|
||||
```
|
||||
|
||||
Find where `_sessionManager` and `_subscriptionManager` are both initialized and add this line right after.
|
||||
|
||||
### Step 3: Also clean up on explicit Disconnect
|
||||
|
||||
This is already handled — `ScadaGrpcService.Disconnect()` (line 86) calls `_subscriptionManager.UnsubscribeClient(request.SessionId)` before terminating the session. No change needed.
|
||||
|
||||
### Step 4: Add proactive stream timeout (belt-and-suspenders)
|
||||
|
||||
The scavenger runs every 60 seconds with a 5-minute timeout. This means a leaked session could take up to 6 minutes to clean up. For faster detection, add a secondary timeout in the Subscribe RPC itself.
|
||||
|
||||
File: `src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs`
|
||||
|
||||
In the `Subscribe` method, replace the simple `context.CancellationToken` with a combined token that also expires if the session becomes invalid:
|
||||
|
||||
```csharp
|
||||
// Replace the Subscribe method (lines 353-390) with:
|
||||
public override async Task Subscribe(
|
||||
Scada.SubscribeRequest request,
|
||||
IServerStreamWriter<Scada.VtqMessage> responseStream,
|
||||
ServerCallContext context)
|
||||
{
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session"));
|
||||
}
|
||||
|
||||
var reader = await _subscriptionManager.SubscribeAsync(
|
||||
request.SessionId, request.Tags, context.CancellationToken);
|
||||
|
||||
try
|
||||
{
|
||||
// Use a combined approach: check both the gRPC cancellation token AND
|
||||
// periodic session validity. This works around Grpc.Core not reliably
|
||||
// firing CancellationToken on client disconnect.
|
||||
while (true)
|
||||
{
|
||||
// Wait for data with a timeout so we can periodically check session validity
|
||||
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
|
||||
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
|
||||
context.CancellationToken, timeoutCts.Token);
|
||||
|
||||
bool hasData;
|
||||
try
|
||||
{
|
||||
hasData = await reader.WaitToReadAsync(linkedCts.Token);
|
||||
}
|
||||
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested
|
||||
&& !context.CancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Timeout expired, not a client disconnect — check if session is still valid
|
||||
if (!_sessionManager.ValidateSession(request.SessionId))
|
||||
{
|
||||
Log.Information("Subscribe stream ending — session {SessionId} no longer valid",
|
||||
request.SessionId);
|
||||
break;
|
||||
}
|
||||
continue; // Session still valid, keep waiting
|
||||
}
|
||||
|
||||
if (!hasData) break; // Channel completed
|
||||
|
||||
while (reader.TryRead(out var item))
|
||||
{
|
||||
var protoVtq = ConvertToProtoVtq(item.address, item.vtq);
|
||||
await responseStream.WriteAsync(protoVtq);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Client disconnected -- normal
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Subscribe stream error for session {SessionId}", request.SessionId);
|
||||
throw new RpcException(new GrpcStatus(StatusCode.Internal, ex.Message));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_subscriptionManager.UnsubscribeClient(request.SessionId);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This adds a 30-second periodic check: if no data arrives for 30 seconds, it checks whether the session is still valid. If the session was scavenged (client disconnected, 5-min timeout), the stream exits cleanly and runs the `finally` cleanup.
|
||||
|
||||
## Summary of Changes
|
||||
|
||||
| File | Change |
|
||||
|------|--------|
|
||||
| `Sessions/SessionManager.cs` | Add `_onSessionScavenged` callback, invoke during `ScavengeInactiveSessions` |
|
||||
| `LmxProxyService.cs` | Wire `_sessionManager.OnSessionScavenged` to `_subscriptionManager.UnsubscribeClient` |
|
||||
| `Grpc/Services/ScadaGrpcService.cs` | Add 30-second periodic session validity check in `Subscribe` loop |
|
||||
|
||||
## Testing
|
||||
|
||||
1. Start LmxProxy server
|
||||
2. Connect a client and subscribe to tags
|
||||
3. Kill the client process abruptly (not a clean disconnect)
|
||||
4. Check status page — client count should still show the old session
|
||||
5. Wait up to 5 minutes — session should be scavenged, subscription count should drop
|
||||
6. Reconnect client — should get a clean new session, no duplicate subscriptions
|
||||
7. Verify tag subscription counts match expected (no leaked refs)
|
||||
|
||||
## Optional: Reduce scavenge timeout for faster cleanup
|
||||
|
||||
In `LmxProxyService.cs` where `SessionManager` is constructed, consider reducing `inactivityTimeoutMinutes` from 5 to 2, since the Subscribe RPC now has its own 30-second validity check. The 5-minute timeout was the only cleanup path before; now it's belt-and-suspenders.
|
||||
Reference in New Issue
Block a user