From bb10ba710845a20f62514d25e0a2178ff806b598 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 20 Apr 2026 22:18:39 -0400 Subject: [PATCH] =?UTF-8?q?Phase=207=20follow-up=20#247=20=E2=80=94=20Gala?= =?UTF-8?q?xy.Host=20historian=20writer=20+=20SQLite=20sink=20activation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the historian leg of Phase 7. Scripted alarm transitions now batch-flow through the existing Galaxy.Host pipe + queue durably in a local SQLite store- and-forward when Galaxy is the registered driver, instead of being dropped into NullAlarmHistorianSink. ## GalaxyHistorianWriter (Driver.Galaxy.Proxy.Ipc) IAlarmHistorianWriter implementation. Translates AlarmHistorianEvent → HistorianAlarmEventDto (Stream D contract), batches via the existing GalaxyIpcClient.CallAsync round-trip on MessageKind.HistorianAlarmEventRequest / Response, maps per-event HistorianAlarmEventOutcomeDto bytes back to HistorianWriteOutcome (Ack/RetryPlease/PermanentFail) so the SQLite drain worker knows what to ack vs dead-letter vs retry. Empty-batch fast path. Pipe-level transport faults (broken pipe, host crash) bubble up as GalaxyIpcException which the SQLite sink's drain worker translates to whole-batch RetryPlease per its catch contract. ## GalaxyProxyDriver implements IAlarmHistorianWriter Marker interface lets Phase7Composer discover it via type check at compose time. WriteBatchAsync delegates to a thin GalaxyHistorianWriter wrapping the driver's existing _client. Throws InvalidOperationException if InitializeAsync hasn't connected yet — the SQLite drain worker treats that as a transient batch failure and retries. ## Phase7Composer.ResolveHistorianSink Replaces the injected sink dep when any registered driver implements IAlarmHistorianWriter. Constructs SqliteStoreAndForwardSink at %ProgramData%/OtOpcUa/alarm-historian-queue.db (falls back to %TEMP% when ProgramData unavailable, e.g. dev), starts the 2s drain timer, owns the sink disposable for clean teardown. When no driver provides the writer, keeps the NullAlarmHistorianSink wired by Program.cs (#246). DisposeAsync now also disposes the owned SQLite sink in the right order: bridge → engines → owned sink → injected fallback. ## Tests — 7 new GalaxyHistorianWriterMappingTests ToDto round-trips every field; preserves null Comment; per-byte outcome enum mapping (Ack / RetryPlease / PermanentFail) via [Theory]; unknown byte throws; ctor null-guard. The IPC round-trip itself is covered by the live Host suite (task #240) which constructs a real pipe. Server.Phase7 tests: 34/34 still pass; Galaxy.Proxy tests: 25/25 (+7 = 32 total). ## Phase 7 production wiring chain — COMPLETE - ✅ #243 composition kernel - ✅ #245 scripted-alarm IReadable adapter - ✅ #244 driver bridge - ✅ #246 Program.cs wire-in - ✅ #247 this — Galaxy.Host historian writer + SQLite sink activation What unblocks now: task #240 live OPC UA E2E smoke. With a Galaxy driver registered, scripted alarm transitions flow end-to-end through the engine → SQLite queue → drain worker → Galaxy.Host IPC → Aveva Historian alarm schema. Without Galaxy, NullSink keeps the engines functional and the queue dormant. --- .../GalaxyProxyDriver.cs | 19 ++++ .../Ipc/GalaxyHistorianWriter.cs | 90 +++++++++++++++++++ ....MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj | 1 + .../Phase7/Phase7Composer.cs | 56 +++++++++++- .../GalaxyHistorianWriterMappingTests.cs | 83 +++++++++++++++++ 5 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs index a233b26..e97a6b5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs @@ -1,4 +1,5 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus; @@ -22,6 +23,7 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) IHistoryProvider, IRediscoverable, IHostConnectivityProbe, + IAlarmHistorianWriter, IDisposable { private GalaxyIpcClient? _client; @@ -511,6 +513,23 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) _ => AlarmSeverity.Critical, }; + /// + /// Phase 7 follow-up #247 — IAlarmHistorianWriter implementation. Forwards alarm + /// batches to Galaxy.Host over the existing IPC channel, reusing the connection + /// the driver already established for data-plane traffic. Throws + /// when called before + /// has connected the client; the SQLite drain worker + /// translates that to whole-batch RetryPlease per its catch contract. + /// + public Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken) + { + if (_client is null) + throw new InvalidOperationException( + "GalaxyProxyDriver IPC client not connected — historian writes rejected until InitializeAsync completes"); + return new GalaxyHistorianWriter(_client).WriteBatchAsync(batch, cancellationToken); + } + public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult(); } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs new file mode 100644 index 0000000..a535e43 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs @@ -0,0 +1,90 @@ +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; + +/// +/// Phase 7 follow-up (task #247) — bridges 's +/// drain worker to Driver.Galaxy.Host over the existing +/// pipe. Translates batches into the +/// wire format the Host expects + maps per-event +/// responses back to +/// so the SQLite queue knows what to ack / +/// dead-letter / retry. +/// +/// +/// +/// Reuses the IPC channel already opens for the +/// Galaxy data plane — no second pipe to Driver.Galaxy.Host, no separate +/// auth handshake. The IPC client's call gate serializes historian batches with +/// driver Reads/Writes/Subscribes; historian batches are infrequent (every few +/// seconds at most under the SQLite sink's drain cadence) so the contention is +/// negligible compared to per-tag-read pressure. +/// +/// +/// Pipe-level transport faults (broken pipe, host crash) bubble up as +/// which the SQLite sink's drain worker catches + +/// translates to a whole-batch RetryPlease per the +/// docstring — failed events stay queued +/// for the next drain tick after backoff. +/// +/// +public sealed class GalaxyHistorianWriter : IAlarmHistorianWriter +{ + private readonly GalaxyIpcClient _client; + + public GalaxyHistorianWriter(GalaxyIpcClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public async Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(batch); + if (batch.Count == 0) return []; + + var request = new HistorianAlarmEventRequest + { + Events = batch.Select(ToDto).ToArray(), + }; + + var response = await _client.CallAsync( + requestKind: MessageKind.HistorianAlarmEventRequest, + request: request, + expectedResponseKind: MessageKind.HistorianAlarmEventResponse, + ct: cancellationToken).ConfigureAwait(false); + + if (response.Outcomes.Length != batch.Count) + throw new InvalidOperationException( + $"Galaxy.Host returned {response.Outcomes.Length} outcomes for a batch of {batch.Count} — protocol mismatch"); + + var outcomes = new HistorianWriteOutcome[response.Outcomes.Length]; + for (var i = 0; i < response.Outcomes.Length; i++) + outcomes[i] = MapOutcome(response.Outcomes[i]); + return outcomes; + } + + internal static HistorianAlarmEventDto ToDto(AlarmHistorianEvent e) => new() + { + AlarmId = e.AlarmId, + EquipmentPath = e.EquipmentPath, + AlarmName = e.AlarmName, + AlarmTypeName = e.AlarmTypeName, + Severity = (int)e.Severity, + EventKind = e.EventKind, + Message = e.Message, + User = e.User, + Comment = e.Comment, + TimestampUtcUnixMs = new DateTimeOffset(e.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + }; + + internal static HistorianWriteOutcome MapOutcome(HistorianAlarmEventOutcomeDto wire) => wire switch + { + HistorianAlarmEventOutcomeDto.Ack => HistorianWriteOutcome.Ack, + HistorianAlarmEventOutcomeDto.RetryPlease => HistorianWriteOutcome.RetryPlease, + HistorianAlarmEventOutcomeDto.PermanentFail => HistorianWriteOutcome.PermanentFail, + _ => throw new InvalidOperationException($"Unknown HistorianAlarmEventOutcomeDto byte {(byte)wire}"), + }; +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj index 47dadcc..862f2da 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj @@ -14,6 +14,7 @@ + diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs index 8fe81c0..67a3af5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs @@ -47,6 +47,9 @@ public sealed class Phase7Composer : IAsyncDisposable private DriverSubscriptionBridge? _bridge; private Phase7ComposedSources _sources = Phase7ComposedSources.Empty; + // Sink we constructed in PrepareAsync (vs. the injected fallback). Held so + // DisposeAsync can flush + tear down the SQLite drain timer. + private SqliteStoreAndForwardSink? _ownedSink; private bool _disposed; public Phase7Composer( @@ -96,13 +99,20 @@ public sealed class Phase7Composer : IAsyncDisposable var upstream = new CachedTagUpstreamSource(); + // Phase 7 follow-up #247 — if any registered driver implements IAlarmHistorianWriter + // (today: GalaxyProxyDriver), wrap it in a SqliteStoreAndForwardSink at + // %ProgramData%/OtOpcUa/alarm-historian-queue.db with the 2s drain cadence the + // sink's docstring recommends. Otherwise fall back to the injected sink (Null in + // the default registration). + var historianSink = ResolveHistorianSink(); + _sources = Phase7EngineComposer.Compose( scripts: scripts, virtualTags: virtualTags, scriptedAlarms: scriptedAlarms, upstream: upstream, alarmStateStore: new InMemoryAlarmStateStore(), - historianSink: _historianSink, + historianSink: historianSink, rootScriptLogger: _scriptLogger, loggerFactory: _loggerFactory); @@ -121,6 +131,47 @@ public sealed class Phase7Composer : IAsyncDisposable return _sources; } + private IAlarmHistorianSink ResolveHistorianSink() + { + IAlarmHistorianWriter? writer = null; + foreach (var driverId in _driverHost.RegisteredDriverIds) + { + if (_driverHost.GetDriver(driverId) is IAlarmHistorianWriter w) + { + writer = w; + _logger.LogInformation( + "Phase 7 historian sink: driver {Driver} provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink", + driverId); + break; + } + } + if (writer is null) + { + _logger.LogInformation( + "Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using {Sink}", + _historianSink.GetType().Name); + return _historianSink; + } + + var queueRoot = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData); + if (string.IsNullOrEmpty(queueRoot)) queueRoot = Path.GetTempPath(); + var queueDir = Path.Combine(queueRoot, "OtOpcUa"); + Directory.CreateDirectory(queueDir); + var queuePath = Path.Combine(queueDir, "alarm-historian-queue.db"); + + var sinkLogger = _loggerFactory.CreateLogger(); + // SqliteStoreAndForwardSink wants a Serilog logger for warn-on-eviction emissions; + // bridge the Microsoft logger via Serilog's null-safe path until the sink's + // dependency surface is reshaped (covered as part of release-readiness). + var serilogShim = _scriptLogger.ForContext("HistorianQueuePath", queuePath); + _ownedSink = new SqliteStoreAndForwardSink( + databasePath: queuePath, + writer: writer, + logger: serilogShim); + _ownedSink.StartDrainLoop(TimeSpan.FromSeconds(2)); + return _ownedSink; + } + /// /// For each registered driver that exposes , /// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent. @@ -178,6 +229,9 @@ public sealed class Phase7Composer : IAsyncDisposable try { d.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); } } + // Owned SQLite sink: dispose first so the drain timer stops + final batch flushes + // before we release the writer-bearing driver via DriverHost.DisposeAsync upstream. + _ownedSink?.Dispose(); if (_historianSink is IDisposable disposableSink) disposableSink.Dispose(); } } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs new file mode 100644 index 0000000..0154bbe --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs @@ -0,0 +1,83 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests; + +/// +/// Phase 7 follow-up #247 — covers the wire-format translation between the +/// the SQLite sink hands to the writer + the +/// the Galaxy.Host IPC contract expects, plus +/// the per-event outcome enum mapping. Pure functions; the round-trip over a real +/// pipe is exercised by the live Host suite (task #240). +/// +[Trait("Category", "Unit")] +public sealed class GalaxyHistorianWriterMappingTests +{ + [Fact] + public void ToDto_round_trips_every_field() + { + var ts = new DateTime(2026, 4, 20, 14, 30, 0, DateTimeKind.Utc); + var e = new AlarmHistorianEvent( + AlarmId: "al-7", + EquipmentPath: "/Site/Line/Cell", + AlarmName: "HighTemp", + AlarmTypeName: "LimitAlarm", + Severity: AlarmSeverity.High, + EventKind: "RaiseEvent", + Message: "Temp 92°C exceeded 90°C", + User: "operator-7", + Comment: "ack with reason", + TimestampUtc: ts); + + var dto = GalaxyHistorianWriter.ToDto(e); + + dto.AlarmId.ShouldBe("al-7"); + dto.EquipmentPath.ShouldBe("/Site/Line/Cell"); + dto.AlarmName.ShouldBe("HighTemp"); + dto.AlarmTypeName.ShouldBe("LimitAlarm"); + dto.Severity.ShouldBe((int)AlarmSeverity.High); + dto.EventKind.ShouldBe("RaiseEvent"); + dto.Message.ShouldBe("Temp 92°C exceeded 90°C"); + dto.User.ShouldBe("operator-7"); + dto.Comment.ShouldBe("ack with reason"); + dto.TimestampUtcUnixMs.ShouldBe(new DateTimeOffset(ts, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + [Fact] + public void ToDto_preserves_null_Comment() + { + var e = new AlarmHistorianEvent( + "a", "/p", "n", "AlarmCondition", AlarmSeverity.Low, "RaiseEvent", "m", + User: "system", Comment: null, TimestampUtc: DateTime.UtcNow); + + GalaxyHistorianWriter.ToDto(e).Comment.ShouldBeNull(); + } + + [Theory] + [InlineData(HistorianAlarmEventOutcomeDto.Ack, HistorianWriteOutcome.Ack)] + [InlineData(HistorianAlarmEventOutcomeDto.RetryPlease, HistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAlarmEventOutcomeDto.PermanentFail, HistorianWriteOutcome.PermanentFail)] + public void MapOutcome_round_trips_every_byte( + HistorianAlarmEventOutcomeDto wire, HistorianWriteOutcome expected) + { + GalaxyHistorianWriter.MapOutcome(wire).ShouldBe(expected); + } + + [Fact] + public void MapOutcome_unknown_byte_throws() + { + Should.Throw( + () => GalaxyHistorianWriter.MapOutcome((HistorianAlarmEventOutcomeDto)0xFF)); + } + + [Fact] + public void Null_client_rejected() + { + Should.Throw(() => new GalaxyHistorianWriter(null!)); + } + +}