Compare commits
2 Commits
9b0a80dcbd
...
55a05914d0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
55a05914d0 | ||
|
|
d70bbbe739 |
90
src/ScadaLink.Communication/Actors/StreamRelayActor.cs
Normal file
90
src/ScadaLink.Communication/Actors/StreamRelayActor.cs
Normal file
@@ -0,0 +1,90 @@
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using ScadaLink.Commons.Messages.Streaming;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState;
|
||||
|
||||
namespace ScadaLink.Communication.Actors;
|
||||
|
||||
/// <summary>
|
||||
/// Lightweight relay actor that bridges Akka domain events (AttributeValueChanged,
|
||||
/// AlarmStateChanged) to a System.Threading.Channels.Channel of protobuf SiteStreamEvent
|
||||
/// messages. The gRPC server method reads from the channel's reader side.
|
||||
/// </summary>
|
||||
public class StreamRelayActor : ReceiveActor
|
||||
{
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly string _correlationId;
|
||||
private readonly ChannelWriter<SiteStreamEvent> _channelWriter;
|
||||
|
||||
public StreamRelayActor(string correlationId, ChannelWriter<SiteStreamEvent> channelWriter)
|
||||
{
|
||||
_correlationId = correlationId;
|
||||
_channelWriter = channelWriter;
|
||||
|
||||
Receive<AttributeValueChanged>(HandleAttributeValueChanged);
|
||||
Receive<AlarmStateChanged>(HandleAlarmStateChanged);
|
||||
}
|
||||
|
||||
private void HandleAttributeValueChanged(AttributeValueChanged msg)
|
||||
{
|
||||
var protoEvent = new SiteStreamEvent
|
||||
{
|
||||
CorrelationId = _correlationId,
|
||||
AttributeChanged = new AttributeValueUpdate
|
||||
{
|
||||
InstanceUniqueName = msg.InstanceUniqueName,
|
||||
AttributePath = msg.AttributePath,
|
||||
AttributeName = msg.AttributeName,
|
||||
Value = msg.Value?.ToString() ?? "",
|
||||
Quality = MapQuality(msg.Quality),
|
||||
Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
|
||||
}
|
||||
};
|
||||
|
||||
WriteToChannel(protoEvent);
|
||||
}
|
||||
|
||||
private void HandleAlarmStateChanged(AlarmStateChanged msg)
|
||||
{
|
||||
var protoEvent = new SiteStreamEvent
|
||||
{
|
||||
CorrelationId = _correlationId,
|
||||
AlarmChanged = new AlarmStateUpdate
|
||||
{
|
||||
InstanceUniqueName = msg.InstanceUniqueName,
|
||||
AlarmName = msg.AlarmName,
|
||||
State = MapAlarmState(msg.State),
|
||||
Priority = msg.Priority,
|
||||
Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
|
||||
}
|
||||
};
|
||||
|
||||
WriteToChannel(protoEvent);
|
||||
}
|
||||
|
||||
private void WriteToChannel(SiteStreamEvent protoEvent)
|
||||
{
|
||||
if (!_channelWriter.TryWrite(protoEvent))
|
||||
{
|
||||
_log.Warning("Channel full, dropping event for correlation {0}", _correlationId);
|
||||
}
|
||||
}
|
||||
|
||||
private static Quality MapQuality(string quality) => quality switch
|
||||
{
|
||||
"Good" => Quality.Good,
|
||||
"Uncertain" => Quality.Uncertain,
|
||||
"Bad" => Quality.Bad,
|
||||
_ => Quality.Unspecified
|
||||
};
|
||||
|
||||
private static AlarmStateEnum MapAlarmState(AlarmState state) => state switch
|
||||
{
|
||||
AlarmState.Normal => AlarmStateEnum.AlarmStateNormal,
|
||||
AlarmState.Active => AlarmStateEnum.AlarmStateActive,
|
||||
_ => AlarmStateEnum.AlarmStateUnspecified
|
||||
};
|
||||
}
|
||||
22
src/ScadaLink.Communication/Grpc/ISiteStreamSubscriber.cs
Normal file
22
src/ScadaLink.Communication/Grpc/ISiteStreamSubscriber.cs
Normal file
@@ -0,0 +1,22 @@
|
||||
using Akka.Actor;
|
||||
|
||||
namespace ScadaLink.Communication.Grpc;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the site-side stream subscription mechanism.
|
||||
/// SiteStreamManager in the SiteRuntime project implements this interface;
|
||||
/// the gRPC server depends on it without referencing SiteRuntime directly.
|
||||
/// </summary>
|
||||
public interface ISiteStreamSubscriber
|
||||
{
|
||||
/// <summary>
|
||||
/// Subscribes an actor to receive filtered stream events for a specific instance.
|
||||
/// </summary>
|
||||
/// <returns>A subscription ID that can be used for unsubscription.</returns>
|
||||
string Subscribe(string instanceName, IActorRef subscriber);
|
||||
|
||||
/// <summary>
|
||||
/// Removes all subscriptions for the given actor.
|
||||
/// </summary>
|
||||
void RemoveSubscriber(IActorRef subscriber);
|
||||
}
|
||||
116
src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
Normal file
116
src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
Normal file
@@ -0,0 +1,116 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using GrpcStatus = Grpc.Core.Status;
|
||||
|
||||
namespace ScadaLink.Communication.Grpc;
|
||||
|
||||
/// <summary>
|
||||
/// gRPC service that accepts instance stream subscriptions from central nodes.
|
||||
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
|
||||
/// through a Channel<T> to the gRPC response stream.
|
||||
/// </summary>
|
||||
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
{
|
||||
private readonly ISiteStreamSubscriber _streamSubscriber;
|
||||
private readonly ActorSystem _actorSystem;
|
||||
private readonly ILogger<SiteStreamGrpcServer> _logger;
|
||||
private readonly ConcurrentDictionary<string, StreamEntry> _activeStreams = new();
|
||||
private readonly int _maxConcurrentStreams;
|
||||
private volatile bool _ready;
|
||||
private long _actorCounter;
|
||||
|
||||
public SiteStreamGrpcServer(
|
||||
ISiteStreamSubscriber streamSubscriber,
|
||||
ActorSystem actorSystem,
|
||||
ILogger<SiteStreamGrpcServer> logger,
|
||||
int maxConcurrentStreams = 100)
|
||||
{
|
||||
_streamSubscriber = streamSubscriber;
|
||||
_actorSystem = actorSystem;
|
||||
_logger = logger;
|
||||
_maxConcurrentStreams = maxConcurrentStreams;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Marks the server as ready to accept subscriptions.
|
||||
/// Called after the site runtime is fully initialized.
|
||||
/// </summary>
|
||||
public void SetReady() => _ready = true;
|
||||
|
||||
/// <summary>
|
||||
/// Number of currently active streaming subscriptions. Exposed for diagnostics.
|
||||
/// </summary>
|
||||
public int ActiveStreamCount => _activeStreams.Count;
|
||||
|
||||
public override async Task SubscribeInstance(
|
||||
InstanceStreamRequest request,
|
||||
IServerStreamWriter<SiteStreamEvent> responseStream,
|
||||
ServerCallContext context)
|
||||
{
|
||||
if (!_ready)
|
||||
throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));
|
||||
|
||||
// Duplicate prevention -- cancel existing stream for this correlationId
|
||||
if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
|
||||
{
|
||||
existingEntry.Cts.Cancel();
|
||||
existingEntry.Cts.Dispose();
|
||||
}
|
||||
|
||||
// Check max concurrent streams after duplicate removal
|
||||
if (_activeStreams.Count >= _maxConcurrentStreams)
|
||||
throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));
|
||||
|
||||
using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
|
||||
var entry = new StreamEntry(streamCts);
|
||||
_activeStreams[request.CorrelationId] = entry;
|
||||
|
||||
var channel = Channel.CreateBounded<SiteStreamEvent>(
|
||||
new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest });
|
||||
|
||||
var actorSeq = Interlocked.Increment(ref _actorCounter);
|
||||
var relayActor = _actorSystem.ActorOf(
|
||||
Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
|
||||
$"stream-relay-{request.CorrelationId}-{actorSeq}");
|
||||
|
||||
var subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
|
||||
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
|
||||
|
||||
try
|
||||
{
|
||||
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
|
||||
{
|
||||
await responseStream.WriteAsync(evt, streamCts.Token);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal cancellation (client disconnect or duplicate replacement)
|
||||
}
|
||||
finally
|
||||
{
|
||||
_streamSubscriber.RemoveSubscriber(relayActor);
|
||||
_actorSystem.Stop(relayActor);
|
||||
channel.Writer.TryComplete();
|
||||
|
||||
// Only remove our own entry -- a replacement stream may have already taken the slot
|
||||
_activeStreams.TryRemove(
|
||||
new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
|
||||
|
||||
_logger.LogInformation(
|
||||
"Stream {CorrelationId} for {Instance} ended",
|
||||
request.CorrelationId, request.InstanceUniqueName);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tracks a single active stream so cleanup only removes its own entry.
|
||||
/// </summary>
|
||||
private sealed record StreamEntry(CancellationTokenSource Cts);
|
||||
}
|
||||
@@ -0,0 +1,237 @@
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
|
||||
namespace ScadaLink.Communication.Tests.Grpc;
|
||||
|
||||
public class SiteStreamGrpcServerTests : TestKit
|
||||
{
|
||||
private readonly ISiteStreamSubscriber _subscriber;
|
||||
private readonly ILogger<SiteStreamGrpcServer> _logger;
|
||||
|
||||
public SiteStreamGrpcServerTests()
|
||||
{
|
||||
_subscriber = Substitute.For<ISiteStreamSubscriber>();
|
||||
_subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
|
||||
.Returns("sub-1");
|
||||
_logger = NullLogger<SiteStreamGrpcServer>.Instance;
|
||||
}
|
||||
|
||||
private SiteStreamGrpcServer CreateServer(int maxStreams = 100)
|
||||
{
|
||||
return new SiteStreamGrpcServer(_subscriber, Sys, _logger, maxStreams);
|
||||
}
|
||||
|
||||
private static InstanceStreamRequest MakeRequest(string correlationId = "corr-1", string instance = "Site1.Pump01")
|
||||
{
|
||||
return new InstanceStreamRequest
|
||||
{
|
||||
CorrelationId = correlationId,
|
||||
InstanceUniqueName = instance
|
||||
};
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RejectsWhenNotReady()
|
||||
{
|
||||
var server = CreateServer();
|
||||
// Do NOT call SetReady()
|
||||
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
var context = CreateMockContext();
|
||||
|
||||
var ex = await Assert.ThrowsAsync<RpcException>(
|
||||
() => server.SubscribeInstance(MakeRequest(), writer, context));
|
||||
|
||||
Assert.Equal(StatusCode.Unavailable, ex.StatusCode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RejectsWhenMaxStreamsReached()
|
||||
{
|
||||
var server = CreateServer(maxStreams: 1);
|
||||
server.SetReady();
|
||||
|
||||
// Start one stream that blocks
|
||||
var cts1 = new CancellationTokenSource();
|
||||
var context1 = CreateMockContext(cts1.Token);
|
||||
var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var stream1Task = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-1"), writer1, context1));
|
||||
|
||||
// Wait for the first stream to register
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// Second stream should be rejected
|
||||
var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
var context2 = CreateMockContext();
|
||||
|
||||
var ex = await Assert.ThrowsAsync<RpcException>(
|
||||
() => server.SubscribeInstance(MakeRequest("corr-2"), writer2, context2));
|
||||
|
||||
Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode);
|
||||
|
||||
// Clean up first stream
|
||||
cts1.Cancel();
|
||||
await stream1Task;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CancelsDuplicateCorrelationId()
|
||||
{
|
||||
var server = CreateServer();
|
||||
server.SetReady();
|
||||
|
||||
var cts1 = new CancellationTokenSource();
|
||||
var context1 = CreateMockContext(cts1.Token);
|
||||
var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
// Start first stream
|
||||
var stream1Task = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-dup"), writer1, context1));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// Start second stream with same correlationId -- should cancel first
|
||||
var cts2 = new CancellationTokenSource();
|
||||
var context2 = CreateMockContext(cts2.Token);
|
||||
var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var stream2Task = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-dup"), writer2, context2));
|
||||
|
||||
// First stream should complete (cancelled by duplicate replacement)
|
||||
await stream1Task;
|
||||
|
||||
// Second stream should be active
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// Clean up
|
||||
cts2.Cancel();
|
||||
await stream2Task;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CleansUpOnCancellation()
|
||||
{
|
||||
var server = CreateServer();
|
||||
server.SetReady();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-cleanup"), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
|
||||
Assert.Equal(0, server.ActiveStreamCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribesAndRemovesFromStreamManager()
|
||||
{
|
||||
var server = CreateServer();
|
||||
server.SetReady();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-sub", "Site1.Motor01"), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// Verify Subscribe was called
|
||||
_subscriber.Received(1).Subscribe("Site1.Motor01", Arg.Any<IActorRef>());
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
|
||||
// Verify RemoveSubscriber was called
|
||||
_subscriber.Received(1).RemoveSubscriber(Arg.Any<IActorRef>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WritesEventsToResponseStream()
|
||||
{
|
||||
var server = CreateServer();
|
||||
server.SetReady();
|
||||
|
||||
// Capture the relay actor so we can send it events
|
||||
IActorRef? capturedActor = null;
|
||||
_subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
|
||||
.Returns(ci =>
|
||||
{
|
||||
capturedActor = ci.Arg<IActorRef>();
|
||||
return "sub-write";
|
||||
});
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
var writtenEvents = new List<SiteStreamEvent>();
|
||||
writer.WriteAsync(Arg.Any<SiteStreamEvent>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask)
|
||||
.AndDoes(ci => writtenEvents.Add(ci.Arg<SiteStreamEvent>()));
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-write", "Site1.Pump01"), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => capturedActor != null);
|
||||
|
||||
// Send a domain event to the relay actor
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
capturedActor!.Tell(new Commons.Messages.Streaming.AttributeValueChanged(
|
||||
"Site1.Pump01", "Path", "Attr", 99.5, "Good", ts));
|
||||
|
||||
// Wait for event to be written
|
||||
await WaitForConditionAsync(() => writtenEvents.Count >= 1);
|
||||
|
||||
Assert.Single(writtenEvents);
|
||||
Assert.Equal("corr-write", writtenEvents[0].CorrelationId);
|
||||
Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, writtenEvents[0].EventCase);
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetReady_AllowsStreamCreation()
|
||||
{
|
||||
var server = CreateServer();
|
||||
// Initially not ready -- just verify the property works
|
||||
server.SetReady();
|
||||
// No assertion needed -- the other tests verify that SetReady enables streaming
|
||||
Assert.Equal(0, server.ActiveStreamCount);
|
||||
}
|
||||
|
||||
private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var context = Substitute.For<ServerCallContext>();
|
||||
context.CancellationToken.Returns(cancellationToken);
|
||||
return context;
|
||||
}
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> condition, int timeoutMs = 5000)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
|
||||
while (!condition() && DateTime.UtcNow < deadline)
|
||||
{
|
||||
await Task.Delay(25);
|
||||
}
|
||||
|
||||
Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,176 @@
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using ScadaLink.Commons.Messages.Streaming;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.Communication.Actors;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
using AlarmState = ScadaLink.Commons.Types.Enums.AlarmState;
|
||||
|
||||
namespace ScadaLink.Communication.Tests.Grpc;
|
||||
|
||||
public class StreamRelayActorTests : TestKit
|
||||
{
|
||||
[Fact]
|
||||
public void RelaysAttributeValueChanged_ToProtoEvent()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||
var correlationId = "corr-attr-1";
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor(correlationId, channel.Writer)));
|
||||
|
||||
var timestamp = new DateTimeOffset(2026, 3, 21, 10, 30, 0, TimeSpan.Zero);
|
||||
var domainEvent = new AttributeValueChanged(
|
||||
"Site1.Pump01", "Modules.Pressure", "CurrentPSI", 42.5, "Good", timestamp);
|
||||
|
||||
actor.Tell(domainEvent);
|
||||
|
||||
var success = channel.Reader.TryRead(out var protoEvent);
|
||||
if (!success)
|
||||
{
|
||||
// Give a moment for async processing
|
||||
Thread.Sleep(500);
|
||||
success = channel.Reader.TryRead(out protoEvent);
|
||||
}
|
||||
|
||||
Assert.True(success, "Expected a proto event on the channel");
|
||||
Assert.NotNull(protoEvent);
|
||||
Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, protoEvent.EventCase);
|
||||
Assert.Equal(correlationId, protoEvent.CorrelationId);
|
||||
|
||||
var attr = protoEvent.AttributeChanged;
|
||||
Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
|
||||
Assert.Equal("Modules.Pressure", attr.AttributePath);
|
||||
Assert.Equal("CurrentPSI", attr.AttributeName);
|
||||
Assert.Equal("42.5", attr.Value);
|
||||
Assert.Equal(Quality.Good, attr.Quality);
|
||||
Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), attr.Timestamp);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RelaysAlarmStateChanged_ToProtoEvent()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||
var correlationId = "corr-alarm-1";
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor(correlationId, channel.Writer)));
|
||||
|
||||
var timestamp = new DateTimeOffset(2026, 3, 21, 11, 0, 0, TimeSpan.Zero);
|
||||
var domainEvent = new AlarmStateChanged(
|
||||
"Site1.Pump01", "HighPressure", AlarmState.Active, 2, timestamp);
|
||||
|
||||
actor.Tell(domainEvent);
|
||||
|
||||
var success = channel.Reader.TryRead(out var protoEvent);
|
||||
if (!success)
|
||||
{
|
||||
Thread.Sleep(500);
|
||||
success = channel.Reader.TryRead(out protoEvent);
|
||||
}
|
||||
|
||||
Assert.True(success, "Expected a proto event on the channel");
|
||||
Assert.NotNull(protoEvent);
|
||||
Assert.Equal(SiteStreamEvent.EventOneofCase.AlarmChanged, protoEvent.EventCase);
|
||||
Assert.Equal(correlationId, protoEvent.CorrelationId);
|
||||
|
||||
var alarm = protoEvent.AlarmChanged;
|
||||
Assert.Equal("Site1.Pump01", alarm.InstanceUniqueName);
|
||||
Assert.Equal("HighPressure", alarm.AlarmName);
|
||||
Assert.Equal(AlarmStateEnum.AlarmStateActive, alarm.State);
|
||||
Assert.Equal(2, alarm.Priority);
|
||||
Assert.Equal(Timestamp.FromDateTimeOffset(timestamp), alarm.Timestamp);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetsCorrelationId_OnAllEvents()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||
var correlationId = "corr-multi-42";
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor(correlationId, channel.Writer)));
|
||||
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
|
||||
actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts));
|
||||
actor.Tell(new AlarmStateChanged("Inst1", "Alarm1", AlarmState.Normal, 1, ts));
|
||||
actor.Tell(new AttributeValueChanged("Inst2", "Path2", "Name2", null, "Bad", ts));
|
||||
|
||||
// Allow messages to process
|
||||
Thread.Sleep(500);
|
||||
|
||||
var events = new List<SiteStreamEvent>();
|
||||
while (channel.Reader.TryRead(out var evt))
|
||||
{
|
||||
events.Add(evt);
|
||||
}
|
||||
|
||||
Assert.Equal(3, events.Count);
|
||||
Assert.All(events, e => Assert.Equal(correlationId, e.CorrelationId));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DropsEvent_WhenChannelFull()
|
||||
{
|
||||
var channel = Channel.CreateBounded<SiteStreamEvent>(new BoundedChannelOptions(1)
|
||||
{
|
||||
FullMode = BoundedChannelFullMode.Wait
|
||||
});
|
||||
var correlationId = "corr-drop-1";
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor(correlationId, channel.Writer)));
|
||||
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
|
||||
// Fill the channel with one item directly
|
||||
var filler = new SiteStreamEvent { CorrelationId = "filler" };
|
||||
Assert.True(channel.Writer.TryWrite(filler));
|
||||
|
||||
// Send another event — should be dropped (channel full), no exception
|
||||
actor.Tell(new AttributeValueChanged("Inst1", "Path", "Name", 1, "Good", ts));
|
||||
|
||||
// Allow message to process
|
||||
Thread.Sleep(500);
|
||||
|
||||
// Channel should still have exactly 1 item (the filler)
|
||||
Assert.True(channel.Reader.TryRead(out var item));
|
||||
Assert.Equal("filler", item.CorrelationId);
|
||||
Assert.False(channel.Reader.TryRead(out _));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("Good", Quality.Good)]
|
||||
[InlineData("Uncertain", Quality.Uncertain)]
|
||||
[InlineData("Bad", Quality.Bad)]
|
||||
[InlineData("Unknown", Quality.Unspecified)]
|
||||
[InlineData("", Quality.Unspecified)]
|
||||
[InlineData("good", Quality.Unspecified)]
|
||||
public void MapsQualityString_ToProtoEnum(string qualityString, Quality expectedProto)
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor("corr", channel.Writer)));
|
||||
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", 1, qualityString, ts));
|
||||
|
||||
Thread.Sleep(500);
|
||||
Assert.True(channel.Reader.TryRead(out var evt));
|
||||
Assert.Equal(expectedProto, evt.AttributeChanged.Quality);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NullValue_MapsToEmptyString()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<SiteStreamEvent>();
|
||||
var actor = Sys.ActorOf(Props.Create(() =>
|
||||
new StreamRelayActor("corr", channel.Writer)));
|
||||
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", null, "Good", ts));
|
||||
|
||||
Thread.Sleep(500);
|
||||
Assert.True(channel.Reader.TryRead(out var evt));
|
||||
Assert.Equal("", evt.AttributeChanged.Value);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user