feat(auditlog): AuditLogIngestActor + gRPC handler (#23)
This commit is contained in:
95
src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs
Normal file
95
src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs
Normal file
@@ -0,0 +1,95 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Messages.Audit;
|
||||
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Central-side singleton (per Bundle E wiring) that ingests batches of
|
||||
/// <see cref="AuditEvent"/> rows pushed from sites via the
|
||||
/// <c>IngestAuditEvents</c> gRPC RPC. Each row is stamped with the central-side
|
||||
/// <see cref="AuditEvent.IngestedAtUtc"/> and inserted idempotently via
|
||||
/// <see cref="IAuditLogRepository.InsertIfNotExistsAsync"/> — duplicates are
|
||||
/// silently swallowed (first-write-wins per Bundle A's hardening).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Idempotency is the contract: a row that already exists at central counts
|
||||
/// as "accepted" for the purposes of the reply, because the storage state is
|
||||
/// consistent and the site is free to flip its local row to <c>Forwarded</c>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Per Bundle D's brief, audit-write failures must NEVER abort the user-facing
|
||||
/// action. The actor wraps each repository call in its own try/catch so a
|
||||
/// single bad row cannot cause the rest of the batch to be lost; the actor's
|
||||
/// <see cref="SupervisorStrategy"/> uses <c>Resume</c> so a thrown exception
|
||||
/// inside <c>ReceiveAsync</c> does not restart the actor (which would also
|
||||
/// reset any in-flight state).
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class AuditLogIngestActor : ReceiveActor
|
||||
{
|
||||
private readonly IAuditLogRepository _repository;
|
||||
private readonly ILogger<AuditLogIngestActor> _logger;
|
||||
|
||||
public AuditLogIngestActor(
|
||||
IAuditLogRepository repository,
|
||||
ILogger<AuditLogIngestActor> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(repository);
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
|
||||
_repository = repository;
|
||||
_logger = logger;
|
||||
|
||||
ReceiveAsync<IngestAuditEventsCommand>(OnIngestAsync);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Audit-write failures are best-effort by design (see alog.md §13): a
|
||||
/// thrown exception in the ingest pipeline must not crash the actor.
|
||||
/// Resume keeps the actor's state intact so the next batch is processed
|
||||
/// against the same repository instance.
|
||||
/// </summary>
|
||||
protected override SupervisorStrategy SupervisorStrategy()
|
||||
{
|
||||
return new OneForOneStrategy(maxNrOfRetries: 0, withinTimeRange: TimeSpan.Zero, decider:
|
||||
Akka.Actor.SupervisorStrategy.DefaultDecider);
|
||||
}
|
||||
|
||||
private async Task OnIngestAsync(IngestAuditEventsCommand cmd)
|
||||
{
|
||||
// Sender is captured before the first await — Akka resets Sender
|
||||
// between message dispatches, so a post-await Tell would go to
|
||||
// DeadLetters.
|
||||
var replyTo = Sender;
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
var accepted = new List<Guid>(cmd.Events.Count);
|
||||
|
||||
foreach (var evt in cmd.Events)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Stamp IngestedAtUtc here, not at the site. Bundle A's
|
||||
// repository hardening already swallows duplicate-key races,
|
||||
// so the same id arriving twice (site retry, reconciliation)
|
||||
// is a silent no-op.
|
||||
var ingested = evt with { IngestedAtUtc = nowUtc };
|
||||
await _repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
|
||||
accepted.Add(evt.EventId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Per-row catch — one bad row never sinks the whole batch.
|
||||
// The row stays Pending at the site; the next drain retries.
|
||||
_logger.LogError(ex,
|
||||
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
|
||||
evt.EventId);
|
||||
}
|
||||
}
|
||||
|
||||
replyTo.Tell(new IngestAuditEventsReply(accepted));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
|
||||
namespace ScadaLink.Commons.Messages.Audit;
|
||||
|
||||
/// <summary>
|
||||
/// Akka message sent to the central <c>AuditLogIngestActor</c> (Audit Log #23,
|
||||
/// M2 site-sync pipeline) carrying a batch of <see cref="AuditEvent"/> rows
|
||||
/// decoded by the <c>SiteStreamGrpcServer</c> from a site's
|
||||
/// <c>IngestAuditEvents</c> gRPC RPC. The actor stamps
|
||||
/// <see cref="AuditEvent.IngestedAtUtc"/> and writes the rows idempotently to
|
||||
/// the central <c>AuditLog</c> table.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Lives in <c>ScadaLink.Commons</c> rather than <c>ScadaLink.AuditLog</c>
|
||||
/// because the gRPC server in <c>ScadaLink.Communication</c> needs to construct
|
||||
/// it, and <c>ScadaLink.AuditLog</c> already references
|
||||
/// <c>ScadaLink.Communication</c> (the proto DTOs live there). Putting the
|
||||
/// message in Commons avoids a project-reference cycle.
|
||||
/// </remarks>
|
||||
public sealed record IngestAuditEventsCommand(IReadOnlyList<AuditEvent> Events);
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace ScadaLink.Commons.Messages.Audit;
|
||||
|
||||
/// <summary>
|
||||
/// Reply from the central <c>AuditLogIngestActor</c> for an
|
||||
/// <see cref="IngestAuditEventsCommand"/>. <see cref="AcceptedEventIds"/> lists
|
||||
/// every row the actor considers durably persisted at central — including ids
|
||||
/// that were already present before the call (first-write-wins idempotency).
|
||||
/// The gRPC handler echoes these ids back over the wire as the <c>IngestAck</c>
|
||||
/// the site uses to flip rows to <c>Forwarded</c>.
|
||||
/// </summary>
|
||||
public sealed record IngestAuditEventsReply(IReadOnlyList<Guid> AcceptedEventIds);
|
||||
@@ -4,6 +4,9 @@ using Akka.Actor;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Messages.Audit;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using GrpcStatus = Grpc.Core.Status;
|
||||
|
||||
namespace ScadaLink.Communication.Grpc;
|
||||
@@ -23,6 +26,15 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
private readonly TimeSpan _maxStreamLifetime;
|
||||
private volatile bool _ready;
|
||||
private long _actorCounter;
|
||||
// Audit Log (#23 M2): central-side ingest actor proxy. Set by the host
|
||||
// after the cluster singleton starts (see Bundle E wiring). When null the
|
||||
// IngestAuditEvents RPC replies with an empty IngestAck so sites can
|
||||
// safely retry — wiring-incomplete is treated as transient, never fatal.
|
||||
private IActorRef? _auditIngestActor;
|
||||
// Per Bundle D's brief — Ask timeout is 30 s. The ingest actor's repo
|
||||
// calls are sub-100 ms in steady state; a generous timeout absorbs a slow
|
||||
// MSSQL connection without surfacing as a gRPC failure on a healthy site.
|
||||
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
|
||||
|
||||
/// <summary>
|
||||
/// Test-only constructor — kept <c>internal</c> so the DI container sees a
|
||||
@@ -76,6 +88,19 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
_ready = true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Hands the central-side <c>AuditLogIngestActor</c> proxy to the gRPC
|
||||
/// server so the <see cref="IngestAuditEvents"/> RPC can route incoming
|
||||
/// site batches. Audit Log (#23) M2 wiring point — mirrors the way
|
||||
/// <c>CommunicationService.SetNotificationOutbox</c> takes the Notification
|
||||
/// Outbox singleton proxy. Bundle E supplies the actor after the cluster
|
||||
/// singleton starts.
|
||||
/// </summary>
|
||||
public void SetAuditIngestActor(IActorRef proxy)
|
||||
{
|
||||
_auditIngestActor = proxy;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Number of currently active streaming subscriptions. Exposed for diagnostics.
|
||||
/// </summary>
|
||||
@@ -168,6 +193,114 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Audit Log (#23) M2 site→central push RPC. Decodes a site batch into
|
||||
/// <see cref="AuditEvent"/> rows, Asks the central <c>AuditLogIngestActor</c>
|
||||
/// proxy to persist them, and echoes the accepted EventIds back so the site
|
||||
/// can flip its local rows to <c>Forwarded</c>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The DTO→entity conversion is inlined here (rather than calling the
|
||||
/// AuditLog mapper) to avoid a project-reference cycle:
|
||||
/// <c>ScadaLink.AuditLog</c> already references
|
||||
/// <c>ScadaLink.Communication</c>, so the gRPC server cannot reach back
|
||||
/// into AuditLog for its mapper. The shape mirrors
|
||||
/// <c>AuditEventMapper.FromDto</c> in <c>ScadaLink.AuditLog.Telemetry</c>;
|
||||
/// the two must evolve together.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// When <see cref="_auditIngestActor"/> is not yet wired (host startup
|
||||
/// race window), the RPC returns an empty <see cref="IngestAck"/> rather
|
||||
/// than failing — the site treats the missing ack as a transient outcome
|
||||
/// and retries on the next drain, which is the desired idempotent
|
||||
/// behaviour.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public override async Task<IngestAck> IngestAuditEvents(
|
||||
AuditEventBatch request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
// Empty batch is a no-op; reply immediately so the client moves on.
|
||||
if (request.Events.Count == 0)
|
||||
{
|
||||
return new IngestAck();
|
||||
}
|
||||
|
||||
var actor = _auditIngestActor;
|
||||
if (actor is null)
|
||||
{
|
||||
// Wiring incomplete (host startup race). Sites treat an empty
|
||||
// ack as "nothing was acked, leave rows Pending, retry next
|
||||
// drain" — exactly the right behaviour during host bring-up.
|
||||
_logger.LogWarning(
|
||||
"IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.",
|
||||
request.Events.Count);
|
||||
return new IngestAck();
|
||||
}
|
||||
|
||||
// Inlined FromDto. Keep in sync with AuditEventMapper.FromDto in
|
||||
// ScadaLink.AuditLog.Telemetry — there is no shared mapper because
|
||||
// doing so would create a project-reference cycle (AuditLog → Communication).
|
||||
var entities = new List<AuditEvent>(request.Events.Count);
|
||||
foreach (var dto in request.Events)
|
||||
{
|
||||
entities.Add(new AuditEvent
|
||||
{
|
||||
EventId = Guid.Parse(dto.EventId),
|
||||
OccurredAtUtc = DateTime.SpecifyKind(dto.OccurredAtUtc.ToDateTime(), DateTimeKind.Utc),
|
||||
IngestedAtUtc = null,
|
||||
Channel = Enum.Parse<AuditChannel>(dto.Channel),
|
||||
Kind = Enum.Parse<AuditKind>(dto.Kind),
|
||||
CorrelationId = string.IsNullOrEmpty(dto.CorrelationId) ? null : Guid.Parse(dto.CorrelationId),
|
||||
SourceSiteId = NullIfEmpty(dto.SourceSiteId),
|
||||
SourceInstanceId = NullIfEmpty(dto.SourceInstanceId),
|
||||
SourceScript = NullIfEmpty(dto.SourceScript),
|
||||
Actor = NullIfEmpty(dto.Actor),
|
||||
Target = NullIfEmpty(dto.Target),
|
||||
Status = Enum.Parse<AuditStatus>(dto.Status),
|
||||
HttpStatus = dto.HttpStatus,
|
||||
DurationMs = dto.DurationMs,
|
||||
ErrorMessage = NullIfEmpty(dto.ErrorMessage),
|
||||
ErrorDetail = NullIfEmpty(dto.ErrorDetail),
|
||||
RequestSummary = NullIfEmpty(dto.RequestSummary),
|
||||
ResponseSummary = NullIfEmpty(dto.ResponseSummary),
|
||||
PayloadTruncated = dto.PayloadTruncated,
|
||||
Extra = NullIfEmpty(dto.Extra),
|
||||
ForwardState = null,
|
||||
});
|
||||
}
|
||||
|
||||
var cmd = new IngestAuditEventsCommand(entities);
|
||||
IngestAuditEventsReply reply;
|
||||
try
|
||||
{
|
||||
reply = await actor.Ask<IngestAuditEventsReply>(
|
||||
cmd, AuditIngestAskTimeout, context.CancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Audit ingest is best-effort; failing this RPC at the gRPC layer
|
||||
// would surface as a transport error and force the site to retry
|
||||
// (which it would do anyway). Logging + an empty ack keeps the
|
||||
// semantics consistent with the "wiring incomplete" path above.
|
||||
_logger.LogError(ex,
|
||||
"AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.",
|
||||
request.Events.Count);
|
||||
return new IngestAck();
|
||||
}
|
||||
|
||||
var ack = new IngestAck();
|
||||
foreach (var id in reply.AcceptedEventIds)
|
||||
{
|
||||
ack.AcceptedEventIds.Add(id.ToString());
|
||||
}
|
||||
return ack;
|
||||
}
|
||||
|
||||
private static string? NullIfEmpty(string? value) =>
|
||||
string.IsNullOrEmpty(value) ? null : value;
|
||||
|
||||
/// <summary>
|
||||
/// Tracks a single active stream so cleanup only removes its own entry.
|
||||
/// </summary>
|
||||
|
||||
@@ -0,0 +1,220 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ScadaLink.AuditLog.Central;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Messages.Audit;
|
||||
using ScadaLink.Commons.Types.Audit;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.ConfigurationDatabase;
|
||||
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Central;
|
||||
|
||||
/// <summary>
|
||||
/// Bundle D D2 tests for <see cref="AuditLogIngestActor"/>. Uses the same
|
||||
/// <see cref="MsSqlMigrationFixture"/> as the M1 repository tests so the actor
|
||||
/// exercises real <see cref="AuditLogRepository.InsertIfNotExistsAsync"/>
|
||||
/// against a partitioned MSSQL schema (the only way to verify the
|
||||
/// IngestedAtUtc stamp + duplicate-key idempotency end to end).
|
||||
/// </summary>
|
||||
public class AuditLogIngestActorTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||
{
|
||||
private readonly MsSqlMigrationFixture _fixture;
|
||||
|
||||
public AuditLogIngestActorTests(MsSqlMigrationFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
}
|
||||
|
||||
private ScadaLinkDbContext CreateContext()
|
||||
{
|
||||
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
||||
.UseSqlServer(_fixture.ConnectionString)
|
||||
.Options;
|
||||
return new ScadaLinkDbContext(options);
|
||||
}
|
||||
|
||||
private static string NewSiteId() =>
|
||||
"test-bundle-d2-" + Guid.NewGuid().ToString("N").Substring(0, 8);
|
||||
|
||||
private static AuditEvent NewEvent(string siteId, Guid? id = null) => new()
|
||||
{
|
||||
EventId = id ?? Guid.NewGuid(),
|
||||
OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = AuditKind.ApiCall,
|
||||
Status = AuditStatus.Delivered,
|
||||
SourceSiteId = siteId,
|
||||
};
|
||||
|
||||
private IActorRef CreateActor(IAuditLogRepository repository) =>
|
||||
Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
|
||||
repository,
|
||||
NullLogger<AuditLogIngestActor>.Instance)));
|
||||
|
||||
[SkippableFact]
|
||||
public async Task Receive_BatchOf5_Calls_Repo_5Times_Acks_All_5()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();
|
||||
|
||||
await using var context = CreateContext();
|
||||
var repo = new AuditLogRepository(context);
|
||||
var actor = CreateActor(repo);
|
||||
|
||||
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
|
||||
|
||||
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
Assert.Equal(5, reply.AcceptedEventIds.Count);
|
||||
Assert.True(events.Select(e => e.EventId).ToHashSet().SetEquals(reply.AcceptedEventIds.ToHashSet()));
|
||||
|
||||
// Verify rows landed in MSSQL.
|
||||
await using var readContext = CreateContext();
|
||||
var rows = await readContext.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.ToListAsync();
|
||||
Assert.Equal(5, rows.Count);
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task Receive_BatchWith_AlreadyExistingEvent_AcksAll_NoDoubleInsert()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var pre = NewEvent(siteId);
|
||||
|
||||
// Pre-insert one event directly via the repo so the actor sees it
|
||||
// already present when it processes the batch.
|
||||
await using (var seedContext = CreateContext())
|
||||
{
|
||||
var seedRepo = new AuditLogRepository(seedContext);
|
||||
await seedRepo.InsertIfNotExistsAsync(pre);
|
||||
}
|
||||
|
||||
// Build the batch including the pre-existing event plus 2 new ones.
|
||||
var fresh1 = NewEvent(siteId);
|
||||
var fresh2 = NewEvent(siteId);
|
||||
var batch = new List<AuditEvent> { pre, fresh1, fresh2 };
|
||||
|
||||
await using var context = CreateContext();
|
||||
var repo = new AuditLogRepository(context);
|
||||
var actor = CreateActor(repo);
|
||||
|
||||
actor.Tell(new IngestAuditEventsCommand(batch), TestActor);
|
||||
|
||||
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
// All 3 acked under idempotent first-write-wins.
|
||||
Assert.Equal(3, reply.AcceptedEventIds.Count);
|
||||
|
||||
// Verify no double-insert.
|
||||
await using var readContext = CreateContext();
|
||||
var count = await readContext.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.CountAsync();
|
||||
Assert.Equal(3, count);
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task Receive_Sets_IngestedAtUtc_Before_Insert()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var events = Enumerable.Range(0, 3).Select(_ => NewEvent(siteId)).ToList();
|
||||
|
||||
var before = DateTime.UtcNow.AddSeconds(-1);
|
||||
|
||||
await using var context = CreateContext();
|
||||
var repo = new AuditLogRepository(context);
|
||||
var actor = CreateActor(repo);
|
||||
|
||||
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
|
||||
ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
|
||||
var after = DateTime.UtcNow.AddSeconds(1);
|
||||
|
||||
await using var readContext = CreateContext();
|
||||
var rows = await readContext.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.ToListAsync();
|
||||
|
||||
Assert.Equal(3, rows.Count);
|
||||
Assert.All(rows, r =>
|
||||
{
|
||||
Assert.NotNull(r.IngestedAtUtc);
|
||||
Assert.InRange(r.IngestedAtUtc!.Value, before, after);
|
||||
});
|
||||
}
|
||||
|
||||
[SkippableFact]
|
||||
public async Task Receive_RepoThrowsForOneEvent_Other4StillPersisted()
|
||||
{
|
||||
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||
|
||||
var siteId = NewSiteId();
|
||||
var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList();
|
||||
var poisonId = events[2].EventId;
|
||||
|
||||
// Wrapper repo that throws only when the poison EventId is being
|
||||
// inserted. The four neighbours must still land in MSSQL.
|
||||
await using var context = CreateContext();
|
||||
var realRepo = new AuditLogRepository(context);
|
||||
var wrappedRepo = new ThrowingRepository(realRepo, poisonId);
|
||||
var actor = CreateActor(wrappedRepo);
|
||||
|
||||
actor.Tell(new IngestAuditEventsCommand(events), TestActor);
|
||||
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
|
||||
// The actor catches the throw per-row, so 4 ids are accepted and 1 is
|
||||
// left out.
|
||||
Assert.Equal(4, reply.AcceptedEventIds.Count);
|
||||
Assert.DoesNotContain(poisonId, reply.AcceptedEventIds);
|
||||
|
||||
await using var readContext = CreateContext();
|
||||
var rows = await readContext.Set<AuditEvent>()
|
||||
.Where(e => e.SourceSiteId == siteId)
|
||||
.ToListAsync();
|
||||
Assert.Equal(4, rows.Count);
|
||||
Assert.DoesNotContain(rows, r => r.EventId == poisonId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tiny test double that delegates to a real repository but throws on a
|
||||
/// specified EventId. Used to verify per-row failure isolation: one bad
|
||||
/// row must not cause the rest of the batch to be lost.
|
||||
/// </summary>
|
||||
private sealed class ThrowingRepository : IAuditLogRepository
|
||||
{
|
||||
private readonly IAuditLogRepository _inner;
|
||||
private readonly Guid _poisonId;
|
||||
|
||||
public ThrowingRepository(IAuditLogRepository inner, Guid poisonId)
|
||||
{
|
||||
_inner = inner;
|
||||
_poisonId = poisonId;
|
||||
}
|
||||
|
||||
public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
if (evt.EventId == _poisonId)
|
||||
{
|
||||
throw new InvalidOperationException("simulated repo failure for poison row");
|
||||
}
|
||||
return _inner.InsertIfNotExistsAsync(evt, ct);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<AuditEvent>> QueryAsync(
|
||||
AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
|
||||
_inner.QueryAsync(filter, paging, ct);
|
||||
|
||||
public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
|
||||
_inner.SwitchOutPartitionAsync(monthBoundary, ct);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using ScadaLink.Commons.Messages.Audit;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
|
||||
namespace ScadaLink.Communication.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Bundle D D2 tests for <see cref="SiteStreamGrpcServer.IngestAuditEvents"/>.
|
||||
/// Verifies the DTO→entity→actor→ack round-trip through the gRPC handler.
|
||||
/// A tiny <c>StubIngestActor</c> stands in for the central
|
||||
/// <c>AuditLogIngestActor</c>, replying with the EventIds it received so the
|
||||
/// test asserts the wiring without depending on MSSQL.
|
||||
/// </summary>
|
||||
public class SiteStreamIngestAuditEventsTests : TestKit
|
||||
{
|
||||
private readonly ISiteStreamSubscriber _subscriber = Substitute.For<ISiteStreamSubscriber>();
|
||||
|
||||
private SiteStreamGrpcServer CreateServer() =>
|
||||
new(_subscriber, NullLogger<SiteStreamGrpcServer>.Instance);
|
||||
|
||||
private static ServerCallContext NewContext(CancellationToken ct = default)
|
||||
{
|
||||
var context = Substitute.For<ServerCallContext>();
|
||||
context.CancellationToken.Returns(ct);
|
||||
return context;
|
||||
}
|
||||
|
||||
private static AuditEventDto NewDto(Guid? id = null) => new()
|
||||
{
|
||||
EventId = (id ?? Guid.NewGuid()).ToString(),
|
||||
OccurredAtUtc = Timestamp.FromDateTime(
|
||||
DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc)),
|
||||
Channel = "ApiOutbound",
|
||||
Kind = "ApiCall",
|
||||
Status = "Delivered",
|
||||
SourceSiteId = "site-1",
|
||||
};
|
||||
|
||||
[Fact]
|
||||
public async Task IngestAuditEvents_With_AuditIngestActor_Routes_To_Actor_Returns_Reply()
|
||||
{
|
||||
// Arrange — a stub actor that echoes every received EventId back.
|
||||
var stubActor = Sys.ActorOf(Props.Create(() => new EchoIngestActor()));
|
||||
|
||||
var server = CreateServer();
|
||||
server.SetAuditIngestActor(stubActor);
|
||||
|
||||
// Build a 3-event batch.
|
||||
var dtos = Enumerable.Range(0, 3).Select(_ => NewDto()).ToList();
|
||||
var batch = new AuditEventBatch();
|
||||
batch.Events.AddRange(dtos);
|
||||
|
||||
// Act
|
||||
var ack = await server.IngestAuditEvents(batch, NewContext());
|
||||
|
||||
// Assert — every dto's id appears in the ack, demonstrating end-to-
|
||||
// end routing through the actor.
|
||||
Assert.Equal(3, ack.AcceptedEventIds.Count);
|
||||
var expectedIds = dtos.Select(d => d.EventId).ToHashSet();
|
||||
Assert.True(expectedIds.SetEquals(ack.AcceptedEventIds.ToHashSet()));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IngestAuditEvents_NoActor_Wired_ReturnsEmptyAck()
|
||||
{
|
||||
var server = CreateServer();
|
||||
// Intentionally do NOT call SetAuditIngestActor — simulates host
|
||||
// startup race window.
|
||||
|
||||
var batch = new AuditEventBatch();
|
||||
batch.Events.Add(NewDto());
|
||||
|
||||
var ack = await server.IngestAuditEvents(batch, NewContext());
|
||||
|
||||
Assert.Empty(ack.AcceptedEventIds);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tiny ReceiveActor that echoes every EventId in an incoming
|
||||
/// <see cref="IngestAuditEventsCommand"/> back as an
|
||||
/// <see cref="IngestAuditEventsReply"/>. Stands in for the central
|
||||
/// AuditLogIngestActor so this test never touches MSSQL.
|
||||
/// </summary>
|
||||
private sealed class EchoIngestActor : ReceiveActor
|
||||
{
|
||||
public EchoIngestActor()
|
||||
{
|
||||
Receive<IngestAuditEventsCommand>(cmd =>
|
||||
{
|
||||
var ids = cmd.Events.Select(e => e.EventId).ToList();
|
||||
Sender.Tell(new IngestAuditEventsReply(ids));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user