feat(scadabridge): track scadabridge.site.connection.up over site-stream lifetime (balanced open/close)
This commit is contained in:
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using GrpcStatus = Grpc.Core.Status;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||
@@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
|
||||
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
|
||||
|
||||
// Telemetry follow-on: the connection is now fully established (Subscribe
|
||||
// succeeded, so no leak via the catch above). Count it up here and balance
|
||||
// it in the finally below so the scadabridge.site.connection.up gauge is
|
||||
// decremented on EVERY exit path — normal completion, client-cancel /
|
||||
// duplicate-replacement (OperationCanceledException), server shutdown
|
||||
// (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
|
||||
// exactly one Closed per Opened and a gauge that never drifts up.
|
||||
ScadaBridgeTelemetry.SiteConnectionOpened();
|
||||
try
|
||||
{
|
||||
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
|
||||
@@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
ScadaBridgeTelemetry.SiteConnectionClosed();
|
||||
_streamSubscriber.RemoveSubscriber(relayActor);
|
||||
_actorSystem!.Stop(relayActor);
|
||||
channel.Writer.TryComplete();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
@@ -5,6 +6,7 @@ using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
|
||||
@@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit
|
||||
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
|
||||
{
|
||||
// Telemetry follow-on: the scadabridge.site.connection.up gauge must read
|
||||
// exactly 1 while a site stream is established and return to 0 once the
|
||||
// stream terminates on the cancel path — proving SiteConnectionOpened() is
|
||||
// matched by exactly one SiteConnectionClosed() in the handler's finally.
|
||||
var server = CreateServer();
|
||||
server.SetReady(Sys);
|
||||
|
||||
long ReadGauge()
|
||||
{
|
||||
long observed = 0;
|
||||
using var listener = new MeterListener();
|
||||
listener.InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
|
||||
instrument.Name == "scadabridge.site.connection.up")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
|
||||
listener.Start();
|
||||
listener.RecordObservableInstruments();
|
||||
return observed;
|
||||
}
|
||||
|
||||
var baseline = ReadGauge();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// While the stream is up the gauge is one above whatever baseline other
|
||||
// (possibly parallel) tests left behind — read relative so the assertion
|
||||
// is robust to test interleaving on the process-wide static counter.
|
||||
Assert.Equal(baseline + 1, ReadGauge());
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 0);
|
||||
|
||||
// After the cancel path runs the finally, the gauge is balanced back to
|
||||
// the baseline — no leaked "up" count.
|
||||
Assert.Equal(baseline, ReadGauge());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetReady_AllowsStreamCreation()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user