using System.Collections.Concurrent; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Text; using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Security.Authentication; using ZB.MOM.WW.MxGateway.Server.Security.Authorization; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; using ZB.MOM.WW.MxGateway.IntegrationTests.TestSupport; using Xunit.Abstractions; namespace ZB.MOM.WW.MxGateway.IntegrationTests; [Collection(LiveResourcesCollection.Name)] [Trait("Category", "LiveMxAccess")] public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output) { private static readonly TimeSpan CommandTimeout = TimeSpan.FromSeconds(15); private static readonly TimeSpan StreamShutdownTimeout = TimeSpan.FromSeconds(10); /// /// Verifies that a gateway session can register, add item, advise, and stream events from live MXAccess. /// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); TestWorkerProcessFactory processFactory = new(output); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); using RecordingServerStreamWriter eventWriter = new(); string? sessionId = null; Task? 
streamTask = null; using CancellationTokenSource streamCancellation = new(); try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-smoke", ClientCorrelationId = "live-open", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; output.WriteLine($"OpenSession status={openReply.ProtocolStatus.Code} session={sessionId} worker_pid={openReply.WorkerProcessId}"); Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); Assert.True(openReply.WorkerProcessId > 0); streamTask = fixture.Service.StreamEvents( new StreamEventsRequest { SessionId = sessionId }, eventWriter, new TestServerCallContext(streamCancellation.Token)); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), new TestServerCallContext()).ConfigureAwait(false); LogReply("Register", registerReply); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); Assert.True(registerReply.Register.ServerHandle > 0); MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("AddItem", addItemReply); Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); Assert.True(addItemReply.AddItem.ItemHandle > 0); MxCommandReply adviseReply = await fixture.Service.Invoke( CreateAdviseRequest( sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("Advise", adviseReply); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); // A live MXAccess provider can deliver an initial registration-state // or bad-quality bootstrap event before the OnDataChange the worker // is contracted to emit. 
Match on the family rather than trusting // whatever event arrives first so a genuine ordering defect cannot // pass spuriously or leave a later wrong event unverified. MxEvent dataChange = await eventWriter .WaitForMessageAsync( candidate => candidate.Family == MxEventFamily.OnDataChange, IntegrationTestEnvironment.LiveMxAccessEventTimeout, streamCancellation.Token) .ConfigureAwait(false); LogEvent(dataChange); Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family); Assert.Equal(sessionId, dataChange.SessionId); Assert.Equal(registerReply.Register.ServerHandle, dataChange.ServerHandle); Assert.Equal(addItemReply.AddItem.ItemHandle, dataChange.ItemHandle); } finally { await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false); } } /// /// Verifies that a Write command round-trips through live MXAccess against an advised item /// and that the worker emits a matching event /// — the proof of round-trip the cross-language client e2e runner relies on. /// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_WritesValueToAdvisedItem() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); TestWorkerProcessFactory processFactory = new(output); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); using RecordingServerStreamWriter eventWriter = new(); string? sessionId = null; Task? 
streamTask = null; using CancellationTokenSource streamCancellation = new(); try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-write", ClientCorrelationId = "live-open-write", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); streamTask = fixture.Service.StreamEvents( new StreamEventsRequest { SessionId = sessionId }, eventWriter, new TestServerCallContext(streamCancellation.Token)); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), new TestServerCallContext()).ConfigureAwait(false); LogReply("Register", registerReply); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("AddItem", addItemReply); Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); Assert.True(addItemReply.AddItem.ItemHandle > 0); MxCommandReply adviseReply = await fixture.Service.Invoke( CreateAdviseRequest( sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("Advise", adviseReply); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); MxCommandReply writeReply = await fixture.Service.Invoke( CreateWriteRequest( sessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("Write", writeReply); // Happy-path Write: the worker COM call succeeded so HResultConverter // produces ProtocolStatusCode.Ok. 
An MXAccess rejection (a write to a // bad item, a secured-item failure) would surface as // ProtocolStatusCode.MxaccessFailure with a non-zero hresult — never // as an RpcException / transport fault, because the command still // completed its round-trip to the worker and back. Assert.Equal(ProtocolStatusCode.Ok, writeReply.ProtocolStatus.Code); Assert.Equal(MxCommandKind.Write, writeReply.Kind); // Proof of round-trip: MXAccess fires OnWriteComplete (event id 2) // after the underlying provider acknowledges the write — that is // the event the cross-language client e2e runner asserts on. We // scan the recorded stream (so an interleaving OnDataChange does // not preempt the match) for an OnWriteComplete carrying the same // server/item handles the Write command targeted. MxEvent writeComplete = await eventWriter .WaitForMessageAsync( candidate => candidate.Family == MxEventFamily.OnWriteComplete && candidate.ServerHandle == registerReply.Register.ServerHandle && candidate.ItemHandle == addItemReply.AddItem.ItemHandle, IntegrationTestEnvironment.LiveMxAccessEventTimeout, streamCancellation.Token) .ConfigureAwait(false); LogEvent(writeComplete); Assert.Equal(MxEventFamily.OnWriteComplete, writeComplete.Family); Assert.Equal(sessionId, writeComplete.SessionId); Assert.Equal(registerReply.Register.ServerHandle, writeComplete.ServerHandle); Assert.Equal(addItemReply.AddItem.ItemHandle, writeComplete.ItemHandle); // The stream task must not be in a faulted state. ShutDownAsync's // broad catch would otherwise swallow the fault and silently let // this Write-parity coverage pass against a broken event pipeline. Assert.False( streamTask.IsFaulted, streamTask.Exception?.ToString() ?? "Event stream task faulted without an exception."); } finally { // Cancel the stream call before draining so StreamEvents observes // cancellation rather than blocking on the channel. Any unhandled // stream-task fault is rethrown from ShutDownAsync into the test. 
streamCancellation.Cancel(); await ShutDownAsync(fixture, processFactory, sessionId, streamTask, propagateStreamFaults: true).ConfigureAwait(false); } } /// /// Verifies that an AddItem against an invalid server handle surfaces the MXAccess failure /// without faulting the gateway transport, exercising the invalid-handle parity path. /// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_InvalidHandleCommand_SurfacesFailureWithoutTransportFault() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); TestWorkerProcessFactory processFactory = new(output); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); string? sessionId = null; try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-invalid-handle", ClientCorrelationId = "live-open-invalid", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); // Deliberately skip Register: server handle 0x7FFFFFFF was never // issued by MXAccess. The worker must invoke COM and relay the // invalid-handle failure rather than the gateway short-circuiting. MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, serverHandle: int.MaxValue), new TestServerCallContext()).ConfigureAwait(false); LogReply("AddItem(invalid-handle)", addItemReply); // MXAccess parity: an invalid handle is an MXAccess-level failure. 
// The command still completed its worker round-trip, so the gateway // must reply with ProtocolStatusCode.MxaccessFailure and a non-zero // hresult carrying the COM failure (per HResultConverter) — never a // gRPC transport fault. The assertion below just checks the status // is not Ok; the failure detail lives in hresult / the status proxies. Assert.NotEqual(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); Assert.True( addItemReply.AddItem is null || addItemReply.AddItem.ItemHandle <= 0, "Invalid-handle AddItem must not yield a usable item handle."); } finally { await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false); } } /// /// Verifies the MXAccess teardown chain: Unadvise then RemoveItem then Unregister /// each return , and the worker stops emitting /// OnDataChange events for the un-advised item. Exercises the lifecycle-ordering /// parity CLAUDE.md singles out as a "do not synthesize" rule. /// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_UnadviseRemoveItemUnregister_TeardownOrderingParity() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); TestWorkerProcessFactory processFactory = new(output); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output); using RecordingServerStreamWriter eventWriter = new(); string? sessionId = null; Task? 
streamTask = null; using CancellationTokenSource streamCancellation = new(); try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-teardown", ClientCorrelationId = "live-open-teardown", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); streamTask = fixture.Service.StreamEvents( new StreamEventsRequest { SessionId = sessionId }, eventWriter, new TestServerCallContext(streamCancellation.Token)); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), new TestServerCallContext()).ConfigureAwait(false); LogReply("Register", registerReply); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); int serverHandle = registerReply.Register.ServerHandle; MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, serverHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("AddItem", addItemReply); Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); int itemHandle = addItemReply.AddItem.ItemHandle; MxCommandReply adviseReply = await fixture.Service.Invoke( CreateAdviseRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("Advise", adviseReply); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); // Wait for an OnDataChange to prove the subscription is live before tearing it down. 
MxEvent firstDataChange = await eventWriter .WaitForMessageAsync( candidate => candidate.Family == MxEventFamily.OnDataChange && candidate.ServerHandle == serverHandle && candidate.ItemHandle == itemHandle, IntegrationTestEnvironment.LiveMxAccessEventTimeout, streamCancellation.Token) .ConfigureAwait(false); LogEvent(firstDataChange); // 1) UnAdvise — must reply Ok; the worker must stop emitting OnDataChange // for this (server, item) pair after this returns. MxCommandReply unadviseReply = await fixture.Service.Invoke( CreateUnAdviseRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("UnAdvise", unadviseReply); Assert.Equal(ProtocolStatusCode.Ok, unadviseReply.ProtocolStatus.Code); Assert.Equal(MxCommandKind.UnAdvise, unadviseReply.Kind); // 2) RemoveItem — must reply Ok against the same handles. MxCommandReply removeItemReply = await fixture.Service.Invoke( CreateRemoveItemRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("RemoveItem", removeItemReply); Assert.Equal(ProtocolStatusCode.Ok, removeItemReply.ProtocolStatus.Code); Assert.Equal(MxCommandKind.RemoveItem, removeItemReply.Kind); // 3) Unregister — closes the client session inside the worker. MxCommandReply unregisterReply = await fixture.Service.Invoke( CreateUnregisterRequest(sessionId, serverHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("Unregister", unregisterReply); Assert.Equal(ProtocolStatusCode.Ok, unregisterReply.ProtocolStatus.Code); Assert.Equal(MxCommandKind.Unregister, unregisterReply.Kind); // Parity rule: after UnAdvise returns Ok the worker must stop emitting // OnDataChange for this (server, item) pair. Events the provider already // published before that ack are in-flight and not a regression — the rule // only constrains events generated AFTER the teardown returned. 
So the // "before" baseline is taken *after* a first settle window drains those // in-flight events, not before UnAdvise was issued (which races against // the round-trip + STA dispatch + pipe send window — see IntegrationTests-017). // // RecordingServerStreamWriter.Messages returns a snapshot copy under its // own lock, so iterating after each settle window is safe without external // sync. await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); int dataChangeCountAfterFirstSettle = CountMatchingEvents( eventWriter, e => e.Family == MxEventFamily.OnDataChange && e.ServerHandle == serverHandle && e.ItemHandle == itemHandle); await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); int dataChangeCountAfterSecondSettle = CountMatchingEvents( eventWriter, e => e.Family == MxEventFamily.OnDataChange && e.ServerHandle == serverHandle && e.ItemHandle == itemHandle); output.WriteLine( $"DataChange count after first settle={dataChangeCountAfterFirstSettle} after second settle={dataChangeCountAfterSecondSettle}"); Assert.Equal(dataChangeCountAfterFirstSettle, dataChangeCountAfterSecondSettle); // A RemoveItem against the just-freed item handle must not silently succeed — // the worker has to relay MXAccess's invalid-handle response. Closing the // session is enough for parity, but we sanity-check that re-using the freed // pair does not accidentally appear Ok. 
MxCommandReply secondRemoveItemReply = await fixture.Service.Invoke( CreateRemoveItemRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReply("RemoveItem(stale)", secondRemoveItemReply); Assert.NotEqual(ProtocolStatusCode.Ok, secondRemoveItemReply.ProtocolStatus.Code); } finally { streamCancellation.Cancel(); await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false); } } /// /// Verifies the MXAccess WriteSecured path: AuthenticateUser resolves a /// user id, then WriteSecured against the advised item completes its round-trip /// to the worker and back. CLAUDE.md singles out WriteSecured ordering as a /// parity surface the gateway must not "fix" — the test asserts the reply kind and /// protocol status, not a fabricated outcome. /// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_WriteSecured_AuthenticatedRoundTripParity() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); // IntegrationTests-019: CLAUDE.md's credential-redaction rule covers every log // surface the test sees, not just the reply's DiagnosticMessage. Wire a buffering // wrapper around output and route the worker stdout/stderr echo and the gateway // ILogger sink through it so the post-run assertion covers the accumulated test // output. A regression that logged the request body, the WorkerCommandRequest // envelope, or printed the credential from inside the worker is caught here // even if the bare DiagnosticMessage check still passes. 
RecordingTestOutputHelper recordedOutput = new(output); TestWorkerProcessFactory processFactory = new(recordedOutput); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, recordedOutput); // Stream events so a regression that emitted an OperationComplete or // OnWriteComplete with wrong handles would still be observable via the test // output (we don't assert a specific event here — the docs note successful // writes raise only OnWriteComplete, but WriteSecured against an unprotected // item commonly fails with 0x80004021 in this provider, which raises no event). using RecordingServerStreamWriter eventWriter = new(); string? sessionId = null; Task? streamTask = null; using CancellationTokenSource streamCancellation = new(); (string verifyUser, string verifyPassword) = ResolveLiveMxAccessSecuredCredentials(); try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-write-secured", ClientCorrelationId = "live-open-write-secured", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); streamTask = fixture.Service.StreamEvents( new StreamEventsRequest { SessionId = sessionId }, eventWriter, new TestServerCallContext(streamCancellation.Token)); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "Register", registerReply); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); int serverHandle = registerReply.Register.ServerHandle; MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, serverHandle), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "AddItem", addItemReply); Assert.Equal(ProtocolStatusCode.Ok, 
addItemReply.ProtocolStatus.Code); int itemHandle = addItemReply.AddItem.ItemHandle; MxCommandReply adviseReply = await fixture.Service.Invoke( CreateAdviseRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "Advise", adviseReply); Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); // AuthenticateUser resolves an ArchestrA user id for the WriteSecured call. // Credentials are env-overridable so the test honors the gateway's "do not // log secrets" rule and works against either MXAccess's own user store or // the LmxOpcUa-baseline GLAuth-bridged ArchestrA identity (admin/admin123). MxCommandReply authReply = await fixture.Service.Invoke( CreateAuthenticateUserRequest(sessionId, serverHandle, verifyUser, verifyPassword), new TestServerCallContext()).ConfigureAwait(false); recordedOutput.WriteLine( $"AuthenticateUser status={authReply.ProtocolStatus.Code} hresult={authReply.Hresult} user_id={authReply.AuthenticateUser?.UserId}"); // AuthenticateUser is allowed to fail (the underlying provider may reject // the credential pair); we use the returned user id if non-zero and fall // back to 0 ("operator only" / no verifier) so the parity assertion holds. int currentUserId = authReply.ProtocolStatus.Code == ProtocolStatusCode.Ok && authReply.AuthenticateUser is not null && authReply.AuthenticateUser.UserId != 0 ? authReply.AuthenticateUser.UserId : 0; MxCommandReply writeSecuredReply = await fixture.Service.Invoke( CreateWriteSecuredRequest( sessionId, serverHandle, itemHandle, currentUserId, verifierUserId: 0), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "WriteSecured", writeSecuredReply); // Parity: the command itself completed its round-trip — the reply kind is // WriteSecured and the gateway protocol status is set. 
The MXAccess outcome // (Ok for an unprotected provider, MxaccessFailure with hresult 0x80004021 // when the item is not WriteSecured-eligible) lives in protocol_status + // hresult, never as a transport fault. The diagnostic message must never // contain the credential. Assert.Equal(MxCommandKind.WriteSecured, writeSecuredReply.Kind); Assert.True( writeSecuredReply.ProtocolStatus.Code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure, $"Unexpected WriteSecured protocol status {writeSecuredReply.ProtocolStatus.Code}."); Assert.DoesNotContain(verifyPassword, writeSecuredReply.DiagnosticMessage ?? string.Empty, StringComparison.Ordinal); } finally { streamCancellation.Cancel(); await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false); } // CLAUDE.md credential contract: passwords and WriteSecured payloads must never // reach logs. The buffered output covers the gateway ILogger sink, worker // stdout/stderr, and every direct WriteLine the test body issued. A regression // that dumped the request envelope, the AuthenticateUserCommand body, or any // command-level WriteSecured payload would land here and trip this assertion. Assert.DoesNotContain(verifyPassword, recordedOutput.Captured, StringComparison.Ordinal); } /// /// B8 live verification of the COM commands the B-bundle added against a fake /// IMxAccessServer: AuthenticateUser, ArchestrAUserToId, Suspend, /// and Activate. The contract being proven is that each command round-trips /// to the worker and back carrying a real MXAccess outcome (Ok / an MxStatusProxy / /// a non-zero HResult) and is NOT short-circuited to INVALID_REQUEST the way an /// unimplemented command would be. MXAccess-level rejections (a wrong item class for /// Suspend/Activate commonly returns 0x80070057) are parity, not test failures — we /// assert the reply kind plus a non-INVALID_REQUEST protocol status, and log the /// HResult for the record. 
/// [LiveMxAccessFact] public async Task GatewaySession_WithLiveWorker_NewComCommands_RoundTripWithRealReplies() { string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath(); Assert.True( File.Exists(workerExecutablePath), $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}."); // Credential-redaction: AuthenticateUser carries a password. Route every test // log surface through the buffering helper so the post-run assertion proves the // password never reached the gateway logger, worker stdout/stderr, or any // WriteLine the test body issued (same pattern as the WriteSecured parity test). RecordingTestOutputHelper recordedOutput = new(output); TestWorkerProcessFactory processFactory = new(recordedOutput); await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, recordedOutput); string? sessionId = null; (string verifyUser, string verifyPassword) = ResolveLiveMxAccessSecuredCredentials(); try { OpenSessionReply openReply = await fixture.Service.OpenSession( new OpenSessionRequest { ClientSessionName = "live-mxaccess-new-com-commands", ClientCorrelationId = "live-open-new-com", CommandTimeout = Duration.FromTimeSpan(CommandTimeout), }, new TestServerCallContext()).ConfigureAwait(false); sessionId = openReply.SessionId; Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); MxCommandReply registerReply = await fixture.Service.Invoke( CreateRegisterRequest(sessionId), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "Register", registerReply); Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); int serverHandle = registerReply.Register.ServerHandle; MxCommandReply addItemReply = await fixture.Service.Invoke( CreateAddItemRequest(sessionId, serverHandle), new TestServerCallContext()).ConfigureAwait(false); 
LogReplyTo(recordedOutput, "AddItem", addItemReply); Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); int itemHandle = addItemReply.AddItem.ItemHandle; // AuthenticateUser — the B-bundle command under live verification. Before the // B-bundle this command was unimplemented and the worker short-circuited it to // INVALID_REQUEST. It must now produce a real reply (Ok with a user id when the // provider accepts the credential, or a real MXAccess HResult when it does not). MxCommandReply authReply = await fixture.Service.Invoke( CreateAuthenticateUserRequest(sessionId, serverHandle, verifyUser, verifyPassword), new TestServerCallContext()).ConfigureAwait(false); recordedOutput.WriteLine( $"AuthenticateUser status={authReply.ProtocolStatus.Code} hresult={authReply.Hresult} user_id={authReply.AuthenticateUser?.UserId}"); Assert.Equal(MxCommandKind.AuthenticateUser, authReply.Kind); Assert.NotEqual(ProtocolStatusCode.InvalidRequest, authReply.ProtocolStatus.Code); Assert.True( authReply.ProtocolStatus.Code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure, $"AuthenticateUser must surface a real MXAccess outcome, got {authReply.ProtocolStatus.Code}."); int authenticatedUserId = authReply.ProtocolStatus.Code == ProtocolStatusCode.Ok && authReply.AuthenticateUser is not null ? authReply.AuthenticateUser.UserId : 0; if (authReply.ProtocolStatus.Code == ProtocolStatusCode.Ok) { // On the dev rig AuthenticateUser("Administrator","") resolves to user id 1. // Don't pin the exact value (provider/user-store dependent) — just prove a // success carried a usable, non-zero ArchestrA user id through the reply. Assert.NotEqual(0, authenticatedUserId); } // ArchestrAUserToId — resolves an ArchestrA user GUID to an integer user id. // We feed an empty/placeholder GUID: the value is provider-dependent, so the // assertion is the parity one (real reply, never INVALID_REQUEST). 
A non-zero // HResult here is the expected MXAccess rejection of an unknown GUID. MxCommandReply userToIdReply = await fixture.Service.Invoke( CreateArchestrAUserToIdRequest(sessionId, serverHandle, userIdGuid: string.Empty), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "ArchestrAUserToId", userToIdReply); recordedOutput.WriteLine($"ArchestrAUserToId user_id={userToIdReply.ArchestraUserToId?.UserId}"); Assert.Equal(MxCommandKind.ArchestraUserToId, userToIdReply.Kind); Assert.NotEqual(ProtocolStatusCode.InvalidRequest, userToIdReply.ProtocolStatus.Code); Assert.True( userToIdReply.ProtocolStatus.Code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure, $"ArchestrAUserToId must surface a real MXAccess outcome, got {userToIdReply.ProtocolStatus.Code}."); // Suspend / Activate against the advised item. The dev-rig TestInt item class // may not be suspendable (MXAccess returns 0x80070057 / E_INVALIDARG for a // wrong item class — see B8 notes). That is MXAccess parity: assert the reply // kind and a non-INVALID_REQUEST status, surface the HResult and MxStatusProxy // for the record, and do NOT treat a provider-side rejection as a test failure. 
MxCommandReply suspendReply = await fixture.Service.Invoke( CreateSuspendRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "Suspend", suspendReply); recordedOutput.WriteLine( $"Suspend status_proxy success={suspendReply.Suspend?.Status?.Success} hresult=0x{(uint)suspendReply.Hresult:X8}"); Assert.Equal(MxCommandKind.Suspend, suspendReply.Kind); Assert.NotEqual(ProtocolStatusCode.InvalidRequest, suspendReply.ProtocolStatus.Code); Assert.True( suspendReply.ProtocolStatus.Code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure, $"Suspend must surface a real MXAccess outcome, got {suspendReply.ProtocolStatus.Code}."); MxCommandReply activateReply = await fixture.Service.Invoke( CreateActivateRequest(sessionId, serverHandle, itemHandle), new TestServerCallContext()).ConfigureAwait(false); LogReplyTo(recordedOutput, "Activate", activateReply); recordedOutput.WriteLine( $"Activate status_proxy success={activateReply.Activate?.Status?.Success} hresult=0x{(uint)activateReply.Hresult:X8}"); Assert.Equal(MxCommandKind.Activate, activateReply.Kind); Assert.NotEqual(ProtocolStatusCode.InvalidRequest, activateReply.ProtocolStatus.Code); Assert.True( activateReply.ProtocolStatus.Code is ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure, $"Activate must surface a real MXAccess outcome, got {activateReply.ProtocolStatus.Code}."); } finally { await ShutDownAsync(fixture, processFactory, sessionId, streamTask: null).ConfigureAwait(false); } // Credential contract: the AuthenticateUser password must never reach any log // surface (gateway logger, worker stdout/stderr, or test WriteLine). 
Assert.DoesNotContain(verifyPassword, recordedOutput.Captured, StringComparison.Ordinal);
    }

    /// <summary>
    /// B8 §3.2 buffered-data path: adds a BUFFERED item (AddBufferedItem), sets the
    /// buffered update interval (SetBufferedUpdateInterval), advises it, then attempts
    /// to observe an OnBufferedDataChange event carrying multiple samples so the worker's
    /// multi-sample conversion (VariantConverter → <see cref="OnBufferedDataChangeEvent"/>
    /// quality/timestamp arrays) can be validated live.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The AddBufferedItem + SetBufferedUpdateInterval round-trips are asserted unconditionally
    /// (they are the B-bundle commands under verification). The buffered EVENT capture is
    /// best-effort: if the rig's object logic does not drive a buffered batch within the live
    /// event timeout (the same environmental limitation seen with the externally-undrivable
    /// alarm rig), the test records the buffered conversion as an unverified residual rather
    /// than failing — the command path is proven, the live multi-sample conversion is not.
    /// </para>
    /// <para>
    /// The residual is only accepted while the StreamEvents call is still running. If the
    /// stream task has already ended (for example faulted), no further event can be delivered,
    /// so the missing batch points at a delivery defect and the test fails instead.
    /// </para>
    /// <para>
    /// When a batch IS captured, the converted value and quality/timestamp arrays are asserted
    /// to be non-empty and internally consistent (no crash, no dropped payload).
    /// </para>
    /// </remarks>
    [LiveMxAccessFact]
    public async Task GatewaySession_WithLiveWorker_BufferedItem_AddsSetsIntervalAndAttemptsCapture()
    {
        string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
        Assert.True(
            File.Exists(workerExecutablePath),
            $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");

        TestWorkerProcessFactory processFactory = new(output);
        await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
        using RecordingServerStreamWriter<MxEvent> eventWriter = new();
        string? sessionId = null;
        Task? streamTask = null;
        using CancellationTokenSource streamCancellation = new();

        // AddBufferedItem takes (item_definition, item_context) like AddItem2. The dev rig
        // exposes TestChildObject.TestInt; the buffered form is item="TestInt",
        // context="TestChildObject" (per B8 notes). Split the configured live item so a
        // custom MXGATEWAY_LIVE_MXACCESS_ITEM override still works.
        (string bufferedItem, string bufferedContext) = SplitLiveItemForBuffered(IntegrationTestEnvironment.LiveMxAccessItem);

        try
        {
            OpenSessionReply openReply = await fixture.Service.OpenSession(
                new OpenSessionRequest
                {
                    ClientSessionName = "live-mxaccess-buffered",
                    ClientCorrelationId = "live-open-buffered",
                    CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
                },
                new TestServerCallContext()).ConfigureAwait(false);
            sessionId = openReply.SessionId;
            Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);

            streamTask = fixture.Service.StreamEvents(
                new StreamEventsRequest { SessionId = sessionId },
                eventWriter,
                new TestServerCallContext(streamCancellation.Token));

            MxCommandReply registerReply = await fixture.Service.Invoke(
                CreateRegisterRequest(sessionId),
                new TestServerCallContext()).ConfigureAwait(false);
            LogReply("Register", registerReply);
            Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
            int serverHandle = registerReply.Register.ServerHandle;

            // SetBufferedUpdateInterval first so the buffered cadence is established before
            // the item is added/advised. MXAccess rounds to 100ms units and rejects < 1.
            MxCommandReply intervalReply = await fixture.Service.Invoke(
                CreateSetBufferedUpdateIntervalRequest(sessionId, serverHandle, updateIntervalMilliseconds: 1000),
                new TestServerCallContext()).ConfigureAwait(false);
            LogReply("SetBufferedUpdateInterval", intervalReply);
            Assert.Equal(MxCommandKind.SetBufferedUpdateInterval, intervalReply.Kind);
            Assert.NotEqual(ProtocolStatusCode.InvalidRequest, intervalReply.ProtocolStatus.Code);
            Assert.Equal(ProtocolStatusCode.Ok, intervalReply.ProtocolStatus.Code);

            // AddBufferedItem — must return a real item handle (the dev rig yields handle 1).
            MxCommandReply addBufferedReply = await fixture.Service.Invoke(
                CreateAddBufferedItemRequest(sessionId, serverHandle, bufferedItem, bufferedContext),
                new TestServerCallContext()).ConfigureAwait(false);
            LogReply("AddBufferedItem", addBufferedReply);
            Assert.Equal(MxCommandKind.AddBufferedItem, addBufferedReply.Kind);
            Assert.NotEqual(ProtocolStatusCode.InvalidRequest, addBufferedReply.ProtocolStatus.Code);
            Assert.Equal(ProtocolStatusCode.Ok, addBufferedReply.ProtocolStatus.Code);
            Assert.NotNull(addBufferedReply.AddBufferedItem);
            int bufferedItemHandle = addBufferedReply.AddBufferedItem.ItemHandle;
            Assert.True(bufferedItemHandle > 0, "AddBufferedItem must yield a usable item handle.");

            MxCommandReply adviseReply = await fixture.Service.Invoke(
                CreateAdviseRequest(sessionId, serverHandle, bufferedItemHandle),
                new TestServerCallContext()).ConfigureAwait(false);
            LogReply("Advise(buffered)", adviseReply);
            Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);

            // Best-effort capture of a SAMPLE-BEARING buffered batch.
            //
            // Live observation (B8): immediately after Advise the provider delivers an
            // initial OnBufferedDataChange with data_type=NoData / raw_data_type=0 and zero
            // quality+timestamp samples — the buffered analogue of the bad-quality/
            // registration-state bootstrap event the OnDataChange tests skip with their
            // family-match predicate. That empty bootstrap is parity, NOT a dropped payload:
            // the converter ran without crashing and there were simply no samples to carry.
            // We therefore match only a batch that actually carries samples, so a real
            // multi-sample conversion can be validated and the empty bootstrap is skipped
            // rather than mistaken for a defect.
            MxEvent? bufferedBatch = null;
            try
            {
                bufferedBatch = await eventWriter
                    .WaitForMessageAsync(
                        candidate => candidate.Family == MxEventFamily.OnBufferedDataChange
                            && candidate.ServerHandle == serverHandle
                            && candidate.ItemHandle == bufferedItemHandle
                            && candidate.OnBufferedDataChange is not null
                            && (CountArrayElements(candidate.OnBufferedDataChange.QualityValues) > 0
                                || CountArrayElements(candidate.OnBufferedDataChange.TimestampValues) > 0),
                        IntegrationTestEnvironment.LiveMxAccessEventTimeout,
                        streamCancellation.Token)
                    .ConfigureAwait(false);
            }
            catch (TimeoutException)
            {
                bufferedBatch = null;
            }

            // Whether or not a sample-bearing batch arrived, record what buffered events the
            // rig DID deliver (typically just the empty NoData bootstrap) for the record.
            int bootstrapBufferedEvents = CountMatchingEvents(
                eventWriter,
                e => e.Family == MxEventFamily.OnBufferedDataChange
                    && e.ServerHandle == serverHandle
                    && e.ItemHandle == bufferedItemHandle);

            if (bufferedBatch is null)
            {
                // The residual below is only "environmental" while the event pipeline is alive.
                // streamCancellation is not cancelled until the finally block, so a StreamEvents
                // task that has already completed (typically faulted) means no further event can
                // arrive — that is a delivery defect. ShutDownAsync is called without
                // propagateStreamFaults here and would only log the fault, so surface it now.
                Assert.False(
                    streamTask is { IsCompleted: true },
                    $"StreamEvents ended with status {streamTask?.Status} before a sample-bearing buffered batch arrived: {streamTask?.Exception?.GetBaseException().Message}");

                // RESIDUAL (documented): the command path (AddBufferedItem +
                // SetBufferedUpdateInterval + Advise) is proven and the buffered EVENT plumbing
                // is live (the empty NoData bootstrap arrives and converts without crashing),
                // but the rig did not drive a sample-bearing buffered batch within the timeout
                // — the same environmental limitation as the externally-undrivable alarm rig.
                // The §3.2 OnBufferedDataChange MULTI-SAMPLE conversion therefore remains
                // unverified live. This is environmental, not a defect — let the test pass.
                output.WriteLine(
                    "B8 RESIDUAL: AddBufferedItem/SetBufferedUpdateInterval/Advise round-tripped and " +
                    $"{bootstrapBufferedEvents} OnBufferedDataChange event(s) arrived (empty NoData " +
                    "bootstrap, converted without crash/drop), but no sample-bearing buffered batch " +
                    $"was observed within {IntegrationTestEnvironment.LiveMxAccessEventTimeout}. Live " +
                    "§3.2 multi-sample conversion remains unverified (rig object logic may not drive " +
                    "buffered samples on demand).");
                return;
            }

            // A SAMPLE-BEARING buffered batch was captured — validate the §3.2 conversion.
            LogEvent(bufferedBatch);
            OnBufferedDataChangeEvent body = bufferedBatch.OnBufferedDataChange;
            Assert.NotNull(body);
            int qualityCount = CountArrayElements(body.QualityValues);
            int timestampCount = CountArrayElements(body.TimestampValues);
            output.WriteLine(
                $"B8 CAPTURED buffered batch: data_type={body.DataType} raw_data_type={body.RawDataType} " +
                $"quality_samples={qualityCount} timestamp_samples={timestampCount} " +
                $"value_kind={bufferedBatch.Value?.KindCase}");

            // The predicate guaranteed at least one sample; the converted aggregate value
            // must also exist (no crash, no dropped payload).
            Assert.True(
                qualityCount > 0 || timestampCount > 0,
                "Sample-bearing OnBufferedDataChange lost its samples after the predicate matched.");
            Assert.NotNull(bufferedBatch.Value);

            // When MXAccess delivers parallel quality + timestamp arrays the converted
            // arrays must agree in length; a mismatch is a real conversion defect (a sample
            // was dropped on one side).
            if (qualityCount > 0 && timestampCount > 0)
            {
                Assert.Equal(qualityCount, timestampCount);
            }
        }
        finally
        {
            streamCancellation.Cancel();
            await ShutDownAsync(fixture, processFactory, sessionId, streamTask).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Verifies that killing the worker process marks the session Faulted
    /// with a clean fault classification — the gateway
    /// must observe the abnormal exit, transition the session, and surface a non-empty
    /// fault description rather than hanging or crashing.
/// </summary>
    [LiveMxAccessFact]
    public async Task GatewaySession_WithLiveWorker_AbnormalWorkerExit_MarksSessionFaulted()
    {
        string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
        Assert.True(
            File.Exists(workerExecutablePath),
            $"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");

        TestWorkerProcessFactory processFactory = new(output);
        await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
        using RecordingServerStreamWriter<MxEvent> eventWriter = new();
        string? sessionId = null;
        Task? streamTask = null;
        using CancellationTokenSource streamCancellation = new();

        try
        {
            OpenSessionReply openReply = await fixture.Service.OpenSession(
                new OpenSessionRequest
                {
                    ClientSessionName = "live-mxaccess-abnormal-exit",
                    ClientCorrelationId = "live-open-abnormal",
                    CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
                },
                new TestServerCallContext()).ConfigureAwait(false);
            sessionId = openReply.SessionId;
            Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);

            streamTask = fixture.Service.StreamEvents(
                new StreamEventsRequest { SessionId = sessionId },
                eventWriter,
                new TestServerCallContext(streamCancellation.Token));

            // Kill the worker process directly. WorkerClient's read loop hits an
            // end-of-stream on the named pipe and routes through SetFaulted; the
            // session manager then marks the session Faulted. We avoid CloseSession
            // so the transition is driven by the abnormal exit, not a graceful path.
            processFactory.KillAllAndDetach();

            // Poll the in-process registry (every 50 ms, up to StreamShutdownTimeout) until
            // the session reports Faulted.
            DateTimeOffset waitDeadline = DateTimeOffset.UtcNow + StreamShutdownTimeout;
            SessionState observedState = SessionState.Unspecified;
            string? observedFault = null;
            while (DateTimeOffset.UtcNow < waitDeadline)
            {
                if (fixture.TryGetSession(sessionId, out GatewaySession? session))
                {
                    observedState = session.State;
                    observedFault = session.FinalFault;
                    if (observedState == SessionState.Faulted)
                    {
                        break;
                    }
                }

                await Task.Delay(TimeSpan.FromMilliseconds(50)).ConfigureAwait(false);
            }

            output.WriteLine($"AbnormalExit observed_state={observedState} fault={observedFault}");
            Assert.Equal(SessionState.Faulted, observedState);
            Assert.False(string.IsNullOrWhiteSpace(observedFault), "Faulted session must carry a non-empty fault description.");

            // The fault classification must come from a known worker-client error code so
            // operators get an actionable cause string rather than an opaque exception
            // trace. We accept the classifications WorkerClient actually drives on an
            // abnormal exit (kill-the-process path): the read loop hits EndOfStream and
            // calls SetFaulted with WorkerClientErrorCode.PipeDisconnected and the
            // message "Worker pipe disconnected." (see WorkerClient's EndOfStream handling
            // in its read loop). The earlier broad list (including "worker") matched every
            // WorkerClient fault message (they all begin with "Worker"); tighten to the
            // pipe/disconnect/end-of-stream classifications that match THIS path, so a
            // regression that routed an unrelated fault here would surface as a test failure
            // rather than silently passing (see IntegrationTests-020). "heartbeat" is dropped
            // because HeartbeatGraceSeconds (15s) exceeds the StreamShutdownTimeout
            // (10s) poll window, so a heartbeat-expired transition can never be
            // observed inside this test.
            Assert.True(
                observedFault!.Contains("pipe disconnected", StringComparison.OrdinalIgnoreCase)
                || observedFault.Contains("end of stream", StringComparison.OrdinalIgnoreCase),
                $"Fault description '{observedFault}' did not match a known abnormal-exit classification " +
                "(expected 'pipe disconnected' or 'end of stream' from WorkerClient's EndOfStream path).");

            // IntegrationTests-021: also assert the StreamEvents call observed the fault
            // — the chain that puts the session into Faulted goes through ReadEventsAsync
            // propagating a WorkerClientException into EventStreamService, which calls
            // session.MarkFaulted. The gateway then maps the WorkerClientException to an
            // RpcException at the public boundary (MxAccessGatewayService.MapException →
            // MapWorkerClientException). Polling session.State alone would silently pass
            // if a future refactor moved MarkFaulted off the stream-consumption path —
            // assert the streamTask itself terminated with a fault so the test couples
            // to the actual fault-propagation path. Compare to the inverse assertion in
            // the Write parity test (Assert.False(streamTask.IsFaulted, ...)).
            try
            {
                await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
            }
            catch (Exception streamException)
            {
                output.WriteLine($"StreamEvents task terminated with: {streamException.GetType().Name}: {streamException.Message}");
            }

            Assert.True(
                streamTask.IsCompleted,
                "StreamEvents task did not complete within the shutdown timeout after the worker was killed.");
            Assert.True(
                streamTask.IsFaulted,
                "StreamEvents task must fault on abnormal worker exit, not complete cleanly — " +
                "the fault-propagation path from WorkerClient.SetFaulted through ReadEventsAsync is the contract.");
        }
        finally
        {
            streamCancellation.Cancel();

            // sessionId is intentionally null here — the session is already faulted and a
            // CloseSession round-trip would just log a cleanup failure. We still wait for
            // the worker process exit so the next test starts with a clean state.
            await ShutDownAsync(fixture, processFactory, sessionId: null, streamTask).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Closes the session and drains the event stream / worker processes without letting a
    /// cleanup timeout mask the original failure from the test body.
    /// </summary>
    /// <param name="fixture">Gateway fixture that owns the session.</param>
    /// <param name="processFactory">Factory whose recorded worker processes are awaited.</param>
    /// <param name="sessionId">Session to close; the close is skipped when null or whitespace.</param>
    /// <param name="streamTask">StreamEvents task to drain; skipped when null.</param>
    /// <param name="propagateStreamFaults">
    /// When <see langword="true"/>, a faulted <paramref name="streamTask"/> is rethrown (with its
    /// original stack trace preserved) after the worker-process wait, so the test fails on a
    /// silent stream-task exception (the Write parity test relies on this so stream-side defects
    /// in event delivery are visible). When <see langword="false"/>, all cleanup exceptions are
    /// logged and swallowed so a real test-body assertion failure is not masked by a shutdown
    /// timeout (the original IntegrationTests-004 fix).
    /// </param>
    private async Task ShutDownAsync(
        GatewayServiceFixture fixture,
        TestWorkerProcessFactory processFactory,
        string? sessionId,
        Task? streamTask,
        bool propagateStreamFaults = false)
    {
        // Captured (rather than the bare exception) so the eventual rethrow keeps the
        // stream task's original stack trace; `throw ex;` would reset it.
        System.Runtime.ExceptionServices.ExceptionDispatchInfo? streamFault = null;

        try
        {
            if (!string.IsNullOrWhiteSpace(sessionId))
            {
                await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            output.WriteLine($"Cleanup error during session close: {ex}");
        }

        if (streamTask is not null)
        {
            try
            {
                await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
            }
            catch (OperationCanceledException ex)
            {
                // A linked CancellationToken on the streaming TestServerCallContext is the
                // intended way to stop StreamEvents promptly — treat the resulting
                // OperationCanceledException as a clean shutdown, not a fault.
                output.WriteLine($"Event stream task cancelled during shutdown: {ex.Message}");
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                // MxAccessGatewayService.MapException intentionally converts the
                // server-side OperationCanceledException into RpcException(Cancelled)
                // so real gRPC clients see the standard cancellation status. Treat
                // that as the same clean-shutdown signal as a raw OCE.
                output.WriteLine($"Event stream task cancelled during shutdown: {ex.Status.Detail}");
            }
            catch (Exception ex)
            {
                // Cleanup runs in a finally block. By default a faulted StreamEvents task is
                // logged and swallowed so a test-body assertion failure is not masked. When
                // the caller opts into propagateStreamFaults (the Write parity test), we
                // rethrow the fault after the worker-process wait so a silent stream-side
                // defect actually fails the test. This branch also receives the
                // TimeoutException WaitAsync raises when the stream does not finish in time.
                output.WriteLine($"Event stream task faulted during shutdown: {ex}");
                if (propagateStreamFaults)
                {
                    streamFault = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex);
                }
            }
        }

        try
        {
            await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            output.WriteLine($"Cleanup error while waiting for worker processes to exit: {ex}");
        }

        // Rethrow with the original stack trace intact (no-op when nothing was captured).
        streamFault?.Throw();
    }

    /// <summary>Builds a Register command using the configured live client name.</summary>
    private static MxCommandRequest CreateRegisterRequest(string sessionId)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-register",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand
                {
                    ClientName = IntegrationTestEnvironment.LiveMxAccessClientName,
                },
            },
        };
    }

    /// <summary>Builds an AddItem command for the configured live item.</summary>
    private static MxCommandRequest CreateAddItemRequest(
        string sessionId,
        int serverHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-add-item",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = IntegrationTestEnvironment.LiveMxAccessItem,
                },
            },
        };
    }

    /// <summary>Builds an Advise command for the given server/item handle pair.</summary>
    private static MxCommandRequest CreateAdviseRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-advise",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Advise,
                Advise = new AdviseCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>Builds a Write command that writes the integer value 1 to the item.</summary>
    private static MxCommandRequest CreateWriteRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-write",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Write,
                Write = new WriteCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                    Value = new MxValue
                    {
                        DataType = MxDataType.Integer,
                        Int32Value = 1,
                    },
                },
            },
        };
    }

    /// <summary>Builds an UnAdvise command for the given server/item handle pair.</summary>
    private static MxCommandRequest CreateUnAdviseRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-unadvise",
            Command = new MxCommand
            {
                Kind = MxCommandKind.UnAdvise,
                UnAdvise = new UnAdviseCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>Builds a RemoveItem command for the given server/item handle pair.</summary>
    private static MxCommandRequest CreateRemoveItemRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-remove-item",
            Command = new MxCommand
            {
                Kind = MxCommandKind.RemoveItem,
                RemoveItem = new RemoveItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>Builds an Unregister command for the given server handle.</summary>
    private static MxCommandRequest CreateUnregisterRequest(
        string sessionId,
        int serverHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-unregister",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Unregister,
                Unregister = new UnregisterCommand
                {
                    ServerHandle = serverHandle,
                },
            },
        };
    }

    /// <summary>
    /// Builds an AuthenticateUser command. The password is only placed in the command
    /// payload; this method does not log it.
    /// </summary>
    private static MxCommandRequest CreateAuthenticateUserRequest(
        string sessionId,
        int serverHandle,
        string verifyUser,
        string verifyPassword)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-authenticate-user",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AuthenticateUser,
                AuthenticateUser = new AuthenticateUserCommand
                {
                    ServerHandle = serverHandle,
                    VerifyUser = verifyUser,
                    VerifyUserPassword = verifyPassword,
                },
            },
        };
    }

    /// <summary>Builds an ArchestrAUserToId command for the given user GUID string.</summary>
    private static MxCommandRequest CreateArchestrAUserToIdRequest(
        string sessionId,
        int serverHandle,
        string userIdGuid)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-archestra-user-to-id",
            Command = new MxCommand
            {
                Kind = MxCommandKind.ArchestraUserToId,
                ArchestraUserToId = new ArchestrAUserToIdCommand
                {
                    ServerHandle = serverHandle,
                    UserIdGuid = userIdGuid,
                },
            },
        };
    }

    /// <summary>Builds an AddBufferedItem command from an (item, context) pair.</summary>
    private static MxCommandRequest CreateAddBufferedItemRequest(
        string sessionId,
        int serverHandle,
        string itemDefinition,
        string itemContext)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-add-buffered-item",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddBufferedItem,
                AddBufferedItem = new AddBufferedItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = itemDefinition,
                    ItemContext = itemContext,
                },
            },
        };
    }

    /// <summary>Builds a SetBufferedUpdateInterval command (interval in milliseconds).</summary>
    private static MxCommandRequest CreateSetBufferedUpdateIntervalRequest(
        string sessionId,
        int serverHandle,
        int updateIntervalMilliseconds)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-set-buffered-update-interval",
            Command = new MxCommand
            {
                Kind = MxCommandKind.SetBufferedUpdateInterval,
                SetBufferedUpdateInterval = new SetBufferedUpdateIntervalCommand
                {
                    ServerHandle = serverHandle,
                    UpdateIntervalMilliseconds = updateIntervalMilliseconds,
                },
            },
        };
    }

    /// <summary>Builds a Suspend command for the given server/item handle pair.</summary>
    private static MxCommandRequest CreateSuspendRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-suspend",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Suspend,
                Suspend = new SuspendCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>Builds an Activate command for the given server/item handle pair.</summary>
    private static MxCommandRequest CreateActivateRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-activate",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Activate,
                Activate = new ActivateCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>
    /// Builds a WriteSecured command that writes the integer value 2 on behalf of the
    /// given current/verifier user ids.
    /// </summary>
    private static MxCommandRequest CreateWriteSecuredRequest(
        string sessionId,
        int serverHandle,
        int itemHandle,
        int currentUserId,
        int verifierUserId)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "live-write-secured",
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteSecured,
                WriteSecured = new WriteSecuredCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                    CurrentUserId = currentUserId,
                    VerifierUserId = verifierUserId,
                    Value = new MxValue
                    {
                        DataType = MxDataType.Integer,
                        Int32Value = 2,
                    },
                },
            },
        };
    }

    /// <summary>
    /// Reads the secured-write verifier credentials from
    /// MXGATEWAY_LIVE_MXACCESS_WRITE_SECURED_USER / _PASSWORD, falling back to the
    /// dev-rig defaults when a variable is not set.
    /// </summary>
    private static (string VerifyUser, string VerifyPassword) ResolveLiveMxAccessSecuredCredentials()
    {
        string verifyUser = Environment.GetEnvironmentVariable("MXGATEWAY_LIVE_MXACCESS_WRITE_SECURED_USER") ?? "admin";
        string verifyPassword = Environment.GetEnvironmentVariable("MXGATEWAY_LIVE_MXACCESS_WRITE_SECURED_PASSWORD") ?? "admin123";
        return (verifyUser, verifyPassword);
    }

    /// <summary>
    /// Splits a dotted MXAccess reference (e.g. "TestChildObject.TestInt") into the
    /// (item_definition, item_context) pair AddBufferedItem expects — attribute name and
    /// owning object. An undotted reference, or one with a leading/trailing dot, is passed
    /// through with an empty context.
    /// </summary>
    private static (string Item, string Context) SplitLiveItemForBuffered(string liveItem)
    {
        int lastDot = liveItem.LastIndexOf('.');
        if (lastDot <= 0 || lastDot >= liveItem.Length - 1)
        {
            return (liveItem, string.Empty);
        }

        string context = liveItem[..lastDot];
        string item = liveItem[(lastDot + 1)..];
        return (item, context);
    }

    /// <summary>
    /// Counts the elements in a converted buffered <see cref="MxArray"/> across whichever
    /// typed-array oneof case the VariantConverter populated, so the buffered-capture
    /// assertions are independent of the rig item's element type.
    /// </summary>
    /// <returns>The element count, or 0 for a null array or an unset/unknown case.</returns>
    private static int CountArrayElements(MxArray? array)
    {
        if (array is null)
        {
            return 0;
        }

        return array.ValuesCase switch
        {
            MxArray.ValuesOneofCase.BoolValues => array.BoolValues.Values.Count,
            MxArray.ValuesOneofCase.Int32Values => array.Int32Values.Values.Count,
            MxArray.ValuesOneofCase.Int64Values => array.Int64Values.Values.Count,
            MxArray.ValuesOneofCase.FloatValues => array.FloatValues.Values.Count,
            MxArray.ValuesOneofCase.DoubleValues => array.DoubleValues.Values.Count,
            MxArray.ValuesOneofCase.StringValues => array.StringValues.Values.Count,
            MxArray.ValuesOneofCase.TimestampValues => array.TimestampValues.Values.Count,
            MxArray.ValuesOneofCase.RawValues => array.RawValues.Values.Count,
            _ => 0,
        };
    }

    /// <summary>Counts the recorded stream messages that satisfy <paramref name="predicate"/>.</summary>
    private static int CountMatchingEvents(
        RecordingServerStreamWriter<MxEvent> writer,
        Func<MxEvent, bool> predicate)
    {
        int count = 0;
        foreach (MxEvent message in writer.Messages)
        {
            if (predicate(message))
            {
                count++;
            }
        }

        return count;
    }

    /// <summary>Invokes CloseSession for the session and logs the reply status and final state.</summary>
    private async Task CloseSessionAsync(
        GatewayServiceFixture fixture,
        string sessionId)
    {
        CloseSessionReply closeReply = await fixture.Service.CloseSession(
            new CloseSessionRequest
            {
                SessionId = sessionId,
                ClientCorrelationId = "live-close",
            },
            new TestServerCallContext()).ConfigureAwait(false);

        output.WriteLine($"CloseSession status={closeReply.ProtocolStatus.Code} final_state={closeReply.FinalState}");
    }

    /// <summary>Logs a command reply to this test's output helper.</summary>
    private void LogReply(
        string method,
        MxCommandReply reply)
    {
        LogReplyTo(output, method, reply);
    }

    /// <summary>
    /// Logs a command reply's protocol status, HResult and diagnostic message, followed by
    /// one line per MXAccess status proxy, to <paramref name="sink"/>.
    /// </summary>
    private static void LogReplyTo(
        ITestOutputHelper sink,
        string method,
        MxCommandReply reply)
    {
        sink.WriteLine(
            $"{method} status={reply.ProtocolStatus.Code} hresult={reply.Hresult} diagnostic={reply.DiagnosticMessage}");
        foreach (MxStatusProxy status in reply.Statuses)
        {
            sink.WriteLine(
                $"{method} mxstatus success={status.Success} category={status.Category} detail={status.Detail} text={status.DiagnosticText}");
        }
    }

    /// <summary>Logs the identifying fields, value type and raw status of a streamed event.</summary>
    private void LogEvent(MxEvent dataChange)
    {
        output.WriteLine(
            $"Event family={dataChange.Family} worker_sequence={dataChange.WorkerSequence} server_handle={dataChange.ServerHandle} item_handle={dataChange.ItemHandle} quality={dataChange.Quality}");
        output.WriteLine(
            $"Event value_type={dataChange.Value?.DataType} raw_status={dataChange.RawStatus}");
    }

    /// <summary>
    /// Test fixture that assembles the gateway service with a worker process factory for live MXAccess testing.
    /// </summary>
    private sealed class GatewayServiceFixture : IAsyncDisposable
    {
        private readonly GatewayMetrics _metrics = new();
        private readonly SessionRegistry _registry = new();
        private readonly ILoggerFactory _loggerFactory;

        /// <summary>
        /// Builds the in-process gateway: worker launcher, session worker-client factory,
        /// session manager, event stream service and the gRPC service, all sharing one
        /// metrics instance and a logger factory that writes to the test output.
        /// </summary>
        /// <param name="workerExecutablePath">Path to the worker process executable.</param>
        /// <param name="processFactory">Factory for creating worker processes.</param>
        /// <param name="output">Test output helper for logging.</param>
        public GatewayServiceFixture(
            string workerExecutablePath,
            IWorkerProcessFactory processFactory,
            ITestOutputHelper output)
        {
            IOptions<GatewayOptions> options = Options.Create(CreateOptions(workerExecutablePath));
            _loggerFactory = LoggerFactory.Create(builder => builder.AddProvider(new TestOutputLoggerProvider(output)));
            WorkerProcessLauncher launcher = new(
                options,
                processFactory,
                new WorkerProcessStartedProbe(),
                _metrics);
            SessionWorkerClientFactory workerClientFactory = new(
                launcher,
                options,
                _metrics,
                _loggerFactory);
            SessionManager sessionManager = new(
                _registry,
                workerClientFactory,
                options,
                _metrics,
                logger: _loggerFactory.CreateLogger<SessionManager>());
            MxAccessGrpcMapper mapper = new();
            EventStreamService eventStreamService = new(
                sessionManager,
                options,
                mapper,
                _metrics,
                NullDashboardEventBroadcaster.Instance,
                _loggerFactory.CreateLogger<EventStreamService>());
            Service = new MxAccessGatewayService(
                sessionManager,
                new GatewayRequestIdentityAccessor(),
                new AllowAllConstraintEnforcer(),
                new MxAccessGrpcRequestValidator(),
                mapper,
                eventStreamService,
                _metrics,
                _loggerFactory.CreateLogger<MxAccessGatewayService>(),
                new ZB.MOM.WW.MxGateway.Server.Alarms.GatewayAlarmMonitor(
                    sessionManager,
                    EmptyAlarmWatchListResolver.Instance,
                    _metrics,
                    options,
                    _loggerFactory.CreateLogger<ZB.MOM.WW.MxGateway.Server.Alarms.GatewayAlarmMonitor>()));
        }

        /// <summary>
        /// The assembled gateway service instance.
        /// </summary>
        public MxAccessGatewayService Service { get; }

        /// <summary>
        /// Looks up a session by id directly against the in-process registry. The abnormal
        /// worker-exit test needs to observe the session's State / FinalFault as the gateway
        /// transitions it to Faulted, which the public gRPC API only exposes indirectly via
        /// CloseSession's reply (and not before a graceful close completes).
        /// </summary>
        /// <param name="sessionId">The session identifier.</param>
        /// <param name="session">The session if found; otherwise null.</param>
        /// <returns><see langword="true"/> when the registry contains the session.</returns>
        public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
        {
            return _registry.TryGet(sessionId, out session);
        }

        /// <summary>
        /// Disposes every session still in the registry, then the logger factory and metrics.
        /// The logger factory and metrics are released even if a session disposal throws.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            try
            {
                foreach (GatewaySession session in _registry.Snapshot())
                {
                    await session.DisposeAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                _loggerFactory.Dispose();
                _metrics.Dispose();
            }
        }

        /// <summary>Gateway options for a single-session, x86-worker live test run.</summary>
        private static GatewayOptions CreateOptions(string workerExecutablePath)
        {
            return new GatewayOptions
            {
                Worker = new WorkerOptions
                {
                    ExecutablePath = workerExecutablePath,
                    StartupTimeoutSeconds = 30,
                    ShutdownTimeoutSeconds = 15,
                    HeartbeatIntervalSeconds = 5,
                    HeartbeatGraceSeconds = 15,
                    MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
                    RequiredArchitecture = WorkerArchitecture.X86,
                },
                Sessions = new SessionOptions
                {
                    DefaultCommandTimeoutSeconds = 15,
                    MaxSessions = 1,
                },
                Events = new EventOptions
                {
                    QueueCapacity = 32,
                },
            };
        }
    }

    /// <summary>
    /// Gathers messages written to a server stream for test inspection.
    /// </summary>
    /// <typeparam name="T">The streamed message type.</typeparam>
    private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>, IDisposable
    {
        private readonly object syncRoot = new();
        private readonly List<T> messages = [];

        // Released once per recorded message so waiters wake up and rescan.
        private readonly SemaphoreSlim messageArrived = new(0);

        /// <summary>
        /// A snapshot copy of all messages that have been written to the stream.
        /// </summary>
        public IReadOnlyList<T> Messages
        {
            get
            {
                lock (syncRoot)
                {
                    return messages.ToArray();
                }
            }
        }

        /// <summary>
        /// Inherited write options.
        /// </summary>
        public WriteOptions? WriteOptions { get; set; }

        /// <summary>
        /// Records the message and signals any pending waiter.
        /// </summary>
        /// <param name="message">The message to write.</param>
        public Task WriteAsync(T message)
        {
            lock (syncRoot)
            {
                messages.Add(message);
            }

            messageArrived.Release();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Waits for the first recorded message that satisfies <paramref name="predicate"/>,
        /// up to the specified timeout. Earlier non-matching messages (for example a
        /// registration-state bootstrap event) are skipped rather than treated as the result.
        /// </summary>
        /// <param name="predicate">Filter the awaited message must satisfy.</param>
        /// <param name="timeout">The maximum total time to wait.</param>
        /// <param name="cancellationToken">
        /// Token observed alongside the timeout so a per-test cancellation (for example the
        /// gRPC call context's token) aborts the wait promptly instead of hanging until the
        /// timeout elapses.
        /// </param>
        /// <returns>The first message that satisfies the predicate.</returns>
        /// <exception cref="TimeoutException">No matching message arrived within <paramref name="timeout"/>.</exception>
        public async Task<T> WaitForMessageAsync(
            Func<T, bool> predicate,
            TimeSpan timeout,
            CancellationToken cancellationToken = default)
        {
            using CancellationTokenSource timeoutCancellation = new(timeout);
            using CancellationTokenSource linkedCancellation =
                CancellationTokenSource.CreateLinkedTokenSource(timeoutCancellation.Token, cancellationToken);

            // Index of the next unscanned message, so each message is tested at most once.
            int scanned = 0;
            while (true)
            {
                T[] snapshot;
                lock (syncRoot)
                {
                    snapshot = messages.ToArray();
                }

                for (; scanned < snapshot.Length; scanned++)
                {
                    if (predicate(snapshot[scanned]))
                    {
                        return snapshot[scanned];
                    }
                }

                try
                {
                    await messageArrived.WaitAsync(linkedCancellation.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException) when (timeoutCancellation.IsCancellationRequested)
                {
                    throw new TimeoutException(
                        $"No stream message satisfied the predicate within {timeout}. Recorded {scanned} message(s).");
                }
            }
        }

        /// <summary>
        /// Releases the wait handle backing <c>messageArrived</c>. The writer owns an
        /// <see cref="SemaphoreSlim"/> field so it must be disposable itself; the leak
        /// is otherwise bounded only by how many opt-in live tests run.
        /// </summary>
        public void Dispose()
        {
            messageArrived.Dispose();
        }
    }

    /// <summary>
    /// Minimal <see cref="ServerCallContext"/> stub for invoking the gRPC service
    /// in-process. It is a hand-written fake with no verification behavior — it
    /// only supplies the context values the service reads during a call.
    /// </summary>
    private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
    {
        private readonly Metadata requestHeaders = [];
        private readonly Metadata responseTrailers = [];
        private readonly Dictionary<object, object> userState = [];
        private Status status;
        private WriteOptions? writeOptions;

        /// <inheritdoc />
        protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";

        /// <inheritdoc />
        protected override string HostCore => "localhost";

        /// <inheritdoc />
        protected override string PeerCore => "ipv4:127.0.0.1:5000";

        /// <inheritdoc />
        // NOTE(review): recomputed on every read, so the deadline slides forward with time.
        // Left as-is: a fixed deadline could expire during a long-running streaming call.
        protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);

        /// <inheritdoc />
        protected override Metadata RequestHeadersCore => requestHeaders;

        /// <inheritdoc />
        protected override CancellationToken CancellationTokenCore => cancellationToken;

        /// <inheritdoc />
        protected override Metadata ResponseTrailersCore => responseTrailers;

        /// <inheritdoc />
        protected override Status StatusCore
        {
            get => status;
            set => status = value;
        }

        /// <inheritdoc />
        protected override WriteOptions? WriteOptionsCore
        {
            get => writeOptions;
            set => writeOptions = value;
        }

        /// <inheritdoc />
        protected override AuthContext AuthContextCore { get; } = new(
            string.Empty,
            new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));

        /// <inheritdoc />
        protected override IDictionary<object, object> UserStateCore => userState;

        /// <inheritdoc />
        protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
        {
            return Task.CompletedTask;
        }

        /// <inheritdoc />
        protected override ContextPropagationToken CreatePropagationTokenCore(
            ContextPropagationOptions? options)
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Factory that launches worker processes and records their outputs for testing.
/// </summary>
    /// <param name="output">Receives worker stdout/stderr lines and process lifecycle messages.</param>
    private sealed class TestWorkerProcessFactory(ITestOutputHelper output) : IWorkerProcessFactory
    {
        private readonly ConcurrentBag<TestWorkerProcess> processes = [];

        /// <inheritdoc />
        public IWorkerProcess Start(ProcessStartInfo startInfo)
        {
            // Redirecting the streams requires UseShellExecute = false; both streams are
            // echoed into the test output through WriteWorkerOutput.
            startInfo.RedirectStandardError = true;
            startInfo.RedirectStandardOutput = true;
            startInfo.UseShellExecute = false;

            Process process = new()
            {
                StartInfo = startInfo,
                EnableRaisingEvents = true,
            };
            process.OutputDataReceived += (_, args) => WriteWorkerOutput("stdout", args.Data);
            process.ErrorDataReceived += (_, args) => WriteWorkerOutput("stderr", args.Data);

            try
            {
                if (!process.Start())
                {
                    throw new InvalidOperationException("Worker process failed to start.");
                }
            }
            catch
            {
                // Process.Start can also throw (for example when the executable is missing);
                // dispose the Process on every failure path, not only on a false return.
                process.Dispose();
                throw;
            }

            process.BeginOutputReadLine();
            process.BeginErrorReadLine();

            TestWorkerProcess workerProcess = new(process);
            processes.Add(workerProcess);
            output.WriteLine($"WorkerProcess started pid={workerProcess.Id} path={startInfo.FileName}");
            return workerProcess;
        }

        /// <inheritdoc />
        /// <remarks>Each still-running process gets its own <paramref name="timeout"/> budget.</remarks>
        public async Task WaitForProcessesAsync(TimeSpan timeout)
        {
            foreach (TestWorkerProcess process in processes)
            {
                if (process.HasExited)
                {
                    output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
                    continue;
                }

                using CancellationTokenSource timeoutCancellation = new(timeout);
                await process.WaitForExitAsync(timeoutCancellation.Token).ConfigureAwait(false);
                output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
            }
        }

        /// <summary>
        /// Kills every recorded worker process tree so the abnormal-exit test can simulate a
        /// crashed worker without going through the graceful shutdown handshake. Processes that
        /// have already exited are skipped, and an <see cref="InvalidOperationException"/>
        /// thrown by the kill is logged and tolerated.
        /// </summary>
        public void KillAllAndDetach()
        {
            foreach (TestWorkerProcess process in processes)
            {
                if (process.HasExited)
                {
                    continue;
                }

                try
                {
                    process.Kill(entireProcessTree: true);
                    output.WriteLine($"WorkerProcess killed pid={process.Id} (abnormal-exit simulation)");
                }
                catch (InvalidOperationException ex)
                {
                    output.WriteLine($"WorkerProcess kill skipped pid={process.Id}: {ex.Message}");
                }
            }
        }

        /// <summary>Echoes one line of worker output, tagged with the stream it came from.</summary>
        /// <param name="streamName">Stream label, "stdout" or "stderr".</param>
        /// <param name="line">The received line; null or whitespace-only lines are dropped.</param>
        private void WriteWorkerOutput(
            string streamName,
            string? line)
        {
            if (!string.IsNullOrWhiteSpace(line))
            {
                output.WriteLine($"worker_{streamName}: {line}");
            }
        }
    }

    /// <summary>
    /// Adapter wrapping a <see cref="Process"/> as <see cref="IWorkerProcess"/> for testing.
    /// </summary>
    /// <param name="process">The started process; disposed by <see cref="Dispose"/>.</param>
    private sealed class TestWorkerProcess(Process process) : IWorkerProcess
    {
        /// <inheritdoc />
        public int Id => process.Id;

        /// <inheritdoc />
        public bool HasExited => process.HasExited;

        /// <inheritdoc />
        /// <remarks>Null while the process is still running.</remarks>
        public int? ExitCode => process.HasExited ? process.ExitCode : null;

        /// <inheritdoc />
        public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
        {
            await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
        }

        /// <inheritdoc />
        public void Kill(bool entireProcessTree)
        {
            process.Kill(entireProcessTree);
        }

        /// <inheritdoc />
        public void Dispose()
        {
            process.Dispose();
        }
    }

    /// <summary>
    /// Logger provider that writes all output to the test output helper.
    /// </summary>
    /// <param name="output">The helper every created logger writes to.</param>
    private sealed class TestOutputLoggerProvider(ITestOutputHelper output) : ILoggerProvider
    {
        /// <inheritdoc />
        public ILogger CreateLogger(string categoryName)
        {
            return new TestOutputLogger(output, categoryName);
        }

        /// <inheritdoc />
        /// <remarks>Nothing to release; loggers hold no resources of their own.</remarks>
        public void Dispose()
        {
        }
    }

    /// <summary>
    /// Logger that writes messages at <see cref="LogLevel.Information"/> and above to the test
    /// output helper, prefixed with the level and category.
    /// </summary>
    /// <param name="output">Destination for formatted log lines.</param>
    /// <param name="categoryName">Category name included in every line.</param>
    private sealed class TestOutputLogger(
        ITestOutputHelper output,
        string categoryName) : ILogger
    {
        /// <inheritdoc />
        /// <remarks>Scopes are not tracked.</remarks>
        public IDisposable? BeginScope<TState>(TState state)
            where TState : notnull
        {
            return null;
        }

        /// <inheritdoc />
        public bool IsEnabled(LogLevel logLevel)
        {
            // LogLevel.None means "write nothing" by convention, so exclude it even though
            // its numeric value is above Information.
            return logLevel is >= LogLevel.Information and < LogLevel.None;
        }

        /// <inheritdoc />
        public void Log<TState>(
            LogLevel logLevel,
            EventId eventId,
            TState state,
            Exception? exception,
            Func<TState, Exception?, string> formatter)
        {
            if (!IsEnabled(logLevel))
            {
                return;
            }

            output.WriteLine($"{logLevel} {categoryName}: {formatter(state, exception)}");
            if (exception is not null)
            {
                output.WriteLine(exception.ToString());
            }
        }
    }

    /// <summary>
    /// Buffering wrapper around an <see cref="ITestOutputHelper"/> that mirrors every line
    /// written through it into a <see cref="StringBuilder"/> the test owns. The WriteSecured
    /// parity test (IntegrationTests-019) uses this to make CLAUDE.md's "passwords and
    /// WriteSecured payloads must never reach logs" rule a property of the entire
    /// test output stream — gateway <see cref="ILogger"/> entries (echoed via
    /// <see cref="TestOutputLoggerProvider"/>), worker stdout/stderr (echoed via
    /// <see cref="TestWorkerProcessFactory"/>), and direct
    /// <c>output.WriteLine</c> calls all land in the same buffer, so a future maintenance
    /// change that prints a credential through any of those channels is caught by the
    /// assertion rather than slipping past the existing DiagnosticMessage check.
    /// </summary>
    /// <param name="inner">The real helper every line is forwarded to.</param>
    private sealed class RecordingTestOutputHelper(ITestOutputHelper inner) : ITestOutputHelper
    {
        private readonly StringBuilder buffer = new();
        private readonly object syncRoot = new();

        /// <summary>Gets the accumulated output buffer contents.</summary>
        public string Captured
        {
            get
            {
                lock (syncRoot)
                {
                    return buffer.ToString();
                }
            }
        }

        /// <summary>Writes a line of text to the buffer and inner helper.</summary>
        /// <param name="message">The message to write.</param>
        public void WriteLine(string message)
        {
            lock (syncRoot)
            {
                buffer.AppendLine(message);
            }

            inner.WriteLine(message);
        }

        /// <summary>Writes a formatted line of text to the buffer and inner helper.</summary>
        /// <param name="format">The message format string.</param>
        /// <param name="args">The format arguments.</param>
        public void WriteLine(string format, params object[] args)
        {
            // Format once and forward the already-formatted text, so the captured buffer and
            // the inner helper receive exactly the same string. Forwarding format/args would
            // let the inner helper re-format, possibly under a different culture.
            WriteLine(string.Format(System.Globalization.CultureInfo.InvariantCulture, format, args));
        }
    }

    /// <summary>
    /// <see cref="IConstraintEnforcer"/> stub that completes every check with a <c>null</c>
    /// result (no constraint failure, per the class name) and discards recorded denials.
    /// </summary>
    private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
    {
        /// <inheritdoc />
        public Task<ConstraintFailure?> CheckReadTagAsync(
            ApiKeyIdentity? identity,
            string tagAddress,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        /// <inheritdoc />
        public Task<ConstraintFailure?> CheckReadHandleAsync(
            ApiKeyIdentity? identity,
            GatewaySession session,
            int serverHandle,
            int itemHandle,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        /// <inheritdoc />
        public Task<ConstraintFailure?> CheckWriteHandleAsync(
            ApiKeyIdentity? identity,
            GatewaySession session,
            int serverHandle,
            int itemHandle,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        /// <inheritdoc />
        public Task RecordDenialAsync(
            ApiKeyIdentity? identity,
            string commandKind,
            string target,
            ConstraintFailure failure,
            string? correlationId,
            CancellationToken cancellationToken) => Task.CompletedTask;
    }
}