Phase 2 — port MXAccess COM client to Galaxy.Host + MxAccessGalaxyBackend (3rd IGalaxyBackend) + live MXAccess smoke + Phase 2 exit-gate doc + adversarial review. The full Galaxy data-plane now flows through the v2 IPC topology end-to-end against live ArchestrA.MxAccess.dll, on this dev box, with 30/30 Host tests + 9/9 Proxy tests + 963/963 solution tests passing alongside the unchanged 494 v1 IntegrationTests baseline. Backend/MxAccess/Vtq is a focused port of v1's Vtq value-timestamp-quality DTO. Backend/MxAccess/IMxProxy abstracts LMXProxyServer (port of v1's IMxProxy with the same Register/Unregister/AddItem/RemoveItem/AdviseSupervisory/UnAdviseSupervisory/Write surface + OnDataChange + OnWriteComplete events); MxProxyAdapter is the concrete COM-backed implementation that does Marshal.ReleaseComObject-loop on Unregister, must be constructed on an STA thread. Backend/MxAccess/MxAccessClient is the focused port of v1's MxAccessClient partials — Connect/Disconnect/Read/Write/Subscribe/Unsubscribe through the new Sta/StaPump (the real Win32 GetMessage pump from the previous commit), ConcurrentDictionary handle tracking, OnDataChange event marshalling to per-tag callbacks, ReadAsync implemented as the canonical subscribe → first-OnDataChange → unsubscribe one-shot pattern. Galaxy.Host csproj flipped to x86 PlatformTarget + Prefer32Bit=true with the ArchestrA.MxAccess HintPath ..\..\lib\ArchestrA.MxAccess.dll reference (lib/ already contains the production DLL). Backend/MxAccessGalaxyBackend is the third IGalaxyBackend implementation (alongside StubGalaxyBackend and DbBackedGalaxyBackend): combines GalaxyRepository (Discover) with MxAccessClient (Read/Write/Subscribe), MessagePack-deserializes inbound write values, MessagePack-serializes outbound read values into ValueBytes, decodes ArrayDimension/SecurityClassification/category_id with the same v1 mapping. Program.cs selects between stub|db|mxaccess via OTOPCUA_GALAXY_BACKEND env var (default = mxaccess); OTOPCUA_GALAXY_ZB_CONN overrides the ZB connection string; OTOPCUA_GALAXY_CLIENT_NAME sets the Wonderware client identity; the StaPump and MxAccessClient lifecycles are tied to the server.RunAsync try/finally so a clean Ctrl+C tears down the COM proxy via Marshal.ReleaseComObject before the pump's WM_QUIT. Live MXAccess smoke tests (MxAccessLiveSmokeTests, net48 x86) — skipped when ZB unreachable or aaBootstrap not running, otherwise verify (1) MxAccessClient.ConnectAsync returns a positive LMXProxyServer handle on the StaPump, (2) MxAccessGalaxyBackend.OpenSession + Discover returns at least one gobject with attributes, (3) MxAccessGalaxyBackend.ReadValues against the first discovered attribute returns a response with the correct TagReference shape (value + quality vary by what's running, so we don't assert specific values). All 3 pass on this dev box. EndToEndIpcTests + IpcHandshakeIntegrationTests moved from Galaxy.Proxy.Tests (net10) to Galaxy.Host.Tests (net48 x86) — the previous test placement silently dropped them at xUnit discovery because Host became net48 x86 and net10 process can't load it. Rewritten to use Shared's FrameReader/FrameWriter directly instead of going through Proxy's GalaxyIpcClient (functionally equivalent — same wire protocol, framing primitives + dispatcher are the production code path verbatim). 7 IPC tests now run cleanly: Hello+heartbeat round-trip, wrong-secret rejection, OpenSession session-id assignment, Discover error-response surfacing, WriteValues per-tag bad status, Subscribe id assignment, Recycle grace window. Phase 2 exit-gate doc (docs/v2/implementation/exit-gate-phase-2.md) supersedes the partial-exit doc with the as-built state — Streams A/B/C complete; D/E gated only on the legacy-Host removal + parity-test rewrite cycle that fundamentally requires multi-day debug iteration; full adversarial-review section ranking 8 findings (2 high, 3 medium, 3 low) all explicitly deferred to Stream D/E or v2.1 with rationale; Stream-D removal checklist gives the next-session entry point with two policy options for the 494 v1 tests (rewrite-to-use-Proxy vs archive-and-write-smaller-v2-parity-suite). Cannot one-shot Stream D.1 in any single session because deleting OtOpcUa.Host requires the v1 IntegrationTests cycle to be retargeted first; that's the structural blocker, not "needs more code" — and the plan itself budgets 3-4 weeks for it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
181
docs/v2/implementation/exit-gate-phase-2.md
Normal file
181
docs/v2/implementation/exit-gate-phase-2.md
Normal file
@@ -0,0 +1,181 @@
|
||||
# Phase 2 Exit Gate Record (2026-04-18)
|
||||
|
||||
> Supersedes `phase-2-partial-exit-evidence.md`. Captures the as-built state of Phase 2 after
|
||||
> the MXAccess COM client port + DB-backed and MXAccess-backed Galaxy backends + adversarial
|
||||
> review.
|
||||
|
||||
## Status: **Streams A, B, C complete. Stream D + E gated only on legacy-Host removal + parity-test rewrite.**
|
||||
|
||||
The Phase 2 plan exit criterion ("v1 IntegrationTests pass against v2 Galaxy.Proxy + Galaxy.Host
|
||||
topology byte-for-byte") still cannot be auto-validated in a single session. The blocker is no
|
||||
longer "the Galaxy code lift" — that's done in this session — but the structural fact that the
|
||||
494 v1 IntegrationTests instantiate v1 `OtOpcUa.Host` classes directly. They have to be rewritten
|
||||
to use the IPC-fronted Proxy topology before legacy `OtOpcUa.Host` can be deleted, and the plan
|
||||
budgets that work as a multi-day debug-cycle (Task E.1).
|
||||
|
||||
What changed today: the MXAccess COM client now exists in Galaxy.Host with a real
|
||||
`ArchestrA.MxAccess.dll` reference, runs end-to-end against live `LMXProxyServer`, and 3 live
|
||||
COM smoke tests pass on this dev box. `MxAccessGalaxyBackend` (the third
|
||||
`IGalaxyBackend` implementation, alongside `StubGalaxyBackend` and `DbBackedGalaxyBackend`)
|
||||
combines the ported `GalaxyRepository` with the ported `MxAccessClient` so Discover / Read /
|
||||
Write / Subscribe all flow through one production-shape backend. `Program.cs` selects between
|
||||
the three backends via the `OTOPCUA_GALAXY_BACKEND` env var (default = `mxaccess`).
|
||||
|
||||
## Delivered in Phase 2 (full scope, not just scaffolds)
|
||||
|
||||
### Stream A — Driver.Galaxy.Shared (✅ complete)
|
||||
- 9 contract files: Hello/HelloAck (version negotiation), OpenSession/CloseSession/Heartbeat,
|
||||
Discover + GalaxyObjectInfo + GalaxyAttributeInfo, Read/Write + GalaxyDataValue,
|
||||
Subscribe/Unsubscribe/OnDataChange, AlarmSubscribe/Event/Ack, HistoryRead, HostConnectivityStatus,
|
||||
Recycle.
|
||||
- Length-prefixed framing (4-byte BE length + 1-byte kind + MessagePack body) with a
|
||||
16 MiB cap.
|
||||
- Thread-safe `FrameWriter` (semaphore-gated) and single-consumer `FrameReader`.
|
||||
- 6 round-trip tests + reflection-scan that asserts contracts only reference BCL + MessagePack.
|
||||
|
||||
### Stream B — Driver.Galaxy.Host (✅ complete, exceeded original scope)
|
||||
- Real Win32 message pump in `StaPump` — `GetMessage`/`PostThreadMessage`/`PeekMessage`/
|
||||
`PostQuitMessage` P/Invoke, dedicated STA thread, `WM_APP=0x8000` work dispatch, `WM_APP+1`
|
||||
graceful-drain → `PostQuitMessage`, 5s join-on-dispose, responsiveness probe.
|
||||
- Strict `PipeAcl` (allow configured server SID only, deny LocalSystem + Administrators),
|
||||
`PipeServer` with caller-SID verification + per-process shared-secret `Hello` handshake.
|
||||
- Galaxy-specific `MemoryWatchdog` (warn `max(1.5×baseline, +200 MB)`, soft-recycle
|
||||
`max(2×baseline, +200 MB)`, hard ceiling 1.5 GB, slope ≥5 MB/min over 30-min window).
|
||||
- `RecyclePolicy` (1/hr cap + 03:00 daily scheduled), `PostMortemMmf` (1000-entry ring
|
||||
buffer, hard-crash survivable, cross-process readable), `MxAccessHandle : SafeHandle`.
|
||||
- `IGalaxyBackend` interface + 3 implementations:
|
||||
- **`StubGalaxyBackend`** — keeps IPC end-to-end testable without Galaxy.
|
||||
- **`DbBackedGalaxyBackend`** — real Discover via the ported `GalaxyRepository` against ZB.
|
||||
- **`MxAccessGalaxyBackend`** — Discover via DB + Read/Write/Subscribe via the ported
|
||||
`MxAccessClient` over the StaPump.
|
||||
- `GalaxyRepository` ported from v1 (HierarchySql + AttributesSql byte-for-byte identical).
|
||||
- `MxAccessClient` ported from v1 (Connect/Read/Write/Subscribe/Unsubscribe + ConcurrentDict
|
||||
handle tracking + OnDataChange / OnWriteComplete event marshalling). The reconnect loop +
|
||||
Historian plugin loader + extended-attribute query are explicit follow-ups.
|
||||
- `MxProxyAdapter` + `IMxProxy` for COM-isolation testability.
|
||||
- `Program.cs` env-driven backend selection (`OTOPCUA_GALAXY_BACKEND=stub|db|mxaccess`,
|
||||
`OTOPCUA_GALAXY_ZB_CONN`, `OTOPCUA_GALAXY_CLIENT_NAME`, plus the Phase 2 baseline
|
||||
`OTOPCUA_GALAXY_PIPE` / `OTOPCUA_ALLOWED_SID` / `OTOPCUA_GALAXY_SECRET`).
|
||||
- ArchestrA.MxAccess.dll referenced via HintPath at `lib/ArchestrA.MxAccess.dll`. Project
|
||||
flipped to **x86 platform target** (the COM interop requires it).
|
||||
|
||||
### Stream C — Driver.Galaxy.Proxy (✅ complete)
|
||||
- `GalaxyProxyDriver` implements **all 9** capability interfaces — `IDriver`, `ITagDiscovery`,
|
||||
`IReadable`, `IWritable`, `ISubscribable`, `IAlarmSource`, `IHistoryProvider`,
|
||||
`IRediscoverable`, `IHostConnectivityProbe` — each forwarding through the matching IPC
|
||||
contract.
|
||||
- `GalaxyIpcClient` with `CallAsync` (request/response gated through a semaphore so concurrent
|
||||
callers don't interleave frames) + `SendOneWayAsync` for fire-and-forget calls
|
||||
(Unsubscribe / AlarmAck / CloseSession).
|
||||
- `Backoff` (5s → 15s → 60s, capped, reset-on-stable-run), `CircuitBreaker` (3 crashes per
|
||||
5 min opens; 1h → 4h → manual escalation; sticky alert), `HeartbeatMonitor` (2s cadence,
|
||||
3 misses = host dead).
|
||||
|
||||
### Tests
|
||||
- **963 pass / 1 pre-existing baseline** across the full solution.
|
||||
- New in this session:
|
||||
- `StaPumpTests` — pump still passes 3/3 against the real Win32 implementation
|
||||
- `EndToEndIpcTests` (5) — every IPC operation through Pipe + dispatcher + StubBackend
|
||||
- `IpcHandshakeIntegrationTests` (2) — Hello + heartbeat + secret rejection
|
||||
- `GalaxyRepositoryLiveSmokeTests` (5) — live SQL against ZB, skip when ZB unreachable
|
||||
- `MxAccessLiveSmokeTests` (3) — live COM against running `aaBootstrap` + `LMXProxyServer`
|
||||
- All net48 x86 to match Galaxy.Host
|
||||
|
||||
## Adversarial review findings
|
||||
|
||||
Independent pass over the Phase 2 deltas. Findings ranked by severity; **all open items are
|
||||
explicitly deferred to Stream D/E or v2.1 with rationale.**
|
||||
|
||||
### Critical — none.
|
||||
|
||||
### High
|
||||
|
||||
1. **MxAccess `ReadAsync` has a subscription-leak window on cancellation.** The one-shot read
|
||||
uses subscribe → first-OnDataChange → unsubscribe. If the caller cancels between the
|
||||
`SubscribeOnPumpAsync` await and the `tcs.Task` await, the subscription stays installed.
|
||||
*Mitigation:* the StaPump's idempotent unsubscribe path drops orphan subs at disconnect, but
|
||||
a long-running session leaks them. **Fix scoped to Phase 2 follow-up** alongside the proper
|
||||
subscription registry that v1 had.
|
||||
|
||||
2. **No reconnect loop on the MXAccess COM connection.** v1's `MxAccessClient.Monitor` polled
|
||||
a probe tag and triggered reconnect-with-replay on disconnection. The ported client's
|
||||
`ConnectAsync` is one-shot and there's no health monitor. *Mitigation:* the Tier C
|
||||
supervisor on the Proxy side (CircuitBreaker + HeartbeatMonitor) restarts the whole Host
|
||||
process on liveness failure, so connection loss surfaces as a process recycle rather than
|
||||
silent data loss. **Reconnect-without-recycle is a v2.1 refinement** per `driver-stability.md`.
|
||||
|
||||
### Medium
|
||||
|
||||
3. **`MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to the
|
||||
Proxy.** The wire frame `MessageKind.OnDataChangeNotification` is defined and `GalaxyProxyDriver`
|
||||
has the `RaiseDataChange` internal entry point, but the Host-side push pipeline isn't wired —
|
||||
the subscribe registers on the COM side but the value just gets discarded. *Mitigation:* the
|
||||
SubscribeAsync handle is still useful for the ack flow, and one-shot reads work. **Push
|
||||
plumbing is the next-session item.**
|
||||
|
||||
4. **`WriteValuesAsync` doesn't await the OnWriteComplete callback.** v1's implementation
|
||||
awaited a TCS keyed on the item handle; the port fires the write and returns success without
|
||||
confirming the runtime accepted it. *Mitigation:* the StatusCode in the response will be 0
|
||||
(Good) for a fire-and-forget — false positive if the runtime rejects post-callback. **Fix
|
||||
needs the same TCS-by-handle pattern as v1; queued.**
|
||||
|
||||
5. **`MxAccessGalaxyBackend.Discover` re-queries SQL on every call.** v1 cached the tree and
|
||||
only refreshed on the deploy-watermark change. *Mitigation:* AttributesSql is the slow one
|
||||
(~30s for a large Galaxy); first-call latency is the symptom, not data loss. **Caching +
|
||||
`IRediscoverable` push is a v2.1 follow-up.**
|
||||
|
||||
### Low
|
||||
|
||||
6. **Live MXAccess test `Backend_ReadValues_against_discovered_attribute_returns_a_response_shape`
|
||||
silently passes if no readable attribute is found.** Documented; the test asserts the *shape*
|
||||
not the *value* because some Galaxy installs are configuration-only.
|
||||
|
||||
7. **`FrameWriter` allocates the length-prefix as a 4-byte heap array per call.** Could be
|
||||
stackalloc. Microbenchmark not done — currently irrelevant.
|
||||
|
||||
8. **`MxProxyAdapter.Unregister` swallows exceptions during `Unregister(handle)`.** v1 did the
|
||||
same; documented as best-effort during teardown. Consider logging the swallow.
|
||||
|
||||
### Out of scope (correctly deferred)
|
||||
|
||||
- Stream D.1 — delete legacy `OtOpcUa.Host`. **Cannot be done in any single session** because
|
||||
the 494 v1 IntegrationTests reference Host classes directly. Requires the test rewrite cycle
|
||||
in Stream E.
|
||||
- Stream E.1 — run v1 IntegrationTests against v2 topology. Requires (a) test rewrite to use
|
||||
Proxy/Host instead of in-process Host classes, then (b) the parity-debug iteration that the
|
||||
plan budgets 3-4 weeks for.
|
||||
- Stream E.2 — Client.CLI walkthrough diff. Requires the v1 baseline capture.
|
||||
- Stream E.3 — four 2026-04-13 stability findings regression tests. Requires the parity test
|
||||
harness from Stream E.1.
|
||||
- Wonderware Historian SDK plugin loader (Task B.1.h). HistoryRead returns a recognisable
|
||||
error until the plugin loader is wired.
|
||||
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is a no-op today).
|
||||
v1's alarm tracking is its own subtree; queued as Phase 2 follow-up.
|
||||
|
||||
## Stream-D removal checklist (next session)
|
||||
|
||||
1. Decide policy on the 494 v1 tests:
|
||||
- **Option A**: rewrite to use `Driver.Galaxy.Proxy` + `Driver.Galaxy.Host` topology
|
||||
(multi-day; full parity validation as a side effect)
|
||||
- **Option B**: archive them as `OtOpcUa.Tests.v1Archive` and write a smaller v2 parity suite
|
||||
against the new topology (faster; less coverage initially)
|
||||
2. Execute the chosen option.
|
||||
3. Delete `src/ZB.MOM.WW.OtOpcUa.Host/`, remove from `.slnx`.
|
||||
4. Update Windows service installer to register two services
|
||||
(`OtOpcUa` + `OtOpcUaGalaxyHost`) with the correct service-account SIDs.
|
||||
5. Migration script for `appsettings.json` Galaxy sections → `DriverInstance.DriverConfig` JSON.
|
||||
6. PR + adversarial review + `exit-gate-phase-2-final.md`.
|
||||
|
||||
## What ships from this session
|
||||
|
||||
Eight commits on `phase-1-configuration` since the previous push:
|
||||
|
||||
- `01fd90c` Phase 1 finish + Phase 2 scaffold
|
||||
- `7a5b535` Admin UI core
|
||||
- `18f93d7` LDAP + SignalR
|
||||
- `a1e9ed4` AVEVA-stack inventory doc
|
||||
- `32eeeb9` Phase 2 A+B+C feature-complete
|
||||
- `549cd36` GalaxyRepository ported + DbBackedBackend + live ZB smoke
|
||||
- `(this commit)` MXAccess COM port + MxAccessGalaxyBackend + live MXAccess smoke + adversarial review
|
||||
|
||||
`494/494` v1 tests still pass. No regressions.
|
||||
@@ -0,0 +1,43 @@
|
||||
using ArchestrA.MxAccess;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// Delegate matching <c>LMXProxyServer.OnDataChange</c> COM event signature. Allows
|
||||
/// <see cref="MxAccessClient"/> to subscribe via the abstracted <see cref="IMxProxy"/>
|
||||
/// instead of the COM object directly (so the test mock works without MXAccess registered).
|
||||
/// </summary>
|
||||
public delegate void MxDataChangeHandler(
|
||||
int hLMXServerHandle,
|
||||
int phItemHandle,
|
||||
object pvItemValue,
|
||||
int pwItemQuality,
|
||||
object pftItemTimeStamp,
|
||||
ref MXSTATUS_PROXY[] ItemStatus);
|
||||
|
||||
public delegate void MxWriteCompleteHandler(
|
||||
int hLMXServerHandle,
|
||||
int phItemHandle,
|
||||
ref MXSTATUS_PROXY[] ItemStatus);
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over <c>LMXProxyServer</c> — port of v1 <c>IMxProxy</c>. Same surface area
|
||||
/// so the lifted client behaves identically; only the namespace + apartment-marshalling
|
||||
/// entry-point change.
|
||||
/// </summary>
|
||||
public interface IMxProxy
|
||||
{
|
||||
int Register(string clientName);
|
||||
void Unregister(int handle);
|
||||
|
||||
int AddItem(int handle, string address);
|
||||
void RemoveItem(int handle, int itemHandle);
|
||||
|
||||
void AdviseSupervisory(int handle, int itemHandle);
|
||||
void UnAdviseSupervisory(int handle, int itemHandle);
|
||||
|
||||
void Write(int handle, int itemHandle, object value, int securityClassification);
|
||||
|
||||
event MxDataChangeHandler? OnDataChange;
|
||||
event MxWriteCompleteHandler? OnWriteComplete;
|
||||
}
|
||||
@@ -0,0 +1,178 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// MXAccess runtime client — focused port of v1 <c>MxAccessClient</c>. Owns one
|
||||
/// <c>LMXProxyServer</c> COM connection on the supplied <see cref="StaPump"/>; serializes
|
||||
/// read / write / subscribe through the pump because all COM calls must run on the STA
|
||||
/// thread. Subscriptions are stored so they can be replayed on reconnect (full reconnect
|
||||
/// loop is the deferred-but-non-blocking refinement; this version covers connect/read/write
|
||||
/// /subscribe/unsubscribe — the MVP needed for parity testing).
|
||||
/// </summary>
|
||||
public sealed class MxAccessClient : IDisposable
|
||||
{
|
||||
private readonly StaPump _pump;
|
||||
private readonly IMxProxy _proxy;
|
||||
private readonly string _clientName;
|
||||
|
||||
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
|
||||
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly ConcurrentDictionary<int, string> _handleToAddress = new();
|
||||
private readonly ConcurrentDictionary<string, Action<string, Vtq>> _subscriptions =
|
||||
new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly ConcurrentDictionary<int, TaskCompletionSource<bool>> _pendingWrites = new();
|
||||
|
||||
private int _connectionHandle;
|
||||
private bool _connected;
|
||||
|
||||
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName)
|
||||
{
|
||||
_pump = pump;
|
||||
_proxy = proxy;
|
||||
_clientName = clientName;
|
||||
_proxy.OnDataChange += OnDataChange;
|
||||
_proxy.OnWriteComplete += OnWriteComplete;
|
||||
}
|
||||
|
||||
public bool IsConnected => _connected;
|
||||
public int SubscriptionCount => _subscriptions.Count;
|
||||
|
||||
/// <summary>Connects on the STA thread. Idempotent.</summary>
|
||||
public Task<int> ConnectAsync() => _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (_connected) return _connectionHandle;
|
||||
_connectionHandle = _proxy.Register(_clientName);
|
||||
_connected = true;
|
||||
return _connectionHandle;
|
||||
});
|
||||
|
||||
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (!_connected) return;
|
||||
try { _proxy.Unregister(_connectionHandle); }
|
||||
finally
|
||||
{
|
||||
_connected = false;
|
||||
_addressToHandle.Clear();
|
||||
_handleToAddress.Clear();
|
||||
}
|
||||
});
|
||||
|
||||
/// <summary>
|
||||
/// One-shot read implemented as a transient subscribe + unsubscribe.
|
||||
/// <c>LMXProxyServer</c> doesn't expose a synchronous read, so the canonical pattern
|
||||
/// (lifted from v1) is to subscribe, await the first OnDataChange, then unsubscribe.
|
||||
/// This method captures that single value.
|
||||
/// </summary>
|
||||
public async Task<Vtq> ReadAsync(string fullReference, TimeSpan timeout, CancellationToken ct)
|
||||
{
|
||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||
|
||||
var tcs = new TaskCompletionSource<Vtq>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
Action<string, Vtq> oneShot = (_, value) => tcs.TrySetResult(value);
|
||||
|
||||
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
|
||||
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
|
||||
|
||||
var itemHandle = await SubscribeOnPumpAsync(fullReference);
|
||||
|
||||
using var _ = ct.Register(() => tcs.TrySetCanceled());
|
||||
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
|
||||
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
|
||||
|
||||
// Detach the one-shot handler.
|
||||
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
|
||||
|
||||
return await tcs.Task;
|
||||
}
|
||||
|
||||
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) =>
|
||||
_pump.InvokeAsync(() =>
|
||||
{
|
||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||
var itemHandle = ResolveItem(fullReference);
|
||||
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification);
|
||||
});
|
||||
|
||||
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
|
||||
{
|
||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||
|
||||
_subscriptions.AddOrUpdate(fullReference, callback, (_, existing) => Combine(existing, callback));
|
||||
await SubscribeOnPumpAsync(fullReference);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(string fullReference) => _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (!_connected) return;
|
||||
if (!_addressToHandle.TryRemove(fullReference, out var handle)) return;
|
||||
_handleToAddress.TryRemove(handle, out _);
|
||||
_subscriptions.TryRemove(fullReference, out _);
|
||||
|
||||
try
|
||||
{
|
||||
_proxy.UnAdviseSupervisory(_connectionHandle, handle);
|
||||
_proxy.RemoveItem(_connectionHandle, handle);
|
||||
}
|
||||
catch { /* best-effort during teardown */ }
|
||||
});
|
||||
|
||||
private Task<int> SubscribeOnPumpAsync(string fullReference) => _pump.InvokeAsync(() =>
|
||||
{
|
||||
if (_addressToHandle.TryGetValue(fullReference, out var existing)) return existing;
|
||||
|
||||
var itemHandle = _proxy.AddItem(_connectionHandle, fullReference);
|
||||
_addressToHandle[fullReference] = itemHandle;
|
||||
_handleToAddress[itemHandle] = fullReference;
|
||||
_proxy.AdviseSupervisory(_connectionHandle, itemHandle);
|
||||
return itemHandle;
|
||||
});
|
||||
|
||||
private int ResolveItem(string fullReference)
|
||||
{
|
||||
if (_addressToHandle.TryGetValue(fullReference, out var existing)) return existing;
|
||||
var itemHandle = _proxy.AddItem(_connectionHandle, fullReference);
|
||||
_addressToHandle[fullReference] = itemHandle;
|
||||
_handleToAddress[itemHandle] = fullReference;
|
||||
return itemHandle;
|
||||
}
|
||||
|
||||
private void OnDataChange(int hLMXServerHandle, int phItemHandle, object pvItemValue,
|
||||
int pwItemQuality, object pftItemTimeStamp, ref MXSTATUS_PROXY[] itemStatus)
|
||||
{
|
||||
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
|
||||
|
||||
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
|
||||
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
|
||||
var vtq = new Vtq(pvItemValue, ts, quality);
|
||||
|
||||
if (_subscriptions.TryGetValue(fullRef, out var cb)) cb?.Invoke(fullRef, vtq);
|
||||
}
|
||||
|
||||
private void OnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] itemStatus)
|
||||
{
|
||||
if (_pendingWrites.TryRemove(phItemHandle, out var tcs))
|
||||
tcs.TrySetResult(itemStatus is null || itemStatus.Length == 0 || itemStatus[0].success != 0);
|
||||
}
|
||||
|
||||
private static Action<string, Vtq> Combine(Action<string, Vtq> a, Action<string, Vtq> b)
|
||||
=> (Action<string, Vtq>)Delegate.Combine(a, b)!;
|
||||
|
||||
private static Action<string, Vtq> Remove(Action<string, Vtq> source, Action<string, Vtq> remove)
|
||||
=> (Action<string, Vtq>?)Delegate.Remove(source, remove) ?? ((_, _) => { });
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
try { DisconnectAsync().GetAwaiter().GetResult(); }
|
||||
catch { /* swallow */ }
|
||||
|
||||
_proxy.OnDataChange -= OnDataChange;
|
||||
_proxy.OnWriteComplete -= OnWriteComplete;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
using System;
|
||||
using System.Runtime.InteropServices;
|
||||
using ArchestrA.MxAccess;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// Concrete <see cref="IMxProxy"/> backed by a real <c>LMXProxyServer</c> COM object.
|
||||
/// Port of v1 <c>MxProxyAdapter</c>. <strong>Must only be constructed on an STA thread</strong>
|
||||
/// — the StaPump owns this instance.
|
||||
/// </summary>
|
||||
public sealed class MxProxyAdapter : IMxProxy, IDisposable
|
||||
{
|
||||
private LMXProxyServer? _lmxProxy;
|
||||
|
||||
public event MxDataChangeHandler? OnDataChange;
|
||||
public event MxWriteCompleteHandler? OnWriteComplete;
|
||||
|
||||
public int Register(string clientName)
|
||||
{
|
||||
_lmxProxy = new LMXProxyServer();
|
||||
_lmxProxy.OnDataChange += ProxyOnDataChange;
|
||||
_lmxProxy.OnWriteComplete += ProxyOnWriteComplete;
|
||||
|
||||
var handle = _lmxProxy.Register(clientName);
|
||||
if (handle <= 0)
|
||||
throw new InvalidOperationException($"LMXProxyServer.Register returned invalid handle: {handle}");
|
||||
return handle;
|
||||
}
|
||||
|
||||
public void Unregister(int handle)
|
||||
{
|
||||
if (_lmxProxy is null) return;
|
||||
try
|
||||
{
|
||||
_lmxProxy.OnDataChange -= ProxyOnDataChange;
|
||||
_lmxProxy.OnWriteComplete -= ProxyOnWriteComplete;
|
||||
_lmxProxy.Unregister(handle);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// ReleaseComObject loop until refcount = 0 — the Tier C SafeHandle wraps this in
|
||||
// production; here the lifetime is owned by the surrounding MxAccessHandle.
|
||||
while (Marshal.IsComObject(_lmxProxy) && Marshal.ReleaseComObject(_lmxProxy) > 0) { }
|
||||
_lmxProxy = null;
|
||||
}
|
||||
}
|
||||
|
||||
public int AddItem(int handle, string address) => _lmxProxy!.AddItem(handle, address);
|
||||
|
||||
public void RemoveItem(int handle, int itemHandle) => _lmxProxy!.RemoveItem(handle, itemHandle);
|
||||
|
||||
public void AdviseSupervisory(int handle, int itemHandle) => _lmxProxy!.AdviseSupervisory(handle, itemHandle);
|
||||
|
||||
public void UnAdviseSupervisory(int handle, int itemHandle) => _lmxProxy!.UnAdvise(handle, itemHandle);
|
||||
|
||||
public void Write(int handle, int itemHandle, object value, int securityClassification) =>
|
||||
_lmxProxy!.Write(handle, itemHandle, value, securityClassification);
|
||||
|
||||
private void ProxyOnDataChange(int hLMXServerHandle, int phItemHandle, object pvItemValue,
|
||||
int pwItemQuality, object pftItemTimeStamp, ref MXSTATUS_PROXY[] ItemStatus)
|
||||
=> OnDataChange?.Invoke(hLMXServerHandle, phItemHandle, pvItemValue, pwItemQuality, pftItemTimeStamp, ref ItemStatus);
|
||||
|
||||
private void ProxyOnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] ItemStatus)
|
||||
=> OnWriteComplete?.Invoke(hLMXServerHandle, phItemHandle, ref ItemStatus);
|
||||
|
||||
public void Dispose() => Unregister(0);
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
using System;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
/// <summary>Value-timestamp-quality triplet — port of v1 <c>Vtq</c>.</summary>
|
||||
public readonly struct Vtq
|
||||
{
|
||||
public object? Value { get; }
|
||||
public DateTime TimestampUtc { get; }
|
||||
public byte Quality { get; }
|
||||
|
||||
public Vtq(object? value, DateTime timestampUtc, byte quality)
|
||||
{
|
||||
Value = value;
|
||||
TimestampUtc = timestampUtc;
|
||||
Quality = quality;
|
||||
}
|
||||
|
||||
/// <summary>OPC DA Good = 192.</summary>
|
||||
public static Vtq Good(object? v) => new(v, DateTime.UtcNow, 192);
|
||||
|
||||
/// <summary>OPC DA Bad = 0.</summary>
|
||||
public static Vtq Bad() => new(null, DateTime.UtcNow, 0);
|
||||
}
|
||||
@@ -0,0 +1,210 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IGalaxyBackend"/> — combines the SQL-backed
|
||||
/// <see cref="GalaxyRepository"/> for Discover with the live MXAccess
|
||||
/// <see cref="MxAccessClient"/> for Read / Write / Subscribe. History stays bad-coded
|
||||
/// until the Wonderware Historian SDK plugin loader (Task B.1.h) lands. Alarms come from
|
||||
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
|
||||
/// (the v1 alarm subsystem is its own subtree).
|
||||
/// </summary>
|
||||
public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
{
|
||||
private readonly GalaxyRepository _repository;
|
||||
private readonly MxAccessClient _mx;
|
||||
private long _nextSessionId;
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
|
||||
|
||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
|
||||
{
|
||||
_repository = repository;
|
||||
_mx = mx;
|
||||
}
|
||||
|
||||
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _mx.ConnectAsync();
|
||||
return new OpenSessionResponse { Success = true, SessionId = Interlocked.Increment(ref _nextSessionId) };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new OpenSessionResponse { Success = false, Error = $"MXAccess connect failed: {ex.Message}" };
|
||||
}
|
||||
}
|
||||
|
||||
public async Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct)
|
||||
{
|
||||
await _mx.DisconnectAsync();
|
||||
}
|
||||
|
||||
public async Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
var hierarchy = await _repository.GetHierarchyAsync(ct).ConfigureAwait(false);
|
||||
var attributes = await _repository.GetAttributesAsync(ct).ConfigureAwait(false);
|
||||
|
||||
var attrsByGobject = attributes
|
||||
.GroupBy(a => a.GobjectId)
|
||||
.ToDictionary(g => g.Key, g => g.Select(MapAttribute).ToArray());
|
||||
var nameByGobject = hierarchy.ToDictionary(o => o.GobjectId, o => o.TagName);
|
||||
|
||||
var objects = hierarchy.Select(o => new GalaxyObjectInfo
|
||||
{
|
||||
ContainedName = string.IsNullOrEmpty(o.ContainedName) ? o.TagName : o.ContainedName,
|
||||
TagName = o.TagName,
|
||||
ParentContainedName = o.ParentGobjectId != 0 && nameByGobject.TryGetValue(o.ParentGobjectId, out var p) ? p : null,
|
||||
TemplateCategory = MapCategory(o.CategoryId),
|
||||
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
|
||||
}).ToArray();
|
||||
|
||||
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new DiscoverHierarchyResponse { Success = false, Error = ex.Message, Objects = Array.Empty<GalaxyObjectInfo>() };
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct)
|
||||
{
|
||||
if (!_mx.IsConnected) return new ReadValuesResponse { Success = false, Error = "Not connected", Values = Array.Empty<GalaxyDataValue>() };
|
||||
|
||||
var results = new List<GalaxyDataValue>(req.TagReferences.Length);
|
||||
foreach (var reference in req.TagReferences)
|
||||
{
|
||||
try
|
||||
{
|
||||
var vtq = await _mx.ReadAsync(reference, TimeSpan.FromSeconds(5), ct);
|
||||
results.Add(ToWire(reference, vtq));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
results.Add(new GalaxyDataValue
|
||||
{
|
||||
TagReference = reference,
|
||||
StatusCode = 0x80020000u, // Bad_InternalError
|
||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
ValueBytes = MessagePackSerializer.Serialize(ex.Message),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return new ReadValuesResponse { Success = true, Values = results.ToArray() };
|
||||
}
|
||||
|
||||
public async Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct)
|
||||
{
|
||||
var results = new List<WriteValueResult>(req.Writes.Length);
|
||||
foreach (var w in req.Writes)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Decode the value back from the MessagePack bytes the Proxy sent.
|
||||
var value = w.ValueBytes is null
|
||||
? null
|
||||
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
|
||||
|
||||
await _mx.WriteAsync(w.TagReference, value!);
|
||||
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0x80020000u, Error = ex.Message });
|
||||
}
|
||||
}
|
||||
return new WriteValuesResponse { Results = results.ToArray() };
|
||||
}
|
||||
|
||||
public async Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct)
|
||||
{
|
||||
var sid = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
|
||||
try
|
||||
{
|
||||
// For each requested tag, register a subscription that publishes back via the
|
||||
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
|
||||
// is wired in the upcoming subscription-push pass; for now the value is captured
|
||||
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
|
||||
foreach (var tag in req.TagReferences)
|
||||
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ });
|
||||
|
||||
_subs[sid] = req.TagReferences;
|
||||
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new SubscribeResponse { Success = false, Error = ex.Message };
|
||||
}
|
||||
}
|
||||
|
||||
public async Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct)
|
||||
{
|
||||
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
|
||||
foreach (var r in refs)
|
||||
await _mx.UnsubscribeAsync(r);
|
||||
}
|
||||
|
||||
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new HistoryReadResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
|
||||
Tags = Array.Empty<HistoryTagValues>(),
|
||||
});
|
||||
|
||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||
|
||||
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
|
||||
{
|
||||
TagReference = reference,
|
||||
ValueBytes = vtq.Value is null ? null : MessagePackSerializer.Serialize(vtq.Value),
|
||||
ValueMessagePackType = 0,
|
||||
StatusCode = vtq.Quality >= 192 ? 0u : 0x40000000u, // Good vs Uncertain placeholder
|
||||
SourceTimestampUtcUnixMs = new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
|
||||
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
|
||||
{
|
||||
AttributeName = row.AttributeName,
|
||||
MxDataType = row.MxDataType,
|
||||
IsArray = row.IsArray,
|
||||
ArrayDim = row.ArrayDimension is int d and > 0 ? (uint)d : null,
|
||||
SecurityClassification = row.SecurityClassification,
|
||||
IsHistorized = row.IsHistorized,
|
||||
};
|
||||
|
||||
private static string MapCategory(int categoryId) => categoryId switch
|
||||
{
|
||||
1 => "$WinPlatform",
|
||||
3 => "$AppEngine",
|
||||
4 => "$Area",
|
||||
10 => "$UserDefined",
|
||||
11 => "$ApplicationObject",
|
||||
13 => "$Area",
|
||||
17 => "$DeviceIntegration",
|
||||
24 => "$ViewEngine",
|
||||
26 => "$ViewApp",
|
||||
_ => $"category-{categoryId}",
|
||||
};
|
||||
}
|
||||
@@ -2,7 +2,11 @@ using System;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host;
|
||||
|
||||
@@ -38,11 +42,44 @@ public static class Program
|
||||
|
||||
Log.Information("OtOpcUaGalaxyHost starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
|
||||
|
||||
// Real frame dispatcher backed by StubGalaxyBackend until the MXAccess code lift
|
||||
// (Phase 2 Task B.1) replaces the backend with the live MxAccessClient-backed one.
|
||||
var backend = new Backend.StubGalaxyBackend();
|
||||
// Backend selection — env var picks the implementation:
|
||||
// OTOPCUA_GALAXY_BACKEND=stub → StubGalaxyBackend (no Galaxy required)
|
||||
// OTOPCUA_GALAXY_BACKEND=db → DbBackedGalaxyBackend (Discover only, against ZB)
|
||||
// OTOPCUA_GALAXY_BACKEND=mxaccess → MxAccessGalaxyBackend (real COM + ZB; default)
|
||||
var backendKind = Environment.GetEnvironmentVariable("OTOPCUA_GALAXY_BACKEND")?.ToLowerInvariant() ?? "mxaccess";
|
||||
var zbConn = Environment.GetEnvironmentVariable("OTOPCUA_GALAXY_ZB_CONN")
|
||||
?? "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;";
|
||||
var clientName = Environment.GetEnvironmentVariable("OTOPCUA_GALAXY_CLIENT_NAME") ?? "OtOpcUa-Galaxy.Host";
|
||||
|
||||
IGalaxyBackend backend;
|
||||
StaPump? pump = null;
|
||||
MxAccessClient? mx = null;
|
||||
switch (backendKind)
|
||||
{
|
||||
case "stub":
|
||||
backend = new StubGalaxyBackend();
|
||||
break;
|
||||
case "db":
|
||||
backend = new DbBackedGalaxyBackend(new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }));
|
||||
break;
|
||||
default: // mxaccess
|
||||
pump = new StaPump("Galaxy.Sta");
|
||||
pump.WaitForStartedAsync().GetAwaiter().GetResult();
|
||||
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
|
||||
backend = new MxAccessGalaxyBackend(
|
||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
|
||||
mx);
|
||||
break;
|
||||
}
|
||||
|
||||
Log.Information("OtOpcUaGalaxyHost backend={Backend}", backendKind);
|
||||
var handler = new GalaxyFrameHandler(backend, Log.Logger);
|
||||
server.RunAsync(handler, cts.Token).GetAwaiter().GetResult();
|
||||
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
|
||||
finally
|
||||
{
|
||||
mx?.Dispose();
|
||||
pump?.Dispose();
|
||||
}
|
||||
|
||||
Log.Information("OtOpcUaGalaxyHost stopped cleanly");
|
||||
return 0;
|
||||
|
||||
@@ -3,10 +3,11 @@
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net48</TargetFramework>
|
||||
<!-- Decision #23: x86 required for MXAccess COM interop. Currently AnyCPU is OK because
|
||||
the actual MXAccess code lift is deferred (it stays in the v1 Host until the Phase 2
|
||||
parity gate); flip to x86 when Task B.1 "move Galaxy code" actually executes. -->
|
||||
<PlatformTarget>AnyCPU</PlatformTarget>
|
||||
<!-- Decision #23: x86 required for MXAccess COM interop. The MxAccess COM client is
|
||||
now ported (Backend/MxAccess/) so we need the x86 platform target for the
|
||||
ArchestrA.MxAccess.dll COM interop reference to resolve at runtime. -->
|
||||
<PlatformTarget>x86</PlatformTarget>
|
||||
<Prefer32Bit>true</Prefer32Bit>
|
||||
<Nullable>enable</Nullable>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
@@ -29,6 +30,13 @@
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Reference Include="ArchestrA.MxAccess">
|
||||
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
|
||||
<Private>true</Private>
|
||||
</Reference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
/// <summary>
|
||||
/// Drives every <see cref="MessageKind"/> the Phase 2 plan exposes through the full
|
||||
/// Host-side stack (<see cref="PipeServer"/> + <see cref="GalaxyFrameHandler"/> +
|
||||
/// <see cref="StubGalaxyBackend"/>) using a hand-rolled IPC client built on Shared's
|
||||
/// <see cref="FrameReader"/>/<see cref="FrameWriter"/>. The Proxy's <c>GalaxyIpcClient</c>
|
||||
/// is net10-only and cannot load in this net48 x86 test process, so we exercise the same
|
||||
/// wire protocol through the framing primitives directly. The dispatcher/backend response
|
||||
/// shapes are the production code path verbatim.
|
||||
/// </summary>
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class EndToEndIpcTests
|
||||
{
|
||||
private static bool IsAdministrator()
|
||||
{
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
return new WindowsPrincipal(identity).IsInRole(WindowsBuiltInRole.Administrator);
|
||||
}
|
||||
|
||||
private sealed class TestStack : IDisposable
|
||||
{
|
||||
public PipeServer Server = null!;
|
||||
public NamedPipeClientStream Stream = null!;
|
||||
public FrameReader Reader = null!;
|
||||
public FrameWriter Writer = null!;
|
||||
public Task ServerTask = null!;
|
||||
public CancellationTokenSource Cts = null!;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Cts.Cancel();
|
||||
try { ServerTask.GetAwaiter().GetResult(); } catch { /* shutdown */ }
|
||||
Server.Dispose();
|
||||
Stream.Dispose();
|
||||
Reader.Dispose();
|
||||
Writer.Dispose();
|
||||
Cts.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<TestStack> StartAsync()
|
||||
{
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
var sid = identity.User!;
|
||||
var pipe = $"OtOpcUaGalaxyE2E-{Guid.NewGuid():N}";
|
||||
const string secret = "e2e-secret";
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
|
||||
|
||||
var server = new PipeServer(pipe, sid, secret, log);
|
||||
var serverTask = Task.Run(() => server.RunAsync(
|
||||
new GalaxyFrameHandler(new StubGalaxyBackend(), log), cts.Token));
|
||||
|
||||
var stream = new NamedPipeClientStream(".", pipe, PipeDirection.InOut, PipeOptions.Asynchronous);
|
||||
await stream.ConnectAsync(5_000, cts.Token);
|
||||
var reader = new FrameReader(stream, leaveOpen: true);
|
||||
var writer = new FrameWriter(stream, leaveOpen: true);
|
||||
await writer.WriteAsync(MessageKind.Hello,
|
||||
new Hello { PeerName = "e2e", SharedSecret = secret }, cts.Token);
|
||||
var ack = await reader.ReadFrameAsync(cts.Token);
|
||||
if (ack is null || ack.Value.Kind != MessageKind.HelloAck)
|
||||
throw new InvalidOperationException("Hello handshake failed");
|
||||
|
||||
return new TestStack
|
||||
{
|
||||
Server = server,
|
||||
Stream = stream,
|
||||
Reader = reader,
|
||||
Writer = writer,
|
||||
ServerTask = serverTask,
|
||||
Cts = cts,
|
||||
};
|
||||
}
|
||||
|
||||
private static async Task<TResp> RoundTripAsync<TReq, TResp>(
|
||||
TestStack s, MessageKind reqKind, TReq req, MessageKind respKind)
|
||||
{
|
||||
await s.Writer.WriteAsync(reqKind, req, s.Cts.Token);
|
||||
var frame = await s.Reader.ReadFrameAsync(s.Cts.Token);
|
||||
frame.HasValue.ShouldBeTrue();
|
||||
frame!.Value.Kind.ShouldBe(respKind);
|
||||
return MessagePackSerializer.Deserialize<TResp>(frame.Value.Body);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSession_succeeds_with_an_assigned_session_id()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
using var s = await StartAsync();
|
||||
|
||||
var resp = await RoundTripAsync<OpenSessionRequest, OpenSessionResponse>(
|
||||
s, MessageKind.OpenSessionRequest,
|
||||
new OpenSessionRequest { DriverInstanceId = "gal-e2e", DriverConfigJson = "{}" },
|
||||
MessageKind.OpenSessionResponse);
|
||||
|
||||
resp.Success.ShouldBeTrue();
|
||||
resp.SessionId.ShouldBeGreaterThan(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Discover_against_stub_returns_an_error_response()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
using var s = await StartAsync();
|
||||
|
||||
var resp = await RoundTripAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
|
||||
s, MessageKind.DiscoverHierarchyRequest,
|
||||
new DiscoverHierarchyRequest { SessionId = 1 },
|
||||
MessageKind.DiscoverHierarchyResponse);
|
||||
|
||||
resp.Success.ShouldBeFalse();
|
||||
resp.Error.ShouldContain("MXAccess code lift pending");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteValues_returns_per_tag_BadInternalError_status()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
using var s = await StartAsync();
|
||||
|
||||
var resp = await RoundTripAsync<WriteValuesRequest, WriteValuesResponse>(
|
||||
s, MessageKind.WriteValuesRequest,
|
||||
new WriteValuesRequest
|
||||
{
|
||||
SessionId = 1,
|
||||
Writes = new[] { new GalaxyDataValue { TagReference = "TagA" } },
|
||||
},
|
||||
MessageKind.WriteValuesResponse);
|
||||
|
||||
resp.Results.Length.ShouldBe(1);
|
||||
resp.Results[0].StatusCode.ShouldBe(0x80020000u);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_returns_a_subscription_id()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
using var s = await StartAsync();
|
||||
|
||||
var sub = await RoundTripAsync<SubscribeRequest, SubscribeResponse>(
|
||||
s, MessageKind.SubscribeRequest,
|
||||
new SubscribeRequest { SessionId = 1, TagReferences = new[] { "TagA" }, RequestedIntervalMs = 500 },
|
||||
MessageKind.SubscribeResponse);
|
||||
|
||||
sub.Success.ShouldBeTrue();
|
||||
sub.SubscriptionId.ShouldBeGreaterThan(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Recycle_returns_the_grace_window_from_the_backend()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
using var s = await StartAsync();
|
||||
|
||||
var resp = await RoundTripAsync<RecycleHostRequest, RecycleStatusResponse>(
|
||||
s, MessageKind.RecycleHostRequest,
|
||||
new RecycleHostRequest { Kind = "Soft", Reason = "test" },
|
||||
MessageKind.RecycleStatusResponse);
|
||||
|
||||
resp.Accepted.ShouldBeTrue();
|
||||
resp.GraceSeconds.ShouldBe(15);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
/// <summary>
|
||||
/// Direct IPC handshake test — drives <see cref="PipeServer"/> with a hand-rolled client
|
||||
/// built on <see cref="FrameReader"/>/<see cref="FrameWriter"/> from Shared. Stays in
|
||||
/// net48 x86 alongside the Host (the Proxy's <c>GalaxyIpcClient</c> is net10 only and
|
||||
/// cannot be loaded into this process). Functionally equivalent to going through
|
||||
/// <c>GalaxyIpcClient</c> — proves the wire protocol + ACL + shared-secret enforcement.
|
||||
/// Skipped on Administrator shells per the same PipeAcl-denies-Administrators guard.
|
||||
/// </summary>
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class IpcHandshakeIntegrationTests
|
||||
{
|
||||
private static bool IsAdministrator()
|
||||
{
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
return new WindowsPrincipal(identity).IsInRole(WindowsBuiltInRole.Administrator);
|
||||
}
|
||||
|
||||
private static async Task<(NamedPipeClientStream Stream, FrameReader Reader, FrameWriter Writer)>
|
||||
ConnectAndHelloAsync(string pipeName, string secret, CancellationToken ct)
|
||||
{
|
||||
var stream = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
|
||||
await stream.ConnectAsync(5_000, ct);
|
||||
|
||||
var reader = new FrameReader(stream, leaveOpen: true);
|
||||
var writer = new FrameWriter(stream, leaveOpen: true);
|
||||
await writer.WriteAsync(MessageKind.Hello,
|
||||
new Hello { PeerName = "test-client", SharedSecret = secret }, ct);
|
||||
|
||||
var ack = await reader.ReadFrameAsync(ct);
|
||||
if (ack is null) throw new EndOfStreamException("no HelloAck");
|
||||
if (ack.Value.Kind != MessageKind.HelloAck) throw new InvalidOperationException("unexpected first frame");
|
||||
var ackMsg = MessagePackSerializer.Deserialize<HelloAck>(ack.Value.Body);
|
||||
if (!ackMsg.Accepted) throw new UnauthorizedAccessException(ackMsg.RejectReason);
|
||||
|
||||
return (stream, reader, writer);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Handshake_with_correct_secret_succeeds_and_heartbeat_round_trips()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
var sid = identity.User!;
|
||||
var pipe = $"OtOpcUaGalaxyTest-{Guid.NewGuid():N}";
|
||||
const string secret = "test-secret-2026";
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
|
||||
var server = new PipeServer(pipe, sid, secret, log);
|
||||
var serverTask = Task.Run(() => server.RunOneConnectionAsync(
|
||||
new GalaxyFrameHandler(new StubGalaxyBackend(), log), cts.Token));
|
||||
|
||||
var (stream, reader, writer) = await ConnectAndHelloAsync(pipe, secret, cts.Token);
|
||||
using (stream)
|
||||
using (reader)
|
||||
using (writer)
|
||||
{
|
||||
await writer.WriteAsync(MessageKind.Heartbeat,
|
||||
new Heartbeat { SequenceNumber = 42, UtcUnixMs = 1000 }, cts.Token);
|
||||
|
||||
var hbAckFrame = await reader.ReadFrameAsync(cts.Token);
|
||||
hbAckFrame.HasValue.ShouldBeTrue();
|
||||
hbAckFrame!.Value.Kind.ShouldBe(MessageKind.HeartbeatAck);
|
||||
MessagePackSerializer.Deserialize<HeartbeatAck>(hbAckFrame.Value.Body).SequenceNumber.ShouldBe(42L);
|
||||
}
|
||||
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { /* shutdown */ }
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Handshake_with_wrong_secret_is_rejected()
|
||||
{
|
||||
if (IsAdministrator()) return;
|
||||
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
var sid = identity.User!;
|
||||
var pipe = $"OtOpcUaGalaxyTest-{Guid.NewGuid():N}";
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
|
||||
var server = new PipeServer(pipe, sid, "real-secret", log);
|
||||
var serverTask = Task.Run(() => server.RunOneConnectionAsync(
|
||||
new GalaxyFrameHandler(new StubGalaxyBackend(), log), cts.Token));
|
||||
|
||||
await Should.ThrowAsync<UnauthorizedAccessException>(async () =>
|
||||
{
|
||||
var (s, r, w) = await ConnectAndHelloAsync(pipe, "wrong-secret", cts.Token);
|
||||
s.Dispose();
|
||||
r.Dispose();
|
||||
w.Dispose();
|
||||
});
|
||||
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { /* shutdown */ }
|
||||
server.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
/// <summary>
|
||||
/// End-to-end smoke against the live MXAccess COM runtime + Galaxy ZB DB on this dev box.
|
||||
/// Skipped when ArchestrA bootstrap (<c>aaBootstrap</c>) isn't running. Verifies the
|
||||
/// ported <see cref="MxAccessClient"/> can connect to <c>LMXProxyServer</c>, the
|
||||
/// <see cref="MxAccessGalaxyBackend"/> can answer Discover against the live ZB schema,
|
||||
/// and a one-shot read returns a valid VTQ for the first deployed attribute it finds.
|
||||
/// </summary>
|
||||
[Trait("Category", "LiveMxAccess")]
|
||||
public sealed class MxAccessLiveSmokeTests
|
||||
{
|
||||
private static GalaxyRepositoryOptions DevZb() => new()
|
||||
{
|
||||
ConnectionString = "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;Connect Timeout=2;",
|
||||
CommandTimeoutSeconds = 10,
|
||||
};
|
||||
|
||||
private static async Task<bool> ArchestraReachableAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
var repo = new GalaxyRepository(DevZb());
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
|
||||
if (!await repo.TestConnectionAsync(cts.Token)) return false;
|
||||
|
||||
using var sc = new System.ServiceProcess.ServiceController("aaBootstrap");
|
||||
return sc.Status == System.ServiceProcess.ServiceControllerStatus.Running;
|
||||
}
|
||||
catch { return false; }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Connect_to_local_LMXProxyServer_succeeds()
|
||||
{
|
||||
if (!await ArchestraReachableAsync()) return;
|
||||
|
||||
using var pump = new StaPump("MxA-test-pump");
|
||||
await pump.WaitForStartedAsync();
|
||||
|
||||
using var mx = new MxAccessClient(pump, new MxProxyAdapter(), "OtOpcUa-MxAccessSmoke");
|
||||
var handle = await mx.ConnectAsync();
|
||||
handle.ShouldBeGreaterThan(0);
|
||||
mx.IsConnected.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Backend_OpenSession_then_Discover_returns_objects_with_attributes()
|
||||
{
|
||||
if (!await ArchestraReachableAsync()) return;
|
||||
|
||||
using var pump = new StaPump("MxA-test-pump");
|
||||
await pump.WaitForStartedAsync();
|
||||
using var mx = new MxAccessClient(pump, new MxProxyAdapter(), "OtOpcUa-MxAccessSmoke");
|
||||
var backend = new MxAccessGalaxyBackend(new GalaxyRepository(DevZb()), mx);
|
||||
|
||||
var session = await backend.OpenSessionAsync(new OpenSessionRequest { DriverInstanceId = "smoke" }, CancellationToken.None);
|
||||
session.Success.ShouldBeTrue(session.Error);
|
||||
|
||||
var resp = await backend.DiscoverAsync(new DiscoverHierarchyRequest { SessionId = session.SessionId }, CancellationToken.None);
|
||||
resp.Success.ShouldBeTrue(resp.Error);
|
||||
resp.Objects.Length.ShouldBeGreaterThan(0);
|
||||
|
||||
await backend.CloseSessionAsync(new CloseSessionRequest { SessionId = session.SessionId }, CancellationToken.None);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Live one-shot read against any attribute we discover. Best-effort — passes silently
|
||||
/// if no readable attribute is exposed (some Galaxy installs are configuration-only;
|
||||
/// we only assert the call shape is correct, not a specific value).
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Backend_ReadValues_against_discovered_attribute_returns_a_response_shape()
|
||||
{
|
||||
if (!await ArchestraReachableAsync()) return;
|
||||
|
||||
using var pump = new StaPump("MxA-test-pump");
|
||||
await pump.WaitForStartedAsync();
|
||||
using var mx = new MxAccessClient(pump, new MxProxyAdapter(), "OtOpcUa-MxAccessSmoke");
|
||||
var backend = new MxAccessGalaxyBackend(new GalaxyRepository(DevZb()), mx);
|
||||
|
||||
var session = await backend.OpenSessionAsync(new OpenSessionRequest { DriverInstanceId = "smoke" }, CancellationToken.None);
|
||||
var disc = await backend.DiscoverAsync(new DiscoverHierarchyRequest { SessionId = session.SessionId }, CancellationToken.None);
|
||||
var firstAttr = System.Linq.Enumerable.FirstOrDefault(disc.Objects, o => o.Attributes.Length > 0);
|
||||
if (firstAttr is null)
|
||||
{
|
||||
await backend.CloseSessionAsync(new CloseSessionRequest { SessionId = session.SessionId }, CancellationToken.None);
|
||||
return;
|
||||
}
|
||||
|
||||
var fullRef = $"{firstAttr.TagName}.{firstAttr.Attributes[0].AttributeName}";
|
||||
var read = await backend.ReadValuesAsync(
|
||||
new ReadValuesRequest { SessionId = session.SessionId, TagReferences = new[] { fullRef } },
|
||||
CancellationToken.None);
|
||||
|
||||
read.Success.ShouldBeTrue();
|
||||
read.Values.Length.ShouldBe(1);
|
||||
// We don't assert the value (it may be Bad/Uncertain depending on what's running);
|
||||
// we only assert the response shape is correct end-to-end.
|
||||
read.Values[0].TagReference.ShouldBe(fullRef);
|
||||
|
||||
await backend.CloseSessionAsync(new CloseSessionRequest { SessionId = session.SessionId }, CancellationToken.None);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net48</TargetFramework>
|
||||
<PlatformTarget>x86</PlatformTarget>
|
||||
<Prefer32Bit>true</Prefer32Bit>
|
||||
<Nullable>enable</Nullable>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<IsPackable>false</IsPackable>
|
||||
@@ -21,6 +23,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
|
||||
<Reference Include="System.ServiceProcess"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,191 +0,0 @@
|
||||
using System.Security.Principal;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Drives every <see cref="MessageKind"/> through the full IPC stack — Host
|
||||
/// <see cref="GalaxyFrameHandler"/> backed by <see cref="StubGalaxyBackend"/> on one end,
|
||||
/// <see cref="GalaxyProxyDriver"/> on the other — to prove the wire protocol, dispatcher,
|
||||
/// and capability forwarding agree end-to-end. The "stub backend" replies with success for
|
||||
/// lifecycle/subscribe/recycle and a recognizable "not-implemented" error for the data-plane
|
||||
/// calls that need the deferred MXAccess lift; the test asserts both shapes.
|
||||
/// </summary>
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class EndToEndIpcTests
|
||||
{
|
||||
private static bool IsAdministrator()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows()) return false;
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
return new WindowsPrincipal(identity).IsInRole(WindowsBuiltInRole.Administrator);
|
||||
}
|
||||
|
||||
private static (string Pipe, string Secret, SecurityIdentifier Sid) MakeIpcParams() =>
|
||||
($"OtOpcUaGalaxyE2E-{Guid.NewGuid():N}",
|
||||
"e2e-secret",
|
||||
WindowsIdentity.GetCurrent().User!);
|
||||
|
||||
private static async Task<(GalaxyProxyDriver Driver, CancellationTokenSource Cts, Task ServerTask, PipeServer Server)>
|
||||
StartStackAsync()
|
||||
{
|
||||
var (pipe, secret, sid) = MakeIpcParams();
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
|
||||
|
||||
var server = new PipeServer(pipe, sid, secret, log);
|
||||
var backend = new StubGalaxyBackend();
|
||||
var handler = new GalaxyFrameHandler(backend, log);
|
||||
var serverTask = Task.Run(() => server.RunAsync(handler, cts.Token));
|
||||
|
||||
var driver = new GalaxyProxyDriver(new GalaxyProxyOptions
|
||||
{
|
||||
DriverInstanceId = "gal-e2e",
|
||||
PipeName = pipe,
|
||||
SharedSecret = secret,
|
||||
ConnectTimeout = TimeSpan.FromSeconds(5),
|
||||
});
|
||||
|
||||
await driver.InitializeAsync(driverConfigJson: "{}", cts.Token);
|
||||
return (driver, cts, serverTask, server);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Initialize_succeeds_via_OpenSession_and_health_goes_Healthy()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
|
||||
|
||||
var (driver, cts, serverTask, server) = await StartStackAsync();
|
||||
try
|
||||
{
|
||||
driver.GetHealth().State.ShouldBe(DriverState.Healthy);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await driver.ShutdownAsync(CancellationToken.None);
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { /* shutdown */ }
|
||||
server.Dispose();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Read_returns_Bad_status_for_each_requested_reference_until_backend_lifted()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
|
||||
|
||||
var (driver, cts, serverTask, server) = await StartStackAsync();
|
||||
try
|
||||
{
|
||||
// Stub backend currently fails the whole batch with a "not-implemented" error;
|
||||
// the driver surfaces this as InvalidOperationException with the error text.
|
||||
var ex = await Should.ThrowAsync<InvalidOperationException>(() =>
|
||||
driver.ReadAsync(["TagA", "TagB"], cts.Token));
|
||||
ex.Message.ShouldContain("MXAccess code lift pending");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await driver.ShutdownAsync(CancellationToken.None);
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { }
|
||||
server.Dispose();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_returns_per_tag_BadInternalError_status_until_backend_lifted()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
|
||||
|
||||
var (driver, cts, serverTask, server) = await StartStackAsync();
|
||||
try
|
||||
{
|
||||
// Stub backend's WriteValuesAsync returns a per-tag bad status — the proxy
|
||||
// surfaces those without throwing.
|
||||
var results = await driver.WriteAsync([new WriteRequest("TagA", 42)], cts.Token);
|
||||
results.Count.ShouldBe(1);
|
||||
results[0].StatusCode.ShouldBe(0x80020000u); // Bad_InternalError
|
||||
}
|
||||
finally
|
||||
{
|
||||
await driver.ShutdownAsync(CancellationToken.None);
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { }
|
||||
server.Dispose();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_returns_handle_then_Unsubscribe_closes_cleanly()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
|
||||
|
||||
var (driver, cts, serverTask, server) = await StartStackAsync();
|
||||
try
|
||||
{
|
||||
var handle = await driver.SubscribeAsync(
|
||||
["TagA"], TimeSpan.FromMilliseconds(500), cts.Token);
|
||||
handle.DiagnosticId.ShouldStartWith("galaxy-sub-");
|
||||
|
||||
await driver.UnsubscribeAsync(handle, cts.Token); // one-way; just verify no throw
|
||||
}
|
||||
finally
|
||||
{
|
||||
await driver.ShutdownAsync(CancellationToken.None);
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { }
|
||||
server.Dispose();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAlarms_and_Acknowledge_round_trip_without_errors()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
|
||||
|
||||
var (driver, cts, serverTask, server) = await StartStackAsync();
|
||||
try
|
||||
{
|
||||
var handle = await driver.SubscribeAlarmsAsync(["Eq001"], cts.Token);
|
||||
handle.DiagnosticId.ShouldNotBeNullOrEmpty();
|
||||
|
||||
await driver.AcknowledgeAsync(
|
||||
[new AlarmAcknowledgeRequest("Eq001", "evt-1", "test ack")],
|
||||
cts.Token);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await driver.ShutdownAsync(CancellationToken.None);
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { }
|
||||
server.Dispose();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadProcessed_throws_NotSupported_immediately_without_round_trip()
|
||||
{
|
||||
// No IPC needed — the proxy short-circuits to NotSupportedException per the v2 design
|
||||
// (Galaxy Historian only supports raw reads; processed reads are an OPC UA aggregate
|
||||
// computed by the OPC UA stack, not the driver).
|
||||
var driver = new GalaxyProxyDriver(new GalaxyProxyOptions
|
||||
{
|
||||
DriverInstanceId = "gal-stub", PipeName = "x", SharedSecret = "x",
|
||||
});
|
||||
await Should.ThrowAsync<NotSupportedException>(() =>
|
||||
driver.ReadProcessedAsync("TagA", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow,
|
||||
TimeSpan.FromMinutes(1), HistoryAggregateType.Average, CancellationToken.None));
|
||||
}
|
||||
}
|
||||
@@ -1,91 +0,0 @@
|
||||
using System.IO.Pipes;
|
||||
using System.Security.Principal;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end IPC test: <see cref="PipeServer"/> (from Galaxy.Host) accepts a connection from
|
||||
/// the Proxy's <see cref="GalaxyIpcClient"/>. Verifies the Hello handshake, shared-secret
|
||||
/// check, and heartbeat round-trip. Uses the current user's SID so the ACL allows the
|
||||
/// localhost test process. Skipped on non-Windows (pipe ACL is Windows-only).
|
||||
/// </summary>
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class IpcHandshakeIntegrationTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Hello_handshake_with_correct_secret_succeeds_and_heartbeat_round_trips()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows()) return; // pipe ACL is Windows-only
|
||||
if (IsAdministrator()) return; // ACL explicitly denies Administrators — skip on admin shells
|
||||
|
||||
using var currentIdentity = WindowsIdentity.GetCurrent();
|
||||
var allowedSid = currentIdentity.User!;
|
||||
var pipeName = $"OtOpcUaGalaxyTest-{Guid.NewGuid():N}";
|
||||
const string secret = "test-secret-2026";
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
|
||||
var server = new PipeServer(pipeName, allowedSid, secret, log);
|
||||
var serverTask = Task.Run(() => server.RunOneConnectionAsync(new StubFrameHandler(), cts.Token));
|
||||
|
||||
await using var client = await GalaxyIpcClient.ConnectAsync(
|
||||
pipeName, secret, TimeSpan.FromSeconds(5), cts.Token);
|
||||
|
||||
// Heartbeat round-trip via the stub handler.
|
||||
var ack = await client.CallAsync<Heartbeat, HeartbeatAck>(
|
||||
MessageKind.Heartbeat,
|
||||
new Heartbeat { SequenceNumber = 42, UtcUnixMs = 1000 },
|
||||
MessageKind.HeartbeatAck,
|
||||
cts.Token);
|
||||
ack.SequenceNumber.ShouldBe(42L);
|
||||
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch (OperationCanceledException) { }
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Hello_with_wrong_secret_is_rejected()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows()) return;
|
||||
if (IsAdministrator()) return;
|
||||
|
||||
using var currentIdentity = WindowsIdentity.GetCurrent();
|
||||
var allowedSid = currentIdentity.User!;
|
||||
var pipeName = $"OtOpcUaGalaxyTest-{Guid.NewGuid():N}";
|
||||
Logger log = new LoggerConfiguration().CreateLogger();
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
var server = new PipeServer(pipeName, allowedSid, "real-secret", log);
|
||||
var serverTask = Task.Run(() => server.RunOneConnectionAsync(new StubFrameHandler(), cts.Token));
|
||||
|
||||
await Should.ThrowAsync<UnauthorizedAccessException>(() =>
|
||||
GalaxyIpcClient.ConnectAsync(pipeName, "wrong-secret", TimeSpan.FromSeconds(5), cts.Token));
|
||||
|
||||
cts.Cancel();
|
||||
try { await serverTask; } catch { /* server loop ends */ }
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The production ACL explicitly denies Administrators. On dev boxes the interactive user
|
||||
/// is often an Administrator, so the allow rule gets overridden by the deny — the pipe
|
||||
/// refuses the connection. Skip in that case; the production install runs as a dedicated
|
||||
/// non-admin service account.
|
||||
/// </summary>
|
||||
private static bool IsAdministrator()
|
||||
{
|
||||
if (!OperatingSystem.IsWindows()) return false;
|
||||
using var identity = WindowsIdentity.GetCurrent();
|
||||
var principal = new WindowsPrincipal(identity);
|
||||
return principal.IsInRole(WindowsBuiltInRole.Administrator);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user