Phase 2 PR 5 — port Wonderware Historian SDK into Driver.Galaxy.Host/Backend/Historian/. The full v1 Historian.Aveva code path (HistorianDataSource + HistorianClusterEndpointPicker + IHistorianConnectionFactory + SdkHistorianConnectionFactory) now lives inside Galaxy.Host instead of the previously-required out-of-tree plugin + HistorianPluginLoader AssemblyResolve hack, and MxAccessGalaxyBackend.HistoryReadAsync — which previously returned a Phase 2 Task B.1.h follow-up placeholder — now delegates to the ported HistorianDataSource.ReadRawAsync, maps HistorianSample to GalaxyDataValue via the IPC wire shape, and reports Success=true with per-tag HistoryTagValues arrays. OPC-UA-free surface inside Galaxy.Host: the v1 code returned Opc.Ua.DataValue on the hot path, which would have required dragging OPCFoundation.NetStandard.Opc.Ua.Server into net48 x86 Galaxy.Host and bleeding OPC types across the IPC boundary — instead, the port introduces HistorianSample (Value, Quality byte, TimestampUtc) + HistorianAggregateSample (Value, TimestampUtc) POCOs that carry the raw MX quality byte through the pipe unchanged, and the OPC translation happens on the Proxy side via the existing QualityMapper that the live-read path already uses. Decision #13's IPC data-shape contract survives intact — GalaxyDataValue (TagReference + ValueBytes MessagePack + ValueMessagePackType + StatusCode + SourceTimestampUtcUnixMs + ServerTimestampUtcUnixMs) — so no Shared.Contracts wire break vs PR 4. Cluster failover preserved verbatim: HistorianClusterEndpointPicker is the thread-safe pure-logic picker ported verbatim with no SDK dependency (injected DateTime clock, per-node cooldown state, unknown-node-name tolerance, case-insensitive de-dup on configuration-order list), ConnectToAnyHealthyNode iterates the picker's healthy candidates, clones config per attempt, calls the factory, marks healthy on success / failed on exception with the failure message stored for dashboard surfacing, throws "All N healthy historian candidate(s) failed" with the last exception chained when every node exhausts. Process path + Event path use separate HistorianAccess connections (CreateHistoryQuery vs CreateEventQuery vs CreateAnalogSummaryQuery on the SDK surface) guarded by independent _connection/_eventConnection locks — a mid-query failure on one silo resets only that connection, the other stays open. Four SDK paths ported: ReadRawAsync (RetrievalMode.Full, BatchSize from config.MaxValuesPerRead, MoveNext pump, per-sample quality + value decode with the StringValue/Value fallback the v1 code did, limit-based early exit), ReadAggregateAsync (AnalogSummaryQuery + Resolution in ms, ExtractAggregateValue maps Average/Minimum/Maximum/ValueCount/First/Last/StdDev column names — the NodeId to column mapping is moved to the Proxy side since the IPC request carries a string column), ReadAtTimeAsync (per-timestamp HistoryQuery with RetrievalMode.Interpolated + BatchSize=1, returns Quality=0 / Value=null for missing samples), ReadEventsAsync (EventQuery + AddEventFilter("Source",Equal,sourceName) when sourceName is non-null, EventOrder.Ascending, EventCount = maxEvents or config.MaxValuesPerRead); GetHealthSnapshot returns the full runtime-health snapshot (TotalQueries/Successes/Failures + ConsecutiveFailures + LastSuccess/FailureTime + LastError + ProcessConnectionOpen/EventConnectionOpen + ActiveProcessNode/ActiveEventNode + per-node state list). ReadRaw is the only path wired through IPC in PR 5 (HistoryReadRequest/HistoryTagValues/HistoryReadResponse already existed in Shared.Contracts); Aggregate/AtTime/Events/Health are ported-but-not-yet-IPC-exposed — they stay internal to Galaxy.Host for PR 6+ to surface via new contract message kinds (aggregate = OPC UA HistoryReadProcessed, at-time = HistoryReadAtTime, events = HistoryReadEvents, health = admin dashboard IPC query). Galaxy.Host csproj gains aahClientManaged + aahClientCommon references with Private=false (managed wrappers) + None items for aahClient.dll + Historian.CBE.dll + Historian.DPAPI.dll + ArchestrA.CloudHistorian.Contract.dll native satellites staged alongside the host exe via CopyToOutputDirectory=PreserveNewest so aahClientManaged can P/Invoke into them at runtime without an AssemblyResolve hook (cleaner than the v1 HistorianPluginLoader.cs 180-LOC AssemblyResolve + Assembly.LoadFrom dance that existed solely because the plugin was loaded late from Host/bin/Debug/net48/Historian/). Program.cs adds BuildHistorianIfEnabled() that reads OTOPCUA_HISTORIAN_ENABLED (true or 1) + OTOPCUA_HISTORIAN_SERVER + OTOPCUA_HISTORIAN_SERVERS (comma-separated cluster list overrides single-server) + OTOPCUA_HISTORIAN_PORT (default 32568) + OTOPCUA_HISTORIAN_INTEGRATED (default true) + OTOPCUA_HISTORIAN_USER/OTOPCUA_HISTORIAN_PASS + OTOPCUA_HISTORIAN_TIMEOUT_SEC (30) + OTOPCUA_HISTORIAN_MAX_VALUES (10000) + OTOPCUA_HISTORIAN_COOLDOWN_SEC (60), returns null when disabled so MxAccessGalaxyBackend.HistoryReadAsync surfaces a clean "Historian disabled" Success=false instead of a localhost-SDK hang; server.RunAsync finally block now also casts backend to IDisposable.Dispose() so the historian SDK connections get cleanly closed on Ctrl+C. MxAccessGalaxyBackend gains an IHistorianDataSource? historian constructor parameter (defaults null to preserve existing Host.Tests call sites that don't exercise HistoryRead), implements IDisposable that forwards to _historian.Dispose(), and the pragma warning disable CS0618 is locally scoped to the ToDto(HistorianEvent) mapper since the SDK marks Id/Source/DisplayText/Severity obsolete but the replacement surface isn't available in the aahClientManaged version we bind against — every other deprecated-SDK use still surfaces as an error under TreatWarningsAsErrors. Ported from v1 Historian.Aveva unchanged: the CloneConfigWithServerName helper that preserves every config field except ServerName per attempt; the double-checked locking in EnsureConnected/EnsureEventConnected (fast path = Volatile.Read outside lock, slow path acquires lock + re-checks + disposes any raced-in-parallel connection); HandleConnectionError/HandleEventConnectionError that close the dead connection, clear the active-node tracker, MarkFailed the picker entry with the exception message so the node enters cooldown, and log the reset with node= for operator correlation; RecordSuccess/RecordFailure that bump counters under _healthLock. Tests: HistorianClusterEndpointPickerTests (7 cases) — single-node ServerName fallback when ServerNames empty, MarkFailed enters cooldown and skips, cooldown expires after window, MarkHealthy immediately clears, all-in-cooldown returns empty healthy list, Snapshot reports failure count + last error + IsHealthy, case-insensitive de-dup on duplicate hostnames. HistorianWiringTests (2 cases) — HistoryReadAsync returns "Historian disabled" Success=false when historian:null passed; HistoryReadAsync with a fake IHistorianDataSource maps the returned HistorianSample (Value=42.5, Quality=192 Good, Timestamp) to a GalaxyDataValue with StatusCode=0u + SourceTimestampUtcUnixMs matching the sample + MessagePack-encoded value bytes. InternalsVisibleTo("...Host.Tests") added to Galaxy.Host.csproj so tests can reach the internal HistorianClusterEndpointPicker. Full Galaxy.Host.Tests suite: 24 pass / 0 fail (9 new historian + 15 pre-existing MemoryWatchdog/PostMortemMmf/RecyclePolicy/StaPump/EndToEndIpc/Handshake). Full solution build: 0 errors (202 pre-existing warnings). The v1 Historian.Aveva project + Historian.Aveva.Tests still build intact because the archive PR (Stream D.1 destructive delete) is still ahead of us — PR 5 intentionally does not delete either; once PR 2+3 merge and the archive-delete PR lands, a follow-up cleanup can remove Historian.Aveva + its 4 source files + 18 test cases. Alarm subsystem wire-up (OnAlarmEvent raising from MxAccessGalaxyBackend via AlarmExtension primitives) + host-status push (OnHostStatusChanged via a ported GalaxyRuntimeProbeManager) remain PR 6 candidates; they were on the same "Task B.1.h follow-up" list and share the IPC connection-sink wiring with the historian events path — it made PR 5 scope-manageable to do Historian first since that's what has the biggest surface area (981 LOC v1 plus SDK binding) and alarms/host-status have more bespoke integration with the existing MxAccess subscription fan-out.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,621 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using StringCollection = System.Collections.Specialized.StringCollection;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA;
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||
{
|
||||
/// <summary>
|
||||
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
|
||||
/// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
|
||||
/// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
|
||||
/// </summary>
|
||||
public sealed class HistorianDataSource : IHistorianDataSource
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
|
||||
|
||||
private readonly HistorianConfiguration _config;
|
||||
private readonly object _connectionLock = new object();
|
||||
private readonly object _eventConnectionLock = new object();
|
||||
private readonly IHistorianConnectionFactory _factory;
|
||||
private HistorianAccess? _connection;
|
||||
private HistorianAccess? _eventConnection;
|
||||
private bool _disposed;
|
||||
|
||||
private readonly object _healthLock = new object();
|
||||
private long _totalSuccesses;
|
||||
private long _totalFailures;
|
||||
private int _consecutiveFailures;
|
||||
private DateTime? _lastSuccessTime;
|
||||
private DateTime? _lastFailureTime;
|
||||
private string? _lastError;
|
||||
private string? _activeProcessNode;
|
||||
private string? _activeEventNode;
|
||||
|
||||
private readonly HistorianClusterEndpointPicker _picker;
|
||||
|
||||
public HistorianDataSource(HistorianConfiguration config)
|
||||
: this(config, new SdkHistorianConnectionFactory(), null) { }
|
||||
|
||||
internal HistorianDataSource(
|
||||
HistorianConfiguration config,
|
||||
IHistorianConnectionFactory factory,
|
||||
HistorianClusterEndpointPicker? picker = null)
|
||||
{
|
||||
_config = config;
|
||||
_factory = factory;
|
||||
_picker = picker ?? new HistorianClusterEndpointPicker(config);
|
||||
}
|
||||
|
||||
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
|
||||
{
|
||||
var candidates = _picker.GetHealthyNodes();
|
||||
if (candidates.Count == 0)
|
||||
{
|
||||
var total = _picker.NodeCount;
|
||||
throw new InvalidOperationException(
|
||||
total == 0
|
||||
? "No historian nodes configured"
|
||||
: $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
|
||||
}
|
||||
|
||||
Exception? lastException = null;
|
||||
foreach (var node in candidates)
|
||||
{
|
||||
var attemptConfig = CloneConfigWithServerName(node);
|
||||
try
|
||||
{
|
||||
var conn = _factory.CreateAndConnect(attemptConfig, type);
|
||||
_picker.MarkHealthy(node);
|
||||
return (conn, node);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_picker.MarkFailed(node, ex.Message);
|
||||
lastException = ex;
|
||||
Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
|
||||
}
|
||||
}
|
||||
|
||||
var inner = lastException?.Message ?? "(no detail)";
|
||||
throw new InvalidOperationException(
|
||||
$"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
|
||||
lastException);
|
||||
}
|
||||
|
||||
private HistorianConfiguration CloneConfigWithServerName(string serverName)
|
||||
{
|
||||
return new HistorianConfiguration
|
||||
{
|
||||
Enabled = _config.Enabled,
|
||||
ServerName = serverName,
|
||||
ServerNames = _config.ServerNames,
|
||||
FailureCooldownSeconds = _config.FailureCooldownSeconds,
|
||||
IntegratedSecurity = _config.IntegratedSecurity,
|
||||
UserName = _config.UserName,
|
||||
Password = _config.Password,
|
||||
Port = _config.Port,
|
||||
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
|
||||
MaxValuesPerRead = _config.MaxValuesPerRead
|
||||
};
|
||||
}
|
||||
|
||||
public HistorianHealthSnapshot GetHealthSnapshot()
|
||||
{
|
||||
var nodeStates = _picker.SnapshotNodeStates();
|
||||
var healthyCount = 0;
|
||||
foreach (var n in nodeStates)
|
||||
if (n.IsHealthy) healthyCount++;
|
||||
|
||||
lock (_healthLock)
|
||||
{
|
||||
return new HistorianHealthSnapshot
|
||||
{
|
||||
TotalQueries = _totalSuccesses + _totalFailures,
|
||||
TotalSuccesses = _totalSuccesses,
|
||||
TotalFailures = _totalFailures,
|
||||
ConsecutiveFailures = _consecutiveFailures,
|
||||
LastSuccessTime = _lastSuccessTime,
|
||||
LastFailureTime = _lastFailureTime,
|
||||
LastError = _lastError,
|
||||
ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
|
||||
EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
|
||||
ActiveProcessNode = _activeProcessNode,
|
||||
ActiveEventNode = _activeEventNode,
|
||||
NodeCount = nodeStates.Count,
|
||||
HealthyNodeCount = healthyCount,
|
||||
Nodes = nodeStates
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private void RecordSuccess()
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
_totalSuccesses++;
|
||||
_lastSuccessTime = DateTime.UtcNow;
|
||||
_consecutiveFailures = 0;
|
||||
_lastError = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void RecordFailure(string error)
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
_totalFailures++;
|
||||
_lastFailureTime = DateTime.UtcNow;
|
||||
_consecutiveFailures++;
|
||||
_lastError = error;
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureConnected()
|
||||
{
|
||||
if (_disposed)
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
|
||||
if (Volatile.Read(ref _connection) != null) return;
|
||||
|
||||
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
|
||||
|
||||
lock (_connectionLock)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
}
|
||||
|
||||
if (_connection != null)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
_connection = conn;
|
||||
lock (_healthLock) _activeProcessNode = winningNode;
|
||||
Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleConnectionError(Exception? ex = null)
|
||||
{
|
||||
lock (_connectionLock)
|
||||
{
|
||||
if (_connection == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
_connection.CloseConnection(out _);
|
||||
_connection.Dispose();
|
||||
}
|
||||
catch (Exception disposeEx)
|
||||
{
|
||||
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
|
||||
}
|
||||
|
||||
_connection = null;
|
||||
string? failedNode;
|
||||
lock (_healthLock)
|
||||
{
|
||||
failedNode = _activeProcessNode;
|
||||
_activeProcessNode = null;
|
||||
}
|
||||
|
||||
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||
Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureEventConnected()
|
||||
{
|
||||
if (_disposed)
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
|
||||
if (Volatile.Read(ref _eventConnection) != null) return;
|
||||
|
||||
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
|
||||
|
||||
lock (_eventConnectionLock)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||
}
|
||||
|
||||
if (_eventConnection != null)
|
||||
{
|
||||
conn.CloseConnection(out _);
|
||||
conn.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
_eventConnection = conn;
|
||||
lock (_healthLock) _activeEventNode = winningNode;
|
||||
Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleEventConnectionError(Exception? ex = null)
|
||||
{
|
||||
lock (_eventConnectionLock)
|
||||
{
|
||||
if (_eventConnection == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
_eventConnection.CloseConnection(out _);
|
||||
_eventConnection.Dispose();
|
||||
}
|
||||
catch (Exception disposeEx)
|
||||
{
|
||||
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
|
||||
}
|
||||
|
||||
_eventConnection = null;
|
||||
string? failedNode;
|
||||
lock (_healthLock)
|
||||
{
|
||||
failedNode = _activeEventNode;
|
||||
_activeEventNode = null;
|
||||
}
|
||||
|
||||
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||
Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||
}
|
||||
}
|
||||
|
||||
public Task<List<HistorianSample>> ReadRawAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime, int maxValues,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianSample>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
using var query = _connection!.CreateHistoryQuery();
|
||||
var args = new HistoryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
RetrievalMode = HistorianRetrievalMode.Full
|
||||
};
|
||||
|
||||
if (maxValues > 0)
|
||||
args.BatchSize = (uint)maxValues;
|
||||
else if (_config.MaxValuesPerRead > 0)
|
||||
args.BatchSize = (uint)_config.MaxValuesPerRead;
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||
RecordFailure($"raw StartQuery: {error.ErrorCode}");
|
||||
HandleConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
var count = 0;
|
||||
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
|
||||
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
var result = query.QueryResult;
|
||||
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = timestamp,
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
|
||||
count++;
|
||||
if (limit > 0 && count >= limit) break;
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
|
||||
RecordFailure($"raw: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
|
||||
tagName, results.Count, startTime, endTime);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||
string tagName, DateTime startTime, DateTime endTime,
|
||||
double intervalMs, string aggregateColumn,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianAggregateSample>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
using var query = _connection!.CreateAnalogSummaryQuery();
|
||||
var args = new AnalogSummaryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
Resolution = (ulong)intervalMs
|
||||
};
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||
RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
|
||||
HandleConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
var result = query.QueryResult;
|
||||
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||
var value = ExtractAggregateValue(result, aggregateColumn);
|
||||
|
||||
results.Add(new HistorianAggregateSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
|
||||
RecordFailure($"aggregate: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
|
||||
aggregateColumn, tagName, results.Count);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianSample>> ReadAtTimeAsync(
|
||||
string tagName, DateTime[] timestamps,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianSample>();
|
||||
|
||||
if (timestamps == null || timestamps.Length == 0)
|
||||
return Task.FromResult(results);
|
||||
|
||||
try
|
||||
{
|
||||
EnsureConnected();
|
||||
|
||||
foreach (var timestamp in timestamps)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
using var query = _connection!.CreateHistoryQuery();
|
||||
var args = new HistoryQueryArgs
|
||||
{
|
||||
TagNames = new StringCollection { tagName },
|
||||
StartDateTime = timestamp,
|
||||
EndDateTime = timestamp,
|
||||
RetrievalMode = HistorianRetrievalMode.Interpolated,
|
||||
BatchSize = 1
|
||||
};
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = null,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = 0, // Bad
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (query.MoveNext(out error))
|
||||
{
|
||||
var result = query.QueryResult;
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = null,
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = 0,
|
||||
});
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
}
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
|
||||
RecordFailure($"at-time: {ex.Message}");
|
||||
HandleConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
|
||||
tagName, results.Count, timestamps.Length);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
public Task<List<HistorianEventDto>> ReadEventsAsync(
|
||||
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var results = new List<HistorianEventDto>();
|
||||
|
||||
try
|
||||
{
|
||||
EnsureEventConnected();
|
||||
|
||||
using var query = _eventConnection!.CreateEventQuery();
|
||||
var args = new EventQueryArgs
|
||||
{
|
||||
StartDateTime = startTime,
|
||||
EndDateTime = endTime,
|
||||
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
|
||||
QueryType = HistorianEventQueryType.Events,
|
||||
EventOrder = HistorianEventOrder.Ascending
|
||||
};
|
||||
|
||||
if (!string.IsNullOrEmpty(sourceName))
|
||||
{
|
||||
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
|
||||
}
|
||||
|
||||
if (!query.StartQuery(args, out var error))
|
||||
{
|
||||
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
|
||||
RecordFailure($"events StartQuery: {error.ErrorCode}");
|
||||
HandleEventConnectionError();
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
var count = 0;
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
results.Add(ToDto(query.QueryResult));
|
||||
count++;
|
||||
if (maxEvents > 0 && count >= maxEvents) break;
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
RecordSuccess();
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (ObjectDisposedException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
|
||||
RecordFailure($"events: {ex.Message}");
|
||||
HandleEventConnectionError(ex);
|
||||
}
|
||||
|
||||
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
|
||||
sourceName ?? "(all)", results.Count, startTime, endTime);
|
||||
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
private static HistorianEventDto ToDto(HistorianEvent evt)
|
||||
{
|
||||
// The ArchestrA SDK marks these properties obsolete but still returns them; their
|
||||
// successors aren't wired in the version we bind against. Using them is the documented
|
||||
// v1 behavior — suppressed locally instead of project-wide so any non-event use of
|
||||
// deprecated SDK surface still surfaces as an error.
|
||||
#pragma warning disable CS0618
|
||||
return new HistorianEventDto
|
||||
{
|
||||
Id = evt.Id,
|
||||
Source = evt.Source,
|
||||
EventTime = evt.EventTime,
|
||||
ReceivedTime = evt.ReceivedTime,
|
||||
DisplayText = evt.DisplayText,
|
||||
Severity = (ushort)evt.Severity
|
||||
};
|
||||
#pragma warning restore CS0618
|
||||
}
|
||||
|
||||
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
|
||||
{
|
||||
switch (column)
|
||||
{
|
||||
case "Average": return result.Average;
|
||||
case "Minimum": return result.Minimum;
|
||||
case "Maximum": return result.Maximum;
|
||||
case "ValueCount": return result.ValueCount;
|
||||
case "First": return result.First;
|
||||
case "Last": return result.Last;
|
||||
case "StdDev": return result.StdDev;
|
||||
default: return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
|
||||
try
|
||||
{
|
||||
_connection?.CloseConnection(out _);
|
||||
_connection?.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error closing Historian SDK connection");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_eventConnection?.CloseConnection(out _);
|
||||
_eventConnection?.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error closing Historian SDK event connection");
|
||||
}
|
||||
|
||||
_connection = null;
|
||||
_eventConnection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user