152 lines
5.6 KiB
C#
152 lines
5.6 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using NSubstitute;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces.Repositories;
|
|
using ScadaLink.Commons.Messages.Audit;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.Communication.Actors;
|
|
|
|
namespace ScadaLink.Communication.Tests;
|
|
|
|
/// <summary>
/// Tests for the Audit Log (#23) site→central ClusterClient ingest routing on
/// <see cref="CentralCommunicationActor"/>. A site ClusterClient delivers
/// <see cref="IngestAuditEventsCommand"/> / <see cref="IngestCachedTelemetryCommand"/>
/// to the receptionist-registered actor, which hands the command to the registered
/// <c>AuditLogIngestActor</c> proxy and routes the reply back to the site.
/// Mirrors the NotificationSubmit / RegisterNotificationOutbox pattern.
/// </summary>
/// <remarks>
/// In every test the test kit itself plays the site (commands are sent from the
/// test and replies are asserted with the kit's own <c>ExpectMsg</c>), while a
/// test probe plays the audit-ingest proxy.
/// </remarks>
public class CentralCommunicationActorAuditTests : TestKit
{
    // Service providers built by CreateActor. They are IDisposable, so they are
    // tracked here and released once the actor system has been shut down.
    private readonly List<ServiceProvider> _serviceProviders = new();

    public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { }

    /// <summary>
    /// Shuts the actor system down first (base behaviour), then disposes every
    /// service provider handed to an actor, so no actor can still be using a
    /// provider at the moment it is disposed.
    /// </summary>
    protected override void AfterAll()
    {
        try
        {
            base.AfterAll();
        }
        finally
        {
            foreach (var provider in _serviceProviders)
            {
                provider.Dispose();
            }

            _serviceProviders.Clear();
        }
    }

    /// <summary>
    /// Creates a <see cref="CentralCommunicationActor"/> wired to a substitute
    /// <see cref="ISiteRepository"/> that reports no sites and a substitute
    /// <see cref="ISiteClientFactory"/>.
    /// </summary>
    /// <param name="auditIngestAskTimeout">
    /// Optional value passed straight to the actor's constructor; <c>null</c>
    /// supplies no override.
    /// </param>
    /// <returns>The reference of the newly created actor.</returns>
    private IActorRef CreateActor(TimeSpan? auditIngestAskTimeout = null)
    {
        var mockRepo = Substitute.For<ISiteRepository>();
        mockRepo.GetAllSitesAsync(Arg.Any<CancellationToken>())
            .Returns(new List<Commons.Entities.Sites.Site>());

        var services = new ServiceCollection();
        services.AddScoped(_ => mockRepo);
        var sp = services.BuildServiceProvider();

        // Remember the provider so AfterAll can dispose it; it must outlive the actor.
        _serviceProviders.Add(sp);

        var mockFactory = Substitute.For<ISiteClientFactory>();
        return Sys.ActorOf(Props.Create(() =>
            new CentralCommunicationActor(sp, mockFactory, auditIngestAskTimeout)));
    }

    /// <summary>Builds a minimal audit event with a fresh <c>EventId</c>.</summary>
    private static AuditEvent SampleAuditEvent() => new()
    {
        EventId = Guid.NewGuid(),
        OccurredAtUtc = DateTime.UtcNow,
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
    };

    /// <summary>Builds a minimal site-call record with a fresh tracked-operation id.</summary>
    private static SiteCall SampleSiteCall() => new()
    {
        TrackedOperationId = TrackedOperationId.New(),
        Channel = "OutboundApi",
        Target = "ExternalSystemA",
        SourceSite = "site1",
        Status = "Delivered",
        RetryCount = 0,
        CreatedAtUtc = DateTime.UtcNow,
        UpdatedAtUtc = DateTime.UtcNow,
        IngestedAtUtc = DateTime.UtcNow,
    };

    /// <summary>
    /// With a proxy registered, the command reaches the proxy unchanged and the
    /// proxy's reply arrives back at the sender of the command.
    /// </summary>
    [Fact]
    public void IngestAuditEventsCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
    {
        var actor = CreateActor();
        var auditProbe = CreateTestProbe();
        actor.Tell(new RegisterAuditIngest(auditProbe.Ref));

        var evt = SampleAuditEvent();
        var cmd = new IngestAuditEventsCommand(new[] { evt });
        actor.Tell(cmd);

        // The audit-ingest proxy receives the very same command.
        // NOTE(review): this comment previously claimed "Forward semantics" with the
        // original sender preserved, but the timeout test in this class describes
        // Ask/PipeTo routing. Nothing here asserts the probe's sender — confirm
        // against CentralCommunicationActor before relying on either description.
        auditProbe.ExpectMsg(cmd);

        // When the proxy replies, the reply must end up at the original sender.
        var reply = new IngestAuditEventsReply(new[] { evt.EventId });
        auditProbe.Reply(reply);

        var received = ExpectMsg<IngestAuditEventsReply>();
        Assert.Equal(new[] { evt.EventId }, received.AcceptedEventIds);
    }

    /// <summary>
    /// Without a registered proxy the actor answers immediately with a reply
    /// that accepts no event ids.
    /// </summary>
    [Fact]
    public void IngestAuditEventsCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds()
    {
        var actor = CreateActor();

        actor.Tell(new IngestAuditEventsCommand(new[] { SampleAuditEvent() }));

        var reply = ExpectMsg<IngestAuditEventsReply>();
        Assert.Empty(reply.AcceptedEventIds);
    }

    /// <summary>
    /// A proxy that never answers must surface as a <see cref="Status.Failure"/>
    /// carrying an <see cref="AskTimeoutException"/>, not as an empty acknowledgement.
    /// </summary>
    [Fact]
    public void IngestAuditEventsCommand_WhenProxyNeverReplies_PipesStatusFailureToSender()
    {
        // A short test-only Ask timeout (constructor seam) keeps the test fast —
        // production uses the 30 s default.
        var actor = CreateActor(auditIngestAskTimeout: TimeSpan.FromMilliseconds(200));
        var auditProbe = CreateTestProbe();
        actor.Tell(new RegisterAuditIngest(auditProbe.Ref));

        var cmd = new IngestAuditEventsCommand(new[] { SampleAuditEvent() });
        actor.Tell(cmd);

        // The proxy receives the command but deliberately never replies.
        auditProbe.ExpectMsg(cmd);

        // The Ask times out; PipeTo forwards the faulted task as a Status.Failure
        // to the original sender. This is the real transient signal the site's
        // own Ask faults on — it is NOT swallowed into an empty ack.
        var failure = ExpectMsg<Status.Failure>();
        Assert.IsType<AskTimeoutException>(failure.Cause);
    }

    /// <summary>
    /// Cached-telemetry counterpart of the registered-proxy test: the command
    /// reaches the proxy and the proxy's reply is routed back to the sender.
    /// </summary>
    [Fact]
    public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
    {
        var actor = CreateActor();
        var auditProbe = CreateTestProbe();
        actor.Tell(new RegisterAuditIngest(auditProbe.Ref));

        var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall());
        var cmd = new IngestCachedTelemetryCommand(new[] { entry });
        actor.Tell(cmd);

        auditProbe.ExpectMsg(cmd);

        var reply = new IngestCachedTelemetryReply(new[] { entry.Audit.EventId });
        auditProbe.Reply(reply);

        var received = ExpectMsg<IngestCachedTelemetryReply>();
        Assert.Equal(new[] { entry.Audit.EventId }, received.AcceptedEventIds);
    }

    /// <summary>
    /// Cached-telemetry counterpart of the no-proxy test: the actor answers with
    /// a reply that accepts no event ids.
    /// </summary>
    [Fact]
    public void IngestCachedTelemetryCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds()
    {
        var actor = CreateActor();

        var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall());
        actor.Tell(new IngestCachedTelemetryCommand(new[] { entry }));

        var reply = ExpectMsg<IngestCachedTelemetryReply>();
        Assert.Empty(reply.AcceptedEventIds);
    }
}
|