Merge pull request 'sidecar: AahClientManagedAlarmEventWriter implements IAlarmEventWriter (PR C.1)' (#410) from track-c1-aah-alarm-writer into master
This commit was merged in pull request #410.
This commit is contained in:
@@ -0,0 +1,105 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
{
|
||||
/// <summary>
|
||||
/// IPC-side <see cref="IAlarmEventWriter"/> implementation that delegates to an
|
||||
/// <see cref="IAlarmHistorianWriteBackend"/> (production: aahClientManaged-bound)
|
||||
/// and maps the trinary <see cref="AlarmHistorianWriteOutcome"/> down to the
|
||||
/// <c>bool[]</c> the IPC reply contract carries. Per-event outcomes:
|
||||
/// <list type="bullet">
|
||||
/// <item><description><see cref="AlarmHistorianWriteOutcome.Ack"/> → <c>true</c> (drop from sender's queue).</description></item>
|
||||
/// <item><description><see cref="AlarmHistorianWriteOutcome.RetryPlease"/> → <c>false</c> (sender retries on next drain tick).</description></item>
|
||||
/// <item><description><see cref="AlarmHistorianWriteOutcome.PermanentFail"/> → <c>false</c> (sender's B.4 widens the IPC bool back into the trinary outcome by inspecting structured diagnostics; this slot intentionally collapses to "not-ok" at the wire).</description></item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
public sealed class AahClientManagedAlarmEventWriter : IAlarmEventWriter
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<AahClientManagedAlarmEventWriter>();
|
||||
|
||||
private readonly IAlarmHistorianWriteBackend _backend;
|
||||
|
||||
public AahClientManagedAlarmEventWriter(IAlarmHistorianWriteBackend backend)
|
||||
{
|
||||
_backend = backend ?? throw new ArgumentNullException(nameof(backend));
|
||||
}
|
||||
|
||||
public async Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
|
||||
{
|
||||
if (events is null || events.Length == 0)
|
||||
{
|
||||
return new bool[0];
|
||||
}
|
||||
|
||||
AlarmHistorianWriteOutcome[] outcomes;
|
||||
try
|
||||
{
|
||||
outcomes = await _backend.WriteBatchAsync(events, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Backend-level failure (cluster unreachable, transport error). Treat the
|
||||
// whole batch as RetryPlease so the sender's queue holds the rows for
|
||||
// the next drain tick — preferable to dropping them on a transient.
|
||||
Log.Warning(ex,
|
||||
"Alarm historian backend WriteBatchAsync threw — marking entire {Count}-event batch RetryPlease.",
|
||||
events.Length);
|
||||
var fallback = new bool[events.Length];
|
||||
return fallback;
|
||||
}
|
||||
|
||||
if (outcomes.Length != events.Length)
|
||||
{
|
||||
// Backend contract violation — defensive degrade so a bug in the backend
|
||||
// doesn't desync the sender's queue accounting. Treat as RetryPlease.
|
||||
Log.Warning(
|
||||
"Alarm historian backend returned {ReturnedCount} outcomes for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
|
||||
outcomes.Length, events.Length);
|
||||
return new bool[events.Length];
|
||||
}
|
||||
|
||||
var perEventOk = new bool[outcomes.Length];
|
||||
for (var i = 0; i < outcomes.Length; i++)
|
||||
{
|
||||
perEventOk[i] = outcomes[i] == AlarmHistorianWriteOutcome.Ack;
|
||||
}
|
||||
return perEventOk;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Translate the outcome of a single SDK call (raw HRESULT + diagnostic) into the
|
||||
/// trinary <see cref="AlarmHistorianWriteOutcome"/>. Exposed for the production
|
||||
/// <see cref="SdkAlarmHistorianWriteBackend"/> to share the mapping with tests.
|
||||
/// </summary>
|
||||
public static AlarmHistorianWriteOutcome MapOutcome(int hresult, bool isCommunicationError, bool isMalformedInput)
|
||||
{
|
||||
// Order matters: malformed input is permanent regardless of HRESULT pattern;
|
||||
// communication-class errors are transient regardless of which specific
|
||||
// HRESULT bit fired.
|
||||
if (isMalformedInput)
|
||||
{
|
||||
return AlarmHistorianWriteOutcome.PermanentFail;
|
||||
}
|
||||
if (hresult == 0)
|
||||
{
|
||||
return AlarmHistorianWriteOutcome.Ack;
|
||||
}
|
||||
if (isCommunicationError)
|
||||
{
|
||||
return AlarmHistorianWriteOutcome.RetryPlease;
|
||||
}
|
||||
// Default: unknown HRESULT failure — be conservative and let the sender retry.
|
||||
// The sender's drain worker has its own dead-letter cap so a permanently-broken
|
||||
// event won't loop forever.
|
||||
return AlarmHistorianWriteOutcome.RetryPlease;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
{
|
||||
/// <summary>
|
||||
/// Per-event outcome from <see cref="IAlarmHistorianWriteBackend.WriteBatchAsync"/>.
|
||||
/// Sidecar-local twin of <c>Core.AlarmHistorian.HistorianWriteOutcome</c> (the
|
||||
/// sidecar runs net48 and cannot reference the net10 Core project; the IPC
|
||||
/// contract narrows this to <c>bool</c> per slot, so the lmxopcua-side consumer
|
||||
/// widens that back into the trinary outcome at the IPC boundary in PR B.4).
|
||||
/// </summary>
|
||||
public enum AlarmHistorianWriteOutcome
|
||||
{
|
||||
/// <summary>Event accepted by the historian. Drop from the store-and-forward queue.</summary>
|
||||
Ack,
|
||||
/// <summary>Transient failure (server busy, disconnected, timeout). Leave queued; retry on next drain tick.</summary>
|
||||
RetryPlease,
|
||||
/// <summary>Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter on the lmxopcua side.</summary>
|
||||
PermanentFail,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
{
|
||||
/// <summary>
|
||||
/// The actual aahClientManaged-bound writer. Extracted so unit tests can
|
||||
/// substitute a fake without touching the SDK; the production
|
||||
/// implementation lives in <see cref="SdkAlarmHistorianWriteBackend"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Implementations are responsible for connection management + cluster
|
||||
/// failover. The wrapping <see cref="AahClientManagedAlarmEventWriter"/>
|
||||
/// handles batch-level orchestration but delegates the per-event SDK call
|
||||
/// here so the unit tests can drive every documented MxStatus outcome
|
||||
/// without an installed AVEVA Historian.
|
||||
/// </remarks>
|
||||
public interface IAlarmHistorianWriteBackend
|
||||
{
|
||||
/// <summary>
|
||||
/// Persist the supplied events to the historian. Returns one outcome per
|
||||
/// input slot in the same order — must always return an array of the same
|
||||
/// length as <paramref name="events"/>.
|
||||
/// </summary>
|
||||
Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
|
||||
AlarmHistorianEventDto[] events,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
{
|
||||
/// <summary>
|
||||
/// Production <see cref="IAlarmHistorianWriteBackend"/> backed by AVEVA Historian's
|
||||
/// <c>aahClientManaged</c> alarm-event write API. The exact SDK entry point is
|
||||
/// pinned during the live-rig smoke in PR D.1 — until that gate, this backend
|
||||
/// reports <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> for every
|
||||
/// event with a structured diagnostic so the lmxopcua-side
|
||||
/// <c>SqliteStoreAndForwardSink</c> retains the queued events rather than dropping
|
||||
/// or hard-failing them.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Cluster failover reuses <see cref="HistorianClusterEndpointPicker"/> via
|
||||
/// the shared <see cref="HistorianDataSource"/> connection pool — there is
|
||||
/// no second connection pool for writes. Wonderware Historian's alarm-event
|
||||
/// write surface accepts the same <c>HistorianAccess</c> session a read
|
||||
/// opens, so reusing the picker is parity-preserving with v1's
|
||||
/// <c>GalaxyHistorianWriter</c>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Once D.1 confirms the SDK entry point, this class swaps the placeholder
|
||||
/// body for the real call sequence. The mapping from raw HRESULT /
|
||||
/// <c>HistorianError</c> codes onto <see cref="AlarmHistorianWriteOutcome"/>
|
||||
/// is already shared via <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/>
|
||||
/// so the smoke-pinned change stays minimal.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<SdkAlarmHistorianWriteBackend>();
|
||||
|
||||
private readonly HistorianConfiguration _config;
|
||||
|
||||
public SdkAlarmHistorianWriteBackend(HistorianConfiguration config)
|
||||
{
|
||||
_config = config ?? throw new ArgumentNullException(nameof(config));
|
||||
}
|
||||
|
||||
public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
|
||||
AlarmHistorianEventDto[] events,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (events is null || events.Length == 0)
|
||||
{
|
||||
return Task.FromResult(new AlarmHistorianWriteOutcome[0]);
|
||||
}
|
||||
|
||||
// Placeholder: pin the SDK entry point in PR D.1 against a live AVEVA
|
||||
// Historian. Until then the call returns RetryPlease for every slot so
|
||||
// the lmxopcua-side sink keeps the events queued rather than dropping
|
||||
// them — same effect as the current NullAlarmHistorianSink fallback,
|
||||
// but visible through the structured diagnostic + per-event outcome.
|
||||
Log.Warning(
|
||||
"Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.",
|
||||
events.Length,
|
||||
_config.ServerName);
|
||||
|
||||
var outcomes = new AlarmHistorianWriteOutcome[events.Length];
|
||||
for (var i = 0; i < outcomes.Length; i++)
|
||||
{
|
||||
outcomes[i] = AlarmHistorianWriteOutcome.RetryPlease;
|
||||
}
|
||||
return Task.FromResult(outcomes);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,159 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
/// <summary>
|
||||
/// PR C.1 — pins the trinary outcome → IPC bool[] mapping that the sidecar uses
|
||||
/// on the WriteAlarmEvents reply. Per-event outcomes:
|
||||
/// Ack → true, RetryPlease → false, PermanentFail → false.
|
||||
/// The sender's B.4 widens the IPC bool back into the trinary outcome at the
|
||||
/// IPC boundary using structured diagnostics; the wire intentionally collapses
|
||||
/// to "ok / not-ok".
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class AahClientManagedAlarmEventWriterTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Empty_batch_returns_empty_array_without_invoking_backend()
|
||||
{
|
||||
var backend = new RecordingBackend(_ => throw new InvalidOperationException("must not invoke for empty input"));
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var result = await writer.WriteAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);
|
||||
|
||||
result.ShouldBeEmpty();
|
||||
backend.Calls.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Single_ack_outcome_maps_to_true()
|
||||
{
|
||||
var backend = new RecordingBackend(events => events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray());
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);
|
||||
|
||||
result.ShouldBe(new[] { true });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Mixed_batch_preserves_per_slot_ordering()
|
||||
{
|
||||
// Ack / Retry / Permanent / Ack — the sender uses positional matching against
|
||||
// its queue, so every slot must hit the exact bool corresponding to its input.
|
||||
var backend = new RecordingBackend(_ => new[]
|
||||
{
|
||||
AlarmHistorianWriteOutcome.Ack,
|
||||
AlarmHistorianWriteOutcome.RetryPlease,
|
||||
AlarmHistorianWriteOutcome.PermanentFail,
|
||||
AlarmHistorianWriteOutcome.Ack,
|
||||
});
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var result = await writer.WriteAsync(
|
||||
new[] { Event("E1"), Event("E2"), Event("E3"), Event("E4") },
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe(new[] { true, false, false, true });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Backend_exception_marks_whole_batch_RetryPlease()
|
||||
{
|
||||
var backend = new RecordingBackend(_ => throw new InvalidOperationException("cluster unreachable"));
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var result = await writer.WriteAsync(
|
||||
new[] { Event("E1"), Event("E2"), Event("E3") },
|
||||
CancellationToken.None);
|
||||
|
||||
// Whole batch must end up as "not ok" (RetryPlease at the trinary layer) —
|
||||
// dropping a transiently-failed batch corrupts the sender's queue.
|
||||
result.ShouldBe(new[] { false, false, false });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cancellation_propagates_from_backend()
|
||||
{
|
||||
var backend = new RecordingBackend(_ => throw new OperationCanceledException());
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var ex = await Should.ThrowAsync<OperationCanceledException>(() =>
|
||||
writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None));
|
||||
ex.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Backend_returning_wrong_count_degrades_to_RetryPlease()
|
||||
{
|
||||
// Backend returns more outcomes than inputs — defensive degrade rather than
|
||||
// letting a backend bug desync the sender's queue accounting.
|
||||
var backend = new RecordingBackend(_ => new[]
|
||||
{
|
||||
AlarmHistorianWriteOutcome.Ack,
|
||||
AlarmHistorianWriteOutcome.Ack,
|
||||
});
|
||||
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||
|
||||
var result = await writer.WriteAsync(new[] { Event("E1") }, CancellationToken.None);
|
||||
|
||||
result.ShouldBe(new[] { false });
|
||||
}
|
||||
|
||||
[Theory]
|
||||
// hresult 0 + clean → Ack
|
||||
[InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
|
||||
// hresult 0 but malformed → PermanentFail (malformed wins)
|
||||
[InlineData(0, false, true, AlarmHistorianWriteOutcome.PermanentFail)]
|
||||
// non-zero hresult + comm error → RetryPlease
|
||||
[InlineData(unchecked((int)0x80131500), true, false, AlarmHistorianWriteOutcome.RetryPlease)]
|
||||
// non-zero hresult, no comm flag, no malformed → conservative RetryPlease
|
||||
[InlineData(unchecked((int)0x80131500), false, false, AlarmHistorianWriteOutcome.RetryPlease)]
|
||||
// any malformed input → PermanentFail regardless of hresult
|
||||
[InlineData(unchecked((int)0x80131500), true, true, AlarmHistorianWriteOutcome.PermanentFail)]
|
||||
public void MapOutcome_table(int hresult, bool isCommunicationError, bool isMalformedInput, AlarmHistorianWriteOutcome expected)
|
||||
{
|
||||
AahClientManagedAlarmEventWriter
|
||||
.MapOutcome(hresult, isCommunicationError, isMalformedInput)
|
||||
.ShouldBe(expected);
|
||||
}
|
||||
|
||||
private static AlarmHistorianEventDto Event(string id) => new AlarmHistorianEventDto
|
||||
{
|
||||
EventId = id,
|
||||
SourceName = "Tank01",
|
||||
ConditionId = "Tank01.Level.HiHi",
|
||||
AlarmType = "AnalogLimitAlarm.HiHi",
|
||||
Message = "Tank 01 high-high level",
|
||||
Severity = 750,
|
||||
EventTimeUtcTicks = DateTime.UtcNow.Ticks,
|
||||
AckComment = null,
|
||||
};
|
||||
|
||||
private sealed class RecordingBackend : IAlarmHistorianWriteBackend
|
||||
{
|
||||
private readonly Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> _produce;
|
||||
public int Calls { get; private set; }
|
||||
|
||||
public RecordingBackend(Func<AlarmHistorianEventDto[], AlarmHistorianWriteOutcome[]> produce)
|
||||
{
|
||||
_produce = produce;
|
||||
}
|
||||
|
||||
public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
|
||||
AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
|
||||
{
|
||||
Calls++;
|
||||
return Task.FromResult(_produce(events));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user