feat(siteruntime): ExternalSystem.CachedCall emits CachedSubmit telemetry (#23 M3)

Rework ScriptRuntimeContext.ExternalSystem.CachedCall to fit the M3
combined-telemetry model:

* Mints a fresh TrackedOperationId and emits one CachedSubmit packet
  via ICachedCallTelemetryForwarder BEFORE handing the call off — the
  SiteCalls row is materialised before the first delivery attempt so
  Tracking.Status(id) can observe a Submitted row even if immediate
  delivery resolves before the helper returns.
* Threads the TrackedOperationId into IExternalSystemClient.CachedCallAsync
  as a new optional parameter (and into IDatabaseGateway.CachedWriteAsync
  for the Database mirror set up here for E6). The gateway uses the id
  as the StoreAndForward messageId so the retry loop (Tasks E4/E5) can
  recover it from StoreAndForwardMessage.Id.
* Returns the TrackedOperationId rather than ExternalCallResult — the
  script's contract is now "get a tracking handle, observe outcome via
  Tracking.Status". Best-effort emission: a thrown forwarder is logged
  + swallowed; the original call still runs and the id is still returned.

DatabaseHelper gets the matching siteId / sourceScript / forwarder
fields and a parallel CachedSubmit emitter (Channel=DbOutbound) so Task
E6's Database.CachedWrite mirror plugs in without further runtime
wiring.

New ICachedCallTelemetryForwarder seam in Commons.Interfaces.Services
so SiteRuntime depends on Commons (existing arrow) rather than
ScadaLink.AuditLog (would have introduced a new dependency).

Bundle E task E3 (and helper-shape work for E6).
This commit is contained in:
Joseph Doherty
2026-05-20 14:48:05 -04:00
parent 2145b29d4d
commit 42430dd10a
8 changed files with 587 additions and 16 deletions

View File

@@ -7,6 +7,7 @@ using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Instance;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Messages.ScriptExecution;
using ScadaLink.Commons.Types;
@@ -94,6 +95,16 @@ public class ScriptRuntimeContext
/// </summary>
private readonly IOperationTrackingStore? _operationTrackingStore;
/// <summary>
/// Audit Log #23 (M3 Bundle E — Task E3): site-side dual emitter for
/// cached-call lifecycle telemetry. Optional — when null
/// <c>ExternalSystem.CachedCall</c> / <c>Database.CachedWrite</c> still
/// return a <see cref="TrackedOperationId"/> and invoke the underlying
/// store-and-forward path, but no audit / SiteCalls telemetry is emitted
/// (tests / minimal hosts that don't wire the audit pipeline).
/// </summary>
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
public ScriptRuntimeContext(
IActorRef instanceActor,
IActorRef self,
@@ -110,7 +121,8 @@ public class ScriptRuntimeContext
string siteId = "",
string? sourceScript = null,
IAuditWriter? auditWriter = null,
IOperationTrackingStore? operationTrackingStore = null)
IOperationTrackingStore? operationTrackingStore = null,
ICachedCallTelemetryForwarder? cachedForwarder = null)
{
_instanceActor = instanceActor;
_self = self;
@@ -128,6 +140,7 @@ public class ScriptRuntimeContext
_sourceScript = sourceScript;
_auditWriter = auditWriter;
_operationTrackingStore = operationTrackingStore;
_cachedForwarder = cachedForwarder;
}
/// <summary>
@@ -228,14 +241,21 @@ public class ScriptRuntimeContext
/// ExternalSystem.CachedCall("systemName", "methodName", params)
/// </summary>
public ExternalSystemHelper ExternalSystem => new(
_externalSystemClient, _instanceName, _logger, _auditWriter, _siteId, _sourceScript);
_externalSystemClient, _instanceName, _logger, _auditWriter, _siteId, _sourceScript,
// Audit Log #23 (M3 Bundle E — Task E3): emit CachedSubmit telemetry
// on every ExternalSystem.CachedCall enqueue.
_cachedForwarder);
/// <summary>
/// WP-13: Provides access to database operations.
/// Database.Connection("name")
/// Database.CachedWrite("name", "sql", params)
/// </summary>
public DatabaseHelper Database => new(_databaseGateway, _instanceName, _logger);
public DatabaseHelper Database => new(
_databaseGateway, _instanceName, _logger, _siteId, _sourceScript,
// Audit Log #23 (M3 Bundle E — Task E6): emit CachedSubmit telemetry on
// every Database.CachedWrite enqueue.
_cachedForwarder);
/// <summary>
/// Provides access to the Notification Outbox API.
@@ -328,14 +348,19 @@ public class ScriptRuntimeContext
private readonly IAuditWriter? _auditWriter;
private readonly string _siteId;
private readonly string? _sourceScript;
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
// Internal constructor for tests living in ScadaLink.SiteRuntime.Tests
// (via InternalsVisibleTo). Production sites resolve the helper through
// ScriptRuntimeContext.ExternalSystem.
internal ExternalSystemHelper(
IExternalSystemClient? client,
string instanceName,
ILogger logger,
IAuditWriter? auditWriter = null,
string siteId = "",
string? sourceScript = null)
string? sourceScript = null,
ICachedCallTelemetryForwarder? cachedForwarder = null)
{
_client = client;
_instanceName = instanceName;
@@ -343,6 +368,7 @@ public class ScriptRuntimeContext
_auditWriter = auditWriter;
_siteId = siteId;
_sourceScript = sourceScript;
_cachedForwarder = cachedForwarder;
}
public async Task<ExternalCallResult> Call(
@@ -381,7 +407,22 @@ public class ScriptRuntimeContext
}
}
public async Task<ExternalCallResult> CachedCall(
/// <summary>
/// Submit a cached outbound API call (Audit Log #23 / M3). Mints a
/// fresh <see cref="TrackedOperationId"/>, emits the lifecycle's first
/// <c>CachedSubmit</c> telemetry packet, hands the call to the
/// store-and-forward retry loop (which emits per-attempt and terminal
/// telemetry under the same id — Bundle E Tasks E4/E5), and returns
/// the id immediately so the script can later query
/// <c>Tracking.Status(id)</c>.
/// </summary>
/// <remarks>
/// <b>Best-effort emission (alog.md §7):</b> if the forwarder throws,
/// the failure is logged and swallowed; the underlying cached-call
/// path still runs and the id is still returned. The script must never
/// be aborted by an audit-pipeline failure.
/// </remarks>
public async Task<TrackedOperationId> CachedCall(
string systemName,
string methodName,
IReadOnlyDictionary<string, object?>? parameters = null,
@@ -390,7 +431,119 @@ public class ScriptRuntimeContext
if (_client == null)
throw new InvalidOperationException("External system client not available");
return await _client.CachedCallAsync(systemName, methodName, parameters, _instanceName, cancellationToken);
var trackedId = TrackedOperationId.New();
var occurredAtUtc = DateTime.UtcNow;
var target = $"{systemName}.{methodName}";
// Emit CachedSubmit telemetry BEFORE handing off to the S&F
// engine — that way the SiteCalls row is materialised before the
// first delivery attempt and Tracking.Status(id) can observe a
// Submitted row even if the immediate-delivery attempt happens to
// resolve before this method returns.
await EmitCachedSubmitTelemetryAsync(
systemName, methodName, target, trackedId, occurredAtUtc, cancellationToken)
.ConfigureAwait(false);
// Hand off to the existing cached-call path. The TrackedOperationId
// becomes the S&F message id so the retry loop (Bundle E Tasks
// E4/E5) can read it back via StoreAndForwardMessage.Id. The
// underlying ExternalCallResult is intentionally discarded — the
// script's contract is now "return the tracking id, observe outcome
// via Tracking.Status".
try
{
await _client.CachedCallAsync(
systemName,
methodName,
parameters,
_instanceName,
cancellationToken,
trackedId).ConfigureAwait(false);
}
catch (Exception ex)
{
// The cached-call surface returns ExternalCallResult on permanent
// failure rather than throwing; a throw here is exceptional
// (e.g. cancellation, resolver outage). Log it and rethrow — the
// script does need to learn about catastrophic failures. The
// tracked id was still returned via the telemetry submit above.
_logger.LogWarning(ex,
"ExternalSystem.CachedCall threw for {System}.{Method} (TrackedOperationId {Id})",
systemName, methodName, trackedId);
throw;
}
return trackedId;
}
/// <summary>
/// Best-effort emission of the CachedSubmit lifecycle event. Any
/// exception thrown by the forwarder is logged and swallowed so the
/// calling script's enqueue is not disturbed.
/// </summary>
private async Task EmitCachedSubmitTelemetryAsync(
string systemName,
string methodName,
string target,
TrackedOperationId trackedId,
DateTime occurredAtUtc,
CancellationToken cancellationToken)
{
if (_cachedForwarder == null)
{
return;
}
CachedCallTelemetry telemetry;
try
{
telemetry = new CachedCallTelemetry(
Audit: new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.CachedSubmit,
CorrelationId = trackedId.Value,
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
SourceInstanceId = _instanceName,
SourceScript = _sourceScript,
Target = target,
Status = AuditStatus.Submitted,
ForwardState = AuditForwardState.Pending,
},
Operational: new SiteCallOperational(
TrackedOperationId: trackedId,
Channel: "ApiOutbound",
Target: target,
SourceSite: _siteId,
Status: "Submitted",
RetryCount: 0,
LastError: null,
HttpStatus: null,
CreatedAtUtc: occurredAtUtc,
UpdatedAtUtc: occurredAtUtc,
TerminalAtUtc: null));
}
catch (Exception buildEx)
{
_logger.LogWarning(buildEx,
"Failed to build CachedSubmit telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission",
systemName, methodName, trackedId);
return;
}
try
{
await _cachedForwarder.ForwardAsync(telemetry, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"CachedSubmit telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})",
systemName, methodName, trackedId);
}
}
/// <summary>
@@ -536,17 +689,37 @@ public class ScriptRuntimeContext
/// <summary>
/// WP-13: Helper for Database.Connection/CachedWrite syntax.
/// </summary>
/// <remarks>
/// Audit Log #23 (M3 Bundle E — Task E6): <see cref="CachedWrite"/> mirrors
/// <see cref="ExternalSystemHelper.CachedCall"/> — mints a
/// <see cref="TrackedOperationId"/>, emits the lifecycle's first
/// CachedSubmit packet (Channel <c>DbOutbound</c>), hands off to the S&amp;F
/// retry loop, and returns the id. Per-attempt + terminal telemetry is
/// emitted by the retry loop (Tasks E4/E5).
/// </remarks>
public class DatabaseHelper
{
private readonly IDatabaseGateway? _gateway;
private readonly string _instanceName;
private readonly ILogger _logger;
private readonly string _siteId;
private readonly string? _sourceScript;
private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
internal DatabaseHelper(IDatabaseGateway? gateway, string instanceName, ILogger logger)
internal DatabaseHelper(
IDatabaseGateway? gateway,
string instanceName,
ILogger logger,
string siteId = "",
string? sourceScript = null,
ICachedCallTelemetryForwarder? cachedForwarder = null)
{
_gateway = gateway;
_instanceName = instanceName;
_logger = logger;
_siteId = siteId;
_sourceScript = sourceScript;
_cachedForwarder = cachedForwarder;
}
public async Task<System.Data.Common.DbConnection> Connection(
@@ -559,7 +732,13 @@ public class ScriptRuntimeContext
return await _gateway.GetConnectionAsync(name, cancellationToken);
}
public async Task CachedWrite(
/// <summary>
/// Submit a cached outbound database write. Mints a fresh
/// <see cref="TrackedOperationId"/>, emits CachedSubmit telemetry on
/// <c>DbOutbound</c>, hands off to the cached-write S&amp;F path, and
/// returns the id. Best-effort emission per alog.md §7.
/// </summary>
public async Task<TrackedOperationId> CachedWrite(
string name,
string sql,
IReadOnlyDictionary<string, object?>? parameters = null,
@@ -568,7 +747,95 @@ public class ScriptRuntimeContext
if (_gateway == null)
throw new InvalidOperationException("Database gateway not available");
await _gateway.CachedWriteAsync(name, sql, parameters, _instanceName, cancellationToken);
var trackedId = TrackedOperationId.New();
var occurredAtUtc = DateTime.UtcNow;
// The DB cached-write target uses the connection name (the only
// human-readable handle the gateway carries on the buffered row).
var target = name;
await EmitCachedDbSubmitTelemetryAsync(
name, trackedId, target, occurredAtUtc, cancellationToken)
.ConfigureAwait(false);
try
{
await _gateway.CachedWriteAsync(
name, sql, parameters, _instanceName, cancellationToken, trackedId)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Database.CachedWrite threw for {Connection} (TrackedOperationId {Id})",
name, trackedId);
throw;
}
return trackedId;
}
private async Task EmitCachedDbSubmitTelemetryAsync(
string connectionName,
TrackedOperationId trackedId,
string target,
DateTime occurredAtUtc,
CancellationToken cancellationToken)
{
if (_cachedForwarder == null)
{
return;
}
CachedCallTelemetry telemetry;
try
{
telemetry = new CachedCallTelemetry(
Audit: new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
Channel = AuditChannel.DbOutbound,
Kind = AuditKind.CachedSubmit,
CorrelationId = trackedId.Value,
SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId,
SourceInstanceId = _instanceName,
SourceScript = _sourceScript,
Target = target,
Status = AuditStatus.Submitted,
ForwardState = AuditForwardState.Pending,
},
Operational: new SiteCallOperational(
TrackedOperationId: trackedId,
Channel: "DbOutbound",
Target: target,
SourceSite: _siteId,
Status: "Submitted",
RetryCount: 0,
LastError: null,
HttpStatus: null,
CreatedAtUtc: occurredAtUtc,
UpdatedAtUtc: occurredAtUtc,
TerminalAtUtc: null));
}
catch (Exception buildEx)
{
_logger.LogWarning(buildEx,
"Failed to build CachedSubmit telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission",
connectionName, trackedId);
return;
}
try
{
await _cachedForwarder.ForwardAsync(telemetry, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"CachedSubmit telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})",
connectionName, trackedId);
}
}
}