diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs index a233b26..e97a6b5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs @@ -1,4 +1,5 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus; @@ -22,6 +23,7 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) IHistoryProvider, IRediscoverable, IHostConnectivityProbe, + IAlarmHistorianWriter, IDisposable { private GalaxyIpcClient? _client; @@ -511,6 +513,23 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) _ => AlarmSeverity.Critical, }; + /// + /// Phase 7 follow-up #247 — IAlarmHistorianWriter implementation. Forwards alarm + /// batches to Galaxy.Host over the existing IPC channel, reusing the connection + /// the driver already established for data-plane traffic. Throws + /// when called before + /// has connected the client; the SQLite drain worker + /// translates that to whole-batch RetryPlease per its catch contract. + /// + public Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken) + { + if (_client is null) + throw new InvalidOperationException( + "GalaxyProxyDriver IPC client not connected — historian writes rejected until InitializeAsync completes"); + return new GalaxyHistorianWriter(_client).WriteBatchAsync(batch, cancellationToken); + } + public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult(); } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs new file mode 100644 index 0000000..a535e43 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/Ipc/GalaxyHistorianWriter.cs @@ -0,0 +1,90 @@ +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; + +/// +/// Phase 7 follow-up (task #247) — bridges 's +/// drain worker to Driver.Galaxy.Host over the existing +/// pipe. Translates batches into the +/// wire format the Host expects + maps per-event +/// responses back to +/// so the SQLite queue knows what to ack / +/// dead-letter / retry. +/// +/// +/// +/// Reuses the IPC channel already opens for the +/// Galaxy data plane — no second pipe to Driver.Galaxy.Host, no separate +/// auth handshake. The IPC client's call gate serializes historian batches with +/// driver Reads/Writes/Subscribes; historian batches are infrequent (every few +/// seconds at most under the SQLite sink's drain cadence) so the contention is +/// negligible compared to per-tag-read pressure. +/// +/// +/// Pipe-level transport faults (broken pipe, host crash) bubble up as +/// which the SQLite sink's drain worker catches + +/// translates to a whole-batch RetryPlease per the +/// docstring — failed events stay queued +/// for the next drain tick after backoff. +/// +/// +public sealed class GalaxyHistorianWriter : IAlarmHistorianWriter +{ + private readonly GalaxyIpcClient _client; + + public GalaxyHistorianWriter(GalaxyIpcClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public async Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(batch); + if (batch.Count == 0) return []; + + var request = new HistorianAlarmEventRequest + { + Events = batch.Select(ToDto).ToArray(), + }; + + var response = await _client.CallAsync( + requestKind: MessageKind.HistorianAlarmEventRequest, + request: request, + expectedResponseKind: MessageKind.HistorianAlarmEventResponse, + ct: cancellationToken).ConfigureAwait(false); + + if (response.Outcomes.Length != batch.Count) + throw new InvalidOperationException( + $"Galaxy.Host returned {response.Outcomes.Length} outcomes for a batch of {batch.Count} — protocol mismatch"); + + var outcomes = new HistorianWriteOutcome[response.Outcomes.Length]; + for (var i = 0; i < response.Outcomes.Length; i++) + outcomes[i] = MapOutcome(response.Outcomes[i]); + return outcomes; + } + + internal static HistorianAlarmEventDto ToDto(AlarmHistorianEvent e) => new() + { + AlarmId = e.AlarmId, + EquipmentPath = e.EquipmentPath, + AlarmName = e.AlarmName, + AlarmTypeName = e.AlarmTypeName, + Severity = (int)e.Severity, + EventKind = e.EventKind, + Message = e.Message, + User = e.User, + Comment = e.Comment, + TimestampUtcUnixMs = new DateTimeOffset(e.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + }; + + internal static HistorianWriteOutcome MapOutcome(HistorianAlarmEventOutcomeDto wire) => wire switch + { + HistorianAlarmEventOutcomeDto.Ack => HistorianWriteOutcome.Ack, + HistorianAlarmEventOutcomeDto.RetryPlease => HistorianWriteOutcome.RetryPlease, + HistorianAlarmEventOutcomeDto.PermanentFail => HistorianWriteOutcome.PermanentFail, + _ => throw new InvalidOperationException($"Unknown HistorianAlarmEventOutcomeDto byte {(byte)wire}"), + }; +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj index 47dadcc..862f2da 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj @@ -14,6 +14,7 @@ + diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs index 8fe81c0..67a3af5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7Composer.cs @@ -47,6 +47,9 @@ public sealed class Phase7Composer : IAsyncDisposable private DriverSubscriptionBridge? _bridge; private Phase7ComposedSources _sources = Phase7ComposedSources.Empty; + // Sink we constructed in PrepareAsync (vs. the injected fallback). Held so + // DisposeAsync can flush + tear down the SQLite drain timer. + private SqliteStoreAndForwardSink? _ownedSink; private bool _disposed; public Phase7Composer( @@ -96,13 +99,20 @@ public sealed class Phase7Composer : IAsyncDisposable var upstream = new CachedTagUpstreamSource(); + // Phase 7 follow-up #247 — if any registered driver implements IAlarmHistorianWriter + // (today: GalaxyProxyDriver), wrap it in a SqliteStoreAndForwardSink at + // %ProgramData%/OtOpcUa/alarm-historian-queue.db with the 2s drain cadence the + // sink's docstring recommends. Otherwise fall back to the injected sink (Null in + // the default registration). + var historianSink = ResolveHistorianSink(); + _sources = Phase7EngineComposer.Compose( scripts: scripts, virtualTags: virtualTags, scriptedAlarms: scriptedAlarms, upstream: upstream, alarmStateStore: new InMemoryAlarmStateStore(), - historianSink: _historianSink, + historianSink: historianSink, rootScriptLogger: _scriptLogger, loggerFactory: _loggerFactory); @@ -121,6 +131,47 @@ public sealed class Phase7Composer : IAsyncDisposable return _sources; } + private IAlarmHistorianSink ResolveHistorianSink() + { + IAlarmHistorianWriter? writer = null; + foreach (var driverId in _driverHost.RegisteredDriverIds) + { + if (_driverHost.GetDriver(driverId) is IAlarmHistorianWriter w) + { + writer = w; + _logger.LogInformation( + "Phase 7 historian sink: driver {Driver} provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink", + driverId); + break; + } + } + if (writer is null) + { + _logger.LogInformation( + "Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using {Sink}", + _historianSink.GetType().Name); + return _historianSink; + } + + var queueRoot = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData); + if (string.IsNullOrEmpty(queueRoot)) queueRoot = Path.GetTempPath(); + var queueDir = Path.Combine(queueRoot, "OtOpcUa"); + Directory.CreateDirectory(queueDir); + var queuePath = Path.Combine(queueDir, "alarm-historian-queue.db"); + + var sinkLogger = _loggerFactory.CreateLogger(); + // SqliteStoreAndForwardSink wants a Serilog logger for warn-on-eviction emissions; + // bridge the Microsoft logger via Serilog's null-safe path until the sink's + // dependency surface is reshaped (covered as part of release-readiness). + var serilogShim = _scriptLogger.ForContext("HistorianQueuePath", queuePath); + _ownedSink = new SqliteStoreAndForwardSink( + databasePath: queuePath, + writer: writer, + logger: serilogShim); + _ownedSink.StartDrainLoop(TimeSpan.FromSeconds(2)); + return _ownedSink; + } + /// /// For each registered driver that exposes , /// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent. @@ -178,6 +229,9 @@ public sealed class Phase7Composer : IAsyncDisposable try { d.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); } } + // Owned SQLite sink: dispose first so the drain timer stops + final batch flushes + // before we release the writer-bearing driver via DriverHost.DisposeAsync upstream. + _ownedSink?.Dispose(); if (_historianSink is IDisposable disposableSink) disposableSink.Dispose(); } } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs new file mode 100644 index 0000000..0154bbe --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/GalaxyHistorianWriterMappingTests.cs @@ -0,0 +1,83 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests; + +/// +/// Phase 7 follow-up #247 — covers the wire-format translation between the +/// the SQLite sink hands to the writer + the +/// the Galaxy.Host IPC contract expects, plus +/// the per-event outcome enum mapping. Pure functions; the round-trip over a real +/// pipe is exercised by the live Host suite (task #240). +/// +[Trait("Category", "Unit")] +public sealed class GalaxyHistorianWriterMappingTests +{ + [Fact] + public void ToDto_round_trips_every_field() + { + var ts = new DateTime(2026, 4, 20, 14, 30, 0, DateTimeKind.Utc); + var e = new AlarmHistorianEvent( + AlarmId: "al-7", + EquipmentPath: "/Site/Line/Cell", + AlarmName: "HighTemp", + AlarmTypeName: "LimitAlarm", + Severity: AlarmSeverity.High, + EventKind: "RaiseEvent", + Message: "Temp 92°C exceeded 90°C", + User: "operator-7", + Comment: "ack with reason", + TimestampUtc: ts); + + var dto = GalaxyHistorianWriter.ToDto(e); + + dto.AlarmId.ShouldBe("al-7"); + dto.EquipmentPath.ShouldBe("/Site/Line/Cell"); + dto.AlarmName.ShouldBe("HighTemp"); + dto.AlarmTypeName.ShouldBe("LimitAlarm"); + dto.Severity.ShouldBe((int)AlarmSeverity.High); + dto.EventKind.ShouldBe("RaiseEvent"); + dto.Message.ShouldBe("Temp 92°C exceeded 90°C"); + dto.User.ShouldBe("operator-7"); + dto.Comment.ShouldBe("ack with reason"); + dto.TimestampUtcUnixMs.ShouldBe(new DateTimeOffset(ts, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + [Fact] + public void ToDto_preserves_null_Comment() + { + var e = new AlarmHistorianEvent( + "a", "/p", "n", "AlarmCondition", AlarmSeverity.Low, "RaiseEvent", "m", + User: "system", Comment: null, TimestampUtc: DateTime.UtcNow); + + GalaxyHistorianWriter.ToDto(e).Comment.ShouldBeNull(); + } + + [Theory] + [InlineData(HistorianAlarmEventOutcomeDto.Ack, HistorianWriteOutcome.Ack)] + [InlineData(HistorianAlarmEventOutcomeDto.RetryPlease, HistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAlarmEventOutcomeDto.PermanentFail, HistorianWriteOutcome.PermanentFail)] + public void MapOutcome_round_trips_every_byte( + HistorianAlarmEventOutcomeDto wire, HistorianWriteOutcome expected) + { + GalaxyHistorianWriter.MapOutcome(wire).ShouldBe(expected); + } + + [Fact] + public void MapOutcome_unknown_byte_throws() + { + Should.Throw( + () => GalaxyHistorianWriter.MapOutcome((HistorianAlarmEventOutcomeDto)0xFF)); + } + + [Fact] + public void Null_client_rejected() + { + Should.Throw(() => new GalaxyHistorianWriter(null!)); + } + +}