feat(lmxproxy): active health probing + address-based subscription cleanup (gap 1 & 2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 06:44:21 -04:00
parent 86a15c0a65
commit a6c01d73e2
12 changed files with 301 additions and 43 deletions

View File

@@ -30,10 +30,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions
= new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase);
// Tag address -> MxAccess subscription handle (for cleanup when last client unsubscribes)
private readonly ConcurrentDictionary<string, IAsyncDisposable> _mxAccessHandles
= new ConcurrentDictionary<string, IAsyncDisposable>(StringComparer.OrdinalIgnoreCase);
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
@@ -105,15 +101,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
{
try
{
var handle = await _scadaClient.SubscribeAsync(
await _scadaClient.SubscribeAsync(
addresses,
(address, vtq) => OnTagValueChanged(address, vtq));
// Store handle for each address so we can dispose per-tag
foreach (var address in addresses)
{
_mxAccessHandles[address] = handle;
}
}
catch (Exception ex)
{
@@ -195,19 +185,16 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_rwLock.ExitWriteLock();
}
// Dispose MxAccess handles for tags with no remaining clients
foreach (var address in tagsToDispose)
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
if (_mxAccessHandles.TryRemove(address, out var handle))
try
{
try
{
handle.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error disposing MxAccess subscription for {Address}", address);
}
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
@@ -235,6 +222,18 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
}
}
/// <summary>
/// Logs reconnection for observability. Data flow resumes automatically
/// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks.
/// </summary>
public void NotifyReconnection()
{
Log.Information("MxAccess reconnected -- subscriptions recreated, " +
"data flow will resume via OnDataChange callbacks " +
"({ClientCount} clients, {TagCount} tags)",
_clientSubscriptions.Count, _tagSubscriptions.Count);
}
/// <summary>Returns subscription statistics.</summary>
public SubscriptionStats GetStats()
{