diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs
new file mode 100644
index 0000000..fd496ac
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AahClientManagedAlarmEventWriter.cs
@@ -0,0 +1,114 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
+{
+    /// <summary>
+    /// IPC-side <see cref="IAlarmEventWriter"/> implementation that delegates to an
+    /// <see cref="IAlarmHistorianWriteBackend"/> (production: aahClientManaged-bound)
+    /// and maps the trinary <see cref="AlarmHistorianWriteOutcome"/> down to the
+    /// bool[] the IPC reply contract carries. Per-event outcomes:
+    /// <list type="bullet">
+    /// <item><see cref="AlarmHistorianWriteOutcome.Ack"/> → true (drop from sender's queue).</item>
+    /// <item><see cref="AlarmHistorianWriteOutcome.RetryPlease"/> → false (sender retries on next drain tick).</item>
+    /// <item><see cref="AlarmHistorianWriteOutcome.PermanentFail"/> → false (sender's B.4 widens the IPC bool back into the trinary outcome by inspecting structured diagnostics; this slot intentionally collapses to "not-ok" at the wire).</item>
+    /// </list>
+    /// </summary>
+    public sealed class AahClientManagedAlarmEventWriter : IAlarmEventWriter
+    {
+        private static readonly ILogger Log = Serilog.Log.ForContext<AahClientManagedAlarmEventWriter>();
+
+        private readonly IAlarmHistorianWriteBackend _backend;
+
+        public AahClientManagedAlarmEventWriter(IAlarmHistorianWriteBackend backend)
+        {
+            _backend = backend ?? throw new ArgumentNullException(nameof(backend));
+        }
+
+        /// <summary>
+        /// Writes a batch through the backend and narrows each trinary outcome to the
+        /// per-slot bool the IPC reply carries (true only for Ack).
+        /// </summary>
+        /// <returns>
+        /// One bool per input event, in input order; an empty array for a null or empty
+        /// batch; all false when the backend throws or violates its result contract.
+        /// </returns>
+        /// <exception cref="OperationCanceledException">Rethrown unchanged when the backend call is cancelled.</exception>
+        public async Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
+        {
+            if (events is null || events.Length == 0)
+            {
+                return Array.Empty<bool>();
+            }
+
+            AlarmHistorianWriteOutcome[] outcomes;
+            try
+            {
+                outcomes = await _backend.WriteBatchAsync(events, cancellationToken).ConfigureAwait(false);
+            }
+            catch (OperationCanceledException)
+            {
+                throw;
+            }
+            catch (Exception ex)
+            {
+                // Backend-level failure (cluster unreachable, transport error). Treat the
+                // whole batch as RetryPlease so the sender's queue holds the rows for
+                // the next drain tick — preferable to dropping them on a transient.
+                Log.Warning(ex,
+                    "Alarm historian backend WriteBatchAsync threw — marking entire {Count}-event batch RetryPlease.",
+                    events.Length);
+                var fallback = new bool[events.Length];
+                return fallback;
+            }
+
+            if (outcomes is null || outcomes.Length != events.Length)
+            {
+                // Backend contract violation (null or wrong-length result) — defensive degrade so a
+                // bug in the backend doesn't desync the sender's queue accounting. Treat as RetryPlease.
+                Log.Warning(
+                    "Alarm historian backend returned {ReturnedCount} outcomes for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
+                    outcomes?.Length, events.Length);
+                return new bool[events.Length];
+            }
+
+            var perEventOk = new bool[outcomes.Length];
+            for (var i = 0; i < outcomes.Length; i++)
+            {
+                perEventOk[i] = outcomes[i] == AlarmHistorianWriteOutcome.Ack;
+            }
+            return perEventOk;
+        }
+
+        /// <summary>
+        /// Translate the outcome of a single SDK call (raw HRESULT + diagnostic) into the
+        /// trinary <see cref="AlarmHistorianWriteOutcome"/>. Exposed for the production
+        /// <see cref="SdkAlarmHistorianWriteBackend"/> to share the mapping with tests.
+        /// </summary>
+        public static AlarmHistorianWriteOutcome MapOutcome(int hresult, bool isCommunicationError, bool isMalformedInput)
+        {
+            // Order matters: malformed input is permanent regardless of HRESULT pattern;
+            // a zero HRESULT is success; any failing HRESULT is treated as transient
+            // whether or not the communication-class flag is set.
+            if (isMalformedInput)
+            {
+                return AlarmHistorianWriteOutcome.PermanentFail;
+            }
+            if (hresult == 0)
+            {
+                return AlarmHistorianWriteOutcome.Ack;
+            }
+            if (isCommunicationError)
+            {
+                return AlarmHistorianWriteOutcome.RetryPlease;
+            }
+            // Default: unknown HRESULT failure — be conservative and let the sender retry.
+            // The sender's drain worker has its own dead-letter cap so a permanently-broken
+            // event won't loop forever.
+            return AlarmHistorianWriteOutcome.RetryPlease;
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs
new file mode 100644
index 0000000..6bf863b
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/AlarmHistorianWriteOutcome.cs
@@ -0,0 +1,19 @@
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
+{
+    /// <summary>
+    /// Per-event outcome from <see cref="IAlarmHistorianWriteBackend.WriteBatchAsync"/>.
+    /// Sidecar-local twin of Core.AlarmHistorian.HistorianWriteOutcome (the
+    /// sidecar runs net48 and cannot reference the net10 Core project; the IPC
+    /// contract narrows this to bool per slot, so the lmxopcua-side consumer
+    /// widens that back into the trinary outcome at the IPC boundary in PR B.4).
+    /// </summary>
+    public enum AlarmHistorianWriteOutcome
+    {
+        /// <summary>Event accepted by the historian. Drop from the store-and-forward queue.</summary>
+        Ack,
+        /// <summary>Transient failure (server busy, disconnected, timeout). Leave queued; retry on next drain tick.</summary>
+        RetryPlease,
+        /// <summary>Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter on the lmxopcua side.</summary>
+        PermanentFail,
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs
new file mode 100644
index 0000000..a7c8f83
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IAlarmHistorianWriteBackend.cs
@@ -0,0 +1,30 @@
+using System.Threading;
+using System.Threading.Tasks;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
+{
+    /// <summary>
+    /// The actual aahClientManaged-bound writer. Extracted so unit tests can
+    /// substitute a fake without touching the SDK; the production
+    /// implementation lives in <see cref="SdkAlarmHistorianWriteBackend"/>.
+    /// </summary>
+    /// <remarks>
+    /// Implementations are responsible for connection management + cluster
+    /// failover. The wrapping <see cref="AahClientManagedAlarmEventWriter"/>
+    /// handles batch-level orchestration but delegates the per-event SDK call
+    /// here so the unit tests can drive every documented MxStatus outcome
+    /// without an installed AVEVA Historian.
+    /// </remarks>
+    public interface IAlarmHistorianWriteBackend
+    {
+        /// <summary>
+        /// Persist the supplied events to the historian. Returns one outcome per
+        /// input slot in the same order — must always return an array of the same
+        /// length as <paramref name="events"/>.
+        /// </summary>
+        Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
+            AlarmHistorianEventDto[] events,
+            CancellationToken cancellationToken);
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs
new file mode 100644
index 0000000..c9e4cfb
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs
@@ -0,0 +1,74 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
+{
+    /// <summary>
+    /// Production <see cref="IAlarmHistorianWriteBackend"/> backed by AVEVA Historian's
+    /// aahClientManaged alarm-event write API. The exact SDK entry point is
+    /// pinned during the live-rig smoke in PR D.1 — until that gate, this backend
+    /// reports <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> for every
+    /// event with a structured diagnostic so the lmxopcua-side
+    /// SqliteStoreAndForwardSink retains the queued events rather than dropping
+    /// or hard-failing them.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// Cluster failover reuses the read path's cluster endpoint picker via
+    /// the shared read-side connection pool — there is
+    /// no second connection pool for writes. Wonderware Historian's alarm-event
+    /// write surface accepts the same HistorianAccess session a read
+    /// opens, so reusing the picker is parity-preserving with v1's
+    /// GalaxyHistorianWriter.
+    /// </para>
+    /// <para>
+    /// Once D.1 confirms the SDK entry point, this class swaps the placeholder
+    /// body for the real call sequence. The mapping from raw HRESULT /
+    /// HistorianError codes onto <see cref="AlarmHistorianWriteOutcome"/>
+    /// is already shared via <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/>
+    /// so the smoke-pinned change stays minimal.
+    /// </para>
+    /// </remarks>
+    public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend
+    {
+        private static readonly ILogger Log = Serilog.Log.ForContext<SdkAlarmHistorianWriteBackend>();
+
+        private readonly HistorianConfiguration _config;
+
+        public SdkAlarmHistorianWriteBackend(HistorianConfiguration config)
+        {
+            _config = config ?? throw new ArgumentNullException(nameof(config));
+        }
+
+        /// <inheritdoc/>
+        public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
+            AlarmHistorianEventDto[] events,
+            CancellationToken cancellationToken)
+        {
+            if (events is null || events.Length == 0)
+            {
+                return Task.FromResult(Array.Empty<AlarmHistorianWriteOutcome>());
+            }
+
+            // Placeholder: pin the SDK entry point in PR D.1 against a live AVEVA
+            // Historian. Until then the call returns RetryPlease for every slot so
+            // the lmxopcua-side sink keeps the events queued rather than dropping
+            // them — same effect as the current NullAlarmHistorianSink fallback,
+            // but visible through the structured diagnostic + per-event outcome.
+            Log.Warning(
+                "Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.",
+                events.Length,
+                _config.ServerName);
+
+            var outcomes = new AlarmHistorianWriteOutcome[events.Length];
+            for (var i = 0; i < outcomes.Length; i++)
+            {
+                outcomes[i] = AlarmHistorianWriteOutcome.RetryPlease;
+            }
+            return Task.FromResult(outcomes);
+        }
+    }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs
new file mode 100644
index 0000000..89c6b9f
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/AahClientManagedAlarmEventWriterTests.cs
@@ -0,0 +1,172 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
+{
+    /// <summary>
+    /// PR C.1 — pins the trinary outcome → IPC bool[] mapping that the sidecar uses
+    /// on the WriteAlarmEvents reply. Per-event outcomes:
+    /// Ack → true, RetryPlease → false, PermanentFail → false.
+    /// The sender's B.4 widens the IPC bool back into the trinary outcome at the
+    /// IPC boundary using structured diagnostics; the wire intentionally collapses
+    /// to "ok / not-ok".
+    /// </summary>
+    [Trait("Category", "Unit")]
+    public sealed class AahClientManagedAlarmEventWriterTests
+    {
+        [Fact]
+        public async Task Empty_batch_returns_empty_array_without_invoking_backend()
+        {
+            var backend = new RecordingBackend(_ => throw new InvalidOperationException("must not invoke for empty input"));
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);
+
+            result.ShouldBeEmpty();
+            backend.Calls.ShouldBe(0);
+        }
+
+        [Fact]
+        public async Task Single_ack_outcome_maps_to_true()
+        {
+            var backend = new RecordingBackend(events => events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray());
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);
+
+            result.ShouldBe(new[] { true });
+        }
+
+        [Fact]
+        public async Task Mixed_batch_preserves_per_slot_ordering()
+        {
+            // Ack / Retry / Permanent / Ack — the sender uses positional matching against
+            // its queue, so every slot must hit the exact bool corresponding to its input.
+            var backend = new RecordingBackend(_ => new[]
+            {
+                AlarmHistorianWriteOutcome.Ack,
+                AlarmHistorianWriteOutcome.RetryPlease,
+                AlarmHistorianWriteOutcome.PermanentFail,
+                AlarmHistorianWriteOutcome.Ack,
+            });
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(
+                new[] { Event("E1"), Event("E2"), Event("E3"), Event("E4") },
+                CancellationToken.None);
+
+            result.ShouldBe(new[] { true, false, false, true });
+        }
+
+        [Fact]
+        public async Task Backend_exception_marks_whole_batch_RetryPlease()
+        {
+            var backend = new RecordingBackend(_ => throw new InvalidOperationException("cluster unreachable"));
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(
+                new[] { Event("E1"), Event("E2"), Event("E3") },
+                CancellationToken.None);
+
+            // Whole batch must end up as "not ok" (RetryPlease at the trinary layer) —
+            // dropping a transiently-failed batch corrupts the sender's queue.
+            result.ShouldBe(new[] { false, false, false });
+        }
+
+        [Fact]
+        public async Task Cancellation_propagates_from_backend()
+        {
+            var backend = new RecordingBackend(_ => throw new OperationCanceledException());
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var ex = await Should.ThrowAsync<OperationCanceledException>(() =>
+                writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None));
+            ex.ShouldNotBeNull();
+        }
+
+        [Fact]
+        public async Task Backend_returning_wrong_count_degrades_to_RetryPlease()
+        {
+            // Backend returns more outcomes than inputs — defensive degrade rather than
+            // letting a backend bug desync the sender's queue accounting.
+            var backend = new RecordingBackend(_ => new[]
+            {
+                AlarmHistorianWriteOutcome.Ack,
+                AlarmHistorianWriteOutcome.Ack,
+            });
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);
+
+            result.ShouldBe(new[] { false });
+        }
+
+        [Fact]
+        public async Task Backend_returning_null_degrades_to_RetryPlease()
+        {
+            // A null result is the same contract violation as a wrong-length one; it must
+            // degrade to "not ok" instead of surfacing a NullReferenceException.
+            var backend = new RecordingBackend(_ => null);
+            var writer = new AahClientManagedAlarmEventWriter(backend);
+
+            var result = await writer.WriteAsync(new[] { Event("E1"), Event("E2") }, CancellationToken.None);
+
+            result.ShouldBe(new[] { false, false });
+        }
+
+        [Theory]
+        // hresult 0 + clean → Ack
+        [InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
+        // hresult 0 but malformed → PermanentFail (malformed wins)
+        [InlineData(0, false, true, AlarmHistorianWriteOutcome.PermanentFail)]
+        // non-zero hresult + comm error → RetryPlease
+        [InlineData(unchecked((int)0x80131500), true, false, AlarmHistorianWriteOutcome.RetryPlease)]
+        // non-zero hresult, no comm flag, no malformed → conservative RetryPlease
+        [InlineData(unchecked((int)0x80131500), false, false, AlarmHistorianWriteOutcome.RetryPlease)]
+        // any malformed input → PermanentFail regardless of hresult
+        [InlineData(unchecked((int)0x80131500), true, true, AlarmHistorianWriteOutcome.PermanentFail)]
+        public void MapOutcome_table(int hresult, bool isCommunicationError, bool isMalformedInput, AlarmHistorianWriteOutcome expected)
+        {
+            AahClientManagedAlarmEventWriter
+                .MapOutcome(hresult, isCommunicationError, isMalformedInput)
+                .ShouldBe(expected);
+        }
+
+        private static AlarmHistorianEventDto Event(string id) => new AlarmHistorianEventDto
+        {
+            EventId = id,
+            SourceName = "Tank01",
+            ConditionId = "Tank01.Level.HiHi",
+            AlarmType = "AnalogLimitAlarm.HiHi",
+            Message = "Tank 01 high-high level",
+            Severity = 750,
+            EventTimeUtcTicks = DateTime.UtcNow.Ticks,
+            AckComment = null,
+        };
+
+        private sealed class RecordingBackend : IAlarmHistorianWriteBackend
+        {
+            private readonly Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> _produce;
+            public int Calls { get; private set; }
+
+            public RecordingBackend(Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> produce)
+            {
+                _produce = produce;
+            }
+
+            public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
+                AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
+            {
+                Calls++;
+                return Task.FromResult(_produce(events));
+            }
+        }
+    }
+}