Phase 7 follow-up #243 — CachedTagUpstreamSource + Phase7EngineComposer

Ships the composition kernel that maps Config DB rows (Script / VirtualTag /
ScriptedAlarm) to the runtime definitions VirtualTagEngine + ScriptedAlarmEngine
consume, builds the engine instances, and wires OnEvent → historian-sink routing.

## src/ZB.MOM.WW.OtOpcUa.Server/Phase7/

- CachedTagUpstreamSource — implements both Core.VirtualTags.ITagUpstreamSource and
  Core.ScriptedAlarms.ITagUpstreamSource (identical shape, distinct namespaces) on one
  concrete type so the composer can hand one instance to both engines. Thread-safe
  ConcurrentDictionary value cache with synchronous ReadTag + fire-on-write
  Push(path, snapshot) that fans out to every observer registered via SubscribeTag.
  Unknown-path reads return a BadNodeIdUnknown-quality snapshot (status 0x80340000)
  so scripts branch on quality naturally.
- Phase7EngineComposer.Compose(scripts, virtualTags, scriptedAlarms, upstream,
  alarmStateStore, historianSink, rootScriptLogger, loggerFactory) — single static
  entry point that:
  * Indexes scripts by ScriptId, resolves VirtualTag.ScriptId + ScriptedAlarm.PredicateScriptId
    to full SourceCode
  * Projects DB rows to VirtualTagDefinition + ScriptedAlarmDefinition (mapping
    DataType string → DriverDataType enum, AlarmType string → AlarmKind enum,
    Severity 1..1000 → AlarmSeverity bucket matching the OPC UA Part 9 bands
    that AbCipAlarmProjection + OpcUaClient MapSeverity already use)
  * Constructs VirtualTagEngine + loads definitions (throws InvalidOperationException
    with the list of scripts that failed to compile — aggregated like Streams B+C)
  * Constructs ScriptedAlarmEngine + loads definitions + wires OnEvent →
    IAlarmHistorianSink.EnqueueAsync using ScriptedAlarmEvent.Emission as the event
    kind + Condition.LastAckUser/LastAckComment for audit fields
  * Returns Phase7ComposedSources with Disposables list the caller owns

Empty Phase 7 config returns Phase7ComposedSources.Empty so deployments without
scripts / alarms behave exactly as pre-Phase-7. Non-null sources flow into
OpcUaApplicationHost's virtualReadable / scriptedAlarmReadable plumbing landed by
task #239 — DriverNodeManager then dispatches reads by NodeSourceKind per PR #186.

## Tests — 12/12

CachedTagUpstreamSourceTests (6):
- Unknown-path read returns BadNodeIdUnknown-quality snapshot
- Push-then-Read returns cached value
- Push fans out to subscribers in registration order
- Push to one path doesn't fire another path's observer
- Dispose of subscription handle stops fan-out
- Satisfies both Core.VirtualTags + Core.ScriptedAlarms ITagUpstreamSource interfaces

Phase7EngineComposerTests (6):
- Empty rows → Phase7ComposedSources.Empty (both sources null)
- VirtualTag rows → VirtualReadable non-null + Disposables populated
- Missing script reference throws InvalidOperationException with the missing ScriptId
  in the message
- Disabled VirtualTag row skipped by projection
- TimerIntervalMs → TimeSpan.FromMilliseconds round-trip
- Severity 1..1000 maps to Low/Medium/High/Critical at 250/500/750 boundaries
  (matches AbCipAlarmProjection + OpcUaClient.MapSeverity banding)

## Scope — what this PR does NOT do

The composition kernel is the tricky part; the remaining wiring is three narrower
follow-ups that each build on this PR:

- task #244 — driver-bridge feed that populates CachedTagUpstreamSource from live
  driver subscriptions. Without this, ctx.GetTag returns BadNodeIdUnknown even when
  the driver has a fresh value.
- task #245 — ScriptedAlarmReadable adapter exposing each alarm's current Active
  state as IReadable. Phase7EngineComposer.Compose currently returns
  ScriptedAlarmReadable=null so reads on Source=ScriptedAlarm variables return
  BadNotFound per the ADR-002 "misconfiguration not silent fallback" signal.
- task #246 — Program.cs call to Phase7EngineComposer.Compose with config rows
  loaded from the sealed-cache DB read, plus SqliteStoreAndForwardSink lifecycle
  wiring at %ProgramData%/OtOpcUa/alarm-historian-queue.db with the Galaxy.Host
  IPC writer from Stream D.

Task #240 (live OPC UA E2E smoke) depends on all three follow-ups landing.
This commit is contained in:
Joseph Doherty
2026-04-20 21:23:31 -04:00
parent c7f0855427
commit f64a8049d8
5 changed files with 522 additions and 0 deletions

View File

@@ -0,0 +1,208 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Phase 7 follow-up (task #243) — maps the generation's <see cref="Script"/> /
/// <see cref="VirtualTag"/> / <see cref="ScriptedAlarm"/> rows into the runtime
/// definitions <see cref="VirtualTagEngine"/> + <see cref="ScriptedAlarmEngine"/>
/// expect, builds the engine instances, and returns the <see cref="IReadable"/>
/// sources plus an <see cref="IAlarmSource"/> for the <c>DriverNodeManager</c>
/// wiring added by task #239.
/// </summary>
/// <remarks>
/// <para>
/// Empty Phase 7 config (no virtual tags + no scripted alarms) is a valid state:
/// <see cref="Compose"/> returns a <see cref="Phase7ComposedSources"/> with null
/// sources so Program.cs can pass them through to <c>OpcUaApplicationHost</c>
/// unchanged — deployments without scripts behave exactly as they did before
/// Phase 7.
/// </para>
/// <para>
/// The caller owns the returned <see cref="Phase7ComposedSources.Disposables"/>
/// and must dispose them on shutdown. Engine cascades + timer ticks run off
/// background threads until then.
/// </para>
/// </remarks>
public static class Phase7EngineComposer
{
public static Phase7ComposedSources Compose(
IReadOnlyList<Script> scripts,
IReadOnlyList<VirtualTag> virtualTags,
IReadOnlyList<ScriptedAlarm> scriptedAlarms,
CachedTagUpstreamSource upstream,
IAlarmStateStore alarmStateStore,
IAlarmHistorianSink historianSink,
Serilog.ILogger rootScriptLogger,
ILoggerFactory loggerFactory)
{
ArgumentNullException.ThrowIfNull(scripts);
ArgumentNullException.ThrowIfNull(virtualTags);
ArgumentNullException.ThrowIfNull(scriptedAlarms);
ArgumentNullException.ThrowIfNull(upstream);
ArgumentNullException.ThrowIfNull(alarmStateStore);
ArgumentNullException.ThrowIfNull(historianSink);
ArgumentNullException.ThrowIfNull(rootScriptLogger);
ArgumentNullException.ThrowIfNull(loggerFactory);
if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
return Phase7ComposedSources.Empty;
var scriptById = scripts
.Where(s => s.Enabled())
.ToDictionary(s => s.ScriptId, StringComparer.Ordinal);
var scriptLoggerFactory = new ScriptLoggerFactory(rootScriptLogger);
var disposables = new List<IDisposable>();
// Engines take Serilog.ILogger — each engine gets its own so rolling-file emissions
// stay keyed to the right source in the scripts-*.log.
VirtualTagSource? vtSource = null;
if (virtualTags.Count > 0)
{
var vtDefs = ProjectVirtualTags(virtualTags, scriptById).ToList();
var vtEngine = new VirtualTagEngine(upstream, scriptLoggerFactory, rootScriptLogger);
vtEngine.Load(vtDefs);
vtSource = new VirtualTagSource(vtEngine);
disposables.Add(vtEngine);
}
ScriptedAlarmSource? alarmSource = null;
if (scriptedAlarms.Count > 0)
{
var alarmDefs = ProjectScriptedAlarms(scriptedAlarms, scriptById).ToList();
var alarmEngine = new ScriptedAlarmEngine(upstream, alarmStateStore, scriptLoggerFactory, rootScriptLogger);
// Wire alarm emissions to the historian sink (Stream D). Fire-and-forget because
// the sink's EnqueueAsync is already non-blocking from the producer's view.
var engineLogger = loggerFactory.CreateLogger("Phase7HistorianRouter");
alarmEngine.OnEvent += (_, e) => _ = RouteToHistorianAsync(e, historianSink, engineLogger);
alarmEngine.LoadAsync(alarmDefs, CancellationToken.None).GetAwaiter().GetResult();
alarmSource = new ScriptedAlarmSource(alarmEngine);
disposables.Add(alarmEngine);
disposables.Add(alarmSource);
}
// ScriptedAlarmSource is an IAlarmSource, not an IReadable — scripted-alarm
// variable-read dispatch (task #245) needs a dedicated engine-state adapter. Until
// that ships, reads against Source=ScriptedAlarm nodes return BadNotFound per the
// DriverNodeManager null-check path (the ADR-002 "misconfiguration not silent
// fallback" signal). The alarm event stream still fires via IAlarmSource.
return new Phase7ComposedSources(vtSource, ScriptedAlarmReadable: null, disposables);
}
internal static IEnumerable<VirtualTagDefinition> ProjectVirtualTags(
IReadOnlyList<VirtualTag> rows, IReadOnlyDictionary<string, Script> scriptById)
{
foreach (var row in rows)
{
if (!row.Enabled) continue;
if (!scriptById.TryGetValue(row.ScriptId, out var script))
throw new InvalidOperationException(
$"VirtualTag '{row.VirtualTagId}' references unknown / disabled Script '{row.ScriptId}' in this generation");
yield return new VirtualTagDefinition(
Path: row.VirtualTagId,
DataType: ParseDataType(row.DataType),
ScriptSource: script.SourceCode,
ChangeTriggered: row.ChangeTriggered,
TimerInterval: row.TimerIntervalMs.HasValue
? TimeSpan.FromMilliseconds(row.TimerIntervalMs.Value)
: null,
Historize: row.Historize);
}
}
internal static IEnumerable<ScriptedAlarmDefinition> ProjectScriptedAlarms(
IReadOnlyList<ScriptedAlarm> rows, IReadOnlyDictionary<string, Script> scriptById)
{
foreach (var row in rows)
{
if (!row.Enabled) continue;
if (!scriptById.TryGetValue(row.PredicateScriptId, out var script))
throw new InvalidOperationException(
$"ScriptedAlarm '{row.ScriptedAlarmId}' references unknown / disabled predicate Script '{row.PredicateScriptId}'");
yield return new ScriptedAlarmDefinition(
AlarmId: row.ScriptedAlarmId,
EquipmentPath: row.EquipmentId,
AlarmName: row.Name,
Kind: ParseAlarmKind(row.AlarmType),
Severity: MapSeverity(row.Severity),
MessageTemplate: row.MessageTemplate,
PredicateScriptSource: script.SourceCode,
HistorizeToAveva: row.HistorizeToAveva,
Retain: row.Retain);
}
}
private static DriverDataType ParseDataType(string raw) =>
Enum.TryParse<DriverDataType>(raw, ignoreCase: true, out var parsed) ? parsed : DriverDataType.String;
private static AlarmKind ParseAlarmKind(string raw) => raw switch
{
"AlarmCondition" => AlarmKind.AlarmCondition,
"LimitAlarm" => AlarmKind.LimitAlarm,
"DiscreteAlarm" => AlarmKind.DiscreteAlarm,
"OffNormalAlarm" => AlarmKind.OffNormalAlarm,
_ => throw new InvalidOperationException($"Unknown AlarmType '{raw}' — DB check constraint should have caught this"),
};
// OPC UA Part 9 severity bands (1..1000) → AlarmSeverity enum. Matches the same
// banding the AB CIP ALMA projection + OpcUaClient MapSeverity use.
private static AlarmSeverity MapSeverity(int s) => s switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 750 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
private static async Task RouteToHistorianAsync(
ScriptedAlarmEvent e, IAlarmHistorianSink sink, Microsoft.Extensions.Logging.ILogger log)
{
try
{
var historianEvent = new AlarmHistorianEvent(
AlarmId: e.AlarmId,
EquipmentPath: e.EquipmentPath,
AlarmName: e.AlarmName,
AlarmTypeName: e.Kind.ToString(),
Severity: e.Severity,
EventKind: e.Emission.ToString(),
Message: e.Message,
User: e.Condition.LastAckUser ?? "system",
Comment: e.Condition.LastAckComment,
TimestampUtc: e.TimestampUtc);
await sink.EnqueueAsync(historianEvent, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
log.LogWarning(ex, "Historian enqueue failed for alarm {AlarmId}/{Emission}", e.AlarmId, e.Emission);
}
}
}
/// <summary>What <see cref="Phase7EngineComposer.Compose"/> returns.</summary>
/// <param name="VirtualReadable">Non-null when virtual tags were composed; pass to <c>OpcUaApplicationHost.virtualReadable</c>.</param>
/// <param name="ScriptedAlarmReadable">Non-null when scripted alarms were composed; pass to <c>OpcUaApplicationHost.scriptedAlarmReadable</c>.</param>
/// <param name="Disposables">Engine + source instances the caller owns. Dispose on shutdown.</param>
public sealed record Phase7ComposedSources(
IReadable? VirtualReadable,
IReadable? ScriptedAlarmReadable,
IReadOnlyList<IDisposable> Disposables)
{
public static readonly Phase7ComposedSources Empty =
new(null, null, Array.Empty<IDisposable>());
}
internal static class ScriptEnabledExtensions
{
// Script has no explicit Enabled column; every row in the generation is a live script.
public static bool Enabled(this Script _) => true;
}