using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;

namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;

/// <summary>
/// Unit tests for <see cref="SiteStreamGrpcServer"/>: readiness gating, the max-stream
/// limit, duplicate-correlation-id replacement, cleanup on cancellation, shutdown
/// ordering (Host-017), correlation-id validation (Communication-014), cleanup when
/// Subscribe throws (Communication-021) and the site-connection gauge.
/// </summary>
/// <remarks>
/// Long-lived streams are started with <see cref="Task.Run(Func{Task})"/> and are ended
/// by cancelling the token returned from the mocked
/// <see cref="ServerCallContext.CancellationToken"/>. Every await on such a stream
/// task is bounded by <see cref="AsyncStepTimeout"/>. A regression that fails to
/// cancel a stream therefore surfaces as a test failure instead of a hung test run.
/// </remarks>
public class SiteStreamGrpcServerTests : TestKit
{
    /// <summary>Upper bound for any single asynchronous step a test waits on.</summary>
    private static readonly TimeSpan AsyncStepTimeout = TimeSpan.FromSeconds(5);

    private readonly ISiteStreamSubscriber _subscriber;
    private readonly ILogger<SiteStreamGrpcServer> _logger;

    public SiteStreamGrpcServerTests()
    {
        _subscriber = Substitute.For<ISiteStreamSubscriber>();
        _subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
            .Returns("sub-1");
        _logger = NullLogger<SiteStreamGrpcServer>.Instance;
    }

    private SiteStreamGrpcServer CreateServer(int maxStreams = 100)
    {
        return new SiteStreamGrpcServer(_subscriber, _logger, maxStreams);
    }

    private static InstanceStreamRequest MakeRequest(
        string correlationId = "corr-1",
        string instance = "Site1.Pump01")
    {
        return new InstanceStreamRequest
        {
            CorrelationId = correlationId,
            InstanceUniqueName = instance
        };
    }

    [Fact]
    public async Task RejectsWhenNotReady()
    {
        var server = CreateServer();
        // Deliberately do NOT call SetReady().

        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest(), CreateWriter(), CreateMockContext()));

        Assert.Equal(StatusCode.Unavailable, ex.StatusCode);
    }

    [Fact]
    public async Task RejectsWhenMaxStreamsReached()
    {
        var server = CreateServer(maxStreams: 1);
        server.SetReady(Sys);

        // Occupy the only slot with a stream that stays open until cancelled.
        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var writer1 = CreateWriter();
        var stream1Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-1"), writer1, context1));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // With the slot taken, a second stream must be refused.
        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest("corr-2"), CreateWriter(), CreateMockContext()));
        Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode);

        // Clean up the first stream.
        cts1.Cancel();
        await AwaitStreamExitAsync(stream1Task);
    }

    [Fact]
    public async Task CancelsDuplicateCorrelationId()
    {
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var writer1 = CreateWriter();

        // Start the first stream.
        var stream1Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-dup"), writer1, context1));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Start a second stream with the same correlationId; it should cancel the first.
        using var cts2 = new CancellationTokenSource();
        var context2 = CreateMockContext(cts2.Token);
        var writer2 = CreateWriter();
        var stream2Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-dup"), writer2, context2));

        // The test never cancels cts1. The first stream can only finish because the
        // duplicate replaced it, so bound the wait: a missing replacement fails here
        // instead of hanging.
        await AwaitStreamExitAsync(stream1Task);

        // The second stream should remain active.
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Clean up.
        cts2.Cancel();
        await AwaitStreamExitAsync(stream2Task);
    }

    [Fact]
    public async Task CleansUpOnCancellation()
    {
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = CreateWriter();

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-cleanup"), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);

        Assert.Equal(0, server.ActiveStreamCount);
    }

    // --- Host-017 / REQ-HOST-7: site-shutdown ordering ---

    [Fact]
    public async Task Host017_CancelAllStreams_CancelsActiveStreamsAndRefusesNewOnes()
    {
        // REQ-HOST-7 steps (1)+(2): on CoordinatedShutdown the gRPC server must
        // stop accepting new streams AND cancel every active stream. The client then
        // observes a clean Cancelled rather than a silent stream that only times out
        // via keepalive. Program.cs registers ApplicationStopping → CancelAllStreams();
        // this test exercises the server-side guarantee in isolation.
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var writer1 = CreateWriter();

        var stream1Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-shutdown-1"), writer1, context1));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Begin shutdown: flip the flag AND cancel the active stream.
        server.CancelAllStreams();
        Assert.True(server.IsShuttingDown);

        // The active stream's await foreach observes the cancellation and falls
        // through its finally, which removes the entry from _activeStreams. The wait
        // is bounded because cts1 is never cancelled by the test: only
        // CancelAllStreams can end this stream.
        await AwaitStreamExitAsync(stream1Task);
        Assert.Equal(0, server.ActiveStreamCount);

        // A SubscribeInstance after shutdown is refused immediately with Unavailable;
        // it must not register a new stream.
        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest("corr-shutdown-2"), CreateWriter(), CreateMockContext()));
        Assert.Equal(StatusCode.Unavailable, ex.StatusCode);
        Assert.Contains("shutting", ex.Status.Detail, StringComparison.OrdinalIgnoreCase);
    }

    [Fact]
    public void Host017_CancelAllStreams_IsIdempotent()
    {
        // Repeated calls during a double-fire shutdown sequence must not throw.
        var server = CreateServer();
        server.SetReady(Sys);

        server.CancelAllStreams();
        server.CancelAllStreams();

        Assert.True(server.IsShuttingDown);
        Assert.Equal(0, server.ActiveStreamCount);
    }

    [Fact]
    public async Task SubscribesAndRemovesFromStreamManager()
    {
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = CreateWriter();

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-sub", "Site1.Motor01"), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Subscribe was called for the requested instance.
        _subscriber.Received(1).Subscribe("Site1.Motor01", Arg.Any<IActorRef>());

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);

        // RemoveSubscriber was called once the stream ended.
        _subscriber.Received(1).RemoveSubscriber(Arg.Any<string>());
    }

    [Fact]
    public async Task WritesEventsToResponseStream()
    {
        var server = CreateServer();
        server.SetReady(Sys);

        // Capture the relay actor handed to Subscribe so the test can send it events.
        // The stub runs on the stream's thread-pool task, not the test thread, so the
        // actor is handed over through a TaskCompletionSource rather than a plain
        // captured local that the test would poll without synchronization.
        var relayCaptured = new TaskCompletionSource<IActorRef>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
            .Returns(ci =>
            {
                relayCaptured.TrySetResult(ci.Arg<IActorRef>());
                return "sub-write";
            });

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);

        // WriteAsync is not invoked on the test thread, so collect the written events
        // into a thread-safe queue.
        var writtenEvents = new ConcurrentQueue<SiteStreamEvent>();
        var writer = CreateWriter();
        writer.WriteAsync(Arg.Any<SiteStreamEvent>(), Arg.Any<CancellationToken>())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents.Enqueue(ci.Arg<SiteStreamEvent>()));

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-write", "Site1.Pump01"), writer, context));

        var relay = await relayCaptured.Task.WaitAsync(AsyncStepTimeout);

        // Send a domain event to the relay actor.
        var ts = DateTimeOffset.UtcNow;
        relay.Tell(new Commons.Messages.Streaming.AttributeValueChanged(
            "Site1.Pump01", "Path", "Attr", 99.5, "Good", ts));

        // Wait for the event to be written to the response stream.
        await WaitForConditionAsync(() => !writtenEvents.IsEmpty);

        var written = Assert.Single(writtenEvents);
        Assert.Equal("corr-write", written.CorrelationId);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, written.EventCase);

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);
    }

    [Theory]
    [InlineData("corr/with/slash")]
    [InlineData("corr with space")]
    [InlineData("")]
    [InlineData("$weird")]
    public async Task RejectsCorrelationIdThatIsNotActorNameSafe(string badCorrelationId)
    {
        // Communication-014 regression: a public gRPC SubscribeInstance must not feed
        // an untrusted correlation_id straight into an Akka actor name. An unsafe id
        // must be rejected cleanly with InvalidArgument rather than escaping as an
        // unhandled InvalidActorNameException.
        var server = CreateServer();
        server.SetReady(Sys);

        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest(badCorrelationId), CreateWriter(), CreateMockContext()));

        Assert.Equal(StatusCode.InvalidArgument, ex.StatusCode);
        Assert.Equal(0, server.ActiveStreamCount);
    }

    [Fact]
    public async Task AcceptsActorNameSafeCorrelationId()
    {
        // A normal GUID-style correlation id (what central always supplies) is accepted.
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = CreateWriter();

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest(Guid.NewGuid().ToString()), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);
    }

    [Fact]
    public async Task Comm021_SubscribeThrows_StopsRelayActorAndRemovesActiveStreamEntry()
    {
        // Communication-021 regression: SubscribeInstance creates a StreamRelayActor
        // and registers an _activeStreams entry BEFORE calling _streamSubscriber.Subscribe.
        // Suppose Subscribe throws (e.g. stale instance, site runtime shutting down) and
        // the throw escapes without the wrapping try, as in the pre-fix code. Then both
        // the relay actor and the _activeStreams entry leak. The fix wraps the Subscribe
        // call so the catch deterministically stops the actor and removes the entry
        // before re-throwing.
        var subscriber = Substitute.For<ISiteStreamSubscriber>();
        subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
            .Returns(_ => throw new InvalidOperationException("instance not found"));

        var server = new SiteStreamGrpcServer(subscriber, _logger);
        server.SetReady(Sys);

        // The InvalidOperationException is expected to propagate (the gRPC stack maps
        // unhandled throws to Internal); the load-bearing assertion is the cleanup.
        await Assert.ThrowsAsync<InvalidOperationException>(
            () => server.SubscribeInstance(MakeRequest("corr-comm021"), CreateWriter(), CreateMockContext()));

        // The _activeStreams entry was inserted before Subscribe was called. The catch
        // must remove it so a follow-up subscription with the same correlation id is
        // not blocked, and it must stop the relay actor so the actor does not leak.
        Assert.Equal(0, server.ActiveStreamCount);

        // RemoveSubscriber must NOT have been called, because Subscribe never returned
        // a subscription id. This confirms the catch path ran, not the finally path.
        subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<string>());
    }

    [Fact]
    public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
    {
        // Telemetry follow-on: the scadabridge.site.connection.up gauge must read
        // exactly 1 while a site stream is established and return to 0 once the
        // stream terminates on the cancel path. This proves SiteConnectionOpened() is
        // matched by exactly one SiteConnectionClosed() in the handler's finally.
        var server = CreateServer();
        server.SetReady(Sys);

        long ReadGauge()
        {
            long observed = 0;
            using var listener = new MeterListener();
            listener.InstrumentPublished = (instrument, l) =>
            {
                if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
                    && instrument.Name == "scadabridge.site.connection.up")
                {
                    l.EnableMeasurementEvents(instrument);
                }
            };
            // MeterListener only delivers a measurement to the callback registered for
            // the instrument's exact numeric type. Register both int and long so a type
            // mismatch cannot make this read silently return 0.
            listener.SetMeasurementEventCallback<int>((_, measurement, _, _) => observed = measurement);
            listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
            listener.Start();
            listener.RecordObservableInstruments();
            return observed;
        }

        var baseline = ReadGauge();

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = CreateWriter();

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // While the stream is up, the gauge is one above whatever baseline other
        // (possibly parallel) tests left behind. Reading relative to the baseline keeps
        // the assertion robust to test interleaving on the process-wide static counter.
        Assert.Equal(baseline + 1, ReadGauge());

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);
        await WaitForConditionAsync(() => server.ActiveStreamCount == 0);

        // After the cancel path runs the finally, the gauge is back at the baseline,
        // with no leaked "up" count.
        Assert.Equal(baseline, ReadGauge());
    }

    [Fact]
    public async Task SetReady_AllowsStreamCreation()
    {
        // Complements RejectsWhenNotReady: after SetReady a stream is actually accepted
        // and registered, rather than being refused with Unavailable.
        var server = CreateServer();
        server.SetReady(Sys);

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = CreateWriter();

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-ready"), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        cts.Cancel();
        await AwaitStreamExitAsync(streamTask);
        Assert.Equal(0, server.ActiveStreamCount);
    }

    /// <summary>Creates a substitute response-stream writer for a single call.</summary>
    private static IServerStreamWriter<SiteStreamEvent> CreateWriter()
        => Substitute.For<IServerStreamWriter<SiteStreamEvent>>();

    /// <summary>
    /// Creates a substitute <see cref="ServerCallContext"/> whose
    /// <see cref="ServerCallContext.CancellationToken"/> returns
    /// <paramref name="cancellationToken"/>.
    /// </summary>
    private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default)
    {
        var context = Substitute.For<ServerCallContext>();
        context.CancellationToken.Returns(cancellationToken);
        return context;
    }

    /// <summary>
    /// Awaits a stream task, failing with <see cref="TimeoutException"/> if it has not
    /// finished within <see cref="AsyncStepTimeout"/>. A faulted stream task still
    /// rethrows its own exception.
    /// </summary>
    private static Task AwaitStreamExitAsync(Task streamTask)
        => streamTask.WaitAsync(AsyncStepTimeout);

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it holds or
    /// <paramref name="timeoutMs"/> elapses, then asserts it one final time.
    /// </summary>
    /// <remarks>
    /// Elapsed time is measured with <see cref="Stopwatch"/>, which is monotonic;
    /// wall-clock adjustments cannot shorten or extend the wait.
    /// </remarks>
    private static async Task WaitForConditionAsync(Func<bool> condition, int timeoutMs = 5000)
    {
        var stopwatch = Stopwatch.StartNew();
        while (!condition() && stopwatch.ElapsedMilliseconds < timeoutMs)
        {
            await Task.Delay(25);
        }

        Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
    }
}