Compare commits
1 Commits
phase-2-st
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
caa9cb86f6 |
91
docs/v2/implementation/pr-4-body.md
Normal file
91
docs/v2/implementation/pr-4-body.md
Normal file
@@ -0,0 +1,91 @@
|
||||
# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings
|
||||
|
||||
**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`)
|
||||
**Target**: `v2`
|
||||
|
||||
## Summary
|
||||
|
||||
Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`:
|
||||
|
||||
- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the
|
||||
subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag
|
||||
callback is always detached, and if the read installed the underlying MXAccess
|
||||
subscription itself (no other caller had it), it tears it down on the way out.
|
||||
- **High 2 — No reconnect loop on the MXAccess COM connection.** New
|
||||
`MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background
|
||||
`MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a
|
||||
no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active
|
||||
subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps
|
||||
`_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale).
|
||||
`ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream
|
||||
consumers (the supervisor on the Proxy side already surfaces this through its
|
||||
HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in).
|
||||
- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to
|
||||
the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged`
|
||||
events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and
|
||||
forwards as outbound `OnDataChangeNotification` / `AlarmEvent` /
|
||||
`RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend`
|
||||
fans out per-tag value changes to every `SubscriptionId` that's listening to that tag
|
||||
(multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out
|
||||
on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable
|
||||
CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist
|
||||
only to satisfy the interface).
|
||||
- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New
|
||||
`WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via
|
||||
the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`.
|
||||
`MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the
|
||||
runtime rejected the write, instead of false-positive `Good`.
|
||||
|
||||
## Pipe server change
|
||||
|
||||
`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can
|
||||
register backend event sinks on each accepted connection and detach them at disconnect. The
|
||||
`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the
|
||||
finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance`
|
||||
(net48 doesn't support default interface methods, so the empty-attach lives as a public nested
|
||||
class).
|
||||
|
||||
## Tests
|
||||
|
||||
**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1
|
||||
pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live
|
||||
ZB smoke + 3 live MXAccess COM smoke all pass unchanged.
|
||||
|
||||
## Test plan for reviewers
|
||||
|
||||
- [ ] `dotnet build` clean
|
||||
- [ ] `dotnet test` shows 460/7-skip/1-baseline
|
||||
- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor`
|
||||
partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same
|
||||
polling cadence, same probe-then-reconnect-with-replay shape
|
||||
- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are
|
||||
detached on connection close (no leaked invocation list refs)
|
||||
- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the
|
||||
correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern
|
||||
|
||||
## What's NOT in this PR
|
||||
|
||||
- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope.
|
||||
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op).
|
||||
`OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when
|
||||
raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking
|
||||
port from v1's `AlarmObjectFilter` + Galaxy alarm primitives).
|
||||
- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the
|
||||
frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's
|
||||
`HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR).
|
||||
|
||||
## Adversarial review
|
||||
|
||||
Quick pass over the PR 4 deltas. No new findings beyond:
|
||||
|
||||
- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked
|
||||
(`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to
|
||||
the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up
|
||||
to call `RemoveItem` after the probe succeeds.
|
||||
- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If
|
||||
Galaxy permanently rejects a previously-valid reference (rare but possible after a
|
||||
re-deploy), the user gets silent data loss for that one subscription. The stub-handler-
|
||||
unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false)
|
||||
→ ConnectionStateChanged(true)` payload that includes the replay-failures list.
|
||||
|
||||
Both are low-priority follow-ups, not PR 4 blockers.
|
||||
@@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
||||
private long _nextSessionId;
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
// DB-only backend doesn't have a runtime data plane; never raises events.
|
||||
#pragma warning disable CS0067
|
||||
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
#pragma warning restore CS0067
|
||||
|
||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSessionId);
|
||||
|
||||
@@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
/// </summary>
|
||||
public interface IGalaxyBackend
|
||||
{
|
||||
/// <summary>
|
||||
/// Server-pushed events the backend raises asynchronously (data-change, alarm,
|
||||
/// host-status). The frame handler subscribes once on connect and forwards each
|
||||
/// event to the Proxy as a typed <see cref="MessageKind"/> notification.
|
||||
/// </summary>
|
||||
event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||
event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||
event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
|
||||
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
|
||||
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA.MxAccess;
|
||||
@@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable
|
||||
private readonly StaPump _pump;
|
||||
private readonly IMxProxy _proxy;
|
||||
private readonly string _clientName;
|
||||
private readonly MxAccessClientOptions _options;
|
||||
|
||||
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
|
||||
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
|
||||
@@ -30,39 +32,148 @@ public sealed class MxAccessClient : IDisposable
|
||||
|
||||
private int _connectionHandle;
|
||||
private bool _connected;
|
||||
private DateTime _lastObservedActivityUtc = DateTime.UtcNow;
|
||||
private CancellationTokenSource? _monitorCts;
|
||||
private int _reconnectCount;
|
||||
private bool _disposed;
|
||||
|
||||
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName)
|
||||
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
|
||||
public event EventHandler<bool>? ConnectionStateChanged;
|
||||
|
||||
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
|
||||
{
|
||||
_pump = pump;
|
||||
_proxy = proxy;
|
||||
_clientName = clientName;
|
||||
_options = options ?? new MxAccessClientOptions();
|
||||
_proxy.OnDataChange += OnDataChange;
|
||||
_proxy.OnWriteComplete += OnWriteComplete;
|
||||
}
|
||||
|
||||
public bool IsConnected => _connected;
|
||||
public int SubscriptionCount => _subscriptions.Count;
|
||||
public int ReconnectCount => _reconnectCount;
|
||||
|
||||
/// <summary>Connects on the STA thread. Idempotent.</summary>
|
||||
public Task<int> ConnectAsync() => _pump.InvokeAsync(() =>
|
||||
/// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
|
||||
public async Task<int> ConnectAsync()
|
||||
{
|
||||
if (_connected) return _connectionHandle;
|
||||
_connectionHandle = _proxy.Register(_clientName);
|
||||
_connected = true;
|
||||
return _connectionHandle;
|
||||
});
|
||||
|
||||
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (!_connected) return;
|
||||
try { _proxy.Unregister(_connectionHandle); }
|
||||
finally
|
||||
var handle = await _pump.InvokeAsync(() =>
|
||||
{
|
||||
_connected = false;
|
||||
_addressToHandle.Clear();
|
||||
_handleToAddress.Clear();
|
||||
if (_connected) return _connectionHandle;
|
||||
_connectionHandle = _proxy.Register(_clientName);
|
||||
_connected = true;
|
||||
return _connectionHandle;
|
||||
});
|
||||
|
||||
ConnectionStateChanged?.Invoke(this, true);
|
||||
|
||||
if (_options.AutoReconnect && _monitorCts is null)
|
||||
{
|
||||
_monitorCts = new CancellationTokenSource();
|
||||
_ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
|
||||
}
|
||||
});
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
public async Task DisconnectAsync()
|
||||
{
|
||||
_monitorCts?.Cancel();
|
||||
_monitorCts = null;
|
||||
|
||||
await _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (!_connected) return;
|
||||
try { _proxy.Unregister(_connectionHandle); }
|
||||
finally
|
||||
{
|
||||
_connected = false;
|
||||
_addressToHandle.Clear();
|
||||
_handleToAddress.Clear();
|
||||
}
|
||||
});
|
||||
|
||||
ConnectionStateChanged?.Invoke(this, false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Background loop that watches for connection liveness signals and triggers
|
||||
/// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
|
||||
/// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
|
||||
/// observed-activity timestamp + optional probe-tag subscription. Without an explicit
|
||||
/// probe tag, falls back to "no data change in N seconds + no successful read in N
|
||||
/// seconds = unhealthy" — same shape as v1.
|
||||
/// </summary>
|
||||
private async Task MonitorLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(_options.MonitorInterval, ct); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
|
||||
if (!_connected || _disposed) continue;
|
||||
|
||||
var idle = DateTime.UtcNow - _lastObservedActivityUtc;
|
||||
if (idle <= _options.StaleThreshold) continue;
|
||||
|
||||
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
|
||||
// our reconnect signal.
|
||||
bool probeOk;
|
||||
try
|
||||
{
|
||||
probeOk = await _pump.InvokeAsync(() =>
|
||||
{
|
||||
// AddItem on the connection handle is cheap and round-trips through COM.
|
||||
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
|
||||
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
|
||||
catch { return false; }
|
||||
});
|
||||
}
|
||||
catch { probeOk = false; }
|
||||
|
||||
if (probeOk)
|
||||
{
|
||||
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Connection appears dead — reconnect-with-replay.
|
||||
try
|
||||
{
|
||||
await _pump.InvokeAsync(() =>
|
||||
{
|
||||
try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
|
||||
_connected = false;
|
||||
});
|
||||
ConnectionStateChanged?.Invoke(this, false);
|
||||
|
||||
await _pump.InvokeAsync(() =>
|
||||
{
|
||||
_connectionHandle = _proxy.Register(_clientName);
|
||||
_connected = true;
|
||||
});
|
||||
_reconnectCount++;
|
||||
ConnectionStateChanged?.Invoke(this, true);
|
||||
|
||||
// Replay every subscription that was active before the disconnect.
|
||||
var snapshot = _addressToHandle.Keys.ToArray();
|
||||
_addressToHandle.Clear();
|
||||
_handleToAddress.Clear();
|
||||
foreach (var fullRef in snapshot)
|
||||
{
|
||||
try { await SubscribeOnPumpAsync(fullRef); }
|
||||
catch { /* skip — operator can re-subscribe */ }
|
||||
}
|
||||
|
||||
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Reconnect failed; back off and retry on the next tick.
|
||||
_connected = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One-shot read implemented as a transient subscribe + unsubscribe.
|
||||
@@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable
|
||||
|
||||
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
|
||||
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
|
||||
var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference);
|
||||
|
||||
var itemHandle = await SubscribeOnPumpAsync(fullReference);
|
||||
try
|
||||
{
|
||||
await SubscribeOnPumpAsync(fullReference);
|
||||
|
||||
using var _ = ct.Register(() => tcs.TrySetCanceled());
|
||||
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
|
||||
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
|
||||
using var _ = ct.Register(() => tcs.TrySetCanceled());
|
||||
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
|
||||
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
|
||||
|
||||
// Detach the one-shot handler.
|
||||
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
|
||||
|
||||
return await tcs.Task;
|
||||
return await tcs.Task;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
|
||||
// If we were the one who added the underlying MXAccess subscription (no other
|
||||
// caller had it), tear it down too so we don't leak a probe item handle.
|
||||
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
|
||||
if (addedToReadOnlyAttribute)
|
||||
{
|
||||
try { await UnsubscribeAsync(fullReference); }
|
||||
catch { /* shutdown-best-effort */ }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) =>
|
||||
_pump.InvokeAsync(() =>
|
||||
/// <summary>
|
||||
/// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
|
||||
/// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
|
||||
/// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
|
||||
/// false-positive Good even when the runtime rejected the write post-callback.
|
||||
/// </summary>
|
||||
public async Task<bool> WriteAsync(string fullReference, object value,
|
||||
int securityClassification = 0, TimeSpan? timeout = null)
|
||||
{
|
||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||
var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);
|
||||
|
||||
var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));
|
||||
|
||||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
if (!_pendingWrites.TryAdd(itemHandle, tcs))
|
||||
{
|
||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||
var itemHandle = ResolveItem(fullReference);
|
||||
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification);
|
||||
});
|
||||
// A prior write to the same item handle is still pending — uncommon but possible
|
||||
// if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
|
||||
if (_pendingWrites.TryRemove(itemHandle, out var prior))
|
||||
prior.TrySetCanceled();
|
||||
_pendingWrites[itemHandle] = tcs;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _pump.InvokeAsync(() =>
|
||||
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification));
|
||||
|
||||
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout));
|
||||
if (raceTask != tcs.Task)
|
||||
throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");
|
||||
|
||||
return await tcs.Task;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_pendingWrites.TryRemove(itemHandle, out _);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
|
||||
{
|
||||
@@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable
|
||||
{
|
||||
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
|
||||
|
||||
// Liveness: any data-change event is proof the connection is alive.
|
||||
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||
|
||||
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
|
||||
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
|
||||
var vtq = new Vtq(pvItemValue, ts, quality);
|
||||
@@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_disposed = true;
|
||||
_monitorCts?.Cancel();
|
||||
|
||||
try { DisconnectAsync().GetAwaiter().GetResult(); }
|
||||
catch { /* swallow */ }
|
||||
|
||||
_proxy.OnDataChange -= OnDataChange;
|
||||
_proxy.OnWriteComplete -= OnWriteComplete;
|
||||
_monitorCts?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
|
||||
/// monitor's polling cadence so behavior is consistent across the lift.
|
||||
/// </summary>
|
||||
public sealed class MxAccessClientOptions
|
||||
{
|
||||
/// <summary>Whether to start the background monitor at connect time.</summary>
|
||||
public bool AutoReconnect { get; init; } = true;
|
||||
|
||||
/// <summary>How often the monitor wakes up to check liveness.</summary>
|
||||
public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
|
||||
public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
|
||||
}
|
||||
|
||||
@@ -27,6 +27,15 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
|
||||
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
|
||||
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
|
||||
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
|
||||
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
#pragma warning restore CS0067
|
||||
|
||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
|
||||
{
|
||||
@@ -120,8 +129,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
? null
|
||||
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
|
||||
|
||||
await _mx.WriteAsync(w.TagReference, value!);
|
||||
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 });
|
||||
var ok = await _mx.WriteAsync(w.TagReference, value!);
|
||||
results.Add(new WriteValueResult
|
||||
{
|
||||
TagReference = w.TagReference,
|
||||
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
|
||||
Error = ok ? null : "MXAccess runtime reported write failure",
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -137,12 +151,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
|
||||
try
|
||||
{
|
||||
// For each requested tag, register a subscription that publishes back via the
|
||||
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
|
||||
// is wired in the upcoming subscription-push pass; for now the value is captured
|
||||
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
|
||||
foreach (var tag in req.TagReferences)
|
||||
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ });
|
||||
{
|
||||
_refToSubs.AddOrUpdate(tag,
|
||||
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
|
||||
(_, bag) => { bag.Add(sid); return bag; });
|
||||
|
||||
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
|
||||
// fires for every active subscription of that tag — we fan out by SubscriptionId.
|
||||
await _mx.SubscribeAsync(tag, OnTagValueChanged);
|
||||
}
|
||||
|
||||
_subs[sid] = req.TagReferences;
|
||||
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
|
||||
@@ -157,7 +175,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
{
|
||||
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
|
||||
foreach (var r in refs)
|
||||
await _mx.UnsubscribeAsync(r);
|
||||
{
|
||||
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
|
||||
// other subscription is still listening (multiple Proxy subs may share a tag).
|
||||
_refToSubs.TryGetValue(r, out var bag);
|
||||
if (bag is not null)
|
||||
{
|
||||
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
|
||||
bag.Where(id => id != req.SubscriptionId));
|
||||
if (remaining.IsEmpty)
|
||||
{
|
||||
_refToSubs.TryRemove(r, out _);
|
||||
await _mx.UnsubscribeAsync(r);
|
||||
}
|
||||
else
|
||||
{
|
||||
_refToSubs[r] = remaining;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
|
||||
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
|
||||
/// subscription that includes this tag — the IPC sink translates that into outbound
|
||||
/// <c>OnDataChangeNotification</c> frames.
|
||||
/// </summary>
|
||||
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
|
||||
{
|
||||
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
|
||||
|
||||
var wireValue = ToWire(fullReference, vtq);
|
||||
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
|
||||
// each ISubscribable consumer based on the SubscriptionId in the payload.
|
||||
foreach (var sid in bag.Distinct())
|
||||
{
|
||||
OnDataChange?.Invoke(this, new OnDataChangeNotification
|
||||
{
|
||||
SubscriptionId = sid,
|
||||
Values = new[] { wireValue },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
@@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
||||
private long _nextSessionId;
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
// Stub backend never raises events — implements the interface members for symmetry.
|
||||
#pragma warning disable CS0067
|
||||
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
#pragma warning restore CS0067
|
||||
|
||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSessionId);
|
||||
|
||||
@@ -99,9 +99,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
|
||||
/// The returned disposable unsubscribes when the connection closes — without it the
|
||||
/// backend's static event invocation list would accumulate dead writer references and
|
||||
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
|
||||
/// </summary>
|
||||
public IDisposable AttachConnection(FrameWriter writer)
|
||||
{
|
||||
var sink = new ConnectionSink(backend, writer, logger);
|
||||
sink.Attach();
|
||||
return sink;
|
||||
}
|
||||
|
||||
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
|
||||
|
||||
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
|
||||
=> writer.WriteAsync(MessageKind.ErrorResponse,
|
||||
new ErrorResponse { Code = code, Message = message }, ct);
|
||||
|
||||
private sealed class ConnectionSink : IDisposable
|
||||
{
|
||||
private readonly IGalaxyBackend _backend;
|
||||
private readonly FrameWriter _writer;
|
||||
private readonly ILogger _logger;
|
||||
private EventHandler<OnDataChangeNotification>? _onData;
|
||||
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
|
||||
private EventHandler<HostConnectivityStatus>? _onHost;
|
||||
|
||||
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
|
||||
{
|
||||
_backend = backend; _writer = writer; _logger = logger;
|
||||
}
|
||||
|
||||
public void Attach()
|
||||
{
|
||||
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
|
||||
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
|
||||
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
|
||||
new RuntimeStatusChangeNotification { Status = e });
|
||||
_backend.OnDataChange += _onData;
|
||||
_backend.OnAlarmEvent += _onAlarm;
|
||||
_backend.OnHostStatusChanged += _onHost;
|
||||
}
|
||||
|
||||
private void Push<T>(MessageKind kind, T payload)
|
||||
{
|
||||
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
|
||||
// ObjectDisposedException because the dispose path will detach this sink shortly.
|
||||
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
|
||||
catch (ObjectDisposedException) { }
|
||||
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_onData is not null) _backend.OnDataChange -= _onData;
|
||||
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
|
||||
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable
|
||||
new HelloAck { Accepted = true, HostName = Environment.MachineName },
|
||||
linked.Token).ConfigureAwait(false);
|
||||
|
||||
using var attachment = handler.AttachConnection(writer);
|
||||
|
||||
while (!linked.Token.IsCancellationRequested)
|
||||
{
|
||||
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
||||
@@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable
|
||||
public interface IFrameHandler
|
||||
{
|
||||
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
|
||||
|
||||
/// <summary>
|
||||
/// Called once per accepted connection after the Hello handshake. Lets the handler
|
||||
/// attach server-pushed event sinks (data-change, alarm, host-status) to the
|
||||
/// connection's <paramref name="writer"/>. Returns an <see cref="IDisposable"/> the
|
||||
/// pipe server disposes when the connection closes — backends use it to unsubscribe.
|
||||
/// Implementations that don't push events can return <see cref="NoopAttachment"/>.
|
||||
/// </summary>
|
||||
IDisposable AttachConnection(FrameWriter writer);
|
||||
|
||||
public sealed class NoopAttachment : IDisposable
|
||||
{
|
||||
public static readonly NoopAttachment Instance = new();
|
||||
public void Dispose() { }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
@@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler
|
||||
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
|
||||
ct);
|
||||
}
|
||||
|
||||
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user