Files
mxaccessgw/src/MxGateway.Worker/Sta/StaRuntime.cs
T
2026-04-26 17:19:00 -04:00

268 lines
6.9 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace MxGateway.Worker.Sta;
/// <summary>
/// Runs queued commands on one dedicated background thread whose apartment state is set to
/// <see cref="ApartmentState.STA"/>, interleaving them with calls into the supplied
/// <see cref="StaMessagePump"/>.
/// </summary>
/// <remarks>
/// Lifecycle: construct, call <see cref="Start"/>, queue work with <c>InvokeAsync</c>, then call
/// <see cref="Shutdown"/> or <see cref="Dispose"/>. Once shutdown has been requested the runtime
/// cannot be started again and rejects new commands.
/// </remarks>
public sealed class StaRuntime : IDisposable
{
    private readonly IStaComApartmentInitializer comApartmentInitializer;
    private readonly StaMessagePump messagePump;
    private readonly ConcurrentQueue<IStaWorkItem> commandQueue = new();
    private readonly AutoResetEvent commandWakeEvent = new(false);
    private readonly ManualResetEventSlim startedEvent = new(false);
    private readonly ManualResetEventSlim stoppedEvent = new(false);

    // Guards startRequested, shutdownRequested and waitHandlesReleased, and orders enqueue/signal
    // against shutdown and handle release.
    private readonly object gate = new();
    private readonly Thread staThread;
    private readonly TimeSpan idlePumpInterval;

    // Written by the disposing thread and read by arbitrary callers, hence volatile.
    private volatile bool disposed;
    private bool startRequested;
    private bool shutdownRequested;

    // True once Dispose has released the wait handles; they must not be touched afterwards.
    private bool waitHandlesReleased;

    // Written on the STA thread before startedEvent is set; read by Start and IsRunning.
    private volatile Exception? startupException;
    private long lastActivityUtcTicks;

    // Only touched on the STA thread: records whether Initialize() returned successfully.
    private bool comInitialized;

    /// <summary>
    /// Creates a runtime with the default apartment initializer, the default message pump and a
    /// 50 ms idle pump interval.
    /// </summary>
    public StaRuntime()
        : this(new StaComApartmentInitializer(), new StaMessagePump(), TimeSpan.FromMilliseconds(50))
    {
    }

    /// <summary>
    /// Creates a runtime with explicit collaborators. The STA thread is created here but is not
    /// started until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="comApartmentInitializer">Invoked on the STA thread at startup and teardown.</param>
    /// <param name="messagePump">Used by the STA thread loop to wait and to pump messages.</param>
    /// <param name="idlePumpInterval">Interval handed to the message pump's wait call; must be positive.</param>
    /// <exception cref="ArgumentNullException">A collaborator is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="idlePumpInterval"/> is zero or negative.</exception>
    public StaRuntime(
        IStaComApartmentInitializer comApartmentInitializer,
        StaMessagePump messagePump,
        TimeSpan idlePumpInterval)
    {
        this.comApartmentInitializer = comApartmentInitializer
            ?? throw new ArgumentNullException(nameof(comApartmentInitializer));
        this.messagePump = messagePump ?? throw new ArgumentNullException(nameof(messagePump));

        if (idlePumpInterval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(
                nameof(idlePumpInterval),
                "The idle pump interval must be greater than zero.");
        }

        this.idlePumpInterval = idlePumpInterval;
        lastActivityUtcTicks = DateTimeOffset.UtcNow.UtcTicks;

        staThread = new Thread(ThreadMain)
        {
            IsBackground = true,
            Name = "MxGateway.Worker.STA"
        };
        staThread.SetApartmentState(ApartmentState.STA);
    }

    /// <summary>
    /// Managed thread id of the STA thread, or <see langword="null"/> until the thread has begun
    /// executing.
    /// </summary>
    public int? StaThreadId { get; private set; }

    /// <summary>
    /// UTC timestamp of the most recent activity mark. Marks are taken at construction, around
    /// each executed command, after each loop iteration and during teardown.
    /// </summary>
    public DateTimeOffset LastActivityUtc =>
        new(new DateTime(Volatile.Read(ref lastActivityUtcTicks), DateTimeKind.Utc));

    /// <summary>
    /// <see langword="true"/> while the STA thread has started successfully and has not yet stopped.
    /// </summary>
    /// <remarks>
    /// A captured thread failure also counts as "not running": the thread signals startup before
    /// it signals stopped, so without this check a caller whose <see cref="Start"/> just threw
    /// could still observe <see langword="true"/> here.
    /// </remarks>
    public bool IsRunning =>
        startedEvent.IsSet && startupException is null && !stoppedEvent.IsSet;

    /// <summary>
    /// Starts the STA thread if it has not been started yet and blocks until the thread has
    /// signalled startup. Repeated calls do not start a second thread.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The runtime has been disposed.</exception>
    /// <exception cref="InvalidOperationException">
    /// Shutdown has already been requested, or the STA thread captured an exception (exposed as
    /// the inner exception).
    /// </exception>
    public void Start()
    {
        ThrowIfDisposed();

        lock (gate)
        {
            if (shutdownRequested)
            {
                throw new InvalidOperationException("The worker STA runtime is shutting down.");
            }

            if (!startRequested)
            {
                startRequested = true;
                staThread.Start();
            }
        }

        // The thread sets startedEvent on both the success and the failure path, so this wait
        // ends either way.
        startedEvent.Wait();

        if (startupException is not null)
        {
            throw new InvalidOperationException(
                "The worker STA runtime failed to initialize.",
                startupException);
        }
    }

    /// <summary>
    /// Queues <paramref name="command"/> for execution on the STA thread.
    /// </summary>
    /// <param name="command">The action to run.</param>
    /// <param name="cancellationToken">Token passed to the work item; checked once before queuing.</param>
    /// <returns>The task exposed by the queued work item.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The runtime has been disposed.</exception>
    public Task InvokeAsync(Action command, CancellationToken cancellationToken = default)
    {
        if (command is null)
        {
            throw new ArgumentNullException(nameof(command));
        }

        // Adapt to the generic overload; the boolean result is a placeholder.
        return InvokeAsync(
            () =>
            {
                command();
                return true;
            },
            cancellationToken);
    }

    /// <summary>
    /// Queues <paramref name="command"/> for execution on the STA thread and wakes the thread.
    /// </summary>
    /// <typeparam name="T">Result type of the command.</typeparam>
    /// <param name="command">The function to run.</param>
    /// <param name="cancellationToken">Token passed to the work item; checked once before queuing.</param>
    /// <returns>
    /// The task exposed by the queued work item; an already-cancelled task when the token is
    /// cancelled on entry; or a faulted task carrying <see cref="InvalidOperationException"/>
    /// when shutdown has been requested.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The runtime has been disposed.</exception>
    public Task<T> InvokeAsync<T>(Func<T> command, CancellationToken cancellationToken = default)
    {
        if (command is null)
        {
            throw new ArgumentNullException(nameof(command));
        }

        ThrowIfDisposed();

        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<T>(cancellationToken);
        }

        StaWorkItem<T> workItem = new(command, cancellationToken);

        lock (gate)
        {
            if (shutdownRequested)
            {
                return Task.FromException<T>(
                    new InvalidOperationException("The worker STA runtime is shutting down."));
            }

            commandQueue.Enqueue(workItem);

            // Signal while still holding the gate. The wait handles are only released after
            // shutdownRequested has been set under this same lock, so the handle is guaranteed to
            // be alive here. Signalling outside the lock could race with Dispose and throw
            // ObjectDisposedException for an item that was already queued.
            commandWakeEvent.Set();
        }

        return workItem.Task;
    }

    /// <summary>
    /// Requests shutdown and waits up to <paramref name="timeout"/> for the STA thread to stop.
    /// Commands still queued once the thread has stopped (or if it never started) are cancelled.
    /// </summary>
    /// <param name="timeout">Maximum wait; <see cref="Timeout.InfiniteTimeSpan"/> waits indefinitely.</param>
    /// <returns>
    /// <see langword="true"/> when the thread has stopped or was never started;
    /// <see langword="false"/> when the wait timed out.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="timeout"/> is negative and not <see cref="Timeout.InfiniteTimeSpan"/>.
    /// </exception>
    public bool Shutdown(TimeSpan timeout)
    {
        if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
        {
            throw new ArgumentOutOfRangeException(nameof(timeout));
        }

        lock (gate)
        {
            if (waitHandlesReleased)
            {
                // Dispose only releases the handles after the thread was confirmed stopped, so
                // the runtime is already down. Touching the handles again would throw.
                return true;
            }

            shutdownRequested = true;
            commandWakeEvent.Set();
        }

        if (!startedEvent.IsSet && !staThread.IsAlive)
        {
            // The thread never ran, so there is nothing to wait for.
            CancelQueuedCommands();
            stoppedEvent.Set();
            return true;
        }

        bool stopped = stoppedEvent.Wait(timeout);
        if (stopped)
        {
            CancelQueuedCommands();
        }

        return stopped;
    }

    /// <summary>
    /// Shuts the runtime down, waiting up to five seconds, and releases the wait handles if the
    /// STA thread stopped in time. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }

        bool stopped = Shutdown(TimeSpan.FromSeconds(5));
        if (stopped)
        {
            lock (gate)
            {
                if (!waitHandlesReleased)
                {
                    waitHandlesReleased = true;
                    commandWakeEvent.Dispose();
                    startedEvent.Dispose();
                    stoppedEvent.Dispose();
                }
            }
        }

        // When the thread did not stop in time the handles are deliberately left alive, because
        // the thread may still use them.
        disposed = true;
    }

    /// <summary>
    /// Entry point of the STA thread: initialize, loop until shutdown is requested, tear down.
    /// </summary>
    private void ThreadMain()
    {
        try
        {
            StaThreadId = Thread.CurrentThread.ManagedThreadId;
            comApartmentInitializer.Initialize();
            comInitialized = true;
            MarkActivity();
            startedEvent.Set();

            while (!IsShutdownRequested())
            {
                ProcessQueuedCommands();

                // NOTE(review): presumably blocks until the wake event is signalled, a message
                // arrives or the idle interval elapses. The pump is defined outside this file.
                messagePump.WaitForWorkOrMessages(commandWakeEvent, idlePumpInterval);
                messagePump.PumpPendingMessages();
                MarkActivity();
            }

            // Run whatever was queued before shutdown was requested.
            ProcessQueuedCommands();
        }
        catch (Exception exception)
        {
            // Record the failure and release any caller blocked in Start(). This also captures
            // failures that occur after a successful startup.
            startupException = exception;
            startedEvent.Set();
        }
        finally
        {
            CancelQueuedCommands();

            try
            {
                if (comInitialized)
                {
                    comApartmentInitializer.Uninitialize();
                }
            }
            finally
            {
                MarkActivity();
                stoppedEvent.Set();
            }
        }
    }

    /// <summary>Executes every queued work item on the calling thread until the queue is empty.</summary>
    private void ProcessQueuedCommands()
    {
        while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
        {
            MarkActivity();
            workItem.Execute();
            MarkActivity();
        }
    }

    /// <summary>Removes every queued work item and cancels it without executing it.</summary>
    private void CancelQueuedCommands()
    {
        while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
        {
            workItem.CancelBeforeExecution();
        }
    }

    /// <summary>Reads the shutdown flag under the gate.</summary>
    private bool IsShutdownRequested()
    {
        lock (gate)
        {
            return shutdownRequested;
        }
    }

    /// <summary>Stores the current UTC time as the last-activity timestamp.</summary>
    private void MarkActivity()
    {
        Volatile.Write(ref lastActivityUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
    }

    /// <summary>Throws <see cref="ObjectDisposedException"/> once <see cref="Dispose"/> has completed.</summary>
    private void ThrowIfDisposed()
    {
        if (disposed)
        {
            throw new ObjectDisposedException(nameof(StaRuntime));
        }
    }
}