diff --git a/src/ScadaLink.AuditLog/ScadaLink.AuditLog.csproj b/src/ScadaLink.AuditLog/ScadaLink.AuditLog.csproj index 821fb5b..9641b66 100644 --- a/src/ScadaLink.AuditLog/ScadaLink.AuditLog.csproj +++ b/src/ScadaLink.AuditLog/ScadaLink.AuditLog.csproj @@ -8,6 +8,9 @@ + + diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs index 818270a..789b572 100644 --- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs @@ -2,6 +2,7 @@ using System.Threading.Channels; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Site.Telemetry; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Types.Enums; @@ -28,7 +29,7 @@ namespace ScadaLink.AuditLog.Site; /// the site SQLite schema — central stamps it on ingest. /// /// -public class SqliteAuditWriter : IAuditWriter, IAsyncDisposable, IDisposable +public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable, IDisposable { // Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19) // on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY) diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs new file mode 100644 index 0000000..9da55b5 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs @@ -0,0 +1,34 @@ +using ScadaLink.Commons.Entities.Audit; + +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Site-local audit-log queue surface consumed by . +/// Extracted from so the telemetry actor can be +/// unit-tested against a stub without touching SQLite. +/// implements this interface; production wiring injects the same instance. +/// +/// +/// Only the two methods the drain loop needs are exposed — the hot-path +/// WriteAsync stays on +/// (script-thread surface), separated by concern from the +/// telemetry-actor surface so each side can be mocked independently. +/// +public interface ISiteAuditQueue +{ + /// + /// Returns up to rows currently in + /// , + /// oldest first. Idempotent — repeated calls before + /// will yield the same rows again. + /// + Task> ReadPendingAsync(int limit, CancellationToken ct = default); + + /// + /// Flips the supplied EventIds from + /// to + /// . + /// Non-existent or already-forwarded ids are silent no-ops. + /// + Task MarkForwardedAsync(IReadOnlyList eventIds, CancellationToken ct = default); +} diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs new file mode 100644 index 0000000..c25b05a --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs @@ -0,0 +1,23 @@ +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Mockable abstraction over the central site-stream gRPC client surface that +/// uses to push +/// payloads. The production implementation (added in Bundle E host wiring) +/// wraps the auto-generated SiteStreamService.SiteStreamServiceClient; +/// unit tests substitute via NSubstitute against this interface so the actor +/// never needs a live gRPC channel. +/// +public interface ISiteStreamAuditClient +{ + /// + /// Pushes to the central IngestAuditEvents + /// RPC. The returned carries the + /// accepted_event_ids the actor will flip to + /// + /// in the site SQLite queue. + /// + Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct); +} diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs new file mode 100644 index 0000000..a820cf5 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs @@ -0,0 +1,179 @@ +using Akka.Actor; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Site-side actor that drains the local SQLite audit queue and pushes Pending +/// rows to central via the IngestAuditEvents gRPC RPC. On a successful +/// ack the matching EventIds flip to +/// ; on +/// a gRPC failure the rows stay Pending and the next drain retries. +/// +/// +/// +/// The drain self-tick is a private Drain message scheduled via the +/// actor system scheduler. The cadence is options-driven: BusyIntervalSeconds +/// when the previous drain found rows (or faulted — we want quick recovery), +/// IdleIntervalSeconds when the queue was empty. +/// +/// +/// Both collaborators are injected as interfaces ( +/// and ) so unit tests substitute with +/// NSubstitute and never touch real SQLite or gRPC. +/// +/// +/// Per Bundle D's brief, audit-write paths must be fail-safe — a thrown +/// exception inside the actor MUST NOT crash it. The Drain handler wraps the +/// pipeline in a top-level try/catch that logs and re-schedules, and the +/// actor's defaults to +/// 's Restart for +/// child actors — but this actor has no children, so the catch is what matters. +/// +/// +public class SiteAuditTelemetryActor : ReceiveActor +{ + private readonly ISiteAuditQueue _queue; + private readonly ISiteStreamAuditClient _client; + private readonly SiteAuditTelemetryOptions _options; + private readonly ILogger _logger; + private ICancelable? _pendingTick; + + public SiteAuditTelemetryActor( + ISiteAuditQueue queue, + ISiteStreamAuditClient client, + IOptions options, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(queue); + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(logger); + + _queue = queue; + _client = client; + _options = options.Value; + _logger = logger; + + ReceiveAsync(_ => OnDrainAsync()); + } + + protected override void PreStart() + { + base.PreStart(); + // Initial tick fires on the busy interval so the actor starts polling + // soon after host startup. A subsequent empty drain will move to the + // idle interval naturally. + ScheduleNext(TimeSpan.FromSeconds(_options.BusyIntervalSeconds)); + } + + protected override void PostStop() + { + _pendingTick?.Cancel(); + base.PostStop(); + } + + private async Task OnDrainAsync() + { + var nextDelay = TimeSpan.FromSeconds(_options.BusyIntervalSeconds); + try + { + var pending = await _queue.ReadPendingAsync(_options.BatchSize, CancellationToken.None) + .ConfigureAwait(false); + if (pending.Count == 0) + { + // No rows — settle into the idle cadence until the next write + // bumps us back into the busy cadence. + nextDelay = TimeSpan.FromSeconds(_options.IdleIntervalSeconds); + return; + } + + var batch = BuildBatch(pending); + + IngestAck ack; + try + { + ack = await _client.IngestAuditEventsAsync(batch, CancellationToken.None) + .ConfigureAwait(false); + } + catch (Exception ex) + { + // gRPC fault — leave the rows in Pending so the next drain + // retries. Bundle D's brief: "On gRPC exception (any), log + // Warning, schedule next Drain in BusyIntervalSeconds." + _logger.LogWarning(ex, + "IngestAuditEvents push failed for {Count} pending events; will retry next drain.", + pending.Count); + return; + } + + var acceptedIds = ParseAcceptedIds(ack); + if (acceptedIds.Count > 0) + { + await _queue.MarkForwardedAsync(acceptedIds, CancellationToken.None) + .ConfigureAwait(false); + } + } + catch (Exception ex) + { + // Catch-all so a SQLite hiccup or mapper bug never crashes the + // actor. The next tick is still scheduled in the finally block. + _logger.LogError(ex, "Unexpected error during audit-log telemetry drain."); + } + finally + { + ScheduleNext(nextDelay); + } + } + + private static AuditEventBatch BuildBatch(IReadOnlyList events) + { + var batch = new AuditEventBatch(); + foreach (var e in events) + { + batch.Events.Add(AuditEventMapper.ToDto(e)); + } + return batch; + } + + private static IReadOnlyList ParseAcceptedIds(IngestAck ack) + { + if (ack.AcceptedEventIds.Count == 0) + { + return Array.Empty(); + } + + var list = new List(ack.AcceptedEventIds.Count); + foreach (var raw in ack.AcceptedEventIds) + { + if (Guid.TryParse(raw, out var id)) + { + list.Add(id); + } + // Malformed ids are ignored — central should never emit them, but + // we refuse to crash the actor over a bad string. + } + return list; + } + + private void ScheduleNext(TimeSpan delay) + { + _pendingTick?.Cancel(); + _pendingTick = Context.System.Scheduler.ScheduleTellOnceCancelable( + delay, + Self, + Drain.Instance, + Self); + } + + /// Self-tick message that triggers a drain cycle. + private sealed class Drain + { + public static readonly Drain Instance = new(); + private Drain() { } + } +} diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryOptions.cs b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryOptions.cs new file mode 100644 index 0000000..9aab759 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryOptions.cs @@ -0,0 +1,28 @@ +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Tuning knobs for the site-side drain +/// loop. Defaults mirror Bundle D's plan: drain every 5 s while rows are +/// flowing (busy), every 30 s when the queue is empty (idle). +/// +public sealed class SiteAuditTelemetryOptions +{ + /// + /// Maximum number of + /// rows read from the site SQLite queue and pushed in a single gRPC batch. + /// + public int BatchSize { get; set; } = 256; + + /// + /// Delay between drains when the previous drain found at least one Pending + /// row OR the previous push faulted. Re-drain quickly to keep telemetry + /// flowing and to retry transient gRPC errors. + /// + public int BusyIntervalSeconds { get; set; } = 5; + + /// + /// Delay between drains when the previous drain found no Pending rows. + /// Longer interval avoids hammering an idle SQLite + gRPC channel. + /// + public int IdleIntervalSeconds { get; set; } = 30; +} diff --git a/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj b/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj index 539e9a5..625f9c6 100644 --- a/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj +++ b/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj @@ -9,14 +9,30 @@ + + + + + + + @@ -25,6 +41,13 @@ + + diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs new file mode 100644 index 0000000..f8bef38 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs @@ -0,0 +1,235 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Tests.Site.Telemetry; + +/// +/// Bundle D D1 tests for . The actor drains +/// the site SQLite queue via , pushes batches via +/// , and flips ack'd rows to Forwarded. +/// Both collaborators are NSubstitute mocks so the tests never touch real +/// SQLite or gRPC. +/// +public class SiteAuditTelemetryActorTests : TestKit +{ + private readonly ISiteAuditQueue _queue = Substitute.For(); + private readonly ISiteStreamAuditClient _client = Substitute.For(); + + /// + /// Fast options so tests don't stall waiting for the scheduler. 1s busy / + /// 2s idle still exercises the busy-vs-idle branching, but each test + /// completes in < 5 s wall-clock. + /// + private static IOptions Opts( + int batchSize = 256, + int busySeconds = 1, + int idleSeconds = 2) => + Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = batchSize, + BusyIntervalSeconds = busySeconds, + IdleIntervalSeconds = idleSeconds, + }); + + private IActorRef CreateActor(IOptions? options = null) => + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + _queue, + _client, + options ?? Opts(), + NullLogger.Instance))); + + private static AuditEvent NewEvent(Guid? id = null) => new() + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = "site-1", + ForwardState = AuditForwardState.Pending, + }; + + private static IngestAck AckAll(IReadOnlyList events) + { + var ack = new IngestAck(); + foreach (var e in events) + { + ack.AcceptedEventIds.Add(e.EventId.ToString()); + } + return ack; + } + + [Fact] + public async Task Drain_With_50PendingRows_Sends_OneBatch_Of_50_Then_FlipsToForwarded() + { + // Arrange — 50 pending rows on the first read, then empty on subsequent + // reads so the actor settles after one productive drain. + var pending = Enumerable.Range(0, 50).Select(_ => NewEvent()).ToList(); + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(pending), + Task.FromResult>(Array.Empty())); + + AuditEventBatch? capturedBatch = null; + _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any()) + .Returns(call => + { + capturedBatch = call.Arg(); + return Task.FromResult(AckAll(pending)); + }); + + // Act + CreateActor(); + + // Assert — give the scheduler time to fire the initial Drain tick. + await AwaitAssertAsync(async () => + { + await _client.Received(1).IngestAuditEventsAsync( + Arg.Any(), Arg.Any()); + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.Count == 50), Arg.Any()); + }, TimeSpan.FromSeconds(5)); + + Assert.NotNull(capturedBatch); + Assert.Equal(50, capturedBatch!.Events.Count); + + var expected = pending.Select(e => e.EventId).ToHashSet(); + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.ToHashSet().SetEquals(expected)), + Arg.Any()); + } + + [Fact] + public async Task Drain_GrpcThrows_RowsStayPending_NextDrainRetries() + { + // Arrange — first read returns 3 rows; the gRPC client throws on the + // first push, then succeeds on the second. After the second push the + // queue returns empty so the actor settles. + var batch = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList(); + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(batch), + Task.FromResult>(batch), + Task.FromResult>(Array.Empty())); + + var calls = 0; + _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any()) + .Returns(_ => + { + calls++; + if (calls == 1) + { + throw new InvalidOperationException("simulated gRPC failure"); + } + return Task.FromResult(AckAll(batch)); + }); + + // Act + CreateActor(); + + // Assert — eventually MarkForwardedAsync is called exactly once (after + // the retry succeeded). The first failure must NOT have called + // MarkForwardedAsync because the rows stay Pending. + await AwaitAssertAsync(async () => + { + await _queue.Received(1).MarkForwardedAsync( + Arg.Any>(), Arg.Any()); + }, TimeSpan.FromSeconds(10)); + + Assert.True(calls >= 2, $"Expected at least 2 client calls (1 failure + 1 retry); saw {calls}"); + } + + [Fact] + public async Task Drain_ZeroPending_SchedulesAtIdleInterval_NoClientCall() + { + // Arrange — queue always empty. + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult>(Array.Empty())); + + // Idle interval = 2 s. Pause 3 s after the first tick (1 s busy on + // PreStart) and assert the empty-queue branch did NOT push to the + // client. + CreateActor(Opts(busySeconds: 1, idleSeconds: 2)); + + // Allow the initial tick (~1 s) + a generous window for the idle re-tick. + await Task.Delay(TimeSpan.FromSeconds(3)); + + await _client.DidNotReceiveWithAnyArgs().IngestAuditEventsAsync(default!, default); + + // ReadPendingAsync was called at least once (initial tick), and at + // most twice within the 3 s window (initial + one idle re-tick). + var readCalls = _queue.ReceivedCalls() + .Count(c => c.GetMethodInfo().Name == nameof(ISiteAuditQueue.ReadPendingAsync)); + Assert.InRange(readCalls, 1, 2); + } + + [Fact] + public async Task Drain_NonZeroPending_SchedulesAtBusyInterval() + { + // Arrange — every read returns 1 row. With busy=1s the actor should + // re-drain quickly, producing multiple client calls inside a short + // window. + var single = new List { NewEvent() }; + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult>(single)); + + _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any()) + .Returns(call => Task.FromResult(AckAll(single))); + + CreateActor(Opts(busySeconds: 1, idleSeconds: 10)); + + // 3-second window with busy=1s should fit at least 2 drains. + await Task.Delay(TimeSpan.FromSeconds(3)); + + var pushCalls = _client.ReceivedCalls() + .Count(c => c.GetMethodInfo().Name == nameof(ISiteStreamAuditClient.IngestAuditEventsAsync)); + Assert.True(pushCalls >= 2, + $"Expected ≥2 pushes within 3s when busy=1s; saw {pushCalls}"); + } + + [Fact] + public async Task Drain_AcceptedEventIdsSubset_OnlyMarksAccepted() + { + // Arrange — 5 rows pushed, but the central ack only lists 3. + var rows = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList(); + var ackedIds = rows.Take(3).Select(r => r.EventId).ToList(); + + _queue.ReadPendingAsync(Arg.Any(), Arg.Any()) + .Returns( + Task.FromResult>(rows), + Task.FromResult>(Array.Empty())); + + var partialAck = new IngestAck(); + foreach (var id in ackedIds) + { + partialAck.AcceptedEventIds.Add(id.ToString()); + } + _client.IngestAuditEventsAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult(partialAck)); + + // Act + CreateActor(); + + await AwaitAssertAsync(async () => + { + await _queue.Received(1).MarkForwardedAsync( + Arg.Any>(), Arg.Any()); + }, TimeSpan.FromSeconds(5)); + + // Assert — exactly the 3 ack'd ids made it to MarkForwardedAsync, not + // the other 2. + var ackedSet = ackedIds.ToHashSet(); + await _queue.Received(1).MarkForwardedAsync( + Arg.Is>(g => g.Count == 3 && g.ToHashSet().SetEquals(ackedSet)), + Arg.Any()); + } +}