diff --git a/docs/plans/2026-05-20-auditlog-m3-cached-operations.md b/docs/plans/2026-05-20-auditlog-m3-cached-operations.md new file mode 100644 index 0000000..a9594a0 --- /dev/null +++ b/docs/plans/2026-05-20-auditlog-m3-cached-operations.md @@ -0,0 +1,212 @@ +# Audit Log #23 — M3 Cached Operations + Dual-Write Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (bundled cadence per `feedback_subagent_cadence`). + +**Goal:** Cached external calls (`ExternalSystem.CachedCall`) and cached DB writes (`Database.CachedWrite`) each produce 4+ audit rows per operation (`CachedSubmit` → `ApiCallCached`/`DbWriteCached` × N attempts with statuses `Forwarded` then `Attempted` then `Delivered`/`Failed` → `CachedResolve` terminal) AND a `SiteCalls` row at central. Combined telemetry: site emits one packet per lifecycle event carrying both the AuditEvent and the SiteCalls upsert; central writes both in one MS SQL transaction. Audit-write failure never aborts the script. + +**Recommended-defaults applied:** +- Telemetry proto: **new top-level RPC `IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck)`** (sitestream.proto), separate from the M2 `IngestAuditEvents` to keep payload shapes distinct. +- Forwarder: **separate `CachedCallTelemetryForwarder`** actor (or static dispatcher hooking into the existing `SiteAuditTelemetryActor`'s SQLite queue) — write the audit row + tracking row in one SQLite transaction, then let the existing telemetry actor drain both via the new RPC. Reuse the M2 Channel/SQLite hot-path infrastructure; do NOT introduce a parallel writer. +- Provenance: mirror M2's `ScriptRuntimeContext` wrapper pattern — ScriptRuntimeContext's cached-call helpers capture instance/script/site and feed the combined packet. +- IntegrationTests E2E: same component-level pattern as M2 Bundle H (`DirectActorSiteStreamAuditClient`), but extracted into `tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/` for reuse. + +**M2 realities baked in (from roadmap line 446-459):** +- Use M1 vocabulary: `AuditKind.CachedSubmit` (enqueue), `AuditKind.ApiCallCached` / `AuditKind.DbWriteCached` (each attempt + post-forward), `AuditKind.CachedResolve` (terminal). `AuditStatus.Submitted` → `Forwarded` → `Attempted` × N → `Delivered`/`Failed`/`Parked`/`Discarded`. NO `CachedEnqueued`/`CachedAttempt`/`CachedTerminal` strings appear in code (those are pre-M1 spec wording the roadmap text still mentions; honor the enum vocabulary). +- NoOpSiteStreamAuditClient still in production until M6; E2E tests use the M2 Bundle H pattern. +- AuditEventMapper duplication note from M2: M3 should move the mapper into Commons (or document the gRPC inline duplication) since M3 adds a SECOND gRPC handler with the same DTO→entity translation work. +- CachedCallTelemetry message creates from scratch (additive per Commons REQ-COM-5a) — NOT renamed to CachedOperationTelemetry. + +--- + +## Bundles + +- **Bundle A — Commons types + tracking store** (T1, T2, T3, T4): TrackedOperationId, OperationTrackingStore, Tracking.Status API, CachedCallTelemetry message. +- **Bundle B — SiteCalls table EF + migration + repo** (T5, T6, T7). +- **Bundle C — SiteCallAudit project + actor** (T8). +- **Bundle D — Proto + central dual-write transaction** (T9, T10). +- **Bundle E — ESG / DB-gateway / S&F emissions** (T11, T12, T13, T14). +- **Bundle F — Host registration** (T15). +- **Bundle G — Integration tests** (T16, T17, T18). + +Final cross-bundle reviewer + merge to main. + +--- + +## Bundle A — Commons types + tracking store + +### Task A1: TrackedOperationId strong-typed ID +File: `src/ScadaLink.Commons/Types/TrackedOperationId.cs` — `public readonly record struct TrackedOperationId(Guid Value)`. Static `New()`, `Parse(string)`, `ToString()` returns Value.ToString("D"). Implicit conversion from Guid via `From(Guid)` (no operator implicit because record struct doesn't allow). Tests in `tests/ScadaLink.Commons.Tests/Types/TrackedOperationIdTests.cs`. Commit: `feat(commons): TrackedOperationId strong type (#23 M3)`. + +### Task A2: OperationTrackingStore (site-local SQLite) +File: `src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs` — `RecordEnqueueAsync`, `RecordAttemptAsync`, `RecordTerminalAsync`, `GetStatusAsync(TrackedOperationId)`, `PurgeTerminalAsync(olderThanUtc)`. +File: `src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs` — SQLite-backed, mirror SqliteAuditWriter pattern: Channel + background writer Task + write-lock. Schema: +```sql +CREATE TABLE IF NOT EXISTS OperationTracking ( + TrackedOperationId TEXT NOT NULL PRIMARY KEY, + Kind TEXT NOT NULL, + TargetSummary TEXT NULL, + Status TEXT NOT NULL, + RetryCount INTEGER NOT NULL DEFAULT 0, + LastError TEXT NULL, + HttpStatus INTEGER NULL, + CreatedAtUtc TEXT NOT NULL, + UpdatedAtUtc TEXT NOT NULL, + TerminalAtUtc TEXT NULL, + SourceInstanceId TEXT NULL, + SourceScript TEXT NULL); +CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated ON OperationTracking(Status, UpdatedAtUtc); +``` +Tests: schema, insert+update sequence, terminal purge (only terminal rows older than threshold). Commit: `feat(siteruntime): OperationTrackingStore site-local SQLite (#23 M3)`. + +### Task A3: Tracking.Status script API +File: `src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs` — add a `Tracking` accessor exposing `Status(TrackedOperationId)` reading via `IOperationTrackingStore.GetStatusAsync`. Returns a `TrackingStatusSnapshot` record (Commons/Types) with `Status`, `RetryCount`, `LastError`, `CreatedAtUtc`, `UpdatedAtUtc`, `TerminalAtUtc`. Returns null for unknown IDs. +Tests: known, unknown, terminal IDs. Commit: `feat(siteruntime): Tracking.Status script API (#23 M3)`. + +### Task A4: CachedCallTelemetry Commons message +File: `src/ScadaLink.Commons/Messages/Integration/CachedCallTelemetry.cs` — `public sealed record CachedCallTelemetry(TrackedOperationId TrackedOperationId, AuditEvent Audit, SiteCallOperational Operational)` plus `SiteCallOperational` record (TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc?). +Tests: round-trip; lifecycle-specific construction (Submit/Attempted/Resolve). Commit: `feat(commons): CachedCallTelemetry combined operational+audit packet (#23 M3)`. + +--- + +## Bundle B — SiteCalls EF + migration + repo + +### Task B1: SiteCall entity + EF mapping +File: `src/ScadaLink.Commons/Entities/Audit/SiteCall.cs` — `public sealed record SiteCall` with fields per `SiteCallOperational` plus `IngestedAtUtc`. +File: `src/ScadaLink.ConfigurationDatabase/Configurations/SiteCallEntityTypeConfiguration.cs` — table `SiteCalls`, PK on `TrackedOperationId`, indexes `IX_SiteCalls_Source_Created` on (SourceSite, CreatedAtUtc), `IX_SiteCalls_Status_Updated` on (Status, UpdatedAtUtc). +Modify: `ScadaLinkDbContext.cs` — `public DbSet SiteCalls => Set();`. +Tests as M1 pattern. Commit: `feat(configdb): map SiteCall to SiteCalls table (#23 M3)`. + +### Task B2: AddSiteCallsTable migration +Generate via `dotnet ef migrations add AddSiteCallsTable --project src/ScadaLink.ConfigurationDatabase --startup-project src/ScadaLink.Host`. No partitioning (operational state, not audit). Use MsSqlMigrationFixture for integration test. Commit: `feat(configdb): add SiteCalls migration (#23 M3)`. + +### Task B3: ISiteCallAuditRepository + EF impl +File: `src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs` — `UpsertAsync(SiteCall)` with **monotonic status progression** (later status wins; earlier status is no-op), `GetAsync(TrackedOperationId)`, `QueryAsync(filter, paging)`, `PurgeTerminalAsync(olderThanUtc)`. +File: `src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs` — implement via `MERGE` or `INSERT ... WHERE NOT EXISTS` + `UPDATE WHERE TerminalAtUtc IS NULL AND `. Tests use MsSqlMigrationFixture. Commit: `feat(configdb): ISiteCallAuditRepository + EF impl (#23 M3)`. + +--- + +## Bundle C — SiteCallAudit project + actor + +### Task C1: ScadaLink.SiteCallAudit project + actor +Create: `src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj` (mirrors ScadaLink.AuditLog csproj style — net10.0, references Commons + ConfigurationDatabase). +Create: `src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs` — central singleton actor handling `UpsertSiteCallCommand(SiteCall siteCall)` by calling `ISiteCallAuditRepository.UpsertAsync` (scope-per-message via IServiceProvider, mirror AuditLogIngestActor). Idempotent via repo's monotonic upsert. +Create: `src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs` — `AddSiteCallAudit()` registering actor props factory. +Create: `tests/ScadaLink.SiteCallAudit.Tests/` project. +Modify: `ScadaLink.slnx` — add src + tests entries. +Commit: `feat(scaudit): SiteCallAuditActor minimum surface (#22, #23 M3)`. + +--- + +## Bundle D — Proto + central dual-write transaction + +### Task D1: Extend sitestream.proto with IngestCachedTelemetry RPC +Follow the documented protobuf regen procedure from M2 Bundle C (temporarily uncomment ItemGroup, build, copy back, recomment). Add: +```proto +message SiteCallOperationalDto { + string tracked_operation_id = 1; + string channel = 2; + string target = 3; + string source_site = 4; + string status = 5; + int32 retry_count = 6; + string last_error = 7; + google.protobuf.Int32Value http_status = 8; + google.protobuf.Timestamp created_at_utc = 9; + google.protobuf.Timestamp updated_at_utc = 10; + google.protobuf.Timestamp terminal_at_utc = 11; // null when active +} +message CachedTelemetryPacket { + AuditEventDto audit_event = 1; + SiteCallOperationalDto operational = 2; +} +message CachedTelemetryBatch { repeated CachedTelemetryPacket packets = 1; } + +service SiteStreamService { + rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck); +} +``` +Test round-trips. Commit: `feat(comms): IngestCachedTelemetry RPC + CachedTelemetryPacket proto (#23 M3)`. + +### Task D2: Dual-write transaction in AuditLogIngestActor +File: `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs` (extend) — add `IngestCachedTelemetryCommand` handler. Inside one `DbContext.Database.BeginTransactionAsync()`: +1. Call `IAuditLogRepository.InsertIfNotExistsAsync(auditEvent)` (idempotent already from M2 Bundle A). +2. Call `ISiteCallAuditRepository.UpsertAsync(siteCallOperational)` (monotonic). +3. Commit on both-success; rollback on either-throw (the central singleton SUPERVISES — actor doesn't crash). +4. Reply `IngestAck(acceptedIds)`. + +Modify: `src/ScadaLink.Communication/SiteStreamGrpc/SiteStreamGrpcServer.cs` — implement `IngestCachedTelemetry` gRPC handler routing to actor. Same inline FromDto pattern as M2 (move to mapper if time permits per M2 reviewer recommendation). + +Add: `src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs` and `IngestCachedTelemetryReply.cs` (Akka messages). + +Tests: +- Single packet → 1 AuditLog + 1 SiteCalls row. +- Duplicate `EventId` + same status → AuditLog no-op, SiteCalls no-op (monotonic), no error. +- Duplicate `EventId` + ADVANCED status → AuditLog no-op, SiteCalls updates. +- SiteCalls upsert throws → AuditLog rolled back (no orphan). +- AuditLog throws (non-duplicate) → SiteCalls rolled back. + +Commit: `feat(auditlog): combined telemetry dual-write transaction (#23 M3)`. + +--- + +## Bundle E — ESG / DB / S&F lifecycle emissions + +### Task E1: ScriptRuntimeContext.ExternalSystem.CachedCall wrapper +Mirror M2 Bundle F's `Call` wrapper. Differences: +- Emit on enqueue: AuditEvent(Kind=CachedSubmit, Status=Submitted) + SiteCallOperational(Status=Submitted, RetryCount=0). +- Calls `_externalSystemClient.CachedCallAsync` (resolves what S&F existing API surface looks like — discover by reading ExternalSystemClient). +- Returns a `TrackedOperationId` immediately (a TrackedOperationId tracking handle). +- Hands the operation to the existing StoreAndForward retry loop. + +For the per-attempt + terminal emissions, hook into the S&F dispatch loop (Bundle E2/E3). + +### Task E2: S&F retry-loop emission +Find the S&F retry-attempt callback site in `src/ScadaLink.StoreAndForward/`. On each attempt (success/transient/permanent): +- Build AuditEvent(Kind=ApiCallCached or DbWriteCached, Status=Attempted). +- Build SiteCallOperational(Status=Attempted, RetryCount=N, LastError, HttpStatus). +- Hand to `CachedCallTelemetryForwarder` which writes both to SQLite (AuditLog + OperationTracking tables, in one SQLite transaction) and lets SiteAuditTelemetryActor's drain loop push them. + +### Task E3: S&F terminal-state emission +On final state transition (Delivered / Failed / Parked / Discarded): +- Build AuditEvent(Kind=CachedResolve, Status={final state}). +- Build SiteCallOperational(Status={final state}, TerminalAtUtc=DateTime.UtcNow). +- Forward. + +### Task E4: Database.CachedWrite mirror +Same three-event pattern but Channel=DbOutbound, Kind=DbWriteCached for attempts, Kind=CachedSubmit for enqueue, Kind=CachedResolve for terminal. + +Tests in ExternalSystemGateway.Tests + StoreAndForward.Tests. + +Commit (bundle-level): one commit per task, descriptive messages following M2 style. + +--- + +## Bundle F — Host registration + +### Task F1: Register SiteCallAuditActor central singleton +File: `src/ScadaLink.Host/Actors/AkkaHostedService.cs` — register `SiteCallAuditActor` central singleton + proxy alongside `AuditLogIngestActor`. Hand the proxy to `SiteStreamGrpcServer.SetSiteCallAuditActor(proxy)` (mirroring `SetAuditIngestActor`). +File: `src/ScadaLink.Host/Program.cs` — call `.AddSiteCallAudit()` on the central role's services. +Tests in `tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs` (extend). +Commit: `feat(host): register SiteCallAuditActor central singleton (#22, #23 M3)`. + +--- + +## Bundle G — Integration tests + +### Task G1: Extract DirectActorSiteStreamAuditClient to shared infrastructure +Move from `tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs` private inner class into `tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs`. Extend to also implement the new `IngestCachedTelemetryAsync` method (mirror pattern). + +### Task G2: Cached call E2E test +File: `tests/ScadaLink.AuditLog.Tests/Integration/CachedCallCombinedTelemetryTests.cs` (use AuditLog.Tests, not IntegrationTests, because the existing IntegrationTests harness disables Akka per M2 reality). Test: cached call that fails twice then succeeds produces 5 AuditLog rows (1 Submit + 1 Forwarded + 2 Attempted + 1 Resolve) + 1 SiteCalls row (Status=Delivered) + Tracking.Status reports Delivered. + +### Task G3: Cached DB write E2E test +File: `tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs`. Mirror G2 for DB. + +### Task G4: Idempotency test +File: `tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs`. Send the same packet twice; assert exactly 1 AuditLog row + 1 SiteCalls row. + +--- + +## Final cross-bundle review + merge + +Same template as M1/M2.