536 lines
22 KiB
C#
536 lines
22 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Channels;
|
|
using System.Threading.Tasks;
|
|
using Serilog;
|
|
using ZB.MOM.WW.LmxProxy.Host.Configuration;
|
|
using ZB.MOM.WW.LmxProxy.Host.Domain;
|
|
|
|
namespace ZB.MOM.WW.LmxProxy.Host.Services
|
|
{
|
|
/// <summary>
|
|
/// Manages subscriptions for multiple gRPC clients, handling tag subscriptions, message delivery, and client
|
|
/// statistics.
|
|
/// </summary>
|
|
public class SubscriptionManager : IDisposable
|
|
{
|
|
private static readonly ILogger Logger = Log.ForContext<SubscriptionManager>();
|
|
|
|
// Configuration for channel buffering
|
|
private readonly int _channelCapacity;
|
|
private readonly BoundedChannelFullMode _channelFullMode;
|
|
private readonly ConcurrentDictionary<string, ClientSubscription> _clientSubscriptions = new();
|
|
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
|
|
|
|
private readonly IScadaClient _scadaClient;
|
|
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions = new();
|
|
private bool _disposed;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="SubscriptionManager" /> class.
|
|
/// </summary>
|
|
/// <param name="scadaClient">The SCADA client to use for subscriptions.</param>
|
|
/// <param name="configuration">The subscription configuration.</param>
|
|
/// <exception cref="ArgumentNullException">
|
|
/// Thrown if <paramref name="scadaClient" /> or <paramref name="configuration" />
|
|
/// is null.
|
|
/// </exception>
|
|
public SubscriptionManager(IScadaClient scadaClient, SubscriptionConfiguration configuration)
|
|
{
|
|
_scadaClient = scadaClient ?? throw new ArgumentNullException(nameof(scadaClient));
|
|
SubscriptionConfiguration configuration1 =
|
|
configuration ?? throw new ArgumentNullException(nameof(configuration));
|
|
|
|
_channelCapacity = configuration1.ChannelCapacity;
|
|
_channelFullMode = ParseChannelFullMode(configuration1.ChannelFullMode);
|
|
|
|
// Subscribe to connection state changes
|
|
_scadaClient.ConnectionStateChanged += OnConnectionStateChanged;
|
|
|
|
Logger.Information("SubscriptionManager initialized with channel capacity: {Capacity}, full mode: {Mode}",
|
|
_channelCapacity, _channelFullMode);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Disposes the <see cref="SubscriptionManager" />, unsubscribing all clients and cleaning up resources.
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
if (_disposed)
|
|
{
|
|
return;
|
|
}
|
|
|
|
_disposed = true;
|
|
|
|
Logger.Information("Disposing SubscriptionManager");
|
|
|
|
// Unsubscribe from connection state changes
|
|
_scadaClient.ConnectionStateChanged -= OnConnectionStateChanged;
|
|
|
|
// Unsubscribe all clients
|
|
var clientIds = _clientSubscriptions.Keys.ToList();
|
|
foreach (string? clientId in clientIds)
|
|
{
|
|
UnsubscribeClient(clientId);
|
|
}
|
|
|
|
_clientSubscriptions.Clear();
|
|
_tagSubscriptions.Clear();
|
|
|
|
// Dispose the lock
|
|
_lock?.Dispose();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the number of active client subscriptions.
|
|
/// </summary>
|
|
public virtual int GetActiveSubscriptionCount() => _clientSubscriptions.Count;
|
|
|
|
/// <summary>
|
|
/// Parses the channel full mode string to <see cref="BoundedChannelFullMode" />.
|
|
/// </summary>
|
|
/// <param name="mode">The mode string.</param>
|
|
/// <returns>The parsed <see cref="BoundedChannelFullMode" /> value.</returns>
|
|
private static BoundedChannelFullMode ParseChannelFullMode(string mode)
|
|
{
|
|
return mode?.ToUpperInvariant() switch
|
|
{
|
|
"DROPOLDEST" => BoundedChannelFullMode.DropOldest,
|
|
"DROPNEWEST" => BoundedChannelFullMode.DropNewest,
|
|
"WAIT" => BoundedChannelFullMode.Wait,
|
|
_ => BoundedChannelFullMode.DropOldest // Default
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new subscription for a client to a set of tag addresses.
|
|
/// </summary>
|
|
/// <param name="clientId">The client identifier.</param>
|
|
/// <param name="addresses">The tag addresses to subscribe to.</param>
|
|
/// <param name="ct">Optional cancellation token.</param>
|
|
/// <returns>A channel for receiving tag updates.</returns>
|
|
/// <exception cref="ObjectDisposedException">Thrown if the manager is disposed.</exception>
|
|
public async Task<Channel<(string address, Vtq vtq)>> SubscribeAsync(
|
|
string clientId,
|
|
IEnumerable<string> addresses,
|
|
CancellationToken ct = default)
|
|
{
|
|
if (_disposed)
|
|
{
|
|
throw new ObjectDisposedException(nameof(SubscriptionManager));
|
|
}
|
|
|
|
var addressList = addresses.ToList();
|
|
Logger.Information("Client {ClientId} subscribing to {Count} tags", clientId, addressList.Count);
|
|
|
|
// Create a bounded channel for this client with buffering
|
|
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(new BoundedChannelOptions(_channelCapacity)
|
|
{
|
|
FullMode = _channelFullMode,
|
|
SingleReader = true,
|
|
SingleWriter = false,
|
|
AllowSynchronousContinuations = false
|
|
});
|
|
|
|
Logger.Debug("Created bounded channel for client {ClientId} with capacity {Capacity}", clientId,
|
|
_channelCapacity);
|
|
|
|
var clientSubscription = new ClientSubscription
|
|
{
|
|
ClientId = clientId,
|
|
Channel = channel,
|
|
Addresses = new HashSet<string>(addressList),
|
|
CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct)
|
|
};
|
|
|
|
_clientSubscriptions[clientId] = clientSubscription;
|
|
|
|
// Subscribe to each tag
|
|
foreach (string? address in addressList)
|
|
{
|
|
await SubscribeToTagAsync(address, clientId);
|
|
}
|
|
|
|
// Handle client disconnection
|
|
clientSubscription.CancellationTokenSource.Token.Register(() =>
|
|
{
|
|
Logger.Information("Client {ClientId} disconnected, cleaning up subscriptions", clientId);
|
|
UnsubscribeClient(clientId);
|
|
});
|
|
|
|
return channel;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Unsubscribes a client from all tags and cleans up resources.
|
|
/// </summary>
|
|
/// <param name="clientId">The client identifier.</param>
|
|
public void UnsubscribeClient(string clientId)
|
|
{
|
|
if (_clientSubscriptions.TryRemove(clientId, out ClientSubscription? clientSubscription))
|
|
{
|
|
Logger.Information(
|
|
"Unsubscribing client {ClientId} from {Count} tags. Stats: Delivered={Delivered}, Dropped={Dropped}",
|
|
clientId, clientSubscription.Addresses.Count,
|
|
clientSubscription.DeliveredMessageCount, clientSubscription.DroppedMessageCount);
|
|
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
foreach (string? address in clientSubscription.Addresses)
|
|
{
|
|
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
|
{
|
|
tagSubscription.ClientIds.Remove(clientId);
|
|
|
|
// If no more clients are subscribed to this tag, unsubscribe from SCADA
|
|
if (tagSubscription.ClientIds.Count == 0)
|
|
{
|
|
Logger.Information(
|
|
"No more clients subscribed to {Address}, removing SCADA subscription", address);
|
|
|
|
_tagSubscriptions.TryRemove(address, out _);
|
|
|
|
// Dispose the SCADA subscription
|
|
Task.Run(async () =>
|
|
{
|
|
try
|
|
{
|
|
if (tagSubscription.ScadaSubscription != null)
|
|
{
|
|
await tagSubscription.ScadaSubscription.DisposeAsync();
|
|
Logger.Debug("Successfully disposed SCADA subscription for {Address}",
|
|
address);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.Error(ex, "Error disposing SCADA subscription for {Address}", address);
|
|
}
|
|
});
|
|
}
|
|
else
|
|
{
|
|
Logger.Debug(
|
|
"Client {ClientId} removed from {Address} subscription (remaining clients: {Count})",
|
|
clientId, address, tagSubscription.ClientIds.Count);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
|
|
// Complete the channel
|
|
clientSubscription.Channel.Writer.TryComplete();
|
|
clientSubscription.CancellationTokenSource.Dispose();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Subscribes a client to a tag address, creating a new SCADA subscription if needed.
|
|
/// </summary>
|
|
/// <param name="address">The tag address.</param>
|
|
/// <param name="clientId">The client identifier.</param>
|
|
private async Task SubscribeToTagAsync(string address, string clientId)
|
|
{
|
|
bool needsSubscription;
|
|
TagSubscription? tagSubscription;
|
|
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? existingSubscription))
|
|
{
|
|
// Tag is already subscribed, just add this client
|
|
existingSubscription.ClientIds.Add(clientId);
|
|
Logger.Debug(
|
|
"Client {ClientId} added to existing subscription for {Address} (total clients: {Count})",
|
|
clientId, address, existingSubscription.ClientIds.Count);
|
|
return;
|
|
}
|
|
|
|
// Create new tag subscription and reserve the spot
|
|
tagSubscription = new TagSubscription
|
|
{
|
|
Address = address,
|
|
ClientIds = new HashSet<string> { clientId }
|
|
};
|
|
_tagSubscriptions[address] = tagSubscription;
|
|
needsSubscription = true;
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
|
|
if (needsSubscription && tagSubscription != null)
|
|
{
|
|
// Subscribe to SCADA outside of lock to avoid blocking
|
|
Logger.Debug("Creating new SCADA subscription for {Address}", address);
|
|
|
|
try
|
|
{
|
|
IAsyncDisposable scadaSubscription = await _scadaClient.SubscribeAsync(
|
|
new[] { address },
|
|
(addr, vtq) => OnTagValueChanged(addr, vtq),
|
|
CancellationToken.None);
|
|
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
tagSubscription.ScadaSubscription = scadaSubscription;
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
|
|
Logger.Information("Successfully subscribed to {Address} for client {ClientId}", address, clientId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.Error(ex, "Failed to subscribe to {Address}", address);
|
|
|
|
// Remove the failed subscription
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
_tagSubscriptions.TryRemove(address, out _);
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Handles tag value changes and delivers updates to all subscribed clients.
|
|
/// </summary>
|
|
/// <param name="address">The tag address.</param>
|
|
/// <param name="vtq">The value, timestamp, and quality.</param>
|
|
private void OnTagValueChanged(string address, Vtq vtq)
|
|
{
|
|
Logger.Debug("Tag value changed: {Address} = {Vtq}", address, vtq);
|
|
|
|
_lock.EnterReadLock();
|
|
try
|
|
{
|
|
if (!_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
|
{
|
|
Logger.Warning("Received update for untracked tag {Address}", address);
|
|
return;
|
|
}
|
|
|
|
// Send update to all subscribed clients
|
|
// Use the existing collection directly without ToList() since we're in a read lock
|
|
foreach (string? clientId in tagSubscription.ClientIds)
|
|
{
|
|
if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription))
|
|
{
|
|
try
|
|
{
|
|
if (!clientSubscription.Channel.Writer.TryWrite((address, vtq)))
|
|
{
|
|
// Channel is full - with DropOldest mode, this should rarely happen
|
|
Logger.Warning(
|
|
"Channel full for client {ClientId}, dropping message for {Address}. Consider increasing buffer size.",
|
|
clientId, address);
|
|
clientSubscription.DroppedMessageCount++;
|
|
}
|
|
else
|
|
{
|
|
clientSubscription.DeliveredMessageCount++;
|
|
}
|
|
}
|
|
catch (InvalidOperationException ex) when (ex.Message.Contains("closed"))
|
|
{
|
|
Logger.Debug("Channel closed for client {ClientId}, removing subscription", clientId);
|
|
// Schedule cleanup of disconnected client
|
|
Task.Run(() => UnsubscribeClient(clientId));
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.Error(ex, "Error sending update to client {ClientId}", clientId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitReadLock();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets current subscription statistics for all clients and tags.
|
|
/// </summary>
|
|
/// <returns>A <see cref="SubscriptionStats" /> object containing statistics.</returns>
|
|
public virtual SubscriptionStats GetSubscriptionStats()
|
|
{
|
|
_lock.EnterReadLock();
|
|
try
|
|
{
|
|
var tagClientCounts = _tagSubscriptions.ToDictionary(
|
|
kvp => kvp.Key,
|
|
kvp => kvp.Value.ClientIds.Count);
|
|
|
|
var clientStats = _clientSubscriptions.ToDictionary(
|
|
kvp => kvp.Key,
|
|
kvp => new ClientStats
|
|
{
|
|
SubscribedTags = kvp.Value.Addresses.Count,
|
|
DeliveredMessages = kvp.Value.DeliveredMessageCount,
|
|
DroppedMessages = kvp.Value.DroppedMessageCount
|
|
});
|
|
|
|
return new SubscriptionStats
|
|
{
|
|
TotalClients = _clientSubscriptions.Count,
|
|
TotalTags = _tagSubscriptions.Count,
|
|
TagClientCounts = tagClientCounts,
|
|
ClientStats = clientStats
|
|
};
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitReadLock();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Handles SCADA client connection state changes and notifies clients of disconnection.
|
|
/// </summary>
|
|
/// <param name="sender">The event sender.</param>
|
|
/// <param name="e">The connection state change event arguments.</param>
|
|
private void OnConnectionStateChanged(object? sender, ConnectionStateChangedEventArgs e)
|
|
{
|
|
Logger.Information("Connection state changed from {Previous} to {Current}",
|
|
e.PreviousState, e.CurrentState);
|
|
|
|
// If we're disconnected, notify all subscribed clients with bad quality
|
|
if (e.CurrentState != ConnectionState.Connected)
|
|
{
|
|
Task.Run(async () =>
|
|
{
|
|
try
|
|
{
|
|
await NotifyAllClientsOfDisconnection();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.Error(ex, "Error notifying clients of disconnection");
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Notifies all clients of a SCADA disconnection by sending bad quality updates.
|
|
/// </summary>
|
|
private async Task NotifyAllClientsOfDisconnection()
|
|
{
|
|
Logger.Information("Notifying all clients of disconnection");
|
|
|
|
var badQualityVtq = new Vtq(null, DateTime.UtcNow, Quality.Bad);
|
|
|
|
// Get all unique addresses being subscribed to
|
|
var allAddresses = _tagSubscriptions.Keys.ToList();
|
|
|
|
// Send bad quality update for each address to all subscribed clients
|
|
foreach (string? address in allAddresses)
|
|
{
|
|
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
|
{
|
|
var clientIds = tagSubscription.ClientIds.ToList();
|
|
|
|
foreach (string? clientId in clientIds)
|
|
{
|
|
if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription))
|
|
{
|
|
try
|
|
{
|
|
await clientSubscription.Channel.Writer.WriteAsync((address, badQualityVtq));
|
|
Logger.Debug("Sent bad quality notification for {Address} to client {ClientId}",
|
|
address, clientId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.Warning(ex, "Failed to send bad quality notification to client {ClientId}",
|
|
clientId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents a client's subscription, including channel, addresses, and statistics.
|
|
/// </summary>
|
|
private class ClientSubscription
|
|
{
|
|
/// <summary>
|
|
/// Gets or sets the client identifier.
|
|
/// </summary>
|
|
public string ClientId { get; set; } = string.Empty;
|
|
|
|
/// <summary>
|
|
/// Gets or sets the channel for delivering tag updates.
|
|
/// </summary>
|
|
public Channel<(string address, Vtq vtq)> Channel { get; set; } = null!;
|
|
|
|
/// <summary>
|
|
/// Gets or sets the set of addresses the client is subscribed to.
|
|
/// </summary>
|
|
public HashSet<string> Addresses { get; set; } = new();
|
|
|
|
/// <summary>
|
|
/// Gets or sets the cancellation token source for the client.
|
|
/// </summary>
|
|
public CancellationTokenSource CancellationTokenSource { get; set; } = null!;
|
|
|
|
/// <summary>
|
|
/// Gets or sets the count of delivered messages.
|
|
/// </summary>
|
|
public long DeliveredMessageCount { get; set; }
|
|
|
|
/// <summary>
|
|
/// Gets or sets the count of dropped messages.
|
|
/// </summary>
|
|
public long DroppedMessageCount { get; set; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Represents a tag subscription, including address, client IDs, and SCADA subscription handle.
|
|
/// </summary>
|
|
private class TagSubscription
|
|
{
|
|
/// <summary>
|
|
/// Gets or sets the tag address.
|
|
/// </summary>
|
|
public string Address { get; set; } = string.Empty;
|
|
|
|
/// <summary>
|
|
/// Gets or sets the set of client IDs subscribed to this tag.
|
|
/// </summary>
|
|
public HashSet<string> ClientIds { get; set; } = new();
|
|
|
|
/// <summary>
|
|
/// Gets or sets the SCADA subscription handle.
|
|
/// </summary>
|
|
public IAsyncDisposable ScadaSubscription { get; set; } = null!;
|
|
}
|
|
}
|
|
}
|