464 lines
18 KiB
C#
464 lines
18 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using ScadaLink.Commons.Messages.Audit;
|
|
using ScadaLink.Commons.Messages.Deployment;
|
|
using ScadaLink.Commons.Messages.Notification;
|
|
using ScadaLink.Commons.Types.Audit;
|
|
using ScadaLink.Commons.Types.Notifications;
|
|
|
|
namespace ScadaLink.Communication.Tests;
|
|
|
|
/// <summary>
|
|
/// WP-2: Tests for CommunicationService initialization and state.
|
|
/// </summary>
|
|
public class CommunicationServiceTests : TestKit
|
|
{
|
|
/// <summary>
/// Calling <c>DeployInstanceAsync</c> on a service for which
/// <c>SetCommunicationActor</c> was never invoked is expected to fail with
/// <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task BeforeInitialization_ThrowsOnUsage()
{
    // Arrange: a freshly constructed service with no communication actor attached.
    var uninitialized = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var deployCommand = new DeployInstanceCommand(
        "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);

    // Act + Assert: the deploy call must be rejected.
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => uninitialized.DeployInstanceAsync("site1", deployCommand));
}
|
|
|
|
/// <summary>
/// Checks via reflection that <c>CommunicationService.UnsubscribeDebugView</c>
/// is declared with a <c>void</c> return type, i.e. it hands the caller no task
/// or result to await (fire-and-forget Tell pattern rather than Ask).
/// </summary>
[Fact]
public void UnsubscribeDebugView_IsTellNotAsk()
{
    // nameof keeps the lookup tied to the real member: renaming the method
    // becomes a compile error here instead of a null MethodInfo at run time.
    var method = typeof(CommunicationService).GetMethod(
        nameof(CommunicationService.UnsubscribeDebugView));

    Assert.NotNull(method);
    Assert.Equal(typeof(void), method!.ReturnType);
}
|
|
|
|
// ── DeploymentManager-006: query-the-site-before-redeploy ──
|
|
|
|
/// <summary>
/// <c>QueryDeploymentStateAsync</c> on a service with no communication actor
/// set is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task QueryDeploymentStateAsync_BeforeInitialization_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var stateQuery = new DeploymentStateQueryRequest("corr-1", "inst1", DateTimeOffset.UtcNow);

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.QueryDeploymentStateAsync("site1", stateQuery));
}
|
|
|
|
/// <summary>
/// With a communication actor attached, <c>QueryDeploymentStateAsync</c> should
/// dispatch the query wrapped in a <c>SiteEnvelope</c>, like other site-directed
/// commands, and hand the typed response back to the caller.
/// </summary>
[Fact]
public async Task QueryDeploymentStateAsync_SendsEnvelopeAndReturnsResponse()
{
    // Arrange: EchoStateQueryActor takes the place of CentralCommunicationActor.
    // It replies with a DeploymentStateQueryResponse only for a SiteEnvelope
    // addressed to "site-a" that carries a DeploymentStateQueryRequest, so a
    // successful response also confirms the envelope shape.
    var centralStandIn = Sys.ActorOf(Props.Create(() => new EchoStateQueryActor()));
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetCommunicationActor(centralStandIn);
    var query = new DeploymentStateQueryRequest("corr-9", "QueriedInst", DateTimeOffset.UtcNow);

    // Act
    var answer = await sut.QueryDeploymentStateAsync("site-a", query);

    // Assert: correlation id and instance name are echoed; the rest is the
    // stand-in's canned payload.
    Assert.Equal("corr-9", answer.CorrelationId);
    Assert.Equal("QueriedInst", answer.InstanceUniqueName);
    Assert.True(answer.IsDeployed);
    Assert.Equal("sha256:applied", answer.AppliedRevisionHash);
}
|
|
|
|
// ── Notification Outbox: central-side outbox actor calls ──
|
|
|
|
/// <summary>
/// <c>QueryNotificationOutboxAsync</c> on a service for which
/// <c>SetNotificationOutbox</c> was never called is expected to throw
/// <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task QueryNotificationOutboxAsync_BeforeOutboxSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var outboxQuery = new NotificationOutboxQueryRequest(
        "corr-1", null, null, null, null, false, null, null, null, 1, 50);

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.QueryNotificationOutboxAsync(outboxQuery));
}
|
|
|
|
/// <summary>
/// <c>RetryNotificationAsync</c> on a service with no notification outbox set
/// is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task RetryNotificationAsync_BeforeOutboxSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var retry = new RetryNotificationRequest("corr-1", "n1");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.RetryNotificationAsync(retry));
}
|
|
|
|
/// <summary>
/// <c>DiscardNotificationAsync</c> on a service with no notification outbox set
/// is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task DiscardNotificationAsync_BeforeOutboxSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var discard = new DiscardNotificationRequest("corr-1", "n1");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.DiscardNotificationAsync(discard));
}
|
|
|
|
/// <summary>
/// <c>GetNotificationKpisAsync</c> on a service with no notification outbox set
/// is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task GetNotificationKpisAsync_BeforeOutboxSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var kpiRequest = new NotificationKpiRequest("corr-1");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetNotificationKpisAsync(kpiRequest));
}
|
|
|
|
/// <summary>
/// The outbox actor is central-local, so the query must reach the outbox proxy
/// as-is (no <c>SiteEnvelope</c> wrapping) and the proxy's reply must be what
/// the caller gets back.
/// </summary>
[Fact]
public async Task QueryNotificationOutboxAsync_AsksOutboxProxyDirectly()
{
    // Arrange: a test probe plays the outbox proxy.
    var outboxProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetNotificationOutbox(outboxProbe.Ref);

    var query = new NotificationOutboxQueryRequest(
        "corr-q", "Pending", null, null, null, true, "alarm", null, null, 2, 25);
    var cannedReply = new NotificationOutboxQueryResponse(
        "corr-q", true, null, Array.Empty<NotificationSummary>(), 0);

    // Act: start the call; it can only complete once the probe has replied.
    var pending = sut.QueryNotificationOutboxAsync(query);

    // Assert: the probe sees the very same request instance, then answers.
    Assert.Same(query, outboxProbe.ExpectMsg<NotificationOutboxQueryRequest>());
    outboxProbe.Reply(cannedReply);

    Assert.Same(cannedReply, await pending);
}
|
|
|
|
/// <summary>
/// A retry request must arrive at the outbox proxy as the same instance that
/// was passed in, and the proxy's reply must be returned to the caller.
/// </summary>
[Fact]
public async Task RetryNotificationAsync_AsksOutboxProxyDirectly()
{
    // Arrange: a test probe plays the outbox proxy.
    var outboxProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetNotificationOutbox(outboxProbe.Ref);

    var retry = new RetryNotificationRequest("corr-r", "n-7");
    var cannedReply = new RetryNotificationResponse("corr-r", true, null);

    // Act
    var pending = sut.RetryNotificationAsync(retry);

    // Assert
    Assert.Same(retry, outboxProbe.ExpectMsg<RetryNotificationRequest>());
    outboxProbe.Reply(cannedReply);

    Assert.Same(cannedReply, await pending);
}
|
|
|
|
/// <summary>
/// A discard request must arrive at the outbox proxy unchanged, and an
/// unsuccessful reply from the proxy must be passed back to the caller as-is.
/// </summary>
[Fact]
public async Task DiscardNotificationAsync_AsksOutboxProxyDirectly()
{
    // Arrange: a test probe plays the outbox proxy.
    var outboxProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetNotificationOutbox(outboxProbe.Ref);

    var discard = new DiscardNotificationRequest("corr-d", "n-9");
    var refusal = new DiscardNotificationResponse("corr-d", false, "already delivered");

    // Act
    var pending = sut.DiscardNotificationAsync(discard);

    // Assert
    Assert.Same(discard, outboxProbe.ExpectMsg<DiscardNotificationRequest>());
    outboxProbe.Reply(refusal);

    var outcome = await pending;
    Assert.Same(refusal, outcome);
    Assert.False(outcome.Success);
}
|
|
|
|
/// <summary>
/// A KPI request must arrive at the outbox proxy unchanged, and the proxy's KPI
/// reply must be returned to the caller.
/// </summary>
[Fact]
public async Task GetNotificationKpisAsync_AsksOutboxProxyDirectly()
{
    // Arrange: a test probe plays the outbox proxy.
    var outboxProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetNotificationOutbox(outboxProbe.Ref);

    var kpiRequest = new NotificationKpiRequest("corr-k");
    var kpiReply = new NotificationKpiResponse(
        "corr-k", true, null, 3, 1, 0, 12, TimeSpan.FromMinutes(5));

    // Act
    var pending = sut.GetNotificationKpisAsync(kpiRequest);

    // Assert
    Assert.Same(kpiRequest, outboxProbe.ExpectMsg<NotificationKpiRequest>());
    outboxProbe.Reply(kpiReply);

    var outcome = await pending;
    Assert.Same(kpiReply, outcome);
    Assert.Equal(3, outcome.QueueDepth);
}
|
|
|
|
/// <summary>
/// A per-site KPI request must arrive at the outbox proxy unchanged, and the
/// proxy's reply (a single "plant-a" snapshot here) must be returned as-is.
/// </summary>
[Fact]
public async Task GetPerSiteNotificationKpisAsync_AsksOutboxProxyDirectly()
{
    // Arrange: a test probe plays the outbox proxy.
    var outboxProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetNotificationOutbox(outboxProbe.Ref);

    var perSiteRequest = new PerSiteNotificationKpiRequest("corr-ps");
    var plantA = new SiteNotificationKpiSnapshot("plant-a", 2, 0, 0, 5, null);
    var perSiteReply = new PerSiteNotificationKpiResponse(
        "corr-ps", true, null, new[] { plantA });

    // Act
    var pending = sut.GetPerSiteNotificationKpisAsync(perSiteRequest);

    // Assert
    Assert.Same(perSiteRequest, outboxProbe.ExpectMsg<PerSiteNotificationKpiRequest>());
    outboxProbe.Reply(perSiteReply);

    var outcome = await pending;
    Assert.Same(perSiteReply, outcome);
    Assert.True(outcome.Success);
    Assert.Single(outcome.Sites);
    Assert.Equal("plant-a", outcome.Sites[0].SourceSiteId);
}
|
|
|
|
// ── Site Call Audit: central-side audit actor calls ──
|
|
|
|
/// <summary>
/// <c>QuerySiteCallsAsync</c> on a service for which <c>SetSiteCallAudit</c>
/// was never called is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task QuerySiteCallsAsync_BeforeSiteCallAuditSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var siteCallQuery = new SiteCallQueryRequest(
        "corr-1", null, null, null, null, false, null, null, null, null, 50);

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.QuerySiteCallsAsync(siteCallQuery));
}
|
|
|
|
/// <summary>
/// <c>GetSiteCallKpisAsync</c> on a service with no Site Call Audit actor set
/// is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task GetSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var kpiRequest = new SiteCallKpiRequest("corr-1");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetSiteCallKpisAsync(kpiRequest));
}
|
|
|
|
/// <summary>
/// <c>GetSiteCallDetailAsync</c> on a service with no Site Call Audit actor set
/// is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task GetSiteCallDetailAsync_BeforeSiteCallAuditSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var detailRequest = new SiteCallDetailRequest("corr-1", Guid.NewGuid());

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetSiteCallDetailAsync(detailRequest));
}
|
|
|
|
/// <summary>
/// <c>GetPerSiteSiteCallKpisAsync</c> on a service with no Site Call Audit
/// actor set is expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task GetPerSiteSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var perSiteRequest = new PerSiteSiteCallKpiRequest("corr-1");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.GetPerSiteSiteCallKpisAsync(perSiteRequest));
}
|
|
|
|
/// <summary>
/// The Site Call Audit actor is central-local, so the query must reach its
/// proxy as-is (no <c>SiteEnvelope</c> wrapping) and the proxy's reply must be
/// what the caller gets back.
/// </summary>
[Fact]
public async Task QuerySiteCallsAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var query = new SiteCallQueryRequest(
        "corr-q", "Parked", "plant-a", "ApiOutbound", "ERP.GetOrder", true,
        null, null, null, null, 25);
    var cannedReply = new SiteCallQueryResponse(
        "corr-q", true, null, Array.Empty<SiteCallSummary>(), null, null);

    // Act
    var pending = sut.QuerySiteCallsAsync(query);

    // Assert: the probe sees the very same request instance, then answers.
    Assert.Same(query, auditProbe.ExpectMsg<SiteCallQueryRequest>());
    auditProbe.Reply(cannedReply);

    Assert.Same(cannedReply, await pending);
}
|
|
|
|
/// <summary>
/// A detail request must arrive at the Site Call Audit proxy unchanged, and an
/// unsuccessful reply from the proxy must be passed back to the caller as-is.
/// </summary>
[Fact]
public async Task GetSiteCallDetailAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var detailRequest = new SiteCallDetailRequest("corr-d", Guid.NewGuid());
    var notFound = new SiteCallDetailResponse("corr-d", false, "site call not found", null);

    // Act
    var pending = sut.GetSiteCallDetailAsync(detailRequest);

    // Assert
    Assert.Same(detailRequest, auditProbe.ExpectMsg<SiteCallDetailRequest>());
    auditProbe.Reply(notFound);

    var outcome = await pending;
    Assert.Same(notFound, outcome);
    Assert.False(outcome.Success);
}
|
|
|
|
/// <summary>
/// A KPI request must arrive at the Site Call Audit proxy unchanged, and the
/// proxy's KPI reply must be returned to the caller.
/// </summary>
[Fact]
public async Task GetSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var kpiRequest = new SiteCallKpiRequest("corr-k");
    var kpiReply = new SiteCallKpiResponse(
        "corr-k", true, null, 4, 1, 2, 9, TimeSpan.FromMinutes(7), 1);

    // Act
    var pending = sut.GetSiteCallKpisAsync(kpiRequest);

    // Assert
    Assert.Same(kpiRequest, auditProbe.ExpectMsg<SiteCallKpiRequest>());
    auditProbe.Reply(kpiReply);

    var outcome = await pending;
    Assert.Same(kpiReply, outcome);
    Assert.Equal(4, outcome.BufferedCount);
    Assert.Equal(1, outcome.StuckCount);
}
|
|
|
|
/// <summary>
/// A per-site KPI request must arrive at the Site Call Audit proxy unchanged,
/// and the proxy's reply (a single "plant-a" snapshot here) must be returned
/// as-is.
/// </summary>
[Fact]
public async Task GetPerSiteSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var perSiteRequest = new PerSiteSiteCallKpiRequest("corr-ps");
    var plantA = new SiteCallSiteKpiSnapshot("plant-a", 3, 0, 0, 5, null, 0);
    var perSiteReply = new PerSiteSiteCallKpiResponse(
        "corr-ps", true, null, new[] { plantA });

    // Act
    var pending = sut.GetPerSiteSiteCallKpisAsync(perSiteRequest);

    // Assert
    Assert.Same(perSiteRequest, auditProbe.ExpectMsg<PerSiteSiteCallKpiRequest>());
    auditProbe.Reply(perSiteReply);

    var outcome = await pending;
    Assert.Same(perSiteReply, outcome);
    Assert.True(outcome.Success);
    Assert.Single(outcome.Sites);
    Assert.Equal("plant-a", outcome.Sites[0].SourceSite);
}
|
|
|
|
/// <summary>
/// <c>RetrySiteCallAsync</c> on a service with no Site Call Audit actor set is
/// expected to throw <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task RetrySiteCallAsync_BeforeSiteCallAuditSet_Throws()
{
    // Arrange
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    var retry = new RetrySiteCallRequest("corr-1", Guid.NewGuid(), "plant-a");

    // Act + Assert
    await Assert.ThrowsAsync<InvalidOperationException>(
        () => sut.RetrySiteCallAsync(retry));
}
|
|
|
|
/// <summary>
/// The retry relay is started by Asking the central-local Site Call Audit proxy
/// directly: the request reaches the proxy without <c>SiteEnvelope</c> wrapping
/// at this layer, and the proxy's reply is returned to the caller.
/// </summary>
[Fact]
public async Task RetrySiteCallAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var retry = new RetrySiteCallRequest("corr-r", Guid.NewGuid(), "plant-a");
    var applied = new RetrySiteCallResponse(
        "corr-r", SiteCallRelayOutcome.Applied, true, true, null);

    // Act
    var pending = sut.RetrySiteCallAsync(retry);

    // Assert: the probe sees the very same request instance, then answers.
    Assert.Same(retry, auditProbe.ExpectMsg<RetrySiteCallRequest>());
    auditProbe.Reply(applied);

    Assert.Same(applied, await pending);
}
|
|
|
|
/// <summary>
/// A discard request must arrive at the Site Call Audit proxy unchanged, and a
/// "site unreachable" reply from the proxy must be passed back to the caller
/// as-is.
/// </summary>
[Fact]
public async Task DiscardSiteCallAsync_AsksSiteCallAuditProxyDirectly()
{
    // Arrange: a test probe plays the Site Call Audit proxy.
    var auditProbe = CreateTestProbe();
    var sut = new CommunicationService(
        Options.Create(new CommunicationOptions()),
        NullLogger<CommunicationService>.Instance);
    sut.SetSiteCallAudit(auditProbe.Ref);

    var discard = new DiscardSiteCallRequest("corr-d", Guid.NewGuid(), "plant-a");
    var unreachable = new DiscardSiteCallResponse(
        "corr-d", SiteCallRelayOutcome.SiteUnreachable, false, false, "unreachable");

    // Act
    var pending = sut.DiscardSiteCallAsync(discard);

    // Assert
    Assert.Same(discard, auditProbe.ExpectMsg<DiscardSiteCallRequest>());
    auditProbe.Reply(unreachable);

    var outcome = await pending;
    Assert.Same(unreachable, outcome);
    Assert.False(outcome.SiteReachable);
}
|
|
|
|
/// <summary>
/// Stand-in for CentralCommunicationActor. A <c>SiteEnvelope</c> that targets
/// "site-a" and carries a <c>DeploymentStateQueryRequest</c> is answered with a
/// typed <c>DeploymentStateQueryResponse</c> that echoes the request's
/// correlation id and instance name. Any other envelope is answered with a
/// <c>Status.Failure</c> describing the mismatch, so a routing regression is
/// reported with a message rather than the sender simply never getting a reply.
/// </summary>
private sealed class EchoStateQueryActor : ReceiveActor
{
    public EchoStateQueryActor()
    {
        Receive<SiteEnvelope>(env =>
        {
            if (env is { SiteId: "site-a", Message: DeploymentStateQueryRequest req })
            {
                Sender.Tell(new DeploymentStateQueryResponse(
                    req.CorrelationId, req.InstanceUniqueName, true,
                    "dep-applied", "sha256:applied", DateTimeOffset.UtcNow));
                return;
            }

            // Wrong site or wrong payload: reply with a failure instead of
            // staying silent. NOTE(review): this relies on the asking side
            // treating Status.Failure as a faulted Ask — confirm against how
            // CommunicationService issues the request.
            var payloadType = env.Message?.GetType().Name ?? "null";
            Sender.Tell(new Status.Failure(new InvalidOperationException(
                $"Unexpected SiteEnvelope: site '{env.SiteId}', message type '{payloadType}'.")));
        });
    }
}
|
|
}
|