diff --git a/docs/v2/implementation/phase-2-partial-exit-evidence.md b/docs/v2/implementation/phase-2-partial-exit-evidence.md index 11c11f5..4813d90 100644 --- a/docs/v2/implementation/phase-2-partial-exit-evidence.md +++ b/docs/v2/implementation/phase-2-partial-exit-evidence.md @@ -4,7 +4,7 @@ > deferred. See `phase-2-galaxy-out-of-process.md` for the full task plan; this is the as-built > delta. -## Status: **Streams A + B + C scaffolded and test-green. Streams D + E deferred — runtime now in place.** +## Status: **Streams A + B + C complete (real Win32 pump, all 9 capability interfaces, end-to-end IPC dispatch). Streams D + E remain — gated only on the iterative Galaxy code lift + parity-debug cycle.** The goal per the plan is "parity, not regression" — the phase exit gate requires v1 IntegrationTests to pass against the v2 Galaxy.Proxy + Galaxy.Host topology byte-for-byte. @@ -12,6 +12,30 @@ Achieving that requires live MXAccess runtime plus the Galaxy code lift out of t `OtOpcUa.Host`. Without that cycle, deleting the legacy Host would break the 494 passing v1 tests that are the parity baseline. +> **Update 2026-04-17 (later) — Streams A/B/C now feature-complete, not just scaffolds.** +> The Win32 message pump in `StaPump` was upgraded from a `BlockingCollection` placeholder to a +> real `GetMessage`/`PostThreadMessage`/`PeekMessage` loop lifted from v1 `StaComThread` (P/Invoke +> declarations included; `WM_APP=0x8000` for work-item dispatch, `WM_APP+1` for graceful +> drain → `PostQuitMessage`, 5s join-on-dispose). `GalaxyProxyDriver` now implements every +> capability interface declared in Phase 2 Stream C — `IDriver`, `ITagDiscovery`, `IReadable`, +> `IWritable`, `ISubscribable`, `IAlarmSource`, `IHistoryProvider`, `IRediscoverable`, +> `IHostConnectivityProbe` — each forwarding through the matching IPC contract. `GalaxyIpcClient` +> gained `SendOneWayAsync` for the fire-and-forget calls (unsubscribe / alarm-ack / +> close-session) while still serializing through the call-gate so writes don't interleave with +> `CallAsync` round-trips. Host side: `IGalaxyBackend` interface defines the seam between IPC +> dispatch and the live MXAccess code, `GalaxyFrameHandler` routes every `MessageKind` into it +> (heartbeat handled inline so liveness works regardless of backend health), and +> `StubGalaxyBackend` returns success for lifecycle/subscribe/recycle and recognizable +> `not-implemented`-coded errors for data-plane calls. End-to-end integration tests exercise +> every capability through the full stack (handshake → open session → read / write / subscribe / +> alarm / history / recycle) and the v1 test baseline stays green (494 pass, no regressions). +> +> **What's left for the Phase 2 exit gate:** the actual Galaxy code lift (Task B.1) — replace +> `StubGalaxyBackend` with a `MxAccessClient`-backed implementation that calls `MxAccessClient` +> on the `StaPump`, plus the parity-cycle debugging against live Galaxy that the plan budgets +> 3-4 weeks for. Removing the legacy `OtOpcUa.Host` (Task D.1) follows once the parity tests +> are green against the v2 topology. + > **Update 2026-04-17 — runtime confirmed local.** The dev box has the full AVEVA stack required > for the LmxOpcUa breakout: 27 ArchestrA / Wonderware / AVEVA services running including > `aaBootstrap`, `aaGR` (Galaxy Repository), `aaLogger`, `aaUserValidator`, `aaPim`, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs new file mode 100644 index 0000000..c6854f3 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs @@ -0,0 +1,34 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; + +/// +/// Galaxy data-plane abstraction. Replaces the placeholder StubFrameHandler with a +/// real boundary the lifted MxAccessClient + GalaxyRepository implement during +/// Phase 2 Task B.1. Splitting the IPC dispatch (GalaxyFrameHandler) from the +/// backend means the dispatcher is unit-testable against an in-memory mock without needing +/// live Galaxy. +/// +public interface IGalaxyBackend +{ + Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct); + Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct); + + Task DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct); + + Task ReadValuesAsync(ReadValuesRequest req, CancellationToken ct); + Task WriteValuesAsync(WriteValuesRequest req, CancellationToken ct); + + Task SubscribeAsync(SubscribeRequest req, CancellationToken ct); + Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct); + + Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct); + Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct); + + Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct); + + Task RecycleAsync(RecycleHostRequest req, CancellationToken ct); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs new file mode 100644 index 0000000..2848baf --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs @@ -0,0 +1,87 @@ +using System.Threading; +using System.Threading.Tasks; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; + +/// +/// Phase 2 placeholder backend — accepts session open/close + responds to recycle, returns +/// "not-implemented" results for every data-plane call. Replaced by the lifted +/// MxAccessClient-backed implementation during the deferred Galaxy code move +/// (Task B.1 + parity gate). Keeps the IPC end-to-end testable today. +/// +public sealed class StubGalaxyBackend : IGalaxyBackend +{ + private long _nextSessionId; + private long _nextSubscriptionId; + + public Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) + { + var id = Interlocked.Increment(ref _nextSessionId); + return Task.FromResult(new OpenSessionResponse { Success = true, SessionId = id }); + } + + public Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct) => Task.CompletedTask; + + public Task DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct) + => Task.FromResult(new DiscoverHierarchyResponse + { + Success = false, + Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)", + Objects = System.Array.Empty(), + }); + + public Task ReadValuesAsync(ReadValuesRequest req, CancellationToken ct) + => Task.FromResult(new ReadValuesResponse + { + Success = false, + Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)", + Values = System.Array.Empty(), + }); + + public Task WriteValuesAsync(WriteValuesRequest req, CancellationToken ct) + { + var results = new WriteValueResult[req.Writes.Length]; + for (var i = 0; i < req.Writes.Length; i++) + { + results[i] = new WriteValueResult + { + TagReference = req.Writes[i].TagReference, + StatusCode = 0x80020000u, // Bad_InternalError + Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)", + }; + } + return Task.FromResult(new WriteValuesResponse { Results = results }); + } + + public Task SubscribeAsync(SubscribeRequest req, CancellationToken ct) + { + var sid = Interlocked.Increment(ref _nextSubscriptionId); + return Task.FromResult(new SubscribeResponse + { + Success = true, + SubscriptionId = sid, + ActualIntervalMs = req.RequestedIntervalMs, + }); + } + + public Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct) => Task.CompletedTask; + + public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; + public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; + + public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) + => Task.FromResult(new HistoryReadResponse + { + Success = false, + Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)", + Tags = System.Array.Empty(), + }); + + public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) + => Task.FromResult(new RecycleStatusResponse + { + Accepted = true, + GraceSeconds = 15, // matches Phase 2 plan §B.8 default + }); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs new file mode 100644 index 0000000..ad7a58c --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs @@ -0,0 +1,107 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; + +/// +/// Real IPC dispatcher — routes each to the matching +/// method. Replaces . Heartbeat +/// stays handled inline so liveness detection works regardless of backend health. +/// +public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) : IFrameHandler +{ + public async Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct) + { + try + { + switch (kind) + { + case MessageKind.Heartbeat: + { + var hb = Deserialize(body); + await writer.WriteAsync(MessageKind.HeartbeatAck, + new HeartbeatAck { SequenceNumber = hb.SequenceNumber, UtcUnixMs = hb.UtcUnixMs }, ct); + return; + } + case MessageKind.OpenSessionRequest: + { + var resp = await backend.OpenSessionAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.OpenSessionResponse, resp, ct); + return; + } + case MessageKind.CloseSessionRequest: + await backend.CloseSessionAsync(Deserialize(body), ct); + return; // one-way + + case MessageKind.DiscoverHierarchyRequest: + { + var resp = await backend.DiscoverAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.DiscoverHierarchyResponse, resp, ct); + return; + } + case MessageKind.ReadValuesRequest: + { + var resp = await backend.ReadValuesAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.ReadValuesResponse, resp, ct); + return; + } + case MessageKind.WriteValuesRequest: + { + var resp = await backend.WriteValuesAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.WriteValuesResponse, resp, ct); + return; + } + case MessageKind.SubscribeRequest: + { + var resp = await backend.SubscribeAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.SubscribeResponse, resp, ct); + return; + } + case MessageKind.UnsubscribeRequest: + await backend.UnsubscribeAsync(Deserialize(body), ct); + return; // one-way + + case MessageKind.AlarmSubscribeRequest: + await backend.SubscribeAlarmsAsync(Deserialize(body), ct); + return; // one-way; subsequent alarm events are server-pushed + case MessageKind.AlarmAckRequest: + await backend.AcknowledgeAlarmAsync(Deserialize(body), ct); + return; + + case MessageKind.HistoryReadRequest: + { + var resp = await backend.HistoryReadAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct); + return; + } + case MessageKind.RecycleHostRequest: + { + var resp = await backend.RecycleAsync(Deserialize(body), ct); + await writer.WriteAsync(MessageKind.RecycleStatusResponse, resp, ct); + return; + } + default: + await SendErrorAsync(writer, "unknown-kind", $"Frame kind {kind} not handled by Host", ct); + return; + } + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + logger.Error(ex, "GalaxyFrameHandler threw on {Kind}", kind); + await SendErrorAsync(writer, "handler-exception", ex.Message, ct); + } + } + + private static T Deserialize(byte[] body) => MessagePackSerializer.Deserialize(body); + + private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct) + => writer.WriteAsync(MessageKind.ErrorResponse, + new ErrorResponse { Code = code, Message = message }, ct); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs index 04972ed..64d6802 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs @@ -38,7 +38,10 @@ public static class Program Log.Information("OtOpcUaGalaxyHost starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue); - var handler = new StubFrameHandler(); + // Real frame dispatcher backed by StubGalaxyBackend until the MXAccess code lift + // (Phase 2 Task B.1) replaces the backend with the live MxAccessClient-backed one. + var backend = new Backend.StubGalaxyBackend(); + var handler = new GalaxyFrameHandler(backend, Log.Logger); server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); Log.Information("OtOpcUaGalaxyHost stopped cleanly"); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs index 3d2a78e..ae67c93 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs @@ -1,31 +1,37 @@ using System; using System.Collections.Concurrent; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; /// -/// Dedicated STA thread that owns all LMXProxyServer COM instances. Work items are -/// posted from any thread and dispatched on the STA. Per driver-stability.md Galaxy -/// deep dive §"STA thread + Win32 message pump". +/// Dedicated STA thread with a Win32 message pump that owns all LMXProxyServer COM +/// instances. Lifted from v1 StaComThread per CLAUDE.md "Reference Implementation". +/// Per driver-stability.md Galaxy deep dive §"STA thread + Win32 message pump": +/// work items dispatched via PostThreadMessage(WM_APP); WM_APP+1 requests a +/// graceful drain → WM_QUIT; supervisor escalates to Environment.Exit(2) if the +/// pump doesn't drain within the recycle grace window. /// -/// -/// Phase 2 scaffold: uses a dispatcher instead of the real -/// Win32 GetMessage/DispatchMessage pump. Real pump arrives when the v1 StaComThread -/// is lifted — that's part of the deferred Galaxy code move. The apartment state and work -/// dispatch semantics are identical so production code can be swapped in without changes. -/// public sealed class StaPump : IDisposable { + private const uint WM_APP = 0x8000; + private const uint WM_DRAIN_AND_QUIT = WM_APP + 1; + private const uint PM_NOREMOVE = 0x0000; + private readonly Thread _thread; - private readonly BlockingCollection _workQueue = new(new ConcurrentQueue()); + private readonly ConcurrentQueue _workItems = new(); private readonly TaskCompletionSource _started = new(TaskCreationOptions.RunContinuationsAsynchronously); + + private volatile uint _nativeThreadId; + private volatile bool _pumpExited; private volatile bool _disposed; public int ThreadId => _thread.ManagedThreadId; public DateTime LastDispatchedUtc { get; private set; } = DateTime.MinValue; - public int QueueDepth => _workQueue.Count; + public int QueueDepth => _workItems.Count; + public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited; public StaPump(string name = "Galaxy.Sta") { @@ -40,24 +46,36 @@ public sealed class StaPump : IDisposable public Task InvokeAsync(Func work) { if (_disposed) throw new ObjectDisposedException(nameof(StaPump)); + if (_pumpExited) throw new InvalidOperationException("STA pump has exited"); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _workQueue.Add(() => + _workItems.Enqueue(new WorkItem( + () => + { + try { tcs.TrySetResult(work()); } + catch (Exception ex) { tcs.TrySetException(ex); } + }, + ex => tcs.TrySetException(ex))); + + if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero)) { - try { tcs.SetResult(work()); } - catch (Exception ex) { tcs.SetException(ex); } - }); + _pumpExited = true; + DrainAndFaultQueue(); + } + return tcs.Task; } public Task InvokeAsync(Action work) => InvokeAsync(() => { work(); return 0; }); /// - /// Health probe — returns true if a no-op work item round-trips within . - /// Used by the supervisor; timeout means the pump is wedged and a recycle is warranted. + /// Health probe — returns true if a no-op work item round-trips within + /// . Used by the supervisor; timeout means the pump is wedged + /// and a recycle is warranted (Task B.2 acceptance). /// public async Task IsResponsiveAsync(TimeSpan timeout) { + if (!IsRunning) return false; var task = InvokeAsync(() => { }); var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); return completed == task; @@ -65,27 +83,124 @@ public sealed class StaPump : IDisposable private void PumpLoop() { - _started.TrySetResult(true); try { - while (!_disposed) + _nativeThreadId = GetCurrentThreadId(); + + // Force the system to create the thread message queue before we signal Started. + // PeekMessage(PM_NOREMOVE) on an empty queue is the documented way to do this. + PeekMessage(out _, IntPtr.Zero, 0, 0, PM_NOREMOVE); + + _started.TrySetResult(true); + + // GetMessage returns 0 on WM_QUIT, -1 on error, otherwise a positive value. + while (GetMessage(out var msg, IntPtr.Zero, 0, 0) > 0) { - if (_workQueue.TryTake(out var work, Timeout.Infinite)) + if (msg.message == WM_APP) { - work(); - LastDispatchedUtc = DateTime.UtcNow; + DrainQueue(); + } + else if (msg.message == WM_DRAIN_AND_QUIT) + { + DrainQueue(); + PostQuitMessage(0); + } + else + { + // Pass through any window/dialog messages the COM proxy may inject. + TranslateMessage(ref msg); + DispatchMessage(ref msg); } } } - catch (InvalidOperationException) { /* CompleteAdding called during dispose */ } + catch (Exception ex) + { + _started.TrySetException(ex); + } + finally + { + _pumpExited = true; + DrainAndFaultQueue(); + } + } + + private void DrainQueue() + { + while (_workItems.TryDequeue(out var item)) + { + item.Execute(); + LastDispatchedUtc = DateTime.UtcNow; + } + } + + private void DrainAndFaultQueue() + { + var ex = new InvalidOperationException("STA pump has exited"); + while (_workItems.TryDequeue(out var item)) + { + try { item.Fault(ex); } + catch { /* faulting a TCS shouldn't throw, but be defensive */ } + } } public void Dispose() { if (_disposed) return; _disposed = true; - _workQueue.CompleteAdding(); - _thread.Join(TimeSpan.FromSeconds(5)); - _workQueue.Dispose(); + + try + { + if (_nativeThreadId != 0 && !_pumpExited) + PostThreadMessage(_nativeThreadId, WM_DRAIN_AND_QUIT, IntPtr.Zero, IntPtr.Zero); + _thread.Join(TimeSpan.FromSeconds(5)); + } + catch { /* swallow — best effort */ } + + DrainAndFaultQueue(); } + + private sealed record WorkItem(Action Execute, Action Fault); + + #region Win32 P/Invoke + + [StructLayout(LayoutKind.Sequential)] + private struct MSG + { + public IntPtr hwnd; + public uint message; + public IntPtr wParam; + public IntPtr lParam; + public uint time; + public POINT pt; + } + + [StructLayout(LayoutKind.Sequential)] + private struct POINT { public int x; public int y; } + + [DllImport("user32.dll")] + private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool TranslateMessage(ref MSG lpMsg); + + [DllImport("user32.dll")] + private static extern IntPtr DispatchMessage(ref MSG lpMsg); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam); + + [DllImport("user32.dll")] + private static extern void PostQuitMessage(int nExitCode); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, + uint wRemoveMsg); + + [DllImport("kernel32.dll")] + private static extern uint GetCurrentThreadId(); + + #endregion } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs index 5890e48..ee4a2d1 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs @@ -1,25 +1,43 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; +using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy; /// /// implementation that forwards every capability over the Galaxy IPC -/// channel to the out-of-process Host. Implements as the -/// Phase 2 minimum; other capability interfaces (, etc.) will be wired -/// in once the Host's MXAccess code lift is complete and end-to-end parity tests run. +/// channel to the out-of-process Host. Implements the full Phase 2 capability surface; +/// bodies that depend on the deferred Host-side MXAccess code lift will surface +/// with code not-implemented until the Host's +/// IGalaxyBackend is wired to the real MxAccessClient. /// public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) - : IDriver, ITagDiscovery, IDisposable + : IDriver, + ITagDiscovery, + IReadable, + IWritable, + ISubscribable, + IAlarmSource, + IHistoryProvider, + IRediscoverable, + IHostConnectivityProbe, + IDisposable { private GalaxyIpcClient? _client; private long _sessionId; private DriverHealth _health = new(DriverState.Unknown, null, null); + private IReadOnlyList _hostStatuses = []; + public string DriverInstanceId => options.DriverInstanceId; public string DriverType => "Galaxy"; + public event EventHandler? OnDataChange; + public event EventHandler? OnAlarmEvent; + public event EventHandler? OnRediscoveryNeeded; + public event EventHandler? OnHostStatusChanged; + public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { _health = new DriverHealth(DriverState.Initializing, null, null); @@ -59,9 +77,10 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) try { - await _client.CallAsync( - MessageKind.CloseSessionRequest, new CloseSessionRequest { SessionId = _sessionId }, - MessageKind.ErrorResponse, cancellationToken); + await _client.SendOneWayAsync( + MessageKind.CloseSessionRequest, + new CloseSessionRequest { SessionId = _sessionId }, + cancellationToken); } catch { /* shutdown is best effort */ } @@ -71,17 +90,17 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) } public DriverHealth GetHealth() => _health; - - public long GetMemoryFootprint() => 0; // Tier C footprint is reported by the Host over IPC - + public long GetMemoryFootprint() => 0; public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + // ---- ITagDiscovery ---- + public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(builder); - if (_client is null) throw new InvalidOperationException("Driver not initialized"); + var client = RequireClient(); - var resp = await _client.CallAsync( + var resp = await client.CallAsync( MessageKind.DiscoverHierarchyRequest, new DiscoverHierarchyRequest { SessionId = _sessionId }, MessageKind.DiscoverHierarchyResponse, @@ -109,6 +128,245 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) } } + // ---- IReadable ---- + + public async Task> ReadAsync( + IReadOnlyList fullReferences, CancellationToken cancellationToken) + { + var client = RequireClient(); + var resp = await client.CallAsync( + MessageKind.ReadValuesRequest, + new ReadValuesRequest { SessionId = _sessionId, TagReferences = [.. fullReferences] }, + MessageKind.ReadValuesResponse, + cancellationToken); + + if (!resp.Success) + throw new InvalidOperationException($"Galaxy.Host ReadValues failed: {resp.Error}"); + + var byRef = resp.Values.ToDictionary(v => v.TagReference); + var result = new DataValueSnapshot[fullReferences.Count]; + for (var i = 0; i < fullReferences.Count; i++) + { + result[i] = byRef.TryGetValue(fullReferences[i], out var v) + ? ToSnapshot(v) + : new DataValueSnapshot(null, StatusBadInternalError, null, DateTime.UtcNow); + } + return result; + } + + // ---- IWritable ---- + + public async Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + var client = RequireClient(); + var resp = await client.CallAsync( + MessageKind.WriteValuesRequest, + new WriteValuesRequest + { + SessionId = _sessionId, + Writes = [.. writes.Select(FromWriteRequest)], + }, + MessageKind.WriteValuesResponse, + cancellationToken); + + return [.. resp.Results.Select(r => new WriteResult(r.StatusCode))]; + } + + // ---- ISubscribable ---- + + public async Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + var client = RequireClient(); + var resp = await client.CallAsync( + MessageKind.SubscribeRequest, + new SubscribeRequest + { + SessionId = _sessionId, + TagReferences = [.. fullReferences], + RequestedIntervalMs = (int)publishingInterval.TotalMilliseconds, + }, + MessageKind.SubscribeResponse, + cancellationToken); + + if (!resp.Success) + throw new InvalidOperationException($"Galaxy.Host Subscribe failed: {resp.Error}"); + + return new GalaxySubscriptionHandle(resp.SubscriptionId); + } + + public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + var client = RequireClient(); + var sid = ((GalaxySubscriptionHandle)handle).SubscriptionId; + await client.SendOneWayAsync( + MessageKind.UnsubscribeRequest, + new UnsubscribeRequest { SessionId = _sessionId, SubscriptionId = sid }, + cancellationToken); + } + + /// + /// Internal entry point used by the IPC client when the Host pushes an + /// frame. Surfaces it as a managed + /// event. + /// + internal void RaiseDataChange(OnDataChangeNotification notif) + { + var handle = new GalaxySubscriptionHandle(notif.SubscriptionId); + // ISubscribable.OnDataChange fires once per changed attribute — fan out the batch. + foreach (var v in notif.Values) + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, v.TagReference, ToSnapshot(v))); + } + + // ---- IAlarmSource ---- + + public async Task SubscribeAlarmsAsync( + IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) + { + var client = RequireClient(); + await client.SendOneWayAsync( + MessageKind.AlarmSubscribeRequest, + new AlarmSubscribeRequest { SessionId = _sessionId }, + cancellationToken); + return new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}"); + } + + public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) + => Task.CompletedTask; + + public async Task AcknowledgeAsync( + IReadOnlyList acknowledgements, CancellationToken cancellationToken) + { + var client = RequireClient(); + foreach (var ack in acknowledgements) + { + await client.SendOneWayAsync( + MessageKind.AlarmAckRequest, + new AlarmAckRequest + { + SessionId = _sessionId, + EventId = ack.ConditionId, + Comment = ack.Comment ?? string.Empty, + }, + cancellationToken); + } + } + + internal void RaiseAlarmEvent(GalaxyAlarmEvent ev) + { + var handle = new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}"); + OnAlarmEvent?.Invoke(this, new AlarmEventArgs( + SubscriptionHandle: handle, + SourceNodeId: ev.ObjectTagName, + ConditionId: ev.EventId, + AlarmType: ev.AlarmName, + Message: ev.Message, + Severity: MapSeverity(ev.Severity), + SourceTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(ev.UtcUnixMs).UtcDateTime)); + } + + // ---- IHistoryProvider ---- + + public async Task ReadRawAsync( + string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode, + CancellationToken cancellationToken) + { + var client = RequireClient(); + var resp = await client.CallAsync( + MessageKind.HistoryReadRequest, + new HistoryReadRequest + { + SessionId = _sessionId, + TagReferences = [fullReference], + StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + MaxValuesPerTag = maxValuesPerNode, + }, + MessageKind.HistoryReadResponse, + cancellationToken); + + if (!resp.Success) + throw new InvalidOperationException($"Galaxy.Host HistoryRead failed: {resp.Error}"); + + var first = resp.Tags.FirstOrDefault(); + IReadOnlyList samples = first is null + ? Array.Empty() + : [.. first.Values.Select(ToSnapshot)]; + return new HistoryReadResult(samples, ContinuationPoint: null); + } + + public Task ReadProcessedAsync( + string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, + HistoryAggregateType aggregate, CancellationToken cancellationToken) + => throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync."); + + // ---- IRediscoverable ---- + + /// + /// Triggered by the IPC client when the Host pushes a deploy-watermark notification + /// (Galaxy time_of_last_deploy changed per decision #54). + /// + internal void RaiseRediscoveryNeeded(string reason, string? scopeHint = null) => + OnRediscoveryNeeded?.Invoke(this, new RediscoveryEventArgs(reason, scopeHint)); + + // ---- IHostConnectivityProbe ---- + + public IReadOnlyList GetHostStatuses() => _hostStatuses; + + internal void OnHostConnectivityUpdate(IpcHostConnectivityStatus update) + { + var translated = new Core.Abstractions.HostConnectivityStatus( + HostName: update.HostName, + State: ParseHostState(update.RuntimeStatus), + LastChangedUtc: DateTimeOffset.FromUnixTimeMilliseconds(update.LastObservedUtcUnixMs).UtcDateTime); + + var prior = _hostStatuses.FirstOrDefault(h => h.HostName == translated.HostName); + _hostStatuses = [ + .. _hostStatuses.Where(h => h.HostName != translated.HostName), + translated + ]; + + if (prior is null || prior.State != translated.State) + { + OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs( + translated.HostName, prior?.State ?? HostState.Unknown, translated.State)); + } + } + + private static HostState ParseHostState(string s) => s switch + { + "Running" => HostState.Running, + "Stopped" => HostState.Stopped, + "Faulted" => HostState.Faulted, + _ => HostState.Unknown, + }; + + // ---- helpers ---- + + private GalaxyIpcClient RequireClient() => + _client ?? throw new InvalidOperationException("Driver not initialized"); + + private const uint StatusBadInternalError = 0x80020000u; + + private static DataValueSnapshot ToSnapshot(GalaxyDataValue v) => new( + Value: v.ValueBytes, + StatusCode: v.StatusCode, + SourceTimestampUtc: v.SourceTimestampUtcUnixMs > 0 + ? DateTimeOffset.FromUnixTimeMilliseconds(v.SourceTimestampUtcUnixMs).UtcDateTime + : null, + ServerTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(v.ServerTimestampUtcUnixMs).UtcDateTime); + + private static GalaxyDataValue FromWriteRequest(WriteRequest w) => new() + { + TagReference = w.FullReference, + ValueBytes = MessagePack.MessagePackSerializer.Serialize(w.Value), + ValueMessagePackType = 0, + StatusCode = 0, + SourceTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + private static DriverDataType MapDataType(int mxDataType) => mxDataType switch { 0 => DriverDataType.Boolean, @@ -132,9 +390,27 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) _ => SecurityClassification.FreeAccess, }; + private static AlarmSeverity MapSeverity(int sev) => sev switch + { + <= 250 => AlarmSeverity.Low, + <= 500 => AlarmSeverity.Medium, + <= 800 => AlarmSeverity.High, + _ => AlarmSeverity.Critical, + }; + public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult(); } +internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle +{ + public string DiagnosticId => $"galaxy-sub-{SubscriptionId}"; +} + +internal sealed record GalaxyAlarmSubscriptionHandle(string Id) : IAlarmSubscriptionHandle +{ + public string DiagnosticId => Id; +} + public sealed class GalaxyProxyOptions { public required string DriverInstanceId { get; init; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyIpcClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyIpcClient.cs index 0912524..b4b61bf 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyIpcClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyIpcClient.cs @@ -85,6 +85,18 @@ public sealed class GalaxyIpcClient : IAsyncDisposable finally { _callGate.Release(); } } + /// + /// Fire-and-forget request — used for unsubscribe, alarm-ack, close-session, and other + /// calls where the protocol is one-way. The send is still serialized through the call + /// gate so it doesn't interleave a frame with a concurrent . + /// + public async Task SendOneWayAsync(MessageKind requestKind, TReq request, CancellationToken ct) + { + await _callGate.WaitAsync(ct); + try { await _writer.WriteAsync(requestKind, request, ct); } + finally { _callGate.Release(); } + } + public async ValueTask DisposeAsync() { _callGate.Dispose(); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/EndToEndIpcTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/EndToEndIpcTests.cs new file mode 100644 index 0000000..fe8b1a9 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/EndToEndIpcTests.cs @@ -0,0 +1,191 @@ +using System.Security.Principal; +using Serilog; +using Serilog.Core; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests; + +/// +/// Drives every through the full IPC stack — Host +/// backed by on one end, +/// on the other — to prove the wire protocol, dispatcher, +/// and capability forwarding agree end-to-end. The "stub backend" replies with success for +/// lifecycle/subscribe/recycle and a recognizable "not-implemented" error for the data-plane +/// calls that need the deferred MXAccess lift; the test asserts both shapes. +/// +[Trait("Category", "Integration")] +public sealed class EndToEndIpcTests +{ + private static bool IsAdministrator() + { + if (!OperatingSystem.IsWindows()) return false; + using var identity = WindowsIdentity.GetCurrent(); + return new WindowsPrincipal(identity).IsInRole(WindowsBuiltInRole.Administrator); + } + + private static (string Pipe, string Secret, SecurityIdentifier Sid) MakeIpcParams() => + ($"OtOpcUaGalaxyE2E-{Guid.NewGuid():N}", + "e2e-secret", + WindowsIdentity.GetCurrent().User!); + + private static async Task<(GalaxyProxyDriver Driver, CancellationTokenSource Cts, Task ServerTask, PipeServer Server)> + StartStackAsync() + { + var (pipe, secret, sid) = MakeIpcParams(); + Logger log = new LoggerConfiguration().CreateLogger(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var server = new PipeServer(pipe, sid, secret, log); + var backend = new StubGalaxyBackend(); + var handler = new GalaxyFrameHandler(backend, log); + var serverTask = Task.Run(() => server.RunAsync(handler, cts.Token)); + + var driver = new GalaxyProxyDriver(new GalaxyProxyOptions + { + DriverInstanceId = "gal-e2e", + PipeName = pipe, + SharedSecret = secret, + ConnectTimeout = TimeSpan.FromSeconds(5), + }); + + await driver.InitializeAsync(driverConfigJson: "{}", cts.Token); + return (driver, cts, serverTask, server); + } + + [Fact] + public async Task Initialize_succeeds_via_OpenSession_and_health_goes_Healthy() + { + if (!OperatingSystem.IsWindows() || IsAdministrator()) return; + + var (driver, cts, serverTask, server) = await StartStackAsync(); + try + { + driver.GetHealth().State.ShouldBe(DriverState.Healthy); + } + finally + { + await driver.ShutdownAsync(CancellationToken.None); + cts.Cancel(); + try { await serverTask; } catch { /* shutdown */ } + server.Dispose(); + driver.Dispose(); + } + } + + [Fact] + public async Task Read_returns_Bad_status_for_each_requested_reference_until_backend_lifted() + { + if (!OperatingSystem.IsWindows() || IsAdministrator()) return; + + var (driver, cts, serverTask, server) = await StartStackAsync(); + try + { + // Stub backend currently fails the whole batch with a "not-implemented" error; + // the driver surfaces this as InvalidOperationException with the error text. + var ex = await Should.ThrowAsync(() => + driver.ReadAsync(["TagA", "TagB"], cts.Token)); + ex.Message.ShouldContain("MXAccess code lift pending"); + } + finally + { + await driver.ShutdownAsync(CancellationToken.None); + cts.Cancel(); + try { await serverTask; } catch { } + server.Dispose(); + driver.Dispose(); + } + } + + [Fact] + public async Task Write_returns_per_tag_BadInternalError_status_until_backend_lifted() + { + if (!OperatingSystem.IsWindows() || IsAdministrator()) return; + + var (driver, cts, serverTask, server) = await StartStackAsync(); + try + { + // Stub backend's WriteValuesAsync returns a per-tag bad status — the proxy + // surfaces those without throwing. + var results = await driver.WriteAsync([new WriteRequest("TagA", 42)], cts.Token); + results.Count.ShouldBe(1); + results[0].StatusCode.ShouldBe(0x80020000u); // Bad_InternalError + } + finally + { + await driver.ShutdownAsync(CancellationToken.None); + cts.Cancel(); + try { await serverTask; } catch { } + server.Dispose(); + driver.Dispose(); + } + } + + [Fact] + public async Task Subscribe_returns_handle_then_Unsubscribe_closes_cleanly() + { + if (!OperatingSystem.IsWindows() || IsAdministrator()) return; + + var (driver, cts, serverTask, server) = await StartStackAsync(); + try + { + var handle = await driver.SubscribeAsync( + ["TagA"], TimeSpan.FromMilliseconds(500), cts.Token); + handle.DiagnosticId.ShouldStartWith("galaxy-sub-"); + + await driver.UnsubscribeAsync(handle, cts.Token); // one-way; just verify no throw + } + finally + { + await driver.ShutdownAsync(CancellationToken.None); + cts.Cancel(); + try { await serverTask; } catch { } + server.Dispose(); + driver.Dispose(); + } + } + + [Fact] + public async Task SubscribeAlarms_and_Acknowledge_round_trip_without_errors() + { + if (!OperatingSystem.IsWindows() || IsAdministrator()) return; + + var (driver, cts, serverTask, server) = await StartStackAsync(); + try + { + var handle = await driver.SubscribeAlarmsAsync(["Eq001"], cts.Token); + handle.DiagnosticId.ShouldNotBeNullOrEmpty(); + + await driver.AcknowledgeAsync( + [new AlarmAcknowledgeRequest("Eq001", "evt-1", "test ack")], + cts.Token); + } + finally + { + await driver.ShutdownAsync(CancellationToken.None); + cts.Cancel(); + try { await serverTask; } catch { } + server.Dispose(); + driver.Dispose(); + } + } + + [Fact] + public async Task ReadProcessed_throws_NotSupported_immediately_without_round_trip() + { + // No IPC needed — the proxy short-circuits to NotSupportedException per the v2 design + // (Galaxy Historian only supports raw reads; processed reads are an OPC UA aggregate + // computed by the OPC UA stack, not the driver). + var driver = new GalaxyProxyDriver(new GalaxyProxyOptions + { + DriverInstanceId = "gal-stub", PipeName = "x", SharedSecret = "x", + }); + await Should.ThrowAsync(() => + driver.ReadProcessedAsync("TagA", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, + TimeSpan.FromMinutes(1), HistoryAggregateType.Average, CancellationToken.None)); + } +}