fix(lmxproxy): wire MxAccess COM subscriptions in SubscriptionManager

SubscriptionManager tracked client-to-tag routing but never called
MxAccessClient.SubscribeAsync to create the actual COM subscriptions,
so OnDataChange never fired. Now creates MxAccess subscriptions for
new tags and disposes them when the last client unsubscribes.

All 17 integration tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 04:46:15 -04:00
parent c5d4849bd3
commit 7bed4b901a

View File

@@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks;
using Serilog; using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain; using ZB.MOM.WW.LmxProxy.Host.Domain;
@@ -29,6 +30,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions
= new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase); = new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase);
// Tag address -> MxAccess subscription handle (for cleanup when last client unsubscribes)
private readonly ConcurrentDictionary<string, IAsyncDisposable> _mxAccessHandles
= new ConcurrentDictionary<string, IAsyncDisposable>(StringComparer.OrdinalIgnoreCase);
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
@@ -58,6 +63,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_clientSubscriptions[clientId] = clientSub; _clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock(); _rwLock.EnterWriteLock();
try try
{ {
@@ -71,6 +78,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
{ {
_tagSubscriptions[address] = new TagSubscription(address, _tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId }); new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
} }
} }
} }
@@ -79,13 +87,40 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_rwLock.ExitWriteLock(); _rwLock.ExitWriteLock();
} }
// Create MxAccess COM subscriptions for newly subscribed tags
if (newTags.Count > 0)
{
_ = CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup // Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId)); ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count); Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader; return channel.Reader;
} }
private async Task CreateMxAccessSubscriptionsAsync(List<string> addresses)
{
try
{
var handle = await _scadaClient.SubscribeAsync(
addresses,
(address, vtq) => OnTagValueChanged(address, vtq));
// Store handle for each address so we can dispose per-tag
foreach (var address in addresses)
{
_mxAccessHandles[address] = handle;
}
}
catch (Exception ex)
{
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count);
}
}
/// <summary> /// <summary>
/// Called from MxAccessClient's OnDataChange handler. /// Called from MxAccessClient's OnDataChange handler.
/// Fans out the update to all subscribed clients. /// Fans out the update to all subscribed clients.
@@ -135,6 +170,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return; return;
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock(); _rwLock.EnterWriteLock();
try try
{ {
@@ -148,6 +185,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
if (tagSub.ClientIds.Count == 0) if (tagSub.ClientIds.Count == 0)
{ {
_tagSubscriptions.TryRemove(address, out _); _tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
} }
} }
} }
@@ -157,6 +195,22 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_rwLock.ExitWriteLock(); _rwLock.ExitWriteLock();
} }
// Dispose MxAccess handles for tags with no remaining clients
foreach (var address in tagsToDispose)
{
if (_mxAccessHandles.TryRemove(address, out var handle))
{
try
{
handle.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error disposing MxAccess subscription for {Address}", address);
}
}
}
// Complete the channel (signals end of stream to the gRPC handler) // Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete(); clientSub.Channel.Writer.TryComplete();