# Audit Log #23 — M2 Site Pipeline (sync-only) Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (bundled cadence per `feedback_subagent_cadence`).

**Goal:** First end-to-end audit emission. A script-initiated `ExternalSystem.Call()` produces exactly one `ApiOutbound`/`ApiCall` row in the central `AuditLog` table via site SQLite hot-path + gRPC push telemetry + central ingest actor. Audit-write failures NEVER abort the script.

**Architecture (decisions locked):**

- Provenance: **Wrap CallAsync in ScriptRuntimeContext** — `IExternalSystemClient.CallAsync` signature unchanged; `ScriptRuntimeContext.ExternalSystem.Call` captures instance/script/site and emits the `AuditEvent` via `IAuditWriter`.
- Direction: **Push primary** — `SiteAuditTelemetryActor` batches Pending rows and pushes via a new `IngestAuditEvents` unary gRPC RPC on `sitestream.proto`. Pull (reconciliation) deferred to M6.
- E2E: **Component-level test** via TestKit + MSSQL fixture; stubbed gRPC client forwards directly to the central ingest actor. No expansion of `ScadaLinkWebApplicationFactory`.
- Site writer: **Mirror SiteEventLogger** — `Channel<PendingAuditEvent>` + background writer Task for sub-ms enqueue durability.

**M1 realities baked in:**

- Enum vocabulary: `AuditKind.ApiCall` for sync API call; `AuditStatus.Delivered` for success, `AuditStatus.Failed` for HTTP non-2xx (permanent OR transient → both Failed for a sync call; cached path differs in M3). The "Status=Success/TransientFailure/PermanentFailure" wording in the roadmap is stale and must be replaced with the new vocabulary.
- `AuditLogRepository.InsertIfNotExistsAsync` race window — M2 is the first concurrent writer; harden it before `AuditLogIngestActor` lands.
- Keyset tiebreaker test gap from M1's Bundle D — add a same-OccurredAt test in M2.
- `MsSqlMigrationFixture` is reusable as-is; promote it to a `[CollectionDefinition]`-shared fixture only if multiple test classes need it (defer until actually needed).
- `Xunit.SkippableFact` + `Skip.IfNot(_fixture.Available, _fixture.SkipReason)` for any MSSQL-dependent tests.
- Code lives in the `ScadaLink.AuditLog/Site/`, `ScadaLink.AuditLog/Central/`, and `ScadaLink.AuditLog/Telemetry/` subfolders. The DI extension `AddAuditLog` is the registration point.

**Tech stack additions:**

- `Microsoft.Data.Sqlite 10.0.7` (pinned).
- `Akka.TestKit.Xunit2 1.5.62` (pinned).
- `Grpc.Tools` already configured in `ScadaLink.Communication.csproj`.

---

## Bundles

- **Bundle A — Repo race-fix + tiebreaker test** (M1 realities catch-up).
- **Bundle B — Site SQLite writer + fallback** (M2-T1, T2, T3, T4).
- **Bundle C — gRPC proto + mapper** (M2-T5, T6).
- **Bundle D — Telemetry actor + ingest actor + gRPC handler** (M2-T7, T8).
- **Bundle E — Host wiring** (M2-T9).
- **Bundle F — ESG emission via ScriptRuntimeContext wrapper** (M2-T10).
- **Bundle G — Health metric SiteAuditWriteFailures** (M2-T11).
- **Bundle H — Component-level integration test** (M2-T12).

Final cross-bundle reviewer pass, then merge + roadmap update.

---
## Bundle A — Repo race-fix + keyset tiebreaker test

### Task A1: Harden `InsertIfNotExistsAsync` against duplicate-key race

**Files:**

- Modify: `src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs:30-60` — wrap the `ExecuteSqlInterpolatedAsync` call in a `try`/`catch` for `Microsoft.Data.SqlClient.SqlException` that swallows error numbers 2601 and 2627 (unique-index violation on `UX_AuditLog_EventId`) and logs at Debug. Other `SqlException`s rethrow.
- Modify: `tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/AuditLogRepositoryTests.cs` — add:
  - `InsertIfNotExistsAsync_ConcurrentDuplicateInserts_ProduceExactlyOneRow` — fire 50 parallel `InsertIfNotExistsAsync` calls with the same `EventId`, assert row count = 1 and no exception escapes.
  - `QueryAsync_Keyset_SameOccurredAtUtc_TiebreaksOnEventId` — the M1 Bundle D reviewer's deferred recommendation. Insert 4 rows with identical `OccurredAtUtc` but distinct `EventId`s; page through them with PageSize=2; assert no overlap, correct count, and that the second page's first row's `EventId` is strictly less than the first page's last row's `EventId`.

**Steps:**

1. Write failing concurrency test.
2. Run: expect a `SqlException` (2601/2627) to escape OR the row-count assertion to fail.
3. Add try/catch in the repo.
4. Run: pass.
5. Write failing keyset-tiebreaker test.
6. Run: depending on EF Core 10's `Guid.CompareTo` translation, this may already pass — confirm. Note that SQL Server orders `uniqueidentifier` values differently from .NET's `Guid.CompareTo`, so the "strictly less than" assertion should follow the database's ordering.
7. If passing, the test locks in the behavior; commit anyway.
8. Commit: `fix(configdb): InsertIfNotExistsAsync swallows duplicate-key races + add keyset tiebreaker test (#23)`.
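The duplicate-key handling from step 3 can be sketched with a C# exception filter. This is a minimal sketch under stated assumptions, not the repository's actual code: `FakeSqlException` is a hypothetical stand-in for `Microsoft.Data.SqlClient.SqlException` (whose `Number` property carries the SQL Server error number) so the snippet compiles without the SqlClient package, and `DuplicateKeyGuard` and `InsertIgnoringDuplicatesAsync` are illustrative names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for Microsoft.Data.SqlClient.SqlException, which exposes
// the SQL Server error number through its Number property.
public sealed class FakeSqlException : Exception
{
    public FakeSqlException(int number) : base($"SQL error {number}") => Number = number;
    public int Number { get; }
}

public static class DuplicateKeyGuard
{
    // 2601: duplicate key row in a unique index; 2627: unique/primary key constraint violation.
    public static bool IsDuplicateKey(int sqlErrorNumber) => sqlErrorNumber is 2601 or 2627;

    // Runs the insert. Returns false when a concurrent writer already inserted the same EventId.
    // Any other SQL error propagates to the caller unchanged.
    public static async Task<bool> InsertIgnoringDuplicatesAsync(Func<Task> insert)
    {
        try
        {
            await insert();
            return true;
        }
        catch (FakeSqlException ex) when (IsDuplicateKey(ex.Number))
        {
            // The real repository would log this at Debug and carry on.
            return false;
        }
    }
}
```

The `when` filter keeps unrelated errors (deadlocks, timeouts) on their normal rethrow path, which matches the "Other SqlExceptions rethrow" requirement above.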

**Bundle A acceptance:** All ConfigurationDatabase.Tests still green; 2 new tests pass.

---

## Bundle B — Site SQLite writer + fallback (mirror SiteEventLogger pattern)

### Task B1: `SqliteAuditWriter` — schema + connection bootstrap

**Files:**

- Create: `src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs` — implements `IAuditWriter` per Bundle A's signature (single `Task WriteAsync(AuditEvent evt, CancellationToken ct = default)`). Constructor takes `IOptions<SqliteAuditWriterOptions>` + `ILogger`. Single `SqliteConnection` opened at construction (`Data Source={path};Cache=Shared`). Synchronous `_writeLock` using the Monitor pattern (mirrors `SiteEventLogger.cs:32`). Inline `InitializeSchema()` runs `PRAGMA auto_vacuum = INCREMENTAL` + `CREATE TABLE IF NOT EXISTS AuditLog (...)`.
- Create: `src/ScadaLink.AuditLog/Site/SqliteAuditWriterOptions.cs` — `string DatabasePath = "auditlog.db"`, `int ChannelCapacity = 4096` (bounded; drop-oldest applies only to the Task B3 ring overflow, but the writer's pending channel is bounded as a safety net), `int BatchSize = 256`, `int FlushIntervalMs = 50`.
- Create: `tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterSchemaTests.cs`.

**Schema (19 event columns + ForwardState = 20 columns; IngestedAtUtc is central-only):**

```sql
CREATE TABLE IF NOT EXISTS AuditLog (
    EventId TEXT NOT NULL,
    OccurredAtUtc TEXT NOT NULL,
    Channel TEXT NOT NULL,
    Kind TEXT NOT NULL,
    CorrelationId TEXT NULL,
    SourceSiteId TEXT NULL,
    SourceInstanceId TEXT NULL,
    SourceScript TEXT NULL,
    Actor TEXT NULL,
    Target TEXT NULL,
    Status TEXT NOT NULL,
    HttpStatus INTEGER NULL,
    DurationMs INTEGER NULL,
    ErrorMessage TEXT NULL,
    ErrorDetail TEXT NULL,
    RequestSummary TEXT NULL,
    ResponseSummary TEXT NULL,
    PayloadTruncated INTEGER NOT NULL,
    Extra TEXT NULL,
    ForwardState TEXT NOT NULL,
    PRIMARY KEY (EventId)
);
CREATE INDEX IF NOT EXISTS IX_SiteAuditLog_ForwardState_Occurred
    ON AuditLog (ForwardState, OccurredAtUtc);
```

**Tests:**

1. `Opens_Creates_AuditLog_Table_With_All_Columns_And_PK`
2. `Opens_Creates_IX_ForwardState_Occurred_Index`
3. `PRAGMA_auto_vacuum_Is_INCREMENTAL`

**Steps:**

1. Failing test asserts table + PK + 20 columns + index via `PRAGMA table_info(AuditLog)` + `PRAGMA index_list(AuditLog)`.
2. Implement constructor + `InitializeSchema` with inline SQL.
3. Run: pass.
4. Commit: `feat(auditlog): SqliteAuditWriter schema bootstrap (#23)`.

### Task B2: `SqliteAuditWriter` — Channel<T> + background writer for hot-path

**Files:**

- Modify: `src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs` — add `Channel<PendingAuditEvent> _writeQueue` (bounded, `BoundedChannelFullMode.Wait`, default capacity 4096) and a background `Task ProcessWriteQueueAsync()` launched in the constructor. `WriteAsync` enqueues and returns the `Task` of the pending event's `TaskCompletionSource`. The loop reads up to `BatchSize` events, opens a transaction, INSERTs them all, commits, then completes the TCS for each.
- Pattern mirrors `src/ScadaLink.SiteEventLogging/SiteEventLogger.cs:135-173`.
- Test: `tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs`.

**Tests:**

1. `WriteAsync_FreshEvent_PersistsWithForwardStatePending` — write one event, query SQLite, assert row has `ForwardState='Pending'`.
2. `WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions` — fire 1000 parallel `WriteAsync` calls, assert row count = 1000 and zero exceptions surface.
3. `WriteAsync_LatencyP99_LessThan_5ms_For_Enqueue` — assert the TCS `Task.IsCompleted` within a reasonable time after awaiting, and that the enqueue itself returns near-instantly (verify via a stopwatch around the channel write, i.e. `ChannelWriter<T>.WriteAsync` or `TryWrite`).
4. `WriteAsync_DuplicateEventId_FirstWriteWins_NoException` — insert same EventId twice, assert one row only and no exception (the PRIMARY KEY violation is caught/swallowed in the writer loop).

**Steps:**

1. Failing tests for 1, 2, 4.
2. Implement Channel + background loop + transactional batch INSERT.
3. Run: pass.
4. Commit: `feat(auditlog): SqliteAuditWriter Channel-based hot-path write (#23)`.
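The enqueue-then-batch pattern described in Task B2 can be sketched with `System.Threading.Channels` alone. This is a minimal sketch, assuming a hypothetical `persistBatch` delegate in place of the transactional SQLite INSERT; `ChannelBatchWriter`, the two-field `AuditEvent` record, and the `Loop`/`Complete` members are illustrative names rather than the planned `SqliteAuditWriter` surface.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed record AuditEvent(Guid EventId, string Target);

// One queued write: the event plus the completion source the caller awaits.
public sealed record PendingAuditEvent(AuditEvent Event, TaskCompletionSource Completion);

public sealed class ChannelBatchWriter
{
    private readonly Channel<PendingAuditEvent> _queue;
    private readonly Func<IReadOnlyList<AuditEvent>, Task> _persistBatch; // stand-in for the transactional INSERT
    private readonly int _batchSize;

    public ChannelBatchWriter(Func<IReadOnlyList<AuditEvent>, Task> persistBatch,
                              int capacity = 4096, int batchSize = 256)
    {
        _persistBatch = persistBatch;
        _batchSize = batchSize;
        _queue = Channel.CreateBounded<PendingAuditEvent>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // back-pressure instead of silent loss
            SingleReader = true
        });
        Loop = Task.Run(() => ProcessWriteQueueAsync());
    }

    // Completes once the queue has been closed and fully processed.
    public Task Loop { get; }

    // Enqueue is cheap; the returned task completes only after the batch holding this event is persisted.
    public async Task WriteAsync(AuditEvent evt)
    {
        var pending = new PendingAuditEvent(
            evt, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
        await _queue.Writer.WriteAsync(pending);
        await pending.Completion.Task;
    }

    public void Complete() => _queue.Writer.Complete();

    private async Task ProcessWriteQueueAsync()
    {
        var batch = new List<PendingAuditEvent>(_batchSize);
        while (await _queue.Reader.WaitToReadAsync())
        {
            batch.Clear();
            while (batch.Count < _batchSize && _queue.Reader.TryRead(out var item))
                batch.Add(item);
            try
            {
                await _persistBatch(batch.ConvertAll(p => p.Event));
                foreach (var p in batch) p.Completion.TrySetResult();
            }
            catch (Exception ex)
            {
                // Surface the failure to every caller in the batch instead of killing the loop.
                foreach (var p in batch) p.Completion.TrySetException(ex);
            }
        }
    }
}
```

`RunContinuationsAsynchronously` keeps caller continuations off the writer loop's thread, so a slow caller cannot stall the next batch.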

### Task B3: `RingBufferFallback`

**Files:**

- Create: `src/ScadaLink.AuditLog/Site/RingBufferFallback.cs` — `Channel<AuditEvent>` bounded at 1024 with `BoundedChannelFullMode.DropOldest`. Exposes `bool TryEnqueue(AuditEvent)`, `IAsyncEnumerable<AuditEvent> DrainAsync(CancellationToken)`, and an event `RingBufferOverflowed` (callback for the health counter).
- Test: `tests/ScadaLink.AuditLog.Tests/Site/RingBufferFallbackTests.cs`.

**Tests:**

1. `Enqueue_1025_Into_1024Cap_Ring_DropsOldest_AndRaisesOverflow` — invoke 1025 enqueues, assert the `RingBufferOverflowed` event fires exactly once, and the surviving 1024 are the latest.
2. `DrainAsync_Yields_FIFO_Then_Completes_When_Empty`.

**Steps:**

1. Failing tests.
2. Implement using `Channel.CreateBounded<AuditEvent>(new BoundedChannelOptions(1024) { FullMode = BoundedChannelFullMode.DropOldest })`.
3. Run: pass.
4. Commit: `feat(auditlog): RingBufferFallback with drop-oldest overflow (#23)`.
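One way to raise the overflow notification is the `itemDropped` callback overload of `Channel.CreateBounded` (available since .NET 6). The sketch below is an illustration under stated assumptions: `RingBuffer<T>` and `DrainAvailable` are hypothetical names, and the synchronous drain is a simplification of the planned `IAsyncEnumerable` `DrainAsync`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;

public sealed class RingBuffer<T>
{
    private readonly Channel<T> _channel;

    // Raised once per evicted item; a health counter can subscribe here.
    public event Action? Overflowed;

    public RingBuffer(int capacity)
    {
        _channel = Channel.CreateBounded<T>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.DropOldest },
            _ => Overflowed?.Invoke()); // itemDropped callback
    }

    // With DropOldest the write always succeeds; a full buffer evicts its oldest item instead.
    public bool TryEnqueue(T item) => _channel.Writer.TryWrite(item);

    // Synchronous drain of whatever is buffered right now, oldest first.
    public List<T> DrainAvailable()
    {
        var items = new List<T>();
        while (_channel.Reader.TryRead(out var item)) items.Add(item);
        return items;
    }
}
```

With a capacity of 4, enqueueing the values 1 through 5 evicts only the value 1, so a subscriber to `Overflowed` is called once and the drain yields the four newest items in FIFO order.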

### Task B4: `FallbackAuditWriter` — compose primary + ring

**Files:**

- Create: `src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs` — implements `IAuditWriter`. Constructor takes the primary `SqliteAuditWriter` + `RingBufferFallback` + `IAuditWriteFailureCounter` (lightweight DI'd interface, Bundle G implements it as `SiteAuditWriteFailures` counter on health metrics). On primary success: returns. On primary throw: increments counter, enqueues into ring (DropOldest), returns success. On the NEXT successful primary call (success after a failure window), drains the ring back through the primary.
- Test: `tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs`.

**Tests:**

1. `WriteAsync_PrimaryThrows_EventLandsInRing_CallReturnsSuccess`.
2. `WriteAsync_PrimaryRecovers_RingDrains_InFIFOOrder_OnNextWrite`.
3. `WriteAsync_PrimaryAlwaysSucceeds_Ring_StaysEmpty`.

**Steps:**

1. Failing tests.
2. Implement; mock the primary with a `Func<AuditEvent, Task>` flip-switch failure.
3. Run: pass.
4. Commit: `feat(auditlog): FallbackAuditWriter compose SQLite + ring (#23)`.
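The compose-and-replay behavior of Task B4 can be sketched as follows. This is a simplified, single-threaded illustration, not the planned class: the primary writer is a `Func<AuditEvent, Task>` (as in the test plan), the ring is a plain `Queue<T>` with a manual drop-oldest, and `FallbackWriterSketch`, `Buffered`, and `onFailure` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed record AuditEvent(Guid EventId, string Target);

public sealed class FallbackWriterSketch
{
    private readonly Func<AuditEvent, Task> _primary;  // stand-in for SqliteAuditWriter.WriteAsync
    private readonly Action _onFailure;                // stand-in for IAuditWriteFailureCounter.Increment
    private readonly Queue<AuditEvent> _ring = new();  // stand-in for RingBufferFallback
    private readonly int _ringCapacity;

    public FallbackWriterSketch(Func<AuditEvent, Task> primary, Action onFailure, int ringCapacity = 1024)
    {
        _primary = primary;
        _onFailure = onFailure;
        _ringCapacity = ringCapacity;
    }

    public int Buffered => _ring.Count;

    // Never throws for a primary failure: the event is parked in the ring instead.
    public async Task WriteAsync(AuditEvent evt)
    {
        try
        {
            await _primary(evt);
        }
        catch (Exception)
        {
            _onFailure();
            if (_ring.Count == _ringCapacity) _ring.Dequeue(); // drop oldest
            _ring.Enqueue(evt);
            return;
        }

        // Primary is healthy again: replay buffered events oldest first.
        while (_ring.Count > 0)
        {
            try
            {
                await _primary(_ring.Peek());
            }
            catch (Exception)
            {
                _onFailure();
                return; // still flaky; keep the remainder buffered for the next success
            }
            _ring.Dequeue();
        }
    }
}
```

Peeking before dequeuing means a replay failure leaves the event in the ring, so nothing is lost if the primary fails again mid-drain. A real implementation would also need to guard the ring against concurrent callers.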

**Bundle B acceptance:** 4 tasks merged. `ScadaLink.AuditLog.Tests` adds at least 12 tests. No regressions.

---

## Bundle C — gRPC proto + mapper

### Task C1: Extend `sitestream.proto` with `IngestAuditEvents`

**Files:**

- Modify: `src/ScadaLink.Communication/Protos/sitestream.proto` — add the messages and unary RPC. Use `google.protobuf.Timestamp` for `OccurredAtUtc`; encode enums as `string` (matches the EF mapping).
- Test: `tests/ScadaLink.Communication.Tests/Protos/AuditEventProtoTests.cs`.

Proposed addition:

```proto
// Needs these imports at the top of sitestream.proto if not already present:
//   import "google/protobuf/timestamp.proto";
//   import "google/protobuf/wrappers.proto";

message AuditEventDto {
  string event_id = 1;
  google.protobuf.Timestamp occurred_at_utc = 2;
  string channel = 3;
  string kind = 4;
  string correlation_id = 5; // empty string when null
  string source_site_id = 6;
  string source_instance_id = 7;
  string source_script = 8;
  string actor = 9;
  string target = 10;
  string status = 11;
  google.protobuf.Int32Value http_status = 12;
  google.protobuf.Int32Value duration_ms = 13;
  string error_message = 14;
  string error_detail = 15;
  string request_summary = 16;
  string response_summary = 17;
  bool payload_truncated = 18;
  string extra = 19;
}
message AuditEventBatch { repeated AuditEventDto events = 1; }
message IngestAck { repeated string accepted_event_ids = 1; }

service SiteStreamService {
  // existing rpcs...
  rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
}
```

(Use `google.protobuf.Int32Value` to encode nullable ints; empty string semantics for nullable text fields.)

**Steps:**

1. Edit proto + rebuild (`dotnet build src/ScadaLink.Communication/`).
2. Failing test round-trips an `AuditEventDto` through `ToByteArray()` and `Parser.ParseFrom()`; asserts all populated fields survive.
3. Run: pass.
4. Commit: `feat(comms): IngestAuditEvents RPC + AuditEventDto proto (#23)`.

### Task C2: `AuditEvent` ↔ `AuditEventDto` mapper

**Files:**

- Create: `src/ScadaLink.AuditLog/Telemetry/AuditEventMapper.cs` — static `ToDto(AuditEvent)` and `FromDto(AuditEventDto)`. Handles nullable→empty-string, Timestamp↔DateTime UTC, enum↔string. ForwardState NOT carried in the proto (site-local only; central never sees it).
- Test: `tests/ScadaLink.AuditLog.Tests/Telemetry/AuditEventMapperTests.cs`.

**Tests:**

1. `Roundtrip_FullyPopulated_PreservesAllFields`.
2. `Roundtrip_AllNullableFieldsNull_ProducesEmptyDtoFields`.
3. `FromDto_EmptyOptionalString_BecomesNullProperty`.
4. `ToDto_Sets_OccurredAtUtc_As_UtcTimestamp` — round-trip with `DateTimeKind.Utc` preserved.

**Steps:**

1. Failing tests.
2. Implement.
3. Run: pass.
4. Commit: `feat(auditlog): AuditEvent ↔ proto mapper (#23)`.
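The three conversions the mapper handles (nullable to empty string, UTC enforcement, enum to string) can be sketched as small helpers. This is a sketch using only the base class library; `WireConventions` and its method names are hypothetical, and the real mapper would apply the same rules to the generated `AuditEventDto` type.

```csharp
#nullable enable
using System;

public static class WireConventions
{
    // Nullable text travels as "" because proto3 string fields cannot be null.
    public static string ToWire(string? value) => value ?? string.Empty;

    // The reverse mapping cannot tell a genuine "" from null; both come back as null.
    public static string? FromWire(string value) => value.Length == 0 ? null : value;

    // Reject Local/Unspecified kinds early so the wire timestamp is unambiguous.
    public static DateTime EnsureUtc(DateTime value) =>
        value.Kind == DateTimeKind.Utc
            ? value
            : throw new ArgumentException("OccurredAtUtc must have DateTimeKind.Utc", nameof(value));

    // Enums cross the wire by name, matching the string-based EF mapping.
    public static string EnumToWire<TEnum>(TEnum value) where TEnum : struct, Enum =>
        value.ToString();

    public static TEnum EnumFromWire<TEnum>(string value) where TEnum : struct, Enum =>
        Enum.Parse<TEnum>(value, ignoreCase: false);
}
```

One consequence worth a test of its own: a field that was legitimately the empty string on the site comes back as `null` at central, so the round-trip is lossless only for null and non-empty values.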

**Bundle C acceptance:** Communication.Tests + AuditLog.Tests still green; proto rebuilds cleanly.

---

## Bundle D — SiteAuditTelemetryActor + AuditLogIngestActor + gRPC handler

### Task D1: `SiteAuditTelemetryActor` — drain loop

**Files:**

- Create: `src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs` — `ReceiveActor`. On `Drain`: queries `SqliteAuditWriter.ReadPendingAsync(BatchSize)`, calls the gRPC client's `IngestAuditEventsAsync(batch)`, and on ack flips the returned EventIds to `Forwarded` via `SqliteAuditWriter.MarkForwardedAsync(eventIds)`. Re-schedules the `Drain` self-tick: 5s if ≥1 row drained, 30s otherwise. On gRPC error: re-schedule 5s; rows stay Pending.
- Modify: `src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs` — add `ReadPendingAsync(int limit, CancellationToken)` returning `IReadOnlyList<AuditEvent>` (with ForwardState=Pending), and `MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken)`.
- Create: `src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryOptions.cs` — `BatchSize=256`, `BusyIntervalSeconds=5`, `IdleIntervalSeconds=30`.
- Test: `tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs` using `TestKit` + NSubstitute-mocked gRPC client.

**Tests:**

1. `Drain_With_50PendingRows_Sends_OneBatch_Of_50`.
2. `Drain_Ack_Flips_Rows_To_Forwarded`.
3. `Drain_GrpcThrows_Rows_StayPending_NextTick_Retries`.
4. `Drain_Cadence_5s_AfterNonZero_30s_AfterZero` (via `TestScheduler`).

**Steps:**

1. Failing tests.
2. Implement.
3. Run: pass.
4. Commit: `feat(auditlog): SiteAuditTelemetryActor drain loop (#23)`.
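The logic of a single `Drain` tick, including the 5s/30s cadence rule, can be sketched independently of Akka. This is a sketch under stated assumptions: the three delegates stand in for `ReadPendingAsync`, the gRPC push, and `MarkForwardedAsync`; events are reduced to their `Guid` ids; and `DrainTick` and `TelemetryOptions` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed record TelemetryOptions(
    int BatchSize = 256, int BusyIntervalSeconds = 5, int IdleIntervalSeconds = 30);

public static class DrainTick
{
    // Runs one Drain tick and returns the delay before the next one.
    public static async Task<TimeSpan> DrainOnceAsync(
        TelemetryOptions options,
        Func<int, Task<IReadOnlyList<Guid>>> readPending,              // Pending rows, up to the limit
        Func<IReadOnlyList<Guid>, Task<IReadOnlyList<Guid>>> push,     // returns the acked ids
        Func<IReadOnlyList<Guid>, Task> markForwarded)
    {
        var busy = TimeSpan.FromSeconds(options.BusyIntervalSeconds);
        var idle = TimeSpan.FromSeconds(options.IdleIntervalSeconds);

        var pending = await readPending(options.BatchSize);
        if (pending.Count == 0) return idle; // nothing to send: back off

        try
        {
            var acked = await push(pending);
            await markForwarded(acked); // only acked rows flip to Forwarded
        }
        catch (Exception)
        {
            // Push (or the local update) failed: rows stay Pending and are retried next tick.
        }
        return busy;
    }
}
```

Marking only the acked ids, instead of the whole batch, is what lets the central side drop individual events from the ack and have them retried.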

### Task D2: `AuditLogIngestActor` + gRPC server handler

**Files:**

- Create: `src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs` — `ReceiveActor` accepting `IngestAuditEventsCommand(IReadOnlyList<AuditEvent> events, IActorRef replyTo)`. For each event, calls `IAuditLogRepository.InsertIfNotExistsAsync` (which now swallows duplicates per Bundle A). Sets `IngestedAtUtc = DateTime.UtcNow` before insert (this is the central-side timestamp). Replies with `IngestAck(acceptedEventIds)` — by spec "accepted" includes already-existed rows (idempotent semantics).
- Create: `src/ScadaLink.AuditLog/Central/IngestAuditEventsCommand.cs` (Akka message).
- Create: `src/ScadaLink.AuditLog/Central/IngestAck.cs` (Akka reply).
- Modify: `src/ScadaLink.Communication/SiteStreamGrpc/SiteStreamGrpcServer.cs` — implement `public override async Task<IngestAck> IngestAuditEvents(AuditEventBatch request, ServerCallContext context)` — Ask the central `AuditLogIngestActor` proxy with the deserialized batch, await reply, return.
- Modify: `src/ScadaLink.Communication/SiteStreamGrpc/SiteStreamGrpcServer.cs` — add a setter `SetAuditIngestActor(IActorRef)` mirroring how `SetNotificationOutbox` is wired (per recon: Notification Outbox proxy is handed in via `commService?.SetNotificationOutbox(outboxProxy)`).
- Test: `tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs`.
- Test: `tests/ScadaLink.Communication.Tests/SiteStreamIngestAuditEventsTests.cs`.

**Tests:**

1. `Receive_BatchOf5_Calls_Repo_5Times_Acks_All`.
2. `Receive_BatchWith_AlreadyExistingEvent_AcksAll_NoDoubleInsert` (idempotent).
3. `Receive_RepoThrowsTransient_Replies_AckExcludingFailedEventIds_LogsError` (partial-failure semantics — what gets acked is what was persisted).
4. `Receive_Sets_IngestedAtUtc_Before_Insert`.
5. `gRPC_Handler_Routes_To_Actor_Returns_Reply`.

**Steps:**

1. Failing tests.
2. Implement actor + gRPC handler.
3. Run: pass.
4. Commit: `feat(auditlog): AuditLogIngestActor + gRPC handler (#23)`.
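The partial-failure ack semantics ("what gets acked is what was persisted") can be sketched as a plain loop. This is an illustration only: the `insertIfNotExists` delegate stands in for `IAuditLogRepository.InsertIfNotExistsAsync`, the clock is injected so the stamp is testable, and `IngestSketch` and the three-field `AuditEvent` record are hypothetical.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed record AuditEvent(Guid EventId, DateTime OccurredAtUtc, DateTime? IngestedAtUtc = null);

public static class IngestSketch
{
    // Returns the ids that are now durably stored (newly inserted or already present).
    public static async Task<IReadOnlyList<Guid>> IngestAsync(
        IReadOnlyList<AuditEvent> events,
        Func<AuditEvent, Task> insertIfNotExists, // assumed to swallow duplicate-key races
        Func<DateTime> utcNow)
    {
        var accepted = new List<Guid>(events.Count);
        foreach (var evt in events)
        {
            // Stamp the central-side timestamp before the insert.
            var stamped = evt with { IngestedAtUtc = utcNow() };
            try
            {
                await insertIfNotExists(stamped);
                accepted.Add(stamped.EventId);
            }
            catch (Exception)
            {
                // The real actor would log at Error here. The id is left out of the ack,
                // so the site keeps the row Pending and retries it on a later tick.
            }
        }
        return accepted;
    }
}
```

Because duplicates are swallowed inside the repository, a retried event that already exists lands in `accepted` without a second row, which is the idempotent behavior test 2 checks.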

**Bundle D acceptance:** New actor + gRPC handler tests all green.

---

## Bundle E — Host wiring (central singleton + site actor + dispatcher)

### Task E1: Register `AuditLogIngestActor` + `SiteAuditTelemetryActor` + dispatcher

**Files:**

- Modify: `src/ScadaLink.Host/Actors/AkkaHostedService.cs` — mirror the Notification Outbox pattern (recon report's exact lines 272-295):
  - Central role: `AuditLogIngestActor` as `ClusterSingletonManager` (singleton name `"audit-log-ingest"`) + `ClusterSingletonProxy` (`"audit-log-ingest-proxy"`). Hand the proxy to `SiteStreamGrpcServer.SetAuditIngestActor(proxy)`.
  - Site role: `SiteAuditTelemetryActor` as a per-site actor (`actorSystem.ActorOf(Props.Create(...))`), bound to the dedicated dispatcher (below).
- Modify: HOCON in `src/ScadaLink.Host/Configuration/` (the existing akka config file) — add:

  ```
  audit-telemetry-dispatcher {
    type = ForkJoinDispatcher
    throughput = 100
    dedicated-thread-pool { thread-count = 2 }
  }
  ```

  Apply `.WithDispatcher("audit-telemetry-dispatcher")` to `SiteAuditTelemetryActor`'s Props.
- Modify: `src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs:AddAuditLog` — register the SqliteAuditWriter+RingBufferFallback+FallbackAuditWriter chain and the actor factories.
- Test: `tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs`.

**Tests:**

1. `Central_Host_Starts_With_AuditLogIngest_Singleton_Healthy`.
2. `Site_Host_Starts_With_SiteAuditTelemetry_Bound_To_DedicatedDispatcher`.
3. `AuditWriter_Resolves_From_DI_To_FallbackAuditWriter`.

**Steps:**

1. Failing tests against current host (which doesn't wire audit).
2. Implement wiring.
3. Run: pass.
4. Commit: `feat(host): register Audit Log #23 singletons with dedicated dispatcher`.

**Bundle E acceptance:** Host.Tests still green; 3 new tests pass.

---

## Bundle F — ESG audit emission via ScriptRuntimeContext wrapper

### Task F1: Wrap `ExternalSystem.Call` in `ScriptRuntimeContext` to emit audit

**Files:**

- Modify: `src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs` — find the existing `ExternalSystem.Call` method (or add one if scripts call through a dynamic API surface). Inside, after `_externalSystemClient.CallAsync(...)` returns OR throws, build the `AuditEvent`: channel=`ApiOutbound`, kind=`ApiCall`, status=`Delivered` for success or `Failed` for HTTP non-2xx or exception, and populate `Target=$"{systemName}.{methodName}"`, `SourceSiteId={siteId}`, `SourceInstanceId={instanceName}`, `SourceScript={sourceScript}`, `DurationMs={stopwatch}`, `HttpStatus`, `ErrorMessage`. Call `_auditWriter.WriteAsync(evt)` inside a try/catch that swallows the failure, logs at Warning, and increments `SiteAuditWriteFailures` (via the same counter Bundle G defines). Re-throw the original ExternalSystem exception (if any) so the script sees its original error path unchanged.
- Modify: `src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs` constructor — inject `IAuditWriter`.
- Modify: `src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs` — resolve and pass `IAuditWriter` into the ScriptRuntimeContext.
- Test: `tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCallAuditEmissionTests.cs`.

**Tests:**

1. `Call_Success_EmitsOneEvent_Channel_ApiOutbound_Kind_ApiCall_Status_Delivered`.
2. `Call_HTTP500_EmitsEvent_Status_Failed_HttpStatus_500_ErrorMessage_Set`.
3. `Call_HTTP400_EmitsEvent_Status_Failed_HttpStatus_400`.
4. `Call_ClientThrows_NetworkError_EmitsEvent_Status_Failed_ErrorMessage_SetFromException`.
5. `AuditWriter_Throws_Script_Call_Returns_Original_Result_Unchanged_Audit_Failure_Counter_Incremented`.
6. `Provenance_Populated_FromContext` — SourceInstanceId, SourceScript, SourceSiteId all match the ScriptRuntimeContext's values.

**Steps:**

1. Failing tests.
2. Implement wrapper + provenance threading.
3. Run: pass.
4. Commit: `feat(siteruntime): ExternalSystem.Call emits Audit Log #23 event on every sync call`.
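The wrapper's control flow (emit on success or failure, swallow audit errors, rethrow the original exception) can be sketched with `try`/`catch`/`finally`. Everything here is an assumption made for illustration: `CallResult` carrying an `HttpStatus`, the reduced `AuditEvent` shape, and the delegates standing in for `IExternalSystemClient.CallAsync`, `IAuditWriter.WriteAsync`, and the failure counter. The provenance fields and Warning log are omitted for brevity.

```csharp
#nullable enable
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public sealed record CallResult(int HttpStatus, string Body);

public sealed record AuditEvent(
    string Channel, string Kind, string Status, string Target,
    int? HttpStatus, long DurationMs, string? ErrorMessage);

public static class AuditedCall
{
    public static async Task<CallResult> CallAsync(
        string systemName, string methodName,
        Func<Task<CallResult>> call,        // stand-in for IExternalSystemClient.CallAsync
        Func<AuditEvent, Task> writeAudit,  // stand-in for IAuditWriter.WriteAsync
        Action onAuditFailure)              // stand-in for the SiteAuditWriteFailures counter
    {
        var stopwatch = Stopwatch.StartNew();
        CallResult? result = null;
        Exception? error = null;
        try
        {
            result = await call();
            return result;
        }
        catch (Exception ex)
        {
            error = ex;
            throw; // the script sees its original exception unchanged
        }
        finally
        {
            var delivered = error is null && result is { HttpStatus: >= 200 and < 300 };
            var evt = new AuditEvent(
                "ApiOutbound", "ApiCall", delivered ? "Delivered" : "Failed",
                $"{systemName}.{methodName}", result?.HttpStatus, stopwatch.ElapsedMilliseconds,
                error?.Message ?? (delivered ? null : $"HTTP {result?.HttpStatus}"));
            try
            {
                await writeAudit(evt);
            }
            catch (Exception)
            {
                onAuditFailure(); // an audit failure never reaches the script
            }
        }
    }
}
```

Putting the emission in `finally` gives exactly one event per call on both paths, and the inner `try`/`catch` is what upholds the goal that audit-write failures never abort the script.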

**Bundle F acceptance:** SiteRuntime.Tests still green; 6 new tests.

---

## Bundle G — Health metric `SiteAuditWriteFailures`

### Task G1: Counter + DI surface

**Files:**

- Create: `src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs` — `void Increment();`. Bundle B's `FallbackAuditWriter` already takes this.
- Modify: `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs` — add `int _siteAuditWriteFailures` field + `IncrementSiteAuditWriteFailures()` method using `Interlocked.Increment`. Expose via a snapshot read.
- Modify: `src/ScadaLink.HealthMonitoring/SiteHealthState.cs` — add `SiteAuditWriteFailures` property to the report payload.
- Implementation: a small adapter class `HealthMetricsAuditWriteFailureCounter : IAuditWriteFailureCounter` registered in DI that bridges to `ISiteHealthCollector.IncrementSiteAuditWriteFailures()`.
- Test: `tests/ScadaLink.HealthMonitoring.Tests/SiteAuditWriteFailuresMetricTests.cs`.

**Tests:**

1. `Increment_Three_Times_Counter_Reports_3`.
2. `Report_Payload_Includes_SiteAuditWriteFailures`.

**Steps:**

1. Failing tests.
2. Implement counter + adapter + DI registration.
3. Run: pass.
4. Commit: `feat(health): SiteAuditWriteFailures counter (#23)`.
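The counter and its adapter are small enough to sketch in full. This is a sketch only: `HealthCollectorSketch` is a hypothetical stand-in for `SiteHealthCollector`, and the adapter takes it directly where the plan calls for `ISiteHealthCollector`.

```csharp
using System.Threading;

public interface IAuditWriteFailureCounter
{
    void Increment();
}

// Hypothetical stand-in for SiteHealthCollector; only the new counter is shown.
public sealed class HealthCollectorSketch
{
    private int _siteAuditWriteFailures;

    // Lock-free increment, safe to call from any thread.
    public void IncrementSiteAuditWriteFailures() =>
        Interlocked.Increment(ref _siteAuditWriteFailures);

    // Snapshot read for the health report payload.
    public int SiteAuditWriteFailures => Volatile.Read(ref _siteAuditWriteFailures);
}

// Adapter that lets the audit code depend on the narrow counter interface only.
public sealed class HealthMetricsAuditWriteFailureCounter : IAuditWriteFailureCounter
{
    private readonly HealthCollectorSketch _collector;

    public HealthMetricsAuditWriteFailureCounter(HealthCollectorSketch collector) =>
        _collector = collector;

    public void Increment() => _collector.IncrementSiteAuditWriteFailures();
}
```

Keeping `IAuditWriteFailureCounter` in the AuditLog project means the audit code needs no reference to HealthMonitoring; only the adapter's home project does.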

**Bundle G acceptance:** HealthMonitoring.Tests still green; 2 new tests.

---

## Bundle H — Component-level integration test

### Task H1: End-to-end via TestKit + MSSQL fixture

**Files:**

- Create: `tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs` — uses `MsSqlMigrationFixture` (the M1 reusable fixture; depend on `Xunit.SkippableFact`):
  - Brings up `SqliteAuditWriter` against `:memory:`.
  - Brings up `SiteAuditTelemetryActor` via TestKit.
  - Brings up `AuditLogIngestActor` via TestKit, configured with the MSSQL `IAuditLogRepository` from M1.
  - Stubs the gRPC client by overriding the actor's gRPC dependency with a direct `IActorRef`-backed mock that forwards `IngestAuditEvents` directly to the central actor.
  - Writes one `AuditEvent` via the `FallbackAuditWriter`.
  - Drives a `Drain` tick on the telemetry actor.
  - Asserts the row appears in the MS SQL `AuditLog` table within 5 seconds via `IAuditLogRepository.QueryAsync`.

**Steps:**

1. Failing test (telemetry not yet wired).
2. Wire the components together via the test harness.
3. Run: pass.
4. Commit: `test(auditlog): end-to-end sync-call emission via TestKit + MSSQL fixture (#23)`.

**Bundle H acceptance:** New test passes when MSSQL container is up; skips cleanly when down.

---

## Final cross-bundle review

After Bundles A–H, dispatch a final reviewer agent with the same template as M1's. Acceptance gate: full `dotnet test ScadaLink.slnx` green. Then merge with `--no-ff` and a summary, update M3–M8 with M2 realities, write the status paragraph, and proceed to M3.