Compare commits

...

1 Commits

Author SHA1 Message Date
Joseph Doherty
caa9cb86f6 Phase 2 PR 4 — close the 4 open high/medium MXAccess findings from exit-gate-phase-2-final.md. High 1 (ReadAsync subscription-leak on cancel): the one-shot read now wraps subscribe→first-OnDataChange→unsubscribe in try/finally so the per-tag callback is always detached, and if the read installed the underlying MXAccess subscription itself (the prior _addressToHandle key was absent) it tears it down on the way out — no leaked probe item handles when the caller cancels or times out. High 2 (no reconnect loop): MxAccessClient gets a MxAccessClientOptions {AutoReconnect, MonitorInterval=5s, StaleThreshold=60s} + a background MonitorLoopAsync started at first ConnectAsync. The loop wakes every MonitorInterval, checks _lastObservedActivityUtc (bumped by every OnDataChange callback), and if stale probes the proxy with a no-op COM AddItem("$Heartbeat") on the StaPump; if the probe throws or returns false, the loop reconnects-with-replay — Unregister (best-effort), Register, snapshot _addressToHandle.Keys + clear, re-AddItem every previously-active subscription, ConnectionStateChanged events fire for the false→true transition, ReconnectCount bumps. Medium 3 (subscriptions don't push frames back to Proxy): IGalaxyBackend gains OnDataChange/OnAlarmEvent/OnHostStatusChanged events; new IFrameHandler.AttachConnection(FrameWriter) is called per-connection by PipeServer after Hello + the returned IDisposable disposes at connection close; GalaxyFrameHandler.ConnectionSink subscribes the events for the connection lifetime, fire-and-forget pushes them as MessageKind.OnDataChangeNotification / AlarmEvent / RuntimeStatusChange frames through the writer, swallows ObjectDisposedException for the dispose race, and unsubscribes in Dispose to prevent leaked invocation list refs across reconnects. MxAccessGalaxyBackend's existing SubscribeAsync (which previously discarded values via a (_, __) => {} callback) now wires OnTagValueChanged that fans out per-tag value changes to every subscription ID listening (one MXAccess subscription, multi-fan-out — _refToSubs reverse map). UnsubscribeAsync also reverse-walks the map to only call mx.UnsubscribeAsync when the LAST sub for a tag drops. Stub + DbBacked backends declare the events with #pragma warning disable CS0067 because they never raise them but must satisfy the interface (treat-warnings-as-errors would otherwise fail). Medium 4 (WriteValuesAsync doesn't await OnWriteComplete): MxAccessClient.WriteAsync rewritten to return Task<bool> via the v1-style TaskCompletionSource-keyed-by-item-handle pattern in _pendingWrites — adds the TCS before the Write call, awaits it with a configurable timeout (default 5s), removes the TCS in finally, returns true only when OnWriteComplete reported success. MxAccessGalaxyBackend.WriteValuesAsync now reports per-tag Bad_InternalError ("MXAccess runtime reported write failure") when the bool returns false, instead of false-positive Good. PipeServer's IFrameHandler interface adds the AttachConnection(FrameWriter):IDisposable method + a public NoopAttachment nested class (net48 doesn't support default interface methods so the empty-attach is exposed for stub implementations). StubFrameHandler returns IFrameHandler.NoopAttachment.Instance. RunOneConnectionAsync calls AttachConnection after HelloAck and usings the returned disposable so it disposes at the connection scope's finally. ConnectionStateChanged event added on MxAccessClient (caller-facing diagnostics for false→true reconnect transitions). docs/v2/implementation/pr-4-body.md is the Gitea web-UI paste-in for opening PR 4 once pushed; includes 2 new low-priority adversarial findings (probe item-handle leak; replay-loop silently swallows per-subscription failures) flagged as follow-ups not PR 4 blockers. Full solution 460 pass / 7 skip (E2E on admin shell) / 1 pre-existing Phase 0 baseline. No regressions vs PR 2's baseline.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 01:12:09 -04:00
9 changed files with 468 additions and 40 deletions

View File

@@ -0,0 +1,91 @@
# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings
**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`)
**Target**: `v2`
## Summary
Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`:
- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the
subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag
callback is always detached, and if the read installed the underlying MXAccess
subscription itself (no other caller had it), it tears it down on the way out.
- **High 2 — No reconnect loop on the MXAccess COM connection.** New
`MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background
`MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a
no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active
subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps
`_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale).
`ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream
consumers (the supervisor on the Proxy side already surfaces this through its
HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in).
- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to
the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged`
events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and
forwards as outbound `OnDataChangeNotification` / `AlarmEvent` /
`RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend`
fans out per-tag value changes to every `SubscriptionId` that's listening to that tag
(multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out
on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable
CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist
only to satisfy the interface).
- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New
`WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via
the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`.
`MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the
runtime rejected the write, instead of false-positive `Good`.
## Pipe server change
`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can
register backend event sinks on each accepted connection and detach them at disconnect. The
`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the
finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance`
(net48 doesn't support default interface methods, so the empty-attach lives as a public nested
class).
## Tests
**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1
pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live
ZB smoke + 3 live MXAccess COM smoke all pass unchanged.
## Test plan for reviewers
- [ ] `dotnet build` clean
- [ ] `dotnet test` shows 460/7-skip/1-baseline
- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor`
partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same
polling cadence, same probe-then-reconnect-with-replay shape
- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are
detached on connection close (no leaked invocation list refs)
- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the
correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern
## What's NOT in this PR
- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope.
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op).
`OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when
raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking
port from v1's `AlarmObjectFilter` + Galaxy alarm primitives).
- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the
frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's
`HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR).
## Adversarial review
Quick pass over the PR 4 deltas. No new findings beyond:
- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked
(`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to
the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up
to call `RemoveItem` after the probe succeeds.
- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If
Galaxy permanently rejects a previously-valid reference (rare but possible after a
re-deploy), the user gets silent data loss for that one subscription. The stub-handler-
unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false)
→ ConnectionStateChanged(true)` payload that includes the replay-failures list.
Both are low-priority follow-ups, not PR 4 blockers.

View File

@@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
private long _nextSessionId; private long _nextSessionId;
private long _nextSubscriptionId; private long _nextSubscriptionId;
// DB-only backend doesn't have a runtime data plane; never raises events.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{ {
var id = Interlocked.Increment(ref _nextSessionId); var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// </summary> /// </summary>
public interface IGalaxyBackend public interface IGalaxyBackend
{ {
/// <summary>
/// Server-pushed events the backend raises asynchronously (data-change, alarm,
/// host-status). The frame handler subscribes once on connect and forwards each
/// event to the Proxy as a typed <see cref="MessageKind"/> notification.
/// </summary>
event System.EventHandler<OnDataChangeNotification>? OnDataChange;
event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct); Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct); Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);

View File

@@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ArchestrA.MxAccess; using ArchestrA.MxAccess;
@@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable
private readonly StaPump _pump; private readonly StaPump _pump;
private readonly IMxProxy _proxy; private readonly IMxProxy _proxy;
private readonly string _clientName; private readonly string _clientName;
private readonly MxAccessClientOptions _options;
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read). // Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase); private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
@@ -30,39 +32,148 @@ public sealed class MxAccessClient : IDisposable
private int _connectionHandle; private int _connectionHandle;
private bool _connected; private bool _connected;
private DateTime _lastObservedActivityUtc = DateTime.UtcNow;
private CancellationTokenSource? _monitorCts;
private int _reconnectCount;
private bool _disposed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName) /// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{ {
_pump = pump; _pump = pump;
_proxy = proxy; _proxy = proxy;
_clientName = clientName; _clientName = clientName;
_options = options ?? new MxAccessClientOptions();
_proxy.OnDataChange += OnDataChange; _proxy.OnDataChange += OnDataChange;
_proxy.OnWriteComplete += OnWriteComplete; _proxy.OnWriteComplete += OnWriteComplete;
} }
public bool IsConnected => _connected; public bool IsConnected => _connected;
public int SubscriptionCount => _subscriptions.Count; public int SubscriptionCount => _subscriptions.Count;
public int ReconnectCount => _reconnectCount;
/// <summary>Connects on the STA thread. Idempotent.</summary> /// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
public Task<int> ConnectAsync() => _pump.InvokeAsync(() => public async Task<int> ConnectAsync()
{ {
if (_connected) return _connectionHandle; var handle = await _pump.InvokeAsync(() =>
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
return _connectionHandle;
});
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
{ {
_connected = false; if (_connected) return _connectionHandle;
_addressToHandle.Clear(); _connectionHandle = _proxy.Register(_clientName);
_handleToAddress.Clear(); _connected = true;
return _connectionHandle;
});
ConnectionStateChanged?.Invoke(this, true);
if (_options.AutoReconnect && _monitorCts is null)
{
_monitorCts = new CancellationTokenSource();
_ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
} }
});
return handle;
}
public async Task DisconnectAsync()
{
_monitorCts?.Cancel();
_monitorCts = null;
await _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
{
_connected = false;
_addressToHandle.Clear();
_handleToAddress.Clear();
}
});
ConnectionStateChanged?.Invoke(this, false);
}
/// <summary>
/// Background loop that watches for connection liveness signals and triggers
/// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
/// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
/// observed-activity timestamp + optional probe-tag subscription. Without an explicit
/// probe tag, falls back to "no data change in N seconds + no successful read in N
/// seconds = unhealthy" — same shape as v1.
/// </summary>
private async Task MonitorLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(_options.MonitorInterval, ct); }
catch (OperationCanceledException) { break; }
if (!_connected || _disposed) continue;
var idle = DateTime.UtcNow - _lastObservedActivityUtc;
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
catch { return false; }
});
}
catch { probeOk = false; }
if (probeOk)
{
_lastObservedActivityUtc = DateTime.UtcNow;
continue;
}
// Connection appears dead — reconnect-with-replay.
try
{
await _pump.InvokeAsync(() =>
{
try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
_connected = false;
});
ConnectionStateChanged?.Invoke(this, false);
await _pump.InvokeAsync(() =>
{
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
});
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
}
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch
{
// Reconnect failed; back off and retry on the next tick.
_connected = false;
}
}
}
/// <summary> /// <summary>
/// One-shot read implemented as a transient subscribe + unsubscribe. /// One-shot read implemented as a transient subscribe + unsubscribe.
@@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable
// Stash the one-shot handler before sending the subscribe, then remove it after firing. // Stash the one-shot handler before sending the subscribe, then remove it after firing.
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot)); _subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference);
var itemHandle = await SubscribeOnPumpAsync(fullReference); try
{
await SubscribeOnPumpAsync(fullReference);
using var _ = ct.Register(() => tcs.TrySetCanceled()); using var _ = ct.Register(() => tcs.TrySetCanceled());
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct)); var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}"); if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
// Detach the one-shot handler. return await tcs.Task;
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot)); }
finally
return await tcs.Task; {
// High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
// If we were the one who added the underlying MXAccess subscription (no other
// caller had it), tear it down too so we don't leak a probe item handle.
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
if (addedToReadOnlyAttribute)
{
try { await UnsubscribeAsync(fullReference); }
catch { /* shutdown-best-effort */ }
}
}
} }
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) => /// <summary>
_pump.InvokeAsync(() => /// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
/// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
/// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
/// false-positive Good even when the runtime rejected the write post-callback.
/// </summary>
public async Task<bool> WriteAsync(string fullReference, object value,
int securityClassification = 0, TimeSpan? timeout = null)
{
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);
var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_pendingWrites.TryAdd(itemHandle, tcs))
{ {
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected"); // A prior write to the same item handle is still pending — uncommon but possible
var itemHandle = ResolveItem(fullReference); // if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification); if (_pendingWrites.TryRemove(itemHandle, out var prior))
}); prior.TrySetCanceled();
_pendingWrites[itemHandle] = tcs;
}
try
{
await _pump.InvokeAsync(() =>
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification));
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout));
if (raceTask != tcs.Task)
throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");
return await tcs.Task;
}
finally
{
_pendingWrites.TryRemove(itemHandle, out _);
}
}
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback) public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
{ {
@@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable
{ {
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return; if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
// Liveness: any data-change event is proof the connection is alive.
_lastObservedActivityUtc = DateTime.UtcNow;
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow; var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality)); var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
var vtq = new Vtq(pvItemValue, ts, quality); var vtq = new Vtq(pvItemValue, ts, quality);
@@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable
public void Dispose() public void Dispose()
{ {
_disposed = true;
_monitorCts?.Cancel();
try { DisconnectAsync().GetAwaiter().GetResult(); } try { DisconnectAsync().GetAwaiter().GetResult(); }
catch { /* swallow */ } catch { /* swallow */ }
_proxy.OnDataChange -= OnDataChange; _proxy.OnDataChange -= OnDataChange;
_proxy.OnWriteComplete -= OnWriteComplete; _proxy.OnWriteComplete -= OnWriteComplete;
_monitorCts?.Dispose();
} }
} }
/// <summary>
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
/// monitor's polling cadence so behavior is consistent across the lift.
/// </summary>
public sealed class MxAccessClientOptions
{
/// <summary>Whether to start the background monitor at connect time.</summary>
public bool AutoReconnect { get; init; } = true;
/// <summary>How often the monitor wakes up to check liveness.</summary>
public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
}

View File

@@ -27,6 +27,15 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them. // Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new(); private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
{ {
@@ -120,8 +129,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
? null ? null
: MessagePackSerializer.Deserialize<object>(w.ValueBytes); : MessagePackSerializer.Deserialize<object>(w.ValueBytes);
await _mx.WriteAsync(w.TagReference, value!); var ok = await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 }); results.Add(new WriteValueResult
{
TagReference = w.TagReference,
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
Error = ok ? null : "MXAccess runtime reported write failure",
});
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -137,12 +151,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
try try
{ {
// For each requested tag, register a subscription that publishes back via the
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
// is wired in the upcoming subscription-push pass; for now the value is captured
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
foreach (var tag in req.TagReferences) foreach (var tag in req.TagReferences)
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ }); {
_refToSubs.AddOrUpdate(tag,
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
(_, bag) => { bag.Add(sid); return bag; });
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
// fires for every active subscription of that tag — we fan out by SubscriptionId.
await _mx.SubscribeAsync(tag, OnTagValueChanged);
}
_subs[sid] = req.TagReferences; _subs[sid] = req.TagReferences;
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs }; return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
@@ -157,7 +175,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
{ {
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return; if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
foreach (var r in refs) foreach (var r in refs)
await _mx.UnsubscribeAsync(r); {
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
// other subscription is still listening (multiple Proxy subs may share a tag).
_refToSubs.TryGetValue(r, out var bag);
if (bag is not null)
{
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
bag.Where(id => id != req.SubscriptionId));
if (remaining.IsEmpty)
{
_refToSubs.TryRemove(r, out _);
await _mx.UnsubscribeAsync(r);
}
else
{
_refToSubs[r] = remaining;
}
}
}
}
/// <summary>
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
/// subscription that includes this tag — the IPC sink translates that into outbound
/// <c>OnDataChangeNotification</c> frames.
/// </summary>
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
{
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
var wireValue = ToWire(fullReference, vtq);
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
// each ISubscribable consumer based on the SubscriptionId in the payload.
foreach (var sid in bag.Distinct())
{
OnDataChange?.Invoke(this, new OnDataChangeNotification
{
SubscriptionId = sid,
Values = new[] { wireValue },
});
}
} }
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;

View File

@@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
private long _nextSessionId; private long _nextSessionId;
private long _nextSubscriptionId; private long _nextSubscriptionId;
// Stub backend never raises events — implements the interface members for symmetry.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{ {
var id = Interlocked.Increment(ref _nextSessionId); var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -99,9 +99,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
} }
} }
/// <summary>
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
/// The returned disposable unsubscribes when the connection closes — without it the
/// backend's static event invocation list would accumulate dead writer references and
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
/// </summary>
public IDisposable AttachConnection(FrameWriter writer)
{
var sink = new ConnectionSink(backend, writer, logger);
sink.Attach();
return sink;
}
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body); private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct) private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
=> writer.WriteAsync(MessageKind.ErrorResponse, => writer.WriteAsync(MessageKind.ErrorResponse,
new ErrorResponse { Code = code, Message = message }, ct); new ErrorResponse { Code = code, Message = message }, ct);
private sealed class ConnectionSink : IDisposable
{
private readonly IGalaxyBackend _backend;
private readonly FrameWriter _writer;
private readonly ILogger _logger;
private EventHandler<OnDataChangeNotification>? _onData;
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
private EventHandler<HostConnectivityStatus>? _onHost;
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
{
_backend = backend; _writer = writer; _logger = logger;
}
public void Attach()
{
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
new RuntimeStatusChangeNotification { Status = e });
_backend.OnDataChange += _onData;
_backend.OnAlarmEvent += _onAlarm;
_backend.OnHostStatusChanged += _onHost;
}
private void Push<T>(MessageKind kind, T payload)
{
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
// ObjectDisposedException because the dispose path will detach this sink shortly.
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
catch (ObjectDisposedException) { }
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
}
public void Dispose()
{
if (_onData is not null) _backend.OnDataChange -= _onData;
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
}
}
} }

View File

@@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable
new HelloAck { Accepted = true, HostName = Environment.MachineName }, new HelloAck { Accepted = true, HostName = Environment.MachineName },
linked.Token).ConfigureAwait(false); linked.Token).ConfigureAwait(false);
using var attachment = handler.AttachConnection(writer);
while (!linked.Token.IsCancellationRequested) while (!linked.Token.IsCancellationRequested)
{ {
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
@@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable
public interface IFrameHandler public interface IFrameHandler
{ {
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct); Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
/// <summary>
/// Called once per accepted connection after the Hello handshake. Lets the handler
/// attach server-pushed event sinks (data-change, alarm, host-status) to the
/// connection's <paramref name="writer"/>. Returns an <see cref="IDisposable"/> the
/// pipe server disposes when the connection closes — backends use it to unsubscribe.
/// Implementations that don't push events can return <see cref="NoopAttachment"/>.
/// </summary>
IDisposable AttachConnection(FrameWriter writer);
public sealed class NoopAttachment : IDisposable
{
public static readonly NoopAttachment Instance = new();
public void Dispose() { }
}
} }

View File

@@ -1,3 +1,4 @@
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MessagePack; using MessagePack;
@@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" }, new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
ct); ct);
} }
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
} }