feat(historian-sidecar): live aahClientManaged alarm-event write path (C.1)

SdkAlarmHistorianWriteBackend.WriteBatchAsync replaces the RetryPlease
placeholder with the real entry point — HistorianAccess.AddStreamedValue
(HistorianEvent, out HistorianAccessError) in aahClientManaged, pinned by
decompiling the installed SDK.

The write path opens its own ReadOnly=false connection: the query-side
HistorianDataSource opens ReadOnly sessions and AddStreamedValue fails on
those with WriteToReadOnlyFile. IHistorianConnectionFactory gains a readOnly
parameter (default true, query path unchanged); BuildConnectionArgs is
extracted as a pure helper. HistorianClusterEndpointPicker is shared for
node failover; connection-class errors abort the batch as RetryPlease and
reset the connection, malformed-input codes map to PermanentFail.

Tests: connection-unavailable batch deferral, ClassifyOutcome error-code
table, BuildConnectionArgs read-vs-write shaping (80 pass, 2 rig-skipped).
Live_* round-trip tests stay Skip-gated for the D.1 rollout smoke.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 16:08:32 -04:00
parent 419eda256b
commit cd2306db66
6 changed files with 544 additions and 142 deletions

View File

@@ -212,36 +212,40 @@ x64, which is not bitness-constrained like the worker). C.1 is independently
unblockable from A.2 if the goal is to wire up the scripted-alarm historian
path.
**Current state**:
**Current state (DONE — code)**:
`SdkAlarmHistorianWriteBackend` in `src\MxGateway.Worker\MxAccess\` is a
placeholder returning `RetryPlease`. The lmxopcua sidecar's `WriteAlarmEvents`
IPC slot is defined in `Ipc\Contracts.cs` but `Program.cs` constructs
`HistorianFrameHandler` without an `alarmWriter` (line 57 per the alarms plan).
The `IAlarmEventWriter` interface exists; only the production implementation
and the consumer wiring are missing.
C.1 shipped. `SdkAlarmHistorianWriteBackend.WriteBatchAsync` writes through the
real SDK entry point — **`HistorianAccess.AddStreamedValue(HistorianEvent, out
HistorianAccessError)`** in `aahClientManaged` — pinned 2026-05-18 by
decompiling the installed SDK. `Program.cs` and `Install-Services.ps1` were
already wired in the PR C.1 scaffolding. Two corrections to the assumptions
this doc was written under:
**What it needs**:
- **There is no `ArchestrAAlarmsAndEvents.SDK` writer.** That assembly
(`ArchestrAAlarmsAndEvents.SDK.Common.dll`, the only one installed) is a WCF
query-proxy base — no `AlarmHistorianWriter` type. The write path is the
`aahClientManaged` `HistorianAccess` surface.
- **The write path needs its own connection.** The query-side
`HistorianDataSource` opens `ReadOnly` sessions; `AddStreamedValue` on a
read-only session fails with `WriteToReadOnlyFile`.
`SdkAlarmHistorianWriteBackend` opens a dedicated `ReadOnly=false` connection
and shares only `HistorianClusterEndpointPicker` (not the connection object).
1. New `AahClientManagedAlarmEventWriter.cs` implementing `IAlarmEventWriter`
(defined in `Ipc\HistorianFrameHandler.cs`). Calls `aahClientManaged`'s
alarm-event write API — same path v1's `GalaxyHistorianWriter` used.
Uses `HistorianClusterEndpointPicker` for multi-node routing.
Maps `MxStatus` write outcomes to `HistorianWriteOutcome` enum
(Ack / PermanentFail / RetryPlease).
**What it needed** (all done):
2. `Program.cs` — build `AahClientManagedAlarmEventWriter` next to the
existing `BuildHistorian()` call; pass it to `HistorianFrameHandler`.
Gate behind `OTOPCUA_HISTORIAN_ALARM_WRITE_ENABLED` env var (default `true`
when `OTOPCUA_HISTORIAN_ENABLED=true`).
1. `SdkAlarmHistorianWriteBackend` builds a `HistorianEvent` per
`AlarmHistorianEventDto`, calls `AddStreamedValue`, and maps
`HistorianAccessError.ErrorValue` codes through
`AahClientManagedAlarmEventWriter.MapOutcome` (Ack / PermanentFail /
RetryPlease). `HistorianClusterEndpointPicker` drives multi-node failover.
2. `Program.cs``BuildAlarmWriter()` constructs the backend gated behind
`OTOPCUA_HISTORIAN_ALARM_WRITE_ENABLED`.
3. `Install-Services.ps1` — env var present in the install-time block.
3. `Install-Services.ps1` — add the new env var to the install-time block.
**What blocks C.1**: access to the `aahClientManaged` SDK on the dev box
(confirmed available per `project_aveva_platform_installed.md` — AVEVA
Historian SDK is present). C.1 can proceed without A.2 since the sidecar's
`aahClientManaged` is x64 and does not share the worker's x86 bitness
constraint.
**What remains for C.1**: only the live-rig write smoke — the `Live_*` tests
in `SdkAlarmHistorianWriteBackendTests` stay `Skip`-gated until D.1 confirms a
round-trip against a real AVEVA Historian, including the exact mandatory
`HistorianEvent` field set.
**Tests to write**:

View File

@@ -16,11 +16,14 @@ remains, plus follow-ups surfaced during the run.
## Follow-ups surfaced during the run
- **C.1 live SDK binding.** `SdkAlarmHistorianWriteBackend.WriteBatchAsync`
(`src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/`) is
still a placeholder returning `RetryPlease` for every event, so queued
alarm events are retained rather than written. Pinning the real
`aahClientManaged` alarm-write entry point is rig-gated — pairs with #20.
- **~~C.1 live SDK binding.~~** DONE (code). `SdkAlarmHistorianWriteBackend`
(`src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Backend/`) now
writes via the real entry point `HistorianAccess.AddStreamedValue(HistorianEvent,
out error)` in `aahClientManaged`. Two plan corrections found while pinning it:
(a) `ArchestrAAlarmsAndEvents.SDK` has no writer — it's a WCF query proxy;
(b) writes need their own `ReadOnly=false` connection, not the shared read
pool. Remaining: the live-rig write smoke (the `Live_*` tests are still
`Skip`-gated) — folds into #20 / D.1.
- **~~#24 Shelve-method routing.~~** DONE. Acknowledge / Confirm already
routed; OneShotShelve / TimedShelve / Unshelve now route via the native

View File

@@ -10,32 +10,24 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
/// </summary>
internal interface IHistorianConnectionFactory
{
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
/// <summary>
/// Opens a Historian SDK connection. <paramref name="readOnly"/> defaults to
/// <c>true</c> for the query path; the alarm-event write backend passes
/// <c>false</c> because <c>HistorianAccess.AddStreamedValue</c> fails with
/// <c>WriteToReadOnlyFile</c> on a read-only session.
/// </summary>
HistorianAccess CreateAndConnect(
HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true);
}
/// <summary>Production implementation — opens real Historian SDK connections.</summary>
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
{
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
public HistorianAccess CreateAndConnect(
HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true)
{
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = config.ServerName,
TcpPort = (ushort)config.Port,
IntegratedSecurity = config.IntegratedSecurity,
UseArchestrAUser = config.IntegratedSecurity,
ConnectionType = type,
ReadOnly = true,
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
};
if (!config.IntegratedSecurity)
{
args.UserName = config.UserName ?? string.Empty;
args.Password = config.Password ?? string.Empty;
}
var args = BuildConnectionArgs(config, type, readOnly);
if (!conn.OpenConnection(args, out var error))
{
@@ -69,5 +61,32 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
throw new TimeoutException(
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
}
/// <summary>
/// Builds the <see cref="HistorianConnectionArgs"/> for a connection. Pure (no SDK
/// side effects) so the read-only-vs-write argument shaping is unit-testable.
/// </summary>
internal static HistorianConnectionArgs BuildConnectionArgs(
HistorianConfiguration config, HistorianConnectionType type, bool readOnly)
{
var args = new HistorianConnectionArgs
{
ServerName = config.ServerName,
TcpPort = (ushort)config.Port,
IntegratedSecurity = config.IntegratedSecurity,
UseArchestrAUser = config.IntegratedSecurity,
ConnectionType = type,
ReadOnly = readOnly,
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
};
if (!config.IntegratedSecurity)
{
args.UserName = config.UserName ?? string.Empty;
args.Password = config.Password ?? string.Empty;
}
return args;
}
}
}

View File

@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
@@ -8,39 +10,85 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// Production <see cref="IAlarmHistorianWriteBackend"/> backed by AVEVA Historian's
/// <c>aahClientManaged</c> alarm-event write API. The exact SDK entry point is
/// pinned during the live-rig smoke in PR D.1 — until that gate, this backend
/// reports <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> for every
/// event with a structured diagnostic so the lmxopcua-side
/// <c>SqliteStoreAndForwardSink</c> retains the queued events rather than dropping
/// or hard-failing them.
/// <c>aahClientManaged</c> SDK. Each <see cref="AlarmHistorianEventDto"/> is written via
/// <c>HistorianAccess.AddStreamedValue(HistorianEvent, out HistorianAccessError)</c> —
/// the alarm-event write entry point pinned during PR C.1.
/// </summary>
/// <remarks>
/// <para>
/// Cluster failover reuses <see cref="HistorianClusterEndpointPicker"/> via
/// the shared <see cref="HistorianDataSource"/> connection pool — there is
/// no second connection pool for writes. Wonderware Historian's alarm-event
/// write surface accepts the same <c>HistorianAccess</c> session a read
/// opens, so reusing the picker is parity-preserving with v1's
/// <c>GalaxyHistorianWriter</c>.
/// The write path needs its <b>own</b> connection. The query-side
/// <see cref="HistorianDataSource"/> opens <c>ReadOnly</c> sessions, and
/// <c>AddStreamedValue</c> on a read-only session fails with
/// <c>WriteToReadOnlyFile</c>. This backend therefore opens a dedicated
/// <c>ReadOnly = false</c> connection; it shares
/// <see cref="HistorianClusterEndpointPicker"/> for node selection and failover but
/// not the connection object itself.
/// </para>
/// <para>
/// Once D.1 confirms the SDK entry point, this class swaps the placeholder
/// body for the real call sequence. The mapping from raw HRESULT /
/// <c>HistorianError</c> codes onto <see cref="AlarmHistorianWriteOutcome"/>
/// is already shared via <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/>
/// so the smoke-pinned change stays minimal.
/// Per-event <c>HistorianAccessError.ErrorValue</c> codes map onto
/// <see cref="AlarmHistorianWriteOutcome"/> via
/// <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/>. A connection-class
/// error aborts the remainder of the batch as
/// <see cref="AlarmHistorianWriteOutcome.RetryPlease"/> and resets the connection so
/// the next drain tick reconnects — possibly to a different cluster node.
/// </para>
/// <para>
/// The exact <c>HistorianEvent</c> field set required by the Historian is confirmed
/// against a live install during the PR D.1 rollout smoke; <see cref="ToHistorianEvent"/>
/// maps the unambiguous fields and carries operator comment / condition id as event
/// properties.
/// </para>
/// </remarks>
public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend
public sealed class SdkAlarmHistorianWriteBackend : IAlarmHistorianWriteBackend, IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<SdkAlarmHistorianWriteBackend>();
// ErrorValue codes that mean the connection/server is the problem (transient) rather
// than the event payload. These abort the rest of the batch and trigger a reconnect.
private static readonly HashSet<HistorianAccessError.ErrorValue> ConnectionErrors =
new HashSet<HistorianAccessError.ErrorValue>
{
HistorianAccessError.ErrorValue.FailedToConnect,
HistorianAccessError.ErrorValue.FailedToCreateSession,
HistorianAccessError.ErrorValue.NoReply,
HistorianAccessError.ErrorValue.NotReady,
HistorianAccessError.ErrorValue.NotInitialized,
HistorianAccessError.ErrorValue.Stopping,
HistorianAccessError.ErrorValue.Win32Exception,
HistorianAccessError.ErrorValue.InvalidResponse,
};
// ErrorValue codes that mean the event itself is malformed — permanent, never retried.
private static readonly HashSet<HistorianAccessError.ErrorValue> MalformedErrors =
new HashSet<HistorianAccessError.ErrorValue>
{
HistorianAccessError.ErrorValue.InvalidArgument,
HistorianAccessError.ErrorValue.ValidationFailed,
HistorianAccessError.ErrorValue.NullPointerArgument,
HistorianAccessError.ErrorValue.WriteToReadOnlyFile,
HistorianAccessError.ErrorValue.NotImplemented,
HistorianAccessError.ErrorValue.NotApplicable,
};
private readonly HistorianConfiguration _config;
private readonly IHistorianConnectionFactory _factory;
private readonly HistorianClusterEndpointPicker _picker;
private readonly object _connectionLock = new object();
private HistorianAccess? _connection;
private string? _activeNode;
private bool _disposed;
public SdkAlarmHistorianWriteBackend(HistorianConfiguration config)
: this(config, new SdkHistorianConnectionFactory(), null) { }
internal SdkAlarmHistorianWriteBackend(
HistorianConfiguration config,
IHistorianConnectionFactory factory,
HistorianClusterEndpointPicker? picker = null)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
_picker = picker ?? new HistorianClusterEndpointPicker(config);
}
public Task<AlarmHistorianWriteOutcome[]> WriteBatchAsync(
@@ -52,22 +100,265 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
return Task.FromResult(new AlarmHistorianWriteOutcome[0]);
}
// Placeholder: pin the SDK entry point in PR D.1 against a live AVEVA
// Historian. Until then the call returns RetryPlease for every slot so
// the lmxopcua-side sink keeps the events queued rather than dropping
// them — same effect as the current NullAlarmHistorianSink fallback,
// but visible through the structured diagnostic + per-event outcome.
Log.Warning(
"Alarm historian SDK write path not yet pinned — returning RetryPlease for {Count} event(s) from server {Server}. PR D.1 swaps this for the live aahClientManaged call.",
events.Length,
_config.ServerName);
var outcomes = new AlarmHistorianWriteOutcome[events.Length];
for (var i = 0; i < outcomes.Length; i++)
HistorianAccess connection;
try
{
outcomes[i] = AlarmHistorianWriteOutcome.RetryPlease;
connection = EnsureConnected();
}
catch (ObjectDisposedException)
{
throw;
}
catch (Exception ex)
{
// No reachable node — defer the whole batch so the lmxopcua-side SQLite
// store-and-forward sink retains the rows for the next drain tick.
Log.Warning(ex,
"Alarm historian write connection unavailable — deferring {Count} event(s) as RetryPlease",
events.Length);
FillRemaining(outcomes, 0, AlarmHistorianWriteOutcome.RetryPlease);
return Task.FromResult(outcomes);
}
for (var i = 0; i < events.Length; i++)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
var historianEvent = ToHistorianEvent(events[i]);
if (connection.AddStreamedValue(historianEvent, out var error))
{
outcomes[i] = AlarmHistorianWriteOutcome.Ack;
continue;
}
var code = error?.ErrorCode ?? HistorianAccessError.ErrorValue.Failure;
if (ConnectionErrors.Contains(code))
{
// Connection died mid-batch — drop it and defer this event + the rest.
Log.Warning(
"Alarm historian write hit connection-level error {Code} ({Desc}); resetting connection, deferring {Remaining} event(s)",
code, error?.ErrorDescription, events.Length - i);
HandleConnectionError(error?.ErrorDescription);
FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease);
return Task.FromResult(outcomes);
}
outcomes[i] = ClassifyOutcome(code);
Log.Warning(
"Alarm historian write rejected event {EventId}: {Code} ({Desc}) -> {Outcome}",
events[i].EventId, code, error?.ErrorDescription, outcomes[i]);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
// Transport-level throw (SDK marshalling fault, broken connection) —
// reset and defer this event + the rest.
Log.Warning(ex,
"Alarm historian write threw for event {EventId}; resetting connection, deferring {Remaining} event(s)",
events[i].EventId, events.Length - i);
HandleConnectionError(ex.Message);
FillRemaining(outcomes, i, AlarmHistorianWriteOutcome.RetryPlease);
return Task.FromResult(outcomes);
}
}
return Task.FromResult(outcomes);
}
/// <summary>
/// Maps an <see cref="AlarmHistorianEventDto"/> onto the SDK's
/// <c>HistorianEvent</c>. Operator comment and originating condition id ride as
/// event properties — operator-comment fidelity is the field the value-driven
/// fallback path cannot carry.
/// </summary>
internal static HistorianEvent ToHistorianEvent(AlarmHistorianEventDto dto)
{
// The ArchestrA SDK marks these HistorianEvent members obsolete but still honours
// them on write; their successors aren't wired in the version we bind against.
// Using them is the documented v1 behaviour — mirrors HistorianDataSource.ToDto,
// suppressed locally so any other deprecated-surface use still surfaces as an error.
#pragma warning disable CS0618
var historianEvent = new HistorianEvent
{
IsAlarm = true,
Source = dto.SourceName ?? string.Empty,
EventType = string.IsNullOrEmpty(dto.AlarmType) ? "Alarm" : dto.AlarmType,
EventTime = new DateTime(dto.EventTimeUtcTicks, DateTimeKind.Utc),
ReceivedTime = DateTime.UtcNow,
Severity = dto.Severity,
DisplayText = dto.Message ?? string.Empty,
};
if (Guid.TryParse(dto.EventId, out var id))
{
historianEvent.Id = id;
}
#pragma warning restore CS0618
if (!string.IsNullOrEmpty(dto.AckComment))
{
historianEvent.AddProperty("Comment", dto.AckComment, out _);
}
if (!string.IsNullOrEmpty(dto.ConditionId))
{
historianEvent.AddProperty("ConditionId", dto.ConditionId, out _);
}
return historianEvent;
}
/// <summary>
/// Classifies a non-connection-class <c>HistorianAccessError.ErrorValue</c> into an
/// <see cref="AlarmHistorianWriteOutcome"/> by routing it through the shared
/// <see cref="AahClientManagedAlarmEventWriter.MapOutcome"/> mapping. Exposed for
/// unit tests — connection-class codes are handled separately by the batch loop.
/// </summary>
internal static AlarmHistorianWriteOutcome ClassifyOutcome(HistorianAccessError.ErrorValue code)
=> AahClientManagedAlarmEventWriter.MapOutcome(
(int)code,
isCommunicationError: ConnectionErrors.Contains(code),
isMalformedInput: MalformedErrors.Contains(code));
private static void FillRemaining(
AlarmHistorianWriteOutcome[] outcomes, int from, AlarmHistorianWriteOutcome value)
{
for (var i = from; i < outcomes.Length; i++)
{
outcomes[i] = value;
}
}
private HistorianAccess EnsureConnected()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend));
}
var existing = Volatile.Read(ref _connection);
if (existing != null) return existing;
var (conn, node) = ConnectToAnyHealthyNode();
lock (_connectionLock)
{
if (_disposed)
{
SafeClose(conn);
throw new ObjectDisposedException(nameof(SdkAlarmHistorianWriteBackend));
}
if (_connection != null)
{
SafeClose(conn);
return _connection;
}
_connection = conn;
_activeNode = node;
Log.Information("Alarm historian write connection opened to {Server}:{Port}", node, _config.Port);
return conn;
}
}
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode()
{
var candidates = _picker.GetHealthyNodes();
if (candidates.Count == 0)
{
throw new InvalidOperationException(
_picker.NodeCount == 0
? "No historian nodes configured"
: $"All {_picker.NodeCount} historian nodes are in cooldown — no healthy endpoints");
}
Exception? lastException = null;
foreach (var node in candidates)
{
try
{
var conn = _factory.CreateAndConnect(
CloneConfigWithServerName(node), HistorianConnectionType.Event, readOnly: false);
_picker.MarkHealthy(node);
return (conn, node);
}
catch (Exception ex)
{
_picker.MarkFailed(node, ex.Message);
lastException = ex;
Log.Warning(ex, "Alarm historian node {Node} failed during write-connect; trying next", node);
}
}
throw new InvalidOperationException(
$"All {candidates.Count} healthy historian candidate(s) failed during write-connect: " +
(lastException?.Message ?? "(no detail)"),
lastException);
}
private void HandleConnectionError(string? detail)
{
lock (_connectionLock)
{
if (_connection == null) return;
SafeClose(_connection);
_connection = null;
var failedNode = _activeNode;
_activeNode = null;
if (failedNode != null) _picker.MarkFailed(failedNode, detail ?? "mid-batch failure");
Log.Warning("Alarm historian write connection reset (node={Node})", failedNode ?? "(unknown)");
}
}
private static void SafeClose(HistorianAccess conn)
{
try
{
conn.CloseConnection(out _);
conn.Dispose();
}
catch (Exception ex)
{
Log.Debug(ex, "Error closing alarm historian write connection");
}
}
private HistorianConfiguration CloneConfigWithServerName(string serverName) => new HistorianConfiguration
{
Enabled = _config.Enabled,
ServerName = serverName,
ServerNames = _config.ServerNames,
FailureCooldownSeconds = _config.FailureCooldownSeconds,
IntegratedSecurity = _config.IntegratedSecurity,
UserName = _config.UserName,
Password = _config.Password,
Port = _config.Port,
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
MaxValuesPerRead = _config.MaxValuesPerRead,
RequestTimeoutSeconds = _config.RequestTimeoutSeconds,
};
public void Dispose()
{
if (_disposed) return;
_disposed = true;
lock (_connectionLock)
{
if (_connection != null)
{
SafeClose(_connection);
_connection = null;
}
}
}
}
}

View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
@@ -11,42 +12,42 @@ using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
/// <summary>
/// PR C.1 — pins the <see cref="SdkAlarmHistorianWriteBackend"/> contract:
/// PR C.1 — covers <see cref="SdkAlarmHistorianWriteBackend"/>, the aahClientManaged-bound
/// alarm-event writer. The SDK-touching batch loop itself is exercised by the rig-gated
/// <c>Live_*</c> tests (D.1); the unit tests below pin the parts that are SDK-type-free:
/// <list type="bullet">
/// <item><description>
/// Unit: the placeholder backend returns <see cref="AlarmHistorianWriteOutcome.RetryPlease"/>
/// for every slot so the lmxopcua-side store-and-forward sink retains events rather than
/// dropping them while D.1 is unresolved.
/// </description></item>
/// <item><description>
/// Integration (rig-gated): once D.1 pins the live SDK entry point the Skip attribute is
/// removed. The live test writes a synthetic batch to a real AVEVA Historian and asserts
/// the cluster picker rotates from a broken primary to a healthy secondary.
/// </description></item>
/// <item><description>connection-unavailable → whole batch deferred as RetryPlease;</description></item>
/// <item><description><see cref="SdkAlarmHistorianWriteBackend.ClassifyOutcome"/> error-code mapping;</description></item>
/// <item><description><see cref="SdkHistorianConnectionFactory.BuildConnectionArgs"/> read-only-vs-write shaping.</description></item>
/// </list>
/// </summary>
[Trait("Category", "Unit")]
public sealed class SdkAlarmHistorianWriteBackendTests
{
// ── Placeholder-mode tests (no rig required) ─────────────────────────
// ── Connection-unavailable path (deterministic, no SDK load) ──────────
[Fact]
public async Task Placeholder_returns_RetryPlease_for_every_slot_so_queue_is_preserved()
public async Task Empty_batch_returns_empty_array()
{
// The SDK call-site in SdkAlarmHistorianWriteBackend is not yet pinned (PR D.1).
// Until D.1 swaps in the live call, the backend must return RetryPlease for every
// event so the lmxopcua-side SqliteStoreAndForwardSink retains the rows instead of
// dropping them — same effect as the NullAlarmHistorianSink fallback, but each
// slot is individually addressable for the drain worker.
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
var backend = new SdkAlarmHistorianWriteBackend(cfg);
var backend = new SdkAlarmHistorianWriteBackend(
Config("any"), new ThrowingConnectionFactory());
var events = new[]
{
AlarmEvent("E1"),
AlarmEvent("E2"),
AlarmEvent("E3"),
};
var outcomes = await backend.WriteBatchAsync(
Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);
outcomes.ShouldBeEmpty();
}
[Fact]
public async Task Unreachable_node_defers_whole_batch_as_RetryPlease()
{
// No node can be connected — the backend must defer every event so the
// lmxopcua-side SQLite store-and-forward sink retains the rows rather than
// dropping them.
var backend = new SdkAlarmHistorianWriteBackend(
Config("unreachable"), new ThrowingConnectionFactory());
var events = new[] { AlarmEvent("E1"), AlarmEvent("E2"), AlarmEvent("E3") };
var outcomes = await backend.WriteBatchAsync(events, CancellationToken.None);
outcomes.Length.ShouldBe(events.Length);
@@ -54,23 +55,12 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
}
[Fact]
public async Task Placeholder_returns_empty_array_for_empty_batch()
public async Task Unreachable_node_large_batch_returns_one_outcome_per_event()
{
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
var backend = new SdkAlarmHistorianWriteBackend(cfg);
var outcomes = await backend.WriteBatchAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);
outcomes.ShouldBeEmpty();
}
[Fact]
public async Task Placeholder_returns_same_count_as_input_for_large_batch()
{
// Guards against an off-by-one error in the placeholder array allocation —
// WriteBatchAsync must always return exactly as many outcomes as input events.
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
var backend = new SdkAlarmHistorianWriteBackend(cfg);
// Guards the outcome-array allocation: WriteBatchAsync must always return exactly
// as many outcomes as input events, even on the whole-batch-deferred path.
var backend = new SdkAlarmHistorianWriteBackend(
Config("unreachable"), new ThrowingConnectionFactory());
var batch = Enumerable.Range(0, 1000).Select(i => AlarmEvent($"E{i}")).ToArray();
var outcomes = await backend.WriteBatchAsync(batch, CancellationToken.None);
@@ -79,22 +69,91 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease);
}
[Fact]
public async Task Connect_failure_marks_node_failed_in_picker()
{
// Every connect attempt throws → the picker should record the failure so the
// node enters cooldown (cluster-failover plumbing).
var cfg = Config("node-a");
var picker = new HistorianClusterEndpointPicker(cfg);
var backend = new SdkAlarmHistorianWriteBackend(cfg, new ThrowingConnectionFactory(), picker);
await backend.WriteBatchAsync(new[] { AlarmEvent("E1") }, CancellationToken.None);
picker.HealthyNodeCount.ShouldBe(0, "the only node failed to connect and is now in cooldown");
}
// ── ClassifyOutcome — error-code → outcome mapping ────────────────────
[Theory]
[InlineData(HistorianAccessError.ErrorValue.Success, AlarmHistorianWriteOutcome.Ack)]
[InlineData(HistorianAccessError.ErrorValue.FailedToConnect, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.FailedToCreateSession, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.NoReply, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.NotReady, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.Failure, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.NoData, AlarmHistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAccessError.ErrorValue.InvalidArgument, AlarmHistorianWriteOutcome.PermanentFail)]
[InlineData(HistorianAccessError.ErrorValue.ValidationFailed, AlarmHistorianWriteOutcome.PermanentFail)]
[InlineData(HistorianAccessError.ErrorValue.NullPointerArgument, AlarmHistorianWriteOutcome.PermanentFail)]
[InlineData(HistorianAccessError.ErrorValue.WriteToReadOnlyFile, AlarmHistorianWriteOutcome.PermanentFail)]
[InlineData(HistorianAccessError.ErrorValue.NotImplemented, AlarmHistorianWriteOutcome.PermanentFail)]
public void ClassifyOutcome_maps_error_code_to_expected_outcome(
HistorianAccessError.ErrorValue code, AlarmHistorianWriteOutcome expected)
{
SdkAlarmHistorianWriteBackend.ClassifyOutcome(code).ShouldBe(expected);
}
// ── BuildConnectionArgs — read-only vs write shaping ──────────────────
[Fact]
public void BuildConnectionArgs_write_connection_is_not_read_only()
{
// The alarm-event write path must open ReadOnly=false; AddStreamedValue on a
// read-only session fails with WriteToReadOnlyFile.
var args = SdkHistorianConnectionFactory.BuildConnectionArgs(
Config("h1"), HistorianConnectionType.Event, readOnly: false);
args.ReadOnly.ShouldBeFalse();
args.ConnectionType.ShouldBe(HistorianConnectionType.Event);
args.ServerName.ShouldBe("h1");
}
[Fact]
public void BuildConnectionArgs_query_connection_is_read_only()
{
var args = SdkHistorianConnectionFactory.BuildConnectionArgs(
Config("h1"), HistorianConnectionType.Process, readOnly: true);
args.ReadOnly.ShouldBeTrue();
args.ConnectionType.ShouldBe(HistorianConnectionType.Process);
}
[Fact]
public void BuildConnectionArgs_non_integrated_security_carries_credentials()
{
var cfg = Config("h1");
cfg.IntegratedSecurity = false;
cfg.UserName = "histuser";
cfg.Password = "histpass";
var args = SdkHistorianConnectionFactory.BuildConnectionArgs(
cfg, HistorianConnectionType.Event, readOnly: false);
args.IntegratedSecurity.ShouldBeFalse();
args.UserName.ShouldBe("histuser");
args.Password.ShouldBe("histpass");
}
// ── Rig-gated integration tests ───────────────────────────────────────
//
// The tests below need a live AVEVA Historian install and are gated with
// Skip="rig-required". Once PR D.1 pins the SDK entry point, remove the
// Skip attribute and add them to the integration test run profile.
// The entry point (HistorianAccess.AddStreamedValue) is pinned and implemented;
// these need a live AVEVA Historian and are un-skipped during the PR D.1 smoke.
[Fact(Skip = "rig-required: needs a live AVEVA Historian + aahClientManaged SDK — enable in PR D.1")]
[Fact(Skip = "rig-required: needs a live AVEVA Historian — un-skip during the PR D.1 rollout smoke")]
public async Task Live_single_event_roundtrip_returns_Ack()
{
// Spec (PR C.1, Tests): "1 / 100 / 1000 events through a fake aahClientManaged
// writer; assert per-row outcome list parallel to input order."
//
// This slice exercises the *live* SDK path. The fake-backend variant at
// AahClientManagedAlarmEventWriterTests covers the same assertion without the rig.
var cfg = BuildRigConfig();
var backend = new SdkAlarmHistorianWriteBackend(cfg);
var backend = new SdkAlarmHistorianWriteBackend(BuildRigConfig());
var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-E1") }, CancellationToken.None);
@@ -102,19 +161,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
}
[Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — enable in PR D.1")]
[Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — un-skip during the PR D.1 rollout smoke")]
public async Task Live_cluster_failover_primary_bad_rotates_to_secondary()
{
// Spec (PR C.1, Tests): "Cluster failover: primary node returns
// BadCommunicationError; picker rotates to secondary; assert eventual success."
//
// Configure the first server name to point at a deliberately unreachable node
// and the second to the real Historian; the picker should mark the first node
// failed and succeed via the second.
var cfg = new HistorianConfiguration
{
Enabled = true,
ServerNames = new System.Collections.Generic.List<string>
ServerNames = new List<string>
{
"invalid-primary-node-deliberately-unreachable",
Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
@@ -128,18 +181,29 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-failover-E1") }, CancellationToken.None);
// The backend must succeed (Ack) via the secondary even though the primary was bad.
outcomes.Length.ShouldBe(1);
outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
}
// ── helpers ───────────────────────────────────────────────────────────
private static HistorianConfiguration Config(string server) => new HistorianConfiguration
{
Enabled = true,
ServerName = server,
Port = 32568,
IntegratedSecurity = true,
CommandTimeoutSeconds = 30,
FailureCooldownSeconds = 60,
};
private static AlarmHistorianEventDto AlarmEvent(string id) => new AlarmHistorianEventDto
{
EventId = id,
SourceName = "TestSource",
ConditionId = "TestSource.Level.HiHi",
AlarmType = "AnalogLimitAlarm.HiHi",
Message = "C.1 integration test alarm",
Message = "C.1 test alarm",
Severity = 500,
EventTimeUtcTicks = DateTime.UtcNow.Ticks,
AckComment = null,
@@ -160,5 +224,16 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
var raw = Environment.GetEnvironmentVariable(envName);
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
}
/// <summary>
/// Fake factory whose every connect attempt throws — drives the
/// connection-unavailable path without loading the native SDK.
/// </summary>
private sealed class ThrowingConnectionFactory : IHistorianConnectionFactory
{
public HistorianAccess CreateAndConnect(
HistorianConfiguration config, HistorianConnectionType type, bool readOnly = true)
=> throw new InvalidOperationException($"simulated connect failure to {config.ServerName}");
}
}
}

View File

@@ -24,4 +24,14 @@
<ProjectReference Include="..\..\..\src\Drivers\ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware\ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.csproj"/>
</ItemGroup>
<ItemGroup>
<!-- Wonderware Historian SDK — SdkAlarmHistorianWriteBackendTests pins the
error-code (HistorianAccessError.ErrorValue) and connection-arg shaping;
a DLL <Reference> doesn't flow transitively through the ProjectReference. -->
<Reference Include="aahClientManaged">
<HintPath>..\..\..\lib\aahClientManaged.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
</ItemGroup>
</Project>