using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;

namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
{
    /// <summary>
    /// Manages one bounded channel per client plus a shared, ref-counted index of which
    /// clients are subscribed to which tag address. The first client to subscribe to an
    /// address creates its index entry; the entry is removed when the last client leaves.
    /// </summary>
    /// <remarks>
    /// Every mutation of the client map and the tag index happens under a single monitor so
    /// the two structures can never disagree. Channel writes, channel completion and logging
    /// are always performed outside that monitor.
    /// </remarks>
    public sealed class SubscriptionManager : IDisposable
    {
        private static readonly ILogger Log = Serilog.Log.ForContext<SubscriptionManager>();

        // NOTE(review): stored by the constructor but not referenced by any member of this class.
        private readonly IScadaClient _scadaClient;
        private readonly int _channelCapacity;
        private readonly BoundedChannelFullMode _channelFullMode;

        // Client ID -> ClientSubscription. Written only under _gate; read lock-free.
        private readonly ConcurrentDictionary<string, ClientSubscription> _clientSubscriptions =
            new ConcurrentDictionary<string, ClientSubscription>(StringComparer.OrdinalIgnoreCase);

        // Tag address -> TagSubscription (shared, ref-counted). Written only under _gate.
        private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions =
            new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase);

        // A plain monitor needs no disposal, so late callbacks (cancellation, data change)
        // arriving after Dispose cannot hit a disposed lock.
        private readonly object _gate = new object();

        // Guarded by _gate.
        private bool _disposed;

        /// <summary>Creates a manager with the given per-client channel settings.</summary>
        /// <param name="scadaClient">SCADA client associated with this manager.</param>
        /// <param name="channelCapacity">Capacity of each client's bounded channel.</param>
        /// <param name="channelFullMode">Behaviour of a client's channel when it is full.</param>
        public SubscriptionManager(
            IScadaClient scadaClient,
            int channelCapacity = 1000,
            BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
        {
            _scadaClient = scadaClient;
            _channelCapacity = channelCapacity;
            _channelFullMode = channelFullMode;
        }

        /// <summary>
        /// Creates a subscription for a client and returns the reader to stream updates from.
        /// If the client ID already has a subscription, that subscription is torn down
        /// (tag references released, channel completed) and replaced by the new one.
        /// </summary>
        /// <param name="clientId">Client identifier (compared case-insensitively).</param>
        /// <param name="addresses">Tag addresses to subscribe to; duplicates are collapsed case-insensitively.</param>
        /// <param name="ct">When cancelled, the subscription created by this call is removed.</param>
        /// <returns>The reader side of the client's channel.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="clientId"/> or <paramref name="addresses"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
        public ChannelReader<(string address, Vtq vtq)> Subscribe(
            string clientId, IEnumerable<string> addresses, CancellationToken ct)
        {
            if (clientId == null)
            {
                throw new ArgumentNullException(nameof(clientId));
            }

            if (addresses == null)
            {
                throw new ArgumentNullException(nameof(addresses));
            }

            var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
                new BoundedChannelOptions(_channelCapacity)
                {
                    FullMode = _channelFullMode,
                    SingleReader = true,
                    SingleWriter = false
                });

            var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
            var clientSub = new ClientSubscription(clientId, channel, addressSet);

            ClientSubscription? replaced = null;
            lock (_gate)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(SubscriptionManager));
                }

                // Re-subscribing under an existing ID must release the old subscription's tag
                // references first; otherwise addresses missing from the new set stay
                // ref-counted forever and the old channel is never completed.
                if (_clientSubscriptions.TryRemove(clientId, out var previous))
                {
                    ReleaseTagReferences(previous);
                    replaced = previous;
                }

                _clientSubscriptions[clientId] = clientSub;

                foreach (var address in addressSet)
                {
                    if (_tagSubscriptions.TryGetValue(address, out var tagSub))
                    {
                        tagSub.ClientIds.Add(clientId);
                    }
                    else
                    {
                        _tagSubscriptions[address] = new TagSubscription(
                            address,
                            new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
                    }
                }
            }

            if (replaced != null)
            {
                CompleteSubscription(replaced);
            }

            // Bind the cleanup to this subscription instance rather than to the client ID, so
            // a token belonging to an earlier subscription cannot tear down a newer one that
            // reuses the ID. If ct is already cancelled the callback runs synchronously here.
            ct.Register(() => RemoveSubscription(clientSub));

            Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count);
            return channel.Reader;
        }

        /// <summary>
        /// Fans a tag update out to every client currently subscribed to <paramref name="address"/>.
        /// Intended to be called from the data-change handler.
        /// </summary>
        /// <param name="address">Tag address that changed.</param>
        /// <param name="vtq">The new value to deliver.</param>
        public void OnTagValueChanged(string address, Vtq vtq)
        {
            List<ClientSubscription> targets;
            lock (_gate)
            {
                if (!_tagSubscriptions.TryGetValue(address, out var tagSub) || tagSub.ClientIds.Count == 0)
                {
                    return;
                }

                // Resolve the subscriptions while the index is stable; write outside the lock.
                targets = new List<ClientSubscription>(tagSub.ClientIds.Count);
                foreach (var clientId in tagSub.ClientIds)
                {
                    if (_clientSubscriptions.TryGetValue(clientId, out var clientSub))
                    {
                        targets.Add(clientSub);
                    }
                }
            }

            foreach (var target in targets)
            {
                Deliver(target, address, vtq);
            }
        }

        /// <summary>
        /// Removes a client's subscription, releases its tag references (removing any tag
        /// entry whose last client this was) and completes its channel. No-op if the client
        /// has no subscription.
        /// </summary>
        /// <param name="clientId">Client identifier.</param>
        public void UnsubscribeClient(string clientId)
        {
            ClientSubscription removed;
            lock (_gate)
            {
                if (!_clientSubscriptions.TryRemove(clientId, out var found))
                {
                    return;
                }

                ReleaseTagReferences(found);
                removed = found;
            }

            CompleteSubscription(removed);
        }

        /// <summary>
        /// Writes a bad-quality (<c>Bad_NotConnected</c>) value for every subscribed tag to
        /// every client's channel. Intended to be called when the data source disconnects.
        /// </summary>
        public void NotifyDisconnection()
        {
            var badVtq = Vtq.New(null, Quality.Bad_NotConnected);

            foreach (var kvp in _clientSubscriptions)
            {
                foreach (var address in kvp.Value.Addresses)
                {
                    Deliver(kvp.Value, address, badVtq);
                }
            }
        }

        /// <summary>Returns subscription statistics.</summary>
        /// <returns>Client count, distinct tag count, and total per-client tag subscriptions.</returns>
        public SubscriptionStats GetStats()
        {
            // All mutations happen under _gate, so the three figures are mutually consistent.
            lock (_gate)
            {
                return new SubscriptionStats(
                    _clientSubscriptions.Count,
                    _tagSubscriptions.Count,
                    _clientSubscriptions.Values.Sum(c => c.Addresses.Count));
            }
        }

        /// <summary>
        /// Completes every client channel and clears all state. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            ClientSubscription[] remaining;
            lock (_gate)
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;
                remaining = _clientSubscriptions.Values.ToArray();
                _clientSubscriptions.Clear();
                _tagSubscriptions.Clear();
            }

            foreach (var clientSub in remaining)
            {
                clientSub.Channel.Writer.TryComplete();
            }
        }

        /// <summary>Removes <paramref name="subscription"/> only if it is still the one registered for its client ID.</summary>
        private void RemoveSubscription(ClientSubscription subscription)
        {
            lock (_gate)
            {
                if (!_clientSubscriptions.TryGetValue(subscription.ClientId, out var current)
                    || !ReferenceEquals(current, subscription))
                {
                    // Already removed or replaced; whoever did that also finished the teardown.
                    return;
                }

                _clientSubscriptions.TryRemove(subscription.ClientId, out _);
                ReleaseTagReferences(subscription);
            }

            CompleteSubscription(subscription);
        }

        /// <summary>Drops the client's reference from each of its tags. Caller must hold <c>_gate</c>.</summary>
        private void ReleaseTagReferences(ClientSubscription subscription)
        {
            foreach (var address in subscription.Addresses)
            {
                if (!_tagSubscriptions.TryGetValue(address, out var tagSub))
                {
                    continue;
                }

                tagSub.ClientIds.Remove(subscription.ClientId);

                // Last client unsubscribed: remove the tag subscription.
                if (tagSub.ClientIds.Count == 0)
                {
                    _tagSubscriptions.TryRemove(address, out _);
                }
            }
        }

        /// <summary>Completes the channel (signals end of stream to its reader) and logs the final counters.</summary>
        private static void CompleteSubscription(ClientSubscription subscription)
        {
            subscription.Channel.Writer.TryComplete();
            Log.Information(
                "Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
                subscription.ClientId, subscription.DeliveredCount, subscription.DroppedCount);
        }

        /// <summary>Attempts a non-blocking write to the client's channel and updates its counters.</summary>
        private static void Deliver(ClientSubscription subscription, string address, Vtq vtq)
        {
            // TryWrite returns false when the channel is full in Wait mode or already
            // completed. In the Drop* full modes it returns true even when an item is
            // discarded, so those discards are not visible to the dropped counter.
            if (subscription.Channel.Writer.TryWrite((address, vtq)))
            {
                subscription.IncrementDelivered();
                return;
            }

            subscription.IncrementDropped();
            Log.Debug(
                "Dropped message for client {ClientId} on tag {Address} (channel full)",
                subscription.ClientId, address);
        }

        // ── Nested types ─────────────────────────────────────────

        /// <summary>One client's channel, its (immutable after construction) address set, and delivery counters.</summary>
        private sealed class ClientSubscription
        {
            // Backing fields so the counters can be updated with Interlocked.
            private long _delivered;
            private long _dropped;

            public ClientSubscription(
                string clientId, Channel<(string address, Vtq vtq)> channel, HashSet<string> addresses)
            {
                ClientId = clientId;
                Channel = channel;
                Addresses = addresses;
            }

            public string ClientId { get; }

            public Channel<(string address, Vtq vtq)> Channel { get; }

            public HashSet<string> Addresses { get; }

            public long DeliveredCount => Interlocked.Read(ref _delivered);

            public long DroppedCount => Interlocked.Read(ref _dropped);

            public void IncrementDelivered() => Interlocked.Increment(ref _delivered);

            public void IncrementDropped() => Interlocked.Increment(ref _dropped);
        }

        /// <summary>A tag address and the IDs of the clients currently subscribed to it.</summary>
        private sealed class TagSubscription
        {
            public TagSubscription(string address, HashSet<string> clientIds)
            {
                Address = address;
                ClientIds = clientIds;
            }

            public string Address { get; }

            // Mutated only while the owning manager's _gate is held.
            public HashSet<string> ClientIds { get; }
        }
    }
}