diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs new file mode 100644 index 0000000..312207b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterEndpointPicker.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which + /// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands + /// out an ordered list of eligible candidates for the data source to try in sequence. + /// + internal sealed class HistorianClusterEndpointPicker + { + private readonly Func _clock; + private readonly TimeSpan _cooldown; + private readonly object _lock = new object(); + private readonly List _nodes; + + public HistorianClusterEndpointPicker(HistorianConfiguration config) + : this(config, () => DateTime.UtcNow) { } + + internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func clock) + { + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + _cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds)); + + var names = (config.ServerNames != null && config.ServerNames.Count > 0) + ? config.ServerNames + : new List { config.ServerName }; + + _nodes = names + .Where(n => !string.IsNullOrWhiteSpace(n)) + .Select(n => n.Trim()) + .Distinct(StringComparer.OrdinalIgnoreCase) + .Select(n => new NodeEntry { Name = n }) + .ToList(); + } + + public int NodeCount + { + get { lock (_lock) return _nodes.Count; } + } + + public IReadOnlyList GetHealthyNodes() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList(); + } + } + + public int HealthyNodeCount + { + get + { + lock (_lock) + { + var now = _clock(); + return _nodes.Count(n => IsHealthyAt(n, now)); + } + } + } + + public void MarkFailed(string node, string? error) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + + var now = _clock(); + entry.FailureCount++; + entry.LastError = error; + entry.LastFailureTime = now; + entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null; + } + } + + public void MarkHealthy(string node) + { + lock (_lock) + { + var entry = FindEntry(node); + if (entry == null) return; + entry.CooldownUntil = null; + } + } + + public List SnapshotNodeStates() + { + lock (_lock) + { + var now = _clock(); + return _nodes.Select(n => new HistorianClusterNodeState + { + Name = n.Name, + IsHealthy = IsHealthyAt(n, now), + CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil, + FailureCount = n.FailureCount, + LastError = n.LastError, + LastFailureTime = n.LastFailureTime + }).ToList(); + } + } + + private static bool IsHealthyAt(NodeEntry entry, DateTime now) + { + return entry.CooldownUntil == null || entry.CooldownUntil <= now; + } + + private NodeEntry? FindEntry(string node) + { + for (var i = 0; i < _nodes.Count; i++) + if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase)) + return _nodes[i]; + return null; + } + + private sealed class NodeEntry + { + public string Name { get; set; } = ""; + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs new file mode 100644 index 0000000..5b78243 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianClusterNodeState.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time state of a single historian cluster node. One entry per configured node + /// appears inside . + /// + public sealed class HistorianClusterNodeState + { + public string Name { get; set; } = ""; + public bool IsHealthy { get; set; } + public DateTime? CooldownUntil { get; set; } + public int FailureCount { get; set; } + public string? LastError { get; set; } + public DateTime? LastFailureTime { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs new file mode 100644 index 0000000..8d3ac17 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianConfiguration.cs @@ -0,0 +1,38 @@ +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Wonderware Historian SDK configuration. Populated from environment variables at Host + /// startup (see Program.cs) or from the Proxy's DriverInstance.DriverConfig + /// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation. + /// + public sealed class HistorianConfiguration + { + public bool Enabled { get; set; } = false; + + /// Single-node fallback when is empty. + public string ServerName { get; set; } = "localhost"; + + /// + /// Ordered cluster nodes. When non-empty, the data source tries each in order on connect, + /// falling through to the next on failure. A failed node is placed in cooldown for + /// before being re-eligible. + /// + public List ServerNames { get; set; } = new(); + + public int FailureCooldownSeconds { get; set; } = 60; + public bool IntegratedSecurity { get; set; } = true; + public string? UserName { get; set; } + public string? Password { get; set; } + public int Port { get; set; } = 32568; + public int CommandTimeoutSeconds { get; set; } = 30; + public int MaxValuesPerRead { get; set; } = 10000; + + /// + /// Outer safety timeout applied to sync-over-async Historian operations. Must be + /// comfortably larger than . + /// + public int RequestTimeoutSeconds { get; set; } = 60; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs new file mode 100644 index 0000000..d132aed --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianDataSource.cs @@ -0,0 +1,621 @@ +using System; +using System.Collections.Generic; +using StringCollection = System.Collections.Specialized.StringCollection; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Reads historical data from the Wonderware Historian via the aahClientManaged SDK. + /// OPC-UA-free — emits / + /// which the Proxy maps to OPC UA DataValue on its side of the IPC. + /// + public sealed class HistorianDataSource : IHistorianDataSource + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly HistorianConfiguration _config; + private readonly object _connectionLock = new object(); + private readonly object _eventConnectionLock = new object(); + private readonly IHistorianConnectionFactory _factory; + private HistorianAccess? _connection; + private HistorianAccess? _eventConnection; + private bool _disposed; + + private readonly object _healthLock = new object(); + private long _totalSuccesses; + private long _totalFailures; + private int _consecutiveFailures; + private DateTime? _lastSuccessTime; + private DateTime? _lastFailureTime; + private string? _lastError; + private string? _activeProcessNode; + private string? _activeEventNode; + + private readonly HistorianClusterEndpointPicker _picker; + + public HistorianDataSource(HistorianConfiguration config) + : this(config, new SdkHistorianConnectionFactory(), null) { } + + internal HistorianDataSource( + HistorianConfiguration config, + IHistorianConnectionFactory factory, + HistorianClusterEndpointPicker? picker = null) + { + _config = config; + _factory = factory; + _picker = picker ?? new HistorianClusterEndpointPicker(config); + } + + private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type) + { + var candidates = _picker.GetHealthyNodes(); + if (candidates.Count == 0) + { + var total = _picker.NodeCount; + throw new InvalidOperationException( + total == 0 + ? "No historian nodes configured" + : $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to"); + } + + Exception? lastException = null; + foreach (var node in candidates) + { + var attemptConfig = CloneConfigWithServerName(node); + try + { + var conn = _factory.CreateAndConnect(attemptConfig, type); + _picker.MarkHealthy(node); + return (conn, node); + } + catch (Exception ex) + { + _picker.MarkFailed(node, ex.Message); + lastException = ex; + Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node); + } + } + + var inner = lastException?.Message ?? "(no detail)"; + throw new InvalidOperationException( + $"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}", + lastException); + } + + private HistorianConfiguration CloneConfigWithServerName(string serverName) + { + return new HistorianConfiguration + { + Enabled = _config.Enabled, + ServerName = serverName, + ServerNames = _config.ServerNames, + FailureCooldownSeconds = _config.FailureCooldownSeconds, + IntegratedSecurity = _config.IntegratedSecurity, + UserName = _config.UserName, + Password = _config.Password, + Port = _config.Port, + CommandTimeoutSeconds = _config.CommandTimeoutSeconds, + MaxValuesPerRead = _config.MaxValuesPerRead + }; + } + + public HistorianHealthSnapshot GetHealthSnapshot() + { + var nodeStates = _picker.SnapshotNodeStates(); + var healthyCount = 0; + foreach (var n in nodeStates) + if (n.IsHealthy) healthyCount++; + + lock (_healthLock) + { + return new HistorianHealthSnapshot + { + TotalQueries = _totalSuccesses + _totalFailures, + TotalSuccesses = _totalSuccesses, + TotalFailures = _totalFailures, + ConsecutiveFailures = _consecutiveFailures, + LastSuccessTime = _lastSuccessTime, + LastFailureTime = _lastFailureTime, + LastError = _lastError, + ProcessConnectionOpen = Volatile.Read(ref _connection) != null, + EventConnectionOpen = Volatile.Read(ref _eventConnection) != null, + ActiveProcessNode = _activeProcessNode, + ActiveEventNode = _activeEventNode, + NodeCount = nodeStates.Count, + HealthyNodeCount = healthyCount, + Nodes = nodeStates + }; + } + } + + private void RecordSuccess() + { + lock (_healthLock) + { + _totalSuccesses++; + _lastSuccessTime = DateTime.UtcNow; + _consecutiveFailures = 0; + _lastError = null; + } + } + + private void RecordFailure(string error) + { + lock (_healthLock) + { + _totalFailures++; + _lastFailureTime = DateTime.UtcNow; + _consecutiveFailures++; + _lastError = error; + } + } + + private void EnsureConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _connection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process); + + lock (_connectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_connection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _connection = conn; + lock (_healthLock) _activeProcessNode = winningNode; + Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleConnectionError(Exception? ex = null) + { + lock (_connectionLock) + { + if (_connection == null) return; + + try + { + _connection.CloseConnection(out _); + _connection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery"); + } + + _connection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeProcessNode; + _activeProcessNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + private void EnsureEventConnected() + { + if (_disposed) + throw new ObjectDisposedException(nameof(HistorianDataSource)); + + if (Volatile.Read(ref _eventConnection) != null) return; + + var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event); + + lock (_eventConnectionLock) + { + if (_disposed) + { + conn.CloseConnection(out _); + conn.Dispose(); + throw new ObjectDisposedException(nameof(HistorianDataSource)); + } + + if (_eventConnection != null) + { + conn.CloseConnection(out _); + conn.Dispose(); + return; + } + + _eventConnection = conn; + lock (_healthLock) _activeEventNode = winningNode; + Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port); + } + } + + private void HandleEventConnectionError(Exception? ex = null) + { + lock (_eventConnectionLock) + { + if (_eventConnection == null) return; + + try + { + _eventConnection.CloseConnection(out _); + _eventConnection.Dispose(); + } + catch (Exception disposeEx) + { + Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery"); + } + + _eventConnection = null; + string? failedNode; + lock (_healthLock) + { + failedNode = _activeEventNode; + _activeEventNode = null; + } + + if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure"); + Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)"); + } + } + + public Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + RetrievalMode = HistorianRetrievalMode.Full + }; + + if (maxValues > 0) + args.BatchSize = (uint)maxValues; + else if (_config.MaxValuesPerRead > 0) + args.BatchSize = (uint)_config.MaxValuesPerRead; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"raw StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead; + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = timestamp, + Quality = (byte)(result.OpcQuality & 0xFF), + }); + + count++; + if (limit > 0 && count >= limit) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName); + RecordFailure($"raw: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})", + tagName, results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + public Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureConnected(); + + using var query = _connection!.CreateAnalogSummaryQuery(); + var args = new AnalogSummaryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = startTime, + EndDateTime = endTime, + Resolution = (ulong)intervalMs + }; + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode); + RecordFailure($"aggregate StartQuery: {error.ErrorCode}"); + HandleConnectionError(); + return Task.FromResult(results); + } + + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + + var result = query.QueryResult; + var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc); + var value = ExtractAggregateValue(result, aggregateColumn); + + results.Add(new HistorianAggregateSample + { + Value = value, + TimestampUtc = timestamp, + }); + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName); + RecordFailure($"aggregate: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values", + aggregateColumn, tagName, results.Count); + + return Task.FromResult(results); + } + + public Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default) + { + var results = new List(); + + if (timestamps == null || timestamps.Length == 0) + return Task.FromResult(results); + + try + { + EnsureConnected(); + + foreach (var timestamp in timestamps) + { + ct.ThrowIfCancellationRequested(); + + using var query = _connection!.CreateHistoryQuery(); + var args = new HistoryQueryArgs + { + TagNames = new StringCollection { tagName }, + StartDateTime = timestamp, + EndDateTime = timestamp, + RetrievalMode = HistorianRetrievalMode.Interpolated, + BatchSize = 1 + }; + + if (!query.StartQuery(args, out var error)) + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, // Bad + }); + continue; + } + + if (query.MoveNext(out error)) + { + var result = query.QueryResult; + object? value; + if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0) + value = result.StringValue; + else + value = result.Value; + + results.Add(new HistorianSample + { + Value = value, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = (byte)(result.OpcQuality & 0xFF), + }); + } + else + { + results.Add(new HistorianSample + { + Value = null, + TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc), + Quality = 0, + }); + } + + query.EndQuery(out _); + } + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName); + RecordFailure($"at-time: {ex.Message}"); + HandleConnectionError(ex); + } + + Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps", + tagName, results.Count, timestamps.Length); + + return Task.FromResult(results); + } + + public Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default) + { + var results = new List(); + + try + { + EnsureEventConnected(); + + using var query = _eventConnection!.CreateEventQuery(); + var args = new EventQueryArgs + { + StartDateTime = startTime, + EndDateTime = endTime, + EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead, + QueryType = HistorianEventQueryType.Events, + EventOrder = HistorianEventOrder.Ascending + }; + + if (!string.IsNullOrEmpty(sourceName)) + { + query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _); + } + + if (!query.StartQuery(args, out var error)) + { + Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode); + RecordFailure($"events StartQuery: {error.ErrorCode}"); + HandleEventConnectionError(); + return Task.FromResult(results); + } + + var count = 0; + while (query.MoveNext(out error)) + { + ct.ThrowIfCancellationRequested(); + results.Add(ToDto(query.QueryResult)); + count++; + if (maxEvents > 0 && count >= maxEvents) break; + } + + query.EndQuery(out _); + RecordSuccess(); + } + catch (OperationCanceledException) { throw; } + catch (ObjectDisposedException) { throw; } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)"); + RecordFailure($"events: {ex.Message}"); + HandleEventConnectionError(ex); + } + + Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})", + sourceName ?? "(all)", results.Count, startTime, endTime); + + return Task.FromResult(results); + } + + private static HistorianEventDto ToDto(HistorianEvent evt) + { + // The ArchestrA SDK marks these properties obsolete but still returns them; their + // successors aren't wired in the version we bind against. Using them is the documented + // v1 behavior — suppressed locally instead of project-wide so any non-event use of + // deprecated SDK surface still surfaces as an error. +#pragma warning disable CS0618 + return new HistorianEventDto + { + Id = evt.Id, + Source = evt.Source, + EventTime = evt.EventTime, + ReceivedTime = evt.ReceivedTime, + DisplayText = evt.DisplayText, + Severity = (ushort)evt.Severity + }; +#pragma warning restore CS0618 + } + + internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column) + { + switch (column) + { + case "Average": return result.Average; + case "Minimum": return result.Minimum; + case "Maximum": return result.Maximum; + case "ValueCount": return result.ValueCount; + case "First": return result.First; + case "Last": return result.Last; + case "StdDev": return result.StdDev; + default: return null; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try + { + _connection?.CloseConnection(out _); + _connection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK connection"); + } + + try + { + _eventConnection?.CloseConnection(out _); + _eventConnection?.Dispose(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error closing Historian SDK event connection"); + } + + _connection = null; + _eventConnection = null; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs new file mode 100644 index 0000000..d01e164 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianEventDto.cs @@ -0,0 +1,18 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// SDK-free representation of a Historian event record. Prevents ArchestrA types from + /// leaking beyond HistorianDataSource. + /// + public sealed class HistorianEventDto + { + public Guid Id { get; set; } + public string? Source { get; set; } + public DateTime EventTime { get; set; } + public DateTime ReceivedTime { get; set; } + public string? DisplayText { get; set; } + public ushort Severity { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs new file mode 100644 index 0000000..d056435 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianHealthSnapshot.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard + /// via an IPC health query (not wired in PR #5; deferred). + /// + public sealed class HistorianHealthSnapshot + { + public long TotalQueries { get; set; } + public long TotalSuccesses { get; set; } + public long TotalFailures { get; set; } + public int ConsecutiveFailures { get; set; } + public DateTime? LastSuccessTime { get; set; } + public DateTime? LastFailureTime { get; set; } + public string? LastError { get; set; } + public bool ProcessConnectionOpen { get; set; } + public bool EventConnectionOpen { get; set; } + public string? ActiveProcessNode { get; set; } + public string? ActiveEventNode { get; set; } + public int NodeCount { get; set; } + public int HealthyNodeCount { get; set; } + public List Nodes { get; set; } = new(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs new file mode 100644 index 0000000..3f78ffd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/HistorianSample.cs @@ -0,0 +1,30 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free representation of a single historical data point. The Host returns these + /// across the IPC boundary as GalaxyDataValue; the Proxy maps quality and value to + /// OPC UA DataValue. Raw MX quality byte is preserved so the Proxy can use the same + /// quality mapper it already uses for live reads. + /// + public sealed class HistorianSample + { + public object? Value { get; set; } + + /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality). + public byte Quality { get; set; } + + public DateTime TimestampUtc { get; set; } + } + + /// + /// Result of . When is + /// null the aggregate is unavailable for that bucket (Proxy maps to BadNoData). + /// + public sealed class HistorianAggregateSample + { + public double? Value { get; set; } + public DateTime TimestampUtc { get; set; } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs new file mode 100644 index 0000000..51001e9 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianConnectionFactory.cs @@ -0,0 +1,73 @@ +using System; +using System.Threading; +using ArchestrA; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that + /// control connection success, failure, and timeout behavior. + /// + internal interface IHistorianConnectionFactory + { + HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type); + } + + /// Production implementation — opens real Historian SDK connections. + internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory + { + public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type) + { + var conn = new HistorianAccess(); + + var args = new HistorianConnectionArgs + { + ServerName = config.ServerName, + TcpPort = (ushort)config.Port, + IntegratedSecurity = config.IntegratedSecurity, + UseArchestrAUser = config.IntegratedSecurity, + ConnectionType = type, + ReadOnly = true, + PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000) + }; + + if (!config.IntegratedSecurity) + { + args.UserName = config.UserName ?? string.Empty; + args.Password = config.Password ?? string.Empty; + } + + if (!conn.OpenConnection(args, out var error)) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}"); + } + + var timeoutMs = config.CommandTimeoutSeconds * 1000; + var elapsed = 0; + while (elapsed < timeoutMs) + { + var status = new HistorianConnectionStatus(); + conn.GetConnectionStatus(ref status); + + if (status.ConnectedToServer) + return conn; + + if (status.ErrorOccurred) + { + conn.Dispose(); + throw new InvalidOperationException( + $"Historian SDK connection failed: {status.Error}"); + } + + Thread.Sleep(250); + elapsed += 250; + } + + conn.Dispose(); + throw new TimeoutException( + $"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s"); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs new file mode 100644 index 0000000..146ae1b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Historian/IHistorianDataSource.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian +{ + /// + /// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host. + /// Implementations read via the aahClient* SDK; the Proxy side maps returned samples + /// to OPC UA DataValue. + /// + public interface IHistorianDataSource : IDisposable + { + Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default); + + Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default); + + Task> ReadAtTimeAsync( + string tagName, DateTime[] timestamps, + CancellationToken ct = default); + + Task> ReadEventsAsync( + string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, + CancellationToken ct = default); + + HistorianHealthSnapshot GetHealthSnapshot(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index 0134451..7cd543a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; @@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; /// MxAccess AlarmExtension primitives but the wire-up is also Phase 2 follow-up /// (the v1 alarm subsystem is its own subtree). /// -public sealed class MxAccessGalaxyBackend : IGalaxyBackend +public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable { private readonly GalaxyRepository _repository; private readonly MxAccessClient _mx; + private readonly IHistorianDataSource? _historian; private long _nextSessionId; private long _nextSubscriptionId; @@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public event System.EventHandler? OnHostStatusChanged; #pragma warning restore CS0067 - public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx) + public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { _repository = repository; _mx = mx; + _historian = historian; } public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) @@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask; public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask; - public Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) - => Task.FromResult(new HistoryReadResponse + public async Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct) + { + if (_historian is null) + return new HistoryReadResponse + { + Success = false, + Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration", + Tags = Array.Empty(), + }; + + var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime; + var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime; + var tags = new List(req.TagReferences.Length); + + try { - Success = false, - Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)", - Tags = Array.Empty(), - }); + foreach (var reference in req.TagReferences) + { + var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false); + tags.Add(new HistoryTagValues + { + TagReference = reference, + Values = samples.Select(s => ToWire(reference, s)).ToArray(), + }); + } + return new HistoryReadResponse { Success = true, Tags = tags.ToArray() }; + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + return new HistoryReadResponse + { + Success = false, + Error = $"Historian read failed: {ex.Message}", + Tags = tags.ToArray(), + }; + } + } public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); + public void Dispose() => _historian?.Dispose(); + private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new() { TagReference = reference, @@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; + /// + /// Maps a (raw historian row, OPC-UA-free) to the IPC wire + /// shape. The Proxy decodes the MessagePack value and maps + /// through QualityMapper on its side of the pipe — we keep the raw byte here so + /// rich OPC DA status codes (e.g. BadNotConnected, UncertainSubNormal) survive + /// the hop intact. + /// + private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new() + { + TagReference = reference, + ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value), + ValueMessagePackType = 0, + StatusCode = MapHistorianQualityToOpcUa(sample.Quality), + SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + private static uint MapHistorianQualityToOpcUa(byte q) + { + // Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges. + // The Proxy may refine this when it decodes the wire frame. + if (q >= 192) return 0x00000000u; // Good + if (q >= 64) return 0x40000000u; // Uncertain + return 0x80000000u; // Bad + } + private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs index efff93f..2a5ceec 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Program.cs @@ -4,6 +4,7 @@ using System.Threading; using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; @@ -66,9 +67,11 @@ public static class Program pump = new StaPump("Galaxy.Sta"); pump.WaitForStartedAsync().GetAwaiter().GetResult(); mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName); + var historian = BuildHistorianIfEnabled(); backend = new MxAccessGalaxyBackend( new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }), - mx); + mx, + historian); break; } @@ -77,6 +80,7 @@ public static class Program try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); } finally { + (backend as IDisposable)?.Dispose(); mx?.Dispose(); pump?.Dispose(); } @@ -91,4 +95,45 @@ public static class Program } finally { Log.CloseAndFlush(); } } + + /// + /// Builds a from the OTOPCUA_HISTORIAN_* environment + /// variables the supervisor passes at spawn time. Returns null when the historian is + /// disabled (default) so MxAccessGalaxyBackend.HistoryReadAsync returns a clear + /// "not configured" error instead of attempting an SDK connection to localhost. + /// + private static IHistorianDataSource? BuildHistorianIfEnabled() + { + var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"); + if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1") + return null; + + var cfg = new HistorianConfiguration + { + Enabled = true, + ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", + Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568), + IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase), + UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"), + Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"), + CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30), + MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000), + FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60), + }; + + var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS"); + if (!string.IsNullOrWhiteSpace(servers)) + cfg.ServerNames = new System.Collections.Generic.List( + servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)); + + Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}", + cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port); + return new HistorianDataSource(cfg); + } + + private static int TryParseInt(string envName, int defaultValue) + { + var raw = Environment.GetEnvironmentVariable(envName); + return int.TryParse(raw, out var parsed) ? parsed : defaultValue; + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj index bc8a16a..7498013 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj @@ -30,11 +30,43 @@ + + + + ..\..\lib\ArchestrA.MxAccess.dll true + + + ..\..\lib\aahClientManaged.dll + false + + + ..\..\lib\aahClientCommon.dll + false + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs new file mode 100644 index 0000000..4078080 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianClusterEndpointPickerTests.cs @@ -0,0 +1,94 @@ +using System; +using System.Linq; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianClusterEndpointPickerTests + { + private static HistorianConfiguration Config(params string[] nodes) => new() + { + ServerName = "ignored", + ServerNames = nodes.ToList(), + FailureCooldownSeconds = 60, + }; + + [Fact] + public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty() + { + var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() }; + var p = new HistorianClusterEndpointPicker(cfg); + p.NodeCount.ShouldBe(1); + p.GetHealthyNodes().ShouldBe(new[] { "only-node" }); + } + + [Fact] + public void Failed_node_enters_cooldown_and_is_skipped() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + } + + [Fact] + public void Cooldown_expires_after_configured_window() + { + var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBe(new[] { "b" }); + + clock = clock.AddSeconds(61); + p.GetHealthyNodes().ShouldBe(new[] { "a", "b" }); + } + + [Fact] + public void MarkHealthy_immediately_clears_cooldown() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "boom"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.MarkHealthy("a"); + p.GetHealthyNodes().ShouldBe(new[] { "a" }); + } + + [Fact] + public void All_nodes_in_cooldown_returns_empty_healthy_list() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now); + p.MarkFailed("a", "x"); + p.MarkFailed("b", "y"); + p.GetHealthyNodes().ShouldBeEmpty(); + p.NodeCount.ShouldBe(2); + } + + [Fact] + public void Snapshot_reports_failure_count_and_last_error() + { + var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc); + var p = new HistorianClusterEndpointPicker(Config("a"), () => now); + p.MarkFailed("a", "first"); + p.MarkFailed("a", "second"); + + var snap = p.SnapshotNodeStates().Single(); + snap.FailureCount.ShouldBe(2); + snap.LastError.ShouldBe("second"); + snap.IsHealthy.ShouldBeFalse(); + snap.CooldownUntil.ShouldNotBeNull(); + } + + [Fact] + public void Duplicate_hostnames_are_deduplicated_case_insensitively() + { + var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB")); + p.NodeCount.ShouldBe(2); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs new file mode 100644 index 0000000..4c7800c --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistorianWiringTests.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests +{ + [Trait("Category", "Unit")] + public sealed class HistorianWiringTests + { + /// + /// When the Proxy sends a HistoryRead but the supervisor never enabled the historian + /// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a + /// self-explanatory error — not an exception or a hang against localhost. + /// + [Fact] + public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + historian: null); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TestTag" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("Historian disabled"); + resp.Tags.ShouldBeEmpty(); + } + + /// + /// When the historian is wired up, we expect the backend to call through and map + /// samples onto the IPC wire shape. Uses a fake + /// that returns a single known-good sample so we can assert the mapping stays sane. + /// + [Fact] + public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests"); + var fake = new FakeHistorianDataSource(new HistorianSample + { + Value = 42.5, + Quality = 192, // Good + TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc), + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadAsync(new HistoryReadRequest + { + TagReferences = new[] { "TankLevel" }, + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + MaxValuesPerTag = 100, + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Tags.Length.ShouldBe(1); + resp.Tags[0].TagReference.ShouldBe("TankLevel"); + resp.Tags[0].Values.Length.ShouldBe(1); + resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good + resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull(); + resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe( + new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds()); + } + + private sealed class FakeHistorianDataSource : IHistorianDataSource + { + private readonly HistorianSample _sample; + public FakeHistorianDataSource(HistorianSample sample) => _sample = sample; + + public Task> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List { _sample }); + + public Task> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + public void Dispose() { } + } + } +}