359 lines
14 KiB
C#
359 lines
14 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using ArchestrA.MxAccess;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
|
|
|
/// <summary>
/// MXAccess runtime client — focused port of v1 <c>MxAccessClient</c>. Owns one
/// <c>LMXProxyServer</c> COM connection on the supplied <see cref="StaPump"/>; serializes
/// read / write / subscribe through the pump because all COM calls must run on the STA
/// thread. Advised item references are tracked so the background monitor can replay them
/// after it re-registers a dead connection.
/// </summary>
public sealed class MxAccessClient : IDisposable
{
    /// <summary>Sentinel reference the monitor adds (and removes again) to round-trip a call through COM.</summary>
    private const string HeartbeatReference = "$Heartbeat";

    private readonly StaPump _pump;
    private readonly IMxProxy _proxy;
    private readonly string _clientName;
    private readonly MxAccessClientOptions _options;

    // Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read/Write).
    private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
    private readonly ConcurrentDictionary<int, string> _handleToAddress = new();

    // Item handles for which AdviseSupervisory has been issued. Kept separately from
    // _addressToHandle because a write resolves a handle without advising it.
    private readonly ConcurrentDictionary<int, byte> _advisedHandles = new();

    private readonly ConcurrentDictionary<string, Action<string, Vtq>> _subscriptions =
        new(StringComparer.OrdinalIgnoreCase);
    private readonly ConcurrentDictionary<int, TaskCompletionSource<bool>> _pendingWrites = new();

    private int _connectionHandle;

    // Read by the monitor loop and by callers on arbitrary threads, written on the pump.
    private volatile bool _connected;

    // UTC ticks of the last observed activity. Stored as a long and accessed through
    // Interlocked so the value cannot tear when read on a 32-bit process.
    private long _lastObservedActivityTicks = DateTime.UtcNow.Ticks;

    private CancellationTokenSource? _monitorCts;
    private int _reconnectCount;

    // 0 = live, 1 = disposed; an int so Dispose can claim it atomically.
    private int _disposed;

    /// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
    public event EventHandler<bool>? ConnectionStateChanged;

    /// <summary>
    /// Creates the client and hooks the proxy's data-change and write-complete events.
    /// </summary>
    /// <param name="pump">Pump through which every proxy call is invoked.</param>
    /// <param name="proxy">MXAccess proxy used for register / add / advise / write calls.</param>
    /// <param name="clientName">Name passed to the proxy's <c>Register</c> call.</param>
    /// <param name="options">Monitor tunables; defaults are used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pump"/> or <paramref name="proxy"/> is <c>null</c>.</exception>
    public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
    {
        _pump = pump ?? throw new ArgumentNullException(nameof(pump));
        _proxy = proxy ?? throw new ArgumentNullException(nameof(proxy));
        _clientName = clientName;
        _options = options ?? new MxAccessClientOptions();
        _proxy.OnDataChange += OnDataChange;
        _proxy.OnWriteComplete += OnWriteComplete;
    }

    /// <summary>Whether the client currently holds a registered connection.</summary>
    public bool IsConnected => _connected;

    /// <summary>Number of references that currently have at least one callback attached.</summary>
    public int SubscriptionCount => _subscriptions.Count;

    /// <summary>Number of reconnects performed by the background monitor.</summary>
    public int ReconnectCount => Volatile.Read(ref _reconnectCount);

    private bool IsDisposed => Volatile.Read(ref _disposed) != 0;

    /// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
    /// <returns>The connection handle returned by the proxy's <c>Register</c> call.</returns>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task<int> ConnectAsync()
    {
        if (IsDisposed) throw new ObjectDisposedException(nameof(MxAccessClient));

        var registeredNow = false;
        var handle = await _pump.InvokeAsync(() =>
        {
            if (_connected) return _connectionHandle;
            _connectionHandle = _proxy.Register(_clientName);
            _connected = true;
            registeredNow = true;
            return _connectionHandle;
        }).ConfigureAwait(false);

        // Only announce a real Disconnected → Connected transition; a repeated call on an
        // already-connected client changes nothing and must not re-notify observers.
        if (registeredNow)
        {
            TouchActivity();
            ConnectionStateChanged?.Invoke(this, true);
        }

        if (_options.AutoReconnect && Volatile.Read(ref _monitorCts) is null)
        {
            var cts = new CancellationTokenSource();
            if (Interlocked.CompareExchange(ref _monitorCts, cts, null) is null)
            {
                // Capture the token now: reading the field inside the task could observe
                // the null that DisconnectAsync stores.
                var token = cts.Token;
                _ = Task.Run(() => MonitorLoopAsync(token));
            }
            else
            {
                // Another caller won the race and already started the monitor.
                cts.Dispose();
            }
        }

        return handle;
    }

    /// <summary>
    /// Stops the monitor, unregisters from the proxy on the pump and forgets all item
    /// handles. Raises <see cref="ConnectionStateChanged"/> only if the client was connected.
    /// </summary>
    public async Task DisconnectAsync()
    {
        var cts = Interlocked.Exchange(ref _monitorCts, null);
        if (cts is not null)
        {
            cts.Cancel();
            cts.Dispose();
        }

        var wasConnected = false;
        try
        {
            await _pump.InvokeAsync(() =>
            {
                if (!_connected) return;
                wasConnected = true;
                try { _proxy.Unregister(_connectionHandle); }
                finally
                {
                    _connected = false;
                    ClearItemMaps();
                }
            }).ConfigureAwait(false);
        }
        finally
        {
            // The state flips even when Unregister throws, so observers are still told.
            if (wasConnected) ConnectionStateChanged?.Invoke(this, false);
        }
    }

    /// <summary>
    /// Background loop that watches for connection liveness signals and triggers
    /// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
    /// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. When no
    /// activity has been observed for longer than <see cref="MxAccessClientOptions.StaleThreshold"/>
    /// the loop probes the proxy; a failed probe tears the connection down, re-registers and
    /// re-advises every reference that was advised before. A failed re-register is retried
    /// on the following ticks until it succeeds or the loop is cancelled.
    /// </summary>
    /// <param name="ct">Cancelled by <see cref="DisconnectAsync"/> / <see cref="Dispose"/> to end the loop.</param>
    private async Task MonitorLoopAsync(CancellationToken ct)
    {
        // Non-null while a reconnect is outstanding: the references to re-advise once
        // Register succeeds. Carried across ticks so a failed Register is retried instead
        // of being skipped by the "not connected" guard below.
        string[]? pendingReplay = null;

        while (!ct.IsCancellationRequested)
        {
            try { await Task.Delay(_options.MonitorInterval, ct).ConfigureAwait(false); }
            catch (OperationCanceledException) { break; }

            if (IsDisposed) break;

            try
            {
                if (pendingReplay is null)
                {
                    // Not connected and no reconnect owed: the caller has not connected
                    // (or disconnected explicitly); nothing to watch.
                    if (!_connected) continue;

                    var idle = DateTime.UtcNow - new DateTime(Interlocked.Read(ref _lastObservedActivityTicks), DateTimeKind.Utc);
                    if (idle <= _options.StaleThreshold) continue;

                    // Probe: round-trip a cheap COM call. If it throws, that is our
                    // reconnect signal.
                    bool probeOk;
                    try { probeOk = await _pump.InvokeAsync(() => ProbeOnPump()).ConfigureAwait(false); }
                    catch { probeOk = false; }

                    if (probeOk)
                    {
                        TouchActivity();
                        continue;
                    }

                    // Connection appears dead — tear down and remember what to replay.
                    pendingReplay = await _pump.InvokeAsync(() => TearDownOnPump()).ConfigureAwait(false);
                    RaiseFromMonitor(false);
                }

                var registered = await _pump.InvokeAsync(() =>
                {
                    // Cancelled or disposed while queued: do not resurrect the connection.
                    if (ct.IsCancellationRequested || IsDisposed) return false;

                    // ConnectAsync was called explicitly in the meantime; just replay.
                    if (_connected) return false;

                    _connectionHandle = _proxy.Register(_clientName);
                    _connected = true;
                    return true;
                }).ConfigureAwait(false);

                if (ct.IsCancellationRequested) break;

                if (registered)
                {
                    Interlocked.Increment(ref _reconnectCount);
                    RaiseFromMonitor(true);
                }

                // Replay every subscription that was advised before the disconnect.
                foreach (var fullRef in pendingReplay)
                {
                    try { await SubscribeOnPumpAsync(fullRef).ConfigureAwait(false); }
                    catch { /* skip — operator can re-subscribe */ }
                }

                pendingReplay = null;
                TouchActivity();
            }
            catch
            {
                // Reconnect failed; back off and retry on the next tick. pendingReplay is
                // left as-is so the next tick goes straight back to Register.
            }
        }
    }

    /// <summary>
    /// One-shot read implemented as a transient subscribe + unsubscribe.
    /// <c>LMXProxyServer</c> doesn't expose a synchronous read, so the canonical pattern
    /// (lifted from v1) is to subscribe, await the first OnDataChange, then unsubscribe.
    /// This method captures that single value.
    /// </summary>
    /// <param name="fullReference">Attribute reference to read.</param>
    /// <param name="timeout">How long to wait for the first data change.</param>
    /// <param name="ct">Cancels the wait.</param>
    /// <returns>The first value delivered for <paramref name="fullReference"/>.</returns>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    /// <exception cref="TimeoutException">No value arrived within <paramref name="timeout"/>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<Vtq> ReadAsync(string fullReference, TimeSpan timeout, CancellationToken ct)
    {
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");

        var tcs = new TaskCompletionSource<Vtq>(TaskCreationOptions.RunContinuationsAsynchronously);
        Action<string, Vtq> oneShot = (_, value) => tcs.TrySetResult(value);

        // Stash the one-shot handler before sending the subscribe, then remove it after firing.
        _subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));

        // We own the underlying advise only if nobody had advised this reference before us.
        // A handle that was merely resolved for a write does not count as advised.
        var ownsAdvise = !(_addressToHandle.TryGetValue(fullReference, out var knownHandle)
                           && _advisedHandles.ContainsKey(knownHandle));

        try
        {
            await SubscribeOnPumpAsync(fullReference).ConfigureAwait(false);

            // Linked source: fires on caller cancellation, and lets us stop the timer as
            // soon as the value arrives instead of leaving it running until the timeout.
            using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            var winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, delayCts.Token)).ConfigureAwait(false);
            delayCts.Cancel();

            if (winner != tcs.Task)
            {
                // A cancelled delay also lands here; report it as cancellation, not timeout.
                ct.ThrowIfCancellationRequested();
                throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
            }

            return await tcs.Task.ConfigureAwait(false);
        }
        finally
        {
            // High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
            DetachHandler(fullReference, oneShot);

            // Tear the MXAccess subscription down only if we created it AND no other
            // callback joined while the read was in flight.
            if (ownsAdvise && !_subscriptions.ContainsKey(fullReference))
            {
                try { await UnsubscribeAsync(fullReference).ConfigureAwait(false); }
                catch { /* shutdown-best-effort */ }
            }
        }
    }

    /// <summary>
    /// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
    /// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
    /// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
    /// false-positive Good even when the runtime rejected the write post-callback.
    /// </summary>
    /// <param name="fullReference">Attribute reference to write.</param>
    /// <param name="value">Value handed to the proxy's <c>Write</c> call.</param>
    /// <param name="securityClassification">Passed through to the proxy's <c>Write</c> call.</param>
    /// <param name="timeout">How long to wait for the completion callback; 5 seconds when <c>null</c>.</param>
    /// <returns>The success flag computed from the write-complete callback.</returns>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    /// <exception cref="TimeoutException">No completion callback arrived in time.</exception>
    /// <exception cref="OperationCanceledException">A newer write to the same item superseded this one.</exception>
    public async Task<bool> WriteAsync(string fullReference, object value,
        int securityClassification = 0, TimeSpan? timeout = null)
    {
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
        var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);

        var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference)).ConfigureAwait(false);

        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // A prior write to the same item handle may still be pending — uncommon but possible
        // if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
        _pendingWrites.AddOrUpdate(itemHandle, tcs, (_, prior) =>
        {
            prior.TrySetCanceled();
            return tcs;
        });

        try
        {
            await _pump.InvokeAsync(() =>
                _proxy.Write(_connectionHandle, itemHandle, value, securityClassification)).ConfigureAwait(false);

            using var delayCts = new CancellationTokenSource();
            var winner = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout, delayCts.Token)).ConfigureAwait(false);
            delayCts.Cancel();

            if (winner != tcs.Task)
                throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");

            return await tcs.Task.ConfigureAwait(false);
        }
        finally
        {
            // Remove only OUR entry. If a newer write replaced it, that entry must stay or
            // the newer write would never see its completion callback.
            TryRemoveExact(_pendingWrites, itemHandle, tcs);
        }
    }

    /// <summary>
    /// Attaches <paramref name="callback"/> to <paramref name="fullReference"/> and makes sure
    /// the reference is added and advised on the pump.
    /// </summary>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
    {
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");

        _subscriptions.AddOrUpdate(fullReference, callback, (_, existing) => Combine(existing, callback));
        try
        {
            await SubscribeOnPumpAsync(fullReference).ConfigureAwait(false);
        }
        catch
        {
            // The advise failed, so the callback would never fire; do not leave it behind.
            DetachHandler(fullReference, callback);
            throw;
        }
    }

    /// <summary>
    /// Removes every callback for <paramref name="fullReference"/> and releases its item
    /// handle on the pump. Does nothing when disconnected or when the reference is unknown.
    /// </summary>
    public Task UnsubscribeAsync(string fullReference) => _pump.InvokeAsync(() =>
    {
        if (!_connected) return;
        if (!_addressToHandle.TryRemove(fullReference, out var handle)) return;
        _handleToAddress.TryRemove(handle, out _);
        _subscriptions.TryRemove(fullReference, out _);

        // Un-advise only what was advised, and attempt RemoveItem regardless of whether
        // the un-advise succeeded so the item handle is not leaked.
        if (_advisedHandles.TryRemove(handle, out _))
        {
            try { _proxy.UnAdviseSupervisory(_connectionHandle, handle); }
            catch { /* best-effort during teardown */ }
        }

        try { _proxy.RemoveItem(_connectionHandle, handle); }
        catch { /* best-effort during teardown */ }
    });

    /// <summary>
    /// On the pump: resolves the item handle for <paramref name="fullReference"/> and advises
    /// it if that has not happened yet. Safe to call repeatedly for the same reference.
    /// </summary>
    private Task<int> SubscribeOnPumpAsync(string fullReference) => _pump.InvokeAsync(() =>
    {
        var itemHandle = ResolveItem(fullReference);

        // The handle may already exist because a write resolved it; that path does not
        // advise, so "handle known" must not be treated as "already subscribed".
        if (_advisedHandles.TryAdd(itemHandle, 0))
        {
            try
            {
                _proxy.AdviseSupervisory(_connectionHandle, itemHandle);
            }
            catch
            {
                // Leave the handle un-marked so a later subscribe retries the advise.
                _advisedHandles.TryRemove(itemHandle, out _);
                throw;
            }
        }

        return itemHandle;
    });

    /// <summary>
    /// Returns the item handle for <paramref name="fullReference"/>, adding the item to the
    /// proxy on first use. Called from inside pump invocations only.
    /// </summary>
    private int ResolveItem(string fullReference)
    {
        if (_addressToHandle.TryGetValue(fullReference, out var existing)) return existing;

        var itemHandle = _proxy.AddItem(_connectionHandle, fullReference);
        _addressToHandle[fullReference] = itemHandle;
        _handleToAddress[itemHandle] = fullReference;
        return itemHandle;
    }

    /// <summary>
    /// On the pump: round-trips a call through the proxy by adding the heartbeat item and
    /// removing it again. Returns <c>false</c> when the add throws.
    /// </summary>
    private bool ProbeOnPump()
    {
        int probeHandle;
        try { probeHandle = _proxy.AddItem(_connectionHandle, HeartbeatReference); }
        catch { return false; }

        // Release the probe item so repeated probes do not accumulate handles — unless the
        // proxy handed back a handle that a real subscription is using.
        if (!_handleToAddress.ContainsKey(probeHandle))
        {
            try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
            catch { /* the add succeeded, so the connection is alive; removal is best-effort */ }
        }

        return true;
    }

    /// <summary>
    /// On the pump: unregisters the (presumed dead) connection, marks the client
    /// disconnected, and returns the references that were advised so they can be replayed.
    /// The handle maps are cleared here, before re-registering, so handles from the old
    /// connection are never matched against events from the new one.
    /// </summary>
    private string[] TearDownOnPump()
    {
        try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
        _connected = false;

        var advised = _advisedHandles.Keys
            .Select(handle => _handleToAddress.TryGetValue(handle, out var address) ? address : null)
            .OfType<string>()
            .ToArray();

        ClearItemMaps();
        return advised;
    }

    /// <summary>Forgets every item handle; subscriptions callbacks are kept.</summary>
    private void ClearItemMaps()
    {
        _addressToHandle.Clear();
        _handleToAddress.Clear();
        _advisedHandles.Clear();
    }

    /// <summary>Records "now" as the last moment the connection was seen alive.</summary>
    private void TouchActivity() => Interlocked.Exchange(ref _lastObservedActivityTicks, DateTime.UtcNow.Ticks);

    /// <summary>Raises <see cref="ConnectionStateChanged"/> from the monitor without letting an observer fault the loop.</summary>
    private void RaiseFromMonitor(bool connected)
    {
        try { ConnectionStateChanged?.Invoke(this, connected); }
        catch { /* a throwing observer must not stall reconnect handling */ }
    }

    /// <summary>
    /// Removes <paramref name="handler"/> from the callbacks of <paramref name="fullReference"/>,
    /// dropping the dictionary entry entirely when no callback remains.
    /// </summary>
    private void DetachHandler(string fullReference, Action<string, Vtq> handler)
    {
        // Optimistic loop: recompute and retry if another thread changed the entry.
        while (_subscriptions.TryGetValue(fullReference, out var current))
        {
            var remaining = (Action<string, Vtq>?)Delegate.Remove(current, handler);
            if (remaining is null)
            {
                if (TryRemoveExact(_subscriptions, fullReference, current)) return;
            }
            else if (ReferenceEquals(remaining, current)
                     || _subscriptions.TryUpdate(fullReference, remaining, current))
            {
                return;
            }
        }
    }

    /// <summary>Removes <paramref name="key"/> only if it is still mapped to <paramref name="value"/>.</summary>
    private static bool TryRemoveExact<TKey, TValue>(ConcurrentDictionary<TKey, TValue> map, TKey key, TValue value)
        where TKey : notnull
        => ((System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<TKey, TValue>>)map)
            .Remove(new System.Collections.Generic.KeyValuePair<TKey, TValue>(key, value));

    /// <summary>
    /// Proxy data-change handler: maps the item handle back to its reference, builds a
    /// <see cref="Vtq"/> and invokes the callbacks registered for that reference. Events for
    /// unknown handles are ignored.
    /// </summary>
    private void OnDataChange(int hLMXServerHandle, int phItemHandle, object pvItemValue,
        int pwItemQuality, object pftItemTimeStamp, ref MXSTATUS_PROXY[] itemStatus)
    {
        if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;

        // Liveness: any data-change event is proof the connection is alive.
        TouchActivity();

        var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;

        // Clamp into byte range before narrowing.
        var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
        var vtq = new Vtq(pvItemValue, ts, quality);

        if (_subscriptions.TryGetValue(fullRef, out var cb)) cb?.Invoke(fullRef, vtq);
    }

    /// <summary>
    /// Proxy write-complete handler: completes the pending write for the item handle. An
    /// absent or empty status array is treated as success; otherwise the first entry's
    /// <c>success</c> field decides.
    /// </summary>
    private void OnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] itemStatus)
    {
        if (_pendingWrites.TryRemove(phItemHandle, out var tcs))
            tcs.TrySetResult(itemStatus is null || itemStatus.Length == 0 || itemStatus[0].success != 0);
    }

    /// <summary>Returns a multicast delegate that invokes <paramref name="a"/> then <paramref name="b"/>.</summary>
    private static Action<string, Vtq> Combine(Action<string, Vtq> a, Action<string, Vtq> b)
        => (Action<string, Vtq>)Delegate.Combine(a, b)!;

    /// <summary>
    /// Disconnects (best-effort) and detaches from the proxy events. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Claim disposal atomically so a second call is a no-op.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        // DisconnectAsync also cancels and disposes the monitor's token source.
        try { DisconnectAsync().GetAwaiter().GetResult(); }
        catch { /* swallow — Dispose must not throw */ }

        _proxy.OnDataChange -= OnDataChange;
        _proxy.OnWriteComplete -= OnWriteComplete;
    }
}
|
|
|
|
/// <summary>
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
/// monitor's polling cadence so behavior is consistent across the lift.
/// </summary>
public sealed class MxAccessClientOptions
{
    private readonly TimeSpan _monitorInterval = TimeSpan.FromSeconds(5);
    private readonly TimeSpan _staleThreshold = TimeSpan.FromSeconds(60);

    /// <summary>Whether to start the background monitor at connect time.</summary>
    public bool AutoReconnect { get; init; } = true;

    /// <summary>How often the monitor wakes up to check liveness. Must be greater than zero.</summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is zero or negative.</exception>
    public TimeSpan MonitorInterval
    {
        get => _monitorInterval;

        // The monitor passes this straight to a delay between ticks: zero would spin and a
        // negative value would make the delay throw inside the background loop.
        init => _monitorInterval = value > TimeSpan.Zero
            ? value
            : throw new ArgumentOutOfRangeException(nameof(value), value, "MonitorInterval must be greater than zero.");
    }

    /// <summary>If no data-change activity in this window, the monitor probes the connection. Must not be negative.</summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is negative.</exception>
    public TimeSpan StaleThreshold
    {
        get => _staleThreshold;
        init => _staleThreshold = value >= TimeSpan.Zero
            ? value
            : throw new ArgumentOutOfRangeException(nameof(value), value, "StaleThreshold must not be negative.");
    }
}
|