sidecar: AahClientManagedAlarmEventWriter implements IAlarmEventWriter (PR C.1)

Fourth PR of the alarms-over-gateway epic
(docs/plans/alarms-over-gateway.md). Independent of Tracks A and B —
the sidecar slot defined in HistorianFrameHandler line 242 is unwired
today; PR C.2 (next) flips it on in Program.cs.

- AlarmHistorianWriteOutcome (sidecar-local, net48 — twin of
  Core.AlarmHistorian.HistorianWriteOutcome which is net10): Ack /
  RetryPlease / PermanentFail.
- IAlarmHistorianWriteBackend abstraction so the SDK call can be
  faked in unit tests.
- AahClientManagedAlarmEventWriter implements IAlarmEventWriter,
  delegates to the backend, maps Ack→true / Retry|Permanent→false
  for the IPC bool[] reply contract. Backend exception → whole
  batch RetryPlease (preserves the sender's queue across transients
  rather than dropping). A wrong-count return from the backend also
  degrades the whole batch to RetryPlease, so a backend bug cannot
  desync the sender's queue accounting.
- SdkAlarmHistorianWriteBackend — production binding skeleton.
  Reports RetryPlease for every event and logs a structured
  diagnostic until PR D.1 pins the live aahClientManaged entry
  point against the dev rig. The sender's SqliteStoreAndForwardSink
  retains queued events, mirroring today's NullAlarmHistorianSink
  behaviour but with visible diagnostics instead of silent discard.
- MapOutcome shared helper — pinned via theory tests so the D.1
  swap can change the SDK call site without reshuffling the
  HRESULT → outcome mapping.

Tests:
- 6 writer tests — empty batch / single Ack / mixed Ack-Retry-
  Permanent-Ack ordering / backend-throw → RetryPlease batch /
  cancellation propagates / wrong-count defensive degrade.
- 5 outcome theory cases — hresult 0 → Ack, malformed wins over
  hresult 0, comm error → Retry, unknown failure → Retry,
  malformed + comm → Permanent.
- Full sidecar test suite: 48 passed (was 42; 6 new).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-30 16:17:05 -04:00
parent c59bf59635
commit f99cf5033a
5 changed files with 386 additions and 0 deletions

View File

@@ -0,0 +1,105 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// IPC-side <see cref="IAlarmEventWriter"/> implementation that delegates to an
/// <see cref="IAlarmHistorianWriteBackend"/> (production: aahClientManaged-bound)
/// and maps the trinary <see cref="AlarmHistorianWriteOutcome"/> down to the
/// <c>bool[]</c> the IPC reply contract carries. Per-event outcomes:
/// <list type="bullet">
/// <item><description><see cref="AlarmHistorianWriteOutcome.Ack"/> → <c>true</c> (drop from sender's queue).</description></item>
/// <item><description><see cref="AlarmHistorianWriteOutcome.RetryPlease"/> → <c>false</c> (sender retries on next drain tick).</description></item>
/// <item><description><see cref="AlarmHistorianWriteOutcome.PermanentFail"/> → <c>false</c> (sender's B.4 widens the IPC bool back into the trinary outcome by inspecting structured diagnostics; this slot intentionally collapses to "not-ok" at the wire).</description></item>
/// </list>
/// </summary>
public sealed class AahClientManagedAlarmEventWriter : IAlarmEventWriter
{
    private static readonly ILogger Log = Serilog.Log.ForContext<AahClientManagedAlarmEventWriter>();

    private readonly IAlarmHistorianWriteBackend _backend;

    /// <summary>Creates a writer that forwards every batch to <paramref name="backend"/>.</summary>
    /// <param name="backend">Backend that performs the actual historian write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="backend"/> is <c>null</c>.</exception>
    public AahClientManagedAlarmEventWriter(IAlarmHistorianWriteBackend backend)
    {
        _backend = backend ?? throw new ArgumentNullException(nameof(backend));
    }

    /// <summary>
    /// Writes a batch through the backend and returns one <c>bool</c> per input slot, in
    /// input order: <c>true</c> only where the backend reported
    /// <see cref="AlarmHistorianWriteOutcome.Ack"/>.
    /// </summary>
    /// <param name="events">Batch to write. A <c>null</c> or empty batch yields an empty result without calling the backend.</param>
    /// <param name="cancellationToken">Forwarded unchanged to the backend.</param>
    /// <returns>
    /// An array of the same length as <paramref name="events"/>. Every slot is <c>false</c>
    /// when the backend throws a non-cancellation exception, returns <c>null</c>, or returns
    /// an array whose length differs from the input.
    /// </returns>
    /// <exception cref="OperationCanceledException">Propagated unchanged when the backend throws it.</exception>
    public async Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
    {
        if (events is null || events.Length == 0)
        {
            return Array.Empty<bool>();
        }

        AlarmHistorianWriteOutcome[] outcomes;
        try
        {
            outcomes = await _backend.WriteBatchAsync(events, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            // Backend-level failure (cluster unreachable, transport error). Treat the
            // whole batch as RetryPlease so the sender's queue holds the rows for
            // the next drain tick — preferable to dropping them on a transient.
            // Cancellation is excluded by the filter and propagates to the caller.
            Log.Warning(ex,
                "Alarm historian backend WriteBatchAsync threw — marking entire {Count}-event batch RetryPlease.",
                events.Length);
            return AllNotOk(events.Length);
        }

        if (outcomes is null)
        {
            // Backend contract violation — a null result would otherwise surface as a
            // NullReferenceException below instead of the intended defensive degrade.
            Log.Warning(
                "Alarm historian backend returned a null outcome array for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
                events.Length);
            return AllNotOk(events.Length);
        }

        if (outcomes.Length != events.Length)
        {
            // Backend contract violation — defensive degrade so a bug in the backend
            // doesn't desync the sender's queue accounting. Treat as RetryPlease.
            Log.Warning(
                "Alarm historian backend returned {ReturnedCount} outcomes for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
                outcomes.Length, events.Length);
            return AllNotOk(events.Length);
        }

        var perEventOk = new bool[outcomes.Length];
        for (var i = 0; i < outcomes.Length; i++)
        {
            perEventOk[i] = outcomes[i] == AlarmHistorianWriteOutcome.Ack;
        }

        return perEventOk;
    }

    /// <summary>
    /// Translate the outcome of a single SDK call (raw HRESULT + diagnostic) into the
    /// trinary <see cref="AlarmHistorianWriteOutcome"/>. Exposed for the production
    /// <see cref="SdkAlarmHistorianWriteBackend"/> to share the mapping with tests.
    /// </summary>
    /// <param name="hresult">Raw result code of the SDK call; only exactly <c>0</c> is treated as success.</param>
    /// <param name="isCommunicationError">
    /// Whether the failure was communication-class. With the current mapping this flag does
    /// not change the result (every non-malformed failure maps to
    /// <see cref="AlarmHistorianWriteOutcome.RetryPlease"/>); the branch is kept explicit so
    /// the two cases can diverge later without reshaping the method.
    /// </param>
    /// <param name="isMalformedInput">Whether the input was rejected as malformed; takes precedence over every other input.</param>
    /// <returns>The mapped outcome.</returns>
    public static AlarmHistorianWriteOutcome MapOutcome(int hresult, bool isCommunicationError, bool isMalformedInput)
    {
        // Order matters: malformed input is permanent regardless of HRESULT pattern;
        // communication-class errors are transient regardless of which specific
        // HRESULT bit fired.
        if (isMalformedInput)
        {
            return AlarmHistorianWriteOutcome.PermanentFail;
        }

        if (hresult == 0)
        {
            return AlarmHistorianWriteOutcome.Ack;
        }

        if (isCommunicationError)
        {
            return AlarmHistorianWriteOutcome.RetryPlease;
        }

        // Default: unknown HRESULT failure — be conservative and let the sender retry.
        // The sender's drain worker has its own dead-letter cap so a permanently-broken
        // event won't loop forever.
        return AlarmHistorianWriteOutcome.RetryPlease;
    }

    /// <summary>Builds the "whole batch not-ok" reply; a fresh <c>bool[]</c> is all <c>false</c>.</summary>
    private static bool[] AllNotOk(int count)
    {
        return new bool[count];
    }
}
}

View File

@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// Trinary result reported for each event by
/// <see cref="IAlarmHistorianWriteBackend.WriteBatchAsync"/>.
/// </summary>
/// <remarks>
/// This is the sidecar-local counterpart of
/// <c>Core.AlarmHistorian.HistorianWriteOutcome</c>: the sidecar runs net48 and cannot
/// reference the net10 Core project. The IPC contract narrows each slot to a
/// <c>bool</c>; the lmxopcua-side consumer widens it back into the trinary outcome at
/// the IPC boundary (PR B.4).
/// </remarks>
public enum AlarmHistorianWriteOutcome
{
    /// <summary>The historian accepted the event; it can be dropped from the store-and-forward queue.</summary>
    Ack = 0,

    /// <summary>Transient failure (server busy, disconnected, timeout); the event stays queued and is retried on the next drain tick.</summary>
    RetryPlease = 1,

    /// <summary>Permanent failure (malformed event, unrecoverable SDK error); the lmxopcua side moves the event to dead-letter.</summary>
    PermanentFail = 2,
}
}

View File

@@ -0,0 +1,30 @@
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// The actual aahClientManaged-bound writer. Extracted so unit tests can
/// substitute a fake without touching the SDK; the production
/// implementation lives in <see cref="SdkAlarmHistorianWriteBackend"/>.
/// </summary>
/// <remarks>
/// Implementations are responsible for connection management + cluster
/// failover. The wrapping <see cref="AahClientManagedAlarmEventWriter"/>
/// handles batch-level orchestration but delegates the per-event SDK call
/// here so the unit tests can drive every documented MxStatus outcome
/// without an installed AVEVA Historian.
/// <para>
/// How the wrapping writer reacts to this method: an
/// <see cref="System.OperationCanceledException"/> is rethrown to the caller;
/// any other exception, or a result whose length differs from the input,
/// makes the writer report the whole batch as not-ok (RetryPlease).
/// </para>
/// </remarks>
public interface IAlarmHistorianWriteBackend
{
/// <summary>
/// Persist the supplied events to the historian. Returns one outcome per
/// input slot in the same order — must always return an array of the same
/// length as <paramref name="events"/>.
/// </summary>
/// <param name="events">Batch of alarm events to write; slot order defines the order of the returned outcomes.</param>
/// <param name="cancellationToken">Token the caller uses to request cancellation of the write.</param>
/// <returns>A task producing one <see cref="AlarmHistorianWriteOutcome"/> per input event, positionally aligned with <paramref name="events"/>.</returns>
Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
AlarmHistorianEventDto[] events,
CancellationToken cancellationToken);
}
}

View File

@@ -0,0 +1,73 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// Production <see cref="IAlarmHistorianWriteBackend"/> intended to sit on top of AVEVA
/// Historian's <c>aahClientManaged</c> alarm-event write API. The SDK entry point is not
/// wired yet (it is pinned during the live-rig smoke in PR D.1), so for now every event
/// is answered with <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> and a warning is
/// logged. That keeps the lmxopcua-side <c>SqliteStoreAndForwardSink</c> holding the
/// queued events instead of dropping or hard-failing them.
/// </summary>
/// <remarks>
/// <para>
/// Design intent (not exercised by this placeholder body): cluster failover reuses
/// <see cref="HistorianClusterEndpointPicker"/> through the shared
/// <see cref="HistorianDataSource"/> connection pool, so writes get no second pool.
/// The alarm-event write surface is expected to accept the same <c>HistorianAccess</c>
/// session a read opens, matching v1's <c>GalaxyHistorianWriter</c>.
/// </para>
/// <para>
/// After D.1 confirms the entry point, the placeholder body is replaced by the real
/// call sequence. The raw HRESULT / <c>HistorianError</c> to
/// <see cref="AlarmHistorianWriteOutcome"/> mapping already lives in
/// <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/>, which keeps that swap small.
/// </para>
/// </remarks>
public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend
{
    private static readonly ILogger Log = Serilog.Log.ForContext<SdkAlarmHistorianWriteBackend>();

    private readonly HistorianConfiguration _config;

    /// <summary>Creates the backend for the historian described by <paramref name="config"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
    public SdkAlarmHistorianWriteBackend(HistorianConfiguration config)
    {
        if (config is null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        _config = config;
    }

    /// <summary>
    /// Placeholder write: returns <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> for
    /// every input slot and logs one warning per non-empty batch. A <c>null</c> or empty
    /// batch yields an empty array and logs nothing. The cancellation token is not consulted.
    /// </summary>
    public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
        AlarmHistorianEventDto[] events,
        CancellationToken cancellationToken)
    {
        var eventCount = events?.Length ?? 0;
        if (eventCount == 0)
        {
            return Task.FromResult(new AlarmHistorianWriteOutcome[0]);
        }

        // Placeholder until PR D.1 pins the SDK entry point against a live AVEVA
        // Historian: answering RetryPlease keeps the lmxopcua-side sink's queue intact,
        // and the warning makes the not-yet-wired state visible.
        Log.Warning(
            "Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.",
            eventCount,
            _config.ServerName);

        var pending = new AlarmHistorianWriteOutcome[eventCount];
        var slot = 0;
        while (slot < pending.Length)
        {
            pending[slot] = AlarmHistorianWriteOutcome.RetryPlease;
            slot++;
        }

        return Task.FromResult(pending);
    }
}
}

View File

@@ -0,0 +1,159 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
/// <summary>
/// PR C.1 — pins the trinary outcome → IPC bool[] mapping that the sidecar uses
/// on the WriteAlarmEvents reply. Per-event outcomes:
/// Ack → true, RetryPlease → false, PermanentFail → false.
/// The sender's B.4 widens the IPC bool back into the trinary outcome at the
/// IPC boundary using structured diagnostics; the wire intentionally collapses
/// to "ok / not-ok".
/// </summary>
[Trait("Category", "Unit")]
public sealed class AahClientManagedAlarmEventWriterTests
{
    // Fixed timestamp so the fixture is identical on every run (no wall-clock dependency).
    private static readonly long FixedEventTimeUtcTicks =
        new DateTime(2026, 4, 30, 12, 0, 0, DateTimeKind.Utc).Ticks;

    [Fact]
    public async Task Empty_batch_returns_empty_array_without_invoking_backend()
    {
        var backend = new RecordingBackend(_ => throw new InvalidOperationException("must not invoke for empty input"));
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);

        result.ShouldBeEmpty();
        backend.Calls.ShouldBe(0);
    }

    [Fact]
    public async Task Null_batch_returns_empty_array_without_invoking_backend()
    {
        // The writer treats a null batch exactly like an empty one.
        var backend = new RecordingBackend(_ => throw new InvalidOperationException("must not invoke for null input"));
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(null, CancellationToken.None);

        result.ShouldBeEmpty();
        backend.Calls.ShouldBe(0);
    }

    [Fact]
    public async Task Single_ack_outcome_maps_to_true()
    {
        var backend = new RecordingBackend(events => events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray());
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);

        result.ShouldBe(new[] { true });
        backend.Calls.ShouldBe(1);
    }

    [Fact]
    public async Task Mixed_batch_preserves_per_slot_ordering()
    {
        // Ack / Retry / Permanent / Ack — the sender uses positional matching against
        // its queue, so every slot must hit the exact bool corresponding to its input.
        var backend = new RecordingBackend(_ => new[]
        {
            AlarmHistorianWriteOutcome.Ack,
            AlarmHistorianWriteOutcome.RetryPlease,
            AlarmHistorianWriteOutcome.PermanentFail,
            AlarmHistorianWriteOutcome.Ack,
        });
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(
            new[] { Event("E1"), Event("E2"), Event("E3"), Event("E4") },
            CancellationToken.None);

        result.ShouldBe(new[] { true, false, false, true });
    }

    [Fact]
    public async Task Backend_exception_marks_whole_batch_RetryPlease()
    {
        var backend = new RecordingBackend(_ => throw new InvalidOperationException("cluster unreachable"));
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(
            new[] { Event("E1"), Event("E2"), Event("E3") },
            CancellationToken.None);

        // Whole batch must end up as "not ok" (RetryPlease at the trinary layer) —
        // dropping a transiently-failed batch corrupts the sender's queue.
        result.ShouldBe(new[] { false, false, false });
        backend.Calls.ShouldBe(1);
    }

    [Fact]
    public async Task Cancellation_propagates_from_backend()
    {
        var backend = new RecordingBackend(_ => throw new OperationCanceledException());
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var ex = await Should.ThrowAsync<OperationCanceledException>(() =>
            writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None));

        ex.ShouldNotBeNull();
    }

    [Fact]
    public async Task Backend_returning_wrong_count_degrades_to_RetryPlease()
    {
        // Backend returns more outcomes than inputs — defensive degrade rather than
        // letting a backend bug desync the sender's queue accounting.
        var backend = new RecordingBackend(_ => new[]
        {
            AlarmHistorianWriteOutcome.Ack,
            AlarmHistorianWriteOutcome.Ack,
        });
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);

        result.ShouldBe(new[] { false });
    }

    [Fact]
    public async Task Backend_returning_too_few_outcomes_degrades_to_RetryPlease()
    {
        // Mirror of the "too many" case: fewer outcomes than inputs must also degrade
        // the whole batch, and the reply must still be sized to the input batch.
        var backend = new RecordingBackend(_ => new[] { AlarmHistorianWriteOutcome.Ack });
        var writer = new AahClientManagedAlarmEventWriter(backend);

        var result = await writer.WriteAsync(
            new[] { Event("E1"), Event("E2") },
            CancellationToken.None);

        result.ShouldBe(new[] { false, false });
    }

    [Theory]
    // hresult 0 + clean → Ack
    [InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
    // hresult 0 but malformed → PermanentFail (malformed wins)
    [InlineData(0, false, true, AlarmHistorianWriteOutcome.PermanentFail)]
    // non-zero hresult + comm error → RetryPlease
    [InlineData(unchecked((int)0x80131500), true, false, AlarmHistorianWriteOutcome.RetryPlease)]
    // non-zero hresult, no comm flag, no malformed → conservative RetryPlease
    [InlineData(unchecked((int)0x80131500), false, false, AlarmHistorianWriteOutcome.RetryPlease)]
    // any malformed input → PermanentFail regardless of hresult
    [InlineData(unchecked((int)0x80131500), true, true, AlarmHistorianWriteOutcome.PermanentFail)]
    public void MapOutcome_table(int hresult, bool isCommunicationError, bool isMalformedInput, AlarmHistorianWriteOutcome expected)
    {
        AahClientManagedAlarmEventWriter
            .MapOutcome(hresult, isCommunicationError, isMalformedInput)
            .ShouldBe(expected);
    }

    /// <summary>Builds a representative alarm event whose only varying field is <paramref name="id"/>.</summary>
    private static AlarmHistorianEventDto Event(string id) => new AlarmHistorianEventDto
    {
        EventId = id,
        SourceName = "Tank01",
        ConditionId = "Tank01.Level.HiHi",
        AlarmType = "AnalogLimitAlarm.HiHi",
        Message = "Tank 01 high-high level",
        Severity = 750,
        EventTimeUtcTicks = FixedEventTimeUtcTicks,
        AckComment = null,
    };

    /// <summary>
    /// Fake backend that counts invocations and produces its result (or throws)
    /// synchronously via the supplied delegate.
    /// </summary>
    private sealed class RecordingBackend : IAlarmHistorianWriteBackend
    {
        private readonly Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> _produce;

        public int Calls { get; private set; }

        public RecordingBackend(Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> produce)
        {
            _produce = produce;
        }

        public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
            AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
        {
            // Count before producing so a throwing delegate is still recorded as a call.
            Calls++;
            return Task.FromResult(_produce(events));
        }
    }
}
}