Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs
T
Joseph Doherty feeae1371e fix(multivalue): NJ-3/NJ-4/NJ-5 review fixes
- NJ-3: widen per-row catch to Exception (an STJ encode failure can't abort startup); drop dead null-guard already excluded by the SQL filter
- NJ-4: capture logger/instanceName in locals for the fire-and-forget normalize continuation (match the sibling pattern in this actor)
- NJ-5: emit a warn-log when a malformed List value is imported verbatim; thread an optional ILogger<BundleImporter> to the sync re-import site
2026-06-16 18:25:42 -04:00

1206 lines
54 KiB
C#

using Akka.Actor;
using Microsoft.CodeAnalysis.Scripting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.ScriptExecution;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Streaming;
using System.Text.Json;
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
/// <summary>
/// Represents a single deployed instance at runtime. Holds the in-memory attribute state
/// (loaded from FlattenedConfiguration + static overrides from SQLite).
///
/// The Instance Actor is the single source of truth for runtime instance state.
/// WP-24: All state mutations are serialized through the actor mailbox.
/// Multiple Script Execution Actors run concurrently; state mutations through this actor.
///
/// WP-15/16: Creates child Script Actors and Alarm Actors on startup.
/// WP-22: Tell for tag value updates, attribute notifications, stream publishing.
/// Ask for CallScript, debug snapshot.
/// WP-25: Debug view backend — snapshot + stream subscription.
/// </summary>
public class InstanceActor : ReceiveActor
{
private readonly string _instanceUniqueName;
private readonly SiteStorageService _storage;
private readonly ScriptCompilationService _compilationService;
private readonly SharedScriptLibrary _sharedScriptLibrary;
private readonly SiteStreamManager? _streamManager;
private readonly SiteRuntimeOptions _options;
private readonly ILogger _logger;
private readonly ISiteHealthCollector? _healthCollector;
private readonly IServiceProvider? _serviceProvider;
private readonly Dictionary<string, object?> _attributes = new();
private readonly Dictionary<string, string> _attributeQualities = new();
private readonly Dictionary<string, DateTimeOffset> _attributeTimestamps = new();
private readonly Dictionary<string, AlarmState> _alarmStates = new();
private readonly Dictionary<string, DateTimeOffset> _alarmTimestamps = new();
private readonly Dictionary<string, int> _alarmPriorities = new();
private readonly Dictionary<string, IActorRef> _scriptActors = new();
private readonly Dictionary<string, IActorRef> _alarmActors = new();
private readonly Dictionary<string, IActorRef> _nativeAlarmActors = new();
/// <summary>
/// Latest enriched <see cref="AlarmStateChanged"/> per alarm name (computed and
/// native), so the DebugView snapshot carries the unified condition + native
/// metadata rather than a bare State/Priority projection.
/// </summary>
private readonly Dictionary<string, AlarmStateChanged> _latestAlarmEvents = new();
private FlattenedConfiguration? _configuration;
// MV-8: resolved attributes indexed by canonical name. The TagValueUpdate
// ingest path is the highest-frequency message this actor handles, so the
// attribute lookup must be O(1) rather than a linear scan of
// _configuration.Attributes. Built once in the constructor from the
// deserialized configuration (last-wins on duplicate canonical names,
// mirroring the rest of the actor's by-name dictionaries).
private readonly Dictionary<string, ResolvedAttribute> _resolvedAttributeByName = new();
// DCL manager actor reference for subscribing to tag values
private readonly IActorRef? _dclManager;
// Maps each tag path to every attribute canonical name that references it.
// A tag path can back more than one attribute (e.g. two composed modules
// whose members reference the same PLC node), so a tag value update must
// fan out to all of them — not just the last one registered.
private readonly Dictionary<string, List<string>> _tagPathToAttributes = new();
/// <summary>
/// Initializes the instance actor with its configuration and dependencies.
/// </summary>
/// <param name="instanceUniqueName">System-wide unique name identifying this instance.</param>
/// <param name="configJson">JSON-serialized flattened configuration for this instance.</param>
/// <param name="storage">Site storage service for loading and persisting static overrides.</param>
/// <param name="compilationService">Service used to compile instance scripts.</param>
/// <param name="sharedScriptLibrary">Library of shared scripts available to instance scripts.</param>
/// <param name="streamManager">Optional site stream manager for publishing attribute/alarm changes.</param>
/// <param name="options">Site runtime configuration options.</param>
/// <param name="logger">Logger for this actor.</param>
/// <param name="dclManager">Optional Data Connection Layer manager actor reference.</param>
/// <param name="healthCollector">Optional health collector for reporting metrics.</param>
/// <param name="serviceProvider">Optional DI service provider for script execution services.</param>
public InstanceActor(
string instanceUniqueName,
string configJson,
SiteStorageService storage,
ScriptCompilationService compilationService,
SharedScriptLibrary sharedScriptLibrary,
SiteStreamManager? streamManager,
SiteRuntimeOptions options,
ILogger logger,
IActorRef? dclManager = null,
ISiteHealthCollector? healthCollector = null,
IServiceProvider? serviceProvider = null)
{
_instanceUniqueName = instanceUniqueName;
_storage = storage;
_compilationService = compilationService;
_sharedScriptLibrary = sharedScriptLibrary;
_streamManager = streamManager;
_options = options;
_logger = logger;
_dclManager = dclManager;
_healthCollector = healthCollector;
_serviceProvider = serviceProvider;
// Deserialize the flattened configuration
_configuration = JsonSerializer.Deserialize<FlattenedConfiguration>(configJson);
// Load default attribute values from the flattened configuration
// Data-sourced attributes start with Uncertain quality until the first DCL value arrives.
// Static attributes start with Good quality.
if (_configuration != null)
{
foreach (var attr in _configuration.Attributes)
{
// MV-8: index resolved attributes for O(1) lookup on the hot
// TagValueUpdate ingest path (last-wins on duplicate names).
_resolvedAttributeByName[attr.CanonicalName] = attr;
// MV-7: a STATIC List attribute's default is the canonical JSON
// array string. Decode it to a typed List<T> for in-memory reads
// so scripts see a real collection. Scalars store their raw
// string unchanged. A malformed List default decodes to null and
// is marked Bad quality rather than crashing the actor.
if (IsListAttribute(attr))
{
var decoded = DecodeAttributeValue(attr, attr.Value);
_attributes[attr.CanonicalName] = decoded;
_attributeQualities[attr.CanonicalName] =
decoded is null && !string.IsNullOrEmpty(attr.Value) ? "Bad"
: string.IsNullOrEmpty(attr.DataSourceReference) ? "Good" : "Uncertain";
}
else
{
_attributes[attr.CanonicalName] = attr.Value;
_attributeQualities[attr.CanonicalName] =
string.IsNullOrEmpty(attr.DataSourceReference) ? "Good" : "Uncertain";
}
}
}
// Handle attribute queries (Tell pattern -- sender gets response)
Receive<GetAttributeRequest>(HandleGetAttribute);
// Handle static attribute writes
Receive<SetStaticAttributeCommand>(HandleSetStaticAttribute);
// SiteRuntime-019: the disable/enable lifecycle is owned entirely by the
// Deployment Manager — DeploymentManagerActor.HandleDisable/HandleEnable
// stop or re-create the Instance Actor directly and reply to the caller.
// DisableInstanceCommand / EnableInstanceCommand are never routed to the
// Instance Actor, so no handlers are registered here. (The previous no-op
// handlers were dead code that implied a non-existent instance-side
// acknowledgement contract.)
// WP-15: Handle script call requests — route to appropriate Script Actor (Ask pattern)
Receive<ScriptCallRequest>(HandleScriptCallRequest);
// WP-22/23: Handle attribute value changes from DCL (Tell pattern)
Receive<AttributeValueChanged>(HandleAttributeValueChanged);
// Handle tag value updates from DCL — convert to AttributeValueChanged
Receive<TagValueUpdate>(HandleTagValueUpdate);
Receive<SubscribeTagsResponse>(_ => { }); // Ack from DCL subscribe — no action needed
Receive<ConnectionQualityChanged>(HandleConnectionQualityChanged);
// WP-16: Handle alarm state changes from Alarm Actors (Tell pattern)
Receive<AlarmStateChanged>(HandleAlarmStateChanged);
// WP-25: Debug view subscribe/unsubscribe (Ask pattern for snapshot)
Receive<SubscribeDebugViewRequest>(HandleSubscribeDebugView);
Receive<UnsubscribeDebugViewRequest>(HandleUnsubscribeDebugView);
// Debug snapshot (one-shot, no subscription)
Receive<DebugSnapshotRequest>(HandleDebugSnapshot);
// Handle internal messages
Receive<LoadOverridesResult>(HandleOverridesLoaded);
}
/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();
_logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName);
// M1.6: operational `instance_lifecycle` event — instance started.
// An instance starts on deploy, on enable (DeploymentManager re-creates
// the actor), and on failover/restart; this single point covers them all.
LogLifecycleEvent($"Instance {_instanceUniqueName} started");
// Asynchronously load static overrides from SQLite and pipe to self
var self = Self;
_storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
return new LoadOverridesResult(t.Result, null);
return new LoadOverridesResult(new Dictionary<string, string>(), t.Exception?.GetBaseException().Message);
}).PipeTo(self);
// Create child Script Actors and Alarm Actors from configuration
CreateChildActors();
// Subscribe to DCL for data-sourced attributes
SubscribeToDcl();
}
/// <inheritdoc />
protected override void PostStop()
{
// M1.6: operational `instance_lifecycle` event — instance stopped. An
// instance stops on disable, delete, redeployment, and graceful shutdown;
// this single point covers them all.
LogLifecycleEvent($"Instance {_instanceUniqueName} stopped");
base.PostStop();
}
/// <summary>
/// M1.6: fire-and-forget an <c>instance_lifecycle</c> operational event to the
/// optional <see cref="ISiteEventLogger"/>. Resolved optionally and never
/// awaited so a logging failure cannot affect the instance lifecycle
/// (matching the established ScriptActor/ScriptExecutionActor pattern).
/// </summary>
private void LogLifecycleEvent(string message)
{
_ = _serviceProvider?.GetService<ISiteEventLogger>()?.LogEventAsync(
"instance_lifecycle", "Info", _instanceUniqueName,
$"InstanceActor:{_instanceUniqueName}", message);
}
/// <inheritdoc />
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: -1,
withinTimeRange: TimeSpan.FromMinutes(1),
decider: Decider.From(ex =>
{
_logger.LogWarning(ex,
"Child actor on instance {Instance} threw exception, resuming",
_instanceUniqueName);
return Directive.Resume;
}));
}
/// <summary>
/// Returns the current attribute value. Uses Tell pattern; sender gets the response.
/// </summary>
private void HandleGetAttribute(GetAttributeRequest request)
{
var found = _attributes.TryGetValue(request.AttributeName, out var value);
_attributeQualities.TryGetValue(request.AttributeName, out var quality);
Sender.Tell(new GetAttributeResponse(
request.CorrelationId,
_instanceUniqueName,
request.AttributeName,
value,
found,
quality ?? "Good",
DateTimeOffset.UtcNow));
}
/// <summary>
/// Handles an attribute write (<c>Instance.SetAttribute</c> / Inbound API).
/// WP-24: State mutation serialized through this actor's mailbox.
///
/// The write is routed by the attribute's data binding:
/// * Data-sourced attribute → forwards a <see cref="WriteTagRequest"/> to the
/// DCL, which writes the physical device. The in-memory value is NOT
/// optimistically updated and NO static override is persisted — the
/// confirmed device value arrives later via the subscription. Success or
/// failure of the device write is returned to the caller.
/// * Static attribute → updates the in-memory value and persists the override
/// to SQLite.
///
/// Either way the caller receives a <see cref="SetStaticAttributeResponse"/>.
/// </summary>
private void HandleSetStaticAttribute(SetStaticAttributeCommand command)
{
// Resolve the target attribute's data binding from the flattened config.
var resolved = _configuration?.Attributes
.FirstOrDefault(a => a.CanonicalName == command.AttributeName);
// SiteRuntime-025: reject writes targeting an attribute that does not exist
// on the deployed instance. Without this check, an inbound API
// SetAttribute("notARealAttr", ...) would pollute the in-memory
// _attributes dictionary, publish a synthetic AttributeValueChanged to
// debug-view subscribers, and persist a durable static-override row that
// resurrects on every restart. The override row is also outside the
// ClearStaticOverridesAsync window for unknown names. Refuse the write
// and let the caller see the failure, mirroring the script trust model's
// "scripts can only read/write attributes on their own instance" framing.
if (resolved == null)
{
_logger.LogWarning(
"SetAttribute rejected — attribute '{Attribute}' is not defined on instance '{Instance}'",
command.AttributeName, _instanceUniqueName);
Sender.Tell(new SetStaticAttributeResponse(
command.CorrelationId,
_instanceUniqueName,
command.AttributeName,
false,
$"Unknown attribute '{command.AttributeName}'",
DateTimeOffset.UtcNow));
return;
}
var isDataSourced =
!string.IsNullOrEmpty(resolved.DataSourceReference)
&& !string.IsNullOrEmpty(resolved.BoundDataConnectionName);
if (isDataSourced)
{
HandleSetDataAttribute(command, resolved);
return;
}
HandleSetStaticAttributeCore(command);
}
/// <summary>
/// Static attribute write: updates in-memory state, publishes the change,
/// persists the override to SQLite, and replies with success.
/// </summary>
private void HandleSetStaticAttributeCore(SetStaticAttributeCommand command)
{
// MV-7: command.Value is the canonical form — a plain string for scalars,
// a JSON array string for List attributes. For a List attribute we store
// the DECODED typed list in memory (so scripts read a real collection) but
// persist + publish the canonical JSON string UNCHANGED below. Scalars
// store the string verbatim. (HandleSetStaticAttribute already rejected
// unknown attributes, so resolved is non-null here, but guard defensively.)
if (_resolvedAttributeByName.TryGetValue(command.AttributeName, out var resolved)
&& IsListAttribute(resolved))
{
// MV-7: the script path pre-encodes valid canonical JSON via ScopeAccessors,
// but the Inbound API / direct-command path can submit an arbitrary
// command.Value. A non-empty value that fails to decode (malformed JSON,
// bad element, missing element type) is poison: storing it would null the
// in-memory value yet publish "Good" quality and durably persist the bad
// JSON (which then loads as Bad next restart). Reject such writes outright.
// Note: DecodeAttributeValue returns null for BOTH a null/empty input
// (valid — clearing) AND a malformed non-empty input (invalid). Only the
// latter is rejected, hence the explicit IsNullOrWhiteSpace guard. An empty
// list "[]" decodes to a non-null empty List, so it passes through.
var decoded = DecodeAttributeValue(resolved, command.Value);
if (!string.IsNullOrWhiteSpace(command.Value) && decoded == null)
{
_logger.LogWarning(
"SetAttribute rejected — value for List attribute '{Attribute}' on instance '{Instance}' is not a valid list",
command.AttributeName, _instanceUniqueName);
Sender.Tell(new SetStaticAttributeResponse(
command.CorrelationId,
_instanceUniqueName,
command.AttributeName,
false,
$"Invalid list value for attribute '{command.AttributeName}'",
DateTimeOffset.UtcNow));
return;
}
_attributes[command.AttributeName] = decoded;
}
else
{
_attributes[command.AttributeName] = command.Value;
}
// Publish attribute change to stream (WP-23) and notify children
var changed = new AttributeValueChanged(
_instanceUniqueName,
command.AttributeName,
command.AttributeName,
command.Value,
"Good",
DateTimeOffset.UtcNow);
PublishAndNotifyChildren(changed);
// Persist asynchronously -- fire and forget since the actor is the source of truth.
var instanceName = _instanceUniqueName;
var attributeName = command.AttributeName;
var logger = _logger;
_storage.SetStaticOverrideAsync(_instanceUniqueName, command.AttributeName, command.Value)
.ContinueWith(t =>
{
logger.LogWarning(
t.Exception?.GetBaseException(),
"Failed to persist static override for {Instance}.{Attribute}; in-memory state is authoritative",
instanceName,
attributeName);
}, TaskContinuationOptions.OnlyOnFaulted);
Sender.Tell(new SetStaticAttributeResponse(
command.CorrelationId, _instanceUniqueName, command.AttributeName,
true, null, DateTimeOffset.UtcNow));
}
/// <summary>
/// Data-sourced attribute write: forwards a write request to the DCL and pipes
/// the device write result back to the caller. The in-memory value is left
/// untouched (it is refreshed by the subscription when the device confirms);
/// no static override is persisted for a data-sourced attribute.
/// </summary>
private void HandleSetDataAttribute(SetStaticAttributeCommand command, ResolvedAttribute resolved)
{
var caller = Sender;
var correlationId = command.CorrelationId;
var attributeName = command.AttributeName;
var instanceName = _instanceUniqueName;
if (_dclManager == null)
{
_logger.LogWarning(
"SetAttribute on data-sourced attribute {Instance}.{Attribute} cannot be routed — no DCL manager configured",
instanceName, attributeName);
caller.Tell(new SetStaticAttributeResponse(
correlationId, instanceName, attributeName, false,
"Data Connection Layer not available for write.", DateTimeOffset.UtcNow));
return;
}
// MV (C1): for a data-sourced List attribute the incoming command.Value is
// the canonical JSON array string (ScopeAccessors encodes the script's
// List<T> for transport/storage). Writing that string straight to the DCL
// would push a String scalar to an array node. Decode it back to a typed
// List<T> so the DCL/Variant write produces a real array. A non-empty value
// that fails to decode (malformed JSON / bad element) is poison — reject the
// write rather than forward garbage to the device (mirrors the static-path
// rejection in HandleSetStaticAttributeCore). Scalars are unchanged.
object? writeValue = command.Value;
if (IsListAttribute(resolved) && !string.IsNullOrWhiteSpace(command.Value))
{
var decoded = DecodeAttributeValue(resolved, command.Value);
if (decoded == null)
{
_logger.LogWarning(
"SetAttribute rejected — value for data-sourced List attribute '{Attribute}' on instance '{Instance}' is not a valid list",
attributeName, instanceName);
caller.Tell(new SetStaticAttributeResponse(
correlationId, instanceName, attributeName, false,
$"Invalid list value for attribute '{attributeName}'", DateTimeOffset.UtcNow));
return;
}
writeValue = decoded;
}
var writeRequest = new WriteTagRequest(
correlationId,
resolved.BoundDataConnectionName!,
resolved.DataSourceReference!,
writeValue,
DateTimeOffset.UtcNow);
// Ask the DCL and pipe the result back to the original caller. The DCL
// returns the failure synchronously so the script can handle it.
_dclManager.Ask<WriteTagResponse>(writeRequest, TimeSpan.FromSeconds(30))
.ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
return new SetStaticAttributeResponse(
correlationId, instanceName, attributeName,
t.Result.Success, t.Result.ErrorMessage, DateTimeOffset.UtcNow);
return new SetStaticAttributeResponse(
correlationId, instanceName, attributeName, false,
t.Exception?.GetBaseException().Message ?? "DCL write timed out",
DateTimeOffset.UtcNow);
}).PipeTo(caller);
}
/// <summary>
/// WP-15: Routes script call requests to the appropriate Script Actor.
/// Uses Ask pattern (WP-22).
/// </summary>
private void HandleScriptCallRequest(ScriptCallRequest request)
{
if (_scriptActors.TryGetValue(request.ScriptName, out var scriptActor))
{
// Forward the request to the Script Actor, preserving the original
// sender. The whole record is forwarded unchanged, so any
// ParentExecutionId (Audit Log #23) set by an inbound-API-routed
// call is carried through to the Script Actor verbatim.
scriptActor.Forward(request);
}
else
{
Sender.Tell(new ScriptCallResult(
request.CorrelationId,
false,
null,
$"Script '{request.ScriptName}' not found on instance '{_instanceUniqueName}'."));
}
}
/// <summary>
/// WP-22/23: Handles attribute value changes from DCL or static writes.
/// Updates in-memory state, publishes to stream, and notifies children.
/// </summary>
private void HandleAttributeValueChanged(AttributeValueChanged changed)
{
// WP-24: State mutation serialized through this actor
_attributes[changed.AttributeName] = changed.Value;
_attributeQualities[changed.AttributeName] = changed.Quality;
_attributeTimestamps[changed.AttributeName] = changed.Timestamp;
PublishAndNotifyChildren(changed);
}
/// <summary>
/// Handles tag value updates from DCL. Maps the tag path back to the attribute
/// canonical name and converts to an AttributeValueChanged for unified processing.
/// </summary>
private void HandleTagValueUpdate(TagValueUpdate update)
{
if (!_tagPathToAttributes.TryGetValue(update.TagPath, out var attrNames))
return;
// One tag path may back several attributes — update every one of them.
// Each attribute is coerced according to its own declared data type, so
// we resolve and convert per attribute rather than once for the tag.
foreach (var attrName in attrNames)
{
// MV-8: O(1) lookup off the hot ingest path (was a linear FirstOrDefault).
_resolvedAttributeByName.TryGetValue(attrName, out var resolved);
// MV-8: a List-typed attribute coerces the incoming OPC UA array
// (a CLR array/IEnumerable from the SDK) into a typed List<T>. On an
// element-type mismatch we set the attribute's quality to Bad, log a
// warning, and skip storing a value rather than crashing the actor.
if (resolved != null && IsListAttribute(resolved))
{
if (TryCoerceListValue(resolved, update.Value, out var typedList))
{
HandleAttributeValueChanged(new AttributeValueChanged(
_instanceUniqueName, update.TagPath, attrName,
typedList, update.Quality.ToString(), update.Timestamp));
}
else
{
_logger.LogWarning(
"List attribute {Instance}.{Attribute} received a value that could not be coerced to List<{Element}>; marking quality Bad",
_instanceUniqueName, attrName, resolved.ElementDataType);
_attributeQualities[attrName] = "Bad";
_attributeTimestamps[attrName] = update.Timestamp;
var currentValue = _attributes.GetValueOrDefault(attrName);
PublishAndNotifyChildren(new AttributeValueChanged(
_instanceUniqueName, update.TagPath, attrName,
currentValue, "Bad", update.Timestamp));
}
continue;
}
// Scalars and non-List attributes keep the historical behaviour:
// array values are normalized to JSON strings so they survive Akka
// serialization; scalars pass through unchanged.
var value = update.Value is Array
? System.Text.Json.JsonSerializer.Serialize(update.Value, update.Value.GetType())
: update.Value;
HandleAttributeValueChanged(new AttributeValueChanged(
_instanceUniqueName, update.TagPath, attrName,
value, update.Quality.ToString(), update.Timestamp));
}
}
/// <summary>True if the resolved attribute is declared as a <see cref="DataType.List"/>.</summary>
private static bool IsListAttribute(ResolvedAttribute attr) =>
Enum.TryParse<DataType>(attr.DataType, ignoreCase: true, out var dt)
&& dt == DataType.List;
/// <summary>
/// MV-7: decodes a STATIC (authored / overridden) attribute's canonical value
/// for in-memory storage. List attributes carry a canonical JSON array string
/// (config default or persisted override) which is decoded via
/// <see cref="AttributeValueCodec.Decode"/> into a typed <c>List&lt;T&gt;</c>
/// so scripts read a real collection; scalars pass through unchanged. This is
/// the authored counterpart to MV-8's <see cref="TryCoerceListValue"/> (which
/// coerces live OPC UA CLR arrays). An undecodable List value (malformed JSON,
/// bad element, missing element type) degrades to <see langword="null"/> + a
/// warning — the caller marks the attribute Bad quality. NEVER throws into the
/// actor.
/// </summary>
private object? DecodeAttributeValue(ResolvedAttribute attr, string? raw)
{
DataType dataType = Enum.TryParse<DataType>(attr.DataType, ignoreCase: true, out var dt)
? dt
: DataType.String;
DataType? elementType = string.IsNullOrEmpty(attr.ElementDataType)
? null
: (Enum.TryParse<DataType>(attr.ElementDataType, ignoreCase: true, out var et) ? et : null);
try
{
return AttributeValueCodec.Decode(raw, dataType, elementType);
}
catch (FormatException ex)
{
_logger.LogWarning(ex,
"Attribute '{Attr}' on '{Instance}' has an undecodable List value; marking Bad quality",
attr.CanonicalName, _instanceUniqueName);
return null; // caller marks quality Bad
}
}
/// <summary>
/// MV-8: coerces an incoming data-sourced value (an OPC UA array / IEnumerable)
/// into a typed <c>List&lt;elementClrType&gt;</c> matching the attribute's
/// <see cref="ResolvedAttribute.ElementDataType"/>. Each element is converted
/// with invariant culture (round-trip parse for DateTime). Returns
/// <see langword="false"/> on a missing/invalid element type, a non-enumerable
/// value, or any element that cannot be coerced — the caller then marks the
/// attribute quality Bad. Never throws.
/// </summary>
private bool TryCoerceListValue(ResolvedAttribute attr, object? incoming, out object? typedList)
{
typedList = null;
if (string.IsNullOrEmpty(attr.ElementDataType)
|| !Enum.TryParse<DataType>(attr.ElementDataType, ignoreCase: true, out var elementType)
|| !AttributeValueCodec.IsValidElementType(elementType))
{
return false;
}
if (incoming is not System.Collections.IEnumerable enumerable || incoming is string)
return false;
try
{
// Construct the typed list INSIDE the try: although the six valid
// element types resolved by ListElementClrType cannot throw today,
// keeping ListElementClrType / MakeGenericType / CreateInstance inside
// the guarded block means any future change that introduces a throw
// here is caught and turned into a Bad-quality result rather than
// escaping into the actor and tripping supervision.
var clrType = ListElementClrType(elementType);
var list = (System.Collections.IList)Activator.CreateInstance(
typeof(List<>).MakeGenericType(clrType))!;
foreach (var element in enumerable)
list.Add(CoerceElement(element, elementType));
typedList = list;
return true;
}
catch (Exception ex)
{
// Any coercion / construction failure → Bad quality, never a crash.
_logger.LogWarning(ex,
"Failed to coerce value to List<{Element}> for instance {Instance}; marking quality Bad",
attr.ElementDataType, _instanceUniqueName);
return false;
}
}
private static Type ListElementClrType(DataType t) => t switch
{
DataType.String => typeof(string),
DataType.Int32 => typeof(int),
DataType.Float => typeof(float),
DataType.Double => typeof(double),
DataType.Boolean => typeof(bool),
DataType.DateTime => typeof(DateTime),
_ => throw new FormatException($"Unsupported list element type '{t}'.")
};
private static object CoerceElement(object? element, DataType t)
{
if (element is null)
throw new FormatException("List elements may not be null.");
var culture = System.Globalization.CultureInfo.InvariantCulture;
return t switch
{
DataType.String => Convert.ToString(element, culture)
?? throw new FormatException("Null string element."),
DataType.Int32 => element is string si
? int.Parse(si, culture)
: Convert.ToInt32(element, culture),
DataType.Float => element is string sf
? float.Parse(sf, culture)
: Convert.ToSingle(element, culture),
DataType.Double => element is string sd
? double.Parse(sd, culture)
: Convert.ToDouble(element, culture),
DataType.Boolean => element is string sb
? bool.Parse(sb)
: Convert.ToBoolean(element, culture),
DataType.DateTime => element is string sdt
? DateTime.Parse(sdt, culture, System.Globalization.DateTimeStyles.RoundtripKind)
: Convert.ToDateTime(element, culture),
_ => throw new FormatException($"Unsupported list element type '{t}'.")
};
}
private void HandleConnectionQualityChanged(ConnectionQualityChanged qualityChanged)
{
_logger.LogWarning("Connection {Connection} quality changed to {Quality} for instance {Instance}",
qualityChanged.ConnectionName, qualityChanged.Quality, _instanceUniqueName);
if (_configuration == null) return;
// Mark all attributes bound to this connection with the new quality
// and publish to the site stream so the debug view updates in real-time.
// We intentionally do NOT notify script/alarm actors here — the value
// hasn't changed, only the quality, and firing scripts/alarms would
// cause spurious evaluations.
var qualityStr = qualityChanged.Quality.ToString();
foreach (var attr in _configuration.Attributes)
{
if (attr.BoundDataConnectionName == qualityChanged.ConnectionName &&
!string.IsNullOrEmpty(attr.DataSourceReference))
{
_attributeQualities[attr.CanonicalName] = qualityStr;
_attributeTimestamps[attr.CanonicalName] = qualityChanged.Timestamp;
// Publish quality change to stream (current value, new quality)
_attributes.TryGetValue(attr.CanonicalName, out var currentValue);
_streamManager?.PublishAttributeValueChanged(new AttributeValueChanged(
_instanceUniqueName,
attr.DataSourceReference,
attr.CanonicalName,
currentValue,
qualityStr,
qualityChanged.Timestamp));
}
}
}
/// <summary>
/// Subscribes to DCL for all data-sourced attributes. Groups tag paths by connection
/// name and sends SubscribeTagsRequest to the DCL manager.
/// </summary>
private void SubscribeToDcl()
{
if (_dclManager == null || _configuration == null) return;
// Group attributes by their bound connection name
var byConnection = new Dictionary<string, List<string>>();
foreach (var attr in _configuration.Attributes)
{
if (string.IsNullOrEmpty(attr.DataSourceReference) ||
string.IsNullOrEmpty(attr.BoundDataConnectionName))
continue;
// Record every attribute that references this tag path so a single
// tag value update fans out to all of them.
if (!_tagPathToAttributes.TryGetValue(attr.DataSourceReference, out var attrs))
{
attrs = new List<string>();
_tagPathToAttributes[attr.DataSourceReference] = attrs;
}
attrs.Add(attr.CanonicalName);
if (!byConnection.TryGetValue(attr.BoundDataConnectionName, out var connTags))
{
connTags = new List<string>();
byConnection[attr.BoundDataConnectionName] = connTags;
}
// Subscribe each distinct tag path once per connection — a tag shared
// by several attributes still needs only one DCL subscription.
if (!connTags.Contains(attr.DataSourceReference))
connTags.Add(attr.DataSourceReference);
}
// Send subscription requests to DCL for each connection
foreach (var (connectionName, tagPaths) in byConnection)
{
var request = new SubscribeTagsRequest(
Guid.NewGuid().ToString("N"),
_instanceUniqueName,
connectionName,
tagPaths,
DateTimeOffset.UtcNow);
_dclManager.Tell(request, Self);
_logger.LogInformation(
"Instance {Instance} subscribed to {Count} tags on connection {Connection}",
_instanceUniqueName, tagPaths.Count, connectionName);
}
}
/// <summary>
/// WP-16: Handles alarm state changes from Alarm Actors.
/// Updates in-memory alarm state and publishes to stream.
/// </summary>
private void HandleAlarmStateChanged(AlarmStateChanged changed)
{
_alarmStates[changed.AlarmName] = changed.State;
_alarmTimestamps[changed.AlarmName] = changed.Timestamp;
// Retain the full enriched event (Kind, Condition, native metadata) so the
// DebugView snapshot reflects it — native alarms have no _alarmActors entry.
_latestAlarmEvents[changed.AlarmName] = changed;
// WP-23: Publish to site-wide stream
_streamManager?.PublishAlarmStateChanged(changed);
}
/// <summary>
/// WP-25: Debug view subscribe — returns snapshot and begins streaming.
/// </summary>
private void HandleSubscribeDebugView(SubscribeDebugViewRequest request)
{
// Build snapshot from current state
var now = DateTimeOffset.UtcNow;
var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
_instanceUniqueName,
kvp.Key,
kvp.Key,
kvp.Value,
_attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
_attributeTimestamps.GetValueOrDefault(kvp.Key, now))).ToList();
var snapshot = new DebugViewSnapshot(
_instanceUniqueName,
attributeValues,
BuildAlarmStatesSnapshot(),
DateTimeOffset.UtcNow);
Sender.Tell(snapshot);
_logger.LogDebug(
"Debug view snapshot sent for {Instance}, correlationId={Id}",
_instanceUniqueName, request.CorrelationId);
}
/// <summary>
/// Builds the alarm-state list for a DebugView snapshot. Prefers the latest
/// enriched <see cref="AlarmStateChanged"/> per alarm (computed alarms that have
/// fired plus all native alarms) and falls back to a bare Normal projection for
/// computed alarms that have not yet emitted an event.
/// </summary>
private List<AlarmStateChanged> BuildAlarmStatesSnapshot()
{
var states = _latestAlarmEvents.Values.ToList();
foreach (var name in _alarmActors.Keys)
{
if (_latestAlarmEvents.ContainsKey(name))
{
continue;
}
states.Add(new AlarmStateChanged(
_instanceUniqueName,
name,
_alarmStates.GetValueOrDefault(name, AlarmState.Normal),
_alarmPriorities.GetValueOrDefault(name, 0),
_alarmTimestamps.GetValueOrDefault(name, DateTimeOffset.UtcNow)));
}
return states;
}
/// <summary>
/// WP-25: Debug view unsubscribe (SiteRuntime-013).
/// This handler is a deliberate no-op acknowledgement: the Instance Actor holds
/// no per-subscriber state. The real debug-stream subscription lifecycle lives in
/// <see cref="ZB.MOM.WW.ScadaBridge.SiteRuntime.Streaming.SiteStreamManager"/>
/// (Subscribe / Unsubscribe / RemoveSubscriber); the gRPC stream is torn down
/// there when the central side cancels the call. Nothing is removed here.
/// </summary>
private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request)
{
// No subscription state in the Instance Actor — see the XML doc above.
_logger.LogDebug(
"Debug view unsubscribe for {Instance}, correlationId={Id} " +
"(no-op; subscription teardown handled by SiteStreamManager)",
_instanceUniqueName, request.CorrelationId);
}
/// <summary>
/// One-shot debug snapshot — returns current state without registering a subscriber.
/// </summary>
private void HandleDebugSnapshot(DebugSnapshotRequest request)
{
var now = DateTimeOffset.UtcNow;
var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
_instanceUniqueName,
kvp.Key,
kvp.Key,
kvp.Value,
_attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
_attributeTimestamps.GetValueOrDefault(kvp.Key, now))).ToList();
var snapshot = new DebugViewSnapshot(
_instanceUniqueName,
attributeValues,
BuildAlarmStatesSnapshot(),
DateTimeOffset.UtcNow);
Sender.Tell(snapshot);
}
/// <summary>
/// Publishes attribute change to stream and notifies child Script/Alarm actors.
/// WP-22: Tell for attribute notifications (fire-and-forget, never blocks).
/// </summary>
private void PublishAndNotifyChildren(AttributeValueChanged changed)
{
// WP-23: Publish to site-wide stream
_streamManager?.PublishAttributeValueChanged(changed);
// Notify Script Actors (for value-change and conditional triggers)
foreach (var scriptActor in _scriptActors.Values)
{
scriptActor.Tell(changed);
}
// Notify Alarm Actors (for alarm evaluation)
foreach (var alarmActor in _alarmActors.Values)
{
alarmActor.Tell(changed);
}
}
/// <summary>
/// Applies static overrides loaded from SQLite on top of default values.
/// </summary>
private void HandleOverridesLoaded(LoadOverridesResult result)
{
if (result.Error != null)
{
_logger.LogWarning(
"Failed to load static overrides for {Instance}: {Error}",
_instanceUniqueName, result.Error);
return;
}
foreach (var kvp in result.Overrides)
{
// MV-7: persisted override values are canonical strings — a JSON array
// string for List attributes, a plain string for scalars. Decode List
// overrides to a typed list (matching the config-default load), set
// Bad quality on a malformed stored value, and never crash the actor.
if (_resolvedAttributeByName.TryGetValue(kvp.Key, out var resolved)
&& IsListAttribute(resolved))
{
// NJ-4: decode the stored List override (both old array-of-strings
// and native-typed forms decode) and re-persist the native form if
// the stored value is still in the OLD form. Re-encoding the decoded
// list and comparing to the stored string detects old-form values
// (native → native is byte-identical, so a native value is a no-op).
// The re-persist is fire-and-forget and never throws into the actor.
var decoded = DecodeAttributeValue(resolved, kvp.Value);
_attributes[kvp.Key] = decoded;
if (decoded is null && !string.IsNullOrEmpty(kvp.Value))
{
_attributeQualities[kvp.Key] = "Bad";
}
else if (decoded is not null)
{
var native = AttributeValueCodec.Encode(decoded);
if (native != kvp.Value) // stored value was old-form → normalize on disk
{
var key = kvp.Key;
var logger = _logger;
var instanceName = _instanceUniqueName;
_storage.SetStaticOverrideAsync(instanceName, key, native!)
.ContinueWith(t => logger.LogWarning(t.Exception?.GetBaseException(),
"Failed to normalize static override {Instance}.{Attr} to native JSON",
instanceName, key),
TaskContinuationOptions.OnlyOnFaulted);
}
}
}
else
{
_attributes[kvp.Key] = kvp.Value;
}
}
_logger.LogDebug(
"Loaded {Count} static overrides for {Instance}",
result.Overrides.Count, _instanceUniqueName);
}
/// <summary>
/// Creates child Script Actors and Alarm Actors from the flattened configuration.
/// WP-15: Script Actors spawned per script definition.
/// WP-16: Alarm Actors spawned per alarm definition, as peers to Script Actors.
/// WP-32: Compilation errors reject entire instance deployment (logged but actor still starts).
///
/// SiteRuntime-017: each child is seeded from a private point-in-time snapshot
/// of <c>_attributes</c>, NOT the live dictionary. The snapshot is taken here on
/// the Instance Actor thread, so it is race-free; handing the live mutable
/// <see cref="System.Collections.Generic.Dictionary{TKey,TValue}"/> by reference
/// would let a child constructor enumerate it on the child's mailbox thread while
/// this actor mutates it in <c>HandleAttributeValueChanged</c>.
/// </summary>
private void CreateChildActors()
{
if (_configuration == null) return;
// SiteRuntime-017: snapshot the live attribute dictionary once, on the
// Instance Actor thread, before any child is constructed. Each child
// Props closure captures this immutable copy instead of the mutable
// _attributes field, so no child constructor ever enumerates a
// dictionary this actor is concurrently mutating.
var attributeSnapshot = new Dictionary<string, object?>(_attributes);
// Create Script Actors
foreach (var script in _configuration.Scripts)
{
var compilationResult = _compilationService.Compile(script.CanonicalName, script.Code);
if (!compilationResult.IsSuccess)
{
_logger.LogError(
"Script '{Script}' on instance '{Instance}' failed to compile: {Errors}",
script.CanonicalName, _instanceUniqueName,
string.Join("; ", compilationResult.Errors));
continue;
}
// Compile the trigger expression for Expression-triggered scripts.
var triggerExpression = CompileTriggerExpression(
script.TriggerType, script.TriggerConfiguration, $"script-trigger-{script.CanonicalName}");
var props = Props.Create(() => new ScriptActor(
script.CanonicalName,
_instanceUniqueName,
Self,
compilationResult.CompiledScript,
script,
_sharedScriptLibrary,
_options,
_logger,
triggerExpression,
attributeSnapshot,
_healthCollector,
_serviceProvider));
var actorRef = Context.ActorOf(props, $"script-{script.CanonicalName}");
_scriptActors[script.CanonicalName] = actorRef;
}
// Create Alarm Actors
foreach (var alarm in _configuration.Alarms)
{
Script<object?>? onTriggerScript = null;
// M2.5 (#9): the on-trigger script's per-script execution timeout,
// captured from its ResolvedScript so the AlarmExecutionActor can
// apply perScript ?? global. Null when there is no on-trigger script.
int? onTriggerTimeoutSeconds = null;
// Compile on-trigger script if defined
if (!string.IsNullOrEmpty(alarm.OnTriggerScriptCanonicalName))
{
var triggerScriptDef = _configuration.Scripts
.FirstOrDefault(s => s.CanonicalName == alarm.OnTriggerScriptCanonicalName);
if (triggerScriptDef != null)
{
onTriggerTimeoutSeconds = triggerScriptDef.ExecutionTimeoutSeconds;
var result = _compilationService.Compile(
$"alarm-trigger-{alarm.CanonicalName}", triggerScriptDef.Code);
if (result.IsSuccess)
{
onTriggerScript = result.CompiledScript;
}
else
{
_logger.LogWarning(
"Alarm trigger script for {Alarm} on {Instance} failed to compile",
alarm.CanonicalName, _instanceUniqueName);
}
}
}
// Compile the trigger expression for Expression-triggered alarms.
var triggerExpression = CompileTriggerExpression(
alarm.TriggerType, alarm.TriggerConfiguration, $"alarm-trigger-expr-{alarm.CanonicalName}");
var props = Props.Create(() => new AlarmActor(
alarm.CanonicalName,
_instanceUniqueName,
Self,
alarm,
onTriggerScript,
_sharedScriptLibrary,
_options,
_logger,
triggerExpression,
attributeSnapshot,
_healthCollector,
_serviceProvider,
// M2.5 (#9): per-script timeout for the alarm on-trigger script.
onTriggerTimeoutSeconds));
var actorRef = Context.ActorOf(props, $"alarm-{alarm.CanonicalName}");
_alarmActors[alarm.CanonicalName] = actorRef;
_alarmPriorities[alarm.CanonicalName] = alarm.PriorityLevel;
_alarmTimestamps[alarm.CanonicalName] = DateTimeOffset.UtcNow;
}
// Create Native Alarm Actors — read-only mirror of each bound source's
// native alarms (peers to the computed Alarm Actors). They subscribe through
// the DCL manager, so without one (e.g. in isolated tests) they are skipped.
foreach (var nativeSource in _configuration.NativeAlarmSources)
{
if (_dclManager == null)
{
_logger.LogWarning(
"Instance {Instance}: native alarm source {Source} skipped — no DCL manager available",
_instanceUniqueName, nativeSource.CanonicalName);
continue;
}
var nativeKind = ResolveNativeKind(nativeSource.ConnectionName);
var props = Props.Create(() => new NativeAlarmActor(
nativeSource,
_instanceUniqueName,
Self,
_dclManager,
_storage,
_options,
_logger,
nativeKind,
_serviceProvider));
var actorRef = Context.ActorOf(props, $"native-alarm-{nativeSource.CanonicalName}");
_nativeAlarmActors[nativeSource.CanonicalName] = actorRef;
}
_logger.LogInformation(
"Instance {Instance}: created {Scripts} script actors, {Alarms} alarm actors, {NativeAlarms} native alarm actors",
_instanceUniqueName, _scriptActors.Count, _alarmActors.Count, _nativeAlarmActors.Count);
}
/// <summary>
/// Maps a bound connection's protocol to the native alarm kind stamped on
/// emitted events. MxAccess Gateway connections yield
/// <see cref="AlarmKind.NativeMxAccess"/>; everything else (OPC UA) yields
/// <see cref="AlarmKind.NativeOpcUa"/>.
/// </summary>
private AlarmKind ResolveNativeKind(string connectionName)
{
var protocol = _configuration?.Connections is { } connections
&& connections.TryGetValue(connectionName, out var cfg)
? cfg.Protocol
: null;
return protocol != null && protocol.Contains("Mx", StringComparison.OrdinalIgnoreCase)
? AlarmKind.NativeMxAccess
: AlarmKind.NativeOpcUa;
}
/// <summary>
/// Compiles the boolean trigger expression for an Expression-triggered
/// script or alarm. Returns null for non-Expression triggers, a blank
/// expression, or a compilation failure (logged) — in which case the
/// trigger is inert and the actor still starts.
/// </summary>
private Script<object?>? CompileTriggerExpression(
string? triggerType, string? triggerConfigJson, string compileName)
{
if (!string.Equals(triggerType, "Expression", StringComparison.OrdinalIgnoreCase))
return null;
var expression = TriggerExpressionGlobals.ExtractExpression(triggerConfigJson);
if (expression == null)
return null;
var result = _compilationService.CompileTriggerExpression(compileName, expression);
if (result.IsSuccess)
return result.CompiledScript;
_logger.LogError(
"Trigger expression for {Name} on {Instance} failed to compile: {Errors}",
compileName, _instanceUniqueName, string.Join("; ", result.Errors));
return null;
}
/// <summary>
/// Read-only access to current attribute count (for testing/diagnostics).
/// </summary>
public int AttributeCount => _attributes.Count;
/// <summary>
/// Read-only access to script actor count (for testing/diagnostics).
/// </summary>
public int ScriptActorCount => _scriptActors.Count;
/// <summary>
/// Read-only access to alarm actor count (for testing/diagnostics).
/// </summary>
public int AlarmActorCount => _alarmActors.Count;
/// <summary>
/// Internal message for async override loading result.
/// </summary>
internal record LoadOverridesResult(Dictionary<string, string> Overrides, string? Error);
}