feat(audit): close AuditLog-001 — wire combined-telemetry dual-write transport

Closes the last open code-review finding. The unreachable
IngestCachedTelemetryAsync path now carries production cached-call
lifecycle traffic, delivering the design's "AuditLog + SiteCalls in one
MS SQL transaction" guarantee. Before this commit, the SiteCalls
operational half had NO production transport at all — central's
SiteCallAuditActor.OnUpsertAsync had zero producers, so cached-call
operational state never reached the central mirror.

Site-side partition (so neither path double-emits):
- ISiteAuditQueue.ReadPendingCachedTelemetryAsync — new method returning
  rows where Kind ∈ {CachedSubmit, ApiCallCached, DbWriteCached,
  CachedResolve} AND ForwardState = Pending.
- ISiteAuditQueue.ReadPendingAsync — XML doc updated, SQLite impl now
  filters Kind NOT IN the cached set so cached rows no longer ride the
  audit-only drain.

New cached-drain in SiteAuditTelemetryActor:
- Optional IOperationTrackingStore? ctor param (null on central
  composition roots — the cached scheduler is never armed there).
- Independent CachedDrain message + scheduler tick parallel to the
  existing Drain — a stall on one path can't block the other; shared
  lifecycle CTS gates both.
- OnCachedDrainAsync: reads cached audit rows, joins each with its
  matching SiteCallOperational snapshot via CorrelationId →
  TrackedOperationId from the tracking store, builds CachedTelemetryBatch,
  pushes via IngestCachedTelemetryAsync, marks ack'd rows Forwarded.
- Orphan rows (no tracking snapshot, thrown tracking-store call,
  missing CorrelationId) logged at Warning + skipped — they stay
  Pending so reconciliation/retry picks them up later. Best-effort
  contract preserved.

Central side: AuditLogIngestActor.OnCachedTelemetryAsync was already
implemented (M3 Bundle G; dead code until this commit). It performs
InsertIfNotExists for AuditLog + UpsertAsync for SiteCalls inside a
BeginTransactionAsync. The handler is idempotent on EventId, so any
duplicate arrivals from concurrent push + reconciliation are silent
no-ops.

Composition root: AkkaHostedService now resolves IOperationTrackingStore
via GetService<>() (site-only) and threads it through the actor's
Props.Create.

Tests (+3 in SiteAuditTelemetryActorTests, 1 updated):
- Cached rows route through the new transport, not the audit-only drain.
- Orphan cached row (no tracking match) is logged + skipped, drain
  doesn't crash.
- Ordinary audit rows still flow through the audit-only drain unchanged.
- Updated: ParentExecutionIdCorrelationTests now unions both pending
  reads to assert all expected Kinds remain covered after the partition.

Build clean; AuditLog.Tests 250/251 (the 1 fail is the pre-existing
date-sensitive PartitionPurgeTests integration flake explicitly accepted
across the session); SiteRuntime.Tests 302/302.

README regenerated: 0 pending of 481 total.

Session-final totals: 136 of 136 originally-open Theme findings closed
across 11 commits (10 themed batches + this architectural close).
Joseph Doherty
2026-05-28 09:08:43 -04:00
parent 11950b0a8e
commit c1fe1c4f83
8 changed files with 698 additions and 34 deletions
+69 -4
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-28 |
| Reviewer | claude-agent |
| Commit reviewed | `1eb6e97` |
| Open findings | 1 |
| Open findings | 0 |
## Summary
@@ -65,7 +65,7 @@ chain doesn't reject a central composition root that mistakenly calls the site b
|--|--|
| Severity | Medium |
| Category | Design-document adherence |
| Status | Open |
| Status | Resolved |
| Location | `src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs:45`, `src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs:86`, `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs:198` |
**Description**
@@ -101,9 +101,74 @@ unreachable `OnCachedTelemetryAsync` dual-write code (after confirming the
`AuditLogIngestActorCombinedTelemetryTests` integration tests exercise it via direct
actor injection only).
**Resolution**
**Resolution (2026-05-28):**
_Unresolved._
Wired the combined-telemetry transport end-to-end via recommendation (a). The
previously-unreachable `IngestCachedTelemetryAsync` client path now carries
cached-call lifecycle rows from the site SQLite hot-path through to the central
`AuditLogIngestActor.OnCachedTelemetryAsync` dual-write transaction. Changes:
- **`ISiteAuditQueue`** (`src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs`):
added `ReadPendingCachedTelemetryAsync(int, CancellationToken)` returning
rows in `AuditForwardState.Pending` whose `Kind` is one of `CachedSubmit`,
`ApiCallCached`, `DbWriteCached`, `CachedResolve`. Updated `ReadPendingAsync`
XML doc to call out the partition.
- **`SqliteAuditWriter`** (`src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs`):
implemented `ReadPendingCachedTelemetryAsync` with a `Kind IN (...)` filter
reusing the existing `_readConnection` / `_readLock` decoupling; modified
`ReadPendingAsync` to add the symmetric `Kind NOT IN (...)` predicate so the
audit-only drain no longer double-emits cached rows.
- **`SiteAuditTelemetryActor`** (`src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs`):
added an optional `IOperationTrackingStore?` constructor parameter, a sibling
`CachedDrain` self-tick message, and an `OnCachedDrainAsync` handler running
in parallel with the existing audit-only drain. The cached-drain reads the
partitioned audit rows, joins each with the matching tracking-store
snapshot (looked up by `TrackedOperationId` via `CorrelationId`), builds a
`CachedTelemetryBatch`, pushes via `IngestCachedTelemetryAsync`, and marks
ack'd EventIds Forwarded. Orphan rows (missing `CorrelationId`, no matching
tracking snapshot, or a thrown tracking-store call) are logged at Warning +
skipped so the bad row never blocks the rest of the batch; rows stay Pending
and reconciliation / retention handles them. The lifecycle CTS (AuditLog-010)
gates both drains uniformly.
- **`AkkaHostedService`** (`src/ScadaLink.Host/Actors/AkkaHostedService.cs`):
resolves `IOperationTrackingStore` via `GetService` (site-only registration)
and threads it through the actor's `Props.Create`. Central composition
roots and tests that don't register the tracking store get the legacy
audit-only behaviour — the cached scheduler is never armed.
- **Tests** (`tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs`):
added three regression tests asserting (1) cached rows route through
`IngestCachedTelemetryAsync` and NOT `IngestAuditEventsAsync`, (2) an
orphan row with no tracking snapshot is logged + skipped without crashing
the drain, (3) the audit-only drain still flows when the cached drain is
disabled (null tracking store). Updated `WaitForSiteRowsPersistedAsync` in
`ParentExecutionIdCorrelationTests` to union `ReadPendingCachedTelemetryAsync`
into the durability check; its union of `ReadPendingAsync(256)` and
`ReadForwardedAsync(256)` missed the cached kinds after the partition change.
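The join-and-skip behaviour of the cached drain can be sketched in isolation.
This is a minimal illustration under stated assumptions, not the production
code: `Kind`, `AuditRow`, `Snapshot`, `Packet`, and `CachedDrainSketch` are
hypothetical stand-ins for the real ScadaLink types, the lookup is a
synchronous delegate rather than `IOperationTrackingStore.GetStatusAsync`, and
the kind filter runs in memory rather than in the SQLite `Kind IN (...)`
predicate.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the real types; only the shape matters here.
public enum Kind { ApiCall, NotifySend, CachedSubmit, ApiCallCached, DbWriteCached, CachedResolve }
public sealed record AuditRow(Guid EventId, Kind Kind, Guid? CorrelationId);
public sealed record Snapshot(Guid TrackedOperationId, string Status);
public sealed record Packet(AuditRow Audit, Snapshot Operational);

public static class CachedDrainSketch
{
    // The four cached-lifecycle kinds named in the resolution above.
    private static readonly HashSet<Kind> CachedKinds = new()
    {
        Kind.CachedSubmit, Kind.ApiCallCached, Kind.DbWriteCached, Kind.CachedResolve,
    };

    public static bool IsCached(Kind kind) => CachedKinds.Contains(kind);

    // Joins each cached row with its tracking snapshot. Orphans (no
    // CorrelationId, throwing lookup, or no snapshot) are skipped, never fatal.
    public static (List<Packet> Packets, List<Guid> Skipped) Join(
        IEnumerable<AuditRow> pending, Func<Guid, Snapshot?> lookup)
    {
        var packets = new List<Packet>();
        var skipped = new List<Guid>();
        foreach (var row in pending.Where(r => IsCached(r.Kind)))
        {
            Snapshot? snapshot = null;
            if (row.CorrelationId is Guid id)
            {
                try { snapshot = lookup(id); }
                catch (Exception) { /* a throwing lookup is treated as an orphan */ }
            }
            if (snapshot is null) { skipped.Add(row.EventId); continue; }
            packets.Add(new Packet(row, snapshot));
        }
        return (packets, skipped);
    }
}

public static class JoinDemo
{
    public static void Main()
    {
        var known = Guid.NewGuid();
        var throwing = Guid.NewGuid();
        var rows = new List<AuditRow>
        {
            new AuditRow(Guid.NewGuid(), Kind.CachedSubmit, known),           // joins
            new AuditRow(Guid.NewGuid(), Kind.ApiCallCached, null),           // no CorrelationId
            new AuditRow(Guid.NewGuid(), Kind.CachedResolve, Guid.NewGuid()), // no snapshot
            new AuditRow(Guid.NewGuid(), Kind.DbWriteCached, throwing),       // lookup throws
            new AuditRow(Guid.NewGuid(), Kind.NotifySend, null),              // audit-only kind
        };

        Snapshot? Lookup(Guid id)
        {
            if (id == throwing) throw new InvalidOperationException("store unavailable");
            return id == known ? new Snapshot(id, "Delivered") : null;
        }

        var (packets, skipped) = CachedDrainSketch.Join(rows, Lookup);
        Console.WriteLine($"{packets.Count} packet(s), {skipped.Count} skipped");
        // prints "1 packet(s), 3 skipped"
    }
}
```

Skipped rows are simply left out of the pushed batch, which mirrors how the
real drain leaves them Pending instead of failing the whole read.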
**Design notes / caveats.**
- *Operational state at emission time is the latest tracking row, not the
per-event status.* The original spec described one combined packet per
lifecycle event, but the production wiring keeps the existing
`CachedCallTelemetryForwarder` dual-write (audit + tracking) and uses the
drain as a join. Central's `SiteCalls` upsert is monotonic so this is
consistent with the broader design — the audit row preserves per-event
granularity, the SiteCalls mirror reflects "most recent known" state.
- *Test-only `CombinedTelemetryDispatcher` wire push is now redundant but
harmless.* The dispatcher's manual `IngestCachedTelemetryAsync` call in
`CombinedTelemetryHarness` / `ParentExecutionIdCorrelationTests` still
executes; central's idempotent `InsertIfNotExistsAsync` swallows the
duplicate so it's a no-op. Removing it is a separate clean-up.
- *Per-actor cancellation gates both drains.* The lifecycle CTS (AuditLog-010)
is shared so `PostStop` cancels in-flight cached lookups + pushes at the
same instant as audit-only drains.
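The two central-side properties these caveats rely on, an idempotent audit
insert and a monotonic `SiteCalls` upsert, can be illustrated with an
in-memory sketch. Everything here is hypothetical: `CentralMirrorSketch`,
`SiteCall`, and the integer `StatusRank` ordering are assumptions made for
illustration, since this document does not state how the real upsert decides
that an incoming row is newer, and no MS SQL transaction is modelled.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical operational row; StatusRank is an assumed ordering key.
public sealed record SiteCall(Guid TrackedOperationId, int StatusRank, string Status);

public sealed class CentralMirrorSketch
{
    private readonly HashSet<Guid> _auditEventIds = new();
    private readonly Dictionary<Guid, SiteCall> _siteCalls = new();

    // Idempotent on EventId: a duplicate arrival returns false and changes nothing.
    public bool InsertIfNotExists(Guid eventId) => _auditEventIds.Add(eventId);

    // Monotonic: an incoming row with a lower rank never overwrites a higher one.
    public bool UpsertMonotonic(SiteCall incoming)
    {
        if (_siteCalls.TryGetValue(incoming.TrackedOperationId, out var current)
            && current.StatusRank > incoming.StatusRank)
        {
            return false;
        }
        _siteCalls[incoming.TrackedOperationId] = incoming;
        return true;
    }

    public SiteCall? Get(Guid id) => _siteCalls.TryGetValue(id, out var call) ? call : null;
    public int AuditCount => _auditEventIds.Count;
}

public static class MirrorDemo
{
    public static void Main()
    {
        var mirror = new CentralMirrorSketch();

        var eventId = Guid.NewGuid();
        Console.WriteLine(mirror.InsertIfNotExists(eventId)); // prints "True"
        Console.WriteLine(mirror.InsertIfNotExists(eventId)); // prints "False"

        var op = Guid.NewGuid();
        mirror.UpsertMonotonic(new SiteCall(op, 2, "Delivered"));
        var applied = mirror.UpsertMonotonic(new SiteCall(op, 1, "Retrying"));
        Console.WriteLine($"{applied} {mirror.Get(op)?.Status}"); // prints "False Delivered"
    }
}
```

Under these assumptions a late or duplicate packet is harmless: the audit row
is stored once, and the mirror keeps the most advanced state it has seen.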
Build: `dotnet build ScadaLink.slnx` — 0 warnings, 0 errors.
Tests: `dotnet test tests/ScadaLink.AuditLog.Tests` — 250 passed, 1 failed
(`PartitionPurgeTests.EndToEnd_OldestPartition_PurgedViaActor_NewerKept`
pre-existing MS-SQL date-sensitive flake, called out in the prompt as
acceptable). `dotnet test tests/ScadaLink.SiteRuntime.Tests` — all 302
passed.
### AuditLog-002 — `SupervisorStrategy` comments claim Resume semantics but code returns the default Restart decider
+5 -7
@@ -41,15 +41,15 @@ module file and counted in **Total**.
|----------|---------------|
| Critical | 0 |
| High | 0 |
| Medium | 1 |
| Medium | 0 |
| Low | 0 |
| **Total** | **1** |
| **Total** | **0** |
## Module Status
| Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total |
|--------|---------------|--------|----------------|------|-------|
| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/1/0 | 1 | 11 |
| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 11 |
| [CLI](CLI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 23 |
| [CentralUI](CentralUI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 33 |
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 14 |
@@ -88,11 +88,9 @@ _None open._
_None open._
### Medium (1)
### Medium (0)
| ID | Module | Title |
|----|--------|-------|
| AuditLog-001 | [AuditLog](AuditLog/findings.md) | Combined-telemetry transport is plumbed end-to-end but never invoked in production |
_None open._
### Low (0)
@@ -424,6 +424,20 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
// AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry
// drain (joined with the operational tracking row + pushed via
// IngestCachedTelemetryAsync into the central dual-write transaction).
// ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit
// them; ReadPendingCachedTelemetryAsync below is the dedicated read surface
// the new SiteAuditTelemetryActor cached-drain uses.
private static readonly string[] CachedTelemetryKindNames =
{
nameof(AuditKind.CachedSubmit),
nameof(AuditKind.ApiCallCached),
nameof(AuditKind.DbWriteCached),
nameof(AuditKind.CachedResolve),
};
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
{
@@ -439,6 +453,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// writer connection. _readLock serialises this connection across
// multiple concurrent read callers since SqliteConnection itself is
// not thread-safe.
// AuditLog-001: NOT IN ($k0, $k1, $k2, $k3) excludes the
// cached-lifecycle kinds, which flow through ReadPendingCachedTelemetryAsync
// + the combined-telemetry drain. Kind is stored as the enum's name (see
// FlushBatch's pKind.Value), so a string-IN against the constant kind
// names matches the on-disk shape exactly.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -452,10 +471,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $pending
AND Kind NOT IN ($k0, $k1, $k2, $k3)
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
}
}
/// <inheritdoc />
public Task<IReadOnlyList<AuditEvent>> ReadPendingCachedTelemetryAsync(
int limit, CancellationToken ct = default)
{
if (limit <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
}
// AuditLog-001: dedicated read surface for the cached-call lifecycle
// drain — symmetric to ReadPendingAsync but filtered to the four
// cached AuditKinds. Same _readConnection + _readLock pattern so the
// hot-path writer is not contended.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _readConnection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $pending
AND Kind IN ($k0, $k1, $k2, $k3)
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
@@ -1,52 +1,81 @@
using Akka.Actor;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
/// <summary>
/// Site-side actor that drains the local SQLite audit queue and pushes Pending
/// rows to central via the <c>IngestAuditEvents</c> gRPC RPC. On a successful
/// ack the matching EventIds flip to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>; on
/// a gRPC failure the rows stay Pending and the next drain retries.
/// rows to central via two parallel transports:
/// <list type="bullet">
/// <item><description><c>IngestAuditEvents</c> for the audit-only path —
/// sync ApiCall/DbWrite, NotifySend, InboundRequest and similar single-row
/// lifecycle events.</description></item>
/// <item><description><c>IngestCachedTelemetry</c> for the combined-telemetry
/// path — cached-call lifecycle rows (<c>CachedSubmit</c>,
/// <c>ApiCallCached</c>/<c>DbWriteCached</c>, <c>CachedResolve</c>) joined
/// with the matching <c>OperationTracking</c> row, written at central as a
/// single dual-write transaction (AuditLog + SiteCalls).</description></item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// The drain self-tick is a private <c>Drain</c> message scheduled via the
/// actor system scheduler. The cadence is options-driven: <c>BusyIntervalSeconds</c>
/// when the previous drain found rows (or faulted — we want quick recovery),
/// <c>IdleIntervalSeconds</c> when the queue was empty.
/// The drain self-ticks via two private messages — <c>Drain</c> for the
/// audit-only path and <c>CachedDrain</c> for the combined path — each
/// scheduled independently. Cadence is options-driven:
/// <c>BusyIntervalSeconds</c> when the previous drain found rows (or faulted —
/// we want quick recovery), <c>IdleIntervalSeconds</c> when the queue was empty.
/// The two drains share the same cadence configuration but advance their own
/// timers so a stall on one path does not block the other.
/// </para>
/// <para>
/// Both collaborators are injected as interfaces (<see cref="ISiteAuditQueue"/>
/// and <see cref="ISiteStreamAuditClient"/>) so unit tests substitute with
/// NSubstitute and never touch real SQLite or gRPC.
/// Collaborators are injected as interfaces (<see cref="ISiteAuditQueue"/>,
/// <see cref="ISiteStreamAuditClient"/>, optional
/// <see cref="IOperationTrackingStore"/>) so unit tests substitute with
/// NSubstitute and never touch real SQLite or gRPC. The
/// <see cref="IOperationTrackingStore"/> is optional — central composition
/// roots and tests that don't exercise the cached path can leave it null, in
/// which case the cached-drain scheduler is never armed.
/// </para>
/// <para>
/// Per Bundle D's brief, audit-write paths must be fail-safe — a thrown
/// exception inside the actor MUST NOT crash it. The Drain handler wraps the
/// pipeline in a top-level try/catch that logs and re-schedules, and the
/// exception inside the actor MUST NOT crash it. Both Drain handlers wrap
/// their pipelines in a top-level try/catch that logs and re-schedules; the
/// actor's <see cref="SupervisorStrategy"/> defaults to
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultStrategy"/>'s Restart for
/// child actors — but this actor has no children, so the catch is what matters.
/// child actors — but this actor has no children, so the catch is what
/// matters.
/// </para>
/// <para>
/// AuditLog-001: wires the previously-unreachable combined-telemetry transport.
/// Prior to this the cached audit rows flowed through the audit-only drain via
/// <c>IngestAuditEventsAsync</c> and the central <c>OnCachedTelemetryAsync</c>
/// dual-write handler was dead production code; the operational <c>SiteCalls</c>
/// half was never sent to central.
/// </para>
/// </remarks>
public class SiteAuditTelemetryActor : ReceiveActor
{
private readonly ISiteAuditQueue _queue;
private readonly ISiteStreamAuditClient _client;
private readonly IOperationTrackingStore? _trackingStore;
private readonly SiteAuditTelemetryOptions _options;
private readonly ILogger<SiteAuditTelemetryActor> _logger;
private ICancelable? _pendingTick;
private ICancelable? _pendingCachedTick;
// AuditLog-010: per-actor lifecycle CTS so an in-flight drain (queue read,
// gRPC push, mark-forwarded write) is actually cancelled when the actor is
// stopped — without it, a stuck IngestAuditEventsAsync would hold the
// continuation through CoordinatedShutdown's actor-system terminate window.
// Cancelled in PostStop; never reset (the actor is single-lifetime).
// The same CTS gates the cached-drain pipeline (queue read + tracking
// lookup + gRPC push) so both paths observe shutdown cooperatively.
private readonly CancellationTokenSource _lifecycleCts = new();
/// <summary>Initializes the actor with its drain queue, gRPC client, options, and logger.</summary>
@@ -54,11 +83,19 @@ public class SiteAuditTelemetryActor : ReceiveActor
/// <param name="client">The gRPC client used to push audit events to central.</param>
/// <param name="options">Telemetry options controlling drain intervals and batch size.</param>
/// <param name="logger">Logger instance.</param>
/// <param name="trackingStore">
/// Optional site-local operation tracking store. When supplied the actor
/// runs the combined-telemetry cached-drain in parallel with the audit-only
/// drain; when null (central composition roots, tests that don't exercise
/// cached calls) the cached scheduler is never armed and only the
/// audit-only drain runs.
/// </param>
public SiteAuditTelemetryActor(
ISiteAuditQueue queue,
ISiteStreamAuditClient client,
IOptions<SiteAuditTelemetryOptions> options,
ILogger<SiteAuditTelemetryActor> logger)
ILogger<SiteAuditTelemetryActor> logger,
IOperationTrackingStore? trackingStore = null)
{
ArgumentNullException.ThrowIfNull(queue);
ArgumentNullException.ThrowIfNull(client);
@@ -69,24 +106,31 @@ public class SiteAuditTelemetryActor : ReceiveActor
_client = client;
_options = options.Value;
_logger = logger;
_trackingStore = trackingStore;
ReceiveAsync<Drain>(_ => OnDrainAsync());
ReceiveAsync<CachedDrain>(_ => OnCachedDrainAsync());
}
/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();
// Initial tick fires on the busy interval so the actor starts polling
// Initial ticks fire on the busy interval so both drains start polling
// soon after host startup. A subsequent empty drain will move to the
// idle interval naturally.
ScheduleNext(TimeSpan.FromSeconds(_options.BusyIntervalSeconds));
if (_trackingStore is not null)
{
ScheduleNextCached(TimeSpan.FromSeconds(_options.BusyIntervalSeconds));
}
}
/// <inheritdoc />
protected override void PostStop()
{
_pendingTick?.Cancel();
_pendingCachedTick?.Cancel();
// AuditLog-010: cancel any in-flight drain so a stuck queue read or
// gRPC push does not hold the continuation past actor stop.
try
@@ -166,6 +210,138 @@ public class SiteAuditTelemetryActor : ReceiveActor
}
}
/// <summary>
/// AuditLog-001: combined-telemetry drain. Reads cached-lifecycle audit
/// rows, joins each with the matching <see cref="IOperationTrackingStore"/>
/// snapshot, builds a <see cref="CachedTelemetryBatch"/>, and pushes via
/// <see cref="ISiteStreamAuditClient.IngestCachedTelemetryAsync"/>. Rows
/// whose tracking snapshot is missing (race with retention purge / late
/// audit row) are logged + skipped — the operational half will be
/// re-emitted on the next lifecycle event, and the audit row stays
/// <see cref="Commons.Types.Enums.AuditForwardState.Pending"/> so a later
/// drain (or reconciliation pull) can revisit it.
/// </summary>
private async Task OnCachedDrainAsync()
{
var nextDelay = TimeSpan.FromSeconds(_options.BusyIntervalSeconds);
var ct = _lifecycleCts.Token;
try
{
// _trackingStore is non-null by construction here — the cached
// scheduler is only armed when it was supplied (see PreStart).
// Defensive check kept for clarity and to silence the compiler's
// null-flow analysis.
if (_trackingStore is null)
{
return;
}
var pending = await _queue
.ReadPendingCachedTelemetryAsync(_options.BatchSize, ct)
.ConfigureAwait(false);
if (pending.Count == 0)
{
nextDelay = TimeSpan.FromSeconds(_options.IdleIntervalSeconds);
return;
}
var batch = new CachedTelemetryBatch();
var emittedEventIds = new List<Guid>(pending.Count);
foreach (var auditRow in pending)
{
if (auditRow.CorrelationId is null)
{
// CorrelationId carries the TrackedOperationId for cached
// rows — see CachedCallLifecycleBridge.BuildPacket. Without
// it we can't look up the tracking row; log + skip so the
// bad row doesn't block the rest of the batch. The audit
// row stays Pending (still not in emittedEventIds) and
// central reconciliation will pick it up.
_logger.LogWarning(
"Cached-telemetry drain: audit row {EventId} ({Kind}) has no CorrelationId; skipping.",
auditRow.EventId, auditRow.Kind);
continue;
}
TrackingStatusSnapshot? snapshot;
try
{
snapshot = await _trackingStore
.GetStatusAsync(new TrackedOperationId(auditRow.CorrelationId.Value), ct)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// A tracking-store throw must NOT abort the rest of the
// batch — the audit half is best-effort. Log and skip
// this row; it stays Pending for the next drain.
_logger.LogWarning(ex,
"Cached-telemetry drain: tracking lookup threw for {EventId} (TrackedOperationId {Tid}); skipping.",
auditRow.EventId, auditRow.CorrelationId);
continue;
}
if (snapshot is null)
{
// No tracking row — possible if the audit row is older
// than the tracking retention window, or the tracking
// store was reset. The audit half remains valid and will
// be picked up by central reconciliation; skip the
// combined push for this row.
_logger.LogWarning(
"Cached-telemetry drain: no tracking snapshot for {EventId} (TrackedOperationId {Tid}); skipping.",
auditRow.EventId, auditRow.CorrelationId);
continue;
}
var packet = BuildCachedPacket(auditRow, snapshot);
batch.Packets.Add(packet);
emittedEventIds.Add(auditRow.EventId);
}
if (batch.Packets.Count == 0)
{
// Every row in this read was skipped (no CorrelationId / tracking
// lookup threw / no tracking snapshot). Leave them Pending and try
// again next drain; the underlying race normally resolves on its own.
return;
}
IngestAck ack;
try
{
ack = await _client.IngestCachedTelemetryAsync(batch, ct)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"IngestCachedTelemetry push failed for {Count} cached events; will retry next drain.",
batch.Packets.Count);
return;
}
var acceptedIds = ParseAcceptedIds(ack);
if (acceptedIds.Count > 0)
{
await _queue.MarkForwardedAsync(acceptedIds, ct)
.ConfigureAwait(false);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error during cached-telemetry drain.");
}
finally
{
if (!_lifecycleCts.IsCancellationRequested && _trackingStore is not null)
{
ScheduleNextCached(nextDelay);
}
}
}
private static AuditEventBatch BuildBatch(IReadOnlyList<AuditEvent> events)
{
var batch = new AuditEventBatch();
@@ -176,6 +352,58 @@ public class SiteAuditTelemetryActor : ReceiveActor
return batch;
}
/// <summary>
/// AuditLog-001: build the combined wire packet from one cached audit row
/// + its matching operational tracking snapshot. The operational state
/// reflects the latest tracking row at emission time (not the per-event
/// status the audit row implies) because central's <c>SiteCalls</c>
/// upsert is monotonic — it never rolls back. The audit row preserves
/// per-event lifecycle granularity for the audit trail.
/// </summary>
private static CachedTelemetryPacket BuildCachedPacket(
AuditEvent auditRow, TrackingStatusSnapshot snapshot)
{
var sourceSite = auditRow.SourceSiteId ?? string.Empty;
// Channel string form mirrors the AuditChannel-to-string convention used
// by SiteCallOperational + CachedCallLifecycleBridge.BuildPacket.
var channelString = auditRow.Channel.ToString();
var target = auditRow.Target ?? snapshot.TargetSummary ?? string.Empty;
var operationalDto = new SiteCallOperationalDto
{
TrackedOperationId = snapshot.Id.Value.ToString("D"),
Channel = channelString,
Target = target,
SourceSite = sourceSite,
SourceNode = snapshot.SourceNode ?? string.Empty,
Status = snapshot.Status,
RetryCount = snapshot.RetryCount,
LastError = snapshot.LastError ?? string.Empty,
CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.CreatedAtUtc)),
UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.UpdatedAtUtc)),
};
if (snapshot.HttpStatus.HasValue)
{
operationalDto.HttpStatus = snapshot.HttpStatus.Value;
}
if (snapshot.TerminalAtUtc.HasValue)
{
operationalDto.TerminalAtUtc =
Timestamp.FromDateTime(EnsureUtc(snapshot.TerminalAtUtc.Value));
}
return new CachedTelemetryPacket
{
AuditEvent = AuditEventDtoMapper.ToDto(auditRow),
Operational = operationalDto,
};
}
private static DateTime EnsureUtc(DateTime value) =>
value.Kind == DateTimeKind.Utc
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
private static IReadOnlyList<Guid> ParseAcceptedIds(IngestAck ack)
{
if (ack.AcceptedEventIds.Count == 0)
@@ -206,10 +434,31 @@ public class SiteAuditTelemetryActor : ReceiveActor
Self);
}
/// <summary>Self-tick message that triggers a drain cycle.</summary>
private void ScheduleNextCached(TimeSpan delay)
{
_pendingCachedTick?.Cancel();
_pendingCachedTick = Context.System.Scheduler.ScheduleTellOnceCancelable(
delay,
Self,
CachedDrain.Instance,
Self);
}
/// <summary>Self-tick message that triggers an audit-only drain cycle.</summary>
private sealed class Drain
{
public static readonly Drain Instance = new();
private Drain() { }
}
/// <summary>
/// Self-tick message that triggers a combined-telemetry drain cycle.
/// AuditLog-001: introduced alongside the cached-drain to keep the two
/// paths' cadences independent — a stall on one does not block the other.
/// </summary>
private sealed class CachedDrain
{
public static readonly CachedDrain Instance = new();
private CachedDrain() { }
}
}
@@ -33,10 +33,52 @@ public interface ISiteAuditQueue
/// oldest first. Idempotent — repeated calls before
/// <see cref="MarkForwardedAsync"/> will yield the same rows again.
/// </summary>
/// <remarks>
/// AuditLog-001: cached-lifecycle <see cref="AuditEvent.Kind"/>s
/// (<see cref="ScadaLink.Commons.Types.Enums.AuditKind.CachedSubmit"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.ApiCallCached"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.DbWriteCached"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.CachedResolve"/>) are
/// EXCLUDED from this result — they ride the combined-telemetry drain via
/// <see cref="ReadPendingCachedTelemetryAsync"/> + the central
/// <c>OnCachedTelemetryAsync</c> dual-write transaction. The audit-only
/// drain handled by this method covers everything else (sync ApiCall /
/// DbWrite, NotifySend, InboundRequest, etc.).
/// </remarks>
/// <param name="limit">Maximum number of rows to return.</param>
/// <param name="ct">Cancellation token.</param>
Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default);
/// <summary>
/// AuditLog-001: returns up to <paramref name="limit"/> rows in
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/>
/// whose <see cref="AuditEvent.Kind"/> belongs to the cached-call lifecycle
/// vocabulary (<see cref="ScadaLink.Commons.Types.Enums.AuditKind.CachedSubmit"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.ApiCallCached"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.DbWriteCached"/>,
/// <see cref="ScadaLink.Commons.Types.Enums.AuditKind.CachedResolve"/>),
/// oldest first. The site-side <c>SiteAuditTelemetryActor</c> drains these
/// rows separately, joining each with the matching operational tracking row
/// (<c>IOperationTrackingStore.GetStatusAsync</c>) before pushing the
/// combined <c>CachedTelemetryBatch</c> via
/// <c>ISiteStreamAuditClient.IngestCachedTelemetryAsync</c>. Idempotent —
/// repeated calls before <see cref="MarkForwardedAsync"/> yield the same
/// rows again.
/// </summary>
/// <remarks>
/// The two-drain partition is the production wiring of the combined-telemetry
/// transport specified in Component-AuditLog.md §"Cached Operations —
/// Combined Telemetry": cached rows MUST flow with their matching
/// <c>SiteCalls</c> upsert through one MS SQL transaction at central. The
/// pre-AuditLog-001 implementation drained cached rows through the
/// audit-only path, leaving the operational half unsent and the central
/// dual-write handler unreachable. Returning them via this dedicated read
/// surface lets the new drain join with the tracking store before push.
/// </remarks>
/// <param name="limit">Maximum number of rows to return.</param>
/// <param name="ct">Cancellation token.</param>
Task<IReadOnlyList<AuditEvent>> ReadPendingCachedTelemetryAsync(int limit, CancellationToken ct = default);
/// <summary>
/// Flips the supplied EventIds from
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/> to
+15 -3
@@ -796,17 +796,29 @@ akka {{
var siteAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor>();
// AuditLog-001: resolve the site-local operation tracking store so the
// actor can run the combined-telemetry cached-drain in parallel with
// the audit-only drain. The store is registered by AddSiteRuntime on
// site composition roots; on central it is intentionally absent and
// the cached-drain scheduler is never armed (the central side has no
// outbound cached calls to track). GetService — null when not
// registered — matches the optional-param contract on the actor ctor.
var siteTrackingStore = _serviceProvider
.GetService<ScadaLink.Commons.Interfaces.IOperationTrackingStore>();
var siteAuditTelemetryProps = Props.Create(() =>
new ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor(
siteAuditQueue,
siteAuditClient,
siteAuditOptions,
siteAuditLogger))
siteAuditLogger,
siteTrackingStore))
.WithDispatcher("audit-telemetry-dispatcher");
_actorSystem.ActorOf(siteAuditTelemetryProps, "site-audit-telemetry");
_logger.LogInformation(
"SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType})",
siteAuditClient.GetType().Name);
"SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType}, cachedDrain={CachedDrainEnabled})",
siteAuditClient.GetType().Name,
siteTrackingStore is not null);
// Gate gRPC subscriptions until the actor system and SiteStreamManager are
// initialized (REQ-HOST-7).
@@ -495,14 +495,20 @@ public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture<MsSqlMig
async () =>
{
var pending = await sqliteWriter.ReadPendingAsync(256);
// AuditLog-001: ReadPendingAsync now excludes the cached-lifecycle
// kinds (they ride the combined-telemetry drain), so we union
// them in via the dedicated read surface to keep the durability
// assertion covering EVERY expected Kind.
var pendingCached = await sqliteWriter.ReadPendingCachedTelemetryAsync(256);
var forwarded = await sqliteWriter.ReadForwardedAsync(256);
var kinds = pending.Concat(forwarded).Select(r => r.Kind).ToHashSet();
var kinds = pending.Concat(pendingCached).Concat(forwarded)
.Select(r => r.Kind).ToHashSet();
var missing = expectedKinds.Where(k => !kinds.Contains(k)).ToList();
Assert.True(
missing.Count == 0,
"Expected every routed-run audit Kind durably in SQLite; missing: "
+ string.Join(", ", missing)
+ $" (saw {pending.Count} Pending + {forwarded.Count} Forwarded).");
+ $" (saw {pending.Count} Pending + {pendingCached.Count} PendingCached + {forwarded.Count} Forwarded).");
},
TimeSpan.FromSeconds(30),
TimeSpan.FromMilliseconds(50));
@@ -7,7 +7,9 @@ using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;
@@ -24,6 +26,7 @@ public class SiteAuditTelemetryActorTests : TestKit
{
private readonly ISiteAuditQueue _queue = Substitute.For<ISiteAuditQueue>();
private readonly ISiteStreamAuditClient _client = Substitute.For<ISiteStreamAuditClient>();
private readonly IOperationTrackingStore _trackingStore = Substitute.For<IOperationTrackingStore>();
/// <summary>
/// Fast options so tests don't stall waiting for the scheduler. 1s busy /
@@ -46,7 +49,22 @@ public class SiteAuditTelemetryActorTests : TestKit
_queue,
_client,
options ?? Opts(),
NullLogger<SiteAuditTelemetryActor>.Instance)));
NullLogger<SiteAuditTelemetryActor>.Instance,
(IOperationTrackingStore?)null)));
/// <summary>
/// AuditLog-001: builds an actor with the optional
/// <see cref="IOperationTrackingStore"/> wired in so the cached-drain
/// scheduler is armed alongside the audit-only drain. Used by the new
/// cached-drain regression tests below.
/// </summary>
private IActorRef CreateActorWithCachedDrain(IOptions<SiteAuditTelemetryOptions>? options = null) =>
Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
_queue,
_client,
options ?? Opts(),
NullLogger<SiteAuditTelemetryActor>.Instance,
(IOperationTrackingStore?)_trackingStore)));
private static AuditEvent NewEvent(Guid? id = null) => new()
{
@@ -233,4 +251,206 @@ public class SiteAuditTelemetryActorTests : TestKit
Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3 && g.ToHashSet().SetEquals(ackedSet)),
Arg.Any<CancellationToken>());
}
// ────────────────────────────────────────────────────────────────────────
// AuditLog-001: combined-telemetry cached-drain regression tests. Verify
// that the production wiring of the previously-unreachable cached transport
// routes cached rows through ReadPendingCachedTelemetryAsync +
// IngestCachedTelemetryAsync (and NOT IngestAuditEventsAsync), and that
// orphaned audit rows (no tracking snapshot) are logged + skipped rather
// than crashing the drain.
// ────────────────────────────────────────────────────────────────────────
private static AuditEvent NewCachedEvent(
AuditKind kind = AuditKind.CachedSubmit,
Guid? eventId = null,
Guid? correlationId = null,
string sourceSiteId = "site-1") => new()
{
EventId = eventId ?? Guid.NewGuid(),
OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
Channel = AuditChannel.ApiOutbound,
Kind = kind,
Status = AuditStatus.Submitted,
SourceSiteId = sourceSiteId,
Target = "ERP.GetOrder",
CorrelationId = correlationId ?? Guid.NewGuid(),
ForwardState = AuditForwardState.Pending,
};
private static TrackingStatusSnapshot NewSnapshot(
TrackedOperationId id,
string status = "Submitted",
int retryCount = 0) => new(
Id: id,
Kind: nameof(AuditKind.ApiCallCached),
TargetSummary: "ERP.GetOrder",
Status: status,
RetryCount: retryCount,
LastError: null,
HttpStatus: null,
CreatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
UpdatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
TerminalAtUtc: null,
SourceInstanceId: "instance-1",
SourceScript: "script-1",
SourceNode: "node-a");
[Fact]
public async Task CachedDrain_CachedRows_RouteThrough_IngestCachedTelemetry_NotIngestAuditEvents()
{
// Arrange — three cached audit rows on the cached queue, each with a
// matching tracking snapshot. The audit-only queue is empty (those
// rows are excluded by ReadPendingAsync after AuditLog-001).
var cachedRows = new[]
{
NewCachedEvent(AuditKind.CachedSubmit),
NewCachedEvent(AuditKind.ApiCallCached),
NewCachedEvent(AuditKind.CachedResolve),
};
_queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
_queue.ReadPendingCachedTelemetryAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(
Task.FromResult<IReadOnlyList<AuditEvent>>(cachedRows),
Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
foreach (var row in cachedRows)
{
var tid = new TrackedOperationId(row.CorrelationId!.Value);
_trackingStore.GetStatusAsync(tid, Arg.Any<CancellationToken>())
.Returns(Task.FromResult<TrackingStatusSnapshot?>(NewSnapshot(tid)));
}
CachedTelemetryBatch? capturedBatch = null;
_client.IngestCachedTelemetryAsync(Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>())
.Returns(call =>
{
capturedBatch = call.Arg<CachedTelemetryBatch>();
var ack = new IngestAck();
foreach (var packet in capturedBatch.Packets)
{
ack.AcceptedEventIds.Add(packet.AuditEvent.EventId);
}
return Task.FromResult(ack);
});
// Act
CreateActorWithCachedDrain();
// Assert — exactly one IngestCachedTelemetryAsync push containing all
// three packets, and zero IngestAuditEventsAsync pushes (the audit-only
// drain saw an empty queue).
await AwaitAssertAsync(async () =>
{
await _client.Received(1).IngestCachedTelemetryAsync(
Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>());
await _queue.Received(1).MarkForwardedAsync(
Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3), Arg.Any<CancellationToken>());
}, TimeSpan.FromSeconds(5));
Assert.NotNull(capturedBatch);
Assert.Equal(3, capturedBatch!.Packets.Count);
await _client.DidNotReceiveWithAnyArgs().IngestAuditEventsAsync(default!, default);
var emittedEventIds = capturedBatch.Packets
.Select(p => Guid.Parse(p.AuditEvent.EventId))
.ToHashSet();
var expectedIds = cachedRows.Select(r => r.EventId).ToHashSet();
Assert.Equal(expectedIds, emittedEventIds);
}
[Fact]
public async Task CachedDrain_OrphanRow_NoTrackingSnapshot_IsSkipped_DoesNotCrash()
{
// Arrange — two cached audit rows: one with a tracking snapshot, one
// orphaned (the tracking store returns null). The orphaned row must be
// skipped without aborting the batch — the valid row still flows.
var orphan = NewCachedEvent(AuditKind.CachedSubmit);
var valid = NewCachedEvent(AuditKind.CachedResolve);
_queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
_queue.ReadPendingCachedTelemetryAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(
Task.FromResult<IReadOnlyList<AuditEvent>>(new[] { orphan, valid }),
Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
// orphan: tracking returns null
_trackingStore.GetStatusAsync(
new TrackedOperationId(orphan.CorrelationId!.Value),
Arg.Any<CancellationToken>())
.Returns(Task.FromResult<TrackingStatusSnapshot?>(null));
// valid: tracking returns a snapshot
var validTid = new TrackedOperationId(valid.CorrelationId!.Value);
_trackingStore.GetStatusAsync(validTid, Arg.Any<CancellationToken>())
.Returns(Task.FromResult<TrackingStatusSnapshot?>(NewSnapshot(validTid, "Delivered")));
CachedTelemetryBatch? capturedBatch = null;
_client.IngestCachedTelemetryAsync(Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>())
.Returns(call =>
{
capturedBatch = call.Arg<CachedTelemetryBatch>();
var ack = new IngestAck();
foreach (var packet in capturedBatch.Packets)
{
ack.AcceptedEventIds.Add(packet.AuditEvent.EventId);
}
return Task.FromResult(ack);
});
// Act
CreateActorWithCachedDrain();
// Assert — exactly one push containing ONLY the valid row; the orphan
// is skipped and stays Pending (not in MarkForwardedAsync's id list).
await AwaitAssertAsync(async () =>
{
await _client.Received(1).IngestCachedTelemetryAsync(
Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>());
// MarkForwardedAsync runs only after the ack comes back, so poll for it
// here instead of asserting once after the push has been observed.
await _queue.Received(1).MarkForwardedAsync(
Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 1 && g[0] == valid.EventId),
Arg.Any<CancellationToken>());
}, TimeSpan.FromSeconds(5));
Assert.NotNull(capturedBatch);
Assert.Single(capturedBatch!.Packets);
Assert.Equal(valid.EventId.ToString(), capturedBatch.Packets[0].AuditEvent.EventId);
}
[Fact]
public async Task AuditOnlyDrain_StillFlows_When_CachedDrain_IsDisabled()
{
// Arrange — ordinary (non-cached) audit rows on the audit-only queue;
// the actor is constructed WITHOUT a tracking store so the cached
// scheduler is never armed. Guards against the AuditLog-001 refactor
// breaking the audit-only drain.
var rows = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
_queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(
Task.FromResult<IReadOnlyList<AuditEvent>>(rows),
Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
_client.IngestAuditEventsAsync(Arg.Any<AuditEventBatch>(), Arg.Any<CancellationToken>())
.Returns(_ => Task.FromResult(AckAll(rows)));
// Act — note: CreateActor (no tracking store), not CreateActorWithCachedDrain.
CreateActor();
// Assert — audit-only drain flows normally; the cached client is
// never called and ReadPendingCachedTelemetryAsync is never queried.
await AwaitAssertAsync(async () =>
{
await _client.Received(1).IngestAuditEventsAsync(
Arg.Any<AuditEventBatch>(), Arg.Any<CancellationToken>());
await _queue.Received(1).MarkForwardedAsync(
Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3), Arg.Any<CancellationToken>());
}, TimeSpan.FromSeconds(5));
await _client.DidNotReceiveWithAnyArgs().IngestCachedTelemetryAsync(default!, default);
await _queue.DidNotReceiveWithAnyArgs().ReadPendingCachedTelemetryAsync(default, default);
}
}
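
The partition and orphan-skip behaviour that these tests exercise can be sketched in isolation. This is a minimal illustration, not the actor's actual code: `Kind`, `Row`, `DrainSketch`, and the `lookup` delegate are hypothetical stand-ins for `AuditKind`, `AuditEvent`, and `IOperationTrackingStore.GetStatusAsync`, and the snapshot is reduced to a status string.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins; the real AuditKind / AuditEvent types carry more fields.
public enum Kind { ApiCall, CachedSubmit, ApiCallCached, DbWriteCached, CachedResolve }

public sealed record Row(Guid EventId, Kind Kind, Guid? CorrelationId);

public static class DrainSketch
{
    // The cached-lifecycle set named in the commit message.
    private static readonly HashSet<Kind> CachedKinds = new()
    {
        Kind.CachedSubmit, Kind.ApiCallCached, Kind.DbWriteCached, Kind.CachedResolve,
    };

    // True for rows that belong to the combined-telemetry drain;
    // everything else stays on the audit-only drain.
    public static bool IsCached(Row row) => CachedKinds.Contains(row.Kind);

    // Pairs each cached row with its tracking snapshot. Rows with no
    // CorrelationId, a throwing lookup, or no snapshot are left out of the
    // result, which models "skipped and still Pending".
    public static List<(Row Row, string Snapshot)> Join(
        IEnumerable<Row> rows, Func<Guid, string?> lookup)
    {
        var packets = new List<(Row, string)>();
        foreach (var row in rows.Where(IsCached))
        {
            if (row.CorrelationId is not Guid id) continue;  // missing CorrelationId

            string? snapshot;
            try { snapshot = lookup(id); }
            catch (Exception) { continue; }                  // tracking store threw

            if (snapshot is null) continue;                  // no tracking snapshot
            packets.Add((row, snapshot));
        }
        return packets;
    }
}
```

Only rows that make it into the returned list would be pushed and later marked Forwarded. The skipped rows are simply not mentioned, so a later drain or reconciliation pass can retry them.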