docs(audit): add M3 cached-operations implementation plan (#23)
7 bundles, 18 tasks. Vocabulary alignment baked in (CachedSubmit / ApiCallCached / DbWriteCached / CachedResolve, statuses Submitted / Forwarded / Attempted / Delivered / Failed / Parked / Discarded). Recommended defaults locked: new IngestCachedTelemetry RPC, separate CachedCallTelemetryForwarder reusing the SiteAuditTelemetryActor drain loop, ScriptRuntimeContext wrapper for provenance, component-level e2e in AuditLog.Tests via the extracted DirectActorSiteStreamAuditClient helper.
This commit is contained in:
212
docs/plans/2026-05-20-auditlog-m3-cached-operations.md
Normal file
212
docs/plans/2026-05-20-auditlog-m3-cached-operations.md
Normal file
@@ -0,0 +1,212 @@
|
|||||||
|
# Audit Log #23 — M3 Cached Operations + Dual-Write Implementation Plan
|
||||||
|
|
||||||
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (bundled cadence per `feedback_subagent_cadence`).
|
||||||
|
|
||||||
|
**Goal:** Cached external calls (`ExternalSystem.CachedCall`) and cached DB writes (`Database.CachedWrite`) each produce 4+ audit rows per operation (`CachedSubmit` → `ApiCallCached`/`DbWriteCached` × N attempts with statuses `Forwarded` then `Attempted` then `Delivered`/`Failed` → `CachedResolve` terminal) AND a `SiteCalls` row at central. Combined telemetry: site emits one packet per lifecycle event carrying both the AuditEvent and the SiteCalls upsert; central writes both in one MS SQL transaction. Audit-write failure never aborts the script.
|
||||||
|
|
||||||
|
**Recommended-defaults applied:**
|
||||||
|
- Telemetry proto: **new top-level RPC `IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck)`** (sitestream.proto), separate from the M2 `IngestAuditEvents` to keep payload shapes distinct.
|
||||||
|
- Forwarder: **separate `CachedCallTelemetryForwarder`** actor (or static dispatcher hooking into the existing `SiteAuditTelemetryActor`'s SQLite queue) — write the audit row + tracking row in one SQLite transaction, then let the existing telemetry actor drain both via the new RPC. Reuse the M2 Channel/SQLite hot-path infrastructure; do NOT introduce a parallel writer.
|
||||||
|
- Provenance: mirror M2's `ScriptRuntimeContext` wrapper pattern — ScriptRuntimeContext's cached-call helpers capture instance/script/site and feed the combined packet.
|
||||||
|
- IntegrationTests E2E: same component-level pattern as M2 Bundle H (`DirectActorSiteStreamAuditClient`), but extracted into `tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/` for reuse.
|
||||||
|
|
||||||
|
**M2 realities baked in (from roadmap line 446-459):**
|
||||||
|
- Use M1 vocabulary: `AuditKind.CachedSubmit` (enqueue), `AuditKind.ApiCallCached` / `AuditKind.DbWriteCached` (each attempt + post-forward), `AuditKind.CachedResolve` (terminal). `AuditStatus.Submitted` → `Forwarded` → `Attempted` × N → `Delivered`/`Failed`/`Parked`/`Discarded`. NO `CachedEnqueued`/`CachedAttempt`/`CachedTerminal` strings appear in code (those are pre-M1 spec wording the roadmap text still mentions; honor the enum vocabulary).
|
||||||
|
- NoOpSiteStreamAuditClient still in production until M6; E2E tests use the M2 Bundle H pattern.
|
||||||
|
- AuditEventMapper duplication note from M2: M3 should move the mapper into Commons (or document the gRPC inline duplication) since M3 adds a SECOND gRPC handler with the same DTO→entity translation work.
|
||||||
|
- CachedCallTelemetry message creates from scratch (additive per Commons REQ-COM-5a) — NOT renamed to CachedOperationTelemetry.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundles
|
||||||
|
|
||||||
|
- **Bundle A — Commons types + tracking store** (T1, T2, T3, T4): TrackedOperationId, OperationTrackingStore, Tracking.Status API, CachedCallTelemetry message.
|
||||||
|
- **Bundle B — SiteCalls table EF + migration + repo** (T5, T6, T7).
|
||||||
|
- **Bundle C — SiteCallAudit project + actor** (T8).
|
||||||
|
- **Bundle D — Proto + central dual-write transaction** (T9, T10).
|
||||||
|
- **Bundle E — ESG / DB-gateway / S&F emissions** (T11, T12, T13, T14).
|
||||||
|
- **Bundle F — Host registration** (T15).
|
||||||
|
- **Bundle G — Integration tests** (T16, T17, T18).
|
||||||
|
|
||||||
|
Final cross-bundle reviewer + merge to main.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle A — Commons types + tracking store
|
||||||
|
|
||||||
|
### Task A1: TrackedOperationId strong-typed ID
|
||||||
|
File: `src/ScadaLink.Commons/Types/TrackedOperationId.cs` — `public readonly record struct TrackedOperationId(Guid Value)`. Static `New()`, `Parse(string)`, `ToString()` returns Value.ToString("D"). Implicit conversion from Guid via `From(Guid)` (no operator implicit because record struct doesn't allow). Tests in `tests/ScadaLink.Commons.Tests/Types/TrackedOperationIdTests.cs`. Commit: `feat(commons): TrackedOperationId strong type (#23 M3)`.
|
||||||
|
|
||||||
|
### Task A2: OperationTrackingStore (site-local SQLite)
|
||||||
|
File: `src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs` — `RecordEnqueueAsync`, `RecordAttemptAsync`, `RecordTerminalAsync`, `GetStatusAsync(TrackedOperationId)`, `PurgeTerminalAsync(olderThanUtc)`.
|
||||||
|
File: `src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs` — SQLite-backed, mirror SqliteAuditWriter pattern: Channel<T> + background writer Task + write-lock. Schema:
|
||||||
|
```sql
|
||||||
|
CREATE TABLE IF NOT EXISTS OperationTracking (
|
||||||
|
TrackedOperationId TEXT NOT NULL PRIMARY KEY,
|
||||||
|
Kind TEXT NOT NULL,
|
||||||
|
TargetSummary TEXT NULL,
|
||||||
|
Status TEXT NOT NULL,
|
||||||
|
RetryCount INTEGER NOT NULL DEFAULT 0,
|
||||||
|
LastError TEXT NULL,
|
||||||
|
HttpStatus INTEGER NULL,
|
||||||
|
CreatedAtUtc TEXT NOT NULL,
|
||||||
|
UpdatedAtUtc TEXT NOT NULL,
|
||||||
|
TerminalAtUtc TEXT NULL,
|
||||||
|
SourceInstanceId TEXT NULL,
|
||||||
|
SourceScript TEXT NULL);
|
||||||
|
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated ON OperationTracking(Status, UpdatedAtUtc);
|
||||||
|
```
|
||||||
|
Tests: schema, insert+update sequence, terminal purge (only terminal rows older than threshold). Commit: `feat(siteruntime): OperationTrackingStore site-local SQLite (#23 M3)`.
|
||||||
|
|
||||||
|
### Task A3: Tracking.Status script API
|
||||||
|
File: `src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs` — add a `Tracking` accessor exposing `Status(TrackedOperationId)` reading via `IOperationTrackingStore.GetStatusAsync`. Returns a `TrackingStatusSnapshot` record (Commons/Types) with `Status`, `RetryCount`, `LastError`, `CreatedAtUtc`, `UpdatedAtUtc`, `TerminalAtUtc`. Returns null for unknown IDs.
|
||||||
|
Tests: known, unknown, terminal IDs. Commit: `feat(siteruntime): Tracking.Status script API (#23 M3)`.
|
||||||
|
|
||||||
|
### Task A4: CachedCallTelemetry Commons message
|
||||||
|
File: `src/ScadaLink.Commons/Messages/Integration/CachedCallTelemetry.cs` — `public sealed record CachedCallTelemetry(TrackedOperationId TrackedOperationId, AuditEvent Audit, SiteCallOperational Operational)` plus `SiteCallOperational` record (TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc?).
|
||||||
|
Tests: round-trip; lifecycle-specific construction (Submit/Attempted/Resolve). Commit: `feat(commons): CachedCallTelemetry combined operational+audit packet (#23 M3)`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle B — SiteCalls EF + migration + repo
|
||||||
|
|
||||||
|
### Task B1: SiteCall entity + EF mapping
|
||||||
|
File: `src/ScadaLink.Commons/Entities/Audit/SiteCall.cs` — `public sealed record SiteCall` with fields per `SiteCallOperational` plus `IngestedAtUtc`.
|
||||||
|
File: `src/ScadaLink.ConfigurationDatabase/Configurations/SiteCallEntityTypeConfiguration.cs` — table `SiteCalls`, PK on `TrackedOperationId`, indexes `IX_SiteCalls_Source_Created` on (SourceSite, CreatedAtUtc), `IX_SiteCalls_Status_Updated` on (Status, UpdatedAtUtc).
|
||||||
|
Modify: `ScadaLinkDbContext.cs` — `public DbSet<SiteCall> SiteCalls => Set<SiteCall>();`.
|
||||||
|
Tests as M1 pattern. Commit: `feat(configdb): map SiteCall to SiteCalls table (#23 M3)`.
|
||||||
|
|
||||||
|
### Task B2: AddSiteCallsTable migration
|
||||||
|
Generate via `dotnet ef migrations add AddSiteCallsTable --project src/ScadaLink.ConfigurationDatabase --startup-project src/ScadaLink.Host`. No partitioning (operational state, not audit). Use MsSqlMigrationFixture for integration test. Commit: `feat(configdb): add SiteCalls migration (#23 M3)`.
|
||||||
|
|
||||||
|
### Task B3: ISiteCallAuditRepository + EF impl
|
||||||
|
File: `src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs` — `UpsertAsync(SiteCall)` with **monotonic status progression** (later status wins; earlier status is no-op), `GetAsync(TrackedOperationId)`, `QueryAsync(filter, paging)`, `PurgeTerminalAsync(olderThanUtc)`.
|
||||||
|
File: `src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs` — implement via `MERGE` or `INSERT ... WHERE NOT EXISTS` + `UPDATE WHERE TerminalAtUtc IS NULL AND <monotonic order check>`. Tests use MsSqlMigrationFixture. Commit: `feat(configdb): ISiteCallAuditRepository + EF impl (#23 M3)`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle C — SiteCallAudit project + actor
|
||||||
|
|
||||||
|
### Task C1: ScadaLink.SiteCallAudit project + actor
|
||||||
|
Create: `src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj` (mirrors ScadaLink.AuditLog csproj style — net10.0, references Commons + ConfigurationDatabase).
|
||||||
|
Create: `src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs` — central singleton actor handling `UpsertSiteCallCommand(SiteCall siteCall)` by calling `ISiteCallAuditRepository.UpsertAsync` (scope-per-message via IServiceProvider, mirror AuditLogIngestActor). Idempotent via repo's monotonic upsert.
|
||||||
|
Create: `src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs` — `AddSiteCallAudit()` registering actor props factory.
|
||||||
|
Create: `tests/ScadaLink.SiteCallAudit.Tests/` project.
|
||||||
|
Modify: `ScadaLink.slnx` — add src + tests entries.
|
||||||
|
Commit: `feat(scaudit): SiteCallAuditActor minimum surface (#22, #23 M3)`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle D — Proto + central dual-write transaction
|
||||||
|
|
||||||
|
### Task D1: Extend sitestream.proto with IngestCachedTelemetry RPC
|
||||||
|
Follow the documented protobuf regen procedure from M2 Bundle C (temporarily uncomment ItemGroup, build, copy back, recomment). Add:
|
||||||
|
```proto
|
||||||
|
message SiteCallOperationalDto {
|
||||||
|
string tracked_operation_id = 1;
|
||||||
|
string channel = 2;
|
||||||
|
string target = 3;
|
||||||
|
string source_site = 4;
|
||||||
|
string status = 5;
|
||||||
|
int32 retry_count = 6;
|
||||||
|
string last_error = 7;
|
||||||
|
google.protobuf.Int32Value http_status = 8;
|
||||||
|
google.protobuf.Timestamp created_at_utc = 9;
|
||||||
|
google.protobuf.Timestamp updated_at_utc = 10;
|
||||||
|
google.protobuf.Timestamp terminal_at_utc = 11; // null when active
|
||||||
|
}
|
||||||
|
message CachedTelemetryPacket {
|
||||||
|
AuditEventDto audit_event = 1;
|
||||||
|
SiteCallOperationalDto operational = 2;
|
||||||
|
}
|
||||||
|
message CachedTelemetryBatch { repeated CachedTelemetryPacket packets = 1; }
|
||||||
|
|
||||||
|
service SiteStreamService {
|
||||||
|
rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
Test round-trips. Commit: `feat(comms): IngestCachedTelemetry RPC + CachedTelemetryPacket proto (#23 M3)`.
|
||||||
|
|
||||||
|
### Task D2: Dual-write transaction in AuditLogIngestActor
|
||||||
|
File: `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs` (extend) — add `IngestCachedTelemetryCommand` handler. Inside one `DbContext.Database.BeginTransactionAsync()`:
|
||||||
|
1. Call `IAuditLogRepository.InsertIfNotExistsAsync(auditEvent)` (idempotent already from M2 Bundle A).
|
||||||
|
2. Call `ISiteCallAuditRepository.UpsertAsync(siteCallOperational)` (monotonic).
|
||||||
|
3. Commit on both-success; rollback on either-throw (the central singleton SUPERVISES — actor doesn't crash).
|
||||||
|
4. Reply `IngestAck(acceptedIds)`.
|
||||||
|
|
||||||
|
Modify: `src/ScadaLink.Communication/SiteStreamGrpc/SiteStreamGrpcServer.cs` — implement `IngestCachedTelemetry` gRPC handler routing to actor. Same inline FromDto pattern as M2 (move to mapper if time permits per M2 reviewer recommendation).
|
||||||
|
|
||||||
|
Add: `src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs` and `IngestCachedTelemetryReply.cs` (Akka messages).
|
||||||
|
|
||||||
|
Tests:
|
||||||
|
- Single packet → 1 AuditLog + 1 SiteCalls row.
|
||||||
|
- Duplicate `EventId` + same status → AuditLog no-op, SiteCalls no-op (monotonic), no error.
|
||||||
|
- Duplicate `EventId` + ADVANCED status → AuditLog no-op, SiteCalls updates.
|
||||||
|
- SiteCalls upsert throws → AuditLog rolled back (no orphan).
|
||||||
|
- AuditLog throws (non-duplicate) → SiteCalls rolled back.
|
||||||
|
|
||||||
|
Commit: `feat(auditlog): combined telemetry dual-write transaction (#23 M3)`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle E — ESG / DB / S&F lifecycle emissions
|
||||||
|
|
||||||
|
### Task E1: ScriptRuntimeContext.ExternalSystem.CachedCall wrapper
|
||||||
|
Mirror M2 Bundle F's `Call` wrapper. Differences:
|
||||||
|
- Emit on enqueue: AuditEvent(Kind=CachedSubmit, Status=Submitted) + SiteCallOperational(Status=Submitted, RetryCount=0).
|
||||||
|
- Calls `_externalSystemClient.CachedCallAsync` (resolves what S&F existing API surface looks like — discover by reading ExternalSystemClient).
|
||||||
|
- Returns a `TrackedOperationId` immediately (a TrackedOperationId tracking handle).
|
||||||
|
- Hands the operation to the existing StoreAndForward retry loop.
|
||||||
|
|
||||||
|
For the per-attempt + terminal emissions, hook into the S&F dispatch loop (Bundle E2/E3).
|
||||||
|
|
||||||
|
### Task E2: S&F retry-loop emission
|
||||||
|
Find the S&F retry-attempt callback site in `src/ScadaLink.StoreAndForward/`. On each attempt (success/transient/permanent):
|
||||||
|
- Build AuditEvent(Kind=ApiCallCached or DbWriteCached, Status=Attempted).
|
||||||
|
- Build SiteCallOperational(Status=Attempted, RetryCount=N, LastError, HttpStatus).
|
||||||
|
- Hand to `CachedCallTelemetryForwarder` which writes both to SQLite (AuditLog + OperationTracking tables, in one SQLite transaction) and lets SiteAuditTelemetryActor's drain loop push them.
|
||||||
|
|
||||||
|
### Task E3: S&F terminal-state emission
|
||||||
|
On final state transition (Delivered / Failed / Parked / Discarded):
|
||||||
|
- Build AuditEvent(Kind=CachedResolve, Status={final state}).
|
||||||
|
- Build SiteCallOperational(Status={final state}, TerminalAtUtc=DateTime.UtcNow).
|
||||||
|
- Forward.
|
||||||
|
|
||||||
|
### Task E4: Database.CachedWrite mirror
|
||||||
|
Same three-event pattern but Channel=DbOutbound, Kind=DbWriteCached for attempts, Kind=CachedSubmit for enqueue, Kind=CachedResolve for terminal.
|
||||||
|
|
||||||
|
Tests in ExternalSystemGateway.Tests + StoreAndForward.Tests.
|
||||||
|
|
||||||
|
Commit (bundle-level): one commit per task, descriptive messages following M2 style.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle F — Host registration
|
||||||
|
|
||||||
|
### Task F1: Register SiteCallAuditActor central singleton
|
||||||
|
File: `src/ScadaLink.Host/Actors/AkkaHostedService.cs` — register `SiteCallAuditActor` central singleton + proxy alongside `AuditLogIngestActor`. Hand the proxy to `SiteStreamGrpcServer.SetSiteCallAuditActor(proxy)` (mirroring `SetAuditIngestActor`).
|
||||||
|
File: `src/ScadaLink.Host/Program.cs` — call `.AddSiteCallAudit()` on the central role's services.
|
||||||
|
Tests in `tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs` (extend).
|
||||||
|
Commit: `feat(host): register SiteCallAuditActor central singleton (#22, #23 M3)`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Bundle G — Integration tests
|
||||||
|
|
||||||
|
### Task G1: Extract DirectActorSiteStreamAuditClient to shared infrastructure
|
||||||
|
Move from `tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs` private inner class into `tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs`. Extend to also implement the new `IngestCachedTelemetryAsync` method (mirror pattern).
|
||||||
|
|
||||||
|
### Task G2: Cached call E2E test
|
||||||
|
File: `tests/ScadaLink.AuditLog.Tests/Integration/CachedCallCombinedTelemetryTests.cs` (use AuditLog.Tests, not IntegrationTests, because the existing IntegrationTests harness disables Akka per M2 reality). Test: cached call that fails twice then succeeds produces 5 AuditLog rows (1 Submit + 1 Forwarded + 2 Attempted + 1 Resolve) + 1 SiteCalls row (Status=Delivered) + Tracking.Status reports Delivered.
|
||||||
|
|
||||||
|
### Task G3: Cached DB write E2E test
|
||||||
|
File: `tests/ScadaLink.AuditLog.Tests/Integration/CachedWriteCombinedTelemetryTests.cs`. Mirror G2 for DB.
|
||||||
|
|
||||||
|
### Task G4: Idempotency test
|
||||||
|
File: `tests/ScadaLink.AuditLog.Tests/Integration/CombinedTelemetryIdempotencyTests.cs`. Send the same packet twice; assert exactly 1 AuditLog row + 1 SiteCalls row.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Final cross-bundle review + merge
|
||||||
|
|
||||||
|
Same template as M1/M2.
|
||||||
Reference in New Issue
Block a user