Files
ScadaBridge/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs
T
Joseph Doherty 7f74b660b3 feat(lmxproxy): add delivered/dropped message counts to subscription stats
Subscription metrics (totalDelivered, totalDropped) now visible in
/api/status JSON and HTML dashboard. Card turns yellow if drops > 0.
Aggregated from per-client counters in SubscriptionManager.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 00:07:58 -04:00

313 lines
11 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
{
/// <summary>
/// Manages per-client subscription channels with shared MxAccess subscriptions.
/// Ref-counted tag subscriptions: first client creates, last client disposes.
/// </summary>
public sealed class SubscriptionManager : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<SubscriptionManager>();
private readonly IScadaClient _scadaClient;
private readonly int _channelCapacity;
private readonly BoundedChannelFullMode _channelFullMode;
// Client ID -> ClientSubscription
private readonly ConcurrentDictionary<string, ClientSubscription> _clientSubscriptions
= new ConcurrentDictionary<string, ClientSubscription>(StringComparer.OrdinalIgnoreCase);
// Tag address -> TagSubscription (shared, ref-counted)
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions
= new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase);
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
{
_scadaClient = scadaClient;
_channelCapacity = channelCapacity;
_channelFullMode = channelFullMode;
}
/// <summary>
/// Creates a subscription for a client. Returns a ChannelReader to stream from.
/// Awaits COM subscription creation so the initial OnDataChange callback
/// is not missed.
/// </summary>
public async Task<ChannelReader<(string address, Vtq vtq)>> SubscribeAsync(
string clientId, IEnumerable<string> addresses, CancellationToken ct)
{
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock();
try
{
foreach (var address in addressSet)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Create MxAccess COM subscriptions — awaited so the initial
// OnDataChange (first value delivery after AdviseSupervisory)
// is not lost. The channel and routing are already set up above,
// so any callback that fires during this call will be delivered.
if (newTags.Count > 0)
{
await CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader;
}
private async Task CreateMxAccessSubscriptionsAsync(List<string> addresses)
{
try
{
await _scadaClient.SubscribeAsync(
addresses,
(address, vtq) => OnTagValueChanged(address, vtq));
}
catch (Exception ex)
{
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count);
}
}
/// <summary>
/// Called from MxAccessClient's OnDataChange handler.
/// Fans out the update to all subscribed clients.
/// </summary>
public void OnTagValueChanged(string address, Vtq vtq)
{
_rwLock.EnterReadLock();
HashSet<string>? clientIds = null;
try
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
clientIds = new HashSet<string>(tagSub.ClientIds);
}
}
finally
{
_rwLock.ExitReadLock();
}
if (clientIds == null || clientIds.Count == 0) return;
foreach (var clientId in clientIds)
{
if (_clientSubscriptions.TryGetValue(clientId, out var clientSub))
{
if (!clientSub.Channel.Writer.TryWrite((address, vtq)))
{
clientSub.IncrementDropped();
Log.Debug("Dropped message for client {ClientId} on tag {Address} (channel full)",
clientId, address);
}
else
{
clientSub.IncrementDelivered();
}
}
}
}
/// <summary>
/// Removes a client's subscriptions and cleans up tag subscriptions
/// when the last client unsubscribes.
/// </summary>
public void UnsubscribeClient(string clientId)
{
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock();
try
{
foreach (var address in clientSub.Addresses)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Remove(clientId);
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
{
_tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
}
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
try
{
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
/// <summary>
/// Sends a bad-quality notification to all subscribed clients for all their tags.
/// Called when MxAccess disconnects.
/// </summary>
public void NotifyDisconnection()
{
var badVtq = Vtq.New(null, Quality.Bad_NotConnected);
foreach (var kvp in _clientSubscriptions)
{
foreach (var address in kvp.Value.Addresses)
{
kvp.Value.Channel.Writer.TryWrite((address, badVtq));
}
}
}
/// <summary>
/// Logs reconnection for observability. Data flow resumes automatically
/// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks.
/// </summary>
public void NotifyReconnection()
{
Log.Information("MxAccess reconnected -- subscriptions recreated, " +
"data flow will resume via OnDataChange callbacks " +
"({ClientCount} clients, {TagCount} tags)",
_clientSubscriptions.Count, _tagSubscriptions.Count);
}
/// <summary>Returns subscription statistics.</summary>
public SubscriptionStats GetStats()
{
long totalDelivered = 0;
long totalDropped = 0;
foreach (var kvp in _clientSubscriptions)
{
totalDelivered += kvp.Value.DeliveredCount;
totalDropped += kvp.Value.DroppedCount;
}
return new SubscriptionStats(
_clientSubscriptions.Count,
_tagSubscriptions.Count,
_clientSubscriptions.Values.Sum(c => c.Addresses.Count),
totalDelivered,
totalDropped);
}
public void Dispose()
{
foreach (var kvp in _clientSubscriptions)
{
kvp.Value.Channel.Writer.TryComplete();
}
_clientSubscriptions.Clear();
_tagSubscriptions.Clear();
_rwLock.Dispose();
}
// ── Nested types ─────────────────────────────────────────
private class ClientSubscription
{
public ClientSubscription(string clientId,
Channel<(string address, Vtq vtq)> channel,
HashSet<string> addresses)
{
ClientId = clientId;
Channel = channel;
Addresses = addresses;
}
public string ClientId { get; }
public Channel<(string address, Vtq vtq)> Channel { get; }
public HashSet<string> Addresses { get; }
// Use backing fields for Interlocked
private long _delivered;
private long _dropped;
public long DeliveredCount => Interlocked.Read(ref _delivered);
public long DroppedCount => Interlocked.Read(ref _dropped);
public void IncrementDelivered() => Interlocked.Increment(ref _delivered);
public void IncrementDropped() => Interlocked.Increment(ref _dropped);
}
private class TagSubscription
{
public TagSubscription(string address, HashSet<string> clientIds)
{
Address = address;
ClientIds = clientIds;
}
public string Address { get; }
public HashSet<string> ClientIds { get; }
}
}
}