M3 Bundle F (Task F1) wires the cached-call audit pipeline through the composition roots: - Central: register SiteCallAuditActor as a cluster singleton + proxy (mirrors AuditLogIngestActor and NotificationOutboxActor). Program.cs calls .AddSiteCallAudit() on the central role. - Site: register ICachedCallTelemetryForwarder + CachedCallLifecycleBridge in AddAuditLog (lazy factory — Central nodes degrade to audit-only emission because IOperationTrackingStore is site-only). - Site: bind CachedCallLifecycleBridge to ICachedCallLifecycleObserver so StoreAndForwardService picks it up via DI. - Site: introduce IStoreAndForwardSiteContext + Host adapter to surface the site id to StoreAndForwardService without creating a StoreAndForward -> HealthMonitoring project-reference cycle. - ScriptExecutionActor resolves ICachedCallTelemetryForwarder per script scope and threads it into ScriptRuntimeContext. CachedCallTelemetryForwarder's IOperationTrackingStore dependency is now nullable so Central DI validation succeeds with the lazy registration; the forwarder's tracking-half emission is a no-op when the store is absent. Tests: - AkkaHostedServiceAuditWiringTests: Central host builds with AddSiteCallAudit and resolves ICachedCallTelemetryForwarder; Site resolves the forwarder + bridge + observer + IStoreAndForwardSiteContext. - Full solution: 194 Host tests green, 241 SiteRuntime tests green, every other suite unchanged.
180 lines
8.4 KiB
C#
180 lines
8.4 KiB
C#
using Microsoft.Extensions.Logging;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.AuditLog.Site.Telemetry;
|
|
|
|
/// <summary>
/// Site-side dual emitter for cached-call lifecycle telemetry (Audit Log #23 /
/// M3). Sister to <see cref="SiteAuditTelemetryActor"/>: where the M2 actor
/// drains audit-only events, this forwarder takes a combined
/// <see cref="CachedCallTelemetry"/> packet and fans it out to the two
/// site-local stores in a single call:
/// <list type="bullet">
/// <item><description>The <see cref="AuditEvent"/> row is written via
/// <see cref="IAuditWriter"/> (the site <c>FallbackAuditWriter</c> +
/// <c>SqliteAuditWriter</c> chain established in M2).</description></item>
/// <item><description>The operational <see cref="SiteCallOperational"/> half
/// updates the site-local <c>OperationTracking</c> SQLite store via
/// <see cref="IOperationTrackingStore"/>, with the per-lifecycle method
/// (<c>Enqueue</c> / <c>Attempt</c> / <c>Terminal</c>) selected from the
/// audit row's <see cref="AuditKind"/>.</description></item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// <b>Best-effort contract (alog.md §7):</b> a thrown writer OR a thrown
/// tracking store must never propagate to the calling script. Both emission
/// halves are wrapped in independent try/catch blocks so a SQLite outage on
/// one side cannot starve the other — the failure is logged and the call
/// returns normally. The failure-logging paths read the packet with
/// null-conditional access so that a packet with a missing half cannot make
/// the handler itself throw.
/// </para>
/// <para>
/// <b>Wire push deferred to M6.</b> M3 keeps this forwarder synchronous
/// against the local stores: there is no site→central gRPC channel yet, so
/// the <see cref="ISiteStreamAuditClient.IngestCachedTelemetryAsync"/> RPC
/// is registered on the interface (Bundle E1) but the production binding
/// remains <c>NoOpSiteStreamAuditClient</c>. Once M6 wires a real client the
/// drain pattern from <c>SiteAuditTelemetryActor</c> can be reused — the
/// <c>AuditEvent</c> rows already live in SQLite tagged
/// <see cref="AuditForwardState.Pending"/>, so a single drain loop sweeps
/// both M2 and M3 emissions.
/// </para>
/// </remarks>
public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder
{
    private readonly IAuditWriter _auditWriter;
    private readonly IOperationTrackingStore? _trackingStore;
    private readonly ILogger<CachedCallTelemetryForwarder> _logger;

    /// <summary>
    /// Construct the forwarder. <paramref name="trackingStore"/> is optional —
    /// when null only the audit half of the packet is emitted, which matches
    /// the M3 Bundle F composition-root contract on Central nodes: the
    /// AuditLog DI surface registers the forwarder unconditionally (mirroring
    /// the IAuditWriter chain) but the site-only tracking store has no central
    /// registration. Production site nodes wire both — the central lazy
    /// resolution is a no-op path kept symmetric with the M2 writer chain.
    /// </summary>
    /// <param name="auditWriter">Writer that receives the audit half of every packet. Required.</param>
    /// <param name="trackingStore">Tracking store that receives the operational half, or <c>null</c> to skip that half.</param>
    /// <param name="logger">Logger used to report swallowed emission failures. Required.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="auditWriter"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public CachedCallTelemetryForwarder(
        IAuditWriter auditWriter,
        IOperationTrackingStore? trackingStore,
        ILogger<CachedCallTelemetryForwarder> logger)
    {
        _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _trackingStore = trackingStore;
    }

    /// <summary>
    /// Fan out one combined-telemetry packet to the audit writer and the
    /// tracking store. Returns once both halves have been attempted (success
    /// OR logged failure). Emission failures never propagate — exceptions are
    /// caught per-half and logged at warning level so the calling script's
    /// outbound action is not disturbed.
    /// </summary>
    /// <param name="telemetry">The combined audit + operational packet to emit.</param>
    /// <param name="ct">Cancellation token passed through to the writer and the tracking store.</param>
    /// <returns>A task that completes after both halves have been attempted.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="telemetry"/> is <c>null</c>. This is the only failure
    /// that is not swallowed; because the method is <c>async</c> it surfaces
    /// through the returned task rather than synchronously.
    /// </exception>
    public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(telemetry);

        // Independent try/catch — a thrown audit writer must not prevent the
        // tracking-store update from running (and vice-versa). Both halves
        // are best-effort.
        await TryEmitAuditAsync(telemetry, ct).ConfigureAwait(false);
        await TryEmitTrackingAsync(telemetry, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Write the audit half of the packet through the audit writer, logging
    /// and swallowing any exception the writer throws.
    /// </summary>
    private async Task TryEmitAuditAsync(CachedCallTelemetry telemetry, CancellationToken ct)
    {
        try
        {
            await _auditWriter.WriteAsync(telemetry.Audit, ct).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // alog.md §7 best-effort contract — log and swallow. The audit
            // pipeline's own retry/recovery (RingBufferFallback in the
            // FallbackAuditWriter) handles transient writer failures upstream;
            // a throw bubbling up here means the writer's own swallow contract
            // failed, which is itself best-effort-handled.
            //
            // Null-conditional access: the audit row may itself be the reason
            // the write failed (e.g. a packet with no audit half), and the
            // handler must not throw while building its log arguments.
            _logger.LogWarning(ex,
                "CachedCallTelemetryForwarder: audit emission threw for EventId {EventId} (Kind {Kind}, Status {Status})",
                telemetry.Audit?.EventId, telemetry.Audit?.Kind, telemetry.Audit?.Status);
        }
    }

    /// <summary>
    /// Apply the operational half of the packet to the tracking store, choosing
    /// the store method from the audit row's kind. A no-op when no tracking
    /// store was supplied; any exception is logged and swallowed.
    /// </summary>
    private async Task TryEmitTrackingAsync(CachedCallTelemetry telemetry, CancellationToken ct)
    {
        if (_trackingStore is null)
        {
            // No site-local tracking store wired — Central composition root or
            // an integration-test host that skipped AddSiteRuntime. Emitting
            // through the audit half is still meaningful; the tracking half
            // is a no-op rather than an error.
            return;
        }

        try
        {
            // Read both halves inside the try so that a missing half fails
            // here, where it is caught and logged like any other failure.
            var audit = telemetry.Audit;
            var operational = telemetry.Operational;

            switch (audit.Kind)
            {
                case AuditKind.CachedSubmit:
                    // Enqueue — insert-if-not-exists with the operational
                    // channel as the kind discriminator. RetryCount is fixed
                    // at 0 by the tracking store's INSERT contract.
                    await _trackingStore.RecordEnqueueAsync(
                        operational.TrackedOperationId,
                        operational.Channel,
                        operational.Target,
                        audit.SourceInstanceId,
                        audit.SourceScript,
                        ct).ConfigureAwait(false);
                    break;

                case AuditKind.ApiCallCached:
                case AuditKind.DbWriteCached:
                    // Attempt — advance retry counter + last-error/HTTP-status.
                    // Terminal rows are guarded by the store's WHERE clause.
                    await _trackingStore.RecordAttemptAsync(
                        operational.TrackedOperationId,
                        operational.Status,
                        operational.RetryCount,
                        operational.LastError,
                        operational.HttpStatus,
                        ct).ConfigureAwait(false);
                    break;

                case AuditKind.CachedResolve:
                    // Terminal — first-write-wins on the resolve flip.
                    await _trackingStore.RecordTerminalAsync(
                        operational.TrackedOperationId,
                        operational.Status,
                        operational.LastError,
                        operational.HttpStatus,
                        ct).ConfigureAwait(false);
                    break;

                default:
                    // Defensive — only the four cached-lifecycle kinds are
                    // expected on this path. Anything else is logged so a
                    // mis-routed packet is visible but never crashes the
                    // forwarder.
                    _logger.LogWarning(
                        "CachedCallTelemetryForwarder: unexpected audit kind {Kind} on tracking emission for EventId {EventId}",
                        audit.Kind, audit.EventId);
                    break;
            }
        }
        catch (Exception ex)
        {
            // Best-effort: log and swallow. Null-conditional access keeps the
            // handler from throwing when the operational half is the missing
            // piece that caused the failure in the first place.
            _logger.LogWarning(ex,
                "CachedCallTelemetryForwarder: tracking-store emission threw for TrackedOperationId {Id} (Status {Status})",
                telemetry.Operational?.TrackedOperationId, telemetry.Operational?.Status);
        }
    }
}
|