Compare commits
6 Commits
phase-7-fu
...
phase-7-fu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7352db28a6 | ||
| 8388ddc033 | |||
|
|
e11350cf80 | ||
| a5bd60768d | |||
|
|
d6a8bb1064 | ||
| f3053580a0 |
@@ -34,9 +34,11 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
// Phase 7 Stream G follow-up (task #239). When composed with the VirtualTagEngine +
|
||||
// ScriptedAlarmEngine sources these route node reads to the engines instead of the
|
||||
// driver. Null = Phase 7 engines not enabled for this deployment (identical to pre-
|
||||
// Phase-7 behaviour).
|
||||
private readonly ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _virtualReadable;
|
||||
private readonly ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _scriptedAlarmReadable;
|
||||
// Phase-7 behaviour). Late-bindable via SetPhase7Sources because the engines need
|
||||
// the bootstrapped generation id before they can compose, which is only known after
|
||||
// the host has been DI-constructed (task #246).
|
||||
private ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _virtualReadable;
|
||||
private ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _scriptedAlarmReadable;
|
||||
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger<OpcUaApplicationHost> _logger;
|
||||
@@ -75,6 +77,24 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
|
||||
public OtOpcUaServer? Server => _server;
|
||||
|
||||
/// <summary>
|
||||
/// Late-bind the Phase 7 engine-backed <c>IReadable</c> sources. Must be
|
||||
/// called BEFORE <see cref="StartAsync"/> — once the OPC UA server starts, the
|
||||
/// <see cref="OtOpcUaServer"/> ctor captures the field values + per-node
|
||||
/// <see cref="DriverNodeManager"/>s are constructed. Calling this after start has
|
||||
/// no effect on already-materialized node managers.
|
||||
/// </summary>
|
||||
public void SetPhase7Sources(
|
||||
ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? virtualReadable,
|
||||
ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? scriptedAlarmReadable)
|
||||
{
|
||||
if (_server is not null)
|
||||
throw new InvalidOperationException(
|
||||
"Phase 7 sources must be set before OpcUaApplicationHost.StartAsync; the OtOpcUaServer + DriverNodeManagers have already captured the previous values.");
|
||||
_virtualReadable = virtualReadable;
|
||||
_scriptedAlarmReadable = scriptedAlarmReadable;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds the <see cref="ApplicationConfiguration"/>, validates/creates the application
|
||||
/// certificate, constructs + starts the <see cref="OtOpcUaServer"/>, then drives
|
||||
|
||||
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server;
|
||||
|
||||
@@ -17,6 +18,7 @@ public sealed class OpcUaServerService(
|
||||
DriverHost driverHost,
|
||||
OpcUaApplicationHost applicationHost,
|
||||
DriverEquipmentContentRegistry equipmentContentRegistry,
|
||||
Phase7Composer phase7Composer,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<OpcUaServerService> logger) : BackgroundService
|
||||
{
|
||||
@@ -34,12 +36,19 @@ public sealed class OpcUaServerService(
|
||||
// Skipped when no generation is Published yet — the fleet boots into a UNS-less
|
||||
// address space until the first publish, then the registry fills on next restart.
|
||||
if (result.GenerationId is { } gen)
|
||||
{
|
||||
await PopulateEquipmentContentAsync(gen, stoppingToken);
|
||||
|
||||
// PR 17: stand up the OPC UA server + drive discovery per registered driver. Driver
|
||||
// registration itself (RegisterAsync on DriverHost) happens during an earlier DI
|
||||
// extension once the central config DB query + per-driver factory land; for now the
|
||||
// server comes up with whatever drivers are in DriverHost at start time.
|
||||
// Phase 7 follow-up #246 — load Script + VirtualTag + ScriptedAlarm rows,
|
||||
// compose VirtualTagEngine + ScriptedAlarmEngine, start the driver-bridge
|
||||
// feed. SetPhase7Sources MUST run before applicationHost.StartAsync because
|
||||
// OtOpcUaServer + DriverNodeManager construction captures the field values
|
||||
// — late binding after server start is rejected with InvalidOperationException.
|
||||
// No-op when the generation has no virtual tags or scripted alarms.
|
||||
var phase7 = await phase7Composer.PrepareAsync(gen, stoppingToken);
|
||||
applicationHost.SetPhase7Sources(phase7.VirtualReadable, phase7.ScriptedAlarmReadable);
|
||||
}
|
||||
|
||||
await applicationHost.StartAsync(stoppingToken);
|
||||
|
||||
logger.LogInformation("OtOpcUa.Server running. Hosted drivers: {Count}", driverHost.RegisteredDriverIds.Count);
|
||||
@@ -57,6 +66,11 @@ public sealed class OpcUaServerService(
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await base.StopAsync(cancellationToken);
|
||||
// Dispose Phase 7 first so the bridge stops feeding the cache + the engines
|
||||
// stop firing alarm/historian events before the OPC UA server tears down its
|
||||
// node managers. Otherwise an in-flight cascade could try to push through a
|
||||
// disposed source and surface as a noisy shutdown warning.
|
||||
await phase7Composer.DisposeAsync();
|
||||
await applicationHost.DisposeAsync();
|
||||
await driverHost.DisposeAsync();
|
||||
}
|
||||
|
||||
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
@@ -0,0 +1,146 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
|
||||
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
|
||||
/// <see cref="DataChangeEventArgs.Snapshot"/>s into <see cref="CachedTagUpstreamSource"/>
|
||||
/// so <c>ctx.GetTag</c> reads see the freshest driver value.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
|
||||
/// attributes that driver provides. The bridge groups by driver so each <see cref="ISubscribable"/>
|
||||
/// gets one <c>SubscribeAsync</c> call with a batched fullRef list — drivers that
|
||||
/// poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with
|
||||
/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Because driver fullRefs are opaque + driver-specific (Galaxy
|
||||
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
|
||||
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef
|
||||
/// back to UNS path. <c>OnDataChange</c> fires keyed by fullRef; the bridge
|
||||
/// translates to the script-side path before calling <see cref="CachedTagUpstreamSource.Push"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive
|
||||
/// alongside the engines → <see cref="DisposeAsync"/> unsubscribes from every
|
||||
/// driver + unhooks the OnDataChange handlers. Driver subscriptions don't leak
|
||||
/// even on abnormal shutdown because the disposal awaits each
|
||||
/// <c>UnsubscribeAsync</c>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class DriverSubscriptionBridge : IAsyncDisposable
|
||||
{
|
||||
private readonly CachedTagUpstreamSource _sink;
|
||||
private readonly ILogger<DriverSubscriptionBridge> _logger;
|
||||
private readonly List<ActiveSubscription> _active = [];
|
||||
private bool _started;
|
||||
private bool _disposed;
|
||||
|
||||
public DriverSubscriptionBridge(
|
||||
CachedTagUpstreamSource sink,
|
||||
ILogger<DriverSubscriptionBridge> logger)
|
||||
{
|
||||
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
|
||||
/// Idempotent guard rejects double-start. Throws on the first subscribe failure
|
||||
/// so misconfiguration surfaces fast — partial-subscribe state doesn't linger.
|
||||
/// </summary>
|
||||
public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(feeds);
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
|
||||
if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started");
|
||||
_started = true;
|
||||
|
||||
foreach (var feed in feeds)
|
||||
{
|
||||
if (feed.PathToFullRef.Count == 0) continue;
|
||||
|
||||
// Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
|
||||
// we push keyed by the script-side path.
|
||||
var fullRefToPath = feed.PathToFullRef
|
||||
.ToDictionary(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal);
|
||||
var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();
|
||||
|
||||
EventHandler<DataChangeEventArgs> handler = (_, e) =>
|
||||
{
|
||||
if (fullRefToPath.TryGetValue(e.FullReference, out var unsPath))
|
||||
_sink.Push(unsPath, e.Snapshot);
|
||||
};
|
||||
feed.Driver.OnDataChange += handler;
|
||||
|
||||
try
|
||||
{
|
||||
// OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
|
||||
// CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
|
||||
// subscriptions: it runs once at engine compose, doesn't hot-path per
|
||||
// script evaluation (the engines read from the cache instead), and surfaces
|
||||
// any subscribe failure by aborting bridge start. Wrapping in the per-call
|
||||
// resilience pipeline would add nothing — there's no caller to retry on
|
||||
// behalf of, and the breaker/bulkhead semantics belong to actual driver Read
|
||||
// dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
|
||||
#pragma warning disable OTOPCUA0001
|
||||
var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
|
||||
#pragma warning restore OTOPCUA0001
|
||||
_active.Add(new ActiveSubscription(feed.Driver, handle, handler));
|
||||
_logger.LogInformation(
|
||||
"Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
|
||||
fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
|
||||
}
|
||||
catch
|
||||
{
|
||||
feed.Driver.OnDataChange -= handler;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
foreach (var sub in _active)
|
||||
{
|
||||
sub.Driver.OnDataChange -= sub.Handler;
|
||||
try
|
||||
{
|
||||
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
|
||||
await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
|
||||
#pragma warning restore OTOPCUA0001
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
|
||||
sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
|
||||
}
|
||||
}
|
||||
_active.Clear();
|
||||
}
|
||||
|
||||
private sealed record ActiveSubscription(
|
||||
ISubscribable Driver,
|
||||
ISubscriptionHandle Handle,
|
||||
EventHandler<DataChangeEventArgs> Handler);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
|
||||
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
|
||||
/// <see cref="DataChangeEventArgs.FullReference"/> back to script-side paths.
|
||||
/// </summary>
|
||||
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
|
||||
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
|
||||
/// <param name="PublishingInterval">Forwarded to the driver's <see cref="ISubscribable.SubscribeAsync"/>.</param>
|
||||
public sealed record DriverFeed(
|
||||
ISubscribable Driver,
|
||||
IReadOnlyDictionary<string, string> PathToFullRef,
|
||||
TimeSpan PublishingInterval);
|
||||
183
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs
Normal file
183
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs
Normal file
@@ -0,0 +1,183 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 follow-up (task #246) — orchestrates the runtime composition of virtual
|
||||
/// tags + scripted alarms + the historian sink + the driver-bridge that feeds the
|
||||
/// engines. Called by <see cref="OpcUaServerService"/> after the bootstrap generation
|
||||
/// loads + before <see cref="OpcUaApplicationHost.StartAsync"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="PrepareAsync"/> reads Script / VirtualTag / ScriptedAlarm rows from
|
||||
/// the central config DB at the bootstrapped generation, instantiates a
|
||||
/// <see cref="CachedTagUpstreamSource"/>, runs <see cref="Phase7EngineComposer.Compose"/>,
|
||||
/// starts a <see cref="DriverSubscriptionBridge"/> per registered driver feeding
|
||||
/// <see cref="EquipmentNamespaceContent"/>'s tag rows into the cache, and returns
|
||||
/// the engine-backed <see cref="Core.Abstractions.IReadable"/> sources for
|
||||
/// <see cref="OpcUaApplicationHost.SetPhase7Sources"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="DisposeAsync"/> tears down the bridge first (so no more events
|
||||
/// arrive at the cache), then the engines (so cascades + timer ticks stop), then
|
||||
/// the SQLite sink (which flushes any in-flight drain). Lifetime is owned by the
|
||||
/// host; <see cref="OpcUaServerService.StopAsync"/> calls dispose during graceful
|
||||
/// shutdown.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class Phase7Composer : IAsyncDisposable
|
||||
{
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly DriverHost _driverHost;
|
||||
private readonly DriverEquipmentContentRegistry _equipmentRegistry;
|
||||
private readonly IAlarmHistorianSink _historianSink;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly Serilog.ILogger _scriptLogger;
|
||||
private readonly ILogger<Phase7Composer> _logger;
|
||||
|
||||
private DriverSubscriptionBridge? _bridge;
|
||||
private Phase7ComposedSources _sources = Phase7ComposedSources.Empty;
|
||||
private bool _disposed;
|
||||
|
||||
public Phase7Composer(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
DriverHost driverHost,
|
||||
DriverEquipmentContentRegistry equipmentRegistry,
|
||||
IAlarmHistorianSink historianSink,
|
||||
ILoggerFactory loggerFactory,
|
||||
Serilog.ILogger scriptLogger,
|
||||
ILogger<Phase7Composer> logger)
|
||||
{
|
||||
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
|
||||
_driverHost = driverHost ?? throw new ArgumentNullException(nameof(driverHost));
|
||||
_equipmentRegistry = equipmentRegistry ?? throw new ArgumentNullException(nameof(equipmentRegistry));
|
||||
_historianSink = historianSink ?? throw new ArgumentNullException(nameof(historianSink));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_scriptLogger = scriptLogger ?? throw new ArgumentNullException(nameof(scriptLogger));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public Phase7ComposedSources Sources => _sources;
|
||||
|
||||
public async Task<Phase7ComposedSources> PrepareAsync(long generationId, CancellationToken ct)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(Phase7Composer));
|
||||
|
||||
// Load the three Phase 7 row sets in one DB scope.
|
||||
List<Script> scripts;
|
||||
List<VirtualTag> virtualTags;
|
||||
List<ScriptedAlarm> scriptedAlarms;
|
||||
using (var scope = _scopeFactory.CreateScope())
|
||||
{
|
||||
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
|
||||
scripts = await db.Scripts.AsNoTracking()
|
||||
.Where(s => s.GenerationId == generationId).ToListAsync(ct).ConfigureAwait(false);
|
||||
virtualTags = await db.VirtualTags.AsNoTracking()
|
||||
.Where(v => v.GenerationId == generationId && v.Enabled).ToListAsync(ct).ConfigureAwait(false);
|
||||
scriptedAlarms = await db.ScriptedAlarms.AsNoTracking()
|
||||
.Where(a => a.GenerationId == generationId && a.Enabled).ToListAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
|
||||
{
|
||||
_logger.LogInformation("Phase 7: no virtual tags or scripted alarms in generation {Gen}; engines dormant", generationId);
|
||||
return Phase7ComposedSources.Empty;
|
||||
}
|
||||
|
||||
var upstream = new CachedTagUpstreamSource();
|
||||
|
||||
_sources = Phase7EngineComposer.Compose(
|
||||
scripts: scripts,
|
||||
virtualTags: virtualTags,
|
||||
scriptedAlarms: scriptedAlarms,
|
||||
upstream: upstream,
|
||||
alarmStateStore: new InMemoryAlarmStateStore(),
|
||||
historianSink: _historianSink,
|
||||
rootScriptLogger: _scriptLogger,
|
||||
loggerFactory: _loggerFactory);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Phase 7: composed engines from generation {Gen} — {Vt} virtual tag(s), {Al} scripted alarm(s), {Sc} script(s)",
|
||||
generationId, virtualTags.Count, scriptedAlarms.Count, scripts.Count);
|
||||
|
||||
// Build driver feeds from each registered driver's EquipmentNamespaceContent + start
|
||||
// the bridge. Drivers without populated content (Galaxy SystemPlatform-kind, drivers
|
||||
// whose Equipment rows haven't been published yet) contribute an empty feed which
|
||||
// the bridge silently skips.
|
||||
_bridge = new DriverSubscriptionBridge(upstream, _loggerFactory.CreateLogger<DriverSubscriptionBridge>());
|
||||
var feeds = BuildDriverFeeds(_driverHost, _equipmentRegistry);
|
||||
await _bridge.StartAsync(feeds, ct).ConfigureAwait(false);
|
||||
|
||||
return _sources;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// For each registered driver that exposes <see cref="Core.Abstractions.ISubscribable"/>,
|
||||
/// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent.
|
||||
/// Path convention: <c>/{areaName}/{lineName}/{equipmentName}/{tagName}</c> matching
|
||||
/// what the EquipmentNodeWalker emits into the OPC UA browse tree, so script literals
|
||||
/// written against the operator-visible tree work without translation.
|
||||
/// </summary>
|
||||
internal static IReadOnlyList<DriverFeed> BuildDriverFeeds(
|
||||
DriverHost driverHost, DriverEquipmentContentRegistry equipmentRegistry)
|
||||
{
|
||||
var feeds = new List<DriverFeed>();
|
||||
foreach (var driverId in driverHost.RegisteredDriverIds)
|
||||
{
|
||||
var driver = driverHost.GetDriver(driverId);
|
||||
if (driver is not Core.Abstractions.ISubscribable subscribable) continue;
|
||||
|
||||
var content = equipmentRegistry.Get(driverId);
|
||||
if (content is null) continue;
|
||||
|
||||
var pathToFullRef = MapPathsToFullRefs(content);
|
||||
if (pathToFullRef.Count == 0) continue;
|
||||
|
||||
feeds.Add(new DriverFeed(subscribable, pathToFullRef, TimeSpan.FromSeconds(1)));
|
||||
}
|
||||
return feeds;
|
||||
}
|
||||
|
||||
internal static IReadOnlyDictionary<string, string> MapPathsToFullRefs(EquipmentNamespaceContent content)
|
||||
{
|
||||
var result = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
var areaById = content.Areas.ToDictionary(a => a.UnsAreaId, StringComparer.OrdinalIgnoreCase);
|
||||
var lineById = content.Lines.ToDictionary(l => l.UnsLineId, StringComparer.OrdinalIgnoreCase);
|
||||
var equipmentById = content.Equipment.ToDictionary(e => e.EquipmentId, StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var tag in content.Tags)
|
||||
{
|
||||
if (string.IsNullOrEmpty(tag.EquipmentId)) continue;
|
||||
if (!equipmentById.TryGetValue(tag.EquipmentId!, out var eq)) continue;
|
||||
if (!lineById.TryGetValue(eq.UnsLineId, out var line)) continue;
|
||||
if (!areaById.TryGetValue(line.UnsAreaId, out var area)) continue;
|
||||
|
||||
var path = $"/{area.Name}/{line.Name}/{eq.Name}/{tag.Name}";
|
||||
result[path] = tag.TagConfig; // duplicate-path collisions naturally win-last; UI publish-validation rules out duplicate names
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
if (_bridge is not null) await _bridge.DisposeAsync().ConfigureAwait(false);
|
||||
foreach (var d in _sources.Disposables)
|
||||
{
|
||||
try { d.Dispose(); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); }
|
||||
}
|
||||
if (_historianSink is IDisposable disposableSink) disposableSink.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -73,7 +73,7 @@ public static class Phase7EngineComposer
|
||||
disposables.Add(vtEngine);
|
||||
}
|
||||
|
||||
ScriptedAlarmSource? alarmSource = null;
|
||||
IReadable? alarmReadable = null;
|
||||
if (scriptedAlarms.Count > 0)
|
||||
{
|
||||
var alarmDefs = ProjectScriptedAlarms(scriptedAlarms, scriptById).ToList();
|
||||
@@ -83,17 +83,17 @@ public static class Phase7EngineComposer
|
||||
var engineLogger = loggerFactory.CreateLogger("Phase7HistorianRouter");
|
||||
alarmEngine.OnEvent += (_, e) => _ = RouteToHistorianAsync(e, historianSink, engineLogger);
|
||||
alarmEngine.LoadAsync(alarmDefs, CancellationToken.None).GetAwaiter().GetResult();
|
||||
alarmSource = new ScriptedAlarmSource(alarmEngine);
|
||||
var alarmSource = new ScriptedAlarmSource(alarmEngine);
|
||||
// Task #245 — expose each alarm's current Active state as IReadable so OPC UA
|
||||
// variable reads on Source=ScriptedAlarm nodes return the live predicate truth
|
||||
// instead of BadNotFound. ScriptedAlarmSource stays registered as IAlarmSource
|
||||
// for the event stream; the IReadable is a separate adapter over the same engine.
|
||||
alarmReadable = new ScriptedAlarmReadable(alarmEngine);
|
||||
disposables.Add(alarmEngine);
|
||||
disposables.Add(alarmSource);
|
||||
}
|
||||
|
||||
// ScriptedAlarmSource is an IAlarmSource, not an IReadable — scripted-alarm
|
||||
// variable-read dispatch (task #245) needs a dedicated engine-state adapter. Until
|
||||
// that ships, reads against Source=ScriptedAlarm nodes return BadNotFound per the
|
||||
// DriverNodeManager null-check path (the ADR-002 "misconfiguration not silent
|
||||
// fallback" signal). The alarm event stream still fires via IAlarmSource.
|
||||
return new Phase7ComposedSources(vtSource, ScriptedAlarmReadable: null, disposables);
|
||||
return new Phase7ComposedSources(vtSource, alarmReadable, disposables);
|
||||
}
|
||||
|
||||
internal static IEnumerable<VirtualTagDefinition> ProjectVirtualTags(
|
||||
|
||||
58
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/ScriptedAlarmReadable.cs
Normal file
58
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/ScriptedAlarmReadable.cs
Normal file
@@ -0,0 +1,58 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IReadable"/> adapter exposing each scripted alarm's current
|
||||
/// <see cref="AlarmActiveState"/> as an OPC UA boolean. Phase 7 follow-up (task #245).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Paired with the <see cref="NodeSourceKind.ScriptedAlarm"/> dispatch in
|
||||
/// <c>DriverNodeManager.OnReadValue</c>. Full-reference lookup is the
|
||||
/// <c>ScriptedAlarmId</c> the walker wrote into <c>DriverAttributeInfo.FullName</c>
|
||||
/// when emitting the alarm variable node.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Unknown alarm ids return <c>BadNodeIdUnknown</c> so misconfiguration surfaces
|
||||
/// instead of silently reading <c>false</c>. Alarms whose predicate has never
|
||||
/// been evaluated (brand new, before the engine's first cascade tick) report
|
||||
/// <see cref="AlarmActiveState.Inactive"/> via <see cref="AlarmConditionState.Fresh"/>,
|
||||
/// which matches the Part 9 initial-state semantics.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptedAlarmReadable : IReadable
|
||||
{
|
||||
/// <summary>OPC UA <c>StatusCodes.BadNodeIdUnknown</c> — kept local so we don't pull the OPC stack.</summary>
|
||||
private const uint BadNodeIdUnknown = 0x80340000;
|
||||
|
||||
private readonly ScriptedAlarmEngine _engine;
|
||||
|
||||
public ScriptedAlarmReadable(ScriptedAlarmEngine engine)
|
||||
{
|
||||
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||||
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(fullReferences);
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var results = new DataValueSnapshot[fullReferences.Count];
|
||||
for (var i = 0; i < fullReferences.Count; i++)
|
||||
{
|
||||
var alarmId = fullReferences[i];
|
||||
var state = _engine.GetState(alarmId);
|
||||
if (state is null)
|
||||
{
|
||||
results[i] = new DataValueSnapshot(null, BadNodeIdUnknown, null, now);
|
||||
continue;
|
||||
}
|
||||
var active = state.Active == AlarmActiveState.Active;
|
||||
results[i] = new DataValueSnapshot(active, 0u, now, now);
|
||||
}
|
||||
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(results);
|
||||
}
|
||||
}
|
||||
@@ -8,8 +8,10 @@ using Serilog.Formatting.Compact;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Server;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Security;
|
||||
|
||||
var builder = Host.CreateApplicationBuilder(args);
|
||||
@@ -113,5 +115,13 @@ builder.Services.AddDbContext<OtOpcUaConfigDbContext>(opt =>
|
||||
opt.UseSqlServer(options.ConfigDbConnectionString));
|
||||
builder.Services.AddHostedService<HostStatusPublisher>();
|
||||
|
||||
// Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink
|
||||
// is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter
|
||||
// lands (task #248). The composer reads Script/VirtualTag/ScriptedAlarm rows on
|
||||
// generation bootstrap, builds the engines, and starts the driver-bridge feed.
|
||||
builder.Services.AddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
|
||||
builder.Services.AddSingleton(Log.Logger); // Serilog root for ScriptLoggerFactory
|
||||
builder.Services.AddSingleton<Phase7Composer>();
|
||||
|
||||
var host = builder.Build();
|
||||
await host.RunAsync();
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Task #244 — covers the bridge that pumps live driver <c>OnDataChange</c>
|
||||
/// notifications into the Phase 7 <see cref="CachedTagUpstreamSource"/>.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DriverSubscriptionBridgeTests
|
||||
{
|
||||
private sealed class FakeDriver : ISubscribable
|
||||
{
|
||||
public List<IReadOnlyList<string>> SubscribeCalls { get; } = [];
|
||||
public List<ISubscriptionHandle> Unsubscribed { get; } = [];
|
||||
public ISubscriptionHandle? LastHandle { get; private set; }
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
SubscribeCalls.Add(fullReferences);
|
||||
LastHandle = new Handle($"sub-{SubscribeCalls.Count}");
|
||||
return Task.FromResult(LastHandle);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
Unsubscribed.Add(handle);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void Fire(string fullRef, object value)
|
||||
{
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(
|
||||
LastHandle!, fullRef,
|
||||
new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow)));
|
||||
}
|
||||
|
||||
private sealed record Handle(string DiagnosticId) : ISubscriptionHandle;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver,
|
||||
new Dictionary<string, string>
|
||||
{
|
||||
["/Site/L1/A/Temp"] = "DR.Temp",
|
||||
["/Site/L1/A/Pressure"] = "DR.Pressure",
|
||||
},
|
||||
TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
driver.SubscribeCalls.Count.ShouldBe(1);
|
||||
driver.SubscribeCalls[0].ShouldContain("DR.Temp");
|
||||
driver.SubscribeCalls[0].ShouldContain("DR.Pressure");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver,
|
||||
new Dictionary<string, string> { ["/Site/L1/A/Temp"] = "DR.Temp" },
|
||||
TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
driver.Fire("DR.Temp", 42.5);
|
||||
|
||||
sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnDataChange_with_unmapped_fullRef_is_ignored()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver,
|
||||
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||
TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
driver.Fire("DR.B", 99); // not in map
|
||||
|
||||
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured,
|
||||
"unmapped fullRef shouldn't pollute the cache");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Empty_PathToFullRef_skips_SubscribeAsync_call()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver, new Dictionary<string, string>(), TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
driver.SubscribeCalls.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_unsubscribes_each_active_subscription()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver,
|
||||
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||
TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
await bridge.DisposeAsync();
|
||||
|
||||
driver.Unsubscribed.Count.ShouldBe(1);
|
||||
driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var driver = new FakeDriver();
|
||||
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
await bridge.StartAsync(new[]
|
||||
{
|
||||
new DriverFeed(driver,
|
||||
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||
TimeSpan.FromSeconds(1)),
|
||||
}, CancellationToken.None);
|
||||
|
||||
await bridge.DisposeAsync();
|
||||
driver.Fire("DR.A", 999); // post-dispose event
|
||||
|
||||
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StartAsync_called_twice_throws()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
await bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(
|
||||
() => bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_is_idempotent()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
await bridge.DisposeAsync();
|
||||
await bridge.DisposeAsync(); // must not throw
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_failure_unhooks_handler_and_propagates()
|
||||
{
|
||||
var sink = new CachedTagUpstreamSource();
|
||||
var failingDriver = new ThrowingDriver();
|
||||
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||
|
||||
var feeds = new[]
|
||||
{
|
||||
new DriverFeed(failingDriver,
|
||||
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||
TimeSpan.FromSeconds(1)),
|
||||
};
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(
|
||||
() => bridge.StartAsync(feeds, CancellationToken.None));
|
||||
|
||||
// Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs).
|
||||
failingDriver.HasAnyHandlers.ShouldBeFalse(
|
||||
"handler must be removed when SubscribeAsync throws so it doesn't leak");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_sink_or_logger_rejected()
|
||||
{
|
||||
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(null!, NullLogger<DriverSubscriptionBridge>.Instance));
|
||||
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!));
|
||||
}
|
||||
|
||||
private sealed class ThrowingDriver : ISubscribable
|
||||
{
|
||||
private EventHandler<DataChangeEventArgs>? _handler;
|
||||
public bool HasAnyHandlers => _handler is not null;
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange
|
||||
{
|
||||
add => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Combine(_handler, value);
|
||||
remove => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Remove(_handler, value);
|
||||
}
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(IReadOnlyList<string> _, TimeSpan __, CancellationToken ___) =>
|
||||
throw new InvalidOperationException("driver offline");
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Task #246 — covers the deterministic mapping inside <see cref="Phase7Composer"/>
|
||||
/// that turns <see cref="EquipmentNamespaceContent"/> into the path → fullRef map
|
||||
/// <see cref="DriverFeed.PathToFullRef"/> consumes. Pure function; no DI / DB needed.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class Phase7ComposerMappingTests
|
||||
{
|
||||
private static UnsArea Area(string id, string name) =>
|
||||
new() { UnsAreaId = id, ClusterId = "c", Name = name, GenerationId = 1 };
|
||||
|
||||
private static UnsLine Line(string id, string areaId, string name) =>
|
||||
new() { UnsLineId = id, UnsAreaId = areaId, Name = name, GenerationId = 1 };
|
||||
|
||||
private static Equipment Eq(string id, string lineId, string name) => new()
|
||||
{
|
||||
EquipmentRowId = Guid.NewGuid(), GenerationId = 1, EquipmentId = id,
|
||||
EquipmentUuid = Guid.NewGuid(), DriverInstanceId = "drv",
|
||||
UnsLineId = lineId, Name = name, MachineCode = "m",
|
||||
};
|
||||
|
||||
private static Tag T(string id, string name, string fullRef, string equipmentId) => new()
|
||||
{
|
||||
TagRowId = Guid.NewGuid(), GenerationId = 1, TagId = id,
|
||||
DriverInstanceId = "drv", EquipmentId = equipmentId,
|
||||
Name = name, DataType = "Float32",
|
||||
AccessLevel = TagAccessLevel.Read, TagConfig = fullRef,
|
||||
};
|
||||
|
||||
[Fact]
|
||||
public void Maps_tag_to_UNS_path_walker_emits()
|
||||
{
|
||||
var content = new EquipmentNamespaceContent(
|
||||
Areas: [Area("a1", "warsaw")],
|
||||
Lines: [Line("l1", "a1", "oven-line")],
|
||||
Equipment: [Eq("e1", "l1", "oven-3")],
|
||||
Tags: [T("t1", "Temp", "DR.Temp", "e1")]);
|
||||
|
||||
var map = Phase7Composer.MapPathsToFullRefs(content);
|
||||
|
||||
map.ShouldContainKeyAndValue("/warsaw/oven-line/oven-3/Temp", "DR.Temp");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Skips_tag_with_null_EquipmentId()
|
||||
{
|
||||
var content = new EquipmentNamespaceContent(
|
||||
[Area("a1", "warsaw")], [Line("l1", "a1", "ol")], [Eq("e1", "l1", "ov")],
|
||||
[T("t1", "Bare", "DR.Bare", null!)]); // SystemPlatform-style orphan
|
||||
|
||||
Phase7Composer.MapPathsToFullRefs(content).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Skips_tag_pointing_at_unknown_Equipment()
|
||||
{
|
||||
var content = new EquipmentNamespaceContent(
|
||||
[Area("a1", "warsaw")], [Line("l1", "a1", "ol")], [Eq("e1", "l1", "ov")],
|
||||
[T("t1", "Lost", "DR.Lost", "e-missing")]);
|
||||
|
||||
Phase7Composer.MapPathsToFullRefs(content).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Maps_multiple_tags_under_same_equipment_distinctly()
|
||||
{
|
||||
var content = new EquipmentNamespaceContent(
|
||||
[Area("a1", "site")], [Line("l1", "a1", "line1")], [Eq("e1", "l1", "cell")],
|
||||
[T("t1", "Temp", "DR.T", "e1"), T("t2", "Pressure", "DR.P", "e1")]);
|
||||
|
||||
var map = Phase7Composer.MapPathsToFullRefs(content);
|
||||
|
||||
map.Count.ShouldBe(2);
|
||||
map["/site/line1/cell/Temp"].ShouldBe("DR.T");
|
||||
map["/site/line1/cell/Pressure"].ShouldBe("DR.P");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Empty_content_yields_empty_map()
|
||||
{
|
||||
Phase7Composer.MapPathsToFullRefs(new EquipmentNamespaceContent([], [], [], []))
|
||||
.ShouldBeEmpty();
|
||||
}
|
||||
}
|
||||
@@ -72,9 +72,28 @@ public sealed class Phase7EngineComposerTests
|
||||
loggerFactory: NullLoggerFactory.Instance);
|
||||
|
||||
result.VirtualReadable.ShouldNotBeNull();
|
||||
result.ScriptedAlarmReadable.ShouldBeNull("no alarms configured");
|
||||
result.Disposables.Count.ShouldBeGreaterThan(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Compose_ScriptedAlarm_rows_returns_non_null_ScriptedAlarmReadable()
|
||||
{
|
||||
var scripts = new[] { ScriptRow("scr-1", "return false;") };
|
||||
var alarms = new[] { AlarmRow("al-1", "scr-1") };
|
||||
|
||||
var result = Phase7EngineComposer.Compose(
|
||||
scripts, [], alarms,
|
||||
upstream: new CachedTagUpstreamSource(),
|
||||
alarmStateStore: new InMemoryAlarmStateStore(),
|
||||
historianSink: NullAlarmHistorianSink.Instance,
|
||||
rootScriptLogger: new LoggerConfiguration().CreateLogger(),
|
||||
loggerFactory: NullLoggerFactory.Instance);
|
||||
|
||||
result.ScriptedAlarmReadable.ShouldNotBeNull("task #245 — alarm Active state readable");
|
||||
result.VirtualReadable.ShouldBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Compose_missing_script_reference_throws_with_actionable_message()
|
||||
{
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Task #245 — covers the IReadable adapter that surfaces each scripted alarm's
|
||||
/// live <c>ActiveState</c> so OPC UA variable reads on Source=ScriptedAlarm nodes
|
||||
/// return the predicate truth instead of BadNotFound.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ScriptedAlarmReadableTests
|
||||
{
|
||||
private static (ScriptedAlarmEngine engine, CachedTagUpstreamSource upstream) BuildEngineWith(
|
||||
params (string alarmId, string predicateSource)[] alarms)
|
||||
{
|
||||
var upstream = new CachedTagUpstreamSource();
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
var factory = new ScriptLoggerFactory(logger);
|
||||
var engine = new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), factory, logger);
|
||||
var defs = alarms.Select(a => new ScriptedAlarmDefinition(
|
||||
AlarmId: a.alarmId,
|
||||
EquipmentPath: "/eq",
|
||||
AlarmName: a.alarmId,
|
||||
Kind: AlarmKind.LimitAlarm,
|
||||
Severity: AlarmSeverity.Medium,
|
||||
MessageTemplate: "x",
|
||||
PredicateScriptSource: a.predicateSource)).ToList();
|
||||
engine.LoadAsync(defs, CancellationToken.None).GetAwaiter().GetResult();
|
||||
return (engine, upstream);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reads_return_false_for_newly_loaded_alarm_with_inactive_predicate()
|
||||
{
|
||||
var (engine, _) = BuildEngineWith(("a1", "return false;"));
|
||||
using var _e = engine;
|
||||
var readable = new ScriptedAlarmReadable(engine);
|
||||
|
||||
var result = await readable.ReadAsync(["a1"], CancellationToken.None);
|
||||
|
||||
result.Count.ShouldBe(1);
|
||||
result[0].Value.ShouldBe(false);
|
||||
result[0].StatusCode.ShouldBe(0u, "Good quality when the engine has state");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reads_return_true_when_predicate_evaluates_to_active()
|
||||
{
|
||||
var (engine, upstream) = BuildEngineWith(
|
||||
("tempAlarm", "return ctx.GetTag(\"/Site/Line/Cell/Temp\").Value is double d && d > 100;"));
|
||||
using var _e = engine;
|
||||
|
||||
// Seed the upstream value + nudge the engine so the alarm transitions to Active.
|
||||
upstream.Push("/Site/Line/Cell/Temp",
|
||||
new DataValueSnapshot(150.0, 0u, DateTime.UtcNow, DateTime.UtcNow));
|
||||
|
||||
// Allow the engine's change-driven cascade to run.
|
||||
await Task.Delay(50);
|
||||
|
||||
var readable = new ScriptedAlarmReadable(engine);
|
||||
var result = await readable.ReadAsync(["tempAlarm"], CancellationToken.None);
|
||||
|
||||
result[0].Value.ShouldBe(true);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reads_return_BadNodeIdUnknown_for_missing_alarm()
|
||||
{
|
||||
var (engine, _) = BuildEngineWith(("a1", "return false;"));
|
||||
using var _e = engine;
|
||||
var readable = new ScriptedAlarmReadable(engine);
|
||||
|
||||
var result = await readable.ReadAsync(["a-not-loaded"], CancellationToken.None);
|
||||
|
||||
result[0].Value.ShouldBeNull();
|
||||
result[0].StatusCode.ShouldBe(0x80340000u,
|
||||
"BadNodeIdUnknown surfaces a misconfiguration, not a silent false");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reads_batch_round_trip_preserves_order()
|
||||
{
|
||||
var (engine, _) = BuildEngineWith(
|
||||
("a1", "return false;"),
|
||||
("a2", "return false;"));
|
||||
using var _e = engine;
|
||||
var readable = new ScriptedAlarmReadable(engine);
|
||||
|
||||
var result = await readable.ReadAsync(["a2", "missing", "a1"], CancellationToken.None);
|
||||
|
||||
result.Count.ShouldBe(3);
|
||||
result[0].Value.ShouldBe(false); // a2
|
||||
result[1].StatusCode.ShouldBe(0x80340000u); // missing
|
||||
result[2].Value.ShouldBe(false); // a1
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_engine_rejected()
|
||||
{
|
||||
Should.Throw<ArgumentNullException>(() => new ScriptedAlarmReadable(null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Null_fullReferences_rejected()
|
||||
{
|
||||
var (engine, _) = BuildEngineWith(("a1", "return false;"));
|
||||
using var _e = engine;
|
||||
var readable = new ScriptedAlarmReadable(engine);
|
||||
|
||||
await Should.ThrowAsync<ArgumentNullException>(
|
||||
() => readable.ReadAsync(null!, CancellationToken.None));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user