Compare commits

...

2 Commits

Author SHA1 Message Date
Joseph Doherty
6df1a79d35 Phase 2 PR 5 — port Wonderware Historian SDK into Driver.Galaxy.Host/Backend/Historian/. The full v1 Historian.Aveva code path (HistorianDataSource + HistorianClusterEndpointPicker + IHistorianConnectionFactory + SdkHistorianConnectionFactory) now lives inside Galaxy.Host instead of the previously-required out-of-tree plugin + HistorianPluginLoader AssemblyResolve hack, and MxAccessGalaxyBackend.HistoryReadAsync — which previously returned a Phase 2 Task B.1.h follow-up placeholder — now delegates to the ported HistorianDataSource.ReadRawAsync, maps HistorianSample to GalaxyDataValue via the IPC wire shape, and reports Success=true with per-tag HistoryTagValues arrays. OPC-UA-free surface inside Galaxy.Host: the v1 code returned Opc.Ua.DataValue on the hot path, which would have required dragging OPCFoundation.NetStandard.Opc.Ua.Server into net48 x86 Galaxy.Host and bleeding OPC types across the IPC boundary — instead, the port introduces HistorianSample (Value, Quality byte, TimestampUtc) + HistorianAggregateSample (Value, TimestampUtc) POCOs that carry the raw MX quality byte through the pipe unchanged, and the OPC translation happens on the Proxy side via the existing QualityMapper that the live-read path already uses. Decision #13's IPC data-shape contract survives intact — GalaxyDataValue (TagReference + ValueBytes MessagePack + ValueMessagePackType + StatusCode + SourceTimestampUtcUnixMs + ServerTimestampUtcUnixMs) — so no Shared.Contracts wire break vs PR 4. Cluster failover preserved verbatim: HistorianClusterEndpointPicker is the thread-safe pure-logic picker ported verbatim with no SDK dependency (injected DateTime clock, per-node cooldown state, unknown-node-name tolerance, case-insensitive de-dup on configuration-order list), ConnectToAnyHealthyNode iterates the picker's healthy candidates, clones config per attempt, calls the factory, marks healthy on success / failed on exception with the failure message stored for dashboard surfacing, throws "All N healthy historian candidate(s) failed" with the last exception chained when every node exhausts. Process path + Event path use separate HistorianAccess connections (CreateHistoryQuery vs CreateEventQuery vs CreateAnalogSummaryQuery on the SDK surface) guarded by independent _connection/_eventConnection locks — a mid-query failure on one silo resets only that connection, the other stays open. Four SDK paths ported: ReadRawAsync (RetrievalMode.Full, BatchSize from config.MaxValuesPerRead, MoveNext pump, per-sample quality + value decode with the StringValue/Value fallback the v1 code did, limit-based early exit), ReadAggregateAsync (AnalogSummaryQuery + Resolution in ms, ExtractAggregateValue maps Average/Minimum/Maximum/ValueCount/First/Last/StdDev column names — the NodeId to column mapping is moved to the Proxy side since the IPC request carries a string column), ReadAtTimeAsync (per-timestamp HistoryQuery with RetrievalMode.Interpolated + BatchSize=1, returns Quality=0 / Value=null for missing samples), ReadEventsAsync (EventQuery + AddEventFilter("Source",Equal,sourceName) when sourceName is non-null, EventOrder.Ascending, EventCount = maxEvents or config.MaxValuesPerRead); GetHealthSnapshot returns the full runtime-health snapshot (TotalQueries/Successes/Failures + ConsecutiveFailures + LastSuccess/FailureTime + LastError + ProcessConnectionOpen/EventConnectionOpen + ActiveProcessNode/ActiveEventNode + per-node state list). ReadRaw is the only path wired through IPC in PR 5 (HistoryReadRequest/HistoryTagValues/HistoryReadResponse already existed in Shared.Contracts); Aggregate/AtTime/Events/Health are ported-but-not-yet-IPC-exposed — they stay internal to Galaxy.Host for PR 6+ to surface via new contract message kinds (aggregate = OPC UA HistoryReadProcessed, at-time = HistoryReadAtTime, events = HistoryReadEvents, health = admin dashboard IPC query). Galaxy.Host csproj gains aahClientManaged + aahClientCommon references with Private=false (managed wrappers) + None items for aahClient.dll + Historian.CBE.dll + Historian.DPAPI.dll + ArchestrA.CloudHistorian.Contract.dll native satellites staged alongside the host exe via CopyToOutputDirectory=PreserveNewest so aahClientManaged can P/Invoke into them at runtime without an AssemblyResolve hook (cleaner than the v1 HistorianPluginLoader.cs 180-LOC AssemblyResolve + Assembly.LoadFrom dance that existed solely because the plugin was loaded late from Host/bin/Debug/net48/Historian/). Program.cs adds BuildHistorianIfEnabled() that reads OTOPCUA_HISTORIAN_ENABLED (true or 1) + OTOPCUA_HISTORIAN_SERVER + OTOPCUA_HISTORIAN_SERVERS (comma-separated cluster list overrides single-server) + OTOPCUA_HISTORIAN_PORT (default 32568) + OTOPCUA_HISTORIAN_INTEGRATED (default true) + OTOPCUA_HISTORIAN_USER/OTOPCUA_HISTORIAN_PASS + OTOPCUA_HISTORIAN_TIMEOUT_SEC (30) + OTOPCUA_HISTORIAN_MAX_VALUES (10000) + OTOPCUA_HISTORIAN_COOLDOWN_SEC (60), returns null when disabled so MxAccessGalaxyBackend.HistoryReadAsync surfaces a clean "Historian disabled" Success=false instead of a localhost-SDK hang; server.RunAsync finally block now also casts backend to IDisposable.Dispose() so the historian SDK connections get cleanly closed on Ctrl+C. MxAccessGalaxyBackend gains an IHistorianDataSource? historian constructor parameter (defaults null to preserve existing Host.Tests call sites that don't exercise HistoryRead), implements IDisposable that forwards to _historian.Dispose(), and the pragma warning disable CS0618 is locally scoped to the ToDto(HistorianEvent) mapper since the SDK marks Id/Source/DisplayText/Severity obsolete but the replacement surface isn't available in the aahClientManaged version we bind against — every other deprecated-SDK use still surfaces as an error under TreatWarningsAsErrors. Ported from v1 Historian.Aveva unchanged: the CloneConfigWithServerName helper that preserves every config field except ServerName per attempt; the double-checked locking in EnsureConnected/EnsureEventConnected (fast path = Volatile.Read outside lock, slow path acquires lock + re-checks + disposes any raced-in-parallel connection); HandleConnectionError/HandleEventConnectionError that close the dead connection, clear the active-node tracker, MarkFailed the picker entry with the exception message so the node enters cooldown, and log the reset with node= for operator correlation; RecordSuccess/RecordFailure that bump counters under _healthLock. Tests: HistorianClusterEndpointPickerTests (7 cases) — single-node ServerName fallback when ServerNames empty, MarkFailed enters cooldown and skips, cooldown expires after window, MarkHealthy immediately clears, all-in-cooldown returns empty healthy list, Snapshot reports failure count + last error + IsHealthy, case-insensitive de-dup on duplicate hostnames. HistorianWiringTests (2 cases) — HistoryReadAsync returns "Historian disabled" Success=false when historian:null passed; HistoryReadAsync with a fake IHistorianDataSource maps the returned HistorianSample (Value=42.5, Quality=192 Good, Timestamp) to a GalaxyDataValue with StatusCode=0u + SourceTimestampUtcUnixMs matching the sample + MessagePack-encoded value bytes. InternalsVisibleTo("...Host.Tests") added to Galaxy.Host.csproj so tests can reach the internal HistorianClusterEndpointPicker. Full Galaxy.Host.Tests suite: 24 pass / 0 fail (9 new historian + 15 pre-existing MemoryWatchdog/PostMortemMmf/RecyclePolicy/StaPump/EndToEndIpc/Handshake). Full solution build: 0 errors (202 pre-existing warnings). The v1 Historian.Aveva project + Historian.Aveva.Tests still build intact because the archive PR (Stream D.1 destructive delete) is still ahead of us — PR 5 intentionally does not delete either; once PR 2+3 merge and the archive-delete PR lands, a follow-up cleanup can remove Historian.Aveva + its 4 source files + 18 test cases. Alarm subsystem wire-up (OnAlarmEvent raising from MxAccessGalaxyBackend via AlarmExtension primitives) + host-status push (OnHostStatusChanged via a ported GalaxyRuntimeProbeManager) remain PR 6 candidates; they were on the same "Task B.1.h follow-up" list and share the IPC connection-sink wiring with the historian events path — it made PR 5 scope-manageable to do Historian first since that's what has the biggest surface area (981 LOC v1 plus SDK binding) and alarms/host-status have more bespoke integration with the existing MxAccess subscription fan-out.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 01:44:04 -04:00
Joseph Doherty
caa9cb86f6 Phase 2 PR 4 — close the 4 open high/medium MXAccess findings from exit-gate-phase-2-final.md. High 1 (ReadAsync subscription-leak on cancel): the one-shot read now wraps subscribe→first-OnDataChange→unsubscribe in try/finally so the per-tag callback is always detached, and if the read installed the underlying MXAccess subscription itself (the prior _addressToHandle key was absent) it tears it down on the way out — no leaked probe item handles when the caller cancels or times out. High 2 (no reconnect loop): MxAccessClient gets a MxAccessClientOptions {AutoReconnect, MonitorInterval=5s, StaleThreshold=60s} + a background MonitorLoopAsync started at first ConnectAsync. The loop wakes every MonitorInterval, checks _lastObservedActivityUtc (bumped by every OnDataChange callback), and if stale probes the proxy with a no-op COM AddItem("$Heartbeat") on the StaPump; if the probe throws or returns false, the loop reconnects-with-replay — Unregister (best-effort), Register, snapshot _addressToHandle.Keys + clear, re-AddItem every previously-active subscription, ConnectionStateChanged events fire for the false→true transition, ReconnectCount bumps. Medium 3 (subscriptions don't push frames back to Proxy): IGalaxyBackend gains OnDataChange/OnAlarmEvent/OnHostStatusChanged events; new IFrameHandler.AttachConnection(FrameWriter) is called per-connection by PipeServer after Hello + the returned IDisposable disposes at connection close; GalaxyFrameHandler.ConnectionSink subscribes the events for the connection lifetime, fire-and-forget pushes them as MessageKind.OnDataChangeNotification / AlarmEvent / RuntimeStatusChange frames through the writer, swallows ObjectDisposedException for the dispose race, and unsubscribes in Dispose to prevent leaked invocation list refs across reconnects. MxAccessGalaxyBackend's existing SubscribeAsync (which previously discarded values via a (_, __) => {} callback) now wires OnTagValueChanged that fans out per-tag value changes to every subscription ID listening (one MXAccess subscription, multi-fan-out — _refToSubs reverse map). UnsubscribeAsync also reverse-walks the map to only call mx.UnsubscribeAsync when the LAST sub for a tag drops. Stub + DbBacked backends declare the events with #pragma warning disable CS0067 because they never raise them but must satisfy the interface (treat-warnings-as-errors would otherwise fail). Medium 4 (WriteValuesAsync doesn't await OnWriteComplete): MxAccessClient.WriteAsync rewritten to return Task<bool> via the v1-style TaskCompletionSource-keyed-by-item-handle pattern in _pendingWrites — adds the TCS before the Write call, awaits it with a configurable timeout (default 5s), removes the TCS in finally, returns true only when OnWriteComplete reported success. MxAccessGalaxyBackend.WriteValuesAsync now reports per-tag Bad_InternalError ("MXAccess runtime reported write failure") when the bool returns false, instead of false-positive Good. PipeServer's IFrameHandler interface adds the AttachConnection(FrameWriter):IDisposable method + a public NoopAttachment nested class (net48 doesn't support default interface methods so the empty-attach is exposed for stub implementations). StubFrameHandler returns IFrameHandler.NoopAttachment.Instance. RunOneConnectionAsync calls AttachConnection after HelloAck and usings the returned disposable so it disposes at the connection scope's finally. ConnectionStateChanged event added on MxAccessClient (caller-facing diagnostics for false→true reconnect transitions). docs/v2/implementation/pr-4-body.md is the Gitea web-UI paste-in for opening PR 4 once pushed; includes 2 new low-priority adversarial findings (probe item-handle leak; replay-loop silently swallows per-subscription failures) flagged as follow-ups not PR 4 blockers. Full solution 460 pass / 7 skip (E2E on admin shell) / 1 pre-existing Phase 0 baseline. No regressions vs PR 2's baseline.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 01:12:09 -04:00
22 changed files with 1807 additions and 49 deletions

View File

@@ -0,0 +1,91 @@
# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings
**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`)
**Target**: `v2`
## Summary
Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`:
- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the
subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag
callback is always detached, and if the read installed the underlying MXAccess
subscription itself (no other caller had it), it tears it down on the way out.
- **High 2 — No reconnect loop on the MXAccess COM connection.** New
`MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background
`MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a
no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active
subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps
`_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale).
`ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream
consumers (the supervisor on the Proxy side already surfaces this through its
HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in).
- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to
the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged`
events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and
forwards as outbound `OnDataChangeNotification` / `AlarmEvent` /
`RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend`
fans out per-tag value changes to every `SubscriptionId` that's listening to that tag
(multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out
on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable
CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist
only to satisfy the interface).
- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New
`WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via
the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`.
`MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the
runtime rejected the write, instead of false-positive `Good`.
## Pipe server change
`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can
register backend event sinks on each accepted connection and detach them at disconnect. The
`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the
finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance`
(net48 doesn't support default interface methods, so the empty-attach lives as a public nested
class).
## Tests
**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1
pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live
ZB smoke + 3 live MXAccess COM smoke all pass unchanged.
## Test plan for reviewers
- [ ] `dotnet build` clean
- [ ] `dotnet test` shows 460/7-skip/1-baseline
- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor`
partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same
polling cadence, same probe-then-reconnect-with-replay shape
- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are
detached on connection close (no leaked invocation list refs)
- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the
correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern
## What's NOT in this PR
- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope.
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op).
`OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when
raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking
port from v1's `AlarmObjectFilter` + Galaxy alarm primitives).
- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the
frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's
`HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR).
## Adversarial review
Quick pass over the PR 4 deltas. No new findings beyond:
- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked
(`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to
the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up
to call `RemoveItem` after the probe succeeds.
- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If
Galaxy permanently rejects a previously-valid reference (rare but possible after a
re-deploy), the user gets silent data loss for that one subscription. The stub-handler-
unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false)
→ ConnectionStateChanged(true)` payload that includes the replay-failures list.
Both are low-priority follow-ups, not PR 4 blockers.

View File

@@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
private long _nextSessionId;
private long _nextSubscriptionId;
// DB-only backend doesn't have a runtime data plane; never raises events.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
/// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
/// out an ordered list of eligible candidates for the data source to try in sequence.
/// </summary>
internal sealed class HistorianClusterEndpointPicker
{
private readonly Func<DateTime> _clock;
private readonly TimeSpan _cooldown;
private readonly object _lock = new object();
private readonly List<NodeEntry> _nodes;
public HistorianClusterEndpointPicker(HistorianConfiguration config)
: this(config, () => DateTime.UtcNow) { }
internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));
var names = (config.ServerNames != null && config.ServerNames.Count > 0)
? config.ServerNames
: new List<string> { config.ServerName };
_nodes = names
.Where(n => !string.IsNullOrWhiteSpace(n))
.Select(n => n.Trim())
.Distinct(StringComparer.OrdinalIgnoreCase)
.Select(n => new NodeEntry { Name = n })
.ToList();
}
public int NodeCount
{
get { lock (_lock) return _nodes.Count; }
}
public IReadOnlyList<string> GetHealthyNodes()
{
lock (_lock)
{
var now = _clock();
return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
}
}
public int HealthyNodeCount
{
get
{
lock (_lock)
{
var now = _clock();
return _nodes.Count(n => IsHealthyAt(n, now));
}
}
}
public void MarkFailed(string node, string? error)
{
lock (_lock)
{
var entry = FindEntry(node);
if (entry == null) return;
var now = _clock();
entry.FailureCount++;
entry.LastError = error;
entry.LastFailureTime = now;
entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null;
}
}
public void MarkHealthy(string node)
{
lock (_lock)
{
var entry = FindEntry(node);
if (entry == null) return;
entry.CooldownUntil = null;
}
}
public List<HistorianClusterNodeState> SnapshotNodeStates()
{
lock (_lock)
{
var now = _clock();
return _nodes.Select(n => new HistorianClusterNodeState
{
Name = n.Name,
IsHealthy = IsHealthyAt(n, now),
CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil,
FailureCount = n.FailureCount,
LastError = n.LastError,
LastFailureTime = n.LastFailureTime
}).ToList();
}
}
private static bool IsHealthyAt(NodeEntry entry, DateTime now)
{
return entry.CooldownUntil == null || entry.CooldownUntil <= now;
}
private NodeEntry? FindEntry(string node)
{
for (var i = 0; i < _nodes.Count; i++)
if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase))
return _nodes[i];
return null;
}
private sealed class NodeEntry
{
public string Name { get; set; } = "";
public DateTime? CooldownUntil { get; set; }
public int FailureCount { get; set; }
public string? LastError { get; set; }
public DateTime? LastFailureTime { get; set; }
}
}
}

View File

@@ -0,0 +1,18 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Point-in-time state of a single historian cluster node. One entry per configured node
/// appears inside <see cref="HistorianHealthSnapshot"/>.
/// </summary>
public sealed class HistorianClusterNodeState
{
public string Name { get; set; } = "";
public bool IsHealthy { get; set; }
public DateTime? CooldownUntil { get; set; }
public int FailureCount { get; set; }
public string? LastError { get; set; }
public DateTime? LastFailureTime { get; set; }
}
}

View File

@@ -0,0 +1,38 @@
using System.Collections.Generic;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Wonderware Historian SDK configuration. Populated from environment variables at Host
/// startup (see <c>Program.cs</c>) or from the Proxy's <c>DriverInstance.DriverConfig</c>
/// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
/// </summary>
public sealed class HistorianConfiguration
{
public bool Enabled { get; set; } = false;
/// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
public string ServerName { get; set; } = "localhost";
/// <summary>
/// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
/// falling through to the next on failure. A failed node is placed in cooldown for
/// <see cref="FailureCooldownSeconds"/> before being re-eligible.
/// </summary>
public List<string> ServerNames { get; set; } = new();
public int FailureCooldownSeconds { get; set; } = 60;
public bool IntegratedSecurity { get; set; } = true;
public string? UserName { get; set; }
public string? Password { get; set; }
public int Port { get; set; } = 32568;
public int CommandTimeoutSeconds { get; set; } = 30;
public int MaxValuesPerRead { get; set; } = 10000;
/// <summary>
/// Outer safety timeout applied to sync-over-async Historian operations. Must be
/// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
/// </summary>
public int RequestTimeoutSeconds { get; set; } = 60;
}
}

View File

@@ -0,0 +1,621 @@
using System;
using System.Collections.Generic;
using StringCollection = System.Collections.Specialized.StringCollection;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
/// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
/// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
/// </summary>
public sealed class HistorianDataSource : IHistorianDataSource
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
private readonly HistorianConfiguration _config;
private readonly object _connectionLock = new object();
private readonly object _eventConnectionLock = new object();
private readonly IHistorianConnectionFactory _factory;
private HistorianAccess? _connection;
private HistorianAccess? _eventConnection;
private bool _disposed;
private readonly object _healthLock = new object();
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
private DateTime? _lastSuccessTime;
private DateTime? _lastFailureTime;
private string? _lastError;
private string? _activeProcessNode;
private string? _activeEventNode;
private readonly HistorianClusterEndpointPicker _picker;
public HistorianDataSource(HistorianConfiguration config)
: this(config, new SdkHistorianConnectionFactory(), null) { }
internal HistorianDataSource(
HistorianConfiguration config,
IHistorianConnectionFactory factory,
HistorianClusterEndpointPicker? picker = null)
{
_config = config;
_factory = factory;
_picker = picker ?? new HistorianClusterEndpointPicker(config);
}
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
{
var candidates = _picker.GetHealthyNodes();
if (candidates.Count == 0)
{
var total = _picker.NodeCount;
throw new InvalidOperationException(
total == 0
? "No historian nodes configured"
: $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
}
Exception? lastException = null;
foreach (var node in candidates)
{
var attemptConfig = CloneConfigWithServerName(node);
try
{
var conn = _factory.CreateAndConnect(attemptConfig, type);
_picker.MarkHealthy(node);
return (conn, node);
}
catch (Exception ex)
{
_picker.MarkFailed(node, ex.Message);
lastException = ex;
Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
}
}
var inner = lastException?.Message ?? "(no detail)";
throw new InvalidOperationException(
$"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
lastException);
}
private HistorianConfiguration CloneConfigWithServerName(string serverName)
{
return new HistorianConfiguration
{
Enabled = _config.Enabled,
ServerName = serverName,
ServerNames = _config.ServerNames,
FailureCooldownSeconds = _config.FailureCooldownSeconds,
IntegratedSecurity = _config.IntegratedSecurity,
UserName = _config.UserName,
Password = _config.Password,
Port = _config.Port,
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
MaxValuesPerRead = _config.MaxValuesPerRead
};
}
public HistorianHealthSnapshot GetHealthSnapshot()
{
var nodeStates = _picker.SnapshotNodeStates();
var healthyCount = 0;
foreach (var n in nodeStates)
if (n.IsHealthy) healthyCount++;
lock (_healthLock)
{
return new HistorianHealthSnapshot
{
TotalQueries = _totalSuccesses + _totalFailures,
TotalSuccesses = _totalSuccesses,
TotalFailures = _totalFailures,
ConsecutiveFailures = _consecutiveFailures,
LastSuccessTime = _lastSuccessTime,
LastFailureTime = _lastFailureTime,
LastError = _lastError,
ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
ActiveProcessNode = _activeProcessNode,
ActiveEventNode = _activeEventNode,
NodeCount = nodeStates.Count,
HealthyNodeCount = healthyCount,
Nodes = nodeStates
};
}
}
private void RecordSuccess()
{
lock (_healthLock)
{
_totalSuccesses++;
_lastSuccessTime = DateTime.UtcNow;
_consecutiveFailures = 0;
_lastError = null;
}
}
private void RecordFailure(string error)
{
lock (_healthLock)
{
_totalFailures++;
_lastFailureTime = DateTime.UtcNow;
_consecutiveFailures++;
_lastError = error;
}
}
private void EnsureConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
if (Volatile.Read(ref _connection) != null) return;
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
lock (_connectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_connection != null)
{
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_connection = conn;
lock (_healthLock) _activeProcessNode = winningNode;
Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
}
}
private void HandleConnectionError(Exception? ex = null)
{
lock (_connectionLock)
{
if (_connection == null) return;
try
{
_connection.CloseConnection(out _);
_connection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
}
_connection = null;
string? failedNode;
lock (_healthLock)
{
failedNode = _activeProcessNode;
_activeProcessNode = null;
}
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
}
}
private void EnsureEventConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
if (Volatile.Read(ref _eventConnection) != null) return;
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
lock (_eventConnectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_eventConnection != null)
{
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_eventConnection = conn;
lock (_healthLock) _activeEventNode = winningNode;
Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
}
}
private void HandleEventConnectionError(Exception? ex = null)
{
lock (_eventConnectionLock)
{
if (_eventConnection == null) return;
try
{
_eventConnection.CloseConnection(out _);
_eventConnection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
}
_eventConnection = null;
string? failedNode;
lock (_healthLock)
{
failedNode = _activeEventNode;
_activeEventNode = null;
}
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
}
}
public Task<List<HistorianSample>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default)
{
var results = new List<HistorianSample>();
try
{
EnsureConnected();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
RetrievalMode = HistorianRetrievalMode.Full
};
if (maxValues > 0)
args.BatchSize = (uint)maxValues;
else if (_config.MaxValuesPerRead > 0)
args.BatchSize = (uint)_config.MaxValuesPerRead;
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
RecordFailure($"raw StartQuery: {error.ErrorCode}");
HandleConnectionError();
return Task.FromResult(results);
}
var count = 0;
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
results.Add(new HistorianSample
{
Value = value,
TimestampUtc = timestamp,
Quality = (byte)(result.OpcQuality & 0xFF),
});
count++;
if (limit > 0 && count >= limit) break;
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
RecordFailure($"raw: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
tagName, results.Count, startTime, endTime);
return Task.FromResult(results);
}
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default)
{
var results = new List<HistorianAggregateSample>();
try
{
EnsureConnected();
using var query = _connection!.CreateAnalogSummaryQuery();
var args = new AnalogSummaryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
Resolution = (ulong)intervalMs
};
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
HandleConnectionError();
return Task.FromResult(results);
}
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
var value = ExtractAggregateValue(result, aggregateColumn);
results.Add(new HistorianAggregateSample
{
Value = value,
TimestampUtc = timestamp,
});
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
RecordFailure($"aggregate: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
aggregateColumn, tagName, results.Count);
return Task.FromResult(results);
}
public Task<List<HistorianSample>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default)
{
var results = new List<HistorianSample>();
if (timestamps == null || timestamps.Length == 0)
return Task.FromResult(results);
try
{
EnsureConnected();
foreach (var timestamp in timestamps)
{
ct.ThrowIfCancellationRequested();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = timestamp,
EndDateTime = timestamp,
RetrievalMode = HistorianRetrievalMode.Interpolated,
BatchSize = 1
};
if (!query.StartQuery(args, out var error))
{
results.Add(new HistorianSample
{
Value = null,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = 0, // Bad
});
continue;
}
if (query.MoveNext(out error))
{
var result = query.QueryResult;
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
results.Add(new HistorianSample
{
Value = value,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = (byte)(result.OpcQuality & 0xFF),
});
}
else
{
results.Add(new HistorianSample
{
Value = null,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = 0,
});
}
query.EndQuery(out _);
}
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
RecordFailure($"at-time: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
tagName, results.Count, timestamps.Length);
return Task.FromResult(results);
}
public Task<List<HistorianEventDto>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default)
{
var results = new List<HistorianEventDto>();
try
{
EnsureEventConnected();
using var query = _eventConnection!.CreateEventQuery();
var args = new EventQueryArgs
{
StartDateTime = startTime,
EndDateTime = endTime,
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
QueryType = HistorianEventQueryType.Events,
EventOrder = HistorianEventOrder.Ascending
};
if (!string.IsNullOrEmpty(sourceName))
{
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
}
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
RecordFailure($"events StartQuery: {error.ErrorCode}");
HandleEventConnectionError();
return Task.FromResult(results);
}
var count = 0;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
results.Add(ToDto(query.QueryResult));
count++;
if (maxEvents > 0 && count >= maxEvents) break;
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
RecordFailure($"events: {ex.Message}");
HandleEventConnectionError(ex);
}
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
sourceName ?? "(all)", results.Count, startTime, endTime);
return Task.FromResult(results);
}
private static HistorianEventDto ToDto(HistorianEvent evt)
{
// The ArchestrA SDK marks these properties obsolete but still returns them; their
// successors aren't wired in the version we bind against. Using them is the documented
// v1 behavior — suppressed locally instead of project-wide so any non-event use of
// deprecated SDK surface still surfaces as an error.
#pragma warning disable CS0618
return new HistorianEventDto
{
Id = evt.Id,
Source = evt.Source,
EventTime = evt.EventTime,
ReceivedTime = evt.ReceivedTime,
DisplayText = evt.DisplayText,
Severity = (ushort)evt.Severity
};
#pragma warning restore CS0618
}
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
{
switch (column)
{
case "Average": return result.Average;
case "Minimum": return result.Minimum;
case "Maximum": return result.Maximum;
case "ValueCount": return result.ValueCount;
case "First": return result.First;
case "Last": return result.Last;
case "StdDev": return result.StdDev;
default: return null;
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection?.CloseConnection(out _);
_connection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK connection");
}
try
{
_eventConnection?.CloseConnection(out _);
_eventConnection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK event connection");
}
_connection = null;
_eventConnection = null;
}
}
}

View File

@@ -0,0 +1,18 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// SDK-free representation of a Historian event record. Prevents ArchestrA types from
/// leaking beyond <c>HistorianDataSource</c>.
/// </summary>
public sealed class HistorianEventDto
{
public Guid Id { get; set; }
public string? Source { get; set; }
public DateTime EventTime { get; set; }
public DateTime ReceivedTime { get; set; }
public string? DisplayText { get; set; }
public ushort Severity { get; set; }
}
}

View File

@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
/// via an IPC health query (not wired in PR #5; deferred).
/// </summary>
public sealed class HistorianHealthSnapshot
{
public long TotalQueries { get; set; }
public long TotalSuccesses { get; set; }
public long TotalFailures { get; set; }
public int ConsecutiveFailures { get; set; }
public DateTime? LastSuccessTime { get; set; }
public DateTime? LastFailureTime { get; set; }
public string? LastError { get; set; }
public bool ProcessConnectionOpen { get; set; }
public bool EventConnectionOpen { get; set; }
public string? ActiveProcessNode { get; set; }
public string? ActiveEventNode { get; set; }
public int NodeCount { get; set; }
public int HealthyNodeCount { get; set; }
public List<HistorianClusterNodeState> Nodes { get; set; } = new();
}
}

View File

@@ -0,0 +1,30 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// OPC-UA-free representation of a single historical data point. The Host returns these
/// across the IPC boundary as <c>GalaxyDataValue</c>; the Proxy maps quality and value to
/// OPC UA <c>DataValue</c>. Raw MX quality byte is preserved so the Proxy can use the same
/// quality mapper it already uses for live reads.
/// </summary>
public sealed class HistorianSample
{
public object? Value { get; set; }
/// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
public byte Quality { get; set; }
public DateTime TimestampUtc { get; set; }
}
/// <summary>
/// Result of <see cref="IHistorianDataSource.ReadAggregateAsync"/>. When <see cref="Value"/> is
/// null the aggregate is unavailable for that bucket (Proxy maps to <c>BadNoData</c>).
/// </summary>
public sealed class HistorianAggregateSample
{
public double? Value { get; set; }
public DateTime TimestampUtc { get; set; }
}
}

View File

@@ -0,0 +1,73 @@
using System;
using System.Threading;
using ArchestrA;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that
/// control connection success, failure, and timeout behavior.
/// </summary>
internal interface IHistorianConnectionFactory
{
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
}
/// <summary>Production implementation — opens real Historian SDK connections.</summary>
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
{
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
{
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = config.ServerName,
TcpPort = (ushort)config.Port,
IntegratedSecurity = config.IntegratedSecurity,
UseArchestrAUser = config.IntegratedSecurity,
ConnectionType = type,
ReadOnly = true,
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
};
if (!config.IntegratedSecurity)
{
args.UserName = config.UserName ?? string.Empty;
args.Password = config.Password ?? string.Empty;
}
if (!conn.OpenConnection(args, out var error))
{
conn.Dispose();
throw new InvalidOperationException(
$"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
}
var timeoutMs = config.CommandTimeoutSeconds * 1000;
var elapsed = 0;
while (elapsed < timeoutMs)
{
var status = new HistorianConnectionStatus();
conn.GetConnectionStatus(ref status);
if (status.ConnectedToServer)
return conn;
if (status.ErrorOccurred)
{
conn.Dispose();
throw new InvalidOperationException(
$"Historian SDK connection failed: {status.Error}");
}
Thread.Sleep(250);
elapsed += 250;
}
conn.Dispose();
throw new TimeoutException(
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
}
}
}

View File

@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host.
/// Implementations read via the aahClient* SDK; the Proxy side maps returned samples
/// to OPC UA <c>DataValue</c>.
/// </summary>
public interface IHistorianDataSource : IDisposable
{
Task<List<HistorianSample>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default);
Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default);
Task<List<HistorianSample>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default);
Task<List<HistorianEventDto>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default);
HistorianHealthSnapshot GetHealthSnapshot();
}
}

View File

@@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// </summary>
public interface IGalaxyBackend
{
/// <summary>
/// Server-pushed events the backend raises asynchronously (data-change, alarm,
/// host-status). The frame handler subscribes once on connect and forwards each
/// event to the Proxy as a typed <see cref="MessageKind"/> notification.
/// </summary>
event System.EventHandler<OnDataChangeNotification>? OnDataChange;
event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
@@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
private readonly MxAccessClientOptions _options;
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
@@ -30,39 +32,148 @@ public sealed class MxAccessClient : IDisposable
private int _connectionHandle;
private bool _connected;
private DateTime _lastObservedActivityUtc = DateTime.UtcNow;
private CancellationTokenSource? _monitorCts;
private int _reconnectCount;
private bool _disposed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName)
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
_proxy = proxy;
_clientName = clientName;
_options = options ?? new MxAccessClientOptions();
_proxy.OnDataChange += OnDataChange;
_proxy.OnWriteComplete += OnWriteComplete;
}
public bool IsConnected => _connected;
public int SubscriptionCount => _subscriptions.Count;
public int ReconnectCount => _reconnectCount;
/// <summary>Connects on the STA thread. Idempotent.</summary>
public Task<int> ConnectAsync() => _pump.InvokeAsync(() =>
/// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
public async Task<int> ConnectAsync()
{
if (_connected) return _connectionHandle;
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
return _connectionHandle;
});
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
var handle = await _pump.InvokeAsync(() =>
{
_connected = false;
_addressToHandle.Clear();
_handleToAddress.Clear();
if (_connected) return _connectionHandle;
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
return _connectionHandle;
});
ConnectionStateChanged?.Invoke(this, true);
if (_options.AutoReconnect && _monitorCts is null)
{
_monitorCts = new CancellationTokenSource();
_ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
}
});
return handle;
}
public async Task DisconnectAsync()
{
_monitorCts?.Cancel();
_monitorCts = null;
await _pump.InvokeAsync(() =>
{
if (!_connected) return;
try { _proxy.Unregister(_connectionHandle); }
finally
{
_connected = false;
_addressToHandle.Clear();
_handleToAddress.Clear();
}
});
ConnectionStateChanged?.Invoke(this, false);
}
/// <summary>
/// Background loop that watches for connection liveness signals and triggers
/// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
/// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
/// observed-activity timestamp + optional probe-tag subscription. Without an explicit
/// probe tag, falls back to "no data change in N seconds + no successful read in N
/// seconds = unhealthy" — same shape as v1.
/// </summary>
private async Task MonitorLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(_options.MonitorInterval, ct); }
catch (OperationCanceledException) { break; }
if (!_connected || _disposed) continue;
var idle = DateTime.UtcNow - _lastObservedActivityUtc;
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
catch { return false; }
});
}
catch { probeOk = false; }
if (probeOk)
{
_lastObservedActivityUtc = DateTime.UtcNow;
continue;
}
// Connection appears dead — reconnect-with-replay.
try
{
await _pump.InvokeAsync(() =>
{
try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
_connected = false;
});
ConnectionStateChanged?.Invoke(this, false);
await _pump.InvokeAsync(() =>
{
_connectionHandle = _proxy.Register(_clientName);
_connected = true;
});
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
}
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch
{
// Reconnect failed; back off and retry on the next tick.
_connected = false;
}
}
}
/// <summary>
/// One-shot read implemented as a transient subscribe + unsubscribe.
@@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference);
var itemHandle = await SubscribeOnPumpAsync(fullReference);
try
{
await SubscribeOnPumpAsync(fullReference);
using var _ = ct.Register(() => tcs.TrySetCanceled());
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
using var _ = ct.Register(() => tcs.TrySetCanceled());
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
// Detach the one-shot handler.
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
return await tcs.Task;
return await tcs.Task;
}
finally
{
// High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
// If we were the one who added the underlying MXAccess subscription (no other
// caller had it), tear it down too so we don't leak a probe item handle.
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
if (addedToReadOnlyAttribute)
{
try { await UnsubscribeAsync(fullReference); }
catch { /* shutdown-best-effort */ }
}
}
}
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) =>
_pump.InvokeAsync(() =>
/// <summary>
/// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
/// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
/// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
/// false-positive Good even when the runtime rejected the write post-callback.
/// </summary>
public async Task<bool> WriteAsync(string fullReference, object value,
int securityClassification = 0, TimeSpan? timeout = null)
{
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);
var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_pendingWrites.TryAdd(itemHandle, tcs))
{
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
var itemHandle = ResolveItem(fullReference);
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification);
});
// A prior write to the same item handle is still pending — uncommon but possible
// if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
if (_pendingWrites.TryRemove(itemHandle, out var prior))
prior.TrySetCanceled();
_pendingWrites[itemHandle] = tcs;
}
try
{
await _pump.InvokeAsync(() =>
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification));
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout));
if (raceTask != tcs.Task)
throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");
return await tcs.Task;
}
finally
{
_pendingWrites.TryRemove(itemHandle, out _);
}
}
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
{
@@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable
{
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
// Liveness: any data-change event is proof the connection is alive.
_lastObservedActivityUtc = DateTime.UtcNow;
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
var vtq = new Vtq(pvItemValue, ts, quality);
@@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable
public void Dispose()
{
_disposed = true;
_monitorCts?.Cancel();
try { DisconnectAsync().GetAwaiter().GetResult(); }
catch { /* swallow */ }
_proxy.OnDataChange -= OnDataChange;
_proxy.OnWriteComplete -= OnWriteComplete;
_monitorCts?.Dispose();
}
}
/// <summary>
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
/// monitor's polling cadence so behavior is consistent across the lift.
/// </summary>
public sealed class MxAccessClientOptions
{
/// <summary>Whether to start the background monitor at connect time.</summary>
public bool AutoReconnect { get; init; } = true;
/// <summary>How often the monitor wakes up to check liveness.</summary>
public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
}

View File

@@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
@@ -18,20 +19,31 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
/// (the v1 alarm subsystem is its own subtree).
/// </summary>
public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
{
private readonly GalaxyRepository _repository;
private readonly MxAccessClient _mx;
private readonly IHistorianDataSource? _historian;
private long _nextSessionId;
private long _nextSubscriptionId;
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
_repository = repository;
_mx = mx;
_historian = historian;
}
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
@@ -120,8 +132,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
? null
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 });
var ok = await _mx.WriteAsync(w.TagReference, value!);
results.Add(new WriteValueResult
{
TagReference = w.TagReference,
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
Error = ok ? null : "MXAccess runtime reported write failure",
});
}
catch (Exception ex)
{
@@ -137,12 +154,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
try
{
// For each requested tag, register a subscription that publishes back via the
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
// is wired in the upcoming subscription-push pass; for now the value is captured
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
foreach (var tag in req.TagReferences)
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ });
{
_refToSubs.AddOrUpdate(tag,
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
(_, bag) => { bag.Add(sid); return bag; });
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
// fires for every active subscription of that tag — we fan out by SubscriptionId.
await _mx.SubscribeAsync(tag, OnTagValueChanged);
}
_subs[sid] = req.TagReferences;
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
@@ -157,23 +178,97 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
{
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
foreach (var r in refs)
await _mx.UnsubscribeAsync(r);
{
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
// other subscription is still listening (multiple Proxy subs may share a tag).
_refToSubs.TryGetValue(r, out var bag);
if (bag is not null)
{
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
bag.Where(id => id != req.SubscriptionId));
if (remaining.IsEmpty)
{
_refToSubs.TryRemove(r, out _);
await _mx.UnsubscribeAsync(r);
}
else
{
_refToSubs[r] = remaining;
}
}
}
}
/// <summary>
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
/// subscription that includes this tag — the IPC sink translates that into outbound
/// <c>OnDataChangeNotification</c> frames.
/// </summary>
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
{
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
var wireValue = ToWire(fullReference, vtq);
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
// each ISubscribable consumer based on the SubscriptionId in the payload.
foreach (var sid in bag.Distinct())
{
OnDataChange?.Invoke(this, new OnDataChangeNotification
{
SubscriptionId = sid,
Values = new[] { wireValue },
});
}
}
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadResponse
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Tags = Array.Empty<HistoryTagValues>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
try
{
Success = false,
Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
Tags = Array.Empty<HistoryTagValues>(),
});
foreach (var reference in req.TagReferences)
{
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
tags.Add(new HistoryTagValues
{
TagReference = reference,
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
});
}
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadResponse
{
Success = false,
Error = $"Historian read failed: {ex.Message}",
Tags = tags.ToArray(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
public void Dispose() => _historian?.Dispose();
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
{
TagReference = reference,
@@ -184,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
/// <summary>
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
/// the hop intact.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
ValueMessagePackType = 0,
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static uint MapHistorianQualityToOpcUa(byte q)
{
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
// The Proxy may refine this when it decodes the wire frame.
if (q >= 192) return 0x00000000u; // Good
if (q >= 64) return 0x40000000u; // Uncertain
return 0x80000000u; // Bad
}
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{
AttributeName = row.AttributeName,

View File

@@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
private long _nextSessionId;
private long _nextSubscriptionId;
// Stub backend never raises events — implements the interface members for symmetry.
#pragma warning disable CS0067
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
var id = Interlocked.Increment(ref _nextSessionId);

View File

@@ -99,9 +99,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
}
}
/// <summary>
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
/// The returned disposable unsubscribes when the connection closes — without it the
/// backend's static event invocation list would accumulate dead writer references and
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
/// </summary>
public IDisposable AttachConnection(FrameWriter writer)
{
var sink = new ConnectionSink(backend, writer, logger);
sink.Attach();
return sink;
}
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
=> writer.WriteAsync(MessageKind.ErrorResponse,
new ErrorResponse { Code = code, Message = message }, ct);
private sealed class ConnectionSink : IDisposable
{
private readonly IGalaxyBackend _backend;
private readonly FrameWriter _writer;
private readonly ILogger _logger;
private EventHandler<OnDataChangeNotification>? _onData;
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
private EventHandler<HostConnectivityStatus>? _onHost;
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
{
_backend = backend; _writer = writer; _logger = logger;
}
public void Attach()
{
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
new RuntimeStatusChangeNotification { Status = e });
_backend.OnDataChange += _onData;
_backend.OnAlarmEvent += _onAlarm;
_backend.OnHostStatusChanged += _onHost;
}
private void Push<T>(MessageKind kind, T payload)
{
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
// ObjectDisposedException because the dispose path will detach this sink shortly.
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
catch (ObjectDisposedException) { }
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
}
public void Dispose()
{
if (_onData is not null) _backend.OnDataChange -= _onData;
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
}
}
}

View File

@@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable
new HelloAck { Accepted = true, HostName = Environment.MachineName },
linked.Token).ConfigureAwait(false);
using var attachment = handler.AttachConnection(writer);
while (!linked.Token.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
@@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable
public interface IFrameHandler
{
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
/// <summary>
/// Called once per accepted connection after the Hello handshake. Lets the handler
/// attach server-pushed event sinks (data-change, alarm, host-status) to the
/// connection's <paramref name="writer"/>. Returns an <see cref="IDisposable"/> the
/// pipe server disposes when the connection closes — backends use it to unsubscribe.
/// Implementations that don't push events can return <see cref="NoopAttachment"/>.
/// </summary>
IDisposable AttachConnection(FrameWriter writer);
public sealed class NoopAttachment : IDisposable
{
public static readonly NoopAttachment Instance = new();
public void Dispose() { }
}
}

View File

@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
@@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
ct);
}
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
}

View File

@@ -4,6 +4,7 @@ using System.Threading;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
@@ -66,9 +67,11 @@ public static class Program
pump = new StaPump("Galaxy.Sta");
pump.WaitForStartedAsync().GetAwaiter().GetResult();
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
var historian = BuildHistorianIfEnabled();
backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
mx);
mx,
historian);
break;
}
@@ -77,6 +80,7 @@ public static class Program
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
finally
{
(backend as IDisposable)?.Dispose();
mx?.Dispose();
pump?.Dispose();
}
@@ -91,4 +95,45 @@ public static class Program
}
finally { Log.CloseAndFlush(); }
}
/// <summary>
/// Builds a <see cref="HistorianDataSource"/> from the OTOPCUA_HISTORIAN_* environment
/// variables the supervisor passes at spawn time. Returns null when the historian is
/// disabled (default) so <c>MxAccessGalaxyBackend.HistoryReadAsync</c> returns a clear
/// "not configured" error instead of attempting an SDK connection to localhost.
/// </summary>
private static IHistorianDataSource? BuildHistorianIfEnabled()
{
var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED");
if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1")
return null;
var cfg = new HistorianConfiguration
{
Enabled = true,
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
};
var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS");
if (!string.IsNullOrWhiteSpace(servers))
cfg.ServerNames = new System.Collections.Generic.List<string>(
servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}",
cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port);
return new HistorianDataSource(cfg);
}
private static int TryParseInt(string envName, int defaultValue)
{
var raw = Environment.GetEnvironmentVariable(envName);
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
}
}

View File

@@ -30,11 +30,43 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests"/>
</ItemGroup>
<ItemGroup>
<Reference Include="ArchestrA.MxAccess">
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
<Private>true</Private>
</Reference>
<!-- Wonderware Historian SDK — consumed by Backend/Historian/ for HistoryReadAsync.
Previously lived in the v1 Historian.Aveva plugin; folded into Driver.Galaxy.Host
for PR #5 because this host is already Galaxy-specific. -->
<Reference Include="aahClientManaged">
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
<Reference Include="aahClientCommon">
<HintPath>..\..\lib\aahClientCommon.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
</ItemGroup>
<ItemGroup>
<!-- Historian SDK native and satellite DLLs — staged beside the host exe so the
aahClientManaged wrapper can P/Invoke into them without an AssemblyResolve hook. -->
<None Include="..\..\lib\aahClient.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.CBE.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.DPAPI.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>

View File

@@ -0,0 +1,94 @@
using System;
using System.Linq;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
[Trait("Category", "Unit")]
public sealed class HistorianClusterEndpointPickerTests
{
private static HistorianConfiguration Config(params string[] nodes) => new()
{
ServerName = "ignored",
ServerNames = nodes.ToList(),
FailureCooldownSeconds = 60,
};
[Fact]
public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty()
{
var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() };
var p = new HistorianClusterEndpointPicker(cfg);
p.NodeCount.ShouldBe(1);
p.GetHealthyNodes().ShouldBe(new[] { "only-node" });
}
[Fact]
public void Failed_node_enters_cooldown_and_is_skipped()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBe(new[] { "b" });
}
[Fact]
public void Cooldown_expires_after_configured_window()
{
var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBe(new[] { "b" });
clock = clock.AddSeconds(61);
p.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
}
[Fact]
public void MarkHealthy_immediately_clears_cooldown()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBeEmpty();
p.MarkHealthy("a");
p.GetHealthyNodes().ShouldBe(new[] { "a" });
}
[Fact]
public void All_nodes_in_cooldown_returns_empty_healthy_list()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
p.MarkFailed("a", "x");
p.MarkFailed("b", "y");
p.GetHealthyNodes().ShouldBeEmpty();
p.NodeCount.ShouldBe(2);
}
[Fact]
public void Snapshot_reports_failure_count_and_last_error()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
p.MarkFailed("a", "first");
p.MarkFailed("a", "second");
var snap = p.SnapshotNodeStates().Single();
snap.FailureCount.ShouldBe(2);
snap.LastError.ShouldBe("second");
snap.IsHealthy.ShouldBeFalse();
snap.CooldownUntil.ShouldNotBeNull();
}
[Fact]
public void Duplicate_hostnames_are_deduplicated_case_insensitively()
{
var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB"));
p.NodeCount.ShouldBe(2);
}
}
}

View File

@@ -0,0 +1,109 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
[Trait("Category", "Unit")]
public sealed class HistorianWiringTests
{
/// <summary>
/// When the Proxy sends a HistoryRead but the supervisor never enabled the historian
/// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a
/// self-explanatory error — not an exception or a hang against localhost.
/// </summary>
[Fact]
public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
{
TagReferences = new[] { "TestTag" },
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxValuesPerTag = 100,
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
resp.Tags.ShouldBeEmpty();
}
/// <summary>
/// When the historian is wired up, we expect the backend to call through and map
/// samples onto the IPC wire shape. Uses a fake <see cref="IHistorianDataSource"/>
/// that returns a single known-good sample so we can assert the mapping stays sane.
/// </summary>
[Fact]
public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
var fake = new FakeHistorianDataSource(new HistorianSample
{
Value = 42.5,
Quality = 192, // Good
TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc),
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
{
TagReferences = new[] { "TankLevel" },
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxValuesPerTag = 100,
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Tags.Length.ShouldBe(1);
resp.Tags[0].TagReference.ShouldBe("TankLevel");
resp.Tags[0].Values.Length.ShouldBe(1);
resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good
resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull();
resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe(
new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds());
}
private sealed class FakeHistorianDataSource : IHistorianDataSource
{
private readonly HistorianSample _sample;
public FakeHistorianDataSource(HistorianSample sample) => _sample = sample;
public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample> { _sample });
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}
}