Audit Log #23 M4 Bundle C — Task C1: every script-initiated Notify.To(list).Send(...) now emits exactly one Notification/NotifySend audit row via the IAuditWriter wired through ScriptRuntimeContext. The row carries Status=Submitted, Target=list name, RequestSummary={subject,body} JSON (M5 will redact), CorrelationId=NotificationId (parsed as Guid), provenance from context, ForwardState=Pending. Emission is best-effort per alog.md §7: a thrown audit writer is logged and swallowed inside the helper; the original NotificationId still flows back to the script and the underlying S&F enqueue still happened. Mirrors the M2 Bundle F ExternalSystem.Call wrapper pattern. Tests: 7 new tests in NotifySendAuditEmissionTests covering submitted- status, list-name target, request-summary JSON shape, writer-throws fail-safe, provenance, NotificationId/CorrelationId round-trip, and the null-writer degrade path.
1462 lines
63 KiB
C#
1462 lines
63 KiB
C#
using System.Diagnostics;
|
|
using System.Text.Json;
|
|
using System.Text.RegularExpressions;
|
|
using Akka.Actor;
|
|
using Microsoft.Extensions.Logging;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Instance;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Messages.Notification;
|
|
using ScadaLink.Commons.Messages.ScriptExecution;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.StoreAndForward;
|
|
|
|
namespace ScadaLink.SiteRuntime.Scripts;
|
|
|
|
/// <summary>
|
|
/// WP-18: Script Runtime API — injected into Script/Alarm Execution Actors.
|
|
/// Provides the API surface that user scripts interact with:
|
|
/// Instance.GetAttribute("name")
|
|
/// Instance.SetAttribute("name", value)
|
|
/// Instance.CallScript("scriptName", params)
|
|
/// Scripts.CallShared("scriptName", params)
|
|
///
|
|
/// WP-13 (Phase 7): Integration surface APIs:
|
|
/// ExternalSystem.Call("systemName", "methodName", params)
|
|
/// ExternalSystem.CachedCall("systemName", "methodName", params)
|
|
/// Database.Connection("name")
|
|
/// Database.CachedWrite("name", "sql", params)
|
|
/// Notify.To("listName").Send("subject", "message")
|
|
///
|
|
/// WP-20: Recursion Limit — call depth tracked and enforced.
|
|
/// </summary>
|
|
public class ScriptRuntimeContext
|
|
{
|
|
private readonly IActorRef _instanceActor;
|
|
private readonly IActorRef _self;
|
|
private readonly SharedScriptLibrary _sharedScriptLibrary;
|
|
private readonly int _currentCallDepth;
|
|
private readonly int _maxCallDepth;
|
|
private readonly TimeSpan _askTimeout;
|
|
private readonly ILogger _logger;
|
|
private readonly string _instanceName;
|
|
|
|
/// <summary>
|
|
/// WP-13: External system client for ExternalSystem.Call/CachedCall.
|
|
/// </summary>
|
|
private readonly IExternalSystemClient? _externalSystemClient;
|
|
|
|
/// <summary>
|
|
/// WP-13: Database gateway for Database.Connection/CachedWrite.
|
|
/// </summary>
|
|
private readonly IDatabaseGateway? _databaseGateway;
|
|
|
|
/// <summary>
|
|
/// Notification Outbox: the site Store-and-Forward Engine that <c>Notify.Send</c>
|
|
/// enqueues notifications into. The S&F engine forwards them to central.
|
|
/// </summary>
|
|
private readonly StoreAndForwardService? _storeAndForward;
|
|
|
|
/// <summary>
|
|
/// Notification Outbox: the site communication actor that <c>Notify.Status</c>
|
|
/// queries central through (via the ClusterClient command/control transport).
|
|
/// </summary>
|
|
private readonly ICanTell? _siteCommunicationActor;
|
|
|
|
/// <summary>
|
|
/// Notification Outbox: this site's identifier, stamped on enqueued notifications.
|
|
/// </summary>
|
|
private readonly string _siteId;
|
|
|
|
/// <summary>
|
|
/// Notification Outbox (FU3): identifier of the script currently executing in this
|
|
/// context — stamped onto <c>NotificationSubmit.SourceScript</c> for the central
|
|
/// audit trail. Uses the Site Event Logging "Source" convention
|
|
/// (<c>"ScriptActor:<scriptName>"</c>). Null when no single script owns the
|
|
/// context (e.g. alarm on-trigger paths that do not wire the Notify outbox).
|
|
/// </summary>
|
|
private readonly string? _sourceScript;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23: best-effort emitter for boundary-crossing actions executed
|
|
/// by the script. Optional — when null the helpers degrade to a no-op audit
|
|
/// path so tests / contexts that do not need the audit pipeline still work.
|
|
/// </summary>
|
|
private readonly IAuditWriter? _auditWriter;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M3): site-local tracking store consulted by
|
|
/// <c>Tracking.Status(TrackedOperationId)</c>. Optional — when null the
|
|
/// helper throws on access, mirroring the existing
|
|
/// "service-not-wired" behaviour of the other integration helpers.
|
|
/// </summary>
|
|
private readonly IOperationTrackingStore? _operationTrackingStore;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M3 Bundle E — Task E3): site-side dual emitter for
|
|
/// cached-call lifecycle telemetry. Optional — when null
|
|
/// <c>ExternalSystem.CachedCall</c> / <c>Database.CachedWrite</c> still
|
|
/// return a <see cref="TrackedOperationId"/> and invoke the underlying
|
|
/// store-and-forward path, but no audit / SiteCalls telemetry is emitted
|
|
/// (tests / minimal hosts that don't wire the audit pipeline).
|
|
/// </summary>
|
|
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
|
|
|
|
public ScriptRuntimeContext(
|
|
IActorRef instanceActor,
|
|
IActorRef self,
|
|
SharedScriptLibrary sharedScriptLibrary,
|
|
int currentCallDepth,
|
|
int maxCallDepth,
|
|
TimeSpan askTimeout,
|
|
string instanceName,
|
|
ILogger logger,
|
|
IExternalSystemClient? externalSystemClient = null,
|
|
IDatabaseGateway? databaseGateway = null,
|
|
StoreAndForwardService? storeAndForward = null,
|
|
ICanTell? siteCommunicationActor = null,
|
|
string siteId = "",
|
|
string? sourceScript = null,
|
|
IAuditWriter? auditWriter = null,
|
|
IOperationTrackingStore? operationTrackingStore = null,
|
|
ICachedCallTelemetryForwarder? cachedForwarder = null)
|
|
{
|
|
_instanceActor = instanceActor;
|
|
_self = self;
|
|
_sharedScriptLibrary = sharedScriptLibrary;
|
|
_currentCallDepth = currentCallDepth;
|
|
_maxCallDepth = maxCallDepth;
|
|
_askTimeout = askTimeout;
|
|
_instanceName = instanceName;
|
|
_logger = logger;
|
|
_externalSystemClient = externalSystemClient;
|
|
_databaseGateway = databaseGateway;
|
|
_storeAndForward = storeAndForward;
|
|
_siteCommunicationActor = siteCommunicationActor;
|
|
_siteId = siteId;
|
|
_sourceScript = sourceScript;
|
|
_auditWriter = auditWriter;
|
|
_operationTrackingStore = operationTrackingStore;
|
|
_cachedForwarder = cachedForwarder;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the current value of an attribute from the Instance Actor.
|
|
/// Uses Ask pattern (system boundary between script execution and instance state).
|
|
/// </summary>
|
|
public async Task<object?> GetAttribute(string attributeName)
|
|
{
|
|
var correlationId = Guid.NewGuid().ToString();
|
|
var request = new GetAttributeRequest(
|
|
correlationId, _instanceName, attributeName, DateTimeOffset.UtcNow);
|
|
|
|
var response = await _instanceActor.Ask<GetAttributeResponse>(request, _askTimeout);
|
|
|
|
if (!response.Found)
|
|
{
|
|
_logger.LogWarning(
|
|
"GetAttribute: attribute '{Attribute}' not found on instance '{Instance}'",
|
|
attributeName, _instanceName);
|
|
}
|
|
|
|
return response.Value;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sets an attribute value. For data-connected attributes the Instance Actor
|
|
/// forwards the write to the DCL, which writes the physical device; the
|
|
/// in-memory value is not optimistically updated. For static attributes the
|
|
/// Instance Actor updates the in-memory value and persists the override to
|
|
/// SQLite. All mutations are serialized through the Instance Actor mailbox.
|
|
///
|
|
/// The write is awaited so that a device-write failure on a data-connected
|
|
/// attribute is surfaced synchronously to the calling script as an
|
|
/// <see cref="InvalidOperationException"/>.
|
|
/// </summary>
|
|
public async Task SetAttribute(string attributeName, string value)
|
|
{
|
|
var correlationId = Guid.NewGuid().ToString();
|
|
var command = new SetStaticAttributeCommand(
|
|
correlationId, _instanceName, attributeName, value, DateTimeOffset.UtcNow);
|
|
|
|
// Ask — mutation serialized through the Instance Actor mailbox; the reply
|
|
// carries the device-write outcome for data-connected attributes.
|
|
var response = await _instanceActor.Ask<SetStaticAttributeResponse>(command, _askTimeout);
|
|
|
|
if (!response.Success)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"SetAttribute('{attributeName}') failed: {response.ErrorMessage}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Calls a sibling script on the same instance by name (Ask pattern).
|
|
/// WP-20: Enforces recursion limit.
|
|
/// WP-22: Uses Ask pattern for CallScript.
|
|
/// <paramref name="parameters"/> may be a dictionary or an anonymous object
|
|
/// (<c>new { name = "Bob" }</c>) — see <see cref="ScriptArgs"/>.
|
|
/// </summary>
|
|
public async Task<object?> CallScript(string scriptName, object? parameters = null)
|
|
{
|
|
var nextDepth = _currentCallDepth + 1;
|
|
if (nextDepth > _maxCallDepth)
|
|
{
|
|
var msg = $"Script call depth exceeded maximum of {_maxCallDepth}. " +
|
|
$"CallScript('{scriptName}') rejected at depth {nextDepth}.";
|
|
_logger.LogError(msg);
|
|
throw new InvalidOperationException(msg);
|
|
}
|
|
|
|
var correlationId = Guid.NewGuid().ToString();
|
|
var request = new ScriptCallRequest(
|
|
scriptName,
|
|
ScriptArgs.Normalize(parameters),
|
|
nextDepth,
|
|
correlationId);
|
|
|
|
// Ask the Instance Actor, which routes to the appropriate Script Actor
|
|
var result = await _instanceActor.Ask<ScriptCallResult>(request, _askTimeout);
|
|
|
|
if (!result.Success)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"CallScript('{scriptName}') failed: {result.ErrorMessage}");
|
|
}
|
|
|
|
return result.ReturnValue;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Provides access to shared script execution via the Scripts property.
|
|
/// </summary>
|
|
public ScriptCallHelper Scripts => new(_sharedScriptLibrary, this, _currentCallDepth, _maxCallDepth, _logger);
|
|
|
|
/// <summary>
|
|
/// WP-13: Provides access to external system calls.
|
|
/// ExternalSystem.Call("systemName", "methodName", params)
|
|
/// ExternalSystem.CachedCall("systemName", "methodName", params)
|
|
/// </summary>
|
|
public ExternalSystemHelper ExternalSystem => new(
|
|
_externalSystemClient, _instanceName, _logger, _auditWriter, _siteId, _sourceScript,
|
|
// Audit Log #23 (M3 Bundle E — Task E3): emit CachedSubmit telemetry
|
|
// on every ExternalSystem.CachedCall enqueue.
|
|
_cachedForwarder);
|
|
|
|
/// <summary>
|
|
/// WP-13: Provides access to database operations.
|
|
/// Database.Connection("name")
|
|
/// Database.CachedWrite("name", "sql", params)
|
|
/// </summary>
|
|
public DatabaseHelper Database => new(
|
|
_databaseGateway,
|
|
_instanceName,
|
|
_logger,
|
|
// Audit Log #23 (M4 Bundle A): wire the IAuditWriter so
|
|
// Database.Connection(name) returns an auditing decorator that
|
|
// emits one DbOutbound/DbWrite row per script-initiated
|
|
// Execute / ExecuteScalar / ExecuteReader.
|
|
_auditWriter,
|
|
_siteId,
|
|
_sourceScript,
|
|
// Audit Log #23 (M3 Bundle E — Task E6): emit CachedSubmit telemetry on
|
|
// every Database.CachedWrite enqueue.
|
|
_cachedForwarder);
|
|
|
|
/// <summary>
|
|
/// Provides access to the Notification Outbox API.
|
|
/// <c>Notify.To("listName").Send("subject", "message")</c> enqueues a notification
|
|
/// for central delivery and returns its <c>NotificationId</c>;
|
|
/// <c>Notify.Status(id)</c> queries the delivery status of that notification.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Audit Log #23 (M4 Bundle C): the <see cref="IAuditWriter"/> is threaded
|
|
/// through so <c>Notify.To(list).Send(...)</c> emits one
|
|
/// <c>Notification</c>/<c>NotifySend</c> audit row per accepted submission.
|
|
/// Best-effort per alog.md §7 — a thrown writer never aborts the script's
|
|
/// <c>Send</c>.
|
|
/// </remarks>
|
|
public NotifyHelper Notify => new(
|
|
_storeAndForward, _siteCommunicationActor, _siteId, _instanceName, _sourceScript, _askTimeout, _logger,
|
|
_auditWriter);
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M3): site-local tracking-status API for cached operations.
|
|
/// <c>Tracking.Status(trackedOperationId)</c> reads the site SQLite tracking row
|
|
/// directly (authoritative source of truth — no central round-trip) and
|
|
/// returns a <see cref="TrackingStatusSnapshot"/>, or <c>null</c> when the
|
|
/// id is unknown / has already been purged.
|
|
/// </summary>
|
|
public TrackingHelper Tracking => new(_operationTrackingStore, _logger);
|
|
|
|
/// <summary>
|
|
/// Helper class for Scripts.CallShared() syntax.
|
|
/// </summary>
|
|
public class ScriptCallHelper
|
|
{
|
|
private readonly SharedScriptLibrary _library;
|
|
private readonly ScriptRuntimeContext _context;
|
|
private readonly int _currentCallDepth;
|
|
private readonly int _maxCallDepth;
|
|
private readonly ILogger _logger;
|
|
|
|
internal ScriptCallHelper(
|
|
SharedScriptLibrary library,
|
|
ScriptRuntimeContext context,
|
|
int currentCallDepth,
|
|
int maxCallDepth,
|
|
ILogger logger)
|
|
{
|
|
_library = library;
|
|
_context = context;
|
|
_currentCallDepth = currentCallDepth;
|
|
_maxCallDepth = maxCallDepth;
|
|
_logger = logger;
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-17: Executes a shared script inline (direct method call, not actor message).
|
|
/// WP-20: Enforces recursion limit.
|
|
/// <paramref name="parameters"/> may be a dictionary or an anonymous
|
|
/// object (<c>new { name = "Bob" }</c>) — see <see cref="ScriptArgs"/>.
|
|
/// </summary>
|
|
public async Task<object?> CallShared(
|
|
string scriptName,
|
|
object? parameters = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
var nextDepth = _currentCallDepth + 1;
|
|
if (nextDepth > _maxCallDepth)
|
|
{
|
|
var msg = $"Script call depth exceeded maximum of {_maxCallDepth}. " +
|
|
$"CallShared('{scriptName}') rejected at depth {nextDepth}.";
|
|
_logger.LogError(msg);
|
|
throw new InvalidOperationException(msg);
|
|
}
|
|
|
|
return await _library.ExecuteAsync(
|
|
scriptName, _context, ScriptArgs.Normalize(parameters), cancellationToken);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-13: Helper for ExternalSystem.Call/CachedCall syntax.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Audit Log #23 (M2 Bundle F): every <see cref="Call"/> invocation emits
|
|
/// one <c>ApiOutbound</c>/<c>ApiCall</c> audit row via <see cref="IAuditWriter"/>.
|
|
/// The audit emission is wrapped in a try/catch that swallows every exception
|
|
/// — the audit pipeline is best-effort and must NEVER abort the script's
|
|
/// outbound call (alog.md §7). The original <see cref="ExternalCallResult"/>
|
|
/// (or the original thrown exception) flows back to the caller unchanged.
|
|
/// </remarks>
|
|
public class ExternalSystemHelper
|
|
{
|
|
private static readonly Regex HttpStatusRegex = new(
|
|
@"HTTP\s+(?<code>\d{3})",
|
|
RegexOptions.Compiled | RegexOptions.CultureInvariant);
|
|
|
|
private readonly IExternalSystemClient? _client;
|
|
private readonly string _instanceName;
|
|
private readonly ILogger _logger;
|
|
private readonly IAuditWriter? _auditWriter;
|
|
private readonly string _siteId;
|
|
private readonly string? _sourceScript;
|
|
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
|
|
|
|
// Internal constructor for tests living in ScadaLink.SiteRuntime.Tests
|
|
// (via InternalsVisibleTo). Production sites resolve the helper through
|
|
// ScriptRuntimeContext.ExternalSystem.
|
|
internal ExternalSystemHelper(
|
|
IExternalSystemClient? client,
|
|
string instanceName,
|
|
ILogger logger,
|
|
IAuditWriter? auditWriter = null,
|
|
string siteId = "",
|
|
string? sourceScript = null,
|
|
ICachedCallTelemetryForwarder? cachedForwarder = null)
|
|
{
|
|
_client = client;
|
|
_instanceName = instanceName;
|
|
_logger = logger;
|
|
_auditWriter = auditWriter;
|
|
_siteId = siteId;
|
|
_sourceScript = sourceScript;
|
|
_cachedForwarder = cachedForwarder;
|
|
}
|
|
|
|
public async Task<ExternalCallResult> Call(
|
|
string systemName,
|
|
string methodName,
|
|
IReadOnlyDictionary<string, object?>? parameters = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_client == null)
|
|
throw new InvalidOperationException("External system client not available");
|
|
|
|
// Audit Log #23 (M2 Bundle F): wrap the outbound call so every
|
|
// attempt emits exactly one ApiOutbound/ApiCall row. The wrapper
|
|
// mirrors the existing call-site behaviour — the original result
|
|
// OR original exception flows back to the script untouched; the
|
|
// audit emission is best-effort.
|
|
var occurredAtUtc = DateTime.UtcNow;
|
|
var startTicks = Stopwatch.GetTimestamp();
|
|
ExternalCallResult? result = null;
|
|
Exception? thrown = null;
|
|
try
|
|
{
|
|
result = await _client.CallAsync(systemName, methodName, parameters, cancellationToken);
|
|
return result;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
thrown = ex;
|
|
throw;
|
|
}
|
|
finally
|
|
{
|
|
var elapsedMs = (int)((Stopwatch.GetTimestamp() - startTicks)
|
|
* 1000d / Stopwatch.Frequency);
|
|
EmitCallAudit(systemName, methodName, occurredAtUtc, elapsedMs, result, thrown);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Submit a cached outbound API call (Audit Log #23 / M3). Mints a
|
|
/// fresh <see cref="TrackedOperationId"/>, emits the lifecycle's first
|
|
/// <c>CachedSubmit</c> telemetry packet, hands the call to the
|
|
/// store-and-forward retry loop (which emits per-attempt and terminal
|
|
/// telemetry under the same id — Bundle E Tasks E4/E5), and returns
|
|
/// the id immediately so the script can later query
|
|
/// <c>Tracking.Status(id)</c>.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <b>Best-effort emission (alog.md §7):</b> if the forwarder throws,
|
|
/// the failure is logged and swallowed; the underlying cached-call
|
|
/// path still runs and the id is still returned. The script must never
|
|
/// be aborted by an audit-pipeline failure.
|
|
/// </remarks>
|
|
public async Task<TrackedOperationId> CachedCall(
|
|
string systemName,
|
|
string methodName,
|
|
IReadOnlyDictionary<string, object?>? parameters = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_client == null)
|
|
throw new InvalidOperationException("External system client not available");
|
|
|
|
var trackedId = TrackedOperationId.New();
|
|
var occurredAtUtc = DateTime.UtcNow;
|
|
var target = $"{systemName}.{methodName}";
|
|
|
|
// Emit CachedSubmit telemetry BEFORE handing off to the S&F
|
|
// engine — that way the SiteCalls row is materialised before the
|
|
// first delivery attempt and Tracking.Status(id) can observe a
|
|
// Submitted row even if the immediate-delivery attempt happens to
|
|
// resolve before this method returns.
|
|
await EmitCachedSubmitTelemetryAsync(
|
|
systemName, methodName, target, trackedId, occurredAtUtc, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
// Hand off to the existing cached-call path. The TrackedOperationId
|
|
// becomes the S&F message id so the retry loop (Bundle E Tasks
|
|
// E4/E5) can read it back via StoreAndForwardMessage.Id.
|
|
//
|
|
// M3 Bundle F (F2): the result is now retained because the
|
|
// immediate-success path (WasBuffered=false) bypasses S&F entirely
|
|
// — no retry loop, no ICachedCallLifecycleObserver fire. The
|
|
// helper must emit the Attempted + CachedResolve terminal rows
|
|
// itself, otherwise Tracking.Status(id) would stay in Submitted
|
|
// forever and the audit log would be missing the M3 lifecycle.
|
|
// The WasBuffered=true path is unaffected — the S&F retry loop
|
|
// owns the Attempted + Resolve emissions in that case.
|
|
ExternalCallResult? result;
|
|
try
|
|
{
|
|
result = await _client.CachedCallAsync(
|
|
systemName,
|
|
methodName,
|
|
parameters,
|
|
_instanceName,
|
|
cancellationToken,
|
|
trackedId).ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
// The cached-call surface returns ExternalCallResult on permanent
|
|
// failure rather than throwing; a throw here is exceptional
|
|
// (e.g. cancellation, resolver outage). Log it and rethrow — the
|
|
// script does need to learn about catastrophic failures. The
|
|
// tracked id was still returned via the telemetry submit above.
|
|
_logger.LogWarning(ex,
|
|
"ExternalSystem.CachedCall threw for {System}.{Method} (TrackedOperationId {Id})",
|
|
systemName, methodName, trackedId);
|
|
throw;
|
|
}
|
|
|
|
// M3 Bundle F (F2): immediate-completion lifecycle — emit the
|
|
// missing Attempted + CachedResolve rows when the underlying call
|
|
// resolved without engaging the store-and-forward retry loop.
|
|
if (result is { WasBuffered: false })
|
|
{
|
|
await EmitImmediateTerminalTelemetryAsync(
|
|
systemName, methodName, target, trackedId, result, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
|
|
return trackedId;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Best-effort emission of the CachedSubmit lifecycle event. Any
|
|
/// exception thrown by the forwarder is logged and swallowed so the
|
|
/// calling script's enqueue is not disturbed.
|
|
/// </summary>
|
|
private async Task EmitCachedSubmitTelemetryAsync(
|
|
string systemName,
|
|
string methodName,
|
|
string target,
|
|
TrackedOperationId trackedId,
|
|
DateTime occurredAtUtc,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
if (_cachedForwarder == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
CachedCallTelemetry telemetry;
|
|
try
|
|
{
|
|
telemetry = new CachedCallTelemetry(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedSubmit,
|
|
CorrelationId = trackedId.Value,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Target = target,
|
|
Status = AuditStatus.Submitted,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: trackedId,
|
|
Channel: "ApiOutbound",
|
|
Target: target,
|
|
SourceSite: _siteId,
|
|
Status: "Submitted",
|
|
RetryCount: 0,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: occurredAtUtc,
|
|
UpdatedAtUtc: occurredAtUtc,
|
|
TerminalAtUtc: null));
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build CachedSubmit telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
|
|
systemName, methodName, trackedId);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _cachedForwarder.ForwardAsync(telemetry, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"CachedSubmit telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
|
|
systemName, methodName, trackedId);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// M3 Bundle F (F2): emit the Attempted + CachedResolve lifecycle
|
|
/// rows for an immediate-completion <c>CachedCall</c> (WasBuffered=false).
|
|
/// The S&F retry loop never engaged, so the
|
|
/// <c>ICachedCallLifecycleObserver</c> never fires — the helper must
|
|
/// produce both rows itself to keep the M3 audit contract whole
|
|
/// (Submit → Attempted → Resolve under one TrackedOperationId).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Best-effort emission: a throwing forwarder is logged and swallowed
|
|
/// per alog.md §7. The two rows are emitted INDEPENDENTLY so a single
|
|
/// forwarder fault doesn't drop both halves of the terminal pair.
|
|
/// </remarks>
|
|
private async Task EmitImmediateTerminalTelemetryAsync(
|
|
string systemName,
|
|
string methodName,
|
|
string target,
|
|
TrackedOperationId trackedId,
|
|
ExternalCallResult result,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
if (_cachedForwarder == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
var occurredAtUtc = DateTime.UtcNow;
|
|
// Extract an HTTP status from the error message when present
|
|
// (mirrors EmitCallAudit's existing HttpStatusRegex behaviour so
|
|
// the immediate-failure row carries the same HttpStatus value the
|
|
// synchronous Call() audit row would have stamped).
|
|
int? httpStatus = null;
|
|
if (!result.Success && !string.IsNullOrEmpty(result.ErrorMessage))
|
|
{
|
|
var match = HttpStatusRegex.Match(result.ErrorMessage);
|
|
if (match.Success && int.TryParse(match.Groups["code"].Value, out var code))
|
|
{
|
|
httpStatus = code;
|
|
}
|
|
}
|
|
|
|
// Status mapping for immediate completion:
|
|
// Success=true -> Delivered (audit) / "Delivered" (operational)
|
|
// Success=false -> Failed (audit) / "Failed" (operational)
|
|
// Permanent vs transient is not relevant here: a permanent failure
|
|
// returns Success=false WasBuffered=false (parked-equivalent); a
|
|
// transient failure with NO S&F engine wired likewise lands here
|
|
// with Success=false. Either way the terminal state is "the
|
|
// immediate attempt failed and the operation is done".
|
|
var auditTerminalStatus = result.Success
|
|
? AuditStatus.Delivered
|
|
: AuditStatus.Failed;
|
|
var operationalTerminalStatus = result.Success ? "Delivered" : "Failed";
|
|
|
|
// --- Attempted row -------------------------------------------------
|
|
CachedCallTelemetry attempted;
|
|
try
|
|
{
|
|
attempted = new CachedCallTelemetry(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.ApiCallCached,
|
|
CorrelationId = trackedId.Value,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Target = target,
|
|
Status = AuditStatus.Attempted,
|
|
HttpStatus = httpStatus,
|
|
ErrorMessage = result.Success ? null : result.ErrorMessage,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: trackedId,
|
|
Channel: "ApiOutbound",
|
|
Target: target,
|
|
SourceSite: _siteId,
|
|
Status: "Attempted",
|
|
// RetryCount stays 0 — the operation never reached the
|
|
// S&F retry sweep, so no retries were performed.
|
|
RetryCount: 0,
|
|
LastError: result.Success ? null : result.ErrorMessage,
|
|
HttpStatus: httpStatus,
|
|
CreatedAtUtc: occurredAtUtc,
|
|
UpdatedAtUtc: occurredAtUtc,
|
|
TerminalAtUtc: null));
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build immediate-Attempted telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
|
|
systemName, methodName, trackedId);
|
|
attempted = null!;
|
|
}
|
|
|
|
if (attempted is not null)
|
|
{
|
|
try
|
|
{
|
|
await _cachedForwarder.ForwardAsync(attempted, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Immediate-Attempted telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
|
|
systemName, methodName, trackedId);
|
|
}
|
|
}
|
|
|
|
// --- CachedResolve row --------------------------------------------
|
|
CachedCallTelemetry resolve;
|
|
try
|
|
{
|
|
resolve = new CachedCallTelemetry(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedResolve,
|
|
CorrelationId = trackedId.Value,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Target = target,
|
|
Status = auditTerminalStatus,
|
|
HttpStatus = httpStatus,
|
|
ErrorMessage = result.Success ? null : result.ErrorMessage,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: trackedId,
|
|
Channel: "ApiOutbound",
|
|
Target: target,
|
|
SourceSite: _siteId,
|
|
Status: operationalTerminalStatus,
|
|
RetryCount: 0,
|
|
LastError: result.Success ? null : result.ErrorMessage,
|
|
HttpStatus: httpStatus,
|
|
CreatedAtUtc: occurredAtUtc,
|
|
UpdatedAtUtc: occurredAtUtc,
|
|
// Immediate-completion terminal — mark TerminalAtUtc so
|
|
// SiteCallAudit can move the row to its purge eligible
|
|
// set.
|
|
TerminalAtUtc: occurredAtUtc));
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build immediate-CachedResolve telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
|
|
systemName, methodName, trackedId);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _cachedForwarder.ForwardAsync(resolve, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Immediate-CachedResolve telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
|
|
systemName, methodName, trackedId);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Best-effort emission of one <c>ApiOutbound</c>/<c>ApiCall</c> audit
|
|
/// row. Any exception thrown by the writer is logged and swallowed —
|
|
/// audit-write failures must never abort the user-facing action.
|
|
/// </summary>
|
|
private void EmitCallAudit(
|
|
string systemName,
|
|
string methodName,
|
|
DateTime occurredAtUtc,
|
|
int durationMs,
|
|
ExternalCallResult? result,
|
|
Exception? thrown)
|
|
{
|
|
if (_auditWriter == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
AuditEvent evt;
|
|
try
|
|
{
|
|
evt = BuildCallAuditEvent(systemName, methodName, occurredAtUtc, durationMs, result, thrown);
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
// Building the event itself must never propagate. This is a
|
|
// defensive guard — populating a record from already-validated
|
|
// values shouldn't throw, but we honour the alog.md §7
|
|
// best-effort contract regardless.
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build Audit Log #23 event for {System}.{Method} — skipping emission",
|
|
systemName, methodName);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
// Fire-and-forget so we never block the script on the audit
|
|
// writer; the writer itself is responsible for fast, durable
|
|
// enqueue (site SQLite hot-path). We DO observe failures via
|
|
// ContinueWith so a thrown writer is logged rather than going
|
|
// to the unobserved-task firehose.
|
|
var writeTask = _auditWriter.WriteAsync(evt, CancellationToken.None);
|
|
if (!writeTask.IsCompleted)
|
|
{
|
|
writeTask.ContinueWith(
|
|
t => _logger.LogWarning(t.Exception,
|
|
"Audit Log #23 write failed for EventId {EventId} ({System}.{Method})",
|
|
evt.EventId, systemName, methodName),
|
|
CancellationToken.None,
|
|
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
|
|
TaskScheduler.Default);
|
|
}
|
|
else if (writeTask.IsFaulted)
|
|
{
|
|
_logger.LogWarning(writeTask.Exception,
|
|
"Audit Log #23 write failed for EventId {EventId} ({System}.{Method})",
|
|
evt.EventId, systemName, methodName);
|
|
}
|
|
}
|
|
catch (Exception writeEx)
|
|
{
|
|
// Synchronous throw from WriteAsync (e.g. ArgumentNullException
|
|
// before the writer's own try/catch). Swallow + log per the
|
|
// alog.md §7 contract.
|
|
_logger.LogWarning(writeEx,
|
|
"Audit Log #23 write threw synchronously for EventId {EventId} ({System}.{Method})",
|
|
evt.EventId, systemName, methodName);
|
|
}
|
|
}
|
|
|
|
private AuditEvent BuildCallAuditEvent(
|
|
string systemName,
|
|
string methodName,
|
|
DateTime occurredAtUtc,
|
|
int durationMs,
|
|
ExternalCallResult? result,
|
|
Exception? thrown)
|
|
{
|
|
// Status: Delivered on a Success result; Failed otherwise (the
|
|
// ExternalSystemClient already maps HTTP non-2xx + transient
|
|
// exceptions into Success=false on the result, or surfaces a raw
|
|
// exception). M2 makes no distinction between transient + permanent
|
|
// failure here — both manifest as Status.Failed on the sync path.
|
|
var status = (thrown == null && result != null && result.Success)
|
|
? AuditStatus.Delivered
|
|
: AuditStatus.Failed;
|
|
|
|
string? errorMessage = null;
|
|
string? errorDetail = null;
|
|
int? httpStatus = null;
|
|
|
|
if (thrown != null)
|
|
{
|
|
errorMessage = thrown.Message;
|
|
errorDetail = thrown.ToString();
|
|
}
|
|
else if (result != null && !result.Success)
|
|
{
|
|
errorMessage = result.ErrorMessage;
|
|
// The ExternalSystemClient embeds the HTTP status code in the
|
|
// error message as "HTTP {code}". Parse it back out so the
|
|
// audit row carries the structured value.
|
|
if (!string.IsNullOrEmpty(result.ErrorMessage))
|
|
{
|
|
var match = HttpStatusRegex.Match(result.ErrorMessage);
|
|
if (match.Success
|
|
&& int.TryParse(match.Groups["code"].Value, out var parsed))
|
|
{
|
|
httpStatus = parsed;
|
|
}
|
|
}
|
|
}
|
|
|
|
return new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.ApiCall,
|
|
CorrelationId = null,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Actor = null,
|
|
Target = $"{systemName}.{methodName}",
|
|
Status = status,
|
|
HttpStatus = httpStatus,
|
|
DurationMs = durationMs,
|
|
ErrorMessage = errorMessage,
|
|
ErrorDetail = errorDetail,
|
|
RequestSummary = null,
|
|
ResponseSummary = null,
|
|
PayloadTruncated = false,
|
|
Extra = null,
|
|
ForwardState = AuditForwardState.Pending,
|
|
};
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-13: Helper for Database.Connection/CachedWrite syntax.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Audit Log #23 (M3 Bundle E — Task E6): <see cref="CachedWrite"/> mirrors
|
|
/// <see cref="ExternalSystemHelper.CachedCall"/> — mints a
|
|
/// <see cref="TrackedOperationId"/>, emits the lifecycle's first
|
|
/// CachedSubmit packet (Channel <c>DbOutbound</c>), hands off to the S&F
|
|
/// retry loop, and returns the id. Per-attempt + terminal telemetry is
|
|
/// emitted by the retry loop (Tasks E4/E5).
|
|
/// </remarks>
|
|
public class DatabaseHelper
|
|
{
|
|
private readonly IDatabaseGateway? _gateway;
|
|
private readonly string _instanceName;
|
|
private readonly ILogger _logger;
|
|
private readonly string _siteId;
|
|
private readonly string? _sourceScript;
|
|
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M4 Bundle A): best-effort emitter for synchronous
|
|
/// <c>Database.Connection</c>-routed Execute / ExecuteScalar /
|
|
/// ExecuteReader calls. When wired, <see cref="Connection"/> returns
|
|
/// an <see cref="AuditingDbConnection"/> that intercepts each command
|
|
/// execution and writes one <c>DbOutbound</c>/<c>DbWrite</c> audit
|
|
/// row. Optional — when null the helper falls back to the raw
|
|
/// inner <see cref="System.Data.Common.DbConnection"/> the gateway
|
|
/// returns (tests / minimal hosts that don't wire audit).
|
|
/// </summary>
|
|
private readonly IAuditWriter? _auditWriter;
|
|
|
|
internal DatabaseHelper(
|
|
IDatabaseGateway? gateway,
|
|
string instanceName,
|
|
ILogger logger,
|
|
IAuditWriter? auditWriter = null,
|
|
string siteId = "",
|
|
string? sourceScript = null,
|
|
ICachedCallTelemetryForwarder? cachedForwarder = null)
|
|
{
|
|
_gateway = gateway;
|
|
_instanceName = instanceName;
|
|
_logger = logger;
|
|
_auditWriter = auditWriter;
|
|
_siteId = siteId;
|
|
_sourceScript = sourceScript;
|
|
_cachedForwarder = cachedForwarder;
|
|
}
|
|
|
|
public async Task<System.Data.Common.DbConnection> Connection(
|
|
string name,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_gateway == null)
|
|
throw new InvalidOperationException("Database gateway not available");
|
|
|
|
var inner = await _gateway.GetConnectionAsync(name, cancellationToken);
|
|
|
|
// Audit Log #23 (M4 Bundle A): wrap in an auditing decorator so
|
|
// every script-initiated Execute* / ExecuteReader on the returned
|
|
// connection emits one DbOutbound/DbWrite audit row. The wrapper
|
|
// delegates all other ADO.NET behaviour to the inner connection
|
|
// unchanged — including disposal, so the caller's existing
|
|
// dispose pattern (await using var conn = ...) still releases
|
|
// the underlying connection to the pool.
|
|
if (_auditWriter == null)
|
|
{
|
|
return inner;
|
|
}
|
|
|
|
return new AuditingDbConnection(
|
|
inner,
|
|
_auditWriter,
|
|
connectionName: name,
|
|
siteId: _siteId,
|
|
instanceName: _instanceName,
|
|
sourceScript: _sourceScript,
|
|
logger: _logger);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Submit a cached outbound database write. Mints a fresh
|
|
/// <see cref="TrackedOperationId"/>, emits CachedSubmit telemetry on
|
|
/// <c>DbOutbound</c>, hands off to the cached-write S&F path, and
|
|
/// returns the id. Best-effort emission per alog.md §7.
|
|
/// </summary>
|
|
public async Task<TrackedOperationId> CachedWrite(
|
|
string name,
|
|
string sql,
|
|
IReadOnlyDictionary<string, object?>? parameters = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_gateway == null)
|
|
throw new InvalidOperationException("Database gateway not available");
|
|
|
|
var trackedId = TrackedOperationId.New();
|
|
var occurredAtUtc = DateTime.UtcNow;
|
|
// The DB cached-write target uses the connection name (the only
|
|
// human-readable handle the gateway carries on the buffered row).
|
|
var target = name;
|
|
|
|
await EmitCachedDbSubmitTelemetryAsync(
|
|
name, trackedId, target, occurredAtUtc, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
try
|
|
{
|
|
await _gateway.CachedWriteAsync(
|
|
name, sql, parameters, _instanceName, cancellationToken, trackedId)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Database.CachedWrite threw for {Connection} (TrackedOperationId {Id})",
|
|
name, trackedId);
|
|
throw;
|
|
}
|
|
|
|
return trackedId;
|
|
}
|
|
|
|
private async Task EmitCachedDbSubmitTelemetryAsync(
|
|
string connectionName,
|
|
TrackedOperationId trackedId,
|
|
string target,
|
|
DateTime occurredAtUtc,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
if (_cachedForwarder == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
CachedCallTelemetry telemetry;
|
|
try
|
|
{
|
|
telemetry = new CachedCallTelemetry(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.DbOutbound,
|
|
Kind = AuditKind.CachedSubmit,
|
|
CorrelationId = trackedId.Value,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Target = target,
|
|
Status = AuditStatus.Submitted,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: trackedId,
|
|
Channel: "DbOutbound",
|
|
Target: target,
|
|
SourceSite: _siteId,
|
|
Status: "Submitted",
|
|
RetryCount: 0,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: occurredAtUtc,
|
|
UpdatedAtUtc: occurredAtUtc,
|
|
TerminalAtUtc: null));
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build CachedSubmit telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission",
|
|
connectionName, trackedId);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _cachedForwarder.ForwardAsync(telemetry, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"CachedSubmit telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
|
|
connectionName, trackedId);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Notification Outbox: helper for the <c>Notify</c> script API.
|
|
///
|
|
/// In the outbox design the site no longer delivers notification email inline.
|
|
/// <c>Notify.To("listName").Send(...)</c> enqueues the notification into the site
|
|
/// Store-and-Forward Engine — which forwards it to central — and returns a
|
|
/// <c>NotificationId</c> handle immediately. <c>Notify.Status(id)</c> later queries
|
|
/// the delivery status of that notification.
|
|
/// </summary>
|
|
public class NotifyHelper
|
|
{
|
|
private readonly StoreAndForwardService? _storeAndForward;
|
|
private readonly ICanTell? _siteCommunicationActor;
|
|
private readonly string _siteId;
|
|
private readonly string _instanceName;
|
|
private readonly string? _sourceScript;
|
|
private readonly TimeSpan _askTimeout;
|
|
private readonly ILogger _logger;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M4 Bundle C): best-effort emitter for the
|
|
/// <c>Notification</c>/<c>NotifySend</c> row produced when the script
|
|
/// calls <c>Notify.To(list).Send(...)</c>. Optional — when null the
|
|
/// <see cref="NotifyTarget"/> degrades to a no-op audit path so tests
|
|
/// / minimal hosts that don't wire AddAuditLog still work (mirrors the
|
|
/// M2 Bundle F <c>IExternalSystemClient</c> wrapper).
|
|
/// </summary>
|
|
private readonly IAuditWriter? _auditWriter;
|
|
|
|
internal NotifyHelper(
|
|
StoreAndForwardService? storeAndForward,
|
|
ICanTell? siteCommunicationActor,
|
|
string siteId,
|
|
string instanceName,
|
|
string? sourceScript,
|
|
TimeSpan askTimeout,
|
|
ILogger logger,
|
|
IAuditWriter? auditWriter = null)
|
|
{
|
|
_storeAndForward = storeAndForward;
|
|
_siteCommunicationActor = siteCommunicationActor;
|
|
_siteId = siteId;
|
|
_instanceName = instanceName;
|
|
_sourceScript = sourceScript;
|
|
_askTimeout = askTimeout;
|
|
_logger = logger;
|
|
_auditWriter = auditWriter;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Selects the notification list to send to.
|
|
/// </summary>
|
|
public NotifyTarget To(string listName)
|
|
{
|
|
return new NotifyTarget(
|
|
listName, _storeAndForward, _siteId, _instanceName, _sourceScript, _logger,
|
|
// Audit Log #23 (M4 Bundle C): forward the writer so Send()
|
|
// can emit one NotifySend(Submitted) row per accepted submission.
|
|
_auditWriter);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queries the delivery status of a previously-sent notification.
|
|
///
|
|
/// The query is issued to central via the site communication actor. While the
|
|
/// notification is still buffered in the site Store-and-Forward Engine — central
|
|
/// has no row for it yet (<c>Found: false</c>) but the buffer still holds the id —
|
|
/// the status is reported as the site-local <c>Forwarding</c> state. If central
|
|
/// has a row, its status is mapped through verbatim. If central does not know the
|
|
/// id and it is not buffered locally, the status is <c>Unknown</c>.
|
|
/// </summary>
|
|
public async Task<NotificationDeliveryStatus> Status(string notificationId)
|
|
{
|
|
if (_siteCommunicationActor == null)
|
|
throw new InvalidOperationException(
|
|
"Notification status query is not available — site communication actor not wired");
|
|
|
|
var correlationId = Guid.NewGuid().ToString();
|
|
var query = new NotificationStatusQuery(correlationId, notificationId);
|
|
|
|
NotificationStatusResponse response;
|
|
try
|
|
{
|
|
response = await _siteCommunicationActor
|
|
.Ask<NotificationStatusResponse>(query, _askTimeout);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
// Central could not be reached. Fall through to the buffer check: if the
|
|
// notification is still in the local S&F buffer it is Forwarding.
|
|
_logger.LogWarning(ex,
|
|
"Notification status query for {NotificationId} did not reach central",
|
|
notificationId);
|
|
response = new NotificationStatusResponse(
|
|
correlationId, Found: false, Status: "Unknown",
|
|
RetryCount: 0, LastError: null, DeliveredAt: null);
|
|
}
|
|
|
|
if (response.Found)
|
|
{
|
|
return new NotificationDeliveryStatus(
|
|
response.Status, response.RetryCount, response.LastError, response.DeliveredAt);
|
|
}
|
|
|
|
// Central has no row. If the notification is still buffered at the site it
|
|
// is in transit — report the site-local Forwarding state. Otherwise it is
|
|
// genuinely unknown (never sent, or already forwarded and central lost it).
|
|
if (_storeAndForward != null)
|
|
{
|
|
var buffered = await _storeAndForward.GetMessageByIdAsync(notificationId);
|
|
if (buffered != null)
|
|
{
|
|
return new NotificationDeliveryStatus(
|
|
"Forwarding", buffered.RetryCount, buffered.LastError, DeliveredAt: null);
|
|
}
|
|
}
|
|
|
|
return new NotificationDeliveryStatus("Unknown", 0, null, null);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Notification Outbox: target for <c>Notify.To("listName").Send(...)</c>.
|
|
/// </summary>
|
|
public class NotifyTarget
|
|
{
|
|
private readonly string _listName;
|
|
private readonly StoreAndForwardService? _storeAndForward;
|
|
private readonly string _siteId;
|
|
private readonly string _instanceName;
|
|
private readonly string? _sourceScript;
|
|
private readonly ILogger _logger;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M4 Bundle C): best-effort emitter for the
|
|
/// <c>Notification</c>/<c>NotifySend</c> row written immediately after
|
|
/// the underlying S&F enqueue accepts the submission. Optional —
|
|
/// when null no audit row is emitted (no-op path).
|
|
/// </summary>
|
|
private readonly IAuditWriter? _auditWriter;
|
|
|
|
internal NotifyTarget(
|
|
string listName,
|
|
StoreAndForwardService? storeAndForward,
|
|
string siteId,
|
|
string instanceName,
|
|
string? sourceScript,
|
|
ILogger logger,
|
|
IAuditWriter? auditWriter = null)
|
|
{
|
|
_listName = listName;
|
|
_storeAndForward = storeAndForward;
|
|
_siteId = siteId;
|
|
_instanceName = instanceName;
|
|
_sourceScript = sourceScript;
|
|
_logger = logger;
|
|
_auditWriter = auditWriter;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Enqueues a notification for central delivery and returns its
|
|
/// <c>NotificationId</c> immediately.
|
|
///
|
|
/// The notification is buffered into the site Store-and-Forward Engine under the
|
|
/// <see cref="StoreAndForwardCategory.Notification"/> category; the S&F
|
|
/// engine's <c>NotificationForwarder</c> forwards it to central and treats
|
|
/// central's ack as the delivery outcome. The returned <c>NotificationId</c> is
|
|
/// the single idempotency key end-to-end: it is the S&F message id, it is
|
|
/// carried inside the buffered payload, and it is the id the forwarder submits to
|
|
/// central. Pass it to <see cref="NotifyHelper.Status"/> to track delivery.
|
|
/// </summary>
|
|
public async Task<string> Send(
|
|
string subject,
|
|
string message,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_storeAndForward == null)
|
|
throw new InvalidOperationException(
|
|
"Notification store-and-forward engine not available");
|
|
|
|
// The script controls the idempotency key: generate the NotificationId here,
|
|
// use it as the S&F message id, and carry it inside the buffered payload so
|
|
// the forwarder submits the same id to central on every retry.
|
|
var notificationId = Guid.NewGuid().ToString("N");
|
|
|
|
var payload = new NotificationSubmit(
|
|
NotificationId: notificationId,
|
|
ListName: _listName,
|
|
Subject: subject,
|
|
Body: message,
|
|
// SourceSiteId is re-stamped by the forwarder from its own site id; this
|
|
// value is the best-effort site id known to the script runtime.
|
|
SourceSiteId: _siteId,
|
|
SourceInstanceId: _instanceName,
|
|
// SourceScript (FU3): identifier of the script that raised this
|
|
// notification, threaded down from the script-execution context for the
|
|
// central audit trail. Null when no single script owns the context.
|
|
SourceScript: _sourceScript,
|
|
SiteEnqueuedAt: DateTimeOffset.UtcNow);
|
|
|
|
var payloadJson = JsonSerializer.Serialize(payload);
|
|
|
|
// The S&F engine assigns its own GUID to the message; pin the message id to
|
|
// the NotificationId so the buffer can be queried by it (Notify.Status) and
|
|
// the forwarder's idempotency key matches the buffered row.
|
|
var occurredAtUtc = DateTime.UtcNow;
|
|
await _storeAndForward.EnqueueAsync(
|
|
StoreAndForwardCategory.Notification,
|
|
target: _listName,
|
|
payloadJson: payloadJson,
|
|
originInstanceName: _instanceName,
|
|
messageId: notificationId);
|
|
|
|
_logger.LogDebug(
|
|
"Notify enqueued notification {NotificationId} to list '{List}' for central delivery",
|
|
notificationId, _listName);
|
|
|
|
// Audit Log #23 (M4 Bundle C): emit one Notification/NotifySend
|
|
// (Submitted) row per accepted submission. The emission is wired
|
|
// AFTER the EnqueueAsync returns so we only audit submissions the
|
|
// S&F engine accepted — a failed enqueue throws, never produces an
|
|
// audit row (mirrors ESG: audit fires after the boundary call
|
|
// returned a result, never speculatively). Best-effort per alog.md
|
|
// §7 — the audit write is wrapped in try/catch and any failure is
|
|
// logged + swallowed so the script's Send call still returns the
|
|
// NotificationId.
|
|
EmitNotifySendAudit(notificationId, subject, message, occurredAtUtc);
|
|
|
|
return notificationId;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Best-effort emission of one <c>Notification</c>/<c>NotifySend</c>
|
|
/// (Status <c>Submitted</c>) audit row. Any exception thrown by the
|
|
/// writer is logged and swallowed — audit-write failures must never
|
|
/// abort the user-facing <c>Notify.Send</c> call (alog.md §7).
|
|
/// </summary>
|
|
private void EmitNotifySendAudit(
|
|
string notificationId,
|
|
string subject,
|
|
string body,
|
|
DateTime occurredAtUtc)
|
|
{
|
|
if (_auditWriter == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
AuditEvent evt;
|
|
try
|
|
{
|
|
// CorrelationId is the NotificationId parsed as a Guid. Notify
|
|
// mints the id via Guid.NewGuid().ToString("N") so the parse
|
|
// is expected to succeed; on the off-chance the format
|
|
// changes / a caller injects an unparseable value, leave it
|
|
// null per Bundle B's pattern rather than fail the emission.
|
|
Guid? correlationId = Guid.TryParse(notificationId, out var parsed) ? parsed : (Guid?)null;
|
|
|
|
// M4 captures the request summary verbatim — {"subject": "...", "body": "..."}.
|
|
// M5 will layer redaction / payload-cap enforcement on top.
|
|
var requestSummary = JsonSerializer.Serialize(new
|
|
{
|
|
subject = subject,
|
|
body = body,
|
|
});
|
|
|
|
evt = new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
|
|
Channel = AuditChannel.Notification,
|
|
Kind = AuditKind.NotifySend,
|
|
CorrelationId = correlationId,
|
|
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
|
|
SourceInstanceId = _instanceName,
|
|
SourceScript = _sourceScript,
|
|
Actor = null,
|
|
Target = _listName,
|
|
Status = AuditStatus.Submitted,
|
|
HttpStatus = null,
|
|
// Send is fire-and-forget from the script's perspective —
|
|
// the dispatcher (NotificationOutboxActor) times each
|
|
// delivery attempt and stamps DurationMs on its
|
|
// NotifyDeliver(Attempted) rows.
|
|
DurationMs = null,
|
|
ErrorMessage = null,
|
|
ErrorDetail = null,
|
|
RequestSummary = requestSummary,
|
|
ResponseSummary = null,
|
|
PayloadTruncated = false,
|
|
Extra = null,
|
|
ForwardState = AuditForwardState.Pending,
|
|
};
|
|
}
|
|
catch (Exception buildEx)
|
|
{
|
|
// Defensive: building the event itself must never propagate.
|
|
_logger.LogWarning(buildEx,
|
|
"Failed to build Audit Log #23 NotifySend event for NotificationId {NotificationId} list '{List}' — skipping emission",
|
|
notificationId, _listName);
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
// Fire-and-forget (mirrors ExternalSystemHelper.EmitCallAudit)
|
|
// so the script is never blocked on the audit writer; we observe
|
|
// failures via ContinueWith so a thrown writer is logged rather
|
|
// than going to the unobserved-task firehose.
|
|
var writeTask = _auditWriter.WriteAsync(evt, CancellationToken.None);
|
|
if (!writeTask.IsCompleted)
|
|
{
|
|
writeTask.ContinueWith(
|
|
t => _logger.LogWarning(t.Exception,
|
|
"Audit Log #23 write failed for EventId {EventId} (NotifySend NotificationId {NotificationId})",
|
|
evt.EventId, notificationId),
|
|
CancellationToken.None,
|
|
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
|
|
TaskScheduler.Default);
|
|
}
|
|
else if (writeTask.IsFaulted)
|
|
{
|
|
_logger.LogWarning(writeTask.Exception,
|
|
"Audit Log #23 write failed for EventId {EventId} (NotifySend NotificationId {NotificationId})",
|
|
evt.EventId, notificationId);
|
|
}
|
|
}
|
|
catch (Exception writeEx)
|
|
{
|
|
// Synchronous throw from WriteAsync (e.g. ArgumentNullException
|
|
// before the writer's own try/catch). Swallow + log per alog.md §7.
|
|
_logger.LogWarning(writeEx,
|
|
"Audit Log #23 write threw synchronously for EventId {EventId} (NotifySend NotificationId {NotificationId})",
|
|
evt.EventId, notificationId);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 (M3): script-side accessor for cached-operation tracking.
|
|
/// <c>Tracking.Status(trackedOperationId)</c> reads the site-local SQLite
|
|
/// row directly via <see cref="IOperationTrackingStore.GetStatusAsync"/> —
|
|
/// the site is the single source of truth for cached-call status, so no
|
|
/// central round-trip is needed and the call is answered authoritatively.
|
|
/// </summary>
|
|
public class TrackingHelper
|
|
{
|
|
private readonly IOperationTrackingStore? _store;
|
|
private readonly ILogger _logger;
|
|
|
|
internal TrackingHelper(IOperationTrackingStore? store, ILogger logger)
|
|
{
|
|
_store = store;
|
|
_logger = logger;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns the latest tracking snapshot for the supplied id, or
|
|
/// <c>null</c> when the id is unknown (never recorded, or purged after
|
|
/// the retention window).
|
|
/// </summary>
|
|
/// <exception cref="InvalidOperationException">
|
|
/// Thrown when the script runtime was constructed without an
|
|
/// <see cref="IOperationTrackingStore"/> — mirrors the
|
|
/// "service-not-wired" failure mode of the other integration helpers.
|
|
/// </exception>
|
|
public Task<TrackingStatusSnapshot?> Status(
|
|
TrackedOperationId trackedOperationId,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (_store == null)
|
|
{
|
|
throw new InvalidOperationException(
|
|
"Operation tracking store not available");
|
|
}
|
|
|
|
return _store.GetStatusAsync(trackedOperationId, cancellationToken);
|
|
}
|
|
}
|
|
}
|