using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Sta;

namespace MxGateway.Worker.MxAccess;

// NOTE(review): several generic type arguments in this file appear to have been
// stripped by an extraction step (e.g. "Task>", "IReadOnlyList", "Array.Empty()",
// an "async Task" method that returns a value). They are preserved exactly as found
// because the missing type arguments cannot be determined from this file alone;
// restore them from version control before building.

/// <summary>
/// Worker runtime session that creates an <see cref="MxAccessSession"/> through
/// <see cref="StaRuntime.InvokeAsync"/> and routes commands, heartbeat capture,
/// event draining and shutdown through that runtime.
/// </summary>
public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
    private readonly IMxAccessComObjectFactory factory;
    private readonly IMxAccessEventSink eventSink;
    private readonly MxAccessEventQueue eventQueue;
    private readonly StaRuntime staRuntime;

    // Both are assigned inside StartAsync's runtime callback and stay null until then.
    private StaCommandDispatcher? commandDispatcher;
    private MxAccessSession? session;
    private bool disposed;

    /// <summary>
    /// Creates a session with a new <see cref="StaRuntime"/>, a new
    /// <see cref="MxAccessComObjectFactory"/> and a new <see cref="MxAccessEventQueue"/>.
    /// </summary>
    public MxAccessStaSession()
        : this(
            new StaRuntime(),
            new MxAccessComObjectFactory(),
            new MxAccessEventQueue())
    {
    }

    /// <summary>
    /// Creates a session that uses the supplied event sink together with a newly
    /// created <see cref="MxAccessEventQueue"/>.
    /// </summary>
    /// <param name="staRuntime">Runtime on which session work is invoked.</param>
    /// <param name="factory">Factory passed to <see cref="MxAccessSession.Create"/>.</param>
    /// <param name="eventSink">Event sink passed to <see cref="MxAccessSession.Create"/>.</param>
    public MxAccessStaSession(
        StaRuntime staRuntime,
        IMxAccessComObjectFactory factory,
        IMxAccessEventSink eventSink)
        : this(staRuntime, factory, eventSink, new MxAccessEventQueue())
    {
    }

    /// <summary>
    /// Creates a session whose event sink is an <see cref="MxAccessBaseEventSink"/>
    /// constructed over the supplied queue.
    /// </summary>
    /// <param name="staRuntime">Runtime on which session work is invoked.</param>
    /// <param name="factory">Factory passed to <see cref="MxAccessSession.Create"/>.</param>
    /// <param name="eventQueue">Queue exposed through <see cref="EventQueue"/>.</param>
    public MxAccessStaSession(
        StaRuntime staRuntime,
        IMxAccessComObjectFactory factory,
        MxAccessEventQueue eventQueue)
        : this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue)
    {
    }

    /// <summary>
    /// Creates a session from explicitly supplied collaborators.
    /// </summary>
    /// <param name="staRuntime">Runtime on which session work is invoked.</param>
    /// <param name="factory">Factory passed to <see cref="MxAccessSession.Create"/>.</param>
    /// <param name="eventSink">Event sink passed to <see cref="MxAccessSession.Create"/>.</param>
    /// <param name="eventQueue">Queue used for heartbeat counters and draining.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MxAccessStaSession(
        StaRuntime staRuntime,
        IMxAccessComObjectFactory factory,
        IMxAccessEventSink eventSink,
        MxAccessEventQueue eventQueue)
    {
        this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
        this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
        this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
        this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
    }

    /// <summary>Gets the event queue this session reads heartbeat counters and events from.</summary>
    public MxAccessEventQueue EventQueue => eventQueue;

    /// <summary>
    /// Starts the session with an empty session id; see the overload that takes a session id.
    /// </summary>
    /// <param name="workerProcessId">Value forwarded to <c>CreateWorkerReady</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the runtime invocation.</param>
    public Task StartAsync(
        int workerProcessId,
        CancellationToken cancellationToken = default)
    {
        return StartAsync(string.Empty, workerProcessId, cancellationToken);
    }

    /// <summary>
    /// Starts the runtime and, inside a runtime invocation, creates the
    /// <see cref="MxAccessSession"/> and its <see cref="StaCommandDispatcher"/>.
    /// The task carries the value returned by <c>session.CreateWorkerReady</c>.
    /// </summary>
    /// <param name="sessionId">Session id passed to <see cref="MxAccessSession.Create"/>.</param>
    /// <param name="workerProcessId">Value forwarded to <c>CreateWorkerReady</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the runtime invocation.</param>
    /// <remarks>
    /// A second call faults the returned task with <see cref="InvalidOperationException"/>
    /// because the session already exists.
    /// </remarks>
    public Task StartAsync(
        string sessionId,
        int workerProcessId,
        CancellationToken cancellationToken = default)
    {
        staRuntime.Start();

        return staRuntime.InvokeAsync(
            () =>
            {
                if (session is not null)
                {
                    throw new InvalidOperationException("MXAccess COM session has already been created.");
                }

                session = MxAccessSession.Create(factory, eventSink, sessionId);
                commandDispatcher = new StaCommandDispatcher(
                    staRuntime,
                    new MxAccessCommandExecutor(session));

                return session.CreateWorkerReady(workerProcessId);
            },
            cancellationToken);
    }

    /// <summary>
    /// Forwards a command to the dispatcher created by <c>StartAsync</c>.
    /// </summary>
    /// <param name="command">Command to dispatch.</param>
    /// <exception cref="InvalidOperationException">The session has not been started.</exception>
    public Task DispatchAsync(StaCommand command)
    {
        if (commandDispatcher is null)
        {
            throw new InvalidOperationException("MXAccess COM session has not been started.");
        }

        return commandDispatcher.DispatchAsync(command);
    }

    /// <summary>
    /// Builds a heartbeat snapshot from the runtime's last activity time, the
    /// dispatcher's pending-command count and current correlation id, and the event
    /// queue's count and last event sequence. Before the dispatcher exists the command
    /// fields are reported as zero and an empty string.
    /// </summary>
    public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
    {
        uint pendingCommandCount = 0;
        string currentCommandCorrelationId = string.Empty;

        if (commandDispatcher is not null)
        {
            pendingCommandCount = (uint)commandDispatcher.PendingCommandCount;
            currentCommandCorrelationId = commandDispatcher.CurrentCommandCorrelationId;
        }

        return new WorkerRuntimeHeartbeatSnapshot(
            staRuntime.LastActivityUtc,
            pendingCommandCount,
            (uint)eventQueue.Count,
            eventQueue.LastEventSequence,
            currentCommandCorrelationId);
    }

    /// <summary>
    /// Asks the dispatcher, if one has been created, to shut down. Does nothing otherwise.
    /// </summary>
    public void RequestShutdown()
    {
        commandDispatcher?.RequestShutdown();
    }

    /// <summary>
    /// Returns the result of draining the event queue with the given limit.
    /// </summary>
    /// <param name="maxEvents">Limit passed to <c>eventQueue.Drain</c>.</param>
    // NOTE(review): return type's generic argument appears stripped; kept as found.
    public IReadOnlyList DrainEvents(uint maxEvents)
    {
        return eventQueue.Drain(maxEvents);
    }

    /// <summary>
    /// Reads <c>session.HandleRegistry.ServerHandles</c> inside a runtime invocation.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the runtime invocation.</param>
    /// <exception cref="InvalidOperationException">The session has not been started.</exception>
    // NOTE(review): return type's generic arguments appear stripped; kept as found.
    public Task> GetRegisteredServerHandlesAsync(
        CancellationToken cancellationToken = default)
    {
        if (session is null)
        {
            throw new InvalidOperationException("MXAccess COM session has not been started.");
        }

        return staRuntime.InvokeAsync(
            () => session.HandleRegistry.ServerHandles,
            cancellationToken);
    }

    /// <summary>
    /// Reads <c>session.HandleRegistry.ItemHandles</c> inside a runtime invocation.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the runtime invocation.</param>
    /// <exception cref="InvalidOperationException">The session has not been started.</exception>
    // NOTE(review): return type's generic arguments appear stripped; kept as found.
    public Task> GetRegisteredItemHandlesAsync(
        CancellationToken cancellationToken = default)
    {
        if (session is null)
        {
            throw new InvalidOperationException("MXAccess COM session has not been started.");
        }

        return staRuntime.InvokeAsync(
            () => session.HandleRegistry.ItemHandles,
            cancellationToken);
    }

    /// <summary>
    /// Reads <c>session.HandleRegistry.AdviceHandles</c> inside a runtime invocation.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the runtime invocation.</param>
    /// <exception cref="InvalidOperationException">The session has not been started.</exception>
    // NOTE(review): return type's generic arguments appear stripped; kept as found.
    public Task> GetRegisteredAdviceHandlesAsync(
        CancellationToken cancellationToken = default)
    {
        if (session is null)
        {
            throw new InvalidOperationException("MXAccess COM session has not been started.");
        }

        return staRuntime.InvokeAsync(
            () => session.HandleRegistry.AdviceHandles,
            cancellationToken);
    }

    /// <summary>
    /// Requests dispatcher shutdown, runs <c>session.ShutdownGracefully()</c> inside a
    /// runtime invocation, then shuts down and disposes the runtime, all within
    /// <paramref name="timeout"/>.
    /// </summary>
    /// <param name="timeout">Total time budget; must be greater than zero.</param>
    /// <param name="cancellationToken">Caller cancellation for the cleanup wait.</param>
    /// <returns>
    /// The cleanup result, or a result built from an empty array when the instance is
    /// already disposed or the session was never created.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is zero or negative.</exception>
    /// <exception cref="TimeoutException">Cleanup or runtime shutdown did not finish within the budget.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled while waiting.</exception>
    // NOTE(review): the return type, Array.Empty() and the cleanupTask declaration appear
    // to have lost their generic arguments; kept as found.
    public async Task ShutdownGracefullyAsync(
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        if (timeout <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(
                nameof(timeout),
                "MXAccess graceful shutdown timeout must be greater than zero.");
        }

        if (disposed)
        {
            return new MxAccessShutdownResult(Array.Empty());
        }

        commandDispatcher?.RequestShutdown();

        // Measures time spent on cleanup so the runtime shutdown only gets the remainder.
        Stopwatch stopwatch = Stopwatch.StartNew();
        MxAccessShutdownResult result;

        if (session is null)
        {
            result = new MxAccessShutdownResult(Array.Empty());
        }
        else
        {
            using CancellationTokenSource shutdownCancellation =
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            shutdownCancellation.CancelAfter(timeout);

            Task cleanupTask = staRuntime.InvokeAsync(
                () => session.ShutdownGracefully(),
                shutdownCancellation.Token);
            Task delayTask = Task.Delay(timeout, cancellationToken);

            Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false);
            if (completedTask != cleanupTask)
            {
                // The delay finished first: either the caller cancelled or the budget ran out.
                cancellationToken.ThrowIfCancellationRequested();
                throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
            }

            try
            {
                result = await cleanupTask.ConfigureAwait(false);
            }
            catch (OperationCanceledException ex)
                when (shutdownCancellation.IsCancellationRequested
                    && !cancellationToken.IsCancellationRequested)
            {
                // CancelAfter(timeout) races the delay above. If the invocation honours its
                // token, the cancelled cleanup task can win WhenAny; report that as the same
                // TimeoutException as the other timeout paths, not as a cancellation.
                throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.", ex);
            }
        }

        TimeSpan remaining = timeout - stopwatch.Elapsed;
        if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining))
        {
            throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
        }

        staRuntime.Dispose();
        disposed = true;

        return result;
    }

    /// <summary>
    /// Requests dispatcher shutdown, disposes the session inside a runtime invocation
    /// (blocking until it completes), and disposes the runtime. Subsequent calls do nothing.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }

        try
        {
            RequestShutdown();

            if (session is not null)
            {
                staRuntime.InvokeAsync(() => session.Dispose()).GetAwaiter().GetResult();
            }
        }
        finally
        {
            // Always release the runtime and mark the instance disposed, even when session
            // disposal throws; otherwise the runtime would be leaked. The original exception
            // still propagates to the caller.
            staRuntime.Dispose();
            disposed = true;
        }
    }
}