// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs ----
using System;
using System.Collections.Generic;
using System.Linq;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
    /// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
    /// out an ordered list of eligible candidates for the data source to try in sequence.
    /// </summary>
    /// <remarks>
    /// Every public member takes the same private lock, and the node list itself is never
    /// resized after construction; only the per-node bookkeeping is mutated.
    /// </remarks>
    internal sealed class HistorianClusterEndpointPicker
    {
        private readonly Func<DateTime> _clock;
        private readonly TimeSpan _cooldown;
        private readonly object _lock = new object();
        private readonly List<NodeEntry> _nodes;

        /// <summary>Creates a picker that reads the current time from <see cref="DateTime.UtcNow"/>.</summary>
        /// <param name="config">Source of the node names and the cooldown length.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
        public HistorianClusterEndpointPicker(HistorianConfiguration config)
            : this(config, () => DateTime.UtcNow) { }

        /// <summary>Creates a picker with an injectable clock (used to drive cooldown expiry deterministically).</summary>
        /// <param name="config">Source of the node names and the cooldown length.</param>
        /// <param name="clock">Returns the current time; compared against cooldown deadlines.</param>
        /// <exception cref="ArgumentNullException"><paramref name="clock"/> or <paramref name="config"/> is null.</exception>
        internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
        {
            _clock = clock ?? throw new ArgumentNullException(nameof(clock));
            if (config == null) throw new ArgumentNullException(nameof(config));

            // Negative cooldowns are clamped to zero, which disables cooldown entirely.
            _cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));

            // ServerNames wins when it has entries; otherwise fall back to the single ServerName.
            var names = (config.ServerNames != null && config.ServerNames.Count > 0)
                ? config.ServerNames
                : new List<string> { config.ServerName };

            // Blank entries are dropped, names are trimmed, and duplicates are removed
            // case-insensitively while keeping the first occurrence's position.
            _nodes = names
                .Where(n => !string.IsNullOrWhiteSpace(n))
                .Select(n => n.Trim())
                .Distinct(StringComparer.OrdinalIgnoreCase)
                .Select(n => new NodeEntry { Name = n })
                .ToList();
        }

        /// <summary>Number of distinct, non-blank nodes taken from the configuration.</summary>
        public int NodeCount
        {
            get { lock (_lock) return _nodes.Count; }
        }

        /// <summary>
        /// Names of the nodes that are not in cooldown right now, in configured order.
        /// The returned list is a fresh copy.
        /// </summary>
        public IReadOnlyList<string> GetHealthyNodes()
        {
            lock (_lock)
            {
                var now = _clock();
                return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
            }
        }

        /// <summary>Number of nodes that are not in cooldown right now.</summary>
        public int HealthyNodeCount
        {
            get
            {
                lock (_lock)
                {
                    var now = _clock();
                    return _nodes.Count(n => IsHealthyAt(n, now));
                }
            }
        }

        /// <summary>
        /// Records a failure for <paramref name="node"/>: bumps its failure count, stores the error
        /// and timestamp, and starts a cooldown when the configured cooldown is greater than zero.
        /// Unknown node names are ignored.
        /// </summary>
        public void MarkFailed(string node, string? error)
        {
            lock (_lock)
            {
                var entry = FindEntry(node);
                if (entry == null) return;

                var now = _clock();
                entry.FailureCount++;
                entry.LastError = error;
                entry.LastFailureTime = now;
                entry.CooldownUntil = _cooldown > TimeSpan.Zero ? now + _cooldown : (DateTime?)null;
            }
        }

        /// <summary>
        /// Clears any cooldown on <paramref name="node"/>. Failure count, last error and last
        /// failure time are left as they are. Unknown node names are ignored.
        /// </summary>
        public void MarkHealthy(string node)
        {
            lock (_lock)
            {
                var entry = FindEntry(node);
                if (entry == null) return;
                entry.CooldownUntil = null;
            }
        }

        /// <summary>
        /// Copies the current per-node state. <see cref="HistorianClusterNodeState.CooldownUntil"/>
        /// is reported as null for nodes that are healthy at the time of the call, even if an
        /// expired deadline is still stored internally.
        /// </summary>
        public List<HistorianClusterNodeState> SnapshotNodeStates()
        {
            lock (_lock)
            {
                var now = _clock();
                var states = new List<HistorianClusterNodeState>(_nodes.Count);
                foreach (var n in _nodes)
                {
                    // Evaluate health once so IsHealthy and CooldownUntil cannot disagree.
                    var healthy = IsHealthyAt(n, now);
                    states.Add(new HistorianClusterNodeState
                    {
                        Name = n.Name,
                        IsHealthy = healthy,
                        CooldownUntil = healthy ? null : n.CooldownUntil,
                        FailureCount = n.FailureCount,
                        LastError = n.LastError,
                        LastFailureTime = n.LastFailureTime
                    });
                }

                return states;
            }
        }

        /// <summary>A node is healthy when it has no cooldown deadline or the deadline is at or before <paramref name="now"/>.</summary>
        private static bool IsHealthyAt(NodeEntry entry, DateTime now)
        {
            return entry.CooldownUntil == null || entry.CooldownUntil <= now;
        }

        /// <summary>
        /// Case-insensitive lookup by node name. The input is trimmed first because stored names
        /// are trimmed at construction; without this a padded name would never match.
        /// Must be called with <c>_lock</c> held.
        /// </summary>
        private NodeEntry? FindEntry(string node)
        {
            var wanted = node?.Trim();
            for (var i = 0; i < _nodes.Count; i++)
                if (string.Equals(_nodes[i].Name, wanted, StringComparison.OrdinalIgnoreCase))
                    return _nodes[i];
            return null;
        }

        /// <summary>Mutable per-node bookkeeping; only touched while <c>_lock</c> is held.</summary>
        private sealed class NodeEntry
        {
            public string Name { get; set; } = "";
            public DateTime? CooldownUntil { get; set; }
            public int FailureCount { get; set; }
            public string? LastError { get; set; }
            public DateTime? LastFailureTime { get; set; }
        }
    }
}

// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs ----
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// Point-in-time state of a single historian cluster node. One entry per configured node
    /// appears inside <see cref="HistorianHealthSnapshot.Nodes"/>.
    /// </summary>
    public sealed class HistorianClusterNodeState
    {
        /// <summary>Node name as configured (trimmed).</summary>
        public string Name { get; set; } = "";

        /// <summary>True when the node was not in cooldown at snapshot time.</summary>
        public bool IsHealthy { get; set; }

        /// <summary>End of the current cooldown; null when the node is healthy.</summary>
        public DateTime? CooldownUntil { get; set; }

        /// <summary>Number of failures recorded for this node.</summary>
        public int FailureCount { get; set; }

        /// <summary>Error text supplied with the most recent failure, if any.</summary>
        public string? LastError { get; set; }

        /// <summary>Clock reading taken when the most recent failure was recorded, if any.</summary>
        public DateTime? LastFailureTime { get; set; }
    }
}

// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs ----
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// Wonderware Historian SDK configuration. Populated from environment variables at Host
    /// startup (see <c>Program.cs</c>) or from the Proxy's <c>DriverInstance.DriverConfig</c>
    /// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
    /// </summary>
    public sealed class HistorianConfiguration
    {
        public bool Enabled { get; set; } = false;

        /// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
        public string ServerName { get; set; } = "localhost";

        /// <summary>
        /// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
        /// falling through to the next on failure. A failed node is placed in cooldown for
        /// <see cref="FailureCooldownSeconds"/> before being re-eligible.
        /// </summary>
        public List<string> ServerNames { get; set; } = new();

        /// <summary>Cooldown applied to a failed node, in seconds. Values below zero are treated as zero by the picker.</summary>
        public int FailureCooldownSeconds { get; set; } = 60;

        public bool IntegratedSecurity { get; set; } = true;
        public string? UserName { get; set; }
        public string? Password { get; set; }
        public int Port { get; set; } = 32568;
        public int CommandTimeoutSeconds { get; set; } = 30;
        public int MaxValuesPerRead { get; set; } = 10000;

        /// <summary>
        /// Outer safety timeout applied to sync-over-async Historian operations. Must be
        /// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
        /// </summary>
        public int RequestTimeoutSeconds { get; set; } = 60;
    }
}
// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs ----
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
using StringCollection = System.Collections.Specialized.StringCollection;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
    /// OPC-UA-free — emits <see cref="HistorianSample"/> / <see cref="HistorianAggregateSample"/>
    /// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
    /// </summary>
    /// <remarks>
    /// Two SDK connections are kept: one for process data and one for events, each guarded by its
    /// own lock. Read methods do their work synchronously and return an already-completed task.
    /// Apart from cancellation and disposal, read failures are logged, counted in the health
    /// snapshot and reported to the caller as an empty (or partial) result rather than thrown.
    /// </remarks>
    public sealed class HistorianDataSource : IHistorianDataSource
    {
        private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();

        private readonly HistorianConfiguration _config;
        private readonly object _connectionLock = new object();
        private readonly object _eventConnectionLock = new object();
        private readonly IHistorianConnectionFactory _factory;
        private HistorianAccess? _connection;
        private HistorianAccess? _eventConnection;

        // volatile: read outside any lock on the fast path of EnsureConnected/EnsureEventConnected.
        private volatile bool _disposed;

        private readonly object _healthLock = new object();
        private long _totalSuccesses;
        private long _totalFailures;
        private int _consecutiveFailures;
        private DateTime? _lastSuccessTime;
        private DateTime? _lastFailureTime;
        private string? _lastError;
        private string? _activeProcessNode;
        private string? _activeEventNode;

        private readonly HistorianClusterEndpointPicker _picker;

        /// <summary>Creates a data source that opens real SDK connections.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
        public HistorianDataSource(HistorianConfiguration config)
            : this(config, new SdkHistorianConnectionFactory(), null) { }

        /// <summary>Creates a data source with an injectable connection factory and endpoint picker.</summary>
        /// <param name="config">Historian settings; cloned per connect attempt with the candidate node name.</param>
        /// <param name="factory">Opens SDK connections.</param>
        /// <param name="picker">Optional picker; when null one is built from <paramref name="config"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> or <paramref name="factory"/> is null.</exception>
        internal HistorianDataSource(
            HistorianConfiguration config,
            IHistorianConnectionFactory factory,
            HistorianClusterEndpointPicker? picker = null)
        {
            _config = config ?? throw new ArgumentNullException(nameof(config));
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _picker = picker ?? new HistorianClusterEndpointPicker(config);
        }

        /// <summary>
        /// Tries every currently-healthy node in order and returns the first connection that opens.
        /// Each failed attempt puts that node into cooldown via the picker.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No nodes are configured, all nodes are in cooldown, or every candidate failed to connect
        /// (the last failure is attached as the inner exception).
        /// </exception>
        private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
        {
            var candidates = _picker.GetHealthyNodes();
            if (candidates.Count == 0)
            {
                var total = _picker.NodeCount;
                throw new InvalidOperationException(
                    total == 0
                        ? "No historian nodes configured"
                        : $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
            }

            Exception? lastException = null;
            foreach (var node in candidates)
            {
                var attemptConfig = CloneConfigWithServerName(node);
                try
                {
                    var conn = _factory.CreateAndConnect(attemptConfig, type);
                    _picker.MarkHealthy(node);
                    return (conn, node);
                }
                catch (Exception ex)
                {
                    _picker.MarkFailed(node, ex.Message);
                    lastException = ex;
                    Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
                }
            }

            var inner = lastException?.Message ?? "(no detail)";
            throw new InvalidOperationException(
                $"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
                lastException);
        }

        /// <summary>
        /// Copies the configuration, replacing only <see cref="HistorianConfiguration.ServerName"/>.
        /// Every setting is carried over, including <see cref="HistorianConfiguration.RequestTimeoutSeconds"/>,
        /// so a factory never sees a default in place of a configured value.
        /// The <see cref="HistorianConfiguration.ServerNames"/> list instance is shared, not copied.
        /// </summary>
        private HistorianConfiguration CloneConfigWithServerName(string serverName)
        {
            return new HistorianConfiguration
            {
                Enabled = _config.Enabled,
                ServerName = serverName,
                ServerNames = _config.ServerNames,
                FailureCooldownSeconds = _config.FailureCooldownSeconds,
                IntegratedSecurity = _config.IntegratedSecurity,
                UserName = _config.UserName,
                Password = _config.Password,
                Port = _config.Port,
                CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
                MaxValuesPerRead = _config.MaxValuesPerRead,
                RequestTimeoutSeconds = _config.RequestTimeoutSeconds
            };
        }

        /// <summary>
        /// Returns counters, last success/failure details, connection state and per-node state.
        /// Node state is captured from the picker before the counters are read, so the two parts
        /// are each consistent on their own but are not taken atomically together.
        /// </summary>
        public HistorianHealthSnapshot GetHealthSnapshot()
        {
            var nodeStates = _picker.SnapshotNodeStates();
            var healthyCount = 0;
            foreach (var n in nodeStates)
                if (n.IsHealthy) healthyCount++;

            lock (_healthLock)
            {
                return new HistorianHealthSnapshot
                {
                    TotalQueries = _totalSuccesses + _totalFailures,
                    TotalSuccesses = _totalSuccesses,
                    TotalFailures = _totalFailures,
                    ConsecutiveFailures = _consecutiveFailures,
                    LastSuccessTime = _lastSuccessTime,
                    LastFailureTime = _lastFailureTime,
                    LastError = _lastError,
                    ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
                    EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
                    ActiveProcessNode = _activeProcessNode,
                    ActiveEventNode = _activeEventNode,
                    NodeCount = nodeStates.Count,
                    HealthyNodeCount = healthyCount,
                    Nodes = nodeStates
                };
            }
        }

        /// <summary>Counts a successful read and clears the consecutive-failure streak and last error.</summary>
        private void RecordSuccess()
        {
            lock (_healthLock)
            {
                _totalSuccesses++;
                _lastSuccessTime = DateTime.UtcNow;
                _consecutiveFailures = 0;
                _lastError = null;
            }
        }

        /// <summary>Counts a failed read and remembers <paramref name="error"/> as the last error.</summary>
        private void RecordFailure(string error)
        {
            lock (_healthLock)
            {
                _totalFailures++;
                _lastFailureTime = DateTime.UtcNow;
                _consecutiveFailures++;
                _lastError = error;
            }
        }

        /// <summary>
        /// Returns the open process connection, opening one if necessary. The instance is returned
        /// to the caller so a read keeps using the connection it started with even if another
        /// thread resets the shared field in the meantime.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        /// <exception cref="InvalidOperationException">No node could be connected.</exception>
        private HistorianAccess EnsureConnected()
        {
            if (_disposed)
                throw new ObjectDisposedException(nameof(HistorianDataSource));

            var existing = Volatile.Read(ref _connection);
            if (existing != null) return existing;

            // Connect outside the lock: the attempt may block for the full connect timeout.
            var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);

            lock (_connectionLock)
            {
                if (_disposed)
                {
                    conn.CloseConnection(out _);
                    conn.Dispose();
                    throw new ObjectDisposedException(nameof(HistorianDataSource));
                }

                if (_connection != null)
                {
                    // Another thread connected first; discard ours and share theirs.
                    conn.CloseConnection(out _);
                    conn.Dispose();
                    return _connection;
                }

                _connection = conn;
                lock (_healthLock) _activeProcessNode = winningNode;
                Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
                return conn;
            }
        }

        /// <summary>
        /// Tears down the process connection after a failed read and puts its node into cooldown.
        /// Nothing happens when <paramref name="failed"/> is null (the read never obtained a
        /// connection) or when it is no longer the current connection (someone else already
        /// replaced it) — this prevents a late failure from closing a fresh, healthy connection
        /// and blaming the wrong node.
        /// </summary>
        private void HandleConnectionError(HistorianAccess? failed, Exception? ex = null)
        {
            if (failed == null) return;

            lock (_connectionLock)
            {
                if (!ReferenceEquals(_connection, failed)) return;

                try
                {
                    failed.CloseConnection(out _);
                    failed.Dispose();
                }
                catch (Exception disposeEx)
                {
                    Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
                }

                _connection = null;
                string? failedNode;
                lock (_healthLock)
                {
                    failedNode = _activeProcessNode;
                    _activeProcessNode = null;
                }

                if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
                Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
            }
        }

        /// <summary>Event-connection counterpart of <see cref="EnsureConnected"/>.</summary>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        /// <exception cref="InvalidOperationException">No node could be connected.</exception>
        private HistorianAccess EnsureEventConnected()
        {
            if (_disposed)
                throw new ObjectDisposedException(nameof(HistorianDataSource));

            var existing = Volatile.Read(ref _eventConnection);
            if (existing != null) return existing;

            var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);

            lock (_eventConnectionLock)
            {
                if (_disposed)
                {
                    conn.CloseConnection(out _);
                    conn.Dispose();
                    throw new ObjectDisposedException(nameof(HistorianDataSource));
                }

                if (_eventConnection != null)
                {
                    conn.CloseConnection(out _);
                    conn.Dispose();
                    return _eventConnection;
                }

                _eventConnection = conn;
                lock (_healthLock) _activeEventNode = winningNode;
                Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
                return conn;
            }
        }

        /// <summary>Event-connection counterpart of <see cref="HandleConnectionError"/>.</summary>
        private void HandleEventConnectionError(HistorianAccess? failed, Exception? ex = null)
        {
            if (failed == null) return;

            lock (_eventConnectionLock)
            {
                if (!ReferenceEquals(_eventConnection, failed)) return;

                try
                {
                    failed.CloseConnection(out _);
                    failed.Dispose();
                }
                catch (Exception disposeEx)
                {
                    Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
                }

                _eventConnection = null;
                string? failedNode;
                lock (_healthLock)
                {
                    failedNode = _activeEventNode;
                    _activeEventNode = null;
                }

                if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
                Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
            }
        }

        /// <summary>
        /// Reads raw (full retrieval mode) samples for one tag between two times.
        /// </summary>
        /// <param name="tagName">Historian tag to query.</param>
        /// <param name="startTime">Start of the range, passed to the SDK unchanged.</param>
        /// <param name="endTime">End of the range, passed to the SDK unchanged.</param>
        /// <param name="maxValues">
        /// Upper bound on returned samples; when not positive,
        /// <see cref="HistorianConfiguration.MaxValuesPerRead"/> is used (and no bound applies if
        /// that is not positive either).
        /// </param>
        /// <param name="ct">Checked once per returned row.</param>
        /// <returns>The samples read; empty when the query failed.</returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        public Task<List<HistorianSample>> ReadRawAsync(
            string tagName, DateTime startTime, DateTime endTime, int maxValues,
            CancellationToken ct = default)
        {
            var results = new List<HistorianSample>();
            HistorianAccess? connection = null;

            try
            {
                connection = EnsureConnected();

                using var query = connection.CreateHistoryQuery();
                var args = new HistoryQueryArgs
                {
                    TagNames = new StringCollection { tagName },
                    StartDateTime = startTime,
                    EndDateTime = endTime,
                    RetrievalMode = HistorianRetrievalMode.Full
                };

                if (maxValues > 0)
                    args.BatchSize = (uint)maxValues;
                else if (_config.MaxValuesPerRead > 0)
                    args.BatchSize = (uint)_config.MaxValuesPerRead;

                if (!query.StartQuery(args, out var error))
                {
                    Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
                    RecordFailure($"raw StartQuery: {error.ErrorCode}");
                    HandleConnectionError(connection);
                    return Task.FromResult(results);
                }

                var count = 0;
                var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;

                while (query.MoveNext(out error))
                {
                    ct.ThrowIfCancellationRequested();

                    var result = query.QueryResult;
                    // The SDK timestamp is tagged as UTC without conversion.
                    var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);

                    // A non-empty string alongside a zero numeric value is reported as the string.
                    object? value;
                    if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
                        value = result.StringValue;
                    else
                        value = result.Value;

                    results.Add(new HistorianSample
                    {
                        Value = value,
                        TimestampUtc = timestamp,
                        Quality = (byte)(result.OpcQuality & 0xFF),
                    });

                    count++;
                    if (limit > 0 && count >= limit) break;
                }

                query.EndQuery(out _);
                RecordSuccess();
            }
            catch (OperationCanceledException) { throw; }
            catch (ObjectDisposedException) { throw; }
            catch (Exception ex)
            {
                Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
                RecordFailure($"raw: {ex.Message}");
                HandleConnectionError(connection, ex);
            }

            Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
                tagName, results.Count, startTime, endTime);

            return Task.FromResult(results);
        }

        /// <summary>
        /// Reads one analog-summary column for a tag, one value per interval.
        /// </summary>
        /// <param name="tagName">Historian tag to query.</param>
        /// <param name="startTime">Start of the range, passed to the SDK unchanged.</param>
        /// <param name="endTime">End of the range, passed to the SDK unchanged.</param>
        /// <param name="intervalMs">
        /// Interval length; cast to <c>ulong</c> for the SDK's resolution.
        /// NOTE(review): negative or NaN values are not rejected here — confirm callers validate.
        /// </param>
        /// <param name="aggregateColumn">Column name understood by <see cref="ExtractAggregateValue"/>.</param>
        /// <param name="ct">Checked once per returned row.</param>
        /// <returns>One sample per row returned by the SDK; empty when the query failed.</returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
            string tagName, DateTime startTime, DateTime endTime,
            double intervalMs, string aggregateColumn,
            CancellationToken ct = default)
        {
            var results = new List<HistorianAggregateSample>();
            HistorianAccess? connection = null;

            try
            {
                connection = EnsureConnected();

                using var query = connection.CreateAnalogSummaryQuery();
                var args = new AnalogSummaryQueryArgs
                {
                    TagNames = new StringCollection { tagName },
                    StartDateTime = startTime,
                    EndDateTime = endTime,
                    Resolution = (ulong)intervalMs
                };

                if (!query.StartQuery(args, out var error))
                {
                    Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
                    RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
                    HandleConnectionError(connection);
                    return Task.FromResult(results);
                }

                while (query.MoveNext(out error))
                {
                    ct.ThrowIfCancellationRequested();

                    var result = query.QueryResult;
                    var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
                    var value = ExtractAggregateValue(result, aggregateColumn);

                    results.Add(new HistorianAggregateSample
                    {
                        Value = value,
                        TimestampUtc = timestamp,
                    });
                }

                query.EndQuery(out _);
                RecordSuccess();
            }
            catch (OperationCanceledException) { throw; }
            catch (ObjectDisposedException) { throw; }
            catch (Exception ex)
            {
                Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
                RecordFailure($"aggregate: {ex.Message}");
                HandleConnectionError(connection, ex);
            }

            Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
                aggregateColumn, tagName, results.Count);

            return Task.FromResult(results);
        }

        /// <summary>
        /// Reads one interpolated sample per requested timestamp. Each timestamp yields exactly one
        /// entry in request order; a timestamp whose query cannot be started or returns no row
        /// yields a null value with quality 0.
        /// </summary>
        /// <param name="tagName">Historian tag to query.</param>
        /// <param name="timestamps">Times to sample; null or empty returns an empty list without connecting.</param>
        /// <param name="ct">Checked once per timestamp.</param>
        /// <returns>The samples gathered; may be shorter than <paramref name="timestamps"/> if an exception interrupted the loop.</returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        public Task<List<HistorianSample>> ReadAtTimeAsync(
            string tagName, DateTime[] timestamps,
            CancellationToken ct = default)
        {
            var results = new List<HistorianSample>();

            if (timestamps == null || timestamps.Length == 0)
                return Task.FromResult(results);

            HistorianAccess? connection = null;

            try
            {
                connection = EnsureConnected();

                var startFailures = 0;
                string? lastStartError = null;

                foreach (var timestamp in timestamps)
                {
                    ct.ThrowIfCancellationRequested();

                    using var query = connection.CreateHistoryQuery();
                    var args = new HistoryQueryArgs
                    {
                        TagNames = new StringCollection { tagName },
                        StartDateTime = timestamp,
                        EndDateTime = timestamp,
                        RetrievalMode = HistorianRetrievalMode.Interpolated,
                        BatchSize = 1
                    };

                    if (!query.StartQuery(args, out var error))
                    {
                        startFailures++;
                        lastStartError = $"{error.ErrorCode}";
                        Log.Debug("Historian SDK at-time query start failed for {Tag} at {Timestamp}: {Error}",
                            tagName, timestamp, error.ErrorCode);

                        results.Add(new HistorianSample
                        {
                            Value = null,
                            TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
                            Quality = 0, // Bad
                        });
                        continue;
                    }

                    if (query.MoveNext(out error))
                    {
                        var result = query.QueryResult;
                        object? value;
                        if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
                            value = result.StringValue;
                        else
                            value = result.Value;

                        results.Add(new HistorianSample
                        {
                            Value = value,
                            TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
                            Quality = (byte)(result.OpcQuality & 0xFF),
                        });
                    }
                    else
                    {
                        results.Add(new HistorianSample
                        {
                            Value = null,
                            TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
                            Quality = 0,
                        });
                    }

                    query.EndQuery(out _);
                }

                // A read in which not a single query could be started is not a success; report it
                // so the health snapshot does not hide a dead link behind all-Bad samples.
                // NOTE(review): the connection is deliberately left open here because a per-tag
                // rejection may not mean the link is down — confirm against SDK behavior.
                if (startFailures == timestamps.Length)
                {
                    Log.Warning("Historian SDK at-time query start failed for every timestamp of {Tag}: {Error}",
                        tagName, lastStartError);
                    RecordFailure($"at-time StartQuery: {lastStartError}");
                }
                else
                {
                    RecordSuccess();
                }
            }
            catch (OperationCanceledException) { throw; }
            catch (ObjectDisposedException) { throw; }
            catch (Exception ex)
            {
                Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
                RecordFailure($"at-time: {ex.Message}");
                HandleConnectionError(connection, ex);
            }

            Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
                tagName, results.Count, timestamps.Length);

            return Task.FromResult(results);
        }

        /// <summary>
        /// Reads events in ascending order between two times, optionally restricted to one source.
        /// </summary>
        /// <param name="sourceName">When non-empty, an equality filter on the "Source" property is added.</param>
        /// <param name="startTime">Start of the range, passed to the SDK unchanged.</param>
        /// <param name="endTime">End of the range, passed to the SDK unchanged.</param>
        /// <param name="maxEvents">
        /// Upper bound on returned events; when not positive the SDK is asked for
        /// <see cref="HistorianConfiguration.MaxValuesPerRead"/> events and no local bound applies.
        /// </param>
        /// <param name="ct">Checked once per returned row.</param>
        /// <returns>The events read; empty when the query failed.</returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        /// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
        public Task<List<HistorianEventDto>> ReadEventsAsync(
            string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
            CancellationToken ct = default)
        {
            var results = new List<HistorianEventDto>();
            HistorianAccess? eventConnection = null;

            try
            {
                eventConnection = EnsureEventConnected();

                using var query = eventConnection.CreateEventQuery();
                var args = new EventQueryArgs
                {
                    StartDateTime = startTime,
                    EndDateTime = endTime,
                    EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
                    QueryType = HistorianEventQueryType.Events,
                    EventOrder = HistorianEventOrder.Ascending
                };

                if (!string.IsNullOrEmpty(sourceName))
                {
                    // NOTE(review): the filter's outcome is discarded. If the SDK can reject a
                    // filter, the query would then run unfiltered and return events from other
                    // sources — confirm AddEventFilter's failure semantics and check the result.
                    query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
                }

                if (!query.StartQuery(args, out var error))
                {
                    Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
                    RecordFailure($"events StartQuery: {error.ErrorCode}");
                    HandleEventConnectionError(eventConnection);
                    return Task.FromResult(results);
                }

                var count = 0;
                while (query.MoveNext(out error))
                {
                    ct.ThrowIfCancellationRequested();
                    results.Add(ToDto(query.QueryResult));
                    count++;
                    if (maxEvents > 0 && count >= maxEvents) break;
                }

                query.EndQuery(out _);
                RecordSuccess();
            }
            catch (OperationCanceledException) { throw; }
            catch (ObjectDisposedException) { throw; }
            catch (Exception ex)
            {
                Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
                RecordFailure($"events: {ex.Message}");
                HandleEventConnectionError(eventConnection, ex);
            }

            Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
                sourceName ?? "(all)", results.Count, startTime, endTime);

            return Task.FromResult(results);
        }

        /// <summary>
        /// Copies an SDK event into the SDK-free DTO.
        /// NOTE(review): unlike sample timestamps, event times are copied without forcing
        /// <see cref="DateTimeKind.Utc"/> — confirm which kind the SDK supplies.
        /// </summary>
        private static HistorianEventDto ToDto(HistorianEvent evt)
        {
            // The ArchestrA SDK marks these properties obsolete but still returns them; their
            // successors aren't wired in the version we bind against. Using them is the documented
            // v1 behavior — suppressed locally instead of project-wide so any non-event use of
            // deprecated SDK surface still surfaces as an error.
#pragma warning disable CS0618
            return new HistorianEventDto
            {
                Id = evt.Id,
                Source = evt.Source,
                EventTime = evt.EventTime,
                ReceivedTime = evt.ReceivedTime,
                DisplayText = evt.DisplayText,
                Severity = (ushort)evt.Severity
            };
#pragma warning restore CS0618
        }

        /// <summary>
        /// Picks one column out of an analog-summary row by its exact (case-sensitive) name.
        /// </summary>
        /// <returns>The column value, or null when <paramref name="column"/> is not a known name.</returns>
        internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
        {
            switch (column)
            {
                case "Average": return result.Average;
                case "Minimum": return result.Minimum;
                case "Maximum": return result.Maximum;
                case "ValueCount": return result.ValueCount;
                case "First": return result.First;
                case "Last": return result.Last;
                case "StdDev": return result.StdDev;
                default: return null;
            }
        }

        /// <summary>
        /// Closes both SDK connections. Teardown happens under the same locks the connect paths
        /// use, so a connection published concurrently is either closed here or rejected by the
        /// connect path's own disposed check — it cannot be left open. Close errors are logged,
        /// never thrown.
        /// </summary>
        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;

            lock (_connectionLock)
            {
                try
                {
                    _connection?.CloseConnection(out _);
                    _connection?.Dispose();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "Error closing Historian SDK connection");
                }

                _connection = null;
            }

            lock (_eventConnectionLock)
            {
                try
                {
                    _eventConnection?.CloseConnection(out _);
                    _eventConnection?.Dispose();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "Error closing Historian SDK event connection");
                }

                _eventConnection = null;
            }

            // Keep the health snapshot truthful after disposal: no connection, no active node.
            lock (_healthLock)
            {
                _activeProcessNode = null;
                _activeEventNode = null;
            }
        }
    }
}

// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs ----
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// SDK-free representation of a Historian event record. Prevents ArchestrA types from
    /// leaking beyond <c>HistorianDataSource</c>.
    /// </summary>
    public sealed class HistorianEventDto
    {
        public Guid Id { get; set; }
        public string? Source { get; set; }
        public DateTime EventTime { get; set; }
        public DateTime ReceivedTime { get; set; }
        public string? DisplayText { get; set; }
        public ushort Severity { get; set; }
    }
}

// ---- File: src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs ----
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
    /// <summary>
    /// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
    /// via an IPC health query (not wired in PR #5; deferred).
    /// </summary>
    public sealed class HistorianHealthSnapshot
    {
        /// <summary>Sum of <see cref="TotalSuccesses"/> and <see cref="TotalFailures"/>.</summary>
        public long TotalQueries { get; set; }
        public long TotalSuccesses { get; set; }
        public long TotalFailures { get; set; }

        /// <summary>Failures since the last success; reset to zero by a success.</summary>
        public int ConsecutiveFailures { get; set; }
        public DateTime? LastSuccessTime { get; set; }
        public DateTime? LastFailureTime { get; set; }

        /// <summary>Most recent failure text; cleared by a success.</summary>
        public string? LastError { get; set; }
        public bool ProcessConnectionOpen { get; set; }
        public bool EventConnectionOpen { get; set; }

        /// <summary>Node the process connection is open to, if any.</summary>
        public string? ActiveProcessNode { get; set; }

        /// <summary>Node the event connection is open to, if any.</summary>
        public string? ActiveEventNode { get; set; }
        public int NodeCount { get; set; }
        public int HealthyNodeCount { get; set; }

        /// <summary>Per-node state, one entry per configured node.</summary>
        public List<HistorianClusterNodeState> Nodes { get; set; } = new();
    }
}
The Host returns these + /// across the IPC boundary as GalaxyDataValue; the Proxy maps quality and value to + /// OPC UA DataValue. Raw MX quality byte is preserved so the Proxy can use the same + /// quality mapper it already uses for live reads. + /// + public sealed class HistorianSample + { + public object? Value { get; set; } + + /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality). + public byte Quality { get; set; } + + public DateTime TimestampUtc { get; set; } + } + + /// + /// Result of . When is + /// null the aggregate is unavailable for that bucket (Proxy maps to BadNoData). + /// + public sealed class HistorianAggregateSample + { + public double? Value { get; set; } + public DateTime TimestampUtc { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs new file mode 100644 index 0000000..51001e9 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs @@ -0,0 +1,73 @@ +using System; +using System.Threading; +using ArchestrA; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that + /// control connection success, failure, and timeout behavior. + /// + internal interface IHistorianConnectionFactory + { + HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type); + } + + /// Production implementation — opens real Historian SDK connections. 
+ internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory + { + public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type) + { + var conn = new HistorianAccess(); + + var args = new HistorianConnectionArgs + { + ServerName = config.ServerName, + TcpPort = (ushort)config.Port, + IntegratedSecurity = config.IntegratedSecurity, + UseArchestrAUser = config.IntegratedSecurity, + ConnectionType = type, + ReadOnly = true, + PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) + }; + + if (!config.IntegratedSecurity) + { + args.UserName = config.UserName ?? string.Empty; + args.Password = config.Password ?? string.Empty; + } + + if (!conn.OpenConnection(args, out var error)) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}"); + } + + var timeoutMs = config.CommandTimeoutSeconds * 1000; + var elapsed = 0; + while (elapsed < timeoutMs) + { + var status = new HistorianConnectionStatus(); + conn.GetConnectionStatus(ref status); + + if (status.ConnectedToServer) + return conn; + + if (status.ErrorOccurred) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Historian SDK connection failed: {status.Error}"); + } + + Thread.Sleep(250); + elapsed += 250; + } + + conn.Dispose(); + throw new TimeoutException( + $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s"); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs new file mode 100644 index 0000000..146ae1b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + 
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host. + /// Implementations read via the aahClient* SDK; the Proxy side maps returned samples + /// to OPC UA DataValue. + /// + public interface IHistorianDataSource : IDisposable + { + Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default); + + Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default); + + Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default); + + Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default); + + HistorianHealthSnapshot GetHealthSnapshot(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs index de38f37..222bdef 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; +using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; @@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; /// public sealed class MxAccessClient : IDisposable { + private static readonly ILogger Log = Serilog.Log.ForContext(); + private readonly StaPump _pump; private readonly IMxProxy _proxy; private readonly string _clientName; @@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable /// Fires whenever the connection transitions Connected ↔ Disconnected. 
public event EventHandler? ConnectionStateChanged; + /// + /// Fires once per failed subscription replay after a reconnect. Carries the tag reference + /// and the exception so the backend can propagate the degradation signal (e.g. mark the + /// subscription bad on the Proxy side rather than silently losing its callback). Added for + /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an + /// operator would only find out that a specific subscription stopped updating through a + /// data-quality complaint from downstream. + /// + public event EventHandler? SubscriptionReplayFailed; + public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null) { _pump = pump; @@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable if (idle <= _options.StaleThreshold) continue; // Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's - // our reconnect signal. + // our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item + // handle; we must RemoveItem it on the same pump turn or the long-running monitor + // leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely). bool probeOk; try { probeOk = await _pump.InvokeAsync(() => { - // AddItem on the connection handle is cheap and round-trips through COM. - // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone. 
- try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; } + int probeHandle = 0; + try + { + probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat"); + return probeHandle > 0; + } catch { return false; } + finally + { + if (probeHandle > 0) + { + try { _proxy.RemoveItem(_connectionHandle, probeHandle); } + catch { /* proxy is dying; best-effort cleanup */ } + } + } }); } catch { probeOk = false; } @@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable _reconnectCount++; ConnectionStateChanged?.Invoke(this, true); - // Replay every subscription that was active before the disconnect. + // Replay every subscription that was active before the disconnect. PR 6 low + // finding #2: surface per-tag failures — log them and raise + // SubscriptionReplayFailed so the backend can propagate the degraded state + // (previously swallowed silently; downstream quality dropped without a signal). var snapshot = _addressToHandle.Keys.ToArray(); _addressToHandle.Clear(); _handleToAddress.Clear(); + var failed = 0; foreach (var fullRef in snapshot) { try { await SubscribeOnPumpAsync(fullRef); } - catch { /* skip — operator can re-subscribe */ } + catch (Exception subEx) + { + failed++; + Log.Warning(subEx, + "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}", + fullRef, _reconnectCount); + SubscriptionReplayFailed?.Invoke(this, + new SubscriptionReplayFailedEventArgs(fullRef, subEx)); + } } + if (failed > 0) + Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length); + else + Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length); + _lastObservedActivityUtc = DateTime.UtcNow; } catch diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs new file mode 100644 index 0000000..ee8f03b --- 
/dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs @@ -0,0 +1,20 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; + +/// +/// Fired by when a previously-active +/// subscription fails to be restored after a reconnect. The backend should treat the tag as +/// unhealthy until the next successful resubscribe. +/// +public sealed class SubscriptionReplayFailedEventArgs : EventArgs +{ + public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception) + { + TagReference = tagReference; + Exception = exception; + } + + public string TagReference { get; } + public Exception Exception { get; } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index 0134451..7cd543a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; @@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up /// (the v1 alarm subsystem is its own subtree). /// -public sealed class MxAccessGalaxyBackend : IGalaxyBackend +public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable { private readonly GalaxyRepository _repository; private readonly MxAccessClient _mx; + private readonly IHistorianDataSource? 
_historian; private long _nextSessionId; private long _nextSubscriptionId; @@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public event System.EventHandler? OnHostStatusChanged; #pragma warning restore CS0067 - public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) + public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { _repository = repository; _mx = mx; + _historian = historian; } public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) @@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; - public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) - => Task.FromResult(new HistoryReadResponse + public async Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) + { + if (_historian is null) + return new HistoryReadResponse + { + Success = false, + Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration", + Tags = Array.Empty(), + }; + + var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime; + var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime; + var tags = new List(req.TagReferences.Length); + + try { - Success = false, - Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)", - Tags = Array.Empty(), - }); + foreach (var reference in req.TagReferences) + { + var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false); + tags.Add(new HistoryTagValues + { + TagReference = reference, + Values = samples.Select(s => ToWire(reference, s)).ToArray(), + }); + } + return new HistoryReadResponse { Success = 
true, Tags = tags.ToArray() }; + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + return new HistoryReadResponse + { + Success = false, + Error = $"Historian read failed: {ex.Message}", + Tags = tags.ToArray(), + }; + } + } public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); + public void Dispose() => _historian?.Dispose(); + private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new() { TagReference = reference, @@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; + /// + /// Maps a (raw historian row, OPC-UA-free) to the IPC wire + /// shape. The Proxy decodes the MessagePack value and maps + /// through QualityMapper on its side of the pipe — we keep the raw byte here so + /// rich OPC DA status codes (e.g. BadNotConnected, UncertainSubNormal) survive + /// the hop intact. + /// + private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new() + { + TagReference = reference, + ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value), + ValueMessagePackType = 0, + StatusCode = MapHistorianQualityToOpcUa(sample.Quality), + SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + private static uint MapHistorianQualityToOpcUa(byte q) + { + // Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges. + // The Proxy may refine this when it decodes the wire frame. 
+ if (q >= 192) return 0x00000000u; // Good + if (q >= 64) return 0x40000000u; // Uncertain + return 0x80000000u; // Bad + } + private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs index efff93f..2a5ceec 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs @@ -4,6 +4,7 @@ using System.Threading; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; @@ -66,9 +67,11 @@ public static class Program pump = new StaPump("Galaxy.Sta"); pump.WaitForStartedAsync().GetAwaiter().GetResult(); mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName); + var historian = BuildHistorianIfEnabled(); backend = new MxAccessGalaxyBackend( new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }), - mx); + mx, + historian); break; } @@ -77,6 +80,7 @@ public static class Program try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); } finally { + (backend as IDisposable)?.Dispose(); mx?.Dispose(); pump?.Dispose(); } @@ -91,4 +95,45 @@ public static class Program } finally { Log.CloseAndFlush(); } } + + /// + /// Builds a from the OTOPCUA_HISTORIAN_* environment + /// variables the supervisor passes at spawn time. Returns null when the historian is + /// disabled (default) so MxAccessGalaxyBackend.HistoryReadAsync returns a clear + /// "not configured" error instead of attempting an SDK connection to localhost. + /// + private static IHistorianDataSource? 
BuildHistorianIfEnabled() + { + var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"); + if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1") + return null; + + var cfg = new HistorianConfiguration + { + Enabled = true, + ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", + Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568), + IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase), + UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"), + Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"), + CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30), + MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000), + FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60), + }; + + var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS"); + if (!string.IsNullOrWhiteSpace(servers)) + cfg.ServerNames = new System.Collections.Generic.List( + servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)); + + Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}", + cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port); + return new HistorianDataSource(cfg); + } + + private static int TryParseInt(string envName, int defaultValue) + { + var raw = Environment.GetEnvironmentVariable(envName); + return int.TryParse(raw, out var parsed) ? 
parsed : defaultValue; + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj index bc8a16a..7498013 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj @@ -30,11 +30,43 @@ + + + + ..\..\lib\ArchestrA.MxAccess.dll true + + + ..\..\lib\aahClientManaged.dll + false + + + ..\..\lib\aahClientCommon.dll + false + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs new file mode 100644 index 0000000..4078080 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs @@ -0,0 +1,94 @@ +using System; +using System.Linq; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianClusterEndpointPickerTests + { + private static HistorianConfiguration Config(params string[] nodes) => new() + { + ServerName = "ignored", + ServerNames = nodes.ToList(), + FailureCooldownSeconds = 60, + }; + + [Fact] + public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty() + { + var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() }; + var p = new HistorianClusterEndpointPicker(cfg); + p.NodeCount.ShouldBe(1); + p.GetHealthyNodes().ShouldBe(new[] { "only-node" }); + } + + [Fact] + public void Failed_node_enters_cooldown_and_is_skipped() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new 
HistorianClusterEndpointPicker(Config("a", "b"), () => now); + + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + } + + [Fact] + public void Cooldown_expires_after_configured_window() + { + var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + + clock = clock.AddSeconds(61); + p.GetHealthyNodes().ShouldBe(new[] { "a", "b" }); + } + + [Fact] + public void MarkHealthy_immediately_clears_cooldown() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.MarkHealthy("a"); + p.GetHealthyNodes().ShouldBe(new[] { "a" }); + } + + [Fact] + public void All_nodes_in_cooldown_returns_empty_healthy_list() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + p.MarkFailed("a", "x"); + p.MarkFailed("b", "y"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.NodeCount.ShouldBe(2); + } + + [Fact] + public void Snapshot_reports_failure_count_and_last_error() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "first"); + p.MarkFailed("a", "second"); + + var snap = p.SnapshotNodeStates().Single(); + snap.FailureCount.ShouldBe(2); + snap.LastError.ShouldBe("second"); + snap.IsHealthy.ShouldBeFalse(); + snap.CooldownUntil.ShouldNotBeNull(); + } + + [Fact] + public void Duplicate_hostnames_are_deduplicated_case_insensitively() + { + var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB")); + p.NodeCount.ShouldBe(2); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs 
b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs new file mode 100644 index 0000000..4c7800c --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianWiringTests + { + /// + /// When the Proxy sends a HistoryRead but the supervisor never enabled the historian + /// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a + /// self-explanatory error — not an exception or a hang against localhost. 
+ /// + [Fact] + public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + historian: null); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TestTag" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("Historian disabled"); + resp.Tags.ShouldBeEmpty(); + } + + /// + /// When the historian is wired up, we expect the backend to call through and map + /// samples onto the IPC wire shape. Uses a fake + /// that returns a single known-good sample so we can assert the mapping stays sane. 
+ /// + [Fact] + public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + var fake = new FakeHistorianDataSource(new HistorianSample + { + Value = 42.5, + Quality = 192, // Good + TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc), + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TankLevel" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Tags.Length.ShouldBe(1); + resp.Tags[0].TagReference.ShouldBe("TankLevel"); + resp.Tags[0].Values.Length.ShouldBe(1); + resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good + resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull(); + resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe( + new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + private sealed class FakeHistorianDataSource : IHistorianDataSource + { + private readonly HistorianSample _sample; + public FakeHistorianDataSource(HistorianSample sample) => _sample = sample; + + public Task> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List { _sample }); + + public Task> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadEventsAsync(string? 
src, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + public void Dispose() { } + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs new file mode 100644 index 0000000..c071788 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA.MxAccess; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests; + +[Trait("Category", "Unit")] +public sealed class MxAccessClientMonitorLoopTests +{ + /// + /// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it + /// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds, + /// which over a 24h uptime becomes thousands of leaked MXAccess handles and can + /// eventually exhaust the runtime proxy's handle table. + /// + [Fact] + public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem() + { + using var pump = new StaPump("Monitor.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new CountingProxy(); + var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(150), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + await client.ConnectAsync(); + + // Wait past StaleThreshold, then let several monitor cycles fire. + await Task.Delay(700); + + client.Dispose(); + + // One Heartbeat probe fires per monitor tick once the connection looks stale. 
+ proxy.HeartbeatAddCount.ShouldBeGreaterThan(1); + // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle. + proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount); + proxy.OutstandingHeartbeatHandles.ShouldBe(0); + } + + /// + /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise + /// SubscriptionReplayFailed so the backend can propagate the degradation, not get + /// silently eaten. + /// + [Fact] + public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay() + { + using var pump = new StaPump("Replay.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" }); + var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(120), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + var failures = new ConcurrentBag(); + client.SubscriptionReplayFailed += (_, e) => failures.Add(e); + + await client.ConnectAsync(); + await client.SubscribeAsync("GoodTag.X", (_, _) => { }); + await client.SubscribeAsync("BadTag.A", (_, _) => { }); + await client.SubscribeAsync("BadTag.B", (_, _) => { }); + + proxy.TriggerProbeFailureOnNextCall(); + + // Wait for the monitor loop to probe → fail → reconnect → replay. + await Task.Delay(800); + + client.Dispose(); + + failures.Count.ShouldBe(2); + var names = new HashSet(); + foreach (var f in failures) names.Add(f.TagReference); + names.ShouldContain("BadTag.A"); + names.ShouldContain("BadTag.B"); + } + + // ----- test doubles ----- + + private sealed class CountingProxy : IMxProxy + { + private int _next = 1; + private readonly ConcurrentDictionary _live = new(); + + public int HeartbeatAddCount; + public int HeartbeatRemoveCount; + public int OutstandingHeartbeatHandles => _live.Count; + + public event MxDataChangeHandler? 
OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + var h = Interlocked.Increment(ref _next); + _live[h] = address; + if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount); + return h; + } + + public void RemoveItem(int _, int itemHandle) + { + if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat") + Interlocked.Increment(ref HeartbeatRemoveCount); + } + + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } + + /// + /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall + /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the + /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the + /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once + /// per failing tag. + /// + private sealed class ReplayFailingProxy : IMxProxy + { + private int _next = 1; + private readonly HashSet _failOnReplay; + private int _probeFailOnce; + private readonly ConcurrentDictionary _replayedOnce = new(StringComparer.OrdinalIgnoreCase); + + public ReplayFailingProxy(IEnumerable failOnReplayForTags) + { + _failOnReplay = new HashSet(failOnReplayForTags, StringComparer.OrdinalIgnoreCase); + } + + public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1); + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? 
OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1) + throw new InvalidOperationException("simulated probe failure"); + + // Fail only on the *replay* AddItem for listed tags — not the initial subscribe. + if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address)) + throw new InvalidOperationException($"simulated replay failure for {address}"); + + if (_failOnReplay.Contains(address)) _replayedOnce[address] = true; + return Interlocked.Increment(ref _next); + } + + public void RemoveItem(int _, int __) { } + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj index fd5d722..6f803d5 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj @@ -24,6 +24,11 @@ + + + ..\..\lib\ArchestrA.MxAccess.dll +