Phase 2 PR 4 — close 4 open MXAccess findings (push frames + reconnect + write-await + read-cancel) #3

Merged
dohertj2 merged 14 commits from phase-2-pr4-findings into v2 2026-04-18 06:57:22 -04:00
9 changed files with 468 additions and 40 deletions
Showing only changes of commit caa9cb86f6 - Show all commits

View File

@@ -0,0 +1,91 @@
# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings
**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`)
**Target**: `v2`
## Summary
Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`:
- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the
subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag
callback is always detached, and if the read installed the underlying MXAccess
subscription itself (no other caller had it), it tears it down on the way out.
- **High 2 — No reconnect loop on the MXAccess COM connection.** New
`MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background
`MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a
no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active
subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps
`_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale).
`ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream
consumers (the supervisor on the Proxy side already surfaces this through its
HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in).
- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to
the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged`
events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and
forwards as outbound `OnDataChangeNotification` / `AlarmEvent` /
`RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend`
fans out per-tag value changes to every `SubscriptionId` that's listening to that tag
(multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out
on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable
CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist
only to satisfy the interface).
- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New
`WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via
the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`.
`MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the
runtime rejected the write, instead of false-positive `Good`.
## Pipe server change
`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can
register backend event sinks on each accepted connection and detach them at disconnect. The
`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the
finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance`
(net48 doesn't support default interface methods, so the empty-attach lives as a public nested
class).
## Tests
**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1
pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live
ZB smoke + 3 live MXAccess COM smoke all pass unchanged.
## Test plan for reviewers
- [ ] `dotnet build` clean
- [ ] `dotnet test` shows 460/7-skip/1-baseline
- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor`
partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same
polling cadence, same probe-then-reconnect-with-replay shape
- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are
detached on connection close (no leaked invocation list refs)
- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the
correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern
## What's NOT in this PR
- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope.
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op).
`OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when
raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking
port from v1's `AlarmObjectFilter` + Galaxy alarm primitives).
- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the
frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's
`HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR).
## Adversarial review
Quick pass over the PR 4 deltas. No new findings beyond:
- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked
(`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to
the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up
to call `RemoveItem` after the probe succeeds.
- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If
Galaxy permanently rejects a previously-valid reference (rare but possible after a
re-deploy), the user gets silent data loss for that one subscription. The stub-handler-
unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false)
→ ConnectionStateChanged(true)` payload that includes the replay-failures list.
Both are low-priority follow-ups, not PR 4 blockers.

View File

@@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
private long _nextSessionId;
private long _nextSubscriptionId;
// DB-only backend doesn't have a runtime data plane; never raises events.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// </summary>
public interface IGalaxyBackend
{
/// <summary>
/// Server-pushed events the backend raises asynchronously (data-change, alarm,
/// host-status). The frame handler subscribes once on connect and forwards each
/// event to the Proxy as a typed <see cref="MessageKind"/> notification.
/// </summary>
event System.EventHandler<OnDataChangeNotification>? OnDataChange;
event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
@@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
private readonly MxAccessClientOptions _options;
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
@@ -30,39 +32,148 @@ public sealed class MxAccessClient : IDisposable
private int _connectionHandle;
private bool _connected;
private DateTime _lastObservedActivityUtc = DateTime.UtcNow;
private CancellationTokenSource? _monitorCts;
private int _reconnectCount;
private bool _disposed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName)
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
_proxy = proxy;
_clientName = clientName;
_options = options ?? new MxAccessClientOptions();
_proxy.OnDataChange += OnDataChange;
_proxy.OnWriteComplete += OnWriteComplete;
}
public bool IsConnected => _connected;
public int SubscriptionCount => _subscriptions.Count;
public int ReconnectCount => _reconnectCount;
/// <summary>Connects on the STA thread. Idempotent.</summary>
public Task<int> ConnectAsync() => _pump.InvokeAsync(() =>
/// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
public async Task<int> ConnectAsync()
{
if (_connected) return _connectionHandle;
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
return _connectionHandle;
});
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
var handle = await _pump.InvokeAsync(() =>
{
_connected = false;
_addressToHandle.Clear();
_handleToAddress.Clear();
if (_connected) return _connectionHandle;
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
return _connectionHandle;
});
ConnectionStateChanged?.Invoke(this, true);
if (_options.AutoReconnect && _monitorCts is null)
{
_monitorCts = new CancellationTokenSource();
_ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
}
});
return handle;
}
public async Task DisconnectAsync()
{
_monitorCts?.Cancel();
_monitorCts = null;
await _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
{
_connected = false;
_addressToHandle.Clear();
_handleToAddress.Clear();
}
});
ConnectionStateChanged?.Invoke(this, false);
}
/// <summary>
/// Background loop that watches for connection liveness signals and triggers
/// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
/// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
/// observed-activity timestamp + optional probe-tag subscription. Without an explicit
/// probe tag, falls back to "no data change in N seconds + no successful read in N
/// seconds = unhealthy" — same shape as v1.
/// </summary>
private async Task MonitorLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(_options.MonitorInterval, ct); }
catch (OperationCanceledException) { break; }
if (!_connected || _disposed) continue;
var idle = DateTime.UtcNow - _lastObservedActivityUtc;
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
catch { return false; }
});
}
catch { probeOk = false; }
if (probeOk)
{
_lastObservedActivityUtc = DateTime.UtcNow;
continue;
}
// Connection appears dead — reconnect-with-replay.
try
{
await _pump.InvokeAsync(() =>
{
try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
_connected = false;
});
ConnectionStateChanged?.Invoke(this, false);
await _pump.InvokeAsync(() =>
{
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
});
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
}
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch
{
// Reconnect failed; back off and retry on the next tick.
_connected = false;
}
}
}
/// <summary>
/// One-shot read implemented as a transient subscribe + unsubscribe.
@@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference);
var itemHandle = await SubscribeOnPumpAsync(fullReference);
try
{
await SubscribeOnPumpAsync(fullReference);
using var _ = ct.Register(() => tcs.TrySetCanceled());
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
using var _ = ct.Register(() => tcs.TrySetCanceled());
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
// Detach the one-shot handler.
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
return await tcs.Task;
return await tcs.Task;
}
finally
{
// High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
// If we were the one who added the underlying MXAccess subscription (no other
// caller had it), tear it down too so we don't leak a probe item handle.
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
if (addedToReadOnlyAttribute)
{
try { await UnsubscribeAsync(fullReference); }
catch { /* shutdown-best-effort */ }
}
}
}
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) =>
_pump.InvokeAsync(() =>
/// <summary>
/// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
/// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
/// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
/// false-positive Good even when the runtime rejected the write post-callback.
/// </summary>
public async Task<bool> WriteAsync(string fullReference, object value,
int securityClassification = 0, TimeSpan? timeout = null)
{
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);
var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_pendingWrites.TryAdd(itemHandle, tcs))
{
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
var itemHandle = ResolveItem(fullReference);
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification);
});
// A prior write to the same item handle is still pending — uncommon but possible
// if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
if (_pendingWrites.TryRemove(itemHandle, out var prior))
prior.TrySetCanceled();
_pendingWrites[itemHandle] = tcs;
}
try
{
await _pump.InvokeAsync(() =>
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification));
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout));
if (raceTask != tcs.Task)
throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");
return await tcs.Task;
}
finally
{
_pendingWrites.TryRemove(itemHandle, out _);
}
}
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
{
@@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable
{
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
// Liveness: any data-change event is proof the connection is alive.
_lastObservedActivityUtc = DateTime.UtcNow;
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
var vtq = new Vtq(pvItemValue, ts, quality);
@@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable
public void Dispose()
{
_disposed = true;
_monitorCts?.Cancel();
try { DisconnectAsync().GetAwaiter().GetResult(); }
catch { /* swallow */ }
_proxy.OnDataChange -= OnDataChange;
_proxy.OnWriteComplete -= OnWriteComplete;
_monitorCts?.Dispose();
}
}
/// <summary>
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
/// monitor's polling cadence so behavior is consistent across the lift.
/// </summary>
public sealed class MxAccessClientOptions
{
/// <summary>Whether to start the background monitor at connect time.</summary>
public bool AutoReconnect { get; init; } = true;
/// <summary>How often the monitor wakes up to check liveness.</summary>
public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
}

View File

@@ -27,6 +27,15 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
{
@@ -120,8 +129,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
? null
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 });
var ok = await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult
{
TagReference = w.TagReference,
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
Error = ok ? null : "MXAccess runtime reported write failure",
});
}
catch (Exception ex)
{
@@ -137,12 +151,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
try
{
// For each requested tag, register a subscription that publishes back via the
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
// is wired in the upcoming subscription-push pass; for now the value is captured
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
foreach (var tag in req.TagReferences)
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ });
{
_refToSubs.AddOrUpdate(tag,
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
(_, bag) => { bag.Add(sid); return bag; });
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
// fires for every active subscription of that tag — we fan out by SubscriptionId.
await _mx.SubscribeAsync(tag, OnTagValueChanged);
}
_subs[sid] = req.TagReferences;
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
@@ -157,7 +175,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
{
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
foreach (var r in refs)
await _mx.UnsubscribeAsync(r);
{
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
// other subscription is still listening (multiple Proxy subs may share a tag).
_refToSubs.TryGetValue(r, out var bag);
if (bag is not null)
{
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
bag.Where(id => id != req.SubscriptionId));
if (remaining.IsEmpty)
{
_refToSubs.TryRemove(r, out _);
await _mx.UnsubscribeAsync(r);
}
else
{
_refToSubs[r] = remaining;
}
}
}
}
/// <summary>
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
/// subscription that includes this tag — the IPC sink translates that into outbound
/// <c>OnDataChangeNotification</c> frames.
/// </summary>
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
{
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
var wireValue = ToWire(fullReference, vtq);
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
// each ISubscribable consumer based on the SubscriptionId in the payload.
foreach (var sid in bag.Distinct())
{
OnDataChange?.Invoke(this, new OnDataChangeNotification
{
SubscriptionId = sid,
Values = new[] { wireValue },
});
}
}
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;

View File

@@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
private long _nextSessionId;
private long _nextSubscriptionId;
// Stub backend never raises events — implements the interface members for symmetry.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -99,9 +99,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
}
}
/// <summary>
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
/// The returned disposable unsubscribes when the connection closes — without it the
/// backend's static event invocation list would accumulate dead writer references and
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
/// </summary>
public IDisposable AttachConnection(FrameWriter writer)
{
var sink = new ConnectionSink(backend, writer, logger);
sink.Attach();
return sink;
}
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
=> writer.WriteAsync(MessageKind.ErrorResponse,
new ErrorResponse { Code = code, Message = message }, ct);
private sealed class ConnectionSink : IDisposable
{
private readonly IGalaxyBackend _backend;
private readonly FrameWriter _writer;
private readonly ILogger _logger;
private EventHandler<OnDataChangeNotification>? _onData;
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
private EventHandler<HostConnectivityStatus>? _onHost;
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
{
_backend = backend; _writer = writer; _logger = logger;
}
public void Attach()
{
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
new RuntimeStatusChangeNotification { Status = e });
_backend.OnDataChange += _onData;
_backend.OnAlarmEvent += _onAlarm;
_backend.OnHostStatusChanged += _onHost;
}
private void Push<T>(MessageKind kind, T payload)
{
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
// ObjectDisposedException because the dispose path will detach this sink shortly.
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
catch (ObjectDisposedException) { }
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
}
public void Dispose()
{
if (_onData is not null) _backend.OnDataChange -= _onData;
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
}
}
}

View File

@@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable
new HelloAck { Accepted = true, HostName = Environment.MachineName },
linked.Token).ConfigureAwait(false);
using var attachment = handler.AttachConnection(writer);
while (!linked.Token.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
@@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable
public interface IFrameHandler
{
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
/// <summary>
/// Called once per accepted connection after the Hello handshake. Lets the handler
/// attach server-pushed event sinks (data-change, alarm, host-status) to the
/// connection's <paramref name="writer"/>. Returns an <see cref="IDisposable"/> the
/// pipe server disposes when the connection closes — backends use it to unsubscribe.
/// Implementations that don't push events can return <see cref="NoopAttachment"/>.
/// </summary>
IDisposable AttachConnection(FrameWriter writer);
public sealed class NoopAttachment : IDisposable
{
public static readonly NoopAttachment Instance = new();
public void Dispose() { }
}
}

View File

@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
@@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
ct);
}
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
}