using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.IntegrationTests.Grpc;
/// <summary>
/// Integration tests for the gRPC streaming pipeline.
/// Tests the full in-process flow: subscribe request -> StreamRelayActor creation ->
/// domain events via Akka Tell -> Channel relay -> gRPC response stream writes.
/// </summary>
/// <remarks>
/// <para>
/// These tests exercise the real SiteStreamGrpcServer, StreamRelayActor, and Channel
/// wiring together with a real Akka actor system, using only mocked gRPC transport
/// (IServerStreamWriter + ServerCallContext).
/// </para>
/// <para>
/// Full end-to-end gRPC-over-HTTP/2 tests are performed manually against the Docker
/// cluster (docker/deploy.sh + docker/seed-sites.sh + CLI debug-stream).
/// </para>
/// </remarks>
public class GrpcStreamIntegrationTests : TestKit
{
    /// <summary>
    /// Upper bound on how long a cancelled or replaced stream may take to complete.
    /// Awaiting the stream task with this bound turns a server that never returns
    /// into a test failure (TimeoutException) instead of a hung test run.
    /// </summary>
    private static readonly TimeSpan StreamCompletionTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// End-to-end pipeline test: subscribe -> relay actor receives domain events ->
    /// events flow through Channel to gRPC response stream.
    /// Validates attribute value changes arrive with correct protobuf mapping.
    /// </summary>
    [Fact]
    public async Task Pipeline_AttributeValueChanged_FlowsToResponseStream()
    {
        // Arrange: capture the relay actor created by the gRPC server
        IActorRef? relayActor = null;
        var subscriber = Substitute.For();
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns(ci =>
            {
                relayActor = ci.Arg();
                return "sub-integ-1";
            });

        var server = new SiteStreamGrpcServer(
            subscriber,
            NullLogger.Instance);
        server.SetReady(Sys);

        // Record every event the server writes to the (mocked) response stream.
        var writtenEvents = new List();
        var writer = Substitute.For>();
        writer.WriteAsync(Arg.Any(), Arg.Any())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents.Add(ci.Arg()));

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var request = new InstanceStreamRequest
        {
            CorrelationId = "integ-attr-1",
            InstanceUniqueName = "SiteA.Pump01"
        };

        // Act: start the subscription stream
        var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
        await WaitForConditionAsync(() => relayActor != null);

        // Send domain events to the relay actor (simulating what SiteStreamManager does)
        var ts1 = new DateTimeOffset(2026, 3, 21, 14, 0, 0, TimeSpan.Zero);
        var ts2 = new DateTimeOffset(2026, 3, 21, 14, 0, 1, TimeSpan.Zero);
        relayActor!.Tell(new AttributeValueChanged(
            "SiteA.Pump01", "Modules.Flow", "CurrentGPM", 125.3, "Good", ts1));
        relayActor.Tell(new AttributeValueChanged(
            "SiteA.Pump01", "Modules.Pressure", "CurrentPSI", 48.7, "Uncertain", ts2));
        await WaitForConditionAsync(() => writtenEvents.Count >= 2);

        // Cleanup: cancel the call and wait (bounded) for the stream to finish
        cts.Cancel();
        await streamTask.WaitAsync(StreamCompletionTimeout);

        // Assert: both events arrived in order with correct protobuf mapping
        Assert.Equal(2, writtenEvents.Count);

        var evt1 = writtenEvents[0];
        Assert.Equal("integ-attr-1", evt1.CorrelationId);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, evt1.EventCase);
        Assert.Equal("SiteA.Pump01", evt1.AttributeChanged.InstanceUniqueName);
        Assert.Equal("Modules.Flow", evt1.AttributeChanged.AttributePath);
        Assert.Equal("CurrentGPM", evt1.AttributeChanged.AttributeName);
        Assert.Equal("125.3", evt1.AttributeChanged.Value);
        Assert.Equal(Quality.Good, evt1.AttributeChanged.Quality);
        Assert.Equal(Timestamp.FromDateTimeOffset(ts1), evt1.AttributeChanged.Timestamp);

        var evt2 = writtenEvents[1];
        Assert.Equal("integ-attr-1", evt2.CorrelationId);
        Assert.Equal("Modules.Pressure", evt2.AttributeChanged.AttributePath);
        Assert.Equal(Quality.Uncertain, evt2.AttributeChanged.Quality);
    }

    /// <summary>
    /// End-to-end pipeline test for alarm state changes.
    /// </summary>
    [Fact]
    public async Task Pipeline_AlarmStateChanged_FlowsToResponseStream()
    {
        // Arrange: capture the relay actor handed to the subscriber
        IActorRef? relayActor = null;
        var subscriber = Substitute.For();
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns(ci =>
            {
                relayActor = ci.Arg();
                return "sub-integ-2";
            });

        var server = new SiteStreamGrpcServer(
            subscriber,
            NullLogger.Instance);
        server.SetReady(Sys);

        var writtenEvents = new List();
        var writer = Substitute.For>();
        writer.WriteAsync(Arg.Any(), Arg.Any())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents.Add(ci.Arg()));

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var request = new InstanceStreamRequest
        {
            CorrelationId = "integ-alarm-1",
            InstanceUniqueName = "SiteA.Pump01"
        };

        // Act: open the stream and push a single alarm event through the relay
        var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
        await WaitForConditionAsync(() => relayActor != null);

        var ts = new DateTimeOffset(2026, 3, 21, 14, 5, 0, TimeSpan.Zero);
        relayActor!.Tell(new AlarmStateChanged(
            "SiteA.Pump01", "HighPressure",
            Commons.Types.Enums.AlarmState.Active, 3, ts));
        await WaitForConditionAsync(() => writtenEvents.Count >= 1);

        cts.Cancel();
        await streamTask.WaitAsync(StreamCompletionTimeout);

        // Assert: exactly one alarm event with the expected field mapping
        Assert.Single(writtenEvents);
        var evt = writtenEvents[0];
        Assert.Equal("integ-alarm-1", evt.CorrelationId);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, evt.EventCase);
        Assert.Equal("SiteA.Pump01", evt.AlarmChanged.InstanceUniqueName);
        Assert.Equal("HighPressure", evt.AlarmChanged.AlarmName);
        Assert.Equal(AlarmStateEnum.AlarmStateActive, evt.AlarmChanged.State);
        Assert.Equal(3, evt.AlarmChanged.Priority);
        Assert.Equal(Timestamp.FromDateTimeOffset(ts), evt.AlarmChanged.Timestamp);
    }

    /// <summary>
    /// Verifies that mixed event types (attribute + alarm) flow through the same stream.
    /// </summary>
    [Fact]
    public async Task Pipeline_MixedEvents_FlowInOrder()
    {
        // Arrange: capture the relay actor handed to the subscriber
        IActorRef? relayActor = null;
        var subscriber = Substitute.For();
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns(ci =>
            {
                relayActor = ci.Arg();
                return "sub-integ-3";
            });

        var server = new SiteStreamGrpcServer(
            subscriber,
            NullLogger.Instance);
        server.SetReady(Sys);

        var writtenEvents = new List();
        var writer = Substitute.For>();
        writer.WriteAsync(Arg.Any(), Arg.Any())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents.Add(ci.Arg()));

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var request = new InstanceStreamRequest
        {
            CorrelationId = "integ-mixed-1",
            InstanceUniqueName = "SiteB.Motor01"
        };

        var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
        await WaitForConditionAsync(() => relayActor != null);

        var ts = DateTimeOffset.UtcNow;

        // Send interleaved attribute and alarm events
        relayActor!.Tell(new AttributeValueChanged(
            "SiteB.Motor01", "Speed", "RPM", 1750.0, "Good", ts));
        relayActor.Tell(new AlarmStateChanged(
            "SiteB.Motor01", "OverSpeed",
            Commons.Types.Enums.AlarmState.Active, 4, ts));
        relayActor.Tell(new AttributeValueChanged(
            "SiteB.Motor01", "Temperature", "BearingTemp", 85.2, "Good", ts));
        await WaitForConditionAsync(() => writtenEvents.Count >= 3);

        cts.Cancel();
        await streamTask.WaitAsync(StreamCompletionTimeout);

        // Assert: the three events were written in the order they were sent
        Assert.Equal(3, writtenEvents.Count);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, writtenEvents[0].EventCase);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, writtenEvents[1].EventCase);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, writtenEvents[2].EventCase);
        Assert.All(writtenEvents, e => Assert.Equal("integ-mixed-1", e.CorrelationId));
    }

    /// <summary>
    /// Verifies that when a stream is cancelled, the subscriber is cleaned up
    /// and the active stream count returns to zero.
    /// </summary>
    [Fact]
    public async Task Pipeline_Cancellation_CleansUpRelayActorAndSubscription()
    {
        var subscriber = Substitute.For();
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns("sub-integ-4");

        var server = new SiteStreamGrpcServer(
            subscriber,
            NullLogger.Instance);
        server.SetReady(Sys);

        var writer = Substitute.For>();
        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var request = new InstanceStreamRequest
        {
            CorrelationId = "integ-cleanup-1",
            InstanceUniqueName = "SiteC.Valve01"
        };

        var streamTask = Task.Run(() => server.SubscribeInstance(request, writer, context));
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Cancel the stream
        cts.Cancel();
        await streamTask.WaitAsync(StreamCompletionTimeout);

        // Verify cleanup
        Assert.Equal(0, server.ActiveStreamCount);
        subscriber.Received(1).Subscribe("SiteC.Valve01", Arg.Any());
        subscriber.Received(1).RemoveSubscriber(Arg.Any());
    }

    /// <summary>
    /// Verifies that a duplicate correlation ID cancels the first stream and
    /// the second stream continues to receive events.
    /// </summary>
    [Fact]
    public async Task Pipeline_DuplicateCorrelationId_ReplacesStream()
    {
        // Only the relay actor passed on the second Subscribe call is captured.
        IActorRef? relayActor2 = null;
        var callCount = 0;
        var subscriber = Substitute.For();
        subscriber.Subscribe(Arg.Any(), Arg.Any())
            .Returns(ci =>
            {
                callCount++;
                if (callCount == 2)
                    relayActor2 = ci.Arg();
                return $"sub-dup-{callCount}";
            });

        var server = new SiteStreamGrpcServer(
            subscriber,
            NullLogger.Instance);
        server.SetReady(Sys);

        // First stream
        var writer1 = Substitute.For>();
        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var stream1Task = Task.Run(() => server.SubscribeInstance(
            new InstanceStreamRequest { CorrelationId = "integ-dup", InstanceUniqueName = "SiteA.Pump01" },
            writer1, context1));
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Second stream with same correlation ID -- should cancel first
        var writtenEvents2 = new List();
        var writer2 = Substitute.For>();
        writer2.WriteAsync(Arg.Any(), Arg.Any())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents2.Add(ci.Arg()));
        using var cts2 = new CancellationTokenSource();
        var context2 = CreateMockContext(cts2.Token);
        var stream2Task = Task.Run(() => server.SubscribeInstance(
            new InstanceStreamRequest { CorrelationId = "integ-dup", InstanceUniqueName = "SiteA.Pump01" },
            writer2, context2));

        // First stream should complete; cts1 is never cancelled by the test, so the
        // wait is bounded to fail fast if the server does not end the first stream.
        await stream1Task.WaitAsync(StreamCompletionTimeout);
        await WaitForConditionAsync(() => relayActor2 != null);

        // Send event to second relay
        var ts = DateTimeOffset.UtcNow;
        relayActor2!.Tell(new AttributeValueChanged(
            "SiteA.Pump01", "Flow", "GPM", 100.0, "Good", ts));
        await WaitForConditionAsync(() => writtenEvents2.Count >= 1);

        cts2.Cancel();
        await stream2Task.WaitAsync(StreamCompletionTimeout);

        Assert.Single(writtenEvents2);
        Assert.Equal("integ-dup", writtenEvents2[0].CorrelationId);
    }

    /// <summary>
    /// Builds an NSubstitute stand-in for the gRPC call context whose
    /// <c>CancellationToken</c> property returns the supplied token.
    /// </summary>
    /// <param name="cancellationToken">Token the mocked context reports; defaults to a token that is never cancelled.</param>
    /// <returns>The configured substitute.</returns>
    private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default)
    {
        var context = Substitute.For();
        context.CancellationToken.Returns(cancellationToken);
        return context;
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns true or the
    /// timeout elapses, then asserts that the condition holds.
    /// </summary>
    /// <param name="condition">Predicate to poll.</param>
    /// <param name="timeoutMs">Maximum time to keep polling, in milliseconds.</param>
    private static async Task WaitForConditionAsync(Func condition, int timeoutMs = 5000)
    {
        // Environment.TickCount64 is a monotonic millisecond counter, so the deadline
        // is not affected by wall-clock adjustments the way DateTime.UtcNow would be.
        var deadline = Environment.TickCount64 + timeoutMs;
        while (!condition() && Environment.TickCount64 < deadline)
        {
            await Task.Delay(25);
        }

        // Final evaluation after the loop decides pass/fail.
        Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
    }
}