Galaxy IPC unblock — live dev-box E2E path
Three root-cause fixes to get an elevated dev-box shell past session open through to real MXAccess reads: 1. PipeAcl — drop BUILTIN\Administrators deny ACE. UAC's filtered token carries the Admins SID as deny-only, so the deny fired even from non-elevated admin-account shells. The per-connection SID check in PipeServer.VerifyCaller remains the real authorization boundary. 2. PipeServer — swap the Hello-read / VerifyCaller order. ImpersonateNamedPipeClient returns ERROR_CANNOT_IMPERSONATE until at least one frame has been read from the pipe; reading Hello first satisfies that rule. Previously the ACL deny-first path masked this race — removing the deny ACE exposed it. 3. GalaxyIpcClient — add a background reader + single pending-response slot. A RuntimeStatusChange event between OpenSessionRequest and OpenSessionResponse used to satisfy the caller's single ReadFrameAsync and fail CallAsync with "Expected OpenSessionResponse, got RuntimeStatusChange". The reader now routes response kinds (and ErrorResponse) to the pending TCS and everything else to a handler the driver registers in InitializeAsync. The Proxy was already set up to raise managed events from RaiseDataChange / RaiseAlarmEvent / OnHostConnectivityUpdate — those helpers had no caller until now. 4. RedundancyPublisherHostedService — swallow BadServerHalted while polling host.Server.CurrentInstance. StandardServer throws that code during startup rather than returning null, so the first poll attempt crashed the BackgroundService (and the host) before OnServerStarted ran. This race was latent behind the Galaxy init failure above. Updates docs that described the Admins deny ACE + mandatory non-elevated shells, and drops the admin-skip guards from every Galaxy integration + E2E fixture that had them (IpcHandshakeIntegrationTests, EndToEndIpcTests, ParityFixture, LiveStackFixture, HostSubprocessParityTests). Adds GalaxyIpcClientRoutingTests covering the router's request/response match, ErrorResponse, event-between-call, idle event, and peer-close paths. Verified live on the dev box against the p7-smoke cluster (gen 6): driver registered=1 failedInit=0, Phase 7 bridge subscribed, OPC UA server up on 4840, MXAccess read round-trip returns real data with Status=0x00000000. Task #112 — partial: Galaxy live stack is functional end-to-end. The supplied test-galaxy.ps1 script still fails because the UNS walker encodes TagConfig JSON as the tag's NodeId instead of the seeded TagId (pre-existing; separate issue from this commit). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -8,9 +8,18 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
/// <summary>
|
||||
/// Builds the <see cref="PipeSecurity"/> required by <c>driver-stability.md §"IPC Security"</c>:
|
||||
/// only the configured OtOpcUa server principal SID gets <c>ReadWrite | Synchronize</c>;
|
||||
/// LocalSystem and Administrators are explicitly denied. Any other authenticated user falls
|
||||
/// through to the implicit deny.
|
||||
/// LocalSystem is explicitly denied. Any other authenticated user falls through to the
|
||||
/// implicit deny.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Earlier revisions also denied <c>BUILTIN\Administrators</c>, which broke live testing
|
||||
/// on dev boxes where the allowed user (<c>dohertj2</c>) is also a member of the local
|
||||
/// Administrators group — UAC's filtered token still carries the Admins SID as deny-only,
|
||||
/// so the deny ACE fired even from non-elevated shells. The per-connection
|
||||
/// <see cref="PipeServer.VerifyCaller"/> check already gates on the exact allowed SID,
|
||||
/// which is the real authorization boundary, so the Admins deny added no defence in depth
|
||||
/// in that topology.
|
||||
/// </remarks>
|
||||
public static class PipeAcl
|
||||
{
|
||||
public static PipeSecurity Create(SecurityIdentifier allowedSid)
|
||||
@@ -25,12 +34,8 @@ public static class PipeAcl
|
||||
AccessControlType.Allow));
|
||||
|
||||
var localSystem = new SecurityIdentifier(WellKnownSidType.LocalSystemSid, null);
|
||||
var admins = new SecurityIdentifier(WellKnownSidType.BuiltinAdministratorsSid, null);
|
||||
|
||||
if (allowedSid != localSystem)
|
||||
security.AddAccessRule(new PipeAccessRule(localSystem, PipeAccessRights.FullControl, AccessControlType.Deny));
|
||||
if (allowedSid != admins)
|
||||
security.AddAccessRule(new PipeAccessRule(admins, PipeAccessRights.FullControl, AccessControlType.Deny));
|
||||
|
||||
// Owner = allowed SID so the deny rules can't be removed without write-DACL rights.
|
||||
security.SetOwner(allowedSid);
|
||||
|
||||
@@ -56,17 +56,12 @@ public sealed class PipeServer : IDisposable
|
||||
{
|
||||
await _current.WaitForConnectionAsync(linked.Token).ConfigureAwait(false);
|
||||
|
||||
if (!VerifyCaller(_current, out var reason))
|
||||
{
|
||||
_logger.Warning("IPC caller rejected: {Reason}", reason);
|
||||
_current.Disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
using var reader = new FrameReader(_current, leaveOpen: true);
|
||||
using var writer = new FrameWriter(_current, leaveOpen: true);
|
||||
|
||||
// First frame must be a Hello with the correct shared secret.
|
||||
// First frame must be a Hello with the correct shared secret. Reading it before
|
||||
// the caller-SID impersonation check satisfies Windows' ERROR_CANNOT_IMPERSONATE
|
||||
// rule — ImpersonateNamedPipeClient fails until at least one frame has been read.
|
||||
var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
||||
if (first is null || first.Value.Kind != MessageKind.Hello)
|
||||
{
|
||||
@@ -74,6 +69,13 @@ public sealed class PipeServer : IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
if (!VerifyCaller(_current, out var reason))
|
||||
{
|
||||
_logger.Warning("IPC caller rejected: {Reason}", reason);
|
||||
_current.Disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
|
||||
if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal))
|
||||
{
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
|
||||
@@ -48,6 +49,12 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
|
||||
_client = await GalaxyIpcClient.ConnectAsync(
|
||||
options.PipeName, options.SharedSecret, options.ConnectTimeout, cancellationToken);
|
||||
|
||||
// Route Host-pushed event frames to the matching Raise* methods. Must be set BEFORE
|
||||
// the first CallAsync so a RuntimeStatusChange arriving between OpenSessionRequest
|
||||
// and OpenSessionResponse lands on the handler rather than unblocking the call with
|
||||
// the wrong kind.
|
||||
_client.SetEventHandler(DispatchHostEventAsync);
|
||||
|
||||
var resp = await _client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
|
||||
MessageKind.OpenSessionRequest,
|
||||
new OpenSessionRequest { DriverInstanceId = DriverInstanceId, DriverConfigJson = driverConfigJson },
|
||||
@@ -459,6 +466,37 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
|
||||
|
||||
// ---- helpers ----
|
||||
|
||||
/// <summary>
|
||||
/// Event-handler registered with <see cref="GalaxyIpcClient.SetEventHandler"/>. Decodes
|
||||
/// the MessagePack body into the matching wire contract and delegates to the existing
|
||||
/// <c>Raise*</c> helpers. Unknown kinds are silently ignored — the IPC contract is
|
||||
/// append-only, so a newer Host sending a kind this Proxy doesn't recognise shouldn't
|
||||
/// break the session.
|
||||
/// </summary>
|
||||
private Task DispatchHostEventAsync(MessageKind kind, byte[] body)
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case MessageKind.OnDataChangeNotification:
|
||||
RaiseDataChange(MessagePackSerializer.Deserialize<OnDataChangeNotification>(body));
|
||||
break;
|
||||
case MessageKind.AlarmEvent:
|
||||
RaiseAlarmEvent(MessagePackSerializer.Deserialize<GalaxyAlarmEvent>(body));
|
||||
break;
|
||||
case MessageKind.HostConnectivityStatus:
|
||||
OnHostConnectivityUpdate(MessagePackSerializer.Deserialize<IpcHostConnectivityStatus>(body));
|
||||
break;
|
||||
case MessageKind.RuntimeStatusChange:
|
||||
var rsc = MessagePackSerializer.Deserialize<RuntimeStatusChangeNotification>(body);
|
||||
OnHostConnectivityUpdate(rsc.Status);
|
||||
break;
|
||||
// HistorianConnectivityStatus has no consumer on this Proxy today — drop.
|
||||
// Response kinds never reach the event handler; the client routes those to
|
||||
// their pending CallAsync TCS.
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private GalaxyIpcClient RequireClient() =>
|
||||
_client ?? throw new InvalidOperationException("Driver not initialized");
|
||||
|
||||
|
||||
@@ -7,14 +7,35 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Client-side IPC channel to a running <c>Driver.Galaxy.Host</c>. Owns the data-plane pipe
|
||||
/// connection and serializes request/response round-trips. One instance per session.
|
||||
/// connection, serializes request/response round-trips, and routes unsolicited push frames
|
||||
/// (<see cref="MessageKind.OnDataChangeNotification"/>, <see cref="MessageKind.AlarmEvent"/>,
|
||||
/// <see cref="MessageKind.HostConnectivityStatus"/>, <see cref="MessageKind.RuntimeStatusChange"/>,
|
||||
/// <see cref="MessageKind.HistorianConnectivityStatus"/>) to a handler supplied via
|
||||
/// <see cref="SetEventHandler"/>. One instance per session.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A single background reader task owns the read side of the pipe. Calls are serialized by
|
||||
/// <see cref="_writeGate"/>, so at most one pending response is outstanding at a time — the
|
||||
/// reader uses a single pending-response slot. Any frame that doesn't match the pending
|
||||
/// expected kind (or <see cref="MessageKind.ErrorResponse"/>) is treated as a push event and
|
||||
/// forwarded to the registered handler. Without this router, a push event arriving between
|
||||
/// request and response would satisfy the caller's read and fail the next
|
||||
/// <see cref="CallAsync{TReq, TResp}"/> with an "Expected X, got Y" error.
|
||||
/// </remarks>
|
||||
public sealed class GalaxyIpcClient : IAsyncDisposable
|
||||
{
|
||||
private readonly NamedPipeClientStream _stream;
|
||||
private readonly FrameReader _reader;
|
||||
private readonly FrameWriter _writer;
|
||||
private readonly SemaphoreSlim _callGate = new(1, 1);
|
||||
private readonly SemaphoreSlim _writeGate = new(1, 1);
|
||||
private readonly CancellationTokenSource _readerCts = new();
|
||||
|
||||
private readonly object _pendingLock = new();
|
||||
private TaskCompletionSource<(MessageKind Kind, byte[] Body)>? _pending;
|
||||
private MessageKind _pendingExpected;
|
||||
|
||||
private Task? _readerTask;
|
||||
private Func<MessageKind, byte[], Task>? _eventHandler;
|
||||
|
||||
private GalaxyIpcClient(NamedPipeClientStream stream)
|
||||
{
|
||||
@@ -41,6 +62,9 @@ public sealed class GalaxyIpcClient : IAsyncDisposable
|
||||
await client._writer.WriteAsync(MessageKind.Hello,
|
||||
new Hello { PeerName = "Galaxy.Proxy", SharedSecret = sharedSecret }, ct);
|
||||
|
||||
// Hello/HelloAck is the one round-trip that runs inline before the reader loop
|
||||
// starts — the Host expects its response-side write before accepting any other
|
||||
// frames, so there's no push-event window to worry about here.
|
||||
var ack = await client._reader.ReadFrameAsync(ct);
|
||||
if (ack is null || ack.Value.Kind != MessageKind.HelloAck)
|
||||
throw new InvalidOperationException("Did not receive HelloAck from Galaxy.Host");
|
||||
@@ -49,6 +73,7 @@ public sealed class GalaxyIpcClient : IAsyncDisposable
|
||||
if (!ackMsg.Accepted)
|
||||
throw new UnauthorizedAccessException($"Galaxy.Host rejected Hello: {ackMsg.RejectReason}");
|
||||
|
||||
client._readerTask = Task.Run(() => client.ReadLoopAsync(client._readerCts.Token));
|
||||
return client;
|
||||
}
|
||||
catch
|
||||
@@ -58,50 +83,155 @@ public sealed class GalaxyIpcClient : IAsyncDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Round-trips a request and returns the first frame of the response.</summary>
|
||||
/// <summary>
|
||||
/// Register a handler that receives unsolicited push frames. Safe to call once per
|
||||
/// session — typically during the driver's <c>InitializeAsync</c> right after
|
||||
/// <see cref="ConnectAsync"/>. The handler is invoked on the reader's thread-pool
|
||||
/// task; it should not block. Exceptions thrown by the handler are swallowed so a
|
||||
/// buggy event subscriber cannot kill the reader loop.
|
||||
/// </summary>
|
||||
public void SetEventHandler(Func<MessageKind, byte[], Task> handler)
|
||||
=> _eventHandler = handler ?? throw new ArgumentNullException(nameof(handler));
|
||||
|
||||
/// <summary>Round-trips a request and returns the deserialized response.</summary>
|
||||
public async Task<TResp> CallAsync<TReq, TResp>(
|
||||
MessageKind requestKind, TReq request, MessageKind expectedResponseKind, CancellationToken ct)
|
||||
{
|
||||
await _callGate.WaitAsync(ct);
|
||||
await _writeGate.WaitAsync(ct);
|
||||
var tcs = new TaskCompletionSource<(MessageKind, byte[])>(
|
||||
TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
try
|
||||
{
|
||||
lock (_pendingLock)
|
||||
{
|
||||
if (_pending is not null)
|
||||
throw new InvalidOperationException(
|
||||
"GalaxyIpcClient pending-response slot is not empty — call re-entry is a bug");
|
||||
_pending = tcs;
|
||||
_pendingExpected = expectedResponseKind;
|
||||
}
|
||||
|
||||
await _writer.WriteAsync(requestKind, request, ct);
|
||||
|
||||
var frame = await _reader.ReadFrameAsync(ct);
|
||||
if (frame is null) throw new EndOfStreamException("IPC peer closed before response");
|
||||
using var reg = ct.Register(static s =>
|
||||
((TaskCompletionSource<(MessageKind, byte[])>)s!).TrySetCanceled(), tcs);
|
||||
var frame = await tcs.Task.ConfigureAwait(false);
|
||||
|
||||
if (frame.Value.Kind == MessageKind.ErrorResponse)
|
||||
if (frame.Item1 == MessageKind.ErrorResponse)
|
||||
{
|
||||
var err = MessagePackSerializer.Deserialize<ErrorResponse>(frame.Value.Body);
|
||||
var err = MessagePackSerializer.Deserialize<ErrorResponse>(frame.Item2);
|
||||
throw new GalaxyIpcException(err.Code, err.Message);
|
||||
}
|
||||
|
||||
if (frame.Value.Kind != expectedResponseKind)
|
||||
throw new InvalidOperationException(
|
||||
$"Expected {expectedResponseKind}, got {frame.Value.Kind}");
|
||||
|
||||
return MessagePackSerializer.Deserialize<TResp>(frame.Value.Body);
|
||||
return MessagePackSerializer.Deserialize<TResp>(frame.Item2);
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock (_pendingLock)
|
||||
{
|
||||
if (ReferenceEquals(_pending, tcs)) _pending = null;
|
||||
}
|
||||
_writeGate.Release();
|
||||
}
|
||||
finally { _callGate.Release(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fire-and-forget request — used for unsubscribe, alarm-ack, close-session, and other
|
||||
/// calls where the protocol is one-way. The send is still serialized through the call
|
||||
/// calls where the protocol is one-way. The send is still serialized through the write
|
||||
/// gate so it doesn't interleave a frame with a concurrent <see cref="CallAsync{TReq, TResp}"/>.
|
||||
/// </summary>
|
||||
public async Task SendOneWayAsync<TReq>(MessageKind requestKind, TReq request, CancellationToken ct)
|
||||
{
|
||||
await _callGate.WaitAsync(ct);
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try { await _writer.WriteAsync(requestKind, request, ct); }
|
||||
finally { _callGate.Release(); }
|
||||
finally { _writeGate.Release(); }
|
||||
}
|
||||
|
||||
private async Task ReadLoopAsync(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
(MessageKind Kind, byte[] Body)? frame;
|
||||
try
|
||||
{
|
||||
var read = await _reader.ReadFrameAsync(ct).ConfigureAwait(false);
|
||||
frame = read is null ? null : (read.Value.Kind, read.Value.Body);
|
||||
}
|
||||
catch (OperationCanceledException) { break; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
FailPending(ex);
|
||||
break;
|
||||
}
|
||||
|
||||
if (frame is null)
|
||||
{
|
||||
FailPending(new EndOfStreamException("IPC peer closed the pipe"));
|
||||
break;
|
||||
}
|
||||
|
||||
// Route: response-ish frame to pending TCS if one is waiting, else treat as event.
|
||||
// ErrorResponse always terminates a pending call — that's the Host signalling a
|
||||
// request-scoped failure. Unsolicited ErrorResponse with no pending call shouldn't
|
||||
// happen under a well-formed protocol; if it does, we drop it to the event channel
|
||||
// so it shows up in logs rather than deadlocking the next CallAsync.
|
||||
TaskCompletionSource<(MessageKind, byte[])>? pendingTcs = null;
|
||||
lock (_pendingLock)
|
||||
{
|
||||
if (_pending is not null && (frame.Value.Kind == _pendingExpected
|
||||
|| frame.Value.Kind == MessageKind.ErrorResponse))
|
||||
{
|
||||
pendingTcs = _pending;
|
||||
_pending = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingTcs is not null)
|
||||
{
|
||||
pendingTcs.TrySetResult(frame.Value);
|
||||
continue;
|
||||
}
|
||||
|
||||
var handler = _eventHandler;
|
||||
if (handler is null) continue;
|
||||
|
||||
try { await handler(frame.Value.Kind, frame.Value.Body).ConfigureAwait(false); }
|
||||
catch
|
||||
{
|
||||
// A buggy subscriber must not kill the reader. The handler is expected to
|
||||
// do its own logging; swallowing here keeps the channel alive for the next
|
||||
// frame + the next CallAsync.
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Any still-pending call after the loop exits would otherwise hang forever.
|
||||
FailPending(new EndOfStreamException("IPC reader loop exited"));
|
||||
}
|
||||
}
|
||||
|
||||
private void FailPending(Exception ex)
|
||||
{
|
||||
TaskCompletionSource<(MessageKind, byte[])>? tcs;
|
||||
lock (_pendingLock) { tcs = _pending; _pending = null; }
|
||||
tcs?.TrySetException(ex);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_callGate.Dispose();
|
||||
_readerCts.Cancel();
|
||||
if (_readerTask is not null)
|
||||
{
|
||||
try { await _readerTask.ConfigureAwait(false); } catch { /* shutdown */ }
|
||||
}
|
||||
|
||||
_writeGate.Dispose();
|
||||
_reader.Dispose();
|
||||
_writer.Dispose();
|
||||
_readerCts.Dispose();
|
||||
await _stream.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,10 +91,21 @@ public sealed class RedundancyPublisherHostedService(
|
||||
// Bounded retry so a genuine failure to start doesn't pin the hosted service forever.
|
||||
// 60s is generous — production boot is ~2s on this box; cert PKI + certificate-creation
|
||||
// cases have been observed to take up to 15s cold.
|
||||
//
|
||||
// StandardServer.CurrentInstance throws BadServerHalted before OnServerStarted has run,
|
||||
// rather than returning null, so we catch that specifically and retry. Other
|
||||
// ServiceResultException codes (e.g. BadInternalError) are still propagated — a true
|
||||
// boot failure shouldn't look like "not ready yet".
|
||||
var deadline = DateTime.UtcNow.AddSeconds(60);
|
||||
while (!ct.IsCancellationRequested && DateTime.UtcNow < deadline)
|
||||
{
|
||||
var serverInternal = host.Server?.CurrentInstance;
|
||||
Opc.Ua.Server.IServerInternal? serverInternal = null;
|
||||
try { serverInternal = host.Server?.CurrentInstance; }
|
||||
catch (Opc.Ua.ServiceResultException ex) when (ex.StatusCode == Opc.Ua.StatusCodes.BadServerHalted)
|
||||
{
|
||||
// Server is mid-startup — keep polling.
|
||||
}
|
||||
|
||||
if (serverInternal?.ServerObject is not null)
|
||||
{
|
||||
var writerLogger = loggerFactory.CreateLogger<ServerRedundancyNodeWriter>();
|
||||
|
||||
Reference in New Issue
Block a user