Wire DCL to Instance Actors for OPC UA tag value flow

- Add TagValueUpdate/ConnectionQualityChanged handlers to InstanceActor
- InstanceActor subscribes to DCL on PreStart based on DataSourceReference
- DeploymentManagerActor creates DCL connections on deploy and passes DCL ref
- AkkaHostedService creates DCL Manager Actor for tag subscriptions
- Move CreateConnectionCommand to Commons for cross-project access
- Add ConnectionConfig to FlattenedConfiguration for deployment packaging
This commit is contained in:
Joseph Doherty
2026-03-17 11:21:11 -04:00
parent 2798b91fe1
commit dfb809a909
6 changed files with 175 additions and 12 deletions

View File

@@ -0,0 +1,10 @@
namespace ScadaLink.Commons.Messages.DataConnection;
/// <summary>
/// Command to create a new data connection actor for a specific protocol.
/// Sent from DeploymentManagerActor to DCL Manager Actor.
/// </summary>
public record CreateConnectionCommand(
string ConnectionName,
string ProtocolType,
IDictionary<string, string> ConnectionDetails);

View File

@@ -17,6 +17,22 @@ public sealed record FlattenedConfiguration
public IReadOnlyList<ResolvedAlarm> Alarms { get; init; } = [];
public IReadOnlyList<ResolvedScript> Scripts { get; init; } = [];
public DateTimeOffset GeneratedAtUtc { get; init; } = DateTimeOffset.UtcNow;
/// <summary>
/// Connection configurations keyed by connection name, each containing the
/// protocol-specific settings (e.g. OPC UA endpoint, publish interval).
/// Populated during flattening from the instance's connection bindings.
/// </summary>
public IReadOnlyDictionary<string, ConnectionConfig>? Connections { get; init; }
}
/// <summary>
/// Connection configuration included in the flattened deployment package.
/// </summary>
public sealed record ConnectionConfig
{
public string Protocol { get; init; } = string.Empty;
public string? ConfigurationJson { get; init; }
}
/// <summary>

View File

@@ -123,14 +123,6 @@ public class DataConnectionManagerActor : ReceiveActor
}
}
/// <summary>
/// Command to create a new data connection actor for a specific protocol.
/// </summary>
public record CreateConnectionCommand(
string ConnectionName,
string ProtocolType,
IDictionary<string, string> ConnectionDetails);
/// <summary>
/// Command to remove a data connection actor.
/// </summary>

View File

@@ -196,6 +196,20 @@ akka {{
var dmLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<DeploymentManagerActor>();
// WP-34: Create DCL Manager Actor for tag subscriptions
var dclFactory = _serviceProvider.GetService<ScadaLink.DataConnectionLayer.IDataConnectionFactory>();
var dclOptions = _serviceProvider.GetService<IOptions<ScadaLink.DataConnectionLayer.DataConnectionOptions>>()?.Value
?? new ScadaLink.DataConnectionLayer.DataConnectionOptions();
IActorRef? dclManager = null;
if (dclFactory != null)
{
dclManager = _actorSystem!.ActorOf(
Props.Create(() => new ScadaLink.DataConnectionLayer.Actors.DataConnectionManagerActor(
dclFactory, dclOptions)),
"dcl-manager");
_logger.LogInformation("Data Connection Layer manager actor created");
}
// Create the Deployment Manager as a cluster singleton
var singletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new DeploymentManagerActor(
@@ -204,7 +218,8 @@ akka {{
sharedScriptLibrary,
streamManager,
siteRuntimeOptionsValue,
dmLogger)),
dmLogger,
dclManager)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithRole(siteRole)

View File

@@ -29,6 +29,7 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
private readonly SiteStreamManager? _streamManager;
private readonly SiteRuntimeOptions _options;
private readonly ILogger<DeploymentManagerActor> _logger;
private readonly IActorRef? _dclManager;
private readonly Dictionary<string, IActorRef> _instanceActors = new();
public ITimerScheduler Timers { get; set; } = null!;
@@ -39,13 +40,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
SharedScriptLibrary sharedScriptLibrary,
SiteStreamManager? streamManager,
SiteRuntimeOptions options,
ILogger<DeploymentManagerActor> logger)
ILogger<DeploymentManagerActor> logger,
IActorRef? dclManager = null)
{
_storage = storage;
_compilationService = compilationService;
_sharedScriptLibrary = sharedScriptLibrary;
_streamManager = streamManager;
_options = options;
_dclManager = dclManager;
_logger = logger;
// Lifecycle commands
@@ -192,6 +195,9 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
return;
}
// Ensure DCL connections exist for any data-sourced attributes
EnsureDclConnections(command.FlattenedConfigurationJson);
// Create the Instance Actor immediately (no existing actor to replace)
CreateInstanceActor(instanceName, command.FlattenedConfigurationJson);
@@ -342,6 +348,54 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
_logger.LogInformation("Instance {Instance} deleted", instanceName);
}
// ── DCL connection management ──
private readonly HashSet<string> _createdConnections = new();
/// <summary>
/// Parses the flattened config to find bound data connections and ensures
/// the DCL Manager has corresponding connection actors created.
/// </summary>
private void EnsureDclConnections(string configJson)
{
if (_dclManager == null) return;
try
{
var config = System.Text.Json.JsonSerializer.Deserialize<Commons.Types.Flattening.FlattenedConfiguration>(configJson);
if (config?.Connections == null) return;
foreach (var (name, connConfig) in config.Connections)
{
if (_createdConnections.Contains(name))
continue;
var connectionDetails = new Dictionary<string, string>();
if (!string.IsNullOrEmpty(connConfig.ConfigurationJson))
{
try
{
var parsed = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(connConfig.ConfigurationJson);
if (parsed != null) connectionDetails = parsed;
}
catch { /* Ignore parse errors */ }
}
_dclManager.Tell(new Commons.Messages.DataConnection.CreateConnectionCommand(
name, connConfig.Protocol, connectionDetails));
_createdConnections.Add(name);
_logger.LogInformation(
"Created DCL connection {Connection} (protocol={Protocol})",
name, connConfig.Protocol);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to parse flattened config for DCL connections");
}
}
// ── Debug View routing ──
private void RouteDebugViewSubscribe(SubscribeDebugViewRequest request)
@@ -456,7 +510,8 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
_sharedScriptLibrary,
_streamManager,
_options,
loggerFactory.CreateLogger<InstanceActor>()));
loggerFactory.CreateLogger<InstanceActor>(),
_dclManager));
var actorRef = Context.ActorOf(props, instanceName);
_instanceActors[instanceName] = actorRef;

View File

@@ -1,5 +1,6 @@
using Akka.Actor;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Instance;
using ScadaLink.Commons.Messages.Lifecycle;
@@ -45,6 +46,11 @@ public class InstanceActor : ReceiveActor
// WP-25: Debug view subscribers
private readonly Dictionary<string, IActorRef> _debugSubscribers = new();
// DCL manager actor reference for subscribing to tag values
private readonly IActorRef? _dclManager;
// Maps tag paths back to attribute canonical names for DCL updates
private readonly Dictionary<string, string> _tagPathToAttribute = new();
public InstanceActor(
string instanceUniqueName,
string configJson,
@@ -53,7 +59,8 @@ public class InstanceActor : ReceiveActor
SharedScriptLibrary sharedScriptLibrary,
SiteStreamManager? streamManager,
SiteRuntimeOptions options,
ILogger logger)
ILogger logger,
IActorRef? dclManager = null)
{
_instanceUniqueName = instanceUniqueName;
_storage = storage;
@@ -62,6 +69,7 @@ public class InstanceActor : ReceiveActor
_streamManager = streamManager;
_options = options;
_logger = logger;
_dclManager = dclManager;
// Deserialize the flattened configuration
_configuration = JsonSerializer.Deserialize<FlattenedConfiguration>(configJson);
@@ -102,6 +110,10 @@ public class InstanceActor : ReceiveActor
// WP-22/23: Handle attribute value changes from DCL (Tell pattern)
Receive<AttributeValueChanged>(HandleAttributeValueChanged);
// Handle tag value updates from DCL — convert to AttributeValueChanged
Receive<TagValueUpdate>(HandleTagValueUpdate);
Receive<ConnectionQualityChanged>(HandleConnectionQualityChanged);
// WP-16: Handle alarm state changes from Alarm Actors (Tell pattern)
Receive<AlarmStateChanged>(HandleAlarmStateChanged);
@@ -129,6 +141,9 @@ public class InstanceActor : ReceiveActor
// Create child Script Actors and Alarm Actors from configuration
CreateChildActors();
// Subscribe to DCL for data-sourced attributes
SubscribeToDcl();
}
/// <summary>
@@ -238,6 +253,66 @@ public class InstanceActor : ReceiveActor
PublishAndNotifyChildren(changed);
}
/// <summary>
/// Handles tag value updates from DCL. Maps the tag path back to the attribute
/// canonical name and converts to an AttributeValueChanged for unified processing.
/// </summary>
private void HandleTagValueUpdate(TagValueUpdate update)
{
if (_tagPathToAttribute.TryGetValue(update.TagPath, out var attrName))
{
var changed = new AttributeValueChanged(
_instanceUniqueName, update.TagPath, attrName,
update.Value, update.Quality.ToString(), update.Timestamp);
HandleAttributeValueChanged(changed);
}
}
private void HandleConnectionQualityChanged(ConnectionQualityChanged qualityChanged)
{
_logger.LogInformation("Connection {Connection} quality changed to {Quality}",
qualityChanged.ConnectionName, qualityChanged.Quality);
}
/// <summary>
/// Subscribes to DCL for all data-sourced attributes. Groups tag paths by connection
/// name and sends SubscribeTagsRequest to the DCL manager.
/// </summary>
private void SubscribeToDcl()
{
if (_dclManager == null || _configuration == null) return;
// Group attributes by their bound connection name
var byConnection = new Dictionary<string, List<string>>();
foreach (var attr in _configuration.Attributes)
{
if (string.IsNullOrEmpty(attr.DataSourceReference) ||
string.IsNullOrEmpty(attr.BoundDataConnectionName))
continue;
_tagPathToAttribute[attr.DataSourceReference] = attr.CanonicalName;
if (!byConnection.ContainsKey(attr.BoundDataConnectionName))
byConnection[attr.BoundDataConnectionName] = new List<string>();
byConnection[attr.BoundDataConnectionName].Add(attr.DataSourceReference);
}
// Send subscription requests to DCL for each connection
foreach (var (connectionName, tagPaths) in byConnection)
{
var request = new SubscribeTagsRequest(
Guid.NewGuid().ToString("N"),
_instanceUniqueName,
connectionName,
tagPaths,
DateTimeOffset.UtcNow);
_dclManager.Tell(request, Self);
_logger.LogInformation(
"Instance {Instance} subscribed to {Count} tags on connection {Connection}",
_instanceUniqueName, tagPaths.Count, connectionName);
}
}
/// <summary>
/// WP-16: Handles alarm state changes from Alarm Actors.
/// Updates in-memory alarm state and publishes to stream.