using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MessagePack; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; /// /// Sidecar-side dispatcher. Each post-Hello frame routes by to /// the right historian operation and the result frame is written back through the same /// pipe. Per-call exceptions are caught and surfaced as Success=false, Error=... /// replies so a single bad request doesn't kill the connection. /// public sealed class HistorianFrameHandler : IFrameHandler { // WriteAlarmEventsReply.PerEventStatus byte semantics: 0=Ack, 1=Retry, 2=Permanent. private const byte StatusAck = 0; private const byte StatusRetry = 1; private const byte StatusPermanent = 2; private readonly IHistorianDataSource _historian; private readonly IAlarmEventWriter? _alarmWriter; private readonly ILogger _logger; /// Initializes a new instance of the HistorianFrameHandler class. /// The historian data source to query. /// The logger instance. /// Optional alarm event writer for writebacks. public HistorianFrameHandler( IHistorianDataSource historian, ILogger logger, IAlarmEventWriter? alarmWriter = null) { _historian = historian ?? throw new ArgumentNullException(nameof(historian)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _alarmWriter = alarmWriter; } /// Handles an incoming frame by dispatching to the appropriate historian operation. /// The frame message kind. /// The frame body bytes. /// The frame writer for sending responses. /// Cancellation token. public Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct) => kind switch { MessageKind.ReadRawRequest => HandleReadRawAsync(body, writer, ct), MessageKind.ReadProcessedRequest => HandleReadProcessedAsync(body, writer, ct), MessageKind.ReadAtTimeRequest => HandleReadAtTimeAsync(body, writer, ct), MessageKind.ReadEventsRequest => HandleReadEventsAsync(body, writer, ct), MessageKind.WriteAlarmEventsRequest => HandleWriteAlarmEventsAsync(body, writer, ct), _ => UnknownAsync(kind), }; private Task UnknownAsync(MessageKind kind) { _logger.Warning("Sidecar received unsupported frame kind {Kind}; dropping", kind); return Task.CompletedTask; } private async Task HandleReadRawAsync(byte[] body, FrameWriter writer, CancellationToken ct) { var req = MessagePackSerializer.Deserialize(body); var reply = new ReadRawReply { CorrelationId = req.CorrelationId }; try { var samples = await _historian.ReadRawAsync( req.TagName, new DateTime(req.StartUtcTicks, DateTimeKind.Utc), new DateTime(req.EndUtcTicks, DateTimeKind.Utc), req.MaxValues, ct).ConfigureAwait(false); reply.Success = true; reply.Samples = ToWire(samples); } catch (Exception ex) { _logger.Warning(ex, "Sidecar ReadRaw failed for {Tag}", req.TagName); reply.Success = false; reply.Error = ex.Message; } await writer.WriteAsync(MessageKind.ReadRawReply, reply, ct).ConfigureAwait(false); } private async Task HandleReadProcessedAsync(byte[] body, FrameWriter writer, CancellationToken ct) { var req = MessagePackSerializer.Deserialize(body); var reply = new ReadProcessedReply { CorrelationId = req.CorrelationId }; try { var buckets = await _historian.ReadAggregateAsync( req.TagName, new DateTime(req.StartUtcTicks, DateTimeKind.Utc), new DateTime(req.EndUtcTicks, DateTimeKind.Utc), req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false); reply.Success = true; reply.Buckets = ToWire(buckets); } catch (Exception ex) { _logger.Warning(ex, "Sidecar ReadProcessed failed for {Tag}", req.TagName); reply.Success = false; reply.Error = ex.Message; } await writer.WriteAsync(MessageKind.ReadProcessedReply, reply, ct).ConfigureAwait(false); } private async Task HandleReadAtTimeAsync(byte[] body, FrameWriter writer, CancellationToken ct) { var req = MessagePackSerializer.Deserialize(body); var reply = new ReadAtTimeReply { CorrelationId = req.CorrelationId }; try { var timestamps = new DateTime[req.TimestampsUtcTicks.Length]; for (var i = 0; i < timestamps.Length; i++) timestamps[i] = new DateTime(req.TimestampsUtcTicks[i], DateTimeKind.Utc); var samples = await _historian.ReadAtTimeAsync(req.TagName, timestamps, ct).ConfigureAwait(false); reply.Success = true; reply.Samples = ToWire(samples); } catch (Exception ex) { _logger.Warning(ex, "Sidecar ReadAtTime failed for {Tag}", req.TagName); reply.Success = false; reply.Error = ex.Message; } await writer.WriteAsync(MessageKind.ReadAtTimeReply, reply, ct).ConfigureAwait(false); } private async Task HandleReadEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct) { var req = MessagePackSerializer.Deserialize(body); var reply = new ReadEventsReply { CorrelationId = req.CorrelationId }; try { var events = await _historian.ReadEventsAsync( req.SourceName, new DateTime(req.StartUtcTicks, DateTimeKind.Utc), new DateTime(req.EndUtcTicks, DateTimeKind.Utc), req.MaxEvents, ct).ConfigureAwait(false); reply.Success = true; reply.Events = ToWire(events); } catch (Exception ex) { _logger.Warning(ex, "Sidecar ReadEvents failed for source {Source}", req.SourceName); reply.Success = false; reply.Error = ex.Message; } await writer.WriteAsync(MessageKind.ReadEventsReply, reply, ct).ConfigureAwait(false); } private async Task HandleWriteAlarmEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct) { var req = MessagePackSerializer.Deserialize(body); // MessagePack deserializes an absent or explicit-nil array as null, not Array.Empty. // Normalise here so every path below can safely dereference .Length without an NRE. req.Events ??= Array.Empty(); var reply = new WriteAlarmEventsReply { CorrelationId = req.CorrelationId }; if (_alarmWriter is null) { reply.Success = false; reply.Error = "Sidecar not configured with an alarm-event writer."; reply.PerEventOk = new bool[req.Events.Length]; reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry); await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); return; } try { // Classify each event before touching the writer: structurally-malformed // (poison) events can never be persisted, so mark them Permanent and exclude // them from the writer batch. Only the well-formed remainder is handed to the // writer, whose bool[] result is mapped back onto the original indices. var status = new byte[req.Events.Length]; var writable = new List(req.Events.Length); var originalIndex = new List(req.Events.Length); for (var i = 0; i < req.Events.Length; i++) { if (IsStructurallyMalformed(req.Events[i])) { status[i] = StatusPermanent; } else { originalIndex.Add(i); writable.Add(req.Events[i]); } } // Aligned 1:1 to `writable`; empty when every event was poison (writer skipped). var perEvent = writable.Count == 0 ? Array.Empty() : await _alarmWriter.WriteAsync(writable.ToArray(), ct).ConfigureAwait(false); for (var i = 0; i < originalIndex.Count; i++) { var ok = i < perEvent.Length && perEvent[i]; status[originalIndex[i]] = ok ? StatusAck : StatusRetry; } reply.PerEventStatus = status; reply.PerEventOk = StatusToOk(status); reply.Success = true; // Whole-batch Success stays true even when some events failed — per-event // PerEventStatus slots carry the granular result (Ack / Retry / Permanent); // the SQLite drain worker acks 0, retries 1, and dead-letters 2. PerEventOk // is kept populated for rolling-deploy back-compat with an older client. } catch (Exception ex) { _logger.Warning(ex, "Sidecar WriteAlarmEvents failed"); reply.Success = false; reply.Error = ex.Message; reply.PerEventOk = new bool[req.Events.Length]; reply.PerEventStatus = AllStatus(req.Events.Length, StatusRetry); } await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); } /// /// Classifies an alarm event as structurally malformed (poison): an event the historian /// event store can never persist regardless of retries. Such events are marked Permanent /// so the store-and-forward sink dead-letters them immediately instead of looping to the /// retry cap. A blank source name or alarm type, or a non-positive event timestamp, are /// the structural invariants the historian write requires. /// /// The candidate alarm event. /// true when the event is structurally malformed; otherwise false. internal static bool IsStructurallyMalformed(AlarmHistorianEventDto e) => e is null || string.IsNullOrWhiteSpace(e.SourceName) || string.IsNullOrWhiteSpace(e.AlarmType) || e.EventTimeUtcTicks <= 0; private static byte[] AllStatus(int length, byte value) { var status = new byte[length]; for (var i = 0; i < length; i++) status[i] = value; return status; } private static bool[] StatusToOk(byte[] status) { var ok = new bool[status.Length]; for (var i = 0; i < status.Length; i++) ok[i] = status[i] == StatusAck; return ok; } private static HistorianSampleDto[] ToWire(List samples) { var dtos = new HistorianSampleDto[samples.Count]; for (var i = 0; i < samples.Count; i++) { var s = samples[i]; dtos[i] = new HistorianSampleDto { ValueBytes = s.Value is null ? null : MessagePackSerializer.Serialize(s.Value), Quality = s.Quality, TimestampUtcTicks = s.TimestampUtc.Ticks, }; } return dtos; } private static HistorianAggregateSampleDto[] ToWire(List samples) { var dtos = new HistorianAggregateSampleDto[samples.Count]; for (var i = 0; i < samples.Count; i++) { dtos[i] = new HistorianAggregateSampleDto { Value = samples[i].Value, TimestampUtcTicks = samples[i].TimestampUtc.Ticks, }; } return dtos; } private static HistorianEventDto[] ToWire(List events) { var dtos = new HistorianEventDto[events.Count]; for (var i = 0; i < events.Count; i++) { var e = events[i]; dtos[i] = new HistorianEventDto { EventId = e.Id.ToString(), Source = e.Source, EventTimeUtcTicks = e.EventTime.Ticks, ReceivedTimeUtcTicks = e.ReceivedTime.Ticks, DisplayText = e.DisplayText, Severity = e.Severity, }; } return dtos; } } /// /// Strategy for persisting alarm events into the Wonderware Alarm & Events log. PR 3.W /// supplies a real implementation that drives the aahClient SDK; PR 3.3 ships the /// contract + a default null implementation so the sidecar can boot without one. /// public interface IAlarmEventWriter { /// /// Writes a batch of alarm events. Returns one boolean per input event indicating /// persisted vs. retry-please. The SQLite store-and-forward sink retries failed /// slots on the next drain tick. /// /// Alarm events to write. /// Cancellation token. Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken); }