diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs index fdab1935..7506173c 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs @@ -225,4 +225,8 @@ public sealed class WriteAlarmEventsReply /// Per-event success flag, parallel to . [Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty(); + + /// Per-event status parallel to the request's Events: 0=Ack, 1=Retry, 2=Permanent. + /// Empty ⇒ an older sidecar that only sent ; the client falls back to it. + [Key(4)] public byte[] PerEventStatus { get; set; } = Array.Empty(); } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs index 95517ac7..03343cc0 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs @@ -298,21 +298,29 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist /// /// /// - /// PermanentFail limitation (finding 002): this writer never returns - /// . The sidecar wire contract - /// () carries only a per-event - /// boolean (succeeded / did-not-succeed) and provides no unrecoverable vs. - /// transient distinction. A poison event that the historian SDK can never persist - /// (e.g. a permanently malformed row) will therefore retry indefinitely inside the - /// store-and-forward drain worker rather than being moved to the dead-letter table. - /// Extending the protocol to add a per-event status enum (Ack / Retry / Permanent) - /// requires a coordinated additive change to the .NET 4.8 sidecar and is tracked as - /// a follow-up. Until then, the drain worker's own retry-count limit is the - /// backstop against an infinite loop. + /// Per-event status: when the sidecar populates the additive + /// wire field (0=Ack, 1=Retry, + /// 2=Permanent), each slot maps directly to / + /// / . + /// The sidecar emits Permanent for structurally-malformed (poison) events, + /// so the store-and-forward drain worker dead-letters them immediately instead of + /// looping to the retry cap. An older sidecar that sends only the legacy + /// boolean is handled by the + /// fallback path below (true→Ack, false→RetryPlease) for rolling-deploy back-compat. /// /// - /// Transport or deserialization failures return - /// for every event in the batch; the drain worker's backoff controls recovery. + /// Documented boundary: only structurally-malformed events surface as + /// . A structurally-valid event that + /// the AAH historian SDK rejects for a deeper, semantic reason still maps to + /// (→ retry cap), because the sidecar's + /// writer returns only a transient/persisted boolean for events it actually attempts. + /// Surfacing richer SDK-semantic permanent rejections requires the infra-gated + /// AahClientManagedAlarmEventWriter to report a status code rather than a bool. + /// + /// + /// Transport or deserialization failures, and any whole-call failure + /// (Success=false), return for + /// every event in the batch; the drain worker's backoff controls recovery. /// /// /// The batch of alarm historian events to write. @@ -347,9 +355,26 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist return fail; } - // Per-event status: PerEventOk[i] = true → Ack; false → RetryPlease. - // NOTE: PermanentFail is never emitted — see for the wire-contract - // limitation and why poison events currently retry rather than dead-letter. + // Prefer the granular per-event status when the sidecar provides it (new wire + // field); fall back to the legacy PerEventOk bool for older sidecars. The sidecar + // emits status 2 (Permanent) for structurally-malformed poison events so they + // dead-letter immediately rather than retrying to the cap. + if (reply.PerEventStatus is { Length: > 0 } status && status.Length == batch.Count) + { + var statusOutcomes = new HistorianWriteOutcome[batch.Count]; + for (var i = 0; i < batch.Count; i++) + statusOutcomes[i] = status[i] switch + { + 0 => HistorianWriteOutcome.Ack, + 2 => HistorianWriteOutcome.PermanentFail, + _ => HistorianWriteOutcome.RetryPlease, // 1 or unknown + }; + return statusOutcomes; + } + + // Legacy fallback: PerEventOk[i] = true → Ack; false → RetryPlease. An older + // sidecar without PerEventStatus can never signal PermanentFail through this + // path, so a poison event retries to the drain worker's cap. var outcomes = new HistorianWriteOutcome[batch.Count]; for (var i = 0; i < batch.Count; i++) { @@ -361,7 +386,8 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist catch { // Transport / deserialization failure — every event is retry-please. The drain - // worker's backoff handles recovery. PermanentFail is never emitted (see ). + // worker's backoff handles recovery. PermanentFail is only emitted from the + // success path's PerEventStatus mapping, never from a transport failure. var fail = new HistorianWriteOutcome[batch.Count]; Array.Fill(fail, HistorianWriteOutcome.RetryPlease); return fail; diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs index 22e4ea81..91e25400 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs @@ -263,4 +263,8 @@ public sealed class WriteAlarmEventsReply /// Per-event success flag, parallel to . [Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty(); + + /// Per-event status parallel to the request's Events: 0=Ack, 1=Retry, 2=Permanent. + /// Empty ⇒ an older sidecar that only sent ; the client falls back to it. + [Key(4)] public byte[] PerEventStatus { get; set; } = Array.Empty(); } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs index 9eb9c915..95cc047a 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs @@ -16,6 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; /// public sealed class HistorianFrameHandler : IFrameHandler { + // WriteAlarmEventsReply.PerEventStatus byte semantics: 0=Ack, 1=Retry, 2=Permanent. + private const byte StatusAck = 0; + private const byte StatusRetry = 1; + private const byte StatusPermanent = 2; + private readonly IHistorianDataSource _historian; private readonly IAlarmEventWriter? _alarmWriter; private readonly ILogger _logger; @@ -174,18 +179,51 @@ public sealed class HistorianFrameHandler : IFrameHandler reply.Success = false; reply.Error = "Sidecar not configured with an alarm-event writer."; reply.PerEventOk = new bool[req.Events.Length]; + reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry); await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); return; } try { - var perEvent = await _alarmWriter.WriteAsync(req.Events, ct).ConfigureAwait(false); - reply.PerEventOk = perEvent; + // Classify each event before touching the writer: structurally-malformed + // (poison) events can never be persisted, so mark them Permanent and exclude + // them from the writer batch. Only the well-formed remainder is handed to the + // writer, whose bool[] result is mapped back onto the original indices. + var status = new byte[req.Events.Length]; + var writable = new List(req.Events.Length); + var originalIndex = new List(req.Events.Length); + for (var i = 0; i < req.Events.Length; i++) + { + if (IsStructurallyMalformed(req.Events[i])) + { + status[i] = StatusPermanent; + } + else + { + originalIndex.Add(i); + writable.Add(req.Events[i]); + } + } + + // Aligned 1:1 to `writable`; empty when every event was poison (writer skipped). + var perEvent = writable.Count == 0 + ? Array.Empty() + : await _alarmWriter.WriteAsync(writable.ToArray(), ct).ConfigureAwait(false); + + for (var i = 0; i < originalIndex.Count; i++) + { + var ok = i < perEvent.Length && perEvent[i]; + status[originalIndex[i]] = ok ? StatusAck : StatusRetry; + } + + reply.PerEventStatus = status; + reply.PerEventOk = StatusToOk(status); reply.Success = true; // Whole-batch Success stays true even when some events failed — per-event - // PerEventOk slots carry the granular result; the SQLite drain worker treats - // false slots as retry-please candidates. + // PerEventStatus slots carry the granular result (Ack / Retry / Permanent); + // the SQLite drain worker acks 0, retries 1, and dead-letters 2. PerEventOk + // is kept populated for rolling-deploy back-compat with an older client. } catch (Exception ex) { @@ -193,11 +231,41 @@ public sealed class HistorianFrameHandler : IFrameHandler reply.Success = false; reply.Error = ex.Message; reply.PerEventOk = new bool[req.Events.Length]; + reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry); } await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); } + /// + /// Classifies an alarm event as structurally malformed (poison): an event the historian + /// event store can never persist regardless of retries. Such events are marked Permanent + /// so the store-and-forward sink dead-letters them immediately instead of looping to the + /// retry cap. A blank source name or alarm type, or a non-positive event timestamp, are + /// the structural invariants the historian write requires. + /// + /// The candidate alarm event. + /// true when the event is structurally malformed; otherwise false. + internal static bool IsStructurallyMalformed(AlarmHistorianEventDto e) => + e is null + || string.IsNullOrWhiteSpace(e.SourceName) + || string.IsNullOrWhiteSpace(e.AlarmType) + || e.EventTimeUtcTicks <= 0; + + private static byte[] AllStatus(int length, byte value) + { + var status = new byte[length]; + for (var i = 0; i < length; i++) status[i] = value; + return status; + } + + private static bool[] StatusToOk(byte[] status) + { + var ok = new bool[status.Length]; + for (var i = 0; i < status.Length; i++) ok[i] = status[i] == StatusAck; + return ok; + } + private static HistorianSampleDto[] ToWire(List samples) { var dtos = new HistorianSampleDto[samples.Count]; diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs index d2643fda..0689f0dd 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs @@ -302,6 +302,95 @@ public sealed class WonderwareHistorianClientTests outcomes[1].ShouldBe(HistorianWriteOutcome.RetryPlease); } + /// + /// The granular PerEventStatus wire field maps directly: 0→Ack, 1→Retry, 2→PermanentFail. + /// A poison event the sidecar marks Permanent (status 2) must dead-letter via + /// rather than retrying. + /// + [Fact] + public async Task WriteBatchAsync_PerEventStatusPermanent_MapsToPermanentFail() + { + await using var server = new FakeSidecarServer(Secret) + { + OnWriteAlarmEvents = _ => new WriteAlarmEventsReply + { + Success = true, + PerEventStatus = [2], // Permanent + }, + }; + await server.StartAsync(); + + await using var client = TcpClientFor(server); + var batch = new[] + { + new AlarmHistorianEvent("ev-poison", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), + }; + + var outcomes = await client.WriteBatchAsync(batch, CancellationToken.None); + + outcomes.Count.ShouldBe(1); + outcomes[0].ShouldBe(HistorianWriteOutcome.PermanentFail); + } + + /// + /// PerEventStatus = 0 maps to ; the granular path + /// takes precedence over the legacy PerEventOk bool when both are present. + /// + [Fact] + public async Task WriteBatchAsync_PerEventStatusAck_MapsToAck() + { + await using var server = new FakeSidecarServer(Secret) + { + OnWriteAlarmEvents = _ => new WriteAlarmEventsReply + { + Success = true, + PerEventStatus = [0], // Ack + }, + }; + await server.StartAsync(); + + await using var client = TcpClientFor(server); + var batch = new[] + { + new AlarmHistorianEvent("ev-ok", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), + }; + + var outcomes = await client.WriteBatchAsync(batch, CancellationToken.None); + + outcomes.Count.ShouldBe(1); + outcomes[0].ShouldBe(HistorianWriteOutcome.Ack); + } + + /// + /// Rolling-deploy back-compat: an older sidecar that sends an empty PerEventStatus but a + /// populated PerEventOk must still classify via the legacy bool path (false→RetryPlease). + /// + [Fact] + public async Task WriteBatchAsync_EmptyPerEventStatus_FallsBackToLegacyPerEventOk() + { + await using var server = new FakeSidecarServer(Secret) + { + OnWriteAlarmEvents = _ => new WriteAlarmEventsReply + { + Success = true, + PerEventStatus = [], // older sidecar — no granular status + PerEventOk = [false], + }, + }; + await server.StartAsync(); + + await using var client = TcpClientFor(server); + var batch = new[] + { + new AlarmHistorianEvent("ev-legacy", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), + }; + + var outcomes = await client.WriteBatchAsync(batch, CancellationToken.None); + + outcomes.Count.ShouldBe(1); + outcomes[0].ShouldBe(HistorianWriteOutcome.RetryPlease); + } + /// Verifies that Hello handshake throws UnauthorizedAccessException on secret mismatch. [Fact] public async Task Hello_BadSecret_ThrowsUnauthorizedAccess() diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/HistorianEventClassifierTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/HistorianEventClassifierTests.cs new file mode 100644 index 00000000..c1899fed --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/HistorianEventClassifierTests.cs @@ -0,0 +1,226 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog.Core; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc +{ + /// + /// Pins the sidecar's poison-event classifier and the per-event status mapping in + /// . A structurally-malformed alarm event is marked + /// Permanent (status 2) and excluded from the writer batch so the store-and-forward sink + /// dead-letters it immediately rather than looping to the retry cap; well-formed events + /// map to Ack (0) / Retry (1) from the writer's per-event bool result. + /// + [Trait("Category", "Unit")] + public sealed class HistorianEventClassifierTests + { + /// Verifies a blank source name is classified structurally malformed. + [Fact] + public void IsStructurallyMalformed_BlankSourceName_IsTrue() + { + var e = WellFormed(); + e.SourceName = " "; + + HistorianFrameHandler.IsStructurallyMalformed(e).ShouldBeTrue(); + } + + /// Verifies a blank alarm type is classified structurally malformed. + [Fact] + public void IsStructurallyMalformed_BlankAlarmType_IsTrue() + { + var e = WellFormed(); + e.AlarmType = ""; + + HistorianFrameHandler.IsStructurallyMalformed(e).ShouldBeTrue(); + } + + /// Verifies a non-positive event timestamp is classified structurally malformed. + /// The event timestamp in ticks to test. + [Theory] + [InlineData(0L)] + [InlineData(-1L)] + public void IsStructurallyMalformed_NonPositiveTimestamp_IsTrue(long ticks) + { + var e = WellFormed(); + e.EventTimeUtcTicks = ticks; + + HistorianFrameHandler.IsStructurallyMalformed(e).ShouldBeTrue(); + } + + /// Verifies a well-formed event is not classified structurally malformed. + [Fact] + public void IsStructurallyMalformed_WellFormedEvent_IsFalse() + { + HistorianFrameHandler.IsStructurallyMalformed(WellFormed()).ShouldBeFalse(); + } + + /// + /// A mixed batch — one poison event then one well-formed event the writer acks — must + /// yield PerEventStatus = [2, 0]: the poison event is Permanent and excluded from the + /// writer batch, and only the well-formed event reaches the writer. + /// + [Fact] + public async Task Handler_MixedBatch_MarksPoisonPermanent_AndOnlyWritesWellFormed() + { + var poison = WellFormed(); + poison.EventId = "poison"; + poison.SourceName = ""; // structurally malformed + + var good = WellFormed(); + good.EventId = "good"; + + var fakeWriter = new RecordingAlarmEventWriter(_ => true); + var handler = new HistorianFrameHandler(new StubHistorian(), Logger.None, fakeWriter); + + var req = new WriteAlarmEventsRequest { Events = new[] { poison, good }, CorrelationId = "c1" }; + var reply = await RoundTripAsync(handler, req); + + reply.Success.ShouldBeTrue(); + reply.PerEventStatus.ShouldBe(new byte[] { 2, 0 }); + reply.PerEventOk.ShouldBe(new[] { false, true }); + + // The writer only ever saw the well-formed event. + fakeWriter.Received.Count.ShouldBe(1); + fakeWriter.Received[0].EventId.ShouldBe("good"); + } + + /// + /// A well-formed event the writer reports as not-persisted maps to Retry (status 1), + /// not Permanent — only structurally-malformed events are Permanent. + /// + [Fact] + public async Task Handler_WriterReportsNotPersisted_MapsToRetry() + { + var good = WellFormed(); + good.EventId = "good"; + + var fakeWriter = new RecordingAlarmEventWriter(_ => false); + var handler = new HistorianFrameHandler(new StubHistorian(), Logger.None, fakeWriter); + + var req = new WriteAlarmEventsRequest { Events = new[] { good }, CorrelationId = "c2" }; + var reply = await RoundTripAsync(handler, req); + + reply.Success.ShouldBeTrue(); + reply.PerEventStatus.ShouldBe(new byte[] { 1 }); + reply.PerEventOk.ShouldBe(new[] { false }); + } + + /// + /// An all-poison batch must short-circuit the writer entirely (no WriteAsync call) + /// and mark every slot Permanent. + /// + [Fact] + public async Task Handler_AllPoison_SkipsWriter_AllPermanent() + { + var p1 = WellFormed(); + p1.SourceName = ""; + var p2 = WellFormed(); + p2.AlarmType = ""; + + var fakeWriter = new RecordingAlarmEventWriter(_ => true); + var handler = new HistorianFrameHandler(new StubHistorian(), Logger.None, fakeWriter); + + var req = new WriteAlarmEventsRequest { Events = new[] { p1, p2 }, CorrelationId = "c3" }; + var reply = await RoundTripAsync(handler, req); + + reply.Success.ShouldBeTrue(); + reply.PerEventStatus.ShouldBe(new byte[] { 2, 2 }); + fakeWriter.Received.Count.ShouldBe(0); + } + + private static AlarmHistorianEventDto WellFormed() => new() + { + EventId = "ev", + SourceName = "Tank.HiHi", + ConditionId = "HiHi", + AlarmType = "LimitAlarm:Activated", + Message = "msg", + Severity = 700, + EventTimeUtcTicks = new DateTime(2026, 6, 18, 12, 0, 0, DateTimeKind.Utc).Ticks, + AckComment = null, + }; + + /// + /// Drives a WriteAlarmEvents request through the real frame handler over an in-memory + /// duplex stream pair and deserializes the reply the handler writes back. + /// + private static async Task RoundTripAsync( + HistorianFrameHandler handler, WriteAlarmEventsRequest req) + { + var capture = new MemoryStream(); + using var writer = new FrameWriter(capture, leaveOpen: true); + + var body = MessagePackSerializer.Serialize(req); + await handler.HandleAsync(MessageKind.WriteAlarmEventsRequest, body, writer, CancellationToken.None); + + capture.Position = 0; + using var reader = new FrameReader(capture, leaveOpen: true); + var frame = await reader.ReadFrameAsync(CancellationToken.None); + frame.ShouldNotBeNull(); + frame!.Value.Kind.ShouldBe(MessageKind.WriteAlarmEventsReply); + return MessagePackSerializer.Deserialize(frame.Value.Body); + } + + /// An that records the batch it received and returns a fixed verdict. + private sealed class RecordingAlarmEventWriter : IAlarmEventWriter + { + private readonly Func _verdict; + + /// Initializes a new instance with the given per-event verdict. + /// Maps each received event to its persisted/not-persisted result. + public RecordingAlarmEventWriter(Func verdict) => _verdict = verdict; + + /// The events the writer was handed, in order. + public List Received { get; } = new(); + + /// + public Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken) + { + Received.AddRange(events); + return Task.FromResult(events.Select(_verdict).ToArray()); + } + } + + /// + /// A read data source the WriteAlarmEvents path never touches — present only to + /// satisfy the ctor's non-null requirement. + /// + private sealed class StubHistorian : IHistorianDataSource + { + /// + public Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct = default) + => throw new NotSupportedException(); + + /// + public Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, double intervalMs, string aggregateColumn, CancellationToken ct = default) + => throw new NotSupportedException(); + + /// + public Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, CancellationToken ct = default) + => throw new NotSupportedException(); + + /// + public Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, CancellationToken ct = default) + => throw new NotSupportedException(); + + /// + public HistorianHealthSnapshot GetHealthSnapshot() => throw new NotSupportedException(); + + /// + public void Dispose() { } + } + } +}