Replaces M1's NotSupportedException stub with the production DROP-INDEX → CREATE-staging → SWITCH PARTITION → DROP-staging → CREATE-INDEX sequence documented in alog.md §4. UX_AuditLog_EventId is intentionally non-aligned with ps_AuditLog_Month so that single-column EventId uniqueness can be enforced cheaply for InsertIfNotExistsAsync. Because SQL Server rejects ALTER TABLE … SWITCH while a non-aligned unique index is present, the implementation drops that index, switches the partition's data into a GUID-suffixed staging table on [PRIMARY], drops the staging table (discarding the rows), and rebuilds the unique index — all inside an explicit transaction whose CATCH block guarantees the unique index is rebuilt regardless of where the failure occurs. Also adds GetPartitionBoundariesOlderThanAsync to IAuditLogRepository: a CROSS APPLY over sys.partition_range_values plus a per-partition MAX(OccurredAtUtc) that enumerates retention-eligible months for the M6 purge actor (next commit). Tests verify:
* The old partition's rows are removed; other months are untouched
* UX_AuditLog_EventId is rebuilt after a successful switch
* InsertIfNotExistsAsync's first-write-wins idempotency still holds after the switch
* On an engineered SWITCH failure (an inbound FK from a probe table), the SqlException propagates AND UX_AuditLog_EventId is still present (the CATCH branch ran)
* GetPartitionBoundariesOlderThanAsync returns only boundaries whose partition's MAX(OccurredAtUtc) is strictly older than the threshold; empty partitions are excluded
443 lines · 19 KiB · C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using ScadaLink.AuditLog.Central;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types.Audit;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.ConfigurationDatabase;
|
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Central;
|
|
|
|
/// <summary>
/// Bundle B (M6-T3) tests for <see cref="SiteAuditReconciliationActor"/>. Most
/// tests substitute the <see cref="IAuditLogRepository"/> with an in-memory
/// recording stub so the actor's tick / cursor / stalled state machinery can
/// be exercised in milliseconds without an MSSQL container. The duplicate /
/// idempotency assertion uses the real <see cref="AuditLogRepository"/> against
/// the <see cref="MsSqlMigrationFixture"/> so we verify InsertIfNotExistsAsync
/// actually swallows duplicate-key collisions (the M2 Bundle A race-fix the
/// reconciliation puller depends on).
/// </summary>
/// <remarks>
/// The actor mutates the in-memory stubs from Akka dispatcher threads while the
/// assertions read them from the test thread, so the stubs guard their recorded
/// state with a lock and hand out snapshots instead of live collections.
/// </remarks>
public class SiteAuditReconciliationActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    private readonly MsSqlMigrationFixture _fixture;

    public SiteAuditReconciliationActorTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Builds a minimal ApiOutbound / ApiCall / Delivered audit event.
    /// </summary>
    /// <param name="siteId">Value stored in <see cref="AuditEvent.SourceSiteId"/>.</param>
    /// <param name="occurredAt">Event timestamp; defaults to 2026-05-20 10:00:00 UTC.</param>
    /// <param name="id">Event id; a fresh GUID when omitted.</param>
    private static AuditEvent NewEvent(
        string siteId,
        DateTime? occurredAt = null,
        Guid? id = null) => new()
    {
        EventId = id ?? Guid.NewGuid(),
        OccurredAtUtc = occurredAt ?? new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = siteId,
    };

    /// <summary>
    /// Reconciliation options with a 100 ms tick so each test observes several
    /// cycles inside its AwaitAssert / ExpectMsg window.
    /// </summary>
    private static SiteAuditReconciliationOptions FastTickOptions(
        int batchSize = 256,
        int stalledAfter = 2) =>
        new()
        {
            // These tests depend on ReconciliationIntervalOverride taking
            // precedence over ReconciliationIntervalSeconds — the 300 s value
            // is never actually waited on. With a 100 ms tick, the 3-5 s
            // AwaitAssert / ExpectMsg windows below absorb scheduler jitter.
            ReconciliationIntervalSeconds = 300,
            ReconciliationIntervalOverride = TimeSpan.FromMilliseconds(100),
            BatchSize = batchSize,
            StalledAfterNonDrainingCycles = stalledAfter,
        };

    /// <summary>
    /// In-memory recording stub used for non-MSSQL tests. Counts every
    /// <see cref="InsertIfNotExistsAsync"/> call AND deduplicates on
    /// <see cref="AuditEvent.EventId"/> so duplicate-handling assertions don't
    /// need a real database for the simple cases. All state is lock-guarded
    /// because the actor writes while the test thread reads.
    /// </summary>
    private sealed class RecordingRepo : IAuditLogRepository
    {
        private readonly object _gate = new();
        private readonly List<AuditEvent> _inserted = new();
        private readonly HashSet<Guid> _seen = new();
        private int _insertCallCount;

        /// <summary>Snapshot of the distinct events accepted so far, in insertion order.</summary>
        public IReadOnlyList<AuditEvent> Inserted
        {
            get
            {
                lock (_gate)
                {
                    return _inserted.ToArray();
                }
            }
        }

        /// <summary>Total number of <see cref="InsertIfNotExistsAsync"/> calls, duplicates included.</summary>
        public int InsertCallCount
        {
            get
            {
                lock (_gate)
                {
                    return _insertCallCount;
                }
            }
        }

        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
        {
            lock (_gate)
            {
                _insertCallCount++;

                // First write wins: a repeated EventId is counted but not stored again.
                if (_seen.Add(evt.EventId))
                {
                    _inserted.Add(evt);
                }
            }

            return Task.CompletedTask;
        }

        /// <summary>Ignores the filter/paging and returns a snapshot of every accepted event.</summary>
        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
            Task.FromResult(Inserted);

        public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
            Task.CompletedTask;

        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold, CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<DateTime>>(Array.Empty<DateTime>());
    }

    /// <summary>
    /// In-memory enumerator returning a static list of sites.
    /// </summary>
    private sealed class StaticEnumerator : ISiteEnumerator
    {
        private readonly IReadOnlyList<SiteEntry> _sites;

        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;

        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
            Task.FromResult(_sites);
    }

    /// <summary>
    /// Scripted pull client. Each call for a site returns the next queued
    /// response; once that site's queue is exhausted (or if it was never
    /// scripted) it returns an empty, fully-drained response
    /// (<c>MoreAvailable: false</c>). Sites registered through
    /// <see cref="ThrowFor"/> fail every call with a faulted task. Every
    /// invocation is recorded so tests can assert call counts + arguments.
    /// </summary>
    private sealed class ScriptedPullClient : IPullAuditEventsClient
    {
        private readonly object _gate = new();
        private readonly List<(string SiteId, DateTime SinceUtc, int BatchSize)> _calls = new();
        private readonly Dictionary<string, Queue<PullAuditEventsResponse>> _scripted = new();
        private readonly Dictionary<string, Exception> _throwOnSite = new();

        /// <summary>Snapshot of every <see cref="PullAsync"/> invocation, in call order.</summary>
        public IReadOnlyList<(string SiteId, DateTime SinceUtc, int BatchSize)> Calls
        {
            get
            {
                lock (_gate)
                {
                    return _calls.ToArray();
                }
            }
        }

        public ScriptedPullClient Script(string siteId, params PullAuditEventsResponse[] responses)
        {
            lock (_gate)
            {
                _scripted[siteId] = new Queue<PullAuditEventsResponse>(responses);
            }

            return this;
        }

        public ScriptedPullClient ThrowFor(string siteId, Exception ex)
        {
            lock (_gate)
            {
                _throwOnSite[siteId] = ex;
            }

            return this;
        }

        public Task<PullAuditEventsResponse> PullAsync(
            string siteId, DateTime sinceUtc, int batchSize, CancellationToken ct)
        {
            lock (_gate)
            {
                _calls.Add((siteId, sinceUtc, batchSize));

                if (_throwOnSite.TryGetValue(siteId, out var ex))
                {
                    // Fail the way a real async client does — via a faulted
                    // task — instead of re-throwing a stored exception
                    // synchronously at the call site.
                    return Task.FromException<PullAuditEventsResponse>(ex);
                }

                if (_scripted.TryGetValue(siteId, out var queue) && queue.TryDequeue(out var next))
                {
                    return Task.FromResult(next);
                }
            }

            return Task.FromResult(
                new PullAuditEventsResponse(Array.Empty<AuditEvent>(), MoreAvailable: false));
        }
    }

    /// <summary>
    /// Builds a service provider that hands out <paramref name="repo"/> for
    /// every <see cref="IAuditLogRepository"/> resolution.
    /// </summary>
    private IServiceProvider BuildScopedProvider(IAuditLogRepository repo)
    {
        var services = new ServiceCollection();
        // The actor opens a scope per tick and resolves IAuditLogRepository
        // from that scope; registering as scoped mirrors how
        // AddConfigurationDatabase wires the real repository.
        services.AddScoped(_ => repo);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Spawns a <see cref="SiteAuditReconciliationActor"/> in the TestKit
    /// actor system, wired to the supplied test doubles.
    /// </summary>
    private IActorRef CreateActor(
        ISiteEnumerator sites,
        IPullAuditEventsClient client,
        IAuditLogRepository repo,
        SiteAuditReconciliationOptions options)
    {
        var sp = BuildScopedProvider(repo);
        return Sys.ActorOf(Props.Create(() => new SiteAuditReconciliationActor(
            sites,
            client,
            sp,
            Options.Create(options),
            NullLogger<SiteAuditReconciliationActor>.Instance)));
    }

    /// <summary>
    /// Subscribes a fresh test probe to the EventStream for
    /// <see cref="SiteAuditTelemetryStalledChanged"/> publications, making the
    /// stream's fire-and-forget delivery observable via ExpectMsg on the
    /// test thread.
    /// </summary>
    private Akka.TestKit.TestProbe SubscribeStalled()
    {
        var probe = CreateTestProbe();
        Sys.EventStream.Subscribe(probe.Ref, typeof(SiteAuditTelemetryStalledChanged));
        return probe;
    }

    // ---------------------------------------------------------------------
    // 1. Timer_Fires_OnConfiguredInterval
    // ---------------------------------------------------------------------

    [Fact]
    public void Timer_Fires_OnConfiguredInterval()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
        var client = new ScriptedPullClient();
        var repo = new RecordingRepo();
        var opts = FastTickOptions();

        CreateActor(sites, client, repo, opts);

        // FastTickOptions schedules a tick every 100 ms. Ticks are delivered
        // by Akka's scheduler, so rather than racing on internal actor state
        // we wait for the visible side effect: at least one PullAsync call.
        AwaitAssert(
            () => Assert.True(client.Calls.Count >= 1, $"expected >= 1 pull call, got {client.Calls.Count}"),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 2. Tick_PullsFromEachKnownSite
    // ---------------------------------------------------------------------

    [Fact]
    public void Tick_PullsFromEachKnownSite()
    {
        var sites = new StaticEnumerator(
            new SiteEntry("siteA", "http://siteA:8083"),
            new SiteEntry("siteB", "http://siteB:8083"));
        var client = new ScriptedPullClient();
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(() =>
            {
                var calls = client.Calls;
                Assert.Contains(calls, c => c.SiteId == "siteA");
                Assert.Contains(calls, c => c.SiteId == "siteB");
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 3. Tick_IngestEvents_ViaInsertIfNotExistsAsync
    // ---------------------------------------------------------------------

    [Fact]
    public void Tick_IngestEvents_ViaInsertIfNotExistsAsync()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));
        var e1 = NewEvent("siteA");
        var e2 = NewEvent("siteA");
        var client = new ScriptedPullClient().Script("siteA",
            new PullAuditEventsResponse(new[] { e1, e2 }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(() => Assert.Equal(2, repo.InsertCallCount),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));

        var inserted = repo.Inserted;
        Assert.Contains(inserted, e => e.EventId == e1.EventId);
        Assert.Contains(inserted, e => e.EventId == e2.EventId);
    }

    // ---------------------------------------------------------------------
    // 4. Tick_Duplicates_NotDoubleInserted (real MSSQL idempotency)
    // ---------------------------------------------------------------------

    private ScadaLinkDbContext CreateContext() =>
        new(new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString).Options);

    [SkippableFact]
    public async Task Tick_Duplicates_NotDoubleInserted()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);

        var siteId = "bundle-b-" + Guid.NewGuid().ToString("N").Substring(0, 8);
        var pre = NewEvent(siteId);

        // Seed the row directly so the actor sees it already present when the
        // pull returns it.
        await using (var seedContext = CreateContext())
        {
            await new AuditLogRepository(seedContext).InsertIfNotExistsAsync(pre);
        }

        // Stack one new and the pre-existing row in the pull response. Later
        // pulls fall back to the client's empty response so the actor settles.
        var fresh = NewEvent(siteId);
        var sites = new StaticEnumerator(new SiteEntry(siteId, "http://x:8083"));
        var client = new ScriptedPullClient().Script(siteId,
            new PullAuditEventsResponse(new[] { pre, fresh }, MoreAvailable: false));

        await using var context = CreateContext();
        var repo = new AuditLogRepository(context);

        CreateActor(sites, client, repo, FastTickOptions());

        // Poll MSSQL until the fresh row lands. A fixed sleep plus a pull-count
        // check only proves the pull happened, not that the inserts finished,
        // and would race the read below. A separate context is used so the
        // actor's context is never touched from the test thread.
        AwaitAssert(() =>
            {
                using var pollContext = CreateContext();
                Assert.True(
                    pollContext.Set<AuditEvent>().Any(e => e.EventId == fresh.EventId),
                    "fresh row has not been ingested yet");
            },
            duration: TimeSpan.FromSeconds(5),
            interval: TimeSpan.FromMilliseconds(100));

        // Even though the pull returned 2 events, only 1 fresh row should
        // exist in MSSQL alongside the pre-existing one — InsertIfNotExistsAsync
        // is first-write-wins on EventId.
        await using var read = CreateContext();
        var rows = await read.Set<AuditEvent>()
            .Where(e => e.SourceSiteId == siteId)
            .ToListAsync();
        Assert.Equal(2, rows.Count);
        Assert.Contains(rows, r => r.EventId == pre.EventId);
        Assert.Contains(rows, r => r.EventId == fresh.EventId);
    }

    // ---------------------------------------------------------------------
    // 5. Cursor_Advances_ToMaxOccurredAtUtc
    // ---------------------------------------------------------------------

    [Fact]
    public void Cursor_Advances_ToMaxOccurredAtUtc()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));

        var t1 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
        var t2 = new DateTime(2026, 5, 20, 10, 1, 0, DateTimeKind.Utc);
        var t3 = new DateTime(2026, 5, 20, 10, 2, 0, DateTimeKind.Utc);
        var e1 = NewEvent("siteA", t1);
        var e2 = NewEvent("siteA", t2);
        var e3 = NewEvent("siteA", t3);

        // First pull returns three events with t1, t2, t3. Subsequent pulls
        // return empty — but the test asserts the SECOND pull's since argument
        // is t3 (the max OccurredAtUtc from the first pull).
        var client = new ScriptedPullClient().Script("siteA",
            new PullAuditEventsResponse(new[] { e1, e2, e3 }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        // Wait until we have at least two pulls — the second one must use t3
        // as its `since` argument because that was the max OccurredAtUtc in
        // the first response.
        AwaitAssert(() => Assert.True(client.Calls.Count >= 2,
                $"need at least 2 pulls to assert cursor advancement, got {client.Calls.Count}"),
            duration: TimeSpan.FromSeconds(5),
            interval: TimeSpan.FromMilliseconds(50));

        var calls = client.Calls;
        Assert.Equal(DateTime.MinValue, calls[0].SinceUtc);
        Assert.Equal(t3, calls[1].SinceUtc);
    }

    // ---------------------------------------------------------------------
    // 6. Tick_OneSiteThrows_OtherSitesStillProcessed
    // ---------------------------------------------------------------------

    [Fact]
    public void Tick_OneSiteThrows_OtherSitesStillProcessed()
    {
        var sites = new StaticEnumerator(
            new SiteEntry("siteA", "http://siteA:8083"),
            new SiteEntry("siteB", "http://siteB:8083"));

        var bEvent = NewEvent("siteB");
        var client = new ScriptedPullClient()
            .ThrowFor("siteA", new InvalidOperationException("simulated transport failure"))
            .Script("siteB",
                new PullAuditEventsResponse(new[] { bEvent }, MoreAvailable: false));
        var repo = new RecordingRepo();

        CreateActor(sites, client, repo, FastTickOptions());

        AwaitAssert(() =>
            {
                Assert.Contains(client.Calls, c => c.SiteId == "siteA");
                Assert.Contains(repo.Inserted, e => e.EventId == bEvent.EventId);
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    // ---------------------------------------------------------------------
    // 7. StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue
    // ---------------------------------------------------------------------

    [Fact]
    public void StalledDetection_TwoConsecutiveNonDrainingCycles_PublishesStalledTrue()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));

        // Two scripted responses that each return events AND MoreAvailable=true
        // — the second pull triggers the stalled transition.
        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var client = new ScriptedPullClient().Script("siteA",
            new PullAuditEventsResponse(batch1, MoreAvailable: true),
            new PullAuditEventsResponse(batch2, MoreAvailable: true));

        var repo = new RecordingRepo();
        var probe = SubscribeStalled();

        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));

        // Expect Stalled=true after the second non-draining tick. The probe
        // waits with its own timeout; a few seconds gives the 100 ms tick
        // ample slack.
        var msg = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.Equal("siteA", msg.SiteId);
        Assert.True(msg.Stalled);
    }

    // ---------------------------------------------------------------------
    // 8. StalledDetection_DrainingCycle_PublishesStalledFalse
    // ---------------------------------------------------------------------

    [Fact]
    public void StalledDetection_DrainingCycle_PublishesStalledFalse()
    {
        var sites = new StaticEnumerator(new SiteEntry("siteA", "http://siteA:8083"));

        // Two non-draining responses get the actor into Stalled=true, then a
        // draining response (events but MoreAvailable=false) flips it back.
        var batch1 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch2 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var batch3 = Enumerable.Range(0, 3).Select(_ => NewEvent("siteA")).ToArray();
        var client = new ScriptedPullClient().Script("siteA",
            new PullAuditEventsResponse(batch1, MoreAvailable: true),
            new PullAuditEventsResponse(batch2, MoreAvailable: true),
            new PullAuditEventsResponse(batch3, MoreAvailable: false));

        var repo = new RecordingRepo();
        var probe = SubscribeStalled();

        CreateActor(sites, client, repo, FastTickOptions(stalledAfter: 2));

        // First publication is the stalled=true transition; second is the
        // back-to-draining flip. The actor publishes ONLY on transitions so we
        // expect exactly these two messages in order.
        var first = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.True(first.Stalled);

        var second = probe.ExpectMsg<SiteAuditTelemetryStalledChanged>(TimeSpan.FromSeconds(5));
        Assert.False(second.Stalled);
        Assert.Equal("siteA", second.SiteId);
    }
}
|