diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs
new file mode 100644
index 0000000..312207b
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs
@@ -0,0 +1,129 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+    /// <summary>
+    /// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
+    /// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
+    /// out an ordered list of eligible candidates for the data source to try in sequence.
+    /// </summary>
+    internal sealed class HistorianClusterEndpointPicker
+    {
+        private readonly Func<DateTime> _clock;
+        private readonly TimeSpan _cooldown;
+        private readonly object _lock = new object();
+        private readonly List<NodeEntry> _nodes;
+
+        public HistorianClusterEndpointPicker(HistorianConfiguration config)
+            : this(config, () => DateTime.UtcNow) { }
+
+        /// <summary>
+        /// Creates a picker with an injectable clock. Nodes come from <c>config.ServerNames</c>, or
+        /// from <c>config.ServerName</c> when that list is null or empty; blank names are dropped and
+        /// the rest trimmed and de-duplicated ignoring case. Throws when either argument is null.
+        /// </summary>
+        internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
+        {
+            if (config == null) throw new ArgumentNullException(nameof(config));
+            _clock = clock ?? throw new ArgumentNullException(nameof(clock));
+            // A negative cooldown is clamped to zero, which disables cooldown entirely.
+            _cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));
+
+            var names = (config.ServerNames != null && config.ServerNames.Count > 0)
+                ? config.ServerNames
+                : new List<string> { config.ServerName };
+
+            _nodes = names
+                .Where(n => !string.IsNullOrWhiteSpace(n))
+                .Select(n => n.Trim())
+                .Distinct(StringComparer.OrdinalIgnoreCase)
+                .Select(n => new NodeEntry { Name = n })
+                .ToList();
+        }
+
+        /// <summary>Number of distinct configured nodes, healthy or not.</summary>
+        public int NodeCount
+        {
+            get { lock (_lock) return _nodes.Count; }
+        }
+
+        /// <summary>Names of the nodes that are not in cooldown at the time of the call.</summary>
+        public IReadOnlyList<string> GetHealthyNodes()
+        {
+            lock (_lock)
+            {
+                var now = _clock();
+                return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
+            }
+        }
+
+        /// <summary>Number of nodes that are not in cooldown at the time of the call.</summary>
+        public int HealthyNodeCount => GetHealthyNodes().Count;
+
+        /// <summary>Records a failure for a known node and starts its cooldown, if one is configured.</summary>
+        public void MarkFailed(string node, string? error)
+        {
+            lock (_lock)
+            {
+                var entry = FindEntry(node);
+                if (entry == null) return;
+
+                var now = _clock();
+                entry.FailureCount++;
+                entry.LastError = error;
+                entry.LastFailureTime = now;
+                entry.CooldownUntil = _cooldown > TimeSpan.Zero ? now + _cooldown : (DateTime?)null;
+            }
+        }
+
+        /// <summary>Clears the cooldown of a known node; its failure history is kept.</summary>
+        public void MarkHealthy(string node)
+        {
+            lock (_lock)
+            {
+                var entry = FindEntry(node);
+                if (entry != null) entry.CooldownUntil = null;
+            }
+        }
+
+        /// <summary>Copies every node's current state into detached snapshot objects.</summary>
+        public List<HistorianClusterNodeState> SnapshotNodeStates()
+        {
+            lock (_lock)
+            {
+                var now = _clock();
+                return _nodes.Select(n =>
+                {
+                    var healthy = IsHealthyAt(n, now);
+                    return new HistorianClusterNodeState
+                    {
+                        Name = n.Name,
+                        IsHealthy = healthy,
+                        CooldownUntil = healthy ? null : n.CooldownUntil,
+                        FailureCount = n.FailureCount,
+                        LastError = n.LastError,
+                        LastFailureTime = n.LastFailureTime
+                    };
+                }).ToList();
+            }
+        }
+
+        private static bool IsHealthyAt(NodeEntry entry, DateTime now) =>
+            entry.CooldownUntil == null || entry.CooldownUntil <= now;
+
+        /// <summary>Case-insensitive lookup by name; callers invoke it while holding <c>_lock</c>.</summary>
+        private NodeEntry? FindEntry(string node) =>
+            _nodes.Find(e => string.Equals(e.Name, node, StringComparison.OrdinalIgnoreCase));
+
+        private sealed class NodeEntry
+        {
+            public string Name { get; set; } = "";
+            public DateTime? CooldownUntil { get; set; }
+            public int FailureCount { get; set; }
+            public string? LastError { get; set; }
+            public DateTime? LastFailureTime { get; set; }
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs
new file mode 100644
index 0000000..5b78243
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs
@@ -0,0 +1,18 @@
+using System;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+ /// <summary>
+ /// Point-in-time state of a single historian cluster node. One entry per configured node
+ /// appears inside <see cref="HistorianHealthSnapshot.Nodes"/>.
+ /// </summary>
+ public sealed class HistorianClusterNodeState
+ {
+ public string Name { get; set; } = "";
+ public bool IsHealthy { get; set; } // the endpoint picker reports false while the node is in cooldown
+ public DateTime? CooldownUntil { get; set; } // the endpoint picker leaves this null for a healthy node
+ public int FailureCount { get; set; }
+ public string? LastError { get; set; }
+ public DateTime? LastFailureTime { get; set; }
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs
new file mode 100644
index 0000000..8d3ac17
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs
@@ -0,0 +1,38 @@
+using System.Collections.Generic;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+    /// <summary>
+    /// Wonderware Historian SDK configuration. Populated from environment variables at Host
+    /// startup (see Program.cs) or from the Proxy's DriverInstance.DriverConfig
+    /// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
+    /// </summary>
+    public sealed class HistorianConfiguration
+    {
+        public bool Enabled { get; set; } = false;
+
+        /// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
+        public string ServerName { get; set; } = "localhost";
+
+        /// <summary>
+        /// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
+        /// falling through to the next on failure. A failed node is placed in cooldown for
+        /// <see cref="FailureCooldownSeconds"/> seconds before being re-eligible.
+        /// </summary>
+        public List<string> ServerNames { get; set; } = new();
+
+        public int FailureCooldownSeconds { get; set; } = 60;
+        public bool IntegratedSecurity { get; set; } = true;
+        public string? UserName { get; set; }
+        public string? Password { get; set; }
+        public int Port { get; set; } = 32568;
+        public int CommandTimeoutSeconds { get; set; } = 30;
+        public int MaxValuesPerRead { get; set; } = 10000;
+
+        /// <summary>
+        /// Outer safety timeout applied to sync-over-async Historian operations. Must be
+        /// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
+        /// </summary>
+        public int RequestTimeoutSeconds { get; set; } = 60;
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs
new file mode 100644
index 0000000..d132aed
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs
@@ -0,0 +1,621 @@
+using System;
+using System.Collections.Generic;
+using StringCollection = System.Collections.Specialized.StringCollection;
+using System.Threading;
+using System.Threading.Tasks;
+using ArchestrA;
+using Serilog;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+ /// <summary>
+ /// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
+ /// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
+ /// which the Proxy maps to OPC UA DataValue on its side of the IPC.
+ /// </summary>
+ public sealed class HistorianDataSource : IHistorianDataSource
+ {
+        private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>(); // tags events with this type as SourceContext
+
+ private readonly HistorianConfiguration _config;
+ private readonly object _connectionLock = new object();
+ private readonly object _eventConnectionLock = new object();
+ private readonly IHistorianConnectionFactory _factory;
+ private HistorianAccess? _connection;
+ private HistorianAccess? _eventConnection;
+ private bool _disposed;
+
+ private readonly object _healthLock = new object();
+ private long _totalSuccesses;
+ private long _totalFailures;
+ private int _consecutiveFailures;
+ private DateTime? _lastSuccessTime;
+ private DateTime? _lastFailureTime;
+ private string? _lastError;
+ private string? _activeProcessNode;
+ private string? _activeEventNode;
+
+ private readonly HistorianClusterEndpointPicker _picker;
+
+        public HistorianDataSource(HistorianConfiguration config)
+            : this(config, new SdkHistorianConnectionFactory(), null) { }
+
+        /// <summary>Injects the factory and, optionally, a pre-built picker; config and factory must not be null.</summary>
+        internal HistorianDataSource(
+            HistorianConfiguration config, IHistorianConnectionFactory factory,
+            HistorianClusterEndpointPicker? picker = null)
+        {
+            _config = config ?? throw new ArgumentNullException(nameof(config));
+            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
+            _picker = picker ?? new HistorianClusterEndpointPicker(config);
+        }
+
+        /// <summary>
+        /// Tries each currently-healthy node in order and returns the first connection that opens,
+        /// paired with its node name. Each failure is reported to the picker; throws when none works.
+        /// </summary>
+        private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
+        {
+            var healthyNodes = _picker.GetHealthyNodes();
+            if (healthyNodes.Count == 0)
+                throw new InvalidOperationException(_picker.NodeCount == 0
+                    ? "No historian nodes configured"
+                    : $"All {_picker.NodeCount} historian nodes are in cooldown — no healthy endpoints to connect to");
+
+            Exception? mostRecent = null;
+            foreach (var candidate in healthyNodes)
+            {
+                var perNodeConfig = CloneConfigWithServerName(candidate);
+                try
+                {
+                    var opened = _factory.CreateAndConnect(perNodeConfig, type);
+                    _picker.MarkHealthy(candidate);
+                    return (opened, candidate);
+                }
+                catch (Exception ex)
+                {
+                    _picker.MarkFailed(candidate, ex.Message);
+                    mostRecent = ex;
+                    Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", candidate);
+                }
+            }
+
+            var detail = mostRecent?.Message ?? "(no detail)";
+            throw new InvalidOperationException(
+                $"All {healthyNodes.Count} healthy historian candidate(s) failed during connect: {detail}",
+                mostRecent);
+        }
+
+        /// <summary>Copies every setting of the configured instance, overriding only the server name.</summary>
+        private HistorianConfiguration CloneConfigWithServerName(string serverName) =>
+            new HistorianConfiguration
+            {
+                Enabled = _config.Enabled,
+                ServerName = serverName,
+                ServerNames = _config.ServerNames,
+                FailureCooldownSeconds = _config.FailureCooldownSeconds,
+                IntegratedSecurity = _config.IntegratedSecurity,
+                UserName = _config.UserName,
+                Password = _config.Password,
+                Port = _config.Port,
+                CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
+                MaxValuesPerRead = _config.MaxValuesPerRead,
+                RequestTimeoutSeconds = _config.RequestTimeoutSeconds
+            };
+
+        /// <summary>Builds a point-in-time view of query counters, connection state and per-node health.</summary>
+        public HistorianHealthSnapshot GetHealthSnapshot()
+        {
+            // Node states are captured before _healthLock is taken, so the picker is never called under it.
+            var perNode = _picker.SnapshotNodeStates();
+            var healthy = perNode.FindAll(state => state.IsHealthy).Count;
+
+            lock (_healthLock)
+            {
+                return new HistorianHealthSnapshot
+                {
+                    TotalQueries = _totalSuccesses + _totalFailures,
+                    TotalSuccesses = _totalSuccesses,
+                    TotalFailures = _totalFailures,
+                    ConsecutiveFailures = _consecutiveFailures,
+                    LastSuccessTime = _lastSuccessTime,
+                    LastFailureTime = _lastFailureTime,
+                    LastError = _lastError,
+                    ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
+                    EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
+                    ActiveProcessNode = _activeProcessNode,
+                    ActiveEventNode = _activeEventNode,
+                    NodeCount = perNode.Count,
+                    HealthyNodeCount = healthy,
+                    Nodes = perNode
+                };
+            }
+        }
+
+        private void RecordSuccess()
+        {
+            lock (_healthLock)
+            {
+                _lastSuccessTime = DateTime.UtcNow;
+                _totalSuccesses += 1;
+                _lastError = null;         // a success clears the previously reported error
+                _consecutiveFailures = 0;  // and ends the current failure streak
+            }
+        }
+
+        private void RecordFailure(string error)
+        {
+            lock (_healthLock)
+            {
+                _lastFailureTime = DateTime.UtcNow;
+                _lastError = error;
+                _totalFailures += 1;
+                _consecutiveFailures += 1; // reset to zero by the next RecordSuccess
+            }
+        }
+
+        /// <summary>
+        /// Makes sure the process connection exists, opening one on first use. The connect runs
+        /// outside the lock; whichever caller publishes first wins and a later one discards its
+        /// own connection.
+        /// </summary>
+        private void EnsureConnected()
+        {
+            if (_disposed)
+                throw new ObjectDisposedException(nameof(HistorianDataSource));
+            if (Volatile.Read(ref _connection) != null) return;
+
+            var (fresh, node) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
+
+            lock (_connectionLock)
+            {
+                var disposed = _disposed;
+                if (disposed || _connection != null)
+                {
+                    // Disposed meanwhile, or another caller already published a connection.
+                    fresh.CloseConnection(out _);
+                    fresh.Dispose();
+                    if (disposed) throw new ObjectDisposedException(nameof(HistorianDataSource));
+                    return;
+                }
+
+                _connection = fresh;
+                lock (_healthLock) _activeProcessNode = node;
+                Log.Information("Historian SDK connection opened to {Server}:{Port}", node, _config.Port);
+            }
+        }
+
+        /// <summary>
+        /// Tears down the process connection after a failure and reports the node it was open
+        /// against to the picker. Does nothing when no connection is open.
+        /// </summary>
+        private void HandleConnectionError(Exception? ex = null)
+        {
+            lock (_connectionLock)
+            {
+                if (_connection == null) return;
+
+                try
+                {
+                    _connection.CloseConnection(out _);
+                    _connection.Dispose();
+                }
+                catch (Exception teardownEx)
+                {
+                    Log.Debug(teardownEx, "Error disposing Historian SDK connection during error recovery");
+                }
+                _connection = null;
+
+                string? node;
+                lock (_healthLock) { node = _activeProcessNode; _activeProcessNode = null; }
+
+                if (node != null) _picker.MarkFailed(node, ex?.Message ?? "mid-query failure");
+                Log.Warning(ex, "Historian SDK connection reset (node={Node})", node ?? "(unknown)");
+            }
+        }
+
+        /// <summary>
+        /// Makes sure the event connection exists, opening one on first use. The connect runs
+        /// outside the lock; whichever caller publishes first wins and a later one discards its
+        /// own connection.
+        /// </summary>
+        private void EnsureEventConnected()
+        {
+            if (_disposed)
+                throw new ObjectDisposedException(nameof(HistorianDataSource));
+            if (Volatile.Read(ref _eventConnection) != null) return;
+
+            var (fresh, node) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
+
+            lock (_eventConnectionLock)
+            {
+                var disposed = _disposed;
+                if (disposed || _eventConnection != null)
+                {
+                    // Disposed meanwhile, or another caller already published a connection.
+                    fresh.CloseConnection(out _);
+                    fresh.Dispose();
+                    if (disposed) throw new ObjectDisposedException(nameof(HistorianDataSource));
+                    return;
+                }
+
+                _eventConnection = fresh;
+                lock (_healthLock) _activeEventNode = node;
+                Log.Information("Historian SDK event connection opened to {Server}:{Port}", node, _config.Port);
+            }
+        }
+
+        /// <summary>
+        /// Tears down the event connection after a failure and reports the node it was open
+        /// against to the picker. Does nothing when no connection is open.
+        /// </summary>
+        private void HandleEventConnectionError(Exception? ex = null)
+        {
+            lock (_eventConnectionLock)
+            {
+                if (_eventConnection == null) return;
+
+                try
+                {
+                    _eventConnection.CloseConnection(out _);
+                    _eventConnection.Dispose();
+                }
+                catch (Exception teardownEx)
+                {
+                    Log.Debug(teardownEx, "Error disposing Historian SDK event connection during error recovery");
+                }
+                _eventConnection = null;
+
+                string? node;
+                lock (_healthLock) { node = _activeEventNode; _activeEventNode = null; }
+
+                if (node != null) _picker.MarkFailed(node, ex?.Message ?? "mid-query failure");
+                Log.Warning(ex, "Historian SDK event connection reset (node={Node})", node ?? "(unknown)");
+            }
+        }
+
+        /// <summary>
+        /// Reads raw (full-retrieval) samples for <paramref name="tagName"/> between the two times.
+        /// At most <paramref name="maxValues"/> samples are returned; when that is not positive the
+        /// configured <c>MaxValuesPerRead</c> applies, and a non-positive value there means no cap.
+        /// SDK failures are logged and recorded, the connection is reset, and the samples gathered
+        /// so far (possibly none) are returned; cancellation and disposal exceptions are rethrown.
+        /// </summary>
+        public Task<List<HistorianSample>> ReadRawAsync(
+            string tagName, DateTime startTime, DateTime endTime, int maxValues,
+            CancellationToken ct = default)
+        {
+            var results = new List<HistorianSample>();
+
+            try
+            {
+                EnsureConnected();
+
+                using var query = _connection!.CreateHistoryQuery();
+                var args = new HistoryQueryArgs
+                {
+                    TagNames = new StringCollection { tagName },
+                    StartDateTime = startTime,
+                    EndDateTime = endTime,
+                    RetrievalMode = HistorianRetrievalMode.Full
+                };
+
+                // One cap drives both the SDK batch size and the loop cut-off below.
+                var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
+                if (limit > 0)
+                    args.BatchSize = (uint)limit;
+
+                if (!query.StartQuery(args, out var error))
+                {
+                    Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
+                    RecordFailure($"raw StartQuery: {error.ErrorCode}");
+                    HandleConnectionError();
+                    return Task.FromResult(results);
+                }
+
+                while (query.MoveNext(out error))
+                {
+                    ct.ThrowIfCancellationRequested();
+
+                    var result = query.QueryResult;
+                    // Presumably string tags surface as Value == 0 plus StringValue — confirm with the SDK.
+                    var value = !string.IsNullOrEmpty(result.StringValue) && result.Value == 0
+                        ? (object)result.StringValue
+                        : result.Value;
+
+                    results.Add(new HistorianSample
+                    {
+                        Value = value,
+                        TimestampUtc = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc),
+                        Quality = (byte)(result.OpcQuality & 0xFF),
+                    });
+
+                    if (limit > 0 && results.Count >= limit) break;
+                }
+
+                query.EndQuery(out _);
+                RecordSuccess();
+            }
+            catch (OperationCanceledException) { throw; }
+            catch (ObjectDisposedException) { throw; }
+            catch (Exception ex)
+            {
+                Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
+                RecordFailure($"raw: {ex.Message}");
+                HandleConnectionError(ex);
+            }
+
+            Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
+                tagName, results.Count, startTime, endTime);
+
+            return Task.FromResult(results);
+        }
+
+        /// <summary>
+        /// Reads one aggregate value per interval bucket for a tag; failures are logged and yield what was gathered.
+        /// </summary>
+        public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
+            string tagName, DateTime startTime, DateTime endTime,
+            double intervalMs, string aggregateColumn,
+            CancellationToken ct = default)
+        {
+            var results = new List<HistorianAggregateSample>();
+
+            try
+            {
+                EnsureConnected();
+
+                using var query = _connection!.CreateAnalogSummaryQuery();
+                var args = new AnalogSummaryQueryArgs
+                {
+                    TagNames = new StringCollection { tagName },
+                    StartDateTime = startTime,
+                    EndDateTime = endTime,
+                    Resolution = (ulong)intervalMs // NOTE(review): negative/NaN intervals are not validated here
+                };
+
+                if (!query.StartQuery(args, out var error))
+                {
+                    Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
+                    RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
+                    HandleConnectionError();
+                    return Task.FromResult(results);
+                }
+
+                while (query.MoveNext(out error))
+                {
+                    ct.ThrowIfCancellationRequested();
+
+                    var bucket = query.QueryResult;
+                    results.Add(new HistorianAggregateSample
+                    {
+                        Value = ExtractAggregateValue(bucket, aggregateColumn),
+                        TimestampUtc = DateTime.SpecifyKind(bucket.StartDateTime, DateTimeKind.Utc),
+                    });
+                }
+
+                query.EndQuery(out _);
+                RecordSuccess();
+            }
+            catch (OperationCanceledException) { throw; }
+            catch (ObjectDisposedException) { throw; }
+            catch (Exception ex)
+            {
+                Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
+                RecordFailure($"aggregate: {ex.Message}");
+                HandleConnectionError(ex);
+            }
+
+            Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
+                aggregateColumn, tagName, results.Count);
+
+            return Task.FromResult(results);
+        }
+
+        /// <summary>
+        /// Reads one interpolated sample per requested timestamp. A timestamp the SDK cannot answer
+        /// still produces an entry (null value, quality 0), so a read that runs to completion
+        /// lines up one-to-one with <paramref name="timestamps"/>. A null or empty array returns
+        /// an empty list without touching the connection. Exceptions other than cancellation or
+        /// disposal are logged and recorded, the connection is reset, and the samples gathered up
+        /// to that point are returned.
+        /// </summary>
+        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
+        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
+        public Task<List<HistorianSample>> ReadAtTimeAsync(
+            string tagName, DateTime[] timestamps,
+            CancellationToken ct = default)
+        {
+            var results = new List<HistorianSample>();
+
+            if (timestamps == null || timestamps.Length == 0)
+                return Task.FromResult(results);
+
+            try
+            {
+                EnsureConnected();
+
+                foreach (var timestamp in timestamps)
+                {
+                    ct.ThrowIfCancellationRequested();
+
+                    // One query per timestamp: start == end with a batch of one row.
+                    using var query = _connection!.CreateHistoryQuery();
+                    var args = new HistoryQueryArgs
+                    {
+                        TagNames = new StringCollection { tagName },
+                        StartDateTime = timestamp,
+                        EndDateTime = timestamp,
+                        RetrievalMode = HistorianRetrievalMode.Interpolated,
+                        BatchSize = 1
+                    };
+
+                    // Start from a Bad placeholder; it is only filled in when the SDK returns a row.
+                    var sample = new HistorianSample
+                    {
+                        Value = null,
+                        TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
+                        Quality = 0, // Bad
+                    };
+
+                    if (!query.StartQuery(args, out var error))
+                    {
+                        results.Add(sample);
+                        continue;
+                    }
+
+                    if (query.MoveNext(out error))
+                    {
+                        var result = query.QueryResult;
+                        // Presumably string tags surface as Value == 0 plus StringValue — confirm with the SDK.
+                        sample.Value = !string.IsNullOrEmpty(result.StringValue) && result.Value == 0
+                            ? (object)result.StringValue
+                            : result.Value;
+                        sample.Quality = (byte)(result.OpcQuality & 0xFF);
+                    }
+
+                    results.Add(sample);
+                    query.EndQuery(out _);
+                }
+
+                RecordSuccess();
+            }
+            catch (OperationCanceledException) { throw; }
+            catch (ObjectDisposedException) { throw; }
+            catch (Exception ex)
+            {
+                Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
+                RecordFailure($"at-time: {ex.Message}");
+                HandleConnectionError(ex);
+            }
+
+            Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
+                tagName, results.Count, timestamps.Length);
+
+            return Task.FromResult(results);
+        }
+
+        /// <summary>
+        /// Reads events in ascending time order, optionally for one source; failures are logged and yield what was gathered.
+        /// </summary>
+        public Task<List<HistorianEventDto>> ReadEventsAsync(
+            string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
+            CancellationToken ct = default)
+        {
+            var results = new List<HistorianEventDto>();
+
+            try
+            {
+                EnsureEventConnected();
+
+                using var query = _eventConnection!.CreateEventQuery();
+                var args = new EventQueryArgs
+                {
+                    StartDateTime = startTime,
+                    EndDateTime = endTime,
+                    EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
+                    QueryType = HistorianEventQueryType.Events,
+                    EventOrder = HistorianEventOrder.Ascending
+                };
+
+                // NOTE(review): the filter's error output is discarded — confirm a rejected filter cannot widen the query.
+                if (!string.IsNullOrEmpty(sourceName))
+                    query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
+
+                if (!query.StartQuery(args, out var error))
+                {
+                    Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
+                    RecordFailure($"events StartQuery: {error.ErrorCode}");
+                    HandleEventConnectionError();
+                    return Task.FromResult(results);
+                }
+
+                while (query.MoveNext(out error))
+                {
+                    ct.ThrowIfCancellationRequested();
+                    results.Add(ToDto(query.QueryResult));
+                    if (maxEvents > 0 && results.Count >= maxEvents) break;
+                }
+
+                query.EndQuery(out _);
+                RecordSuccess();
+            }
+            catch (OperationCanceledException) { throw; }
+            catch (ObjectDisposedException) { throw; }
+            catch (Exception ex)
+            {
+                Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
+                RecordFailure($"events: {ex.Message}");
+                HandleEventConnectionError(ex);
+            }
+
+            Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
+                sourceName ?? "(all)", results.Count, startTime, endTime);
+
+            return Task.FromResult(results);
+        }
+
+        /// <summary>Copies an SDK event into the SDK-free <see cref="HistorianEventDto"/>.</summary>
+        private static HistorianEventDto ToDto(HistorianEvent evt)
+        {
+            // The ArchestrA SDK flags these properties as obsolete yet still populates them, and
+            // their replacements are not wired in the SDK version this project binds against.
+            // Reading them is the documented v1 behavior. The warning is silenced here only, so
+            // any other use of deprecated SDK surface continues to be reported.
+#pragma warning disable CS0618
+            var dto = new HistorianEventDto();
+            dto.Id = evt.Id;
+            dto.Source = evt.Source;
+            dto.EventTime = evt.EventTime;
+            dto.ReceivedTime = evt.ReceivedTime;
+            dto.DisplayText = evt.DisplayText;
+            dto.Severity = (ushort)evt.Severity;
+            return dto;
+#pragma warning restore CS0618
+        }
+
+        /// <summary>Picks the summary field named by <paramref name="column"/> (case-sensitive).</summary>
+        /// <returns>The selected value, or null when the column name is not one of the known fields.</returns>
+        internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column) =>
+            column switch
+            {
+                "Average" => result.Average,
+                "Minimum" => result.Minimum,
+                "Maximum" => result.Maximum,
+                "ValueCount" => result.ValueCount,
+                "First" => result.First,
+                "Last" => result.Last,
+                "StdDev" => result.StdDev,
+                _ => null,
+            };
+
+        /// <summary>Closes both SDK connections; repeated calls are no-ops.</summary>
+        public void Dispose()
+        {
+            if (_disposed) return;
+            _disposed = true;
+
+            // Same locks EnsureConnected/EnsureEventConnected publish under: a racing connect cannot outlive disposal.
+            lock (_connectionLock)
+            {
+                Close(_connection, "Error closing Historian SDK connection");
+                _connection = null;
+            }
+            lock (_eventConnectionLock)
+            {
+                Close(_eventConnection, "Error closing Historian SDK event connection");
+                _eventConnection = null;
+            }
+
+            void Close(HistorianAccess? connection, string failureMessage)
+            {
+                try
+                {
+                    connection?.CloseConnection(out _);
+                    connection?.Dispose();
+                }
+                catch (Exception ex) { Log.Warning(ex, failureMessage); }
+            }
+        }
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs
new file mode 100644
index 0000000..d01e164
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs
@@ -0,0 +1,18 @@
+using System;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+ /// <summary>
+ /// SDK-free representation of a Historian event record. Prevents ArchestrA types from
+ /// leaking beyond HistorianDataSource.
+ /// </summary>
+ public sealed class HistorianEventDto
+ {
+ public Guid Id { get; set; }
+ public string? Source { get; set; }
+ public DateTime EventTime { get; set; }
+ public DateTime ReceivedTime { get; set; }
+ public string? DisplayText { get; set; }
+ public ushort Severity { get; set; }
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs
new file mode 100644
index 0000000..d056435
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs
@@ -0,0 +1,27 @@
+using System;
+using System.Collections.Generic;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+    /// <summary>
+    /// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
+    /// via an IPC health query (not wired in PR #5; deferred).
+    /// </summary>
+    public sealed class HistorianHealthSnapshot
+    {
+        public long TotalQueries { get; set; }
+        public long TotalSuccesses { get; set; }
+        public long TotalFailures { get; set; }
+        public int ConsecutiveFailures { get; set; }
+        public DateTime? LastSuccessTime { get; set; }
+        public DateTime? LastFailureTime { get; set; }
+        public string? LastError { get; set; }
+        public bool ProcessConnectionOpen { get; set; }
+        public bool EventConnectionOpen { get; set; }
+        public string? ActiveProcessNode { get; set; }
+        public string? ActiveEventNode { get; set; }
+        public int NodeCount { get; set; }
+        public int HealthyNodeCount { get; set; }
+        public List<HistorianClusterNodeState> Nodes { get; set; } = new(); // one entry per configured node
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs
new file mode 100644
index 0000000..3f78ffd
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs
@@ -0,0 +1,30 @@
+using System;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+ /// <summary>
+ /// OPC-UA-free representation of a single historical data point. The Host returns these
+ /// across the IPC boundary as GalaxyDataValue; the Proxy maps quality and value to
+ /// OPC UA DataValue. Raw MX quality byte is preserved so the Proxy can use the same
+ /// quality mapper it already uses for live reads.
+ /// </summary>
+ public sealed class HistorianSample
+ {
+ public object? Value { get; set; }
+
+ /// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
+ public byte Quality { get; set; }
+
+ public DateTime TimestampUtc { get; set; }
+ }
+
+ /// <summary>
+ /// Result of <see cref="IHistorianDataSource.ReadAggregateAsync"/>. When <see cref="Value"/> is
+ /// null the aggregate is unavailable for that bucket (Proxy maps to BadNoData).
+ /// </summary>
+ public sealed class HistorianAggregateSample
+ {
+ public double? Value { get; set; }
+ public DateTime TimestampUtc { get; set; }
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs
new file mode 100644
index 0000000..51001e9
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Threading;
+using ArchestrA;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+ /// <summary>
+ /// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that
+ /// control connection success, failure, and timeout behavior.
+ /// </summary>
+ internal interface IHistorianConnectionFactory
+ {
+ HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
+ }
+
+    /// <summary>Production implementation — opens real Historian SDK connections.</summary>
+    internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
+    {
+        /// <summary>Opens a read-only connection and polls until it is connected; throws on SDK error or timeout.</summary>
+        public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
+        {
+            var args = new HistorianConnectionArgs
+            {
+                ServerName = config.ServerName,
+                TcpPort = (ushort)config.Port,
+                IntegratedSecurity = config.IntegratedSecurity,
+                UseArchestrAUser = config.IntegratedSecurity,
+                ConnectionType = type,
+                ReadOnly = true,
+                PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
+            };
+
+            if (!config.IntegratedSecurity)
+            {
+                args.UserName = config.UserName ?? string.Empty;
+                args.Password = config.Password ?? string.Empty;
+            }
+
+            var conn = new HistorianAccess();
+            try
+            {
+                if (!conn.OpenConnection(args, out var error))
+                    throw new InvalidOperationException(
+                        $"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
+
+                const int pollIntervalMs = 250;
+                var timeoutMs = config.CommandTimeoutSeconds * 1000;
+                for (var elapsed = 0; elapsed < timeoutMs; elapsed += pollIntervalMs)
+                {
+                    var status = new HistorianConnectionStatus();
+                    conn.GetConnectionStatus(ref status);
+
+                    if (status.ConnectedToServer)
+                        return conn;
+                    if (status.ErrorOccurred)
+                        throw new InvalidOperationException(
+                            $"Historian SDK connection failed: {status.Error}");
+
+                    Thread.Sleep(pollIntervalMs);
+                }
+
+                throw new TimeoutException(
+                    $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
+            }
+            catch
+            {
+                // Any failure — including an exception raised by the SDK itself — must not leak the handle.
+                conn.Dispose();
+                throw;
+            }
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs
new file mode 100644
index 0000000..146ae1b
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
+{
+    /// <summary>
+    /// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host.
+    /// Implementations read via the aahClient* SDK; the Proxy side maps returned samples
+    /// to OPC UA DataValue.
+    /// </summary>
+    public interface IHistorianDataSource : IDisposable
+    {
+        /// <summary>Raw samples for a tag within a time range; <paramref name="maxValues"/> limits the count.</summary>
+        Task<List<HistorianSample>> ReadRawAsync(
+            string tagName, DateTime startTime, DateTime endTime, int maxValues,
+            CancellationToken ct = default);
+        /// <summary>One value of the named aggregate column per interval bucket within a time range.</summary>
+        Task<List<HistorianAggregateSample>> ReadAggregateAsync(
+            string tagName, DateTime startTime, DateTime endTime,
+            double intervalMs, string aggregateColumn,
+            CancellationToken ct = default);
+        /// <summary>Samples for a tag at the specific timestamps requested.</summary>
+        Task<List<HistorianSample>> ReadAtTimeAsync(
+            string tagName, DateTime[] timestamps, CancellationToken ct = default);
+        /// <summary>Events within a time range, optionally restricted to one source.</summary>
+        Task<List<HistorianEventDto>> ReadEventsAsync(
+            string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
+            CancellationToken ct = default);
+        /// <summary>Point-in-time health of the historian subsystem.</summary>
+        HistorianHealthSnapshot GetHealthSnapshot();
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs
index de38f37..222bdef 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs
@@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
+using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
@@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
///
public sealed class MxAccessClient : IDisposable
{
+    private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>(); // tags events with this type as SourceContext
+
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
@@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable
/// Fires whenever the connection transitions Connected ↔ Disconnected.
public event EventHandler? ConnectionStateChanged;
+    /// <summary>
+    /// Fires once per failed subscription replay after a reconnect. Carries the tag reference
+    /// and the exception so the backend can propagate the degradation signal (e.g. mark the
+    /// subscription bad on the Proxy side rather than silently losing its callback). Added for
+    /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
+    /// operator would only find out that a specific subscription stopped updating through a
+    /// data-quality complaint from downstream.
+    /// </summary>
+    public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;
+
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
@@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
- // our reconnect signal.
+ // our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item
+ // handle; we must RemoveItem it on the same pump turn or the long-running monitor
+ // leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely).
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
- // AddItem on the connection handle is cheap and round-trips through COM.
- // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
- try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
+ int probeHandle = 0;
+ try
+ {
+ probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
+ return probeHandle > 0;
+ }
catch { return false; }
+ finally
+ {
+ if (probeHandle > 0)
+ {
+ try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
+ catch { /* proxy is dying; best-effort cleanup */ }
+ }
+ }
});
}
catch { probeOk = false; }
@@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
- // Replay every subscription that was active before the disconnect.
+ // Replay every subscription that was active before the disconnect. PR 6 low
+ // finding #2: surface per-tag failures — log them and raise
+ // SubscriptionReplayFailed so the backend can propagate the degraded state
+ // (previously swallowed silently; downstream quality dropped without a signal).
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
+ var failed = 0;
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
- catch { /* skip — operator can re-subscribe */ }
+ catch (Exception subEx)
+ {
+ failed++;
+ Log.Warning(subEx,
+ "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}",
+ fullRef, _reconnectCount);
+ SubscriptionReplayFailed?.Invoke(this,
+ new SubscriptionReplayFailedEventArgs(fullRef, subEx));
+ }
}
+ if (failed > 0)
+ Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
+ else
+ Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
+
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs
new file mode 100644
index 0000000..ee8f03b
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs
@@ -0,0 +1,20 @@
+using System;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
+
+/// <summary>
+/// Fired by <see cref="MxAccessClient.SubscriptionReplayFailed"/> when a previously-active
+/// subscription fails to be restored after a reconnect. The backend should treat the tag as
+/// unhealthy until the next successful resubscribe.
+/// </summary>
+public sealed class SubscriptionReplayFailedEventArgs : EventArgs
+{
+    public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
+    {
+        TagReference = tagReference ?? throw new ArgumentNullException(nameof(tagReference));
+        Exception = exception ?? throw new ArgumentNullException(nameof(exception));
+    }
+
+    public string TagReference { get; } // reference of the tag whose replay failed
+    public Exception Exception { get; } // error raised while re-subscribing that tag
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs
index 0134451..7cd543a 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs
@@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
@@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up
/// (the v1 alarm subsystem is its own subtree).
///
-public sealed class MxAccessGalaxyBackend : IGalaxyBackend
+public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
{
private readonly GalaxyRepository _repository;
private readonly MxAccessClient _mx;
+ private readonly IHistorianDataSource? _historian;
private long _nextSessionId;
private long _nextSubscriptionId;
@@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public event System.EventHandler? OnHostStatusChanged;
#pragma warning restore CS0067
- public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
+ public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
_repository = repository;
_mx = mx;
+ _historian = historian;
}
public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
@@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
- public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
- => Task.FromResult(new HistoryReadResponse
+    /// <summary>Reads raw historian samples for each requested tag; on failure returns Success=false plus the tags read so far.</summary>
+    public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
+    {
+        if (_historian is null)
+            return new HistoryReadResponse
+            {
+                Success = false,
+                Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
+                Tags = Array.Empty<HistoryTagValues>(),
+            };
+
+        var tags = new List<HistoryTagValues>(req.TagReferences.Length);
+        try // also covers the timestamp conversion, which throws on out-of-range milliseconds
         {
-            Success = false,
-            Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
-            Tags = Array.Empty<HistoryTagValues>(),
-        });
+            var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
+            var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
+            foreach (var reference in req.TagReferences)
+            {
+                var samples = await _historian.ReadRawAsync(reference, start, end, (int)Math.Min(req.MaxValuesPerTag, int.MaxValue), ct).ConfigureAwait(false);
+                tags.Add(new HistoryTagValues
+                {
+                    TagReference = reference,
+                    Values = samples.Select(s => ToWire(reference, s)).ToArray(),
+                });
+            }
+            return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
+        }
+        catch (OperationCanceledException) { throw; }
+        catch (Exception ex)
+        {
+            return new HistoryReadResponse
+            {
+                Success = false,
+                Error = $"Historian read failed: {ex.Message}",
+                Tags = tags.ToArray(),
+            };
+        }
+    }
public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
+ public void Dispose() => _historian?.Dispose();
+
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
{
TagReference = reference,
@@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
+    /// <summary>
+    /// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
+    /// shape. The value is MessagePack-encoded for the Proxy to decode, and the quality byte is
+    /// reduced to a category-level status code by MapHistorianQualityToOpcUa. The timestamp is
+    /// pinned to DateTimeKind.Utc before conversion: the DateTimeOffset(DateTime, TimeSpan)
+    /// constructor throws for a Local-kind value whose offset is not the machine's local offset.
+    /// </summary>
+    private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
+    {
+        TagReference = reference,
+        ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
+        ValueMessagePackType = 0,
+        StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
+        SourceTimestampUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(sample.TimestampUtc, DateTimeKind.Utc)).ToUnixTimeMilliseconds(),
+        ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
+    };
+
+    private static uint MapHistorianQualityToOpcUa(byte q) => q switch
+    {
+        // Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
+        // The Proxy may refine this when it decodes the wire frame. Arms are tested top-down.
+        >= 192 => 0x00000000u, // Good
+        >= 64 => 0x40000000u,  // Uncertain
+        _ => 0x80000000u,      // Bad
+    };
+
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{
AttributeName = row.AttributeName,
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs
index efff93f..2a5ceec 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs
@@ -4,6 +4,7 @@ using System.Threading;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
@@ -66,9 +67,11 @@ public static class Program
pump = new StaPump("Galaxy.Sta");
pump.WaitForStartedAsync().GetAwaiter().GetResult();
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
+ var historian = BuildHistorianIfEnabled();
backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
- mx);
+ mx,
+ historian);
break;
}
@@ -77,6 +80,7 @@ public static class Program
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
finally
{
+ (backend as IDisposable)?.Dispose();
mx?.Dispose();
pump?.Dispose();
}
@@ -91,4 +95,45 @@ public static class Program
}
finally { Log.CloseAndFlush(); }
}
+
+    /// <summary>
+    /// Builds a <see cref="HistorianDataSource"/> from the OTOPCUA_HISTORIAN_* environment
+    /// variables the supervisor passes at spawn time. Returns null when the historian is
+    /// disabled (default) so MxAccessGalaxyBackend.HistoryReadAsync returns a clear
+    /// "not configured" error instead of attempting an SDK connection to localhost.
+    /// </summary>
+    private static IHistorianDataSource? BuildHistorianIfEnabled()
+    {
+        var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED");
+        if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1")
+            return null;
+
+        var cfg = new HistorianConfiguration
+        {
+            Enabled = true,
+            ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
+            Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
+            IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
+            UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
+            Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
+            CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
+            MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
+            FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
+        };
+
+        // Trim entries and drop blanks so the logged node count reflects usable names only.
+        var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS") ?? string.Empty;
+        var names = Array.FindAll(Array.ConvertAll(servers.Split(','), s => s.Trim()), s => s.Length > 0);
+        if (names.Length > 0) cfg.ServerNames = new System.Collections.Generic.List<string>(names);
+
+        Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}",
+            Math.Max(1, cfg.ServerNames?.Count ?? 0), cfg.Port);
+        return new HistorianDataSource(cfg);
+    }
+
+    /// <summary>Reads an integer environment variable using the invariant culture; returns <paramref name="defaultValue"/> when unset or unparsable.</summary>
+    private static int TryParseInt(string envName, int defaultValue) =>
+        int.TryParse(Environment.GetEnvironmentVariable(envName),
+            System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out var parsed)
+            ? parsed : defaultValue;
}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj
index bc8a16a..7498013 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj
@@ -30,11 +30,43 @@
+
+
+
+
..\..\lib\ArchestrA.MxAccess.dll
true
+
+
+ ..\..\lib\aahClientManaged.dll
+ false
+
+
+ ..\..\lib\aahClientCommon.dll
+ false
+
+
+
+
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs
new file mode 100644
index 0000000..4078080
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs
@@ -0,0 +1,94 @@
+using System;
+using System.Linq;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
+{
+    [Trait("Category", "Unit")]
+    public sealed class HistorianClusterEndpointPickerTests
+    {
+        private static HistorianConfiguration Config(params string[] nodes) => new HistorianConfiguration
+        {
+            ServerName = "ignored",
+            ServerNames = nodes.ToList(),
+            FailureCooldownSeconds = 60,
+        };
+
+        [Fact]
+        public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty()
+        {
+            var config = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() };
+            var picker = new HistorianClusterEndpointPicker(config);
+            picker.NodeCount.ShouldBe(1);
+            picker.GetHealthyNodes().ShouldBe(new[] { "only-node" });
+        }
+
+        [Fact]
+        public void Failed_node_enters_cooldown_and_is_skipped()
+        {
+            var frozen = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+            var picker = new HistorianClusterEndpointPicker(Config("a", "b"), () => frozen);
+
+            picker.MarkFailed("a", "boom");
+            picker.GetHealthyNodes().ShouldBe(new[] { "b" });
+        }
+
+        [Fact]
+        public void Cooldown_expires_after_configured_window()
+        {
+            var current = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+            var picker = new HistorianClusterEndpointPicker(Config("a", "b"), () => current);
+            picker.MarkFailed("a", "boom");
+            picker.GetHealthyNodes().ShouldBe(new[] { "b" });
+
+            current = current.AddSeconds(61); // one second past the 60 s cooldown from Config
+            picker.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
+        }
+
+        [Fact]
+        public void MarkHealthy_immediately_clears_cooldown()
+        {
+            var frozen = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+            var picker = new HistorianClusterEndpointPicker(Config("a"), () => frozen);
+            picker.MarkFailed("a", "boom");
+            picker.GetHealthyNodes().ShouldBeEmpty();
+            picker.MarkHealthy("a");
+            picker.GetHealthyNodes().ShouldBe(new[] { "a" });
+        }
+
+        [Fact]
+        public void All_nodes_in_cooldown_returns_empty_healthy_list()
+        {
+            var frozen = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+            var picker = new HistorianClusterEndpointPicker(Config("a", "b"), () => frozen);
+            picker.MarkFailed("a", "x");
+            picker.MarkFailed("b", "y");
+            picker.GetHealthyNodes().ShouldBeEmpty();
+            picker.NodeCount.ShouldBe(2);
+        }
+
+        [Fact]
+        public void Snapshot_reports_failure_count_and_last_error()
+        {
+            var frozen = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+            var picker = new HistorianClusterEndpointPicker(Config("a"), () => frozen);
+            picker.MarkFailed("a", "first");
+            picker.MarkFailed("a", "second");
+
+            var state = picker.SnapshotNodeStates().Single();
+            state.FailureCount.ShouldBe(2);
+            state.LastError.ShouldBe("second");
+            state.IsHealthy.ShouldBeFalse();
+            state.CooldownUntil.ShouldNotBeNull();
+        }
+
+        [Fact]
+        public void Duplicate_hostnames_are_deduplicated_case_insensitively()
+        {
+            var picker = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB"));
+            picker.NodeCount.ShouldBe(2);
+        }
+    }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs
new file mode 100644
index 0000000..4c7800c
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs
@@ -0,0 +1,109 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
+{
+ [Trait("Category", "Unit")]
+ public sealed class HistorianWiringTests
+ {
+        /// <summary>
+        /// When the Proxy sends a HistoryRead but the supervisor never enabled the historian
+        /// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a
+        /// self-explanatory error — not an exception or a hang against localhost.
+        /// </summary>
+        [Fact]
+        public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured()
+        {
+            using var pump = new StaPump("Test.Sta");
+            await pump.WaitForStartedAsync();
+            using var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); // IDisposable — released before the pump
+            using var backend = new MxAccessGalaxyBackend(
+                new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
+                mx,
+                historian: null);
+
+            var resp = await backend.HistoryReadAsync(new HistoryReadRequest
+            {
+                TagReferences = new[] { "TestTag" },
+                StartUtcUnixMs = 0,
+                EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
+                MaxValuesPerTag = 100,
+            }, CancellationToken.None);
+
+            resp.Success.ShouldBeFalse();
+            resp.Error.ShouldContain("Historian disabled");
+            resp.Tags.ShouldBeEmpty();
+        }
+
+        /// <summary>
+        /// When the historian is wired up, we expect the backend to call through and map
+        /// samples onto the IPC wire shape. Uses a fake <see cref="IHistorianDataSource"/>
+        /// that returns a single known-good sample so we can assert the mapping stays sane.
+        /// </summary>
+        [Fact]
+        public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue()
+        {
+            using var pump = new StaPump("Test.Sta");
+            await pump.WaitForStartedAsync();
+            using var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); // IDisposable — released before the pump
+            var fake = new FakeHistorianDataSource(new HistorianSample
+            {
+                Value = 42.5,
+                Quality = 192, // Good
+                TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc),
+            });
+            using var backend = new MxAccessGalaxyBackend(
+                new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
+                mx,
+                fake);
+
+            var resp = await backend.HistoryReadAsync(new HistoryReadRequest
+            {
+                TagReferences = new[] { "TankLevel" },
+                StartUtcUnixMs = 0,
+                EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
+                MaxValuesPerTag = 100,
+            }, CancellationToken.None);
+
+            resp.Success.ShouldBeTrue();
+            resp.Tags.Length.ShouldBe(1);
+            resp.Tags[0].TagReference.ShouldBe("TankLevel");
+            resp.Tags[0].Values.Length.ShouldBe(1);
+            resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good
+            resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull();
+            resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe(
+                new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds());
+        }
+
+ private sealed class FakeHistorianDataSource : IHistorianDataSource
+ {
+ private readonly HistorianSample _sample;
+ public FakeHistorianDataSource(HistorianSample sample) => _sample = sample;
+
+ public Task> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct)
+ => Task.FromResult(new List { _sample });
+
+ public Task> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
+ => Task.FromResult(new List());
+
+ public Task> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct)
+ => Task.FromResult(new List());
+
+ public Task> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
+ => Task.FromResult(new List());
+
+ public HistorianHealthSnapshot GetHealthSnapshot() => new();
+ public void Dispose() { }
+ }
+ }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs
new file mode 100644
index 0000000..c071788
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs
@@ -0,0 +1,173 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using ArchestrA.MxAccess;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class MxAccessClientMonitorLoopTests
+{
+    /// <summary>
+    /// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
+    /// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
+    /// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
+    /// eventually exhaust the runtime proxy's handle table.
+    /// </summary>
+    [Fact]
+    public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
+    {
+        using var pump = new StaPump("Monitor.Sta");
+        await pump.WaitForStartedAsync();
+
+        var counting = new CountingProxy();
+        var monitored = new MxAccessClient(pump, counting, "probe-test", new MxAccessClientOptions
+        {
+            AutoReconnect = true,
+            MonitorInterval = TimeSpan.FromMilliseconds(150),
+            StaleThreshold = TimeSpan.FromMilliseconds(50),
+        });
+
+        await monitored.ConnectAsync();
+
+        // Idle long enough to exceed StaleThreshold and for several monitor ticks to run.
+        await Task.Delay(700);
+
+        monitored.Dispose();
+
+        // Once the connection looks stale, each monitor tick issues one Heartbeat probe.
+        counting.HeartbeatAddCount.ShouldBeGreaterThan(1);
+        // Removes must balance adds: every AddItem("$Heartbeat") needs its RemoveItem.
+        counting.HeartbeatRemoveCount.ShouldBe(counting.HeartbeatAddCount);
+        counting.OutstandingHeartbeatHandles.ShouldBe(0);
+    }
+
+    /// <summary>
+    /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
+    /// SubscriptionReplayFailed so the backend can propagate the degradation, not get
+    /// silently eaten.
+    /// </summary>
+    [Fact]
+    public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
+    {
+        using var pump = new StaPump("Replay.Sta");
+        await pump.WaitForStartedAsync();
+
+        var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
+        var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
+        {
+            AutoReconnect = true,
+            MonitorInterval = TimeSpan.FromMilliseconds(120),
+            StaleThreshold = TimeSpan.FromMilliseconds(50),
+        });
+
+        var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
+        client.SubscriptionReplayFailed += (_, e) => failures.Add(e);
+
+        await client.ConnectAsync();
+        await client.SubscribeAsync("GoodTag.X", (_, _) => { });
+        await client.SubscribeAsync("BadTag.A", (_, _) => { });
+        await client.SubscribeAsync("BadTag.B", (_, _) => { });
+
+        proxy.TriggerProbeFailureOnNextCall();
+
+        // Wait for the monitor loop to probe → fail → reconnect → replay.
+        await Task.Delay(800);
+
+        client.Dispose();
+
+        failures.Count.ShouldBe(2);
+        var names = new HashSet<string>();
+        foreach (var f in failures) names.Add(f.TagReference);
+        names.ShouldContain("BadTag.A");
+        names.ShouldContain("BadTag.B");
+    }
+
+ // ----- test doubles -----
+
+    private sealed class CountingProxy : IMxProxy
+    {
+        private int _next = 1;
+        private readonly ConcurrentDictionary<int, string> _live = new(); // item handle → address
+
+        public int HeartbeatAddCount;
+        public int HeartbeatRemoveCount;
+        public int OutstandingHeartbeatHandles => _live.Count; // counts every live handle, not only $Heartbeat
+
+        public event MxDataChangeHandler? OnDataChange { add { } remove { } }
+        public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
+
+        public int Register(string _) => 42;
+        public void Unregister(int _) { }
+
+        public int AddItem(int _, string address)
+        {
+            var h = Interlocked.Increment(ref _next);
+            _live[h] = address;
+            if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount);
+            return h;
+        }
+
+        public void RemoveItem(int _, int itemHandle)
+        {
+            if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat")
+                Interlocked.Increment(ref HeartbeatRemoveCount);
+        }
+
+        public void AdviseSupervisory(int _, int __) { }
+        public void UnAdviseSupervisory(int _, int __) { }
+        public void Write(int _, int __, object ___, int ____) { }
+    }
+
+    /// <summary>
+    /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
+    /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
+    /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
+    /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
+    /// per failing tag.
+    /// </summary>
+    private sealed class ReplayFailingProxy : IMxProxy
+    {
+        private int _next = 1;
+        private readonly HashSet<string> _failOnReplay;
+        private int _probeFailOnce;
+        private readonly ConcurrentDictionary<string, bool> _replayedOnce = new(StringComparer.OrdinalIgnoreCase);
+
+        public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
+        {
+            _failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
+        }
+
+        public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);
+
+        public event MxDataChangeHandler? OnDataChange { add { } remove { } }
+        public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
+
+        public int Register(string _) => 42;
+        public void Unregister(int _) { }
+
+        public int AddItem(int _, string address)
+        {
+            if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
+                throw new InvalidOperationException("simulated probe failure");
+
+            // Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
+            if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address))
+                throw new InvalidOperationException($"simulated replay failure for {address}");
+
+            if (_failOnReplay.Contains(address)) _replayedOnce[address] = true;
+            return Interlocked.Increment(ref _next);
+        }
+
+        public void RemoveItem(int _, int __) { }
+        public void AdviseSupervisory(int _, int __) { }
+        public void UnAdviseSupervisory(int _, int __) { }
+        public void Write(int _, int __, object ___, int ____) { }
+    }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj
index fd5d722..6f803d5 100644
--- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj
@@ -24,6 +24,11 @@
+
+
+ ..\..\lib\ArchestrA.MxAccess.dll
+