using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.SiteRuntime.Streaming;

namespace ScadaLink.SiteRuntime.Tests.Streaming;

/// <summary>
/// WP-23: Site-Wide Akka Stream tests.
/// WP-25: Debug View Backend tests (subscribe/unsubscribe).
/// </summary>
public class SiteStreamManagerTests : TestKit, IDisposable
{
    /// <summary>
    /// Stream manager under test; built and initialized in the constructor
    /// against the TestKit actor system (<c>Sys</c>).
    /// </summary>
    private readonly SiteStreamManager _streamManager;

    /// <summary>
    /// Creates a <see cref="SiteStreamManager"/> with a stream buffer size of 100
    /// and initializes it before every test.
    /// </summary>
    public SiteStreamManagerTests()
    {
        var options = new SiteRuntimeOptions { StreamBufferSize = 100 };

        // NullLogger<T>.Instance is an ILogger<T>, which is also an ILogger, so it
        // satisfies either kind of logger parameter on the constructor.
        _streamManager = new SiteStreamManager(
            Sys,
            options,
            NullLogger<SiteStreamManager>.Instance);

        _streamManager.Initialize();
    }

    /// <summary>
    /// Test teardown: shuts down the TestKit actor system.
    /// </summary>
    void IDisposable.Dispose()
    {
        Shutdown();
    }

    /// <summary>
    /// Subscribing returns a non-null subscription id and raises the subscription count to 1.
    /// </summary>
    [Fact]
    public void Subscribe_CreatesSubscription()
    {
        var probe = CreateTestProbe();

        var id = _streamManager.Subscribe("Pump1", probe.Ref);

        Assert.NotNull(id);
        Assert.Equal(1, _streamManager.SubscriptionCount);
    }

    /// <summary>
    /// Unsubscribing with the id returned by Subscribe succeeds and drops the count back to 0.
    /// </summary>
    [Fact]
    public void Unsubscribe_RemovesSubscription()
    {
        var probe = CreateTestProbe();
        var id = _streamManager.Subscribe("Pump1", probe.Ref);

        Assert.True(_streamManager.Unsubscribe(id));
        Assert.Equal(0, _streamManager.SubscriptionCount);
    }

    /// <summary>
    /// Unsubscribing with an id that was never issued returns false.
    /// </summary>
    [Fact]
    public void Unsubscribe_InvalidId_ReturnsFalse()
    {
        Assert.False(_streamManager.Unsubscribe("nonexistent"));
    }

    /// <summary>
    /// A published attribute change for "Pump1" reaches the probe subscribed to "Pump1".
    /// </summary>
    [Fact]
    public void PublishAttributeValueChanged_ForwardsToSubscriber()
    {
        var probe = CreateTestProbe();
        _streamManager.Subscribe("Pump1", probe.Ref);
        var changed = new AttributeValueChanged(
            "Pump1", "Temperature", "Temperature", "100", "Good", DateTimeOffset.UtcNow);

        _streamManager.PublishAttributeValueChanged(changed);

        // The explicit type argument makes the probe wait for a message of that
        // type; without it the TimeSpan argument is taken as the expected message.
        var received = probe.ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(3));
        Assert.Equal("Pump1", received.InstanceUniqueName);
        Assert.Equal("Temperature", received.AttributeName);
    }

    /// <summary>
    /// A published alarm state change for "Pump1" reaches the probe subscribed to "Pump1".
    /// </summary>
    [Fact]
    public void PublishAlarmStateChanged_ForwardsToSubscriber()
    {
        var probe = CreateTestProbe();
        _streamManager.Subscribe("Pump1", probe.Ref);
        var changed = new AlarmStateChanged(
            "Pump1", "HighTemp", AlarmState.Active, 1, DateTimeOffset.UtcNow);

        _streamManager.PublishAlarmStateChanged(changed);

        var received = probe.ExpectMsg<AlarmStateChanged>(TimeSpan.FromSeconds(3));
        Assert.Equal("Pump1", received.InstanceUniqueName);
        Assert.Equal(AlarmState.Active, received.State);
    }

    /// <summary>
    /// An attribute change published for "Pump1" is delivered to the "Pump1"
    /// subscriber only; the "Pump2" subscriber receives nothing.
    /// </summary>
    [Fact]
    public void PublishAttributeValueChanged_FiltersbyInstance()
    {
        var probe1 = CreateTestProbe();
        var probe2 = CreateTestProbe();
        _streamManager.Subscribe("Pump1", probe1.Ref);
        _streamManager.Subscribe("Pump2", probe2.Ref);
        var changed = new AttributeValueChanged(
            "Pump1", "Temperature", "Temperature", "100", "Good", DateTimeOffset.UtcNow);

        _streamManager.PublishAttributeValueChanged(changed);

        // Pump1 subscriber should receive
        probe1.ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(3));

        // Pump2 subscriber should NOT receive
        probe2.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }

    /// <summary>
    /// Removing a subscriber actor drops every subscription registered for that actor.
    /// </summary>
    [Fact]
    public void RemoveSubscriber_RemovesAllSubscriptionsForActor()
    {
        var probe = CreateTestProbe();
        _streamManager.Subscribe("Pump1", probe.Ref);
        _streamManager.Subscribe("Pump2", probe.Ref);
        Assert.Equal(2, _streamManager.SubscriptionCount);

        _streamManager.RemoveSubscriber(probe.Ref);

        Assert.Equal(0, _streamManager.SubscriptionCount);
    }
}