diff --git a/src/ScadaLink.Commons/Messages/DataConnection/CreateConnectionCommand.cs b/src/ScadaLink.Commons/Messages/DataConnection/CreateConnectionCommand.cs new file mode 100644 index 0000000..2026ae3 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/DataConnection/CreateConnectionCommand.cs @@ -0,0 +1,10 @@ +namespace ScadaLink.Commons.Messages.DataConnection; + +/// +/// Command to create a new data connection actor for a specific protocol. +/// Sent from DeploymentManagerActor to DCL Manager Actor. +/// +public record CreateConnectionCommand( + string ConnectionName, + string ProtocolType, + IDictionary ConnectionDetails); diff --git a/src/ScadaLink.Commons/Types/Flattening/FlattenedConfiguration.cs b/src/ScadaLink.Commons/Types/Flattening/FlattenedConfiguration.cs index c68e499..16f8bec 100644 --- a/src/ScadaLink.Commons/Types/Flattening/FlattenedConfiguration.cs +++ b/src/ScadaLink.Commons/Types/Flattening/FlattenedConfiguration.cs @@ -17,6 +17,22 @@ public sealed record FlattenedConfiguration public IReadOnlyList Alarms { get; init; } = []; public IReadOnlyList Scripts { get; init; } = []; public DateTimeOffset GeneratedAtUtc { get; init; } = DateTimeOffset.UtcNow; + + /// + /// Connection configurations keyed by connection name, each containing the + /// protocol-specific settings (e.g. OPC UA endpoint, publish interval). + /// Populated during flattening from the instance's connection bindings. + /// + public IReadOnlyDictionary? Connections { get; init; } +} + +/// +/// Connection configuration included in the flattened deployment package. +/// +public sealed record ConnectionConfig +{ + public string Protocol { get; init; } = string.Empty; + public string? ConfigurationJson { get; init; } } /// diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs index 872ef57..b1856ad 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs @@ -123,14 +123,6 @@ public class DataConnectionManagerActor : ReceiveActor } } -/// -/// Command to create a new data connection actor for a specific protocol. -/// -public record CreateConnectionCommand( - string ConnectionName, - string ProtocolType, - IDictionary ConnectionDetails); - /// /// Command to remove a data connection actor. /// diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 3af7958..4bd5b2c 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -196,6 +196,20 @@ akka {{ var dmLogger = _serviceProvider.GetRequiredService() .CreateLogger(); + // WP-34: Create DCL Manager Actor for tag subscriptions + var dclFactory = _serviceProvider.GetService(); + var dclOptions = _serviceProvider.GetService>()?.Value + ?? new ScadaLink.DataConnectionLayer.DataConnectionOptions(); + IActorRef? dclManager = null; + if (dclFactory != null) + { + dclManager = _actorSystem!.ActorOf( + Props.Create(() => new ScadaLink.DataConnectionLayer.Actors.DataConnectionManagerActor( + dclFactory, dclOptions)), + "dcl-manager"); + _logger.LogInformation("Data Connection Layer manager actor created"); + } + // Create the Deployment Manager as a cluster singleton var singletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new DeploymentManagerActor( @@ -204,7 +218,8 @@ akka {{ sharedScriptLibrary, streamManager, siteRuntimeOptionsValue, - dmLogger)), + dmLogger, + dclManager)), terminationMessage: PoisonPill.Instance, settings: ClusterSingletonManagerSettings.Create(_actorSystem!) .WithRole(siteRole) diff --git a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs index eeb0d11..ac3cae5 100644 --- a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs @@ -29,6 +29,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers private readonly SiteStreamManager? _streamManager; private readonly SiteRuntimeOptions _options; private readonly ILogger _logger; + private readonly IActorRef? _dclManager; private readonly Dictionary _instanceActors = new(); public ITimerScheduler Timers { get; set; } = null!; @@ -39,13 +40,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers SharedScriptLibrary sharedScriptLibrary, SiteStreamManager? streamManager, SiteRuntimeOptions options, - ILogger logger) + ILogger logger, + IActorRef? dclManager = null) { _storage = storage; _compilationService = compilationService; _sharedScriptLibrary = sharedScriptLibrary; _streamManager = streamManager; _options = options; + _dclManager = dclManager; _logger = logger; // Lifecycle commands @@ -192,6 +195,9 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers return; } + // Ensure DCL connections exist for any data-sourced attributes + EnsureDclConnections(command.FlattenedConfigurationJson); + // Create the Instance Actor immediately (no existing actor to replace) CreateInstanceActor(instanceName, command.FlattenedConfigurationJson); @@ -342,6 +348,54 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers _logger.LogInformation("Instance {Instance} deleted", instanceName); } + // ── DCL connection management ── + + private readonly HashSet _createdConnections = new(); + + /// + /// Parses the flattened config to find bound data connections and ensures + /// the DCL Manager has corresponding connection actors created. + /// + private void EnsureDclConnections(string configJson) + { + if (_dclManager == null) return; + + try + { + var config = System.Text.Json.JsonSerializer.Deserialize(configJson); + if (config?.Connections == null) return; + + foreach (var (name, connConfig) in config.Connections) + { + if (_createdConnections.Contains(name)) + continue; + + var connectionDetails = new Dictionary(); + if (!string.IsNullOrEmpty(connConfig.ConfigurationJson)) + { + try + { + var parsed = System.Text.Json.JsonSerializer.Deserialize>(connConfig.ConfigurationJson); + if (parsed != null) connectionDetails = parsed; + } + catch { /* Ignore parse errors */ } + } + + _dclManager.Tell(new Commons.Messages.DataConnection.CreateConnectionCommand( + name, connConfig.Protocol, connectionDetails)); + + _createdConnections.Add(name); + _logger.LogInformation( + "Created DCL connection {Connection} (protocol={Protocol})", + name, connConfig.Protocol); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to parse flattened config for DCL connections"); + } + } + // ── Debug View routing ── private void RouteDebugViewSubscribe(SubscribeDebugViewRequest request) @@ -456,7 +510,8 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers _sharedScriptLibrary, _streamManager, _options, - loggerFactory.CreateLogger())); + loggerFactory.CreateLogger(), + _dclManager)); var actorRef = Context.ActorOf(props, instanceName); _instanceActors[instanceName] = actorRef; diff --git a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs index 10bcb98..565d738 100644 --- a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs @@ -1,5 +1,6 @@ using Akka.Actor; using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Instance; using ScadaLink.Commons.Messages.Lifecycle; @@ -45,6 +46,11 @@ public class InstanceActor : ReceiveActor // WP-25: Debug view subscribers private readonly Dictionary _debugSubscribers = new(); + // DCL manager actor reference for subscribing to tag values + private readonly IActorRef? _dclManager; + // Maps tag paths back to attribute canonical names for DCL updates + private readonly Dictionary _tagPathToAttribute = new(); + public InstanceActor( string instanceUniqueName, string configJson, @@ -53,7 +59,8 @@ public class InstanceActor : ReceiveActor SharedScriptLibrary sharedScriptLibrary, SiteStreamManager? streamManager, SiteRuntimeOptions options, - ILogger logger) + ILogger logger, + IActorRef? dclManager = null) { _instanceUniqueName = instanceUniqueName; _storage = storage; @@ -62,6 +69,7 @@ public class InstanceActor : ReceiveActor _streamManager = streamManager; _options = options; _logger = logger; + _dclManager = dclManager; // Deserialize the flattened configuration _configuration = JsonSerializer.Deserialize(configJson); @@ -102,6 +110,10 @@ public class InstanceActor : ReceiveActor // WP-22/23: Handle attribute value changes from DCL (Tell pattern) Receive(HandleAttributeValueChanged); + // Handle tag value updates from DCL — convert to AttributeValueChanged + Receive(HandleTagValueUpdate); + Receive(HandleConnectionQualityChanged); + // WP-16: Handle alarm state changes from Alarm Actors (Tell pattern) Receive(HandleAlarmStateChanged); @@ -129,6 +141,9 @@ public class InstanceActor : ReceiveActor // Create child Script Actors and Alarm Actors from configuration CreateChildActors(); + + // Subscribe to DCL for data-sourced attributes + SubscribeToDcl(); } /// @@ -238,6 +253,66 @@ public class InstanceActor : ReceiveActor PublishAndNotifyChildren(changed); } + /// + /// Handles tag value updates from DCL. Maps the tag path back to the attribute + /// canonical name and converts to an AttributeValueChanged for unified processing. + /// + private void HandleTagValueUpdate(TagValueUpdate update) + { + if (_tagPathToAttribute.TryGetValue(update.TagPath, out var attrName)) + { + var changed = new AttributeValueChanged( + _instanceUniqueName, update.TagPath, attrName, + update.Value, update.Quality.ToString(), update.Timestamp); + HandleAttributeValueChanged(changed); + } + } + + private void HandleConnectionQualityChanged(ConnectionQualityChanged qualityChanged) + { + _logger.LogInformation("Connection {Connection} quality changed to {Quality}", + qualityChanged.ConnectionName, qualityChanged.Quality); + } + + /// + /// Subscribes to DCL for all data-sourced attributes. Groups tag paths by connection + /// name and sends SubscribeTagsRequest to the DCL manager. + /// + private void SubscribeToDcl() + { + if (_dclManager == null || _configuration == null) return; + + // Group attributes by their bound connection name + var byConnection = new Dictionary>(); + foreach (var attr in _configuration.Attributes) + { + if (string.IsNullOrEmpty(attr.DataSourceReference) || + string.IsNullOrEmpty(attr.BoundDataConnectionName)) + continue; + + _tagPathToAttribute[attr.DataSourceReference] = attr.CanonicalName; + + if (!byConnection.ContainsKey(attr.BoundDataConnectionName)) + byConnection[attr.BoundDataConnectionName] = new List(); + byConnection[attr.BoundDataConnectionName].Add(attr.DataSourceReference); + } + + // Send subscription requests to DCL for each connection + foreach (var (connectionName, tagPaths) in byConnection) + { + var request = new SubscribeTagsRequest( + Guid.NewGuid().ToString("N"), + _instanceUniqueName, + connectionName, + tagPaths, + DateTimeOffset.UtcNow); + _dclManager.Tell(request, Self); + _logger.LogInformation( + "Instance {Instance} subscribed to {Count} tags on connection {Connection}", + _instanceUniqueName, tagPaths.Count, connectionName); + } + } + /// /// WP-16: Handles alarm state changes from Alarm Actors. /// Updates in-memory alarm state and publishes to stream.