using Grpc.Core;
using MxGateway.Contracts.Proto;

namespace MxGateway.Client.Tests;

/// <summary>
/// Fake implementation of <see cref="IMxGatewayClientTransport"/> for testing.
/// Every call is captured in a public list, and the replies, events, and exceptions
/// it produces are scripted by the test through the queues and <c>Add*</c> methods.
/// </summary>
/// <param name="options">The gateway client options exposed through <see cref="Options"/>.</param>
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
{
    private readonly Queue<MxCommandReply> _invokeReplies = new();
    private readonly List<MxEvent> _events = [];
    private readonly Queue<AcknowledgeAlarmReply> _acknowledgeReplies = new();
    private readonly List<ActiveAlarmSnapshot> _activeAlarmSnapshots = [];

    /// <summary>
    /// Gets the gateway client options.
    /// </summary>
    public MxGatewayClientOptions Options { get; } = options;

    /// <summary>
    /// Gets null, since this is a test fake without a real gRPC client.
    /// </summary>
    public MxAccessGateway.MxAccessGatewayClient? RawClient => null;

    /// <summary>
    /// Gets the list of captured <see cref="OpenSessionAsync"/> calls.
    /// </summary>
    public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];

    /// <summary>
    /// Gets the list of captured <see cref="CloseSessionAsync"/> calls.
    /// </summary>
    public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];

    /// <summary>
    /// Gets the list of captured <see cref="InvokeAsync"/> calls.
    /// </summary>
    public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];

    /// <summary>
    /// Gets the list of captured <see cref="StreamEventsAsync"/> calls.
    /// </summary>
    public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];

    /// <summary>
    /// Gets the list of captured <see cref="AcknowledgeAlarmAsync"/> calls.
    /// </summary>
    public List<(AcknowledgeAlarmRequest Request, CallOptions CallOptions)> AcknowledgeAlarmCalls { get; } = [];

    /// <summary>
    /// Gets the list of captured <see cref="StreamAlarmsAsync"/> calls.
    /// </summary>
    public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = [];

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="OpenSessionAsync"/>.
    /// </summary>
    public Queue<Exception> OpenSessionExceptions { get; } = new();

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="CloseSessionAsync"/>.
    /// </summary>
    public Queue<Exception> CloseSessionExceptions { get; } = new();

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="InvokeAsync"/>.
    /// </summary>
    public Queue<Exception> InvokeExceptions { get; } = new();

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="AcknowledgeAlarmAsync"/>.
    /// </summary>
    public Queue<Exception> AcknowledgeAlarmExceptions { get; } = new();

    /// <summary>
    /// Gets or sets the reply to return from <see cref="OpenSessionAsync"/>.
    /// </summary>
    public OpenSessionReply OpenSessionReply { get; set; } = new()
    {
        SessionId = "session-fixture",
        BackendName = "mxaccess-worker",
        GatewayProtocolVersion = 1,
        WorkerProtocolVersion = 1,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };

    /// <summary>
    /// Gets or sets the reply to return from <see cref="CloseSessionAsync"/>.
    /// </summary>
    public CloseSessionReply CloseSessionReply { get; set; } = new()
    {
        SessionId = "session-fixture",
        FinalState = SessionState.Closed,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };

    /// <summary>
    /// Gets or sets a value indicating whether queued <see cref="RpcException"/>s are passed
    /// through <see cref="RpcExceptionMapper"/> before being thrown, the way the production
    /// gRPC transport does. Lets retry tests exercise the wrapped-exception predicate branch
    /// that runs in production.
    /// </summary>
    public bool MapTransportExceptions { get; set; }

    /// <summary>
    /// Gets or sets an optional hook awaited inside <see cref="CloseSessionAsync"/> after the
    /// call is recorded; lets tests pause a close mid-flight to observe concurrent dispose.
    /// </summary>
    public Func<Task>? CloseSessionHook { get; set; }

    /// <summary>
    /// Records the call, then either throws the next queued exception from
    /// <see cref="OpenSessionExceptions"/> or returns <see cref="OpenSessionReply"/>.
    /// </summary>
    /// <remarks>
    /// This method is not <c>async</c>, so a queued exception is thrown synchronously from
    /// the call itself rather than surfaced through a faulted task.
    /// </remarks>
    /// <param name="request">The OpenSessionRequest to process.</param>
    /// <param name="callOptions">Call options specifying RPC behavior.</param>
    /// <returns>A completed task holding the configured <see cref="OpenSessionReply"/>.</returns>
    public Task<OpenSessionReply> OpenSessionAsync(
        OpenSessionRequest request,
        CallOptions callOptions)
    {
        OpenSessionCalls.Add((request, callOptions));

        if (OpenSessionExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        return Task.FromResult(OpenSessionReply);
    }

    /// <summary>
    /// Records the call, awaits <see cref="CloseSessionHook"/> when one is set, then either
    /// throws the next queued exception from <see cref="CloseSessionExceptions"/> or returns
    /// <see cref="CloseSessionReply"/>.
    /// </summary>
    /// <param name="request">The CloseSessionRequest to process.</param>
    /// <param name="callOptions">Call options specifying RPC behavior.</param>
    /// <returns>A task holding the configured <see cref="CloseSessionReply"/>.</returns>
    public async Task<CloseSessionReply> CloseSessionAsync(
        CloseSessionRequest request,
        CallOptions callOptions)
    {
        CloseSessionCalls.Add((request, callOptions));

        // The hook runs after the call is recorded but before any queued exception is
        // consumed, so a paused close has already been captured in CloseSessionCalls.
        if (CloseSessionHook is not null)
        {
            await CloseSessionHook().ConfigureAwait(false);
        }

        if (CloseSessionExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        return CloseSessionReply;
    }

    /// <summary>
    /// Records the call, then either throws the next queued exception from
    /// <see cref="InvokeExceptions"/> or returns the next reply enqueued through
    /// <see cref="AddInvokeReply"/>.
    /// </summary>
    /// <remarks>
    /// This method is not <c>async</c>, so exceptions are thrown synchronously from the
    /// call itself rather than surfaced through a faulted task.
    /// </remarks>
    /// <param name="request">The MxCommandRequest to process.</param>
    /// <param name="callOptions">Call options specifying RPC behavior.</param>
    /// <returns>A completed task holding the next enqueued reply.</returns>
    /// <exception cref="InvalidOperationException">
    /// No exception and no reply has been queued for this call.
    /// </exception>
    public Task<MxCommandReply> InvokeAsync(
        MxCommandRequest request,
        CallOptions callOptions)
    {
        InvokeCalls.Add((request, callOptions));

        if (InvokeExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        // Same exception type Queue<T>.Dequeue would raise on an empty queue, but with a
        // message that points the test author at the missing setup step.
        if (!_invokeReplies.TryDequeue(out MxCommandReply? reply))
        {
            throw new InvalidOperationException(
                $"{nameof(FakeGatewayTransport)}.{nameof(InvokeAsync)} was called with no queued reply; " +
                $"call {nameof(AddInvokeReply)} first.");
        }

        return Task.FromResult(reply);
    }

    /// <summary>
    /// Records the call and yields every event added through <see cref="AddEvent"/>.
    /// </summary>
    /// <remarks>
    /// This is an async iterator: the call is recorded in <see cref="StreamEventsCalls"/>
    /// when enumeration starts, not when the method is invoked.
    /// </remarks>
    /// <param name="request">The StreamEventsRequest to process.</param>
    /// <param name="callOptions">
    /// Call options specifying RPC behavior; its cancellation token is checked before each event.
    /// </param>
    /// <returns>An asynchronous sequence of the added events, in insertion order.</returns>
    public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
        StreamEventsRequest request,
        CallOptions callOptions)
    {
        StreamEventsCalls.Add((request, callOptions));

        foreach (MxEvent gatewayEvent in _events)
        {
            callOptions.CancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return gatewayEvent;
        }
    }

    /// <summary>
    /// Enqueues a reply to be returned from the next <see cref="InvokeAsync"/> call.
    /// </summary>
    /// <param name="reply">The reply to enqueue.</param>
    public void AddInvokeReply(MxCommandReply reply)
    {
        _invokeReplies.Enqueue(reply);
    }

    /// <summary>
    /// Adds an event to be yielded from <see cref="StreamEventsAsync"/>.
    /// </summary>
    /// <param name="gatewayEvent">The event to add.</param>
    public void AddEvent(MxEvent gatewayEvent)
    {
        _events.Add(gatewayEvent);
    }

    /// <summary>
    /// Records the acknowledge call, then either throws the next queued exception from
    /// <see cref="AcknowledgeAlarmExceptions"/>, returns the next reply enqueued through
    /// <see cref="AddAcknowledgeReply"/>, or falls back to a default OK reply that echoes
    /// the request's client correlation id.
    /// </summary>
    /// <remarks>
    /// This method is not <c>async</c>, so a queued exception is thrown synchronously from
    /// the call itself rather than surfaced through a faulted task.
    /// </remarks>
    /// <param name="request">The AcknowledgeAlarmRequest to process.</param>
    /// <param name="callOptions">Call options specifying RPC behavior.</param>
    /// <returns>A completed task holding the queued or default reply.</returns>
    public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
        AcknowledgeAlarmRequest request,
        CallOptions callOptions)
    {
        AcknowledgeAlarmCalls.Add((request, callOptions));

        if (AcknowledgeAlarmExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        if (_acknowledgeReplies.TryDequeue(out AcknowledgeAlarmReply? queuedReply))
        {
            return Task.FromResult(queuedReply);
        }

        return Task.FromResult(new AcknowledgeAlarmReply
        {
            CorrelationId = request.ClientCorrelationId,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            Status = new MxStatusProxy { Success = 1, Category = MxStatusCategory.Ok },
        });
    }

    /// <summary>
    /// Records the call and yields each added snapshot as an active-alarm
    /// feed message, then a snapshot-complete sentinel.
    /// </summary>
    /// <remarks>
    /// This is an async iterator: the call is recorded in <see cref="StreamAlarmsCalls"/>
    /// when enumeration starts, not when the method is invoked.
    /// </remarks>
    /// <param name="request">The StreamAlarmsRequest to process.</param>
    /// <param name="callOptions">
    /// Call options specifying RPC behavior; its cancellation token is checked before each snapshot.
    /// </param>
    /// <returns>
    /// An asynchronous sequence of active-alarm messages followed by one snapshot-complete message.
    /// </returns>
    public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
        StreamAlarmsRequest request,
        CallOptions callOptions)
    {
        StreamAlarmsCalls.Add((request, callOptions));

        foreach (ActiveAlarmSnapshot snapshot in _activeAlarmSnapshots)
        {
            callOptions.CancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return new AlarmFeedMessage { ActiveAlarm = snapshot };
        }

        yield return new AlarmFeedMessage { SnapshotComplete = true };
    }

    /// <summary>Enqueues an acknowledge reply.</summary>
    /// <param name="reply">The reply to return from a later <see cref="AcknowledgeAlarmAsync"/> call.</param>
    public void AddAcknowledgeReply(AcknowledgeAlarmReply reply)
    {
        _acknowledgeReplies.Enqueue(reply);
    }

    /// <summary>Adds a snapshot yielded from <see cref="StreamAlarmsAsync"/> as an active-alarm message.</summary>
    /// <param name="snapshot">The snapshot to add.</param>
    public void AddActiveAlarmSnapshot(ActiveAlarmSnapshot snapshot)
    {
        _activeAlarmSnapshots.Add(snapshot);
    }

    /// <summary>
    /// Maps a queued exception the way the production gRPC transport does when
    /// <see cref="MapTransportExceptions"/> is set; otherwise returns it unchanged.
    /// </summary>
    /// <param name="exception">The exception dequeued for the current call.</param>
    /// <param name="callOptions">Call options whose cancellation token is handed to the mapper.</param>
    /// <returns>
    /// The result of <c>RpcExceptionMapper.Map</c> for an <see cref="RpcException"/> while mapping
    /// is enabled; otherwise <paramref name="exception"/> itself.
    /// </returns>
    private Exception Translate(Exception exception, CallOptions callOptions)
    {
        if (MapTransportExceptions && exception is RpcException rpcException)
        {
            return RpcExceptionMapper.Map(rpcException, callOptions.CancellationToken);
        }

        return exception;
    }
}