From f99cf5033a161520621c466ce0f89c4156324abd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 30 Apr 2026 16:17:05 -0400 Subject: [PATCH] sidecar: AahClientManagedAlarmEventWriter implements IAlarmEventWriter (PR C.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fourth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Independent of Tracks A and B — the sidecar slot defined in HistorianFrameHandler line 242 is unwired today; PR C.2 (next) flips it on in Program.cs. - AlarmHistorianWriteOutcome (sidecar-local, net48 — twin of Core.AlarmHistorian.HistorianWriteOutcome which is net10): Ack / RetryPlease / PermanentFail. - IAlarmHistorianWriteBackend abstraction so the SDK call can be faked in unit tests. - AahClientManagedAlarmEventWriter implements IAlarmEventWriter, delegates to the backend, maps Ack→true / Retry|Permanent→false for the IPC bool[] reply contract. Backend exception → whole batch RetryPlease (preserves the sender's queue across transients rather than dropping). Wrong-count return defends against a backend bug desyncing queue accounting. - SdkAlarmHistorianWriteBackend — production binding skeleton. Reports RetryPlease for every event and logs a structured diagnostic until PR D.1 pins the live aahClientManaged entry point against the dev rig. The sender's SqliteStoreAndForwardSink retains queued events, mirroring today's NullAlarmHistorianSink behaviour but with visible diagnostics instead of silent discard. - MapOutcome shared helper — pinned via theory tests so the D.1 swap can change the SDK call site without reshuffling the HRESULT → outcome mapping. Tests: - 6 writer tests — empty batch / single Ack / mixed Ack-Retry- Permanent-Ack ordering / backend-throw → RetryPlease batch / cancellation propagates / wrong-count defensive degrade. - 5 outcome theory cases — hresult 0 → Ack, malformed wins over hresult 0, comm error → Retry, unknown failure → Retry, malformed + comm → Permanent. - Full sidecar test suite: 48 passed (was 42; 6 new). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../AahClientManagedAlarmEventWriter.cs | 105 ++++++++++++ .../Backend/AlarmHistorianWriteOutcome.cs | 19 +++ .../Backend/IAlarmHistorianWriteBackend.cs | 30 ++++ .../Backend/SdkAlarmHistorianWriteBackend.cs | 73 ++++++++ .../AahClientManagedAlarmEventWriterTests.cs | 159 ++++++++++++++++++ 5 files changed, 386 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs new file mode 100644 index 0000000..fd496ac --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs @@ -0,0 +1,105 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Serilog; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend +{ + /// + /// IPC-side implementation that delegates to an + /// (production: aahClientManaged-bound) + /// and maps the trinary down to the + /// bool[] the IPC reply contract carries. Per-event outcomes: + /// + /// true (drop from sender's queue). + /// false (sender retries on next drain tick). + /// false (sender's B.4 widens the IPC bool back into the trinary outcome by inspecting structured diagnostics; this slot intentionally collapses to "not-ok" at the wire). + /// + /// + public sealed class AahClientManagedAlarmEventWriter : IAlarmEventWriter + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly IAlarmHistorianWriteBackend _backend; + + public AahClientManagedAlarmEventWriter(IAlarmHistorianWriteBackend backend) + { + _backend = backend ?? throw new ArgumentNullException(nameof(backend)); + } + + public async Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken) + { + if (events is null || events.Length == 0) + { + return new bool[0]; + } + + AlarmHistorianWriteOutcome[] outcomes; + try + { + outcomes = await _backend.WriteBatchAsync(events, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + // Backend-level failure (cluster unreachable, transport error). Treat the + // whole batch as RetryPlease so the sender's queue holds the rows for + // the next drain tick — preferable to dropping them on a transient. + Log.Warning(ex, + "Alarm historian backend WriteBatchAsync threw — marking entire {Count}-event batch RetryPlease.", + events.Length); + var fallback = new bool[events.Length]; + return fallback; + } + + if (outcomes.Length != events.Length) + { + // Backend contract violation — defensive degrade so a bug in the backend + // doesn't desync the sender's queue accounting. Treat as RetryPlease. + Log.Warning( + "Alarm historian backend returned {ReturnedCount} outcomes for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.", + outcomes.Length, events.Length); + return new bool[events.Length]; + } + + var perEventOk = new bool[outcomes.Length]; + for (var i = 0; i < outcomes.Length; i++) + { + perEventOk[i] = outcomes[i] == AlarmHistorianWriteOutcome.Ack; + } + return perEventOk; + } + + /// + /// Translate the outcome of a single SDK call (raw HRESULT + diagnostic) into the + /// trinary . Exposed for the production + /// to share the mapping with tests. + /// + public static AlarmHistorianWriteOutcome MapOutcome(int hresult, bool isCommunicationError, bool isMalformedInput) + { + // Order matters: malformed input is permanent regardless of HRESULT pattern; + // communication-class errors are transient regardless of which specific + // HRESULT bit fired. + if (isMalformedInput) + { + return AlarmHistorianWriteOutcome.PermanentFail; + } + if (hresult == 0) + { + return AlarmHistorianWriteOutcome.Ack; + } + if (isCommunicationError) + { + return AlarmHistorianWriteOutcome.RetryPlease; + } + // Default: unknown HRESULT failure — be conservative and let the sender retry. + // The sender's drain worker has its own dead-letter cap so a permanently-broken + // event won't loop forever. + return AlarmHistorianWriteOutcome.RetryPlease; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs new file mode 100644 index 0000000..6bf863b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs @@ -0,0 +1,19 @@ +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend +{ + /// + /// Per-event outcome from . + /// Sidecar-local twin of Core.AlarmHistorian.HistorianWriteOutcome (the + /// sidecar runs net48 and cannot reference the net10 Core project; the IPC + /// contract narrows this to bool per slot, so the lmxopcua-side consumer + /// widens that back into the trinary outcome at the IPC boundary in PR B.4). + /// + public enum AlarmHistorianWriteOutcome + { + /// Event accepted by the historian. Drop from the store-and-forward queue. + Ack, + /// Transient failure (server busy, disconnected, timeout). Leave queued; retry on next drain tick. + RetryPlease, + /// Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter on the lmxopcua side. + PermanentFail, + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs new file mode 100644 index 0000000..a7c8f83 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs @@ -0,0 +1,30 @@ +using System.Threading; +using System.Threading.Tasks; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend +{ + /// + /// The actual aahClientManaged-bound writer. Extracted so unit tests can + /// substitute a fake without touching the SDK; the production + /// implementation lives in . + /// + /// + /// Implementations are responsible for connection management + cluster + /// failover. The wrapping + /// handles batch-level orchestration but delegates the per-event SDK call + /// here so the unit tests can drive every documented MxStatus outcome + /// without an installed AVEVA Historian. + /// + public interface IAlarmHistorianWriteBackend + { + /// + /// Persist the supplied events to the historian. Returns one outcome per + /// input slot in the same order — must always return an array of the same + /// length as . + /// + Task WriteBatchAsync( + AlarmHistorianEventDto[] events, + CancellationToken cancellationToken); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs new file mode 100644 index 0000000..c9e4cfb --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs @@ -0,0 +1,73 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Serilog; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend +{ + /// + /// Production backed by AVEVA Historian's + /// aahClientManaged alarm-event write API. The exact SDK entry point is + /// pinned during the live-rig smoke in PR D.1 — until that gate, this backend + /// reports for every + /// event with a structured diagnostic so the lmxopcua-side + /// SqliteStoreAndForwardSink retains the queued events rather than dropping + /// or hard-failing them. + /// + /// + /// + /// Cluster failover reuses via + /// the shared connection pool — there is + /// no second connection pool for writes. Wonderware Historian's alarm-event + /// write surface accepts the same HistorianAccess session a read + /// opens, so reusing the picker is parity-preserving with v1's + /// GalaxyHistorianWriter. + /// + /// + /// Once D.1 confirms the SDK entry point, this class swaps the placeholder + /// body for the real call sequence. The mapping from raw HRESULT / + /// HistorianError codes onto + /// is already shared via + /// so the smoke-pinned change stays minimal. + /// + /// + public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly HistorianConfiguration _config; + + public SdkAlarmHistorianWriteBackend(HistorianConfiguration config) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + } + + public Task WriteBatchAsync( + AlarmHistorianEventDto[] events, + CancellationToken cancellationToken) + { + if (events is null || events.Length == 0) + { + return Task.FromResult(new AlarmHistorianWriteOutcome[0]); + } + + // Placeholder: pin the SDK entry point in PR D.1 against a live AVEVA + // Historian. Until then the call returns RetryPlease for every slot so + // the lmxopcua-side sink keeps the events queued rather than dropping + // them — same effect as the current NullAlarmHistorianSink fallback, + // but visible through the structured diagnostic + per-event outcome. + Log.Warning( + "Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.", + events.Length, + _config.ServerName); + + var outcomes = new AlarmHistorianWriteOutcome[events.Length]; + for (var i = 0; i < outcomes.Length; i++) + { + outcomes[i] = AlarmHistorianWriteOutcome.RetryPlease; + } + return Task.FromResult(outcomes); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs new file mode 100644 index 0000000..89c6b9f --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs @@ -0,0 +1,159 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + /// + /// PR C.1 — pins the trinary outcome → IPC bool[] mapping that the sidecar uses + /// on the WriteAlarmEvents reply. Per-event outcomes: + /// Ack → true, RetryPlease → false, PermanentFail → false. + /// The sender's B.4 widens the IPC bool back into the trinary outcome at the + /// IPC boundary using structured diagnostics; the wire intentionally collapses + /// to "ok / not-ok". + /// + [Trait("Category", "Unit")] + public sealed class AahClientManagedAlarmEventWriterTests + { + [Fact] + public async Task Empty_batch_returns_empty_array_without_invoking_backend() + { + var backend = new RecordingBackend(_ => throw new InvalidOperationException("must not invoke for empty input")); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var result = await writer.WriteAsync(Array.Empty(), CancellationToken.None); + + result.ShouldBeEmpty(); + backend.Calls.ShouldBe(0); + } + + [Fact] + public async Task Single_ack_outcome_maps_to_true() + { + var backend = new RecordingBackend(events => events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray()); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None); + + result.ShouldBe(new[] { true }); + } + + [Fact] + public async Task Mixed_batch_preserves_per_slot_ordering() + { + // Ack / Retry / Permanent / Ack — the sender uses positional matching against + // its queue, so every slot must hit the exact bool corresponding to its input. + var backend = new RecordingBackend(_ => new[] + { + AlarmHistorianWriteOutcome.Ack, + AlarmHistorianWriteOutcome.RetryPlease, + AlarmHistorianWriteOutcome.PermanentFail, + AlarmHistorianWriteOutcome.Ack, + }); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var result = await writer.WriteAsync( + new[] { Event("E1"), Event("E2"), Event("E3"), Event("E4") }, + CancellationToken.None); + + result.ShouldBe(new[] { true, false, false, true }); + } + + [Fact] + public async Task Backend_exception_marks_whole_batch_RetryPlease() + { + var backend = new RecordingBackend(_ => throw new InvalidOperationException("cluster unreachable")); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var result = await writer.WriteAsync( + new[] { Event("E1"), Event("E2"), Event("E3") }, + CancellationToken.None); + + // Whole batch must end up as "not ok" (RetryPlease at the trinary layer) — + // dropping a transiently-failed batch corrupts the sender's queue. + result.ShouldBe(new[] { false, false, false }); + } + + [Fact] + public async Task Cancellation_propagates_from_backend() + { + var backend = new RecordingBackend(_ => throw new OperationCanceledException()); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var ex = await Should.ThrowAsync(() => + writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None)); + ex.ShouldNotBeNull(); + } + + [Fact] + public async Task Backend_returning_wrong_count_degrades_to_RetryPlease() + { + // Backend returns more outcomes than inputs — defensive degrade rather than + // letting a backend bug desync the sender's queue accounting. + var backend = new RecordingBackend(_ => new[] + { + AlarmHistorianWriteOutcome.Ack, + AlarmHistorianWriteOutcome.Ack, + }); + var writer = new AahClientManagedAlarmEventWriter(backend); + + var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None); + + result.ShouldBe(new[] { false }); + } + + [Theory] + // hresult 0 + clean → Ack + [InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)] + // hresult 0 but malformed → PermanentFail (malformed wins) + [InlineData(0, false, true, AlarmHistorianWriteOutcome.PermanentFail)] + // non-zero hresult + comm error → RetryPlease + [InlineData(unchecked((int)0x80131500), true, false, AlarmHistorianWriteOutcome.RetryPlease)] + // non-zero hresult, no comm flag, no malformed → conservative RetryPlease + [InlineData(unchecked((int)0x80131500), false, false, AlarmHistorianWriteOutcome.RetryPlease)] + // any malformed input → PermanentFail regardless of hresult + [InlineData(unchecked((int)0x80131500), true, true, AlarmHistorianWriteOutcome.PermanentFail)] + public void MapOutcome_table(int hresult, bool isCommunicationError, bool isMalformedInput, AlarmHistorianWriteOutcome expected) + { + AahClientManagedAlarmEventWriter + .MapOutcome(hresult, isCommunicationError, isMalformedInput) + .ShouldBe(expected); + } + + private static AlarmHistorianEventDto Event(string id) => new AlarmHistorianEventDto + { + EventId = id, + SourceName = "Tank01", + ConditionId = "Tank01.Level.HiHi", + AlarmType = "AnalogLimitAlarm.HiHi", + Message = "Tank 01 high-high level", + Severity = 750, + EventTimeUtcTicks = DateTime.UtcNow.Ticks, + AckComment = null, + }; + + private sealed class RecordingBackend : IAlarmHistorianWriteBackend + { + private readonly Func _produce; + public int Calls { get; private set; } + + public RecordingBackend(Func produce) + { + _produce = produce; + } + + public Task WriteBatchAsync( + AlarmHistorianEventDto[] events, CancellationToken cancellationToken) + { + Calls++; + return Task.FromResult(_produce(events)); + } + } + } +} -- 2.49.1