diff --git a/code-reviews/AuditLog/findings.md b/code-reviews/AuditLog/findings.md
index e7ab7f8c..ba89a51d 100644
--- a/code-reviews/AuditLog/findings.md
+++ b/code-reviews/AuditLog/findings.md
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-28 |
| Reviewer | claude-agent |
| Commit reviewed | `1eb6e97` |
-| Open findings | 1 |
+| Open findings | 0 |
## Summary
@@ -65,7 +65,7 @@ chain doesn't reject a central composition root that mistakenly calls the site b
|--|--|
| Severity | Medium |
| Category | Design-document adherence |
-| Status | Open |
+| Status | Resolved |
| Location | `src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs:45`, `src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs:86`, `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs:198` |
**Description**
@@ -101,9 +101,74 @@ unreachable `OnCachedTelemetryAsync` dual-write code (after confirming the
`AuditLogIngestActorCombinedTelemetryTests` integration tests exercise it via direct
actor injection only).
-**Resolution**
+**Resolution (2026-05-28):**
-_Unresolved._
+Wired the combined-telemetry transport end-to-end via recommendation (a). The
+previously-unreachable `IngestCachedTelemetryAsync` client path now carries
+cached-call lifecycle rows from the site SQLite hot-path through to the central
+`AuditLogIngestActor.OnCachedTelemetryAsync` dual-write transaction. Changes:
+
+- **`ISiteAuditQueue`** (`src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs`):
+ added `ReadPendingCachedTelemetryAsync(int, CancellationToken)` returning
+ rows in `AuditForwardState.Pending` whose `Kind` is one of `CachedSubmit`,
+ `ApiCallCached`, `DbWriteCached`, `CachedResolve`. Updated `ReadPendingAsync`
+ XML doc to call out the partition.
+- **`SqliteAuditWriter`** (`src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs`):
+ implemented `ReadPendingCachedTelemetryAsync` with a `Kind IN (...)` filter
+ reusing the existing `_readConnection` / `_readLock` decoupling; modified
+ `ReadPendingAsync` to add the symmetric `Kind NOT IN (...)` predicate so the
+ audit-only drain no longer double-emits cached rows.
+- **`SiteAuditTelemetryActor`** (`src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs`):
+ added an optional `IOperationTrackingStore?` constructor parameter, a sibling
+ `CachedDrain` self-tick message, and an `OnCachedDrainAsync` handler running
+ in parallel with the existing audit-only drain. The cached-drain reads the
+ partitioned audit rows, joins each with the matching tracking-store
+ snapshot (looked up by `TrackedOperationId` via `CorrelationId`), builds a
+ `CachedTelemetryBatch`, pushes via `IngestCachedTelemetryAsync`, and marks
+ ack'd EventIds Forwarded. Orphan rows (no matching tracking snapshot, or a
+ thrown tracking-store call) are logged + skipped so the bad row never
+ blocks the rest of the batch; rows stay Pending and reconciliation /
+ retention handles them. The lifecycle CTS (AuditLog-010) gates both drains
+ uniformly.
+- **`AkkaHostedService`** (`src/ScadaLink.Host/Actors/AkkaHostedService.cs`):
+ resolves `IOperationTrackingStore` via `GetService` (site-only registration)
+ and threads it through the actor's `Props.Create`. Central composition
+ roots and tests that don't register the tracking store get the legacy
+ audit-only behaviour — the cached scheduler is never armed.
+- **Tests** (`tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs`):
+ added three regression tests asserting (1) cached rows route through
+ `IngestCachedTelemetryAsync` and NOT `IngestAuditEventsAsync`, (2) an
+ orphan row with no tracking snapshot is logged + skipped without crashing
+ the drain, (3) the audit-only drain still flows when the cached drain is
+ disabled (null tracking store). Updated `WaitForSiteRowsPersistedAsync` in
+ `ParentExecutionIdCorrelationTests` to union `ReadPendingCachedTelemetryAsync`
+ into the durability check — its `ReadPendingAsync(256) ∪ ReadForwardedAsync(256)`
+ assertion previously missed the cached kinds after the partition change.
+
+**Design notes / caveats.**
+
+- *Operational state at emission time is the latest tracking row, not the
+ per-event status.* The original spec described one combined packet per
+ lifecycle event, but the production wiring keeps the existing
+ `CachedCallTelemetryForwarder` dual-write (audit + tracking) and uses the
+ drain as a join. Central's `SiteCalls` upsert is monotonic so this is
+ consistent with the broader design — the audit row preserves per-event
+ granularity, the SiteCalls mirror reflects "most recent known" state.
+- *Test-only `CombinedTelemetryDispatcher` wire push is now redundant but
+ harmless.* The dispatcher's manual `IngestCachedTelemetryAsync` call in
+ `CombinedTelemetryHarness` / `ParentExecutionIdCorrelationTests` still
+ executes; central's idempotent `InsertIfNotExistsAsync` swallows the
+ duplicate so it's a no-op. Removing it is a separate clean-up.
+- *Per-actor cancellation gates both drains.* The lifecycle CTS (AuditLog-010)
+ is shared so `PostStop` cancels in-flight cached lookups + pushes at the
+ same instant as audit-only drains.
+
+Build: `dotnet build ScadaLink.slnx` — 0 warnings, 0 errors.
+Tests: `dotnet test tests/ScadaLink.AuditLog.Tests` — 250 passed, 1 failed
+(`PartitionPurgeTests.EndToEnd_OldestPartition_PurgedViaActor_NewerKept` —
+pre-existing MS-SQL date-sensitive flake, called out in the prompt as
+acceptable). `dotnet test tests/ScadaLink.SiteRuntime.Tests` — all 302
+passed.
### AuditLog-002 — `SupervisorStrategy` comments claim Resume semantics but code returns the default Restart decider
diff --git a/code-reviews/README.md b/code-reviews/README.md
index 8d8a1b67..d4918f75 100644
--- a/code-reviews/README.md
+++ b/code-reviews/README.md
@@ -41,15 +41,15 @@ module file and counted in **Total**.
|----------|---------------|
| Critical | 0 |
| High | 0 |
-| Medium | 1 |
+| Medium | 0 |
| Low | 0 |
-| **Total** | **1** |
+| **Total** | **0** |
## Module Status
| Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total |
|--------|---------------|--------|----------------|------|-------|
-| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/1/0 | 1 | 11 |
+| [AuditLog](AuditLog/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 11 |
| [CLI](CLI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 23 |
| [CentralUI](CentralUI/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 33 |
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-28 | `1eb6e97` | 0/0/0/0 | 0 | 14 |
@@ -88,11 +88,9 @@ _None open._
_None open._
-### Medium (1)
+### Medium (0)
-| ID | Module | Title |
-|----|--------|-------|
-| AuditLog-001 | [AuditLog](AuditLog/findings.md) | Combined-telemetry transport is plumbed end-to-end but never invoked in production |
+_None open._
### Low (0)
diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
index 7cd71470..dac90b7b 100644
--- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
+++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
@@ -424,6 +424,20 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
+ // AuditLog-001: cached-lifecycle audit kinds that ride the combined-telemetry
+ // drain (joined with the operational tracking row + pushed via
+ // IngestCachedTelemetryAsync into the central dual-write transaction).
+ // ReadPendingAsync EXCLUDES these so the audit-only drain doesn't double-emit
+ // them; ReadPendingCachedTelemetryAsync below is the dedicated read surface
+ // the new SiteAuditTelemetryActor cached-drain uses.
+ private static readonly string[] CachedTelemetryKindNames =
+ {
+ nameof(AuditKind.CachedSubmit),
+ nameof(AuditKind.ApiCallCached),
+ nameof(AuditKind.DbWriteCached),
+ nameof(AuditKind.CachedResolve),
+ };
+
///
public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
{
@@ -439,6 +453,11 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
// writer connection. _readLock serialises this connection across
// multiple concurrent read callers since SqliteConnection itself is
// not thread-safe.
+ // AuditLog-001: NOT IN ($k0, $k1, $k2, $k3) excludes the
+ // cached-lifecycle kinds; they flow through ReadPendingCachedTelemetryAsync
+ // + the combined-telemetry drain. Kind is stored as the enum's name (see
+ // FlushBatch's pKind.Value), so a string-IN against the constant kind
+ // names matches the on-disk shape exactly.
lock (_readLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -452,10 +471,63 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
ExecutionId, ParentExecutionId
FROM AuditLog
WHERE ForwardState = $pending
+ AND Kind NOT IN ($k0, $k1, $k2, $k3)
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
+ cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
+ cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
+ cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
+ cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
+ cmd.Parameters.AddWithValue("$limit", limit);
+
+ var rows = new List<AuditEvent>(Math.Min(limit, 256));
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rows.Add(MapRow(reader));
+ }
+
+ return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
+ }
+ }
+
+ ///
+ public Task<IReadOnlyList<AuditEvent>> ReadPendingCachedTelemetryAsync(
+ int limit, CancellationToken ct = default)
+ {
+ if (limit <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
+ }
+
+ // AuditLog-001: dedicated read surface for the cached-call lifecycle
+ // drain — symmetric to ReadPendingAsync but filtered to the four
+ // cached AuditKinds. Same _readConnection + _readLock pattern so the
+ // hot-path writer is not contended.
+ lock (_readLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _readConnection.CreateCommand();
+ cmd.CommandText = """
+ SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
+ SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target,
+ Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
+ RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState,
+ ExecutionId, ParentExecutionId
+ FROM AuditLog
+ WHERE ForwardState = $pending
+ AND Kind IN ($k0, $k1, $k2, $k3)
+ ORDER BY OccurredAtUtc ASC, EventId ASC
+ LIMIT $limit;
+ """;
+ cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
+ cmd.Parameters.AddWithValue("$k0", CachedTelemetryKindNames[0]);
+ cmd.Parameters.AddWithValue("$k1", CachedTelemetryKindNames[1]);
+ cmd.Parameters.AddWithValue("$k2", CachedTelemetryKindNames[2]);
+ cmd.Parameters.AddWithValue("$k3", CachedTelemetryKindNames[3]);
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
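Reviewer note: the two queries in this file split Pending rows on `Kind` with complementary `IN` / `NOT IN` predicates, so each row should be visible to exactly one drain. The following is a minimal in-memory sketch of that invariant, not the actual implementation. The `Row` record and the `KindPartition` helper are hypothetical names introduced for illustration only; the four kind names are taken from `CachedTelemetryKindNames` in the diff.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an audit row; only the fields the partition needs.
public sealed record Row(Guid EventId, string Kind);

public static class KindPartition
{
    // Mirrors CachedTelemetryKindNames from the diff (Kind is stored as the enum's name).
    public static readonly HashSet<string> CachedKinds = new(StringComparer.Ordinal)
    {
        "CachedSubmit", "ApiCallCached", "DbWriteCached", "CachedResolve",
    };

    // In-memory equivalent of: WHERE Kind NOT IN ($k0, $k1, $k2, $k3)
    public static List<Row> AuditOnly(IEnumerable<Row> pending) =>
        pending.Where(r => !CachedKinds.Contains(r.Kind)).ToList();

    // In-memory equivalent of: WHERE Kind IN ($k0, $k1, $k2, $k3)
    public static List<Row> Cached(IEnumerable<Row> pending) =>
        pending.Where(r => CachedKinds.Contains(r.Kind)).ToList();
}
```

One difference from the sketch is worth keeping in mind: in SQL a NULL `Kind` satisfies neither `IN` nor `NOT IN`, so the partition is exhaustive only if the column is never NULL. The sketch assumes a non-null `Kind`.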
diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
index 3b10c8cf..6f50ea66 100644
--- a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
+++ b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
@@ -1,52 +1,81 @@
using Akka.Actor;
+using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
+using ScadaLink.Commons.Types;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
///
/// Site-side actor that drains the local SQLite audit queue and pushes Pending
-/// rows to central via the IngestAuditEvents gRPC RPC. On a successful
-/// ack the matching EventIds flip to
-/// ; on
-/// a gRPC failure the rows stay Pending and the next drain retries.
+/// rows to central via two parallel transports:
+///
+/// - IngestAuditEvents for the audit-only path —
+/// sync ApiCall/DbWrite, NotifySend, InboundRequest and similar single-row
+/// lifecycle events.
+/// - IngestCachedTelemetry for the combined-telemetry
+/// path — cached-call lifecycle rows (CachedSubmit,
+/// ApiCallCached/DbWriteCached, CachedResolve) joined
+/// with the matching OperationTracking row, written at central as a
+/// single dual-write transaction (AuditLog + SiteCalls).
+///
///
///
///
-/// The drain self-tick is a private Drain message scheduled via the
-/// actor system scheduler. The cadence is options-driven: BusyIntervalSeconds
-/// when the previous drain found rows (or faulted — we want quick recovery),
-/// IdleIntervalSeconds when the queue was empty.
+/// The drain self-ticks via two private messages — Drain for the
+/// audit-only path and CachedDrain for the combined path — each
+/// scheduled independently. Cadence is options-driven:
+/// BusyIntervalSeconds when the previous drain found rows (or faulted —
+/// we want quick recovery), IdleIntervalSeconds when the queue was empty.
+/// The two drains share the same cadence configuration but advance their own
+/// timers so a stall on one path does not block the other.
///
///
-/// Both collaborators are injected as interfaces (
-/// and ) so unit tests substitute with
-/// NSubstitute and never touch real SQLite or gRPC.
+/// Collaborators are injected as interfaces (<see cref="ISiteAuditQueue"/>,
+/// <see cref="ISiteStreamAuditClient"/>, optional
+/// <see cref="IOperationTrackingStore"/>) so unit tests substitute with
+/// NSubstitute and never touch real SQLite or gRPC. The
+/// <see cref="IOperationTrackingStore"/> is optional: central composition
+/// roots and tests that don't exercise the cached path can leave it null, in
+/// which case the cached-drain scheduler is never armed.
///
///
/// Per Bundle D's brief, audit-write paths must be fail-safe — a thrown
-/// exception inside the actor MUST NOT crash it. The Drain handler wraps the
-/// pipeline in a top-level try/catch that logs and re-schedules, and the
+/// exception inside the actor MUST NOT crash it. Both Drain handlers wrap
+/// their pipelines in a top-level try/catch that logs and re-schedules; the
+/// actor's <see cref="SupervisorStrategy"/> defaults to
/// the default strategy's Restart for
-/// child actors — but this actor has no children, so the catch is what matters.
+/// child actors — but this actor has no children, so the catch is what
+/// matters.
+///
+///
+/// AuditLog-001: wires the previously-unreachable combined-telemetry transport.
+/// Prior to this the cached audit rows flowed through the audit-only drain via
+/// IngestAuditEventsAsync and the central OnCachedTelemetryAsync
+/// dual-write handler was dead production code; the operational SiteCalls
+/// half was never sent to central.
///
///
public class SiteAuditTelemetryActor : ReceiveActor
{
private readonly ISiteAuditQueue _queue;
private readonly ISiteStreamAuditClient _client;
+ private readonly IOperationTrackingStore? _trackingStore;
private readonly SiteAuditTelemetryOptions _options;
private readonly ILogger _logger;
private ICancelable? _pendingTick;
+ private ICancelable? _pendingCachedTick;
// AuditLog-010: per-actor lifecycle CTS so an in-flight drain (queue read,
// gRPC push, mark-forwarded write) is actually cancelled when the actor is
// stopped — without it, a stuck IngestAuditEventsAsync would hold the
// continuation through CoordinatedShutdown's actor-system terminate window.
// Cancelled in PostStop; never reset (the actor is single-lifetime).
+ // The same CTS gates the cached-drain pipeline (queue read + tracking
+ // lookup + gRPC push) so both paths observe shutdown cooperatively.
private readonly CancellationTokenSource _lifecycleCts = new();
/// Initializes the actor with its drain queue, gRPC client, options, and logger.
@@ -54,11 +83,19 @@ public class SiteAuditTelemetryActor : ReceiveActor
/// The gRPC client used to push audit events to central.
/// Telemetry options controlling drain intervals and batch size.
/// Logger instance.
+ ///
+ /// Optional site-local operation tracking store. When supplied the actor
+ /// runs the combined-telemetry cached-drain in parallel with the audit-only
+ /// drain; when null (central composition roots, tests that don't exercise
+ /// cached calls) the cached scheduler is never armed and only the
+ /// audit-only drain runs.
+ ///
public SiteAuditTelemetryActor(
ISiteAuditQueue queue,
ISiteStreamAuditClient client,
IOptions<SiteAuditTelemetryOptions> options,
- ILogger logger)
+ ILogger logger,
+ IOperationTrackingStore? trackingStore = null)
{
ArgumentNullException.ThrowIfNull(queue);
ArgumentNullException.ThrowIfNull(client);
@@ -69,24 +106,31 @@ public class SiteAuditTelemetryActor : ReceiveActor
_client = client;
_options = options.Value;
_logger = logger;
+ _trackingStore = trackingStore;
ReceiveAsync<Drain>(_ => OnDrainAsync());
+ ReceiveAsync<CachedDrain>(_ => OnCachedDrainAsync());
}
///
protected override void PreStart()
{
base.PreStart();
- // Initial tick fires on the busy interval so the actor starts polling
+ // Initial ticks fire on the busy interval so both drains start polling
// soon after host startup. A subsequent empty drain will move to the
// idle interval naturally.
ScheduleNext(TimeSpan.FromSeconds(_options.BusyIntervalSeconds));
+ if (_trackingStore is not null)
+ {
+ ScheduleNextCached(TimeSpan.FromSeconds(_options.BusyIntervalSeconds));
+ }
}
///
protected override void PostStop()
{
_pendingTick?.Cancel();
+ _pendingCachedTick?.Cancel();
// AuditLog-010: cancel any in-flight drain so a stuck queue read or
// gRPC push does not hold the continuation past actor stop.
try
@@ -166,6 +210,138 @@ public class SiteAuditTelemetryActor : ReceiveActor
}
}
+ ///
+ /// AuditLog-001: combined-telemetry drain. Reads cached-lifecycle audit
+ /// rows, joins each with the matching <see cref="IOperationTrackingStore"/>
+ /// snapshot, builds a <see cref="CachedTelemetryBatch"/>, and pushes via
+ /// <see cref="ISiteStreamAuditClient.IngestCachedTelemetryAsync"/>. Rows
+ /// whose tracking snapshot is missing (race with retention purge / late
+ /// audit row) are logged + skipped; the operational half will be
+ /// re-emitted on the next lifecycle event, and the audit row stays
+ /// <see cref="AuditForwardState.Pending"/> so a later
+ /// drain (or reconciliation pull) can revisit it.
+ ///
+ private async Task OnCachedDrainAsync()
+ {
+ var nextDelay = TimeSpan.FromSeconds(_options.BusyIntervalSeconds);
+ var ct = _lifecycleCts.Token;
+ try
+ {
+ // _trackingStore is non-null by construction here — the cached
+ // scheduler is only armed when it was supplied (see PreStart).
+ // Defensive check kept for clarity and to silence the compiler's
+ // null-flow analysis.
+ if (_trackingStore is null)
+ {
+ return;
+ }
+
+ var pending = await _queue
+ .ReadPendingCachedTelemetryAsync(_options.BatchSize, ct)
+ .ConfigureAwait(false);
+ if (pending.Count == 0)
+ {
+ nextDelay = TimeSpan.FromSeconds(_options.IdleIntervalSeconds);
+ return;
+ }
+
+ var batch = new CachedTelemetryBatch();
+ var emittedEventIds = new List<Guid>(pending.Count);
+
+ foreach (var auditRow in pending)
+ {
+ if (auditRow.CorrelationId is null)
+ {
+ // CorrelationId carries the TrackedOperationId for cached
+ // rows — see CachedCallLifecycleBridge.BuildPacket. Without
+ // it we can't look up the tracking row; log + skip so the
+ // bad row doesn't block the rest of the batch. The audit
+ // row stays Pending (still not in emittedEventIds) and
+ // central reconciliation will pick it up.
+ _logger.LogWarning(
+ "Cached-telemetry drain: audit row {EventId} ({Kind}) has no CorrelationId; skipping.",
+ auditRow.EventId, auditRow.Kind);
+ continue;
+ }
+
+ TrackingStatusSnapshot? snapshot;
+ try
+ {
+ snapshot = await _trackingStore
+ .GetStatusAsync(new TrackedOperationId(auditRow.CorrelationId.Value), ct)
+ .ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // A tracking-store throw must NOT abort the rest of the
+ // batch — the audit half is best-effort. Log and skip
+ // this row; it stays Pending for the next drain.
+ _logger.LogWarning(ex,
+ "Cached-telemetry drain: tracking lookup threw for {EventId} (TrackedOperationId {Tid}); skipping.",
+ auditRow.EventId, auditRow.CorrelationId);
+ continue;
+ }
+
+ if (snapshot is null)
+ {
+ // No tracking row — possible if the audit row is older
+ // than the tracking retention window, or the tracking
+ // store was reset. The audit half remains valid and will
+ // be picked up by central reconciliation; skip the
+ // combined push for this row.
+ _logger.LogWarning(
+ "Cached-telemetry drain: no tracking snapshot for {EventId} (TrackedOperationId {Tid}); skipping.",
+ auditRow.EventId, auditRow.CorrelationId);
+ continue;
+ }
+
+ var packet = BuildCachedPacket(auditRow, snapshot);
+ batch.Packets.Add(packet);
+ emittedEventIds.Add(auditRow.EventId);
+ }
+
+ if (batch.Packets.Count == 0)
+ {
+ // Every row in this read was skipped (no CorrelationId / no
+ // tracking snapshot). Leave them Pending and try again next
+ // drain — the underlying race normally resolves on its own.
+ return;
+ }
+
+ IngestAck ack;
+ try
+ {
+ ack = await _client.IngestCachedTelemetryAsync(batch, ct)
+ .ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "IngestCachedTelemetry push failed for {Count} cached events; will retry next drain.",
+ batch.Packets.Count);
+ return;
+ }
+
+ var acceptedIds = ParseAcceptedIds(ack);
+ if (acceptedIds.Count > 0)
+ {
+ await _queue.MarkForwardedAsync(acceptedIds, ct)
+ .ConfigureAwait(false);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Unexpected error during cached-telemetry drain.");
+ }
+ finally
+ {
+ if (!_lifecycleCts.IsCancellationRequested && _trackingStore is not null)
+ {
+ ScheduleNextCached(nextDelay);
+ }
+ }
+ }
+
private static AuditEventBatch BuildBatch(IReadOnlyList<AuditEvent> events)
{
var batch = new AuditEventBatch();
@@ -176,6 +352,58 @@ public class SiteAuditTelemetryActor : ReceiveActor
return batch;
}
+ ///
+ /// AuditLog-001: build the combined wire packet from one cached audit row
+ /// + its matching operational tracking snapshot. The operational state
+ /// reflects the latest tracking row at emission time (not the per-event
+ /// status the audit row implies) because central's SiteCalls
+ /// upsert is monotonic — it never rolls back. The audit row preserves
+ /// per-event lifecycle granularity for the audit trail.
+ ///
+ private static CachedTelemetryPacket BuildCachedPacket(
+ AuditEvent auditRow, TrackingStatusSnapshot snapshot)
+ {
+ var sourceSite = auditRow.SourceSiteId ?? string.Empty;
+ // Channel string form mirrors the AuditChannel-to-string convention used
+ // by SiteCallOperational + CachedCallLifecycleBridge.BuildPacket.
+ var channelString = auditRow.Channel.ToString();
+ var target = auditRow.Target ?? snapshot.TargetSummary ?? string.Empty;
+
+ var operationalDto = new SiteCallOperationalDto
+ {
+ TrackedOperationId = snapshot.Id.Value.ToString("D"),
+ Channel = channelString,
+ Target = target,
+ SourceSite = sourceSite,
+ SourceNode = snapshot.SourceNode ?? string.Empty,
+ Status = snapshot.Status,
+ RetryCount = snapshot.RetryCount,
+ LastError = snapshot.LastError ?? string.Empty,
+ CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.CreatedAtUtc)),
+ UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(snapshot.UpdatedAtUtc)),
+ };
+ if (snapshot.HttpStatus.HasValue)
+ {
+ operationalDto.HttpStatus = snapshot.HttpStatus.Value;
+ }
+ if (snapshot.TerminalAtUtc.HasValue)
+ {
+ operationalDto.TerminalAtUtc =
+ Timestamp.FromDateTime(EnsureUtc(snapshot.TerminalAtUtc.Value));
+ }
+
+ return new CachedTelemetryPacket
+ {
+ AuditEvent = AuditEventDtoMapper.ToDto(auditRow),
+ Operational = operationalDto,
+ };
+ }
+
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc
+ ? value
+ : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
+
private static IReadOnlyList<Guid> ParseAcceptedIds(IngestAck ack)
{
if (ack.AcceptedEventIds.Count == 0)
@@ -206,10 +434,31 @@ public class SiteAuditTelemetryActor : ReceiveActor
Self);
}
- /// Self-tick message that triggers a drain cycle.
+ private void ScheduleNextCached(TimeSpan delay)
+ {
+ _pendingCachedTick?.Cancel();
+ _pendingCachedTick = Context.System.Scheduler.ScheduleTellOnceCancelable(
+ delay,
+ Self,
+ CachedDrain.Instance,
+ Self);
+ }
+
+ /// Self-tick message that triggers an audit-only drain cycle.
private sealed class Drain
{
public static readonly Drain Instance = new();
private Drain() { }
}
+
+ ///
+ /// Self-tick message that triggers a combined-telemetry drain cycle.
+ /// AuditLog-001: introduced alongside the cached-drain to keep the two
+ /// paths' cadences independent — a stall on one does not block the other.
+ ///
+ private sealed class CachedDrain
+ {
+ public static readonly CachedDrain Instance = new();
+ private CachedDrain() { }
+ }
}
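Reviewer note: the join-and-skip loop in `OnCachedDrainAsync` can be sketched in isolation as follows. This is a simplified, synchronous illustration under stated assumptions, not the actor's code: `AuditRow`, `Snapshot`, `Packet`, and `CachedJoin.Join` are hypothetical names, the lookup delegate stands in for `IOperationTrackingStore.GetStatusAsync`, and the `warn` callback stands in for the logger.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical, simplified shapes; the real AuditEvent and
// TrackingStatusSnapshot types carry many more fields.
public sealed record AuditRow(Guid EventId, Guid? CorrelationId);
public sealed record Snapshot(Guid Id, string Status);
public sealed record Packet(AuditRow Audit, Snapshot Operational);

public static class CachedJoin
{
    // Joins each audit row with its tracking snapshot. A row is skipped (and
    // reported through warn) when it has no CorrelationId, when the lookup
    // throws, or when no snapshot exists. One bad row never aborts the batch.
    public static List<Packet> Join(
        IEnumerable<AuditRow> pending,
        Func<Guid, Snapshot?> lookup,
        Action<string> warn)
    {
        var packets = new List<Packet>();
        foreach (var row in pending)
        {
            if (row.CorrelationId is null)
            {
                warn($"{row.EventId}: no CorrelationId; skipping.");
                continue;
            }

            Snapshot? snapshot;
            try
            {
                snapshot = lookup(row.CorrelationId.Value);
            }
            catch (Exception ex)
            {
                warn($"{row.EventId}: tracking lookup threw ({ex.Message}); skipping.");
                continue;
            }

            if (snapshot is null)
            {
                warn($"{row.EventId}: no tracking snapshot; skipping.");
                continue;
            }

            packets.Add(new Packet(row, snapshot));
        }

        return packets;
    }
}
```

In the diff itself, skipped rows stay Pending, and a batch in which every row is skipped returns without changing `nextDelay`, so the next cached drain is scheduled on the busy interval and reads the same rows again.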
diff --git a/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs
index b3606670..40f55421 100644
--- a/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs
+++ b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs
@@ -33,10 +33,52 @@ public interface ISiteAuditQueue
/// oldest first. Idempotent: repeated calls before
/// <see cref="MarkForwardedAsync"/> will yield the same rows again.
///
+ ///
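+ /// AuditLog-001: cached-lifecycle <see cref="AuditKind"/>s
+ /// (<see cref="AuditKind.CachedSubmit"/>,
+ /// <see cref="AuditKind.ApiCallCached"/>,
+ /// <see cref="AuditKind.DbWriteCached"/>,
+ /// <see cref="AuditKind.CachedResolve"/>) are
+ /// EXCLUDED from this result; they ride the combined-telemetry drain via
+ /// <see cref="ReadPendingCachedTelemetryAsync"/> + the central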
+ /// OnCachedTelemetryAsync dual-write transaction. The audit-only
+ /// drain handled by this method covers everything else (sync ApiCall /
+ /// DbWrite, NotifySend, InboundRequest, etc.).
+ ///
/// Maximum number of rows to return.
/// Cancellation token.
Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default);
+ ///
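+ /// AuditLog-001: returns up to <paramref name="limit"/> rows in
+ /// <see cref="AuditForwardState.Pending"/>
+ /// whose <see cref="AuditEvent.Kind"/> belongs to the cached-call lifecycle
+ /// vocabulary (<see cref="AuditKind.CachedSubmit"/>,
+ /// <see cref="AuditKind.ApiCallCached"/>,
+ /// <see cref="AuditKind.DbWriteCached"/>,
+ /// <see cref="AuditKind.CachedResolve"/>),
+ /// oldest first. The site-side SiteAuditTelemetryActor drains these
+ /// rows separately, joining each with the matching operational tracking row
+ /// (IOperationTrackingStore.GetStatusAsync) before pushing the
+ /// combined CachedTelemetryBatch via
+ /// ISiteStreamAuditClient.IngestCachedTelemetryAsync. Idempotent:
+ /// repeated calls before <see cref="MarkForwardedAsync"/> yield the same
+ /// rows again.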
+ ///
+ ///
+ /// The two-drain partition is the production wiring of the combined-telemetry
+ /// transport specified in Component-AuditLog.md §"Cached Operations —
+ /// Combined Telemetry": cached rows MUST flow with their matching
+ /// SiteCalls upsert through one MS SQL transaction at central. The
+ /// pre-AuditLog-001 implementation drained cached rows through the
+ /// audit-only path, leaving the operational half unsent and the central
+ /// dual-write handler unreachable. Returning them via this dedicated read
+ /// surface lets the new drain join with the tracking store before push.
+ ///
+ /// Maximum number of rows to return.
+ /// Cancellation token.
+ Task<IReadOnlyList<AuditEvent>> ReadPendingCachedTelemetryAsync(int limit, CancellationToken ct = default);
+
///
/// Flips the supplied EventIds from
/// to
diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
index 5832e0d7..3473152b 100644
--- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs
+++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
@@ -796,17 +796,29 @@ akka {{
var siteAuditLogger = _serviceProvider.GetRequiredService()
.CreateLogger();
+ // AuditLog-001: resolve the site-local operation tracking store so the
+ // actor can run the combined-telemetry cached-drain in parallel with
+ // the audit-only drain. The store is registered by AddSiteRuntime on
+ // site composition roots; on central it is intentionally absent and
+ // the cached-drain scheduler is never armed (the central side has no
+ // outbound cached calls to track). GetService — null when not
+ // registered — matches the optional-param contract on the actor ctor.
+ var siteTrackingStore = _serviceProvider
+ .GetService<IOperationTrackingStore>();
+
var siteAuditTelemetryProps = Props.Create(() =>
new ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor(
siteAuditQueue,
siteAuditClient,
siteAuditOptions,
- siteAuditLogger))
+ siteAuditLogger,
+ siteTrackingStore))
.WithDispatcher("audit-telemetry-dispatcher");
_actorSystem.ActorOf(siteAuditTelemetryProps, "site-audit-telemetry");
_logger.LogInformation(
- "SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType})",
- siteAuditClient.GetType().Name);
+ "SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType}, cachedDrain={CachedDrainEnabled})",
+ siteAuditClient.GetType().Name,
+ siteTrackingStore is not null);
// Gate gRPC subscriptions until the actor system and SiteStreamManager are
// initialized (REQ-HOST-7).
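Reviewer note: the wiring above relies on "resolve if registered, otherwise null" semantics to decide whether the cached drain is armed. The following is a minimal sketch of that contract using only `System.IServiceProvider`, so it avoids any dependency-injection package. `ITrackingStore`, `InMemoryTrackingStore`, `MapProvider`, and `Composition` are hypothetical names for illustration; the real code resolves `IOperationTrackingStore` from the host's service provider.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for IOperationTrackingStore and its site-side implementation.
public interface ITrackingStore { }
public sealed class InMemoryTrackingStore : ITrackingStore { }

// Tiny dictionary-backed provider; returns null for unregistered services,
// which is the behaviour the optional constructor parameter depends on.
public sealed class MapProvider : IServiceProvider
{
    private readonly Dictionary<Type, object> _services = new();

    public MapProvider Add<T>(T instance) where T : class
    {
        _services[typeof(T)] = instance;
        return this;
    }

    public object? GetService(Type serviceType) =>
        _services.TryGetValue(serviceType, out var service) ? service : null;
}

public static class Composition
{
    // True when the composition root registered a tracking store (site),
    // false when it did not (central, or tests without the cached path).
    public static bool CachedDrainEnabled(IServiceProvider provider)
    {
        var store = provider.GetService(typeof(ITrackingStore)) as ITrackingStore;
        return store is not null;
    }
}
```

A site-style provider with the store registered reports the cached drain as enabled, while an empty provider reports it as disabled, which corresponds to the `cachedDrain={CachedDrainEnabled}` value logged in the diff.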
diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs
index b1d08d85..6bbdf3ed 100644
--- a/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs
@@ -495,14 +495,20 @@ public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture
{
var pending = await sqliteWriter.ReadPendingAsync(256);
+ // AuditLog-001: ReadPendingAsync now excludes the cached-lifecycle
+ // kinds (they ride the combined-telemetry drain), so we union
+ // them in via the dedicated read surface to keep the durability
+ // assertion covering EVERY expected Kind.
+ var pendingCached = await sqliteWriter.ReadPendingCachedTelemetryAsync(256);
var forwarded = await sqliteWriter.ReadForwardedAsync(256);
- var kinds = pending.Concat(forwarded).Select(r => r.Kind).ToHashSet();
+ var kinds = pending.Concat(pendingCached).Concat(forwarded)
+ .Select(r => r.Kind).ToHashSet();
var missing = expectedKinds.Where(k => !kinds.Contains(k)).ToList();
Assert.True(
missing.Count == 0,
"Expected every routed-run audit Kind durably in SQLite; missing: "
+ string.Join(", ", missing)
- + $" (saw {pending.Count} Pending + {forwarded.Count} Forwarded).");
+ + $" (saw {pending.Count} Pending + {pendingCached.Count} PendingCached + {forwarded.Count} Forwarded).");
},
TimeSpan.FromSeconds(30),
TimeSpan.FromMilliseconds(50));
diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
index 8d5d555b..a7e48564 100644
--- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
@@ -7,7 +7,9 @@ using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
+using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;
@@ -24,6 +26,7 @@ public class SiteAuditTelemetryActorTests : TestKit
{
private readonly ISiteAuditQueue _queue = Substitute.For<ISiteAuditQueue>();
private readonly ISiteStreamAuditClient _client = Substitute.For<ISiteStreamAuditClient>();
+ private readonly IOperationTrackingStore _trackingStore = Substitute.For<IOperationTrackingStore>();
///
/// Fast options so tests don't stall waiting for the scheduler. 1s busy /
@@ -46,7 +49,22 @@ public class SiteAuditTelemetryActorTests : TestKit
_queue,
_client,
options ?? Opts(),
- NullLogger.Instance)));
+ NullLogger.Instance,
+ (IOperationTrackingStore?)null)));
+
+ ///
+ /// AuditLog-001: builds an actor with the optional
+ /// <see cref="IOperationTrackingStore"/> wired in so the cached-drain
+ /// scheduler is armed alongside the audit-only drain. Used by the new
+ /// cached-drain regression tests below.
+ ///
+ private IActorRef CreateActorWithCachedDrain(IOptions<SiteAuditTelemetryOptions>? options = null) =>
+ Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
+ _queue,
+ _client,
+ options ?? Opts(),
+ NullLogger.Instance,
+ (IOperationTrackingStore?)_trackingStore)));
private static AuditEvent NewEvent(Guid? id = null) => new()
{
@@ -233,4 +251,206 @@ public class SiteAuditTelemetryActorTests : TestKit
 Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3 && g.ToHashSet().SetEquals(ackedSet)),
 Arg.Any<CancellationToken>());
}
+
+ // ────────────────────────────────────────────────────────────────────────
+ // AuditLog-001: combined-telemetry cached-drain regression tests. Verify
+ // that the production wiring of the previously-unreachable cached transport
+ // routes cached rows through ReadPendingCachedTelemetryAsync +
+ // IngestCachedTelemetryAsync (and NOT IngestAuditEventsAsync), and that
+ // orphaned audit rows (no tracking snapshot) are logged + skipped rather
+ // than crashing the drain.
+ // ────────────────────────────────────────────────────────────────────────
+
+ private static AuditEvent NewCachedEvent(
+ AuditKind kind = AuditKind.CachedSubmit,
+ Guid? eventId = null,
+ Guid? correlationId = null,
+ string sourceSiteId = "site-1") => new()
+ {
+ EventId = eventId ?? Guid.NewGuid(),
+ OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
+ Channel = AuditChannel.ApiOutbound,
+ Kind = kind,
+ Status = AuditStatus.Submitted,
+ SourceSiteId = sourceSiteId,
+ Target = "ERP.GetOrder",
+ CorrelationId = correlationId ?? Guid.NewGuid(),
+ ForwardState = AuditForwardState.Pending,
+ };
+
+ private static TrackingStatusSnapshot NewSnapshot(
+ TrackedOperationId id,
+ string status = "Submitted",
+ int retryCount = 0) => new(
+ Id: id,
+ Kind: nameof(AuditKind.ApiCallCached),
+ TargetSummary: "ERP.GetOrder",
+ Status: status,
+ RetryCount: retryCount,
+ LastError: null,
+ HttpStatus: null,
+ CreatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
+ UpdatedAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
+ TerminalAtUtc: null,
+ SourceInstanceId: "instance-1",
+ SourceScript: "script-1",
+ SourceNode: "node-a");
+
+ [Fact]
+ public async Task CachedDrain_CachedRows_RouteThrough_IngestCachedTelemetry_NotIngestAuditEvents()
+ {
+ // Arrange — three cached audit rows on the cached queue, each with a
+ // matching tracking snapshot. The audit-only queue is empty (those
+ // rows are excluded by ReadPendingAsync after AuditLog-001).
+ var cachedRows = new[]
+ {
+ NewCachedEvent(AuditKind.CachedSubmit),
+ NewCachedEvent(AuditKind.ApiCallCached),
+ NewCachedEvent(AuditKind.CachedResolve),
+ };
+
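+ _queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
+ .Returns(Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
+ _queue.ReadPendingCachedTelemetryAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
+ .Returns(
+ Task.FromResult<IReadOnlyList<AuditEvent>>(cachedRows),
+ Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));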
+
+ foreach (var row in cachedRows)
+ {
+ var tid = new TrackedOperationId(row.CorrelationId!.Value);
+ _trackingStore.GetStatusAsync(tid, Arg.Any<CancellationToken>())
+ .Returns(Task.FromResult<TrackingStatusSnapshot?>(NewSnapshot(tid)));
+ }
+
+ CachedTelemetryBatch? capturedBatch = null;
+ _client.IngestCachedTelemetryAsync(Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>())
+ .Returns(call =>
+ {
+ capturedBatch = call.Arg<CachedTelemetryBatch>();
+ var ack = new IngestAck();
+ foreach (var packet in capturedBatch.Packets)
+ {
+ ack.AcceptedEventIds.Add(packet.AuditEvent.EventId);
+ }
+ return Task.FromResult(ack);
+ });
+
+ // Act
+ CreateActorWithCachedDrain();
+
+ // Assert — exactly one IngestCachedTelemetryAsync push containing all
+ // three packets, and zero IngestAuditEventsAsync pushes (the audit-only
+ // drain saw an empty queue).
+ await AwaitAssertAsync(async () =>
+ {
+ await _client.Received(1).IngestCachedTelemetryAsync(
+ Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>());
+ await _queue.Received(1).MarkForwardedAsync(
+ Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3), Arg.Any<CancellationToken>());
+ }, TimeSpan.FromSeconds(5));
+
+ Assert.NotNull(capturedBatch);
+ Assert.Equal(3, capturedBatch!.Packets.Count);
+
+ await _client.DidNotReceiveWithAnyArgs().IngestAuditEventsAsync(default!, default);
+
+ var emittedEventIds = capturedBatch.Packets
+ .Select(p => Guid.Parse(p.AuditEvent.EventId))
+ .ToHashSet();
+ var expectedIds = cachedRows.Select(r => r.EventId).ToHashSet();
+ Assert.Equal(expectedIds, emittedEventIds);
+ }
+
+ [Fact]
+ public async Task CachedDrain_OrphanRow_NoTrackingSnapshot_IsSkipped_DoesNotCrash()
+ {
+ // Arrange — two cached audit rows: one with a tracking snapshot, one
+ // orphaned (the tracking store returns null). The orphaned row must be
+ // skipped without aborting the batch — the valid row still flows.
+ var orphan = NewCachedEvent(AuditKind.CachedSubmit);
+ var valid = NewCachedEvent(AuditKind.CachedResolve);
+
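+ _queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
+ .Returns(Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
+ _queue.ReadPendingCachedTelemetryAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
+ .Returns(
+ Task.FromResult<IReadOnlyList<AuditEvent>>(new[] { orphan, valid }),
+ Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));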
+
+ // orphan: tracking returns null
+ _trackingStore.GetStatusAsync(
+ new TrackedOperationId(orphan.CorrelationId!.Value),
+ Arg.Any<CancellationToken>())
+ .Returns(Task.FromResult<TrackingStatusSnapshot?>(null));
+ // valid: tracking returns a snapshot
+ var validTid = new TrackedOperationId(valid.CorrelationId!.Value);
+ _trackingStore.GetStatusAsync(validTid, Arg.Any<CancellationToken>())
+ .Returns(Task.FromResult<TrackingStatusSnapshot?>(NewSnapshot(validTid, "Delivered")));
+
+ CachedTelemetryBatch? capturedBatch = null;
+ _client.IngestCachedTelemetryAsync(Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>())
+ .Returns(call =>
+ {
+ capturedBatch = call.Arg<CachedTelemetryBatch>();
+ var ack = new IngestAck();
+ foreach (var packet in capturedBatch.Packets)
+ {
+ ack.AcceptedEventIds.Add(packet.AuditEvent.EventId);
+ }
+ return Task.FromResult(ack);
+ });
+
+ // Act
+ CreateActorWithCachedDrain();
+
+ // Assert — exactly one push containing ONLY the valid row; the orphan
+ // is skipped and stays Pending (not in MarkForwardedAsync's id list).
+ await AwaitAssertAsync(async () =>
+ {
+ await _client.Received(1).IngestCachedTelemetryAsync(
+ Arg.Any<CachedTelemetryBatch>(), Arg.Any<CancellationToken>());
+ }, TimeSpan.FromSeconds(5));
+
+ Assert.NotNull(capturedBatch);
+ Assert.Single(capturedBatch!.Packets);
+ Assert.Equal(valid.EventId.ToString(), capturedBatch.Packets[0].AuditEvent.EventId);
+
+ await _queue.Received(1).MarkForwardedAsync(
+ Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 1 && g[0] == valid.EventId),
+ Arg.Any<CancellationToken>());
+ }
+
+ [Fact]
+ public async Task AuditOnlyDrain_StillFlows_When_CachedDrain_IsDisabled()
+ {
+ // Arrange — ordinary (non-cached) audit rows on the audit-only queue;
+ // the actor is constructed WITHOUT a tracking store so the cached
+ // scheduler is never armed. Regression guard against the audit-only
+ // drain regressing during the AuditLog-001 refactor.
+ var rows = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
+ _queue.ReadPendingAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
+ .Returns(
+ Task.FromResult<IReadOnlyList<AuditEvent>>(rows),
+ Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>()));
+
+ _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any())
+ .Returns(_ => Task.FromResult(AckAll(rows)));
+
+ // Act — note: CreateActor (no tracking store), not CreateActorWithCachedDrain.
+ CreateActor();
+
+ // Assert — audit-only drain flows normally; the cached client is
+ // never called and ReadPendingCachedTelemetryAsync is never queried.
+ await AwaitAssertAsync(async () =>
+ {
+ await _client.Received(1).IngestAuditEventsAsync(
+ Arg.Any(), Arg.Any());
+ await _queue.Received(1).MarkForwardedAsync(
+ Arg.Is<IReadOnlyList<Guid>>(g => g.Count == 3), Arg.Any<CancellationToken>());
+ }, TimeSpan.FromSeconds(5));
+
+ await _client.DidNotReceiveWithAnyArgs().IngestCachedTelemetryAsync(default!, default);
+ await _queue.DidNotReceiveWithAnyArgs().ReadPendingCachedTelemetryAsync(default, default);
+ }
}