diff --git a/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs index 01496bb..2b2c580 100644 --- a/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs +++ b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Audit; +using ScadaLink.ConfigurationDatabase; namespace ScadaLink.AuditLog.Central; @@ -61,6 +62,11 @@ public class AuditLogIngestActor : ReceiveActor _logger = logger; ReceiveAsync(OnIngestAsync); + // The single-repository test ctor cannot service the M3 dual-write — + // it has no SiteCalls repo and no DbContext. The handler still + // registers (so callers don't dead-letter) but replies empty so the + // test surface stays explicit about what this ctor supports. + ReceiveAsync(OnCachedTelemetryWithoutDualWriteAsync); } /// @@ -81,6 +87,7 @@ public class AuditLogIngestActor : ReceiveActor _logger = logger; ReceiveAsync(OnIngestAsync); + ReceiveAsync(OnCachedTelemetryAsync); } /// @@ -150,4 +157,98 @@ public class AuditLogIngestActor : ReceiveActor replyTo.Tell(new IngestAuditEventsReply(accepted)); } + + /// + /// M3 dual-write handler. For every the + /// actor opens a fresh MS SQL transaction, inserts the AuditLog row + /// idempotently AND upserts the SiteCalls row monotonically. Both succeed + /// or both roll back, so the audit and operational mirrors never drift + /// mid-row. The IngestedAtUtc stamp is unified between the two rows so a + /// downstream join lines up cleanly. + /// + /// + /// Per-entry isolation — one entry's failed transaction does NOT abort + /// other entries in the batch (each gets its own + /// + /// scope and a try/catch around it). Audit-write failure NEVER aborts the + /// user-facing action — the site keeps the row Pending and retries on the + /// next drain. + /// + private async Task OnCachedTelemetryAsync(IngestCachedTelemetryCommand cmd) + { + var replyTo = Sender; + var accepted = new List(cmd.Entries.Count); + + try + { + await using var scope = _serviceProvider!.CreateAsyncScope(); + var auditRepo = scope.ServiceProvider.GetRequiredService(); + var siteCallRepo = scope.ServiceProvider.GetRequiredService(); + var dbContext = scope.ServiceProvider.GetRequiredService(); + + foreach (var entry in cmd.Entries) + { + try + { + await using var tx = await dbContext.Database + .BeginTransactionAsync() + .ConfigureAwait(false); + + // Stamp IngestedAtUtc on both rows from a single + // central-side instant so a join on the two tables sees + // matching timestamps (debugging convenience, not a + // correctness invariant). + var ingestedAt = DateTime.UtcNow; + var auditStamped = entry.Audit with { IngestedAtUtc = ingestedAt }; + var siteCallStamped = entry.SiteCall with { IngestedAtUtc = ingestedAt }; + + await auditRepo.InsertIfNotExistsAsync(auditStamped) + .ConfigureAwait(false); + await siteCallRepo.UpsertAsync(siteCallStamped) + .ConfigureAwait(false); + + await tx.CommitAsync().ConfigureAwait(false); + accepted.Add(entry.Audit.EventId); + } + catch (Exception ex) + { + // Both rows rolled back via the disposing transaction. The + // EventId is NOT added to `accepted` so the site keeps its + // row Pending and retries on the next drain. Other entries + // in the batch continue with their own transactions. + _logger.LogError( + ex, + "Combined telemetry dual-write failed for AuditEvent {EventId} / TrackedOperationId {TrackedOpId}; rolled back.", + entry.Audit.EventId, + entry.SiteCall.TrackedOperationId); + } + } + } + catch (Exception ex) + { + // Resolving the scope itself threw (e.g. DI mis-wiring). Log and + // reply with whatever we managed to accept (likely empty) — the + // central singleton MUST stay alive. + _logger.LogError( + ex, + "Combined telemetry batch ingest failed before per-entry processing."); + } + + replyTo.Tell(new IngestCachedTelemetryReply(accepted)); + } + + /// + /// Fallback handler installed on the single-repository test ctor — that + /// ctor has no DbContext and no , so + /// it cannot service the dual-write. Logs a warning and replies with an + /// empty ack so callers fall through to their retry path. + /// + private Task OnCachedTelemetryWithoutDualWriteAsync(IngestCachedTelemetryCommand cmd) + { + _logger.LogWarning( + "AuditLogIngestActor received IngestCachedTelemetryCommand on the single-repository ctor; dual-write requires the IServiceProvider ctor. Replying with empty ack ({Count} entries).", + cmd.Entries.Count); + Sender.Tell(new IngestCachedTelemetryReply(Array.Empty())); + return Task.CompletedTask; + } } diff --git a/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs b/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs new file mode 100644 index 0000000..e7c63a1 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryCommand.cs @@ -0,0 +1,30 @@ +using ScadaLink.Commons.Entities.Audit; + +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Akka message sent to the central AuditLogIngestActor (Audit Log #23 M3 +/// Bundle D dual-write transaction) carrying a batch of combined audit + +/// site-call telemetry packets decoded by the SiteStreamGrpcServer from a +/// site's IngestCachedTelemetry gRPC RPC. For each entry the actor writes +/// the row AND the upsert inside +/// a single MS SQL transaction — both succeed or both roll back, so the audit +/// and operational mirrors never drift mid-row. +/// +/// +/// Lives in ScadaLink.Commons for the same reason as +/// IngestAuditEventsCommand: the gRPC server in +/// ScadaLink.Communication constructs it and ScadaLink.AuditLog +/// already references Communication. Putting the message in Commons avoids a +/// project-reference cycle. +/// +public sealed record IngestCachedTelemetryCommand(IReadOnlyList Entries); + +/// +/// One lifecycle event of a cached call: the to insert +/// (idempotent on ) plus the +/// to upsert (monotonic on +/// ). The two rows are paired so the +/// central dual-write transaction can commit them atomically. +/// +public sealed record CachedTelemetryEntry(AuditEvent Audit, SiteCall SiteCall); diff --git a/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryReply.cs b/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryReply.cs new file mode 100644 index 0000000..1de259d --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/IngestCachedTelemetryReply.cs @@ -0,0 +1,10 @@ +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Reply from the central AuditLogIngestActor for an +/// . +/// lists every entry whose dual-write transaction (AuditLog INSERT + SiteCalls +/// UPSERT) committed; entries whose transaction rolled back are absent so the +/// site can leave the row Pending and retry on the next drain. +/// +public sealed record IngestCachedTelemetryReply(IReadOnlyList AcceptedEventIds); diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs index 71560c3..1da14ec 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; using GrpcStatus = Grpc.Core.Status; @@ -298,9 +299,132 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase return ack; } + /// + /// Audit Log (#23) M3 site→central combined-telemetry push RPC. Decodes a + /// batch of entries into matched + /// (AuditEvent, SiteCall) pairs, Asks the central AuditLogIngestActor + /// proxy to persist them in dual-write transactions, and echoes the + /// AuditEvent EventIds that committed back so the site can flip its local + /// rows to Forwarded. + /// + /// + /// Same wiring-incomplete fallback as : when + /// the actor proxy has not been set the RPC replies with an empty ack so + /// sites treat the outcome as transient and retry, never a hard fault. + /// + public override async Task IngestCachedTelemetry( + CachedTelemetryBatch request, + ServerCallContext context) + { + if (request.Packets.Count == 0) + { + return new IngestAck(); + } + + var actor = _auditIngestActor; + if (actor is null) + { + _logger.LogWarning( + "IngestCachedTelemetry received {Count} packets before SetAuditIngestActor was called; returning empty ack.", + request.Packets.Count); + return new IngestAck(); + } + + var entries = new List(request.Packets.Count); + foreach (var packet in request.Packets) + { + var auditEvent = MapAuditEventFromDto(packet.AuditEvent); + var siteCall = MapSiteCallFromDto(packet.Operational); + entries.Add(new CachedTelemetryEntry(auditEvent, siteCall)); + } + + var cmd = new IngestCachedTelemetryCommand(entries); + IngestCachedTelemetryReply reply; + try + { + reply = await actor.Ask( + cmd, AuditIngestAskTimeout, context.CancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, + "AuditLogIngestActor Ask failed for combined telemetry batch of {Count} packets; returning empty ack.", + request.Packets.Count); + return new IngestAck(); + } + + var ack = new IngestAck(); + foreach (var id in reply.AcceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + private static string? NullIfEmpty(string? value) => string.IsNullOrEmpty(value) ? null : value; + /// + /// Inlined audit-event DTO→entity translation, kept in sync with the + /// handler above. Extracted to a private + /// helper so the M3 dual-write RPC can reuse it without duplicating yet + /// another copy. The shape still mirrors + /// AuditEventMapper.FromDto in ScadaLink.AuditLog.Telemetry; + /// the two must evolve together (the project-reference cycle that + /// prevents calling the AuditLog mapper directly is documented on + /// ). + /// + private static AuditEvent MapAuditEventFromDto(AuditEventDto dto) => + new() + { + EventId = Guid.Parse(dto.EventId), + OccurredAtUtc = DateTime.SpecifyKind(dto.OccurredAtUtc.ToDateTime(), DateTimeKind.Utc), + IngestedAtUtc = null, + Channel = Enum.Parse(dto.Channel), + Kind = Enum.Parse(dto.Kind), + CorrelationId = NullIfEmpty(dto.CorrelationId) is { } cid ? Guid.Parse(cid) : null, + SourceSiteId = NullIfEmpty(dto.SourceSiteId), + SourceInstanceId = NullIfEmpty(dto.SourceInstanceId), + SourceScript = NullIfEmpty(dto.SourceScript), + Actor = NullIfEmpty(dto.Actor), + Target = NullIfEmpty(dto.Target), + Status = Enum.Parse(dto.Status), + HttpStatus = dto.HttpStatus, + DurationMs = dto.DurationMs, + ErrorMessage = NullIfEmpty(dto.ErrorMessage), + ErrorDetail = NullIfEmpty(dto.ErrorDetail), + RequestSummary = NullIfEmpty(dto.RequestSummary), + ResponseSummary = NullIfEmpty(dto.ResponseSummary), + PayloadTruncated = dto.PayloadTruncated, + Extra = NullIfEmpty(dto.Extra), + ForwardState = null, + }; + + /// + /// Translates a into the persistence + /// entity. is stamped here as a + /// placeholder; the central ingest actor overwrites it inside the + /// dual-write transaction so the AuditLog and SiteCalls rows share one + /// instant. + /// + private static SiteCall MapSiteCallFromDto(SiteCallOperationalDto dto) => new() + { + TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId), + Channel = dto.Channel, + Target = dto.Target, + SourceSite = dto.SourceSite, + Status = dto.Status, + RetryCount = dto.RetryCount, + LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError, + HttpStatus = dto.HttpStatus, + CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc), + UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc), + TerminalAtUtc = dto.TerminalAtUtc is null + ? null + : DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc), + IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor + }; + /// /// Tracks a single active stream so cleanup only removes its own entry. /// diff --git a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs new file mode 100644 index 0000000..f61d1cd --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs @@ -0,0 +1,391 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.AuditLog.Central; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.AuditLog.Tests.Central; + +/// +/// Bundle D D2 tests for 's M3 combined- +/// telemetry dual-write transaction. Uses the same +/// as the M1 + M2 repository tests so the actor exercises real +/// + +/// against a per-test MSSQL +/// database. The transaction commits or rolls back inside one +/// . +/// +public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public AuditLogIngestActorCombinedTelemetryTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private ScadaLinkDbContext CreateReadContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static string NewSiteId() => + "test-bundle-d2-cached-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static (AuditEvent audit, SiteCall siteCall) NewEntry( + string siteId, + TrackedOperationId? trackedOperationId = null, + Guid? eventId = null, + string status = "Submitted", + AuditStatus auditStatus = AuditStatus.Submitted) + { + var trackedId = trackedOperationId ?? TrackedOperationId.New(); + var now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + + var audit = new AuditEvent + { + EventId = eventId ?? Guid.NewGuid(), + OccurredAtUtc = now, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.CachedSubmit, + Status = auditStatus, + SourceSiteId = siteId, + CorrelationId = trackedId.Value, + }; + + var siteCall = new SiteCall + { + TrackedOperationId = trackedId, + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = siteId, + Status = status, + RetryCount = 0, + CreatedAtUtc = now, + UpdatedAtUtc = now, + IngestedAtUtc = now, // overwritten by the actor + }; + + return (audit, siteCall); + } + + /// + /// Builds a minimal DI container around the per-test MSSQL fixture's + /// connection string — DbContext + the two repositories the dual-write + /// handler resolves. Mirrors AddConfigurationDatabase without the + /// DataProtection wiring (we never write secret columns in these tests). + /// + private IServiceProvider BuildServiceProvider( + Func? siteCallRepoFactory = null) + { + var services = new ServiceCollection(); + services.AddDbContext(opts => + opts.UseSqlServer(_fixture.ConnectionString) + .ConfigureWarnings(w => w.Ignore( + Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning))); + services.AddScoped(sp => + new AuditLogRepository(sp.GetRequiredService())); + if (siteCallRepoFactory is null) + { + services.AddScoped(sp => + new SiteCallAuditRepository(sp.GetRequiredService())); + } + else + { + services.AddScoped(sp => + siteCallRepoFactory(sp.GetRequiredService())); + } + return services.BuildServiceProvider(); + } + + private IActorRef CreateActor(IServiceProvider serviceProvider) => + Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + serviceProvider, + NullLogger.Instance))); + + [SkippableFact] + public async Task Receive_OneCachedPacket_WritesAuditRow_AND_SiteCallRow_AcksId() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var (audit, siteCall) = NewEntry(siteId); + + var sp = BuildServiceProvider(); + var actor = CreateActor(sp); + + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }), + TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + Assert.Single(reply.AcceptedEventIds); + Assert.Equal(audit.EventId, reply.AcceptedEventIds[0]); + + // Verify rows landed in both tables. + await using var read = CreateReadContext(); + var auditRow = await read.Set().SingleOrDefaultAsync(e => e.EventId == audit.EventId); + Assert.NotNull(auditRow); + Assert.NotNull(auditRow!.IngestedAtUtc); + + var siteCallRow = await read.Set() + .SingleOrDefaultAsync(s => s.TrackedOperationId == siteCall.TrackedOperationId); + Assert.NotNull(siteCallRow); + Assert.Equal(siteCall.Status, siteCallRow!.Status); + } + + [SkippableFact] + public async Task Receive_DuplicateEventId_SameStatus_NoOp_RowCountUnchanged_AcksId() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + var eventId = Guid.NewGuid(); + var (audit, siteCall) = NewEntry(siteId, trackedId, eventId); + + var sp = BuildServiceProvider(); + var actor = CreateActor(sp); + + // First write + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }), + TestActor); + ExpectMsg(TimeSpan.FromSeconds(15)); + + // Second write — same EventId and TrackedOperationId, same status. Both + // the audit insert (idempotent) and the SiteCalls upsert (monotonic + // same-rank → no-op) should silently do nothing while still acking. + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }), + TestActor); + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + + Assert.Single(reply.AcceptedEventIds); + Assert.Equal(eventId, reply.AcceptedEventIds[0]); + + await using var read = CreateReadContext(); + var auditCount = await read.Set().CountAsync(e => e.EventId == eventId); + Assert.Equal(1, auditCount); + + var siteCallCount = await read.Set() + .CountAsync(s => s.TrackedOperationId == trackedId); + Assert.Equal(1, siteCallCount); + } + + [SkippableFact] + public async Task Receive_DuplicateEventId_AdvancedSiteCallStatus_UpdatesSiteCall() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var trackedId = TrackedOperationId.New(); + + var sp = BuildServiceProvider(); + var actor = CreateActor(sp); + + // 1st packet — Submitted (audit EventId #1, SiteCalls Status=Submitted). + var (auditSubmit, siteCallSubmit) = NewEntry( + siteId, trackedId, status: "Submitted", auditStatus: AuditStatus.Submitted); + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(auditSubmit, siteCallSubmit) }), + TestActor); + ExpectMsg(TimeSpan.FromSeconds(15)); + + // 2nd packet — Attempted with retry count 1 (audit EventId #2, + // SiteCalls Status=Attempted — monotonic upsert wins). Same + // TrackedOperationId throughout. + var (auditAttempt, siteCallAttempt) = NewEntry( + siteId, trackedId, status: "Attempted", auditStatus: AuditStatus.Attempted); + var advanced = siteCallAttempt with { RetryCount = 1, UpdatedAtUtc = siteCallAttempt.UpdatedAtUtc.AddMinutes(1) }; + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(auditAttempt, advanced) }), + TestActor); + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + + Assert.Single(reply.AcceptedEventIds); + Assert.Equal(auditAttempt.EventId, reply.AcceptedEventIds[0]); + + // Both audit rows exist. + await using var read = CreateReadContext(); + var auditRows = await read.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(2, auditRows.Count); + + // SiteCalls row advanced to Attempted with retry count 1. + var siteCallRow = await read.Set() + .SingleAsync(s => s.TrackedOperationId == trackedId); + Assert.Equal("Attempted", siteCallRow.Status); + Assert.Equal(1, siteCallRow.RetryCount); + } + + [SkippableFact] + public async Task Receive_AuditInsertSucceeds_SiteCallThrows_BothRolledBack_NoOrphanRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var (audit, siteCall) = NewEntry(siteId); + + // Wrap the SiteCalls repo so UpsertAsync always throws — the dual-write + // transaction must roll back the AuditLog INSERT done in the same + // transaction, leaving no orphan row. + var sp = BuildServiceProvider( + ctx => new ThrowingSiteCallRepo(new SiteCallAuditRepository(ctx))); + var actor = CreateActor(sp); + + actor.Tell( + new IngestCachedTelemetryCommand(new[] { new CachedTelemetryEntry(audit, siteCall) }), + TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + Assert.Empty(reply.AcceptedEventIds); + + await using var read = CreateReadContext(); + var auditRow = await read.Set().SingleOrDefaultAsync(e => e.EventId == audit.EventId); + Assert.Null(auditRow); + + var siteCallRow = await read.Set() + .SingleOrDefaultAsync(s => s.TrackedOperationId == siteCall.TrackedOperationId); + Assert.Null(siteCallRow); + } + + [SkippableFact] + public async Task Receive_FiveCachedPackets_AllPersistedSeparately_AllAcked() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var entries = Enumerable.Range(0, 5).Select(_ => + { + var (audit, siteCall) = NewEntry(siteId); + return new CachedTelemetryEntry(audit, siteCall); + }).ToList(); + + var sp = BuildServiceProvider(); + var actor = CreateActor(sp); + + actor.Tell(new IngestCachedTelemetryCommand(entries), TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + Assert.Equal(5, reply.AcceptedEventIds.Count); + Assert.True(entries.Select(e => e.Audit.EventId).ToHashSet() + .SetEquals(reply.AcceptedEventIds.ToHashSet())); + + await using var read = CreateReadContext(); + var auditCount = await read.Set().CountAsync(e => e.SourceSiteId == siteId); + Assert.Equal(5, auditCount); + + var siteCallCount = await read.Set().CountAsync(s => s.SourceSite == siteId); + Assert.Equal(5, siteCallCount); + } + + [SkippableFact] + public async Task Receive_OnePacketSucceeds_NextPacketThrows_FirstStillCommitted_BatchContinues() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var (audit1, siteCall1) = NewEntry(siteId); + var (audit2, siteCall2) = NewEntry(siteId); + var (audit3, siteCall3) = NewEntry(siteId); + var poisonTrackedId = siteCall2.TrackedOperationId; + + // Throw only for the middle entry's TrackedOperationId — entries on + // either side must commit their own transactions independently. + var sp = BuildServiceProvider( + ctx => new PoisonOnIdSiteCallRepo(new SiteCallAuditRepository(ctx), poisonTrackedId)); + var actor = CreateActor(sp); + + actor.Tell( + new IngestCachedTelemetryCommand(new[] + { + new CachedTelemetryEntry(audit1, siteCall1), + new CachedTelemetryEntry(audit2, siteCall2), + new CachedTelemetryEntry(audit3, siteCall3), + }), + TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(15)); + + // Two entries committed; poison entry rolled back. + Assert.Equal(2, reply.AcceptedEventIds.Count); + Assert.Contains(audit1.EventId, reply.AcceptedEventIds); + Assert.Contains(audit3.EventId, reply.AcceptedEventIds); + Assert.DoesNotContain(audit2.EventId, reply.AcceptedEventIds); + + await using var read = CreateReadContext(); + var auditRows = await read.Set().Where(e => e.SourceSiteId == siteId).ToListAsync(); + Assert.Equal(2, auditRows.Count); + Assert.DoesNotContain(auditRows, r => r.EventId == audit2.EventId); + + var siteCallRows = await read.Set().Where(s => s.SourceSite == siteId).ToListAsync(); + Assert.Equal(2, siteCallRows.Count); + Assert.DoesNotContain(siteCallRows, r => r.TrackedOperationId == poisonTrackedId); + } + + /// + /// Test double — throws unconditionally from so + /// the dual-write transaction is forced to roll back. Lets the AuditLog + /// row insert succeed in-transaction; the rollback must remove it. + /// + private sealed class ThrowingSiteCallRepo : ISiteCallAuditRepository + { + private readonly ISiteCallAuditRepository _inner; + public ThrowingSiteCallRepo(ISiteCallAuditRepository inner) { _inner = inner; } + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => + throw new InvalidOperationException("simulated SiteCalls upsert failure"); + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + _inner.GetAsync(id, ct); + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + _inner.QueryAsync(filter, paging, ct); + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + _inner.PurgeTerminalAsync(olderThanUtc, ct); + } + + /// + /// Test double — throws only when the supplied poison TrackedOperationId + /// is the one being upserted. Demonstrates per-entry transaction isolation: + /// one entry's failed transaction must not abort the batch's other entries. + /// + private sealed class PoisonOnIdSiteCallRepo : ISiteCallAuditRepository + { + private readonly ISiteCallAuditRepository _inner; + private readonly TrackedOperationId _poisonId; + public PoisonOnIdSiteCallRepo(ISiteCallAuditRepository inner, TrackedOperationId poisonId) + { + _inner = inner; + _poisonId = poisonId; + } + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) + { + if (siteCall.TrackedOperationId == _poisonId) + { + throw new InvalidOperationException("simulated SiteCalls upsert failure for poison id"); + } + return _inner.UpsertAsync(siteCall, ct); + } + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + _inner.GetAsync(id, ct); + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + _inner.QueryAsync(filter, paging, ct); + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + _inner.PurgeTerminalAsync(olderThanUtc, ct); + } +} diff --git a/tests/ScadaLink.Communication.Tests/SiteStreamIngestCachedTelemetryTests.cs b/tests/ScadaLink.Communication.Tests/SiteStreamIngestCachedTelemetryTests.cs new file mode 100644 index 0000000..b5194a0 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/SiteStreamIngestCachedTelemetryTests.cs @@ -0,0 +1,121 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests; + +/// +/// Bundle D D2 tests for . +/// Verifies the DTO→entity→actor→ack round-trip through the gRPC handler. A +/// tiny EchoCachedIngestActor stands in for the central +/// AuditLogIngestActor, replying with the EventIds it received so the +/// test asserts the wiring without depending on MSSQL. +/// +public class SiteStreamIngestCachedTelemetryTests : TestKit +{ + private readonly ISiteStreamSubscriber _subscriber = Substitute.For(); + + private SiteStreamGrpcServer CreateServer() => + new(_subscriber, NullLogger.Instance); + + private static ServerCallContext NewContext(CancellationToken ct = default) + { + var context = Substitute.For(); + context.CancellationToken.Returns(ct); + return context; + } + + private static CachedTelemetryPacket NewPacket(Guid? eventId = null, Guid? trackedId = null) + { + var now = Timestamp.FromDateTime( + DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc)); + return new CachedTelemetryPacket + { + AuditEvent = new AuditEventDto + { + EventId = (eventId ?? Guid.NewGuid()).ToString(), + OccurredAtUtc = now, + Channel = "ApiOutbound", + Kind = "CachedSubmit", + Status = "Submitted", + SourceSiteId = "site-1", + CorrelationId = (trackedId ?? Guid.NewGuid()).ToString(), + }, + Operational = new SiteCallOperationalDto + { + TrackedOperationId = (trackedId ?? Guid.NewGuid()).ToString(), + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = "site-1", + Status = "Submitted", + RetryCount = 0, + CreatedAtUtc = now, + UpdatedAtUtc = now, + }, + }; + } + + [Fact] + public async Task IngestCachedTelemetry_RoutesToActor_ReturnsReply() + { + // Arrange — stub actor that echoes every received EventId back. + var stubActor = Sys.ActorOf(Props.Create(() => new EchoCachedIngestActor())); + + var server = CreateServer(); + server.SetAuditIngestActor(stubActor); + + var packets = Enumerable.Range(0, 3) + .Select(_ => NewPacket()) + .ToList(); + + var batch = new CachedTelemetryBatch(); + batch.Packets.AddRange(packets); + + // Act + var ack = await server.IngestCachedTelemetry(batch, NewContext()); + + // Assert — every packet's EventId appears in the ack, demonstrating + // end-to-end routing through the actor. + Assert.Equal(3, ack.AcceptedEventIds.Count); + var expectedIds = packets.Select(p => p.AuditEvent.EventId).ToHashSet(); + Assert.True(expectedIds.SetEquals(ack.AcceptedEventIds.ToHashSet())); + } + + [Fact] + public async Task IngestCachedTelemetry_NoActorWired_ReturnsEmptyAck() + { + var server = CreateServer(); + // Intentionally do NOT call SetAuditIngestActor — simulates host + // startup race window. + + var batch = new CachedTelemetryBatch(); + batch.Packets.Add(NewPacket()); + + var ack = await server.IngestCachedTelemetry(batch, NewContext()); + + Assert.Empty(ack.AcceptedEventIds); + } + + /// + /// Tiny ReceiveActor that echoes every EventId in an incoming + /// back as an + /// . Stands in for the central + /// AuditLogIngestActor so this test never touches MSSQL. + /// + private sealed class EchoCachedIngestActor : ReceiveActor + { + public EchoCachedIngestActor() + { + Receive(cmd => + { + var ids = cmd.Entries.Select(e => e.Audit.EventId).ToList(); + Sender.Tell(new IngestCachedTelemetryReply(ids)); + }); + } + } +}