Compare commits
1 Commits
phase-2-pr
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6df1a79d35 |
@@ -0,0 +1,129 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
|
||||
/// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
|
||||
/// out an ordered list of eligible candidates for the data source to try in sequence.
|
||||
/// </summary>
|
||||
internal sealed class HistorianClusterEndpointPicker
|
||||
{
|
||||
private readonly Func<DateTime> _clock;
|
||||
private readonly TimeSpan _cooldown;
|
||||
private readonly object _lock = new object();
|
||||
private readonly List<NodeEntry> _nodes;
|
||||
|
||||
public HistorianClusterEndpointPicker(HistorianConfiguration config)
|
||||
: this(config, () => DateTime.UtcNow) { }
|
||||
|
||||
internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
|
||||
{
|
||||
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
|
||||
_cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));
|
||||
|
||||
var names = (config.ServerNames != null && config.ServerNames.Count > 0)
|
||||
? config.ServerNames
|
||||
: new List<string> { config.ServerName };
|
||||
|
||||
_nodes = names
|
||||
.Where(n => !string.IsNullOrWhiteSpace(n))
|
||||
.Select(n => n.Trim())
|
||||
.Distinct(StringComparer.OrdinalIgnoreCase)
|
||||
.Select(n => new NodeEntry { Name = n })
|
||||
.ToList();
|
||||
}
|
||||
|
||||
public int NodeCount
|
||||
{
|
||||
get { lock (_lock) return _nodes.Count; }
|
||||
}
|
||||
|
||||
public IReadOnlyList<string> GetHealthyNodes()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var now = _clock();
|
||||
return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
public int HealthyNodeCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var now = _clock();
|
||||
return _nodes.Count(n => IsHealthyAt(n, now));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void MarkFailed(string node, string? error)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var entry = FindEntry(node);
|
||||
if (entry == null) return;
|
||||
|
||||
var now = _clock();
|
||||
entry.FailureCount++;
|
||||
entry.LastError = error;
|
||||
entry.LastFailureTime = now;
|
||||
entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null;
|
||||
}
|
||||
}
|
||||
|
||||
public void MarkHealthy(string node)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var entry = FindEntry(node);
|
||||
if (entry == null) return;
|
||||
entry.CooldownUntil = null;
|
||||
}
|
||||
}
|
||||
|
||||
public List<HistorianClusterNodeState> SnapshotNodeStates()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var now = _clock();
|
||||
return _nodes.Select(n => new HistorianClusterNodeState
|
||||
{
|
||||
Name = n.Name,
|
||||
IsHealthy = IsHealthyAt(n, now),
|
||||
CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil,
|
||||
FailureCount = n.FailureCount,
|
||||
LastError = n.LastError,
|
||||
LastFailureTime = n.LastFailureTime
|
||||
}).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsHealthyAt(NodeEntry entry, DateTime now)
|
||||
{
|
||||
return entry.CooldownUntil == null || entry.CooldownUntil <= now;
|
||||
}
|
||||
|
||||
private NodeEntry? FindEntry(string node)
|
||||
{
|
||||
for (var i = 0; i < _nodes.Count; i++)
|
||||
if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase))
|
||||
return _nodes[i];
|
||||
return null;
|
||||
}
|
||||
|
||||
private sealed class NodeEntry
|
||||
{
|
||||
public string Name { get; set; } = "";
|
||||
public DateTime? CooldownUntil { get; set; }
|
||||
public int FailureCount { get; set; }
|
||||
public string? LastError { get; set; }
|
||||
public DateTime? LastFailureTime { get; set; }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using System;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Point-in-time state of a single historian cluster node. One entry per configured node
|
||||
/// appears inside <see cref="HistorianHealthSnapshot"/>.
|
||||
/// </summary>
|
||||
public sealed class HistorianClusterNodeState
|
||||
{
|
||||
public string Name { get; set; } = "";
|
||||
public bool IsHealthy { get; set; }
|
||||
public DateTime? CooldownUntil { get; set; }
|
||||
public int FailureCount { get; set; }
|
||||
public string? LastError { get; set; }
|
||||
public DateTime? LastFailureTime { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Wonderware Historian SDK configuration. Populated from environment variables at Host
|
||||
/// startup (see <c>Program.cs</c>) or from the Proxy's <c>DriverInstance.DriverConfig</c>
|
||||
/// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
|
||||
/// </summary>
|
||||
public sealed class HistorianConfiguration
|
||||
{
|
||||
public bool Enabled { get; set; } = false;
|
||||
|
||||
/// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
|
||||
public string ServerName { get; set; } = "localhost";
|
||||
|
||||
/// <summary>
|
||||
/// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
|
||||
/// falling through to the next on failure. A failed node is placed in cooldown for
|
||||
/// <see cref="FailureCooldownSeconds"/> before being re-eligible.
|
||||
/// </summary>
|
||||
public List<string> ServerNames { get; set; } = new();
|
||||
|
||||
public int FailureCooldownSeconds { get; set; } = 60;
|
||||
public bool IntegratedSecurity { get; set; } = true;
|
||||
public string? UserName { get; set; }
|
||||
public string? Password { get; set; }
|
||||
public int Port { get; set; } = 32568;
|
||||
public int CommandTimeoutSeconds { get; set; } = 30;
|
||||
public int MaxValuesPerRead { get; set; } = 10000;
|
||||
|
||||
/// <summary>
|
||||
/// Outer safety timeout applied to sync-over-async Historian operations. Must be
|
||||
/// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
|
||||
/// </summary>
|
||||
public int RequestTimeoutSeconds { get; set; } = 60;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,621 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using StringCollection = System.Collections.Specialized.StringCollection;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA;
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
|
||||
/// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
|
||||
/// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
|
||||
/// </summary>
|
||||
public sealed class HistorianDataSource : IHistorianDataSource
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
|
||||
|
||||
private readonly HistorianConfiguration _config;
|
||||
private readonly object _connectionLock = new object();
|
||||
private readonly object _eventConnectionLock = new object();
|
||||
private readonly IHistorianConnectionFactory _factory;
|
||||
private HistorianAccess? _connection;
|
||||
private HistorianAccess? _eventConnection;
|
||||
private bool _disposed;
|
||||
|
||||
private readonly object _healthLock = new object();
|
||||
private long _totalSuccesses;
|
||||
private long _totalFailures;
|
||||
private int _consecutiveFailures;
|
||||
private DateTime? _lastSuccessTime;
|
||||
private DateTime? _lastFailureTime;
|
||||
private string? _lastError;
|
||||
private string? _activeProcessNode;
|
||||
private string? _activeEventNode;
|
||||
|
||||
private readonly HistorianClusterEndpointPicker _picker;
|
||||
|
||||
public HistorianDataSource(HistorianConfiguration config)
|
||||
: this(config, new SdkHistorianConnectionFactory(), null) { }
|
||||
|
||||
internal HistorianDataSource(
|
||||
HistorianConfiguration config,
|
||||
IHistorianConnectionFactory factory,
|
||||
HistorianClusterEndpointPicker? picker = null)
|
||||
{
|
||||
_config = config;
|
||||
_factory = factory;
|
||||
_picker = picker ?? new HistorianClusterEndpointPicker(config);
|
||||
}
|
||||
|
||||
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
|
||||
{
|
||||
var candidates = _picker.GetHealthyNodes();
|
||||
if (candidates.Count == 0)
|
||||
{
|
||||
var total = _picker.NodeCount;
|
||||
throw new InvalidOperationException(
|
||||
total == 0
|
||||
? "No historian nodes configured"
|
||||
: $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
|
||||
}
|
||||
|
||||
Exception? lastException = null;
|
||||
foreach (var node in candidates)
|
||||
{
|
||||
var attemptConfig = CloneConfigWithServerName(node);
|
||||
try
|
||||
{
|
||||
var conn = _factory.CreateAndConnect(attemptConfig, type);
|
||||
_picker.MarkHealthy(node);
|
||||
return (conn, node);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_picker.MarkFailed(node, ex.Message);
|
||||
lastException = ex;
|
||||
Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
|
||||
}
|
||||
}
|
||||
|
||||
var inner = lastException?.Message ?? "(no detail)";
|
||||
throw new InvalidOperationException(
|
||||
$"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
|
||||
lastException);
|
||||
}
|
||||
|
||||
private HistorianConfiguration CloneConfigWithServerName(string serverName)
|
||||
{
|
||||
return new HistorianConfiguration
|
||||
{
|
||||
Enabled = _config.Enabled,
|
||||
ServerName = serverName,
|
||||
ServerNames = _config.ServerNames,
|
||||
FailureCooldownSeconds = _config.FailureCooldownSeconds,
|
||||
IntegratedSecurity = _config.IntegratedSecurity,
|
||||
UserName = _config.UserName,
|
||||
Password = _config.Password,
|
||||
Port = _config.Port,
|
||||
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
|
||||
MaxValuesPerRead = _config.MaxValuesPerRead
|
||||
};
|
||||
}
|
||||
|
||||
public HistorianHealthSnapshot GetHealthSnapshot()
|
||||
{
|
||||
var nodeStates = _picker.SnapshotNodeStates();
|
||||
var healthyCount = 0;
|
||||
foreach (var n in nodeStates)
|
||||
if (n.IsHealthy) healthyCount++;
|
||||
|
||||
lock (_healthLock)
|
||||
{
|
||||
return new HistorianHealthSnapshot
|
||||
{
|
||||
TotalQueries = _totalSuccesses + _totalFailures,
|
||||
TotalSuccesses = _totalSuccesses,
|
||||
TotalFailures = _totalFailures,
|
||||
ConsecutiveFailures = _consecutiveFailures,
|
||||
LastSuccessTime = _lastSuccessTime,
|
||||
LastFailureTime = _lastFailureTime,
|
||||
LastError = _lastError,
|
||||
ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
|
||||
EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
|
||||
ActiveProcessNode = _activeProcessNode,
|
||||
ActiveEventNode = _activeEventNode,
|
||||
NodeCount = nodeStates.Count,
|
||||
HealthyNodeCount = healthyCount,
|
||||
Nodes = nodeStates
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private void RecordSuccess()
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
_totalSuccesses++;
|
||||
_lastSuccessTime = DateTime.UtcNow;
|
||||
_consecutiveFailures = 0;
|
||||
_lastError = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void RecordFailure(string error)
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
_totalFailures++;
|
||||
_lastFailureTime = DateTime.UtcNow;
|
||||
_consecutiveFailures++;
|
||||
_lastError = error;
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureConnected()
|
||||
{
|
||||
if (_disposed)
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
|
||||
if (Volatile.Read(ref _connection) != null) return;
|
||||
|
||||
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
|
||||
|
||||
lock (_connectionLock)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
}
|
||||
|
||||
if (_connection != null)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
_connection = conn;
|
||||
lock (_healthLock) _activeProcessNode = winningNode;
|
||||
Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleConnectionError(Exception? ex = null)
|
||||
{
|
||||
lock (_connectionLock)
|
||||
{
|
||||
if (_connection == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
_connection.CloseConnection(out _);
|
||||
_connection.Dispose();
|
||||
}
|
||||
catch (Exception disposeEx)
|
||||
{
|
||||
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
|
||||
}
|
||||
|
||||
_connection = null;
|
||||
string? failedNode;
|
||||
lock (_healthLock)
|
||||
{
|
||||
failedNode = _activeProcessNode;
|
||||
_activeProcessNode = null;
|
||||
}
|
||||
|
||||
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||
Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureEventConnected()
|
||||
{
|
||||
if (_disposed)
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
|
||||
if (Volatile.Read(ref _eventConnection) != null) return;
|
||||
|
||||
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
|
||||
|
||||
lock (_eventConnectionLock)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
}
|
||||
|
||||
if (_eventConnection != null)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
_eventConnection = conn;
|
||||
lock (_healthLock) _activeEventNode = winningNode;
|
||||
Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleEventConnectionError(Exception? ex = null)
|
||||
{
|
||||
lock (_eventConnectionLock)
|
||||
{
|
||||
if (_eventConnection == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
_eventConnection.CloseConnection(out _);
|
||||
_eventConnection.Dispose();
|
||||
}
|
||||
catch (Exception disposeEx)
|
||||
{
|
||||
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
|
||||
}
|
||||
|
||||
_eventConnection = null;
|
||||
string? failedNode;
|
||||
lock (_healthLock)
|
||||
{
|
||||
failedNode = _activeEventNode;
|
||||
_activeEventNode = null;
|
||||
}
|
||||
|
||||
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||
Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||
}
|
||||
}
|
||||
|
||||
public Task<List<HistorianSample>> ReadRawAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime, int maxValues,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianSample>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
using var query = _connection!.CreateHistoryQuery();
|
||||
var args = new HistoryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
RetrievalMode = HistorianRetrievalMode.Full
|
||||
};
|
||||
|
||||
if (maxValues > 0)
|
||||
args.BatchSize = (uint)maxValues;
|
||||
else if (_config.MaxValuesPerRead > 0)
|
||||
args.BatchSize = (uint)_config.MaxValuesPerRead;
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||
RecordFailure($"raw StartQuery: {error.ErrorCode}");
|
||||
HandleConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
var count = 0;
|
||||
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
|
||||
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
var result = query.QueryResult;
|
||||
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = timestamp,
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
|
||||
count++;
|
||||
if (limit > 0 && count >= limit) break;
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
|
||||
RecordFailure($"raw: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
|
||||
tagName, results.Count, startTime, endTime);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime,
|
||||
double intervalMs, string aggregateColumn,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianAggregateSample>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
using var query = _connection!.CreateAnalogSummaryQuery();
|
||||
var args = new AnalogSummaryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
Resolution = (ulong)intervalMs
|
||||
};
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||
RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
|
||||
HandleConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
var result = query.QueryResult;
|
||||
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||
var value = ExtractAggregateValue(result, aggregateColumn);
|
||||
|
||||
results.Add(new HistorianAggregateSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
|
||||
RecordFailure($"aggregate: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
|
||||
aggregateColumn, tagName, results.Count);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianSample>> ReadAtTimeAsync(
|
||||
string tagName, DateTime[] timestamps,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianSample>();
|
||||
|
||||
if (timestamps == null || timestamps.Length == 0)
|
||||
return Task.FromResult(results);
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
foreach (var timestamp in timestamps)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
using var query = _connection!.CreateHistoryQuery();
|
||||
var args = new HistoryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = timestamp,
|
||||
EndDateTime = timestamp,
|
||||
RetrievalMode = HistorianRetrievalMode.Interpolated,
|
||||
BatchSize = 1
|
||||
};
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = null,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = 0, // Bad
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (query.MoveNext(out error))
|
||||
{
|
||||
var result = query.QueryResult;
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = null,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = 0,
|
||||
});
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
}
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
|
||||
RecordFailure($"at-time: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
|
||||
tagName, results.Count, timestamps.Length);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianEventDto>> ReadEventsAsync(
|
||||
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianEventDto>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureEventConnected();
|
||||
|
||||
using var query = _eventConnection!.CreateEventQuery();
|
||||
var args = new EventQueryArgs
|
||||
{
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
|
||||
QueryType = HistorianEventQueryType.Events,
|
||||
EventOrder = HistorianEventOrder.Ascending
|
||||
};
|
||||
|
||||
if (!string.IsNullOrEmpty(sourceName))
|
||||
{
|
||||
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
|
||||
}
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
|
||||
RecordFailure($"events StartQuery: {error.ErrorCode}");
|
||||
HandleEventConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
var count = 0;
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
results.Add(ToDto(query.QueryResult));
|
||||
count++;
|
||||
if (maxEvents > 0 && count >= maxEvents) break;
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
|
||||
RecordFailure($"events: {ex.Message}");
|
||||
HandleEventConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
|
||||
sourceName ?? "(all)", results.Count, startTime, endTime);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
private static HistorianEventDto ToDto(HistorianEvent evt)
|
||||
{
|
||||
// The ArchestrA SDK marks these properties obsolete but still returns them; their
|
||||
// successors aren't wired in the version we bind against. Using them is the documented
|
||||
// v1 behavior — suppressed locally instead of project-wide so any non-event use of
|
||||
// deprecated SDK surface still surfaces as an error.
|
||||
#pragma warning disable CS0618
|
||||
return new HistorianEventDto
|
||||
{
|
||||
Id = evt.Id,
|
||||
Source = evt.Source,
|
||||
EventTime = evt.EventTime,
|
||||
ReceivedTime = evt.ReceivedTime,
|
||||
DisplayText = evt.DisplayText,
|
||||
Severity = (ushort)evt.Severity
|
||||
};
|
||||
#pragma warning restore CS0618
|
||||
}
|
||||
|
||||
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
|
||||
{
|
||||
switch (column)
|
||||
{
|
||||
case "Average": return result.Average;
|
||||
case "Minimum": return result.Minimum;
|
||||
case "Maximum": return result.Maximum;
|
||||
case "ValueCount": return result.ValueCount;
|
||||
case "First": return result.First;
|
||||
case "Last": return result.Last;
|
||||
case "StdDev": return result.StdDev;
|
||||
default: return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
|
||||
try
|
||||
{
|
||||
_connection?.CloseConnection(out _);
|
||||
_connection?.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error closing Historian SDK connection");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_eventConnection?.CloseConnection(out _);
|
||||
_eventConnection?.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error closing Historian SDK event connection");
|
||||
}
|
||||
|
||||
_connection = null;
|
||||
_eventConnection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using System;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// SDK-free representation of a Historian event record. Prevents ArchestrA types from
|
||||
/// leaking beyond <c>HistorianDataSource</c>.
|
||||
/// </summary>
|
||||
public sealed class HistorianEventDto
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
public string? Source { get; set; }
|
||||
public DateTime EventTime { get; set; }
|
||||
public DateTime ReceivedTime { get; set; }
|
||||
public string? DisplayText { get; set; }
|
||||
public ushort Severity { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
|
||||
/// via an IPC health query (not wired in PR #5; deferred).
|
||||
/// </summary>
|
||||
public sealed class HistorianHealthSnapshot
|
||||
{
|
||||
public long TotalQueries { get; set; }
|
||||
public long TotalSuccesses { get; set; }
|
||||
public long TotalFailures { get; set; }
|
||||
public int ConsecutiveFailures { get; set; }
|
||||
public DateTime? LastSuccessTime { get; set; }
|
||||
public DateTime? LastFailureTime { get; set; }
|
||||
public string? LastError { get; set; }
|
||||
public bool ProcessConnectionOpen { get; set; }
|
||||
public bool EventConnectionOpen { get; set; }
|
||||
public string? ActiveProcessNode { get; set; }
|
||||
public string? ActiveEventNode { get; set; }
|
||||
public int NodeCount { get; set; }
|
||||
public int HealthyNodeCount { get; set; }
|
||||
public List<HistorianClusterNodeState> Nodes { get; set; } = new();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
using System;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// OPC-UA-free representation of a single historical data point. The Host returns these
|
||||
/// across the IPC boundary as <c>GalaxyDataValue</c>; the Proxy maps quality and value to
|
||||
/// OPC UA <c>DataValue</c>. Raw MX quality byte is preserved so the Proxy can use the same
|
||||
/// quality mapper it already uses for live reads.
|
||||
/// </summary>
|
||||
public sealed class HistorianSample
|
||||
{
|
||||
public object? Value { get; set; }
|
||||
|
||||
/// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
|
||||
public byte Quality { get; set; }
|
||||
|
||||
public DateTime TimestampUtc { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Result of <see cref="IHistorianDataSource.ReadAggregateAsync"/>. When <see cref="Value"/> is
|
||||
/// null the aggregate is unavailable for that bucket (Proxy maps to <c>BadNoData</c>).
|
||||
/// </summary>
|
||||
public sealed class HistorianAggregateSample
|
||||
{
|
||||
public double? Value { get; set; }
|
||||
public DateTime TimestampUtc { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using ArchestrA;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that
|
||||
/// control connection success, failure, and timeout behavior.
|
||||
/// </summary>
|
||||
internal interface IHistorianConnectionFactory
|
||||
{
|
||||
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
|
||||
}
|
||||
|
||||
/// <summary>Production implementation — opens real Historian SDK connections.</summary>
|
||||
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
|
||||
{
|
||||
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
|
||||
{
|
||||
var conn = new HistorianAccess();
|
||||
|
||||
var args = new HistorianConnectionArgs
|
||||
{
|
||||
ServerName = config.ServerName,
|
||||
TcpPort = (ushort)config.Port,
|
||||
IntegratedSecurity = config.IntegratedSecurity,
|
||||
UseArchestrAUser = config.IntegratedSecurity,
|
||||
ConnectionType = type,
|
||||
ReadOnly = true,
|
||||
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
|
||||
};
|
||||
|
||||
if (!config.IntegratedSecurity)
|
||||
{
|
||||
args.UserName = config.UserName ?? string.Empty;
|
||||
args.Password = config.Password ?? string.Empty;
|
||||
}
|
||||
|
||||
if (!conn.OpenConnection(args, out var error))
|
||||
{
|
||||
conn.Dispose();
|
||||
throw new InvalidOperationException(
|
||||
$"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
|
||||
}
|
||||
|
||||
var timeoutMs = config.CommandTimeoutSeconds * 1000;
|
||||
var elapsed = 0;
|
||||
while (elapsed < timeoutMs)
|
||||
{
|
||||
var status = new HistorianConnectionStatus();
|
||||
conn.GetConnectionStatus(ref status);
|
||||
|
||||
if (status.ConnectedToServer)
|
||||
return conn;
|
||||
|
||||
if (status.ErrorOccurred)
|
||||
{
|
||||
conn.Dispose();
|
||||
throw new InvalidOperationException(
|
||||
$"Historian SDK connection failed: {status.Error}");
|
||||
}
|
||||
|
||||
Thread.Sleep(250);
|
||||
elapsed += 250;
|
||||
}
|
||||
|
||||
conn.Dispose();
|
||||
throw new TimeoutException(
|
||||
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host.
|
||||
/// Implementations read via the aahClient* SDK; the Proxy side maps returned samples
|
||||
/// to OPC UA <c>DataValue</c>.
|
||||
/// </summary>
|
||||
public interface IHistorianDataSource : IDisposable
|
||||
{
|
||||
Task<List<HistorianSample>> ReadRawAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime, int maxValues,
|
||||
CancellationToken ct = default);
|
||||
|
||||
Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime,
|
||||
double intervalMs, string aggregateColumn,
|
||||
CancellationToken ct = default);
|
||||
|
||||
Task<List<HistorianSample>> ReadAtTimeAsync(
|
||||
string tagName, DateTime[] timestamps,
|
||||
CancellationToken ct = default);
|
||||
|
||||
Task<List<HistorianEventDto>> ReadEventsAsync(
|
||||
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
|
||||
CancellationToken ct = default);
|
||||
|
||||
HistorianHealthSnapshot GetHealthSnapshot();
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
@@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
|
||||
/// (the v1 alarm subsystem is its own subtree).
|
||||
/// </summary>
|
||||
public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
{
|
||||
private readonly GalaxyRepository _repository;
|
||||
private readonly MxAccessClient _mx;
|
||||
private readonly IHistorianDataSource? _historian;
|
||||
private long _nextSessionId;
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
@@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
#pragma warning restore CS0067
|
||||
|
||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
|
||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
||||
{
|
||||
_repository = repository;
|
||||
_mx = mx;
|
||||
_historian = historian;
|
||||
}
|
||||
|
||||
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||
@@ -222,17 +225,50 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new HistoryReadResponse
|
||||
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
||||
{
|
||||
if (_historian is null)
|
||||
return new HistoryReadResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||
Tags = Array.Empty<HistoryTagValues>(),
|
||||
};
|
||||
|
||||
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
|
||||
|
||||
try
|
||||
{
|
||||
Success = false,
|
||||
Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
|
||||
Tags = Array.Empty<HistoryTagValues>(),
|
||||
});
|
||||
foreach (var reference in req.TagReferences)
|
||||
{
|
||||
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
|
||||
tags.Add(new HistoryTagValues
|
||||
{
|
||||
TagReference = reference,
|
||||
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
|
||||
});
|
||||
}
|
||||
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new HistoryReadResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = $"Historian read failed: {ex.Message}",
|
||||
Tags = tags.ToArray(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||
|
||||
public void Dispose() => _historian?.Dispose();
|
||||
|
||||
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
|
||||
{
|
||||
TagReference = reference,
|
||||
@@ -243,6 +279,32 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
|
||||
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
|
||||
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
|
||||
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
|
||||
/// the hop intact.
|
||||
/// </summary>
|
||||
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
|
||||
{
|
||||
TagReference = reference,
|
||||
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
|
||||
ValueMessagePackType = 0,
|
||||
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
|
||||
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
|
||||
private static uint MapHistorianQualityToOpcUa(byte q)
|
||||
{
|
||||
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
|
||||
// The Proxy may refine this when it decodes the wire frame.
|
||||
if (q >= 192) return 0x00000000u; // Good
|
||||
if (q >= 64) return 0x40000000u; // Uncertain
|
||||
return 0x80000000u; // Bad
|
||||
}
|
||||
|
||||
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
|
||||
{
|
||||
AttributeName = row.AttributeName,
|
||||
|
||||
@@ -4,6 +4,7 @@ using System.Threading;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
@@ -66,9 +67,11 @@ public static class Program
|
||||
pump = new StaPump("Galaxy.Sta");
|
||||
pump.WaitForStartedAsync().GetAwaiter().GetResult();
|
||||
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
|
||||
var historian = BuildHistorianIfEnabled();
|
||||
backend = new MxAccessGalaxyBackend(
|
||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
|
||||
mx);
|
||||
mx,
|
||||
historian);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -77,6 +80,7 @@ public static class Program
|
||||
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
|
||||
finally
|
||||
{
|
||||
(backend as IDisposable)?.Dispose();
|
||||
mx?.Dispose();
|
||||
pump?.Dispose();
|
||||
}
|
||||
@@ -91,4 +95,45 @@ public static class Program
|
||||
}
|
||||
finally { Log.CloseAndFlush(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds a <see cref="HistorianDataSource"/> from the OTOPCUA_HISTORIAN_* environment
|
||||
/// variables the supervisor passes at spawn time. Returns null when the historian is
|
||||
/// disabled (default) so <c>MxAccessGalaxyBackend.HistoryReadAsync</c> returns a clear
|
||||
/// "not configured" error instead of attempting an SDK connection to localhost.
|
||||
/// </summary>
|
||||
private static IHistorianDataSource? BuildHistorianIfEnabled()
|
||||
{
|
||||
var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED");
|
||||
if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1")
|
||||
return null;
|
||||
|
||||
var cfg = new HistorianConfiguration
|
||||
{
|
||||
Enabled = true,
|
||||
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
|
||||
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
|
||||
IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
|
||||
UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
|
||||
Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
|
||||
CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
|
||||
MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
|
||||
FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
|
||||
};
|
||||
|
||||
var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS");
|
||||
if (!string.IsNullOrWhiteSpace(servers))
|
||||
cfg.ServerNames = new System.Collections.Generic.List<string>(
|
||||
servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
|
||||
|
||||
Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}",
|
||||
cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port);
|
||||
return new HistorianDataSource(cfg);
|
||||
}
|
||||
|
||||
private static int TryParseInt(string envName, int defaultValue)
|
||||
{
|
||||
var raw = Environment.GetEnvironmentVariable(envName);
|
||||
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,11 +30,43 @@
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Reference Include="ArchestrA.MxAccess">
|
||||
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
|
||||
<Private>true</Private>
|
||||
</Reference>
|
||||
<!-- Wonderware Historian SDK — consumed by Backend/Historian/ for HistoryReadAsync.
|
||||
Previously lived in the v1 Historian.Aveva plugin; folded into Driver.Galaxy.Host
|
||||
for PR #5 because this host is already Galaxy-specific. -->
|
||||
<Reference Include="aahClientManaged">
|
||||
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
|
||||
<EmbedInteropTypes>false</EmbedInteropTypes>
|
||||
</Reference>
|
||||
<Reference Include="aahClientCommon">
|
||||
<HintPath>..\..\lib\aahClientCommon.dll</HintPath>
|
||||
<EmbedInteropTypes>false</EmbedInteropTypes>
|
||||
</Reference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<!-- Historian SDK native and satellite DLLs — staged beside the host exe so the
|
||||
aahClientManaged wrapper can P/Invoke into them without an AssemblyResolve hook. -->
|
||||
<None Include="..\..\lib\aahClient.dll">
|
||||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
</None>
|
||||
<None Include="..\..\lib\Historian.CBE.dll">
|
||||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
</None>
|
||||
<None Include="..\..\lib\Historian.DPAPI.dll">
|
||||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
</None>
|
||||
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
|
||||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
</None>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class HistorianClusterEndpointPickerTests
|
||||
{
|
||||
private static HistorianConfiguration Config(params string[] nodes) => new()
|
||||
{
|
||||
ServerName = "ignored",
|
||||
ServerNames = nodes.ToList(),
|
||||
FailureCooldownSeconds = 60,
|
||||
};
|
||||
|
||||
[Fact]
|
||||
public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty()
|
||||
{
|
||||
var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() };
|
||||
var p = new HistorianClusterEndpointPicker(cfg);
|
||||
p.NodeCount.ShouldBe(1);
|
||||
p.GetHealthyNodes().ShouldBe(new[] { "only-node" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Failed_node_enters_cooldown_and_is_skipped()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
|
||||
|
||||
p.MarkFailed("a", "boom");
|
||||
p.GetHealthyNodes().ShouldBe(new[] { "b" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Cooldown_expires_after_configured_window()
|
||||
{
|
||||
var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock);
|
||||
p.MarkFailed("a", "boom");
|
||||
p.GetHealthyNodes().ShouldBe(new[] { "b" });
|
||||
|
||||
clock = clock.AddSeconds(61);
|
||||
p.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MarkHealthy_immediately_clears_cooldown()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
|
||||
p.MarkFailed("a", "boom");
|
||||
p.GetHealthyNodes().ShouldBeEmpty();
|
||||
p.MarkHealthy("a");
|
||||
p.GetHealthyNodes().ShouldBe(new[] { "a" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void All_nodes_in_cooldown_returns_empty_healthy_list()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
|
||||
p.MarkFailed("a", "x");
|
||||
p.MarkFailed("b", "y");
|
||||
p.GetHealthyNodes().ShouldBeEmpty();
|
||||
p.NodeCount.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Snapshot_reports_failure_count_and_last_error()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
|
||||
p.MarkFailed("a", "first");
|
||||
p.MarkFailed("a", "second");
|
||||
|
||||
var snap = p.SnapshotNodeStates().Single();
|
||||
snap.FailureCount.ShouldBe(2);
|
||||
snap.LastError.ShouldBe("second");
|
||||
snap.IsHealthy.ShouldBeFalse();
|
||||
snap.CooldownUntil.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Duplicate_hostnames_are_deduplicated_case_insensitively()
|
||||
{
|
||||
var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB"));
|
||||
p.NodeCount.ShouldBe(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class HistorianWiringTests
|
||||
{
|
||||
/// <summary>
|
||||
/// When the Proxy sends a HistoryRead but the supervisor never enabled the historian
|
||||
/// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a
|
||||
/// self-explanatory error — not an exception or a hang against localhost.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured()
|
||||
{
|
||||
using var pump = new StaPump("Test.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
|
||||
using var backend = new MxAccessGalaxyBackend(
|
||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||
mx,
|
||||
historian: null);
|
||||
|
||||
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
|
||||
{
|
||||
TagReferences = new[] { "TestTag" },
|
||||
StartUtcUnixMs = 0,
|
||||
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
MaxValuesPerTag = 100,
|
||||
}, CancellationToken.None);
|
||||
|
||||
resp.Success.ShouldBeFalse();
|
||||
resp.Error.ShouldContain("Historian disabled");
|
||||
resp.Tags.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// When the historian is wired up, we expect the backend to call through and map
|
||||
/// samples onto the IPC wire shape. Uses a fake <see cref="IHistorianDataSource"/>
|
||||
/// that returns a single known-good sample so we can assert the mapping stays sane.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue()
|
||||
{
|
||||
using var pump = new StaPump("Test.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
|
||||
var fake = new FakeHistorianDataSource(new HistorianSample
|
||||
{
|
||||
Value = 42.5,
|
||||
Quality = 192, // Good
|
||||
TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc),
|
||||
});
|
||||
using var backend = new MxAccessGalaxyBackend(
|
||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||
mx,
|
||||
fake);
|
||||
|
||||
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
|
||||
{
|
||||
TagReferences = new[] { "TankLevel" },
|
||||
StartUtcUnixMs = 0,
|
||||
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
MaxValuesPerTag = 100,
|
||||
}, CancellationToken.None);
|
||||
|
||||
resp.Success.ShouldBeTrue();
|
||||
resp.Tags.Length.ShouldBe(1);
|
||||
resp.Tags[0].TagReference.ShouldBe("TankLevel");
|
||||
resp.Tags[0].Values.Length.ShouldBe(1);
|
||||
resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good
|
||||
resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull();
|
||||
resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe(
|
||||
new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||
}
|
||||
|
||||
private sealed class FakeHistorianDataSource : IHistorianDataSource
|
||||
{
|
||||
private readonly HistorianSample _sample;
|
||||
public FakeHistorianDataSource(HistorianSample sample) => _sample = sample;
|
||||
|
||||
public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianSample> { _sample });
|
||||
|
||||
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||
|
||||
public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianSample>());
|
||||
|
||||
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianEventDto>());
|
||||
|
||||
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||
public void Dispose() { }
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user