feat(audit): M5.5 per-channel retention overrides via purge-role bounded delete (T3)

This commit is contained in:
Joseph Doherty
2026-06-16 21:47:50 -04:00
parent 55630b48b6
commit 50b674accc
13 changed files with 583 additions and 3 deletions
@@ -216,6 +216,10 @@ public class AuditLogIngestActorTests : TestKit, IClassFixture<MsSqlMigrationFix
// Forwards the partition switch-out request unchanged to _inner.
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
    return _inner.SwitchOutPartitionAsync(monthBoundary, ct);
}
// M5.5 (T3): forwards the per-channel purge request unchanged to _inner.
public Task<long> PurgeChannelOlderThanAsync(
    string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
{
    return _inner.PurgeChannelOlderThanAsync(channel, threshold, batchSize, ct);
}
// Forwards the partition-boundary lookup unchanged to _inner.
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
    DateTime threshold, CancellationToken ct = default)
{
    return _inner.GetPartitionBoundariesOlderThanAsync(threshold, ct);
}
@@ -51,6 +51,12 @@ public class AuditLogPurgeActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
// NOTE(review): presumably the partition boundary for which this fake should
// fail — the consuming code is outside this view; confirm.
public DateTime? ThrowOnBoundary { get; set; }
// NOTE(review): presumably the exception raised when ThrowOnBoundary matches;
// confirm against the consuming code.
public Exception? BoundaryException { get; set; }
// M5.5 (T3): log of every per-channel purge call, stored as
// (channel, threshold, batchSize), so tests can verify which channels the
// actor decided to purge and which window it used.
public List<(string Channel, DateTime Threshold, int BatchSize)> ChannelPurges { get; } =
    new List<(string Channel, DateTime Threshold, int BatchSize)>();
// Supplies the row count PurgeChannelOlderThanAsync reports for a channel;
// the default answers 0 for every channel.
public Func<string, long> RowsPerChannel { get; set; } = _ => 0L;
// The actor enumerator returns whichever list is configured here.
// Mutating this between ticks lets tests simulate "no longer
// eligible" boundaries on the second tick.
@@ -80,6 +86,13 @@ public class AuditLogPurgeActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
return Task.FromResult<IReadOnlyList<DateTime>>(Boundaries.ToArray());
}
// M5.5 (T3): appends the call to ChannelPurges, then completes synchronously
// with whatever RowsPerChannel yields for this channel. The token is unused.
public Task<long> PurgeChannelOlderThanAsync(
    string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
{
    var recordedCall = (channel, threshold, batchSize);
    ChannelPurges.Add(recordedCall);
    long reportedRows = RowsPerChannel(channel);
    return Task.FromResult(reportedRows);
}
// Returns an all-zero KPI snapshot stamped with nowUtc, falling back to the
// current UTC time when none is given. The window and token are unused.
public Task<ZB.MOM.WW.ScadaBridge.Commons.Types.AuditLogKpiSnapshot> GetKpiSnapshotAsync(
    TimeSpan window, DateTime? nowUtc = null, CancellationToken ct = default)
{
    var asOf = nowUtc ?? DateTime.UtcNow;
    var emptySnapshot = new ZB.MOM.WW.ScadaBridge.Commons.Types.AuditLogKpiSnapshot(0L, 0L, 0L, asOf);
    return Task.FromResult(emptySnapshot);
}
@@ -381,4 +394,90 @@ public class AuditLogPurgeActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
Math.Abs((threshold - expected).TotalMinutes) < 1.0,
$"threshold {threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 8. PerChannelOverride_ShorterThanGlobal_TriggersChannelPurge (M5.5 T3)
// ---------------------------------------------------------------------
[Fact]
public void PerChannelOverride_ShorterThanGlobal_TriggersChannelPurge()
{
    // ApiOutbound has a 30-day override under a 365-day global window — strictly
    // shorter, so the actor must run a per-channel purge with a threshold of
    // ~today-30d and the configured batch size.
    var repo = new RecordingRepo { Boundaries = new List<DateTime>() };
    var purgeOptions = FastTickOptions();
    purgeOptions.ChannelPurgeBatchSizeConfigured = 1234;
    // Build the options OUTSIDE the Props expression tree — a collection/dictionary
    // initializer is not legal inside an expression-tree lambda (CS8074).
    var auditOptions = Options.Create(new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = new Dictionary<string, int> { ["ApiOutbound"] = 30 },
    });
    var purgeOptionsWrapped = Options.Create(purgeOptions);
    var sp = BuildScopedProvider(repo);
    Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
        sp,
        purgeOptionsWrapped,
        auditOptions,
        NullLogger<AuditLogPurgeActor>.Instance)));

    // Capture the recorded call INSIDE the retried assertion. The actor keeps
    // ticking and appending to repo.ChannelPurges (a plain List) while this test
    // thread reads it; an enumeration that collides with an append is simply
    // retried here, whereas a second lookup after AwaitAssert would fail the test.
    // The captured value tuple is a copy, so the assertions below read stable data.
    (string Channel, DateTime Threshold, int BatchSize) purge = default;
    AwaitAssert(
        () =>
        {
            Assert.Contains(repo.ChannelPurges, p => p.Channel == "ApiOutbound");
            purge = repo.ChannelPurges.First(p => p.Channel == "ApiOutbound");
        },
        duration: TimeSpan.FromSeconds(3),
        interval: TimeSpan.FromMilliseconds(50));

    Assert.Equal(1234, purge.BatchSize);
    // The threshold was computed by the actor at most a few seconds ago, so a
    // one-minute tolerance around "now - 30 days" is ample.
    var expected = DateTime.UtcNow - TimeSpan.FromDays(30);
    Assert.True(
        Math.Abs((purge.Threshold - expected).TotalMinutes) < 1.0,
        $"channel threshold {purge.Threshold:o} should be within 1 minute of {expected:o}");
}
// ---------------------------------------------------------------------
// 9. PerChannelOverride_EqualOrLongerThanGlobal_SkipsChannelPurge (M5.5 T3)
// ---------------------------------------------------------------------
[Fact]
public void PerChannelOverride_EqualOrLongerThanGlobal_SkipsChannelPurge()
{
    // DbOutbound = 365 (== global) and Notification = 400 (> global, validator would
    // normally reject this but the actor must defensively skip it too). Neither is
    // SHORTER than the global window, so the actor must NOT issue a channel purge —
    // the global partition switch-out already governs those rows.
    var repo = new RecordingRepo { Boundaries = new List<DateTime>() };
    // Build the options OUTSIDE the Props expression tree (CS8074).
    var auditOptions = Options.Create(new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = new Dictionary<string, int>
        {
            ["DbOutbound"] = 365,
            ["Notification"] = 400,
        },
    });
    var purgeOptions = Options.Create(FastTickOptions());
    var sp = BuildScopedProvider(repo);
    Sys.ActorOf(Props.Create(() => new AuditLogPurgeActor(
        sp,
        purgeOptions,
        auditOptions,
        NullLogger<AuditLogPurgeActor>.Instance)));

    // Wait until the enumerator has been called TWICE before asserting the
    // negative. Seeing only the first call does not show that the first tick has
    // reached its channel-purge step, so asserting right away could pass
    // vacuously. A second enumerator call means a second tick has begun.
    // NOTE(review): assumes the actor runs ticks one after another, so the first
    // tick has fully finished by then — confirm against the actor implementation.
    AwaitAssert(
        () => Assert.True(repo.ThresholdQueries.Count >= 2),
        duration: TimeSpan.FromSeconds(5),
        interval: TimeSpan.FromMilliseconds(50));

    Assert.Empty(repo.ChannelPurges);
}
}
@@ -44,6 +44,9 @@ public class CentralAuditWriteFailuresTests : TestKit
Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>());
// Stub: switches nothing out and reports 0 rows.
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
    return Task.FromResult(0L);
}
// Stub: purges nothing and reports 0 rows.
public Task<long> PurgeChannelOlderThanAsync(
    string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
{
    return Task.FromResult(0L);
}
// Stub: reports no partition boundaries.
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
    DateTime threshold, CancellationToken ct = default)
{
    IReadOnlyList<DateTime> noBoundaries = Array.Empty<DateTime>();
    return Task.FromResult(noBoundaries);
}
@@ -89,6 +89,10 @@ public class SiteAuditReconciliationActorTests : TestKit, IClassFixture<MsSqlMig
// Stub: switches nothing out and reports 0 rows.
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
    return Task.FromResult(0L);
}
// Stub: purges nothing and reports 0 rows.
public Task<long> PurgeChannelOlderThanAsync(
    string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
{
    return Task.FromResult(0L);
}
// Stub: reports no partition boundaries.
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
    DateTime threshold, CancellationToken ct = default)
{
    IReadOnlyList<DateTime> noBoundaries = Array.Empty<DateTime>();
    return Task.FromResult(noBoundaries);
}
@@ -50,4 +50,107 @@ public class AuditLogOptionsValidatorTests
result.Failures!,
f => f.Contains(nameof(AuditLogOptions.InboundMaxBytes), StringComparison.Ordinal));
}
// ---------------------------------------------------------------------
// M5.5 (T3) per-channel retention overrides
// ---------------------------------------------------------------------
[Fact]
public void Validate_PerChannelRetention_ShorterThanGlobal_Passes()
{
    // Overrides strictly below the global window are the sanctioned case: the
    // purge actor expires those rows earlier through the maintenance-path row
    // DELETE.
    var overrides = new Dictionary<string, int>
    {
        ["ApiOutbound"] = 90,
        ["Notification"] = 30, // floor (MinRetentionDays)
    };
    var options = new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = overrides,
    };

    var outcome = new AuditLogOptionsValidator().Validate(null, options);

    Assert.True(outcome.Succeeded);
}
[Fact]
public void Validate_PerChannelRetention_EqualToGlobal_Passes()
{
    // The accepted range is [Min, RetentionDays] inclusive, so an override equal
    // to the global window is valid; the purge actor treats it as a no-op
    // because it is not SHORTER than the global window.
    var overrides = new Dictionary<string, int> { ["DbOutbound"] = 200 };
    var options = new AuditLogOptions
    {
        RetentionDays = 200,
        PerChannelRetentionDays = overrides,
    };

    var outcome = new AuditLogOptionsValidator().Validate(null, options);

    Assert.True(outcome.Succeeded);
}
[Fact]
public void Validate_PerChannelRetention_LongerThanGlobal_Fails()
{
    // An override LONGER than the global window has no meaning under
    // month-partition switch-out (which the global window governs), so the
    // validator rejects it.
    var overrides = new Dictionary<string, int> { ["ApiInbound"] = 200 };
    var options = new AuditLogOptions
    {
        RetentionDays = 100,
        PerChannelRetentionDays = overrides,
    };

    var outcome = new AuditLogOptionsValidator().Validate(null, options);

    Assert.False(outcome.Succeeded);
    // The failure must name both the offending option and the offending channel.
    Assert.Contains(
        outcome.Failures!,
        failure =>
            failure.Contains(nameof(AuditLogOptions.PerChannelRetentionDays), StringComparison.Ordinal)
            && failure.Contains("ApiInbound", StringComparison.Ordinal));
}
[Fact]
public void Validate_PerChannelRetention_BelowMinimum_Fails()
{
    // 29 days is below the minimum the validator accepts for an override.
    var overrides = new Dictionary<string, int> { ["ApiOutbound"] = 29 };
    var options = new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = overrides,
    };

    var outcome = new AuditLogOptionsValidator().Validate(null, options);

    Assert.False(outcome.Succeeded);
    Assert.Contains(
        outcome.Failures!,
        failure => failure.Contains(nameof(AuditLogOptions.PerChannelRetentionDays), StringComparison.Ordinal));
}
[Fact]
public void Validate_PerChannelRetention_UnknownChannelKey_Fails()
{
    // Keys have to be recognized AuditChannel names. A typo or unknown key is
    // rejected instead of silently ignored, so the misconfiguration surfaces at
    // boot.
    var overrides = new Dictionary<string, int> { ["NotAChannel"] = 90 };
    var options = new AuditLogOptions
    {
        RetentionDays = 365,
        PerChannelRetentionDays = overrides,
    };

    var outcome = new AuditLogOptionsValidator().Validate(null, options);

    Assert.False(outcome.Succeeded);
    Assert.Contains(
        outcome.Failures!,
        failure => failure.Contains("NotAChannel", StringComparison.Ordinal));
}
[Fact]
public void Validate_PerChannelRetention_DefaultEmpty_Passes()
{
    // Default-constructed options (no overrides) are the common case and must
    // validate cleanly.
    var sut = new AuditLogOptionsValidator();
    var defaults = new AuditLogOptions();

    var outcome = sut.Validate(null, defaults);

    Assert.True(outcome.Succeeded);
}
}
@@ -67,19 +67,25 @@ public class PartitionPurgeTests : TestKit, IClassFixture<MsSqlMigrationFixture>
SqlConnection conn,
Guid eventId,
DateTime occurredAtUtc,
string siteId)
string siteId,
string channel = "ApiOutbound",
string kind = "ApiCall")
{
await using var cmd = conn.CreateCommand();
// C5 (Task 2.5): dbo.AuditLog is now the 10 canonical columns + DetailsJson;
// the ScadaBridge domain fields (channel/kind/status/sourceSiteId) ride in
// DetailsJson and the SourceSiteId/Kind/Status computed columns auto-derive.
// Action = "{channel}.{kind}", Category = channel name, Outcome = Success.
// The channel/kind are parameterized so the M5.5 per-channel purge test can
// seed multiple channels into the same partition.
cmd.CommandText = @"
INSERT INTO dbo.AuditLog
(EventId, OccurredAtUtc, Actor, Action, Outcome, Category, Target, SourceNode, CorrelationId, DetailsJson)
VALUES
(@EventId, @OccurredAtUtc, NULL, 'ApiOutbound.ApiCall', 'Success', 'ApiOutbound', NULL, NULL, NULL,
(@EventId, @OccurredAtUtc, NULL, @Action, 'Success', @Category, NULL, NULL, NULL,
@DetailsJson);";
cmd.Parameters.Add("@Action", System.Data.SqlDbType.VarChar, 64).Value = $"{channel}.{kind}";
cmd.Parameters.Add("@Category", System.Data.SqlDbType.VarChar, 32).Value = channel;
cmd.Parameters.Add("@EventId", System.Data.SqlDbType.UniqueIdentifier).Value = eventId;
// SqlDbType.DateTime2 with explicit Scale 7 matches the
// OccurredAtUtc column shape (datetime2(7)) and avoids the implicit
@@ -97,7 +103,7 @@ VALUES
// the computed SourceSiteId column the verify queries scope on. payloadTruncated
// is always present (the codec always writes the bool).
var detailsJson =
"{\"channel\":\"ApiOutbound\",\"kind\":\"ApiCall\",\"status\":\"Delivered\"," +
"{\"channel\":\"" + channel + "\",\"kind\":\"" + kind + "\",\"status\":\"Delivered\"," +
"\"sourceSiteId\":\"" + siteId + "\",\"payloadTruncated\":false}";
cmd.Parameters.Add("@DetailsJson", System.Data.SqlDbType.NVarChar, -1).Value = detailsJson;
await cmd.ExecuteNonQueryAsync();
@@ -354,4 +360,87 @@ WHERE name = 'UX_AuditLog_EventId'
Assert.Single(rows);
Assert.Equal(freshEventId, rows[0].EventId);
}
// ---------------------------------------------------------------------
// 4. PerChannelOverride_DeletesOnlyOverriddenChannelsOldRows (M5.5 T3)
// ---------------------------------------------------------------------
/// <summary>
/// M5.5 (T3): drives <see cref="IAuditLogRepository.PurgeChannelOlderThanAsync"/>
/// directly against the real repository and the fixture database. One partition is
/// seeded with an old and a recent row for the OVERRIDDEN channel
/// (<c>ApiOutbound</c>) and an old and a recent row for an UN-overridden channel
/// (<c>DbOutbound</c>); the per-channel purge is then run for <c>ApiOutbound</c>
/// only. Expected outcome:
/// <list type="number">
/// <item>The overridden channel's OLD row is deleted.</item>
/// <item>The overridden channel's RECENT row (newer than the channel threshold) survives.</item>
/// <item>Both rows of the un-overridden channel are untouched — they follow the
/// global window, which the channel purge never applies to them.</item>
/// </list>
/// This is the maintenance-path row DELETE; the fixture connects as <c>sa</c>, which
/// the append-only writer-role DENYs do not bind (the role granularity is exercised
/// in the repository/migration tests).
/// </summary>
[SkippableFact]
public async Task PerChannelOverride_DeletesOnlyOverriddenChannelsOldRows()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    // Unique site id so the verification query only sees this test's rows.
    var siteId = "perchannel-" + Guid.NewGuid().ToString("N")[..8];

    // One OLD timestamp (before the channel threshold used below) and one RECENT
    // timestamp (after it). Both sit comfortably inside the retention window, so
    // the global partition purge would touch neither; the per-channel DELETE is
    // the only force acting here.
    var oldOccurred = new DateTime(2026, 1, 15, 0, 0, 0, DateTimeKind.Utc);
    var recentOccurred = new DateTime(2026, 5, 15, 0, 0, 0, DateTimeKind.Utc);

    var apiOldId = Guid.NewGuid();    // ApiOutbound, old    → SHOULD be deleted
    var apiRecentId = Guid.NewGuid(); // ApiOutbound, recent → SHOULD survive
    var dbOldId = Guid.NewGuid();     // DbOutbound, old     → SHOULD survive (un-overridden)
    var dbRecentId = Guid.NewGuid();  // DbOutbound, recent  → SHOULD survive

    var seeds = new[]
    {
        (Id: apiOldId, At: oldOccurred, Channel: "ApiOutbound", Kind: "ApiCall"),
        (Id: apiRecentId, At: recentOccurred, Channel: "ApiOutbound", Kind: "ApiCall"),
        (Id: dbOldId, At: oldOccurred, Channel: "DbOutbound", Kind: "DbWrite"),
        (Id: dbRecentId, At: recentOccurred, Channel: "DbOutbound", Kind: "DbWrite"),
    };
    await using (var seedConn = _fixture.OpenConnection())
    {
        foreach (var seed in seeds)
        {
            await DirectInsertAsync(seedConn, seed.Id, seed.At, siteId, channel: seed.Channel, kind: seed.Kind);
        }
    }

    // The threshold (Mar 1) falls strictly between the old (Jan 15) and recent
    // (May 15) seeds, so apiOldId is the only seeded row that qualifies.
    // NOTE(review): the purge call takes a channel but no site id, so the exact
    // counts asserted below presume the fixture DB holds no other ApiOutbound rows
    // older than the threshold — confirm against the other tests sharing it.
    var channelThreshold = new DateTime(2026, 3, 1, 0, 0, 0, DateTimeKind.Utc);
    await using (var ctx = CreateContext())
    {
        var repo = new AuditLogRepository(ctx);

        var firstPass = await repo.PurgeChannelOlderThanAsync(
            channel: "ApiOutbound",
            threshold: channelThreshold,
            batchSize: 2);
        Assert.Equal(1L, firstPass);

        // Idempotent: the eligible row is gone, so a repeat run deletes nothing.
        var secondPass = await repo.PurgeChannelOlderThanAsync(
            channel: "ApiOutbound",
            threshold: channelThreshold,
            batchSize: 2);
        Assert.Equal(0L, secondPass);
    }

    await using var verify = CreateContext();
    var survivors = await verify.Set<AuditLogRow>()
        .Where(row => row.SourceSiteId == siteId)
        .ToListAsync();

    // Overridden channel: old gone, recent kept.
    Assert.DoesNotContain(survivors, row => row.EventId == apiOldId);
    Assert.Contains(survivors, row => row.EventId == apiRecentId);

    // Un-overridden channel: BOTH rows untouched (follow the global window).
    Assert.Contains(survivors, row => row.EventId == dbOldId);
    Assert.Contains(survivors, row => row.EventId == dbRecentId);
}
}
@@ -89,6 +89,10 @@ public class SiteAuditPushFlowTests : TestKit
// Not supported by this stub: always throws NotSupportedException.
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
{
    throw new NotSupportedException();
}
// Not supported by this stub: always throws NotSupportedException.
public Task<long> PurgeChannelOlderThanAsync(
    string channel, DateTime threshold, int batchSize, CancellationToken ct = default)
{
    throw new NotSupportedException();
}
// Not supported by this stub: always throws NotSupportedException.
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
    DateTime threshold, CancellationToken ct = default)
{
    throw new NotSupportedException();
}