using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
/// <summary>
/// In-memory fake of <see cref="IMxGatewayClientTransport"/> for tests. It records every call it
/// receives and answers from the replies, events, and exceptions the test has queued.
/// </summary>
/// <param name="options">Client options exposed unchanged through <see cref="Options"/>.</param>
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
{
    // Backing stores filled through the Add* helpers and drained by the RPC methods.
    private readonly Queue<MxCommandReply> _invokeReplies = new();
    private readonly List<MxEvent> _events = [];
    private readonly Queue<AcknowledgeAlarmReply> _acknowledgeReplies = new();
    private readonly List<ActiveAlarmSnapshot> _activeAlarmSnapshots = [];

    /// <summary>
    /// Gets the gateway client options supplied to the constructor.
    /// </summary>
    public MxGatewayClientOptions Options { get; } = options;

    /// <summary>
    /// Gets <see langword="null"/>, since this is a test fake without a real gRPC client.
    /// </summary>
    public MxAccessGateway.MxAccessGatewayClient? RawClient => null;

    /// <summary>
    /// Gets the captured <see cref="OpenSessionAsync"/> calls, in call order.
    /// </summary>
    public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];

    /// <summary>
    /// Gets the captured <see cref="CloseSessionAsync"/> calls, in call order.
    /// </summary>
    public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];

    /// <summary>
    /// Gets the captured <see cref="InvokeAsync"/> calls, in call order.
    /// </summary>
    public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];

    /// <summary>
    /// Gets the captured <see cref="StreamEventsAsync"/> calls, in call order.
    /// </summary>
    public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];

    /// <summary>
    /// Gets the captured <see cref="AcknowledgeAlarmAsync"/> calls, in call order.
    /// </summary>
    public List<(AcknowledgeAlarmRequest Request, CallOptions CallOptions)> AcknowledgeAlarmCalls { get; } = [];

    /// <summary>
    /// Gets the captured <see cref="StreamAlarmsAsync"/> calls, in call order.
    /// </summary>
    public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = [];

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="AcknowledgeAlarmAsync"/>; one is
    /// consumed per call while the queue is non-empty.
    /// </summary>
    public Queue<Exception> AcknowledgeAlarmExceptions { get; } = new();

    /// <summary>
    /// Gets or sets the reply to return from <see cref="OpenSessionAsync"/>.
    /// </summary>
    public OpenSessionReply OpenSessionReply { get; set; } = new()
    {
        SessionId = "session-fixture",
        BackendName = "mxaccess-worker",
        GatewayProtocolVersion = 1,
        WorkerProtocolVersion = 1,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };

    /// <summary>
    /// Gets or sets the reply to return from <see cref="CloseSessionAsync"/>.
    /// </summary>
    public CloseSessionReply CloseSessionReply { get; set; } = new()
    {
        SessionId = "session-fixture",
        FinalState = SessionState.Closed,
        ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
    };

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="OpenSessionAsync"/>; one is
    /// consumed per call while the queue is non-empty.
    /// </summary>
    public Queue<Exception> OpenSessionExceptions { get; } = new();

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="CloseSessionAsync"/>; one is
    /// consumed per call while the queue is non-empty.
    /// </summary>
    public Queue<Exception> CloseSessionExceptions { get; } = new();

    /// <summary>
    /// Gets or sets a value indicating whether queued <see cref="RpcException"/>s are passed
    /// through <see cref="RpcExceptionMapper"/> before being thrown, the way the production gRPC
    /// transport does. Lets retry tests exercise the wrapped-exception predicate branch that
    /// runs in production.
    /// </summary>
    public bool MapTransportExceptions { get; set; }

    /// <summary>
    /// Gets or sets an optional hook awaited inside <see cref="CloseSessionAsync"/> after the
    /// call is recorded; lets tests pause a close mid-flight to observe concurrent dispose.
    /// </summary>
    public Func<Task>? CloseSessionHook { get; set; }

    /// <summary>
    /// Gets the queue of exceptions to throw from <see cref="InvokeAsync"/>; one is consumed
    /// per call while the queue is non-empty.
    /// </summary>
    public Queue<Exception> InvokeExceptions { get; } = new();

    /// <summary>
    /// Records the call, then throws the next queued open-session exception if there is one,
    /// otherwise returns <see cref="OpenSessionReply"/>.
    /// </summary>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is forwarded to exception mapping.</param>
    /// <returns>A completed task holding the configured reply.</returns>
    public Task<OpenSessionReply> OpenSessionAsync(
        OpenSessionRequest request,
        CallOptions callOptions)
    {
        OpenSessionCalls.Add((request, callOptions));

        // The method is not async, so a queued exception is thrown synchronously to the caller.
        if (OpenSessionExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        return Task.FromResult(OpenSessionReply);
    }

    /// <summary>
    /// Records the call, awaits <see cref="CloseSessionHook"/> when one is set, then throws the
    /// next queued close-session exception if there is one, otherwise returns
    /// <see cref="CloseSessionReply"/>.
    /// </summary>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is forwarded to exception mapping.</param>
    /// <returns>The configured reply.</returns>
    public async Task<CloseSessionReply> CloseSessionAsync(
        CloseSessionRequest request,
        CallOptions callOptions)
    {
        // Record before awaiting the hook so a paused close is already visible to the test.
        CloseSessionCalls.Add((request, callOptions));

        if (CloseSessionHook is not null)
        {
            await CloseSessionHook().ConfigureAwait(false);
        }

        if (CloseSessionExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        return CloseSessionReply;
    }

    /// <summary>
    /// Records the call, then throws the next queued invoke exception if there is one,
    /// otherwise returns the next reply queued through <see cref="AddInvokeReply"/>.
    /// </summary>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is forwarded to exception mapping.</param>
    /// <returns>A completed task holding the dequeued reply.</returns>
    /// <exception cref="InvalidOperationException">No exception and no reply is queued.</exception>
    public Task<MxCommandReply> InvokeAsync(
        MxCommandRequest request,
        CallOptions callOptions)
    {
        InvokeCalls.Add((request, callOptions));

        if (InvokeExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        // Same exception type an empty Queue.Dequeue() raises, but with a message that tells
        // the test author what setup step is missing.
        if (!_invokeReplies.TryDequeue(out MxCommandReply? reply))
        {
            throw new InvalidOperationException(
                $"No invoke reply is queued; call {nameof(AddInvokeReply)} before {nameof(InvokeAsync)}.");
        }

        return Task.FromResult(reply);
    }

    /// <summary>
    /// Records the call and yields every event added through <see cref="AddEvent"/>, in
    /// insertion order.
    /// </summary>
    /// <remarks>
    /// This is an async iterator, so nothing runs (including the call recording) until the
    /// returned sequence is first enumerated.
    /// </remarks>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is checked before each event.</param>
    /// <returns>The queued events as an asynchronous sequence.</returns>
    public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
        StreamEventsRequest request,
        CallOptions callOptions)
    {
        StreamEventsCalls.Add((request, callOptions));

        foreach (MxEvent gatewayEvent in _events)
        {
            callOptions.CancellationToken.ThrowIfCancellationRequested();

            // Force an asynchronous hop between items instead of completing synchronously.
            await Task.Yield();
            yield return gatewayEvent;
        }
    }

    /// <summary>
    /// Enqueues a reply to be returned from the next <see cref="InvokeAsync"/> call.
    /// </summary>
    /// <param name="reply">The reply to enqueue.</param>
    public void AddInvokeReply(MxCommandReply reply)
    {
        _invokeReplies.Enqueue(reply);
    }

    /// <summary>
    /// Adds an event to be yielded from <see cref="StreamEventsAsync"/>.
    /// </summary>
    /// <param name="gatewayEvent">The event to add.</param>
    public void AddEvent(MxEvent gatewayEvent)
    {
        _events.Add(gatewayEvent);
    }

    /// <summary>
    /// Records the call, then throws the next queued acknowledge exception if there is one,
    /// otherwise returns the next reply queued through <see cref="AddAcknowledgeReply"/>, or a
    /// default success reply when none is queued.
    /// </summary>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is forwarded to exception mapping.</param>
    /// <returns>A completed task holding the queued or default reply.</returns>
    public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
        AcknowledgeAlarmRequest request,
        CallOptions callOptions)
    {
        AcknowledgeAlarmCalls.Add((request, callOptions));

        if (AcknowledgeAlarmExceptions.TryDequeue(out Exception? exception))
        {
            throw Translate(exception, callOptions);
        }

        // With nothing queued, synthesize an OK reply that echoes the request's correlation id.
        return Task.FromResult(_acknowledgeReplies.Count > 0
            ? _acknowledgeReplies.Dequeue()
            : new AcknowledgeAlarmReply
            {
                CorrelationId = request.ClientCorrelationId,
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
                Status = new MxStatusProxy { Success = 1, Category = MxStatusCategory.Ok },
            });
    }

    /// <summary>
    /// Records the call and yields each snapshot added through
    /// <see cref="AddActiveAlarmSnapshot"/> as an active-alarm feed message, followed by a
    /// snapshot-complete sentinel.
    /// </summary>
    /// <remarks>
    /// This is an async iterator, so nothing runs (including the call recording) until the
    /// returned sequence is first enumerated.
    /// </remarks>
    /// <param name="request">The request to record.</param>
    /// <param name="callOptions">Call options recorded with the request; the cancellation token is checked before each snapshot.</param>
    /// <returns>The feed messages as an asynchronous sequence.</returns>
    public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
        StreamAlarmsRequest request,
        CallOptions callOptions)
    {
        StreamAlarmsCalls.Add((request, callOptions));

        foreach (ActiveAlarmSnapshot snapshot in _activeAlarmSnapshots)
        {
            callOptions.CancellationToken.ThrowIfCancellationRequested();

            // Force an asynchronous hop between items instead of completing synchronously.
            await Task.Yield();
            yield return new AlarmFeedMessage { ActiveAlarm = snapshot };
        }

        // The sentinel is emitted even when no snapshots were added.
        yield return new AlarmFeedMessage { SnapshotComplete = true };
    }

    /// <summary>
    /// Enqueues a reply to be returned from the next <see cref="AcknowledgeAlarmAsync"/> call.
    /// </summary>
    /// <param name="reply">The reply to enqueue.</param>
    public void AddAcknowledgeReply(AcknowledgeAlarmReply reply)
    {
        _acknowledgeReplies.Enqueue(reply);
    }

    /// <summary>
    /// Adds a snapshot to be yielded from <see cref="StreamAlarmsAsync"/> as an active-alarm
    /// message.
    /// </summary>
    /// <param name="snapshot">The snapshot to add.</param>
    public void AddActiveAlarmSnapshot(ActiveAlarmSnapshot snapshot)
    {
        _activeAlarmSnapshots.Add(snapshot);
    }

    /// <summary>
    /// Maps a queued exception through <see cref="RpcExceptionMapper"/> when
    /// <see cref="MapTransportExceptions"/> is set and the exception is an
    /// <see cref="RpcException"/>; otherwise returns it unchanged.
    /// </summary>
    /// <param name="exception">The exception taken from one of the exception queues.</param>
    /// <param name="callOptions">Call options whose cancellation token is handed to the mapper.</param>
    /// <returns>The exception the calling method should throw.</returns>
    private Exception Translate(Exception exception, CallOptions callOptions)
    {
        if (MapTransportExceptions && exception is RpcException rpcException)
        {
            return RpcExceptionMapper.Map(rpcException, callOptions.CancellationToken);
        }

        return exception;
    }
}