Phase 7 follow-up #246 — Phase7Composer + Program.cs wire-in
Activates the Phase 7 engines in production. Loads Script + VirtualTag + ScriptedAlarm rows from the bootstrapped generation, wires the engines through the Phase7EngineComposer kernel (#243), starts the DriverSubscriptionBridge feed (#244), and late-binds the resulting IReadable sources to OpcUaApplicationHost before OPC UA server start. ## Phase7Composer (Server.Phase7) Singleton orchestrator. PrepareAsync loads the three Phase 7 row sets in one DB scope, builds CachedTagUpstreamSource, calls Phase7EngineComposer.Compose, constructs DriverSubscriptionBridge with one DriverFeed per registered ISubscribable driver (path-to-fullRef map built from EquipmentNamespaceContent via MapPathsToFullRefs), starts the bridge. DisposeAsync tears down in the right order: bridge first (no more events fired into the cache), then engines (cascades + timers stop), then any disposable sink. MapPathsToFullRefs: deterministic path convention is /{areaName}/{lineName}/{equipmentName}/{tagName} matching exactly what EquipmentNodeWalker emits into the OPC UA browse tree, so script literals against the operator-visible UNS tree work without translation. Tags missing EquipmentId or pointing at unknown Equipment are skipped silently (Galaxy SystemPlatform-style tags + dangling references handled). ## OpcUaApplicationHost.SetPhase7Sources New late-bind setter. Throws InvalidOperationException if called after StartAsync because OtOpcUaServer + DriverNodeManagers capture the field values at construction; mutation post-start would silently fail. ## OpcUaServerService After bootstrap loads the current generation, calls phase7Composer.PrepareAsync + applicationHost.SetPhase7Sources before applicationHost.StartAsync. StopAsync disposes Phase7Composer first so the bridge stops feeding the cache before the OPC UA server tears down its node managers (avoids in-flight cascades surfacing as noisy shutdown warnings). ## Program.cs Registers IAlarmHistorianSink as NullAlarmHistorianSink.Instance (task #247 swaps in the real Galaxy.Host-writer-backed SqliteStoreAndForwardSink), Serilog root logger, and Phase7Composer singleton. ## Tests — 5 new Phase7ComposerMappingTests = 34 Phase 7 tests total Maps tag → walker UNS path, skips null EquipmentId, skips unknown Equipment reference, multiple tags under same equipment map distinctly, empty content yields empty map. Pure functions; no DI/DB needed. The real PrepareAsync DB query path can't be exercised without SQL Server in the test environment — it's exercised by the live E2E smoke (task #240) which unblocks once #247 lands. ## Phase 7 production wiring chain status - ✅ #243 composition kernel - ✅ #245 scripted-alarm IReadable adapter - ✅ #244 driver bridge - ✅ #246 this — Program.cs wire-in - 🟡 #247 — Galaxy.Host SqliteStoreAndForwardSink writer adapter (replaces NullSink) - 🟡 #240 — live E2E smoke (unblocks once #247 lands)
This commit is contained in:
183
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs
Normal file
183
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs
Normal file
@@ -0,0 +1,183 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 follow-up (task #246) — orchestrates the runtime composition of virtual
|
||||
/// tags + scripted alarms + the historian sink + the driver-bridge that feeds the
|
||||
/// engines. Called by <see cref="OpcUaServerService"/> after the bootstrap generation
|
||||
/// loads + before <see cref="OpcUaApplicationHost.StartAsync"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="PrepareAsync"/> reads Script / VirtualTag / ScriptedAlarm rows from
|
||||
/// the central config DB at the bootstrapped generation, instantiates a
|
||||
/// <see cref="CachedTagUpstreamSource"/>, runs <see cref="Phase7EngineComposer.Compose"/>,
|
||||
/// starts a <see cref="DriverSubscriptionBridge"/> per registered driver feeding
|
||||
/// <see cref="EquipmentNamespaceContent"/>'s tag rows into the cache, and returns
|
||||
/// the engine-backed <see cref="Core.Abstractions.IReadable"/> sources for
|
||||
/// <see cref="OpcUaApplicationHost.SetPhase7Sources"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="DisposeAsync"/> tears down the bridge first (so no more events
|
||||
/// arrive at the cache), then the engines (so cascades + timer ticks stop), then
|
||||
/// the SQLite sink (which flushes any in-flight drain). Lifetime is owned by the
|
||||
/// host; <see cref="OpcUaServerService.StopAsync"/> calls dispose during graceful
|
||||
/// shutdown.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class Phase7Composer : IAsyncDisposable
|
||||
{
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly DriverHost _driverHost;
|
||||
private readonly DriverEquipmentContentRegistry _equipmentRegistry;
|
||||
private readonly IAlarmHistorianSink _historianSink;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly Serilog.ILogger _scriptLogger;
|
||||
private readonly ILogger<Phase7Composer> _logger;
|
||||
|
||||
private DriverSubscriptionBridge? _bridge;
|
||||
private Phase7ComposedSources _sources = Phase7ComposedSources.Empty;
|
||||
private bool _disposed;
|
||||
|
||||
public Phase7Composer(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
DriverHost driverHost,
|
||||
DriverEquipmentContentRegistry equipmentRegistry,
|
||||
IAlarmHistorianSink historianSink,
|
||||
ILoggerFactory loggerFactory,
|
||||
Serilog.ILogger scriptLogger,
|
||||
ILogger<Phase7Composer> logger)
|
||||
{
|
||||
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
|
||||
_driverHost = driverHost ?? throw new ArgumentNullException(nameof(driverHost));
|
||||
_equipmentRegistry = equipmentRegistry ?? throw new ArgumentNullException(nameof(equipmentRegistry));
|
||||
_historianSink = historianSink ?? throw new ArgumentNullException(nameof(historianSink));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_scriptLogger = scriptLogger ?? throw new ArgumentNullException(nameof(scriptLogger));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public Phase7ComposedSources Sources => _sources;
|
||||
|
||||
public async Task<Phase7ComposedSources> PrepareAsync(long generationId, CancellationToken ct)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(Phase7Composer));
|
||||
|
||||
// Load the three Phase 7 row sets in one DB scope.
|
||||
List<Script> scripts;
|
||||
List<VirtualTag> virtualTags;
|
||||
List<ScriptedAlarm> scriptedAlarms;
|
||||
using (var scope = _scopeFactory.CreateScope())
|
||||
{
|
||||
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
|
||||
scripts = await db.Scripts.AsNoTracking()
|
||||
.Where(s => s.GenerationId == generationId).ToListAsync(ct).ConfigureAwait(false);
|
||||
virtualTags = await db.VirtualTags.AsNoTracking()
|
||||
.Where(v => v.GenerationId == generationId && v.Enabled).ToListAsync(ct).ConfigureAwait(false);
|
||||
scriptedAlarms = await db.ScriptedAlarms.AsNoTracking()
|
||||
.Where(a => a.GenerationId == generationId && a.Enabled).ToListAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
|
||||
{
|
||||
_logger.LogInformation("Phase 7: no virtual tags or scripted alarms in generation {Gen}; engines dormant", generationId);
|
||||
return Phase7ComposedSources.Empty;
|
||||
}
|
||||
|
||||
var upstream = new CachedTagUpstreamSource();
|
||||
|
||||
_sources = Phase7EngineComposer.Compose(
|
||||
scripts: scripts,
|
||||
virtualTags: virtualTags,
|
||||
scriptedAlarms: scriptedAlarms,
|
||||
upstream: upstream,
|
||||
alarmStateStore: new InMemoryAlarmStateStore(),
|
||||
historianSink: _historianSink,
|
||||
rootScriptLogger: _scriptLogger,
|
||||
loggerFactory: _loggerFactory);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Phase 7: composed engines from generation {Gen} — {Vt} virtual tag(s), {Al} scripted alarm(s), {Sc} script(s)",
|
||||
generationId, virtualTags.Count, scriptedAlarms.Count, scripts.Count);
|
||||
|
||||
// Build driver feeds from each registered driver's EquipmentNamespaceContent + start
|
||||
// the bridge. Drivers without populated content (Galaxy SystemPlatform-kind, drivers
|
||||
// whose Equipment rows haven't been published yet) contribute an empty feed which
|
||||
// the bridge silently skips.
|
||||
_bridge = new DriverSubscriptionBridge(upstream, _loggerFactory.CreateLogger<DriverSubscriptionBridge>());
|
||||
var feeds = BuildDriverFeeds(_driverHost, _equipmentRegistry);
|
||||
await _bridge.StartAsync(feeds, ct).ConfigureAwait(false);
|
||||
|
||||
return _sources;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// For each registered driver that exposes <see cref="Core.Abstractions.ISubscribable"/>,
|
||||
/// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent.
|
||||
/// Path convention: <c>/{areaName}/{lineName}/{equipmentName}/{tagName}</c> matching
|
||||
/// what the EquipmentNodeWalker emits into the OPC UA browse tree, so script literals
|
||||
/// written against the operator-visible tree work without translation.
|
||||
/// </summary>
|
||||
internal static IReadOnlyList<DriverFeed> BuildDriverFeeds(
|
||||
DriverHost driverHost, DriverEquipmentContentRegistry equipmentRegistry)
|
||||
{
|
||||
var feeds = new List<DriverFeed>();
|
||||
foreach (var driverId in driverHost.RegisteredDriverIds)
|
||||
{
|
||||
var driver = driverHost.GetDriver(driverId);
|
||||
if (driver is not Core.Abstractions.ISubscribable subscribable) continue;
|
||||
|
||||
var content = equipmentRegistry.Get(driverId);
|
||||
if (content is null) continue;
|
||||
|
||||
var pathToFullRef = MapPathsToFullRefs(content);
|
||||
if (pathToFullRef.Count == 0) continue;
|
||||
|
||||
feeds.Add(new DriverFeed(subscribable, pathToFullRef, TimeSpan.FromSeconds(1)));
|
||||
}
|
||||
return feeds;
|
||||
}
|
||||
|
||||
internal static IReadOnlyDictionary<string, string> MapPathsToFullRefs(EquipmentNamespaceContent content)
|
||||
{
|
||||
var result = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
var areaById = content.Areas.ToDictionary(a => a.UnsAreaId, StringComparer.OrdinalIgnoreCase);
|
||||
var lineById = content.Lines.ToDictionary(l => l.UnsLineId, StringComparer.OrdinalIgnoreCase);
|
||||
var equipmentById = content.Equipment.ToDictionary(e => e.EquipmentId, StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var tag in content.Tags)
|
||||
{
|
||||
if (string.IsNullOrEmpty(tag.EquipmentId)) continue;
|
||||
if (!equipmentById.TryGetValue(tag.EquipmentId!, out var eq)) continue;
|
||||
if (!lineById.TryGetValue(eq.UnsLineId, out var line)) continue;
|
||||
if (!areaById.TryGetValue(line.UnsAreaId, out var area)) continue;
|
||||
|
||||
var path = $"/{area.Name}/{line.Name}/{eq.Name}/{tag.Name}";
|
||||
result[path] = tag.TagConfig; // duplicate-path collisions naturally win-last; UI publish-validation rules out duplicate names
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
if (_bridge is not null) await _bridge.DisposeAsync().ConfigureAwait(false);
|
||||
foreach (var d in _sources.Disposables)
|
||||
{
|
||||
try { d.Dispose(); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); }
|
||||
}
|
||||
if (_historianSink is IDisposable disposableSink) disposableSink.Dispose();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user