108 lines
3.6 KiB
C#
108 lines
3.6 KiB
C#
using System.Collections.Concurrent;
|
|
|
|
namespace ScadaLink.SiteRuntime.Scripts;
|
|
|
|
/// <summary>
|
|
/// SiteRuntime-009: a dedicated, bounded <see cref="TaskScheduler"/> for running script
|
|
/// and alarm on-trigger bodies.
|
|
///
|
|
/// Script bodies may perform synchronous blocking I/O (a database connection, a
|
|
/// synchronous external-system call). Running them on the shared .NET
|
|
/// <see cref="ThreadPool"/> lets a burst of blocking scripts starve the pool and stall
|
|
/// unrelated Akka dispatchers and HTTP request handling. This scheduler owns a fixed set
|
|
/// of dedicated threads, so script blocking is contained to those threads and cannot
|
|
/// exhaust the global pool.
|
|
///
|
|
/// The scheduler is process-wide (one set of threads for all instances) and is sized
|
|
/// from <see cref="SiteRuntimeOptions"/> the first time it is configured.
|
|
/// </summary>
|
|
public sealed class ScriptExecutionScheduler : TaskScheduler, IDisposable
|
|
{
|
|
private readonly BlockingCollection<Task> _queue = new();
|
|
private readonly List<Thread> _threads;
|
|
private int _disposed;
|
|
|
|
private static volatile ScriptExecutionScheduler? _shared;
|
|
private static readonly object SharedLock = new();
|
|
|
|
/// <summary>
|
|
/// The process-wide script-execution scheduler. Lazily created on first use with the
|
|
/// thread count from <see cref="SiteRuntimeOptions.ScriptExecutionThreadCount"/>; the
|
|
/// first caller wins, subsequent calls reuse the existing instance.
|
|
/// </summary>
|
|
public static ScriptExecutionScheduler Shared(SiteRuntimeOptions options)
|
|
{
|
|
if (_shared != null)
|
|
return _shared;
|
|
|
|
lock (SharedLock)
|
|
{
|
|
return _shared ??= new ScriptExecutionScheduler(options.ScriptExecutionThreadCount);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a scheduler backed by <paramref name="threadCount"/> dedicated threads.
|
|
/// </summary>
|
|
public ScriptExecutionScheduler(int threadCount)
|
|
{
|
|
if (threadCount < 1)
|
|
threadCount = 1;
|
|
|
|
_threads = new List<Thread>(threadCount);
|
|
for (var i = 0; i < threadCount; i++)
|
|
{
|
|
var thread = new Thread(WorkerLoop)
|
|
{
|
|
IsBackground = true,
|
|
Name = $"script-execution-{i}"
|
|
};
|
|
_threads.Add(thread);
|
|
thread.Start();
|
|
}
|
|
}
|
|
|
|
/// <summary>The number of dedicated worker threads.</summary>
|
|
public override int MaximumConcurrencyLevel => _threads.Count;
|
|
|
|
private void WorkerLoop()
|
|
{
|
|
try
|
|
{
|
|
foreach (var task in _queue.GetConsumingEnumerable())
|
|
{
|
|
TryExecuteTask(task);
|
|
}
|
|
}
|
|
catch (ObjectDisposedException)
|
|
{
|
|
// Scheduler disposed — worker exits.
|
|
}
|
|
}
|
|
|
|
protected override void QueueTask(Task task) => _queue.Add(task);
|
|
|
|
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
|
|
{
|
|
// Only inline if we are already on one of this scheduler's worker threads,
|
|
// so script work never escapes onto a thread-pool thread.
|
|
if (Thread.CurrentThread.Name?.StartsWith("script-execution-", StringComparison.Ordinal) != true)
|
|
return false;
|
|
|
|
return TryExecuteTask(task);
|
|
}
|
|
|
|
protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();
|
|
|
|
public void Dispose()
|
|
{
|
|
if (Interlocked.Exchange(ref _disposed, 1) != 0)
|
|
return;
|
|
|
|
_queue.CompleteAdding();
|
|
foreach (var thread in _threads)
|
|
thread.Join(TimeSpan.FromSeconds(5));
|
|
_queue.Dispose();
|
|
}
|
|
}
|