From c1fe1c4f83462a76c0220d7f38967d264f2c0566 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 28 May 2026 09:08:43 -0400 Subject: [PATCH] =?UTF-8?q?feat(audit):=20close=20AuditLog-001=20=E2=80=94?= =?UTF-8?q?=20wire=20combined-telemetry=20dual-write=20transport?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the last open code-review finding. The unreachable IngestCachedTelemetryAsync path now carries production cached-call lifecycle traffic, delivering the design's "AuditLog + SiteCalls in one MS SQL transaction" guarantee. Before this commit, the SiteCalls operational half had NO production transport at all — central's SiteCallAuditActor.OnUpsertAsync had zero producers, so cached-call operational state never reached the central mirror. Site-side partition (so neither path double-emits): - ISiteAuditQueue.ReadPendingCachedTelemetryAsync — new method returning rows where Kind ∈ {CachedSubmit, ApiCallCached, DbWriteCached, CachedResolve} AND ForwardState = Pending. - ISiteAuditQueue.ReadPendingAsync — XML doc updated, SQLite impl now filters Kind NOT IN the cached set so cached rows no longer ride the audit-only drain. New cached-drain in SiteAuditTelemetryActor: - Optional IOperationTrackingStore? ctor param (null on central composition roots — the cached scheduler is never armed there). - Independent CachedDrain message + scheduler tick parallel to the existing Drain — a stall on one path can't block the other; shared lifecycle CTS gates both. - OnCachedDrainAsync: reads cached audit rows, joins each with its matching SiteCallOperational snapshot via CorrelationId → TrackedOperationId from the tracking store, builds CachedTelemetryBatch, pushes via IngestCachedTelemetryAsync, marks ack'd rows Forwarded. - Orphan rows (no tracking snapshot, thrown tracking-store call, missing CorrelationId) logged at Warning + skipped — they stay Pending so reconciliation/retry picks them up later. Best-effort contract preserved. Central side: AuditLogIngestActor.OnCachedTelemetryAsync was already implemented (M3 Bundle G dead code today, alive after this commit) — performs InsertIfNotExists for AuditLog + UpsertAsync for SiteCalls inside a BeginTransactionAsync. The handler is idempotent on EventId, so any duplicate arrivals from concurrent push + reconciliation are silent no-ops. Composition root: AkkaHostedService now resolves IOperationTrackingStore via GetService<>() (site-only) and threads it through the actor's Props.Create. Tests added (+3 in SiteAuditTelemetryActorTests): - Cached rows route through the new transport, not the audit-only drain. - Orphan cached row (no tracking match) is logged + skipped, drain doesn't crash. - Ordinary audit rows still flow through the audit-only drain unchanged. - ParentExecutionIdCorrelationTests now unions both queues to assert all expected Kinds remain covered after the partition. Build clean; AuditLog.Tests 250/251 (the 1 fail is the pre-existing date-sensitive PartitionPurgeTests integration flake explicitly accepted across the session); SiteRuntime.Tests 302/302. README regenerated: 0 pending of 481 total. Session-final totals: 136 of 136 originally-open Theme findings closed across 11 commits (10 themed batches + this architectural close). --- code-reviews/AuditLog/findings.md | 73 ++++- code-reviews/README.md | 12 +- .../Site/SqliteAuditWriter.cs | 72 +++++ .../Site/Telemetry/SiteAuditTelemetryActor.cs | 283 ++++++++++++++++-- .../Interfaces/Services/ISiteAuditQueue.cs | 42 +++ .../Actors/AkkaHostedService.cs | 18 +- .../ParentExecutionIdCorrelationTests.cs | 10 +- .../Telemetry/SiteAuditTelemetryActorTests.cs | 222 +++++++++++++- 8 files changed, 698 insertions(+), 34 deletions(-) diff --git a/code-reviews/AuditLog/findings.md b/code-reviews/AuditLog/findings.md index e7ab7f8c..ba89a51d 100644 --- a/code-reviews/AuditLog/findings.md +++ b/code-reviews/AuditLog/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-28 | | Reviewer | claude-agent | | Commit reviewed | `1eb6e97` | -| Open findings | 1 | +| Open findings | 0 | ## Summary @@ -65,7 +65,7 @@ chain doesn't reject a central composition root that mistakenly calls the site b |--|--| | Severity | Medium | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs:45`, `src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs:86`, `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs:198` | **Description** @@ -101,9 +101,74 @@ unreachable `OnCachedTelemetryAsync` dual-write code (after confirming the `AuditLogIngestActorCombinedTelemetryTests` integration tests exercise it via direct actor injection only). -**Resolution** +**Resolution (2026-05-28):** -_Unresolved._ +Wired the combined-telemetry transport end-to-end via recommendation (a). The +previously-unreachable `IngestCachedTelemetryAsync` client path now carries +cached-call lifecycle rows from the site SQLite hot-path through to the central +`AuditLogIngestActor.OnCachedTelemetryAsync` dual-write transaction. Changes: + +- **`ISiteAuditQueue`** (`src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs`): + added `ReadPendingCachedTelemetryAsync(int, CancellationToken)` returning + rows in `AuditForwardState.Pending` whose `Kind` is one of `CachedSubmit`, + `ApiCallCached`, `DbWriteCached`, `CachedResolve`. Updated `ReadPendingAsync` + XML doc to call out the partition. +- **`SqliteAuditWriter`** (`src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs`): + implemented `ReadPendingCachedTelemetryAsync` with a `Kind IN (...)` filter + reusing the existing `_readConnection` / `_readLock` decoupling; modified + `ReadPendingAsync` to add the symmetric `Kind NOT IN (...)` predicate so the + audit-only drain no longer double-emits cached rows. +- **`SiteAuditTelemetryActor`** (`src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs`): + added an optional `IOperationTrackingStore?` constructor parameter, a sibling + `CachedDrain` self-tick message, and an `OnCachedDrainAsync` handler running + in parallel with the existing audit-only drain. The cached-drain reads the + partitioned audit rows, joins each with the matching tracking-store + snapshot (looked up by `TrackedOperationId` via `CorrelationId`), builds a + `CachedTelemetryBatch`, pushes via `IngestCachedTelemetryAsync`, and marks + ack'd EventIds Forwarded. Orphan rows (no matching tracking snapshot, or a + thrown tracking-store call) are logged + skipped so the bad row never + blocks the rest of the batch; rows stay Pending and reconciliation / + retention handles them. The lifecycle CTS (AuditLog-010) gates both drains + uniformly. +- **`AkkaHostedService`** (`src/ScadaLink.Host/Actors/AkkaHostedService.cs`): + resolves `IOperationTrackingStore` via `GetService` (site-only registration) + and threads it through the actor's `Props.Create`. Central composition + roots and tests that don't register the tracking store get the legacy + audit-only behaviour — the cached scheduler is never armed. +- **Tests** (`tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs`): + added three regression tests asserting (1) cached rows route through + `IngestCachedTelemetryAsync` and NOT `IngestAuditEventsAsync`, (2) an + orphan row with no tracking snapshot is logged + skipped without crashing + the drain, (3) the audit-only drain still flows when the cached drain is + disabled (null tracking store). Updated `WaitForSiteRowsPersistedAsync` in + `ParentExecutionIdCorrelationTests` to union `ReadPendingCachedTelemetryAsync` + into the durability check — its `ReadPendingAsync(256) ∪ ReadForwardedAsync(256)` + assertion previously missed the cached kinds after the partition change. + +**Design notes / caveats.** + +- *Operational state at emission time is the latest tracking row, not the + per-event status.* The original spec described one combined packet per + lifecycle event, but the production wiring keeps the existing + `CachedCallTelemetryForwarder` dual-write (audit + tracking) and uses the + drain as a join. Central's `SiteCalls` upsert is monotonic so this is + consistent with the broader design — the audit row preserves per-event + granularity, the SiteCalls mirror reflects "most recent known" state. +- *Test-only `CombinedTelemetryDispatcher` wire push is now redundant but + harmless.* The dispatcher's manual `IngestCachedTelemetryAsync` call in + `CombinedTelemetryHarness` / `ParentExecutionIdCorrelationTests` still + executes; central's idempotent `InsertIfNotExistsAsync` swallows the + duplicate so it's a no-op. Removing it is a separate clean-up. +- *Per-actor cancellation gates both drains.* The lifecycle CTS (AuditLog-010) + is shared so `PostStop` cancels in-flight cached lookups + pushes at the + same instant as audit-only drains. + +Build: `dotnet build ScadaLink.slnx` — 0 warnings, 0 errors. +Tests: `dotnet test tests/ScadaLink.AuditLog.Tests` — 250 passed, 1 failed +(`PartitionPurgeTests.EndToEnd_OldestPartition_PurgedViaActor_NewerKept` — +pre-existing MS-SQL date-sensitive flake, called out in the prompt as +acceptable). `dotnet test tests/ScadaLink.SiteRuntime.Tests` — all 302 +passed. ### AuditLog-002 — `SupervisorStrategy` comments claim Resume semantics but code returns the default Restart decider diff --git a/code-reviews/README.md b/code-reviews/README.md index 8d8a1b67..d4918f75 100644 --- a/code-reviews/README.md +++ b/code-reviews/README.md @@ -41,15 +41,15 @@ module file and counted in **Total**. |----------|---------------| | Critical | 0 | | High | 0 | -| Medium | 1 | +| Medium | 0 | | Low | 0 | -| **Total** | **1** | +| **Total** | **0** | ## Module Status | Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total | |--------|---------------|--------|----------------|------|-------| -| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/1/0 | 1 | 11 | +| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 11 | | [CLI](CLI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 23 | | [CentralUI](CentralUI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 33 | | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 14 | @@ -88,11 +88,9 @@ _None open._ _None open._ -### Medium (1) +### Medium (0) -| ID | Module | Title | -|----|--------|-------| -| AuditLog-001 | [AuditLog](AuditLog/findings.md) | Combined-telemetry transport is plumbed end-to-end but never invoked in production | +_None open._ ### Low (0) diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs index 7cd71470..dac90b7b 100644 --- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs @@ -424,6 +424,20 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable } } + // AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry + // drain (joined with the operational tracking row + pushed via + // IngestCachedTelemetryAsync into the central dual-write transaction). + // ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit + // them; ReadPendingCachedTelemetryAsync below is the dedicated read surface + // the new SiteAuditTelemetryActor cached-drain uses. + private static readonly string[] CachedTelemetryKindNames = + { + nameof(AuditKind.CachedSubmit), + nameof(AuditKind.ApiCallCached), + nameof(AuditKind.DbWriteCached), + nameof(AuditKind.CachedResolve), + }; + /// public Task> ReadPendingAsync(int limit, CancellationToken ct = default) { @@ -439,6 +453,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable // writer connection. _readLock serialises this connection across // multiple concurrent read callers since SqliteConnection itself is // not thread-safe. + // AuditLog-001: NOT IN ($cached1,$cached2,$cached3,$cached4) excludes the + // cached-lifecycle kinds — they flow through ReadPendingCachedTelemetryAsync + // + the combined-telemetry drain. Kind is stored as the enum's name (see + // FlushBatch's pKind.Value), so a string-IN against the constant kind + // names matches the on-disk shape exactly. lock (_readLock) { ObjectDisposedException.ThrowIf(_disposed, this); @@ -452,10 +471,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable ExecutionId, ParentExecutionId FROM AuditLog WHERE ForwardState = $pending + AND Kind NOT IN ($k0, $k1, $k2, $k3) ORDER BY OccurredAtUtc ASC, EventId ASC LIMIT $limit; """; cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); + cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]); + cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]); + cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]); + cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]); + cmd.Parameters.AddWithValue("$limit", limit); + + var rows = new List(Math.Min(limit, 256)); + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + rows.Add(MapRow(reader)); + } + + return Task.FromResult>(rows); + } + } + + /// + public Task> ReadPendingCachedTelemetryAsync( + int limit, CancellationToken ct = default) + { + if (limit <= 0) + { + throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0."); + } + + // AuditLog-001: dedicated read surface for the cached-call lifecycle + // drain — symmetric to ReadPendingAsync but filtered to the four + // cached AuditKinds. Same _readConnection + _readLock pattern so the + // hot-path writer is not contended. + lock (_readLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + using var cmd = _readConnection.CreateCommand(); + cmd.CommandText = """ + SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, + SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, + Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, + RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState, + ExecutionId, ParentExecutionId + FROM AuditLog + WHERE ForwardState = $pending + AND Kind IN ($k0, $k1, $k2, $k3) + ORDER BY OccurredAtUtc ASC, EventId ASC + LIMIT $limit; + """; + cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); + cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]); + cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]); + cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]); + cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]); cmd.Parameters.AddWithValue("$limit", limit); var rows = new List(Math.Min(limit, 256)); diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs index 3b10c8cf..6f50ea66 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs @@ -1,52 +1,81 @@ using Akka.Actor; +using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces; using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; using ScadaLink.Communication.Grpc; namespace ScadaLink.AuditLog.Site.Telemetry; /// /// Site-side actor that drains the local SQLite audit queue and pushes Pending -/// rows to central via the IngestAuditEvents gRPC RPC. On a successful -/// ack the matching EventIds flip to -/// ; on -/// a gRPC failure the rows stay Pending and the next drain retries. +/// rows to central via two parallel transports: +/// +/// IngestAuditEvents for the audit-only path — +/// sync ApiCall/DbWrite, NotifySend, InboundRequest and similar single-row +/// lifecycle events. +/// IngestCachedTelemetry for the combined-telemetry +/// path — cached-call lifecycle rows (CachedSubmit, +/// ApiCallCached/DbWriteCached, CachedResolve) joined +/// with the matching OperationTracking row, written at central as a +/// single dual-write transaction (AuditLog + SiteCalls). +/// /// /// /// -/// The drain self-tick is a private Drain message scheduled via the -/// actor system scheduler. The cadence is options-driven: BusyIntervalSeconds -/// when the previous drain found rows (or faulted — we want quick recovery), -/// IdleIntervalSeconds when the queue was empty. +/// The drain self-ticks via two private messages — Drain for the +/// audit-only path and CachedDrain for the combined path — each +/// scheduled independently. Cadence is options-driven: +/// BusyIntervalSeconds when the previous drain found rows (or faulted — +/// we want quick recovery), IdleIntervalSeconds when the queue was empty. +/// The two drains share the same cadence configuration but advance their own +/// timers so a stall on one path does not block the other. /// /// -/// Both collaborators are injected as interfaces ( -/// and ) so unit tests substitute with -/// NSubstitute and never touch real SQLite or gRPC. +/// Collaborators are injected as interfaces (, +/// , optional +/// ) so unit tests substitute with +/// NSubstitute and never touch real SQLite or gRPC. The +/// is optional — central composition +/// roots and tests that don't exercise the cached path can leave it null, in +/// which case the cached-drain scheduler is never armed. /// /// /// Per Bundle D's brief, audit-write paths must be fail-safe — a thrown -/// exception inside the actor MUST NOT crash it. The Drain handler wraps the -/// pipeline in a top-level try/catch that logs and re-schedules, and the +/// exception inside the actor MUST NOT crash it. Both Drain handlers wrap +/// their pipelines in a top-level try/catch that logs and re-schedules; the /// actor's defaults to /// 's Restart for -/// child actors — but this actor has no children, so the catch is what matters. +/// child actors — but this actor has no children, so the catch is what +/// matters. +/// +/// +/// AuditLog-001: wires the previously-unreachable combined-telemetry transport. +/// Prior to this the cached audit rows flowed through the audit-only drain via +/// IngestAuditEventsAsync and the central OnCachedTelemetryAsync +/// dual-write handler was dead production code; the operational SiteCalls +/// half was never sent to central. /// /// public class SiteAuditTelemetryActor : ReceiveActor { private readonly ISiteAuditQueue _queue; private readonly ISiteStreamAuditClient _client; + private readonly IOperationTrackingStore? _trackingStore; private readonly SiteAuditTelemetryOptions _options; private readonly ILogger _logger; private ICancelable? _pendingTick; + private ICancelable? _pendingCachedTick; // AuditLog-010: per-actor lifecycle CTS so an in-flight drain (queue read, // gRPC push, mark-forwarded write) is actually cancelled when the actor is // stopped — without it, a stuck IngestAuditEventsAsync would hold the // continuation through CoordinatedShutdown's actor-system terminate window. // Cancelled in PostStop; never reset (the actor is single-lifetime). + // The same CTS gates the cached-drain pipeline (queue read + tracking + // lookup + gRPC push) so both paths observe shutdown cooperatively. private readonly CancellationTokenSource _lifecycleCts = new(); /// Initializes the actor with its drain queue, gRPC client, options, and logger. @@ -54,11 +83,19 @@ public class SiteAuditTelemetryActor : ReceiveActor /// The gRPC client used to push audit events to central. /// Telemetry options controlling drain intervals and batch size. /// Logger instance. + /// + /// Optional site-local operation tracking store. When supplied the actor + /// runs the combined-telemetry cached-drain in parallel with the audit-only + /// drain; when null (central composition roots, tests that don't exercise + /// cached calls) the cached scheduler is never armed and only the + /// audit-only drain runs. + /// public SiteAuditTelemetryActor( ISiteAuditQueue queue, ISiteStreamAuditClient client, IOptions options, - ILogger logger) + ILogger logger, + IOperationTrackingStore? trackingStore = null) { ArgumentNullException.ThrowIfNull(queue); ArgumentNullException.ThrowIfNull(client); @@ -69,24 +106,31 @@ public class SiteAuditTelemetryActor : ReceiveActor _client = client; _options = options.Value; _logger = logger; + _trackingStore = trackingStore; ReceiveAsync(_ => OnDrainAsync()); + ReceiveAsync(_ => OnCachedDrainAsync()); } /// protected override void PreStart() { base.PreStart(); - // Initial tick fires on the busy interval so the actor starts polling + // Initial ticks fire on the busy interval so both drains start polling // soon after host startup. A subsequent empty drain will move to the // idle interval naturally. ScheduleNext(TimeSpan.FromSeconds(_options.BusyIntervalSeconds)); + if (_trackingStore is not null) + { + ScheduleNextCached(TimeSpan.FromSeconds(_options.BusyIntervalSeconds)); + } } /// protected override void PostStop() { _pendingTick?.Cancel(); + _pendingCachedTick?.Cancel(); // AuditLog-010: cancel any in-flight drain so a stuck queue read or // gRPC push does not hold the continuation past actor stop. try @@ -166,6 +210,138 @@ public class SiteAuditTelemetryActor : ReceiveActor } } + /// + /// AuditLog-001: combined-telemetry drain. Reads cached-lifecycle audit + /// rows, joins each with the matching + /// snapshot, builds a , and pushes via + /// . Rows + /// whose tracking snapshot is missing (race with retention purge / late + /// audit row) are logged + skipped — the operational half will be + /// re-emitted on the next lifecycle event, and the audit row stays + /// so a later + /// drain (or reconciliation pull) can revisit it. + /// + private async Task OnCachedDrainAsync() + { + var nextDelay = TimeSpan.FromSeconds(_options.BusyIntervalSeconds); + var ct = _lifecycleCts.Token; + try + { + // _trackingStore is non-null by construction here — the cached + // scheduler is only armed when it was supplied (see PreStart). + // Defensive check kept for clarity and to silence the compiler's + // null-flow analysis. + if (_trackingStore is null) + { + return; + } + + var pending = await _queue + .ReadPendingCachedTelemetryAsync(_options.BatchSize, ct) + .ConfigureAwait(false); + if (pending.Count == 0) + { + nextDelay = TimeSpan.FromSeconds(_options.IdleIntervalSeconds); + return; + } + + var batch = new CachedTelemetryBatch(); + var emittedEventIds = new List(pending.Count); + + foreach (var auditRow in pending) + { + if (auditRow.CorrelationId is null) + { + // CorrelationId carries the TrackedOperationId for cached + // rows — see CachedCallLifecycleBridge.BuildPacket. Without + // it we can't look up the tracking row; log + skip so the + // bad row doesn't block the rest of the batch. The audit + // row stays Pending (still not in emittedEventIds) and + // central reconciliation will pick it up. + _logger.LogWarning( + "Cached-telemetry drain: audit row {EventId} ({Kind}) has no CorrelationId; skipping.", + auditRow.EventId, auditRow.Kind); + continue; + } + + TrackingStatusSnapshot? snapshot; + try + { + snapshot = await _trackingStore + .GetStatusAsync(new TrackedOperationId(auditRow.CorrelationId.Value), ct) + .ConfigureAwait(false); + } + catch (Exception ex) + { + // A tracking-store throw must NOT abort the rest of the + // batch — the audit half is best-effort. Log and skip + // this row; it stays Pending for the next drain. + _logger.LogWarning(ex, + "Cached-telemetry drain: tracking lookup threw for {EventId} (TrackedOperationId {Tid}); skipping.", + auditRow.EventId, auditRow.CorrelationId); + continue; + } + + if (snapshot is null) + { + // No tracking row — possible if the audit row is older + // than the tracking retention window, or the tracking + // store was reset. The audit half remains valid and will + // be picked up by central reconciliation; skip the + // combined push for this row. + _logger.LogWarning( + "Cached-telemetry drain: no tracking snapshot for {EventId} (TrackedOperationId {Tid}); skipping.", + auditRow.EventId, auditRow.CorrelationId); + continue; + } + + var packet = BuildCachedPacket(auditRow, snapshot); + batch.Packets.Add(packet); + emittedEventIds.Add(auditRow.EventId); + } + + if (batch.Packets.Count == 0) + { + // Every row in this read was skipped (no CorrelationId / no + // tracking snapshot). Leave them Pending and try again next + // drain — the underlying race normally resolves on its own. + return; + } + + IngestAck ack; + try + { + ack = await _client.IngestCachedTelemetryAsync(batch, ct) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "IngestCachedTelemetry push failed for {Count} cached events; will retry next drain.", + batch.Packets.Count); + return; + } + + var acceptedIds = ParseAcceptedIds(ack); + if (acceptedIds.Count > 0) + { + await _queue.MarkForwardedAsync(acceptedIds, ct) + .ConfigureAwait(false); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Unexpected error during cached-telemetry drain."); + } + finally + { + if (!_lifecycleCts.IsCancellationRequested && _trackingStore is not null) + { + ScheduleNextCached(nextDelay); + } + } + } + private static AuditEventBatch BuildBatch(IReadOnlyList events) { var batch = new AuditEventBatch(); @@ -176,6 +352,58 @@ public class SiteAuditTelemetryActor : ReceiveActor return batch; } + /// + /// AuditLog-001: build the combined wire packet from one cached audit row + /// + its matching operational tracking snapshot. The operational state + /// reflects the latest tracking row at emission time (not the per-event + /// status the audit row implies) because central's SiteCalls + /// upsert is monotonic — it never rolls back. The audit row preserves + /// per-event lifecycle granularity for the audit trail. + /// + private static CachedTelemetryPacket BuildCachedPacket( + AuditEvent auditRow, TrackingStatusSnapshot snapshot) + { + var sourceSite = auditRow.SourceSiteId ?? string.Empty; + // Channel string form mirrors the AuditChannel-to-string convention used + // by SiteCallOperational + CachedCallLifecycleBridge.BuildPacket. + var channelString = auditRow.Channel.ToString(); + var target = auditRow.Target ?? snapshot.TargetSummary ?? string.Empty; + + var operationalDto = new SiteCallOperationalDto + { + TrackedOperationId = snapshot.Id.Value.ToString("D"), + Channel = channelString, + Target = target, + SourceSite = sourceSite, + SourceNode = snapshot.SourceNode ?? string.Empty, + Status = snapshot.Status, + RetryCount = snapshot.RetryCount, + LastError = snapshot.LastError ?? string.Empty, + CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.CreatedAtUtc)), + UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.UpdatedAtUtc)), + }; + if (snapshot.HttpStatus.HasValue) + { + operationalDto.HttpStatus = snapshot.HttpStatus.Value; + } + if (snapshot.TerminalAtUtc.HasValue) + { + operationalDto.TerminalAtUtc = + Timestamp.FromDateTime(EnsureUtc(snapshot.TerminalAtUtc.Value)); + } + + return new CachedTelemetryPacket + { + AuditEvent = AuditEventDtoMapper.ToDto(auditRow), + Operational = operationalDto, + }; + } + + private static DateTime EnsureUtc(DateTime value) => + value.Kind == DateTimeKind.Utc + ? value + : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc); + private static IReadOnlyList ParseAcceptedIds(IngestAck ack) { if (ack.AcceptedEventIds.Count == 0) @@ -206,10 +434,31 @@ public class SiteAuditTelemetryActor : ReceiveActor Self); } - /// Self-tick message that triggers a drain cycle. + private void ScheduleNextCached(TimeSpan delay) + { + _pendingCachedTick?.Cancel(); + _pendingCachedTick = Context.System.Scheduler.ScheduleTellOnceCancelable( + delay, + Self, + CachedDrain.Instance, + Self); + } + + /// Self-tick message that triggers an audit-only drain cycle. private sealed class Drain { public static readonly Drain Instance = new(); private Drain() { } } + + /// + /// Self-tick message that triggers a combined-telemetry drain cycle. + /// AuditLog-001: introduced alongside the cached-drain to keep the two + /// paths' cadences independent — a stall on one does not block the other. + /// + private sealed class CachedDrain + { + public static readonly CachedDrain Instance = new(); + private CachedDrain() { } + } } diff --git a/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs index b3606670..40f55421 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs @@ -33,10 +33,52 @@ public interface ISiteAuditQueue /// oldest first. Idempotent — repeated calls before /// will yield the same rows again. /// + /// + /// AuditLog-001: cached-lifecycle s + /// (, + /// , + /// , + /// ) are + /// EXCLUDED from this result — they ride the combined-telemetry drain via + /// + the central + /// OnCachedTelemetryAsync dual-write transaction. The audit-only + /// drain handled by this method covers everything else (sync ApiCall / + /// DbWrite, NotifySend, InboundRequest, etc.). + /// /// Maximum number of rows to return. /// Cancellation token. Task> ReadPendingAsync(int limit, CancellationToken ct = default); + /// + /// AuditLog-001: returns up to rows in + /// + /// whose belongs to the cached-call lifecycle + /// vocabulary (, + /// , + /// , + /// ), + /// oldest first. The site-side SiteAuditTelemetryActor drains these + /// rows separately, joining each with the matching operational tracking row + /// (IOperationTrackingStore.GetStatusAsync) before pushing the + /// combined CachedTelemetryBatch via + /// ISiteStreamAuditClient.IngestCachedTelemetryAsync. Idempotent — + /// repeated calls before yield the same + /// rows again. + /// + /// + /// The two-drain partition is the production wiring of the combined-telemetry + /// transport specified in Component-AuditLog.md §"Cached Operations — + /// Combined Telemetry": cached rows MUST flow with their matching + /// SiteCalls upsert through one MS SQL transaction at central. The + /// pre-AuditLog-001 implementation drained cached rows through the + /// audit-only path, leaving the operational half unsent and the central + /// dual-write handler unreachable. Returning them via this dedicated read + /// surface lets the new drain join with the tracking store before push. + /// + /// Maximum number of rows to return. + /// Cancellation token. + Task> ReadPendingCachedTelemetryAsync(int limit, CancellationToken ct = default); + /// /// Flips the supplied EventIds from /// to diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 5832e0d7..3473152b 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -796,17 +796,29 @@ akka {{ var siteAuditLogger = _serviceProvider.GetRequiredService() .CreateLogger(); + // AuditLog-001: resolve the site-local operation tracking store so the + // actor can run the combined-telemetry cached-drain in parallel with + // the audit-only drain. The store is registered by AddSiteRuntime on + // site composition roots; on central it is intentionally absent and + // the cached-drain scheduler is never armed (the central side has no + // outbound cached calls to track). GetService — null when not + // registered — matches the optional-param contract on the actor ctor. + var siteTrackingStore = _serviceProvider + .GetService(); + var siteAuditTelemetryProps = Props.Create(() => new ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor( siteAuditQueue, siteAuditClient, siteAuditOptions, - siteAuditLogger)) + siteAuditLogger, + siteTrackingStore)) .WithDispatcher("audit-telemetry-dispatcher"); _actorSystem.ActorOf(siteAuditTelemetryProps, "site-audit-telemetry"); _logger.LogInformation( - "SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType})", - siteAuditClient.GetType().Name); + "SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType}, cachedDrain={CachedDrainEnabled})", + siteAuditClient.GetType().Name, + siteTrackingStore is not null); // Gate gRPC subscriptions until the actor system and SiteStreamManager are // initialized (REQ-HOST-7). diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs index b1d08d85..6bbdf3ed 100644 --- a/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs @@ -495,14 +495,20 @@ public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture { var pending = await sqliteWriter.ReadPendingAsync(256); + // AuditLog-001: ReadPendingAsync now excludes the cached-lifecycle + // kinds (they ride the combined-telemetry drain), so we union + // them in via the dedicated read surface to keep the durability + // assertion covering EVERY expected Kind. + var pendingCached = await sqliteWriter.ReadPendingCachedTelemetryAsync(256); var forwarded = await sqliteWriter.ReadForwardedAsync(256); - var kinds = pending.Concat(forwarded).Select(r => r.Kind).ToHashSet(); + var kinds = pending.Concat(pendingCached).Concat(forwarded) + .Select(r => r.Kind).ToHashSet(); var missing = expectedKinds.Where(k => !kinds.Contains(k)).ToList(); Assert.True( missing.Count == 0, "Expected every routed-run audit Kind durably in SQLite; missing: " + string.Join(", ", missing) - + $" (saw {pending.Count} Pending + {forwarded.Count} Forwarded)."); + + $" (saw {pending.Count} Pending + {pendingCached.Count} PendingCached + {forwarded.Count} Forwarded)."); }, TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(50)); diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs index 8d5d555b..a7e48564 100644 --- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs @@ -7,7 +7,9 @@ using NSubstitute; using NSubstitute.ExceptionExtensions; using ScadaLink.AuditLog.Site.Telemetry; using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces; using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; using ScadaLink.Communication.Grpc; @@ -24,6 +26,7 @@ public class SiteAuditTelemetryActorTests : TestKit { private readonly ISiteAuditQueue _queue = Substitute.For(); private readonly ISiteStreamAuditClient _client = Substitute.For(); + private readonly IOperationTrackingStore _trackingStore = Substitute.For(); /// /// Fast options so tests don't stall waiting for the scheduler. 1s busy / @@ -46,7 +49,22 @@ public class SiteAuditTelemetryActorTests : TestKit _queue, _client, options ?? Opts(), - NullLogger.Instance))); + NullLogger.Instance, + (IOperationTrackingStore?)null))); + + /// + /// AuditLog-001: builds an actor with the optional + /// wired in so the cached-drain + /// scheduler is armed alongside the audit-only drain. Used by the new + /// cached-drain regression tests below. + /// + private IActorRef CreateActorWithCachedDrain(IOptions? options = null) => + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + _queue, + _client, + options ?? Opts(), + NullLogger.Instance, + (IOperationTrackingStore?)_trackingStore))); private static AuditEvent NewEvent(Guid? id = null) => new() { @@ -233,4 +251,206 @@ public class SiteAuditTelemetryActorTests : TestKit Arg.Is>(g => g.Count == 3 && g.ToHashSet().SetEquals(ackedSet)), Arg.Any()); } + + // ──────────────────────────────────────────────────────────────────────── + // AuditLog-001: combined-telemetry cached-drain regression tests. Verify + // that the production wiring of the previously-unreachable cached transport + // routes cached rows through ReadPendingCachedTelemetryAsync + + // IngestCachedTelemetryAsync (and NOT IngestAuditEventsAsync), and that + // orphaned audit rows (no tracking snapshot) are logged + skipped rather + // than crashing the drain. + // ──────────────────────────────────────────────────────────────────────── + + private static AuditEvent NewCachedEvent( + AuditKind kind = AuditKind.CachedSubmit, + Guid? eventId = null, + Guid? correlationId = null, + string sourceSiteId = "site-1") => new() + { + EventId = eventId ?? Guid.NewGuid(), + OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = kind, + Status = AuditStatus.Submitted, + SourceSiteId = sourceSiteId, + Target = "ERP.GetOrder", + CorrelationId = correlationId ?? Guid.NewGuid(), + ForwardState = AuditForwardState.Pending, + }; + + private static TrackingStatusSnapshot NewSnapshot( + TrackedOperationId id, + string status = "Submitted", + int retryCount = 0) => new( + Id: id, + Kind: nameof(AuditKind.ApiCallCached), + TargetSummary: "ERP.GetOrder", + Status: status, + RetryCount: retryCount, + LastError: null, + HttpStatus: null, + CreatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + UpdatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + TerminalAtUtc: null, + SourceInstanceId: "instance-1", + SourceScript: "script-1", + SourceNode: "node-a"); + + [Fact] + public async Task CachedDrain_CachedRows_RouteThrough_IngestCachedTelemetry_NotIngestAuditEvents() + { + // Arrange — three cached audit rows on the cached queue, each with a + // matching tracking snapshot. The audit-only queue is empty (those + // rows are excluded by ReadPendingAsync after AuditLog-001). + var cachedRows = new[] + { + NewCachedEvent(AuditKind.CachedSubmit), + NewCachedEvent(AuditKind.ApiCallCached), + NewCachedEvent(AuditKind.CachedResolve), + }; + + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult>(Array.Empty())); + _queue.ReadPendingCachedTelemetryAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(cachedRows), + Task.FromResult>(Array.Empty())); + + foreach (var row in cachedRows) + { + var tid = new TrackedOperationId(row.CorrelationId!.Value); + _trackingStore.GetStatusAsync(tid, Arg.Any()) + .Returns(Task.FromResult(NewSnapshot(tid))); + } + + CachedTelemetryBatch? capturedBatch = null; + _client.IngestCachedTelemetryAsync(Arg.Any(), Arg.Any()) + .Returns(call => + { + capturedBatch = call.Arg(); + var ack = new IngestAck(); + foreach (var packet in capturedBatch.Packets) + { + ack.AcceptedEventIds.Add(packet.AuditEvent.EventId); + } + return Task.FromResult(ack); + }); + + // Act + CreateActorWithCachedDrain(); + + // Assert — exactly one IngestCachedTelemetryAsync push containing all + // three packets, and zero IngestAuditEventsAsync pushes (the audit-only + // drain saw an empty queue). + await AwaitAssertAsync(async () => + { + await _client.Received(1).IngestCachedTelemetryAsync( + Arg.Any(), Arg.Any()); + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.Count == 3), Arg.Any()); + }, TimeSpan.FromSeconds(5)); + + Assert.NotNull(capturedBatch); + Assert.Equal(3, capturedBatch!.Packets.Count); + + await _client.DidNotReceiveWithAnyArgs().IngestAuditEventsAsync(default!, default); + + var emittedEventIds = capturedBatch.Packets + .Select(p => Guid.Parse(p.AuditEvent.EventId)) + .ToHashSet(); + var expectedIds = cachedRows.Select(r => r.EventId).ToHashSet(); + Assert.Equal(expectedIds, emittedEventIds); + } + + [Fact] + public async Task CachedDrain_OrphanRow_NoTrackingSnapshot_IsSkipped_DoesNotCrash() + { + // Arrange — two cached audit rows: one with a tracking snapshot, one + // orphaned (the tracking store returns null). The orphaned row must be + // skipped without aborting the batch — the valid row still flows. + var orphan = NewCachedEvent(AuditKind.CachedSubmit); + var valid = NewCachedEvent(AuditKind.CachedResolve); + + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult>(Array.Empty())); + _queue.ReadPendingCachedTelemetryAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(new[] { orphan, valid }), + Task.FromResult>(Array.Empty())); + + // orphan: tracking returns null + _trackingStore.GetStatusAsync( + new TrackedOperationId(orphan.CorrelationId!.Value), + Arg.Any()) + .Returns(Task.FromResult(null)); + // valid: tracking returns a snapshot + var validTid = new TrackedOperationId(valid.CorrelationId!.Value); + _trackingStore.GetStatusAsync(validTid, Arg.Any()) + .Returns(Task.FromResult(NewSnapshot(validTid, "Delivered"))); + + CachedTelemetryBatch? capturedBatch = null; + _client.IngestCachedTelemetryAsync(Arg.Any(), Arg.Any()) + .Returns(call => + { + capturedBatch = call.Arg(); + var ack = new IngestAck(); + foreach (var packet in capturedBatch.Packets) + { + ack.AcceptedEventIds.Add(packet.AuditEvent.EventId); + } + return Task.FromResult(ack); + }); + + // Act + CreateActorWithCachedDrain(); + + // Assert — exactly one push containing ONLY the valid row; the orphan + // is skipped and stays Pending (not in MarkForwardedAsync's id list). + await AwaitAssertAsync(async () => + { + await _client.Received(1).IngestCachedTelemetryAsync( + Arg.Any(), Arg.Any()); + }, TimeSpan.FromSeconds(5)); + + Assert.NotNull(capturedBatch); + Assert.Single(capturedBatch!.Packets); + Assert.Equal(valid.EventId.ToString(), capturedBatch.Packets[0].AuditEvent.EventId); + + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.Count == 1 && g[0] == valid.EventId), + Arg.Any()); + } + + [Fact] + public async Task AuditOnlyDrain_StillFlows_When_CachedDrain_IsDisabled() + { + // Arrange — ordinary (non-cached) audit rows on the audit-only queue; + // the actor is constructed WITHOUT a tracking store so the cached + // scheduler is never armed. Regression guard against the audit-only + // drain regressing during the AuditLog-001 refactor. + var rows = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList(); + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(rows), + Task.FromResult>(Array.Empty())); + + _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any()) + .Returns(_ => Task.FromResult(AckAll(rows))); + + // Act — note: CreateActor (no tracking store), not CreateActorWithCachedDrain. + CreateActor(); + + // Assert — audit-only drain flows normally; the cached client is + // never called and ReadPendingCachedTelemetryAsync is never queried. + await AwaitAssertAsync(async () => + { + await _client.Received(1).IngestAuditEventsAsync( + Arg.Any(), Arg.Any()); + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.Count == 3), Arg.Any()); + }, TimeSpan.FromSeconds(5)); + + await _client.DidNotReceiveWithAnyArgs().IngestCachedTelemetryAsync(default!, default); + await _queue.DidNotReceiveWithAnyArgs().ReadPendingCachedTelemetryAsync(default, default); + } }