Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmActor.cs
Joseph Doherty 52997ee164
Some checks failed
v2-ci / build (push) Failing after 38s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
feat(observability): F13d Prometheus + OpenTelemetry instrumentation
OtOpcUaTelemetry (Commons/Observability) centralizes the project's Meter
+ ActivitySource so all instrumentation points emit through a single
named surface. Counters cover the hot paths:

  otopcua.deploy.applied               (outcome=ack|reject)
  otopcua.deploy.apply.duration        (s, histogram)
  otopcua.driver.lifecycle             (event=spawn|spawn_stub|stop|fault)
  otopcua.virtualtag.eval              (outcome=ok|fail|skip)
  otopcua.scriptedalarm.transition     (state=activated|acknowledged|cleared)
  otopcua.opcua.sink.write             (kind=value|alarm|rebuild)
  otopcua.redundancy.service_level_change (level=byte)

Plus two ActivitySource spans:

  otopcua.deploy.apply                 wraps DriverHostActor.ApplyAndAck
  otopcua.opcua.address_space_rebuild  wraps OpcUaPublishActor.HandleRebuild

Instruments are no-op until a listener attaches, so tests + dev hosts
pay nothing for unread telemetry.

Host Program.cs gains AddOtOpcUaObservability() (binds the OtOpcUa Meter
+ ActivitySource to OpenTelemetry, attaches a Prometheus exporter) and
MapOtOpcUaMetrics() (mounts /metrics scrape endpoint). Driver-side
internals + ASP.NET request metrics deliberately stay off — the scrape
payload is scoped to OtOpcUa signals only.

Tests use MeterListener + ActivityListener to verify
VirtualTagActor.eval, OpcUaPublishActor.AttributeValueUpdate, and
RebuildAddressSpace actually emit on the central instruments. Runtime
suite is 72 / 72 green (+3).

Closes #105. Path A (F13b/c/d) complete; next batch options: #85 UNS
folder hierarchy in SDK, or F8b/F9b production engine bindings.
2026-05-26 10:29:40 -04:00

241 lines
9.6 KiB
C#

using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
/// <summary>Lifecycle states of a <see cref="ScriptedAlarmActor"/>.</summary>
public enum ScriptedAlarmActorState
{
    /// <summary>The alarm is not raised. This is the state a freshly created actor starts in.</summary>
    Inactive,

    /// <summary>The alarm is raised and has not been acknowledged.</summary>
    Active,

    /// <summary>The alarm was raised and has been acknowledged; it stays here until it is cleared.</summary>
    Acknowledged,
}
/// <summary>
/// Actor that owns the state machine of one scripted alarm. Each
/// <see cref="DependencyValueChanged"/> is cached and, when a predicate is configured, passed to the
/// injected <see cref="IScriptedAlarmEvaluator"/>. Every resulting transition is reported to the
/// parent as <see cref="StateChanged"/>, persisted through the <see cref="IAlarmActorStateStore"/>,
/// and published as an <see cref="AlarmTransitionEvent"/> on the <c>alerts</c> topic plus a
/// <see cref="ScriptLogEntry"/> on <c>script-logs</c>. Manual <see cref="ConditionMet"/>,
/// <see cref="AcknowledgeAlarm"/> and <see cref="ConditionCleared"/> messages drive the same state
/// machine so the legacy callers keep working.
/// </summary>
public sealed class ScriptedAlarmActor : ReceiveActor
{
    /// <summary>Topic on which <see cref="AlarmTransitionEvent"/> payloads are published.</summary>
    public const string AlertsTopic = "alerts";

    /// <summary>Topic on which <see cref="ScriptLogEntry"/> payloads are published.</summary>
    public const string ScriptLogsTopic = "script-logs";

    /// <summary>User name stamped on transitions that were not triggered by an operator.</summary>
    private const string SystemUser = "system";

    /// <summary>Latest value of one dependency tag; triggers a predicate evaluation when one is configured.</summary>
    public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc);

    /// <summary>Manual request to raise the alarm. Only honoured while Inactive; <c>Reason</c> is not read by this actor.</summary>
    public sealed record ConditionMet(string Reason);

    /// <summary>Manual acknowledgement. Only honoured while Active; <c>Actor</c> is recorded as the acknowledging user.</summary>
    public sealed record AcknowledgeAlarm(string Actor);

    /// <summary>Manual request to clear the alarm. Honoured from Active or Acknowledged.</summary>
    public sealed record ConditionCleared;

    /// <summary>Notification sent to the parent actor after every transition.</summary>
    public sealed record StateChanged(string AlarmId, ScriptedAlarmActorState State, DateTime AtUtc);

    /// <summary>Internal message carrying the snapshot loaded from the state store during <see cref="PreStart"/>.</summary>
    public sealed record StateRestored(ScriptedAlarmActorState State, string? LastAckUser);

    /// <summary>
    /// Static configuration of one alarm. A null or empty <c>Predicate</c> disables evaluator-driven
    /// transitions; the alarm then only reacts to the manual messages.
    /// </summary>
    public sealed record AlarmConfig(
        string AlarmId,
        string AlarmName,
        string EquipmentPath,
        int Severity,
        string? Predicate);

    private readonly AlarmConfig _config;
    private readonly IScriptedAlarmEvaluator _evaluator;
    private readonly IAlarmActorStateStore _stateStore;
    private readonly Func<DPSPublisher>? _publisherFactory;
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);

    private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive;
    private string? _lastAckUser;

    // True once this incarnation has performed a transition of its own. A snapshot that arrives
    // afterwards is older than the in-memory (and already persisted) state and must not overwrite it.
    private bool _liveTransitionSeen;

    // Tail of the chain of background saves. Only touched from the actor's message handlers.
    private Task _pendingSave = Task.CompletedTask;

    /// <summary>
    /// Creates <see cref="Akka.Actor.Props"/> for an alarm actor.
    /// </summary>
    /// <param name="config">Static alarm configuration.</param>
    /// <param name="evaluator">Predicate evaluator; defaults to <see cref="NullScriptedAlarmEvaluator.Instance"/>.</param>
    /// <param name="publisherFactory">Optional publisher factory; when null the actor publishes through the DistributedPubSub mediator.</param>
    /// <param name="stateStore">State persistence; defaults to <see cref="NullAlarmActorStateStore.Instance"/>.</param>
    public static Props Props(
        AlarmConfig config,
        IScriptedAlarmEvaluator? evaluator = null,
        Func<DPSPublisher>? publisherFactory = null,
        IAlarmActorStateStore? stateStore = null) =>
        Akka.Actor.Props.Create(() => new ScriptedAlarmActor(
            config,
            evaluator ?? NullScriptedAlarmEvaluator.Instance,
            publisherFactory,
            stateStore ?? NullAlarmActorStateStore.Instance));

    /// <summary>
    /// Legacy single-argument factory kept for callers that only care about the state machine.
    /// Builds a config with no predicate, so no evaluation takes place, and uses the default
    /// evaluator and state store. Equivalent to <c>Props(new AlarmConfig(...))</c>.
    /// </summary>
    public static Props Props(string alarmId) =>
        Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null));

    /// <summary>Wires the message handlers. Prefer the <c>Props</c> factories over calling this directly.</summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="config"/>, <paramref name="evaluator"/> or <paramref name="stateStore"/> is null.
    /// </exception>
    public ScriptedAlarmActor(
        AlarmConfig config,
        IScriptedAlarmEvaluator evaluator,
        Func<DPSPublisher>? publisherFactory,
        IAlarmActorStateStore stateStore)
    {
        // Fail at construction instead of with a NullReferenceException on the first message.
        ArgumentNullException.ThrowIfNull(config);
        ArgumentNullException.ThrowIfNull(evaluator);
        ArgumentNullException.ThrowIfNull(stateStore);

        _config = config;
        _evaluator = evaluator;
        _publisherFactory = publisherFactory;
        _stateStore = stateStore;

        Receive<DependencyValueChanged>(OnDependencyChanged);
        Receive<ConditionMet>(OnConditionMet);
        Receive<AcknowledgeAlarm>(OnAcknowledge);
        Receive<ConditionCleared>(OnConditionCleared);
        Receive<StateRestored>(OnStateRestored);
    }

    /// <summary>
    /// Starts loading the persisted snapshot. The load runs off the message loop and its result is
    /// sent back to this actor as <see cref="StateRestored"/>; until then the actor stays Inactive.
    /// </summary>
    protected override void PreStart()
    {
        // Capture Self here: the actor context must not be touched from the background task.
        var self = Self;
        _ = Task.Run(async () =>
        {
            try
            {
                var snapshot = await _stateStore.LoadAsync(_config.AlarmId, CancellationToken.None)
                    .ConfigureAwait(false);
                if (snapshot is null)
                {
                    return;
                }

                // Enum.TryParse also succeeds for numeric strings such as "7" and then returns a
                // value that is not a declared member, so IsDefined is needed to reject those.
                if (!Enum.TryParse<ScriptedAlarmActorState>(snapshot.State, ignoreCase: true, out var parsed)
                    || !Enum.IsDefined(typeof(ScriptedAlarmActorState), parsed))
                {
                    _log.Warning("ScriptedAlarm {Id}: persisted state '{State}' is not a known alarm state; booting Inactive",
                        _config.AlarmId, snapshot.State);
                    return;
                }

                self.Tell(new StateRestored(parsed, snapshot.LastAckUser));
            }
            catch (Exception ex)
            {
                _log.Warning(ex, "ScriptedAlarm {Id}: state-store load failed; booting Inactive",
                    _config.AlarmId);
            }
        });
    }

    /// <summary>Raises the alarm on manual request; ignored unless the alarm is Inactive.</summary>
    private void OnConditionMet(ConditionMet msg)
    {
        if (_state == ScriptedAlarmActorState.Inactive)
        {
            Transition(ScriptedAlarmActorState.Active, user: SystemUser);
        }
    }

    /// <summary>Acknowledges the alarm on behalf of <c>msg.Actor</c>; ignored unless the alarm is Active.</summary>
    private void OnAcknowledge(AcknowledgeAlarm msg)
    {
        if (_state == ScriptedAlarmActorState.Active)
        {
            Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor);
        }
    }

    /// <summary>Clears the alarm on manual request; ignored when it is already Inactive.</summary>
    private void OnConditionCleared(ConditionCleared msg)
    {
        if (_state != ScriptedAlarmActorState.Inactive)
        {
            Transition(ScriptedAlarmActorState.Inactive, user: SystemUser);
        }
    }

    /// <summary>Applies the snapshot loaded in <see cref="PreStart"/>, unless live state has already superseded it.</summary>
    private void OnStateRestored(StateRestored msg)
    {
        // The snapshot arrives asynchronously, so a dependency change or manual message may have
        // driven (and persisted) a transition first. Applying the older snapshot on top of that
        // would leave the in-memory state out of step with the store and with the last published event.
        if (_liveTransitionSeen)
        {
            _log.Info("ScriptedAlarm {Id}: ignoring persisted state {State}; a live transition to {Current} already happened",
                _config.AlarmId, msg.State, _state);
            return;
        }

        // Active is re-derived from the evaluator at the next DependencyValueChanged. It is still
        // restored here so operators don't lose the in-flight transition if a restart races ahead of
        // the next eval; the first evaluator tick corrects it if the condition has cleared.
        _state = msg.State;
        _lastAckUser = msg.LastAckUser;
        _log.Info("ScriptedAlarm {Id}: restored persisted state {State} (lastAck={User})",
            _config.AlarmId, _state, _lastAckUser ?? "(none)");
    }

    /// <summary>
    /// Caches the new dependency value and, when a predicate is configured, re-evaluates it and
    /// transitions according to the result.
    /// </summary>
    private void OnDependencyChanged(DependencyValueChanged msg)
    {
        _dependencies[msg.TagId] = msg.Value;

        var predicate = _config.Predicate;
        if (string.IsNullOrEmpty(predicate))
        {
            return;
        }

        ScriptedAlarmEvalResult result;
        try
        {
            result = _evaluator.Evaluate(_config.AlarmId, predicate, _dependencies);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "ScriptedAlarm {Id}: evaluator threw", _config.AlarmId);
            PublishLog("Error", $"evaluator threw: {ex.Message}");
            return;
        }

        if (!result.Success)
        {
            PublishLog("Warning", result.Reason ?? "evaluator failure");
            return;
        }

        // An active condition only fires from Inactive, so it is not re-raised while the alarm is
        // already Active or Acknowledged. A cleared condition moves Active OR Acknowledged to Inactive.
        if (result.Active && _state == ScriptedAlarmActorState.Inactive)
        {
            Transition(ScriptedAlarmActorState.Active, user: SystemUser);
        }
        else if (!result.Active && _state != ScriptedAlarmActorState.Inactive)
        {
            Transition(ScriptedAlarmActorState.Inactive, user: SystemUser);
        }
    }

    /// <summary>
    /// Moves to <paramref name="next"/> and fans the transition out: parent notification,
    /// persistence, telemetry counter, alert event and script-log entry.
    /// </summary>
    /// <param name="next">State to enter.</param>
    /// <param name="user">User the transition is attributed to; stored as last-ack user when acknowledging.</param>
    private void Transition(ScriptedAlarmActorState next, string user)
    {
        var prev = _state;
        _state = next;
        _liveTransitionSeen = true;
        if (next == ScriptedAlarmActorState.Acknowledged)
        {
            _lastAckUser = user;
        }

        _log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next);

        // One timestamp is shared by the parent notification, the snapshot and the alert event.
        var nowUtc = DateTime.UtcNow;
        Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc));
        PersistStateAsync(nowUtc);

        var kind = next switch
        {
            ScriptedAlarmActorState.Active => "Activated",
            ScriptedAlarmActorState.Acknowledged => "Acknowledged",
            ScriptedAlarmActorState.Inactive => "Cleared",
            _ => next.ToString(),
        };

        OtOpcUaTelemetry.ScriptedAlarmTransition.Add(1,
            new KeyValuePair<string, object?>("state", kind.ToLowerInvariant()));

        var evt = new AlarmTransitionEvent(
            AlarmId: _config.AlarmId,
            EquipmentPath: _config.EquipmentPath,
            AlarmName: _config.AlarmName,
            TransitionKind: kind,
            Severity: _config.Severity,
            Message: $"{_config.AlarmName} {kind}",
            User: user,
            TimestampUtc: nowUtc);
        PublishOrFallback(AlertsTopic, evt);
        PublishLog("Information", $"{_config.AlarmName} {kind} (by {user})");
    }

    /// <summary>Publishes a <see cref="ScriptLogEntry"/> for this alarm on <see cref="ScriptLogsTopic"/>.</summary>
    private void PublishLog(string level, string message)
    {
        var entry = new ScriptLogEntry(
            ScriptId: _config.AlarmId,
            Level: level,
            Message: message,
            TimestampUtc: DateTime.UtcNow,
            VirtualTagId: null,
            AlarmId: _config.AlarmId,
            EquipmentId: null);
        PublishOrFallback(ScriptLogsTopic, entry);
    }

    /// <summary>
    /// Publishes <paramref name="payload"/> through the injected publisher factory when one was
    /// supplied, otherwise through the actor system's DistributedPubSub mediator.
    /// </summary>
    private void PublishOrFallback(string topic, object payload)
    {
        if (_publisherFactory is not null)
        {
            _publisherFactory().Publish(topic, payload);
            return;
        }

        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload));
    }

    /// <summary>
    /// Queues a background save of the current state. Saves run one after another in transition
    /// order; a failed save is logged and does not stop the ones queued behind it.
    /// </summary>
    private void PersistStateAsync(DateTime nowUtc)
    {
        var snapshot = new AlarmActorStateSnapshot(
            AlarmId: _config.AlarmId,
            State: _state.ToString(),
            LastTransitionUtc: nowUtc,
            LastAckUser: _lastAckUser);

        // Fire-and-forget so the message loop is never blocked, but chained: independent Task.Run
        // saves for two quick transitions could finish in either order and leave the older state in
        // the store. Each link swallows its own failure, so awaiting the previous link cannot throw.
        // NOTE(review): a save that never completes stalls every later save; pass a timeout token
        // if the store implementation can hang.
        var previousSave = _pendingSave;
        _pendingSave = Task.Run(async () =>
        {
            await previousSave.ConfigureAwait(false);
            try
            {
                await _stateStore.SaveAsync(snapshot, CancellationToken.None).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _log.Warning(ex, "ScriptedAlarm {Id}: state-store save failed", _config.AlarmId);
            }
        });
    }
}