using System;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;

namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
    /// <summary>
    /// IPC-side <see cref="IAlarmEventWriter"/> implementation that delegates to an
    /// <see cref="IAlarmHistorianWriteBackend"/> (production: aahClientManaged-bound)
    /// and maps the trinary <see cref="AlarmHistorianWriteOutcome"/> down to the
    /// <c>bool[]</c> the IPC reply contract carries. Per-event outcomes:
    /// <list type="bullet">
    ///   <item><description><see cref="AlarmHistorianWriteOutcome.Ack"/> → <c>true</c> (drop from sender's queue).</description></item>
    ///   <item><description><see cref="AlarmHistorianWriteOutcome.RetryPlease"/> → <c>false</c> (sender retries on next drain tick).</description></item>
    ///   <item><description><see cref="AlarmHistorianWriteOutcome.PermanentFail"/> → <c>false</c> (sender's B.4 widens
    ///   the IPC bool back into the trinary outcome by inspecting structured diagnostics; this slot
    ///   intentionally collapses to "not-ok" at the wire).</description></item>
    /// </list>
    /// </summary>
    public sealed class AahClientManagedAlarmEventWriter : IAlarmEventWriter
    {
        // Fully qualified because, inside this class, the simple name "Log" binds to this field.
        private static readonly ILogger Log = Serilog.Log.ForContext<AahClientManagedAlarmEventWriter>();

        private readonly IAlarmHistorianWriteBackend _backend;

        /// <summary>
        /// Initializes a new instance of the <see cref="AahClientManagedAlarmEventWriter"/> class.
        /// </summary>
        /// <param name="backend">The alarm historian write backend to delegate to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="backend"/> is <c>null</c>.</exception>
        public AahClientManagedAlarmEventWriter(IAlarmHistorianWriteBackend backend)
        {
            _backend = backend ?? throw new ArgumentNullException(nameof(backend));
        }

        /// <summary>
        /// Writes a batch of alarm historian events through the backend and reports,
        /// per event, whether it was acknowledged.
        /// </summary>
        /// <param name="events">The alarm events to write.</param>
        /// <param name="cancellationToken">Cancellation token, forwarded to the backend.</param>
        /// <returns>
        /// An empty array when <paramref name="events"/> is <c>null</c> or empty; otherwise an
        /// array of the same length as <paramref name="events"/> in which an element is
        /// <c>true</c> only when the backend reported <see cref="AlarmHistorianWriteOutcome.Ack"/>
        /// for that position. If the backend throws, returns <c>null</c>, or returns the wrong
        /// number of outcomes, every element is <c>false</c>.
        /// </returns>
        /// <exception cref="OperationCanceledException">
        /// Propagated unchanged when the backend call is cancelled.
        /// </exception>
        public async Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
        {
            if (events is null || events.Length == 0)
            {
                return Array.Empty<bool>();
            }

            AlarmHistorianWriteOutcome[] outcomes;
            try
            {
                outcomes = await _backend.WriteBatchAsync(events, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is not a backend failure; let the caller observe it.
                throw;
            }
            catch (Exception ex)
            {
                // Backend-level failure (cluster unreachable, transport error). Treat the
                // whole batch as RetryPlease so the sender's queue holds the rows for
                // the next drain tick — preferable to dropping them on a transient.
                Log.Warning(ex, "Alarm historian backend WriteBatchAsync threw — marking entire {Count}-event batch RetryPlease.", events.Length);
                return new bool[events.Length];
            }

            if (outcomes is null)
            {
                // Backend contract violation — same defensive degrade as a length mismatch,
                // instead of a NullReferenceException escaping to the IPC layer.
                Log.Warning(
                    "Alarm historian backend returned a null outcome array for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
                    events.Length);
                return new bool[events.Length];
            }

            if (outcomes.Length != events.Length)
            {
                // Backend contract violation — defensive degrade so a bug in the backend
                // doesn't desync the sender's queue accounting. Treat as RetryPlease.
                Log.Warning(
                    "Alarm historian backend returned {ReturnedCount} outcomes for a batch of {InputCount} events; degrading to RetryPlease for the whole batch.",
                    outcomes.Length,
                    events.Length);
                return new bool[events.Length];
            }

            // Collapse the trinary outcome to the wire bool: only Ack counts as success.
            var perEventOk = new bool[outcomes.Length];
            for (var i = 0; i < outcomes.Length; i++)
            {
                perEventOk[i] = outcomes[i] == AlarmHistorianWriteOutcome.Ack;
            }

            return perEventOk;
        }

        /// <summary>
        /// Translate the outcome of a single SDK call (raw HRESULT + diagnostic) into the
        /// trinary <see cref="AlarmHistorianWriteOutcome"/>. Exposed publicly so the
        /// production backend can share the mapping with tests.
        /// </summary>
        /// <param name="hresult">The HRESULT code from the SDK call.</param>
        /// <param name="isCommunicationError">Indicates whether the error is a communication-class error.</param>
        /// <param name="isMalformedInput">Indicates whether the input was malformed.</param>
        /// <returns>
        /// <see cref="AlarmHistorianWriteOutcome.PermanentFail"/> when
        /// <paramref name="isMalformedInput"/> is set; otherwise
        /// <see cref="AlarmHistorianWriteOutcome.Ack"/> when <paramref name="hresult"/> is zero;
        /// otherwise <see cref="AlarmHistorianWriteOutcome.RetryPlease"/>.
        /// </returns>
        public static AlarmHistorianWriteOutcome MapOutcome(int hresult, bool isCommunicationError, bool isMalformedInput)
        {
            // Order matters: malformed input is permanent regardless of HRESULT pattern;
            // communication-class errors are transient regardless of which specific
            // HRESULT bit fired.
            if (isMalformedInput)
            {
                return AlarmHistorianWriteOutcome.PermanentFail;
            }

            if (hresult == 0)
            {
                return AlarmHistorianWriteOutcome.Ack;
            }

            if (isCommunicationError)
            {
                return AlarmHistorianWriteOutcome.RetryPlease;
            }

            // Default: unknown HRESULT failure — be conservative and let the sender retry.
            // The sender's drain worker has its own dead-letter cap so a permanently-broken
            // event won't loop forever.
            return AlarmHistorianWriteOutcome.RetryPlease;
        }
    }
}