Files
scadalink-design/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs
2026-03-22 23:18:09 -04:00

205 lines
7.0 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Subscribes to value changes for the specified addresses.
/// Stores subscription state for reconnect replay.
/// COM calls dispatched on the dedicated STA thread.
/// </summary>
public async Task<IAsyncDisposable> SubscribeAsync(
IEnumerable<string> addresses,
Action<string, Vtq> callback,
CancellationToken ct = default)
{
if (!IsConnected)
throw new InvalidOperationException("Not connected to MxAccess");
var addressList = addresses.ToList();
await _staThread.RunAsync(() =>
{
lock (_lock)
{
if (!IsConnected || _lmxProxy == null)
throw new InvalidOperationException("Not connected to MxAccess");
foreach (var address in addressList)
{
SubscribeInternal(address);
// Store for reconnect replay
_storedSubscriptions[address] = callback;
}
}
});
Log.Information("Subscribed to {Count} tags", addressList.Count);
return new SubscriptionHandle(this, addressList, callback);
}
/// <summary>
/// Unsubscribes specific addresses by address name.
/// Removes from both COM state and stored subscriptions (no reconnect replay).
/// </summary>
public async Task UnsubscribeByAddressAsync(IEnumerable<string> addresses)
{
await UnsubscribeAsync(addresses);
}
/// <summary>
/// Unsubscribes specific addresses.
/// </summary>
internal async Task UnsubscribeAsync(IEnumerable<string> addresses)
{
var addressList = addresses.ToList();
await _staThread.RunAsync(() =>
{
lock (_lock)
{
foreach (var address in addressList)
{
UnsubscribeInternal(address);
_storedSubscriptions.Remove(address);
}
}
});
Log.Information("Unsubscribed from {Count} tags", addressList.Count);
}
/// <summary>
/// Recreates all stored subscriptions after a reconnect.
/// Does not re-store them (they're already stored).
/// </summary>
private async Task RecreateStoredSubscriptionsAsync()
{
Dictionary<string, Action<string, Vtq>> subscriptions;
lock (_lock)
{
if (_storedSubscriptions.Count == 0) return;
subscriptions = new Dictionary<string, Action<string, Vtq>>(_storedSubscriptions);
}
Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count);
await _staThread.RunAsync(() =>
{
lock (_lock)
{
foreach (var kvp in subscriptions)
{
try
{
SubscribeInternal(kvp.Key);
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key);
}
}
}
});
}
// ── Internal COM calls ──────────
/// <summary>
/// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory).
/// Must be called while holding _lock.
/// </summary>
private void SubscribeInternal(string address)
{
if (_lmxProxy == null || _connectionHandle <= 0)
throw new InvalidOperationException("Not connected to MxAccess");
// If already subscribed to this address, skip
if (_addressToHandle.ContainsKey(address))
{
Log.Debug("Already subscribed to {Address}, skipping", address);
return;
}
// Add the item to MxAccess
int itemHandle = _lmxProxy.AddItem(_connectionHandle, address);
// Track handle-to-address and address-to-handle mappings
_handleToAddress[itemHandle] = address;
_addressToHandle[address] = itemHandle;
// Advise (subscribe) for data change events
_lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
Log.Debug("Subscribed to {Address} with handle {Handle}", address, itemHandle);
}
/// <summary>
/// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem).
/// Must be called while holding _lock.
/// </summary>
private void UnsubscribeInternal(string address)
{
if (!_addressToHandle.TryGetValue(address, out int itemHandle))
{
Log.Debug("No active subscription for {Address}, skipping unsubscribe", address);
return;
}
try
{
if (_lmxProxy != null && _connectionHandle > 0)
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
}
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing from {Address} (handle {Handle})", address, itemHandle);
}
finally
{
_handleToAddress.Remove(itemHandle);
_addressToHandle.Remove(address);
}
Log.Debug("Unsubscribed from {Address} (handle {Handle})", address, itemHandle);
}
/// <summary>
/// Disposable subscription handle that unsubscribes on disposal.
/// </summary>
private sealed class SubscriptionHandle : IAsyncDisposable
{
private readonly MxAccessClient _client;
private readonly List<string> _addresses;
private readonly Action<string, Vtq> _callback;
private bool _disposed;
public SubscriptionHandle(MxAccessClient client, List<string> addresses, Action<string, Vtq> callback)
{
_client = client;
_addresses = addresses;
_callback = callback;
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
await _client.UnsubscribeAsync(_addresses);
}
}
}
}