fix: wire DCL tag value delivery, alarm evaluation, and snapshot timestamps
Three runtime bugs fixed: - DataConnectionActor: TagValueReceived/TagResolutionSucceeded/Failed not handled in any Become state — OPC UA values went to dead letters. Added initial read after subscribe to seed current values immediately. - AlarmActor: ParseEvalConfig expected "attributeName"/"matchValue"/"min"/ "max" keys but seed data uses "attribute"/"value"/"high"/"low". Added support for both conventions and !=prefix for not-equal matching. - InstanceActor: snapshots reported all alarms (including unevaluated) with correct priorities and source timestamps instead of current UTC. Removed bogus Vibration template attribute that shadowed Speed's tag mapping.
This commit is contained in:
@@ -153,6 +153,15 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
case WriteTagRequest req:
|
||||
HandleWrite(req);
|
||||
break;
|
||||
case TagValueReceived tvr:
|
||||
HandleTagValueReceived(tvr);
|
||||
break;
|
||||
case TagResolutionSucceeded trs:
|
||||
HandleTagResolutionSucceeded(trs);
|
||||
break;
|
||||
case TagResolutionFailed trf:
|
||||
HandleTagResolutionFailed(trf);
|
||||
break;
|
||||
case AdapterDisconnected:
|
||||
HandleDisconnect();
|
||||
break;
|
||||
@@ -201,6 +210,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
// Allow unsubscribe even during reconnect (for cleanup on instance stop)
|
||||
HandleUnsubscribe(req);
|
||||
break;
|
||||
case TagValueReceived:
|
||||
// Ignore — stale callback from previous connection
|
||||
break;
|
||||
case TagResolutionSucceeded:
|
||||
case TagResolutionFailed:
|
||||
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
|
||||
break;
|
||||
case GetHealthReport:
|
||||
ReplyWithHealthReport();
|
||||
break;
|
||||
@@ -313,6 +329,25 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
// Initial read — seed current values for all resolved tags so the Instance Actor
|
||||
// doesn't stay Uncertain until the next OPC UA data change notification
|
||||
foreach (var tagPath in instanceTags)
|
||||
{
|
||||
if (_unresolvedTags.Contains(tagPath)) continue;
|
||||
try
|
||||
{
|
||||
var readResult = await _adapter.ReadAsync(tagPath);
|
||||
if (readResult.Success && readResult.Value != null)
|
||||
{
|
||||
self.Tell(new TagValueReceived(tagPath, readResult.Value));
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Best-effort — subscription will deliver subsequent changes
|
||||
}
|
||||
}
|
||||
|
||||
return new SubscribeTagsResponse(
|
||||
request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow);
|
||||
}).PipeTo(sender);
|
||||
@@ -459,6 +494,28 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
|
||||
// ── Internal message handlers for piped async results ──
|
||||
|
||||
private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
|
||||
{
|
||||
if (_unresolvedTags.Remove(msg.TagPath))
|
||||
{
|
||||
_subscriptionIds[msg.TagPath] = msg.SubscriptionId;
|
||||
_resolvedTags++;
|
||||
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
|
||||
_log.Info("[{0}] Tag resolved: {1}", _connectionName, msg.TagPath);
|
||||
}
|
||||
|
||||
if (_unresolvedTags.Count == 0)
|
||||
{
|
||||
Timers.Cancel("tag-resolution-retry");
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleTagResolutionFailed(TagResolutionFailed msg)
|
||||
{
|
||||
_log.Debug("[{0}] Tag resolution still failing for {1}: {2}",
|
||||
_connectionName, msg.TagPath, msg.Error);
|
||||
}
|
||||
|
||||
private void HandleTagValueReceived(TagValueReceived msg)
|
||||
{
|
||||
// Fan out to all subscribed instances
|
||||
|
||||
@@ -184,8 +184,18 @@ public class AlarmActor : ReceiveActor
|
||||
private bool EvaluateValueMatch(object? value)
|
||||
{
|
||||
if (_evalConfig is not ValueMatchEvalConfig config) return false;
|
||||
if (value == null) return config.MatchValue == null;
|
||||
return string.Equals(value.ToString(), config.MatchValue, StringComparison.Ordinal);
|
||||
if (config.MatchValue == null) return value == null;
|
||||
|
||||
var valueStr = value?.ToString() ?? "";
|
||||
|
||||
// Support "!=X" for not-equal matching
|
||||
if (config.MatchValue.StartsWith("!="))
|
||||
{
|
||||
var expected = config.MatchValue[2..];
|
||||
return !string.Equals(valueStr, expected, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
return string.Equals(valueStr, config.MatchValue, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
private bool EvaluateRangeViolation(object? value)
|
||||
@@ -268,24 +278,36 @@ public class AlarmActor : ReceiveActor
|
||||
try
|
||||
{
|
||||
var doc = JsonDocument.Parse(triggerConfigJson);
|
||||
var attr = doc.RootElement.TryGetProperty("attributeName", out var attrEl)
|
||||
? attrEl.GetString() ?? "" : "";
|
||||
var root = doc.RootElement;
|
||||
|
||||
// Support both "attributeName" and "attribute" keys
|
||||
var attr = root.TryGetProperty("attributeName", out var attrEl)
|
||||
? attrEl.GetString() ?? ""
|
||||
: root.TryGetProperty("attribute", out var attrEl2)
|
||||
? attrEl2.GetString() ?? ""
|
||||
: "";
|
||||
|
||||
return _triggerType switch
|
||||
{
|
||||
AlarmTriggerType.ValueMatch => new ValueMatchEvalConfig(
|
||||
attr,
|
||||
doc.RootElement.TryGetProperty("matchValue", out var mv) ? mv.GetString() : null),
|
||||
root.TryGetProperty("matchValue", out var mv) ? mv.GetString()
|
||||
: root.TryGetProperty("value", out var mv2) ? mv2.GetString()
|
||||
: null),
|
||||
|
||||
AlarmTriggerType.RangeViolation => new RangeViolationEvalConfig(
|
||||
attr,
|
||||
doc.RootElement.TryGetProperty("min", out var minEl) ? minEl.GetDouble() : double.MinValue,
|
||||
doc.RootElement.TryGetProperty("max", out var maxEl) ? maxEl.GetDouble() : double.MaxValue),
|
||||
root.TryGetProperty("min", out var minEl) ? minEl.GetDouble()
|
||||
: root.TryGetProperty("low", out var lowEl) ? lowEl.GetDouble()
|
||||
: double.MinValue,
|
||||
root.TryGetProperty("max", out var maxEl) ? maxEl.GetDouble()
|
||||
: root.TryGetProperty("high", out var highEl) ? highEl.GetDouble()
|
||||
: double.MaxValue),
|
||||
|
||||
AlarmTriggerType.RateOfChange => new RateOfChangeEvalConfig(
|
||||
attr,
|
||||
doc.RootElement.TryGetProperty("thresholdPerSecond", out var tps) ? tps.GetDouble() : 10.0,
|
||||
doc.RootElement.TryGetProperty("windowSeconds", out var ws)
|
||||
root.TryGetProperty("thresholdPerSecond", out var tps) ? tps.GetDouble() : 10.0,
|
||||
root.TryGetProperty("windowSeconds", out var ws)
|
||||
? TimeSpan.FromSeconds(ws.GetDouble())
|
||||
: TimeSpan.FromSeconds(1)),
|
||||
|
||||
|
||||
@@ -42,7 +42,10 @@ public class InstanceActor : ReceiveActor
|
||||
private readonly IServiceProvider? _serviceProvider;
|
||||
private readonly Dictionary<string, object?> _attributes = new();
|
||||
private readonly Dictionary<string, string> _attributeQualities = new();
|
||||
private readonly Dictionary<string, DateTimeOffset> _attributeTimestamps = new();
|
||||
private readonly Dictionary<string, AlarmState> _alarmStates = new();
|
||||
private readonly Dictionary<string, DateTimeOffset> _alarmTimestamps = new();
|
||||
private readonly Dictionary<string, int> _alarmPriorities = new();
|
||||
private readonly Dictionary<string, IActorRef> _scriptActors = new();
|
||||
private readonly Dictionary<string, IActorRef> _alarmActors = new();
|
||||
private FlattenedConfiguration? _configuration;
|
||||
@@ -124,6 +127,7 @@ public class InstanceActor : ReceiveActor
|
||||
|
||||
// Handle tag value updates from DCL — convert to AttributeValueChanged
|
||||
Receive<TagValueUpdate>(HandleTagValueUpdate);
|
||||
Receive<SubscribeTagsResponse>(_ => { }); // Ack from DCL subscribe — no action needed
|
||||
Receive<ConnectionQualityChanged>(HandleConnectionQualityChanged);
|
||||
|
||||
// WP-16: Handle alarm state changes from Alarm Actors (Tell pattern)
|
||||
@@ -267,6 +271,7 @@ public class InstanceActor : ReceiveActor
|
||||
// WP-24: State mutation serialized through this actor
|
||||
_attributes[changed.AttributeName] = changed.Value;
|
||||
_attributeQualities[changed.AttributeName] = changed.Quality;
|
||||
_attributeTimestamps[changed.AttributeName] = changed.Timestamp;
|
||||
|
||||
PublishAndNotifyChildren(changed);
|
||||
}
|
||||
@@ -338,6 +343,7 @@ public class InstanceActor : ReceiveActor
|
||||
private void HandleAlarmStateChanged(AlarmStateChanged changed)
|
||||
{
|
||||
_alarmStates[changed.AlarmName] = changed.State;
|
||||
_alarmTimestamps[changed.AlarmName] = changed.Timestamp;
|
||||
|
||||
// WP-23: Publish to site-wide stream
|
||||
_streamManager?.PublishAlarmStateChanged(changed);
|
||||
@@ -358,20 +364,21 @@ public class InstanceActor : ReceiveActor
|
||||
_debugSubscribers[subscriptionId] = Sender;
|
||||
|
||||
// Build snapshot from current state
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
|
||||
_instanceUniqueName,
|
||||
kvp.Key,
|
||||
kvp.Key,
|
||||
kvp.Value,
|
||||
_attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
|
||||
DateTimeOffset.UtcNow)).ToList();
|
||||
_attributeTimestamps.GetValueOrDefault(kvp.Key, now))).ToList();
|
||||
|
||||
var alarmStates = _alarmStates.Select(kvp => new AlarmStateChanged(
|
||||
var alarmStates = _alarmActors.Keys.Select(name => new AlarmStateChanged(
|
||||
_instanceUniqueName,
|
||||
kvp.Key,
|
||||
kvp.Value,
|
||||
0, // Priority not tracked in _alarmStates; would need separate tracking
|
||||
DateTimeOffset.UtcNow)).ToList();
|
||||
name,
|
||||
_alarmStates.GetValueOrDefault(name, AlarmState.Normal),
|
||||
_alarmPriorities.GetValueOrDefault(name, 0),
|
||||
_alarmTimestamps[name])).ToList();
|
||||
|
||||
var snapshot = new DebugViewSnapshot(
|
||||
_instanceUniqueName,
|
||||
@@ -407,20 +414,21 @@ public class InstanceActor : ReceiveActor
|
||||
/// </summary>
|
||||
private void HandleDebugSnapshot(DebugSnapshotRequest request)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var attributeValues = _attributes.Select(kvp => new AttributeValueChanged(
|
||||
_instanceUniqueName,
|
||||
kvp.Key,
|
||||
kvp.Key,
|
||||
kvp.Value,
|
||||
_attributeQualities.GetValueOrDefault(kvp.Key, "Good"),
|
||||
DateTimeOffset.UtcNow)).ToList();
|
||||
_attributeTimestamps.GetValueOrDefault(kvp.Key, now))).ToList();
|
||||
|
||||
var alarmStates = _alarmStates.Select(kvp => new AlarmStateChanged(
|
||||
var alarmStates = _alarmActors.Keys.Select(name => new AlarmStateChanged(
|
||||
_instanceUniqueName,
|
||||
kvp.Key,
|
||||
kvp.Value,
|
||||
0,
|
||||
DateTimeOffset.UtcNow)).ToList();
|
||||
name,
|
||||
_alarmStates.GetValueOrDefault(name, AlarmState.Normal),
|
||||
_alarmPriorities.GetValueOrDefault(name, 0),
|
||||
_alarmTimestamps[name])).ToList();
|
||||
|
||||
var snapshot = new DebugViewSnapshot(
|
||||
_instanceUniqueName,
|
||||
@@ -562,6 +570,8 @@ public class InstanceActor : ReceiveActor
|
||||
|
||||
var actorRef = Context.ActorOf(props, $"alarm-{alarm.CanonicalName}");
|
||||
_alarmActors[alarm.CanonicalName] = actorRef;
|
||||
_alarmPriorities[alarm.CanonicalName] = alarm.PriorityLevel;
|
||||
_alarmTimestamps[alarm.CanonicalName] = DateTimeOffset.UtcNow;
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
|
||||
Reference in New Issue
Block a user