75ffa09b8f
Adds InstanceActor one-shot waiter registry (fast-path + change-match + scheduled timeout self-eviction), threads per-script timeout token through ScriptRuntimeContext, and exposes Attributes.WaitAsync(value|predicate, timeout). Replaces handshake busy-poll. Implements spec docs/plans/2026-06-17-waitfor-attribute-change-helper-spec.md §3-§5; §6 routed variant + WaitForAsync + quality-only mode deferred.
306 lines
17 KiB
C#
306 lines
17 KiB
C#
using Akka.Actor;
|
|
using Microsoft.CodeAnalysis.Scripting;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.ScriptExecution;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
|
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
|
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
|
|
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
|
|
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
|
|
|
|
/// <summary>
/// WP-15: Script Execution Actor -- short-lived child of Script Actor.
/// Receives compiled code, params, Instance Actor ref, and call depth.
/// Executes the script via Script Runtime API, returns result, then stops.
///
/// The actor itself and its mailbox run on the default Akka dispatcher; only the
/// script body is dispatched off the actor thread, onto the dedicated
/// <see cref="ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts.ScriptExecutionScheduler"/>
/// (SiteRuntime-009), so blocking script I/O cannot starve the shared thread pool
/// or stall other Akka dispatchers.
///
/// WP-32: Script failures are logged but do not disable the script.
/// Supervision: Stop on unhandled exception (parent ScriptActor decides).
///
/// On success, timeout, or failure (including a failure while setting up the run),
/// the parent receives a <c>ScriptActor.ScriptExecutionCompleted</c> and a
/// non-Nobody <c>replyTo</c> receives a <c>ScriptCallResult</c>. The actor then
/// stops itself via a <c>PoisonPill</c> sent from a <c>finally</c> block.
/// </summary>
public class ScriptExecutionActor : ReceiveActor
{
    /// <summary>
    /// Initializes the actor and immediately begins script execution on construction.
    /// </summary>
    /// <param name="scriptName">Name of the script being executed.</param>
    /// <param name="instanceName">Name of the instance that owns the script.</param>
    /// <param name="compiledScript">Compiled Roslyn script to execute.</param>
    /// <param name="parameters">Optional named parameter values for the script.</param>
    /// <param name="callDepth">Current call-nesting depth (used to enforce the max-depth limit).</param>
    /// <param name="instanceActor">Parent instance actor reference for attribute access.</param>
    /// <param name="sharedScriptLibrary">Library of shared scripts available during execution.</param>
    /// <param name="options">Site runtime options applied during execution.</param>
    /// <param name="replyTo">Actor reference that receives the script result.</param>
    /// <param name="correlationId">Application-level correlation id threaded through the execution.</param>
    /// <param name="logger">Logger for script execution events.</param>
    /// <param name="scope">Script scope controlling which APIs are available.</param>
    /// <param name="healthCollector">Optional health collector for recording execution metrics.</param>
    /// <param name="serviceProvider">Optional DI service provider for script execution services.</param>
    /// <param name="parentExecutionId">ExecutionId of the spawning inbound-API execution for audit correlation; null for normal runs.</param>
    /// <param name="executionTimeoutSeconds">M2.5 (#9): per-script execution timeout in seconds. Null or non-positive falls back to the global <see cref="SiteRuntimeOptions.ScriptExecutionTimeoutSeconds"/>.</param>
    public ScriptExecutionActor(
        string scriptName,
        string instanceName,
        Script<object?> compiledScript,
        IReadOnlyDictionary<string, object?>? parameters,
        int callDepth,
        IActorRef instanceActor,
        SharedScriptLibrary sharedScriptLibrary,
        SiteRuntimeOptions options,
        IActorRef replyTo,
        string correlationId,
        ILogger logger,
        Commons.Types.Scripts.ScriptScope scope,
        ISiteHealthCollector? healthCollector = null,
        IServiceProvider? serviceProvider = null,
        // Audit Log #23 (ParentExecutionId): the spawning execution's
        // ExecutionId for an inbound-API-routed call. Null for normal
        // (tag-change / timer) runs and nested Script.Call invocations.
        Guid? parentExecutionId = null,
        // M2.5 (#9): per-script execution timeout override (seconds). Null or
        // non-positive falls back to the global ScriptExecutionTimeoutSeconds.
        int? executionTimeoutSeconds = null)
    {
        // Immediately begin execution. Self / Context.Parent are captured here, on
        // the actor thread, because the script body later runs on another thread.
        var self = Self;
        var parent = Context.Parent;

        ExecuteScript(
            scriptName, instanceName, compiledScript, parameters, callDepth,
            instanceActor, sharedScriptLibrary, options, replyTo, correlationId,
            self, parent, logger, scope, healthCollector, serviceProvider,
            parentExecutionId, executionTimeoutSeconds);
    }

    /// <summary>
    /// Computes the effective timeout, then schedules the script body on the
    /// dedicated script scheduler and returns without waiting for it.
    /// The scheduled body:
    /// <list type="bullet">
    /// <item>resolves integration services from a fresh DI scope;</item>
    /// <item>runs the script under a timeout-bound cancellation token;</item>
    /// <item>reports the outcome to <paramref name="replyTo"/> and <paramref name="parent"/>;</item>
    /// <item>finally disposes the scope and stops <paramref name="self"/>.</item>
    /// </list>
    /// Must be called synchronously from the actor constructor, because it reads
    /// Akka's <c>Context</c> before handing off to the scheduler.
    /// </summary>
    private static void ExecuteScript(
        string scriptName,
        string instanceName,
        Script<object?> compiledScript,
        IReadOnlyDictionary<string, object?>? parameters,
        int callDepth,
        IActorRef instanceActor,
        SharedScriptLibrary sharedScriptLibrary,
        SiteRuntimeOptions options,
        IActorRef replyTo,
        string correlationId,
        IActorRef self,
        IActorRef parent,
        ILogger logger,
        Commons.Types.Scripts.ScriptScope scope,
        ISiteHealthCollector? healthCollector,
        IServiceProvider? serviceProvider,
        Guid? parentExecutionId,
        int? executionTimeoutSeconds)
    {
        // M2.5 (#9): per-script timeout overrides the global default. A null or
        // non-positive per-script value (≤ 0) falls back to the global.
        var timeout = TimeSpan.FromSeconds(
            executionTimeoutSeconds is { } perScript && perScript > 0
                ? perScript
                : options.ScriptExecutionTimeoutSeconds);

        // SiteRuntime-009: run the script body on the dedicated script-execution
        // scheduler, not the shared .NET thread pool, so blocking script I/O cannot
        // starve the global pool and stall Akka dispatchers / HTTP handling.
        var scheduler = ScriptExecutionScheduler.Shared(options);

        // Notification Outbox: the site communication actor that Notify.Status queries
        // central through. Resolved by actor path so the Notify helper does not need an
        // IActorRef threaded all the way down from the host wiring. Resolved here, on the
        // actor thread, because Akka's Context is not available inside the background task.
        var siteCommunicationActor = Context.System.ActorSelection("/user/site-communication");

        // Sends the failure result to an Ask-based requester (if any) and the
        // completion notice to the parent ScriptActor; shared by both failure paths.
        void ReportFailure(string errorMsg)
        {
            if (!replyTo.IsNobody())
            {
                replyTo.Tell(new ScriptCallResult(correlationId, false, null, errorMsg));
            }

            parent.Tell(new ScriptActor.ScriptExecutionCompleted(scriptName, false, errorMsg));
        }

        _ = Task.Factory.StartNew(async () =>
        {
            IServiceScope? serviceScope = null;
            // ISiteEventLogger is a singleton; resolve from the root provider so
            // it is available to the catch blocks regardless of scope state.
            var siteEventLogger = serviceProvider?.GetService<ISiteEventLogger>();

            // The CTS is created inside the async lambda so that it outlives this method.
            // It is also created inside the try: if the CTS constructor rejects the timeout
            // (e.g. too large), the run is reported as a failure and the actor still stops.
            // Otherwise the discarded task would fault with no reply and no completion sent.
            CancellationTokenSource? cts = null;
            try
            {
                cts = new CancellationTokenSource(timeout);

                // Resolve integration services from DI (scoped lifetime)
                IExternalSystemClient? externalSystemClient = null;
                IDatabaseGateway? databaseGateway = null;
                // Notification Outbox: the S&F engine is a singleton; the site identity
                // provider supplies the site id stamped on enqueued notifications.
                StoreAndForwardService? storeAndForward = null;
                var siteId = string.Empty;
                // Audit Log #23 (M2 Bundle F): the writer is a singleton (FallbackAuditWriter
                // composes the SQLite hot-path + drop-oldest ring); null in tests / hosts
                // that haven't called AddAuditLog, which the helper handles as a no-op.
                IAuditWriter? auditWriter = null;
                // Audit Log #23 (M3 Bundle A — Task A3): site-local tracking store
                // backing Tracking.Status(id). Singleton; null in tests / hosts
                // that haven't wired the store, which the helper handles by
                // throwing on access.
                IOperationTrackingStore? operationTrackingStore = null;
                // Audit Log #23 (M3 Bundle F — Task F1): site-side cached-call
                // telemetry forwarder. Singleton bound to the AuditLog
                // composition root; null in tests / hosts that haven't called
                // AddAuditLog, in which case the cached-call helpers degrade
                // to the no-emission path (the underlying S&F handoff still
                // happens and a TrackedOperationId is still returned).
                ICachedCallTelemetryForwarder? cachedForwarder = null;
                // SourceNode-stamping (Tasks 13/14): the local node name
                // resolved from INodeIdentityProvider — node-a/node-b on site
                // hosts. Null in tests / hosts that haven't registered the
                // provider, in which case NotificationSubmit.SourceNode and
                // SiteCallOperational.SourceNode stay null and central
                // persists the rows with SourceNode NULL.
                string? sourceNode = null;

                if (serviceProvider != null)
                {
                    serviceScope = serviceProvider.CreateScope();
                    externalSystemClient = serviceScope.ServiceProvider.GetService<IExternalSystemClient>();
                    databaseGateway = serviceScope.ServiceProvider.GetService<IDatabaseGateway>();
                    storeAndForward = serviceScope.ServiceProvider.GetService<StoreAndForwardService>();
                    siteId = serviceScope.ServiceProvider.GetService<ISiteIdentityProvider>()?.SiteId
                        ?? string.Empty;
                    auditWriter = serviceScope.ServiceProvider.GetService<IAuditWriter>();
                    operationTrackingStore = serviceScope.ServiceProvider.GetService<IOperationTrackingStore>();
                    cachedForwarder = serviceScope.ServiceProvider.GetService<ICachedCallTelemetryForwarder>();
                    sourceNode = serviceScope.ServiceProvider.GetService<INodeIdentityProvider>()?.NodeName;
                }

                var context = new ScriptRuntimeContext(
                    instanceActor,
                    self,
                    sharedScriptLibrary,
                    callDepth,
                    options.MaxScriptCallDepth,
                    timeout,
                    instanceName,
                    logger,
                    externalSystemClient,
                    databaseGateway,
                    storeAndForward,
                    siteCommunicationActor,
                    siteId,
                    // Notification Outbox (FU3): stamp the executing script onto outbound
                    // notifications using the Site Event Logging "Source" convention.
                    sourceScript: $"ScriptActor:{scriptName}",
                    // Audit Log #23 (M2 Bundle F): emit one ApiOutbound/ApiCall row per
                    // ExternalSystem.Call. Writer is best-effort; failures are logged
                    // and swallowed inside the helper so the script's call path is
                    // never aborted by an audit failure.
                    auditWriter: auditWriter,
                    // Audit Log #23 (M3 Bundle A — Task A3): site-local tracking store
                    // backing Tracking.Status(id). Authoritative source of truth for
                    // cached-call status — read directly by the script API.
                    operationTrackingStore: operationTrackingStore,
                    // Audit Log #23 (M3 Bundle F — Task F1): cached-call telemetry
                    // forwarder for ExternalSystem.CachedCall / Database.CachedWrite
                    // CachedSubmit emission + the immediate-success terminal-row
                    // emission. Best-effort: null degrades the helpers to a
                    // no-emission path; the S&F handoff and TrackedOperationId
                    // return are unaffected.
                    cachedForwarder: cachedForwarder,
                    // Audit Log #23 (ParentExecutionId): the spawning execution's
                    // id for an inbound-API-routed call. The routed script still
                    // mints its own fresh ExecutionId — this records the spawner.
                    // Null for normal (tag-change / timer) runs.
                    parentExecutionId: parentExecutionId,
                    // SourceNode-stamping (Tasks 13/14): the local node name
                    // (node-a/node-b on a site) — threaded down so Notify.Send
                    // and the four cached-call telemetry constructors can stamp
                    // it onto NotificationSubmit.SourceNode and
                    // SiteCallOperational.SourceNode respectively.
                    sourceNode: sourceNode,
                    // M2.12 (#25): thread the singleton site event logger so
                    // recursion-limit violations at CallScript/CallShared emit a
                    // script Error site event in addition to ILogger.LogError.
                    siteEventLogger: siteEventLogger,
                    // WaitForAttribute (spec §4.3/§4.4): thread the per-script
                    // execution-timeout token so Attributes.WaitAsync's Ask is
                    // bounded by the script's own ExecutionTimeoutSeconds — a
                    // shorter script deadline wins over the wait's own timeout.
                    scriptTimeoutToken: cts.Token);

                var globals = new ScriptGlobals
                {
                    Instance = context,
                    Parameters = new ScriptParameters(parameters ?? new Dictionary<string, object?>()),
                    CancellationToken = cts.Token,
                    Scope = scope
                };

                // M1.8: operational `script` event — execution started. Fire-and-forget
                // (the `_ =` discards the task) so the event log can never block or
                // fault the script's own run; mirrors the existing Error-path emit.
                _ = siteEventLogger?.LogEventAsync(
                    "script", "Info", instanceName, $"ScriptActor:{scriptName}",
                    $"Script '{scriptName}' on instance '{instanceName}' started");

                var state = await compiledScript.RunAsync(globals, cts.Token);

                // Send result to requester if this was an Ask-based call
                if (!replyTo.IsNobody())
                {
                    replyTo.Tell(new ScriptCallResult(correlationId, true, state.ReturnValue, null));
                }

                // M1.8: operational `script` event — execution completed successfully.
                _ = siteEventLogger?.LogEventAsync(
                    "script", "Info", instanceName, $"ScriptActor:{scriptName}",
                    $"Script '{scriptName}' on instance '{instanceName}' completed");

                // Notify parent of completion
                parent.Tell(new ScriptActor.ScriptExecutionCompleted(scriptName, true, null));
            }
            // Only a cancellation caused by this run's own deadline is a timeout. Any
            // other OperationCanceledException falls through to the generic failure path
            // with its real message. An example is a TaskCanceledException from an HTTP
            // timeout inside the script.
            catch (OperationCanceledException) when (cts is { IsCancellationRequested: true })
            {
                healthCollector?.IncrementScriptError();
                var errorMsg = $"Script '{scriptName}' on instance '{instanceName}' timed out after {timeout.TotalSeconds}s";
                // Constant structured template; renders the same text as errorMsg.
                logger.LogWarning(
                    "Script '{Script}' on instance '{Instance}' timed out after {TimeoutSeconds}s",
                    scriptName, instanceName, timeout.TotalSeconds);

                // WP-32: Failures recorded to site event log; script NOT disabled after failure.
                _ = siteEventLogger?.LogEventAsync(
                    "script", "Error", instanceName, $"ScriptActor:{scriptName}", errorMsg);

                ReportFailure(errorMsg);
            }
            catch (Exception ex)
            {
                healthCollector?.IncrementScriptError();
                // WP-32: Failures recorded to site event log; script NOT disabled after failure.
                var errorMsg = $"Script '{scriptName}' on instance '{instanceName}' failed: {ex.Message}";
                logger.LogError(ex, "Script execution failed: {Script} on {Instance}", scriptName, instanceName);

                _ = siteEventLogger?.LogEventAsync(
                    "script", "Error", instanceName, $"ScriptActor:{scriptName}", errorMsg, ex.ToString());

                ReportFailure(errorMsg);
            }
            finally
            {
                // Dispose the DI scope (and scoped services) after script execution completes.
                // A throwing Dispose is logged rather than allowed to skip the stop below.
                try
                {
                    serviceScope?.Dispose();
                }
                catch (Exception disposeEx)
                {
                    logger.LogError(disposeEx,
                        "Failed to dispose DI scope for script {Script} on {Instance}",
                        scriptName, instanceName);
                }

                cts?.Dispose();

                // Stop self after execution completes
                self.Tell(PoisonPill.Instance);
            }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    }
}
|