feat(historian-gateway): GatewayAlarmHistorianWriter — SendEvent + gRPC->outcome mapping

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 17:27:03 -04:00
parent 555bd477f1
commit d3081a659f
2 changed files with 326 additions and 0 deletions
@@ -0,0 +1,141 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.HistorianGateway.Client;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
/// <summary>
/// <see cref="IAlarmHistorianWriter"/> backed by the HistorianGateway <c>SendEvent</c> path. The
/// drain worker behind <c>SqliteStoreAndForwardSink</c> calls
/// <see cref="WriteBatchAsync"/> and uses the returned per-event
/// <see cref="HistorianWriteOutcome"/> to decide retry vs. dead-letter, so this writer maps every
/// gateway result — success ack, the published client's typed exception hierarchy, raw
/// <see cref="RpcException"/> (defensive), cancellation, and any unexpected error — onto exactly
/// one outcome per event and <b>never throws</b> for a gateway failure (only a <c>null</c> batch
/// argument is rejected with <see cref="ArgumentNullException"/>).
/// </summary>
/// <remarks>
/// <para>
/// Each event is sent individually so one poison event cannot fail the whole batch: a permanent
/// failure on event N is dead-lettered while its siblings continue. Outcomes are returned in
/// input order, one per event; an empty batch yields an empty list with no gateway call.
/// </para>
/// <para>
/// <b>Outcome mapping.</b> Success (or store-forward-queued) ack ⇒ <see cref="HistorianWriteOutcome.Ack"/>.
/// Transient gRPC codes (<c>Unavailable</c>, <c>DeadlineExceeded</c>, <c>ResourceExhausted</c>,
/// <c>Aborted</c>, <c>Internal</c>, <c>Cancelled</c>) and the auth codes (<c>Unauthenticated</c>,
/// <c>PermissionDenied</c>) ⇒ <see cref="HistorianWriteOutcome.RetryPlease"/> — an auth fix
/// re-enables the batch, so an auth blip never dead-letters. Permanent codes
/// (<c>InvalidArgument</c>, <c>FailedPrecondition</c>, <c>OutOfRange</c>, <c>Unimplemented</c>) ⇒
/// <see cref="HistorianWriteOutcome.PermanentFail"/> (dead-letter poison — mirrors the Wonderware
/// <c>PerEventStatus==2</c> boundary). The typed client exceptions are classified by type, or by
/// the <see cref="RpcException"/> they wrap; any other or unclassifiable error defaults to
/// <see cref="HistorianWriteOutcome.PermanentFail"/> so the drain worker cannot loop a poison
/// event forever.
/// </para>
/// <para>
/// <b>Cancellation.</b> A cancelled send says nothing about the event itself, so it is never
/// dead-lettered: an <see cref="OperationCanceledException"/> (raw or wrapped by the base client
/// exception), a gRPC <c>Cancelled</c> status, and any failure observed after the caller's token
/// was cancelled all map to <see cref="HistorianWriteOutcome.RetryPlease"/>. Once the token is
/// cancelled the remaining events of the batch are not sent and are reported as
/// <see cref="HistorianWriteOutcome.RetryPlease"/>.
/// </para>
/// </remarks>
public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter
{
    private readonly IHistorianGatewayClient _client;
    private readonly ILogger<GatewayAlarmHistorianWriter> _logger;

    /// <summary>Creates the writer over a gateway client seam.</summary>
    /// <param name="client">The gateway client used for the <c>SendEvent</c> write path.</param>
    /// <param name="logger">Logger for per-event outcome diagnostics (never logs event content).</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="client"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public GatewayAlarmHistorianWriter(IHistorianGatewayClient client, ILogger<GatewayAlarmHistorianWriter> logger)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
        IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(batch);
        if (batch.Count == 0)
        {
            return Array.Empty<HistorianWriteOutcome>();
        }

        var outcomes = new HistorianWriteOutcome[batch.Count];
        var skipped = 0;
        for (var i = 0; i < batch.Count; i++)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                // The caller is shutting down / aborting the drain. Every further send would be
                // cancelled anyway, so leave the event for the next drain instead of issuing a
                // doomed call whose failure could be mistaken for a poison event.
                outcomes[i] = HistorianWriteOutcome.RetryPlease;
                skipped++;
                continue;
            }

            outcomes[i] = await SendOneAsync(batch[i], cancellationToken).ConfigureAwait(false);
        }

        if (skipped > 0)
        {
            _logger.LogDebug(
                "Alarm SendEvent batch cancelled; {Skipped} unsent event(s) left for retry.",
                skipped);
        }

        return outcomes;
    }

    /// <summary>
    /// Sends a single event and converts the result (ack or exception) into one outcome.
    /// Every exception is caught here so the batch loop always receives an outcome.
    /// </summary>
    private async Task<HistorianWriteOutcome> SendOneAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
    {
        try
        {
            var ack = await _client.SendEventAsync(AlarmEventMapper.ToHistorianEvent(evt), cancellationToken)
                .ConfigureAwait(false);
            return MapAck(ack);
        }
        catch (Exception exception)
        {
            // NEVER throw out of the writer — the drain worker expects a per-event outcome. Classify
            // and log only the failure category (no event content, hostnames, or credentials).
            // A failure seen after the caller cancelled cannot be trusted as a verdict on the event
            // (the transport may surface the abort as almost any error), so it is always retried.
            var outcome = cancellationToken.IsCancellationRequested
                ? HistorianWriteOutcome.RetryPlease
                : Classify(exception);
            if (outcome == HistorianWriteOutcome.PermanentFail)
            {
                _logger.LogWarning(
                    "Alarm SendEvent permanently failed ({Exception}); dead-lettering the event.",
                    exception.GetType().Name);
            }
            else
            {
                _logger.LogDebug(
                    "Alarm SendEvent transiently failed ({Exception}); will retry.",
                    exception.GetType().Name);
            }

            return outcome;
        }
    }

    // A non-success ack that the gateway durably queued (store-forward) is still accepted — do not
    // re-drain it. A non-success, non-queued ack is a soft failure: retry rather than dead-letter.
    private static HistorianWriteOutcome MapAck(WriteAck ack) =>
        ack.Success || ack.Queued ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;

    // Maps an exception thrown by the send path onto an outcome; see the class remarks for the table.
    private static HistorianWriteOutcome Classify(Exception exception) => exception switch
    {
        // Cancellation / client-side timeout (TaskCanceledException included) is about the call,
        // not the event — never dead-letter on it.
        OperationCanceledException => HistorianWriteOutcome.RetryPlease,

        // Published client's typed hierarchy (production reality). Unavailable + both auth kinds retry.
        HistorianGatewayUnavailableException => HistorianWriteOutcome.RetryPlease,
        HistorianGatewayAuthenticationException => HistorianWriteOutcome.RetryPlease,
        HistorianGatewayAuthorizationException => HistorianWriteOutcome.RetryPlease,

        // A base client exception wrapping a cancellation → same reasoning as the raw case above.
        HistorianGatewayException { InnerException: OperationCanceledException } => HistorianWriteOutcome.RetryPlease,

        // A base client exception wrapping a permanent/transient RpcException → classify by inner status.
        HistorianGatewayException { InnerException: RpcException inner } => ClassifyStatus(inner.StatusCode),

        // Defensive raw RpcException path (the seam type signature permits it).
        RpcException rpc => ClassifyStatus(rpc.StatusCode),

        // Anything else (incl. a bare base client exception we cannot classify) → dead-letter to avoid
        // an infinite drain loop on a poison event.
        _ => HistorianWriteOutcome.PermanentFail,
    };

    // Maps a gRPC status code onto an outcome: transient/auth/cancelled retry, everything else dead-letters.
    private static HistorianWriteOutcome ClassifyStatus(StatusCode code) => code switch
    {
        StatusCode.Unavailable
            or StatusCode.DeadlineExceeded
            or StatusCode.ResourceExhausted
            or StatusCode.Aborted
            or StatusCode.Internal
            // The call was cancelled (typically by the caller during shutdown) — the event is fine.
            or StatusCode.Cancelled
            // An auth fix re-enables the whole batch — never dead-letter on an auth blip.
            or StatusCode.Unauthenticated
            or StatusCode.PermissionDenied => HistorianWriteOutcome.RetryPlease,

        StatusCode.InvalidArgument
            or StatusCode.FailedPrecondition
            or StatusCode.OutOfRange
            or StatusCode.Unimplemented => HistorianWriteOutcome.PermanentFail,

        // Unknown/unclassified gRPC code → dead-letter to avoid an infinite drain loop.
        _ => HistorianWriteOutcome.PermanentFail,
    };
}
@@ -0,0 +1,185 @@
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using ZB.MOM.WW.HistorianGateway.Client;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
/// <summary>
/// Pins the outcome mapping of <see cref="GatewayAlarmHistorianWriter"/>: every ack shape and
/// every failure the fake gateway client is configured to raise must come back as exactly one
/// <see cref="HistorianWriteOutcome"/> per event, without the writer throwing.
/// </summary>
public sealed class GatewayAlarmHistorianWriterTests
{
    // Builds a minimal alarm event; only the id (first argument) varies between tests.
    private static AlarmHistorianEvent Evt(string id) => new(
        id, "Area/Pump", "N", "LimitAlarm",
        AlarmSeverity.High, "Activated", "m", "u", null,
        new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc));

    // Creates the writer under test over the supplied fake client with a no-op logger.
    private static GatewayAlarmHistorianWriter Writer(FakeHistorianGatewayClient fake) =>
        new(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);

    // ---- Ack mapping ----------------------------------------------------------------------------

    [Fact]
    public async Task All_acked_when_SendEvent_succeeds()
    {
        var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = true } };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A"), Evt("B") }, TestContext.Current.CancellationToken);

        Assert.All(outcomes, o => Assert.Equal(HistorianWriteOutcome.Ack, o));
        // One SendEvent per event so a single poison event cannot fail the whole batch.
        Assert.Equal(2, fake.SendEventCallCount);
        Assert.Equal(2, outcomes.Count);
    }

    [Fact]
    public async Task Queued_ack_is_treated_as_Ack()
    {
        // A store-forward-queued send is durably accepted by the gateway → do not re-drain.
        var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = false, Queued = true } };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.Ack, outcomes[0]);
    }

    [Fact]
    public async Task Unqueued_failure_ack_is_RetryPlease()
    {
        // Neither written nor queued: a soft failure the gateway reported without throwing.
        // It must be retried, not acked (data loss) and not dead-lettered.
        var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = false, Queued = false } };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    // ---- Typed published-client exception hierarchy (production reality) ------------------------

    [Fact]
    public async Task Typed_Unavailable_is_RetryPlease()
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new HistorianGatewayUnavailableException("down"),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Fact]
    public async Task Typed_Authentication_is_RetryPlease()
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new HistorianGatewayAuthenticationException("bad key"),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Fact]
    public async Task Typed_Authorization_is_RetryPlease()
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new HistorianGatewayAuthorizationException("no scope"),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Fact]
    public async Task Base_typed_exception_with_inner_permanent_status_is_PermanentFail()
    {
        // The published client maps a permanent gRPC code (InvalidArgument) onto the base
        // HistorianGatewayException carrying the original RpcException as InnerException.
        var inner = new RpcException(new Status(StatusCode.InvalidArgument, "malformed"));
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new HistorianGatewayException("malformed", inner),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
    }

    [Fact]
    public async Task Bare_base_typed_exception_is_PermanentFail()
    {
        // No classifiable inner status → default to PermanentFail to avoid infinite drain loops.
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new HistorianGatewayException("unclassifiable"),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
    }

    // ---- Defensive raw RpcException path --------------------------------------------------------

    [Fact]
    public async Task Raw_Unavailable_is_RetryPlease()
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new RpcException(new Status(StatusCode.Unavailable, "down")),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Theory]
    [InlineData(StatusCode.DeadlineExceeded)]
    [InlineData(StatusCode.ResourceExhausted)]
    [InlineData(StatusCode.Aborted)]
    [InlineData(StatusCode.Internal)]
    [InlineData(StatusCode.Unauthenticated)]
    [InlineData(StatusCode.PermissionDenied)]
    public async Task Raw_transient_or_auth_status_is_RetryPlease(StatusCode code)
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new RpcException(new Status(code, "x")),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Theory]
    [InlineData(StatusCode.InvalidArgument)]
    [InlineData(StatusCode.FailedPrecondition)]
    [InlineData(StatusCode.OutOfRange)]
    [InlineData(StatusCode.Unimplemented)]
    public async Task Raw_permanent_status_is_PermanentFail(StatusCode code)
    {
        var fake = new FakeHistorianGatewayClient
        {
            SendEventThrows = new RpcException(new Status(code, "x")),
        };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
    }

    // ---- Fallbacks and edge cases ---------------------------------------------------------------

    [Fact]
    public async Task Unknown_exception_is_PermanentFail()
    {
        var fake = new FakeHistorianGatewayClient { SendEventThrows = new InvalidOperationException("boom") };

        var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A") }, TestContext.Current.CancellationToken);

        Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
    }

    [Fact]
    public async Task Empty_batch_returns_empty()
    {
        var outcomes = await Writer(new FakeHistorianGatewayClient())
            .WriteBatchAsync(Array.Empty<AlarmHistorianEvent>(), TestContext.Current.CancellationToken);

        Assert.Empty(outcomes);
    }
}