From 7bed4b901a3752cd4d144aba15d3847efcc6d577 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Mar 2026 04:46:15 -0400 Subject: [PATCH] fix(lmxproxy): wire MxAccess COM subscriptions in SubscriptionManager SubscriptionManager tracked client-to-tag routing but never called MxAccessClient.SubscribeAsync to create the actual COM subscriptions, so OnDataChange never fired. Now creates MxAccess subscriptions for new tags and disposes them when the last client unsubscribes. All 17 integration tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Subscriptions/SubscriptionManager.cs | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index ffacd92..fab5f85 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; +using System.Threading.Tasks; using Serilog; using ZB.MOM.WW.LmxProxy.Host.Domain; @@ -29,6 +30,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions private readonly ConcurrentDictionary _tagSubscriptions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + // Tag address -> MxAccess subscription handle (for cleanup when last client unsubscribes) + private readonly ConcurrentDictionary _mxAccessHandles + = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, @@ -58,6 +63,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _clientSubscriptions[clientId] = clientSub; + var newTags = new List(); + _rwLock.EnterWriteLock(); try { @@ -71,6 +78,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions { _tagSubscriptions[address] = new TagSubscription(address, new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); + newTags.Add(address); } } } @@ -79,13 +87,40 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } + // Create MxAccess COM subscriptions for newly subscribed tags + if (newTags.Count > 0) + { + _ = CreateMxAccessSubscriptionsAsync(newTags); + } + // Register cancellation cleanup ct.Register(() => UnsubscribeClient(clientId)); - Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count); + Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", + clientId, addressSet.Count, newTags.Count); return channel.Reader; } + private async Task CreateMxAccessSubscriptionsAsync(List addresses) + { + try + { + var handle = await _scadaClient.SubscribeAsync( + addresses, + (address, vtq) => OnTagValueChanged(address, vtq)); + + // Store handle for each address so we can dispose per-tag + foreach (var address in addresses) + { + _mxAccessHandles[address] = handle; + } + } + catch (Exception ex) + { + Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count); + } + } + /// /// Called from MxAccessClient's OnDataChange handler. /// Fans out the update to all subscribed clients. @@ -135,6 +170,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) return; + var tagsToDispose = new List(); + _rwLock.EnterWriteLock(); try { @@ -148,6 +185,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions if (tagSub.ClientIds.Count == 0) { _tagSubscriptions.TryRemove(address, out _); + tagsToDispose.Add(address); } } } @@ -157,6 +195,22 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } + // Dispose MxAccess handles for tags with no remaining clients + foreach (var address in tagsToDispose) + { + if (_mxAccessHandles.TryRemove(address, out var handle)) + { + try + { + handle.DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error disposing MxAccess subscription for {Address}", address); + } + } + } + // Complete the channel (signals end of stream to the gRPC handler) clientSub.Channel.Writer.TryComplete();