Files
ScadaBridge/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/AuditLogPurgeActorTests.cs
T
Joseph Doherty 639e331db1 test+docs(m5): M5.7 — de-date 2 EndToEnd purge tests (closes #52); document T3-T8 in Component-AuditLog/-CLI/README/CLAUDE
Tests: anchor SeedOccurredAt() to a fixed thresholdAnchor (2026-01-20) and compute
RetentionDays dynamically (UtcNow - anchor + 1d) so the threshold always sits near
Jan 20 2026, between the Jan-15 "old" seed (purged) and Apr-15/Jun-15 "kept" seeds.
Seed dates stay within the explicit pf_AuditLog_Month boundary range (Jan 2026 –
Dec 2027) — relative-from-now offsets landed before 2026-01-01 (the catch-all
partition, invisible to GetPartitionBoundariesOlderThanAsync). Both tests confirmed
passing; all 284 AuditLog tests green.

Docs:
- Component-AuditLog.md: per-channel retention overrides (T3, PerChannelRetentionDays
  + bounded DELETE + AuditLogPurge:ChannelPurgeBatchSize); ParentExecutionId tag-cascade
  now spans alarm-triggered + nested CallScript/CallShared + inbound→routed (T4, "no
  further spawn points deferred"); per-node stuck KPIs for Notification Outbox +
  Site Call Audit (T6); T7 structured response-capture increments (request headers in
  Extra.requestHeaders, AuditInboundCeilingHits counter, per-method SkipBodyCapture);
  T8 CLI audit tree; T1 hash-chain + T2 Parquet explicitly marked deferred to v1.x.
- Component-CLI.md + README.md: document audit tree --execution-id <guid> and
  audit backfill-source-node --sentinel/--before/--batch with exact options verified
  against AuditCommands.cs; update Interactions to list new endpoints.
- CLAUDE.md: update audit-log design-decision bullets for T3 per-channel retention,
  T4 tag-cascade complete, T6 per-node KPIs, T7 inbound capture increments, T8 tree
  command; clarify T1/T2 remain deferred to v1.x.
2026-06-16 22:26:09 -04:00

495 lines
21 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Entities;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories;
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests.Migrations;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central;
/// <summary>
/// Bundle C (#23 M6-T4) tests for <see cref="AuditLogPurgeActor"/>. The fast,
/// schedule-only tests substitute a recording stub for
/// <see cref="IAuditLogRepository"/> so the timer, per-boundary error-isolation,
/// event-publish and per-channel purge machinery can be exercised without an
/// MSSQL container. The repository tests (Bundle C of M6-T4) hold the detailed
/// "real partition gets switched out" assertions; this file is primarily about
/// the actor's policy decisions and adds one skippable end-to-end test that runs
/// the actor against the <see cref="MsSqlMigrationFixture"/> database when that
/// fixture is available.
/// </summary>
public class AuditLogPurgeActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
/// <summary>
/// Stores the shared MSSQL fixture; only the end-to-end test reads it.
/// </summary>
/// <param name="fixture">Class fixture instance shared by the tests in this class.</param>
public AuditLogPurgeActorTests(MsSqlMigrationFixture fixture) => _fixture = fixture;
/// <summary>
/// In-memory recording stub. Captures every
/// <see cref="GetPartitionBoundariesOlderThanAsync"/>, every
/// <see cref="SwitchOutPartitionAsync"/> and every
/// <see cref="PurgeChannelOlderThanAsync"/> call so tests can assert which
/// boundaries and channels the actor chose to purge and how many ticks it
/// issued. A specific boundary can be configured to throw so the
/// continue-on-error path is exercisable.
/// <para>
/// Calls are recorded while a test polls the results from its own thread, and
/// <see cref="List{T}"/> is not safe for concurrent read/write. The recording
/// lists are therefore private and guarded by a lock; the public properties
/// return point-in-time copies that a test can index or enumerate freely.
/// </para>
/// </summary>
private sealed class RecordingRepo : IAuditLogRepository
{
    // Single gate for all three recording lists; contention is negligible in tests.
    private readonly object _gate = new();
    private readonly List<DateTime> _thresholdQueries = new();
    private readonly List<DateTime> _switchedBoundaries = new();
    private readonly List<(string Channel, DateTime Threshold, int BatchSize)> _channelPurges = new();

    /// <summary>Copy of every threshold passed to the boundary enumerator, in call order.</summary>
    public List<DateTime> ThresholdQueries => Snapshot(_thresholdQueries);

    /// <summary>Copy of every boundary whose switch-out call did not throw, in call order.</summary>
    public List<DateTime> SwitchedBoundaries => Snapshot(_switchedBoundaries);

    /// <summary>Row count reported for a switched-out boundary; defaults to 0.</summary>
    public Func<DateTime, long> RowsPerBoundary { get; set; } = _ => 0L;

    /// <summary>Boundary for which <see cref="SwitchOutPartitionAsync"/> throws, if any.</summary>
    public DateTime? ThrowOnBoundary { get; set; }

    /// <summary>Exception thrown for <see cref="ThrowOnBoundary"/>; a default one is used when null.</summary>
    public Exception? BoundaryException { get; set; }

    // M5.5 (T3): copy of every per-channel purge call as
    // (channel, threshold, batchSize) so tests can assert which channels the
    // actor chose to purge and with what window.
    public List<(string Channel, DateTime Threshold, int BatchSize)> ChannelPurges => Snapshot(_channelPurges);

    /// <summary>Row count reported for a per-channel purge; defaults to 0.</summary>
    public Func<string, long> RowsPerChannel { get; set; } = _ => 0L;

    // The actor enumerator returns whichever list is configured here.
    // Mutating this between ticks lets tests simulate "no longer
    // eligible" boundaries on the second tick.
    public List<DateTime> Boundaries { get; set; } = new();

    public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) =>
        Task.CompletedTask;

    public Task<IReadOnlyList<AuditEvent>> QueryAsync(
        AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
        Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>());

    public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
    {
        // Throws synchronously (not via a faulted task) and records nothing
        // for the poisoned boundary.
        if (ThrowOnBoundary.HasValue && monthBoundary == ThrowOnBoundary.Value)
        {
            throw BoundaryException ?? new InvalidOperationException("simulated switch failure");
        }

        Record(_switchedBoundaries, monthBoundary);
        return Task.FromResult(RowsPerBoundary(monthBoundary));
    }

    public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
        DateTime threshold, CancellationToken ct = default)
    {
        Record(_thresholdQueries, threshold);
        // The stub ignores the threshold and returns a copy of whatever is configured.
        return Task.FromResult<IReadOnlyList<DateTime>>(Boundaries.ToArray());
    }

    public Task<long> PurgeChannelOlderThanAsync(
        string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
    {
        Record(_channelPurges, (channel, threshold, batchSize));
        return Task.FromResult(RowsPerChannel(channel));
    }

    public Task<long> BackfillSourceNodeAsync(
        string sentinel, DateTime before, int batchSize, CancellationToken ct = default) =>
        Task.FromResult(0L);

    public Task<ZB.MOM.WW.ScadaBridge.Commons.Types.AuditLogKpiSnapshot> GetKpiSnapshotAsync(
        TimeSpan window, DateTime? nowUtc = null, CancellationToken ct = default) =>
        Task.FromResult(new ZB.MOM.WW.ScadaBridge.Commons.Types.AuditLogKpiSnapshot(0L, 0L, 0L, nowUtc ?? DateTime.UtcNow));

    public Task<IReadOnlyList<ExecutionTreeNode>> GetExecutionTreeAsync(
        Guid executionId, CancellationToken ct = default) =>
        Task.FromResult<IReadOnlyList<ExecutionTreeNode>>(Array.Empty<ExecutionTreeNode>());

    public Task<IReadOnlyList<string>> GetDistinctSourceNodesAsync(CancellationToken ct = default) =>
        Task.FromResult<IReadOnlyList<string>>(Array.Empty<string>());

    // Appends one entry to a recording list under the gate.
    private void Record<T>(List<T> target, T entry)
    {
        lock (_gate)
        {
            target.Add(entry);
        }
    }

    // Returns a copy of a recording list taken under the gate.
    private List<T> Snapshot<T>(List<T> source)
    {
        lock (_gate)
        {
            return new List<T>(source);
        }
    }
}
/// <summary>
/// Builds a minimal container whose only registration is the supplied
/// repository, exposed as a scoped <see cref="IAuditLogRepository"/>.
/// </summary>
/// <param name="repo">Repository instance every scope resolves to.</param>
/// <returns>The built service provider.</returns>
private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
{
    var registrations = new ServiceCollection();

    // Scoped on purpose, mirroring AddConfigurationDatabase: the actor opens
    // a fresh scope per tick and resolves the repository there.
    registrations.AddScoped<IAuditLogRepository>(_ => repo);

    return registrations.BuildServiceProvider();
}
/// <summary>
/// Starts an <see cref="AuditLogPurgeActor"/> in the test actor system, wired
/// to the given repository through a scoped service provider.
/// </summary>
/// <param name="repo">Repository the actor resolves from its DI scope.</param>
/// <param name="purgeOptions">Purge scheduling options.</param>
/// <param name="auditOptions">Audit options; a default instance is used when null.</param>
/// <returns>Reference to the started actor.</returns>
private IActorRef CreateActor(
    IAuditLogRepository repo,
    AuditLogPurgeOptions purgeOptions,
    AuditLogOptions? auditOptions = null)
{
    var provider = BuildScopedProvider(repo);

    // Wrap the options up front so the Props expression only captures locals.
    var wrappedPurge = Options.Create(purgeOptions);
    var wrappedAudit = Options.Create(auditOptions ?? new AuditLogOptions());
    var logger = NullLogger<AuditLogPurgeActor>.Instance;

    var props = Props.Create(() => new AuditLogPurgeActor(
        provider,
        wrappedPurge,
        wrappedAudit,
        logger));
    return Sys.ActorOf(props);
}
/// <summary>
/// Purge options with the nominal 24-hour interval and a short interval
/// override so tests see ticks quickly.
/// </summary>
/// <param name="interval">Override interval; 100 ms when omitted.</param>
private static AuditLogPurgeOptions FastTickOptions(TimeSpan? interval = null)
{
    var tickEvery = interval ?? TimeSpan.FromMilliseconds(100);
    return new AuditLogPurgeOptions
    {
        IntervalHours = 24,
        IntervalOverride = tickEvery,
    };
}
/// <summary>
/// Creates a test probe and subscribes it to the actor system's EventStream
/// for <see cref="AuditLogPurgedEvent"/>, so a test can wait on purge
/// publications through the probe.
/// </summary>
/// <returns>The subscribed probe.</returns>
private Akka.TestKit.TestProbe SubscribePurged()
{
    var purgedProbe = CreateTestProbe();
    var subscribedType = typeof(AuditLogPurgedEvent);
    Sys.EventStream.Subscribe(purgedProbe.Ref, subscribedType);
    return purgedProbe;
}
// ---------------------------------------------------------------------
// 1. Tick_Fires_OnDailyInterval
// ---------------------------------------------------------------------
[Fact]
public void Tick_Fires_OnDailyInterval()
{
    var recorder = new RecordingRepo();
    CreateActor(recorder, FastTickOptions());

    // The first tick arrives one interval after start. Poll for its visible
    // side effect (an enumerator call) instead of peeking at actor state.
    var timeout = TimeSpan.FromSeconds(3);
    var pollEvery = TimeSpan.FromMilliseconds(50);
    AwaitAssert(
        () =>
        {
            var calls = recorder.ThresholdQueries.Count;
            Assert.True(calls >= 1, $"expected >= 1 enumerator call, got {calls}");
        },
        duration: timeout,
        interval: pollEvery);
}
// ---------------------------------------------------------------------
// 2. Tick_OldPartitions_SwitchedOut
// ---------------------------------------------------------------------
[Fact]
public void Tick_OldPartitions_SwitchedOut()
{
    var november = new DateTime(2025, 11, 1, 0, 0, 0, DateTimeKind.Utc);
    var december = new DateTime(2025, 12, 1, 0, 0, 0, DateTimeKind.Utc);
    var recorder = new RecordingRepo
    {
        Boundaries = new List<DateTime> { november, december },
        RowsPerBoundary = _ => 42L,
    };

    CreateActor(recorder, FastTickOptions());

    // Every boundary handed back by the enumerator must end up switched out.
    AwaitAssert(
        () =>
        {
            var switched = recorder.SwitchedBoundaries;
            Assert.Contains(november, switched);
            Assert.Contains(december, switched);
        },
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));
}
// ---------------------------------------------------------------------
// 3. Tick_NewerPartitions_Untouched
// ---------------------------------------------------------------------
[Fact]
public void Tick_NewerPartitions_Untouched()
{
    // The actor's contract: it only touches whatever the enumerator
    // returns. The enumerator (in production) filters out non-eligible
    // boundaries; here we simulate that by handing back an empty list
    // and asserting the actor switched nothing despite the tick firing.
    var repo = new RecordingRepo { Boundaries = new List<DateTime>() };
    CreateActor(repo, FastTickOptions());

    // Wait for the SECOND enumerator call before the negative assertion.
    // Asserting right after the first call would race the remainder of that
    // tick and could pass before the actor had a chance to (wrongly) switch
    // anything. NOTE(review): assumes the purge timer repeats and that ticks
    // are handled one at a time - confirm against AuditLogPurgeActor.
    AwaitAssert(
        () => Assert.True(repo.ThresholdQueries.Count >= 2,
            $"expected >= 2 enumerator calls, got {repo.ThresholdQueries.Count}"),
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    Assert.Empty(repo.SwitchedBoundaries);
}
// ---------------------------------------------------------------------
// 4. Tick_PublishesPurgedEvent_WithRowCount
// ---------------------------------------------------------------------
[Fact]
public void Tick_PublishesPurgedEvent_WithRowCount()
{
    const long expectedRows = 1234L;
    var juneBoundary = new DateTime(2025, 6, 1, 0, 0, 0, DateTimeKind.Utc);
    var recorder = new RecordingRepo
    {
        Boundaries = new List<DateTime> { juneBoundary },
        RowsPerBoundary = _ => expectedRows,
    };

    // Subscribe before the actor starts so the first publication is not missed.
    var purgedProbe = SubscribePurged();
    CreateActor(recorder, FastTickOptions());

    var purged = purgedProbe.ExpectMsg<AuditLogPurgedEvent>(TimeSpan.FromSeconds(5));

    Assert.Equal(juneBoundary, purged.MonthBoundary);
    Assert.Equal(expectedRows, purged.RowsDeleted);
    Assert.True(
        purged.DurationMs >= 0,
        $"DurationMs should be non-negative; was {purged.DurationMs}");
}
// ---------------------------------------------------------------------
// 5. Tick_SwitchThrows_OtherPartitionsStillProcessed (continue-on-error)
// ---------------------------------------------------------------------
[Fact]
public void Tick_SwitchThrows_OtherPartitionsStillProcessed()
{
    var failing = new DateTime(2025, 7, 1, 0, 0, 0, DateTimeKind.Utc);
    var healthy = new DateTime(2025, 8, 1, 0, 0, 0, DateTimeKind.Utc);
    var recorder = new RecordingRepo
    {
        // The failing boundary is listed first, so the healthy one is only
        // reached if the actor carries on after the exception.
        Boundaries = new List<DateTime> { failing, healthy },
        ThrowOnBoundary = failing,
        BoundaryException = new InvalidOperationException("simulated switch failure for poison boundary"),
    };

    CreateActor(recorder, FastTickOptions());

    AwaitAssert(
        () =>
        {
            var switched = recorder.SwitchedBoundaries;
            Assert.Contains(healthy, switched);
            Assert.DoesNotContain(failing, switched);
        },
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));
}
// ---------------------------------------------------------------------
// 6. EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished
// ---------------------------------------------------------------------
[SkippableFact]
public async Task EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Two rows are seeded inside the defined pf_AuditLog_Month partition range
    // (Jan 2026 – Dec 2027). RetentionDays is derived from the current date so
    // the purge threshold always lands just before 2026-01-20, which keeps the
    // test independent of when it runs:
    //   purged row   = Jan 15 2026 → older than the ~Jan 20 threshold → partition PURGED
    //   retained row = Apr 15 2026 → newer than the ~Jan 20 threshold → partition KEPT
    //
    // A fixed anchor is used instead of "N months ago" because relative seeds
    // can land before 2026-01-01, i.e. in the catch-all partition that
    // GetPartitionBoundariesOlderThanAsync never returns.
    var anchor = new DateTime(2026, 1, 20, 0, 0, 0, DateTimeKind.Utc);
    var daysSinceAnchor = (DateTime.UtcNow - anchor).TotalDays;
    var retentionDays = (int)daysSinceAnchor + 1;
    var purgedAt = new DateTime(2026, 1, 15, 0, 0, 0, DateTimeKind.Utc);
    var retainedAt = new DateTime(2026, 4, 15, 0, 0, 0, DateTimeKind.Utc);

    // Unique site id per run so the verification query only sees this test's rows.
    var siteId = "purge-e2e-" + Guid.NewGuid().ToString("N")[..8];

    var purgedEvt = ScadaBridgeAuditEventFactory.Create(
        eventId: Guid.NewGuid(),
        occurredAtUtc: purgedAt,
        channel: AuditChannel.ApiOutbound,
        kind: AuditKind.ApiCall,
        status: AuditStatus.Delivered,
        sourceSiteId: siteId);
    var retainedEvt = ScadaBridgeAuditEventFactory.Create(
        eventId: Guid.NewGuid(),
        occurredAtUtc: retainedAt,
        channel: AuditChannel.ApiOutbound,
        kind: AuditKind.ApiCall,
        status: AuditStatus.Delivered,
        sourceSiteId: siteId);

    await using (var seedContext = CreateMsSqlContext())
    {
        var seeder = new AuditLogRepository(seedContext);
        foreach (var seed in new[] { purgedEvt, retainedEvt })
        {
            await seeder.InsertIfNotExistsAsync(seed);
        }
    }

    // Point the actor's DI scope at the real repository and the fixture's
    // MSSQL database. The context is registered as scoped because the actor
    // opens a fresh scope per tick (mirroring the production
    // AddConfigurationDatabase wiring).
    var provider = new ServiceCollection()
        .AddDbContext<ScadaBridgeDbContext>(
            opts => opts.UseSqlServer(_fixture.ConnectionString),
            ServiceLifetime.Scoped)
        .AddScoped<IAuditLogRepository, AuditLogRepository>()
        .BuildServiceProvider();

    var wrappedAudit = Options.Create(new AuditLogOptions { RetentionDays = retentionDays });
    var wrappedPurge = Options.Create(new AuditLogPurgeOptions
    {
        IntervalHours = 24,
        IntervalOverride = TimeSpan.FromMilliseconds(100),
    });
    var logger = NullLogger<AuditLogPurgeActor>.Instance;

    var purgedProbe = SubscribePurged();
    Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
        provider,
        wrappedPurge,
        wrappedAudit,
        logger)));

    // Fish for the Jan-2026 partition boundary, the only eligible one in this
    // fixture DB. The generous timeout covers the real drop-and-rebuild dance
    // against MSSQL, which routinely takes a couple of seconds on a busy dev
    // container.
    var januaryBoundary = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    var purged = purgedProbe.FishForMessage<AuditLogPurgedEvent>(
        isMessage: evt => evt.MonthBoundary == januaryBoundary,
        max: TimeSpan.FromSeconds(30));
    Assert.True(
        purged.RowsDeleted >= 1,
        $"Expected RowsDeleted >= 1 for the Jan-2026 partition; got {purged.RowsDeleted}.");

    // Settle: give any in-flight tick time to commit before reading back.
    await Task.Delay(TimeSpan.FromMilliseconds(500));

    await using var verifyContext = CreateMsSqlContext();
    var remaining = await verifyContext.Set<AuditLogRow>()
        .Where(row => row.SourceSiteId == siteId)
        .ToListAsync();

    Assert.DoesNotContain(remaining, row => row.EventId == purgedEvt.EventId);
    Assert.Contains(remaining, row => row.EventId == retainedEvt.EventId);
}
/// <summary>
/// Opens a new <see cref="ScadaBridgeDbContext"/> against the fixture's
/// SQL Server connection string. The caller owns and disposes the context.
/// </summary>
private ScadaBridgeDbContext CreateMsSqlContext()
{
    var builder = new DbContextOptionsBuilder<ScadaBridgeDbContext>();
    builder.UseSqlServer(_fixture.ConnectionString);
    return new ScadaBridgeDbContext(builder.Options);
}
// ---------------------------------------------------------------------
// 7. Threshold_UsesAuditLogOptionsRetentionDays
// ---------------------------------------------------------------------
[Fact]
public void Threshold_UsesAuditLogOptionsRetentionDays()
{
    // The threshold handed to the enumerator must come from
    // AuditLogOptions.RetentionDays (now - retentionDays), not from
    // DateTime.MinValue or some other accidental default. A non-default
    // retention of 30 days is used so the 365-day default cannot satisfy
    // the assertion by coincidence.
    var recorder = new RecordingRepo();
    var audit = new AuditLogOptions { RetentionDays = 30 };
    CreateActor(recorder, FastTickOptions(), auditOptions: audit);

    AwaitAssert(
        () => Assert.True(recorder.ThresholdQueries.Count >= 1),
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    var threshold = recorder.ThresholdQueries[0];
    var expected = DateTime.UtcNow - TimeSpan.FromDays(30);

    // One minute of slack absorbs scheduling jitter between the tick firing
    // and this assertion running.
    var driftMinutes = Math.Abs((threshold - expected).TotalMinutes);
    Assert.True(
        driftMinutes < 1.0,
        $"threshold {threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 8. PerChannelOverride_ShorterThanGlobal_TriggersChannelPurge (M5.5 T3)
// ---------------------------------------------------------------------
[Fact]
public void PerChannelOverride_ShorterThanGlobal_TriggersChannelPurge()
{
    // ApiOutbound carries a 30-day override under a 365-day global window.
    // That is strictly shorter, so the actor must issue a per-channel purge
    // with a threshold of roughly now-30d and the configured batch size.
    const string channel = "ApiOutbound";
    var recorder = new RecordingRepo { Boundaries = new List<DateTime>() };

    var purge = FastTickOptions();
    purge.ChannelPurgeBatchSizeConfigured = 1234;

    // The dictionary initializer is evaluated here, outside any Props
    // expression tree (where it would be illegal, CS8074); CreateActor only
    // receives the finished options object.
    var audit = new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = new Dictionary<string, int> { [channel] = 30 },
    };

    CreateActor(recorder, purge, audit);

    AwaitAssert(
        () => Assert.Contains(recorder.ChannelPurges, call => call.Channel == channel),
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    var recorded = recorder.ChannelPurges.First(call => call.Channel == channel);
    Assert.Equal(1234, recorded.BatchSize);

    var expected = DateTime.UtcNow - TimeSpan.FromDays(30);
    var driftMinutes = Math.Abs((recorded.Threshold - expected).TotalMinutes);
    Assert.True(
        driftMinutes < 1.0,
        $"channel threshold {recorded.Threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 9. PerChannelOverride_EqualOrLongerThanGlobal_SkipsChannelPurge (M5.5 T3)
// ---------------------------------------------------------------------
[Fact]
public void PerChannelOverride_EqualOrLongerThanGlobal_SkipsChannelPurge()
{
    // DbOutbound = 365 (== global) and Notification = 400 (> global; the
    // validator would normally reject this, but the actor must defensively
    // skip it too). Neither is SHORTER than the global window, so the actor
    // must NOT issue a channel purge - the global partition switch-out
    // already governs those rows.
    var repo = new RecordingRepo { Boundaries = new List<DateTime>() };

    // The dictionary initializer is evaluated here, outside any Props
    // expression tree (where it would be illegal, CS8074).
    var auditOptions = new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = new Dictionary<string, int>
        {
            ["DbOutbound"] = 365,
            ["Notification"] = 400,
        },
    };

    CreateActor(repo, FastTickOptions(), auditOptions);

    // Wait for the SECOND enumerator call before the negative assertion.
    // Asserting right after the first call would race the remainder of that
    // tick: the per-channel step might simply not have run yet, letting the
    // test pass even if the actor does purge these channels.
    // NOTE(review): assumes the purge timer repeats and that ticks are
    // handled one at a time - confirm against AuditLogPurgeActor.
    AwaitAssert(
        () => Assert.True(repo.ThresholdQueries.Count >= 2,
            $"expected >= 2 enumerator calls, got {repo.ThresholdQueries.Count}"),
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    Assert.Empty(repo.ChannelPurges);
}
}