feat: add StreamRelayActor bridging Akka events to gRPC proto channel
This commit is contained in:
90
src/ScadaLink.Communication/Actors/StreamRelayActor.cs
Normal file
90
src/ScadaLink.Communication/Actors/StreamRelayActor.cs
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
using System.Threading.Channels;
|
||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Event;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Lightweight relay actor that bridges Akka domain events (AttributeValueChanged,
|
||||||
|
/// AlarmStateChanged) to a System.Threading.Channels.Channel of protobuf SiteStreamEvent
|
||||||
|
/// messages. The gRPC server method reads from the channel's reader side.
|
||||||
|
/// </summary>
|
||||||
|
public class StreamRelayActor : ReceiveActor
|
||||||
|
{
|
||||||
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
private readonly string _correlationId;
|
||||||
|
private readonly ChannelWriter<SiteStreamEvent> _channelWriter;
|
||||||
|
|
||||||
|
public StreamRelayActor(string correlationId, ChannelWriter<SiteStreamEvent> channelWriter)
|
||||||
|
{
|
||||||
|
_correlationId = correlationId;
|
||||||
|
_channelWriter = channelWriter;
|
||||||
|
|
||||||
|
Receive<AttributeValueChanged>(HandleAttributeValueChanged);
|
||||||
|
Receive<AlarmStateChanged>(HandleAlarmStateChanged);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleAttributeValueChanged(AttributeValueChanged msg)
|
||||||
|
{
|
||||||
|
var protoEvent = new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = _correlationId,
|
||||||
|
AttributeChanged = new AttributeValueUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = msg.InstanceUniqueName,
|
||||||
|
AttributePath = msg.AttributePath,
|
||||||
|
AttributeName = msg.AttributeName,
|
||||||
|
Value = msg.Value?.ToString() ?? "",
|
||||||
|
Quality = MapQuality(msg.Quality),
|
||||||
|
Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
WriteToChannel(protoEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleAlarmStateChanged(AlarmStateChanged msg)
|
||||||
|
{
|
||||||
|
var protoEvent = new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = _correlationId,
|
||||||
|
AlarmChanged = new AlarmStateUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = msg.InstanceUniqueName,
|
||||||
|
AlarmName = msg.AlarmName,
|
||||||
|
State = MapAlarmState(msg.State),
|
||||||
|
Priority = msg.Priority,
|
||||||
|
Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
WriteToChannel(protoEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void WriteToChannel(SiteStreamEvent protoEvent)
|
||||||
|
{
|
||||||
|
if (!_channelWriter.TryWrite(protoEvent))
|
||||||
|
{
|
||||||
|
_log.Warning("Channel full, dropping event for correlation {0}", _correlationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Quality MapQuality(string quality) => quality switch
|
||||||
|
{
|
||||||
|
"Good" => Quality.Good,
|
||||||
|
"Uncertain" => Quality.Uncertain,
|
||||||
|
"Bad" => Quality.Bad,
|
||||||
|
_ => Quality.Unspecified
|
||||||
|
};
|
||||||
|
|
||||||
|
private static AlarmStateEnum MapAlarmState(AlarmState state) => state switch
|
||||||
|
{
|
||||||
|
AlarmState.Normal => AlarmStateEnum.AlarmStateNormal,
|
||||||
|
AlarmState.Active => AlarmStateEnum.AlarmStateActive,
|
||||||
|
_ => AlarmStateEnum.AlarmStateUnspecified
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -0,0 +1,176 @@
|
|||||||
|
using System.Threading.Channels;
|
||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.Communication.Actors;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
public class StreamRelayActorTests : TestKit
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void RelaysAttributeValueChanged_ToProtoEvent()
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||||
|
var correlationId = "corr-attr-1";
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor(correlationId, channel.Writer)));
|
||||||
|
|
||||||
|
var timestamp = new DateTimeOffset(2026, 3, 21, 10, 30, 0, TimeSpan.Zero);
|
||||||
|
var domainEvent = new AttributeValueChanged(
|
||||||
|
"Site1.Pump01", "Modules.Pressure", "CurrentPSI", 42.5, "Good", timestamp);
|
||||||
|
|
||||||
|
actor.Tell(domainEvent);
|
||||||
|
|
||||||
|
var success = channel.Reader.TryRead(out var protoEvent);
|
||||||
|
if (!success)
|
||||||
|
{
|
||||||
|
// Give a moment for async processing
|
||||||
|
Thread.Sleep(500);
|
||||||
|
success = channel.Reader.TryRead(out protoEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.True(success, "Expected a proto event on the channel");
|
||||||
|
Assert.NotNull(protoEvent);
|
||||||
|
Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, protoEvent.EventCase);
|
||||||
|
Assert.Equal(correlationId, protoEvent.CorrelationId);
|
||||||
|
|
||||||
|
var attr = protoEvent.AttributeChanged;
|
||||||
|
Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
|
||||||
|
Assert.Equal("Modules.Pressure", attr.AttributePath);
|
||||||
|
Assert.Equal("CurrentPSI", attr.AttributeName);
|
||||||
|
Assert.Equal("42.5", attr.Value);
|
||||||
|
Assert.Equal(Quality.Good, attr.Quality);
|
||||||
|
Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), attr.Timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void RelaysAlarmStateChanged_ToProtoEvent()
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||||
|
var correlationId = "corr-alarm-1";
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor(correlationId, channel.Writer)));
|
||||||
|
|
||||||
|
var timestamp = new DateTimeOffset(2026, 3, 21, 11, 0, 0, TimeSpan.Zero);
|
||||||
|
var domainEvent = new AlarmStateChanged(
|
||||||
|
"Site1.Pump01", "HighPressure", AlarmState.Active, 2, timestamp);
|
||||||
|
|
||||||
|
actor.Tell(domainEvent);
|
||||||
|
|
||||||
|
var success = channel.Reader.TryRead(out var protoEvent);
|
||||||
|
if (!success)
|
||||||
|
{
|
||||||
|
Thread.Sleep(500);
|
||||||
|
success = channel.Reader.TryRead(out protoEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.True(success, "Expected a proto event on the channel");
|
||||||
|
Assert.NotNull(protoEvent);
|
||||||
|
Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, protoEvent.EventCase);
|
||||||
|
Assert.Equal(correlationId, protoEvent.CorrelationId);
|
||||||
|
|
||||||
|
var alarm = protoEvent.AlarmChanged;
|
||||||
|
Assert.Equal("Site1.Pump01", alarm.InstanceUniqueName);
|
||||||
|
Assert.Equal("HighPressure", alarm.AlarmName);
|
||||||
|
Assert.Equal(AlarmStateEnum.AlarmStateActive, alarm.State);
|
||||||
|
Assert.Equal(2, alarm.Priority);
|
||||||
|
Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), alarm.Timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SetsCorrelationId_OnAllEvents()
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||||
|
var correlationId = "corr-multi-42";
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor(correlationId, channel.Writer)));
|
||||||
|
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts));
|
||||||
|
actor.Tell(new AlarmStateChanged("Inst1", "Alarm1", AlarmState.Normal, 1, ts));
|
||||||
|
actor.Tell(new AttributeValueChanged("Inst2", "Path2", "Name2", null, "Bad", ts));
|
||||||
|
|
||||||
|
// Allow messages to process
|
||||||
|
Thread.Sleep(500);
|
||||||
|
|
||||||
|
var events = new List<SiteStreamEvent>();
|
||||||
|
while (channel.Reader.TryRead(out var evt))
|
||||||
|
{
|
||||||
|
events.Add(evt);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Equal(3, events.Count);
|
||||||
|
Assert.All(events, e => Assert.Equal(correlationId, e.CorrelationId));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DropsEvent_WhenChannelFull()
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateBounded<SiteStreamEvent>(new BoundedChannelOptions(1)
|
||||||
|
{
|
||||||
|
FullMode = BoundedChannelFullMode.Wait
|
||||||
|
});
|
||||||
|
var correlationId = "corr-drop-1";
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor(correlationId, channel.Writer)));
|
||||||
|
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
// Fill the channel with one item directly
|
||||||
|
var filler = new SiteStreamEvent { CorrelationId = "filler" };
|
||||||
|
Assert.True(channel.Writer.TryWrite(filler));
|
||||||
|
|
||||||
|
// Send another event — should be dropped (channel full), no exception
|
||||||
|
actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts));
|
||||||
|
|
||||||
|
// Allow message to process
|
||||||
|
Thread.Sleep(500);
|
||||||
|
|
||||||
|
// Channel should still have exactly 1 item (the filler)
|
||||||
|
Assert.True(channel.Reader.TryRead(out var item));
|
||||||
|
Assert.Equal("filler", item.CorrelationId);
|
||||||
|
Assert.False(channel.Reader.TryRead(out _));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData("Good", Quality.Good)]
|
||||||
|
[InlineData("Uncertain", Quality.Uncertain)]
|
||||||
|
[InlineData("Bad", Quality.Bad)]
|
||||||
|
[InlineData("Unknown", Quality.Unspecified)]
|
||||||
|
[InlineData("", Quality.Unspecified)]
|
||||||
|
[InlineData("good", Quality.Unspecified)]
|
||||||
|
public void MapsQualityString_ToProtoEnum(string qualityString, Quality expectedProto)
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor("corr", channel.Writer)));
|
||||||
|
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", 1, qualityString, ts));
|
||||||
|
|
||||||
|
Thread.Sleep(500);
|
||||||
|
Assert.True(channel.Reader.TryRead(out var evt));
|
||||||
|
Assert.Equal(expectedProto, evt.AttributeChanged.Quality);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NullValue_MapsToEmptyString()
|
||||||
|
{
|
||||||
|
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||||
|
var actor = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new StreamRelayActor("corr", channel.Writer)));
|
||||||
|
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", null, "Good", ts));
|
||||||
|
|
||||||
|
Thread.Sleep(500);
|
||||||
|
Assert.True(channel.Reader.TryRead(out var evt));
|
||||||
|
Assert.Equal("", evt.AttributeChanged.Value);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user