scadalink-design/docs/plans/2026-05-20-auditlog-m3-cached-operations.md
Joseph Doherty 4ca0b3ce2a docs(audit): add M3 cached-operations implementation plan (#23)
7 bundles, 18 tasks. Vocabulary alignment baked in (CachedSubmit /
ApiCallCached / DbWriteCached / CachedResolve, statuses Submitted /
Forwarded / Attempted / Delivered / Failed / Parked / Discarded).

Recommended defaults locked: new IngestCachedTelemetry RPC, separate
CachedCallTelemetryForwarder reusing the SiteAuditTelemetryActor drain
loop, ScriptRuntimeContext wrapper for provenance, component-level e2e
in AuditLog.Tests via the extracted DirectActorSiteStreamAuditClient
helper.
2026-05-20 13:43:46 -04:00

Audit Log #23 — M3 Cached Operations + Dual-Write Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (bundled cadence per feedback_subagent_cadence).

Goal: Cached external calls (ExternalSystem.CachedCall) and cached DB writes (Database.CachedWrite) each produce 4+ audit rows per operation (CachedSubmit → ApiCallCached/DbWriteCached × N attempts with statuses Forwarded then Attempted then Delivered/Failed → CachedResolve terminal) AND a SiteCalls row at central. Combined telemetry: site emits one packet per lifecycle event carrying both the AuditEvent and the SiteCalls upsert; central writes both in one MS SQL transaction. Audit-write failure never aborts the script.

Recommended-defaults applied:

  • Telemetry proto: new top-level RPC IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck) (sitestream.proto), separate from the M2 IngestAuditEvents to keep payload shapes distinct.
  • Forwarder: separate CachedCallTelemetryForwarder actor (or static dispatcher hooking into the existing SiteAuditTelemetryActor's SQLite queue) — write the audit row + tracking row in one SQLite transaction, then let the existing telemetry actor drain both via the new RPC. Reuse the M2 Channel/SQLite hot-path infrastructure; do NOT introduce a parallel writer.
  • Provenance: mirror M2's ScriptRuntimeContext wrapper pattern — ScriptRuntimeContext's cached-call helpers capture instance/script/site and feed the combined packet.
  • IntegrationTests E2E: same component-level pattern as M2 Bundle H (DirectActorSiteStreamAuditClient), but extracted into tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/ for reuse.

M2 realities baked in (from roadmap lines 446-459):

  • Use M1 vocabulary: AuditKind.CachedSubmit (enqueue), AuditKind.ApiCallCached / AuditKind.DbWriteCached (each attempt + post-forward), AuditKind.CachedResolve (terminal). AuditStatus.Submitted → Forwarded → Attempted × N → Delivered/Failed/Parked/Discarded. NO CachedEnqueued/CachedAttempt/CachedTerminal strings appear in code (those are pre-M1 spec wording the roadmap text still mentions; honor the enum vocabulary).
  • NoOpSiteStreamAuditClient still in production until M6; E2E tests use the M2 Bundle H pattern.
  • AuditEventMapper duplication note from M2: M3 should move the mapper into Commons (or document the gRPC inline duplication) since M3 adds a SECOND gRPC handler with the same DTO→entity translation work.
  • The CachedCallTelemetry message is created from scratch (additive per Commons REQ-COM-5a), NOT renamed to CachedOperationTelemetry.

Bundles

  • Bundle A — Commons types + tracking store (T1, T2, T3, T4): TrackedOperationId, OperationTrackingStore, Tracking.Status API, CachedCallTelemetry message.
  • Bundle B — SiteCalls table EF + migration + repo (T5, T6, T7).
  • Bundle C — SiteCallAudit project + actor (T8).
  • Bundle D — Proto + central dual-write transaction (T9, T10).
  • Bundle E — ESG / DB-gateway / S&F emissions (T11, T12, T13, T14).
  • Bundle F — Host registration (T15).
  • Bundle G — Integration tests (T16, T17, T18).

Final cross-bundle reviewer + merge to main.


Bundle A — Commons types + tracking store

Task A1: TrackedOperationId strong-typed ID

File: src/ScadaLink.Commons/Types/TrackedOperationId.cs, containing public readonly record struct TrackedOperationId(Guid Value). Static New(), Parse(string); ToString() returns Value.ToString("D"). Conversion from Guid goes through an explicit static From(Guid) factory rather than an implicit operator (C# does allow user-defined implicit operators on record structs, so this is a design choice, not a language restriction). Tests in tests/ScadaLink.Commons.Tests/Types/TrackedOperationIdTests.cs. Commit: feat(commons): TrackedOperationId strong type (#23 M3).
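A minimal sketch of the type described in Task A1, assuming nothing beyond the members named there. The method bodies are illustrative guesses, not the committed implementation.

```csharp
using System;

// Sketch only: member names follow the task text; the bodies are assumptions.
public readonly record struct TrackedOperationId(Guid Value)
{
    // Creates a fresh identifier backed by a random GUID.
    public static TrackedOperationId New() => new(Guid.NewGuid());

    // Explicit factory used in place of an implicit conversion operator.
    public static TrackedOperationId From(Guid value) => new(value);

    // Accepts any textual GUID format that Guid.Parse accepts.
    public static TrackedOperationId Parse(string text) => new(Guid.Parse(text));

    // "D" is the 36-character hyphenated form without braces.
    public override string ToString() => Value.ToString("D");
}
```

Because the record struct gets value equality from the compiler, Parse(id.ToString()) compares equal to the original id, which is the round-trip the tests would check.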

Task A2: OperationTrackingStore (site-local SQLite)

File: src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs, declaring RecordEnqueueAsync, RecordAttemptAsync, RecordTerminalAsync, GetStatusAsync(TrackedOperationId), PurgeTerminalAsync(olderThanUtc). File: src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs, SQLite-backed, mirroring the SqliteAuditWriter pattern: Channel + background writer Task + write-lock. Schema:

CREATE TABLE IF NOT EXISTS OperationTracking (
  TrackedOperationId TEXT NOT NULL PRIMARY KEY,
  Kind TEXT NOT NULL,
  TargetSummary TEXT NULL,
  Status TEXT NOT NULL,
  RetryCount INTEGER NOT NULL DEFAULT 0,
  LastError TEXT NULL,
  HttpStatus INTEGER NULL,
  CreatedAtUtc TEXT NOT NULL,
  UpdatedAtUtc TEXT NOT NULL,
  TerminalAtUtc TEXT NULL,
  SourceInstanceId TEXT NULL,
  SourceScript TEXT NULL);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated ON OperationTracking(Status, UpdatedAtUtc);

Tests: schema, insert+update sequence, terminal purge (only terminal rows older than threshold). Commit: feat(siteruntime): OperationTrackingStore site-local SQLite (#23 M3).
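The purge rule in the test list ("only terminal rows older than threshold") can be modeled in memory before the SQLite version exists. The sketch below is hypothetical: TrackingRow and InMemoryTrackingTable are illustrative names, and it assumes the threshold is compared against TerminalAtUtc rather than UpdatedAtUtc, which the plan does not state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the OperationTracking table.
public sealed record TrackingRow(string Id, string Status, DateTime UpdatedAtUtc, DateTime? TerminalAtUtc);

public sealed class InMemoryTrackingTable
{
    private readonly Dictionary<string, TrackingRow> _rows = new();

    // Insert or overwrite by primary key.
    public void Upsert(TrackingRow row) => _rows[row.Id] = row;

    public bool Contains(string id) => _rows.ContainsKey(id);

    // Removes rows that are terminal AND whose terminal timestamp is older than
    // the threshold. Active rows (TerminalAtUtc == null) are never purged.
    public int PurgeTerminal(DateTime olderThanUtc)
    {
        var doomed = _rows.Values
            .Where(r => r.TerminalAtUtc is DateTime t && t < olderThanUtc)
            .Select(r => r.Id)
            .ToList();
        foreach (var id in doomed) _rows.Remove(id);
        return doomed.Count;
    }
}
```

An old but still-active row survives the purge, which is the case most likely to be missed if the real query filters on UpdatedAtUtc alone.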

Task A3: Tracking.Status script API

File: src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs — add a Tracking accessor exposing Status(TrackedOperationId) reading via IOperationTrackingStore.GetStatusAsync. Returns a TrackingStatusSnapshot record (Commons/Types) with Status, RetryCount, LastError, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc. Returns null for unknown IDs. Tests: known, unknown, terminal IDs. Commit: feat(siteruntime): Tracking.Status script API (#23 M3).
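As a rough illustration of the null-for-unknown contract, the sketch below backs the accessor with a dictionary instead of IOperationTrackingStore and uses Guid keys and a synchronous call to stay self-contained. TrackingAccessor and IsTerminal are hypothetical names; the field types are assumptions.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Field list taken from the task text; the types are assumed.
public sealed record TrackingStatusSnapshot(
    string Status, int RetryCount, string? LastError,
    DateTime CreatedAtUtc, DateTime UpdatedAtUtc, DateTime? TerminalAtUtc)
{
    // Hypothetical convenience check, not part of the plan.
    public bool IsTerminal => TerminalAtUtc is not null;
}

// Hypothetical accessor; the real one would read through IOperationTrackingStore.
public sealed class TrackingAccessor
{
    private readonly IReadOnlyDictionary<Guid, TrackingStatusSnapshot> _store;

    public TrackingAccessor(IReadOnlyDictionary<Guid, TrackingStatusSnapshot> store) => _store = store;

    // Unknown IDs yield null instead of throwing.
    public TrackingStatusSnapshot? Status(Guid id) =>
        _store.TryGetValue(id, out var snapshot) ? snapshot : null;
}
```

A script would then null-check the result before reading Status or RetryCount.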

Task A4: CachedCallTelemetry Commons message

File: src/ScadaLink.Commons/Messages/Integration/CachedCallTelemetry.cs, containing public sealed record CachedCallTelemetry(TrackedOperationId TrackedOperationId, AuditEvent Audit, SiteCallOperational Operational) plus a SiteCallOperational record (TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc?). Tests: round-trip; lifecycle-specific construction (Submit/Attempted/Resolve). Commit: feat(commons): CachedCallTelemetry combined operational+audit packet (#23 M3).


Bundle B — SiteCalls EF + migration + repo

Task B1: SiteCall entity + EF mapping

File: src/ScadaLink.Commons/Entities/Audit/SiteCall.cs, containing public sealed record SiteCall with fields per SiteCallOperational plus IngestedAtUtc. File: src/ScadaLink.ConfigurationDatabase/Configurations/SiteCallEntityTypeConfiguration.cs: table SiteCalls, PK on TrackedOperationId, indexes IX_SiteCalls_Source_Created on (SourceSite, CreatedAtUtc) and IX_SiteCalls_Status_Updated on (Status, UpdatedAtUtc). Modify: ScadaLinkDbContext.cs to add public DbSet<SiteCall> SiteCalls => Set<SiteCall>();. Tests follow the M1 pattern. Commit: feat(configdb): map SiteCall to SiteCalls table (#23 M3).

Task B2: AddSiteCallsTable migration

Generate via dotnet ef migrations add AddSiteCallsTable --project src/ScadaLink.ConfigurationDatabase --startup-project src/ScadaLink.Host. No partitioning (operational state, not audit). Use MsSqlMigrationFixture for integration test. Commit: feat(configdb): add SiteCalls migration (#23 M3).

Task B3: ISiteCallAuditRepository + EF impl

File: src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs, declaring UpsertAsync(SiteCall) with monotonic status progression (later status wins; earlier status is no-op), GetAsync(TrackedOperationId), QueryAsync(filter, paging), PurgeTerminalAsync(olderThanUtc). File: src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs, implemented via MERGE or INSERT ... WHERE NOT EXISTS + UPDATE WHERE TerminalAtUtc IS NULL AND <monotonic order check>. Tests use MsSqlMigrationFixture. Commit: feat(configdb): ISiteCallAuditRepository + EF impl (#23 M3).
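The monotonic rule can be sketched in memory as a rank comparison. The numeric ranks below are an assumption (the plan names the statuses but assigns no order values), all four terminal statuses share the top rank, and equal ranks are treated as a no-op. SiteCallStatusOrder and InMemorySiteCalls are hypothetical names, and the sketch does not decide whether a repeated Attempted with a higher RetryCount should update the row.

```csharp
using System;
using System.Collections.Generic;

public static class SiteCallStatusOrder
{
    // Assumed ranks; an unknown status name throws KeyNotFoundException.
    private static readonly Dictionary<string, int> Rank = new()
    {
        ["Submitted"] = 0,
        ["Forwarded"] = 1,
        ["Attempted"] = 2,
        ["Delivered"] = 3,
        ["Failed"] = 3,
        ["Parked"] = 3,
        ["Discarded"] = 3,
    };

    // True only when the incoming status is strictly later than the stored one,
    // so a stored terminal status is never replaced.
    public static bool ShouldApply(string stored, string incoming) =>
        Rank[incoming] > Rank[stored];
}

// Hypothetical in-memory stand-in for the SiteCalls table.
public sealed class InMemorySiteCalls
{
    private readonly Dictionary<Guid, string> _statusById = new();

    // Returns true when the row was inserted or advanced, false for a no-op.
    public bool Upsert(Guid id, string status)
    {
        if (_statusById.TryGetValue(id, out var stored) &&
            !SiteCallStatusOrder.ShouldApply(stored, status))
        {
            return false;
        }
        _statusById[id] = status;
        return true;
    }

    public string StatusOf(Guid id) => _statusById[id];
}
```

Out-of-order delivery (for example Forwarded arriving after Attempted) then leaves the stored row untouched, which is what makes replays from the site safe.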


Bundle C — SiteCallAudit project + actor

Task C1: ScadaLink.SiteCallAudit project + actor

Create: src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj (mirrors ScadaLink.AuditLog csproj style: net10.0, references Commons + ConfigurationDatabase). Create: src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs, a central singleton actor handling UpsertSiteCallCommand(SiteCall siteCall) by calling ISiteCallAuditRepository.UpsertAsync (scope-per-message via IServiceProvider, mirroring AuditLogIngestActor). Idempotent via the repo's monotonic upsert. Create: src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs with AddSiteCallAudit() registering the actor props factory. Create: tests/ScadaLink.SiteCallAudit.Tests/ project. Modify: ScadaLink.slnx to add src + tests entries. Commit: feat(scaudit): SiteCallAuditActor minimum surface (#22, #23 M3).


Bundle D — Proto + central dual-write transaction

Task D1: Extend sitestream.proto with IngestCachedTelemetry RPC

Follow the documented protobuf regen procedure from M2 Bundle C (temporarily uncomment ItemGroup, build, copy back, recomment). Add:

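// Int32Value and Timestamp require google/protobuf/wrappers.proto and
// google/protobuf/timestamp.proto to be imported in sitestream.proto.
message SiteCallOperationalDto {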
  string tracked_operation_id = 1;
  string channel = 2;
  string target = 3;
  string source_site = 4;
  string status = 5;
  int32 retry_count = 6;
  string last_error = 7;
  google.protobuf.Int32Value http_status = 8;
  google.protobuf.Timestamp created_at_utc = 9;
  google.protobuf.Timestamp updated_at_utc = 10;
  google.protobuf.Timestamp terminal_at_utc = 11;  // unset while the operation is still active
}
message CachedTelemetryPacket {
  AuditEventDto audit_event = 1;
  SiteCallOperationalDto operational = 2;
}
message CachedTelemetryBatch { repeated CachedTelemetryPacket packets = 1; }

service SiteStreamService {
  rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
}

Test round-trips. Commit: feat(comms): IngestCachedTelemetry RPC + CachedTelemetryPacket proto (#23 M3).

Task D2: Dual-write transaction in AuditLogIngestActor

File: src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs (extend) — add IngestCachedTelemetryCommand handler. Inside one DbContext.Database.BeginTransactionAsync():

  1. Call IAuditLogRepository.InsertIfNotExistsAsync(auditEvent) (idempotent already from M2 Bundle A).
  2. Call ISiteCallAuditRepository.UpsertAsync(siteCallOperational) (monotonic).
  3. Commit on both-success; rollback on either-throw (the central singleton SUPERVISES — actor doesn't crash).
  4. Reply IngestAck(acceptedIds).

Modify: src/ScadaLink.Communication/SiteStreamGrpc/SiteStreamGrpcServer.cs — implement IngestCachedTelemetry gRPC handler routing to actor. Same inline FromDto pattern as M2 (move to mapper if time permits per M2 reviewer recommendation).

Add: src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs and IngestCachedTelemetryReply.cs (Akka messages).

Tests:

  • Single packet → 1 AuditLog + 1 SiteCalls row.
  • Duplicate EventId + same status → AuditLog no-op, SiteCalls no-op (monotonic), no error.
  • Duplicate EventId + ADVANCED status → AuditLog no-op, SiteCalls updates.
  • SiteCalls upsert throws → AuditLog rolled back (no orphan).
  • AuditLog throws (non-duplicate) → SiteCalls rolled back.
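The commit-on-both-success behaviour these tests target can be modeled without a database. The sketch below is an in-memory model only: it does not use EF Core, staging into copies stands in for the SQL transaction, DualWriteModel and FailSiteCallUpsert are hypothetical names, and the monotonic status check is left out for brevity.

```csharp
using System;
using System.Collections.Generic;

// In-memory model of the dual-write rule; not the EF Core implementation.
public sealed class DualWriteModel
{
    private HashSet<Guid> _auditEventIds = new();
    private Dictionary<Guid, string> _siteCalls = new();

    public int AuditRowCount => _auditEventIds.Count;
    public int SiteCallRowCount => _siteCalls.Count;

    // Hypothetical test hook: forces the second write to throw.
    public bool FailSiteCallUpsert { get; set; }

    // Returns true when both writes committed, false when the pair was rolled back.
    public bool Ingest(Guid eventId, Guid trackedOperationId, string status)
    {
        // "Begin transaction": work on copies so a failure leaves committed state untouched.
        var stagedAudit = new HashSet<Guid>(_auditEventIds);
        var stagedCalls = new Dictionary<Guid, string>(_siteCalls);
        try
        {
            stagedAudit.Add(eventId); // insert-if-not-exists: Add ignores a duplicate
            if (FailSiteCallUpsert)
                throw new InvalidOperationException("simulated SiteCalls upsert failure");
            stagedCalls[trackedOperationId] = status; // upsert (monotonic check omitted)

            _auditEventIds = stagedAudit; // "commit": publish both changes together
            _siteCalls = stagedCalls;
            return true;
        }
        catch (InvalidOperationException)
        {
            return false; // "rollback": staged copies are discarded, the caller survives
        }
    }
}
```

Sending the same packet twice leaves one row in each set, and a failed upsert leaves no orphan audit row, which matches the duplicate and rollback cases in the list above.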

Commit: feat(auditlog): combined telemetry dual-write transaction (#23 M3).


Bundle E — ESG / DB / S&F lifecycle emissions

Task E1: ScriptRuntimeContext.ExternalSystem.CachedCall wrapper

Mirror M2 Bundle F's Call wrapper. Differences:

  • Emit on enqueue: AuditEvent(Kind=CachedSubmit, Status=Submitted) + SiteCallOperational(Status=Submitted, RetryCount=0).
  • Calls _externalSystemClient.CachedCallAsync (the shape of the existing S&F API surface is still to be confirmed by reading ExternalSystemClient).
  • Returns a TrackedOperationId immediately as the tracking handle.
  • Hands the operation to the existing StoreAndForward retry loop.

For the per-attempt + terminal emissions, hook into the S&F dispatch loop (Tasks E2/E3).
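A minimal sketch of the enqueue path, assuming the forwarder and the S&F hand-off can be represented as delegates. SubmitPacket and CachedCallWrapper are hypothetical names, and the emit-before-enqueue ordering is an assumption. The try/catch reflects the stated goal that an audit-write failure never aborts the script.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, flattened packet; the real CachedCallTelemetry carries full records.
public sealed record SubmitPacket(Guid TrackedOperationId, string Kind, string Status, int RetryCount);

public sealed class CachedCallWrapper
{
    private readonly Action<SubmitPacket> _emit; // stands in for the telemetry forwarder
    private readonly Action<Guid> _enqueue;      // stands in for the S&F hand-off

    public CachedCallWrapper(Action<SubmitPacket> emit, Action<Guid> enqueue)
    {
        _emit = emit;
        _enqueue = enqueue;
    }

    // Returns the tracking handle immediately; delivery happens later in the retry loop.
    public Guid CachedCall()
    {
        var id = Guid.NewGuid();
        try
        {
            _emit(new SubmitPacket(id, "CachedSubmit", "Submitted", 0));
        }
        catch (Exception)
        {
            // Audit emission is best-effort: swallow so the calling script keeps running.
        }
        _enqueue(id);
        return id;
    }
}
```

With a forwarder that throws, CachedCall still enqueues the operation and returns its id, so the script observes no difference.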

Task E2: S&F retry-loop emission

Find the S&F retry-attempt callback site in src/ScadaLink.StoreAndForward/. On each attempt (success/transient/permanent):

  • Build AuditEvent(Kind=ApiCallCached or DbWriteCached, Status=Attempted).
  • Build SiteCallOperational(Status=Attempted, RetryCount=N, LastError, HttpStatus).
  • Hand to CachedCallTelemetryForwarder which writes both to SQLite (AuditLog + OperationTracking tables, in one SQLite transaction) and lets SiteAuditTelemetryActor's drain loop push them.

Task E3: S&F terminal-state emission

On final state transition (Delivered / Failed / Parked / Discarded):

  • Build AuditEvent(Kind=CachedResolve, Status={final state}).
  • Build SiteCallOperational(Status={final state}, TerminalAtUtc=DateTime.UtcNow).
  • Forward.

Task E4: Database.CachedWrite mirror

Same three-event pattern but Channel=DbOutbound, Kind=DbWriteCached for attempts, Kind=CachedSubmit for enqueue, Kind=CachedResolve for terminal.
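The per-attempt and terminal packets for both channels can be sketched as two small factory methods. LifecyclePacket and LifecycleEmitter are hypothetical names, the packet is flattened for brevity, and the isDbWrite flag stands in for the channel value because the plan names only Channel=DbOutbound.

```csharp
#nullable enable
using System;
using System.Linq;

// Hypothetical flattened packet combining the audit and operational fields.
public sealed record LifecyclePacket(
    Guid TrackedOperationId, string Kind, string Status,
    int RetryCount, string? LastError, DateTime? TerminalAtUtc);

public static class LifecycleEmitter
{
    private static readonly string[] TerminalStatuses = { "Delivered", "Failed", "Parked", "Discarded" };

    // One packet per attempt; the Kind depends on the channel.
    public static LifecyclePacket Attempt(Guid id, bool isDbWrite, int retryCount, string? lastError) =>
        new(id, isDbWrite ? "DbWriteCached" : "ApiCallCached", "Attempted", retryCount, lastError, null);

    // One packet for the final transition; rejects non-terminal statuses.
    public static LifecyclePacket Terminal(Guid id, string finalStatus, int retryCount, DateTime nowUtc)
    {
        if (!TerminalStatuses.Contains(finalStatus))
            throw new ArgumentException($"'{finalStatus}' is not a terminal status", nameof(finalStatus));
        return new(id, "CachedResolve", finalStatus, retryCount, null, nowUtc);
    }
}
```

Passing the clock value in as nowUtc, rather than reading DateTime.UtcNow inside the method, keeps the terminal timestamp deterministic in tests.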

Tests in ExternalSystemGateway.Tests + StoreAndForward.Tests.

Commit (bundle-level): one commit per task, descriptive messages following M2 style.


Bundle F — Host registration

Task F1: Register SiteCallAuditActor central singleton

File: src/ScadaLink.Host/Actors/AkkaHostedService.cs — register SiteCallAuditActor central singleton + proxy alongside AuditLogIngestActor. Hand the proxy to SiteStreamGrpcServer.SetSiteCallAuditActor(proxy) (mirroring SetAuditIngestActor). File: src/ScadaLink.Host/Program.cs — call .AddSiteCallAudit() on the central role's services. Tests in tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs (extend). Commit: feat(host): register SiteCallAuditActor central singleton (#22, #23 M3).


Bundle G — Integration tests

Task G1: Extract DirectActorSiteStreamAuditClient to shared infrastructure

Move from tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs private inner class into tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs. Extend to also implement the new IngestCachedTelemetryAsync method (mirror pattern).

Task G2: Cached call E2E test

File: tests/ScadaLink.AuditLog.Tests/Integration/CachedCallCombinedTelemetryTests.cs (use AuditLog.Tests, not IntegrationTests, because the existing IntegrationTests harness disables Akka per M2 reality). Test: cached call that fails twice then succeeds produces 5 AuditLog rows (1 Submit + 1 Forwarded + 2 Attempted + 1 Resolve) + 1 SiteCalls row (Status=Delivered) + Tracking.Status reports Delivered.

Task G3: Cached DB write E2E test

File: tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs. Mirror G2 for DB.

Task G4: Idempotency test

File: tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs. Send the same packet twice; assert exactly 1 AuditLog row + 1 SiteCalls row.


Final cross-bundle review + merge

Same template as M1/M2.