Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs
T
Joseph Doherty 75ffa09b8f feat(siteruntime): event-driven Attributes.WaitAsync attribute-change helper
Adds InstanceActor one-shot waiter registry (fast-path + change-match + scheduled
timeout self-eviction), threads per-script timeout token through ScriptRuntimeContext,
and exposes Attributes.WaitAsync(value|predicate, timeout). Replaces handshake busy-poll.
Implements spec docs/plans/2026-06-17-waitfor-attribute-change-helper-spec.md §3-§5;
§6 routed variant + WaitForAsync + quality-only mode deferred.
2026-06-17 08:25:06 -04:00

2279 lines
111 KiB
C#

using System.Diagnostics;
using System.Text.Json;
using System.Text.RegularExpressions;
using Akka.Actor;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.ScriptExecution;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
/// <summary>
/// WP-18: Script Runtime API — injected into Script/Alarm Execution Actors.
/// Provides the API surface that user scripts interact with:
/// Instance.GetAttribute("name")
/// Instance.SetAttribute("name", value)
/// Instance.CallScript("scriptName", params)
/// Scripts.CallShared("scriptName", params)
///
/// WP-13 (Phase 7): Integration surface APIs:
/// ExternalSystem.Call("systemName", "methodName", params)
/// ExternalSystem.CachedCall("systemName", "methodName", params)
/// Database.Connection("name")
/// Database.CachedWrite("name", "sql", params)
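/// Notify.To("listName").Send("subject", "message")
/// Notify.Status(notificationId)
///
/// Attribute-wait and tracking APIs:
/// Attributes.WaitAsync(value | predicate, timeout)
/// Tracking.Status(trackedOperationId)
///
/// WP-20: Recursion Limit — call depth tracked and enforced.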
/// </summary>
public class ScriptRuntimeContext
{
/// <summary>
/// The Instance Actor owning this instance's state. Target of the
/// <c>GetAttribute</c>, <c>SetAttribute</c>, <c>WaitAttribute</c> and
/// <c>CallScript</c> Asks.
/// </summary>
private readonly IActorRef _instanceActor;
/// <summary>
/// The executing script actor's own reference; carried verbatim into
/// child contexts created for inline shared-script calls.
/// </summary>
private readonly IActorRef _self;
/// <summary>
/// Shared-script library used by <c>Scripts.CallShared</c>.
/// </summary>
private readonly SharedScriptLibrary _sharedScriptLibrary;
/// <summary>
/// Recursion depth of this context; a nested call runs at depth + 1.
/// </summary>
private readonly int _currentCallDepth;
/// <summary>
/// Maximum permitted call depth; a call whose depth would exceed it is rejected.
/// </summary>
private readonly int _maxCallDepth;
/// <summary>
/// Ask timeout for Instance Actor round-trips. Also added as slack to the
/// <c>WaitAttribute</c> Ask deadline and handed to the Notify helper.
/// </summary>
private readonly TimeSpan _askTimeout;
/// <summary>
/// Diagnostics logger shared with the helper objects this context creates.
/// </summary>
private readonly ILogger _logger;
/// <summary>
/// Name of the instance this script runs against; stamped onto outgoing requests.
/// </summary>
private readonly string _instanceName;
/// <summary>
/// WaitForAttribute (spec §4.3): the per-script execution-timeout token from
/// the owning <c>ScriptExecutionActor</c>/<c>AlarmExecutionActor</c>
/// (<c>cts.Token</c>). Bounds the <c>Attributes.WaitAsync</c> Ask so a script
/// that hits its own <c>ExecutionTimeoutSeconds</c> abandons the wait. Defaults
/// to <see cref="CancellationToken.None"/> for contexts that do not thread one
/// (legacy callers / tests / the alarm path when it has no CTS).
/// </summary>
private readonly CancellationToken _scriptTimeoutToken;
/// <summary>
/// WP-13: External system client for ExternalSystem.Call/CachedCall.
/// </summary>
private readonly IExternalSystemClient? _externalSystemClient;
/// <summary>
/// WP-13: Database gateway for Database.Connection/CachedWrite.
/// </summary>
private readonly IDatabaseGateway? _databaseGateway;
/// <summary>
/// Notification Outbox: the site Store-and-Forward Engine that <c>Notify.Send</c>
/// enqueues notifications into. The S&amp;F engine forwards them to central.
/// </summary>
private readonly StoreAndForwardService? _storeAndForward;
/// <summary>
/// Notification Outbox: the site communication actor that <c>Notify.Status</c>
/// queries central through (via the ClusterClient command/control transport).
/// </summary>
private readonly ICanTell? _siteCommunicationActor;
/// <summary>
/// Notification Outbox: this site's identifier, stamped on enqueued notifications.
/// </summary>
private readonly string _siteId;
/// <summary>
/// SourceNode-stamping (Task 13/14): the cluster node name supplied by
/// <c>INodeIdentityProvider</c> on the local host — <c>node-a</c>/<c>node-b</c>
/// for site nodes. Stamped onto <c>NotificationSubmit.SourceNode</c> by
/// <see cref="NotifyTarget.Send"/> and onto <c>SiteCallOperational.SourceNode</c>
/// by the four <see cref="ExternalSystemHelper"/> / <see cref="DatabaseHelper"/>
/// cached-call telemetry construction sites so central can persist it on the
/// <c>Notifications</c> / <c>SiteCalls</c> rows. Null when no provider is
/// wired (legacy hosts / tests) — the helper construction sites pass null
/// through verbatim, leaving the central row's SourceNode as NULL too.
/// </summary>
private readonly string? _sourceNode;
/// <summary>
/// Notification Outbox (FU3): identifier of the script currently executing in this
/// context — stamped onto <c>NotificationSubmit.SourceScript</c> for the central
/// audit trail. Uses the Site Event Logging "Source" convention
/// (<c>"ScriptActor:&lt;scriptName&gt;"</c>). Null when no single script owns the
/// context (e.g. alarm on-trigger paths that do not wire the Notify outbox).
/// </summary>
private readonly string? _sourceScript;
/// <summary>
/// M2.12 (#25): site event logger for recording recursion-limit violations
/// to the local SQLite event log. Optional — when null the emission is
/// skipped; the existing <c>_logger.LogError</c> + throw path is unchanged.
/// </summary>
private readonly ISiteEventLogger? _siteEventLogger;
/// <summary>
/// Audit Log #23: best-effort emitter for boundary-crossing actions executed
/// by the script. Optional — when null the helpers degrade to a no-op audit
/// path so tests / contexts that do not need the audit pipeline still work.
/// </summary>
private readonly IAuditWriter? _auditWriter;
/// <summary>
/// Audit Log #23 (M3): site-local tracking store consulted by
/// <c>Tracking.Status(TrackedOperationId)</c>. Optional — when null the
/// helper throws on access, mirroring the existing
/// "service-not-wired" behaviour of the other integration helpers.
/// </summary>
private readonly IOperationTrackingStore? _operationTrackingStore;
/// <summary>
/// Audit Log #23 (M3 Bundle E — Task E3): site-side dual emitter for
/// cached-call lifecycle telemetry. Optional — when null
/// <c>ExternalSystem.CachedCall</c> / <c>Database.CachedWrite</c> still
/// return a <see cref="TrackedOperationId"/> and invoke the underlying
/// store-and-forward path, but no audit / SiteCalls telemetry is emitted
/// (tests / minimal hosts that don't wire the audit pipeline).
/// </summary>
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
/// <summary>
/// Audit Log #23: the per-execution id for this script run. Every
/// trust-boundary audit row emitted by this script execution
/// (sync <c>ApiCall</c>/<c>DbWrite</c>, cached-call lifecycle rows,
/// <c>NotifySend</c>) is stamped into <c>AuditEvent.ExecutionId</c> with
/// this value so all the rows from one script run can be correlated
/// together — independently of the per-operation
/// <c>AuditEvent.CorrelationId</c>.
/// </summary>
private readonly Guid _executionId;
/// <summary>
/// Audit Log #23 (ParentExecutionId): the spawning execution's
/// <see cref="_executionId"/> when this script run was spawned by another
/// execution — for an inbound-API-routed call this is the inbound request's
/// per-request execution id. <c>null</c> for normal (tag-change /
/// timer-triggered) runs and nested <c>CallScript</c> invocations. The
/// routed script still mints its OWN fresh <see cref="_executionId"/>; this
/// field records the spawner so a spawned execution's audit rows can point
/// back at the execution that spawned it. (Task 5 wires the emitter that
/// stamps this onto <c>AuditEvent.ParentExecutionId</c>.)
/// </summary>
private readonly Guid? _parentExecutionId;
/// <summary>
/// Builds the runtime context that one script (or alarm) execution sees.
/// It combines the actor references and recursion bookkeeping used by the
/// instance API with the optional integration, notification, audit and
/// tracking dependencies that the helper properties delegate to.
/// </summary>
/// <param name="instanceActor">Instance Actor that owns this instance's state; target of the attribute and <c>CallScript</c> Asks.</param>
/// <param name="self">The executing script actor's own reference.</param>
/// <param name="sharedScriptLibrary">Library of shared scripts reachable through <c>Scripts.CallShared</c>.</param>
/// <param name="currentCallDepth">Recursion depth at which this context runs.</param>
/// <param name="maxCallDepth">Deepest call depth permitted; a call that would exceed it throws.</param>
/// <param name="askTimeout">Ask timeout for Instance Actor round-trips.</param>
/// <param name="instanceName">Name of the instance the script runs against.</param>
/// <param name="logger">Diagnostics logger.</param>
/// <param name="externalSystemClient">Optional client backing <c>ExternalSystem.Call</c>/<c>CachedCall</c>.</param>
/// <param name="databaseGateway">Optional gateway backing <c>Database.Connection</c>/<c>CachedWrite</c>.</param>
/// <param name="storeAndForward">Optional site Store-and-Forward engine that <c>Notify.Send</c> enqueues into.</param>
/// <param name="siteCommunicationActor">Optional site communication actor (ClusterClient) used to reach central.</param>
/// <param name="siteId">This site's identifier; empty when not supplied.</param>
/// <param name="sourceScript">Optional script identifier used for audit-trail stamping.</param>
/// <param name="auditWriter">Optional best-effort audit writer for boundary-crossing actions.</param>
/// <param name="operationTrackingStore">Optional site tracking store behind <c>Tracking.Status</c>.</param>
/// <param name="cachedForwarder">Optional emitter of cached-call lifecycle telemetry.</param>
/// <param name="executionId">
/// Audit Log #23: this run's per-execution id, stamped into
/// <c>AuditEvent.ExecutionId</c> on every trust-boundary audit row. When
/// <c>null</c> (tag-change / timer-triggered runs) a new id is generated. An
/// inbound caller may pass one to tie the run to an upstream request.
/// </param>
/// <param name="parentExecutionId">
/// Audit Log #23 (ParentExecutionId): the <c>ExecutionId</c> of the execution
/// that spawned this run (e.g. the inbound request for an API-routed call).
/// <c>null</c> for root runs. It only records the spawner; the run still gets
/// its own <paramref name="executionId"/>.
/// </param>
/// <param name="sourceNode">Optional cluster node name (node-a/node-b) stamped onto audit/telemetry rows.</param>
/// <param name="siteEventLogger">
/// M2.12 (#25): optional site event logger. When present, recursion-limit
/// violations in <c>CallScript</c> / <c>CallShared</c> also emit a
/// <c>script</c> Error event; when absent, only the <c>ILogger</c> + throw
/// path runs.
/// </param>
/// <param name="scriptTimeoutToken">
/// WaitForAttribute (spec §4.3): the owning execution actor's timeout token,
/// used to bound <c>Attributes.WaitAsync</c>. Defaults to
/// <see cref="CancellationToken.None"/>; such waits are then bounded only by
/// their own timeout.
/// </param>
public ScriptRuntimeContext(
    IActorRef instanceActor,
    IActorRef self,
    SharedScriptLibrary sharedScriptLibrary,
    int currentCallDepth,
    int maxCallDepth,
    TimeSpan askTimeout,
    string instanceName,
    ILogger logger,
    IExternalSystemClient? externalSystemClient = null,
    IDatabaseGateway? databaseGateway = null,
    StoreAndForwardService? storeAndForward = null,
    ICanTell? siteCommunicationActor = null,
    string siteId = "",
    string? sourceScript = null,
    IAuditWriter? auditWriter = null,
    IOperationTrackingStore? operationTrackingStore = null,
    ICachedCallTelemetryForwarder? cachedForwarder = null,
    Guid? executionId = null,
    Guid? parentExecutionId = null,
    string? sourceNode = null,
    ISiteEventLogger? siteEventLogger = null,
    CancellationToken scriptTimeoutToken = default)
{
    // Core wiring: the instance this script runs against, and the recursion budget.
    _instanceActor = instanceActor;
    _self = self;
    _instanceName = instanceName;
    _logger = logger;
    _sharedScriptLibrary = sharedScriptLibrary;
    _currentCallDepth = currentCallDepth;
    _maxCallDepth = maxCallDepth;
    _askTimeout = askTimeout;

    // A default token equals CancellationToken.None, so WaitAsync is then
    // bounded solely by its own timeout.
    _scriptTimeoutToken = scriptTimeoutToken;

    // Optional integration / notification surfaces.
    _externalSystemClient = externalSystemClient;
    _databaseGateway = databaseGateway;
    _storeAndForward = storeAndForward;
    _siteCommunicationActor = siteCommunicationActor;
    _siteId = siteId;
    _sourceScript = sourceScript;

    // Optional audit / tracking pipeline.
    _auditWriter = auditWriter;
    _operationTrackingStore = operationTrackingStore;
    _cachedForwarder = cachedForwarder;
    _siteEventLogger = siteEventLogger;

    // Audit provenance. A root run mints its own id. The parent id is kept
    // as given: a non-routed run legitimately has no parent. The source node
    // is passed through unchanged, null when no provider was wired.
    _executionId = executionId.HasValue ? executionId.Value : Guid.NewGuid();
    _parentExecutionId = parentExecutionId;
    _sourceNode = sourceNode;
}
/// <summary>
/// Audit Log #23 (M5.4): the id identifying this run in the execution tree.
/// A nested <c>Scripts.CallShared</c> uses it as the child run's
/// <c>ParentExecutionId</c>.
/// </summary>
internal Guid ExecutionId
{
    get { return _executionId; }
}
/// <summary>
/// Audit Log #23 (M5.4): the id of the execution that spawned this run, or
/// <c>null</c> for a root run. Exposed for execution-tree test assertions.
/// </summary>
internal Guid? ParentExecutionId
{
    get { return _parentExecutionId; }
}
/// <summary>
/// Audit Log #23 (M5.4 — ParentExecutionId tag-cascade): creates the context
/// used by an inline <c>Scripts.CallShared</c> invocation. The shared script
/// runs inline (no actor hop) but counts as its own node in the audit tree.
/// It gets a fresh execution id, and its parent id is this run's execution id,
/// so <c>B → CallShared(C)</c> gives <c>C.ParentExecutionId == B.ExecutionId</c>.
/// Everything else is copied unchanged from this context, except the call
/// depth, which the caller supplies. That includes actors, gateways, audit
/// plumbing, site/node identity and the script-timeout token.
/// </summary>
/// <param name="childCallDepth">The recursion depth of the shared-script call.</param>
/// <returns>A new context parented to this run.</returns>
internal ScriptRuntimeContext CreateChildContextForSharedScript(int childCallDepth)
{
    var child = new ScriptRuntimeContext(
        instanceActor: _instanceActor,
        self: _self,
        sharedScriptLibrary: _sharedScriptLibrary,
        currentCallDepth: childCallDepth,
        maxCallDepth: _maxCallDepth,
        askTimeout: _askTimeout,
        instanceName: _instanceName,
        logger: _logger,
        externalSystemClient: _externalSystemClient,
        databaseGateway: _databaseGateway,
        storeAndForward: _storeAndForward,
        siteCommunicationActor: _siteCommunicationActor,
        siteId: _siteId,
        sourceScript: _sourceScript,
        auditWriter: _auditWriter,
        operationTrackingStore: _operationTrackingStore,
        cachedForwarder: _cachedForwarder,
        // null => the constructor mints a fresh id for the shared-script run.
        executionId: null,
        // The spawner of the shared-script run is this run.
        parentExecutionId: _executionId,
        sourceNode: _sourceNode,
        siteEventLogger: _siteEventLogger,
        // WaitForAttribute (spec §4.3): a WaitAsync inside the shared script
        // is bounded by the SAME script deadline as its caller.
        scriptTimeoutToken: _scriptTimeoutToken);
    return child;
}
/// <summary>
/// M2.12 (#25): fire-and-forget emission of a <c>script</c> Error site event
/// for a recursion-limit violation. Mirrors the call shape used by
/// <c>ScriptExecutionActor</c>'s catch blocks (WP-32 / M1.8). The emission is
/// best-effort, and every failure is logged as a warning and dropped:
/// <list type="bullet">
/// <item>a fault on a still-running task is observed by an
/// <c>OnlyOnFaulted</c> continuation;</item>
/// <item>an already-faulted task is logged immediately;</item>
/// <item>an exception thrown synchronously by the logger call is caught.</item>
/// </list>
/// None of these faults the caller, so the caller's subsequent
/// <see cref="InvalidOperationException"/> throw always happens.
/// A null site event logger is a no-op.
/// </summary>
/// <param name="msg">The recursion-limit message to record.</param>
private void EmitRecursionLimitEventAsync(string msg)
{
    if (_siteEventLogger == null)
    {
        return;
    }

    const string FailureTemplate =
        "Site event log write failed for recursion-limit violation on instance '{Instance}'";

    var source = string.IsNullOrEmpty(_instanceName)
        ? "recursion-guard"
        : $"InstanceScript:{_instanceName}";

    try
    {
        var logTask = _siteEventLogger.LogEventAsync("script", "Error", _instanceName, source, msg);
        if (!logTask.IsCompleted)
        {
            // Still running: observe a later fault off the caller's path.
            logTask.ContinueWith(
                t => _logger.LogWarning(t.Exception, FailureTemplate, _instanceName),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }
        else if (logTask.IsFaulted)
        {
            _logger.LogWarning(logTask.Exception, FailureTemplate, _instanceName);
        }
    }
    catch (Exception ex)
    {
        // The logger may throw synchronously instead of returning a faulted
        // task. That exception must not escape, or it would replace the
        // caller's recursion-limit InvalidOperationException.
        _logger.LogWarning(ex, FailureTemplate, _instanceName);
    }
}
/// <summary>
/// Reads an attribute's current value by asking the Instance Actor. The Ask
/// is the boundary between script execution and instance state. A missing
/// attribute is logged as a warning and the response's value is returned as-is.
/// </summary>
/// <param name="attributeName">Name of the attribute to read.</param>
/// <returns>The attribute's value as reported by the Instance Actor (null when not found).</returns>
public async Task<object?> GetAttribute(string attributeName)
{
    var response = await _instanceActor.Ask<GetAttributeResponse>(
        new GetAttributeRequest(
            Guid.NewGuid().ToString(), _instanceName, attributeName, DateTimeOffset.UtcNow),
        _askTimeout);

    if (response.Found)
    {
        return response.Value;
    }

    _logger.LogWarning(
        "GetAttribute: attribute '{Attribute}' not found on instance '{Instance}'",
        attributeName, _instanceName);
    return response.Value;
}
/// <summary>
/// WaitForAttribute (spec §3-§5): backs <c>Attributes.WaitAsync</c>. It waits,
/// event-driven, until an attribute matches one of three conditions: an
/// encoded target value (equality), a site-local predicate, or (with neither
/// given) any change. The wait is bounded by <paramref name="timeout"/>.
///
/// <para>
/// The wait's own timeout is reported as <c>false</c>, via the Instance Actor's
/// scheduled timeout reply; it is never raised as an exception. The Ask
/// deadline is <paramref name="timeout"/> plus <see cref="_askTimeout"/>,
/// which leaves that reply room to arrive first. If no reply arrives even by
/// that deadline, the Ask itself faults, and nothing here converts that to
/// <c>false</c>.
/// </para>
/// <para>
/// Spec §4.3: the Ask is cancelled by the script's execution-timeout token.
/// A script that hits its <c>ExecutionTimeoutSeconds</c> therefore abandons
/// the wait, and the cancellation propagates to the caller.
/// </para>
/// </summary>
/// <param name="name">Scope-resolved attribute name to wait on.</param>
/// <param name="targetValueEncoded">
/// Codec-encoded target value. When it and <paramref name="predicate"/> are
/// both null, the wait means "any change".
/// </param>
/// <param name="predicate">Site-local predicate; null when the encoded target is used.</param>
/// <param name="timeout">How long to wait before answering <c>false</c>.</param>
/// <returns><c>true</c> when matched within the timeout, otherwise <c>false</c>.</returns>
public async Task<bool> WaitAttribute(
    string name, string? targetValueEncoded, Func<object?, bool>? predicate, TimeSpan timeout)
{
    var request = new WaitForAttributeRequest(
        Guid.NewGuid().ToString(),
        _instanceName,
        name,
        targetValueEncoded,
        predicate,
        timeout,
        DateTimeOffset.UtcNow);

    // The extra _askTimeout of slack lets the actor's timeout reply be authoritative.
    var askDeadline = timeout + _askTimeout;

    var response = await _instanceActor.Ask<WaitForAttributeResponse>(
        request, askDeadline, _scriptTimeoutToken);
    return response.Matched;
}
/// <summary>
/// Writes an attribute value through the Instance Actor, whose mailbox
/// serializes all mutations. The actor handles the two attribute kinds
/// differently:
/// <list type="bullet">
/// <item>Data-connected: forwards the write to the DCL, which writes the
/// physical device. The in-memory value is not updated optimistically.</item>
/// <item>Static: updates the in-memory value and persists the override
/// to SQLite.</item>
/// </list>
/// The Ask is awaited, so a failed device write is reported back to the
/// calling script.
/// </summary>
/// <param name="attributeName">Name of the attribute to write.</param>
/// <param name="value">String value to write.</param>
/// <returns>A task that completes when the Instance Actor has replied.</returns>
/// <exception cref="InvalidOperationException">The Instance Actor reported the write as failed.</exception>
public async Task SetAttribute(string attributeName, string value)
{
    var command = new SetStaticAttributeCommand(
        Guid.NewGuid().ToString(), _instanceName, attributeName, value, DateTimeOffset.UtcNow);

    // Ask (not Tell): the reply carries the device-write outcome for
    // data-connected attributes.
    var response = await _instanceActor.Ask<SetStaticAttributeResponse>(command, _askTimeout);
    if (response.Success)
    {
        return;
    }

    throw new InvalidOperationException(
        $"SetAttribute('{attributeName}') failed: {response.ErrorMessage}");
}
/// <summary>
/// Runs a sibling script on this instance by asking the Instance Actor, which
/// routes the request to the right Script Actor.
/// WP-20: refuses the call when it would exceed the recursion limit.
/// WP-22: uses the Ask pattern, so the caller awaits the sibling's result.
/// <paramref name="parameters"/> may be a dictionary or an anonymous object
/// (<c>new { name = "Bob" }</c>) — see <see cref="ScriptArgs"/>.
/// </summary>
/// <param name="scriptName">Name of the sibling script to run.</param>
/// <param name="parameters">Optional arguments (dictionary or anonymous object).</param>
/// <returns>The return value from the called script.</returns>
/// <exception cref="InvalidOperationException">
/// The call would exceed the maximum call depth, or the called script reported failure.
/// </exception>
public async Task<object?> CallScript(string scriptName, object? parameters = null)
{
    var nextDepth = _currentCallDepth + 1;
    if (nextDepth > _maxCallDepth)
    {
        var msg = $"Script call depth exceeded maximum of {_maxCallDepth}. " +
                  $"CallScript('{scriptName}') rejected at depth {nextDepth}.";
        // CA2254: a constant message template captures the limit, script name
        // and depth as structured properties. It renders the same text as msg,
        // and the script name can no longer be parsed as template syntax.
        _logger.LogError(
            "Script call depth exceeded maximum of {MaxCallDepth}. CallScript('{ScriptName}') rejected at depth {CallDepth}.",
            _maxCallDepth, scriptName, nextDepth);
        // M2.12 (#25): emit to site event log in addition to ILogger; fire-and-forget.
        EmitRecursionLimitEventAsync(msg);
        throw new InvalidOperationException(msg);
    }

    var correlationId = Guid.NewGuid().ToString();
    var request = new ScriptCallRequest(
        scriptName,
        ScriptArgs.Normalize(parameters),
        nextDepth,
        correlationId,
        // Audit Log #23 (M5.4 — ParentExecutionId tag-cascade): the child
        // run is a NEW execution spawned BY this run, so its parent is THIS
        // run's ExecutionId, NOT the inherited _parentExecutionId. Then
        // A → CallScript(B) yields B.ParentExecutionId == A.ExecutionId,
        // which builds a true multi-level execution tree.
        ParentExecutionId: _executionId);

    // Ask the Instance Actor, which routes to the appropriate Script Actor.
    var result = await _instanceActor.Ask<ScriptCallResult>(request, _askTimeout);
    if (!result.Success)
    {
        throw new InvalidOperationException(
            $"CallScript('{scriptName}') failed: {result.ErrorMessage}");
    }
    return result.ReturnValue;
}
/// <summary>
/// Entry point for <c>Scripts.CallShared(...)</c>. Each access returns a new
/// helper bound to this context and its current recursion depth.
/// </summary>
public ScriptCallHelper Scripts
{
    get
    {
        return new ScriptCallHelper(
            library: _sharedScriptLibrary,
            context: this,
            currentCallDepth: _currentCallDepth,
            maxCallDepth: _maxCallDepth,
            logger: _logger);
    }
}
/// <summary>
/// WP-13: entry point for external-system calls:
/// <c>ExternalSystem.Call("systemName", "methodName", params)</c> and
/// <c>ExternalSystem.CachedCall("systemName", "methodName", params)</c>.
/// Each access returns a new helper carrying this run's audit provenance.
/// </summary>
public ExternalSystemHelper ExternalSystem
{
    get
    {
        return new ExternalSystemHelper(
            client: _externalSystemClient,
            instanceName: _instanceName,
            logger: _logger,
            executionId: _executionId,
            auditWriter: _auditWriter,
            siteId: _siteId,
            sourceScript: _sourceScript,
            // Audit Log #23 (M3 Bundle E — Task E3): CachedSubmit telemetry is
            // emitted on every ExternalSystem.CachedCall enqueue.
            cachedForwarder: _cachedForwarder,
            // Audit Log #23 (ParentExecutionId): null for non-routed runs.
            parentExecutionId: _parentExecutionId,
            // SourceNode-stamping (Task 14): stamped onto
            // SiteCallOperational.SourceNode by the cached-call telemetry.
            sourceNode: _sourceNode);
    }
}
/// <summary>
/// WP-13: entry point for database operations:
/// <c>Database.Connection("name")</c> and
/// <c>Database.CachedWrite("name", "sql", params)</c>.
/// Each access returns a new helper carrying this run's audit provenance.
/// </summary>
public DatabaseHelper Database
{
    get
    {
        return new DatabaseHelper(
            _databaseGateway,
            _instanceName,
            _logger,
            _executionId,
            // Audit Log #23 (M4 Bundle A): Database.Connection(name) returns an
            // auditing decorator emitting one DbOutbound/DbWrite row per
            // script-initiated Execute / ExecuteScalar / ExecuteReader.
            _auditWriter,
            _siteId,
            _sourceScript,
            // Audit Log #23 (M3 Bundle E — Task E6): CachedSubmit telemetry on
            // every Database.CachedWrite enqueue.
            _cachedForwarder,
            // Audit Log #23 (ParentExecutionId): null for non-routed runs.
            _parentExecutionId,
            // SourceNode-stamping (Task 14): stamped onto
            // SiteCallOperational.SourceNode by CachedWrite's telemetry.
            _sourceNode);
    }
}
/// <summary>
/// Entry point for the Notification Outbox API.
/// <c>Notify.To("listName").Send("subject", "message")</c> enqueues a
/// notification for central delivery and returns its <c>NotificationId</c>.
/// <c>Notify.Status(id)</c> queries that notification's delivery status.
/// </summary>
/// <remarks>
/// Audit Log #23 (M4 Bundle C): the <see cref="IAuditWriter"/> is passed to
/// the helper so each accepted <c>Send</c> emits one
/// <c>Notification</c>/<c>NotifySend</c> audit row. Auditing is best-effort
/// (alog.md §7): a throwing writer never aborts <c>Send</c>.
/// </remarks>
public NotifyHelper Notify
{
    get
    {
        return new NotifyHelper(
            _storeAndForward,
            _siteCommunicationActor,
            _siteId,
            _instanceName,
            _sourceScript,
            _askTimeout,
            _logger,
            _executionId,
            _auditWriter,
            // Audit Log #23 (ParentExecutionId): null for non-routed runs.
            _parentExecutionId,
            // SourceNode-stamping (Task 13): NotifyTarget.Send stamps this
            // onto NotificationSubmit.
            _sourceNode);
    }
}
/// <summary>
/// Audit Log #23 (M3): tracking-status API for cached operations.
/// <c>Tracking.Status(trackedOperationId)</c> reads the site's SQLite tracking
/// row directly; it is the authoritative source and needs no central
/// round-trip. It returns a <see cref="TrackingStatusSnapshot"/>, or
/// <c>null</c> when the id is unknown or already purged.
/// </summary>
public TrackingHelper Tracking
{
    get { return new TrackingHelper(_operationTrackingStore, _logger); }
}
/// <summary>
/// Helper behind the <c>Scripts.CallShared(...)</c> syntax: runs shared
/// scripts inline against a child context of the calling script.
/// </summary>
public class ScriptCallHelper
{
    private readonly SharedScriptLibrary _library;
    private readonly ScriptRuntimeContext _context;
    private readonly int _currentCallDepth;
    private readonly int _maxCallDepth;
    private readonly ILogger _logger;

    /// <summary>
    /// Initializes a new shared script call helper.
    /// </summary>
    /// <param name="library">The shared script library containing available scripts.</param>
    /// <param name="context">The runtime context of the calling script.</param>
    /// <param name="currentCallDepth">Current recursion depth of script calls.</param>
    /// <param name="maxCallDepth">Maximum allowed recursion depth.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    internal ScriptCallHelper(
        SharedScriptLibrary library,
        ScriptRuntimeContext context,
        int currentCallDepth,
        int maxCallDepth,
        ILogger logger)
    {
        _library = library;
        _context = context;
        _currentCallDepth = currentCallDepth;
        _maxCallDepth = maxCallDepth;
        _logger = logger;
    }

    /// <summary>
    /// WP-17: Executes a shared script inline (direct method call, not actor message).
    /// WP-20: Enforces recursion limit.
    /// <paramref name="parameters"/> may be a dictionary or an anonymous
    /// object (<c>new { name = "Bob" }</c>) — see <see cref="ScriptArgs"/>.
    /// </summary>
    /// <param name="scriptName">Name of the shared script to execute.</param>
    /// <param name="parameters">Optional parameters to pass to the script (dictionary or anonymous object).</param>
    /// <param name="cancellationToken">Cancellation token passed to the shared script's execution.</param>
    /// <returns>The return value from the shared script.</returns>
    /// <exception cref="InvalidOperationException">The call would exceed the maximum call depth.</exception>
    public async Task<object?> CallShared(
        string scriptName,
        object? parameters = null,
        CancellationToken cancellationToken = default)
    {
        var nextDepth = _currentCallDepth + 1;
        if (nextDepth > _maxCallDepth)
        {
            var msg = $"Script call depth exceeded maximum of {_maxCallDepth}. " +
                      $"CallShared('{scriptName}') rejected at depth {nextDepth}.";
            // CA2254: constant message template with structured properties;
            // renders the same text as msg.
            _logger.LogError(
                "Script call depth exceeded maximum of {MaxCallDepth}. CallShared('{ScriptName}') rejected at depth {CallDepth}.",
                _maxCallDepth, scriptName, nextDepth);
            // M2.12 (#25): emit to site event log via the parent context's
            // helper — single emission path, fire-and-forget.
            _context.EmitRecursionLimitEventAsync(msg);
            throw new InvalidOperationException(msg);
        }

        // Audit Log #23 (M5.4 — ParentExecutionId tag-cascade): the shared
        // script runs inline, but is modelled as its OWN execution node — a
        // child context mints a fresh ExecutionId parented to the caller's
        // ExecutionId, so its audit rows chain under the calling run.
        var childContext = _context.CreateChildContextForSharedScript(nextDepth);
        return await _library.ExecuteAsync(
            scriptName, childContext, ScriptArgs.Normalize(parameters), cancellationToken);
    }
}
/// <summary>
/// WP-13: Helper for external system API calls from scripts — backs the
/// <c>ExternalSystem.Call</c> / <c>ExternalSystem.CachedCall</c> syntax.
/// </summary>
/// <remarks>
/// Audit Log #23 (M2 Bundle F): every <see cref="Call"/> invocation emits
/// one <c>ApiOutbound</c>/<c>ApiCall</c> audit row via <see cref="IAuditWriter"/>.
/// The audit emission is wrapped in a try/catch that swallows every exception
/// — the audit pipeline is best-effort and must NEVER abort the script's
/// outbound call (alog.md §7). The original <see cref="ExternalCallResult"/>
/// (or the original thrown exception) flows back to the caller unchanged.
/// </remarks>
public class ExternalSystemHelper
{
/// <summary>
/// Matches the text <c>HTTP</c> followed by whitespace and a three-digit number.
/// The number is captured in the named group <c>code</c>.
/// NOTE(review): presumably used to extract an HTTP status code from error
/// text; the usage site is outside this view — confirm.
/// </summary>
private static readonly Regex HttpStatusRegex = new(
@"HTTP\s+(?<code>\d{3})",
RegexOptions.Compiled | RegexOptions.CultureInvariant);
/// <summary>
/// Optional external-system client; null when the integration is not wired.
/// </summary>
private readonly IExternalSystemClient? _client;
/// <summary>
/// Name of the instance whose script is making calls.
/// </summary>
private readonly string _instanceName;
/// <summary>
/// Diagnostics logger.
/// </summary>
private readonly ILogger _logger;
/// <summary>
/// Audit Log #23: the per-execution id of the script run using this helper.
/// </summary>
private readonly Guid _executionId;
/// <summary>
/// Audit Log #23 (ParentExecutionId): the spawning execution's id when
/// this run was inbound-API-routed; <c>null</c> for non-routed runs.
/// Threaded alongside <see cref="_executionId"/> ready for the Task 5
/// emitter — no audit row carries it yet.
/// NOTE(review): confirm whether this "not yet" remark is still current.
/// </summary>
private readonly Guid? _parentExecutionId;
/// <summary>
/// Optional best-effort audit writer; null disables audit emission.
/// </summary>
private readonly IAuditWriter? _auditWriter;
/// <summary>
/// Identifier of the site where calls originate (empty when not supplied).
/// </summary>
private readonly string _siteId;
/// <summary>
/// Optional identifier of the calling script, for the audit trail.
/// </summary>
private readonly string? _sourceScript;
/// <summary>
/// Optional emitter of cached-call lifecycle telemetry.
/// </summary>
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
/// <summary>
/// SourceNode-stamping (Task 14): the local cluster node name on
/// which this script is executing (<c>node-a</c>/<c>node-b</c>).
/// Stamped onto <c>SiteCallOperational.SourceNode</c> on the three
/// cached-call telemetry construction sites (CachedSubmit + the two
/// immediate-completion rows) so central can persist it on the
/// <c>SiteCalls</c> row.
/// </summary>
private readonly string? _sourceNode;
/// <summary>
/// Creates an external-system helper bound to one script execution's identity
/// and its optional audit / telemetry plumbing.
/// </summary>
/// <remarks>
/// Internal so that tests in <c>ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests</c> can
/// construct it directly (via <c>InternalsVisibleTo</c>). Production code gets
/// the helper from <c>ScriptRuntimeContext.ExternalSystem</c>.
/// <para>
/// Parameter ordering: <paramref name="executionId"/> comes directly after the
/// logger in all four audit-threaded constructors (ExternalSystemHelper,
/// DatabaseHelper, AuditingDbConnection, AuditingDbCommand). A required Guid
/// cannot follow the optional provenance parameters without a
/// required-after-optional compile error, so the post-logger slot is the one
/// position that compiles cleanly in all four. <paramref name="parentExecutionId"/>
/// is a trailing optional parameter so existing positional callers stay
/// source-compatible.
/// </para>
/// </remarks>
/// <param name="client">Optional client for external system API calls.</param>
/// <param name="instanceName">Name of the instance making the call.</param>
/// <param name="logger">Diagnostics logger.</param>
/// <param name="executionId">Per-execution id of the calling script run.</param>
/// <param name="auditWriter">Optional writer for audit log entries.</param>
/// <param name="siteId">Identifier of the site where the call originates.</param>
/// <param name="sourceScript">Optional calling-script identifier for the audit trail.</param>
/// <param name="cachedForwarder">Optional forwarder for cached-call telemetry.</param>
/// <param name="parentExecutionId">Optional id of the spawning execution (routed calls).</param>
/// <param name="sourceNode">Optional cluster node name (node-a/node-b) for audit stamping.</param>
internal ExternalSystemHelper(
    IExternalSystemClient? client,
    string instanceName,
    ILogger logger,
    Guid executionId,
    IAuditWriter? auditWriter = null,
    string siteId = "",
    string? sourceScript = null,
    ICachedCallTelemetryForwarder? cachedForwarder = null,
    Guid? parentExecutionId = null,
    string? sourceNode = null)
{
    // Execution identity / provenance.
    _executionId = executionId;
    _parentExecutionId = parentExecutionId;
    _instanceName = instanceName;
    _siteId = siteId;
    _sourceScript = sourceScript;
    _sourceNode = sourceNode;

    // Collaborators.
    _client = client;
    _logger = logger;
    _auditWriter = auditWriter;
    _cachedForwarder = cachedForwarder;
}
/// <summary>
/// Performs a synchronous (non-cached) external system call and hands the
/// client's result straight back to the script.
/// </summary>
/// <remarks>
/// Audit Log #23 (M2 Bundle F): every attempt yields exactly one
/// <c>ApiOutbound</c>/<c>ApiCall</c> audit row, emitted from the
/// <c>finally</c> so it covers both the success and the exception path.
/// The script observes the client's original result or original exception
/// unchanged; audit emission is best-effort and never alters the outcome.
/// </remarks>
/// <param name="systemName">Name of the external system to call.</param>
/// <param name="methodName">Name of the method to invoke on the external system.</param>
/// <param name="parameters">Optional arguments for the method.</param>
/// <param name="cancellationToken">Cancellation token for the async operation.</param>
/// <returns>The client's call result (status and response data).</returns>
/// <exception cref="InvalidOperationException">No external system client is wired.</exception>
public async Task<ExternalCallResult> Call(
    string systemName,
    string methodName,
    IReadOnlyDictionary<string, object?>? parameters = null,
    CancellationToken cancellationToken = default)
{
    var client = _client ?? throw new InvalidOperationException("External system client not available");

    var startedAtUtc = DateTime.UtcNow;
    var startTimestamp = Stopwatch.GetTimestamp();
    ExternalCallResult? callResult = null;
    Exception? failure = null;

    try
    {
        callResult = await client.CallAsync(systemName, methodName, parameters, cancellationToken);
        return callResult;
    }
    catch (Exception ex)
    {
        // Remember the failure for the audit row, then let it propagate untouched.
        failure = ex;
        throw;
    }
    finally
    {
        var elapsedTicks = Stopwatch.GetTimestamp() - startTimestamp;
        var durationMs = (int)(elapsedTicks * 1000d / Stopwatch.Frequency);
        EmitCallAudit(systemName, methodName, startedAtUtc, durationMs, callResult, failure, parameters);
    }
}
/// <summary>
/// Submits a cached outbound API call (Audit Log #23 / M3) and returns its
/// <see cref="TrackedOperationId"/> so the script can later query
/// <c>Tracking.Status(id)</c>.
/// </summary>
/// <remarks>
/// <para>
/// Lifecycle: a fresh id is minted, the opening <c>CachedSubmit</c>
/// telemetry is forwarded, and the call is handed to the cached-call path
/// with that id as its store-and-forward message id. When the call is
/// buffered, the S&amp;F retry loop owns the per-attempt and terminal
/// telemetry (Bundle E Tasks E4/E5). When it resolves immediately
/// (<c>WasBuffered=false</c>), no retry loop or lifecycle observer ever
/// runs, so this method emits the Attempted + CachedResolve rows itself
/// (M3 Bundle F, F2).
/// </para>
/// <para>
/// <b>Best-effort emission (alog.md §7):</b> telemetry failures are logged
/// and swallowed; they never abort the script.
/// </para>
/// </remarks>
/// <param name="systemName">Name of the external system to call.</param>
/// <param name="methodName">Name of the method to invoke on the external system.</param>
/// <param name="parameters">Optional arguments for the method.</param>
/// <param name="cancellationToken">Cancellation token for the async operation.</param>
/// <returns>Tracked operation identifier for status queries.</returns>
/// <exception cref="InvalidOperationException">No external system client is wired.</exception>
public async Task<TrackedOperationId> CachedCall(
    string systemName,
    string methodName,
    IReadOnlyDictionary<string, object?>? parameters = null,
    CancellationToken cancellationToken = default)
{
    if (_client is null)
    {
        throw new InvalidOperationException("External system client not available");
    }

    var operationId = TrackedOperationId.New();
    var submittedAtUtc = DateTime.UtcNow;
    var target = $"{systemName}.{methodName}";

    // Submit telemetry goes out BEFORE the hand-off so the SiteCalls row
    // exists ahead of the first delivery attempt — Tracking.Status(id) can
    // then observe Submitted even if immediate delivery resolves first.
    await EmitCachedSubmitTelemetryAsync(
            systemName, methodName, target, operationId, submittedAtUtc, parameters, cancellationToken)
        .ConfigureAwait(false);

    ExternalCallResult? outcome;
    try
    {
        outcome = await _client.CachedCallAsync(
                systemName,
                methodName,
                parameters,
                _instanceName,
                cancellationToken,
                operationId,
                // ExecutionId Task 4 / ParentExecutionId Task 6: carried into
                // a buffered call so its retry-loop audit rows are correlated
                // with this script run (parent is null for non-routed runs).
                executionId: _executionId,
                sourceScript: _sourceScript,
                parentExecutionId: _parentExecutionId)
            .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Permanent failures come back as a result, not a throw, so a throw
        // here is exceptional (cancellation, resolver outage, ...). The
        // script must learn about it, so log and rethrow. The id is NOT
        // returned on this path; it is only recorded in the Submitted
        // telemetry forwarded above.
        _logger.LogWarning(ex,
            "ExternalSystem.CachedCall threw for {System}.{Method} (TrackedOperationId {Id})",
            systemName, methodName, operationId);
        throw;
    }

    // Immediate completion bypassed S&F entirely — close the lifecycle here.
    if (outcome is { WasBuffered: false })
    {
        await EmitImmediateTerminalTelemetryAsync(
                systemName, methodName, target, operationId, outcome, parameters, cancellationToken)
            .ConfigureAwait(false);
    }

    return operationId;
}
/// <summary>
/// Forwards the opening <c>CachedSubmit</c> packet of a cached-call
/// lifecycle: audit status Submitted, operational status "Submitted",
/// retry count 0, not terminal.
/// </summary>
/// <remarks>
/// Best-effort (alog.md §7): a no-op when no forwarder is wired. Any
/// failure while building or forwarding the packet is logged and swallowed,
/// so the calling script's enqueue proceeds regardless.
/// </remarks>
private async Task EmitCachedSubmitTelemetryAsync(
    string systemName,
    string methodName,
    string target,
    TrackedOperationId trackedId,
    DateTime occurredAtUtc,
    IReadOnlyDictionary<string, object?>? parameters,
    CancellationToken cancellationToken)
{
    var forwarder = _cachedForwarder;
    if (forwarder is null)
    {
        return;
    }

    CachedCallTelemetry submitPacket;
    try
    {
        var auditRow = ScadaBridgeAuditEventFactory.Create(
            channel: AuditChannel.ApiOutbound,
            kind: AuditKind.CachedSubmit,
            status: AuditStatus.Submitted,
            occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
            target: target,
            // CorrelationId = per-operation lifecycle id (TrackedOperationId);
            // ExecutionId = per-execution id shared across this script run.
            correlationId: trackedId.Value,
            executionId: _executionId,
            // Spawning execution's id; null for non-routed runs.
            parentExecutionId: _parentExecutionId,
            sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
            sourceInstanceId: _instanceName,
            sourceScript: _sourceScript,
            // The submit precedes the call: request args only, no response yet.
            requestSummary: SerializeRequest(parameters));

        var operationalRow = new SiteCallOperational(
            TrackedOperationId: trackedId,
            Channel: "ApiOutbound",
            Target: target,
            SourceSite: _siteId,
            // SourceNode-stamping (Task 14): null when no node identity was
            // wired, in which case central stores SiteCalls.SourceNode as NULL.
            SourceNode: _sourceNode,
            Status: "Submitted",
            RetryCount: 0,
            LastError: null,
            HttpStatus: null,
            CreatedAtUtc: occurredAtUtc,
            UpdatedAtUtc: occurredAtUtc,
            TerminalAtUtc: null);

        submitPacket = new CachedCallTelemetry(Audit: auditRow, Operational: operationalRow);
    }
    catch (Exception buildEx)
    {
        _logger.LogWarning(buildEx,
            "Failed to build CachedSubmit telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
            systemName, methodName, trackedId);
        return;
    }

    try
    {
        await forwarder.ForwardAsync(submitPacket, cancellationToken)
            .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "CachedSubmit telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
            systemName, methodName, trackedId);
    }
}
/// <summary>
/// M3 Bundle F (F2): emit the Attempted + CachedResolve lifecycle
/// rows for an immediate-completion <c>CachedCall</c> (WasBuffered=false).
/// The S&amp;F retry loop never engaged, so the
/// <c>ICachedCallLifecycleObserver</c> never fires — the helper must
/// produce both rows itself to keep the M3 audit contract whole
/// (Submit → Attempted → Resolve under one TrackedOperationId).
/// </summary>
/// <remarks>
/// Best-effort emission: a throwing builder or forwarder is logged and
/// swallowed per alog.md §7. The two rows are built and forwarded
/// INDEPENDENTLY so a single fault doesn't drop both halves of the
/// terminal pair. A no-op when no forwarder is wired.
/// </remarks>
/// <param name="systemName">External system name (for log messages).</param>
/// <param name="methodName">External method name (for log messages).</param>
/// <param name="target">Audit/operational target (<c>System.Method</c>).</param>
/// <param name="trackedId">Lifecycle id shared with the CachedSubmit row.</param>
/// <param name="result">The immediate (non-buffered) call result.</param>
/// <param name="parameters">Call arguments, summarised into the request payload.</param>
/// <param name="cancellationToken">Token passed to the forwarder.</param>
private async Task EmitImmediateTerminalTelemetryAsync(
    string systemName,
    string methodName,
    string target,
    TrackedOperationId trackedId,
    ExternalCallResult result,
    IReadOnlyDictionary<string, object?>? parameters,
    CancellationToken cancellationToken)
{
    if (_cachedForwarder == null)
    {
        return;
    }

    var occurredAtUtc = DateTime.UtcNow;

    // Extract an HTTP status from the error message when present
    // (mirrors the synchronous Call() audit path's HttpStatusRegex
    // behaviour so the immediate-failure rows carry the same HttpStatus
    // value the synchronous ApiCall row would have stamped).
    int? httpStatus = null;
    if (!result.Success && !string.IsNullOrEmpty(result.ErrorMessage))
    {
        var match = HttpStatusRegex.Match(result.ErrorMessage);
        if (match.Success && int.TryParse(match.Groups["code"].Value, out var code))
        {
            httpStatus = code;
        }
    }

    // Shared by both rows — computed once instead of per row
    // (SerializeRequest never throws; it returns null on failure).
    var errorMessage = result.Success ? null : result.ErrorMessage;
    var requestSummary = SerializeRequest(parameters);

    // Status mapping for immediate completion:
    //   Success=true  -> Delivered (audit) / "Delivered" (operational)
    //   Success=false -> Failed    (audit) / "Failed"    (operational)
    // Permanent vs transient is not relevant here: a permanent failure
    // returns Success=false WasBuffered=false (parked-equivalent); a
    // transient failure with NO S&F engine wired likewise lands here
    // with Success=false. Either way the terminal state is "the
    // immediate attempt failed and the operation is done".
    var auditTerminalStatus = result.Success
        ? AuditStatus.Delivered
        : AuditStatus.Failed;
    var operationalTerminalStatus = result.Success ? "Delivered" : "Failed";

    // --- Attempted row -------------------------------------------------
    // Declared nullable (rather than assigning `null!` on the failure
    // path) so the build-failure case is visible to nullable flow
    // analysis and the null check below is honest.
    CachedCallTelemetry? attempted = null;
    try
    {
        attempted = new CachedCallTelemetry(
            Audit: ScadaBridgeAuditEventFactory.Create(
                channel: AuditChannel.ApiOutbound,
                kind: AuditKind.ApiCallCached,
                status: AuditStatus.Attempted,
                occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
                target: target,
                // CorrelationId = per-operation lifecycle id;
                // ExecutionId = per-execution id for this script run.
                correlationId: trackedId.Value,
                executionId: _executionId,
                // Audit Log #23 (ParentExecutionId): the spawning
                // execution's id; null for non-routed runs.
                parentExecutionId: _parentExecutionId,
                sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
                sourceInstanceId: _instanceName,
                sourceScript: _sourceScript,
                httpStatus: httpStatus,
                errorMessage: errorMessage,
                requestSummary: requestSummary,
                responseSummary: result.ResponseJson),
            Operational: new SiteCallOperational(
                TrackedOperationId: trackedId,
                Channel: "ApiOutbound",
                Target: target,
                SourceSite: _siteId,
                // SourceNode-stamping (Task 14): null when no node
                // identity was wired, so central persists
                // SiteCalls.SourceNode as NULL.
                SourceNode: _sourceNode,
                Status: "Attempted",
                // RetryCount stays 0 — the operation never reached the
                // S&F retry sweep, so no retries were performed.
                RetryCount: 0,
                LastError: errorMessage,
                HttpStatus: httpStatus,
                CreatedAtUtc: occurredAtUtc,
                UpdatedAtUtc: occurredAtUtc,
                TerminalAtUtc: null));
    }
    catch (Exception buildEx)
    {
        _logger.LogWarning(buildEx,
            "Failed to build immediate-Attempted telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
            systemName, methodName, trackedId);
    }

    if (attempted is not null)
    {
        try
        {
            await _cachedForwarder.ForwardAsync(attempted, cancellationToken)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Immediate-Attempted telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
                systemName, methodName, trackedId);
        }
    }

    // --- CachedResolve row --------------------------------------------
    CachedCallTelemetry resolve;
    try
    {
        resolve = new CachedCallTelemetry(
            Audit: ScadaBridgeAuditEventFactory.Create(
                channel: AuditChannel.ApiOutbound,
                kind: AuditKind.CachedResolve,
                status: auditTerminalStatus,
                occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
                target: target,
                correlationId: trackedId.Value,
                executionId: _executionId,
                parentExecutionId: _parentExecutionId,
                sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
                sourceInstanceId: _instanceName,
                sourceScript: _sourceScript,
                httpStatus: httpStatus,
                errorMessage: errorMessage,
                requestSummary: requestSummary,
                responseSummary: result.ResponseJson),
            Operational: new SiteCallOperational(
                TrackedOperationId: trackedId,
                Channel: "ApiOutbound",
                Target: target,
                SourceSite: _siteId,
                SourceNode: _sourceNode,
                Status: operationalTerminalStatus,
                RetryCount: 0,
                LastError: errorMessage,
                HttpStatus: httpStatus,
                CreatedAtUtc: occurredAtUtc,
                UpdatedAtUtc: occurredAtUtc,
                // Immediate-completion terminal — mark TerminalAtUtc so
                // SiteCallAudit can move the row to its purge-eligible
                // set.
                TerminalAtUtc: occurredAtUtc));
    }
    catch (Exception buildEx)
    {
        _logger.LogWarning(buildEx,
            "Failed to build immediate-CachedResolve telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
            systemName, methodName, trackedId);
        return;
    }

    try
    {
        await _cachedForwarder.ForwardAsync(resolve, cancellationToken)
            .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Immediate-CachedResolve telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
            systemName, methodName, trackedId);
    }
}
/// <summary>
/// Best-effort emission of the single <c>ApiOutbound</c>/<c>ApiCall</c>
/// audit row for a synchronous <c>Call</c>. A no-op without an audit
/// writer.
/// </summary>
/// <remarks>
/// Per alog.md §7, nothing here may abort the user-facing call. Build
/// failures and synchronous writer throws are logged and swallowed. The
/// write itself is fire-and-forget, so the script never waits on the
/// writer. A fault that is already visible is logged immediately; one that
/// surfaces later is logged by an <c>OnlyOnFaulted</c> continuation, so the
/// exception is observed rather than reaching the unobserved-task handler.
/// </remarks>
private void EmitCallAudit(
    string systemName,
    string methodName,
    DateTime occurredAtUtc,
    int durationMs,
    ExternalCallResult? result,
    Exception? thrown,
    IReadOnlyDictionary<string, object?>? parameters)
{
    var writer = _auditWriter;
    if (writer is null)
    {
        return;
    }

    AuditEvent auditEvent;
    try
    {
        auditEvent = BuildCallAuditEvent(
            systemName, methodName, occurredAtUtc, durationMs, result, thrown, parameters);
    }
    catch (Exception buildEx)
    {
        // Defensive: populating a record from already-validated values
        // should not throw, but the best-effort contract holds regardless.
        _logger.LogWarning(buildEx,
            "Failed to build Audit Log #23 event for {System}.{Method} — skipping emission",
            systemName, methodName);
        return;
    }

    try
    {
        var pendingWrite = writer.WriteAsync(auditEvent, CancellationToken.None);

        if (pendingWrite.IsCompleted)
        {
            if (pendingWrite.IsFaulted)
            {
                _logger.LogWarning(pendingWrite.Exception,
                    "Audit Log #23 write failed for EventId {EventId} ({System}.{Method})",
                    auditEvent.EventId, systemName, methodName);
            }
        }
        else
        {
            pendingWrite.ContinueWith(
                faulted => _logger.LogWarning(faulted.Exception,
                    "Audit Log #23 write failed for EventId {EventId} ({System}.{Method})",
                    auditEvent.EventId, systemName, methodName),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }
    }
    catch (Exception writeEx)
    {
        // WriteAsync threw before returning a task (e.g. argument
        // validation ahead of the writer's own try/catch).
        _logger.LogWarning(writeEx,
            "Audit Log #23 write threw synchronously for EventId {EventId} ({System}.{Method})",
            auditEvent.EventId, systemName, methodName);
    }
}
/// <summary>
/// Assembles the <c>ApiOutbound</c>/<c>ApiCall</c> audit event for one
/// synchronous call attempt.
/// </summary>
/// <remarks>
/// The status is Delivered only when nothing was thrown and the result
/// reports success; every other outcome is Failed. M2 draws no
/// transient/permanent distinction on the sync path. A thrown exception
/// supplies the error message and detail. Otherwise a failed result supplies
/// the message, and an embedded <c>HTTP {code}</c> is parsed back out into
/// the structured HttpStatus column.
/// </remarks>
private AuditEvent BuildCallAuditEvent(
    string systemName,
    string methodName,
    DateTime occurredAtUtc,
    int durationMs,
    ExternalCallResult? result,
    Exception? thrown,
    IReadOnlyDictionary<string, object?>? parameters)
{
    var delivered = thrown is null && result is { Success: true };
    var status = delivered ? AuditStatus.Delivered : AuditStatus.Failed;

    string? errorMessage = null;
    string? errorDetail = null;
    int? httpStatus = null;

    if (thrown is not null)
    {
        errorMessage = thrown.Message;
        errorDetail = thrown.ToString();
    }
    else if (result is { Success: false })
    {
        var resultMessage = result.ErrorMessage;
        errorMessage = resultMessage;

        if (!string.IsNullOrEmpty(resultMessage)
            && HttpStatusRegex.Match(resultMessage) is { Success: true } statusMatch
            && int.TryParse(statusMatch.Groups["code"].Value, out var statusCode))
        {
            httpStatus = statusCode;
        }
    }

    return ScadaBridgeAuditEventFactory.Create(
        channel: AuditChannel.ApiOutbound,
        kind: AuditKind.ApiCall,
        status: status,
        occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
        // Outbound rows name the calling script as the actor; null when no
        // single script owns the call (e.g. a shared script running inline).
        actor: _sourceScript,
        target: $"{systemName}.{methodName}",
        // A sync one-shot call has no operation lifecycle, so no
        // CorrelationId. ExecutionId ties together all sync rows of one run.
        correlationId: null,
        executionId: _executionId,
        parentExecutionId: _parentExecutionId,
        sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
        sourceInstanceId: _instanceName,
        sourceScript: _sourceScript,
        httpStatus: httpStatus,
        durationMs: durationMs,
        errorMessage: errorMessage,
        errorDetail: errorDetail,
        // Raw request/response payloads; the writer's redactor applies the
        // size cap and secret redaction downstream.
        requestSummary: SerializeRequest(parameters),
        responseSummary: result?.ResponseJson,
        payloadTruncated: false,
        extra: null);
}
/// <summary>
/// Renders the outbound-call arguments as the JSON <c>RequestSummary</c>
/// stamped on <c>ApiOutbound</c> audit rows.
/// </summary>
/// <param name="parameters">The call arguments; may be <c>null</c>.</param>
/// <returns>
/// The serialised JSON, or <c>null</c> when there are no arguments or when
/// serialisation fails.
/// </returns>
private static string? SerializeRequest(IReadOnlyDictionary<string, object?>? parameters)
{
    if (parameters is not { Count: > 0 })
    {
        return null;
    }

    try
    {
        return JsonSerializer.Serialize(parameters);
    }
    catch (Exception)
    {
        // Deliberate swallow: an argument set that cannot be summarised
        // must never abort the best-effort audit emission.
        return null;
    }
}
}
/// <summary>
/// WP-13: Helper for Database.Connection/CachedWrite syntax.
/// </summary>
/// <remarks>
/// Audit Log #23 (M3 Bundle E — Task E6): <see cref="CachedWrite"/> mirrors
/// <see cref="ExternalSystemHelper.CachedCall"/> — mints a
/// <see cref="TrackedOperationId"/>, emits the lifecycle's first
/// CachedSubmit packet (Channel <c>DbOutbound</c>), hands off to the S&amp;F
/// retry loop, and returns the id. Per-attempt + terminal telemetry is
/// emitted by the retry loop (Tasks E4/E5).
/// </remarks>
public class DatabaseHelper
{
/// <summary>
/// Gateway used by <c>Connection</c> and <c>CachedWrite</c>; when
/// <c>null</c> both throw <see cref="InvalidOperationException"/>.
/// </summary>
private readonly IDatabaseGateway? _gateway;
/// <summary>
/// Owning instance name — passed to the gateway's cached-write path and to
/// the auditing connection wrapper, and stamped as <c>SourceInstanceId</c>.
/// </summary>
private readonly string _instanceName;
/// <summary>
/// Receives warnings for swallowed telemetry failures and cached-write
/// throws; also handed to the auditing connection wrapper.
/// </summary>
private readonly ILogger _logger;
/// <summary>
/// Per-execution id shared by every audit row / telemetry packet this
/// script run emits (the <c>ExecutionId</c> column).
/// </summary>
private readonly Guid _executionId;
/// <summary>
/// Audit Log #23 (ParentExecutionId): the spawning execution's id when
/// this run was inbound-API-routed; <c>null</c> for non-routed runs.
/// Threaded alongside <see cref="_executionId"/> into the auditing
/// connection wrapper and the gateway's cached-write path, and stamped
/// onto the immediate-completion <c>DbOutbound</c> telemetry rows.
/// </summary>
private readonly Guid? _parentExecutionId;
/// <summary>
/// Originating site id. An empty value is stamped as a <c>null</c> audit
/// <c>SourceSiteId</c> but passed through unchanged as the operational
/// <c>SourceSite</c>.
/// </summary>
private readonly string _siteId;
/// <summary>
/// Name of the calling script, if any — stamped as <c>SourceScript</c>.
/// </summary>
private readonly string? _sourceScript;
/// <summary>
/// Forwarder for the <c>DbOutbound</c> cached-write lifecycle telemetry;
/// when <c>null</c> the immediate-completion rows are skipped.
/// </summary>
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
/// <summary>
/// Audit Log #23 (M4 Bundle A): best-effort emitter for synchronous
/// <c>Database.Connection</c>-routed Execute / ExecuteScalar /
/// ExecuteReader calls. When wired, <see cref="Connection"/> returns
/// an <see cref="AuditingDbConnection"/> that intercepts each command
/// execution and writes one <c>DbOutbound</c>/<c>DbWrite</c> audit
/// row. Optional — when null the helper falls back to the raw
/// inner <see cref="System.Data.Common.DbConnection"/> the gateway
/// returns (tests / minimal hosts that don't wire audit).
/// </summary>
private readonly IAuditWriter? _auditWriter;
/// <summary>
/// SourceNode-stamping (Task 14): the local cluster node name on
/// which this script is executing (<c>node-a</c>/<c>node-b</c>).
/// Stamped onto <c>SiteCallOperational.SourceNode</c> at the
/// <c>Database.CachedWrite</c> CachedSubmit telemetry construction
/// site so central can persist it on the <c>SiteCalls</c> row.
/// </summary>
private readonly string? _sourceNode;
/// <summary>
/// Builds the helper behind the script-facing <c>Database</c> API,
/// capturing the gateway together with the provenance values stamped onto
/// the audit rows and telemetry packets it emits.
/// </summary>
/// <remarks>
/// Parameter ordering: <paramref name="executionId"/> directly follows the
/// logger, the one position that compiles cleanly across all four
/// audit-threaded ctors (a required Guid cannot follow optional
/// parameters). <paramref name="parentExecutionId"/> is a trailing optional
/// parameter so existing positional callers stay source-compatible.
/// </remarks>
/// <param name="gateway">Database gateway; <c>null</c> when no gateway is available.</param>
/// <param name="instanceName">Name of the instance on whose behalf calls are made.</param>
/// <param name="logger">Logger for diagnostics and best-effort failure warnings.</param>
/// <param name="executionId">Identifier of the current script execution.</param>
/// <param name="auditWriter">Optional writer enabling audited connections.</param>
/// <param name="siteId">Identifier of the originating site.</param>
/// <param name="sourceScript">Optional name of the calling script, for the audit trail.</param>
/// <param name="cachedForwarder">Optional forwarder for cached-write lifecycle telemetry.</param>
/// <param name="parentExecutionId">Optional id of the spawning execution (routed runs only).</param>
/// <param name="sourceNode">Optional cluster node name (node-a/node-b) for telemetry stamping.</param>
internal DatabaseHelper(
    IDatabaseGateway? gateway,
    string instanceName,
    ILogger logger,
    Guid executionId,
    IAuditWriter? auditWriter = null,
    string siteId = "",
    string? sourceScript = null,
    ICachedCallTelemetryForwarder? cachedForwarder = null,
    Guid? parentExecutionId = null,
    string? sourceNode = null)
{
    // Collaborators.
    _gateway = gateway;
    _logger = logger;
    _auditWriter = auditWriter;
    _cachedForwarder = cachedForwarder;

    // Provenance stamped onto everything this helper emits.
    _instanceName = instanceName;
    _siteId = siteId;
    _sourceNode = sourceNode;
    _sourceScript = sourceScript;
    _executionId = executionId;
    _parentExecutionId = parentExecutionId;
}
/// <summary>
/// Obtains the named database connection for script use.
/// </summary>
/// <remarks>
/// When an audit writer is wired (Audit Log #23, M4 Bundle A), the
/// gateway's connection is wrapped in an <see cref="AuditingDbConnection"/>
/// so every script-initiated Execute* / ExecuteReader emits one
/// <c>DbOutbound</c>/<c>DbWrite</c> audit row. All other ADO.NET behaviour,
/// including disposal, is delegated to the inner connection, so the
/// caller's <c>await using</c> still releases it to the pool. Without an
/// audit writer the gateway's connection is returned unwrapped.
/// </remarks>
/// <param name="name">Name of the database connection to retrieve.</param>
/// <param name="cancellationToken">Cancellation token for the async operation.</param>
/// <returns>
/// The audit-wrapped connection when auditing is wired; otherwise the raw
/// connection from the gateway.
/// </returns>
/// <exception cref="InvalidOperationException">No database gateway is wired.</exception>
public async Task<System.Data.Common.DbConnection> Connection(
    string name,
    CancellationToken cancellationToken = default)
{
    var gateway = _gateway ?? throw new InvalidOperationException("Database gateway not available");
    var rawConnection = await gateway.GetConnectionAsync(name, cancellationToken);

    var writer = _auditWriter;
    if (writer is not null)
    {
        return new AuditingDbConnection(
            rawConnection,
            writer,
            connectionName: name,
            siteId: _siteId,
            instanceName: _instanceName,
            sourceScript: _sourceScript,
            logger: _logger,
            executionId: _executionId,
            // Spawning execution's id; null for non-routed runs.
            parentExecutionId: _parentExecutionId);
    }

    return rawConnection;
}
/// <summary>
/// Submits a cached outbound database write and returns its
/// <see cref="TrackedOperationId"/> for later status queries.
/// </summary>
/// <remarks>
/// <para>
/// Lifecycle: a fresh id is minted and <c>CachedSubmit</c> telemetry is
/// emitted on <c>DbOutbound</c>. The write is then handed to the gateway,
/// which (M2.3, #7) attempts it immediately and classifies the outcome. A
/// buffered write (<c>WasBuffered=true</c>) is owned by the S&amp;F retry
/// loop, which emits the Attempted + Resolve rows. An immediate outcome
/// (success or a synchronous permanent failure) bypasses that loop, so this
/// method emits those rows itself. Otherwise <c>Tracking.Status(id)</c>
/// would stay Submitted forever.
/// </para>
/// <para>Best-effort emission per alog.md §7.</para>
/// </remarks>
/// <param name="name">Name of the database connection to use.</param>
/// <param name="sql">SQL statement to execute.</param>
/// <param name="parameters">Optional parameters for the SQL statement.</param>
/// <param name="cancellationToken">Cancellation token for the async operation.</param>
/// <returns>Tracked operation identifier for status queries.</returns>
/// <exception cref="InvalidOperationException">No database gateway is wired.</exception>
public async Task<TrackedOperationId> CachedWrite(
    string name,
    string sql,
    IReadOnlyDictionary<string, object?>? parameters = null,
    CancellationToken cancellationToken = default)
{
    var gateway = _gateway ?? throw new InvalidOperationException("Database gateway not available");

    var operationId = TrackedOperationId.New();
    var submittedAtUtc = DateTime.UtcNow;
    // The connection name is the only human-readable handle the gateway
    // carries on the buffered row, so it doubles as the telemetry target.
    var target = name;

    await EmitCachedDbSubmitTelemetryAsync(
            name, operationId, target, submittedAtUtc, cancellationToken)
        .ConfigureAwait(false);

    ExternalCallResult? outcome;
    try
    {
        outcome = await gateway.CachedWriteAsync(
                name, sql, parameters, _instanceName, cancellationToken, operationId,
                // ExecutionId Task 4 / ParentExecutionId Task 6: carried into
                // a buffered write so its retry-loop audit rows are correlated
                // with this script run (parent is null for non-routed runs).
                executionId: _executionId,
                sourceScript: _sourceScript,
                parentExecutionId: _parentExecutionId)
            .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Database.CachedWrite threw for {Connection} (TrackedOperationId {Id})",
            name, operationId);
        throw;
    }

    // Immediate outcome bypassed S&F entirely — close the lifecycle here.
    if (outcome is { WasBuffered: false })
    {
        await EmitImmediateDbTerminalTelemetryAsync(
                name, target, operationId, outcome, cancellationToken)
            .ConfigureAwait(false);
    }

    return operationId;
}
/// <summary>
/// M2.3 (#7): emits the immediate-completion lifecycle for a
/// <c>Database.CachedWrite</c> that resolved without the S&amp;F retry loop.
/// It forwards an <c>Attempted</c> row, then a terminal <c>CachedResolve</c>
/// row (<c>Delivered</c> on success, <c>Failed</c> on a synchronous
/// permanent SQL error).
/// </summary>
/// <remarks>
/// The DB counterpart of <c>ExternalSystemHelper.EmitImmediateTerminalTelemetryAsync</c>.
/// It is a no-op without a forwarder. Each row is built and forwarded
/// independently. A build failure skips only that row; a forward failure
/// is logged and swallowed (alog.md §7).
/// </remarks>
private async Task EmitImmediateDbTerminalTelemetryAsync(
    string connectionName,
    string target,
    TrackedOperationId trackedId,
    ExternalCallResult result,
    CancellationToken cancellationToken)
{
    var forwarder = _cachedForwarder;
    if (forwarder is null)
    {
        return;
    }

    var occurredAtUtc = DateTime.UtcNow;

    // Transient failures are buffered (WasBuffered=true) and never reach
    // this method, so an immediate failure always means a permanent one.
    var (auditTerminalStatus, operationalTerminalStatus) = result.Success
        ? (AuditStatus.Delivered, "Delivered")
        : (AuditStatus.Failed, "Failed");

    // --- Attempted row -------------------------------------------------
    var attemptedPacket = TryBuildDbTerminalTelemetry(
        connectionName, target, trackedId, occurredAtUtc,
        AuditKind.DbWriteCached, AuditStatus.Attempted, "Attempted",
        result, isTerminal: false);
    if (attemptedPacket is not null)
    {
        try
        {
            await forwarder.ForwardAsync(attemptedPacket, cancellationToken)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Immediate-Attempted telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
                connectionName, trackedId);
        }
    }

    // --- CachedResolve row --------------------------------------------
    var resolvePacket = TryBuildDbTerminalTelemetry(
        connectionName, target, trackedId, occurredAtUtc,
        AuditKind.CachedResolve, auditTerminalStatus, operationalTerminalStatus,
        result, isTerminal: true);
    if (resolvePacket is null)
    {
        return;
    }

    try
    {
        await forwarder.ForwardAsync(resolvePacket, cancellationToken)
            .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Immediate-CachedResolve telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
            connectionName, trackedId);
    }
}
/// <summary>
/// Assembles a single immediate-completion <c>DbOutbound</c> telemetry packet.
/// The packet is an audit event plus its operational <c>SiteCalls</c>
/// snapshot. If construction throws, the failure is logged and <c>null</c>
/// is returned, so the caller skips emission instead of aborting the script.
/// </summary>
/// <param name="connectionName">Database connection name, used in the log message.</param>
/// <param name="target">Target string stamped on both halves of the packet.</param>
/// <param name="trackedId">Lifecycle id. It doubles as the audit correlation id.</param>
/// <param name="occurredAtUtc">Timestamp for the row.</param>
/// <param name="kind">Audit kind of the row.</param>
/// <param name="auditStatus">Audit status of the row.</param>
/// <param name="operationalStatus">Operational status text of the row.</param>
/// <param name="result">Immediate-attempt outcome. Its error message is carried only when it failed.</param>
/// <param name="isTerminal">When true, <c>TerminalAtUtc</c> is set to <paramref name="occurredAtUtc"/>.</param>
/// <returns>The packet, or <c>null</c> when building it threw.</returns>
private CachedCallTelemetry? TryBuildDbTerminalTelemetry(
    string connectionName,
    string target,
    TrackedOperationId trackedId,
    DateTime occurredAtUtc,
    AuditKind kind,
    AuditStatus auditStatus,
    string operationalStatus,
    ExternalCallResult result,
    bool isTerminal)
{
    try
    {
        // Error text is attached only to rows that describe a failed attempt.
        var failureText = result.Success ? null : result.ErrorMessage;
        DateTime? terminalStamp = isTerminal ? occurredAtUtc : null;

        var auditRow = ScadaBridgeAuditEventFactory.Create(
            channel: AuditChannel.DbOutbound,
            kind: kind,
            status: auditStatus,
            occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
            target: target,
            correlationId: trackedId.Value,
            executionId: _executionId,
            parentExecutionId: _parentExecutionId,
            sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
            sourceInstanceId: _instanceName,
            sourceScript: _sourceScript,
            errorMessage: failureText);

        var operationalRow = new SiteCallOperational(
            TrackedOperationId: trackedId,
            Channel: "DbOutbound",
            Target: target,
            SourceSite: _siteId,
            SourceNode: _sourceNode,
            Status: operationalStatus,
            RetryCount: 0,
            LastError: failureText,
            HttpStatus: null,
            CreatedAtUtc: occurredAtUtc,
            UpdatedAtUtc: occurredAtUtc,
            TerminalAtUtc: terminalStamp);

        return new CachedCallTelemetry(Audit: auditRow, Operational: operationalRow);
    }
    catch (Exception buildEx)
    {
        _logger.LogWarning(buildEx,
            "Failed to build immediate-{Kind} telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission",
            kind, connectionName, trackedId);
        return null;
    }
}
/// <summary>
/// Best-effort emission of the <c>CachedSubmit</c> / <c>Submitted</c>
/// <c>DbOutbound</c> telemetry packet for a <c>Database.CachedWrite</c>.
/// It does nothing when no cached-call forwarder is wired. A failure to build
/// the packet is logged and skips emission. A failure to forward it is logged
/// and swallowed. Neither aborts the calling script.
/// </summary>
/// <param name="connectionName">Database connection name, used in log messages.</param>
/// <param name="trackedId">Lifecycle id. It is also the audit correlation id.</param>
/// <param name="target">Target string stamped on both halves of the packet.</param>
/// <param name="occurredAtUtc">Timestamp for the submit row.</param>
/// <param name="cancellationToken">Token passed through to the forwarder.</param>
private async Task EmitCachedDbSubmitTelemetryAsync(
    string connectionName,
    TrackedOperationId trackedId,
    string target,
    DateTime occurredAtUtc,
    CancellationToken cancellationToken)
{
    if (_cachedForwarder is null)
    {
        return;
    }

    CachedCallTelemetry submitPacket;
    try
    {
        var auditRow = ScadaBridgeAuditEventFactory.Create(
            channel: AuditChannel.DbOutbound,
            kind: AuditKind.CachedSubmit,
            status: AuditStatus.Submitted,
            occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
            target: target,
            // The correlation id is the per-operation lifecycle id
            // (TrackedOperationId). The execution id identifies this script run.
            correlationId: trackedId.Value,
            executionId: _executionId,
            // Audit Log #23 (ParentExecutionId): the spawning execution's id.
            // It is null for non-routed runs.
            parentExecutionId: _parentExecutionId,
            sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
            sourceInstanceId: _instanceName,
            sourceScript: _sourceScript);

        var operationalRow = new SiteCallOperational(
            TrackedOperationId: trackedId,
            Channel: "DbOutbound",
            Target: target,
            SourceSite: _siteId,
            // SourceNode-stamping (Task 14): the local node name
            // (node-a/node-b). It is null when no node-identity provider was
            // wired, in which case central stores SiteCalls.SourceNode as NULL.
            SourceNode: _sourceNode,
            Status: "Submitted",
            RetryCount: 0,
            LastError: null,
            HttpStatus: null,
            CreatedAtUtc: occurredAtUtc,
            UpdatedAtUtc: occurredAtUtc,
            TerminalAtUtc: null);

        submitPacket = new CachedCallTelemetry(Audit: auditRow, Operational: operationalRow);
    }
    catch (Exception buildEx)
    {
        _logger.LogWarning(buildEx,
            "Failed to build CachedSubmit telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission",
            connectionName, trackedId);
        return;
    }

    try
    {
        await _cachedForwarder.ForwardAsync(submitPacket, cancellationToken)
            .ConfigureAwait(false);
    }
    catch (Exception forwardEx)
    {
        _logger.LogWarning(forwardEx,
            "CachedSubmit telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
            connectionName, trackedId);
    }
}
}
/// <summary>
/// Notification Outbox: the object behind the script-facing <c>Notify</c> API.
///
/// The site does not deliver notification email inline.
/// <c>Notify.To("listName").Send(...)</c> places the notification in the site
/// Store-and-Forward Engine, which forwards it to central, and hands the
/// script a <c>NotificationId</c> straight away. <c>Notify.Status(id)</c>
/// later reports the delivery status of that notification.
/// </summary>
public class NotifyHelper
{
    private readonly StoreAndForwardService? _storeAndForward;
    private readonly ICanTell? _siteCommunicationActor;
    private readonly string _siteId;
    private readonly string _instanceName;
    private readonly string? _sourceScript;
    private readonly TimeSpan _askTimeout;
    private readonly ILogger _logger;

    /// <summary>
    /// Audit Log #23: the per-execution id of this script run. It is handed
    /// to every <see cref="NotifyTarget"/> created by <see cref="To"/>, which
    /// stamps it onto the <c>NotifySend</c> row's <c>ExecutionId</c>.
    /// </summary>
    private readonly Guid _executionId;

    /// <summary>
    /// Audit Log #23 (ParentExecutionId): the spawning execution's id when
    /// this run was routed from the inbound API, and <c>null</c> otherwise.
    /// It is handed to every <see cref="NotifyTarget"/> created by
    /// <see cref="To"/>, which stamps it onto the <c>NotifySend</c> audit row
    /// and the <c>NotificationSubmit</c> payload.
    /// </summary>
    private readonly Guid? _parentExecutionId;

    /// <summary>
    /// Audit Log #23 (M4 Bundle C): optional best-effort emitter for the
    /// <c>Notification</c>/<c>NotifySend</c> row that each
    /// <c>Notify.To(list).Send(...)</c> produces. When it is null, the
    /// <see cref="NotifyTarget"/> skips auditing. This lets tests and minimal
    /// hosts without AddAuditLog keep working.
    /// </summary>
    private readonly IAuditWriter? _auditWriter;

    /// <summary>
    /// SourceNode-stamping (Task 13): the cluster node (<c>node-a</c>/<c>node-b</c>)
    /// this script runs on. It ends up in <c>NotificationSubmit.SourceNode</c>
    /// via <see cref="NotifyTarget.Send"/> so central can persist it on the
    /// <c>Notifications</c> row.
    /// </summary>
    private readonly string? _sourceNode;

    /// <summary>
    /// Creates the <c>Notify</c> helper for one script execution.
    /// </summary>
    /// <param name="storeAndForward">Optional S&amp;F engine used to buffer outgoing notifications.</param>
    /// <param name="siteCommunicationActor">Optional route to central, used by <see cref="Status"/>.</param>
    /// <param name="siteId">Id of the originating site.</param>
    /// <param name="instanceName">Name of the originating instance.</param>
    /// <param name="sourceScript">Optional originating script name, for the audit trail.</param>
    /// <param name="askTimeout">Timeout applied to the central status query.</param>
    /// <param name="logger">Diagnostics logger.</param>
    /// <param name="executionId">Per-execution id of this script run.</param>
    /// <param name="auditWriter">Optional audit writer for <c>NotifySend</c> rows.</param>
    /// <param name="parentExecutionId">Optional spawning execution id (routed runs only).</param>
    /// <param name="sourceNode">Optional cluster node name (node-a/node-b).</param>
    // executionId sits right after the ILogger, like the other audit-threaded
    // constructors. parentExecutionId is a trailing optional parameter.
    internal NotifyHelper(
        StoreAndForwardService? storeAndForward,
        ICanTell? siteCommunicationActor,
        string siteId,
        string instanceName,
        string? sourceScript,
        TimeSpan askTimeout,
        ILogger logger,
        Guid executionId,
        IAuditWriter? auditWriter = null,
        Guid? parentExecutionId = null,
        string? sourceNode = null)
    {
        _storeAndForward = storeAndForward;
        _siteCommunicationActor = siteCommunicationActor;
        _siteId = siteId;
        _instanceName = instanceName;
        _sourceScript = sourceScript;
        _askTimeout = askTimeout;
        _logger = logger;
        _executionId = executionId;
        _auditWriter = auditWriter;
        _parentExecutionId = parentExecutionId;
        _sourceNode = sourceNode;
    }

    /// <summary>
    /// Picks the notification list that a subsequent <c>Send</c> will target.
    /// </summary>
    /// <param name="listName">Name of the notification list.</param>
    /// <returns>A <see cref="NotifyTarget"/> carrying this execution's context.</returns>
    public NotifyTarget To(string listName) =>
        new NotifyTarget(
            listName: listName,
            storeAndForward: _storeAndForward,
            siteId: _siteId,
            instanceName: _instanceName,
            sourceScript: _sourceScript,
            logger: _logger,
            executionId: _executionId,
            auditWriter: _auditWriter,
            parentExecutionId: _parentExecutionId,
            sourceNode: _sourceNode);

    /// <summary>
    /// Reports the delivery status of a notification previously returned by
    /// <c>Send</c>.
    ///
    /// Central is asked first, through the site communication actor:
    /// - If central has a row, its status is returned as-is.
    /// - If central has no row (or cannot be reached) but the id is still in
    ///   the local S&amp;F buffer, the notification is in transit and the
    ///   site-local <c>Forwarding</c> state is returned.
    /// - Otherwise the result is <c>Unknown</c>.
    /// </summary>
    /// <param name="notificationId">The id returned by <c>Send</c>.</param>
    /// <returns>The resolved delivery status.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when no site communication actor was supplied.
    /// </exception>
    public async Task<NotificationDeliveryStatus> Status(string notificationId)
    {
        if (_siteCommunicationActor is null)
        {
            throw new InvalidOperationException(
                "Notification status query is not available — site communication actor not wired");
        }

        var centralAnswer = await AskCentralForStatusAsync(_siteCommunicationActor, notificationId);
        if (centralAnswer.Found)
        {
            return new NotificationDeliveryStatus(
                centralAnswer.Status,
                centralAnswer.RetryCount,
                centralAnswer.LastError,
                centralAnswer.DeliveredAt);
        }

        // Central has no row. A notification still sitting in the site buffer
        // is in transit. Anything else was either never sent, or was forwarded
        // and then lost by central.
        if (_storeAndForward is not null)
        {
            var stillBuffered = await _storeAndForward.GetMessageByIdAsync(notificationId);
            if (stillBuffered != null)
            {
                return new NotificationDeliveryStatus(
                    "Forwarding", stillBuffered.RetryCount, stillBuffered.LastError, DeliveredAt: null);
            }
        }

        return new NotificationDeliveryStatus("Unknown", 0, null, null);
    }

    /// <summary>
    /// Sends the status query to central. If the Ask throws (for example, on
    /// a timeout), the failure is logged and a synthetic "not found" answer
    /// is returned, so the caller falls through to the local buffer check.
    /// </summary>
    private async Task<NotificationStatusResponse> AskCentralForStatusAsync(
        ICanTell centralRoute,
        string notificationId)
    {
        var correlationId = Guid.NewGuid().ToString();
        var query = new NotificationStatusQuery(correlationId, notificationId);
        try
        {
            return await centralRoute.Ask<NotificationStatusResponse>(query, _askTimeout);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Notification status query for {NotificationId} did not reach central",
                notificationId);
            return new NotificationStatusResponse(
                correlationId, Found: false, Status: "Unknown",
                RetryCount: 0, LastError: null, DeliveredAt: null);
        }
    }
}
/// <summary>
/// Notification Outbox: target for <c>Notify.To("listName").Send(...)</c>.
/// It binds one notification list to the script-execution context (site,
/// instance, script, execution ids, node). <see cref="Send"/> uses that
/// context to enqueue a fully stamped <c>NotificationSubmit</c> and emit its
/// audit row.
/// </summary>
public class NotifyTarget
{
    private readonly string _listName;
    private readonly StoreAndForwardService? _storeAndForward;
    private readonly string _siteId;
    private readonly string _instanceName;
    private readonly string? _sourceScript;
    private readonly ILogger _logger;

    /// <summary>
    /// Audit Log #23: the per-execution id for this script run. It is stamped
    /// into <c>AuditEvent.ExecutionId</c> on the <c>NotifySend</c> row and
    /// carried to central as <c>NotificationSubmit.OriginExecutionId</c>.
    /// </summary>
    private readonly Guid _executionId;

    /// <summary>
    /// Audit Log #23 (ParentExecutionId): the spawning execution's id when
    /// this run was routed from the inbound API, and <c>null</c> for
    /// non-routed runs. It is stamped onto the <c>NotifySend</c> audit row's
    /// <c>ParentExecutionId</c> and carried to central as
    /// <c>NotificationSubmit.OriginParentExecutionId</c>.
    /// </summary>
    private readonly Guid? _parentExecutionId;

    /// <summary>
    /// Audit Log #23 (M4 Bundle C): best-effort emitter for the
    /// <c>Notification</c>/<c>NotifySend</c> row. The row is written
    /// immediately after the underlying S&amp;F enqueue accepts the
    /// submission. Optional: when it is null, no audit row is emitted.
    /// </summary>
    private readonly IAuditWriter? _auditWriter;

    /// <summary>
    /// SourceNode-stamping (Task 13): the cluster node name on which this
    /// script is executing (<c>node-a</c>/<c>node-b</c>). <see cref="Send"/>
    /// stamps it onto <c>NotificationSubmit.SourceNode</c> so the central
    /// <c>NotificationOutboxActor</c> can persist it on the
    /// <c>Notifications</c> row.
    /// </summary>
    private readonly string? _sourceNode;

    /// <summary>
    /// Initializes a new notification target for a specific notification list.
    /// </summary>
    /// <param name="listName">Name of the notification list to target.</param>
    /// <param name="storeAndForward">Optional store-and-forward service for notification delivery.</param>
    /// <param name="siteId">Identifier of the site where this notification originates.</param>
    /// <param name="instanceName">Unique name of the instance sending the notification.</param>
    /// <param name="sourceScript">Optional name of the source script for audit trail.</param>
    /// <param name="logger">Logger for diagnostics and warnings.</param>
    /// <param name="executionId">Unique identifier for this script execution.</param>
    /// <param name="auditWriter">Optional writer for audit log entries.</param>
    /// <param name="parentExecutionId">Optional identifier of the parent execution (for routed calls).</param>
    /// <param name="sourceNode">Optional cluster node identifier (node-a/node-b) for audit stamping.</param>
    internal NotifyTarget(
        string listName,
        StoreAndForwardService? storeAndForward,
        string siteId,
        string instanceName,
        string? sourceScript,
        ILogger logger,
        Guid executionId,
        IAuditWriter? auditWriter = null,
        Guid? parentExecutionId = null,
        string? sourceNode = null)
    {
        _listName = listName;
        _storeAndForward = storeAndForward;
        _siteId = siteId;
        _instanceName = instanceName;
        _sourceScript = sourceScript;
        _logger = logger;
        _executionId = executionId;
        _auditWriter = auditWriter;
        _parentExecutionId = parentExecutionId;
        _sourceNode = sourceNode;
    }

    /// <summary>
    /// Enqueues a notification for central delivery and returns its
    /// <c>NotificationId</c> immediately.
    ///
    /// The notification is buffered into the site Store-and-Forward Engine under the
    /// <see cref="StoreAndForwardCategory.Notification"/> category. The S&amp;F
    /// engine's <c>NotificationForwarder</c> forwards it to central and treats
    /// central's ack as the delivery outcome.
    ///
    /// The returned <c>NotificationId</c> is the single idempotency key end-to-end:
    /// it is the S&amp;F message id, it is carried inside the buffered payload, and
    /// it is the id the forwarder submits to central. Pass it to
    /// <see cref="NotifyHelper.Status"/> to track delivery.
    /// </summary>
    /// <param name="subject">Subject line for the notification.</param>
    /// <param name="message">Message body for the notification.</param>
    /// <param name="cancellationToken">
    /// Cancellation token. It is checked just before the notification is
    /// enqueued. Once the enqueue has been accepted, the send is not undone,
    /// and the audit row is still written.
    /// </param>
    /// <returns>The notification identifier for status tracking.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when no store-and-forward engine was supplied.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled before
    /// the notification is enqueued. Nothing is buffered in that case.
    /// </exception>
    public async Task<string> Send(
        string subject,
        string message,
        CancellationToken cancellationToken = default)
    {
        if (_storeAndForward == null)
            throw new InvalidOperationException(
                "Notification store-and-forward engine not available");

        // The script controls the idempotency key. The NotificationId is
        // generated here, used as the S&F message id, and carried inside the
        // buffered payload, so the forwarder submits the same id to central
        // on every retry.
        var notificationId = Guid.NewGuid().ToString("N");
        var payload = new NotificationSubmit(
            NotificationId: notificationId,
            ListName: _listName,
            Subject: subject,
            Body: message,
            // The forwarder re-stamps SourceSiteId from its own site id. This
            // value is the best-effort site id known to the script runtime.
            SourceSiteId: _siteId,
            SourceInstanceId: _instanceName,
            // SourceScript (FU3): the script that raised this notification,
            // threaded down from the script-execution context for the central
            // audit trail. It is null when no single script owns the context.
            SourceScript: _sourceScript,
            SiteEnqueuedAt: DateTimeOffset.UtcNow,
            // OriginExecutionId (Audit Log #23): the SAME per-execution id
            // stamped onto this run's NotifySend audit row. It travels inside
            // the serialized payload through the S&F buffer to central, where
            // the dispatcher echoes it onto the NotifyDeliver rows. All rows
            // for one run therefore share an id.
            OriginExecutionId: _executionId,
            // OriginParentExecutionId (Audit Log #23): the SAME parent-execution
            // id stamped onto this run's NotifySend audit row. It is the
            // spawning run's id for an execution routed from the inbound API,
            // and null otherwise. The central dispatcher echoes it onto the
            // NotifyDeliver rows.
            OriginParentExecutionId: _parentExecutionId,
            // SourceNode-stamping (Task 13): the cluster node name on which
            // this notification was emitted (node-a/node-b). It travels inside
            // the serialized payload to central, where NotificationOutboxActor
            // persists it on the Notifications row.
            SourceNode: _sourceNode);
        var payloadJson = JsonSerializer.Serialize(payload);
        var occurredAtUtc = DateTime.UtcNow;

        // Honor the caller's token up to the last moment before the
        // irreversible side effect. Once EnqueueAsync accepts the message,
        // the S&F engine will forward it to central. A cancelled or timed-out
        // script must therefore fail here rather than send anyway.
        cancellationToken.ThrowIfCancellationRequested();

        // The S&F engine would otherwise assign its own GUID to the message.
        // The message id is pinned to the NotificationId so the buffer can be
        // queried by it (Notify.Status) and the forwarder's idempotency key
        // matches the buffered row.
        await _storeAndForward.EnqueueAsync(
            StoreAndForwardCategory.Notification,
            target: _listName,
            payloadJson: payloadJson,
            originInstanceName: _instanceName,
            messageId: notificationId);
        _logger.LogDebug(
            "Notify enqueued notification {NotificationId} to list '{List}' for central delivery",
            notificationId, _listName);

        // Audit Log #23 (M4 Bundle C): emit one Notification/NotifySend
        // (Submitted) row per accepted submission. It runs only AFTER
        // EnqueueAsync returns, so only submissions the S&F engine accepted
        // are audited; a failed enqueue throws and produces no row. The token
        // is deliberately not consulted here: once enqueued, the submission
        // happened and must be audited. Best-effort per alog.md §7: failures
        // are logged and swallowed, so Send still returns the NotificationId.
        EmitNotifySendAudit(notificationId, subject, message, occurredAtUtc);
        return notificationId;
    }

    /// <summary>
    /// Best-effort emission of one <c>Notification</c>/<c>NotifySend</c>
    /// (Status <c>Submitted</c>) audit row. Any exception from building the
    /// event or from the writer is logged and swallowed. Audit-write failures
    /// must never abort the user-facing <c>Notify.Send</c> call (alog.md §7).
    /// </summary>
    /// <param name="notificationId">The id minted by <see cref="Send"/>. It is also the correlation id when it parses as a Guid.</param>
    /// <param name="subject">Notification subject, captured in the request summary.</param>
    /// <param name="body">Notification body, captured in the request summary.</param>
    /// <param name="occurredAtUtc">Timestamp for the audit row.</param>
    private void EmitNotifySendAudit(
        string notificationId,
        string subject,
        string body,
        DateTime occurredAtUtc)
    {
        if (_auditWriter == null)
        {
            return;
        }

        AuditEvent evt;
        try
        {
            // CorrelationId is the NotificationId parsed as a Guid. Send mints
            // the id via Guid.NewGuid().ToString("N"), so the parse is expected
            // to succeed. If it does not, the id is left null (Bundle B's
            // pattern) instead of failing the emission.
            Guid? correlationId = Guid.TryParse(notificationId, out var parsed) ? parsed : null;

            // Request summary has the shape {"subject": "...", "body": "..."}.
            // The audit redactor applies the payload cap and per-target body
            // redaction at write time (AuditLogOptions /
            // PerTargetRedactionOverride).
            var requestSummary = JsonSerializer.Serialize(new
            {
                subject = subject,
                body = body,
            });

            evt = ScadaBridgeAuditEventFactory.Create(
                channel: AuditChannel.Notification,
                kind: AuditKind.NotifySend,
                status: AuditStatus.Submitted,
                occurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
                // Outbound channel: per the Audit Log Actor-column spec, the
                // actor is the calling script. It is null when no single
                // script owns the call (e.g. a shared script running inline).
                actor: _sourceScript,
                target: _listName,
                // CorrelationId is the NotificationId-derived per-operation
                // lifecycle id. ExecutionId carries the per-execution id.
                correlationId: correlationId,
                executionId: _executionId,
                // Audit Log #23 (ParentExecutionId): the spawning execution's
                // id. It is null for non-routed runs.
                parentExecutionId: _parentExecutionId,
                sourceSiteId: string.IsNullOrEmpty(_siteId) ? null : _siteId,
                sourceInstanceId: _instanceName,
                sourceScript: _sourceScript,
                httpStatus: null,
                // Send is fire-and-forget from the script's perspective. The
                // dispatcher (NotificationOutboxActor) times each delivery
                // attempt and stamps DurationMs on its NotifyDeliver(Attempted)
                // rows.
                durationMs: null,
                errorMessage: null,
                errorDetail: null,
                requestSummary: requestSummary,
                responseSummary: null,
                payloadTruncated: false,
                extra: null);
        }
        catch (Exception buildEx)
        {
            // Defensive: building the event itself must never propagate.
            _logger.LogWarning(buildEx,
                "Failed to build Audit Log #23 NotifySend event for NotificationId {NotificationId} list '{List}' — skipping emission",
                notificationId, _listName);
            return;
        }

        try
        {
            // Fire-and-forget (mirrors ExternalSystemHelper.EmitCallAudit), so
            // the script is never blocked on the audit writer. Failures are
            // observed via ContinueWith, so a faulted write is logged instead
            // of becoming an unobserved task exception.
            var writeTask = _auditWriter.WriteAsync(evt, CancellationToken.None);
            if (!writeTask.IsCompleted)
            {
                writeTask.ContinueWith(
                    t => _logger.LogWarning(t.Exception,
                        "Audit Log #23 write failed for EventId {EventId} (NotifySend NotificationId {NotificationId})",
                        evt.EventId, notificationId),
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
            }
            else if (writeTask.IsFaulted)
            {
                _logger.LogWarning(writeTask.Exception,
                    "Audit Log #23 write failed for EventId {EventId} (NotifySend NotificationId {NotificationId})",
                    evt.EventId, notificationId);
            }
        }
        catch (Exception writeEx)
        {
            // A synchronous throw from WriteAsync (e.g. ArgumentNullException
            // before the writer's own try/catch). Swallowed and logged per
            // alog.md §7.
            _logger.LogWarning(writeEx,
                "Audit Log #23 write threw synchronously for EventId {EventId} (NotifySend NotificationId {NotificationId})",
                evt.EventId, notificationId);
        }
    }
}
/// <summary>
/// Audit Log #23 (M3): the script-facing <c>Tracking</c> accessor for
/// cached-operation status. <c>Tracking.Status(trackedOperationId)</c> goes
/// straight to the site-local row via
/// <see cref="IOperationTrackingStore.GetStatusAsync"/>. The site is the
/// single source of truth for cached-call status, so the answer is
/// authoritative without a round-trip to central.
/// </summary>
public class TrackingHelper
{
    private readonly IOperationTrackingStore? _store;

    // Stored from the constructor. Status does not currently log through it.
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the tracking accessor for one script execution.
    /// </summary>
    /// <param name="store">Optional site-local store holding cached-operation status.</param>
    /// <param name="logger">Diagnostics logger.</param>
    internal TrackingHelper(IOperationTrackingStore? store, ILogger logger)
    {
        (_store, _logger) = (store, logger);
    }

    /// <summary>
    /// Looks up the most recent tracking snapshot for an operation.
    /// </summary>
    /// <param name="trackedOperationId">Id of the tracked operation.</param>
    /// <param name="cancellationToken">Token passed through to the store lookup.</param>
    /// <returns>
    /// The snapshot, or <c>null</c> when the id is unknown (never recorded,
    /// or purged after the retention window).
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when no <see cref="IOperationTrackingStore"/> was
    /// supplied. This is the same "service-not-wired" failure as the other
    /// integration helpers.
    /// </exception>
    public Task<TrackingStatusSnapshot?> Status(
        TrackedOperationId trackedOperationId,
        CancellationToken cancellationToken = default)
    {
        var store = _store
            ?? throw new InvalidOperationException("Operation tracking store not available");
        return store.GetStatusAsync(trackedOperationId, cancellationToken);
    }
}
}