Communication Layer (WP-1–5): - 8 message patterns with correlation IDs, per-pattern timeouts - Central/Site communication actors, transport heartbeat config - Connection failure handling (no central buffering, debug streams killed) Data Connection Layer (WP-6–14, WP-34): - Connection actor with Become/Stash lifecycle (Connecting/Connected/Reconnecting) - OPC UA + LmxProxy adapters behind IDataConnection - Auto-reconnect, bad quality propagation, transparent re-subscribe - Write-back, tag path resolution with retry, health reporting - Protocol extensibility via DataConnectionFactory Site Runtime (WP-15–25, WP-32–33): - ScriptActor/ScriptExecutionActor (triggers, concurrent execution, blocking I/O dispatcher) - AlarmActor/AlarmExecutionActor (ValueMatch/RangeViolation/RateOfChange, in-memory state) - SharedScriptLibrary (inline execution), ScriptRuntimeContext (API) - ScriptCompilationService (Roslyn, forbidden API enforcement, execution timeout) - Recursion limit (default 10), call direction enforcement - SiteStreamManager (per-subscriber bounded buffers, fire-and-forget) - Debug view backend (snapshot + stream), concurrency serialization - Local artifact storage (4 SQLite tables) Health Monitoring (WP-26–28): - SiteHealthCollector (thread-safe counters, connection state) - HealthReportSender (30s interval, monotonic sequence numbers) - CentralHealthAggregator (offline detection 60s, online recovery) Site Event Logging (WP-29–31): - SiteEventLogger (SQLite, 6 event categories, ISO 8601 UTC) - EventLogPurgeService (30-day retention, 1GB cap) - EventLogQueryService (filters, keyword search, keyset pagination) 541 tests pass, zero warnings.
119 lines
3.7 KiB
C#
119 lines
3.7 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Messages.Streaming;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.SiteRuntime.Streaming;
|
|
|
|
namespace ScadaLink.SiteRuntime.Tests.Streaming;
|
|
|
|
/// <summary>
|
|
/// WP-23: Site-Wide Akka Stream tests.
|
|
/// WP-25: Debug View Backend tests (subscribe/unsubscribe).
|
|
/// </summary>
|
|
public class SiteStreamManagerTests : TestKit, IDisposable
|
|
{
|
|
private readonly SiteStreamManager _streamManager;
|
|
|
|
public SiteStreamManagerTests()
|
|
{
|
|
var options = new SiteRuntimeOptions { StreamBufferSize = 100 };
|
|
_streamManager = new SiteStreamManager(
|
|
Sys, options, NullLogger<SiteStreamManager>.Instance);
|
|
_streamManager.Initialize();
|
|
}
|
|
|
|
void IDisposable.Dispose()
|
|
{
|
|
Shutdown();
|
|
}
|
|
|
|
[Fact]
|
|
public void Subscribe_CreatesSubscription()
|
|
{
|
|
var probe = CreateTestProbe();
|
|
var id = _streamManager.Subscribe("Pump1", probe.Ref);
|
|
|
|
Assert.NotNull(id);
|
|
Assert.Equal(1, _streamManager.SubscriptionCount);
|
|
}
|
|
|
|
[Fact]
|
|
public void Unsubscribe_RemovesSubscription()
|
|
{
|
|
var probe = CreateTestProbe();
|
|
var id = _streamManager.Subscribe("Pump1", probe.Ref);
|
|
|
|
Assert.True(_streamManager.Unsubscribe(id));
|
|
Assert.Equal(0, _streamManager.SubscriptionCount);
|
|
}
|
|
|
|
[Fact]
|
|
public void Unsubscribe_InvalidId_ReturnsFalse()
|
|
{
|
|
Assert.False(_streamManager.Unsubscribe("nonexistent"));
|
|
}
|
|
|
|
[Fact]
|
|
public void PublishAttributeValueChanged_ForwardsToSubscriber()
|
|
{
|
|
var probe = CreateTestProbe();
|
|
_streamManager.Subscribe("Pump1", probe.Ref);
|
|
|
|
var changed = new AttributeValueChanged(
|
|
"Pump1", "Temperature", "Temperature", "100", "Good", DateTimeOffset.UtcNow);
|
|
_streamManager.PublishAttributeValueChanged(changed);
|
|
|
|
var received = probe.ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(3));
|
|
Assert.Equal("Pump1", received.InstanceUniqueName);
|
|
Assert.Equal("Temperature", received.AttributeName);
|
|
}
|
|
|
|
[Fact]
|
|
public void PublishAlarmStateChanged_ForwardsToSubscriber()
|
|
{
|
|
var probe = CreateTestProbe();
|
|
_streamManager.Subscribe("Pump1", probe.Ref);
|
|
|
|
var changed = new AlarmStateChanged(
|
|
"Pump1", "HighTemp", AlarmState.Active, 1, DateTimeOffset.UtcNow);
|
|
_streamManager.PublishAlarmStateChanged(changed);
|
|
|
|
var received = probe.ExpectMsg<AlarmStateChanged>(TimeSpan.FromSeconds(3));
|
|
Assert.Equal("Pump1", received.InstanceUniqueName);
|
|
Assert.Equal(AlarmState.Active, received.State);
|
|
}
|
|
|
|
[Fact]
|
|
public void PublishAttributeValueChanged_FiltersbyInstance()
|
|
{
|
|
var probe1 = CreateTestProbe();
|
|
var probe2 = CreateTestProbe();
|
|
_streamManager.Subscribe("Pump1", probe1.Ref);
|
|
_streamManager.Subscribe("Pump2", probe2.Ref);
|
|
|
|
var changed = new AttributeValueChanged(
|
|
"Pump1", "Temperature", "Temperature", "100", "Good", DateTimeOffset.UtcNow);
|
|
_streamManager.PublishAttributeValueChanged(changed);
|
|
|
|
// Pump1 subscriber should receive
|
|
probe1.ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(3));
|
|
|
|
// Pump2 subscriber should NOT receive
|
|
probe2.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
|
}
|
|
|
|
[Fact]
|
|
public void RemoveSubscriber_RemovesAllSubscriptionsForActor()
|
|
{
|
|
var probe = CreateTestProbe();
|
|
_streamManager.Subscribe("Pump1", probe.Ref);
|
|
_streamManager.Subscribe("Pump2", probe.Ref);
|
|
|
|
Assert.Equal(2, _streamManager.SubscriptionCount);
|
|
|
|
_streamManager.RemoveSubscriber(probe.Ref);
|
|
Assert.Equal(0, _streamManager.SubscriptionCount);
|
|
}
|
|
}
|