test: add proto contract, cleanup verification, and regression guardrail tests
This commit is contained in:
@@ -0,0 +1,129 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Grpc.Core;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies that after a gRPC stream is cancelled, the SiteStreamManager
|
||||||
|
/// subscription is properly cleaned up with no leaked subscriptions.
|
||||||
|
/// </summary>
|
||||||
|
public class CleanupVerificationTests : TestKit
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Stream_Cancellation_CleansUp_SiteStreamManager_Subscription()
|
||||||
|
{
|
||||||
|
// Arrange: create server with mock subscriber that tracks subscribe/remove calls
|
||||||
|
var subscriber = Substitute.For<ISiteStreamSubscriber>();
|
||||||
|
var subscribeCalled = false;
|
||||||
|
var removeCalled = false;
|
||||||
|
IActorRef? subscribedActor = null;
|
||||||
|
|
||||||
|
subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
|
||||||
|
.Returns(ci =>
|
||||||
|
{
|
||||||
|
subscribeCalled = true;
|
||||||
|
subscribedActor = ci.Arg<IActorRef>();
|
||||||
|
return "sub-cleanup-test";
|
||||||
|
});
|
||||||
|
|
||||||
|
subscriber.When(x => x.RemoveSubscriber(Arg.Any<IActorRef>()))
|
||||||
|
.Do(_ => removeCalled = true);
|
||||||
|
|
||||||
|
var logger = NullLogger<SiteStreamGrpcServer>.Instance;
|
||||||
|
var server = new SiteStreamGrpcServer(subscriber, logger);
|
||||||
|
server.SetReady(Sys);
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
var context = Substitute.For<ServerCallContext>();
|
||||||
|
context.CancellationToken.Returns(cts.Token);
|
||||||
|
|
||||||
|
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||||
|
|
||||||
|
var request = new InstanceStreamRequest
|
||||||
|
{
|
||||||
|
CorrelationId = "corr-cleanup-verify",
|
||||||
|
InstanceUniqueName = "Site1.TestInst"
|
||||||
|
};
|
||||||
|
|
||||||
|
// Act: start a stream, wait for it to register, then cancel
|
||||||
|
var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
|
||||||
|
|
||||||
|
await WaitForConditionAsync(() => subscribeCalled);
|
||||||
|
Assert.True(subscribeCalled, "Subscribe should have been called");
|
||||||
|
Assert.Equal(1, server.ActiveStreamCount);
|
||||||
|
|
||||||
|
cts.Cancel();
|
||||||
|
await streamTask;
|
||||||
|
|
||||||
|
// Assert: verify cleanup
|
||||||
|
Assert.True(removeCalled, "RemoveSubscriber should have been called after cancellation");
|
||||||
|
Assert.Equal(0, server.ActiveStreamCount);
|
||||||
|
|
||||||
|
// Verify the same actor that was subscribed is the one that was removed
|
||||||
|
subscriber.Received(1).RemoveSubscriber(subscribedActor!);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Multiple_Streams_Cancelled_AllCleanedUp()
|
||||||
|
{
|
||||||
|
var subscriber = Substitute.For<ISiteStreamSubscriber>();
|
||||||
|
var removeCount = 0;
|
||||||
|
|
||||||
|
subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
|
||||||
|
.Returns("sub-multi");
|
||||||
|
|
||||||
|
subscriber.When(x => x.RemoveSubscriber(Arg.Any<IActorRef>()))
|
||||||
|
.Do(_ => Interlocked.Increment(ref removeCount));
|
||||||
|
|
||||||
|
var logger = NullLogger<SiteStreamGrpcServer>.Instance;
|
||||||
|
var server = new SiteStreamGrpcServer(subscriber, logger);
|
||||||
|
server.SetReady(Sys);
|
||||||
|
|
||||||
|
// Start 3 streams
|
||||||
|
var ctsList = new List<CancellationTokenSource>();
|
||||||
|
var tasks = new List<Task>();
|
||||||
|
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
{
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
ctsList.Add(cts);
|
||||||
|
var ctx = Substitute.For<ServerCallContext>();
|
||||||
|
ctx.CancellationToken.Returns(cts.Token);
|
||||||
|
var w = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||||
|
|
||||||
|
var req = new InstanceStreamRequest
|
||||||
|
{
|
||||||
|
CorrelationId = $"corr-multi-{i}",
|
||||||
|
InstanceUniqueName = $"Site1.Inst{i}"
|
||||||
|
};
|
||||||
|
|
||||||
|
tasks.Add(Task.Run(() => server.SubscribeInstance(req, w, ctx)));
|
||||||
|
}
|
||||||
|
|
||||||
|
await WaitForConditionAsync(() => server.ActiveStreamCount == 3);
|
||||||
|
|
||||||
|
// Cancel all
|
||||||
|
foreach (var cts in ctsList)
|
||||||
|
cts.Cancel();
|
||||||
|
|
||||||
|
await Task.WhenAll(tasks);
|
||||||
|
|
||||||
|
Assert.Equal(0, server.ActiveStreamCount);
|
||||||
|
Assert.Equal(3, removeCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForConditionAsync(Func<bool> condition, int timeoutMs = 5000)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
|
||||||
|
while (!condition() && DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
using System.Reflection;
|
||||||
|
using ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Regression tests ensuring that the old ClusterClient-based debug streaming
|
||||||
|
/// path is not reintroduced. Debug streaming now flows through gRPC.
|
||||||
|
///
|
||||||
|
/// Note: The DebugStreamEvent type-does-not-exist check lives in
|
||||||
|
/// ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs and is not
|
||||||
|
/// duplicated here.
|
||||||
|
/// </summary>
|
||||||
|
public class NoClusterClientStreamingRegressionTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void CentralCommunicationActor_DoesNotHave_HandleDebugStreamEvent()
|
||||||
|
{
|
||||||
|
var type = typeof(CentralCommunicationActor);
|
||||||
|
var method = type.GetMethod("HandleDebugStreamEvent",
|
||||||
|
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
|
||||||
|
Assert.Null(method);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SiteCommunicationActor_DoesNotHave_HandleDebugStreamEvent()
|
||||||
|
{
|
||||||
|
var type = typeof(SiteCommunicationActor);
|
||||||
|
var method = type.GetMethod("HandleDebugStreamEvent",
|
||||||
|
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
|
||||||
|
Assert.Null(method);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CentralCommunicationActor_DoesNotHave_ForwardDebugStreamEvent()
|
||||||
|
{
|
||||||
|
var type = typeof(CentralCommunicationActor);
|
||||||
|
var method = type.GetMethod("ForwardDebugStreamEvent",
|
||||||
|
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
|
||||||
|
Assert.Null(method);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Communication_Assembly_DoesNotContain_DebugStreamEvent_Type()
|
||||||
|
{
|
||||||
|
// DebugStreamEvent should not exist in the Communication assembly either
|
||||||
|
var assembly = typeof(CentralCommunicationActor).Assembly;
|
||||||
|
var type = assembly.GetTypes().FirstOrDefault(t => t.Name == "DebugStreamEvent");
|
||||||
|
Assert.Null(type);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,79 @@
|
|||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Guardrail tests that verify all oneof variants in SiteStreamEvent have
|
||||||
|
/// corresponding conversion handlers. Adding a new proto field without
|
||||||
|
/// implementing the conversion will cause these tests to fail.
|
||||||
|
/// </summary>
|
||||||
|
public class ProtoContractTests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// The set of EventOneofCase values we handle in ConvertToDomainEvent.
|
||||||
|
/// Update this array when adding a new oneof variant.
|
||||||
|
/// </summary>
|
||||||
|
private static readonly SiteStreamEvent.EventOneofCase[] HandledCases =
|
||||||
|
[
|
||||||
|
SiteStreamEvent.EventOneofCase.AttributeChanged,
|
||||||
|
SiteStreamEvent.EventOneofCase.AlarmChanged
|
||||||
|
];
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AllOneofVariants_HaveConversionHandlers()
|
||||||
|
{
|
||||||
|
var allCases = System.Enum.GetValues<SiteStreamEvent.EventOneofCase>()
|
||||||
|
.Where(c => c != SiteStreamEvent.EventOneofCase.None)
|
||||||
|
.ToArray();
|
||||||
|
|
||||||
|
Assert.Equal(allCases.Length, HandledCases.Length);
|
||||||
|
foreach (var c in allCases)
|
||||||
|
Assert.Contains(c, HandledCases);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(SiteStreamEvent.EventOneofCase.AttributeChanged)]
|
||||||
|
[InlineData(SiteStreamEvent.EventOneofCase.AlarmChanged)]
|
||||||
|
public void ConvertToDomainEvent_HandlesAllOneofVariants(SiteStreamEvent.EventOneofCase eventCase)
|
||||||
|
{
|
||||||
|
var evt = CreateTestEvent(eventCase);
|
||||||
|
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||||
|
Assert.NotNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SiteStreamEvent CreateTestEvent(SiteStreamEvent.EventOneofCase eventCase)
|
||||||
|
{
|
||||||
|
var ts = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
return eventCase switch
|
||||||
|
{
|
||||||
|
SiteStreamEvent.EventOneofCase.AttributeChanged => new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = "test",
|
||||||
|
AttributeChanged = new AttributeValueUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = "Site1.Inst1",
|
||||||
|
AttributePath = "Path",
|
||||||
|
AttributeName = "Attr",
|
||||||
|
Value = "42",
|
||||||
|
Quality = Quality.Good,
|
||||||
|
Timestamp = ts
|
||||||
|
}
|
||||||
|
},
|
||||||
|
SiteStreamEvent.EventOneofCase.AlarmChanged => new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = "test",
|
||||||
|
AlarmChanged = new AlarmStateUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = "Site1.Inst1",
|
||||||
|
AlarmName = "HighTemp",
|
||||||
|
State = AlarmStateEnum.AlarmStateActive,
|
||||||
|
Priority = 1,
|
||||||
|
Timestamp = ts
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => throw new ArgumentOutOfRangeException(nameof(eventCase), eventCase, "Unhandled event case")
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user