diff --git a/docs/plans/alarms-worker-wiring-plan.md b/docs/plans/alarms-worker-wiring-plan.md index de0632a..b5188d9 100644 --- a/docs/plans/alarms-worker-wiring-plan.md +++ b/docs/plans/alarms-worker-wiring-plan.md @@ -212,36 +212,40 @@ x64, which is not bitness-constrained like the worker). C.1 is independently unblockable from A.2 if the goal is to wire up the scripted-alarm historian path. -**Current state**: +**Current state (DONE — code)**: -`SdkAlarmHistorianWriteBackend` in `src\MxGateway.Worker\MxAccess\` is a -placeholder returning `RetryPlease`. The lmxopcua sidecar's `WriteAlarmEvents` -IPC slot is defined in `Ipc\Contracts.cs` but `Program.cs` constructs -`HistorianFrameHandler` without an `alarmWriter` (line 57 per the alarms plan). -The `IAlarmEventWriter` interface exists; only the production implementation -and the consumer wiring are missing. +C.1 shipped. `SdkAlarmHistorianWriteBackend.WriteBatchAsync` writes through the +real SDK entry point — **`HistorianAccess.AddStreamedValue(HistorianEvent, out +HistorianAccessError)`** in `aahClientManaged` — pinned 2026-05-18 by +decompiling the installed SDK. `Program.cs` and `Install-Services.ps1` were +already wired in the PR C.1 scaffolding. Two corrections to the assumptions +this doc was written under: -**What it needs**: +- **There is no `ArchestrAAlarmsAndEvents.SDK` writer.** That assembly + (`ArchestrAAlarmsAndEvents.SDK.Common.dll`, the only one installed) is a WCF + query-proxy base — no `AlarmHistorianWriter` type. The write path is the + `aahClientManaged` `HistorianAccess` surface. +- **The write path needs its own connection.** The query-side + `HistorianDataSource` opens `ReadOnly` sessions; `AddStreamedValue` on a + read-only session fails with `WriteToReadOnlyFile`. + `SdkAlarmHistorianWriteBackend` opens a dedicated `ReadOnly=false` connection + and shares only `HistorianClusterEndpointPicker` (not the connection object). -1. New `AahClientManagedAlarmEventWriter.cs` implementing `IAlarmEventWriter` - (defined in `Ipc\HistorianFrameHandler.cs`). Calls `aahClientManaged`'s - alarm-event write API — same path v1's `GalaxyHistorianWriter` used. - Uses `HistorianClusterEndpointPicker` for multi-node routing. - Maps `MxStatus` write outcomes to `HistorianWriteOutcome` enum - (Ack / PermanentFail / RetryPlease). +**What it needed** (all done): -2. `Program.cs` — build `AahClientManagedAlarmEventWriter` next to the - existing `BuildHistorian()` call; pass it to `HistorianFrameHandler`. - Gate behind `OTOPCUA_HISTORIAN_ALARM_WRITE_ENABLED` env var (default `true` - when `OTOPCUA_HISTORIAN_ENABLED=true`). +1. `SdkAlarmHistorianWriteBackend` builds a `HistorianEvent` per + `AlarmHistorianEventDto`, calls `AddStreamedValue`, and maps + `HistorianAccessError.ErrorValue` codes through + `AahClientManagedAlarmEventWriter.MapOutcome` (Ack / PermanentFail / + RetryPlease). `HistorianClusterEndpointPicker` drives multi-node failover. +2. `Program.cs` — `BuildAlarmWriter()` constructs the backend gated behind + `OTOPCUA_HISTORIAN_ALARM_WRITE_ENABLED`. +3. `Install-Services.ps1` — env var present in the install-time block. -3. `Install-Services.ps1` — add the new env var to the install-time block. - -**What blocks C.1**: access to the `aahClientManaged` SDK on the dev box -(confirmed available per `project_aveva_platform_installed.md` — AVEVA -Historian SDK is present). C.1 can proceed without A.2 since the sidecar's -`aahClientManaged` is x64 and does not share the worker's x86 bitness -constraint. +**What remains for C.1**: only the live-rig write smoke — the `Live_*` tests +in `SdkAlarmHistorianWriteBackendTests` stay `Skip`-gated until D.1 confirms a +round-trip against a real AVEVA Historian, including the exact mandatory +`HistorianEvent` field set. **Tests to write**: diff --git a/looseends.md b/looseends.md index 3967280..277cd4f 100644 --- a/looseends.md +++ b/looseends.md @@ -16,11 +16,14 @@ remains, plus follow-ups surfaced during the run. ## Follow-ups surfaced during the run -- **C.1 live SDK binding.** `SdkAlarmHistorianWriteBackend.WriteBatchAsync` - (`src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/`) is - still a placeholder returning `RetryPlease` for every event, so queued - alarm events are retained rather than written. Pinning the real - `aahClientManaged` alarm-write entry point is rig-gated — pairs with #20. +- **~~C.1 live SDK binding.~~** DONE (code). `SdkAlarmHistorianWriteBackend` + (`src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/`) now + writes via the real entry point `HistorianAccess.AddStreamedValue(HistorianEvent, + out error)` in `aahClientManaged`. Two plan corrections found while pinning it: + (a) `ArchestrAAlarmsAndEvents.SDK` has no writer — it's a WCF query proxy; + (b) writes need their own `ReadOnly=false` connection, not the shared read + pool. Remaining: the live-rig write smoke (the `Live_*` tests are still + `Skip`-gated) — folds into #20 / D.1. - **~~#24 Shelve-method routing.~~** DONE. Acknowledge / Confirm already routed; OneShotShelve / TimedShelve / Unshelve now route via the native diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IHistorianConnectionFactory.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IHistorianConnectionFactory.cs index 4893ff6..feb4722 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IHistorianConnectionFactory.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/IHistorianConnectionFactory.cs @@ -10,32 +10,24 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend /// internal interface IHistorianConnectionFactory { - HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type); + /// + /// Opens a Historian SDK connection. defaults to + /// true for the query path; the alarm-event write backend passes + /// false because HistorianAccess.AddStreamedValue fails with + /// WriteToReadOnlyFile on a read-only session. + /// + HistorianAccess CreateAndConnect( + HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true); } /// Production implementation — opens real Historian SDK connections. internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory { - public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type) + public HistorianAccess CreateAndConnect( + HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true) { var conn = new HistorianAccess(); - - var args = new HistorianConnectionArgs - { - ServerName = config.ServerName, - TcpPort = (ushort)config.Port, - IntegratedSecurity = config.IntegratedSecurity, - UseArchestrAUser = config.IntegratedSecurity, - ConnectionType = type, - ReadOnly = true, - PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) - }; - - if (!config.IntegratedSecurity) - { - args.UserName = config.UserName ?? string.Empty; - args.Password = config.Password ?? string.Empty; - } + var args = BuildConnectionArgs(config, type, readOnly); if (!conn.OpenConnection(args, out var error)) { @@ -69,5 +61,32 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend throw new TimeoutException( $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s"); } + + /// + /// Builds the for a connection. Pure (no SDK + /// side effects) so the read-only-vs-write argument shaping is unit-testable. + /// + internal static HistorianConnectionArgs BuildConnectionArgs( + HistorianConfiguration config, HistorianConnectionType type, bool readOnly) + { + var args = new HistorianConnectionArgs + { + ServerName = config.ServerName, + TcpPort = (ushort)config.Port, + IntegratedSecurity = config.IntegratedSecurity, + UseArchestrAUser = config.IntegratedSecurity, + ConnectionType = type, + ReadOnly = readOnly, + PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) + }; + + if (!config.IntegratedSecurity) + { + args.UserName = config.UserName ?? string.Empty; + args.Password = config.Password ?? string.Empty; + } + + return args; + } } } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs index c9e4cfb..60b6184 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/SdkAlarmHistorianWriteBackend.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using ArchestrA; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; @@ -8,39 +10,85 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend { /// /// Production backed by AVEVA Historian's - /// aahClientManaged alarm-event write API. The exact SDK entry point is - /// pinned during the live-rig smoke in PR D.1 — until that gate, this backend - /// reports for every - /// event with a structured diagnostic so the lmxopcua-side - /// SqliteStoreAndForwardSink retains the queued events rather than dropping - /// or hard-failing them. + /// aahClientManaged SDK. Each is written via + /// HistorianAccess.AddStreamedValue(HistorianEvent, out HistorianAccessError) — + /// the alarm-event write entry point pinned during PR C.1. /// /// /// - /// Cluster failover reuses via - /// the shared connection pool — there is - /// no second connection pool for writes. Wonderware Historian's alarm-event - /// write surface accepts the same HistorianAccess session a read - /// opens, so reusing the picker is parity-preserving with v1's - /// GalaxyHistorianWriter. + /// The write path needs its own connection. The query-side + /// opens ReadOnly sessions, and + /// AddStreamedValue on a read-only session fails with + /// WriteToReadOnlyFile. This backend therefore opens a dedicated + /// ReadOnly = false connection; it shares + /// for node selection and failover but + /// not the connection object itself. /// /// - /// Once D.1 confirms the SDK entry point, this class swaps the placeholder - /// body for the real call sequence. The mapping from raw HRESULT / - /// HistorianError codes onto - /// is already shared via - /// so the smoke-pinned change stays minimal. + /// Per-event HistorianAccessError.ErrorValue codes map onto + /// via + /// . A connection-class + /// error aborts the remainder of the batch as + /// and resets the connection so + /// the next drain tick reconnects — possibly to a different cluster node. + /// + /// + /// The exact HistorianEvent field set required by the Historian is confirmed + /// against a live install during the PR D.1 rollout smoke; + /// maps the unambiguous fields and carries operator comment / condition id as event + /// properties. /// /// - public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend + public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend, IDisposable { private static readonly ILogger Log = Serilog.Log.ForContext(); + // ErrorValue codes that mean the connection/server is the problem (transient) rather + // than the event payload. These abort the rest of the batch and trigger a reconnect. + private static readonly HashSet ConnectionErrors = + new HashSet + { + HistorianAccessError.ErrorValue.FailedToConnect, + HistorianAccessError.ErrorValue.FailedToCreateSession, + HistorianAccessError.ErrorValue.NoReply, + HistorianAccessError.ErrorValue.NotReady, + HistorianAccessError.ErrorValue.NotInitialized, + HistorianAccessError.ErrorValue.Stopping, + HistorianAccessError.ErrorValue.Win32Exception, + HistorianAccessError.ErrorValue.InvalidResponse, + }; + + // ErrorValue codes that mean the event itself is malformed — permanent, never retried. + private static readonly HashSet MalformedErrors = + new HashSet + { + HistorianAccessError.ErrorValue.InvalidArgument, + HistorianAccessError.ErrorValue.ValidationFailed, + HistorianAccessError.ErrorValue.NullPointerArgument, + HistorianAccessError.ErrorValue.WriteToReadOnlyFile, + HistorianAccessError.ErrorValue.NotImplemented, + HistorianAccessError.ErrorValue.NotApplicable, + }; + private readonly HistorianConfiguration _config; + private readonly IHistorianConnectionFactory _factory; + private readonly HistorianClusterEndpointPicker _picker; + private readonly object _connectionLock = new object(); + private HistorianAccess? _connection; + private string? _activeNode; + private bool _disposed; public SdkAlarmHistorianWriteBackend(HistorianConfiguration config) + : this(config, new SdkHistorianConnectionFactory(), null) { } + + internal SdkAlarmHistorianWriteBackend( + HistorianConfiguration config, + IHistorianConnectionFactory factory, + HistorianClusterEndpointPicker? picker = null) { _config = config ?? throw new ArgumentNullException(nameof(config)); + _factory = factory ?? throw new ArgumentNullException(nameof(factory)); + _picker = picker ?? new HistorianClusterEndpointPicker(config); } public Task WriteBatchAsync( @@ -52,22 +100,265 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend return Task.FromResult(new AlarmHistorianWriteOutcome[0]); } - // Placeholder: pin the SDK entry point in PR D.1 against a live AVEVA - // Historian. Until then the call returns RetryPlease for every slot so - // the lmxopcua-side sink keeps the events queued rather than dropping - // them — same effect as the current NullAlarmHistorianSink fallback, - // but visible through the structured diagnostic + per-event outcome. - Log.Warning( - "Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.", - events.Length, - _config.ServerName); - var outcomes = new AlarmHistorianWriteOutcome[events.Length]; - for (var i = 0; i < outcomes.Length; i++) + + HistorianAccess connection; + try { - outcomes[i] = AlarmHistorianWriteOutcome.RetryPlease; + connection = EnsureConnected(); } + catch (ObjectDisposedException) + { + throw; + } + catch (Exception ex) + { + // No reachable node — defer the whole batch so the lmxopcua-side SQLite + // store-and-forward sink retains the rows for the next drain tick. + Log.Warning(ex, + "Alarm historian write connection unavailable — deferring {Count} event(s) as RetryPlease", + events.Length); + FillRemaining(outcomes, 0, AlarmHistorianWriteOutcome.RetryPlease); + return Task.FromResult(outcomes); + } + + for (var i = 0; i < events.Length; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + var historianEvent = ToHistorianEvent(events[i]); + if (connection.AddStreamedValue(historianEvent, out var error)) + { + outcomes[i] = AlarmHistorianWriteOutcome.Ack; + continue; + } + + var code = error?.ErrorCode ?? HistorianAccessError.ErrorValue.Failure; + if (ConnectionErrors.Contains(code)) + { + // Connection died mid-batch — drop it and defer this event + the rest. + Log.Warning( + "Alarm historian write hit connection-level error {Code} ({Desc}); resetting connection, deferring {Remaining} event(s)", + code, error?.ErrorDescription, events.Length - i); + HandleConnectionError(error?.ErrorDescription); + FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease); + return Task.FromResult(outcomes); + } + + outcomes[i] = ClassifyOutcome(code); + Log.Warning( + "Alarm historian write rejected event {EventId}: {Code} ({Desc}) -> {Outcome}", + events[i].EventId, code, error?.ErrorDescription, outcomes[i]); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + // Transport-level throw (SDK marshalling fault, broken connection) — + // reset and defer this event + the rest. + Log.Warning(ex, + "Alarm historian write threw for event {EventId}; resetting connection, deferring {Remaining} event(s)", + events[i].EventId, events.Length - i); + HandleConnectionError(ex.Message); + FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease); + return Task.FromResult(outcomes); + } + } + return Task.FromResult(outcomes); } + + /// + /// Maps an onto the SDK's + /// HistorianEvent. Operator comment and originating condition id ride as + /// event properties — operator-comment fidelity is the field the value-driven + /// fallback path cannot carry. + /// + internal static HistorianEvent ToHistorianEvent(AlarmHistorianEventDto dto) + { + // The ArchestrA SDK marks these HistorianEvent members obsolete but still honours + // them on write; their successors aren't wired in the version we bind against. + // Using them is the documented v1 behaviour — mirrors HistorianDataSource.ToDto, + // suppressed locally so any other deprecated-surface use still surfaces as an error. +#pragma warning disable CS0618 + var historianEvent = new HistorianEvent + { + IsAlarm = true, + Source = dto.SourceName ?? string.Empty, + EventType = string.IsNullOrEmpty(dto.AlarmType) ? "Alarm" : dto.AlarmType, + EventTime = new DateTime(dto.EventTimeUtcTicks, DateTimeKind.Utc), + ReceivedTime = DateTime.UtcNow, + Severity = dto.Severity, + DisplayText = dto.Message ?? string.Empty, + }; + + if (Guid.TryParse(dto.EventId, out var id)) + { + historianEvent.Id = id; + } +#pragma warning restore CS0618 + + if (!string.IsNullOrEmpty(dto.AckComment)) + { + historianEvent.AddProperty("Comment", dto.AckComment, out _); + } + if (!string.IsNullOrEmpty(dto.ConditionId)) + { + historianEvent.AddProperty("ConditionId", dto.ConditionId, out _); + } + + return historianEvent; + } + + /// + /// Classifies a non-connection-class HistorianAccessError.ErrorValue into an + /// by routing it through the shared + /// mapping. Exposed for + /// unit tests — connection-class codes are handled separately by the batch loop. + /// + internal static AlarmHistorianWriteOutcome ClassifyOutcome(HistorianAccessError.ErrorValue code) + => AahClientManagedAlarmEventWriter.MapOutcome( + (int)code, + isCommunicationError: ConnectionErrors.Contains(code), + isMalformedInput: MalformedErrors.Contains(code)); + + private static void FillRemaining( + AlarmHistorianWriteOutcome[] outcomes, int from, AlarmHistorianWriteOutcome value) + { + for (var i = from; i < outcomes.Length; i++) + { + outcomes[i] = value; + } + } + + private HistorianAccess EnsureConnected() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend)); + } + + var existing = Volatile.Read(ref _connection); + if (existing != null) return existing; + + var (conn, node) = ConnectToAnyHealthyNode(); + + lock (_connectionLock) + { + if (_disposed) + { + SafeClose(conn); + throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend)); + } + + if (_connection != null) + { + SafeClose(conn); + return _connection; + } + + _connection = conn; + _activeNode = node; + Log.Information("Alarm historian write connection opened to {Server}:{Port}", node, _config.Port); + return conn; + } + } + + private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode() + { + var candidates = _picker.GetHealthyNodes(); + if (candidates.Count == 0) + { + throw new InvalidOperationException( + _picker.NodeCount == 0 + ? "No historian nodes configured" + : $"All {_picker.NodeCount} historian nodes are in cooldown — no healthy endpoints"); + } + + Exception? lastException = null; + foreach (var node in candidates) + { + try + { + var conn = _factory.CreateAndConnect( + CloneConfigWithServerName(node), HistorianConnectionType.Event, readOnly: false); + _picker.MarkHealthy(node); + return (conn, node); + } + catch (Exception ex) + { + _picker.MarkFailed(node, ex.Message); + lastException = ex; + Log.Warning(ex, "Alarm historian node {Node} failed during write-connect; trying next", node); + } + } + + throw new InvalidOperationException( + $"All {candidates.Count} healthy historian candidate(s) failed during write-connect: " + + (lastException?.Message ?? "(no detail)"), + lastException); + } + + private void HandleConnectionError(string? detail) + { + lock (_connectionLock) + { + if (_connection == null) return; + + SafeClose(_connection); + _connection = null; + + var failedNode = _activeNode; + _activeNode = null; + if (failedNode != null) _picker.MarkFailed(failedNode, detail ?? "mid-batch failure"); + Log.Warning("Alarm historian write connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + private static void SafeClose(HistorianAccess conn) + { + try + { + conn.CloseConnection(out _); + conn.Dispose(); + } + catch (Exception ex) + { + Log.Debug(ex, "Error closing alarm historian write connection"); + } + } + + private HistorianConfiguration CloneConfigWithServerName(string serverName) => new HistorianConfiguration + { + Enabled = _config.Enabled, + ServerName = serverName, + ServerNames = _config.ServerNames, + FailureCooldownSeconds = _config.FailureCooldownSeconds, + IntegratedSecurity = _config.IntegratedSecurity, + UserName = _config.UserName, + Password = _config.Password, + Port = _config.Port, + CommandTimeoutSeconds = _config.CommandTimeoutSeconds, + MaxValuesPerRead = _config.MaxValuesPerRead, + RequestTimeoutSeconds = _config.RequestTimeoutSeconds, + }; + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + lock (_connectionLock) + { + if (_connection != null) + { + SafeClose(_connection); + _connection = null; + } + } + } } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/SdkAlarmHistorianWriteBackendTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/SdkAlarmHistorianWriteBackendTests.cs index 7701546..79b56fa 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/SdkAlarmHistorianWriteBackendTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Backend/SdkAlarmHistorianWriteBackendTests.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using ArchestrA; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; @@ -11,42 +12,42 @@ using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests { /// - /// PR C.1 — pins the contract: + /// PR C.1 — covers , the aahClientManaged-bound + /// alarm-event writer. The SDK-touching batch loop itself is exercised by the rig-gated + /// Live_* tests (D.1); the unit tests below pin the parts that are SDK-type-free: /// - /// - /// Unit: the placeholder backend returns - /// for every slot so the lmxopcua-side store-and-forward sink retains events rather than - /// dropping them while D.1 is unresolved. - /// - /// - /// Integration (rig-gated): once D.1 pins the live SDK entry point the Skip attribute is - /// removed. The live test writes a synthetic batch to a real AVEVA Historian and asserts - /// the cluster picker rotates from a broken primary to a healthy secondary. - /// + /// connection-unavailable → whole batch deferred as RetryPlease; + /// error-code mapping; + /// read-only-vs-write shaping. /// /// [Trait("Category", "Unit")] public sealed class SdkAlarmHistorianWriteBackendTests { - // ── Placeholder-mode tests (no rig required) ───────────────────────── + // ── Connection-unavailable path (deterministic, no SDK load) ────────── [Fact] - public async Task Placeholder_returns_RetryPlease_for_every_slot_so_queue_is_preserved() + public async Task Empty_batch_returns_empty_array() { - // The SDK call-site in SdkAlarmHistorianWriteBackend is not yet pinned (PR D.1). - // Until D.1 swaps in the live call, the backend must return RetryPlease for every - // event so the lmxopcua-side SqliteStoreAndForwardSink retains the rows instead of - // dropping them — same effect as the NullAlarmHistorianSink fallback, but each - // slot is individually addressable for the drain worker. - var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true }; - var backend = new SdkAlarmHistorianWriteBackend(cfg); + var backend = new SdkAlarmHistorianWriteBackend( + Config("any"), new ThrowingConnectionFactory()); - var events = new[] - { - AlarmEvent("E1"), - AlarmEvent("E2"), - AlarmEvent("E3"), - }; + var outcomes = await backend.WriteBatchAsync( + Array.Empty(), CancellationToken.None); + + outcomes.ShouldBeEmpty(); + } + + [Fact] + public async Task Unreachable_node_defers_whole_batch_as_RetryPlease() + { + // No node can be connected — the backend must defer every event so the + // lmxopcua-side SQLite store-and-forward sink retains the rows rather than + // dropping them. + var backend = new SdkAlarmHistorianWriteBackend( + Config("unreachable"), new ThrowingConnectionFactory()); + + var events = new[] { AlarmEvent("E1"), AlarmEvent("E2"), AlarmEvent("E3") }; var outcomes = await backend.WriteBatchAsync(events, CancellationToken.None); outcomes.Length.ShouldBe(events.Length); @@ -54,23 +55,12 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests } [Fact] - public async Task Placeholder_returns_empty_array_for_empty_batch() + public async Task Unreachable_node_large_batch_returns_one_outcome_per_event() { - var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true }; - var backend = new SdkAlarmHistorianWriteBackend(cfg); - - var outcomes = await backend.WriteBatchAsync(Array.Empty(), CancellationToken.None); - - outcomes.ShouldBeEmpty(); - } - - [Fact] - public async Task Placeholder_returns_same_count_as_input_for_large_batch() - { - // Guards against an off-by-one error in the placeholder array allocation — - // WriteBatchAsync must always return exactly as many outcomes as input events. - var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true }; - var backend = new SdkAlarmHistorianWriteBackend(cfg); + // Guards the outcome-array allocation: WriteBatchAsync must always return exactly + // as many outcomes as input events, even on the whole-batch-deferred path. + var backend = new SdkAlarmHistorianWriteBackend( + Config("unreachable"), new ThrowingConnectionFactory()); var batch = Enumerable.Range(0, 1000).Select(i => AlarmEvent($"E{i}")).ToArray(); var outcomes = await backend.WriteBatchAsync(batch, CancellationToken.None); @@ -79,22 +69,91 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease); } + [Fact] + public async Task Connect_failure_marks_node_failed_in_picker() + { + // Every connect attempt throws → the picker should record the failure so the + // node enters cooldown (cluster-failover plumbing). + var cfg = Config("node-a"); + var picker = new HistorianClusterEndpointPicker(cfg); + var backend = new SdkAlarmHistorianWriteBackend(cfg, new ThrowingConnectionFactory(), picker); + + await backend.WriteBatchAsync(new[] { AlarmEvent("E1") }, CancellationToken.None); + + picker.HealthyNodeCount.ShouldBe(0, "the only node failed to connect and is now in cooldown"); + } + + // ── ClassifyOutcome — error-code → outcome mapping ──────────────────── + + [Theory] + [InlineData(HistorianAccessError.ErrorValue.Success, AlarmHistorianWriteOutcome.Ack)] + [InlineData(HistorianAccessError.ErrorValue.FailedToConnect, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.FailedToCreateSession, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.NoReply, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.NotReady, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.Failure, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.NoData, AlarmHistorianWriteOutcome.RetryPlease)] + [InlineData(HistorianAccessError.ErrorValue.InvalidArgument, AlarmHistorianWriteOutcome.PermanentFail)] + [InlineData(HistorianAccessError.ErrorValue.ValidationFailed, AlarmHistorianWriteOutcome.PermanentFail)] + [InlineData(HistorianAccessError.ErrorValue.NullPointerArgument, AlarmHistorianWriteOutcome.PermanentFail)] + [InlineData(HistorianAccessError.ErrorValue.WriteToReadOnlyFile, AlarmHistorianWriteOutcome.PermanentFail)] + [InlineData(HistorianAccessError.ErrorValue.NotImplemented, AlarmHistorianWriteOutcome.PermanentFail)] + public void ClassifyOutcome_maps_error_code_to_expected_outcome( + HistorianAccessError.ErrorValue code, AlarmHistorianWriteOutcome expected) + { + SdkAlarmHistorianWriteBackend.ClassifyOutcome(code).ShouldBe(expected); + } + + // ── BuildConnectionArgs — read-only vs write shaping ────────────────── + + [Fact] + public void BuildConnectionArgs_write_connection_is_not_read_only() + { + // The alarm-event write path must open ReadOnly=false; AddStreamedValue on a + // read-only session fails with WriteToReadOnlyFile. + var args = SdkHistorianConnectionFactory.BuildConnectionArgs( + Config("h1"), HistorianConnectionType.Event, readOnly: false); + + args.ReadOnly.ShouldBeFalse(); + args.ConnectionType.ShouldBe(HistorianConnectionType.Event); + args.ServerName.ShouldBe("h1"); + } + + [Fact] + public void BuildConnectionArgs_query_connection_is_read_only() + { + var args = SdkHistorianConnectionFactory.BuildConnectionArgs( + Config("h1"), HistorianConnectionType.Process, readOnly: true); + + args.ReadOnly.ShouldBeTrue(); + args.ConnectionType.ShouldBe(HistorianConnectionType.Process); + } + + [Fact] + public void BuildConnectionArgs_non_integrated_security_carries_credentials() + { + var cfg = Config("h1"); + cfg.IntegratedSecurity = false; + cfg.UserName = "histuser"; + cfg.Password = "histpass"; + + var args = SdkHistorianConnectionFactory.BuildConnectionArgs( + cfg, HistorianConnectionType.Event, readOnly: false); + + args.IntegratedSecurity.ShouldBeFalse(); + args.UserName.ShouldBe("histuser"); + args.Password.ShouldBe("histpass"); + } + // ── Rig-gated integration tests ─────────────────────────────────────── // - // The tests below need a live AVEVA Historian install and are gated with - // Skip="rig-required". Once PR D.1 pins the SDK entry point, remove the - // Skip attribute and add them to the integration test run profile. + // The entry point (HistorianAccess.AddStreamedValue) is pinned and implemented; + // these need a live AVEVA Historian and are un-skipped during the PR D.1 smoke. - [Fact(Skip = "rig-required: needs a live AVEVA Historian + aahClientManaged SDK — enable in PR D.1")] + [Fact(Skip = "rig-required: needs a live AVEVA Historian — un-skip during the PR D.1 rollout smoke")] public async Task Live_single_event_roundtrip_returns_Ack() { - // Spec (PR C.1, Tests): "1 / 100 / 1000 events through a fake aahClientManaged - // writer; assert per-row outcome list parallel to input order." - // - // This slice exercises the *live* SDK path. The fake-backend variant at - // AahClientManagedAlarmEventWriterTests covers the same assertion without the rig. - var cfg = BuildRigConfig(); - var backend = new SdkAlarmHistorianWriteBackend(cfg); + var backend = new SdkAlarmHistorianWriteBackend(BuildRigConfig()); var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-E1") }, CancellationToken.None); @@ -102,19 +161,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack); } - [Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — enable in PR D.1")] + [Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — un-skip during the PR D.1 rollout smoke")] public async Task Live_cluster_failover_primary_bad_rotates_to_secondary() { - // Spec (PR C.1, Tests): "Cluster failover: primary node returns - // BadCommunicationError; picker rotates to secondary; assert eventual success." - // - // Configure the first server name to point at a deliberately unreachable node - // and the second to the real Historian; the picker should mark the first node - // failed and succeed via the second. var cfg = new HistorianConfiguration { Enabled = true, - ServerNames = new System.Collections.Generic.List + ServerNames = new List { "invalid-primary-node-deliberately-unreachable", Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", @@ -128,18 +181,29 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-failover-E1") }, CancellationToken.None); - // The backend must succeed (Ack) via the secondary even though the primary was bad. outcomes.Length.ShouldBe(1); outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack); } + // ── helpers ─────────────────────────────────────────────────────────── + + private static HistorianConfiguration Config(string server) => new HistorianConfiguration + { + Enabled = true, + ServerName = server, + Port = 32568, + IntegratedSecurity = true, + CommandTimeoutSeconds = 30, + FailureCooldownSeconds = 60, + }; + private static AlarmHistorianEventDto AlarmEvent(string id) => new AlarmHistorianEventDto { EventId = id, SourceName = "TestSource", ConditionId = "TestSource.Level.HiHi", AlarmType = "AnalogLimitAlarm.HiHi", - Message = "C.1 integration test alarm", + Message = "C.1 test alarm", Severity = 500, EventTimeUtcTicks = DateTime.UtcNow.Ticks, AckComment = null, @@ -160,5 +224,16 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests var raw = Environment.GetEnvironmentVariable(envName); return int.TryParse(raw, out var parsed) ? parsed : defaultValue; } + + /// + /// Fake factory whose every connect attempt throws — drives the + /// connection-unavailable path without loading the native SDK. + /// + private sealed class ThrowingConnectionFactory : IHistorianConnectionFactory + { + public HistorianAccess CreateAndConnect( + HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true) + => throw new InvalidOperationException($"simulated connect failure to {config.ServerName}"); + } } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.csproj b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.csproj index cced219..5f4a3d7 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.csproj +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.csproj @@ -24,4 +24,14 @@ + + + + ..\..\..\lib\aahClientManaged.dll + false + + +