using System;
using System.Collections.Generic;
using StringCollection = System.Collections.Specialized.StringCollection;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
{
/// <summary>
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
/// OPC-UA-free — emits <see cref="HistorianSample"/> / <see cref="HistorianAggregateSample"/>
/// which the sidecar serialises onto the named-pipe wire (PR 3.3 contracts) for the
/// .NET 10 <c>WonderwareHistorianClient</c> to translate into OPC UA <c>DataValue</c>
/// on its side of the IPC. The v1 Galaxy.Host / Proxy architecture this class
/// originally lived in was retired in PR 7.2.
/// </summary>
public sealed class HistorianDataSource : IHistorianDataSource
{
// Class-scoped Serilog logger. ForContext<T>() needs its type argument; there is no
// parameterless overload.
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();

// Settings captured at construction: read for the port, the per-read value cap and the
// request timeout, and used as the template for per-node connection configs.
private readonly HistorianConfiguration _config;

// Guards publishing and clearing of _connection (process/history connection).
private readonly object _connectionLock = new object();

// Guards publishing and clearing of _eventConnection.
private readonly object _eventConnectionLock = new object();

// Opens SDK connections; injectable so tests can substitute a fake.
private readonly IHistorianConnectionFactory _factory;

// Shared SDK connection used by the raw / aggregate / at-time reads; null until first use
// and again after a connection reset.
private HistorianAccess? _connection;

// Shared SDK connection used by the event reads; null until first use and after a reset.
private HistorianAccess? _eventConnection;

// Set once by Dispose; checked by the Ensure*Connected methods before and after connecting.
private bool _disposed;

// Guards every health counter below plus the two active-node strings.
private readonly object _healthLock = new object();
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
private DateTime? _lastSuccessTime;
private DateTime? _lastFailureTime;
private string? _lastError;

// Node name the process / event connection is currently open against (null when closed).
// GetHealthSnapshot derives its "connection open" flags from these two strings.
private string? _activeProcessNode;
private string? _activeEventNode;

// Supplies the list of healthy cluster nodes and records per-node success / failure.
private readonly HistorianClusterEndpointPicker _picker;
/// <summary>
/// Initializes a new instance of the <see cref="HistorianDataSource"/> class that opens
/// connections through a new <see cref="SdkHistorianConnectionFactory"/> and passes no
/// endpoint picker, leaving the choice of picker to the internal constructor.
/// </summary>
/// <param name="config">The historian configuration.</param>
public HistorianDataSource(HistorianConfiguration config)
    : this(config, factory: new SdkHistorianConnectionFactory(), picker: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="HistorianDataSource"/> class with the
/// specified connection factory and endpoint picker.
/// </summary>
/// <param name="config">The historian configuration.</param>
/// <param name="factory">The historian connection factory used to open SDK connections.</param>
/// <param name="picker">
/// The optional cluster endpoint picker. When <c>null</c>, a
/// <see cref="HistorianClusterEndpointPicker"/> is created from <paramref name="config"/>.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="config"/> or <paramref name="factory"/> is <c>null</c>.
/// </exception>
internal HistorianDataSource(
    HistorianConfiguration config,
    IHistorianConnectionFactory factory,
    HistorianClusterEndpointPicker? picker = null)
{
    // Fail at construction rather than with a NullReferenceException on the first read.
    _config = config ?? throw new ArgumentNullException(nameof(config));
    _factory = factory ?? throw new ArgumentNullException(nameof(factory));
    _picker = picker ?? new HistorianClusterEndpointPicker(_config);
}
// Error codes that signify the connection or server is the problem rather than the
// query itself. A query-class failure (bad tag name, unsupported aggregate, etc.) must
// not force us to tear down and re-open the (relatively expensive) historian
// connection — that would let a burst of bad-tag queries push an otherwise healthy
// cluster node into cooldown. See Driver.Historian.Wonderware-008.
// The set is keyed by the SDK error enum; HashSet requires its element type argument.
private static readonly HashSet<HistorianAccessError.ErrorValue> ConnectionErrorCodes =
    new HashSet<HistorianAccessError.ErrorValue>
    {
        HistorianAccessError.ErrorValue.FailedToConnect,
        HistorianAccessError.ErrorValue.FailedToCreateSession,
        HistorianAccessError.ErrorValue.NoReply,
        HistorianAccessError.ErrorValue.NotReady,
        HistorianAccessError.ErrorValue.NotInitialized,
        HistorianAccessError.ErrorValue.Stopping,
        HistorianAccessError.ErrorValue.Win32Exception,
        HistorianAccessError.ErrorValue.InvalidResponse,
    };
/// <summary>
/// Reports whether an aahClientManaged error code is one of the codes listed in
/// <c>ConnectionErrorCodes</c>, i.e. the connection (not the query payload) is at fault
/// and the shared SDK connection should be reset. Internal for unit testing.
/// </summary>
/// <param name="code">The historian access error code.</param>
/// <returns><c>true</c> when <paramref name="code"/> is a connection-class code.</returns>
internal static bool IsConnectionClassError(HistorianAccessError.ErrorValue code)
{
    return ConnectionErrorCodes.Contains(code);
}
/// <summary>
/// Creates the per-read <see cref="CancellationTokenSource"/>: it is linked to the caller's
/// <see cref="CancellationToken"/> and, when
/// <see cref="HistorianConfiguration.RequestTimeoutSeconds"/> is positive, scheduled to
/// cancel itself after that many seconds. The read paths poll the resulting token with
/// <c>ThrowIfCancellationRequested</c> so a slow read cannot hold the single pipe-server
/// connection thread indefinitely. See Driver.Historian.Wonderware-010.
/// </summary>
/// <param name="cfg">The historian configuration supplying the timeout.</param>
/// <param name="ct">The caller's cancellation token.</param>
/// <returns>A linked token source that the caller must dispose.</returns>
internal static CancellationTokenSource BuildRequestCts(HistorianConfiguration cfg, CancellationToken ct)
{
    var linked = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var timeoutSeconds = cfg.RequestTimeoutSeconds;

    // Zero or a negative value leaves the source driven by the caller's token alone.
    if (!(timeoutSeconds > 0))
    {
        return linked;
    }

    linked.CancelAfter(TimeSpan.FromSeconds(timeoutSeconds));
    return linked;
}
/// <summary>
/// Walks the picker's healthy nodes in order and returns the first connection that opens,
/// together with the name of the node it was opened against. Each outcome is reported back
/// to the picker (<c>MarkHealthy</c> / <c>MarkFailed</c>).
/// </summary>
/// <param name="type">The kind of SDK connection to open.</param>
/// <returns>The opened connection and the node that accepted it.</returns>
/// <exception cref="InvalidOperationException">
/// No node is configured, every node is in cooldown, or every healthy candidate failed to
/// connect (the last failure is attached as the inner exception).
/// </exception>
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
{
    var healthyNodes = _picker.GetHealthyNodes();
    if (healthyNodes.Count == 0)
    {
        var configuredCount = _picker.NodeCount;
        var reason = configuredCount == 0
            ? "No historian nodes configured"
            : $"All {configuredCount} historian nodes are in cooldown — no healthy endpoints to connect to";
        throw new InvalidOperationException(reason);
    }

    Exception? mostRecentFailure = null;
    foreach (var nodeName in healthyNodes)
    {
        // Same settings as the shared config, but pointed at this one candidate.
        var nodeConfig = CloneConfigWithServerName(nodeName);
        try
        {
            var opened = _factory.CreateAndConnect(nodeConfig, type);
            _picker.MarkHealthy(nodeName);
            return (opened, nodeName);
        }
        catch (Exception connectFailure)
        {
            // Remember the failure, tell the picker, and fall through to the next candidate.
            _picker.MarkFailed(nodeName, connectFailure.Message);
            mostRecentFailure = connectFailure;
            Log.Warning(connectFailure, "Historian node {Node} failed during connect attempt; trying next candidate", nodeName);
        }
    }

    var detail = mostRecentFailure?.Message ?? "(no detail)";
    throw new InvalidOperationException(
        $"All {healthyNodes.Count} healthy historian candidate(s) failed during connect: {detail}",
        mostRecentFailure);
}
/// <summary>
/// Builds a copy of the shared configuration whose <c>ServerName</c> is replaced by
/// <paramref name="serverName"/>, so a connection attempt can target one specific
/// cluster node while keeping every other setting.
/// </summary>
/// <param name="serverName">The node to connect to.</param>
/// <returns>A new configuration instance; the shared one is not modified.</returns>
private HistorianConfiguration CloneConfigWithServerName(string serverName)
{
    return new HistorianConfiguration
    {
        Enabled = _config.Enabled,
        ServerName = serverName,
        ServerNames = _config.ServerNames,
        FailureCooldownSeconds = _config.FailureCooldownSeconds,
        IntegratedSecurity = _config.IntegratedSecurity,
        UserName = _config.UserName,
        Password = _config.Password,
        Port = _config.Port,
        CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
        // Previously omitted, which silently reverted the clone to the default timeout.
        RequestTimeoutSeconds = _config.RequestTimeoutSeconds,
        MaxValuesPerRead = _config.MaxValuesPerRead
    };
}
/// <summary>Gets a snapshot of the current health status.</summary>
/// <returns>
/// Query counters, last success / failure details, the active node of each connection and
/// the per-node states reported by the endpoint picker.
/// </returns>
public HistorianHealthSnapshot GetHealthSnapshot()
{
    // Per-node states come from the picker and are collected before _healthLock is taken.
    var nodes = _picker.SnapshotNodeStates();
    var healthyNodes = 0;
    foreach (var node in nodes)
    {
        if (node.IsHealthy)
        {
            healthyNodes++;
        }
    }

    // Driver.Historian.Wonderware-005: the "connection open" flags are computed from the
    // active-node strings, which are guarded by _healthLock, instead of from _connection /
    // _eventConnection, which are published under their own locks. Reading those fields
    // here could pair "open" with a null node (or the reverse) at the moment a connection
    // is published or cleared; using the node strings alone keeps the snapshot consistent.
    lock (_healthLock)
    {
        var successes = _totalSuccesses;
        var failures = _totalFailures;
        var processNode = _activeProcessNode;
        var eventNode = _activeEventNode;

        return new HistorianHealthSnapshot
        {
            TotalQueries = successes + failures,
            TotalSuccesses = successes,
            TotalFailures = failures,
            ConsecutiveFailures = _consecutiveFailures,
            LastSuccessTime = _lastSuccessTime,
            LastFailureTime = _lastFailureTime,
            LastError = _lastError,
            ProcessConnectionOpen = processNode != null,
            EventConnectionOpen = eventNode != null,
            ActiveProcessNode = processNode,
            ActiveEventNode = eventNode,
            NodeCount = nodes.Count,
            HealthyNodeCount = healthyNodes,
            Nodes = nodes
        };
    }
}
/// <summary>
/// Records one successful read: bumps the success counter, stamps the success time (UTC)
/// and clears the consecutive-failure streak and the last error.
/// </summary>
private void RecordSuccess()
{
    var now = DateTime.UtcNow;
    lock (_healthLock)
    {
        _totalSuccesses += 1;
        _lastSuccessTime = now;
        _lastError = null;
        _consecutiveFailures = 0;
    }
}
/// <summary>
/// Records one failed read: bumps the total and consecutive failure counters, stamps the
/// failure time (UTC) and remembers <paramref name="error"/> as the last error.
/// </summary>
/// <param name="error">Short description of what failed.</param>
private void RecordFailure(string error)
{
    var now = DateTime.UtcNow;
    lock (_healthLock)
    {
        _totalFailures += 1;
        _consecutiveFailures += 1;
        _lastFailureTime = now;
        _lastError = error;
    }
}
/// <summary>
/// Makes sure the shared process connection exists, opening one against the first healthy
/// node when it does not.
/// </summary>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="InvalidOperationException">No healthy node accepted a connection.</exception>
private void EnsureConnected()
{
    if (_disposed)
    {
        throw new ObjectDisposedException(nameof(HistorianDataSource));
    }

    // Nothing to do when a connection is already published.
    if (Volatile.Read(ref _connection) != null)
    {
        return;
    }

    // The connect attempt runs before the lock is taken; only publication is locked.
    var (fresh, node) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
    lock (_connectionLock)
    {
        var disposedMeanwhile = _disposed;
        if (disposedMeanwhile || _connection != null)
        {
            // Either we were disposed while connecting or another caller published first:
            // the connection we just opened is surplus and must be released.
            fresh.CloseConnection(out _);
            fresh.Dispose();
            if (disposedMeanwhile)
            {
                throw new ObjectDisposedException(nameof(HistorianDataSource));
            }

            return;
        }

        _connection = fresh;
        lock (_healthLock)
        {
            _activeProcessNode = node;
        }

        Log.Information("Historian SDK connection opened to {Server}:{Port}", node, _config.Port);
    }
}
/// <summary>
/// Tears down the shared process connection after a failure: closes and disposes it
/// (best effort), clears the active process node and marks that node failed in the picker.
/// Does nothing when no process connection is open.
/// </summary>
/// <param name="ex">The failure that triggered the reset, if one is available.</param>
private void HandleConnectionError(Exception? ex = null)
{
    lock (_connectionLock)
    {
        var broken = _connection;
        if (broken == null)
        {
            return;
        }

        try
        {
            broken.CloseConnection(out _);
            broken.Dispose();
        }
        catch (Exception disposeEx)
        {
            // Cleanup problems on an already-failed connection are only worth a debug line.
            Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
        }

        _connection = null;

        string? failedNode;
        lock (_healthLock)
        {
            failedNode = _activeProcessNode;
            _activeProcessNode = null;
        }

        if (failedNode != null)
        {
            _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
        }

        Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
    }
}
/// <summary>
/// Makes sure the shared event connection exists, opening one against the first healthy
/// node when it does not. Event-side counterpart of <c>EnsureConnected</c>.
/// </summary>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="InvalidOperationException">No healthy node accepted a connection.</exception>
private void EnsureEventConnected()
{
    if (_disposed)
    {
        throw new ObjectDisposedException(nameof(HistorianDataSource));
    }

    // Nothing to do when an event connection is already published.
    if (Volatile.Read(ref _eventConnection) != null)
    {
        return;
    }

    // The connect attempt runs before the lock is taken; only publication is locked.
    var (fresh, node) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
    lock (_eventConnectionLock)
    {
        var disposedMeanwhile = _disposed;
        if (disposedMeanwhile || _eventConnection != null)
        {
            // Disposed while connecting, or another caller published first: release ours.
            fresh.CloseConnection(out _);
            fresh.Dispose();
            if (disposedMeanwhile)
            {
                throw new ObjectDisposedException(nameof(HistorianDataSource));
            }

            return;
        }

        _eventConnection = fresh;
        lock (_healthLock)
        {
            _activeEventNode = node;
        }

        Log.Information("Historian SDK event connection opened to {Server}:{Port}", node, _config.Port);
    }
}
/// <summary>
/// Internal exception raised when <c>StartQuery</c> reports an SDK error whose code is
/// query-class (bad tag name, unsupported aggregate, etc.). Such an error must NOT reset
/// the shared SDK connection: the read methods catch this type and re-throw it untouched
/// so the IPC frame handler surfaces <c>Success=false</c> while the connection stays open.
/// See Driver.Historian.Wonderware-008.
/// </summary>
internal sealed class QueryClassStartQueryException : InvalidOperationException
{
    /// <summary>
    /// Initializes a new instance of the <see cref="QueryClassStartQueryException"/> class.
    /// </summary>
    /// <param name="message">The exception message.</param>
    /// <param name="code">The historian access error code.</param>
    public QueryClassStartQueryException(string message, HistorianAccessError.ErrorValue code)
        : base(message)
        => Code = code;

    /// <summary>Gets the SDK error code that caused the exception.</summary>
    public HistorianAccessError.ErrorValue Code { get; }
}
/// <summary>
/// Centralised <c>StartQuery</c>-failure handler. Always throws, so the caller surfaces
/// <c>Success=false</c> in the IPC reply (the previous return-empty-with-success behaviour
/// made an SDK error look like "no data in range" to the client). The connection is only
/// reset when the error code is connection-class — query-class failures (bad tag name,
/// unsupported aggregate, etc.) must leave the shared SDK connection intact, otherwise a
/// burst of bad-tag queries cycles the connection and pushes a healthy cluster node into
/// cooldown. See Driver.Historian.Wonderware-008.
/// </summary>
/// <param name="operation">Human-readable description of the query, used in logs and messages.</param>
/// <param name="error">The error returned by <c>StartQuery</c>; a null value is treated as a generic failure.</param>
/// <param name="isEventConnection">
/// <c>true</c> to reset the event connection on a connection-class error, <c>false</c> to
/// reset the process connection.
/// </param>
/// <exception cref="InvalidOperationException">The error is connection-class.</exception>
/// <exception cref="QueryClassStartQueryException">The error is query-class.</exception>
private void HandleStartQueryFailure(
    string operation, HistorianAccessError error, bool isEventConnection)
{
    var code = error?.ErrorCode ?? HistorianAccessError.ErrorValue.Failure;
    var description = error?.ErrorDescription ?? string.Empty;
    var connectionClass = IsConnectionClassError(code);
    Log.Warning(
        "Historian SDK StartQuery failed: {Operation} -> {Code} ({Desc}) [{Kind}]",
        operation, code, description,
        connectionClass ? "connection-class" : "query-class");
    var message = $"Historian SDK StartQuery failed for {operation}: {code} ({description})";
    if (connectionClass)
    {
        if (isEventConnection) HandleEventConnectionError();
        else HandleConnectionError();

        // The failure is deliberately NOT recorded here. The InvalidOperationException
        // below is caught by the read method's general catch block, which calls
        // RecordFailure itself; recording it here as well counted every connection-class
        // StartQuery failure twice in TotalFailures / ConsecutiveFailures.
        throw new InvalidOperationException(message);
    }

    // Query-class — the read methods re-throw QueryClassStartQueryException without
    // touching the health counters or the connection, so this is the only place the
    // failure gets recorded.
    RecordFailure($"{operation}: {code}");
    throw new QueryClassStartQueryException(message, code);
}
/// <summary>
/// Tears down the shared event connection after a failure: closes and disposes it
/// (best effort), clears the active event node and marks that node failed in the picker.
/// Does nothing when no event connection is open.
/// </summary>
/// <param name="ex">The failure that triggered the reset, if one is available.</param>
private void HandleEventConnectionError(Exception? ex = null)
{
    lock (_eventConnectionLock)
    {
        var broken = _eventConnection;
        if (broken == null)
        {
            return;
        }

        try
        {
            broken.CloseConnection(out _);
            broken.Dispose();
        }
        catch (Exception disposeEx)
        {
            // Cleanup problems on an already-failed connection are only worth a debug line.
            Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
        }

        _eventConnection = null;

        string? failedNode;
        lock (_healthLock)
        {
            failedNode = _activeEventNode;
            _activeEventNode = null;
        }

        if (failedNode != null)
        {
            _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
        }

        Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
    }
}
/// <summary>Reads raw historical samples for the specified tag.</summary>
/// <remarks>
/// The work is done synchronously on the calling thread; the returned task is already
/// completed and failures are thrown directly from this call.
/// </remarks>
/// <param name="tagName">The tag name.</param>
/// <param name="startTime">The start time for the query.</param>
/// <param name="endTime">The end time for the query.</param>
/// <param name="maxValues">
/// The maximum number of values to return. Zero or negative falls back to the configured
/// <c>MaxValuesPerRead</c>; if that is also not positive, no cap is applied.
/// </param>
/// <param name="ct">Cancellation token for the operation.</param>
/// <returns>The samples in the order the SDK produced them.</returns>
/// <exception cref="OperationCanceledException">The caller cancelled or the request timeout elapsed.</exception>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="QueryClassStartQueryException">The SDK rejected the query itself; the connection is kept.</exception>
/// <exception cref="InvalidOperationException">No node could be connected, or StartQuery failed with a connection-class error.</exception>
public Task<List<HistorianSample>> ReadRawAsync(
    string tagName, DateTime startTime, DateTime endTime, int maxValues,
    CancellationToken ct = default)
{
    var results = new List<HistorianSample>();
    // Driver.Historian.Wonderware-010: wire RequestTimeoutSeconds into the read path
    // so a hung StartQuery / slow MoveNext can't block the connection thread forever.
    using var requestCts = BuildRequestCts(_config, ct);
    var token = requestCts.Token;
    // Effective cap: the explicit argument wins, otherwise the configured default.
    var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
    try
    {
        // Don't connect or issue a query for a request that is already cancelled.
        token.ThrowIfCancellationRequested();
        EnsureConnected();
        using var query = _connection!.CreateHistoryQuery();
        var args = new HistoryQueryArgs
        {
            TagNames = new StringCollection { tagName },
            StartDateTime = startTime,
            EndDateTime = endTime,
            RetrievalMode = HistorianRetrievalMode.Full
        };
        if (limit > 0)
            args.BatchSize = (uint)limit;
        if (!query.StartQuery(args, out var error))
        {
            // Always throws; see HandleStartQueryFailure for connection- vs query-class handling.
            HandleStartQueryFailure(
                $"raw query for tag '{tagName}'", error, isEventConnection: false);
        }
        var count = 0;
        while (query.MoveNext(out error))
        {
            token.ThrowIfCancellationRequested();
            var result = query.QueryResult;
            // The SDK timestamp is labelled as UTC without converting it.
            var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
            results.Add(new HistorianSample
            {
                Value = SelectValue(result),
                TimestampUtc = timestamp,
                // Only the low byte of the OPC quality word is carried on the wire DTO.
                Quality = (byte)(result.OpcQuality & 0xFF),
            });
            count++;
            if (limit > 0 && count >= limit) break;
        }
        query.EndQuery(out _);
        RecordSuccess();
    }
    catch (OperationCanceledException) { throw; }
    catch (ObjectDisposedException) { throw; }
    catch (QueryClassStartQueryException)
    {
        // Query-class StartQuery failure — HandleStartQueryFailure already logged
        // and recorded. Re-throw so the IPC layer surfaces Success=false instead of
        // returning an empty list (which would look like "no data in range"). The
        // connection is deliberately NOT reset. See Driver.Historian.Wonderware-008.
        throw;
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
        RecordFailure($"raw: {ex.Message}");
        HandleConnectionError(ex);
        throw;
    }
    Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
        tagName, results.Count, startTime, endTime);
    return Task.FromResult(results);
}
/// <summary>Reads aggregate historical samples for the specified tag.</summary>
/// <remarks>
/// The work is done synchronously on the calling thread; the returned task is already
/// completed and failures are thrown directly from this call. The result is truncated
/// (with a warning) once the configured <c>MaxValuesPerRead</c> bucket count is reached.
/// </remarks>
/// <param name="tagName">The tag name.</param>
/// <param name="startTime">The start time for the query.</param>
/// <param name="endTime">The end time for the query.</param>
/// <param name="intervalMs">The interval in milliseconds, passed to the SDK as the query resolution.</param>
/// <param name="aggregateColumn">
/// The aggregate column name. A name <c>ExtractAggregateValue</c> does not recognise
/// yields a null value for every bucket.
/// </param>
/// <param name="ct">Cancellation token for the operation.</param>
/// <returns>One sample per summary bucket, in the order the SDK produced them.</returns>
/// <exception cref="OperationCanceledException">The caller cancelled or the request timeout elapsed.</exception>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="QueryClassStartQueryException">The SDK rejected the query itself; the connection is kept.</exception>
/// <exception cref="InvalidOperationException">No node could be connected, or StartQuery failed with a connection-class error.</exception>
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
    string tagName, DateTime startTime, DateTime endTime,
    double intervalMs, string aggregateColumn,
    CancellationToken ct = default)
{
    var results = new List<HistorianAggregateSample>();
    // Driver.Historian.Wonderware-010: outer safety timeout — see ReadRawAsync.
    using var requestCts = BuildRequestCts(_config, ct);
    var token = requestCts.Token;
    try
    {
        // Don't connect or issue a query for a request that is already cancelled.
        token.ThrowIfCancellationRequested();
        EnsureConnected();
        using var query = _connection!.CreateAnalogSummaryQuery();
        var args = new AnalogSummaryQueryArgs
        {
            TagNames = new StringCollection { tagName },
            StartDateTime = startTime,
            EndDateTime = endTime,
            // NOTE(review): a negative or NaN intervalMs has no defined result when cast
            // to ulong — confirm callers validate the interval before it reaches here.
            Resolution = (ulong)intervalMs
        };
        if (!query.StartQuery(args, out var error))
        {
            // Always throws; see HandleStartQueryFailure for connection- vs query-class handling.
            HandleStartQueryFailure(
                $"aggregate query for tag '{tagName}'", error, isEventConnection: false);
        }
        // Apply the same bucket cap as the raw-read path so a wide time range with a
        // small IntervalMs cannot produce an unbounded result set that would overflow
        // the 16 MiB FrameWriter frame cap and lose the entire reply.
        var bucketLimit = _config.MaxValuesPerRead;
        var bucketCount = 0;
        while (query.MoveNext(out error))
        {
            token.ThrowIfCancellationRequested();
            var result = query.QueryResult;
            // The SDK timestamp is labelled as UTC without converting it.
            var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
            var value = ExtractAggregateValue(result, aggregateColumn);
            results.Add(new HistorianAggregateSample
            {
                Value = value,
                TimestampUtc = timestamp,
            });
            bucketCount++;
            if (bucketLimit > 0 && bucketCount >= bucketLimit)
            {
                Log.Warning(
                    "HistoryRead aggregate ({Aggregate}): {Tag} truncated at {Limit} buckets — widen IntervalMs or reduce time range",
                    aggregateColumn, tagName, bucketLimit);
                break;
            }
        }
        query.EndQuery(out _);
        RecordSuccess();
    }
    catch (OperationCanceledException) { throw; }
    catch (ObjectDisposedException) { throw; }
    // Query-class StartQuery failure: already logged and recorded, connection kept.
    catch (QueryClassStartQueryException) { throw; }
    catch (Exception ex)
    {
        Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
        RecordFailure($"aggregate: {ex.Message}");
        HandleConnectionError(ex);
        throw;
    }
    Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
        aggregateColumn, tagName, results.Count);
    return Task.FromResult(results);
}
/// <summary>Reads historical samples at specific timestamps for the specified tag.</summary>
/// <remarks>
/// One interpolated single-row query is issued per timestamp, so the result has exactly
/// one entry per requested timestamp, in request order. A timestamp whose query is
/// rejected for a query-class reason, or that produces no row, is reported as a sample
/// with a null value and quality 0 (Bad). A connection-class StartQuery failure aborts the
/// whole read instead, so the broken connection is reset rather than being reported as a
/// run of Bad samples on a "successful" read. The work is done synchronously; the
/// returned task is already completed and failures are thrown directly from this call.
/// </remarks>
/// <param name="tagName">The tag name.</param>
/// <param name="timestamps">The timestamps to read. Null or empty yields an empty result.</param>
/// <param name="ct">Cancellation token for the operation.</param>
/// <returns>One sample per requested timestamp.</returns>
/// <exception cref="OperationCanceledException">The caller cancelled or the request timeout elapsed.</exception>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="InvalidOperationException">No node could be connected, or StartQuery failed with a connection-class error.</exception>
public Task<List<HistorianSample>> ReadAtTimeAsync(
    string tagName, DateTime[] timestamps,
    CancellationToken ct = default)
{
    var results = new List<HistorianSample>();
    if (timestamps == null || timestamps.Length == 0)
        return Task.FromResult(results);

    // Placeholder for a timestamp the historian could not answer: null value, quality 0 (Bad).
    HistorianSample BadSample(DateTime at) => new HistorianSample
    {
        Value = null,
        TimestampUtc = DateTime.SpecifyKind(at, DateTimeKind.Utc),
        Quality = 0,
    };

    // Driver.Historian.Wonderware-010: outer safety timeout — see ReadRawAsync.
    using var requestCts = BuildRequestCts(_config, ct);
    var token = requestCts.Token;
    try
    {
        EnsureConnected();
        foreach (var timestamp in timestamps)
        {
            token.ThrowIfCancellationRequested();
            using var query = _connection!.CreateHistoryQuery();
            var args = new HistoryQueryArgs
            {
                TagNames = new StringCollection { tagName },
                StartDateTime = timestamp,
                EndDateTime = timestamp,
                RetrievalMode = HistorianRetrievalMode.Interpolated,
                BatchSize = 1
            };
            if (!query.StartQuery(args, out var error))
            {
                var code = error?.ErrorCode ?? HistorianAccessError.ErrorValue.Failure;
                if (IsConnectionClassError(code))
                {
                    // The connection itself is broken: every remaining timestamp would
                    // fail the same way. Throw into the general catch below, which
                    // records the failure and resets the connection.
                    var description = error?.ErrorDescription ?? string.Empty;
                    throw new InvalidOperationException(
                        $"Historian SDK StartQuery failed for at-time query for tag '{tagName}': {code} ({description})");
                }

                // Query-class failure: report this one timestamp as Bad and carry on.
                results.Add(BadSample(timestamp));
                continue;
            }
            if (query.MoveNext(out error))
            {
                var result = query.QueryResult;
                results.Add(new HistorianSample
                {
                    Value = SelectValue(result),
                    // The requested timestamp (not the row's) is echoed back, labelled as UTC.
                    TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
                    Quality = (byte)(result.OpcQuality & 0xFF),
                });
            }
            else
            {
                results.Add(BadSample(timestamp));
            }
            query.EndQuery(out _);
        }
        RecordSuccess();
    }
    catch (OperationCanceledException) { throw; }
    catch (ObjectDisposedException) { throw; }
    catch (Exception ex)
    {
        Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
        RecordFailure($"at-time: {ex.Message}");
        HandleConnectionError(ex);
        throw;
    }
    Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
        tagName, results.Count, timestamps.Length);
    return Task.FromResult(results);
}
/// <summary>Reads historical events within the specified time range.</summary>
/// <remarks>
/// Uses the dedicated event connection. The work is done synchronously on the calling
/// thread; the returned task is already completed and failures are thrown directly from
/// this call.
/// </remarks>
/// <param name="sourceName">The optional event source name filter; null or empty reads all sources.</param>
/// <param name="startTime">The start time for the query.</param>
/// <param name="endTime">The end time for the query.</param>
/// <param name="maxEvents">
/// The maximum number of events to return. Zero or negative falls back to the configured
/// <c>MaxValuesPerRead</c>.
/// </param>
/// <param name="ct">Cancellation token for the operation.</param>
/// <returns>The events in ascending order as produced by the SDK.</returns>
/// <exception cref="OperationCanceledException">The caller cancelled or the request timeout elapsed.</exception>
/// <exception cref="ObjectDisposedException">The data source has been disposed.</exception>
/// <exception cref="QueryClassStartQueryException">The SDK rejected the query itself; the connection is kept.</exception>
/// <exception cref="InvalidOperationException">No node could be connected, or StartQuery failed with a connection-class error.</exception>
public Task<List<HistorianEventDto>> ReadEventsAsync(
    string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
    CancellationToken ct = default)
{
    var results = new List<HistorianEventDto>();
    // Driver.Historian.Wonderware-010: outer safety timeout — see ReadRawAsync.
    using var requestCts = BuildRequestCts(_config, ct);
    var token = requestCts.Token;
    // Effective cap for the read loop, chosen the same way as in ReadRawAsync.
    var limit = maxEvents > 0 ? maxEvents : _config.MaxValuesPerRead;
    try
    {
        // Don't connect or issue a query for a request that is already cancelled.
        token.ThrowIfCancellationRequested();
        EnsureEventConnected();
        using var query = _eventConnection!.CreateEventQuery();
        var args = new EventQueryArgs
        {
            StartDateTime = startTime,
            EndDateTime = endTime,
            EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
            QueryType = HistorianEventQueryType.Events,
            EventOrder = HistorianEventOrder.Ascending
        };
        if (!string.IsNullOrEmpty(sourceName))
        {
            // NOTE(review): the filter's error output is discarded; if the SDK can refuse
            // a filter, the query would silently run unfiltered — confirm against the SDK.
            query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
        }
        if (!query.StartQuery(args, out var error))
        {
            // Always throws; see HandleStartQueryFailure for connection- vs query-class handling.
            HandleStartQueryFailure(
                $"event query for source '{sourceName ?? "(all)"}'", error, isEventConnection: true);
        }
        var count = 0;
        while (query.MoveNext(out error))
        {
            token.ThrowIfCancellationRequested();
            results.Add(ToDto(query.QueryResult));
            count++;
            // Enforce the cap locally too, including the configured fallback, instead of
            // relying solely on the SDK honouring EventCount.
            if (limit > 0 && count >= limit) break;
        }
        query.EndQuery(out _);
        RecordSuccess();
    }
    catch (OperationCanceledException) { throw; }
    catch (ObjectDisposedException) { throw; }
    // Query-class StartQuery failure: already logged and recorded, connection kept.
    catch (QueryClassStartQueryException) { throw; }
    catch (Exception ex)
    {
        Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
        RecordFailure($"events: {ex.Message}");
        HandleEventConnectionError(ex);
        throw;
    }
    Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
        sourceName ?? "(all)", results.Count, startTime, endTime);
    return Task.FromResult(results);
}
/// <summary>Copies the fields of an SDK event row into the wire DTO.</summary>
/// <param name="evt">The SDK event.</param>
/// <returns>A new <see cref="HistorianEventDto"/> carrying the event's values.</returns>
private static HistorianEventDto ToDto(HistorianEvent evt)
{
    // The ArchestrA SDK flags these properties as obsolete yet still populates them, and
    // their replacements are not wired in the SDK version this project binds against.
    // Reading them is the documented v1 behaviour. The warning is silenced only around
    // this initializer (not project-wide) so any other use of deprecated SDK surface
    // still shows up as an error.
#pragma warning disable CS0618
    var dto = new HistorianEventDto
    {
        Id = evt.Id,
        Source = evt.Source,
        EventTime = evt.EventTime,
        ReceivedTime = evt.ReceivedTime,
        DisplayText = evt.DisplayText,
        Severity = (ushort)evt.Severity
    };
#pragma warning restore CS0618
    return dto;
}
/// <summary>
/// Selects the typed value from a <see cref="HistoryQueryResult"/> row.
/// </summary>
/// <remarks>
/// SDK limitation: <see cref="HistoryQueryResult"/> exposes only <c>Value</c> (double) and
/// <c>StringValue</c> (string) — the result carries no tag data-type field. Branching on
/// the tag's declared data type would be the correct approach, but the bound version of
/// aahClientManaged does not surface it per query result. The heuristic used instead is
/// the best available: <c>StringValue</c> wins only when it is non-empty AND <c>Value</c>
/// is zero, because string tags in the Historian SDK always project to <c>Value=0</c>
/// while numeric tags may legitimately sample to zero (in which case the SDK does not
/// populate <c>StringValue</c>). A numeric tag at exactly zero with a non-empty formatted
/// <c>StringValue</c> (e.g. "0.00") would be mis-reported as a string; this is a known
/// edge case of the SDK binding.
/// </remarks>
/// <param name="result">The history query result.</param>
/// <returns>Either the row's string value or its numeric value, boxed.</returns>
internal static object? SelectValue(HistoryQueryResult result)
{
    return SelectValueFromPair(result.Value, result.StringValue);
}
/// <summary>
/// SDK-independent form of the string-vs-numeric heuristic: returns
/// <paramref name="stringValue"/> when it is non-empty and <paramref name="value"/> is
/// zero, otherwise the numeric value. Exposed so unit tests can pin the logic without
/// instantiating the SDK's <c>HistoryQueryResult</c> (whose internal property initialisers
/// make it impractical to fake). See Driver.Historian.Wonderware-012.
/// </summary>
/// <param name="value">The numeric value.</param>
/// <param name="stringValue">The string value.</param>
/// <returns>The string when the heuristic picks it; otherwise the boxed double.</returns>
internal static object? SelectValueFromPair(double value, string? stringValue)
{
    var looksLikeStringTag = value == 0 && !string.IsNullOrEmpty(stringValue);
    return looksLikeStringTag ? stringValue : (object)value;
}
/// <summary>Extracts the specified aggregate value from an analog summary query result.</summary>
/// <param name="result">The analog summary query result.</param>
/// <param name="column">
/// The aggregate column name: one of <c>Average</c>, <c>Minimum</c>, <c>Maximum</c>,
/// <c>ValueCount</c>, <c>First</c>, <c>Last</c> or <c>StdDev</c> (exact, case-sensitive match).
/// </param>
/// <returns>The matching property of <paramref name="result"/>, or <c>null</c> for any other name.</returns>
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
{
    if (column == "Average") return result.Average;
    if (column == "Minimum") return result.Minimum;
    if (column == "Maximum") return result.Maximum;
    if (column == "ValueCount") return result.ValueCount;
    if (column == "First") return result.First;
    if (column == "Last") return result.Last;
    if (column == "StdDev") return result.StdDev;

    // Unrecognised (or null) column name.
    return null;
}
/// <summary>
/// Disposes the historian data source: closes and disposes both SDK connections (best
/// effort — close errors are logged, not thrown) and clears the active-node markers so
/// later health snapshots report both connections as closed. Calling it again is a no-op.
/// </summary>
public void Dispose()
{
    if (_disposed) return;
    _disposed = true;

    // Each connection is torn down under the same lock EnsureConnected /
    // EnsureEventConnected use to publish it. Those methods re-check _disposed inside the
    // lock, so a connection is either published before this block runs (and closed here)
    // or seen as surplus by the connecting thread (and closed there). Without the lock a
    // connection published concurrently with Dispose was never closed.
    lock (_connectionLock)
    {
        var connection = _connection;
        _connection = null;
        try
        {
            connection?.CloseConnection(out _);
            connection?.Dispose();
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Error closing Historian SDK connection");
        }

        // GetHealthSnapshot derives "connection open" from this string, so it must be
        // cleared together with the connection.
        lock (_healthLock) _activeProcessNode = null;
    }

    lock (_eventConnectionLock)
    {
        var eventConnection = _eventConnection;
        _eventConnection = null;
        try
        {
            eventConnection?.CloseConnection(out _);
            eventConnection?.Dispose();
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Error closing Historian SDK event connection");
        }

        lock (_healthLock) _activeEventNode = null;
    }
}
}
}