From d70bbbe739d5ff61ad61d48d96347e5c80e9cfd9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 21 Mar 2026 11:48:04 -0400 Subject: [PATCH] feat: add StreamRelayActor bridging Akka events to gRPC proto channel --- .../Actors/StreamRelayActor.cs | 90 +++++++++ .../Grpc/StreamRelayActorTests.cs | 176 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 src/ScadaLink.Communication/Actors/StreamRelayActor.cs create mode 100644 tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs diff --git a/src/ScadaLink.Communication/Actors/StreamRelayActor.cs b/src/ScadaLink.Communication/Actors/StreamRelayActor.cs new file mode 100644 index 0000000..16c1bdb --- /dev/null +++ b/src/ScadaLink.Communication/Actors/StreamRelayActor.cs @@ -0,0 +1,90 @@ +using System.Threading.Channels; +using Akka.Actor; +using Akka.Event; +using Google.Protobuf.WellKnownTypes; +using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Communication.Grpc; +using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState; + +namespace ScadaLink.Communication.Actors; + +/// +/// Lightweight relay actor that bridges Akka domain events (AttributeValueChanged, +/// AlarmStateChanged) to a System.Threading.Channels.Channel of protobuf SiteStreamEvent +/// messages. The gRPC server method reads from the channel's reader side. +/// +public class StreamRelayActor : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly string _correlationId; + private readonly ChannelWriter _channelWriter; + + public StreamRelayActor(string correlationId, ChannelWriter channelWriter) + { + _correlationId = correlationId; + _channelWriter = channelWriter; + + Receive(HandleAttributeValueChanged); + Receive(HandleAlarmStateChanged); + } + + private void HandleAttributeValueChanged(AttributeValueChanged msg) + { + var protoEvent = new SiteStreamEvent + { + CorrelationId = _correlationId, + AttributeChanged = new AttributeValueUpdate + { + InstanceUniqueName = msg.InstanceUniqueName, + AttributePath = msg.AttributePath, + AttributeName = msg.AttributeName, + Value = msg.Value?.ToString() ?? "", + Quality = MapQuality(msg.Quality), + Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp) + } + }; + + WriteToChannel(protoEvent); + } + + private void HandleAlarmStateChanged(AlarmStateChanged msg) + { + var protoEvent = new SiteStreamEvent + { + CorrelationId = _correlationId, + AlarmChanged = new AlarmStateUpdate + { + InstanceUniqueName = msg.InstanceUniqueName, + AlarmName = msg.AlarmName, + State = MapAlarmState(msg.State), + Priority = msg.Priority, + Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp) + } + }; + + WriteToChannel(protoEvent); + } + + private void WriteToChannel(SiteStreamEvent protoEvent) + { + if (!_channelWriter.TryWrite(protoEvent)) + { + _log.Warning("Channel full, dropping event for correlation {0}", _correlationId); + } + } + + private static Quality MapQuality(string quality) => quality switch + { + "Good" => Quality.Good, + "Uncertain" => Quality.Uncertain, + "Bad" => Quality.Bad, + _ => Quality.Unspecified + }; + + private static AlarmStateEnum MapAlarmState(AlarmState state) => state switch + { + AlarmState.Normal => AlarmStateEnum.AlarmStateNormal, + AlarmState.Active => AlarmStateEnum.AlarmStateActive, + _ => AlarmStateEnum.AlarmStateUnspecified + }; +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs new file mode 100644 index 0000000..546a5da --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs @@ -0,0 +1,176 @@ +using System.Threading.Channels; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Communication.Actors; +using ScadaLink.Communication.Grpc; +using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState; + +namespace ScadaLink.Communication.Tests.Grpc; + +public class StreamRelayActorTests : TestKit +{ + [Fact] + public void RelaysAttributeValueChanged_ToProtoEvent() + { + var channel = Channel.CreateUnbounded(); + var correlationId = "corr-attr-1"; + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor(correlationId, channel.Writer))); + + var timestamp = new DateTimeOffset(2026, 3, 21, 10, 30, 0, TimeSpan.Zero); + var domainEvent = new AttributeValueChanged( + "Site1.Pump01", "Modules.Pressure", "CurrentPSI", 42.5, "Good", timestamp); + + actor.Tell(domainEvent); + + var success = channel.Reader.TryRead(out var protoEvent); + if (!success) + { + // Give a moment for async processing + Thread.Sleep(500); + success = channel.Reader.TryRead(out protoEvent); + } + + Assert.True(success, "Expected a proto event on the channel"); + Assert.NotNull(protoEvent); + Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, protoEvent.EventCase); + Assert.Equal(correlationId, protoEvent.CorrelationId); + + var attr = protoEvent.AttributeChanged; + Assert.Equal("Site1.Pump01", attr.InstanceUniqueName); + Assert.Equal("Modules.Pressure", attr.AttributePath); + Assert.Equal("CurrentPSI", attr.AttributeName); + Assert.Equal("42.5", attr.Value); + Assert.Equal(Quality.Good, attr.Quality); + Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), attr.Timestamp); + } + + [Fact] + public void RelaysAlarmStateChanged_ToProtoEvent() + { + var channel = Channel.CreateUnbounded(); + var correlationId = "corr-alarm-1"; + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor(correlationId, channel.Writer))); + + var timestamp = new DateTimeOffset(2026, 3, 21, 11, 0, 0, TimeSpan.Zero); + var domainEvent = new AlarmStateChanged( + "Site1.Pump01", "HighPressure", AlarmState.Active, 2, timestamp); + + actor.Tell(domainEvent); + + var success = channel.Reader.TryRead(out var protoEvent); + if (!success) + { + Thread.Sleep(500); + success = channel.Reader.TryRead(out protoEvent); + } + + Assert.True(success, "Expected a proto event on the channel"); + Assert.NotNull(protoEvent); + Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, protoEvent.EventCase); + Assert.Equal(correlationId, protoEvent.CorrelationId); + + var alarm = protoEvent.AlarmChanged; + Assert.Equal("Site1.Pump01", alarm.InstanceUniqueName); + Assert.Equal("HighPressure", alarm.AlarmName); + Assert.Equal(AlarmStateEnum.AlarmStateActive, alarm.State); + Assert.Equal(2, alarm.Priority); + Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), alarm.Timestamp); + } + + [Fact] + public void SetsCorrelationId_OnAllEvents() + { + var channel = Channel.CreateUnbounded(); + var correlationId = "corr-multi-42"; + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor(correlationId, channel.Writer))); + + var ts = DateTimeOffset.UtcNow; + + actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts)); + actor.Tell(new AlarmStateChanged("Inst1", "Alarm1", AlarmState.Normal, 1, ts)); + actor.Tell(new AttributeValueChanged("Inst2", "Path2", "Name2", null, "Bad", ts)); + + // Allow messages to process + Thread.Sleep(500); + + var events = new List(); + while (channel.Reader.TryRead(out var evt)) + { + events.Add(evt); + } + + Assert.Equal(3, events.Count); + Assert.All(events, e => Assert.Equal(correlationId, e.CorrelationId)); + } + + [Fact] + public void DropsEvent_WhenChannelFull() + { + var channel = Channel.CreateBounded(new BoundedChannelOptions(1) + { + FullMode = BoundedChannelFullMode.Wait + }); + var correlationId = "corr-drop-1"; + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor(correlationId, channel.Writer))); + + var ts = DateTimeOffset.UtcNow; + + // Fill the channel with one item directly + var filler = new SiteStreamEvent { CorrelationId = "filler" }; + Assert.True(channel.Writer.TryWrite(filler)); + + // Send another event — should be dropped (channel full), no exception + actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts)); + + // Allow message to process + Thread.Sleep(500); + + // Channel should still have exactly 1 item (the filler) + Assert.True(channel.Reader.TryRead(out var item)); + Assert.Equal("filler", item.CorrelationId); + Assert.False(channel.Reader.TryRead(out _)); + } + + [Theory] + [InlineData("Good", Quality.Good)] + [InlineData("Uncertain", Quality.Uncertain)] + [InlineData("Bad", Quality.Bad)] + [InlineData("Unknown", Quality.Unspecified)] + [InlineData("", Quality.Unspecified)] + [InlineData("good", Quality.Unspecified)] + public void MapsQualityString_ToProtoEnum(string qualityString, Quality expectedProto) + { + var channel = Channel.CreateUnbounded(); + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor("corr", channel.Writer))); + + var ts = DateTimeOffset.UtcNow; + actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", 1, qualityString, ts)); + + Thread.Sleep(500); + Assert.True(channel.Reader.TryRead(out var evt)); + Assert.Equal(expectedProto, evt.AttributeChanged.Quality); + } + + [Fact] + public void NullValue_MapsToEmptyString() + { + var channel = Channel.CreateUnbounded(); + var actor = Sys.ActorOf(Props.Create(() => + new StreamRelayActor("corr", channel.Writer))); + + var ts = DateTimeOffset.UtcNow; + actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", null, "Good", ts)); + + Thread.Sleep(500); + Assert.True(channel.Reader.TryRead(out var evt)); + Assert.Equal("", evt.AttributeChanged.Value); + } +}