using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.TestSupport;
/// <summary>
/// Single configurable <see cref="IWorkerRuntimeSession"/> test double shared by
/// the IPC tests. Replaces the two independent (and previously diverged)
/// <c>FakeRuntimeSession</c> copies in <c>WorkerPipeSessionTests</c> and
/// <c>WorkerPipeClientTests</c>: one supported dispatch blocking and event enqueue, the
/// other did not. This consolidated double supports every configuration both
/// call sites needed, so a minimal caller simply leaves the options unset.
/// </summary>
internal sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
// Set by ReleaseDispatch, RequestShutdown, ShutdownGracefullyAsync and Dispose;
// a dispatch that runs with BlockDispatch enabled waits on it.
private readonly ManualResetEventSlim releaseDispatch = new(false);
// Guards both the heartbeat snapshot and the event queue.
private readonly object gate = new();
// Events added by EnqueueEvent and removed, oldest first, by DrainEvents.
private readonly Queue events = new();
// Latest heartbeat snapshot. Starts with the current UTC time, zeroed counters
// and an empty correlation ID.
private WorkerRuntimeHeartbeatSnapshot snapshot = new(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
/// <summary>
/// Gets the event that <see cref="DispatchAsync"/> sets once a dispatch has begun running.
/// </summary>
/// <remarks>
/// This class never resets the event, so it stays signaled after the first dispatch.
/// </remarks>
public ManualResetEventSlim DispatchStarted { get; } = new(false);
/// <summary>
/// Gets or sets whether a dispatch waits for a release signal before completing.
/// The flag is read when the dispatch work runs, and the wait is capped at five seconds.
/// </summary>
public bool BlockDispatch { get; set; }
/// <summary>
/// Gets or sets whether a dispatch fails with <see cref="InvalidOperationException"/>
/// instead of producing a reply. The failure happens after any blocking wait has ended.
/// </summary>
public bool ThrowAfterDispatchReleased { get; set; }
/// <summary>
/// Gets or sets whether <see cref="ShutdownGracefullyAsync"/> returns a task faulted
/// with <see cref="TimeoutException"/>.
/// </summary>
public bool ThrowTimeoutOnShutdown { get; set; }
/// <summary>Gets a value indicating whether <see cref="Dispose"/> has been called.</summary>
public bool Disposed { get; private set; }
/// <summary>
/// Returns an already-completed task carrying a <see cref="WorkerReady"/> message.
/// No background work is started.
/// </summary>
/// <param name="sessionId">The session identifier. Not used by this fake.</param>
/// <param name="workerProcessId">The worker process ID, echoed back in the result.</param>
/// <param name="cancellationToken">Cancellation token. Not observed by this fake.</param>
/// <returns>
/// A completed task whose result carries <paramref name="workerProcessId"/>, the ProgId
/// and CLSID from <see cref="MxAccessInteropInfo"/>, and the current UTC time as the
/// ready timestamp.
/// </returns>
public Task StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
/// <summary>
/// Simulates dispatching a command. The work is queued with <c>Task.Run</c> and, in order:
/// publishes the command's correlation ID in the heartbeat snapshot, signals
/// <see cref="DispatchStarted"/>, optionally waits for a release signal
/// (<see cref="BlockDispatch"/>), clears the correlation ID again, and then either
/// fails (<see cref="ThrowAfterDispatchReleased"/>) or returns an OK reply.
/// </summary>
/// <param name="command">The command to dispatch.</param>
/// <returns>
/// A task that yields a reply echoing the command's session ID, correlation ID and kind
/// with an <c>Ok</c> protocol status, or that faults with
/// <see cref="InvalidOperationException"/> when <see cref="ThrowAfterDispatchReleased"/> is set.
/// </returns>
public Task DispatchAsync(StaCommand command)
{
return Task.Run(
() =>
{
// Publish the command's correlation ID so CaptureHeartbeat reports it
// while the dispatch is in flight.
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
command.CorrelationId));
// Signaled after the snapshot update above, so the correlation ID is
// already published when waiters wake.
DispatchStarted.Set();
if (BlockDispatch)
{
// Bounded wait: the result is ignored, so the dispatch continues after
// five seconds even if nothing released it.
releaseDispatch.Wait(TimeSpan.FromSeconds(5));
}
// Clear the in-flight correlation ID before completing or failing.
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty));
if (ThrowAfterDispatchReleased)
{
// Thrown inside the Task.Run delegate, so it surfaces as a faulted task.
throw new InvalidOperationException("Command failed after shutdown started.");
}
return new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
});
}
/// <summary>Returns the most recently stored heartbeat snapshot.</summary>
/// <returns>
/// The snapshot last written by <see cref="SetSnapshot"/> or by a dispatch, or the
/// initial snapshot if neither has run.
/// </returns>
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
lock (gate)
{
return snapshot;
}
}
/// <summary>
/// Removes and returns queued events in the order they were enqueued.
/// </summary>
/// <param name="maxEvents">Maximum number of events to drain; 0 drains everything queued.</param>
/// <returns>The drained events; empty when nothing is queued.</returns>
public IReadOnlyList DrainEvents(uint maxEvents)
{
lock (gate)
{
// maxEvents is clamped to int.MaxValue before the cast, so the checked
// conversion cannot overflow.
int drainCount = maxEvents == 0
? events.Count
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
List drained = new(drainCount);
for (int index = 0; index < drainCount; index++)
{
drained.Add(events.Dequeue());
}
return drained;
}
}
/// <summary>Drains a pending fault, if any. This fake never has one.</summary>
/// <returns>Always <c>null</c>.</returns>
public WorkerFault? DrainFault()
{
return null;
}
/// <summary>Cancels a command by correlation ID. This fake never cancels anything.</summary>
/// <param name="correlationId">The command correlation ID. Not used by this fake.</param>
/// <returns>Always <c>false</c>.</returns>
public bool CancelCommand(string correlationId)
{
return false;
}
/// <summary>
/// Requests shutdown. In this fake the only effect is to release a blocked dispatch.
/// </summary>
public void RequestShutdown()
{
releaseDispatch.Set();
}
/// <summary>
/// Releases any blocked dispatch and reports the shutdown outcome selected by
/// <see cref="ThrowTimeoutOnShutdown"/>.
/// </summary>
/// <param name="timeout">Shutdown timeout period. Not used by this fake.</param>
/// <param name="cancellationToken">Cancellation token. Not observed by this fake.</param>
/// <returns>
/// A task faulted with <see cref="TimeoutException"/> when
/// <see cref="ThrowTimeoutOnShutdown"/> is set; otherwise a completed task carrying an
/// <see cref="MxAccessShutdownResult"/> built from an empty array.
/// </returns>
public Task ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
// Release first so a blocked dispatch can finish regardless of the outcome below.
releaseDispatch.Set();
if (ThrowTimeoutOnShutdown)
{
return Task.FromException(
new TimeoutException("Simulated graceful shutdown timeout."));
}
return Task.FromResult(new MxAccessShutdownResult(Array.Empty()));
}
/// <summary>Releases a dispatch blocked by <see cref="BlockDispatch"/>.</summary>
public void ReleaseDispatch()
{
releaseDispatch.Set();
}
/// <summary>
/// Replaces the heartbeat snapshot returned by <see cref="CaptureHeartbeat"/>.
/// </summary>
/// <remarks>
/// <see cref="DispatchAsync"/> also writes the snapshot, so a value set here is
/// overwritten when a dispatch runs.
/// </remarks>
/// <param name="value">The snapshot to store.</param>
public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value)
{
lock (gate)
{
snapshot = value;
}
}
/// <summary>Queues a worker event for a later <see cref="DrainEvents"/> call.</summary>
/// <param name="workerEvent">The event to enqueue.</param>
public void EnqueueEvent(WorkerEvent workerEvent)
{
lock (gate)
{
events.Enqueue(workerEvent);
}
}
/// <summary>
/// Marks the session as disposed, releases any blocked dispatch, and disposes both
/// wait events.
/// </summary>
public void Dispose()
{
Disposed = true;
// Set before disposing so a dispatch still waiting on the event is woken.
releaseDispatch.Set();
releaseDispatch.Dispose();
DispatchStarted.Dispose();
}
}