refactor: remove ClusterClient streaming path (DebugStreamEvent), events flow via gRPC

This commit is contained in:
Joseph Doherty
2026-03-21 12:18:52 -04:00
parent 2cd43b6992
commit 49f042a937
6 changed files with 27 additions and 90 deletions

View File

@@ -1,8 +0,0 @@
namespace ScadaLink.Commons.Messages.DebugView;
/// <summary>
/// Wraps a debug stream event (AttributeValueChanged or AlarmStateChanged) with
/// the correlationId for routing back to the correct DebugStreamBridgeActor on central.
/// Sent from InstanceActor → SiteCommunicationActor → ClusterClient → CentralCommunicationActor.
/// </summary>
public record DebugStreamEvent(string CorrelationId, object Event);

View File

@@ -86,8 +86,6 @@ public class CentralCommunicationActor : ReceiveActor
// Route enveloped messages to sites // Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope); Receive<SiteEnvelope>(HandleSiteEnvelope);
// Route debug stream events from sites to the correct bridge actor
Receive<Commons.Messages.DebugView.DebugStreamEvent>(HandleDebugStreamEvent);
} }
private void HandleHeartbeat(HeartbeatMessage heartbeat) private void HandleHeartbeat(HeartbeatMessage heartbeat)
@@ -96,18 +94,6 @@ public class CentralCommunicationActor : ReceiveActor
Context.Parent.Tell(heartbeat); Context.Parent.Tell(heartbeat);
} }
private void HandleDebugStreamEvent(Commons.Messages.DebugView.DebugStreamEvent msg)
{
if (_debugSubscriptions.TryGetValue(msg.CorrelationId, out var entry))
{
entry.Subscriber.Tell(msg.Event);
}
else
{
_log.Debug("No debug subscription found for correlationId {0}, dropping event", msg.CorrelationId);
}
}
private void HandleSiteHealthReport(SiteHealthReport report) private void HandleSiteHealthReport(SiteHealthReport report)
{ {
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>(); var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();

View File

@@ -146,12 +146,6 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
new ClusterClient.Send("/user/central-communication", msg), Self); new ClusterClient.Send("/user/central-communication", msg), Self);
}); });
// Internal: forward debug stream events to central (site→central streaming)
Receive<DebugStreamEvent>(msg =>
{
_centralClient?.Tell(
new ClusterClient.Send("/user/central-communication", msg), Self);
});
} }
protected override void PreStart() protected override void PreStart()

View File

@@ -50,10 +50,6 @@ public class InstanceActor : ReceiveActor
private readonly Dictionary<string, IActorRef> _alarmActors = new(); private readonly Dictionary<string, IActorRef> _alarmActors = new();
private FlattenedConfiguration? _configuration; private FlattenedConfiguration? _configuration;
// WP-25: Debug view subscribers
private readonly HashSet<string> _debugSubscriberCorrelationIds = new();
private ActorSelection? _siteCommActor;
// DCL manager actor reference for subscribing to tag values // DCL manager actor reference for subscribing to tag values
private readonly IActorRef? _dclManager; private readonly IActorRef? _dclManager;
// Maps tag paths back to attribute canonical names for DCL updates // Maps tag paths back to attribute canonical names for DCL updates
@@ -150,9 +146,6 @@ public class InstanceActor : ReceiveActor
base.PreStart(); base.PreStart();
_logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName); _logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName);
// Resolve SiteCommunicationActor for routing debug stream events back to central
_siteCommActor = Context.ActorSelection("/user/site-communication");
// Asynchronously load static overrides from SQLite and pipe to self // Asynchronously load static overrides from SQLite and pipe to self
var self = Self; var self = Self;
_storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t => _storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t =>
@@ -370,12 +363,6 @@ public class InstanceActor : ReceiveActor
// WP-23: Publish to site-wide stream // WP-23: Publish to site-wide stream
_streamManager?.PublishAlarmStateChanged(changed); _streamManager?.PublishAlarmStateChanged(changed);
// Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central
foreach (var correlationId in _debugSubscriberCorrelationIds)
{
_siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed));
}
} }
/// <summary> /// <summary>
@@ -383,9 +370,6 @@ public class InstanceActor : ReceiveActor
/// </summary> /// </summary>
private void HandleSubscribeDebugView(SubscribeDebugViewRequest request) private void HandleSubscribeDebugView(SubscribeDebugViewRequest request)
{ {
var subscriptionId = request.CorrelationId;
_debugSubscriberCorrelationIds.Add(subscriptionId);
// Build snapshot from current state // Build snapshot from current state
var now = DateTimeOffset.UtcNow; var now = DateTimeOffset.UtcNow;
var attributeValues = _attributes.Select(kvp => new AttributeValueChanged( var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
@@ -412,8 +396,8 @@ public class InstanceActor : ReceiveActor
Sender.Tell(snapshot); Sender.Tell(snapshot);
_logger.LogDebug( _logger.LogDebug(
"Debug view subscriber added for {Instance}, subscriptionId={Id}", "Debug view snapshot sent for {Instance}, correlationId={Id}",
_instanceUniqueName, subscriptionId); _instanceUniqueName, request.CorrelationId);
} }
/// <summary> /// <summary>
@@ -421,10 +405,8 @@ public class InstanceActor : ReceiveActor
/// </summary> /// </summary>
private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request) private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request)
{ {
_debugSubscriberCorrelationIds.Remove(request.CorrelationId);
_logger.LogDebug( _logger.LogDebug(
"Debug view subscriber removed for {Instance}, correlationId={Id}", "Debug view unsubscribe for {Instance}, correlationId={Id}",
_instanceUniqueName, request.CorrelationId); _instanceUniqueName, request.CorrelationId);
} }
@@ -478,12 +460,6 @@ public class InstanceActor : ReceiveActor
{ {
alarmActor.Tell(changed); alarmActor.Tell(changed);
} }
// Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central
foreach (var correlationId in _debugSubscriberCorrelationIds)
{
_siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed));
}
} }
/// <summary> /// <summary>

View File

@@ -136,6 +136,15 @@ public class ArchitecturalConstraintTests
} }
} }
[Fact]
public void DebugStreamEvent_ShouldNotExist()
{
// DebugStreamEvent was removed when debug streaming moved from ClusterClient to gRPC.
// Events now flow via SiteStreamManager → StreamRelayActor → gRPC channel.
var type = CommonsAssembly.GetTypes().FirstOrDefault(t => t.Name == "DebugStreamEvent");
Assert.Null(type);
}
[Fact] [Fact]
public void AllEnums_ShouldBeSingularNamed() public void AllEnums_ShouldBeSingularNamed()
{ {

View File

@@ -41,21 +41,6 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable
ScriptExecutionTimeoutSeconds = 30 ScriptExecutionTimeoutSeconds = 30
}; };
// Create a fake site-communication actor that unwraps DebugStreamEvent
// and forwards the inner event to TestActor (simulating the ClusterClient relay)
Sys.ActorOf(Props.Create(() => new DebugStreamEventForwarder(TestActor)), "site-communication");
}
/// <summary>
/// Test helper: stands in for SiteCommunicationActor, unwraps DebugStreamEvent
/// and forwards the inner event to a target actor.
/// </summary>
private class DebugStreamEventForwarder : ReceiveActor
{
public DebugStreamEventForwarder(IActorRef target)
{
Receive<DebugStreamEvent>(msg => target.Tell(msg.Event));
}
} }
void IDisposable.Dispose() void IDisposable.Dispose()
@@ -169,22 +154,23 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable
} }
[Fact] [Fact]
public void InstanceActor_WP25_DebugViewSubscriber_ReceivesChanges() public void InstanceActor_WP25_DebugViewSubscribe_NoLongerForwardsEventsViaClusterClient()
{ {
// Events now flow via gRPC (SiteStreamManager → StreamRelayActor → gRPC),
// not via ClusterClient. Subscribing returns a snapshot but ongoing events
// are NOT forwarded to the subscriber actor.
var actor = CreateInstanceWithScripts("Pump1"); var actor = CreateInstanceWithScripts("Pump1");
// Subscribe to debug view // Subscribe to debug view — should still get snapshot
actor.Tell(new SubscribeDebugViewRequest("Pump1", "debug-2")); actor.Tell(new SubscribeDebugViewRequest("Pump1", "debug-2"));
ExpectMsg<DebugViewSnapshot>(TimeSpan.FromSeconds(5)); ExpectMsg<DebugViewSnapshot>(TimeSpan.FromSeconds(5));
// Now change an attribute // Change an attribute
actor.Tell(new AttributeValueChanged( actor.Tell(new AttributeValueChanged(
"Pump1", "Temperature", "Temperature", "200", "Good", DateTimeOffset.UtcNow)); "Pump1", "Temperature", "Temperature", "200", "Good", DateTimeOffset.UtcNow));
// The subscriber should receive the change notification // Should NOT receive change notification (old ClusterClient path removed)
var changed = ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(5)); ExpectNoMsg(TimeSpan.FromSeconds(1));
Assert.Equal("Temperature", changed.AttributeName);
Assert.Equal("200", changed.Value?.ToString());
} }
[Fact] [Fact]
@@ -223,20 +209,14 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable
var actor = CreateInstanceWithScripts("Pump1", alarms: alarms); var actor = CreateInstanceWithScripts("Pump1", alarms: alarms);
// Subscribe to debug view to observe alarm state changes // Wait for initialization
actor.Tell(new SubscribeDebugViewRequest("Pump1", "debug-alarm")); Thread.Sleep(500);
ExpectMsg<DebugViewSnapshot>(TimeSpan.FromSeconds(5));
// Send value outside range to trigger alarm // Verify alarm actor was created by checking the debug snapshot includes the alarm
actor.Tell(new AttributeValueChanged( actor.Tell(new DebugSnapshotRequest("Pump1", "snap-alarm"));
"Pump1", "Temperature", "Temperature", "150", "Good", DateTimeOffset.UtcNow)); var snapshot = ExpectMsg<DebugViewSnapshot>(TimeSpan.FromSeconds(5));
// Should receive the attribute change first (from debug subscription) Assert.Single(snapshot.AlarmStates);
ExpectMsg<AttributeValueChanged>(TimeSpan.FromSeconds(5)); Assert.Equal("HighTemp", snapshot.AlarmStates[0].AlarmName);
// Then the alarm state change (forwarded by Instance Actor)
var alarmMsg = ExpectMsg<AlarmStateChanged>(TimeSpan.FromSeconds(5));
Assert.Equal("HighTemp", alarmMsg.AlarmName);
Assert.Equal(Commons.Types.Enums.AlarmState.Active, alarmMsg.State);
} }
} }