2026-06-03 15:34:30 -04:00

Audit Log

The Audit Log component records every action a site or central script takes across a trust boundary — outbound API calls, outbound database writes, notification sends, and inbound API requests — into a central append-only AuditLog table, with a site SQLite hot-path, gRPC telemetry forwarding, and a reconciliation fallback.

Overview

Audit Log (#23) is a layered subsystem that runs on both site and central nodes. It complements, rather than replaces, the operational stores: Notifications (Notification Outbox, #21) and SiteCalls (Site Call Audit, #22). The operational tables answer "what is the current state of this notification / cached call?"; the AuditLog answers "what happened, in what order, who did it, and what crossed the boundary?".

The component code lives in src/ZB.MOM.WW.ScadaBridge.AuditLog/, split by role:

  • Site/ — the script-thread write path: SqliteAuditWriter, the FallbackAuditWriter chain, and the Site/Telemetry/ drain that pushes rows to central.
  • Central/ — the central-node ingest singleton (AuditLogIngestActor), the direct-write path (CentralAuditWriter), the reconciliation puller (SiteAuditReconciliationActor), and retention maintenance.
  • Configuration/, Redaction/, Payload/ — options, the redactor, and the truncation/redaction primitives.

The same DI entry point, ServiceCollectionExtensions.AddAuditLog, registers the writer chain on every host; central nodes additionally call AddAuditLogCentralMaintenance, and site nodes call AddAuditLogHealthMetricsBridge. Because AddAuditLog runs on both site and central composition roots, it never registers a hosted service that would resolve a central-only dependency on a site — central-only registrations are split into their own helper.

Key Concepts

Script trust boundary

The audited scope is the script trust boundary, not framework traffic. The four channels are modelled by the AuditChannel enum (ApiOutbound, DbOutbound, Notification, ApiInbound), and the specific action by AuditKind (for example ApiCall, DbWriteCached, NotifySend, InboundRequest). Every row is built through ScadaBridgeAuditEventFactory.Create, which maps the domain vocabulary onto the canonical record: Channel/Kind/Status become Action/Category/Outcome plus a DetailsJson extension bag carrying every other domain field.

Canonical AuditEvent and DetailsJson

The transport type is the canonical ZB.MOM.WW.Audit.AuditEvent record — ten fields: EventId, OccurredAtUtc, Actor, Action, Outcome, Category, Target, SourceNode, CorrelationId, DetailsJson. ScadaBridge domain fields (ExecutionId, ParentExecutionId, SourceSiteId, RequestSummary, IngestedAtUtc, and so on) ride inside DetailsJson as an AuditDetails record, serialized by AuditDetailsCodec. AuditRowProjection.Decompose / Recompose move between the canonical record and the domain view.
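To make the split between the ten canonical columns and the DetailsJson bag concrete, here is a minimal C# sketch. AuditEventSketch, AuditDetailsSketch, and AuditDetailsCodecSketch are hypothetical stand-ins, not the real AuditEvent, AuditDetails, or AuditDetailsCodec. The field types (Guid, DateTime, nullable strings) are assumptions, and only a subset of the domain fields is shown. The sketch uses System.Text.Json and mirrors the idea behind AuditRowProjection.WithIngestedAtUtc: a domain field changes while the canonical shape does not.

```csharp
using System;
using System.Text.Json;

// Hypothetical, simplified stand-in for the ten-field canonical AuditEvent record.
public sealed record AuditEventSketch(
    Guid EventId,
    DateTime OccurredAtUtc,
    string Actor,
    string Action,
    string Outcome,
    string? Category,
    string? Target,
    string? SourceNode,
    string? CorrelationId,
    string? DetailsJson);

// Hypothetical subset of the ScadaBridge domain fields carried inside DetailsJson.
public sealed record AuditDetailsSketch(
    Guid? ExecutionId = null,
    Guid? ParentExecutionId = null,
    string? SourceSiteId = null,
    string? RequestSummary = null,
    DateTime? IngestedAtUtc = null);

// Hypothetical codec: domain fields live in JSON, so the canonical row never gains columns.
public static class AuditDetailsCodecSketch
{
    public static string Encode(AuditDetailsSketch details) => JsonSerializer.Serialize(details);

    public static AuditDetailsSketch Decode(string? json) =>
        string.IsNullOrEmpty(json)
            ? new AuditDetailsSketch()
            : JsonSerializer.Deserialize<AuditDetailsSketch>(json) ?? new AuditDetailsSketch();

    // Decompose, update one domain field, recompose: the ten columns are untouched.
    public static AuditEventSketch WithIngestedAtUtc(AuditEventSketch evt, DateTime nowUtc)
    {
        var details = Decode(evt.DetailsJson) with { IngestedAtUtc = nowUtc };
        return evt with { DetailsJson = Encode(details) };
    }
}
```

Because both records are immutable, stamping a domain field yields a new record through a `with` expression, the same style the central writer uses when it fills in SourceNode.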

ExecutionId vs CorrelationId

CorrelationId is the canonical top-level field and carries the per-operation lifecycle id — for cached calls it is the TrackedOperationId, and the cached-telemetry drain reads it back out to join the audit row to its operational tracking row (see SiteAuditTelemetryActor.OnCachedDrainAsync). ExecutionId is a DetailsJson field: the per-run correlation value shared by every row a single script execution or inbound request emits. ParentExecutionId (also in DetailsJson) is the cross-execution spawn pointer that bridges, for example, an inbound API request to the site script it routes to.
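The three identifiers answer different questions, which a small query sketch makes easier to see. AuditRowView is a hypothetical flattened row (DetailsJson already decoded), the Label values are placeholders, and the sketch assumes every row of a spawned execution carries the same ParentExecutionId.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened row: canonical CorrelationId plus two fields decoded from DetailsJson.
public sealed record AuditRowView(
    Guid EventId,
    DateTime OccurredAtUtc,
    string Label,             // placeholder description of the row
    string? CorrelationId,    // per-operation lifecycle id, e.g. a cached call's TrackedOperationId
    Guid ExecutionId,         // shared by every row one script run or inbound request emits
    Guid? ParentExecutionId); // points at the execution that spawned this one, if any

public static class CorrelationQueriesSketch
{
    // Everything one execution did, oldest first.
    public static IReadOnlyList<AuditRowView> ForExecution(IEnumerable<AuditRowView> rows, Guid executionId) =>
        rows.Where(r => r.ExecutionId == executionId).OrderBy(r => r.OccurredAtUtc).ToList();

    // One operation's lifecycle, e.g. Submitted, Forwarded, Attempted, then a terminal row.
    public static IReadOnlyList<AuditRowView> ForOperation(IEnumerable<AuditRowView> rows, string correlationId) =>
        rows.Where(r => r.CorrelationId == correlationId).OrderBy(r => r.OccurredAtUtc).ToList();

    // Executions spawned by a parent, e.g. the site script an inbound request routed to.
    public static IReadOnlyList<Guid> ChildExecutions(IEnumerable<AuditRowView> rows, Guid parentExecutionId) =>
        rows.Where(r => r.ParentExecutionId == parentExecutionId).Select(r => r.ExecutionId).Distinct().ToList();
}
```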

One row per lifecycle event

Each lifecycle event is one row. A synchronous call produces a single row; a cached call produces several (Submitted, Forwarded, Attempted, then a terminal Delivered/Parked/Discarded). Idempotency is on EventId, so the same row arriving twice — from a telemetry retry and from a reconciliation pull — collapses to a no-op everywhere it is written.
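The dedup rule can be sketched with an in-memory dictionary standing in for the central InsertIfNotExistsAsync. IdempotentAuditStore is hypothetical; what it illustrates is that EventId is fixed when the row is built, so a telemetry retry and a reconciliation pull carry the same key and the second arrival changes nothing.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for an insert-if-not-exists store keyed on EventId.
public sealed class IdempotentAuditStore
{
    private readonly ConcurrentDictionary<Guid, string> _rows = new();

    // True when the row was new; a repeated EventId is a silent no-op (first arrival wins).
    public bool InsertIfNotExists(Guid eventId, string lifecycleStage) =>
        _rows.TryAdd(eventId, lifecycleStage);

    public int Count => _rows.Count;
}
```

A cached call writing Submitted, Forwarded, Attempted, and Delivered yields four rows; replaying any of them from a second path leaves the count at four.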

Architecture

Site write path

SqliteAuditWriter is the site hot-path store: a singleton holding one owned SqliteConnection behind a write lock, fed by a bounded Channel<T> that a background task drains in batches, so script threads never block on disk I/O. It writes two tables — the append-only canonical audit_event and a mutable audit_forward_state sidecar that tracks the forwarding lifecycle (from SqliteAuditWriter.InitializeSchema):

CREATE TABLE IF NOT EXISTS audit_event (
    EventId        TEXT NOT NULL,
    OccurredAtUtc  TEXT NOT NULL,
    Actor          TEXT NOT NULL,
    Action         TEXT NOT NULL,
    Outcome        TEXT NOT NULL,
    Category       TEXT NULL,
    Target         TEXT NULL,
    SourceNode     TEXT NULL,
    CorrelationId  TEXT NULL,
    DetailsJson    TEXT NULL,
    PRIMARY KEY (EventId)
);
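The non-blocking write path described above follows a standard producer/consumer shape. The sketch below is a hypothetical, simplified version of it: the commit delegate stands in for one SQLite transaction, and BoundedChannelFullMode.Wait is an assumption about what happens when the queue is full (the text does not say). Channel.CreateBounded, BoundedChannelOptions, WaitToReadAsync, and TryRead are real System.Threading.Channels APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: producers enqueue into a bounded channel; one background loop
// drains up to batchSize items and hands each batch to a single commit delegate.
public sealed class BatchingWriterSketch<T>
{
    private readonly Channel<T> _channel;
    private readonly int _batchSize;
    private readonly Func<IReadOnlyList<T>, Task> _commitBatch;
    private readonly Task _drainLoop;

    public BatchingWriterSketch(int capacity, int batchSize, Func<IReadOnlyList<T>, Task> commitBatch)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,
            // Assumption: when full, producers wait for space rather than dropping rows.
            FullMode = BoundedChannelFullMode.Wait,
        });
        _batchSize = batchSize;
        _commitBatch = commitBatch;
        _drainLoop = Task.Run(() => DrainAsync());
    }

    // Completes once the item is queued; the caller never performs the disk write itself.
    public ValueTask EnqueueAsync(T item, CancellationToken ct = default) =>
        _channel.Writer.WriteAsync(item, ct);

    // Stops accepting writes and waits until everything already queued is committed.
    public async Task CompleteAsync()
    {
        _channel.Writer.Complete();
        await _drainLoop.ConfigureAwait(false);
    }

    private async Task DrainAsync()
    {
        var batch = new List<T>(_batchSize);
        while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
        {
            while (batch.Count < _batchSize && _channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
            }
            if (batch.Count == 0) continue;
            await _commitBatch(batch.ToArray()).ConfigureAwait(false); // one transaction per batch
            batch.Clear();
        }
    }
}
```

Batching amortises the per-transaction cost of SQLite across up to BatchSize rows, while the single reader keeps commit order identical to enqueue order.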

The sidecar carries ForwardState (Pending/Forwarded/Reconciled, per the AuditForwardState enum), a duplicated OccurredAtUtc for the drain range scan, and a precomputed IsCachedKind flag so the cached/non-cached drain split is an integer predicate, not a DetailsJson parse on the read hot-path. The site store is ephemeral (roughly 7-day retention, recreated per deployment), so a schema change is an in-place reset rather than a migration.
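The reason for the precomputed flag is easiest to see in the drain selection itself, sketched here over an in-memory list rather than SQLite. ForwardStateRow, ForwardStateSketch, and DrainSelectorSketch are hypothetical; the field names only mirror those described for audit_forward_state. The purge check restates the retention invariant that a row must reach Forwarded or Reconciled before it can be dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum ForwardStateSketch { Pending, Forwarded, Reconciled }

// Hypothetical in-memory mirror of one audit_forward_state sidecar row.
public sealed record ForwardStateRow(Guid EventId, DateTime OccurredAtUtc, int IsCachedKind, ForwardStateSketch State);

public static class DrainSelectorSketch
{
    // Next batch for one transport: pending rows of the requested kind, oldest first.
    // The cached/non-cached split is an integer comparison; DetailsJson is never parsed.
    public static IReadOnlyList<Guid> NextBatch(IEnumerable<ForwardStateRow> sidecar, bool cached, int batchSize)
    {
        var flag = cached ? 1 : 0;
        return sidecar
            .Where(r => r.State == ForwardStateSketch.Pending && r.IsCachedKind == flag)
            .OrderBy(r => r.OccurredAtUtc)
            .Take(batchSize)
            .Select(r => r.EventId)
            .ToList();
    }

    // Retention invariant: a Pending row is never purged, however old it is.
    public static bool IsPurgeable(ForwardStateRow row, DateTime cutoffUtc) =>
        row.State != ForwardStateSketch.Pending && row.OccurredAtUtc < cutoffUtc;
}
```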

SqliteAuditWriter also implements ISiteAuditQueue, the read/mark surface that the drain and the reconciliation pull handler consume. The same singleton instance serves both as ISiteAuditQueue and as the primary writer behind the script-facing IAuditWriter, so the drain observes exactly the rows the script threads wrote.

Site fallback chain

The script-facing IAuditWriter is FallbackAuditWriter, not the raw SqliteAuditWriter. It redacts once up front, attempts the primary write, and on any primary failure stashes the (already redacted) row in a drop-oldest RingBufferFallback and returns success — a primary outage must never reach the calling script:

public async Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(evt);
    var filtered = _redactor.Apply(evt);
    try
    {
        await _primary.WriteAsync(filtered, ct).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _failureCounter.Increment();
        _logger.LogWarning(ex,
            "Primary audit writer threw; routing EventId {EventId} to drop-oldest ring.",
            filtered.EventId);
        _ring.TryEnqueue(filtered);
        return;
    }

    if (_ring.Count > 0)
    {
        await TryDrainRingAsync(ct).ConfigureAwait(false);
    }
}

On the next successful primary write the ring drains back through the primary in FIFO order.
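RingBufferFallback itself is not shown here. A minimal drop-oldest queue with the behaviour described above might look like the following hypothetical sketch: a full buffer evicts its oldest entry instead of rejecting the newest, and entries leave in FIFO order for replay through the primary writer. The lock-around-Queue<T> implementation, the method names, and the Dropped counter are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Hypothetical fixed-capacity FIFO that sheds the oldest entry when full.
public sealed class DropOldestRing<T>
{
    private readonly Queue<T> _queue = new();
    private readonly object _gate = new();
    private readonly int _capacity;
    private long _dropped;

    public DropOldestRing(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count { get { lock (_gate) { return _queue.Count; } } }
    public long Dropped { get { lock (_gate) { return _dropped; } } }

    // Never rejects the new item; returns true when the oldest entry was evicted to make room.
    public bool EnqueueDropOldest(T item)
    {
        lock (_gate)
        {
            var evicted = false;
            if (_queue.Count == _capacity)
            {
                _queue.Dequeue();
                _dropped++;
                evicted = true;
            }
            _queue.Enqueue(item);
            return evicted;
        }
    }

    // Removes the oldest entry so it can be replayed through the primary writer.
    public bool TryDequeue([MaybeNullWhen(false)] out T item)
    {
        lock (_gate) { return _queue.TryDequeue(out item); }
    }
}
```

Dropping the oldest rather than the newest keeps the most recent activity during a long outage; the trade-off is that a sustained outage loses the start of the gap rather than its end.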

Telemetry forward and central ingest

SiteAuditTelemetryActor drains the local queue and pushes to central over two parallel transports, each self-ticking on its own cadence (BusyIntervalSeconds while rows flow, IdleIntervalSeconds when empty):

  • IngestAuditEvents for single-row lifecycle events (sync ApiCall/DbWrite, NotifySend, InboundRequest).
  • IngestCachedTelemetry for cached-call rows, each joined to its IOperationTrackingStore snapshot so central can write AuditLog and SiteCalls together.
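The busy/idle cadence amounts to choosing the next delay from the result of the last drain, with each transport running its own loop. SelfTickingDrainSketch is a hypothetical sketch using the defaults from the configuration table (5 s busy, 30 s idle). What the real actor does after a failed push is not described, so treating a failure as an idle tick is an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical self-ticking drain loop; one instance per transport.
public static class SelfTickingDrainSketch
{
    // Busy cadence while the last drain moved rows; idle cadence once the queue is empty.
    public static TimeSpan NextDelay(int rowsDrained, int busySeconds = 5, int idleSeconds = 30) =>
        TimeSpan.FromSeconds(rowsDrained > 0 ? busySeconds : idleSeconds);

    // drainOnce pushes one batch and returns how many rows it moved.
    public static async Task RunAsync(Func<CancellationToken, Task<int>> drainOnce, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            int drained;
            try { drained = await drainOnce(ct).ConfigureAwait(false); }
            catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; }
            catch (Exception) { drained = 0; } // assumption: retry a failed push on the idle cadence

            try { await Task.Delay(NextDelay(drained), ct).ConfigureAwait(false); }
            catch (OperationCanceledException) { return; }
        }
    }
}
```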

On the central node, AuditLogIngestActor is a cluster singleton. It opens a fresh DI scope per message because IAuditLogRepository is a scoped EF Core service, stamps IngestedAtUtc, and inserts idempotently — one bad row never sinks the batch:

private async Task IngestWithRepositoryAsync(
    IAuditLogRepository repository,
    IAuditRedactor? redactor,
    ICentralAuditWriteFailureCounter? failureCounter,
    IngestAuditEventsCommand cmd,
    DateTime nowUtc,
    List<Guid> accepted)
{
    foreach (var evt in cmd.Events)
    {
        try
        {
            var safeRedactor = redactor ?? SafeDefaultAuditRedactor.Instance;
            var filtered = safeRedactor.Apply(evt);
            var ingested = AuditRowProjection.WithIngestedAtUtc(filtered, nowUtc);
            await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
            accepted.Add(evt.EventId);
        }
        catch (Exception ex)
        {
            try { failureCounter?.Increment(); }
            catch { /* counter must never throw — defence in depth */ }
            _logger.LogError(ex,
                "Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
                evt.EventId);
        }
    }
}

The cached path, OnCachedTelemetryAsync, wraps each entry in its own MS SQL transaction and writes the AuditLog row and the SiteCalls row together, so the audit and operational mirrors never drift mid-row.

Central direct-write

Events that originate on central — Notification Outbox dispatch and Inbound API — never go through site telemetry. They call ICentralAuditWriter, implemented by CentralAuditWriter, which redacts, stamps SourceNode from INodeIdentityProvider when the caller has not, opens a per-call scope, and inserts idempotently. Like every audit path, it swallows and logs failures rather than propagating them.

Reconciliation and retention

SiteAuditReconciliationActor is a central singleton that, on a timer, pulls each site for rows at or after a per-site cursor and ingests them idempotently — the self-healing fallback for telemetry the push path missed. AuditLogPurgeActor drives the daily partition-switch purge against the central table, and AuditLogPartitionMaintenanceService rolls the monthly partition function forward so inserts never land in an unbounded tail partition.
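Keeping inserts out of an unbounded tail partition comes down to computing which monthly boundaries must already exist. The sketch below is hypothetical and assumes a RANGE RIGHT partition function whose boundaries are the first instant of each UTC month. Under that assumption, covering the current month plus LookaheadMonths future months needs one extra upper boundary after the last covered month. The real AuditLogPartitionMaintenanceService may compute this differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PartitionLookaheadSketch
{
    // Assumption: RANGE RIGHT monthly boundaries at the first instant of each UTC month.
    // Months current..current+lookahead each need an upper boundary, hence lookahead + 1 entries.
    public static IReadOnlyList<DateTime> RequiredBoundaries(DateTime nowUtc, int lookaheadMonths)
    {
        var monthStart = new DateTime(nowUtc.Year, nowUtc.Month, 1, 0, 0, 0, DateTimeKind.Utc);
        return Enumerable.Range(1, lookaheadMonths + 1)
            .Select(i => monthStart.AddMonths(i))
            .ToList();
    }

    // Boundaries a maintenance pass would still need to add.
    public static IReadOnlyList<DateTime> Missing(IEnumerable<DateTime> existing, DateTime nowUtc, int lookaheadMonths)
    {
        var have = new HashSet<DateTime>(existing);
        return RequiredBoundaries(nowUtc, lookaheadMonths).Where(b => !have.Contains(b)).ToList();
    }
}
```

Running such a check on every IntervalSeconds tick means a missed run is harmless as long as the next one happens before the current month's upper boundary is reached.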

Usage

Rows are written through one of two DI seams, never through an ad hoc writer. Site boundary code resolves the hot-path IAuditWriter (the FallbackAuditWriter shown above), which neither blocks on disk nor throws an audit failure back at the script. Central-originated events (Notification Outbox dispatch, Inbound API) resolve ICentralAuditWriter instead; its CentralAuditWriter implementation redacts, stamps SourceNode, opens a per-call EF Core scope, and inserts idempotently, swallowing failures the same way:

public async Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
    if (evt is null)
    {
        _logger.LogWarning("CentralAuditWriter.WriteAsync received null event; ignoring.");
        return;
    }

    try
    {
        var filtered = _redactor.Apply(evt);
        if (filtered.SourceNode is null && _nodeIdentity?.NodeName is { } nodeName)
        {
            filtered = filtered with { SourceNode = nodeName };
        }

        await using var scope = _services.CreateAsyncScope();
        var repo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
        var stamped = AuditRowProjection.WithIngestedAtUtc(filtered, DateTime.UtcNow);
        await repo.InsertIfNotExistsAsync(stamped, ct).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        try { _failureCounter.Increment(); }
        catch { /* counter must never throw — defence in depth */ }
        _logger.LogWarning(ex,
            "CentralAuditWriter failed for EventId {EventId} (Action={Action}, Outcome={Outcome})",
            evt.EventId, evt.Action, evt.Outcome);
    }
}

The two writer seams are intentionally distinct DI bindings: IAuditWriter is the site/boundary hot path, and ICentralAuditWriter is the central direct-write path. Keeping them separate stops a site composition root from accidentally resolving the central writer, which depends on a scoped IAuditLogRepository registered only by the Configuration Database (AddConfigurationDatabase).

Configuration

The top-level options class is AuditLogOptions, bound from the AuditLog section and validated on startup by AuditLogOptionsValidator. The writer and telemetry collaborators bind from nested sections; the constant section names live on ServiceCollectionExtensions.

Section | Key | Default | Description
--- | --- | --- | ---
AuditLog | DefaultCapBytes | 8192 | Payload-summary cap in bytes. Must be > 0.
AuditLog | ErrorCapBytes | 65536 | Payload-summary cap for error rows, in bytes. Must be >= DefaultCapBytes.
AuditLog | InboundMaxBytes | 1048576 | Per-body ceiling for ApiInbound summaries, in bytes. Range [8192, 16777216].
AuditLog | HeaderRedactList | Authorization, X-Api-Key, Cookie, Set-Cookie | HTTP headers redacted before persistence.
AuditLog | GlobalBodyRedactors | (empty) | Body-content redactor regex patterns applied globally.
AuditLog | PerTargetOverrides | (empty) | Per-target overrides keyed by target name (CapBytes, AdditionalBodyRedactors, RedactSqlParamsMatching).
AuditLog | RetentionDays | 365 | Central retention window, in days. Range [30, 3650].
AuditLog:SiteWriter | DatabasePath | auditlog.db | Site SQLite file path.
AuditLog:SiteWriter | ChannelCapacity | 4096 | Bounded write-queue capacity, in events.
AuditLog:SiteWriter | BatchSize | 256 | Max events per write transaction.
AuditLog:SiteTelemetry | BatchSize | 256 | Max rows per gRPC drain batch.
AuditLog:SiteTelemetry | BusyIntervalSeconds | 5 | Drain delay while rows are flowing.
AuditLog:SiteTelemetry | IdleIntervalSeconds | 30 | Drain delay when the queue is empty.
AuditLog:PartitionMaintenance | IntervalSeconds | 86400 | Partition roll-forward cadence (86400 s = 24 h).
AuditLog:PartitionMaintenance | LookaheadMonths | 1 | Future months that pf_AuditLog_Month must always cover.
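The numeric constraints in the table can be restated as a plain validation function. AuditLogOptionsSketch and AuditLogOptionsRules are hypothetical; they reproduce only the rules listed in the table, not AuditLogOptionsValidator itself or how it is wired into startup.

```csharp
using System.Collections.Generic;

// Hypothetical mirror of the numeric AuditLog options, with the documented defaults.
public sealed class AuditLogOptionsSketch
{
    public int DefaultCapBytes { get; set; } = 8192;
    public int ErrorCapBytes { get; set; } = 65536;
    public int InboundMaxBytes { get; set; } = 1048576;
    public int RetentionDays { get; set; } = 365;
}

public static class AuditLogOptionsRules
{
    // One message per violated rule; an empty list means the options are valid.
    public static IReadOnlyList<string> Validate(AuditLogOptionsSketch o)
    {
        var errors = new List<string>();
        if (o.DefaultCapBytes <= 0)
            errors.Add("DefaultCapBytes must be > 0.");
        if (o.ErrorCapBytes < o.DefaultCapBytes)
            errors.Add("ErrorCapBytes must be >= DefaultCapBytes.");
        if (o.InboundMaxBytes < 8192 || o.InboundMaxBytes > 16777216)
            errors.Add("InboundMaxBytes must be in [8192, 16777216].");
        if (o.RetentionDays < 30 || o.RetentionDays > 3650)
            errors.Add("RetentionDays must be in [30, 3650].");
        return errors;
    }
}
```

Collecting every violation, rather than stopping at the first, lets one failed startup report all misconfigured keys together.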

PerTargetRedactionOverride is additive: per-target body redactors append to the global list, and RedactSqlParamsMatching is an opt-in case-insensitive regex applied only to DbOutbound rows (SQL parameter values are captured verbatim by default). Header redaction always runs — when no redactor is wired, the paths fall back to SafeDefaultAuditRedactor, which scrubs the default sensitive headers regardless.
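The redaction layering described above is sketched below with System.Text.RegularExpressions. RedactionSketch, ChannelSketch, and the [REDACTED] mask are hypothetical. The assumption that RedactSqlParamsMatching is matched against parameter names (rather than values) is not stated in the text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public enum ChannelSketch { ApiOutbound, DbOutbound, Notification, ApiInbound }

public static class RedactionSketch
{
    public const string Mask = "[REDACTED]"; // hypothetical placeholder

    // Default header list from the configuration table, matched case-insensitively.
    private static readonly HashSet<string> SensitiveHeaders =
        new(StringComparer.OrdinalIgnoreCase) { "Authorization", "X-Api-Key", "Cookie", "Set-Cookie" };

    // Header redaction always runs, whatever the channel or target.
    public static Dictionary<string, string> RedactHeaders(IReadOnlyDictionary<string, string> headers) =>
        headers.ToDictionary(h => h.Key, h => SensitiveHeaders.Contains(h.Key) ? Mask : h.Value);

    // Additive: per-target patterns run after the global ones; neither list replaces the other.
    public static string RedactBody(string body, IEnumerable<string> globalPatterns, IEnumerable<string> perTargetPatterns) =>
        globalPatterns.Concat(perTargetPatterns)
            .Aggregate(body, (text, pattern) => Regex.Replace(text, pattern, Mask));

    // Opt-in and DbOutbound-only; without a pattern, parameter values stay verbatim.
    public static Dictionary<string, object?> RedactSqlParams(
        ChannelSketch channel, IReadOnlyDictionary<string, object?> parameters, string? redactSqlParamsMatching)
    {
        if (channel != ChannelSketch.DbOutbound || string.IsNullOrEmpty(redactSqlParamsMatching))
            return parameters.ToDictionary(p => p.Key, p => p.Value);

        var matcher = new Regex(redactSqlParamsMatching, RegexOptions.IgnoreCase);
        return parameters.ToDictionary(p => p.Key, p => matcher.IsMatch(p.Key) ? (object?)Mask : p.Value);
    }
}
```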

SiteAuditReconciliationOptions exposes ReconciliationIntervalSeconds (default 300) and StalledAfterNonDrainingCycles (default 2); AuditLogPurgeOptions exposes IntervalHours (default 24), while the purge window itself is sourced from AuditLogOptions.RetentionDays so retention is tuned from one place.

Dependencies & Interactions

  • Commons (#16) — owns the canonical AuditEvent shape consumed here (via ZB.MOM.WW.Audit), the IAuditWriter / ICentralAuditWriter / ISiteAuditQueue interfaces, the AuditChannel / AuditKind / AuditForwardState enums, the AuditDetails / AuditRowProjection / ScadaBridgeAuditEventFactory projection types, and the ingest/pull message contracts.
  • Configuration Database (#17) — registers the scoped IAuditLogRepository (the central dbo.AuditLog table, partition boundaries, and InsertIfNotExistsAsync idempotency). Central hosts must call AddConfigurationDatabase for the ingest, direct-write, reconciliation, and purge paths to resolve their repository.
  • Central-Site Communication (#5) — supplies the gRPC transport: IngestAuditEvents / IngestCachedTelemetry push and the PullAuditEvents reconciliation pull, plus the DTO mappers.
  • Site Call Audit (#22) — shares the combined cached-telemetry packet. AuditLogIngestActor.OnCachedTelemetryAsync writes the AuditLog row and the SiteCalls upsert in one transaction; sites remain the source of truth for cached-call status.
  • Notification Outbox (#21) — a central direct-write caller of ICentralAuditWriter (dispatch lifecycle rows), alongside the Inbound API.
  • Health Monitoring (#11) — AddAuditLogHealthMetricsBridge replaces the NoOp failure counters with bridges that surface SiteAuditWriteFailures and AuditRedactionFailure on the site health report; SiteAuditBacklogReporter polls the writer backlog. On central, AuditCentralHealthSnapshot exposes CentralAuditWriteFailures, AuditRedactionFailure, and per-site SiteAuditTelemetryStalled.
  • Design spec: Component-AuditLog.md.

Troubleshooting

Telemetry loss self-heals

If the push path misses rows (a gRPC blip, a central restart, a site briefly offline), the site keeps those rows Pending and SiteAuditReconciliationActor re-pulls them on its next tick. Idempotency on EventId makes duplicates from both paths a no-op, so no operator action is required. The reconciliation cursor is in-memory; a singleton restart resets it to DateTime.MinValue, which re-pulls everything the site still holds — conservative but correct.

A row repeatedly fails to ingest

SiteAuditReconciliationActor tracks per-EventId insert failures. While a row keeps failing below MaxPermanentInsertAttempts (5), the site cursor is held back so the next tick retries it. At the threshold the actor logs Critical, permanently abandons that one row, and advances the cursor so a single broken row cannot block all further progress for the site. A Critical log line naming an abandoned EventId is the signal to investigate that row's payload.
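The hold-back-then-abandon rule for one site can be modelled as a small state machine. ReconciliationCursorSketch is hypothetical: it keeps the cursor at the earliest still-failing row, abandons a row on its fifth recorded failure (the MaxPermanentInsertAttempts threshold), and remembers abandoned EventIds so a re-pulled copy cannot pin the cursor again. That last detail is an assumption about how the real actor avoids re-blocking.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of one site's reconciliation cursor with per-EventId failure tracking.
public sealed class ReconciliationCursorSketch
{
    public const int MaxPermanentInsertAttempts = 5;

    public sealed record PulledRow(Guid EventId, DateTime OccurredAtUtc);

    private readonly Dictionary<Guid, int> _failures = new();
    private readonly HashSet<Guid> _abandoned = new();

    // In-memory only: a fresh instance starts at MinValue and re-pulls everything the site holds.
    public DateTime Cursor { get; private set; } = DateTime.MinValue;

    public bool IsAbandoned(Guid eventId) => _abandoned.Contains(eventId);

    // Processes one pulled batch (rows at or after Cursor); tryInsert returns false on failure.
    public void ProcessBatch(IReadOnlyList<PulledRow> rows, Func<PulledRow, bool> tryInsert)
    {
        DateTime? heldAt = null;
        foreach (var row in rows.OrderBy(r => r.OccurredAtUtc))
        {
            if (_abandoned.Contains(row.EventId)) continue;          // never retried, never blocks
            if (tryInsert(row)) { _failures.Remove(row.EventId); continue; }

            var attempts = _failures.TryGetValue(row.EventId, out var n) ? n + 1 : 1;
            if (attempts >= MaxPermanentInsertAttempts)
            {
                _failures.Remove(row.EventId);
                _abandoned.Add(row.EventId);                         // the real actor logs Critical here
                continue;
            }
            _failures[row.EventId] = attempts;
            heldAt ??= row.OccurredAtUtc;                            // earliest failing row pins the cursor
        }

        if (heldAt is { } pin) Cursor = pin;
        else if (rows.Count > 0) Cursor = rows.Max(r => r.OccurredAtUtc);
    }
}
```

Because rows after the pinned one are still inserted on each tick, holding the cursor back only costs idempotent re-pulls, not lost progress.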

A site shows as stalled

When two consecutive reconciliation cycles (StalledAfterNonDrainingCycles, default 2) both return rows and report MoreAvailable=true, the backlog is not draining, and the actor latches the site as stalled, publishing SiteAuditTelemetryStalledChanged on the EventStream (surfaced as SiteAuditTelemetryStalled on the central health snapshot). Only transitions are published, so a stalled site does not flood the health surface.
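The transition-only publishing can be sketched as a latch. StallLatchSketch is hypothetical, a C# event stands in for publishing SiteAuditTelemetryStalledChanged on the EventStream, and clearing the latch on the first cycle that drains is an assumption (the text only says that transitions are published).

```csharp
using System;

// Hypothetical per-site stall detector: latches after N consecutive non-draining cycles
// and raises an event only when the stalled state actually flips.
public sealed class StallLatchSketch
{
    private readonly int _stalledAfterNonDrainingCycles;
    private int _consecutive;

    public StallLatchSketch(int stalledAfterNonDrainingCycles = 2) =>
        _stalledAfterNonDrainingCycles = stalledAfterNonDrainingCycles;

    public bool IsStalled { get; private set; }

    // Stands in for publishing SiteAuditTelemetryStalledChanged.
    public event Action<bool>? StalledChanged;

    public void OnCycle(int rowsReturned, bool moreAvailable)
    {
        var nonDraining = rowsReturned > 0 && moreAvailable;
        _consecutive = nonDraining ? _consecutive + 1 : 0;   // assumption: any draining cycle resets
        var stalled = _consecutive >= _stalledAfterNonDrainingCycles;
        if (stalled == IsStalled) return;                    // no transition, nothing published
        IsStalled = stalled;
        StalledChanged?.Invoke(stalled);
    }
}
```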

Audit writes never abort the action

Every write path is best-effort by contract. A primary SQLite failure routes to the ring buffer; an ingest or direct-write failure is swallowed, logged, and counted on the health surface. The audited action's own success/failure path is authoritative — a missing audit row never means the action failed. The site retention purge enforces the matching invariant: a row is not dropped until it has reached Forwarded or Reconciled.