Files
scadalink-design/lmxproxy/src-reference/ZB.MOM.WW.LmxProxy.Host/Implementation/MxAccessClient.Subscription.cs
Joseph Doherty 0d63fb1105 feat(lmxproxy): phase 1 — v2 protocol types and domain model
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 23:41:56 -04:00

154 lines
6.4 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.Implementation
{
/// <summary>
/// Subscription management for MxAccessClient to handle SCADA tag updates.
/// </summary>
public sealed partial class MxAccessClient
{
    /// <summary>
    /// Subscribes to a set of addresses and registers a callback for value changes.
    /// The subscription group is also stored so it can be recreated automatically after a reconnect.
    /// </summary>
    /// <param name="addresses">The collection of addresses to subscribe to.</param>
    /// <param name="callback">
    /// The callback to invoke when a value changes.
    /// The callback receives the address and the new <see cref="Vtq" /> value.
    /// </param>
    /// <param name="ct">
    /// An optional <see cref="CancellationToken" />. It is handed to <see cref="Task.Run(Action, CancellationToken)" />,
    /// so it is only observed before the subscription work starts.
    /// </param>
    /// <returns>
    /// A <see cref="Task{IAsyncDisposable}" /> that completes with a handle to the subscription.
    /// Disposing the handle will unsubscribe from all addresses.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Surfaced through the returned task if not connected to MxAccess.
    /// </exception>
    /// <exception cref="Exception">
    /// Surfaced through the returned task if subscription fails for any address.
    /// </exception>
    public Task<IAsyncDisposable> SubscribeAsync(IEnumerable<string> addresses, Action<string, Vtq> callback,
        CancellationToken ct = default) => SubscribeInternalAsync(addresses, callback, true, ct);

    /// <summary>
    /// Internal subscription method that allows control over whether to store the subscription for recreation.
    /// </summary>
    /// <param name="addresses">The addresses to add and advise, one item per address.</param>
    /// <param name="callback">The callback recorded on every subscription created by this call.</param>
    /// <param name="storeForRecreation">
    /// When <c>true</c>, the address list and callback are appended to the stored subscription groups.
    /// </param>
    /// <param name="ct">Token passed to <see cref="Task.Run(Action, CancellationToken)" />.</param>
    /// <returns>A task that completes with the handle covering every subscription created by this call.</returns>
    private Task<IAsyncDisposable> SubscribeInternalAsync(IEnumerable<string> addresses,
        Action<string, Vtq> callback, bool storeForRecreation, CancellationToken ct = default)
    {
        return Task.Run<IAsyncDisposable>(() =>
        {
            lock (_lock)
            {
                if (!IsConnected || _lmxProxy == null)
                {
                    throw new InvalidOperationException("Not connected to MxAccess");
                }

                // Tracks what has been registered so far, so a mid-way failure can be rolled back.
                var subscriptionIds = new List<string>();
                try
                {
                    var addressList = addresses.ToList();
                    foreach (string? address in addressList)
                    {
                        // Add the item
                        var itemHandle = _lmxProxy.AddItem(_connectionHandle, address);

                        // Create subscription info
                        string subscriptionId = Guid.NewGuid().ToString();
                        var subscription = new SubscriptionInfo
                        {
                            Address = address,
                            ItemHandle = itemHandle,
                            Callback = callback,
                            SubscriptionId = subscriptionId
                        };

                        // Store the subscription before advising, so a failing advise is still rolled back.
                        _subscriptions[subscriptionId] = subscription;
                        _subscriptionsByHandle[itemHandle] = subscription;
                        subscriptionIds.Add(subscriptionId);

                        // Advise the item
                        _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
                        Logger.Debug("Subscribed to {Address} with handle {Handle}", address, itemHandle);
                    }

                    // Store subscription group for automatic recreation after reconnect
                    string groupId = Guid.NewGuid().ToString();
                    if (storeForRecreation)
                    {
                        _storedSubscriptions.Add(new StoredSubscription
                        {
                            Addresses = addressList,
                            Callback = callback,
                            GroupId = groupId
                        });
                        Logger.Debug(
                            "Stored subscription group {GroupId} with {Count} addresses for automatic recreation",
                            groupId, addressList.Count);
                    }

                    return new SubscriptionHandle(this, subscriptionIds, groupId);
                }
                catch (Exception ex)
                {
                    // Clean up any subscriptions that were created. This is done synchronously on the
                    // current thread: we already own _lock, and blocking on a Task.Run that needs the
                    // same lock could deadlock if that task were picked up by another thread.
                    foreach (string id in subscriptionIds)
                    {
                        RemoveSubscriptionCore(id);
                    }

                    Logger.Error(ex, "Failed to subscribe to addresses");
                    throw;
                }
            }
        }, ct);
    }

    /// <summary>
    /// Unsubscribes from a subscription by its ID.
    /// </summary>
    /// <param name="subscriptionId">The subscription identifier.</param>
    /// <returns>
    /// A <see cref="Task" /> representing the asynchronous operation. Unknown identifiers are ignored.
    /// </returns>
    private Task UnsubscribeInternalAsync(string subscriptionId)
    {
        return Task.Run(() => RemoveSubscriptionCore(subscriptionId));
    }

    /// <summary>
    /// Synchronously removes a single subscription while holding <c>_lock</c>.
    /// Safe to call from code that already owns <c>_lock</c> on the current thread.
    /// </summary>
    /// <param name="subscriptionId">The subscription identifier; unknown identifiers are ignored.</param>
    private void RemoveSubscriptionCore(string subscriptionId)
    {
        lock (_lock)
        {
            if (!_subscriptions.TryGetValue(subscriptionId, out SubscriptionInfo? subscription))
            {
                return;
            }

            try
            {
                // Only talk to the proxy while a connection exists.
                if (_lmxProxy != null && _connectionHandle > 0)
                {
                    _lmxProxy.UnAdvise(_connectionHandle, subscription.ItemHandle);
                    _lmxProxy.RemoveItem(_connectionHandle, subscription.ItemHandle);
                }

                Logger.Debug("Unsubscribed from {Address}", subscription.Address);
            }
            catch (Exception ex)
            {
                // Best effort: a failing proxy call must not prevent the caller from completing.
                Logger.Warning(ex, "Error unsubscribing from {Address}", subscription.Address);
            }
            finally
            {
                // Always drop the bookkeeping entries, even when the proxy calls failed,
                // so a failed unsubscribe does not leave stale entries behind.
                _subscriptions.Remove(subscriptionId);
                _subscriptionsByHandle.Remove(subscription.ItemHandle);
            }
        }
    }
}
}