using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;

/// <summary>
/// MXAccess runtime client — focused port of v1 <c>MxAccessClient</c>. Owns one
/// <c>LMXProxyServer</c> COM connection on the supplied <see cref="StaPump"/>; serializes
/// read / write / subscribe through the pump because all COM calls must run on the STA
/// thread. Subscriptions are stored so they can be replayed on reconnect (full reconnect
/// loop is the deferred-but-non-blocking refinement; this version covers connect/read/write
/// /subscribe/unsubscribe — the MVP needed for parity testing).
/// </summary>
/// <remarks>
/// Threading: <c>_connectionHandle</c> and <c>_advisedHandles</c> are only touched inside
/// <c>_pump.InvokeAsync</c> lambdas. State that is also read from the monitor thread or from
/// caller threads (<c>_connected</c>, the activity timestamp, the dispose flag, the monitor
/// CTS) uses <see langword="volatile"/> / <see cref="Volatile"/> / <see cref="Interlocked"/>.
/// </remarks>
public sealed class MxAccessClient : IDisposable
{
    private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();

    private readonly StaPump _pump;
    private readonly IMxProxy _proxy;
    private readonly string _clientName;
    private readonly MxAccessClientOptions _options;

    // Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read/Write).
    private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
    private readonly ConcurrentDictionary<int, string> _handleToAddress = new();

    // Item handles that currently hold an AdviseSupervisory registration. Items resolved only
    // for writes get a handle (AddItem) but are never advised, so "has a handle" does not imply
    // "is subscribed". Pump-thread only.
    private readonly HashSet<int> _advisedHandles = new();

    // Attribute reference → combined data-change callbacks. This is the authoritative record of
    // what must be re-subscribed after a reconnect.
    private readonly ConcurrentDictionary<string, Action<string, Vtq>> _subscriptions = new(StringComparer.OrdinalIgnoreCase);

    // Item handle → write awaiting its OnWriteComplete callback.
    private readonly ConcurrentDictionary<int, TaskCompletionSource<bool>> _pendingWrites = new();

    private int _connectionHandle;
    private volatile bool _connected;

    // UTC ticks of the last observed sign of life. Kept as a long behind Interlocked because a
    // plain 64-bit field (DateTime or long) can tear when the process runs 32-bit.
    private long _lastObservedActivityTicks = DateTime.UtcNow.Ticks;

    private CancellationTokenSource? _monitorCts;
    private int _reconnectCount;
    private int _disposed;

    /// <summary>
    /// Fires whenever the connection transitions Connected ↔ Disconnected. The argument is the
    /// new state (<see langword="true"/> = connected). Not raised for no-op calls such as a
    /// second <see cref="ConnectAsync"/> while already connected.
    /// </summary>
    public event EventHandler<bool>? ConnectionStateChanged;

    /// <summary>
    /// Fires once per failed subscription replay after a reconnect. Carries the tag reference
    /// and the exception so the backend can propagate the degradation signal (e.g. mark the
    /// subscription bad on the Proxy side rather than silently losing its callback). Added for
    /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
    /// operator would only find out that a specific subscription stopped updating through a
    /// data-quality complaint from downstream.
    /// </summary>
    public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;

    /// <summary>Creates the client and hooks the proxy's data-change / write-complete events.</summary>
    /// <param name="pump">STA pump every COM call is marshalled onto.</param>
    /// <param name="proxy">MXAccess proxy; its events are unhooked again in <see cref="Dispose"/>.</param>
    /// <param name="clientName">Client identity passed to <c>Register</c>.</param>
    /// <param name="options">Monitor tunables; defaults are used when <see langword="null"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pump"/>, <paramref name="proxy"/> or <paramref name="clientName"/> is null.</exception>
    public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
    {
        _pump = pump ?? throw new ArgumentNullException(nameof(pump));
        _proxy = proxy ?? throw new ArgumentNullException(nameof(proxy));
        _clientName = clientName ?? throw new ArgumentNullException(nameof(clientName));
        _options = options ?? new MxAccessClientOptions();
        _proxy.OnDataChange += OnDataChange;
        _proxy.OnWriteComplete += OnWriteComplete;
    }

    /// <summary>Whether the client currently holds a registered LMXProxyServer connection.</summary>
    public bool IsConnected => _connected;

    /// <summary>Number of attribute references with registered callbacks (including in-flight reads).</summary>
    public int SubscriptionCount => _subscriptions.Count;

    /// <summary>Number of successful automatic reconnects performed by the monitor.</summary>
    public int ReconnectCount => Volatile.Read(ref _reconnectCount);

    /// <summary>
    /// Wonderware client identity used when registering with the LMXProxyServer. Surfaced so
    /// the Galaxy backend can tag its OnHostStatusChanged IPC pushes with a stable gateway
    /// name per PR 8.
    /// </summary>
    public string ClientName => _clientName;

    private bool IsDisposed => Volatile.Read(ref _disposed) != 0;

    /// <summary>
    /// Connects on the STA thread. Idempotent: while already connected it returns the existing
    /// handle without re-registering or re-raising <see cref="ConnectionStateChanged"/>.
    /// Starts the reconnect monitor on first call when <see cref="MxAccessClientOptions.AutoReconnect"/> is set.
    /// </summary>
    /// <returns>The LMXProxyServer connection handle.</returns>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task<int> ConnectAsync()
    {
        if (IsDisposed) throw new ObjectDisposedException(nameof(MxAccessClient));

        var newlyConnected = false;
        var handle = await _pump.InvokeAsync(() =>
        {
            if (_connected) return _connectionHandle;
            _connectionHandle = _proxy.Register(_clientName);
            _connected = true;
            newlyConnected = true;
            return _connectionHandle;
        });

        if (newlyConnected)
        {
            // Start the staleness window at connect time, not at construction time.
            MarkActivity();
            ConnectionStateChanged?.Invoke(this, true);
        }

        if (_options.AutoReconnect) StartMonitor();
        return handle;
    }

    /// <summary>
    /// Stops the reconnect monitor, unregisters from LMXProxyServer on the STA thread and drops
    /// all item handles. Subscription callbacks are kept so a later connect can re-use them.
    /// Raises <see cref="ConnectionStateChanged"/>(<see langword="false"/>) only if a connection was actually closed.
    /// </summary>
    public async Task DisconnectAsync()
    {
        // Cancel before the pump turn below: the monitor checks the token on the pump before
        // re-registering, so it can never resurrect the connection after this point.
        StopMonitor();

        var wasConnected = await _pump.InvokeAsync(() =>
        {
            if (!_connected) return false;
            try { _proxy.Unregister(_connectionHandle); }
            finally
            {
                _connected = false;
                ClearItemState();
            }
            return true;
        });

        if (wasConnected) ConnectionStateChanged?.Invoke(this, false);
    }

    /// <summary>Starts the monitor loop once; concurrent callers race on a CAS so only one loop runs.</summary>
    private void StartMonitor()
    {
        if (Volatile.Read(ref _monitorCts) is not null) return;

        var cts = new CancellationTokenSource();
        // Capture the token now: the field may be swapped out by DisconnectAsync at any time.
        var token = cts.Token;
        if (Interlocked.CompareExchange(ref _monitorCts, cts, null) is not null)
        {
            cts.Dispose();
            return;
        }

        _ = Task.Run(() => MonitorLoopAsync(token));
    }

    /// <summary>Cancels and disposes the monitor CTS, if any.</summary>
    private void StopMonitor()
    {
        var cts = Interlocked.Exchange(ref _monitorCts, null);
        if (cts is null) return;
        cts.Cancel();
        cts.Dispose();
    }

    /// <summary>
    /// Background loop that watches for connection liveness signals and triggers
    /// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
    /// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
    /// observed-activity timestamp + optional probe-tag subscription. Without an explicit
    /// probe tag, falls back to "no data change in N seconds + no successful read in N
    /// seconds = unhealthy" — same shape as v1.
    /// </summary>
    /// <remarks>
    /// A failed reconnect leaves <c>_connected</c> false; the next tick retries the reconnect
    /// directly (previously the loop skipped disconnected state, so one failed attempt stopped
    /// all further retries).
    /// </remarks>
    private async Task MonitorLoopAsync(CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                try { await Task.Delay(_options.MonitorInterval, ct); }
                catch (OperationCanceledException) { break; }

                if (IsDisposed) break;

                if (_connected)
                {
                    if (IdleTime() <= _options.StaleThreshold) continue;
                    if (await ProbeAsync())
                    {
                        MarkActivity();
                        continue;
                    }
                }

                if (ct.IsCancellationRequested) break;
                await ReconnectAsync(ct);
            }
        }
        catch (Exception ex)
        {
            // Nothing awaits this task; make an unexpected termination visible.
            Log.Error(ex, "MXAccess reconnect monitor terminated unexpectedly");
        }
    }

    /// <summary>
    /// Probe: try a cheap COM call. If the proxy is dead, the call throws — that's our reconnect
    /// signal. PR 6 low finding #1: AddItem allocates an MXAccess item handle; we must RemoveItem
    /// it on the same pump turn or the long-running monitor leaks one handle per probe cycle.
    /// </summary>
    /// <returns><see langword="true"/> when the probe item could be added.</returns>
    private async Task<bool> ProbeAsync()
    {
        try
        {
            return await _pump.InvokeAsync(() =>
            {
                var probeHandle = 0;
                try
                {
                    probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
                    return probeHandle > 0;
                }
                catch { return false; }
                finally
                {
                    if (probeHandle > 0)
                    {
                        try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
                        catch { /* proxy is dying; best-effort cleanup */ }
                    }
                }
            });
        }
        catch
        {
            // The pump itself failed to run the probe; treat as unhealthy.
            return false;
        }
    }

    /// <summary>
    /// Tears down the (presumed dead) connection if still marked connected, re-registers, then
    /// replays every subscription. Failures are logged; the next monitor tick retries.
    /// </summary>
    private async Task ReconnectAsync(CancellationToken ct)
    {
        try
        {
            var wasConnected = await _pump.InvokeAsync(() =>
            {
                if (ct.IsCancellationRequested || !_connected) return false;
                try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
                _connected = false;
                // Old item handles belong to the dead connection; drop them on the pump so no
                // concurrent pump operation observes a half-cleared map.
                ClearItemState();
                return true;
            });
            if (wasConnected) RaiseConnectionStateChangedFromMonitor(false);

            var registered = await _pump.InvokeAsync(() =>
            {
                // DisconnectAsync cancels the token before its own pump turn, so this check
                // (serialized on the pump) prevents re-registering after an explicit disconnect.
                if (ct.IsCancellationRequested || IsDisposed) return false;
                _connectionHandle = _proxy.Register(_clientName);
                _connected = true;
                return true;
            });
            if (!registered) return;

            var reconnect = Interlocked.Increment(ref _reconnectCount);
            RaiseConnectionStateChangedFromMonitor(true);
            await ReplaySubscriptionsAsync(reconnect);
            MarkActivity();
        }
        catch (Exception ex)
        {
            // Reconnect failed; _connected stays false and the next tick retries.
            Log.Warning(ex, "MXAccess reconnect attempt failed; retrying in {Interval}", _options.MonitorInterval);
        }
    }

    /// <summary>
    /// Re-subscribes every reference that has callbacks registered. PR 6 low finding #2: surface
    /// per-tag failures — log them and raise <see cref="SubscriptionReplayFailed"/> so the backend
    /// can propagate the degraded state instead of silently losing updates.
    /// </summary>
    /// <param name="reconnect">Reconnect ordinal, for logging.</param>
    private async Task ReplaySubscriptionsAsync(int reconnect)
    {
        // Replay from the callback registry, not from _addressToHandle: the latter also holds
        // write-only items, which must not be turned into supervisory subscriptions.
        var snapshot = _subscriptions.Keys.ToArray();
        var failed = 0;
        foreach (var fullRef in snapshot)
        {
            try
            {
                await SubscribeOnPumpAsync(fullRef);
            }
            catch (Exception subEx)
            {
                failed++;
                Log.Warning(subEx, "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}", fullRef, reconnect);
                try
                {
                    SubscriptionReplayFailed?.Invoke(this, new SubscriptionReplayFailedEventArgs(fullRef, subEx));
                }
                catch (Exception handlerEx)
                {
                    // A throwing handler must not abort the replay of the remaining tags.
                    Log.Warning(handlerEx, "SubscriptionReplayFailed handler threw for {TagReference}", fullRef);
                }
            }
        }

        if (failed > 0)
            Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
        else
            Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
    }

    /// <summary>
    /// Raises <see cref="ConnectionStateChanged"/> from the monitor, where no caller exists to
    /// receive a handler exception; a throwing handler is logged instead of aborting the reconnect.
    /// </summary>
    private void RaiseConnectionStateChangedFromMonitor(bool connected)
    {
        try { ConnectionStateChanged?.Invoke(this, connected); }
        catch (Exception ex) { Log.Warning(ex, "ConnectionStateChanged handler threw during MXAccess reconnect"); }
    }

    /// <summary>
    /// One-shot read implemented as a transient subscribe + unsubscribe.
    /// LMXProxyServer doesn't expose a synchronous read, so the canonical pattern
    /// (lifted from v1) is to subscribe, await the first OnDataChange, then unsubscribe.
    /// This method captures that single value.
    /// </summary>
    /// <param name="fullReference">Galaxy attribute reference.</param>
    /// <param name="timeout">How long to wait for the first data change.</param>
    /// <param name="ct">Cancels the wait.</param>
    /// <returns>The first value/timestamp/quality delivered for the reference.</returns>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    /// <exception cref="TimeoutException">No data change arrived within <paramref name="timeout"/>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    /// <remarks>
    /// NOTE(review): when the item is already advised for another subscriber no new
    /// AdviseSupervisory is issued, so the read waits for the item's next OnDataChange —
    /// confirm whether LMXProxyServer re-delivers the current value in that case.
    /// </remarks>
    public async Task<Vtq> ReadAsync(string fullReference, TimeSpan timeout, CancellationToken ct)
    {
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");

        var tcs = new TaskCompletionSource<Vtq>(TaskCreationOptions.RunContinuationsAsynchronously);
        Action<string, Vtq> oneShot = (_, value) => tcs.TrySetResult(value);

        // Stash the one-shot handler before sending the subscribe, then remove it after firing.
        _subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));

        // Linked so the Task.Delay timer is released as soon as the read finishes either way.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        try
        {
            await SubscribeOnPumpAsync(fullReference);

            using var registration = ct.Register(() => tcs.TrySetCanceled(ct));
            var winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, timeoutCts.Token));
            if (winner != tcs.Task)
            {
                // The delay also completes when ct fires; report that as cancellation, not timeout.
                ct.ThrowIfCancellationRequested();
                throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
            }

            return await tcs.Task;
        }
        finally
        {
            timeoutCts.Cancel();

            // High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
            // If that leaves no callbacks for the reference, release the MXAccess item too so we
            // don't leak an advised item handle.
            if (DetachCallback(fullReference, oneShot))
            {
                try { await ReleaseItemIfUnusedAsync(fullReference); }
                catch { /* shutdown-best-effort */ }
            }
        }
    }

    /// <summary>
    /// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
    /// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
    /// in exit-gate-phase-2.md: the previous fire-and-forget version returned a
    /// false-positive Good even when the runtime rejected the write post-callback.
    /// </summary>
    /// <param name="fullReference">Galaxy attribute reference.</param>
    /// <param name="value">Value passed to the proxy's <c>Write</c>.</param>
    /// <param name="securityClassification">Passed through to the proxy's <c>Write</c>.</param>
    /// <param name="timeout">Wait for OnWriteComplete; 5 seconds when <see langword="null"/>.</param>
    /// <returns>The success flag derived from the OnWriteComplete status.</returns>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    /// <exception cref="TimeoutException">No OnWriteComplete arrived within the timeout.</exception>
    /// <exception cref="TaskCanceledException">A newer write to the same item replaced this one.</exception>
    public async Task<bool> WriteAsync(string fullReference, object value, int securityClassification = 0, TimeSpan? timeout = null)
    {
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
        var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);

        var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));

        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        // A prior write to the same item handle may still be pending — uncommon but possible if
        // the caller spammed writes. Replace it atomically: the older TCS observes a Cancelled task.
        while (true)
        {
            if (_pendingWrites.TryAdd(itemHandle, tcs)) break;
            if (_pendingWrites.TryGetValue(itemHandle, out var prior) && _pendingWrites.TryUpdate(itemHandle, tcs, prior))
            {
                prior.TrySetCanceled();
                break;
            }
        }

        using var timeoutCts = new CancellationTokenSource();
        try
        {
            await _pump.InvokeAsync(() => _proxy.Write(_connectionHandle, itemHandle, value, securityClassification));

            var winner = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout, timeoutCts.Token));
            if (winner != tcs.Task)
                throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");

            return await tcs.Task;
        }
        finally
        {
            timeoutCts.Cancel();
            // Remove only our own entry: a newer write to the same handle may have replaced it,
            // and evicting that one would leave its OnWriteComplete unmatched (-> spurious timeout).
            ((ICollection<KeyValuePair<int, TaskCompletionSource<bool>>>)_pendingWrites)
                .Remove(new KeyValuePair<int, TaskCompletionSource<bool>>(itemHandle, tcs));
        }
    }

    /// <summary>
    /// Registers <paramref name="callback"/> for data changes on <paramref name="fullReference"/>
    /// and advises the item on the STA thread. The callback is recorded before the COM call, so
    /// it stays registered (and is retried on reconnect replay) even if advising fails here.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
    public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
    {
        if (callback is null) throw new ArgumentNullException(nameof(callback));
        if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
        _subscriptions.AddOrUpdate(fullReference, callback, (_, existing) => Combine(existing, callback));
        await SubscribeOnPumpAsync(fullReference);
    }

    /// <summary>
    /// Drops all callbacks for <paramref name="fullReference"/> and, when connected, un-advises and
    /// removes its MXAccess item. Callbacks are dropped even while disconnected so the reconnect
    /// replay does not resurrect a subscription the caller already cancelled.
    /// </summary>
    public Task UnsubscribeAsync(string fullReference) => _pump.InvokeAsync(() =>
    {
        _subscriptions.TryRemove(fullReference, out _);
        if (!_connected) return;
        if (!_addressToHandle.TryRemove(fullReference, out var handle)) return;
        ReleaseHandle(handle);
    });

    /// <summary>Resolves the item handle and advises it unless it is already advised. Pump turn.</summary>
    /// <returns>The MXAccess item handle.</returns>
    private Task<int> SubscribeOnPumpAsync(string fullReference) => _pump.InvokeAsync(() =>
    {
        var itemHandle = ResolveItem(fullReference);
        // Track advise state separately from handle existence: a handle resolved by WriteAsync
        // (or left behind by a failed AdviseSupervisory) still needs advising here.
        if (!_advisedHandles.Contains(itemHandle))
        {
            _proxy.AdviseSupervisory(_connectionHandle, itemHandle);
            _advisedHandles.Add(itemHandle);
        }
        return itemHandle;
    });

    /// <summary>Returns the cached item handle, adding the item on first use. Pump thread only.</summary>
    private int ResolveItem(string fullReference)
    {
        if (_addressToHandle.TryGetValue(fullReference, out var existing)) return existing;
        var itemHandle = _proxy.AddItem(_connectionHandle, fullReference);
        _addressToHandle[fullReference] = itemHandle;
        _handleToAddress[itemHandle] = fullReference;
        return itemHandle;
    }

    /// <summary>
    /// Releases the item for <paramref name="fullReference"/> if nobody has callbacks on it and no
    /// write is pending on its handle. Runs on the pump, so it serializes with subscribes.
    /// </summary>
    private Task ReleaseItemIfUnusedAsync(string fullReference) => _pump.InvokeAsync(() =>
    {
        if (!_connected) return;
        if (_subscriptions.ContainsKey(fullReference)) return; // someone subscribed meanwhile
        if (!_addressToHandle.TryGetValue(fullReference, out var handle)) return;
        if (_pendingWrites.ContainsKey(handle)) return; // a write still awaits OnWriteComplete
        _addressToHandle.TryRemove(fullReference, out _);
        ReleaseHandle(handle);
    });

    /// <summary>
    /// Un-advises (if advised) and removes an item handle. Pump thread only; COM failures are
    /// ignored as best-effort teardown, but RemoveItem is still attempted if un-advise fails.
    /// </summary>
    private void ReleaseHandle(int handle)
    {
        _handleToAddress.TryRemove(handle, out _);
        if (_advisedHandles.Remove(handle))
        {
            try { _proxy.UnAdviseSupervisory(_connectionHandle, handle); }
            catch { /* best-effort during teardown */ }
        }

        try { _proxy.RemoveItem(_connectionHandle, handle); }
        catch { /* best-effort during teardown */ }
    }

    /// <summary>Forgets every item handle of the current connection. Pump thread only.</summary>
    private void ClearItemState()
    {
        _addressToHandle.Clear();
        _handleToAddress.Clear();
        _advisedHandles.Clear();
    }

    /// <summary>
    /// Removes <paramref name="callback"/> from the entry for <paramref name="fullReference"/>,
    /// deleting the entry when nothing remains (never re-inserting a null/no-op entry).
    /// </summary>
    /// <returns><see langword="true"/> when no callbacks remain for the reference.</returns>
    private bool DetachCallback(string fullReference, Action<string, Vtq> callback)
    {
        var entries = (ICollection<KeyValuePair<string, Action<string, Vtq>>>)_subscriptions;
        while (_subscriptions.TryGetValue(fullReference, out var current))
        {
            var remaining = (Action<string, Vtq>?)Delegate.Remove(current, callback);
            if (remaining is null)
            {
                // Only remove if the entry is still exactly what we inspected.
                if (entries.Remove(new KeyValuePair<string, Action<string, Vtq>>(fullReference, current))) return true;
            }
            else if (ReferenceEquals(remaining, current))
            {
                // Our callback is no longer present (e.g. UnsubscribeAsync replaced the entry).
                return false;
            }
            else if (_subscriptions.TryUpdate(fullReference, remaining, current))
            {
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// COM data-change sink. Records liveness and fans the value out to every registered
    /// callback for the item's reference.
    /// </summary>
    private void OnDataChange(int hLMXServerHandle, int phItemHandle, object pvItemValue, int pwItemQuality, object pftItemTimeStamp, ref MXSTATUS_PROXY[] itemStatus)
    {
        if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;

        // Liveness: any data-change event is proof the connection is alive.
        MarkActivity();

        // NOTE(review): ToUniversalTime treats a DateTimeKind.Unspecified value as local time —
        // confirm that matches the timestamps MXAccess delivers.
        var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
        var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
        var vtq = new Vtq(pvItemValue, ts, quality);

        if (!_subscriptions.TryGetValue(fullRef, out var cb) || cb is null) return;

        // Invoke each handler in isolation: one throwing subscriber must neither escape into the
        // COM event dispatch nor starve later handlers (including in-flight ReadAsync one-shots).
        foreach (var handler in cb.GetInvocationList())
        {
            try { ((Action<string, Vtq>)handler)(fullRef, vtq); }
            catch (Exception ex) { Log.Warning(ex, "MXAccess data-change callback threw for {TagReference}", fullRef); }
        }
    }

    /// <summary>
    /// COM write-complete sink. Completes the pending write for the item handle; a missing or
    /// empty status array counts as success, otherwise the first entry's <c>success</c> flag decides.
    /// </summary>
    private void OnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] itemStatus)
    {
        if (_pendingWrites.TryRemove(phItemHandle, out var tcs))
            tcs.TrySetResult(itemStatus is null || itemStatus.Length == 0 || itemStatus[0].success != 0);
    }

    private void MarkActivity() => Interlocked.Exchange(ref _lastObservedActivityTicks, DateTime.UtcNow.Ticks);

    private TimeSpan IdleTime() => TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref _lastObservedActivityTicks));

    private static Action<string, Vtq> Combine(Action<string, Vtq> a, Action<string, Vtq> b) =>
        (Action<string, Vtq>)Delegate.Combine(a, b)!;

    /// <summary>
    /// Stops the monitor, disconnects and unhooks the proxy events. Idempotent; never throws.
    /// </summary>
    /// <remarks>
    /// NOTE(review): blocks on <see cref="DisconnectAsync"/>; calling Dispose from the STA pump
    /// thread itself would presumably deadlock — confirm StaPump.InvokeAsync re-entrancy.
    /// </remarks>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        StopMonitor();
        try { DisconnectAsync().GetAwaiter().GetResult(); }
        catch (Exception ex) { Log.Warning(ex, "MXAccess disconnect during Dispose failed"); }

        _proxy.OnDataChange -= OnDataChange;
        _proxy.OnWriteComplete -= OnWriteComplete;
    }
}

/// <summary>
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
/// monitor's polling cadence so behavior is consistent across the lift.
/// </summary>
public sealed class MxAccessClientOptions
{
    /// <summary>Whether to start the background monitor at connect time.</summary>
    public bool AutoReconnect { get; init; } = true;

    /// <summary>How often the monitor wakes up to check liveness (and to retry a failed reconnect).</summary>
    public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);

    /// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
    public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
}