feat(auditlog): SiteAuditReconciliationActor central singleton (#23 M6)
This commit is contained in:
45
src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs
Normal file
45
src/ScadaLink.AuditLog/Central/IPullAuditEventsClient.cs
Normal file
@@ -0,0 +1,45 @@
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Mockable abstraction over the central-side <c>PullAuditEvents</c> gRPC
|
||||
/// client surface that <see cref="SiteAuditReconciliationActor"/> uses to
|
||||
/// fetch the next reconciliation batch from a specific site. Extracted so the
|
||||
/// actor can be unit-tested against an in-memory stub without standing up a
|
||||
/// real <c>GrpcChannel</c> per site.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The production implementation (host wiring task) wraps the auto-generated
|
||||
/// <c>SiteStreamService.SiteStreamServiceClient</c>, multiplexing one
|
||||
/// <c>GrpcChannel</c> per site keyed on
|
||||
/// <see cref="SiteEntry.GrpcEndpoint"/>. Until that wiring lands the DI
|
||||
/// composition root binds a NoOp default that returns an empty response — the
|
||||
/// reconciliation tick is still scheduled and the cursor logic still runs, so
|
||||
/// regressions in the actor itself are caught even before the real client
|
||||
/// arrives.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Implementations MUST NOT throw on transport faults that the actor can
|
||||
/// tolerate (connection refused, deadline exceeded). The actor's contract is
|
||||
/// "one site's failure doesn't sink the rest of the tick"; an exception still
|
||||
/// won't crash the actor (the per-site try/catch catches it), but returning
|
||||
/// an empty response on a known-recoverable error keeps the logs cleaner.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IPullAuditEventsClient
|
||||
{
|
||||
/// <summary>
|
||||
/// Issues a <c>PullAuditEvents</c> RPC against the site whose endpoint
|
||||
/// is registered against <paramref name="siteId"/>. Returns the next
|
||||
/// batch of <see cref="ScadaLink.Commons.Entities.Audit.AuditEvent"/>
|
||||
/// rows ordered oldest-first AND a <c>MoreAvailable</c> flag the actor
|
||||
/// uses to decide whether to fire another pull immediately.
|
||||
/// </summary>
|
||||
Task<PullAuditEventsResponse> PullAsync(
|
||||
string siteId,
|
||||
DateTime sinceUtc,
|
||||
int batchSize,
|
||||
CancellationToken ct);
|
||||
}
|
||||
34
src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs
Normal file
34
src/ScadaLink.AuditLog/Central/ISiteEnumerator.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Enumeration surface consumed by <see cref="SiteAuditReconciliationActor"/> to
|
||||
/// discover which sites to poll on each reconciliation tick. Extracted so the
|
||||
/// actor can be unit-tested against a static list without depending on the
|
||||
/// production <c>ISiteRepository</c> + EF Core DbContext.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The production implementation wraps <c>ISiteRepository.GetAllSitesAsync</c>
|
||||
/// and projects each <c>Site</c> to a <see cref="SiteEntry"/> using the
|
||||
/// site's configured <c>GrpcNodeAAddress</c> (falling back to
|
||||
/// <c>GrpcNodeBAddress</c> when NodeA is unset). Sites with NO gRPC address
|
||||
/// configured are silently skipped — the reconciliation pull cannot reach
|
||||
/// them, but absence of an address is a configuration decision, not a runtime
|
||||
/// error.
|
||||
/// </remarks>
|
||||
public interface ISiteEnumerator
|
||||
{
|
||||
/// <summary>
|
||||
/// Returns the current set of sites the reconciliation puller should visit
|
||||
/// on the next tick. Implementations should reflect adds/removes promptly
|
||||
/// — the actor calls this once per tick.
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One reconciliation target: the site identifier the actor uses as the
|
||||
/// cursor key and the gRPC endpoint <see cref="IPullAuditEventsClient"/> dials
|
||||
/// to issue the pull. Endpoint is the bare authority (e.g. <c>http://siteA:8083</c>);
|
||||
/// transport selection (TLS, keepalive, etc.) is the client's concern.
|
||||
/// </summary>
|
||||
public sealed record SiteEntry(string SiteId, string GrpcEndpoint);
|
||||
324
src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs
Normal file
324
src/ScadaLink.AuditLog/Central/SiteAuditReconciliationActor.cs
Normal file
@@ -0,0 +1,324 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Central singleton (M6 Bundle B) that drives the audit-log reconciliation
|
||||
/// pull loop. On a configurable timer (default 5 minutes) the actor walks every
|
||||
/// known site, asks the site for any <see cref="AuditEvent"/> rows with
|
||||
/// <see cref="AuditEvent.OccurredAtUtc"/> >= the site's last reconciled
|
||||
/// cursor, ingests them idempotently into the central
|
||||
/// <see cref="IAuditLogRepository"/>, and advances the cursor.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <b>Self-healing telemetry, not a dispatcher.</b> The push path
|
||||
/// (<see cref="ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor"/> +
|
||||
/// <c>IngestAuditEvents</c>) is the primary mechanism. This actor exists so a
|
||||
/// missed push (gRPC blip, central restart, site offline) is eventually
|
||||
/// repaired by central re-pulling whatever the site still has in
|
||||
/// <c>Pending</c>/<c>Forwarded</c> state. Idempotency on
|
||||
/// <see cref="AuditEvent.EventId"/> (M2 Bundle A's race-fix) makes duplicate
|
||||
/// arrivals from both paths a silent no-op.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Cursor lifetime.</b> The per-site <c>LastReconciledAt</c> watermark is
|
||||
/// kept in-memory for the actor's lifetime. The cluster singleton normally
|
||||
/// survives the host process; on a deliberate failover OR a singleton restart
|
||||
/// the cursors reset to <see cref="DateTime.MinValue"/>. That is conservative
|
||||
/// but correct — the next tick simply asks for everything the site still has,
|
||||
/// and idempotent ingest swallows the dupes. Persisting cursors to MS SQL was
|
||||
/// considered and rejected for M6: the cost of a write per tick outweighs the
|
||||
/// rare benefit of avoiding one over-broad pull after a restart.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Stalled detection.</b> The brief calls a site "stalled" when two
|
||||
/// consecutive pull cycles BOTH return non-empty AND <c>MoreAvailable=true</c>
|
||||
/// — i.e. the backlog isn't draining. The actor publishes
|
||||
/// <see cref="SiteAuditTelemetryStalledChanged"/> on the actor system's
|
||||
/// EventStream so a future <c>ICentralHealthCollector</c> bridge (M6 Bundle E)
|
||||
/// can flip the health metric without coupling this actor to the health
|
||||
/// collection surface today.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Failure isolation.</b> A single site that throws (DNS, transport,
|
||||
/// repository write) must NOT prevent other sites from being polled on the
|
||||
/// same tick. The per-site work runs inside its own try/catch; the actor's
|
||||
/// supervisor strategy keeps it alive across any leaked exception with
|
||||
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultDecider"/>'s Restart
|
||||
/// semantics — restart resets the in-memory cursors, but as noted above that's
|
||||
/// a safe (over-pull, idempotent) recovery.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>DI scopes.</b> <see cref="IAuditLogRepository"/> is a scoped EF Core
|
||||
/// service registered by <c>AddConfigurationDatabase</c>. The singleton actor
|
||||
/// opens one DI scope per tick and reuses the same repository across all
|
||||
/// sites in that tick — one DbContext per tick mirrors the
|
||||
/// <c>AuditLogIngestActor</c> + <c>NotificationOutboxActor</c> pattern.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class SiteAuditReconciliationActor : ReceiveActor
|
||||
{
|
||||
private readonly ISiteEnumerator _sites;
|
||||
private readonly IPullAuditEventsClient _client;
|
||||
private readonly IServiceProvider _services;
|
||||
private readonly SiteAuditReconciliationOptions _options;
|
||||
private readonly ILogger<SiteAuditReconciliationActor> _logger;
|
||||
|
||||
/// <summary>
|
||||
/// Per-site reconciliation watermark — the highest
|
||||
/// <see cref="AuditEvent.OccurredAtUtc"/> seen for that site on a previous
|
||||
/// tick. Asking for <c>OccurredAtUtc >= cursor</c> rather than >
|
||||
/// is the site contract (<see cref="ScadaLink.Commons.Interfaces.Services.ISiteAuditQueue.ReadPendingSinceAsync"/>);
|
||||
/// duplicate-with-same-timestamp rows are filtered out by the idempotent
|
||||
/// repository write.
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, DateTime> _cursors = new();
|
||||
|
||||
/// <summary>
|
||||
/// Per-site count of consecutive non-draining cycles. Resets to zero on the
|
||||
/// first draining (or empty) cycle.
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, int> _nonDrainingCycles = new();
|
||||
|
||||
/// <summary>
|
||||
/// Per-site latched stalled state — used so the actor only publishes a
|
||||
/// <see cref="SiteAuditTelemetryStalledChanged"/> transition when the
|
||||
/// stalled flag actually changes, not on every tick while stalled.
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, bool> _stalled = new();
|
||||
|
||||
private ICancelable? _timer;
|
||||
|
||||
public SiteAuditReconciliationActor(
|
||||
ISiteEnumerator sites,
|
||||
IPullAuditEventsClient client,
|
||||
IServiceProvider services,
|
||||
IOptions<SiteAuditReconciliationOptions> options,
|
||||
ILogger<SiteAuditReconciliationActor> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(sites);
|
||||
ArgumentNullException.ThrowIfNull(client);
|
||||
ArgumentNullException.ThrowIfNull(services);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
|
||||
_sites = sites;
|
||||
_client = client;
|
||||
_services = services;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
|
||||
ReceiveAsync<ReconciliationTick>(_ => OnTickAsync());
|
||||
}
|
||||
|
||||
protected override void PreStart()
|
||||
{
|
||||
base.PreStart();
|
||||
var interval = _options.ReconciliationInterval;
|
||||
_timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
|
||||
initialDelay: interval,
|
||||
interval: interval,
|
||||
receiver: Self,
|
||||
message: ReconciliationTick.Instance,
|
||||
sender: Self);
|
||||
}
|
||||
|
||||
protected override void PostStop()
|
||||
{
|
||||
_timer?.Cancel();
|
||||
base.PostStop();
|
||||
}
|
||||
|
||||
private async Task OnTickAsync()
|
||||
{
|
||||
IReadOnlyList<SiteEntry> sites;
|
||||
try
|
||||
{
|
||||
sites = await _sites.EnumerateAsync().ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Site enumeration failed; skipping reconciliation tick.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (sites.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
IServiceScope? scope = null;
|
||||
IAuditLogRepository repository;
|
||||
try
|
||||
{
|
||||
scope = _services.CreateScope();
|
||||
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to resolve IAuditLogRepository for reconciliation tick.");
|
||||
scope?.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var site in sites)
|
||||
{
|
||||
try
|
||||
{
|
||||
await PullSiteAsync(site, repository).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Catch-all per the failure-isolation invariant: one site's
|
||||
// fault must not sink the rest of the tick. The cursor for
|
||||
// the failing site is left at its previous value so the
|
||||
// next tick retries the same window.
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Reconciliation pull failed for site {SiteId}; other sites continue.",
|
||||
site.SiteId);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
scope.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Issues one <c>PullAuditEvents</c> RPC against the site, ingests the
|
||||
/// returned rows idempotently into the central repository, and advances
|
||||
/// the cursor based on the maximum <see cref="AuditEvent.OccurredAtUtc"/>
|
||||
/// observed. The brief's "saturate until backlog clears" intent is met by
|
||||
/// the natural cadence — each tick issues one pull, and a backed-up site
|
||||
/// drains across consecutive ticks. The stalled signal (two non-draining
|
||||
/// ticks in a row) surfaces when that drain isn't keeping up.
|
||||
/// </summary>
|
||||
private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository)
|
||||
{
|
||||
var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
|
||||
var response = await _client.PullAsync(
|
||||
site.SiteId, since, _options.BatchSize, CancellationToken.None)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var maxOccurred = since;
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
foreach (var evt in response.Events)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Idempotent repository write: duplicate EventIds (from a
|
||||
// concurrent push, or a retry of this very pull) collapse to
|
||||
// a no-op courtesy of M2 Bundle A's race-fix on
|
||||
// InsertIfNotExistsAsync.
|
||||
var ingested = evt with { IngestedAtUtc = nowUtc };
|
||||
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Per-row catch so one bad event does not abandon the rest of
|
||||
// the batch. The cursor still advances based on OccurredAtUtc
|
||||
// — the row was returned by the site, so the next tick won't
|
||||
// re-fetch it; if it permanently fails to persist, that's an
|
||||
// operational concern surfaced by the log, not a hot-loop
|
||||
// trigger.
|
||||
_logger.LogError(
|
||||
ex,
|
||||
"Reconciliation ingest failed for AuditEvent {EventId} from site {SiteId}.",
|
||||
evt.EventId,
|
||||
site.SiteId);
|
||||
}
|
||||
|
||||
if (evt.OccurredAtUtc > maxOccurred)
|
||||
{
|
||||
maxOccurred = evt.OccurredAtUtc;
|
||||
}
|
||||
}
|
||||
|
||||
_cursors[site.SiteId] = maxOccurred;
|
||||
|
||||
var nonDraining = response.MoreAvailable && response.Events.Count > 0;
|
||||
UpdateStalledState(site.SiteId, draining: !nonDraining);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Flips the per-site stalled flag based on whether this tick drained the
|
||||
/// queue. A "draining" cycle is one where the server reported no more rows
|
||||
/// available OR returned zero events. A "non-draining" cycle is the
|
||||
/// inverse (events returned AND <c>MoreAvailable=true</c>).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The state machine: counter increments on each consecutive non-draining
|
||||
/// tick. On reaching <see cref="SiteAuditReconciliationOptions.StalledAfterNonDrainingCycles"/>
|
||||
/// the actor latches <c>Stalled=true</c> and publishes the transition; on
|
||||
/// any subsequent draining tick the counter resets to zero AND, if the
|
||||
/// latch is currently true, the actor publishes <c>Stalled=false</c>. Only
|
||||
/// transitions are published — repeated ticks in the same state are
|
||||
/// silent so a downstream subscriber doesn't see a flood of redundant
|
||||
/// notifications.
|
||||
/// </remarks>
|
||||
private void UpdateStalledState(string siteId, bool draining)
|
||||
{
|
||||
var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior;
|
||||
|
||||
if (draining)
|
||||
{
|
||||
_nonDrainingCycles[siteId] = 0;
|
||||
if (wasStalled)
|
||||
{
|
||||
_stalled[siteId] = false;
|
||||
Context.System.EventStream.Publish(
|
||||
new SiteAuditTelemetryStalledChanged(siteId, Stalled: false));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
var consecutive = _nonDrainingCycles.GetValueOrDefault(siteId) + 1;
|
||||
_nonDrainingCycles[siteId] = consecutive;
|
||||
|
||||
if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled)
|
||||
{
|
||||
_stalled[siteId] = true;
|
||||
Context.System.EventStream.Publish(
|
||||
new SiteAuditTelemetryStalledChanged(siteId, Stalled: true));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resume on any unhandled exception inside the receive — the singleton
|
||||
/// MUST stay alive even if the per-tick try/catch leaks. Restart would
|
||||
/// reset the cursors (safe but wasteful); Resume preserves them.
|
||||
/// </summary>
|
||||
protected override SupervisorStrategy SupervisorStrategy()
|
||||
{
|
||||
return new OneForOneStrategy(
|
||||
maxNrOfRetries: 0,
|
||||
withinTimeRange: TimeSpan.Zero,
|
||||
decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
|
||||
}
|
||||
|
||||
/// <summary>Self-tick triggering a reconciliation pass across all sites.</summary>
|
||||
internal sealed class ReconciliationTick
|
||||
{
|
||||
public static readonly ReconciliationTick Instance = new();
|
||||
private ReconciliationTick() { }
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Published on the actor system EventStream when a site's reconciliation
|
||||
/// puller transitions into or out of the "stalled" state (backlog not
|
||||
/// draining across multiple cycles). The M6 Bundle E central health collector
|
||||
/// will subscribe to this and surface
|
||||
/// <c>SiteAuditTelemetryStalled</c> on the health-report payload.
|
||||
/// </summary>
|
||||
public sealed record SiteAuditTelemetryStalledChanged(string SiteId, bool Stalled);
|
||||
@@ -0,0 +1,60 @@
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Tuning knobs for the central <see cref="SiteAuditReconciliationActor"/> singleton.
|
||||
/// Defaults mirror the M6 Bundle B brief: pull every 5 minutes per site, 256 rows per
|
||||
/// batch, declare a site "stalled" after two consecutive pull cycles return non-empty
|
||||
/// AND <c>MoreAvailable=true</c> (the backlog is not draining).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Per the M6 plan the reconciliation actor is the fallback when push telemetry is
|
||||
/// lost; it is intentionally low-frequency. Lowering
|
||||
/// <see cref="ReconciliationIntervalSeconds"/> in production trades MS SQL load for
|
||||
/// fresher self-healing — keep the default unless a deployment can prove the extra
|
||||
/// load is acceptable.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="StalledAfterNonDrainingCycles"/> = 2 because a single non-draining
|
||||
/// cycle can happen on a surge (e.g. a backed-up site replays its hot queue); the
|
||||
/// stalled signal should only fire when the backlog persists across cycles, which is
|
||||
/// the symptom the central health surface is asking us to detect.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SiteAuditReconciliationOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Period of the reconciliation tick. Each tick visits every known site once.
|
||||
/// </summary>
|
||||
public int ReconciliationIntervalSeconds { get; set; } = 300;
|
||||
|
||||
/// <summary>
|
||||
/// Test-only override for finer control over the tick cadence than
|
||||
/// whole-second resolution allows. When non-null, takes precedence over
|
||||
/// <see cref="ReconciliationIntervalSeconds"/>. Not bound from config —
|
||||
/// production config exposes <see cref="ReconciliationIntervalSeconds"/>
|
||||
/// only.
|
||||
/// </summary>
|
||||
public TimeSpan? ReconciliationIntervalOverride { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Resolves the effective tick interval, honouring the test override when
|
||||
/// set. Falls back to <see cref="ReconciliationIntervalSeconds"/>.
|
||||
/// </summary>
|
||||
public TimeSpan ReconciliationInterval =>
|
||||
ReconciliationIntervalOverride ?? TimeSpan.FromSeconds(ReconciliationIntervalSeconds);
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of <see cref="ScadaLink.Commons.Entities.Audit.AuditEvent"/>
|
||||
/// rows requested in a single <c>PullAuditEvents</c> RPC call.
|
||||
/// </summary>
|
||||
public int BatchSize { get; set; } = 256;
|
||||
|
||||
/// <summary>
|
||||
/// Number of consecutive non-draining cycles (events returned AND
|
||||
/// <c>MoreAvailable=true</c>) that must accumulate for a site before the actor
|
||||
/// publishes <c>SiteAuditTelemetryStalledChanged(Stalled: true)</c> on the
|
||||
/// EventStream.
|
||||
/// </summary>
|
||||
public int StalledAfterNonDrainingCycles { get; set; } = 2;
|
||||
}
|
||||
@@ -0,0 +1,438 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.AuditLog.Central;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
using ScadaLink.Commons.Types.Audit;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.ConfigurationDatabase;
|
||||
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Bundle B (M6-T3) tests for <see cref="SiteAuditReconciliationActor"/>. Most
|
||||
/// tests substitute the <see cref="IAuditLogRepository"/> with an in-memory
|
||||
/// recording stub so the actor's tick / cursor / stalled state machinery can
|
||||
/// be exercised in milliseconds without an MSSQL container. The duplicate /
|
||||
/// idempotency assertion uses the real <see cref="AuditLogRepository"/> against
|
||||
/// the <see cref="MsSqlMigrationFixture"/> so we verify InsertIfNotExistsAsync
|
||||
/// actually swallows duplicate-key collisions (the M2 Bundle A race-fix the
|
||||
/// reconciliation puller depends on).
|
||||
/// </summary>
|
||||
public class SiteAuditReconciliationActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||
{
|
||||
private readonly MsSqlMigrationFixture _fixture;
|
||||
|
||||
public SiteAuditReconciliationActorTests(MsSqlMigrationFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
}
|
||||
|
||||
private static AuditEvent NewEvent(
|
||||
string siteId,
|
||||
DateTime? occurredAt = null,
|
||||
Guid? id = null) => new()
|
||||
{
|
||||
EventId = id ?? Guid.NewGuid(),
|
||||
OccurredAtUtc = occurredAt ?? new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = AuditKind.ApiCall,
|
||||
Status = AuditStatus.Delivered,
|
||||
SourceSiteId = siteId,
|
||||
};
|
||||
|
||||
private static SiteAuditReconciliationOptions FastTickOptions(
|
||||
int batchSize = 256,
|
||||
int stalledAfter = 2) =>
|
||||
new()
|
||||
{
|
||||
// 100 ms tick keeps each test under a second. AwaitAssert covers
|
||||
// schedule jitter so a 100 ms tick has up to ~3 s to fire.
|
||||
ReconciliationIntervalSeconds = 300,
|
||||
ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
|
||||
BatchSize = batchSize,
|
||||
StalledAfterNonDrainingCycles = stalledAfter,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// In-memory recording stub used for non-MSSQL tests. Captures every
|
||||
/// <see cref="InsertIfNotExistsAsync"/> call AND deduplicates on
|
||||
/// <see cref="AuditEvent.EventId"/> so duplicate-handling assertions don't
|
||||
/// need a real database for the simple cases.
|
||||
/// </summary>
|
||||
private sealed class RecordingRepo : IAuditLogRepository
|
||||
{
|
||||
public List<AuditEvent> Inserted { get; } = new();
|
||||
private readonly HashSet<Guid> _seen = new();
|
||||
public int InsertCallCount { get; private set; }
|
||||
|
||||
public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
InsertCallCount++;
|
||||
if (_seen.Add(evt.EventId))
|
||||
{
|
||||
Inserted.Add(evt);
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<AuditEvent>> QueryAsync(
|
||||
AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
|
||||
Task.FromResult<IReadOnlyList<AuditEvent>>(Inserted);
|
||||
|
||||
public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
|
||||
Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// In-memory enumerator returning a static list of sites.
|
||||
/// </summary>
|
||||
private sealed class StaticEnumerator : ISiteEnumerator
|
||||
{
|
||||
private readonly IReadOnlyList<SiteEntry> _sites;
|
||||
public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;
|
||||
public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
|
||||
Task.FromResult(_sites);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Scripted pull client — returns the next queued response for the site
|
||||
/// on each call, looping the last entry if the queue is exhausted. Also
|
||||
/// records every invocation so tests can assert call counts + arguments.
|
||||
/// </summary>
|
||||
private sealed class ScriptedPullClient : IPullAuditEventsClient
|
||||
{
|
||||
public List<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls { get; } = new();
|
||||
private readonly Dictionary<string, Queue<PullAuditEventsResponse>> _scripted = new();
|
||||
private readonly Dictionary<string, Exception> _throwOnSite = new();
|
||||
|
||||
public ScriptedPullClient Script(string siteId, params PullAuditEventsResponse[] responses)
|
||||
{
|
||||
_scripted[siteId] = new Queue<PullAuditEventsResponse>(responses);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScriptedPullClient ThrowFor(string siteId, Exception ex)
|
||||
{
|
||||
_throwOnSite[siteId] = ex;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Task<PullAuditEventsResponse> PullAsync(
|
||||
string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
|
||||
{
|
||||
Calls.Add((siteId, sinceUtc, batchSize));
|
||||
if (_throwOnSite.TryGetValue(siteId, out var ex))
|
||||
{
|
||||
throw ex;
|
||||
}
|
||||
if (_scripted.TryGetValue(siteId, out var queue) && queue.Count > 0)
|
||||
{
|
||||
return Task.FromResult(queue.Dequeue());
|
||||
}
|
||||
return Task.FromResult(
|
||||
new PullAuditEventsResponse(Array.Empty<AuditEvent>(), MoreAvailable: false));
|
||||
}
|
||||
}
|
||||
|
||||
private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
|
||||
{
|
||||
var services = new ServiceCollection();
|
||||
// The actor opens a scope per tick and resolves IAuditLogRepository
|
||||
// from that scope; registering as scoped mirrors how
|
||||
// AddConfigurationDatabase wires the real repository.
|
||||
services.AddScoped(_ => repo);
|
||||
return services.BuildServiceProvider();
|
||||
}
|
||||
|
||||
private IActorRef CreateActor(
|
||||
ISiteEnumerator sites,
|
||||
IPullAuditEventsClient client,
|
||||
IAuditLogRepository repo,
|
||||
SiteAuditReconciliationOptions options)
|
||||
{
|
||||
var sp = BuildScopedProvider(repo);
|
||||
return Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor(
|
||||
sites,
|
||||
client,
|
||||
sp,
|
||||
Options.Create(options),
|
||||
NullLogger<SiteAuditReconciliationActor>.Instance)));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribes to the EventStream and collects every
|
||||
/// <see cref="SiteAuditTelemetryStalledChanged"/> publication into a list
|
||||
/// the test can assert on. Uses a probe actor so the stream's
|
||||
/// fire-and-forget delivery is observable from the test thread.
|
||||
/// </summary>
|
||||
private (Akka.TestKit.TestProbe Probe, List<SiteAuditTelemetryStalledChanged> Captured) SubscribeStalled()
|
||||
{
|
||||
var probe = CreateTestProbe();
|
||||
Sys.EventStream.Subscribe(probe.Ref, typeof(SiteAuditTelemetryStalledChanged));
|
||||
var captured = new List<SiteAuditTelemetryStalledChanged>();
|
||||
return (probe, captured);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 1. Timer_Fires_OnConfiguredInterval
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Timer_Fires_OnConfiguredInterval()
|
||||
{
|
||||
var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
|
||||
var client = new ScriptedPullClient();
|
||||
var repo = new RecordingRepo();
|
||||
var opts = FastTickOptions();
|
||||
|
||||
CreateActor(sites, client, repo, opts);
|
||||
|
||||
// The first scheduled tick fires after `ReconciliationIntervalSeconds`,
|
||||
// which is 0 for the test — Akka's scheduler still respects the
|
||||
// ScheduleTellRepeatedlyCancelable contract that issues a Tell on the
|
||||
// scheduler thread, so we await visible side effects (a PullAsync call)
|
||||
// rather than racing on internal state.
|
||||
AwaitAssert(
|
||||
() => Assert.True(client.Calls.Count >= 1, $"expected >= 1 pull call, got {client.Calls.Count}"),
|
||||
duration: TimeSpan.FromSeconds(3),
|
||||
interval: TimeSpan.FromMilliseconds(50));
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 2. Tick_PullsFromEachKnownSite
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Tick_PullsFromEachKnownSite()
|
||||
{
|
||||
var sites = new StaticEnumerator(
|
||||
new SiteEntry("siteA", "http://siteA:8083"),
|
||||
new SiteEntry("siteB", "http://siteB:8083"));
|
||||
var client = new ScriptedPullClient();
|
||||
var repo = new RecordingRepo();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions());
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
Assert.Contains(client.Calls, c => c.SiteId == "siteA");
|
||||
Assert.Contains(client.Calls, c => c.SiteId == "siteB");
|
||||
},
|
||||
duration: TimeSpan.FromSeconds(3),
|
||||
interval: TimeSpan.FromMilliseconds(50));
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 3. Tick_IngestEvents_ViaInsertIfNotExistsAsync
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Tick_IngestEvents_ViaInsertIfNotExistsAsync()
|
||||
{
|
||||
var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
|
||||
var e1 = NewEvent("siteA");
|
||||
var e2 = NewEvent("siteA");
|
||||
var client = new ScriptedPullClient().Script("siteA",
|
||||
new PullAuditEventsResponse(new[] { e1, e2 }, MoreAvailable: false));
|
||||
var repo = new RecordingRepo();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions());
|
||||
|
||||
AwaitAssert(() => Assert.Equal(2, repo.InsertCallCount),
|
||||
duration: TimeSpan.FromSeconds(3),
|
||||
interval: TimeSpan.FromMilliseconds(50));
|
||||
Assert.Contains(repo.Inserted, e => e.EventId == e1.EventId);
|
||||
Assert.Contains(repo.Inserted, e => e.EventId == e2.EventId);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 4. Tick_Duplicates_NotDoubleInserted (real MSSQL idempotency)
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
private ScadaLinkDbContext CreateContext() =>
|
||||
new(new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
||||
.UseSqlServer(_fixture.ConnectionString).Options);
|
||||
|
||||
[SkippableFact]
|
||||
public async Task Tick_Duplicates_NotDoubleInserted()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = "bundle-b-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||
var pre = NewEvent(siteId);
|
||||
|
||||
// Seed the row directly so the actor sees it already present when the
|
||||
// pull returns it.
|
||||
await using (var seedContext = CreateContext())
|
||||
{
|
||||
await new AuditLogRepository(seedContext).InsertIfNotExistsAsync(pre);
|
||||
}
|
||||
|
||||
// Stack one new and the pre-existing row in the pull response. The
|
||||
// second-pull script returns empty so the actor settles.
|
||||
var fresh = NewEvent(siteId);
|
||||
var sites = new StaticEnumerator(new SiteEntry(siteId, "http://x:8083"));
|
||||
var client = new ScriptedPullClient().Script(siteId,
|
||||
new PullAuditEventsResponse(new[] { pre, fresh }, MoreAvailable: false));
|
||||
|
||||
await using var context = CreateContext();
|
||||
var repo = new AuditLogRepository(context);
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions());
|
||||
|
||||
// Wait for the actor to ingest both rows.
|
||||
await Task.Delay(TimeSpan.FromSeconds(1));
|
||||
AwaitAssert(() => Assert.True(client.Calls.Count >= 1),
|
||||
duration: TimeSpan.FromSeconds(3));
|
||||
|
||||
// Even though the pull returned 2 events, only 1 fresh row should
|
||||
// exist in MSSQL alongside the pre-existing one — InsertIfNotExistsAsync
|
||||
// is first-write-wins on EventId.
|
||||
await using var read = CreateContext();
|
||||
var rows = await read.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.ToListAsync();
|
||||
Assert.Equal(2, rows.Count);
|
||||
Assert.Contains(rows, r => r.EventId == pre.EventId);
|
||||
Assert.Contains(rows, r => r.EventId == fresh.EventId);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 5. Cursor_Advances_ToMaxOccurredAtUtc
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Cursor_Advances_ToMaxOccurredAtUtc()
|
||||
{
|
||||
var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
|
||||
|
||||
var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
|
||||
var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
|
||||
var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);
|
||||
var e1 = NewEvent("siteA", t1);
|
||||
var e2 = NewEvent("siteA", t2);
|
||||
var e3 = NewEvent("siteA", t3);
|
||||
|
||||
// First pull returns three events with t1, t2, t3. Subsequent pulls
|
||||
// return empty — but the test asserts the SECOND pull's since argument
|
||||
// is t3 (the max OccurredAtUtc from the first pull).
|
||||
var client = new ScriptedPullClient().Script("siteA",
|
||||
new PullAuditEventsResponse(new[] { e1, e2, e3 }, MoreAvailable: false));
|
||||
var repo = new RecordingRepo();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions());
|
||||
|
||||
// Wait until we have at least two pulls — the second one must use t3
|
||||
// as its `since` argument because that was the max OccurredAtUtc in
|
||||
// the first response.
|
||||
AwaitAssert(() => Assert.True(client.Calls.Count >= 2,
|
||||
$"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"),
|
||||
duration: TimeSpan.FromSeconds(5),
|
||||
interval: TimeSpan.FromMilliseconds(50));
|
||||
|
||||
Assert.Equal(DateTime.MinValue, client.Calls[0].SinceUtc);
|
||||
Assert.Equal(t3, client.Calls[1].SinceUtc);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 6. Tick_OneSiteThrows_OtherSitesStillProcessed
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Tick_OneSiteThrows_OtherSitesStillProcessed()
|
||||
{
|
||||
var sites = new StaticEnumerator(
|
||||
new SiteEntry("siteA", "http://siteA:8083"),
|
||||
new SiteEntry("siteB", "http://siteB:8083"));
|
||||
|
||||
var bEvent = NewEvent("siteB");
|
||||
var client = new ScriptedPullClient()
|
||||
.ThrowFor("siteA", new InvalidOperationException("simulated transport failure"))
|
||||
.Script("siteB",
|
||||
new PullAuditEventsResponse(new[] { bEvent }, MoreAvailable: false));
|
||||
var repo = new RecordingRepo();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions());
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
Assert.Contains(client.Calls, c => c.SiteId == "siteA");
|
||||
Assert.Contains(repo.Inserted, e => e.EventId == bEvent.EventId);
|
||||
},
|
||||
duration: TimeSpan.FromSeconds(3),
|
||||
interval: TimeSpan.FromMilliseconds(50));
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 7. StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue()
|
||||
{
|
||||
var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
|
||||
|
||||
// Two scripted responses that each return events AND MoreAvailable=true
|
||||
// — the second pull triggers the stalled transition.
|
||||
var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
|
||||
var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
|
||||
var client = new ScriptedPullClient().Script("siteA",
|
||||
new PullAuditEventsResponse(batch1, MoreAvailable: true),
|
||||
new PullAuditEventsResponse(batch2, MoreAvailable: true));
|
||||
|
||||
var repo = new RecordingRepo();
|
||||
var (probe, _) = SubscribeStalled();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));
|
||||
|
||||
// Expect Stalled=true after the second non-draining tick. The probe
|
||||
// waits with its own timeout (a few seconds gives the 0 s repeat
|
||||
// interval ample slack).
|
||||
var msg = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
|
||||
Assert.Equal("siteA", msg.SiteId);
|
||||
Assert.True(msg.Stalled);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// 8. StalledDetection_DrainingCycle_PublishesStalledFalse
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void StalledDetection_DrainingCycle_PublishesStalledFalse()
|
||||
{
|
||||
var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
|
||||
|
||||
// Two non-draining responses get the actor into Stalled=true, then a
|
||||
// draining response (events but MoreAvailable=false) flips it back.
|
||||
var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
|
||||
var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
|
||||
var batch3 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
|
||||
var client = new ScriptedPullClient().Script("siteA",
|
||||
new PullAuditEventsResponse(batch1, MoreAvailable: true),
|
||||
new PullAuditEventsResponse(batch2, MoreAvailable: true),
|
||||
new PullAuditEventsResponse(batch3, MoreAvailable: false));
|
||||
|
||||
var repo = new RecordingRepo();
|
||||
var (probe, _) = SubscribeStalled();
|
||||
|
||||
CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));
|
||||
|
||||
// First publication is the stalled=true transition; second is the
|
||||
// back-to-draining flip. The actor publishes ONLY on transitions so we
|
||||
// expect exactly these two messages in order.
|
||||
var first = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
|
||||
Assert.True(first.Stalled);
|
||||
|
||||
var second = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
|
||||
Assert.False(second.Stalled);
|
||||
Assert.Equal("siteA", second.SiteId);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user