feat(historian): emit PermanentFail for poison alarm events via additive PerEventStatus sidecar IPC field
This commit is contained in:
@@ -263,4 +263,8 @@ public sealed class WriteAlarmEventsReply
|
||||
|
||||
/// <summary>Per-event success flag, parallel to <see cref="WriteAlarmEventsRequest.Events"/>.</summary>
|
||||
[Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty<bool>();
|
||||
|
||||
/// <summary>Per-event status parallel to the request's Events: 0=Ack, 1=Retry, 2=Permanent.
|
||||
/// Empty ⇒ an older sidecar that only sent <see cref="PerEventOk"/>; the client falls back to it.</summary>
|
||||
[Key(4)] public byte[] PerEventStatus { get; set; } = Array.Empty<byte>();
|
||||
}
|
||||
|
||||
+72
-4
@@ -16,6 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
/// </summary>
|
||||
public sealed class HistorianFrameHandler : IFrameHandler
|
||||
{
|
||||
// WriteAlarmEventsReply.PerEventStatus byte semantics: 0=Ack, 1=Retry, 2=Permanent.
|
||||
private const byte StatusAck = 0;
|
||||
private const byte StatusRetry = 1;
|
||||
private const byte StatusPermanent = 2;
|
||||
|
||||
private readonly IHistorianDataSource _historian;
|
||||
private readonly IAlarmEventWriter? _alarmWriter;
|
||||
private readonly ILogger _logger;
|
||||
@@ -174,18 +179,51 @@ public sealed class HistorianFrameHandler : IFrameHandler
|
||||
reply.Success = false;
|
||||
reply.Error = "Sidecar not configured with an alarm-event writer.";
|
||||
reply.PerEventOk = new bool[req.Events.Length];
|
||||
reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry);
|
||||
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var perEvent = await _alarmWriter.WriteAsync(req.Events, ct).ConfigureAwait(false);
|
||||
reply.PerEventOk = perEvent;
|
||||
// Classify each event before touching the writer: structurally-malformed
|
||||
// (poison) events can never be persisted, so mark them Permanent and exclude
|
||||
// them from the writer batch. Only the well-formed remainder is handed to the
|
||||
// writer, whose bool[] result is mapped back onto the original indices.
|
||||
var status = new byte[req.Events.Length];
|
||||
var writable = new List<AlarmHistorianEventDto>(req.Events.Length);
|
||||
var originalIndex = new List<int>(req.Events.Length);
|
||||
for (var i = 0; i < req.Events.Length; i++)
|
||||
{
|
||||
if (IsStructurallyMalformed(req.Events[i]))
|
||||
{
|
||||
status[i] = StatusPermanent;
|
||||
}
|
||||
else
|
||||
{
|
||||
originalIndex.Add(i);
|
||||
writable.Add(req.Events[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Aligned 1:1 to `writable`; empty when every event was poison (writer skipped).
|
||||
var perEvent = writable.Count == 0
|
||||
? Array.Empty<bool>()
|
||||
: await _alarmWriter.WriteAsync(writable.ToArray(), ct).ConfigureAwait(false);
|
||||
|
||||
for (var i = 0; i < originalIndex.Count; i++)
|
||||
{
|
||||
var ok = i < perEvent.Length && perEvent[i];
|
||||
status[originalIndex[i]] = ok ? StatusAck : StatusRetry;
|
||||
}
|
||||
|
||||
reply.PerEventStatus = status;
|
||||
reply.PerEventOk = StatusToOk(status);
|
||||
reply.Success = true;
|
||||
// Whole-batch Success stays true even when some events failed — per-event
|
||||
// PerEventOk slots carry the granular result; the SQLite drain worker treats
|
||||
// false slots as retry-please candidates.
|
||||
// PerEventStatus slots carry the granular result (Ack / Retry / Permanent);
|
||||
// the SQLite drain worker acks 0, retries 1, and dead-letters 2. PerEventOk
|
||||
// is kept populated for rolling-deploy back-compat with an older client.
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -193,11 +231,41 @@ public sealed class HistorianFrameHandler : IFrameHandler
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
reply.PerEventOk = new bool[req.Events.Length];
|
||||
reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry);
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Classifies an alarm event as structurally malformed (poison): an event the historian
|
||||
/// event store can never persist regardless of retries. Such events are marked Permanent
|
||||
/// so the store-and-forward sink dead-letters them immediately instead of looping to the
|
||||
/// retry cap. A blank source name or alarm type, or a non-positive event timestamp, are
|
||||
/// the structural invariants the historian write requires.
|
||||
/// </summary>
|
||||
/// <param name="e">The candidate alarm event.</param>
|
||||
/// <returns><c>true</c> when the event is structurally malformed; otherwise <c>false</c>.</returns>
|
||||
internal static bool IsStructurallyMalformed(AlarmHistorianEventDto e) =>
|
||||
e is null
|
||||
|| string.IsNullOrWhiteSpace(e.SourceName)
|
||||
|| string.IsNullOrWhiteSpace(e.AlarmType)
|
||||
|| e.EventTimeUtcTicks <= 0;
|
||||
|
||||
private static byte[] AllStatus(int length, byte value)
|
||||
{
|
||||
var status = new byte[length];
|
||||
for (var i = 0; i < length; i++) status[i] = value;
|
||||
return status;
|
||||
}
|
||||
|
||||
private static bool[] StatusToOk(byte[] status)
|
||||
{
|
||||
var ok = new bool[status.Length];
|
||||
for (var i = 0; i < status.Length; i++) ok[i] = status[i] == StatusAck;
|
||||
return ok;
|
||||
}
|
||||
|
||||
private static HistorianSampleDto[] ToWire(List<HistorianSample> samples)
|
||||
{
|
||||
var dtos = new HistorianSampleDto[samples.Count];
|
||||
|
||||
Reference in New Issue
Block a user