feat(lmxproxy): phase 1 — v2 protocol types and domain model
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,535 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.LmxProxy.Host.Configuration;
|
||||
using ZB.MOM.WW.LmxProxy.Host.Domain;
|
||||
|
||||
namespace ZB.MOM.WW.LmxProxy.Host.Services
|
||||
{
|
||||
/// <summary>
|
||||
/// Manages subscriptions for multiple gRPC clients, handling tag subscriptions, message delivery, and client
|
||||
/// statistics.
|
||||
/// </summary>
|
||||
public class SubscriptionManager : IDisposable
|
||||
{
|
||||
private static readonly ILogger Logger = Log.ForContext<SubscriptionManager>();
|
||||
|
||||
// Configuration for channel buffering
|
||||
private readonly int _channelCapacity;
|
||||
private readonly BoundedChannelFullMode _channelFullMode;
|
||||
private readonly ConcurrentDictionary<string, ClientSubscription> _clientSubscriptions = new();
|
||||
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
|
||||
|
||||
private readonly IScadaClient _scadaClient;
|
||||
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions = new();
|
||||
private bool _disposed;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="SubscriptionManager" /> class.
|
||||
/// </summary>
|
||||
/// <param name="scadaClient">The SCADA client to use for subscriptions.</param>
|
||||
/// <param name="configuration">The subscription configuration.</param>
|
||||
/// <exception cref="ArgumentNullException">
|
||||
/// Thrown if <paramref name="scadaClient" /> or <paramref name="configuration" />
|
||||
/// is null.
|
||||
/// </exception>
|
||||
public SubscriptionManager(IScadaClient scadaClient, SubscriptionConfiguration configuration)
|
||||
{
|
||||
_scadaClient = scadaClient ?? throw new ArgumentNullException(nameof(scadaClient));
|
||||
SubscriptionConfiguration configuration1 =
|
||||
configuration ?? throw new ArgumentNullException(nameof(configuration));
|
||||
|
||||
_channelCapacity = configuration1.ChannelCapacity;
|
||||
_channelFullMode = ParseChannelFullMode(configuration1.ChannelFullMode);
|
||||
|
||||
// Subscribe to connection state changes
|
||||
_scadaClient.ConnectionStateChanged += OnConnectionStateChanged;
|
||||
|
||||
Logger.Information("SubscriptionManager initialized with channel capacity: {Capacity}, full mode: {Mode}",
|
||||
_channelCapacity, _channelFullMode);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Disposes the <see cref="SubscriptionManager" />, unsubscribing all clients and cleaning up resources.
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
|
||||
Logger.Information("Disposing SubscriptionManager");
|
||||
|
||||
// Unsubscribe from connection state changes
|
||||
_scadaClient.ConnectionStateChanged -= OnConnectionStateChanged;
|
||||
|
||||
// Unsubscribe all clients
|
||||
var clientIds = _clientSubscriptions.Keys.ToList();
|
||||
foreach (string? clientId in clientIds)
|
||||
{
|
||||
UnsubscribeClient(clientId);
|
||||
}
|
||||
|
||||
_clientSubscriptions.Clear();
|
||||
_tagSubscriptions.Clear();
|
||||
|
||||
// Dispose the lock
|
||||
_lock?.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of active client subscriptions.
|
||||
/// </summary>
|
||||
public virtual int GetActiveSubscriptionCount() => _clientSubscriptions.Count;
|
||||
|
||||
/// <summary>
|
||||
/// Parses the channel full mode string to <see cref="BoundedChannelFullMode" />.
|
||||
/// </summary>
|
||||
/// <param name="mode">The mode string.</param>
|
||||
/// <returns>The parsed <see cref="BoundedChannelFullMode" /> value.</returns>
|
||||
private static BoundedChannelFullMode ParseChannelFullMode(string mode)
|
||||
{
|
||||
return mode?.ToUpperInvariant() switch
|
||||
{
|
||||
"DROPOLDEST" => BoundedChannelFullMode.DropOldest,
|
||||
"DROPNEWEST" => BoundedChannelFullMode.DropNewest,
|
||||
"WAIT" => BoundedChannelFullMode.Wait,
|
||||
_ => BoundedChannelFullMode.DropOldest // Default
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new subscription for a client to a set of tag addresses.
|
||||
/// </summary>
|
||||
/// <param name="clientId">The client identifier.</param>
|
||||
/// <param name="addresses">The tag addresses to subscribe to.</param>
|
||||
/// <param name="ct">Optional cancellation token.</param>
|
||||
/// <returns>A channel for receiving tag updates.</returns>
|
||||
/// <exception cref="ObjectDisposedException">Thrown if the manager is disposed.</exception>
|
||||
public async Task<Channel<(string address, Vtq vtq)>> SubscribeAsync(
|
||||
string clientId,
|
||||
IEnumerable<string> addresses,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(SubscriptionManager));
|
||||
}
|
||||
|
||||
var addressList = addresses.ToList();
|
||||
Logger.Information("Client {ClientId} subscribing to {Count} tags", clientId, addressList.Count);
|
||||
|
||||
// Create a bounded channel for this client with buffering
|
||||
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(new BoundedChannelOptions(_channelCapacity)
|
||||
{
|
||||
FullMode = _channelFullMode,
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
AllowSynchronousContinuations = false
|
||||
});
|
||||
|
||||
Logger.Debug("Created bounded channel for client {ClientId} with capacity {Capacity}", clientId,
|
||||
_channelCapacity);
|
||||
|
||||
var clientSubscription = new ClientSubscription
|
||||
{
|
||||
ClientId = clientId,
|
||||
Channel = channel,
|
||||
Addresses = new HashSet<string>(addressList),
|
||||
CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ct)
|
||||
};
|
||||
|
||||
_clientSubscriptions[clientId] = clientSubscription;
|
||||
|
||||
// Subscribe to each tag
|
||||
foreach (string? address in addressList)
|
||||
{
|
||||
await SubscribeToTagAsync(address, clientId);
|
||||
}
|
||||
|
||||
// Handle client disconnection
|
||||
clientSubscription.CancellationTokenSource.Token.Register(() =>
|
||||
{
|
||||
Logger.Information("Client {ClientId} disconnected, cleaning up subscriptions", clientId);
|
||||
UnsubscribeClient(clientId);
|
||||
});
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribes a client from all tags and cleans up resources.
|
||||
/// </summary>
|
||||
/// <param name="clientId">The client identifier.</param>
|
||||
public void UnsubscribeClient(string clientId)
|
||||
{
|
||||
if (_clientSubscriptions.TryRemove(clientId, out ClientSubscription? clientSubscription))
|
||||
{
|
||||
Logger.Information(
|
||||
"Unsubscribing client {ClientId} from {Count} tags. Stats: Delivered={Delivered}, Dropped={Dropped}",
|
||||
clientId, clientSubscription.Addresses.Count,
|
||||
clientSubscription.DeliveredMessageCount, clientSubscription.DroppedMessageCount);
|
||||
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
foreach (string? address in clientSubscription.Addresses)
|
||||
{
|
||||
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
||||
{
|
||||
tagSubscription.ClientIds.Remove(clientId);
|
||||
|
||||
// If no more clients are subscribed to this tag, unsubscribe from SCADA
|
||||
if (tagSubscription.ClientIds.Count == 0)
|
||||
{
|
||||
Logger.Information(
|
||||
"No more clients subscribed to {Address}, removing SCADA subscription", address);
|
||||
|
||||
_tagSubscriptions.TryRemove(address, out _);
|
||||
|
||||
// Dispose the SCADA subscription
|
||||
Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (tagSubscription.ScadaSubscription != null)
|
||||
{
|
||||
await tagSubscription.ScadaSubscription.DisposeAsync();
|
||||
Logger.Debug("Successfully disposed SCADA subscription for {Address}",
|
||||
address);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Error(ex, "Error disposing SCADA subscription for {Address}", address);
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
Logger.Debug(
|
||||
"Client {ClientId} removed from {Address} subscription (remaining clients: {Count})",
|
||||
clientId, address, tagSubscription.ClientIds.Count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
|
||||
// Complete the channel
|
||||
clientSubscription.Channel.Writer.TryComplete();
|
||||
clientSubscription.CancellationTokenSource.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribes a client to a tag address, creating a new SCADA subscription if needed.
|
||||
/// </summary>
|
||||
/// <param name="address">The tag address.</param>
|
||||
/// <param name="clientId">The client identifier.</param>
|
||||
private async Task SubscribeToTagAsync(string address, string clientId)
|
||||
{
|
||||
bool needsSubscription;
|
||||
TagSubscription? tagSubscription;
|
||||
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? existingSubscription))
|
||||
{
|
||||
// Tag is already subscribed, just add this client
|
||||
existingSubscription.ClientIds.Add(clientId);
|
||||
Logger.Debug(
|
||||
"Client {ClientId} added to existing subscription for {Address} (total clients: {Count})",
|
||||
clientId, address, existingSubscription.ClientIds.Count);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create new tag subscription and reserve the spot
|
||||
tagSubscription = new TagSubscription
|
||||
{
|
||||
Address = address,
|
||||
ClientIds = new HashSet<string> { clientId }
|
||||
};
|
||||
_tagSubscriptions[address] = tagSubscription;
|
||||
needsSubscription = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
|
||||
if (needsSubscription && tagSubscription != null)
|
||||
{
|
||||
// Subscribe to SCADA outside of lock to avoid blocking
|
||||
Logger.Debug("Creating new SCADA subscription for {Address}", address);
|
||||
|
||||
try
|
||||
{
|
||||
IAsyncDisposable scadaSubscription = await _scadaClient.SubscribeAsync(
|
||||
new[] { address },
|
||||
(addr, vtq) => OnTagValueChanged(addr, vtq),
|
||||
CancellationToken.None);
|
||||
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
tagSubscription.ScadaSubscription = scadaSubscription;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
|
||||
Logger.Information("Successfully subscribed to {Address} for client {ClientId}", address, clientId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Error(ex, "Failed to subscribe to {Address}", address);
|
||||
|
||||
// Remove the failed subscription
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_tagSubscriptions.TryRemove(address, out _);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles tag value changes and delivers updates to all subscribed clients.
|
||||
/// </summary>
|
||||
/// <param name="address">The tag address.</param>
|
||||
/// <param name="vtq">The value, timestamp, and quality.</param>
|
||||
private void OnTagValueChanged(string address, Vtq vtq)
|
||||
{
|
||||
Logger.Debug("Tag value changed: {Address} = {Vtq}", address, vtq);
|
||||
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (!_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
||||
{
|
||||
Logger.Warning("Received update for untracked tag {Address}", address);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send update to all subscribed clients
|
||||
// Use the existing collection directly without ToList() since we're in a read lock
|
||||
foreach (string? clientId in tagSubscription.ClientIds)
|
||||
{
|
||||
if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription))
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!clientSubscription.Channel.Writer.TryWrite((address, vtq)))
|
||||
{
|
||||
// Channel is full - with DropOldest mode, this should rarely happen
|
||||
Logger.Warning(
|
||||
"Channel full for client {ClientId}, dropping message for {Address}. Consider increasing buffer size.",
|
||||
clientId, address);
|
||||
clientSubscription.DroppedMessageCount++;
|
||||
}
|
||||
else
|
||||
{
|
||||
clientSubscription.DeliveredMessageCount++;
|
||||
}
|
||||
}
|
||||
catch (InvalidOperationException ex) when (ex.Message.Contains("closed"))
|
||||
{
|
||||
Logger.Debug("Channel closed for client {ClientId}, removing subscription", clientId);
|
||||
// Schedule cleanup of disconnected client
|
||||
Task.Run(() => UnsubscribeClient(clientId));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Error(ex, "Error sending update to client {ClientId}", clientId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets current subscription statistics for all clients and tags.
|
||||
/// </summary>
|
||||
/// <returns>A <see cref="SubscriptionStats" /> object containing statistics.</returns>
|
||||
public virtual SubscriptionStats GetSubscriptionStats()
|
||||
{
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
var tagClientCounts = _tagSubscriptions.ToDictionary(
|
||||
kvp => kvp.Key,
|
||||
kvp => kvp.Value.ClientIds.Count);
|
||||
|
||||
var clientStats = _clientSubscriptions.ToDictionary(
|
||||
kvp => kvp.Key,
|
||||
kvp => new ClientStats
|
||||
{
|
||||
SubscribedTags = kvp.Value.Addresses.Count,
|
||||
DeliveredMessages = kvp.Value.DeliveredMessageCount,
|
||||
DroppedMessages = kvp.Value.DroppedMessageCount
|
||||
});
|
||||
|
||||
return new SubscriptionStats
|
||||
{
|
||||
TotalClients = _clientSubscriptions.Count,
|
||||
TotalTags = _tagSubscriptions.Count,
|
||||
TagClientCounts = tagClientCounts,
|
||||
ClientStats = clientStats
|
||||
};
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles SCADA client connection state changes and notifies clients of disconnection.
|
||||
/// </summary>
|
||||
/// <param name="sender">The event sender.</param>
|
||||
/// <param name="e">The connection state change event arguments.</param>
|
||||
private void OnConnectionStateChanged(object? sender, ConnectionStateChangedEventArgs e)
|
||||
{
|
||||
Logger.Information("Connection state changed from {Previous} to {Current}",
|
||||
e.PreviousState, e.CurrentState);
|
||||
|
||||
// If we're disconnected, notify all subscribed clients with bad quality
|
||||
if (e.CurrentState != ConnectionState.Connected)
|
||||
{
|
||||
Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await NotifyAllClientsOfDisconnection();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Error(ex, "Error notifying clients of disconnection");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Notifies all clients of a SCADA disconnection by sending bad quality updates.
|
||||
/// </summary>
|
||||
private async Task NotifyAllClientsOfDisconnection()
|
||||
{
|
||||
Logger.Information("Notifying all clients of disconnection");
|
||||
|
||||
var badQualityVtq = new Vtq(null, DateTime.UtcNow, Quality.Bad);
|
||||
|
||||
// Get all unique addresses being subscribed to
|
||||
var allAddresses = _tagSubscriptions.Keys.ToList();
|
||||
|
||||
// Send bad quality update for each address to all subscribed clients
|
||||
foreach (string? address in allAddresses)
|
||||
{
|
||||
if (_tagSubscriptions.TryGetValue(address, out TagSubscription? tagSubscription))
|
||||
{
|
||||
var clientIds = tagSubscription.ClientIds.ToList();
|
||||
|
||||
foreach (string? clientId in clientIds)
|
||||
{
|
||||
if (_clientSubscriptions.TryGetValue(clientId, out ClientSubscription? clientSubscription))
|
||||
{
|
||||
try
|
||||
{
|
||||
await clientSubscription.Channel.Writer.WriteAsync((address, badQualityVtq));
|
||||
Logger.Debug("Sent bad quality notification for {Address} to client {ClientId}",
|
||||
address, clientId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Warning(ex, "Failed to send bad quality notification to client {ClientId}",
|
||||
clientId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Represents a client's subscription, including channel, addresses, and statistics.
|
||||
/// </summary>
|
||||
private class ClientSubscription
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets or sets the client identifier.
|
||||
/// </summary>
|
||||
public string ClientId { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the channel for delivering tag updates.
|
||||
/// </summary>
|
||||
public Channel<(string address, Vtq vtq)> Channel { get; set; } = null!;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the set of addresses the client is subscribed to.
|
||||
/// </summary>
|
||||
public HashSet<string> Addresses { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the cancellation token source for the client.
|
||||
/// </summary>
|
||||
public CancellationTokenSource CancellationTokenSource { get; set; } = null!;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the count of delivered messages.
|
||||
/// </summary>
|
||||
public long DeliveredMessageCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the count of dropped messages.
|
||||
/// </summary>
|
||||
public long DroppedMessageCount { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Represents a tag subscription, including address, client IDs, and SCADA subscription handle.
|
||||
/// </summary>
|
||||
private class TagSubscription
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets or sets the tag address.
|
||||
/// </summary>
|
||||
public string Address { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the set of client IDs subscribed to this tag.
|
||||
/// </summary>
|
||||
public HashSet<string> ClientIds { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the SCADA subscription handle.
|
||||
/// </summary>
|
||||
public IAsyncDisposable ScadaSubscription { get; set; } = null!;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user