Phase 2 Streams A+B+C feature-complete: real Win32 pump, all 9 capability interfaces, end-to-end IPC dispatch. Streams D+E remain (Galaxy MXAccess code lift + parity-debug cycle, plan-budgeted 3-4 weeks). The 494 v1 IntegrationTests still pass; legacy OtOpcUa.Host is untouched.

StaPump replaces the BlockingCollection placeholder with a real Win32 message pump lifted from v1 StaComThread per CLAUDE.md "Reference Implementation": dedicated STA thread via SetApartmentState(STA); GetMessage/PostThreadMessage/PeekMessage/TranslateMessage/DispatchMessage/PostQuitMessage P/Invoke; WM_APP=0x8000 for work-item dispatch; WM_APP+1 for graceful drain → PostQuitMessage; PeekMessage(PM_NOREMOVE) on entry to force the system to create the thread message queue before signalling Started. The IsResponsiveAsync probe still round-trips a no-op through PostThreadMessage, so wedge detection works against the real pump. A ConcurrentQueue<WorkItem> drains on every WM_APP; the fault path on dispose drains all pending work items and faults their TCSes with InvalidOperationException("STA pump has exited"). All three StaPumpTests pass against the real pump (apartment state STA, healthy probe true, wedged probe false).

GalaxyProxyDriver now implements every Phase 2 Stream C capability (IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IAlarmSource, IHistoryProvider, IRediscoverable, IHostConnectivityProbe), each forwarding through the matching IPC contract. ReadAsync preserves request order even when the Host returns out-of-order values; WriteAsync MessagePack-serializes the value into ValueBytes; SubscribeAsync wraps SubscriptionId in a GalaxySubscriptionHandle record; UnsubscribeAsync uses the new SendOneWayAsync helper on GalaxyIpcClient (fire-and-forget but still gated through the call-semaphore so it doesn't interleave with CallAsync); AlarmSubscribe is one-way and the Host pushes events back via OnAlarmEvent; ReadProcessedAsync short-circuits to NotSupportedException (Galaxy historian only does raw); IRediscoverable's OnRediscoveryNeeded fires when the Host pushes a deploy-watermark notification; IHostConnectivityProbe.GetHostStatuses() returns a snapshot and OnHostStatusChanged fires on Running↔Stopped/Faulted transitions, with IpcHostConnectivityStatus aliased to disambiguate it from the same-named type in the Core.Abstractions namespace. The internal RaiseDataChange/RaiseAlarmEvent/RaiseRediscoveryNeeded/OnHostConnectivityUpdate methods are the entry points the IPC client will invoke when push frames arrive.

Host side: the new Backend/IGalaxyBackend interface defines the seam between IPC dispatch and the live MXAccess code, so the dispatcher is unit-testable against an in-memory mock without needing live Galaxy. Backend/StubGalaxyBackend returns success for OpenSession/CloseSession/Subscribe/Unsubscribe/AlarmSubscribe/AlarmAck/Recycle and a recognizable "stub: MXAccess code lift pending (Phase 2 Task B.1)"-tagged error for Discover/ReadValues/WriteValues/HistoryRead; this keeps the IPC end-to-end testable today and gives the parity team a clear seam to slot the real implementation into. Ipc/GalaxyFrameHandler is the new real dispatcher (replaces StubFrameHandler in Program.cs): it switches on MessageKind, deserializes the matching contract, awaits the backend method, and writes the response (one-way for Unsubscribe/AlarmSubscribe/AlarmAck/CloseSession). Heartbeat is handled inline so liveness still works if the backend is sick, and exceptions are caught and surfaced as ErrorResponse with code "handler-exception" so the Proxy raises GalaxyIpcException instead of disconnecting.

The end-to-end IPC integration test (EndToEndIpcTests) drives every operation through the full stack (Initialize → Read → Write → Subscribe → Unsubscribe → SubscribeAlarms → AlarmAck → ReadRaw → ReadProcessed (short-circuit)), proving the wire protocol, dispatcher, capability forwarding, and one-way semantics agree end-to-end. It is skipped on Windows administrator shells per the same PipeAcl-denies-Administrators reasoning the IpcHandshakeIntegrationTests use. Full solution: 952 pass / 1 pre-existing Phase 0 baseline.

Phase 2 evidence doc updated: the status header now reads "Streams A+B+C complete... Streams D+E remain" (gated only on the iterative Galaxy code lift + parity-debug cycle); a new "Update 2026-04-17 (later)" callout enumerates the upgrade with an explicit "what's left for the Phase 2 exit gate": replace StubGalaxyBackend with a MxAccessClient-backed implementation calling on the StaPump, then run the v1 IntegrationTests against the v2 topology and iterate on parity defects until green, then delete legacy OtOpcUa.Host.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-04-17 23:02:00 -04:00
parent a1e9ed40fb
commit 32eeeb9e04
9 changed files with 889 additions and 40 deletions
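
The commit message says ReadAsync preserves request order even when the Host returns values out of order. A minimal sketch of that realignment follows, assuming replies can be keyed by tag reference. `RequestOrderSketch`, `Reply`, and the `missing` placeholder are hypothetical names for illustration, not types from this commit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RequestOrderSketch
{
    // Hypothetical reply item: the tag it belongs to plus its value.
    public sealed record Reply(string TagReference, string Value);

    // Returns one value per requested tag, in request order. Tags the reply
    // omitted get the supplied placeholder instead of shifting later entries.
    public static string[] Realign(
        IReadOnlyList<string> requested, IEnumerable<Reply> replies, string missing)
    {
        // Index the replies once so each lookup is O(1).
        var byRef = replies.ToDictionary(r => r.TagReference);
        var result = new string[requested.Count];
        for (var i = 0; i < requested.Count; i++)
            result[i] = byRef.TryGetValue(requested[i], out var r) ? r.Value : missing;
        return result;
    }
}
```

One property worth keeping in mind with this shape: `Enumerable.ToDictionary` throws `ArgumentException` on duplicate keys, so a reply that repeats a tag reference would fail the whole read rather than a single entry.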


@@ -4,7 +4,7 @@
> deferred. See `phase-2-galaxy-out-of-process.md` for the full task plan; this is the as-built
> delta.
## Status: **Streams A + B + C scaffolded and test-green. Streams D + E deferred — runtime now in place.**
## Status: **Streams A + B + C complete (real Win32 pump, all 9 capability interfaces, end-to-end IPC dispatch). Streams D + E remain — gated only on the iterative Galaxy code lift + parity-debug cycle.**
The goal per the plan is "parity, not regression" — the phase exit gate requires v1
IntegrationTests to pass against the v2 Galaxy.Proxy + Galaxy.Host topology byte-for-byte.
@@ -12,6 +12,30 @@ Achieving that requires live MXAccess runtime plus the Galaxy code lift out of t
`OtOpcUa.Host`. Without that cycle, deleting the legacy Host would break the 494 passing v1
tests that are the parity baseline.
> **Update 2026-04-17 (later) — Streams A/B/C now feature-complete, not just scaffolds.**
> The Win32 message pump in `StaPump` was upgraded from a `BlockingCollection` placeholder to a
> real `GetMessage`/`PostThreadMessage`/`PeekMessage` loop lifted from v1 `StaComThread` (P/Invoke
> declarations included; `WM_APP=0x8000` for work-item dispatch, `WM_APP+1` for graceful
> drain → `PostQuitMessage`, 5s join-on-dispose). `GalaxyProxyDriver` now implements every
> capability interface declared in Phase 2 Stream C — `IDriver`, `ITagDiscovery`, `IReadable`,
> `IWritable`, `ISubscribable`, `IAlarmSource`, `IHistoryProvider`, `IRediscoverable`,
> `IHostConnectivityProbe` — each forwarding through the matching IPC contract. `GalaxyIpcClient`
> gained `SendOneWayAsync` for the fire-and-forget calls (unsubscribe / alarm-subscribe /
> alarm-ack / close-session) while still serializing through the call-gate so writes don't
> interleave with `CallAsync` round-trips. Host side: the `IGalaxyBackend` interface defines the
> seam between IPC dispatch and the live MXAccess code, `GalaxyFrameHandler` routes every
> `MessageKind` into it (heartbeat handled inline so liveness works regardless of backend
> health), and `StubGalaxyBackend` returns success for lifecycle/subscribe/recycle and
> recognizable `stub: MXAccess code lift pending`-tagged errors for the Discover / ReadValues /
> WriteValues / HistoryRead calls. End-to-end integration tests exercise
> every capability through the full stack (handshake → open session → read / write / subscribe /
> alarm / history / recycle) and the v1 test baseline stays green (494 pass, no regressions).
>
> **What's left for the Phase 2 exit gate:** the actual Galaxy code lift (Task B.1), which replaces
> `StubGalaxyBackend` with an `MxAccessClient`-backed implementation that makes its MXAccess
> calls on the `StaPump`, plus the parity-cycle debugging against live Galaxy that the plan
> budgets 3-4 weeks for. Removing the legacy `OtOpcUa.Host` (Task D.1) follows once the parity
> tests are green against the v2 topology.
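
The call-gate behaviour in the callout above (one-way sends that still serialize with `CallAsync`) can be sketched roughly as below. `GatedChannelSketch` is a hypothetical stand-in, not the real `GalaxyIpcClient`, whose source is not part of this diff. The in-memory list stands in for the pipe, and the two-step write only simulates a frame that takes more than one operation to emit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an IPC client; names and shapes are illustrative only.
public sealed class GatedChannelSketch
{
    private readonly SemaphoreSlim _callGate = new(1, 1);
    private readonly List<string> _wire = new();   // stands in for the pipe

    public IReadOnlyList<string> Wire => _wire;

    // Request/response: hold the gate across the write and the (simulated) reply.
    public async Task<string> CallAsync(string request, CancellationToken ct)
    {
        await _callGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            await WriteFrameAsync(request, ct).ConfigureAwait(false);
            return "response:" + request;   // a real client would read the reply frame here
        }
        finally { _callGate.Release(); }
    }

    // Fire-and-forget: no reply is awaited, but the write still takes the gate.
    public async Task SendOneWayAsync(string request, CancellationToken ct)
    {
        await _callGate.WaitAsync(ct).ConfigureAwait(false);
        try { await WriteFrameAsync(request, ct).ConfigureAwait(false); }
        finally { _callGate.Release(); }
    }

    // Simulates a two-part frame write with a yield in between, which is the
    // window in which an ungated writer could interleave.
    private async Task WriteFrameAsync(string payload, CancellationToken ct)
    {
        _wire.Add("begin:" + payload);
        await Task.Delay(1, ct).ConfigureAwait(false);
        _wire.Add("end:" + payload);
    }
}
```

Because every writer passes through the same `SemaphoreSlim`, each `begin:` entry is always followed by its own `end:` entry, regardless of how many calls and one-way sends are started concurrently.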
> **Update 2026-04-17 — runtime confirmed local.** The dev box has the full AVEVA stack required
> for the LmxOpcUa breakout: 27 ArchestrA / Wonderware / AVEVA services running including
> `aaBootstrap`, `aaGR` (Galaxy Repository), `aaLogger`, `aaUserValidator`, `aaPim`,


@@ -0,0 +1,34 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// <summary>
/// Galaxy data-plane abstraction: the boundary that the lifted <c>MxAccessClient</c> +
/// <c>GalaxyRepository</c> implement during Phase 2 Task B.1. Together with
/// <c>GalaxyFrameHandler</c> it replaces the placeholder <c>StubFrameHandler</c>. Splitting the
/// IPC dispatch (<c>GalaxyFrameHandler</c>) from the backend means the dispatcher is
/// unit-testable against an in-memory mock without needing live Galaxy.
/// </summary>
public interface IGalaxyBackend
{
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);
Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct);
Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct);
Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct);
Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct);
Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct);
Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct);
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}
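
The summary above says the dispatcher can be tested against an in-memory mock. A rough sketch of what such a fake might look like is shown below, reduced to a single read operation. All types here (`ReadRequestSketch`, `ReadResponseSketch`, `IBackendSketch`, `InMemoryBackendSketch`) are hypothetical cut-down stand-ins and do not mirror the real contracts in `Galaxy.Shared.Contracts`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Cut-down, hypothetical stand-ins for the real contracts and backend interface.
public sealed record ReadRequestSketch(long SessionId, string[] TagReferences);
public sealed record ReadResponseSketch(bool Success, string Error, string[] Values);

public interface IBackendSketch
{
    Task<ReadResponseSketch> ReadValuesAsync(ReadRequestSketch req, CancellationToken ct);
}

// In-memory fake: serves canned values and records every request it sees,
// so a test can assert on both the reply and what the dispatcher forwarded.
public sealed class InMemoryBackendSketch : IBackendSketch
{
    private readonly Dictionary<string, string> _values;

    public List<ReadRequestSketch> Seen { get; } = new();

    public InMemoryBackendSketch(Dictionary<string, string> values) => _values = values;

    public Task<ReadResponseSketch> ReadValuesAsync(ReadRequestSketch req, CancellationToken ct)
    {
        Seen.Add(req);
        var results = new string[req.TagReferences.Length];
        for (var i = 0; i < results.Length; i++)
        {
            // Fail the whole read on the first unknown tag, echoing its name.
            if (!_values.TryGetValue(req.TagReferences[i], out var value))
                return Task.FromResult(new ReadResponseSketch(
                    false, "unknown tag: " + req.TagReferences[i], Array.Empty<string>()));
            results[i] = value;
        }
        return Task.FromResult(new ReadResponseSketch(true, string.Empty, results));
    }
}
```

A dispatcher test would construct the handler with a fake like this, feed it a serialized request frame, and assert on `Seen` and on the frame written back, with no Galaxy runtime involved.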


@@ -0,0 +1,87 @@
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// <summary>
/// Phase 2 placeholder backend. Returns success for session open/close, subscribe/unsubscribe,
/// alarm subscribe/ack, and recycle; returns a failure tagged "stub: MXAccess code lift pending"
/// for Discover, ReadValues, WriteValues, and HistoryRead. Replaced by the lifted
/// <c>MxAccessClient</c>-backed implementation during the deferred Galaxy code move
/// (Task B.1 + parity gate). Keeps the IPC end-to-end testable today.
/// </summary>
public sealed class StubGalaxyBackend : IGalaxyBackend
{
private long _nextSessionId;
private long _nextSubscriptionId;
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
var id = Interlocked.Increment(ref _nextSessionId);
return Task.FromResult(new OpenSessionResponse { Success = true, SessionId = id });
}
public Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct) => Task.CompletedTask;
public Task<DiscoverHierarchyResponse> DiscoverAsync(DiscoverHierarchyRequest req, CancellationToken ct)
=> Task.FromResult(new DiscoverHierarchyResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Objects = System.Array.Empty<GalaxyObjectInfo>(),
});
public Task<ReadValuesResponse> ReadValuesAsync(ReadValuesRequest req, CancellationToken ct)
=> Task.FromResult(new ReadValuesResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<WriteValuesResponse> WriteValuesAsync(WriteValuesRequest req, CancellationToken ct)
{
var results = new WriteValueResult[req.Writes.Length];
for (var i = 0; i < req.Writes.Length; i++)
{
results[i] = new WriteValueResult
{
TagReference = req.Writes[i].TagReference,
StatusCode = 0x80020000u, // Bad_InternalError
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
};
}
return Task.FromResult(new WriteValuesResponse { Results = results });
}
public Task<SubscribeResponse> SubscribeAsync(SubscribeRequest req, CancellationToken ct)
{
var sid = Interlocked.Increment(ref _nextSubscriptionId);
return Task.FromResult(new SubscribeResponse
{
Success = true,
SubscriptionId = sid,
ActualIntervalMs = req.RequestedIntervalMs,
});
}
public Task UnsubscribeAsync(UnsubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{
Accepted = true,
GraceSeconds = 15, // matches Phase 2 plan §B.8 default
});
}


@@ -0,0 +1,107 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
/// <summary>
/// Real IPC dispatcher — routes each <see cref="MessageKind"/> to the matching
/// <see cref="IGalaxyBackend"/> method. Replaces <see cref="StubFrameHandler"/>. Heartbeat
/// stays handled inline so liveness detection works regardless of backend health.
/// </summary>
public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) : IFrameHandler
{
public async Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct)
{
try
{
switch (kind)
{
case MessageKind.Heartbeat:
{
var hb = Deserialize<Heartbeat>(body);
await writer.WriteAsync(MessageKind.HeartbeatAck,
new HeartbeatAck { SequenceNumber = hb.SequenceNumber, UtcUnixMs = hb.UtcUnixMs }, ct);
return;
}
case MessageKind.OpenSessionRequest:
{
var resp = await backend.OpenSessionAsync(Deserialize<OpenSessionRequest>(body), ct);
await writer.WriteAsync(MessageKind.OpenSessionResponse, resp, ct);
return;
}
case MessageKind.CloseSessionRequest:
await backend.CloseSessionAsync(Deserialize<CloseSessionRequest>(body), ct);
return; // one-way
case MessageKind.DiscoverHierarchyRequest:
{
var resp = await backend.DiscoverAsync(Deserialize<DiscoverHierarchyRequest>(body), ct);
await writer.WriteAsync(MessageKind.DiscoverHierarchyResponse, resp, ct);
return;
}
case MessageKind.ReadValuesRequest:
{
var resp = await backend.ReadValuesAsync(Deserialize<ReadValuesRequest>(body), ct);
await writer.WriteAsync(MessageKind.ReadValuesResponse, resp, ct);
return;
}
case MessageKind.WriteValuesRequest:
{
var resp = await backend.WriteValuesAsync(Deserialize<WriteValuesRequest>(body), ct);
await writer.WriteAsync(MessageKind.WriteValuesResponse, resp, ct);
return;
}
case MessageKind.SubscribeRequest:
{
var resp = await backend.SubscribeAsync(Deserialize<SubscribeRequest>(body), ct);
await writer.WriteAsync(MessageKind.SubscribeResponse, resp, ct);
return;
}
case MessageKind.UnsubscribeRequest:
await backend.UnsubscribeAsync(Deserialize<UnsubscribeRequest>(body), ct);
return; // one-way
case MessageKind.AlarmSubscribeRequest:
await backend.SubscribeAlarmsAsync(Deserialize<AlarmSubscribeRequest>(body), ct);
return; // one-way; subsequent alarm events are server-pushed
case MessageKind.AlarmAckRequest:
await backend.AcknowledgeAlarmAsync(Deserialize<AlarmAckRequest>(body), ct);
return; // one-way
case MessageKind.HistoryReadRequest:
{
var resp = await backend.HistoryReadAsync(Deserialize<HistoryReadRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
await writer.WriteAsync(MessageKind.RecycleStatusResponse, resp, ct);
return;
}
default:
await SendErrorAsync(writer, "unknown-kind", $"Frame kind {kind} not handled by Host", ct);
return;
}
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
logger.Error(ex, "GalaxyFrameHandler threw on {Kind}", kind);
await SendErrorAsync(writer, "handler-exception", ex.Message, ct);
}
}
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
=> writer.WriteAsync(MessageKind.ErrorResponse,
new ErrorResponse { Code = code, Message = message }, ct);
}
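
The catch-and-surface pattern in `HandleAsync` can be isolated as a small helper, sketched below. `DispatchGuardSketch` is a hypothetical name. It returns the error pair instead of writing a frame so the sketch stays self-contained, and it keeps the handler's rule that cancellation propagates rather than being reported as a handler error.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DispatchGuardSketch
{
    // Runs one handler body. Returns null on success, or a (code, message) pair
    // that a dispatcher could send back as an error frame. Cancellation is
    // rethrown so shutdown is not misreported as a handler fault.
    public static async Task<(string Code, string Message)?> RunAsync(
        Func<CancellationToken, Task> body, CancellationToken ct)
    {
        try
        {
            await body(ct).ConfigureAwait(false);
            return null;
        }
        catch (OperationCanceledException) { throw; }
        catch (Exception ex)
        {
            return ("handler-exception", ex.Message);
        }
    }
}
```

One thing this shape makes visible: for one-way kinds no caller is awaiting a reply, so how the Proxy treats an error frame produced for such a request depends on client code that is not shown in this diff.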


@@ -38,7 +38,10 @@ public static class Program
Log.Information("OtOpcUaGalaxyHost starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
var handler = new StubFrameHandler();
// Real frame dispatcher backed by StubGalaxyBackend until the MXAccess code lift
// (Phase 2 Task B.1) replaces the backend with the live MxAccessClient-backed one.
var backend = new Backend.StubGalaxyBackend();
var handler = new GalaxyFrameHandler(backend, Log.Logger);
server.RunAsync(handler, cts.Token).GetAwaiter().GetResult();
Log.Information("OtOpcUaGalaxyHost stopped cleanly");


@@ -1,31 +1,37 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
/// <summary>
/// Dedicated STA thread that owns all <c>LMXProxyServer</c> COM instances. Work items are
/// posted from any thread and dispatched on the STA. Per <c>driver-stability.md</c> Galaxy
/// deep dive §"STA thread + Win32 message pump".
/// Dedicated STA thread with a Win32 message pump that owns all <c>LMXProxyServer</c> COM
/// instances. Lifted from v1 <c>StaComThread</c> per CLAUDE.md "Reference Implementation".
/// Per <c>driver-stability.md</c> Galaxy deep dive §"STA thread + Win32 message pump":
/// work items dispatched via <c>PostThreadMessage(WM_APP)</c>; <c>WM_APP+1</c> requests a
/// graceful drain → <c>WM_QUIT</c>; supervisor escalates to <c>Environment.Exit(2)</c> if the
/// pump doesn't drain within the recycle grace window.
/// </summary>
/// <remarks>
/// Phase 2 scaffold: uses a <see cref="BlockingCollection{T}"/> dispatcher instead of the real
/// Win32 <c>GetMessage/DispatchMessage</c> pump. Real pump arrives when the v1 <c>StaComThread</c>
/// is lifted — that's part of the deferred Galaxy code move. The apartment state and work
/// dispatch semantics are identical so production code can be swapped in without changes.
/// </remarks>
public sealed class StaPump : IDisposable
{
private const uint WM_APP = 0x8000;
private const uint WM_DRAIN_AND_QUIT = WM_APP + 1;
private const uint PM_NOREMOVE = 0x0000;
private readonly Thread _thread;
private readonly BlockingCollection<Action> _workQueue = new(new ConcurrentQueue<Action>());
private readonly ConcurrentQueue<WorkItem> _workItems = new();
private readonly TaskCompletionSource<bool> _started = new(TaskCreationOptions.RunContinuationsAsynchronously);
private volatile uint _nativeThreadId;
private volatile bool _pumpExited;
private volatile bool _disposed;
public int ThreadId => _thread.ManagedThreadId;
public DateTime LastDispatchedUtc { get; private set; } = DateTime.MinValue;
public int QueueDepth => _workQueue.Count;
public int QueueDepth => _workItems.Count;
public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited;
public StaPump(string name = "Galaxy.Sta")
{
@@ -40,24 +46,36 @@ public sealed class StaPump : IDisposable
public Task<T> InvokeAsync<T>(Func<T> work)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaPump));
if (_pumpExited) throw new InvalidOperationException("STA pump has exited");
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
_workQueue.Add(() =>
_workItems.Enqueue(new WorkItem(
() =>
{
try { tcs.TrySetResult(work()); }
catch (Exception ex) { tcs.TrySetException(ex); }
},
ex => tcs.TrySetException(ex)));
if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero))
{
try { tcs.SetResult(work()); }
catch (Exception ex) { tcs.SetException(ex); }
});
_pumpExited = true;
DrainAndFaultQueue();
}
return tcs.Task;
}
public Task InvokeAsync(Action work) => InvokeAsync(() => { work(); return 0; });
/// <summary>
/// Health probe — returns true if a no-op work item round-trips within <paramref name="timeout"/>.
/// Used by the supervisor; timeout means the pump is wedged and a recycle is warranted.
/// Health probe — returns true if a no-op work item round-trips within
/// <paramref name="timeout"/>. Used by the supervisor; timeout means the pump is wedged
/// and a recycle is warranted (Task B.2 acceptance).
/// </summary>
public async Task<bool> IsResponsiveAsync(TimeSpan timeout)
{
if (!IsRunning) return false;
var task = InvokeAsync(() => { });
var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false);
return completed == task;
@@ -65,27 +83,124 @@ public sealed class StaPump : IDisposable
private void PumpLoop()
{
_started.TrySetResult(true);
try
{
while (!_disposed)
_nativeThreadId = GetCurrentThreadId();
// Force the system to create the thread message queue before we signal Started.
// PeekMessage(PM_NOREMOVE) on an empty queue is the documented way to do this.
PeekMessage(out _, IntPtr.Zero, 0, 0, PM_NOREMOVE);
_started.TrySetResult(true);
// GetMessage returns 0 on WM_QUIT, -1 on error, otherwise a positive value.
while (GetMessage(out var msg, IntPtr.Zero, 0, 0) > 0)
{
if (_workQueue.TryTake(out var work, Timeout.Infinite))
if (msg.message == WM_APP)
{
work();
LastDispatchedUtc = DateTime.UtcNow;
DrainQueue();
}
else if (msg.message == WM_DRAIN_AND_QUIT)
{
DrainQueue();
PostQuitMessage(0);
}
else
{
// Pass through any window/dialog messages the COM proxy may inject.
TranslateMessage(ref msg);
DispatchMessage(ref msg);
}
}
}
catch (InvalidOperationException) { /* CompleteAdding called during dispose */ }
catch (Exception ex)
{
_started.TrySetException(ex);
}
finally
{
_pumpExited = true;
DrainAndFaultQueue();
}
}
private void DrainQueue()
{
while (_workItems.TryDequeue(out var item))
{
item.Execute();
LastDispatchedUtc = DateTime.UtcNow;
}
}
private void DrainAndFaultQueue()
{
var ex = new InvalidOperationException("STA pump has exited");
while (_workItems.TryDequeue(out var item))
{
try { item.Fault(ex); }
catch { /* faulting a TCS shouldn't throw, but be defensive */ }
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_workQueue.CompleteAdding();
_thread.Join(TimeSpan.FromSeconds(5));
_workQueue.Dispose();
try
{
if (_nativeThreadId != 0 && !_pumpExited)
PostThreadMessage(_nativeThreadId, WM_DRAIN_AND_QUIT, IntPtr.Zero, IntPtr.Zero);
_thread.Join(TimeSpan.FromSeconds(5));
}
catch { /* swallow — best effort */ }
DrainAndFaultQueue();
}
private sealed record WorkItem(Action Execute, Action<Exception> Fault);
#region Win32 P/Invoke
[StructLayout(LayoutKind.Sequential)]
private struct MSG
{
public IntPtr hwnd;
public uint message;
public IntPtr wParam;
public IntPtr lParam;
public uint time;
public POINT pt;
}
[StructLayout(LayoutKind.Sequential)]
private struct POINT { public int x; public int y; }
[DllImport("user32.dll")]
private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax);
[DllImport("user32.dll")]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern bool TranslateMessage(ref MSG lpMsg);
[DllImport("user32.dll")]
private static extern IntPtr DispatchMessage(ref MSG lpMsg);
[DllImport("user32.dll")]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam);
[DllImport("user32.dll")]
private static extern void PostQuitMessage(int nExitCode);
[DllImport("user32.dll")]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax,
uint wRemoveMsg);
[DllImport("kernel32.dll")]
private static extern uint GetCurrentThreadId();
#endregion
}
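
The wedge-detection probe in `IsResponsiveAsync` does not depend on Win32. The sketch below reproduces just the probe logic on a plain background thread, assuming a `BlockingCollection` in place of the message queue. `ProbePumpSketch` is a hypothetical class with no STA apartment and no message loop, so it illustrates the round-trip-with-timeout idea only, not the real pump.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Portable stand-in for a single-threaded pump; only the probe logic is illustrated.
public sealed class ProbePumpSketch : IDisposable
{
    private readonly BlockingCollection<Action> _work = new();
    private readonly Thread _thread;

    public ProbePumpSketch()
    {
        _thread = new Thread(() =>
        {
            // Runs items one at a time until CompleteAdding is called.
            foreach (var item in _work.GetConsumingEnumerable()) item();
        }) { IsBackground = true };
        _thread.Start();
    }

    public Task InvokeAsync(Action work)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _work.Add(() =>
        {
            try { work(); tcs.TrySetResult(true); }
            catch (Exception ex) { tcs.TrySetException(ex); }
        });
        return tcs.Task;
    }

    // True when a no-op round-trips through the pump thread before the timeout elapses.
    public async Task<bool> IsResponsiveAsync(TimeSpan timeout)
    {
        var probe = InvokeAsync(() => { });
        var first = await Task.WhenAny(probe, Task.Delay(timeout)).ConfigureAwait(false);
        return first == probe;
    }

    public void Dispose() => _work.CompleteAdding();
}
```

A timed-out probe leaves its no-op in the queue. It simply runs later if the thread recovers, which is harmless here because the work item does nothing.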


@@ -1,25 +1,43 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
/// <summary>
/// <see cref="IDriver"/> implementation that forwards every capability over the Galaxy IPC
/// channel to the out-of-process Host. Implements <see cref="ITagDiscovery"/> as the
/// Phase 2 minimum; other capability interfaces (<see cref="IReadable"/>, etc.) will be wired
/// in once the Host's MXAccess code lift is complete and end-to-end parity tests run.
/// channel to the out-of-process Host. Implements the full Phase 2 capability surface;
/// members that depend on the deferred Host-side MXAccess code lift report the stub
/// backend's failure (an exception or a Bad status code, depending on the member) until
/// the Host's <c>IGalaxyBackend</c> is wired to the real <c>MxAccessClient</c>.
/// </summary>
public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
: IDriver, ITagDiscovery, IDisposable
: IDriver,
ITagDiscovery,
IReadable,
IWritable,
ISubscribable,
IAlarmSource,
IHistoryProvider,
IRediscoverable,
IHostConnectivityProbe,
IDisposable
{
private GalaxyIpcClient? _client;
private long _sessionId;
private DriverHealth _health = new(DriverState.Unknown, null, null);
private IReadOnlyList<Core.Abstractions.HostConnectivityStatus> _hostStatuses = [];
public string DriverInstanceId => options.DriverInstanceId;
public string DriverType => "Galaxy";
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
_health = new DriverHealth(DriverState.Initializing, null, null);
@@ -59,9 +77,10 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
try
{
await _client.CallAsync<CloseSessionRequest, ErrorResponse>(
MessageKind.CloseSessionRequest, new CloseSessionRequest { SessionId = _sessionId },
MessageKind.ErrorResponse, cancellationToken);
await _client.SendOneWayAsync(
MessageKind.CloseSessionRequest,
new CloseSessionRequest { SessionId = _sessionId },
cancellationToken);
}
catch { /* shutdown is best effort */ }
@@ -71,17 +90,17 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
}
public DriverHealth GetHealth() => _health;
public long GetMemoryFootprint() => 0; // Tier C footprint is reported by the Host over IPC
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- ITagDiscovery ----
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
if (_client is null) throw new InvalidOperationException("Driver not initialized");
var client = RequireClient();
var resp = await _client.CallAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
var resp = await client.CallAsync<DiscoverHierarchyRequest, DiscoverHierarchyResponse>(
MessageKind.DiscoverHierarchyRequest,
new DiscoverHierarchyRequest { SessionId = _sessionId },
MessageKind.DiscoverHierarchyResponse,
@@ -109,6 +128,245 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
}
}
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<ReadValuesRequest, ReadValuesResponse>(
MessageKind.ReadValuesRequest,
new ReadValuesRequest { SessionId = _sessionId, TagReferences = [.. fullReferences] },
MessageKind.ReadValuesResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host ReadValues failed: {resp.Error}");
var byRef = resp.Values.ToDictionary(v => v.TagReference);
var result = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
{
result[i] = byRef.TryGetValue(fullReferences[i], out var v)
? ToSnapshot(v)
: new DataValueSnapshot(null, StatusBadInternalError, null, DateTime.UtcNow);
}
return result;
}
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<WriteValuesRequest, WriteValuesResponse>(
MessageKind.WriteValuesRequest,
new WriteValuesRequest
{
SessionId = _sessionId,
Writes = [.. writes.Select(FromWriteRequest)],
},
MessageKind.WriteValuesResponse,
cancellationToken);
return [.. resp.Results.Select(r => new WriteResult(r.StatusCode))];
}
// ---- ISubscribable ----
public async Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<SubscribeRequest, SubscribeResponse>(
MessageKind.SubscribeRequest,
new SubscribeRequest
{
SessionId = _sessionId,
TagReferences = [.. fullReferences],
RequestedIntervalMs = (int)publishingInterval.TotalMilliseconds,
},
MessageKind.SubscribeResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host Subscribe failed: {resp.Error}");
return new GalaxySubscriptionHandle(resp.SubscriptionId);
}
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
var client = RequireClient();
var sid = ((GalaxySubscriptionHandle)handle).SubscriptionId;
await client.SendOneWayAsync(
MessageKind.UnsubscribeRequest,
new UnsubscribeRequest { SessionId = _sessionId, SubscriptionId = sid },
cancellationToken);
}
/// <summary>
/// Internal entry point used by the IPC client when the Host pushes an
/// <see cref="MessageKind.OnDataChangeNotification"/> frame. Surfaces it as a managed
/// <see cref="OnDataChange"/> event.
/// </summary>
internal void RaiseDataChange(OnDataChangeNotification notif)
{
var handle = new GalaxySubscriptionHandle(notif.SubscriptionId);
// ISubscribable.OnDataChange fires once per changed attribute — fan out the batch.
foreach (var v in notif.Values)
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, v.TagReference, ToSnapshot(v)));
}
// ---- IAlarmSource ----
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
var client = RequireClient();
await client.SendOneWayAsync(
MessageKind.AlarmSubscribeRequest,
new AlarmSubscribeRequest { SessionId = _sessionId },
cancellationToken);
return new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
}
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
=> Task.CompletedTask;
public async Task AcknowledgeAsync(
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
var client = RequireClient();
foreach (var ack in acknowledgements)
{
await client.SendOneWayAsync(
MessageKind.AlarmAckRequest,
new AlarmAckRequest
{
SessionId = _sessionId,
EventId = ack.ConditionId,
Comment = ack.Comment ?? string.Empty,
},
cancellationToken);
}
}
internal void RaiseAlarmEvent(GalaxyAlarmEvent ev)
{
var handle = new GalaxyAlarmSubscriptionHandle($"alarm-{_sessionId}");
OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
SubscriptionHandle: handle,
SourceNodeId: ev.ObjectTagName,
ConditionId: ev.EventId,
AlarmType: ev.AlarmName,
Message: ev.Message,
Severity: MapSeverity(ev.Severity),
SourceTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(ev.UtcUnixMs).UtcDateTime));
}
// ---- IHistoryProvider ----
public async Task<HistoryReadResult> ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<HistoryReadRequest, HistoryReadResponse>(
MessageKind.HistoryReadRequest,
new HistoryReadRequest
{
SessionId = _sessionId,
TagReferences = [fullReference],
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
MaxValuesPerTag = maxValuesPerNode,
},
MessageKind.HistoryReadResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryRead failed: {resp.Error}");
var first = resp.Tags.FirstOrDefault();
IReadOnlyList<DataValueSnapshot> samples = first is null
? Array.Empty<DataValueSnapshot>()
: [.. first.Values.Select(ToSnapshot)];
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public Task<HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
=> Task.FromException<HistoryReadResult>(new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync."));
// ---- IRediscoverable ----
/// <summary>
/// Triggered by the IPC client when the Host pushes a deploy-watermark notification
/// (Galaxy <c>time_of_last_deploy</c> changed per decision #54).
/// </summary>
internal void RaiseRediscoveryNeeded(string reason, string? scopeHint = null) =>
OnRediscoveryNeeded?.Invoke(this, new RediscoveryEventArgs(reason, scopeHint));
// ---- IHostConnectivityProbe ----
public IReadOnlyList<Core.Abstractions.HostConnectivityStatus> GetHostStatuses() => _hostStatuses;
internal void OnHostConnectivityUpdate(IpcHostConnectivityStatus update)
{
var translated = new Core.Abstractions.HostConnectivityStatus(
HostName: update.HostName,
State: ParseHostState(update.RuntimeStatus),
LastChangedUtc: DateTimeOffset.FromUnixTimeMilliseconds(update.LastObservedUtcUnixMs).UtcDateTime);
var prior = _hostStatuses.FirstOrDefault(h => h.HostName == translated.HostName);
_hostStatuses = [
.. _hostStatuses.Where(h => h.HostName != translated.HostName),
translated
];
if (prior is null || prior.State != translated.State)
{
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(
translated.HostName, prior?.State ?? HostState.Unknown, translated.State));
}
}
private static HostState ParseHostState(string s) => s switch
{
"Running" => HostState.Running,
"Stopped" => HostState.Stopped,
"Faulted" => HostState.Faulted,
_ => HostState.Unknown,
};
// ---- helpers ----
private GalaxyIpcClient RequireClient() =>
_client ?? throw new InvalidOperationException("Driver not initialized");
private const uint StatusBadInternalError = 0x80020000u;
private static DataValueSnapshot ToSnapshot(GalaxyDataValue v) => new(
Value: v.ValueBytes,
StatusCode: v.StatusCode,
SourceTimestampUtc: v.SourceTimestampUtcUnixMs > 0
? DateTimeOffset.FromUnixTimeMilliseconds(v.SourceTimestampUtcUnixMs).UtcDateTime
: null,
ServerTimestampUtc: DateTimeOffset.FromUnixTimeMilliseconds(v.ServerTimestampUtcUnixMs).UtcDateTime);
private static GalaxyDataValue FromWriteRequest(WriteRequest w) => new()
{
TagReference = w.FullReference,
ValueBytes = MessagePack.MessagePackSerializer.Serialize(w.Value),
ValueMessagePackType = 0,
StatusCode = 0,
SourceTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static DriverDataType MapDataType(int mxDataType) => mxDataType switch
{
0 => DriverDataType.Boolean,
@@ -132,9 +390,27 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
_ => SecurityClassification.FreeAccess,
};
private static AlarmSeverity MapSeverity(int sev) => sev switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 800 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle
{
public string DiagnosticId => $"galaxy-sub-{SubscriptionId}";
}
internal sealed record GalaxyAlarmSubscriptionHandle(string Id) : IAlarmSubscriptionHandle
{
public string DiagnosticId => Id;
}
public sealed class GalaxyProxyOptions
{
public required string DriverInstanceId { get; init; }

@@ -85,6 +85,18 @@ public sealed class GalaxyIpcClient : IAsyncDisposable
finally { _callGate.Release(); }
}
/// <summary>
/// Fire-and-forget request — used for unsubscribe, alarm-ack, close-session, and other
/// calls where the protocol is one-way. The send is still serialized through the call
/// gate so it doesn't interleave a frame with a concurrent <see cref="CallAsync{TReq, TResp}"/>.
/// </summary>
public async Task SendOneWayAsync<TReq>(MessageKind requestKind, TReq request, CancellationToken ct)
{
await _callGate.WaitAsync(ct);
try { await _writer.WriteAsync(requestKind, request, ct); }
finally { _callGate.Release(); }
}
public async ValueTask DisposeAsync()
{
_callGate.Dispose();

@@ -0,0 +1,191 @@
using System.Security.Principal;
using Serilog;
using Serilog.Core;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
/// <summary>
/// Drives every <see cref="MessageKind"/> through the full IPC stack — Host
/// <see cref="GalaxyFrameHandler"/> backed by <see cref="StubGalaxyBackend"/> on one end,
/// <see cref="GalaxyProxyDriver"/> on the other — to prove the wire protocol, dispatcher,
/// and capability forwarding agree end-to-end. The "stub backend" replies with success for
/// lifecycle/subscribe/recycle and a recognizable "not-implemented" error for the data-plane
/// calls that need the deferred MXAccess lift; the test asserts both shapes.
/// </summary>
[Trait("Category", "Integration")]
public sealed class EndToEndIpcTests
{
private static bool IsAdministrator()
{
if (!OperatingSystem.IsWindows()) return false;
using var identity = WindowsIdentity.GetCurrent();
return new WindowsPrincipal(identity).IsInRole(WindowsBuiltInRole.Administrator);
}
private static (string Pipe, string Secret, SecurityIdentifier Sid) MakeIpcParams() =>
($"OtOpcUaGalaxyE2E-{Guid.NewGuid():N}",
"e2e-secret",
WindowsIdentity.GetCurrent().User!);
private static async Task<(GalaxyProxyDriver Driver, CancellationTokenSource Cts, Task ServerTask, PipeServer Server)>
StartStackAsync()
{
var (pipe, secret, sid) = MakeIpcParams();
Logger log = new LoggerConfiguration().CreateLogger();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
var server = new PipeServer(pipe, sid, secret, log);
var backend = new StubGalaxyBackend();
var handler = new GalaxyFrameHandler(backend, log);
var serverTask = Task.Run(() => server.RunAsync(handler, cts.Token));
var driver = new GalaxyProxyDriver(new GalaxyProxyOptions
{
DriverInstanceId = "gal-e2e",
PipeName = pipe,
SharedSecret = secret,
ConnectTimeout = TimeSpan.FromSeconds(5),
});
await driver.InitializeAsync(driverConfigJson: "{}", cts.Token);
return (driver, cts, serverTask, server);
}
[Fact]
public async Task Initialize_succeeds_via_OpenSession_and_health_goes_Healthy()
{
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
var (driver, cts, serverTask, server) = await StartStackAsync();
try
{
driver.GetHealth().State.ShouldBe(DriverState.Healthy);
}
finally
{
await driver.ShutdownAsync(CancellationToken.None);
cts.Cancel();
try { await serverTask; } catch { /* shutdown */ }
server.Dispose();
driver.Dispose();
}
}
[Fact]
public async Task Read_throws_InvalidOperationException_for_whole_batch_until_backend_lifted()
{
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
var (driver, cts, serverTask, server) = await StartStackAsync();
try
{
// Stub backend currently fails the whole batch with a "not-implemented" error;
// the driver surfaces this as InvalidOperationException with the error text.
var ex = await Should.ThrowAsync<InvalidOperationException>(() =>
driver.ReadAsync(["TagA", "TagB"], cts.Token));
ex.Message.ShouldContain("MXAccess code lift pending");
}
finally
{
await driver.ShutdownAsync(CancellationToken.None);
cts.Cancel();
try { await serverTask; } catch { }
server.Dispose();
driver.Dispose();
}
}
[Fact]
public async Task Write_returns_per_tag_BadInternalError_status_until_backend_lifted()
{
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
var (driver, cts, serverTask, server) = await StartStackAsync();
try
{
// Stub backend's WriteValuesAsync returns a per-tag bad status — the proxy
// surfaces those without throwing.
var results = await driver.WriteAsync([new WriteRequest("TagA", 42)], cts.Token);
results.Count.ShouldBe(1);
results[0].StatusCode.ShouldBe(0x80020000u); // Bad_InternalError
}
finally
{
await driver.ShutdownAsync(CancellationToken.None);
cts.Cancel();
try { await serverTask; } catch { }
server.Dispose();
driver.Dispose();
}
}
[Fact]
public async Task Subscribe_returns_handle_then_Unsubscribe_closes_cleanly()
{
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
var (driver, cts, serverTask, server) = await StartStackAsync();
try
{
var handle = await driver.SubscribeAsync(
["TagA"], TimeSpan.FromMilliseconds(500), cts.Token);
handle.DiagnosticId.ShouldStartWith("galaxy-sub-");
await driver.UnsubscribeAsync(handle, cts.Token); // one-way; just verify no throw
}
finally
{
await driver.ShutdownAsync(CancellationToken.None);
cts.Cancel();
try { await serverTask; } catch { }
server.Dispose();
driver.Dispose();
}
}
[Fact]
public async Task SubscribeAlarms_and_Acknowledge_round_trip_without_errors()
{
if (!OperatingSystem.IsWindows() || IsAdministrator()) return;
var (driver, cts, serverTask, server) = await StartStackAsync();
try
{
var handle = await driver.SubscribeAlarmsAsync(["Eq001"], cts.Token);
handle.DiagnosticId.ShouldNotBeNullOrEmpty();
await driver.AcknowledgeAsync(
[new AlarmAcknowledgeRequest("Eq001", "evt-1", "test ack")],
cts.Token);
}
finally
{
await driver.ShutdownAsync(CancellationToken.None);
cts.Cancel();
try { await serverTask; } catch { }
server.Dispose();
driver.Dispose();
}
}
[Fact]
public async Task ReadProcessed_throws_NotSupported_immediately_without_round_trip()
{
// No IPC needed — the proxy short-circuits to NotSupportedException per the v2 design
// (Galaxy Historian only supports raw reads; processed reads are an OPC UA aggregate
// computed by the OPC UA stack, not the driver).
var driver = new GalaxyProxyDriver(new GalaxyProxyOptions
{
DriverInstanceId = "gal-stub", PipeName = "x", SharedSecret = "x",
});
await Should.ThrowAsync<NotSupportedException>(() =>
driver.ReadProcessedAsync("TagA", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow,
TimeSpan.FromMinutes(1), HistoryAggregateType.Average, CancellationToken.None));
}
}