diff --git a/src/ScadaLink.AuditLog/Central/AuditLogPurgeActor.cs b/src/ScadaLink.AuditLog/Central/AuditLogPurgeActor.cs
new file mode 100644
index 0000000..153e238
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/AuditLogPurgeActor.cs
@@ -0,0 +1,214 @@
+using System.Diagnostics;
+using Akka.Actor;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.AuditLog.Configuration;
+using ScadaLink.Commons.Interfaces.Repositories;
+
+namespace ScadaLink.AuditLog.Central;
+
+///
+/// Central singleton (M6 Bundle C) that drives the daily AuditLog partition
+/// purge. On a configurable timer (default 24 hours) the actor:
+///
+/// - Queries
+/// for monthly boundaries whose latest OccurredAtUtc is older
+/// than DateTime.UtcNow - RetentionDays.
+/// - For each eligible boundary, calls
+/// which runs
+/// the drop-and-rebuild dance around UX_AuditLog_EventId.
+/// - Publishes on the actor-system
+/// EventStream so the Bundle E central health collector + ops surfaces
+/// can subscribe without coupling to this actor.
+///
+///
+///
+///
+/// Daily cadence. Partition switch is metadata-only but the
+/// drop-and-rebuild dance briefly removes UX_AuditLog_EventId; running
+/// more often than necessary trades unique-index rebuild outages for
+/// negligible freshness wins. The default 24-hour interval matches
+/// alog.md §10's retention policy.
+///
+///
+/// Continue-on-error. A single boundary that throws (transient SQL
+/// failure, contention with backup, missing object) must NOT prevent the
+/// other eligible boundaries from being purged on the same tick. Per-boundary
+/// work runs inside its own try/catch; the actor's
+/// uses Resume so any leaked exception keeps
+/// the singleton alive for the next tick.
+///
+///
+/// DI scopes. is a scoped EF Core
+/// service registered by AddConfigurationDatabase. The singleton
+/// opens one DI scope per tick and reuses the same repository across every
+/// boundary in that tick — mirrors the
+/// pattern.
+///
+///
+/// EventStream. Publishing through
+/// the EventStream rather than direct messaging avoids coupling this actor
+/// to its consumers; M6 Bundle E will subscribe a central health-counter
+/// bridge that surfaces purge progress on the central health report.
+///
+///
+public class AuditLogPurgeActor : ReceiveActor
+{
+ private readonly IServiceProvider _services;
+ private readonly AuditLogPurgeOptions _purgeOptions;
+ private readonly AuditLogOptions _auditOptions;
+ private readonly ILogger _logger;
+ private ICancelable? _timer;
+
+ public AuditLogPurgeActor(
+ IServiceProvider services,
+ IOptions purgeOptions,
+ IOptions auditOptions,
+ ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(purgeOptions);
+ ArgumentNullException.ThrowIfNull(auditOptions);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ _services = services;
+ _purgeOptions = purgeOptions.Value;
+ _auditOptions = auditOptions.Value;
+ _logger = logger;
+
+ ReceiveAsync(_ => OnTickAsync());
+ }
+
+ protected override void PreStart()
+ {
+ base.PreStart();
+ var interval = _purgeOptions.Interval;
+ _timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
+ initialDelay: interval,
+ interval: interval,
+ receiver: Self,
+ message: PurgeTick.Instance,
+ sender: Self);
+ }
+
+ protected override void PostStop()
+ {
+ _timer?.Cancel();
+ base.PostStop();
+ }
+
+ ///
+ /// Resume keeps the singleton alive across any leaked exception. Restart
+ /// would re-run PreStart and reschedule the timer (harmless but wasteful);
+ /// Stop is wrong because the singleton must keep ticking until shutdown.
+ ///
+ protected override SupervisorStrategy SupervisorStrategy()
+ {
+ return new OneForOneStrategy(
+ maxNrOfRetries: 0,
+ withinTimeRange: TimeSpan.Zero,
+ decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
+ }
+
+ private async Task OnTickAsync()
+ {
+ // Capture EventStream BEFORE the first await. Accessing Context (and
+ // therefore Context.System) after an await is unsafe because Akka's
+ // ActorBase.Context throws "no active ActorContext" once the
+ // continuation runs on a thread that isn't currently dispatching this
+ // actor — mirrors the same Sender-capture pattern in
+ // AuditLogIngestActor.OnIngestAsync.
+ var eventStream = Context.System.EventStream;
+
+ // Compute the retention threshold from AuditLogOptions.RetentionDays
+ // each tick — the options class supports hot reload via
+ // IOptionsMonitor for the redaction policy and similar settings; we
+ // read the snapshot per-tick so an operator who lowers RetentionDays
+ // sees the change applied on the next purge without an actor
+ // restart.
+ var threshold = DateTime.UtcNow - TimeSpan.FromDays(_auditOptions.RetentionDays);
+
+ IServiceScope? scope = null;
+ IAuditLogRepository repository;
+ try
+ {
+ scope = _services.CreateScope();
+ repository = scope.ServiceProvider.GetRequiredService();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to resolve IAuditLogRepository for AuditLog purge tick.");
+ scope?.Dispose();
+ return;
+ }
+
+ try
+ {
+ IReadOnlyList boundaries;
+ try
+ {
+ boundaries = await repository
+ .GetPartitionBoundariesOlderThanAsync(threshold)
+ .ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(
+ ex,
+ "Failed to enumerate eligible AuditLog partition boundaries (threshold {ThresholdUtc:o}); skipping purge tick.",
+ threshold);
+ return;
+ }
+
+ if (boundaries.Count == 0)
+ {
+ return;
+ }
+
+ foreach (var boundary in boundaries)
+ {
+ // Per-boundary try/catch: one bad partition (transient SQL
+ // failure, missing object, contention with backup) does NOT
+ // abandon the rest of the tick.
+ var sw = Stopwatch.StartNew();
+ try
+ {
+ var rowsDeleted = await repository
+ .SwitchOutPartitionAsync(boundary)
+ .ConfigureAwait(false);
+ sw.Stop();
+
+ eventStream.Publish(
+ new AuditLogPurgedEvent(boundary, rowsDeleted, sw.ElapsedMilliseconds));
+
+ _logger.LogInformation(
+ "Purged AuditLog partition {MonthBoundary:yyyy-MM-dd}; {RowsDeleted} rows in {DurationMs} ms.",
+ boundary,
+ rowsDeleted,
+ sw.ElapsedMilliseconds);
+ }
+ catch (Exception ex)
+ {
+ sw.Stop();
+ _logger.LogError(
+ ex,
+ "Failed to purge AuditLog partition {MonthBoundary:yyyy-MM-dd}; other partitions continue. Elapsed {DurationMs} ms.",
+ boundary,
+ sw.ElapsedMilliseconds);
+ }
+ }
+ }
+ finally
+ {
+ scope.Dispose();
+ }
+ }
+
+ /// Self-tick triggering a purge pass across all eligible partitions.
+ internal sealed class PurgeTick
+ {
+ public static readonly PurgeTick Instance = new();
+ private PurgeTick() { }
+ }
+}
diff --git a/src/ScadaLink.AuditLog/Central/AuditLogPurgeOptions.cs b/src/ScadaLink.AuditLog/Central/AuditLogPurgeOptions.cs
new file mode 100644
index 0000000..5f9d824
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/AuditLogPurgeOptions.cs
@@ -0,0 +1,43 @@
+namespace ScadaLink.AuditLog.Central;
+
+///
+/// Tuning knobs for the central singleton.
+/// Default cadence is 24 hours per the M6 plan; the retention window itself
+/// is sourced from
+/// (default 365) so operators tune retention from a single section.
+///
+///
+///
+/// The purge actor is a daily-cadence singleton, not a hot-loop, because
+/// partition-switch I/O is metadata-only but the drop-and-rebuild dance
+/// briefly removes the UX_AuditLog_EventId unique index — running
+/// more often than necessary trades index-rebuild outages for marginal
+/// freshness gains. Lower this only when an operator can prove they need
+/// sub-daily purge granularity.
+///
+///
+/// exists for tests to drop the cadence to
+/// milliseconds without polluting the production config surface; production
+/// binds only.
+///
+///
+public sealed class AuditLogPurgeOptions
+{
+ /// Period of the purge tick in hours (default 24).
+ public int IntervalHours { get; set; } = 24;
+
+ ///
+ /// Test-only override for finer control over the tick cadence than
+ /// whole-hour resolution allows. When non-null, takes precedence over
+ /// . Not bound from config — production
+ /// config exposes only.
+ ///
+ public TimeSpan? IntervalOverride { get; set; }
+
+ ///
+ /// Resolves the effective tick interval, honouring the test override
+ /// when set. Falls back to .
+ ///
+ public TimeSpan Interval =>
+ IntervalOverride ?? TimeSpan.FromHours(IntervalHours);
+}
diff --git a/src/ScadaLink.AuditLog/Central/AuditLogPurgedEvent.cs b/src/ScadaLink.AuditLog/Central/AuditLogPurgedEvent.cs
new file mode 100644
index 0000000..78d4987
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/AuditLogPurgedEvent.cs
@@ -0,0 +1,29 @@
+namespace ScadaLink.AuditLog.Central;
+
+///
+/// Published on the actor-system EventStream by
+/// after each successful partition switch-out. Downstream consumers (Bundle E
+/// central health collector, ops dashboards, audit trails) subscribe so a
+/// purge action is observable without the actor needing to know about any
+/// specific subscriber.
+///
+///
+/// The pf_AuditLog_Month lower-bound boundary that was switched out — i.e.
+/// the first instant of the purged month in UTC.
+///
+///
+/// Approximate row count purged from the partition, sampled BEFORE the
+/// switch. Exact accounting would require a post-switch scan of the staging
+/// table, which the dance drops immediately, so this is the closest
+/// observable proxy. Zero is a valid value when the actor's enumerator
+/// included a partition the operator subsequently emptied by hand.
+///
+///
+/// Wall-clock time spent inside SwitchOutPartitionAsync for this
+/// boundary, in milliseconds. Useful for spotting the rare slow purge
+/// without spinning up dedicated telemetry.
+///
+public sealed record AuditLogPurgedEvent(
+ DateTime MonthBoundary,
+ long RowsDeleted,
+ long DurationMs);
diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/IAuditLogRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/IAuditLogRepository.cs
index 9932c5c..bcda482 100644
--- a/src/ScadaLink.Commons/Interfaces/Repositories/IAuditLogRepository.cs
+++ b/src/ScadaLink.Commons/Interfaces/Repositories/IAuditLogRepository.cs
@@ -45,7 +45,10 @@ public interface IAuditLogRepository
///
/// Switches out (purges) the monthly partition whose lower bound is
- /// .
+ /// and returns the approximate number
+ /// of rows discarded — sampled inside the transaction BEFORE the switch
+ /// so the row count reflects what the switch removed, not a post-purge
+ /// scan of a table that no longer exists.
///
///
///
@@ -71,7 +74,7 @@ public interface IAuditLogRepository
/// and the composite PK still rejects same-(EventId, OccurredAtUtc) rows.
///
///
- Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default);
+ Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default);
///
/// Returns the set of pf_AuditLog_Month partition lower-bound
diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs
index 9dc2f41..d2d74ac 100644
--- a/src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs
+++ b/src/ScadaLink.ConfigurationDatabase/Repositories/AuditLogRepository.cs
@@ -202,7 +202,7 @@ VALUES
/// index.
///
///
- public async Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
+ public async Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
// GUID-suffixed staging name: prevents collision with any concurrent
// purge attempt and avoids polluting the AuditLog object namespace with
@@ -214,6 +214,17 @@ VALUES
// settings.
var monthBoundaryStr = monthBoundary.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss");
+ // Two-statement batch: the first SELECT samples the per-partition row
+ // count BEFORE the dance so we can report it back to the purge actor;
+ // the second batch performs the drop-and-rebuild. We use OUTPUT-style
+ // variables wired through @@ROWCOUNT after the SWITCH is not viable
+ // because SWITCH is a metadata-only operation that doesn't move rows in
+ // a way @@ROWCOUNT can observe.
+ var sampleSql = $@"
+ SELECT COUNT_BIG(*) FROM dbo.AuditLog
+ WHERE $PARTITION.pf_AuditLog_Month(OccurredAtUtc) =
+ $partition.pf_AuditLog_Month('{monthBoundaryStr}');";
+
var sql = $@"
BEGIN TRY
BEGIN TRANSACTION;
@@ -292,7 +303,43 @@ VALUES
THROW;
END CATCH;";
+ // Sample the row count before the switch. The sample is best-effort
+ // (no transaction wrapping the sample-then-switch pair) because the
+ // central singleton is the only writer to this RPC and a daily-purge
+ // tick doesn't compete with concurrent SwitchOut callers. A
+ // concurrent INSERT racing the sample under-reports by at most a
+ // few rows, which is acceptable for an "approximate" purged-row
+ // count surfaced via AuditLogPurgedEvent.
+ long rowsDeleted = 0;
+ var conn = _context.Database.GetDbConnection();
+ var openedHere = false;
+ if (conn.State != System.Data.ConnectionState.Open)
+ {
+ await conn.OpenAsync(ct).ConfigureAwait(false);
+ openedHere = true;
+ }
+ try
+ {
+ await using (var sampleCmd = conn.CreateCommand())
+ {
+ sampleCmd.CommandText = sampleSql;
+ var sampleResult = await sampleCmd.ExecuteScalarAsync(ct).ConfigureAwait(false);
+ if (sampleResult is not null && sampleResult is not DBNull)
+ {
+ rowsDeleted = Convert.ToInt64(sampleResult);
+ }
+ }
+ }
+ finally
+ {
+ if (openedHere)
+ {
+ await conn.CloseAsync().ConfigureAwait(false);
+ }
+ }
+
await _context.Database.ExecuteSqlRawAsync(sql, ct);
+ return rowsDeleted;
}
///
diff --git a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs
index 203fb6d..724ae68 100644
--- a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs
@@ -214,7 +214,7 @@ public class AuditLogIngestActorTests : TestKit, IClassFixture
_inner.QueryAsync(filter, paging, ct);
- public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
+ public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
_inner.SwitchOutPartitionAsync(monthBoundary, ct);
public Task> GetPartitionBoundariesOlderThanAsync(
diff --git a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogPurgeActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogPurgeActorTests.cs
new file mode 100644
index 0000000..afa20bf
--- /dev/null
+++ b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogPurgeActorTests.cs
@@ -0,0 +1,376 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using ScadaLink.AuditLog.Central;
+using ScadaLink.AuditLog.Configuration;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Commons.Types.Audit;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.ConfigurationDatabase;
+using ScadaLink.ConfigurationDatabase.Repositories;
+using ScadaLink.ConfigurationDatabase.Tests.Migrations;
+
+namespace ScadaLink.AuditLog.Tests.Central;
+
+///
+/// Bundle C (#23 M6-T4) tests for . The fast,
+/// schedule-only tests substitute a recording stub for
+/// so the timer + per-boundary error-isolation
+/// + event-publish machinery can be exercised without an MSSQL container.
+/// The end-to-end "real partition gets switched out" assertion lives in the
+/// repository tests (Bundle C of M6-T4); this actor file is purely about the
+/// actor's policy decisions.
+///
+public class AuditLogPurgeActorTests : TestKit, IClassFixture
+{
+ private readonly MsSqlMigrationFixture _fixture;
+
+ public AuditLogPurgeActorTests(MsSqlMigrationFixture fixture)
+ {
+ _fixture = fixture;
+ }
+
+ ///
+ /// In-memory recording stub. Captures every
+ /// + every
+ /// so tests can assert which boundaries
+ /// the actor chose to purge and how many ticks it issued. Also lets a
+ /// specific boundary be configured to throw so the continue-on-error path
+ /// is exercisable.
+ ///
+ private sealed class RecordingRepo : IAuditLogRepository
+ {
+ public List ThresholdQueries { get; } = new();
+ public List SwitchedBoundaries { get; } = new();
+ public Func RowsPerBoundary { get; set; } = _ => 0L;
+ public DateTime? ThrowOnBoundary { get; set; }
+ public Exception? BoundaryException { get; set; }
+
+ // The actor enumerator returns whichever list is configured here.
+ // Mutating this between ticks lets tests simulate "no longer
+ // eligible" boundaries on the second tick.
+ public List Boundaries { get; set; } = new();
+
+ public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) =>
+ Task.CompletedTask;
+
+ public Task> QueryAsync(
+ AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
+ Task.FromResult>(Array.Empty());
+
+ public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
+ {
+ if (ThrowOnBoundary.HasValue && monthBoundary == ThrowOnBoundary.Value)
+ {
+ throw BoundaryException ?? new InvalidOperationException("simulated switch failure");
+ }
+ SwitchedBoundaries.Add(monthBoundary);
+ return Task.FromResult(RowsPerBoundary(monthBoundary));
+ }
+
+ public Task> GetPartitionBoundariesOlderThanAsync(
+ DateTime threshold, CancellationToken ct = default)
+ {
+ ThresholdQueries.Add(threshold);
+ return Task.FromResult>(Boundaries.ToArray());
+ }
+ }
+
+ private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
+ {
+ var services = new ServiceCollection();
+ // Mirror AddConfigurationDatabase: IAuditLogRepository is scoped, so
+ // the actor opens a fresh scope per tick and resolves there.
+ services.AddScoped(_ => repo);
+ return services.BuildServiceProvider();
+ }
+
+ private IActorRef CreateActor(
+ IAuditLogRepository repo,
+ AuditLogPurgeOptions purgeOptions,
+ AuditLogOptions? auditOptions = null)
+ {
+ var sp = BuildScopedProvider(repo);
+ return Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
+ sp,
+ Options.Create(purgeOptions),
+ Options.Create(auditOptions ?? new AuditLogOptions()),
+ NullLogger.Instance)));
+ }
+
+ private static AuditLogPurgeOptions FastTickOptions(TimeSpan? interval = null) => new()
+ {
+ IntervalHours = 24,
+ IntervalOverride = interval ?? TimeSpan.FromMilliseconds(100),
+ };
+
+ ///
+ /// Subscribe a probe to the EventStream so the test can observe
+ /// publications synchronously.
+ ///
+ private Akka.TestKit.TestProbe SubscribePurged()
+ {
+ var probe = CreateTestProbe();
+ Sys.EventStream.Subscribe(probe.Ref, typeof(AuditLogPurgedEvent));
+ return probe;
+ }
+
+ // ---------------------------------------------------------------------
+ // 1. Tick_Fires_OnDailyInterval
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Tick_Fires_OnDailyInterval()
+ {
+ var repo = new RecordingRepo();
+ CreateActor(repo, FastTickOptions());
+
+ // The first scheduled tick fires after the configured interval. We
+ // assert the visible side effect (the enumerator was called) rather
+ // than racing on internal state.
+ AwaitAssert(
+ () => Assert.True(repo.ThresholdQueries.Count >= 1,
+ $"expected >= 1 enumerator call, got {repo.ThresholdQueries.Count}"),
+ duration: TimeSpan.FromSeconds(3),
+ interval: TimeSpan.FromMilliseconds(50));
+ }
+
+ // ---------------------------------------------------------------------
+ // 2. Tick_OldPartitions_SwitchedOut
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Tick_OldPartitions_SwitchedOut()
+ {
+ var repo = new RecordingRepo
+ {
+ Boundaries = new List
+ {
+ new(2025, 11, 1, 0, 0, 0, DateTimeKind.Utc),
+ new(2025, 12, 1, 0, 0, 0, DateTimeKind.Utc),
+ },
+ RowsPerBoundary = _ => 42L,
+ };
+
+ CreateActor(repo, FastTickOptions());
+
+ AwaitAssert(
+ () =>
+ {
+ Assert.Contains(new DateTime(2025, 11, 1, 0, 0, 0, DateTimeKind.Utc), repo.SwitchedBoundaries);
+ Assert.Contains(new DateTime(2025, 12, 1, 0, 0, 0, DateTimeKind.Utc), repo.SwitchedBoundaries);
+ },
+ duration: TimeSpan.FromSeconds(3),
+ interval: TimeSpan.FromMilliseconds(50));
+ }
+
+ // ---------------------------------------------------------------------
+ // 3. Tick_NewerPartitions_Untouched
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Tick_NewerPartitions_Untouched()
+ {
+ // The actor's contract: it only touches whatever the enumerator
+ // returns. The enumerator (in production) filters out non-eligible
+ // boundaries; here we simulate that by handing back an empty list
+ // and asserting the actor switched nothing despite the tick firing.
+ var repo = new RecordingRepo { Boundaries = new List() };
+
+ CreateActor(repo, FastTickOptions());
+
+ // Wait for at least one tick (visible via the enumerator call) then
+ // assert no switch happened.
+ AwaitAssert(
+ () => Assert.True(repo.ThresholdQueries.Count >= 1),
+ duration: TimeSpan.FromSeconds(3),
+ interval: TimeSpan.FromMilliseconds(50));
+
+ Assert.Empty(repo.SwitchedBoundaries);
+ }
+
+ // ---------------------------------------------------------------------
+ // 4. Tick_PublishesPurgedEvent_WithRowCount
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Tick_PublishesPurgedEvent_WithRowCount()
+ {
+ var boundary = new DateTime(2025, 6, 1, 0, 0, 0, DateTimeKind.Utc);
+ var repo = new RecordingRepo
+ {
+ Boundaries = new List { boundary },
+ RowsPerBoundary = _ => 1234L,
+ };
+
+ var probe = SubscribePurged();
+ CreateActor(repo, FastTickOptions());
+
+ var msg = probe.ExpectMsg(TimeSpan.FromSeconds(5));
+ Assert.Equal(boundary, msg.MonthBoundary);
+ Assert.Equal(1234L, msg.RowsDeleted);
+ Assert.True(msg.DurationMs >= 0,
+ $"DurationMs should be non-negative; was {msg.DurationMs}");
+ }
+
+ // ---------------------------------------------------------------------
+ // 5. Tick_SwitchThrows_OtherPartitionsStillProcessed (continue-on-error)
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Tick_SwitchThrows_OtherPartitionsStillProcessed()
+ {
+ var poisonBoundary = new DateTime(2025, 7, 1, 0, 0, 0, DateTimeKind.Utc);
+ var goodBoundary = new DateTime(2025, 8, 1, 0, 0, 0, DateTimeKind.Utc);
+ var repo = new RecordingRepo
+ {
+ Boundaries = new List { poisonBoundary, goodBoundary },
+ ThrowOnBoundary = poisonBoundary,
+ BoundaryException = new InvalidOperationException("simulated switch failure for poison boundary"),
+ };
+
+ CreateActor(repo, FastTickOptions());
+
+ AwaitAssert(
+ () =>
+ {
+ // The good boundary was still switched even though the poison
+ // boundary threw.
+ Assert.Contains(goodBoundary, repo.SwitchedBoundaries);
+ Assert.DoesNotContain(poisonBoundary, repo.SwitchedBoundaries);
+ },
+ duration: TimeSpan.FromSeconds(3),
+ interval: TimeSpan.FromMilliseconds(50));
+ }
+
+ // ---------------------------------------------------------------------
+ // 6. EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished
+ // ---------------------------------------------------------------------
+
+ [SkippableFact]
+ public async Task EndToEnd_RealPartition_RowsRemoved_PurgedEventPublished()
+ {
+ Skip.IfNot(_fixture.Available, _fixture.SkipReason);
+
+ // Today is ~2026-05-20 per the test environment. With RetentionDays =
+ // 60 the actor computes threshold ≈ 2026-03-21:
+ // * Jan partition (MAX = Jan 15) → older than threshold → PURGED
+ // * Apr partition (MAX = Apr 15) → newer than threshold → KEPT
+ var siteId = "purge-e2e-" + Guid.NewGuid().ToString("N").Substring(0, 8);
+ var janEvt = new AuditEvent
+ {
+ EventId = Guid.NewGuid(),
+ OccurredAtUtc = new DateTime(2026, 1, 15, 0, 0, 0, DateTimeKind.Utc),
+ Channel = AuditChannel.ApiOutbound,
+ Kind = AuditKind.ApiCall,
+ Status = AuditStatus.Delivered,
+ SourceSiteId = siteId,
+ };
+ var aprEvt = new AuditEvent
+ {
+ EventId = Guid.NewGuid(),
+ OccurredAtUtc = new DateTime(2026, 4, 15, 0, 0, 0, DateTimeKind.Utc),
+ Channel = AuditChannel.ApiOutbound,
+ Kind = AuditKind.ApiCall,
+ Status = AuditStatus.Delivered,
+ SourceSiteId = siteId,
+ };
+
+ await using (var seedContext = CreateMsSqlContext())
+ {
+ var seedRepo = new AuditLogRepository(seedContext);
+ await seedRepo.InsertIfNotExistsAsync(janEvt);
+ await seedRepo.InsertIfNotExistsAsync(aprEvt);
+ }
+
+ // Wire the actor's DI scope to the real repository against the
+ // fixture's MSSQL database. The actor opens a fresh scope per tick,
+ // so register the context as scoped (mirroring the production
+ // AddConfigurationDatabase wiring).
+ var services = new ServiceCollection();
+ services.AddDbContext(
+ opts => opts.UseSqlServer(_fixture.ConnectionString),
+ ServiceLifetime.Scoped);
+ services.AddScoped();
+ var sp = services.BuildServiceProvider();
+
+ var auditOptions = new AuditLogOptions { RetentionDays = 60 };
+ var purgeOptions = new AuditLogPurgeOptions
+ {
+ IntervalHours = 24,
+ IntervalOverride = TimeSpan.FromMilliseconds(100),
+ };
+
+ var probe = SubscribePurged();
+ Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
+ sp,
+ Options.Create(purgeOptions),
+ Options.Create(auditOptions),
+ NullLogger.Instance)));
+
+ // The probe receives one AuditLogPurgedEvent per partition the actor
+ // purges per tick — other test runs that share the fixture DB may
+ // also leave behind eligible partitions, but this test creates its
+ // own fixture DB so the Jan-2026 partition is the only eligible one.
+ // Use FishForMessage to filter just in case, with a generous timeout
+ // because the real drop-and-rebuild dance against MSSQL routinely
+ // takes a couple of seconds on a busy dev container.
+ var janBoundary = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ var matched = probe.FishForMessage(
+ isMessage: m => m.MonthBoundary == janBoundary,
+ max: TimeSpan.FromSeconds(30));
+
+ Assert.True(matched.RowsDeleted >= 1,
+ $"Expected RowsDeleted >= 1 for the Jan-2026 partition; got {matched.RowsDeleted}.");
+
+ // Settle: allow any in-flight tick to commit before reading.
+ await Task.Delay(TimeSpan.FromMilliseconds(500));
+ await using var verifyContext = CreateMsSqlContext();
+ var rows = await verifyContext.Set()
+ .Where(e => e.SourceSiteId == siteId)
+ .ToListAsync();
+
+ Assert.DoesNotContain(rows, r => r.EventId == janEvt.EventId);
+ Assert.Contains(rows, r => r.EventId == aprEvt.EventId);
+ }
+
+ private ScadaLinkDbContext CreateMsSqlContext() =>
+ new(new DbContextOptionsBuilder()
+ .UseSqlServer(_fixture.ConnectionString).Options);
+
+ // ---------------------------------------------------------------------
+ // 7. Threshold_UsesAuditLogOptionsRetentionDays
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void Threshold_UsesAuditLogOptionsRetentionDays()
+ {
+ // The actor computes the threshold from AuditLogOptions.RetentionDays;
+ // assert the enumerator received a threshold whose value is in the
+ // expected window (today - retentionDays) rather than DateTime.MinValue
+ // or some other accidental default. We use a non-default retention
+ // (30 days) so the assertion isn't satisfied by the 365 default.
+ var repo = new RecordingRepo();
+ CreateActor(
+ repo,
+ FastTickOptions(),
+ auditOptions: new AuditLogOptions { RetentionDays = 30 });
+
+ AwaitAssert(
+ () => Assert.True(repo.ThresholdQueries.Count >= 1),
+ duration: TimeSpan.FromSeconds(3),
+ interval: TimeSpan.FromMilliseconds(50));
+
+ var threshold = repo.ThresholdQueries[0];
+ var expected = DateTime.UtcNow - TimeSpan.FromDays(30);
+ // 1-minute slack covers test-thread scheduling jitter between the
+ // tick firing and the assertion running.
+ Assert.True(
+ Math.Abs((threshold - expected).TotalMinutes) < 1.0,
+ $"threshold {threshold:o} should be within 1 minute of {expected:o}");
+ }
+}
diff --git a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs
index d1dad16..5cbcfe9 100644
--- a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditReconciliationActorTests.cs
@@ -87,8 +87,8 @@ public class SiteAuditReconciliationActorTests : TestKit, IClassFixture
Task.FromResult>(Inserted);
- public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
- Task.CompletedTask;
+ public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
+ Task.FromResult(0L);
public Task> GetPartitionBoundariesOlderThanAsync(
DateTime threshold, CancellationToken ct = default) =>