diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs index 00a1393d..5900f6e5 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Options; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using GrpcStatus = Grpc.Core.Status; namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc; @@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})", request.CorrelationId, request.InstanceUniqueName, subscriptionId); + // Telemetry follow-on: the connection is now fully established (Subscribe + // succeeded, so no leak via the catch above). Count it up here and balance + // it in the finally below so the scadabridge.site.connection.up gauge is + // decremented on EVERY exit path — normal completion, client-cancel / + // duplicate-replacement (OperationCanceledException), server shutdown + // (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing + // exactly one Closed per Opened and a gauge that never drifts up. + ScadaBridgeTelemetry.SiteConnectionOpened(); try { await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token)) @@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase } finally { + ScadaBridgeTelemetry.SiteConnectionClosed(); _streamSubscriber.RemoveSubscriber(relayActor); _actorSystem!.Stop(relayActor); channel.Writer.TryComplete(); diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs index ffb4fe2b..81e00fc0 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.Metrics; using System.Threading.Channels; using Akka.Actor; using Akka.TestKit.Xunit2; @@ -5,6 +6,7 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Communication.Grpc; namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc; @@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit subscriber.DidNotReceive().RemoveSubscriber(Arg.Any()); } + [Fact] + public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel() + { + // Telemetry follow-on: the scadabridge.site.connection.up gauge must read + // exactly 1 while a site stream is established and return to 0 once the + // stream terminates on the cancel path — proving SiteConnectionOpened() is + // matched by exactly one SiteConnectionClosed() in the handler's finally. + var server = CreateServer(); + server.SetReady(Sys); + + long ReadGauge() + { + long observed = 0; + using var listener = new MeterListener(); + listener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName && + instrument.Name == "scadabridge.site.connection.up") + { + l.EnableMeasurementEvents(instrument); + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => observed = measurement); + listener.Start(); + listener.RecordObservableInstruments(); + return observed; + } + + var baseline = ReadGauge(); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-gauge", "Site1.Pump01"), writer, context)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // While the stream is up the gauge is one above whatever baseline other + // (possibly parallel) tests left behind — read relative so the assertion + // is robust to test interleaving on the process-wide static counter. + Assert.Equal(baseline + 1, ReadGauge()); + + cts.Cancel(); + await streamTask; + await WaitForConditionAsync(() => server.ActiveStreamCount == 0); + + // After the cancel path runs the finally, the gauge is balanced back to + // the baseline — no leaked "up" count. + Assert.Equal(baseline, ReadGauge()); + } + [Fact] public void SetReady_AllowsStreamCreation() {