using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Serilog; using ZB.MOM.WW.LmxProxy.Host.Configuration; using ZB.MOM.WW.LmxProxy.Host.Domain; namespace ZB.MOM.WW.LmxProxy.Host.Services { /// /// Manages subscriptions for multiple gRPC clients, handling tag subscriptions, message delivery, and client /// statistics. /// public class SubscriptionManager : IDisposable { private static readonly ILogger Logger = Log.ForContext(); // Configuration for channel buffering private readonly int _channelCapacity; private readonly BoundedChannelFullMode _channelFullMode; private readonly ConcurrentDictionary _clientSubscriptions = new(); private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion); private readonly IScadaClient _scadaClient; private readonly ConcurrentDictionary _tagSubscriptions = new(); private bool _disposed; /// /// Initializes a new instance of the class. /// /// The SCADA client to use for subscriptions. /// The subscription configuration. /// /// Thrown if or /// is null. /// public SubscriptionManager(IScadaClient scadaClient, SubscriptionConfiguration configuration) { _scadaClient = scadaClient ?? throw new ArgumentNullException(nameof(scadaClient)); SubscriptionConfiguration configuration1 = configuration ?? throw new ArgumentNullException(nameof(configuration)); _channelCapacity = configuration1.ChannelCapacity; _channelFullMode = ParseChannelFullMode(configuration1.ChannelFullMode); // Subscribe to connection state changes _scadaClient.ConnectionStateChanged += OnConnectionStateChanged; Logger.Information("SubscriptionManager initialized with channel capacity: {Capacity}, full mode: {Mode}", _channelCapacity, _channelFullMode); } /// /// Disposes the , unsubscribing all clients and cleaning up resources. /// public void Dispose() { if (_disposed) { return; } _disposed = true; Logger.Information("Disposing SubscriptionManager"); // Unsubscribe from connection state changes _scadaClient.ConnectionStateChanged -= OnConnectionStateChanged; // Unsubscribe all clients var clientIds = _clientSubscriptions.Keys.ToList(); foreach (string? clientId in clientIds) { UnsubscribeClient(clientId); } _clientSubscriptions.Clear(); _tagSubscriptions.Clear(); // Dispose the lock _lock?.Dispose(); } /// /// Gets the number of active client subscriptions. /// public virtual int GetActiveSubscriptionCount() => _clientSubscriptions.Count; /// /// Parses the channel full mode string to . /// /// The mode string. /// The parsed value. private static BoundedChannelFullMode ParseChannelFullMode(string mode) { return mode?.ToUpperInvariant() switch { "DROPOLDEST" => BoundedChannelFullMode.DropOldest, "DROPNEWEST" => BoundedChannelFullMode.DropNewest, "WAIT" => BoundedChannelFullMode.Wait, _ => BoundedChannelFullMode.DropOldest // Default }; } /// /// Creates a new subscription for a client to a set of tag addresses. /// /// The client identifier. /// The tag addresses to subscribe to. /// Optional cancellation token. /// A channel for receiving tag updates. /// Thrown if the manager is disposed. public async Task> SubscribeAsync( string clientId, IEnumerable addresses, CancellationToken ct = default) { if (_disposed) { throw new ObjectDisposedException(nameof(SubscriptionManager)); } var addressList = addresses.ToList(); Logger.Information("Client {ClientId} subscribing to {Count} tags", clientId, addressList.Count); // Create a bounded channel for this client with buffering var channel = Channel.CreateBounded<(string address, Vtq vtq)>(new BoundedChannelOptions(_channelCapacity) { FullMode = _channelFullMode, SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Logger.Debug("Created bounded channel for client {ClientId} with capacity {Capacity}", clientId, _channelCapacity); var clientSubscription = new ClientSubscription { ClientId = clientId, Channel = channel, Addresses = new HashSet(addressList), CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct) }; _clientSubscriptions[clientId] = clientSubscription; // Subscribe to each tag foreach (string? address in addressList) { await SubscribeToTagAsync(address, clientId); } // Handle client disconnection clientSubscription.CancellationTokenSource.Token.Register(() => { Logger.Information("Client {ClientId} disconnected, cleaning up subscriptions", clientId); UnsubscribeClient(clientId); }); return channel; } /// /// Unsubscribes a client from all tags and cleans up resources. /// /// The client identifier. public void UnsubscribeClient(string clientId) { if (_clientSubscriptions.TryRemove(clientId, out ClientSubscription? clientSubscription)) { Logger.Information( "Unsubscribing client {ClientId} from {Count} tags. Stats: Delivered={Delivered}, Dropped={Dropped}", clientId, clientSubscription.Addresses.Count, clientSubscription.DeliveredMessageCount, clientSubscription.DroppedMessageCount); _lock.EnterWriteLock(); try { foreach (string? address in clientSubscription.Addresses) { if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription)) { tagSubscription.ClientIds.Remove(clientId); // If no more clients are subscribed to this tag, unsubscribe from SCADA if (tagSubscription.ClientIds.Count == 0) { Logger.Information( "No more clients subscribed to {Address}, removing SCADA subscription", address); _tagSubscriptions.TryRemove(address, out _); // Dispose the SCADA subscription Task.Run(async () => { try { if (tagSubscription.ScadaSubscription != null) { await tagSubscription.ScadaSubscription.DisposeAsync(); Logger.Debug("Successfully disposed SCADA subscription for {Address}", address); } } catch (Exception ex) { Logger.Error(ex, "Error disposing SCADA subscription for {Address}", address); } }); } else { Logger.Debug( "Client {ClientId} removed from {Address} subscription (remaining clients: {Count})", clientId, address, tagSubscription.ClientIds.Count); } } } } finally { _lock.ExitWriteLock(); } // Complete the channel clientSubscription.Channel.Writer.TryComplete(); clientSubscription.CancellationTokenSource.Dispose(); } } /// /// Subscribes a client to a tag address, creating a new SCADA subscription if needed. /// /// The tag address. /// The client identifier. private async Task SubscribeToTagAsync(string address, string clientId) { bool needsSubscription; TagSubscription? tagSubscription; _lock.EnterWriteLock(); try { if (_tagSubscriptions.TryGetValue(address, out TagSubscription? existingSubscription)) { // Tag is already subscribed, just add this client existingSubscription.ClientIds.Add(clientId); Logger.Debug( "Client {ClientId} added to existing subscription for {Address} (total clients: {Count})", clientId, address, existingSubscription.ClientIds.Count); return; } // Create new tag subscription and reserve the spot tagSubscription = new TagSubscription { Address = address, ClientIds = new HashSet { clientId } }; _tagSubscriptions[address] = tagSubscription; needsSubscription = true; } finally { _lock.ExitWriteLock(); } if (needsSubscription && tagSubscription != null) { // Subscribe to SCADA outside of lock to avoid blocking Logger.Debug("Creating new SCADA subscription for {Address}", address); try { IAsyncDisposable scadaSubscription = await _scadaClient.SubscribeAsync( new[] { address }, (addr, vtq) => OnTagValueChanged(addr, vtq), CancellationToken.None); _lock.EnterWriteLock(); try { tagSubscription.ScadaSubscription = scadaSubscription; } finally { _lock.ExitWriteLock(); } Logger.Information("Successfully subscribed to {Address} for client {ClientId}", address, clientId); } catch (Exception ex) { Logger.Error(ex, "Failed to subscribe to {Address}", address); // Remove the failed subscription _lock.EnterWriteLock(); try { _tagSubscriptions.TryRemove(address, out _); } finally { _lock.ExitWriteLock(); } throw; } } } /// /// Handles tag value changes and delivers updates to all subscribed clients. /// /// The tag address. /// The value, timestamp, and quality. private void OnTagValueChanged(string address, Vtq vtq) { Logger.Debug("Tag value changed: {Address} = {Vtq}", address, vtq); _lock.EnterReadLock(); try { if (!_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription)) { Logger.Warning("Received update for untracked tag {Address}", address); return; } // Send update to all subscribed clients // Use the existing collection directly without ToList() since we're in a read lock foreach (string? clientId in tagSubscription.ClientIds) { if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription)) { try { if (!clientSubscription.Channel.Writer.TryWrite((address, vtq))) { // Channel is full - with DropOldest mode, this should rarely happen Logger.Warning( "Channel full for client {ClientId}, dropping message for {Address}. Consider increasing buffer size.", clientId, address); clientSubscription.DroppedMessageCount++; } else { clientSubscription.DeliveredMessageCount++; } } catch (InvalidOperationException ex) when (ex.Message.Contains("closed")) { Logger.Debug("Channel closed for client {ClientId}, removing subscription", clientId); // Schedule cleanup of disconnected client Task.Run(() => UnsubscribeClient(clientId)); } catch (Exception ex) { Logger.Error(ex, "Error sending update to client {ClientId}", clientId); } } } } finally { _lock.ExitReadLock(); } } /// /// Gets current subscription statistics for all clients and tags. /// /// A object containing statistics. public virtual SubscriptionStats GetSubscriptionStats() { _lock.EnterReadLock(); try { var tagClientCounts = _tagSubscriptions.ToDictionary( kvp => kvp.Key, kvp => kvp.Value.ClientIds.Count); var clientStats = _clientSubscriptions.ToDictionary( kvp => kvp.Key, kvp => new ClientStats { SubscribedTags = kvp.Value.Addresses.Count, DeliveredMessages = kvp.Value.DeliveredMessageCount, DroppedMessages = kvp.Value.DroppedMessageCount }); return new SubscriptionStats { TotalClients = _clientSubscriptions.Count, TotalTags = _tagSubscriptions.Count, TagClientCounts = tagClientCounts, ClientStats = clientStats }; } finally { _lock.ExitReadLock(); } } /// /// Handles SCADA client connection state changes and notifies clients of disconnection. /// /// The event sender. /// The connection state change event arguments. private void OnConnectionStateChanged(object? sender, ConnectionStateChangedEventArgs e) { Logger.Information("Connection state changed from {Previous} to {Current}", e.PreviousState, e.CurrentState); // If we're disconnected, notify all subscribed clients with bad quality if (e.CurrentState != ConnectionState.Connected) { Task.Run(async () => { try { await NotifyAllClientsOfDisconnection(); } catch (Exception ex) { Logger.Error(ex, "Error notifying clients of disconnection"); } }); } } /// /// Notifies all clients of a SCADA disconnection by sending bad quality updates. /// private async Task NotifyAllClientsOfDisconnection() { Logger.Information("Notifying all clients of disconnection"); var badQualityVtq = new Vtq(null, DateTime.UtcNow, Quality.Bad); // Get all unique addresses being subscribed to var allAddresses = _tagSubscriptions.Keys.ToList(); // Send bad quality update for each address to all subscribed clients foreach (string? address in allAddresses) { if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription)) { var clientIds = tagSubscription.ClientIds.ToList(); foreach (string? clientId in clientIds) { if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription)) { try { await clientSubscription.Channel.Writer.WriteAsync((address, badQualityVtq)); Logger.Debug("Sent bad quality notification for {Address} to client {ClientId}", address, clientId); } catch (Exception ex) { Logger.Warning(ex, "Failed to send bad quality notification to client {ClientId}", clientId); } } } } } } /// /// Represents a client's subscription, including channel, addresses, and statistics. /// private class ClientSubscription { /// /// Gets or sets the client identifier. /// public string ClientId { get; set; } = string.Empty; /// /// Gets or sets the channel for delivering tag updates. /// public Channel<(string address, Vtq vtq)> Channel { get; set; } = null!; /// /// Gets or sets the set of addresses the client is subscribed to. /// public HashSet Addresses { get; set; } = new(); /// /// Gets or sets the cancellation token source for the client. /// public CancellationTokenSource CancellationTokenSource { get; set; } = null!; /// /// Gets or sets the count of delivered messages. /// public long DeliveredMessageCount { get; set; } /// /// Gets or sets the count of dropped messages. /// public long DroppedMessageCount { get; set; } } /// /// Represents a tag subscription, including address, client IDs, and SCADA subscription handle. /// private class TagSubscription { /// /// Gets or sets the tag address. /// public string Address { get; set; } = string.Empty; /// /// Gets or sets the set of client IDs subscribed to this tag. /// public HashSet ClientIds { get; set; } = new(); /// /// Gets or sets the SCADA subscription handle. /// public IAsyncDisposable ScadaSubscription { get; set; } = null!; } } }