using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;

namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
    /// <summary>
    /// Production <see cref="IAlarmHistorianWriteBackend"/> backed by AVEVA Historian's
    /// <c>aahClientManaged</c> SDK. Each <see cref="AlarmHistorianEventDto"/> is written via
    /// <c>HistorianAccess.AddStreamedValue(HistorianEvent, out HistorianAccessError)</c> —
    /// the alarm-event write entry point pinned during PR C.1.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The write path needs its own connection. The query-side path opens <c>ReadOnly</c>
    /// sessions, and <c>AddStreamedValue</c> on a read-only session fails with
    /// <c>WriteToReadOnlyFile</c>. This backend therefore opens a dedicated
    /// <c>ReadOnly = false</c> connection. It uses the same
    /// <see cref="HistorianClusterEndpointPicker"/> node-selection and failover logic, with its
    /// own instance unless one is injected, but never shares the connection object itself.
    /// </para>
    /// <para>
    /// Per-event <c>HistorianAccessError.ErrorValue</c> codes map onto
    /// <see cref="AlarmHistorianWriteOutcome"/> via <see cref="ClassifyOutcome"/>.
    /// A connection-class error aborts the remainder of the batch as
    /// <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> and resets the connection, so the
    /// next drain tick reconnects (possibly to a different cluster node). A DTO that cannot be
    /// mapped to a <c>HistorianEvent</c> is a payload fault: it is classified as malformed
    /// input and the batch continues.
    /// </para>
    /// <para>
    /// The exact <c>HistorianEvent</c> field set required by the Historian is confirmed
    /// against a live install during the PR D.1 rollout smoke. <see cref="ToHistorianEvent"/>
    /// maps the unambiguous fields and carries operator comment / condition id as event
    /// properties.
    /// </para>
    /// </remarks>
    public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend, IDisposable
    {
        private static readonly ILogger Log = Serilog.Log.ForContext<SdkAlarmHistorianWriteBackend>();

        // ErrorValue codes that mean the connection/server is the problem (transient) rather
        // than the event payload. These abort the rest of the batch and trigger a reconnect.
        private static readonly HashSet<HistorianAccessError.ErrorValue> ConnectionErrors =
            new HashSet<HistorianAccessError.ErrorValue>
            {
                HistorianAccessError.ErrorValue.FailedToConnect,
                HistorianAccessError.ErrorValue.FailedToCreateSession,
                HistorianAccessError.ErrorValue.NoReply,
                HistorianAccessError.ErrorValue.NotReady,
                HistorianAccessError.ErrorValue.NotInitialized,
                HistorianAccessError.ErrorValue.Stopping,
                HistorianAccessError.ErrorValue.Win32Exception,
                HistorianAccessError.ErrorValue.InvalidResponse,
                // WriteToReadOnlyFile is a connection-configuration fault, not an event-payload
                // fault. It means the session was opened without ReadOnly = false, through a
                // misconfiguration or a regression. The event itself is fine, so it must NOT be
                // dead-lettered. Classifying it here aborts the batch and resets the connection,
                // so the reconnect path re-opens a writable (ReadOnly = false) session. The
                // deferred events drain on the next tick. See Driver.Historian.Wonderware-001.
                HistorianAccessError.ErrorValue.WriteToReadOnlyFile,
            };

        // ErrorValue codes that mean the event itself is malformed: permanent, never retried.
        private static readonly HashSet<HistorianAccessError.ErrorValue> MalformedErrors =
            new HashSet<HistorianAccessError.ErrorValue>
            {
                HistorianAccessError.ErrorValue.InvalidArgument,
                HistorianAccessError.ErrorValue.ValidationFailed,
                HistorianAccessError.ErrorValue.NullPointerArgument,
                HistorianAccessError.ErrorValue.NotImplemented,
                HistorianAccessError.ErrorValue.NotApplicable,
            };

        private readonly HistorianConfiguration _config;
        private readonly IHistorianConnectionFactory _factory;
        private readonly HistorianClusterEndpointPicker _picker;

        // Guards _connection, _activeNode and _disposed transitions.
        private readonly object _connectionLock = new object();
        private HistorianAccess? _connection;
        private string? _activeNode;
        private bool _disposed;

        /// <summary>
        /// Creates a backend that connects through the real SDK connection factory.
        /// </summary>
        /// <param name="config">Historian connection settings; must not be null.</param>
        public SdkAlarmHistorianWriteBackend(HistorianConfiguration config)
            : this(config, new SdkHistorianConnectionFactory(), null)
        {
        }

        /// <summary>
        /// Test seam: creates a backend with an injectable connection factory and picker.
        /// </summary>
        /// <param name="config">Historian connection settings; must not be null.</param>
        /// <param name="factory">Opens SDK connections; must not be null.</param>
        /// <param name="picker">Node picker; when null, a new one is built from <paramref name="config"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> or <paramref name="factory"/> is null.</exception>
        internal SdkAlarmHistorianWriteBackend(
            HistorianConfiguration config,
            IHistorianConnectionFactory factory,
            HistorianClusterEndpointPicker? picker = null)
        {
            _config = config ?? throw new ArgumentNullException(nameof(config));
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _picker = picker ?? new HistorianClusterEndpointPicker(config);
        }

        /// <summary>
        /// Writes a batch of alarm events and returns one outcome per input event, in order.
        /// </summary>
        /// <param name="events">Events to write. A null or empty array yields an empty result.</param>
        /// <param name="cancellationToken">Observed between events.</param>
        /// <returns>
        /// An array the same length as <paramref name="events"/>. It holds
        /// <see cref="AlarmHistorianWriteOutcome.Ack"/> for written events, the classified
        /// outcome for rejected or unmappable events, and
        /// <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> for every event deferred by a
        /// connection failure.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The backend has been disposed.</exception>
        /// <exception cref="OperationCanceledException">
        /// Cancellation was requested. If this happens mid-batch, the outcomes of events already
        /// written are not returned.
        /// </exception>
        /// <remarks>
        /// The method completes synchronously; the SDK calls are blocking.
        /// </remarks>
        public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
            AlarmHistorianEventDto[] events,
            CancellationToken cancellationToken)
        {
            if (events is null || events.Length == 0)
            {
                return Task.FromResult(Array.Empty<AlarmHistorianWriteOutcome>());
            }

            var outcomes = new AlarmHistorianWriteOutcome[events.Length];

            HistorianAccess connection;
            try
            {
                connection = EnsureConnected();
            }
            catch (ObjectDisposedException)
            {
                throw;
            }
            catch (Exception ex)
            {
                // No reachable node. Defer the whole batch so the lmxopcua-side SQLite
                // store-and-forward sink retains the rows for the next drain tick.
                Log.Warning(ex,
                    "Alarm historian write connection unavailable — deferring {Count} event(s) as RetryPlease",
                    events.Length);
                FillRemaining(outcomes, 0, AlarmHistorianWriteOutcome.RetryPlease);
                return Task.FromResult(outcomes);
            }

            for (var i = 0; i < events.Length; i++)
            {
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    HistorianEvent historianEvent;
                    try
                    {
                        historianEvent = ToHistorianEvent(events[i]);
                    }
                    catch (ArgumentException ex)
                    {
                        // The DTO cannot be mapped (null entry, or EventTimeUtcTicks outside the
                        // DateTime range). That is a payload fault, not a transport fault, and
                        // retrying can never succeed. Treating it as a connection error would
                        // reset a healthy connection, mark the node failed and defer the batch on
                        // every drain tick. Classify it like an SDK InvalidArgument rejection.
                        outcomes[i] = ClassifyOutcome(HistorianAccessError.ErrorValue.InvalidArgument);
                        Log.Warning(ex,
                            "Alarm historian event at index {Index} could not be mapped to a HistorianEvent -> {Outcome}",
                            i, outcomes[i]);
                        continue;
                    }

                    if (connection.AddStreamedValue(historianEvent, out var error))
                    {
                        outcomes[i] = AlarmHistorianWriteOutcome.Ack;
                        continue;
                    }

                    var code = error?.ErrorCode ?? HistorianAccessError.ErrorValue.Failure;
                    if (ConnectionErrors.Contains(code))
                    {
                        // Connection died mid-batch: drop it and defer this event + the rest.
                        Log.Warning(
                            "Alarm historian write hit connection-level error {Code} ({Desc}); resetting connection, deferring {Remaining} event(s)",
                            code, error?.ErrorDescription, events.Length - i);
                        HandleConnectionError(connection, error?.ErrorDescription);
                        FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease);
                        return Task.FromResult(outcomes);
                    }

                    outcomes[i] = ClassifyOutcome(code);
                    Log.Warning(
                        "Alarm historian write rejected event {EventId}: {Code} ({Desc}) -> {Outcome}",
                        events[i].EventId, code, error?.ErrorDescription, outcomes[i]);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // Transport-level throw (SDK marshalling fault, broken connection):
                    // reset and defer this event + the rest.
                    Log.Warning(ex,
                        "Alarm historian write threw for event {EventId}; resetting connection, deferring {Remaining} event(s)",
                        events[i].EventId, events.Length - i);
                    HandleConnectionError(connection, ex.Message);
                    FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease);
                    return Task.FromResult(outcomes);
                }
            }

            return Task.FromResult(outcomes);
        }

        /// <summary>
        /// Maps an <see cref="AlarmHistorianEventDto"/> onto the SDK's <c>HistorianEvent</c>.
        /// </summary>
        /// <remarks>
        /// Operator comment and originating condition id ride as event properties.
        /// Operator-comment fidelity is the field the value-driven fallback path cannot carry.
        /// </remarks>
        /// <param name="dto">The event to map.</param>
        /// <returns>A populated alarm <c>HistorianEvent</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dto"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <c>EventTimeUtcTicks</c> is outside the range representable by <see cref="DateTime"/>.
        /// </exception>
        internal static HistorianEvent ToHistorianEvent(AlarmHistorianEventDto dto)
        {
            // Surface a null batch entry as an ArgumentException, so the batch loop classifies it
            // as malformed instead of failing later with a NullReferenceException.
            if (ReferenceEquals(dto, null))
            {
                throw new ArgumentNullException(nameof(dto));
            }

            // The ArchestrA SDK marks these HistorianEvent members obsolete but still honours
            // them on write. Their successors aren't wired in the version we bind against, so
            // using them is the documented v1 behaviour (mirrors HistorianDataSource.ToDto).
            // The warning is suppressed locally so any other deprecated-surface use still
            // surfaces as an error.
#pragma warning disable CS0618
            var historianEvent = new HistorianEvent
            {
                IsAlarm = true,
                Source = dto.SourceName ?? string.Empty,
                EventType = string.IsNullOrEmpty(dto.AlarmType) ? "Alarm" : dto.AlarmType,
                EventTime = new DateTime(dto.EventTimeUtcTicks, DateTimeKind.Utc),
                ReceivedTime = DateTime.UtcNow,
                Severity = dto.Severity,
                DisplayText = dto.Message ?? string.Empty,
            };

            if (Guid.TryParse(dto.EventId, out var id))
            {
                historianEvent.Id = id;
            }
            else
            {
                // Driver.Historian.Wonderware-004: an unparseable or empty EventId previously
                // left Id as Guid.Empty, so every such alarm collided on the same id with no
                // diagnostic. Synthesize a fresh Guid so each event still gets a unique
                // identifier. The write still proceeds (outcome stays Ack), and the sender can
                // correlate the synthesized id via the warning log.
                var synthesized = Guid.NewGuid();
                Log.Warning(
                    "Alarm historian event has non-parseable EventId {EventId} for source {Source}; synthesizing Id={SynthesizedId}",
                    dto.EventId ?? "(null)", dto.SourceName ?? "(none)", synthesized);
                historianEvent.Id = synthesized;
            }
#pragma warning restore CS0618

            if (!string.IsNullOrEmpty(dto.AckComment))
            {
                historianEvent.AddProperty("Comment", dto.AckComment, out _);
            }

            if (!string.IsNullOrEmpty(dto.ConditionId))
            {
                historianEvent.AddProperty("ConditionId", dto.ConditionId, out _);
            }

            return historianEvent;
        }

        /// <summary>
        /// Classifies a non-connection-class <c>HistorianAccessError.ErrorValue</c> into an
        /// <see cref="AlarmHistorianWriteOutcome"/> by routing it through the shared
        /// <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/> mapping.
        /// </summary>
        /// <remarks>
        /// Exposed for unit tests. Connection-class codes are handled separately by the batch loop.
        /// </remarks>
        internal static AlarmHistorianWriteOutcome ClassifyOutcome(HistorianAccessError.ErrorValue code) =>
            AahClientManagedAlarmEventWriter.MapOutcome(
                (int)code,
                isCommunicationError: ConnectionErrors.Contains(code),
                isMalformedInput: MalformedErrors.Contains(code));

        // Sets outcomes[from..] to value.
        private static void FillRemaining(
            AlarmHistorianWriteOutcome[] outcomes,
            int from,
            AlarmHistorianWriteOutcome value)
        {
            for (var i = from; i < outcomes.Length; i++)
            {
                outcomes[i] = value;
            }
        }

        /// <summary>
        /// Returns the cached write connection, or opens one against the first healthy node.
        /// </summary>
        /// <remarks>
        /// The connect runs outside the lock. If another caller published a connection first,
        /// the freshly opened one is closed and the published one is returned.
        /// </remarks>
        /// <exception cref="ObjectDisposedException">The backend has been disposed.</exception>
        /// <exception cref="InvalidOperationException">No configured node could be connected.</exception>
        private HistorianAccess EnsureConnected()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend));
            }

            var existing = Volatile.Read(ref _connection);
            if (existing != null)
            {
                return existing;
            }

            var (conn, node) = ConnectToAnyHealthyNode();
            lock (_connectionLock)
            {
                if (_disposed)
                {
                    SafeClose(conn);
                    throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend));
                }

                if (_connection != null)
                {
                    SafeClose(conn);
                    return _connection;
                }

                _connection = conn;
                _activeNode = node;
                Log.Information("Alarm historian write connection opened to {Server}:{Port}", node, _config.Port);
                return conn;
            }
        }

        /// <summary>
        /// Tries each node reported healthy by the picker and opens a writable Event connection
        /// to the first one that succeeds.
        /// </summary>
        /// <remarks>
        /// Each node's success or failure is recorded with the picker.
        /// </remarks>
        /// <exception cref="InvalidOperationException">
        /// No nodes are configured, all are in cooldown, or every candidate failed to connect.
        /// </exception>
        private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode()
        {
            var candidates = _picker.GetHealthyNodes();
            if (candidates.Count == 0)
            {
                throw new InvalidOperationException(
                    _picker.NodeCount == 0
                        ? "No historian nodes configured"
                        : $"All {_picker.NodeCount} historian nodes are in cooldown — no healthy endpoints");
            }

            Exception? lastException = null;
            foreach (var node in candidates)
            {
                try
                {
                    var conn = _factory.CreateAndConnect(
                        CloneConfigWithServerName(node),
                        HistorianConnectionType.Event,
                        readOnly: false);
                    _picker.MarkHealthy(node);
                    return (conn, node);
                }
                catch (Exception ex)
                {
                    _picker.MarkFailed(node, ex.Message);
                    lastException = ex;
                    Log.Warning(ex, "Alarm historian node {Node} failed during write-connect; trying next", node);
                }
            }

            throw new InvalidOperationException(
                $"All {candidates.Count} healthy historian candidate(s) failed during write-connect: " +
                (lastException?.Message ?? "(no detail)"),
                lastException);
        }

        /// <summary>
        /// Tears down <paramref name="failedConnection"/> and marks its node failed, but only if
        /// it is still the active connection.
        /// </summary>
        /// <remarks>
        /// A batch holds its connection in a local variable. If another batch has already reset
        /// it and reconnected, possibly to a different node, the current connection is healthy.
        /// A late failure report from the old connection must not close the new connection or
        /// mark the new node failed.
        /// </remarks>
        /// <param name="failedConnection">The connection the failing batch was using.</param>
        /// <param name="detail">Failure reason recorded with the picker.</param>
        private void HandleConnectionError(HistorianAccess failedConnection, string? detail)
        {
            lock (_connectionLock)
            {
                if (_connection == null || !ReferenceEquals(_connection, failedConnection))
                {
                    return;
                }

                SafeClose(_connection);
                _connection = null;

                var failedNode = _activeNode;
                _activeNode = null;
                if (failedNode != null)
                {
                    _picker.MarkFailed(failedNode, detail ?? "mid-batch failure");
                }

                Log.Warning("Alarm historian write connection reset (node={Node})", failedNode ?? "(unknown)");
            }
        }

        /// <summary>
        /// Best-effort close-and-dispose. Errors are logged at Debug and never propagated.
        /// </summary>
        /// <remarks>
        /// <c>Dispose</c> is attempted even when <c>CloseConnection</c> throws, so a failed
        /// close does not leak the SDK connection.
        /// </remarks>
        private static void SafeClose(HistorianAccess conn)
        {
            try
            {
                conn.CloseConnection(out _);
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Error closing alarm historian write connection");
            }

            try
            {
                conn.Dispose();
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Error disposing alarm historian write connection");
            }
        }

        /// <summary>
        /// Copies the configured settings, overriding <c>ServerName</c> with the chosen node.
        /// </summary>
        private HistorianConfiguration CloneConfigWithServerName(string serverName) =>
            new HistorianConfiguration
            {
                Enabled = _config.Enabled,
                ServerName = serverName,
                ServerNames = _config.ServerNames,
                FailureCooldownSeconds = _config.FailureCooldownSeconds,
                IntegratedSecurity = _config.IntegratedSecurity,
                UserName = _config.UserName,
                Password = _config.Password,
                Port = _config.Port,
                CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
                MaxValuesPerRead = _config.MaxValuesPerRead,
                RequestTimeoutSeconds = _config.RequestTimeoutSeconds,
            };

        /// <summary>
        /// Closes the write connection. Subsequent writes throw <see cref="ObjectDisposedException"/>.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            lock (_connectionLock)
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;

                if (_connection != null)
                {
                    SafeClose(_connection);
                    _connection = null;
                }

                _activeNode = null;
            }
        }
    }
}