Files
ScadaBridge/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/StreamRelayActorTests.cs
T
Joseph Doherty e7660134f2 fix(communication): drop IsConfiguredPlaceholder rows in StreamRelayActor before gRPC pack
Placeholder AlarmStateChanged rows are a DebugView snapshot-only concept emitted
by InstanceActor.BuildAlarmStatesSnapshot; they are never a real alarm transition.
Their timestamp may be DateTimeOffset.MinValue (the Protobuf Timestamp lower boundary),
which can throw when packed via Timestamp.FromDateTimeOffset.

Added early-return guard at the top of HandleAlarmStateChanged before any timestamp
pack or channel write. Updated the existing NativeBindingLinkage round-trip test to
use a real (non-placeholder) native alarm; added DropsAlarmStateChanged_WhenIsConfiguredPlaceholder
to assert placeholders are silently dropped (15/15 pass).
2026-06-17 15:44:28 -04:00

320 lines
13 KiB
C#

using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using AlarmState = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
/// <summary>
/// Tests for <see cref="StreamRelayActor"/>: domain stream messages told to the actor
/// are expected to appear as <see cref="SiteStreamEvent"/> frames on the channel the
/// actor was constructed with.
/// </summary>
/// <remarks>
/// The tests never use fixed sleeps. Positive cases poll the channel until a frame
/// arrives (bounded by <see cref="RelayTimeout"/>); negative cases use a mailbox
/// barrier (see <see cref="AwaitMailboxProcessed"/>) so "nothing was written" is
/// asserted only after the actor has demonstrably handled the message under test.
/// </remarks>
public class StreamRelayActorTests : TestKit
{
    /// <summary>Upper bound on how long a test waits for the actor before failing.</summary>
    private static readonly TimeSpan RelayTimeout = TimeSpan.FromSeconds(3);

    /// <summary>Delay between channel polls; small so the happy path stays fast.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10);

    /// <summary>
    /// Starts a <see cref="StreamRelayActor"/> in the test actor system.
    /// </summary>
    /// <param name="correlationId">Correlation id handed to the actor's constructor.</param>
    /// <param name="writer">Channel writer handed to the actor's constructor.</param>
    /// <returns>The reference of the newly created actor.</returns>
    private IActorRef CreateRelay(string correlationId, ChannelWriter<SiteStreamEvent> writer) =>
        Sys.ActorOf(Props.Create(() => new StreamRelayActor(correlationId, writer)));

    /// <summary>
    /// Polls <paramref name="reader"/> until one event can be read and returns it.
    /// Fails the test if nothing arrives within <see cref="RelayTimeout"/>.
    /// </summary>
    /// <param name="reader">Reader side of the channel the relay writes to.</param>
    /// <returns>The first event read from the channel.</returns>
    private SiteStreamEvent ReadRelayedEvent(ChannelReader<SiteStreamEvent> reader)
    {
        // A list is used as the capture slot so the helper needs no nullable annotations.
        var received = new List<SiteStreamEvent>(1);

        AwaitCondition(
            () =>
            {
                if (!reader.TryRead(out var relayed))
                {
                    return false;
                }

                received.Add(relayed);
                return true;
            },
            RelayTimeout,
            PollInterval,
            "Expected a proto event on the channel");

        return Assert.Single(received);
    }

    /// <summary>
    /// Blocks until <paramref name="actor"/> has dequeued every message sent to it
    /// before this call.
    /// </summary>
    /// <remarks>
    /// Sends an <see cref="Identify"/> and waits for the <see cref="ActorIdentity"/>
    /// reply. The reply can only be produced after all earlier mailbox entries were
    /// handled. NOTE(review): this is a valid "work finished" signal only if the relay
    /// writes to the channel synchronously inside its message handler — confirm
    /// against StreamRelayActor.
    /// </remarks>
    /// <param name="actor">The actor whose mailbox should be drained.</param>
    private void AwaitMailboxProcessed(IActorRef actor)
    {
        actor.Tell(new Identify("relay-barrier"), TestActor);
        ExpectMsg<ActorIdentity>(RelayTimeout);
    }

    /// <summary>
    /// An <see cref="AttributeValueChanged"/> is relayed as an AttributeChanged frame
    /// carrying the same identifiers, value, quality and timestamp.
    /// </summary>
    [Fact]
    public void RelaysAttributeValueChanged_ToProtoEvent()
    {
        const string correlationId = "corr-attr-1";
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay(correlationId, channel.Writer);
        var timestamp = new DateTimeOffset(2026, 3, 21, 10, 30, 0, TimeSpan.Zero);

        actor.Tell(new AttributeValueChanged(
            "Site1.Pump01", "Modules.Pressure", "CurrentPSI", 42.5, "Good", timestamp));

        var protoEvent = ReadRelayedEvent(channel.Reader);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, protoEvent.EventCase);
        Assert.Equal(correlationId, protoEvent.CorrelationId);

        var attr = protoEvent.AttributeChanged;
        Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
        Assert.Equal("Modules.Pressure", attr.AttributePath);
        Assert.Equal("CurrentPSI", attr.AttributeName);
        Assert.Equal("42.5", attr.Value);
        Assert.Equal(Quality.Good, attr.Quality);
        Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), attr.Timestamp);
    }

    /// <summary>
    /// An <see cref="AlarmStateChanged"/> with native enrichment is relayed as an
    /// AlarmChanged frame with every enrichment field mapped across.
    /// </summary>
    [Fact]
    public void RelaysAlarmStateChanged_ToProtoEvent()
    {
        const string correlationId = "corr-alarm-1";
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay(correlationId, channel.Writer);
        var timestamp = new DateTimeOffset(2026, 3, 21, 11, 0, 0, TimeSpan.Zero);
        var raiseTime = new DateTimeOffset(2026, 3, 21, 10, 0, 0, TimeSpan.Zero);
        var domainEvent = new AlarmStateChanged(
            "Site1.Pump01", "T01.Hi", AlarmState.Active, 900, timestamp)
        {
            Kind = AlarmKind.NativeMxAccess,
            SourceReference = "T01.Hi",
            AlarmTypeName = "AnalogLimit.Hi",
            Category = "Process",
            OperatorUser = "op1",
            OperatorComment = "ack",
            OriginalRaiseTime = raiseTime,
            CurrentValue = "92",
            LimitValue = "90",
            Condition = new AlarmConditionState(
                Active: true, Acknowledged: true, Confirmed: null,
                Shelve: AlarmShelveState.OneShotShelved, Suppressed: false, Severity: 900)
        };

        actor.Tell(domainEvent);

        var protoEvent = ReadRelayedEvent(channel.Reader);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, protoEvent.EventCase);
        Assert.Equal(correlationId, protoEvent.CorrelationId);

        var alarm = protoEvent.AlarmChanged;
        Assert.Equal("Site1.Pump01", alarm.InstanceUniqueName);
        Assert.Equal("T01.Hi", alarm.AlarmName);
        Assert.Equal(AlarmStateEnum.AlarmStateActive, alarm.State);
        Assert.Equal(900, alarm.Priority);
        Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), alarm.Timestamp);

        // Native enrichment mapped out.
        Assert.Equal("NativeMxAccess", alarm.Kind);
        Assert.True(alarm.Active);
        Assert.True(alarm.Acknowledged);
        Assert.Equal("OneShotShelved", alarm.ShelveState);
        Assert.False(alarm.Suppressed);
        Assert.Equal("T01.Hi", alarm.SourceReference);
        Assert.Equal("AnalogLimit.Hi", alarm.AlarmTypeName);
        Assert.Equal("Process", alarm.Category);
        Assert.Equal("op1", alarm.OperatorUser);
        Assert.Equal("ack", alarm.OperatorComment);
        Assert.Equal(Timestamp.FromDateTimeOffset(raiseTime), alarm.OriginalRaiseTime);
        Assert.Equal("92", alarm.CurrentValue);
        Assert.Equal("90", alarm.LimitValue);
    }

    /// <summary>
    /// The native source-binding canonical name survives packing by the relay and
    /// unpacking by <see cref="SiteStreamGrpcClient.ConvertToDomainEvent"/>.
    /// </summary>
    [Fact]
    public void RelaysAlarmStateChanged_NativeBindingLinkage_SurvivesFullRoundTrip()
    {
        // DV-1: the native source-binding canonical name must pack into AlarmStateUpdate
        // (StreamRelayActor) and unpack back out (SiteStreamGrpcClient.ConvertToDomainEvent)
        // — the full cross-process round trip for a real (non-placeholder) native alarm.
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr-native-binding", channel.Writer);
        var timestamp = new DateTimeOffset(2026, 3, 21, 11, 0, 0, TimeSpan.Zero);
        var domainEvent = new AlarmStateChanged(
            "Site1.Motor01", "Motor1.MotorAlarms.Hi", AlarmState.Active, 900, timestamp)
        {
            Kind = AlarmKind.NativeOpcUa,
            SourceReference = "Motor1.MotorAlarms.Hi",
            NativeSourceCanonicalName = "Motor1.MotorAlarms",
            IsConfiguredPlaceholder = false,
            Condition = new AlarmConditionState(
                Active: true, Acknowledged: false, Confirmed: null,
                Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: 900)
        };

        actor.Tell(domainEvent);

        var protoEvent = ReadRelayedEvent(channel.Reader);

        // Packed onto the wire frame by StreamRelayActor.
        Assert.Equal("Motor1.MotorAlarms", protoEvent.AlarmChanged.NativeSourceCanonicalName);
        Assert.False(protoEvent.AlarmChanged.IsConfiguredPlaceholder);

        // Unpacked back into the domain record by SiteStreamGrpcClient.
        var roundTripped = Assert.IsType<AlarmStateChanged>(
            SiteStreamGrpcClient.ConvertToDomainEvent(protoEvent));
        Assert.Equal("Motor1.MotorAlarms", roundTripped.NativeSourceCanonicalName);
        Assert.False(roundTripped.IsConfiguredPlaceholder);
    }

    /// <summary>
    /// An <see cref="AlarmStateChanged"/> flagged as a configured placeholder is not
    /// written to the channel.
    /// </summary>
    [Fact]
    public void DropsAlarmStateChanged_WhenIsConfiguredPlaceholder()
    {
        // Placeholder rows are a Debug View snapshot-only concept emitted by
        // InstanceActor.BuildAlarmStatesSnapshot. They are never a real alarm
        // transition; their timestamp may be DateTimeOffset.MinValue (the Protobuf
        // Timestamp boundary). The relay must drop them without writing to the channel.
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr-placeholder-drop", channel.Writer);
        var domainEvent = new AlarmStateChanged(
            "Site1.Tank01", "Tank1.Levels.Hi", AlarmState.Normal, 500,
            DateTimeOffset.MinValue)
        {
            Kind = AlarmKind.NativeOpcUa,
            NativeSourceCanonicalName = "Tank1.Levels",
            IsConfiguredPlaceholder = true,
            Condition = new AlarmConditionState(
                Active: false, Acknowledged: false, Confirmed: null,
                Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: 0)
        };

        actor.Tell(domainEvent);

        // Wait until the actor has actually dequeued the placeholder, so the check
        // below cannot pass merely because the actor has not run yet.
        AwaitMailboxProcessed(actor);

        // Nothing must have been written — placeholder is silently dropped.
        Assert.False(channel.Reader.TryRead(out _),
            "Placeholder AlarmStateChanged must not be relayed to the gRPC stream");
    }

    /// <summary>
    /// Every relayed frame carries the correlation id the actor was created with.
    /// </summary>
    [Fact]
    public void SetsCorrelationId_OnAllEvents()
    {
        const string correlationId = "corr-multi-42";
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay(correlationId, channel.Writer);
        var ts = DateTimeOffset.UtcNow;

        actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts));
        actor.Tell(new AlarmStateChanged("Inst1", "Alarm1", AlarmState.Normal, 1, ts));
        actor.Tell(new AttributeValueChanged("Inst2", "Path2", "Name2", null, "Bad", ts));

        // All three messages have been handled once the barrier reply arrives.
        AwaitMailboxProcessed(actor);

        var events = new List<SiteStreamEvent>();
        while (channel.Reader.TryRead(out var evt))
        {
            events.Add(evt);
        }

        Assert.Equal(3, events.Count);
        Assert.All(events, e => Assert.Equal(correlationId, e.CorrelationId));
    }

    /// <summary>
    /// When the bounded channel is already full, a further event is discarded and the
    /// channel content is left untouched.
    /// </summary>
    [Fact]
    public void DropsEvent_WhenChannelFull()
    {
        var channel = Channel.CreateBounded<SiteStreamEvent>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
        var actor = CreateRelay("corr-drop-1", channel.Writer);

        // Fill the channel with one item directly.
        Assert.True(channel.Writer.TryWrite(new SiteStreamEvent { CorrelationId = "filler" }));

        // Send another event — should be dropped (channel full).
        actor.Tell(new AttributeValueChanged(
            "Inst1", "Path", "Name", 1, "Good", DateTimeOffset.UtcNow));

        // The channel cannot signal completion here (it is full), so use the barrier.
        AwaitMailboxProcessed(actor);

        // Channel should still have exactly 1 item (the filler).
        Assert.True(channel.Reader.TryRead(out var item));
        Assert.Equal("filler", item.CorrelationId);
        Assert.False(channel.Reader.TryRead(out _));
    }

    /// <summary>
    /// Quality strings map onto the proto <see cref="Quality"/> enum; the cases below
    /// pin that unrecognised, empty and differently-cased strings map to Unspecified.
    /// </summary>
    /// <param name="qualityString">Quality text placed on the domain event.</param>
    /// <param name="expectedProto">Enum value expected on the relayed frame.</param>
    [Theory]
    [InlineData("Good", Quality.Good)]
    [InlineData("Uncertain", Quality.Uncertain)]
    [InlineData("Bad", Quality.Bad)]
    [InlineData("Unknown", Quality.Unspecified)]
    [InlineData("", Quality.Unspecified)]
    [InlineData("good", Quality.Unspecified)]
    public void MapsQualityString_ToProtoEnum(string qualityString, Quality expectedProto)
    {
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr", channel.Writer);

        actor.Tell(new AttributeValueChanged(
            "Inst", "Path", "Name", 1, qualityString, DateTimeOffset.UtcNow));

        var evt = ReadRelayedEvent(channel.Reader);
        Assert.Equal(expectedProto, evt.AttributeChanged.Quality);
    }

    /// <summary>A null attribute value is relayed as an empty string.</summary>
    [Fact]
    public void NullValue_MapsToEmptyString()
    {
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr", channel.Writer);

        actor.Tell(new AttributeValueChanged(
            "Inst", "Path", "Name", null, "Good", DateTimeOffset.UtcNow));

        var evt = ReadRelayedEvent(channel.Reader);
        Assert.Equal("", evt.AttributeChanged.Value);
    }

    /// <summary>A list attribute value is relayed as a JSON array string.</summary>
    [Fact]
    public void ListValue_EncodesAsJsonArray()
    {
        // List attributes must cross the wire as JSON, not as a comma-joined display string.
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr-list", channel.Writer);

        actor.Tell(new AttributeValueChanged("Inst", "Path", "Tags",
            new List<string> { "a", "b" }, "Good", DateTimeOffset.UtcNow));

        var evt = ReadRelayedEvent(channel.Reader);
        Assert.Equal("[\"a\",\"b\"]", evt.AttributeChanged.Value);
    }

    /// <summary>A scalar string attribute value is relayed verbatim.</summary>
    [Fact]
    public void ScalarStringValue_PassesThroughUnchanged()
    {
        // Scalar strings must not be double-encoded.
        var channel = Channel.CreateUnbounded<SiteStreamEvent>();
        var actor = CreateRelay("corr-scalar", channel.Writer);

        actor.Tell(new AttributeValueChanged(
            "Inst", "Path", "Name", "x", "Good", DateTimeOffset.UtcNow));

        var evt = ReadRelayedEvent(channel.Reader);
        Assert.Equal("x", evt.AttributeChanged.Value);
    }
}