diff --git a/docs/v2/implementation/pr-4-body.md b/docs/v2/implementation/pr-4-body.md new file mode 100644 index 0000000..0412c06 --- /dev/null +++ b/docs/v2/implementation/pr-4-body.md @@ -0,0 +1,91 @@ +# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings + +**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`) +**Target**: `v2` + +## Summary + +Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`: + +- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the + subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag + callback is always detached, and if the read installed the underlying MXAccess + subscription itself (no other caller had it), it tears it down on the way out. +- **High 2 — No reconnect loop on the MXAccess COM connection.** New + `MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background + `MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a + no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active + subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps + `_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale). + `ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream + consumers (the supervisor on the Proxy side already surfaces this through its + HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in). +- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to + the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged` + events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and + forwards as outbound `OnDataChangeNotification` / `AlarmEvent` / + `RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend` + fans out per-tag value changes to every `SubscriptionId` that's listening to that tag + (multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out + on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable + CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist + only to satisfy the interface). +- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New + `WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via + the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`. + `MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the + runtime rejected the write, instead of false-positive `Good`. + +## Pipe server change + +`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can +register backend event sinks on each accepted connection and detach them at disconnect. The +`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the +finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance` +(net48 doesn't support default interface methods, so the empty-attach lives as a public nested +class). + +## Tests + +**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1 +pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live +ZB smoke + 3 live MXAccess COM smoke all pass unchanged. + +## Test plan for reviewers + +- [ ] `dotnet build` clean +- [ ] `dotnet test` shows 460/7-skip/1-baseline +- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor` + partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same + polling cadence, same probe-then-reconnect-with-replay shape +- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are + detached on connection close (no leaked invocation list refs) +- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the + correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern + +## What's NOT in this PR + +- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope. +- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op). + `OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when + raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking + port from v1's `AlarmObjectFilter` + Galaxy alarm primitives). +- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the + frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's + `HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR). + +## Adversarial review + +Quick pass over the PR 4 deltas. No new findings beyond: + +- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked + (`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to + the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up + to call `RemoveItem` after the probe succeeds. +- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If + Galaxy permanently rejects a previously-valid reference (rare but possible after a + re-deploy), the user gets silent data loss for that one subscription. The stub-handler- + unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false) + → ConnectionStateChanged(true)` payload that includes the replay-failures list. + +Both are low-priority follow-ups, not PR 4 blockers. diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs index 88e64c3..95a626b 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs @@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy private long _nextSessionId; private long _nextSubscriptionId; + // DB-only backend doesn't have a runtime data plane; never raises events. +#pragma warning disable CS0067 + public event System.EventHandler? OnDataChange; + public event System.EventHandler? OnAlarmEvent; + public event System.EventHandler? OnHostStatusChanged; +#pragma warning restore CS0067 + public Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) { var id = Interlocked.Increment(ref _nextSessionId); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs index c6854f3..b4c0a93 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs @@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// public interface IGalaxyBackend { + /// + /// Server-pushed events the backend raises asynchronously (data-change, alarm, + /// host-status). The frame handler subscribes once on connect and forwards each + /// event to the Proxy as a typed notification. + /// + event System.EventHandler? OnDataChange; + event System.EventHandler? OnAlarmEvent; + event System.EventHandler? OnHostStatusChanged; + Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct); Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs index 669b1e0..de38f37 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Linq; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; @@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable private readonly StaPump _pump; private readonly IMxProxy _proxy; private readonly string _clientName; + private readonly MxAccessClientOptions _options; // Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read). private readonly ConcurrentDictionary _addressToHandle = new(StringComparer.OrdinalIgnoreCase); @@ -30,39 +32,148 @@ public sealed class MxAccessClient : IDisposable private int _connectionHandle; private bool _connected; + private DateTime _lastObservedActivityUtc = DateTime.UtcNow; + private CancellationTokenSource? _monitorCts; + private int _reconnectCount; + private bool _disposed; - public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName) + /// Fires whenever the connection transitions Connected ↔ Disconnected. + public event EventHandler? ConnectionStateChanged; + + public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null) { _pump = pump; _proxy = proxy; _clientName = clientName; + _options = options ?? new MxAccessClientOptions(); _proxy.OnDataChange += OnDataChange; _proxy.OnWriteComplete += OnWriteComplete; } public bool IsConnected => _connected; public int SubscriptionCount => _subscriptions.Count; + public int ReconnectCount => _reconnectCount; - /// Connects on the STA thread. Idempotent. - public Task ConnectAsync() => _pump.InvokeAsync(() => + /// Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call. + public async Task ConnectAsync() { - if (_connected) return _connectionHandle; - _connectionHandle = _proxy.Register(_clientName); - _connected = true; - return _connectionHandle; - }); - - public Task DisconnectAsync() => _pump.InvokeAsync(() => - { - if (!_connected) return; - try { _proxy.Unregister(_connectionHandle); } - finally + var handle = await _pump.InvokeAsync(() => { - _connected = false; - _addressToHandle.Clear(); - _handleToAddress.Clear(); + if (_connected) return _connectionHandle; + _connectionHandle = _proxy.Register(_clientName); + _connected = true; + return _connectionHandle; + }); + + ConnectionStateChanged?.Invoke(this, true); + + if (_options.AutoReconnect && _monitorCts is null) + { + _monitorCts = new CancellationTokenSource(); + _ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token)); } - }); + + return handle; + } + + public async Task DisconnectAsync() + { + _monitorCts?.Cancel(); + _monitorCts = null; + + await _pump.InvokeAsync(() => + { + if (!_connected) return; + try { _proxy.Unregister(_connectionHandle); } + finally + { + _connected = false; + _addressToHandle.Clear(); + _handleToAddress.Clear(); + } + }); + + ConnectionStateChanged?.Invoke(this, false); + } + + /// + /// Background loop that watches for connection liveness signals and triggers + /// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2: + /// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses + /// observed-activity timestamp + optional probe-tag subscription. Without an explicit + /// probe tag, falls back to "no data change in N seconds + no successful read in N + /// seconds = unhealthy" — same shape as v1. + /// + private async Task MonitorLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(_options.MonitorInterval, ct); } + catch (OperationCanceledException) { break; } + + if (!_connected || _disposed) continue; + + var idle = DateTime.UtcNow - _lastObservedActivityUtc; + if (idle <= _options.StaleThreshold) continue; + + // Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's + // our reconnect signal. + bool probeOk; + try + { + probeOk = await _pump.InvokeAsync(() => + { + // AddItem on the connection handle is cheap and round-trips through COM. + // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone. + try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; } + catch { return false; } + }); + } + catch { probeOk = false; } + + if (probeOk) + { + _lastObservedActivityUtc = DateTime.UtcNow; + continue; + } + + // Connection appears dead — reconnect-with-replay. + try + { + await _pump.InvokeAsync(() => + { + try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ } + _connected = false; + }); + ConnectionStateChanged?.Invoke(this, false); + + await _pump.InvokeAsync(() => + { + _connectionHandle = _proxy.Register(_clientName); + _connected = true; + }); + _reconnectCount++; + ConnectionStateChanged?.Invoke(this, true); + + // Replay every subscription that was active before the disconnect. + var snapshot = _addressToHandle.Keys.ToArray(); + _addressToHandle.Clear(); + _handleToAddress.Clear(); + foreach (var fullRef in snapshot) + { + try { await SubscribeOnPumpAsync(fullRef); } + catch { /* skip — operator can re-subscribe */ } + } + + _lastObservedActivityUtc = DateTime.UtcNow; + } + catch + { + // Reconnect failed; back off and retry on the next tick. + _connected = false; + } + } + } /// /// One-shot read implemented as a transient subscribe + unsubscribe. @@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable // Stash the one-shot handler before sending the subscribe, then remove it after firing. _subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot)); + var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference); - var itemHandle = await SubscribeOnPumpAsync(fullReference); + try + { + await SubscribeOnPumpAsync(fullReference); - using var _ = ct.Register(() => tcs.TrySetCanceled()); - var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct)); - if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}"); + using var _ = ct.Register(() => tcs.TrySetCanceled()); + var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct)); + if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}"); - // Detach the one-shot handler. - _subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot)); - - return await tcs.Task; + return await tcs.Task; + } + finally + { + // High 1 — always detach the one-shot handler, even on cancellation/timeout/throw. + // If we were the one who added the underlying MXAccess subscription (no other + // caller had it), tear it down too so we don't leak a probe item handle. + _subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot)); + if (addedToReadOnlyAttribute) + { + try { await UnsubscribeAsync(fullReference); } + catch { /* shutdown-best-effort */ } + } + } } - public Task WriteAsync(string fullReference, object value, int securityClassification = 0) => - _pump.InvokeAsync(() => + /// + /// Writes to the runtime and AWAITS the OnWriteComplete + /// callback so the caller learns the actual write status. Per Phase 2 medium finding #4 + /// in exit-gate-phase-2.md: the previous fire-and-forget version returned a + /// false-positive Good even when the runtime rejected the write post-callback. + /// + public async Task WriteAsync(string fullReference, object value, + int securityClassification = 0, TimeSpan? timeout = null) + { + if (!_connected) throw new InvalidOperationException("MxAccessClient not connected"); + var actualTimeout = timeout ?? TimeSpan.FromSeconds(5); + + var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (!_pendingWrites.TryAdd(itemHandle, tcs)) { - if (!_connected) throw new InvalidOperationException("MxAccessClient not connected"); - var itemHandle = ResolveItem(fullReference); - _proxy.Write(_connectionHandle, itemHandle, value, securityClassification); - }); + // A prior write to the same item handle is still pending — uncommon but possible + // if the caller spammed writes. Replace it: the older TCS observes a Cancelled task. + if (_pendingWrites.TryRemove(itemHandle, out var prior)) + prior.TrySetCanceled(); + _pendingWrites[itemHandle] = tcs; + } + + try + { + await _pump.InvokeAsync(() => + _proxy.Write(_connectionHandle, itemHandle, value, securityClassification)); + + var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout)); + if (raceTask != tcs.Task) + throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}"); + + return await tcs.Task; + } + finally + { + _pendingWrites.TryRemove(itemHandle, out _); + } + } public async Task SubscribeAsync(string fullReference, Action callback) { @@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable { if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return; + // Liveness: any data-change event is proof the connection is alive. + _lastObservedActivityUtc = DateTime.UtcNow; + var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow; var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality)); var vtq = new Vtq(pvItemValue, ts, quality); @@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable public void Dispose() { + _disposed = true; + _monitorCts?.Cancel(); + try { DisconnectAsync().GetAwaiter().GetResult(); } catch { /* swallow */ } _proxy.OnDataChange -= OnDataChange; _proxy.OnWriteComplete -= OnWriteComplete; + _monitorCts?.Dispose(); } } + +/// +/// Tunables for 's reconnect monitor. Defaults match the v1 +/// monitor's polling cadence so behavior is consistent across the lift. +/// +public sealed class MxAccessClientOptions +{ + /// Whether to start the background monitor at connect time. + public bool AutoReconnect { get; init; } = true; + + /// How often the monitor wakes up to check liveness. + public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5); + + /// If no data-change activity in this window, the monitor probes the connection. + public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index af9851f..0134451 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -27,6 +27,15 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend // Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them. private readonly System.Collections.Concurrent.ConcurrentDictionary> _subs = new(); + // Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many). + private readonly System.Collections.Concurrent.ConcurrentDictionary> + _refToSubs = new(System.StringComparer.OrdinalIgnoreCase); + + public event System.EventHandler? OnDataChange; +#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up + public event System.EventHandler? OnAlarmEvent; + public event System.EventHandler? OnHostStatusChanged; +#pragma warning restore CS0067 public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) { @@ -120,8 +129,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend ? null : MessagePackSerializer.Deserialize(w.ValueBytes); - await _mx.WriteAsync(w.TagReference, value!); - results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 }); + var ok = await _mx.WriteAsync(w.TagReference, value!); + results.Add(new WriteValueResult + { + TagReference = w.TagReference, + StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError + Error = ok ? null : "MXAccess runtime reported write failure", + }); } catch (Exception ex) { @@ -137,12 +151,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend try { - // For each requested tag, register a subscription that publishes back via the - // shared MXAccess data-change handler. The OnDataChange push frame to the Proxy - // is wired in the upcoming subscription-push pass; for now the value is captured - // for the first ReadAsync to hit it (so the subscribe surface itself is functional). foreach (var tag in req.TagReferences) - await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ }); + { + _refToSubs.AddOrUpdate(tag, + _ => new System.Collections.Concurrent.ConcurrentBag { sid }, + (_, bag) => { bag.Add(sid); return bag; }); + + // The MXAccess SubscribeAsync only takes one callback per tag; the same callback + // fires for every active subscription of that tag — we fan out by SubscriptionId. + await _mx.SubscribeAsync(tag, OnTagValueChanged); + } _subs[sid] = req.TagReferences; return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs }; @@ -157,7 +175,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend { if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return; foreach (var r in refs) - await _mx.UnsubscribeAsync(r); + { + // Drop this subscription from the reverse map; only unsubscribe from MXAccess if no + // other subscription is still listening (multiple Proxy subs may share a tag). + _refToSubs.TryGetValue(r, out var bag); + if (bag is not null) + { + var remaining = new System.Collections.Concurrent.ConcurrentBag( + bag.Where(id => id != req.SubscriptionId)); + if (remaining.IsEmpty) + { + _refToSubs.TryRemove(r, out _); + await _mx.UnsubscribeAsync(r); + } + else + { + _refToSubs[r] = remaining; + } + } + } + } + + /// + /// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in + /// a and raises once per + /// subscription that includes this tag — the IPC sink translates that into outbound + /// OnDataChangeNotification frames. + /// + private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq) + { + if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return; + + var wireValue = ToWire(fullReference, vtq); + // Emit one notification per active SubscriptionId for this tag — the Proxy fans out to + // each ISubscribable consumer based on the SubscriptionId in the payload. + foreach (var sid in bag.Distinct()) + { + OnDataChange?.Invoke(this, new OnDataChangeNotification + { + SubscriptionId = sid, + Values = new[] { wireValue }, + }); + } } public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs index 2848baf..bff89fe 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs @@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend private long _nextSessionId; private long _nextSubscriptionId; + // Stub backend never raises events — implements the interface members for symmetry. +#pragma warning disable CS0067 + public event System.EventHandler? OnDataChange; + public event System.EventHandler? OnAlarmEvent; + public event System.EventHandler? OnHostStatusChanged; +#pragma warning restore CS0067 + public Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) { var id = Interlocked.Increment(ref _nextSessionId); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs index ad7a58c..a406c04 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs @@ -99,9 +99,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) : } } + /// + /// Subscribes the backend's server-pushed events for the lifetime of the connection. + /// The returned disposable unsubscribes when the connection closes — without it the + /// backend's static event invocation list would accumulate dead writer references and + /// leak memory + raise on every push. + /// + public IDisposable AttachConnection(FrameWriter writer) + { + var sink = new ConnectionSink(backend, writer, logger); + sink.Attach(); + return sink; + } + private static T Deserialize(byte[] body) => MessagePackSerializer.Deserialize(body); private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct) => writer.WriteAsync(MessageKind.ErrorResponse, new ErrorResponse { Code = code, Message = message }, ct); + + private sealed class ConnectionSink : IDisposable + { + private readonly IGalaxyBackend _backend; + private readonly FrameWriter _writer; + private readonly ILogger _logger; + private EventHandler? _onData; + private EventHandler? _onAlarm; + private EventHandler? _onHost; + + public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger) + { + _backend = backend; _writer = writer; _logger = logger; + } + + public void Attach() + { + _onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e); + _onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e); + _onHost = (_, e) => Push(MessageKind.RuntimeStatusChange, + new RuntimeStatusChangeNotification { Status = e }); + _backend.OnDataChange += _onData; + _backend.OnAlarmEvent += _onAlarm; + _backend.OnHostStatusChanged += _onHost; + } + + private void Push(MessageKind kind, T payload) + { + // Fire-and-forget — pushes can race with disposal of the writer. We swallow + // ObjectDisposedException because the dispose path will detach this sink shortly. + try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); } + catch (ObjectDisposedException) { } + catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); } + } + + public void Dispose() + { + if (_onData is not null) _backend.OnDataChange -= _onData; + if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm; + if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost; + } + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/PipeServer.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/PipeServer.cs index b9b281b..32651e0 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/PipeServer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/PipeServer.cs @@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable new HelloAck { Accepted = true, HostName = Environment.MachineName }, linked.Token).ConfigureAwait(false); + using var attachment = handler.AttachConnection(writer); + while (!linked.Token.IsCancellationRequested) { var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); @@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable public interface IFrameHandler { Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct); + + /// + /// Called once per accepted connection after the Hello handshake. Lets the handler + /// attach server-pushed event sinks (data-change, alarm, host-status) to the + /// connection's . Returns an the + /// pipe server disposes when the connection closes — backends use it to unsubscribe. + /// Implementations that don't push events can return . + /// + IDisposable AttachConnection(FrameWriter writer); + + public sealed class NoopAttachment : IDisposable + { + public static readonly NoopAttachment Instance = new(); + public void Dispose() { } + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/StubFrameHandler.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/StubFrameHandler.cs index 0ba149b..fcbf15e 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/StubFrameHandler.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/StubFrameHandler.cs @@ -1,3 +1,4 @@ +using System; using System.Threading; using System.Threading.Tasks; using MessagePack; @@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" }, ct); } + + public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance; }