diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs new file mode 100644 index 00000000..535ed779 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs @@ -0,0 +1,141 @@ +using Grpc.Core; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.HistorianGateway.Client; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway; + +/// +/// backed by the HistorianGateway SendEvent path. The +/// drain worker behind SqliteStoreAndForwardSink calls +/// and uses the returned per-event +/// to decide retry vs. dead-letter, so this writer maps every +/// gateway result — success ack, the published client's typed exception hierarchy, raw +/// (defensive), and any unexpected error — onto exactly one outcome per +/// event and never throws. +/// +/// +/// +/// Each event is sent individually so one poison event cannot fail the whole batch: a permanent +/// failure on event N is dead-lettered while its siblings continue. Outcomes are returned in +/// input order, one per event; an empty batch yields an empty list with no gateway call. +/// +/// +/// Outcome mapping. Success (or store-forward-queued) ack ⇒ . +/// Transient gRPC codes (Unavailable, DeadlineExceeded, ResourceExhausted, +/// Aborted, Internal) and the auth codes (Unauthenticated, +/// PermissionDenied) ⇒ — an auth fix +/// re-enables the batch, so an auth blip never dead-letters. Permanent codes +/// (InvalidArgument, FailedPrecondition, OutOfRange, Unimplemented) ⇒ +/// (dead-letter poison — mirrors the Wonderware +/// PerEventStatus==2 boundary). The typed client exceptions are classified by type, or by +/// the they wrap; any other or unclassifiable error defaults to +/// so the drain worker cannot loop a poison +/// event forever. +/// +/// +public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter +{ + private readonly IHistorianGatewayClient _client; + private readonly ILogger _logger; + + /// Creates the writer over a gateway client seam. + /// The gateway client used for the SendEvent write path. + /// Logger for per-event outcome diagnostics (never logs event content). + public GatewayAlarmHistorianWriter(IHistorianGatewayClient client, ILogger logger) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(batch); + + if (batch.Count == 0) + { + return Array.Empty(); + } + + var outcomes = new HistorianWriteOutcome[batch.Count]; + + for (var i = 0; i < batch.Count; i++) + { + outcomes[i] = await SendOneAsync(batch[i], cancellationToken).ConfigureAwait(false); + } + + return outcomes; + } + + private async Task SendOneAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) + { + try + { + var ack = await _client.SendEventAsync(AlarmEventMapper.ToHistorianEvent(evt), cancellationToken) + .ConfigureAwait(false); + return MapAck(ack); + } + catch (Exception exception) + { + // NEVER throw out of the writer — the drain worker expects a per-event outcome. Classify + // and log only the failure category (no event content, hostnames, or credentials). + var outcome = Classify(exception); + if (outcome == HistorianWriteOutcome.PermanentFail) + { + _logger.LogWarning( + "Alarm SendEvent permanently failed ({Exception}); dead-lettering the event.", + exception.GetType().Name); + } + else + { + _logger.LogDebug( + "Alarm SendEvent transiently failed ({Exception}); will retry.", + exception.GetType().Name); + } + + return outcome; + } + } + + // A non-success ack that the gateway durably queued (store-forward) is still accepted — do not + // re-drain it. A non-success, non-queued ack is a soft failure: retry rather than dead-letter. + private static HistorianWriteOutcome MapAck(WriteAck ack) => + ack.Success || ack.Queued ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease; + + private static HistorianWriteOutcome Classify(Exception exception) => exception switch + { + // Published client's typed hierarchy (production reality). Unavailable + both auth kinds retry. + HistorianGatewayUnavailableException => HistorianWriteOutcome.RetryPlease, + HistorianGatewayAuthenticationException => HistorianWriteOutcome.RetryPlease, + HistorianGatewayAuthorizationException => HistorianWriteOutcome.RetryPlease, + // A base client exception wrapping a permanent/transient RpcException → classify by inner status. + HistorianGatewayException { InnerException: RpcException inner } => ClassifyStatus(inner.StatusCode), + // Defensive raw RpcException path (the seam type signature permits it). + RpcException rpc => ClassifyStatus(rpc.StatusCode), + // Anything else (incl. a bare base client exception we cannot classify) → dead-letter to avoid + // an infinite drain loop on a poison event. + _ => HistorianWriteOutcome.PermanentFail, + }; + + private static HistorianWriteOutcome ClassifyStatus(StatusCode code) => code switch + { + StatusCode.Unavailable + or StatusCode.DeadlineExceeded + or StatusCode.ResourceExhausted + or StatusCode.Aborted + or StatusCode.Internal + // An auth fix re-enables the whole batch — never dead-letter on an auth blip. + or StatusCode.Unauthenticated + or StatusCode.PermissionDenied => HistorianWriteOutcome.RetryPlease, + StatusCode.InvalidArgument + or StatusCode.FailedPrecondition + or StatusCode.OutOfRange + or StatusCode.Unimplemented => HistorianWriteOutcome.PermanentFail, + // Unknown/unclassified gRPC code → dead-letter to avoid an infinite drain loop. + _ => HistorianWriteOutcome.PermanentFail, + }; +} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs new file mode 100644 index 00000000..0bbd80a7 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs @@ -0,0 +1,185 @@ +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using ZB.MOM.WW.HistorianGateway.Client; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests; + +public sealed class GatewayAlarmHistorianWriterTests +{ + private static AlarmHistorianEvent Evt(string id) => new( + id, "Area/Pump", "N", "LimitAlarm", + AlarmSeverity.High, "Activated", "m", "u", null, + new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + + private static GatewayAlarmHistorianWriter Writer(FakeHistorianGatewayClient fake) => + new(fake, NullLogger.Instance); + + [Fact] + public async Task All_acked_when_SendEvent_succeeds() + { + var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = true } }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A"), Evt("B") }, TestContext.Current.CancellationToken); + + Assert.All(outcomes, o => Assert.Equal(HistorianWriteOutcome.Ack, o)); + // One SendEvent per event so a single poison event cannot fail the whole batch. + Assert.Equal(2, fake.SendEventCallCount); + Assert.Equal(2, outcomes.Count); + } + + [Fact] + public async Task Queued_ack_is_treated_as_Ack() + { + // A store-forward-queued send is durably accepted by the gateway → do not re-drain. + var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = false, Queued = true } }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.Ack, outcomes[0]); + } + + // ---- Typed published-client exception hierarchy (production reality) ------------------------ + + [Fact] + public async Task Typed_Unavailable_is_RetryPlease() + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new HistorianGatewayUnavailableException("down"), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]); + } + + [Fact] + public async Task Typed_Authentication_is_RetryPlease() + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new HistorianGatewayAuthenticationException("bad key"), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]); + } + + [Fact] + public async Task Typed_Authorization_is_RetryPlease() + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new HistorianGatewayAuthorizationException("no scope"), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]); + } + + [Fact] + public async Task Base_typed_exception_with_inner_permanent_status_is_PermanentFail() + { + // The published client maps a permanent gRPC code (InvalidArgument) onto the base + // HistorianGatewayException carrying the original RpcException as InnerException. + var inner = new RpcException(new Status(StatusCode.InvalidArgument, "malformed")); + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new HistorianGatewayException("malformed", inner), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]); + } + + [Fact] + public async Task Bare_base_typed_exception_is_PermanentFail() + { + // No classifiable inner status → default to PermanentFail to avoid infinite drain loops. + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new HistorianGatewayException("unclassifiable"), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]); + } + + // ---- Defensive raw RpcException path -------------------------------------------------------- + + [Fact] + public async Task Raw_Unavailable_is_RetryPlease() + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new RpcException(new Status(StatusCode.Unavailable, "down")), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]); + } + + [Theory] + [InlineData(StatusCode.DeadlineExceeded)] + [InlineData(StatusCode.ResourceExhausted)] + [InlineData(StatusCode.Aborted)] + [InlineData(StatusCode.Internal)] + [InlineData(StatusCode.Unauthenticated)] + [InlineData(StatusCode.PermissionDenied)] + public async Task Raw_transient_or_auth_status_is_RetryPlease(StatusCode code) + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new RpcException(new Status(code, "x")), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]); + } + + [Theory] + [InlineData(StatusCode.InvalidArgument)] + [InlineData(StatusCode.FailedPrecondition)] + [InlineData(StatusCode.OutOfRange)] + [InlineData(StatusCode.Unimplemented)] + public async Task Raw_permanent_status_is_PermanentFail(StatusCode code) + { + var fake = new FakeHistorianGatewayClient + { + SendEventThrows = new RpcException(new Status(code, "x")), + }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]); + } + + [Fact] + public async Task Unknown_exception_is_PermanentFail() + { + var fake = new FakeHistorianGatewayClient { SendEventThrows = new InvalidOperationException("boom") }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken); + + Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]); + } + + [Fact] + public async Task Empty_batch_returns_empty() + { + var outcomes = await Writer(new FakeHistorianGatewayClient()) + .WriteBatchAsync(Array.Empty(), TestContext.Current.CancellationToken); + + Assert.Empty(outcomes); + } +}