Closes the historian leg of Phase 7. Scripted alarm transitions now batch-flow through the existing Galaxy.Host pipe + queue durably in a local SQLite store-and-forward when Galaxy is the registered driver, instead of being dropped into NullAlarmHistorianSink.

## GalaxyHistorianWriter (Driver.Galaxy.Proxy.Ipc)

IAlarmHistorianWriter implementation. Translates AlarmHistorianEvent → HistorianAlarmEventDto (Stream D contract), batches via the existing GalaxyIpcClient.CallAsync round-trip on MessageKind.HistorianAlarmEventRequest / Response, and maps per-event HistorianAlarmEventOutcomeDto bytes back to HistorianWriteOutcome (Ack/RetryPlease/PermanentFail) so the SQLite drain worker knows what to ack vs dead-letter vs retry. Empty-batch fast path. Pipe-level transport faults (broken pipe, host crash) bubble up as GalaxyIpcException, which the SQLite sink's drain worker translates to whole-batch RetryPlease per its catch contract.

## GalaxyProxyDriver implements IAlarmHistorianWriter

Marker interface lets Phase7Composer discover it via type check at compose time. WriteBatchAsync delegates to a thin GalaxyHistorianWriter wrapping the driver's existing _client. Throws InvalidOperationException if InitializeAsync hasn't connected yet; the SQLite drain worker treats that as a transient batch failure and retries.

## Phase7Composer.ResolveHistorianSink

Replaces the injected sink dep when any registered driver implements IAlarmHistorianWriter. Constructs SqliteStoreAndForwardSink at %ProgramData%/OtOpcUa/alarm-historian-queue.db (falls back to %TEMP% when ProgramData is unavailable, e.g. dev), starts the 2s drain timer, and owns the sink disposable for clean teardown. When no driver provides the writer, keeps the NullAlarmHistorianSink wired by Program.cs (#246). DisposeAsync now also disposes the owned SQLite sink in the right order: bridge → engines → owned sink → injected fallback.
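The per-event outcome handling described under GalaxyHistorianWriter can be sketched roughly as follows. This is a minimal in-memory illustration, not the actual SqliteStoreAndForwardSink: `WriteOutcome`, `DrainSketch`, the string-typed events and the byte values 0/1/2 are all assumptions made for the example. The real byte values are defined by the Stream D contract and are not shown in this commit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for HistorianWriteOutcome; the real enum may differ.
public enum WriteOutcome { Ack, RetryPlease, PermanentFail }

public static class DrainSketch
{
    // Byte values are assumed for illustration only.
    public static WriteOutcome MapOutcome(byte wire) => wire switch
    {
        0 => WriteOutcome.Ack,
        1 => WriteOutcome.RetryPlease,
        2 => WriteOutcome.PermanentFail,
        _ => throw new InvalidOperationException($"Unknown outcome byte {wire}"),
    };

    // One drain pass: send the pending batch, then sort each event by its outcome.
    // Returns the events that stay queued; dead-lettered events go to deadLetters.
    public static List<string> DrainOnce(
        IReadOnlyList<string> pending,
        Func<IReadOnlyList<string>, IReadOnlyList<byte>> send,
        List<string> deadLetters)
    {
        // Empty-batch fast path: nothing to send.
        if (pending.Count == 0) return new List<string>();

        IReadOnlyList<byte> wire;
        try
        {
            wire = send(pending);
        }
        catch (Exception)
        {
            // Transport fault: the whole batch is treated as RetryPlease.
            return pending.ToList();
        }

        if (wire.Count != pending.Count)
            throw new InvalidOperationException("Outcome count does not match batch size");

        var stillQueued = new List<string>();
        for (var i = 0; i < pending.Count; i++)
        {
            switch (MapOutcome(wire[i]))
            {
                case WriteOutcome.Ack: break;                                          // done, drop from queue
                case WriteOutcome.RetryPlease: stillQueued.Add(pending[i]); break;     // try again next tick
                case WriteOutcome.PermanentFail: deadLetters.Add(pending[i]); break;   // never retried
            }
        }
        return stillQueued;
    }
}
```

In this sketch a thrown exception from `send` plays the role of GalaxyIpcException: no event is acked or dead-lettered, and the full batch is returned for the next drain tick.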
## Tests: 7 new GalaxyHistorianWriterMappingTests

ToDto round-trips every field; preserves null Comment; per-byte outcome enum mapping (Ack / RetryPlease / PermanentFail) via [Theory]; unknown byte throws; ctor null-guard. The IPC round-trip itself is covered by the live Host suite (task #240), which constructs a real pipe.

Server.Phase7 tests: 34/34 still pass; Galaxy.Proxy tests: 25/25 (+7 = 32 total).

## Phase 7 production wiring chain: COMPLETE

- ✅ #243 composition kernel
- ✅ #245 scripted-alarm IReadable adapter
- ✅ #244 driver bridge
- ✅ #246 Program.cs wire-in
- ✅ #247 this: Galaxy.Host historian writer + SQLite sink activation

What unblocks now: task #240 live OPC UA E2E smoke. With a Galaxy driver registered, scripted alarm transitions flow end-to-end through the engine → SQLite queue → drain worker → Galaxy.Host IPC → Aveva Historian alarm schema. Without Galaxy, NullSink keeps the engines functional and the queue dormant.
238 lines
11 KiB
C#
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;

namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;

/// <summary>
/// Phase 7 follow-up (task #246): orchestrates the runtime composition of virtual
/// tags + scripted alarms + the historian sink + the driver-bridge that feeds the
/// engines. Called by <see cref="OpcUaServerService"/> after the bootstrap generation
/// loads + before <see cref="OpcUaApplicationHost.StartAsync"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="PrepareAsync"/> reads Script / VirtualTag / ScriptedAlarm rows from
/// the central config DB at the bootstrapped generation, instantiates a
/// <see cref="CachedTagUpstreamSource"/>, runs <see cref="Phase7EngineComposer.Compose"/>,
/// starts a <see cref="DriverSubscriptionBridge"/> per registered driver feeding
/// <see cref="EquipmentNamespaceContent"/>'s tag rows into the cache, and returns
/// the engine-backed <see cref="Core.Abstractions.IReadable"/> sources for
/// <see cref="OpcUaApplicationHost.SetPhase7Sources"/>.
/// </para>
/// <para>
/// <see cref="DisposeAsync"/> tears down the bridge first (so no more events
/// arrive at the cache), then the engines (so cascades + timer ticks stop), then
/// the SQLite sink (which flushes any in-flight drain). Lifetime is owned by the
/// host; <see cref="OpcUaServerService.StopAsync"/> calls dispose during graceful
/// shutdown.
/// </para>
/// </remarks>
public sealed class Phase7Composer : IAsyncDisposable
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly DriverHost _driverHost;
    private readonly DriverEquipmentContentRegistry _equipmentRegistry;
    private readonly IAlarmHistorianSink _historianSink;
    private readonly ILoggerFactory _loggerFactory;
    private readonly Serilog.ILogger _scriptLogger;
    private readonly ILogger<Phase7Composer> _logger;

    private DriverSubscriptionBridge? _bridge;
    private Phase7ComposedSources _sources = Phase7ComposedSources.Empty;
    // Sink we constructed in PrepareAsync (vs. the injected fallback). Held so
    // DisposeAsync can flush + tear down the SQLite drain timer.
    private SqliteStoreAndForwardSink? _ownedSink;
    private bool _disposed;

    public Phase7Composer(
        IServiceScopeFactory scopeFactory,
        DriverHost driverHost,
        DriverEquipmentContentRegistry equipmentRegistry,
        IAlarmHistorianSink historianSink,
        ILoggerFactory loggerFactory,
        Serilog.ILogger scriptLogger,
        ILogger<Phase7Composer> logger)
    {
        _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
        _driverHost = driverHost ?? throw new ArgumentNullException(nameof(driverHost));
        _equipmentRegistry = equipmentRegistry ?? throw new ArgumentNullException(nameof(equipmentRegistry));
        _historianSink = historianSink ?? throw new ArgumentNullException(nameof(historianSink));
        _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
        _scriptLogger = scriptLogger ?? throw new ArgumentNullException(nameof(scriptLogger));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public Phase7ComposedSources Sources => _sources;

    public async Task<Phase7ComposedSources> PrepareAsync(long generationId, CancellationToken ct)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(Phase7Composer));

        // Load the three Phase 7 row sets in one DB scope.
        List<Script> scripts;
        List<VirtualTag> virtualTags;
        List<ScriptedAlarm> scriptedAlarms;
        using (var scope = _scopeFactory.CreateScope())
        {
            var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
            scripts = await db.Scripts.AsNoTracking()
                .Where(s => s.GenerationId == generationId).ToListAsync(ct).ConfigureAwait(false);
            virtualTags = await db.VirtualTags.AsNoTracking()
                .Where(v => v.GenerationId == generationId && v.Enabled).ToListAsync(ct).ConfigureAwait(false);
            scriptedAlarms = await db.ScriptedAlarms.AsNoTracking()
                .Where(a => a.GenerationId == generationId && a.Enabled).ToListAsync(ct).ConfigureAwait(false);
        }

        if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
        {
            _logger.LogInformation("Phase 7: no virtual tags or scripted alarms in generation {Gen}; engines dormant", generationId);
            return Phase7ComposedSources.Empty;
        }

        var upstream = new CachedTagUpstreamSource();

        // Phase 7 follow-up #247: if any registered driver implements IAlarmHistorianWriter
        // (today: GalaxyProxyDriver), wrap it in a SqliteStoreAndForwardSink at
        // %ProgramData%/OtOpcUa/alarm-historian-queue.db with the 2s drain cadence the
        // sink's docstring recommends. Otherwise fall back to the injected sink (Null in
        // the default registration).
        var historianSink = ResolveHistorianSink();

        _sources = Phase7EngineComposer.Compose(
            scripts: scripts,
            virtualTags: virtualTags,
            scriptedAlarms: scriptedAlarms,
            upstream: upstream,
            alarmStateStore: new InMemoryAlarmStateStore(),
            historianSink: historianSink,
            rootScriptLogger: _scriptLogger,
            loggerFactory: _loggerFactory);

        _logger.LogInformation(
            "Phase 7: composed engines from generation {Gen} — {Vt} virtual tag(s), {Al} scripted alarm(s), {Sc} script(s)",
            generationId, virtualTags.Count, scriptedAlarms.Count, scripts.Count);

        // Build driver feeds from each registered driver's EquipmentNamespaceContent + start
        // the bridge. Drivers without populated content (Galaxy SystemPlatform-kind, drivers
        // whose Equipment rows haven't been published yet) contribute an empty feed which
        // the bridge silently skips.
        _bridge = new DriverSubscriptionBridge(upstream, _loggerFactory.CreateLogger<DriverSubscriptionBridge>());
        var feeds = BuildDriverFeeds(_driverHost, _equipmentRegistry);
        await _bridge.StartAsync(feeds, ct).ConfigureAwait(false);

        return _sources;
    }

    private IAlarmHistorianSink ResolveHistorianSink()
    {
        IAlarmHistorianWriter? writer = null;
        foreach (var driverId in _driverHost.RegisteredDriverIds)
        {
            if (_driverHost.GetDriver(driverId) is IAlarmHistorianWriter w)
            {
                writer = w;
                _logger.LogInformation(
                    "Phase 7 historian sink: driver {Driver} provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink",
                    driverId);
                break;
            }
        }
        if (writer is null)
        {
            _logger.LogInformation(
                "Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using {Sink}",
                _historianSink.GetType().Name);
            return _historianSink;
        }

        var queueRoot = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData);
        if (string.IsNullOrEmpty(queueRoot)) queueRoot = Path.GetTempPath();
        var queueDir = Path.Combine(queueRoot, "OtOpcUa");
        Directory.CreateDirectory(queueDir);
        var queuePath = Path.Combine(queueDir, "alarm-historian-queue.db");

        var sinkLogger = _loggerFactory.CreateLogger<SqliteStoreAndForwardSink>();
        // SqliteStoreAndForwardSink wants a Serilog logger for warn-on-eviction emissions;
        // bridge the Microsoft logger via Serilog's null-safe path until the sink's
        // dependency surface is reshaped (covered as part of release-readiness).
        var serilogShim = _scriptLogger.ForContext("HistorianQueuePath", queuePath);
        _ownedSink = new SqliteStoreAndForwardSink(
            databasePath: queuePath,
            writer: writer,
            logger: serilogShim);
        _ownedSink.StartDrainLoop(TimeSpan.FromSeconds(2));
        return _ownedSink;
    }

    /// <summary>
    /// For each registered driver that exposes <see cref="Core.Abstractions.ISubscribable"/>,
    /// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent.
    /// Path convention: <c>/{areaName}/{lineName}/{equipmentName}/{tagName}</c> matching
    /// what the EquipmentNodeWalker emits into the OPC UA browse tree, so script literals
    /// written against the operator-visible tree work without translation.
    /// </summary>
    internal static IReadOnlyList<DriverFeed> BuildDriverFeeds(
        DriverHost driverHost, DriverEquipmentContentRegistry equipmentRegistry)
    {
        var feeds = new List<DriverFeed>();
        foreach (var driverId in driverHost.RegisteredDriverIds)
        {
            var driver = driverHost.GetDriver(driverId);
            if (driver is not Core.Abstractions.ISubscribable subscribable) continue;

            var content = equipmentRegistry.Get(driverId);
            if (content is null) continue;

            var pathToFullRef = MapPathsToFullRefs(content);
            if (pathToFullRef.Count == 0) continue;

            feeds.Add(new DriverFeed(subscribable, pathToFullRef, TimeSpan.FromSeconds(1)));
        }
        return feeds;
    }

    internal static IReadOnlyDictionary<string, string> MapPathsToFullRefs(EquipmentNamespaceContent content)
    {
        var result = new Dictionary<string, string>(StringComparer.Ordinal);
        var areaById = content.Areas.ToDictionary(a => a.UnsAreaId, StringComparer.OrdinalIgnoreCase);
        var lineById = content.Lines.ToDictionary(l => l.UnsLineId, StringComparer.OrdinalIgnoreCase);
        var equipmentById = content.Equipment.ToDictionary(e => e.EquipmentId, StringComparer.OrdinalIgnoreCase);

        foreach (var tag in content.Tags)
        {
            if (string.IsNullOrEmpty(tag.EquipmentId)) continue;
            if (!equipmentById.TryGetValue(tag.EquipmentId!, out var eq)) continue;
            if (!lineById.TryGetValue(eq.UnsLineId, out var line)) continue;
            if (!areaById.TryGetValue(line.UnsAreaId, out var area)) continue;

            var path = $"/{area.Name}/{line.Name}/{eq.Name}/{tag.Name}";
            result[path] = tag.TagConfig; // duplicate-path collisions naturally win-last; UI publish-validation rules out duplicate names
        }
        return result;
    }

    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;
        // Teardown order: bridge → engines → owned sink → injected fallback.
        if (_bridge is not null) await _bridge.DisposeAsync().ConfigureAwait(false);
        foreach (var d in _sources.Disposables)
        {
            try { d.Dispose(); }
            catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); }
        }
        // Owned SQLite sink: dispose here, after the engines, so the drain timer stops +
        // final batch flushes before the writer-bearing driver is released via
        // DriverHost.DisposeAsync upstream.
        _ownedSink?.Dispose();
        if (_historianSink is IDisposable disposableSink) disposableSink.Dispose();
    }
}
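
The path convention used by MapPathsToFullRefs can be tried in isolation with a small stand-alone sketch. The row types below (`AreaRow`, `LineRow`, `EquipmentRow`, `TagRow`), the `UnsPathSketch` class and the sample data are hypothetical, trimmed-down stand-ins for the real configuration entities; only the join logic mirrors the method above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed-down row shapes; the real entities carry more columns.
public sealed record AreaRow(string UnsAreaId, string Name);
public sealed record LineRow(string UnsLineId, string UnsAreaId, string Name);
public sealed record EquipmentRow(string EquipmentId, string UnsLineId, string Name);
public sealed record TagRow(string EquipmentId, string Name, string TagConfig);

public static class UnsPathSketch
{
    public static Dictionary<string, string> Map(
        IEnumerable<AreaRow> areas, IEnumerable<LineRow> lines,
        IEnumerable<EquipmentRow> equipment, IEnumerable<TagRow> tags)
    {
        // Ids are matched case-insensitively; the resulting paths are case-sensitive.
        var areaById = areas.ToDictionary(a => a.UnsAreaId, StringComparer.OrdinalIgnoreCase);
        var lineById = lines.ToDictionary(l => l.UnsLineId, StringComparer.OrdinalIgnoreCase);
        var equipmentById = equipment.ToDictionary(e => e.EquipmentId, StringComparer.OrdinalIgnoreCase);

        var result = new Dictionary<string, string>(StringComparer.Ordinal);
        foreach (var tag in tags)
        {
            // Tags that cannot be joined up to an area are skipped silently.
            if (string.IsNullOrEmpty(tag.EquipmentId)) continue;
            if (!equipmentById.TryGetValue(tag.EquipmentId, out var eq)) continue;
            if (!lineById.TryGetValue(eq.UnsLineId, out var line)) continue;
            if (!areaById.TryGetValue(line.UnsAreaId, out var area)) continue;

            // The indexer assignment means a later duplicate path overwrites an earlier one.
            result[$"/{area.Name}/{line.Name}/{eq.Name}/{tag.Name}"] = tag.TagConfig;
        }
        return result;
    }

    // Sample data: one valid tag, one tag without equipment, one duplicate path.
    public static Dictionary<string, string> Example() => Map(
        new[] { new AreaRow("area-1", "Packaging") },
        new[] { new LineRow("line-1", "area-1", "Line1") },
        new[] { new EquipmentRow("eq-1", "line-1", "Filler") },
        new[]
        {
            new TagRow("eq-1", "Speed", "40001"),
            new TagRow("", "Orphan", "40009"),
            new TagRow("EQ-1", "Speed", "40002"),
        });
}
```

With this sample data `Example()` yields a single entry, `/Packaging/Line1/Filler/Speed` mapped to `40002`: the tag without an equipment id is dropped, and the second `Speed` row (matched despite the different id casing) overwrites the first.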