diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index ffacd92..fab5f85 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; +using System.Threading.Tasks; using Serilog; using ZB.MOM.WW.LmxProxy.Host.Domain; @@ -29,6 +30,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions private readonly ConcurrentDictionary _tagSubscriptions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + // Tag address -> MxAccess subscription handle (for cleanup when last client unsubscribes) + private readonly ConcurrentDictionary _mxAccessHandles + = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, @@ -58,6 +63,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _clientSubscriptions[clientId] = clientSub; + var newTags = new List(); + _rwLock.EnterWriteLock(); try { @@ -71,6 +78,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions { _tagSubscriptions[address] = new TagSubscription(address, new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); + newTags.Add(address); } } } @@ -79,13 +87,40 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } + // Create MxAccess COM subscriptions for newly subscribed tags + if (newTags.Count > 0) + { + _ = CreateMxAccessSubscriptionsAsync(newTags); + } + // Register cancellation cleanup ct.Register(() => UnsubscribeClient(clientId)); - Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count); + Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", + clientId, addressSet.Count, newTags.Count); return channel.Reader; } + private async Task CreateMxAccessSubscriptionsAsync(List addresses) + { + try + { + var handle = await _scadaClient.SubscribeAsync( + addresses, + (address, vtq) => OnTagValueChanged(address, vtq)); + + // Store handle for each address so we can dispose per-tag + foreach (var address in addresses) + { + _mxAccessHandles[address] = handle; + } + } + catch (Exception ex) + { + Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count); + } + } + /// /// Called from MxAccessClient's OnDataChange handler. /// Fans out the update to all subscribed clients. @@ -135,6 +170,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) return; + var tagsToDispose = new List(); + _rwLock.EnterWriteLock(); try { @@ -148,6 +185,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions if (tagSub.ClientIds.Count == 0) { _tagSubscriptions.TryRemove(address, out _); + tagsToDispose.Add(address); } } } @@ -157,6 +195,22 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } + // Dispose MxAccess handles for tags with no remaining clients + foreach (var address in tagsToDispose) + { + if (_mxAccessHandles.TryRemove(address, out var handle)) + { + try + { + handle.DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error disposing MxAccess subscription for {Address}", address); + } + } + } + // Complete the channel (signals end of stream to the gRPC handler) clientSub.Channel.Writer.TryComplete();