Files
ScadaBridge/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs
T
Joseph Doherty 487859bff0 docs+code: close Theme 1 — 24 design-doc / XML-doc drift findings
Doc/XML-comment drift + small adherence fixes across 17 modules. Highlights:
- Host-017: site CoordinatedShutdown ordering — SiteStreamGrpcServer gains
  CancelAllStreams() (refuse new streams, cancel active), wired into
  Program.cs site branch via ApplicationStopping.
- InboundAPI-021: ParentExecutionId now travels on RouteToGet/SetAttributes
  symmetric with RouteToCallRequest; RouteHelper stamps from _parentExecutionId.
- ClusterInfra-012: ClusterOptionsValidator now requires both seed nodes.
- Comm-018: SiteCommunicationActor.HeartbeatMessage.IsActive derived from
  cluster leader check (was hardcoded true).
- DM-020: reconciliation audit row attributes the current user, not prior deployer.
- SEL-019: EventLogPurgeService early-exits on standby via active-node check.
- Plus comment/XML-doc accuracy fixes across AuditLog, ConfigurationDatabase,
  NotificationOutbox, SiteRuntime, SiteCallAudit; doc refreshes for Component-
  Commons / -ManagementService / -CLI / -ExternalSystemGateway / -HealthMonitoring
  / -Transport / -ConfigurationDatabase; CD-023 index-name doc alignment.

11 new regression tests (RouteHelper x4, SiteStreamGrpcServer x2,
ClusterOptionsValidator x1, SiteCommunicationActor x1, DeploymentService x1,
EventLogPurgeService x3). Build clean (0 warnings); InboundAPI/Communication/
Host suites all green. README regenerated: 112 open (was 136).
2026-05-28 06:28:31 -04:00

339 lines
12 KiB
C#

using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
public class SiteStreamGrpcServerTests : TestKit
{
// Shared collaborators handed to every server instance built by CreateServer.
private readonly ISiteStreamSubscriber _subscriber;
private readonly ILogger<SiteStreamGrpcServer> _logger;

/// <summary>
/// Prepares the per-test collaborators: a no-op logger and a subscriber
/// substitute whose <c>Subscribe</c> call returns "sub-1" for any arguments.
/// </summary>
public SiteStreamGrpcServerTests()
{
    _logger = NullLogger<SiteStreamGrpcServer>.Instance;

    var subscriberStub = Substitute.For<ISiteStreamSubscriber>();
    subscriberStub
        .Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
        .Returns("sub-1");
    _subscriber = subscriberStub;
}
/// <summary>
/// Builds the server under test from the shared subscriber substitute and logger.
/// </summary>
/// <param name="maxStreams">Forwarded unchanged as the server's third constructor argument.</param>
/// <returns>A new <see cref="SiteStreamGrpcServer"/>; <c>SetReady</c> has not been called on it.</returns>
private SiteStreamGrpcServer CreateServer(int maxStreams = 100)
    => new SiteStreamGrpcServer(_subscriber, _logger, maxStreams);
/// <summary>
/// Builds a subscribe request carrying the given correlation id and instance name.
/// </summary>
/// <param name="correlationId">Value assigned to <c>CorrelationId</c>.</param>
/// <param name="instance">Value assigned to <c>InstanceUniqueName</c>.</param>
/// <returns>A freshly constructed <see cref="InstanceStreamRequest"/>.</returns>
private static InstanceStreamRequest MakeRequest(string correlationId = "corr-1", string instance = "Site1.Pump01")
    => new InstanceStreamRequest
    {
        CorrelationId = correlationId,
        InstanceUniqueName = instance,
    };
[Fact]
public async Task RejectsWhenNotReady()
{
    // SetReady() is deliberately never called on this server.
    var notReadyServer = CreateServer();
    var responseStream = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var callContext = CreateMockContext();

    Func<Task> subscribe = () =>
        notReadyServer.SubscribeInstance(MakeRequest(), responseStream, callContext);

    // The call must fail with an RpcException carrying Unavailable.
    var rejection = await Assert.ThrowsAsync<RpcException>(subscribe);
    Assert.Equal(StatusCode.Unavailable, rejection.StatusCode);
}
[Fact]
public async Task RejectsWhenMaxStreamsReached()
{
    // With maxStreams: 1, a second subscription made while the first is still
    // active must be rejected with ResourceExhausted.
    var server = CreateServer(maxStreams: 1);
    server.SetReady(Sys);

    // Start one stream that blocks until its call token is cancelled.
    // The CTS is disposed when the test ends (it is IDisposable).
    using var cts1 = new CancellationTokenSource();
    var context1 = CreateMockContext(cts1.Token);
    var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var stream1Task = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-1"), writer1, context1));

    try
    {
        // Wait for the first stream to register
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Second stream should be rejected
        var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var context2 = CreateMockContext();
        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest("corr-2"), writer2, context2));
        Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode);
    }
    finally
    {
        // Clean up the first stream even when an assertion above failed, so a
        // failing test does not leave a background stream running.
        cts1.Cancel();
        await stream1Task;
    }
}
[Fact]
public async Task CancelsDuplicateCorrelationId()
{
    // A second subscription that reuses a correlation id must cause the first
    // stream to complete while the second one stays active.
    var server = CreateServer();
    server.SetReady(Sys);

    // Both sources are disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts1 = new CancellationTokenSource();
    using var cts2 = new CancellationTokenSource();

    var context1 = CreateMockContext(cts1.Token);
    var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();

    // Start first stream
    var stream1Task = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-dup"), writer1, context1));
    Task? stream2Task = null;

    try
    {
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Start second stream with same correlationId -- should cancel first
        var context2 = CreateMockContext(cts2.Token);
        var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        stream2Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-dup"), writer2, context2));

        // First stream should complete (cancelled by duplicate replacement).
        // The wait is bounded so a regression fails the test with a
        // TimeoutException instead of hanging the whole run.
        await stream1Task.WaitAsync(TimeSpan.FromSeconds(5));

        // Second stream should be active
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
    }
    finally
    {
        // Clean up: release whichever streams are still running, including on
        // the failure path. Cancelling an already-finished stream's token is harmless.
        cts1.Cancel();
        cts2.Cancel();
        await Task.WhenAll(stream1Task, stream2Task ?? Task.CompletedTask);
    }
}
[Fact]
public async Task CleansUpOnCancellation()
{
    // Cancelling the call token must end the stream and bring
    // ActiveStreamCount back to zero.
    var server = CreateServer();
    server.SetReady(Sys);

    // Disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts = new CancellationTokenSource();
    var context = CreateMockContext(cts.Token);
    var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var streamTask = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-cleanup"), writer, context));

    try
    {
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
    }
    finally
    {
        // Cancel in a finally block so the stream is released even when the
        // registration wait above fails.
        cts.Cancel();
        await streamTask;
    }

    Assert.Equal(0, server.ActiveStreamCount);
}
// --- Host-017 / REQ-HOST-7: site-shutdown ordering ---
[Fact]
public async Task Host017_CancelAllStreams_CancelsActiveStreamsAndRefusesNewOnes()
{
    // REQ-HOST-7 step (1)+(2): on CoordinatedShutdown the gRPC server must
    // stop accepting new streams AND cancel every active stream so the
    // client observes a clean Cancelled (not a silent stream that only
    // times out via keepalive). Program.cs registers
    // ApplicationStopping → CancelAllStreams(); this test exercises the
    // server-side guarantee in isolation.
    var server = CreateServer();
    server.SetReady(Sys);

    // Disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts1 = new CancellationTokenSource();
    var context1 = CreateMockContext(cts1.Token);
    var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var stream1Task = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-shutdown-1"), writer1, context1));

    try
    {
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Begin shutdown — flip the flag AND cancel the active stream.
        server.CancelAllStreams();
        Assert.True(server.IsShuttingDown);

        // Active stream's await foreach observes OCE and falls through finally
        // → entry is removed from _activeStreams.
        // The wait is bounded: if CancelAllStreams ever stops cancelling the
        // stream, the test fails with a TimeoutException instead of hanging.
        await stream1Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.Equal(0, server.ActiveStreamCount);
    }
    finally
    {
        // Fallback: cancel the call token so the stream cannot outlive a
        // failed test. This is a no-op when the stream has already completed.
        cts1.Cancel();
        await stream1Task;
    }

    // A second SubscribeInstance after shutdown is refused immediately
    // with Unavailable rather than allowed to register a new stream.
    var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var context2 = CreateMockContext();
    var ex = await Assert.ThrowsAsync<RpcException>(
        () => server.SubscribeInstance(MakeRequest("corr-shutdown-2"), writer2, context2));
    Assert.Equal(StatusCode.Unavailable, ex.StatusCode);
    Assert.Contains("shutting", ex.Status.Detail, StringComparison.OrdinalIgnoreCase);
}
[Fact]
public void Host017_CancelAllStreams_IsIdempotent()
{
    // A double-fired shutdown sequence calls CancelAllStreams more than once;
    // the repeated call must not throw.
    const int shutdownSignals = 2;

    var server = CreateServer();
    server.SetReady(Sys);

    for (var signal = 0; signal < shutdownSignals; signal++)
    {
        server.CancelAllStreams();
    }

    Assert.True(server.IsShuttingDown);
    Assert.Equal(0, server.ActiveStreamCount);
}
[Fact]
public async Task SubscribesAndRemovesFromStreamManager()
{
    // Opening a stream must call Subscribe for the requested instance, and
    // ending it must call RemoveSubscriber.
    var server = CreateServer();
    server.SetReady(Sys);

    // Disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts = new CancellationTokenSource();
    var context = CreateMockContext(cts.Token);
    var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var streamTask = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-sub", "Site1.Motor01"), writer, context));

    try
    {
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Verify Subscribe was called
        _subscriber.Received(1).Subscribe("Site1.Motor01", Arg.Any<IActorRef>());
    }
    finally
    {
        // Always end the stream, including when a check above failed, so it
        // does not keep running after the test.
        cts.Cancel();
        await streamTask;
    }

    // Verify RemoveSubscriber was called
    _subscriber.Received(1).RemoveSubscriber(Arg.Any<IActorRef>());
}
[Fact]
public async Task WritesEventsToResponseStream()
{
    // A domain event told to the relay actor must reach the gRPC response
    // stream as a SiteStreamEvent carrying the request's correlation id.
    var server = CreateServer();
    server.SetReady(Sys);

    // The substitute callbacks below are invoked from the Task.Run'd stream
    // call while this test polls from its own thread, so every access to the
    // shared state goes through one lock.
    var gate = new object();

    // Capture the relay actor so we can send it events
    IActorRef? capturedActor = null;
    _subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
        .Returns(ci =>
        {
            lock (gate)
            {
                capturedActor = ci.Arg<IActorRef>();
            }

            return "sub-write";
        });

    // Disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts = new CancellationTokenSource();
    var context = CreateMockContext(cts.Token);
    var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var writtenEvents = new List<SiteStreamEvent>();
    writer.WriteAsync(Arg.Any<SiteStreamEvent>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask)
        .AndDoes(ci =>
        {
            lock (gate)
            {
                writtenEvents.Add(ci.Arg<SiteStreamEvent>());
            }
        });

    var streamTask = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-write", "Site1.Pump01"), writer, context));

    try
    {
        await WaitForConditionAsync(() =>
        {
            lock (gate)
            {
                return capturedActor != null;
            }
        });

        IActorRef relay;
        lock (gate)
        {
            relay = capturedActor!;
        }

        // Send a domain event to the relay actor
        var ts = DateTimeOffset.UtcNow;
        relay.Tell(new Commons.Messages.Streaming.AttributeValueChanged(
            "Site1.Pump01", "Path", "Attr", 99.5, "Good", ts));

        // Wait for event to be written
        await WaitForConditionAsync(() =>
        {
            lock (gate)
            {
                return writtenEvents.Count >= 1;
            }
        });

        // Assert against a snapshot so a late write cannot mutate the list
        // while it is being inspected.
        SiteStreamEvent[] snapshot;
        lock (gate)
        {
            snapshot = writtenEvents.ToArray();
        }

        var written = Assert.Single(snapshot);
        Assert.Equal("corr-write", written.CorrelationId);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, written.EventCase);
    }
    finally
    {
        // Always end the stream, including when an assertion above failed.
        cts.Cancel();
        await streamTask;
    }
}
[Theory]
[InlineData("corr/with/slash")]
[InlineData("corr with space")]
[InlineData("")]
[InlineData("$weird")]
public async Task RejectsCorrelationIdThatIsNotActorNameSafe(string badCorrelationId)
{
    // Communication-014 regression: the correlation_id on a public gRPC
    // SubscribeInstance call is untrusted and must not flow straight into an
    // Akka actor name. An unsafe id has to surface as a clean InvalidArgument
    // instead of escaping as an unhandled InvalidActorNameException.
    var readyServer = CreateServer();
    readyServer.SetReady(Sys);
    var responseStream = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var callContext = CreateMockContext();

    Func<Task> subscribeWithUnsafeId = () =>
        readyServer.SubscribeInstance(MakeRequest(badCorrelationId), responseStream, callContext);

    var rejection = await Assert.ThrowsAsync<RpcException>(subscribeWithUnsafeId);

    Assert.Equal(StatusCode.InvalidArgument, rejection.StatusCode);
    // The rejected call must not have registered a stream.
    Assert.Equal(0, readyServer.ActiveStreamCount);
}
[Fact]
public async Task AcceptsActorNameSafeCorrelationId()
{
    // A normal GUID-style correlation id (what central always supplies) is accepted.
    var server = CreateServer();
    server.SetReady(Sys);

    // Disposed when the test ends (CancellationTokenSource is IDisposable).
    using var cts = new CancellationTokenSource();
    var context = CreateMockContext(cts.Token);
    var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
    var streamTask = Task.Run(() => server.SubscribeInstance(
        MakeRequest(Guid.NewGuid().ToString()), writer, context));

    try
    {
        // Acceptance is observed as the stream registering itself.
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
    }
    finally
    {
        // Always end the stream, including when the wait above failed.
        cts.Cancel();
        await streamTask;
    }
}
[Fact]
public void SetReady_AllowsStreamCreation()
{
    // Smoke check only: SetReady can be called on a fresh server and no
    // stream is registered as a result. That SetReady actually enables
    // streaming is covered by the tests that open streams.
    var server = CreateServer();

    server.SetReady(Sys);

    var activeStreams = server.ActiveStreamCount;
    Assert.Equal(0, activeStreams);
}
/// <summary>
/// Creates a substitute <see cref="ServerCallContext"/> whose
/// <c>CancellationToken</c> returns the supplied token.
/// </summary>
/// <param name="cancellationToken">Token the context should report; defaults to <c>default</c>.</param>
/// <returns>The configured call-context substitute.</returns>
private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default)
{
    var callContext = Substitute.For<ServerCallContext>();
    callContext.CancellationToken.Returns(cancellationToken);

    return callContext;
}
/// <summary>
/// Polls <paramref name="condition"/> every 25 ms until it returns true or the
/// timeout elapses, then asserts that the condition holds.
/// </summary>
/// <param name="condition">Predicate to poll; it is evaluated once more by the final assertion.</param>
/// <param name="timeoutMs">Maximum time to keep polling, in milliseconds.</param>
/// <returns>A task that completes when the condition holds, or fails the assertion otherwise.</returns>
private static async Task WaitForConditionAsync(Func<bool> condition, int timeoutMs = 5000)
{
    // Stopwatch is monotonic, so a wall-clock adjustment during the wait can
    // neither cut the timeout short nor extend it (unlike DateTime.UtcNow).
    var elapsed = System.Diagnostics.Stopwatch.StartNew();

    while (!condition() && elapsed.ElapsedMilliseconds < timeoutMs)
    {
        await Task.Delay(25);
    }

    // Re-check after the loop so a condition that turned true right at the
    // deadline still passes.
    Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
}
}