Compare commits

...

2 Commits

Author SHA1 Message Date
Joseph Doherty
3717405aa6 Phase 2 PR 7 — wire IHistoryProvider.ReadProcessedAsync end-to-end. PR 5 ported HistorianDataSource.ReadAggregateAsync into Galaxy.Host but left it internal — GalaxyProxyDriver.ReadProcessedAsync still threw NotSupportedException, so OPC UA clients issuing HistoryReadProcessed requests against the v2 topology got rejected at the driver boundary. This PR closes that gap by adding two new Shared.Contracts messages (HistoryReadProcessedRequest/Response, MessageKind 0x62/0x63), routing them through GalaxyFrameHandler, implementing HistoryReadProcessedAsync on all three IGalaxyBackend implementations (Stub/DbBacked return the canonical "pending" Success=false, MxAccessGalaxyBackend delegates to _historian.ReadAggregateAsync), mapping HistorianAggregateSample → GalaxyDataValue at the IPC boundary (null bucket Value → BadNoData 0x800E0000u, otherwise Good 0u), and flipping GalaxyProxyDriver.ReadProcessedAsync from the NotSupported throw to a real IPC call with OPC UA HistoryAggregateType enum mapped to Wonderware AnalogSummary column name on the Proxy side (Average → "Average", Minimum → "Minimum", Maximum → "Maximum", Count → "ValueCount", Total → NotSupported since there's no direct SDK column for sum). Decision #13 IPC data-shape stays intact — HistoryReadProcessedResponse carries GalaxyDataValue[] with the same MessagePack value + OPC UA StatusCode + timestamps shape as the other history responses, so the Proxy's existing ToSnapshot helper handles the conversion without a new code path. MxAccessGalaxyBackend.HistoryReadProcessedAsync guards: null historian → "Historian disabled" (symmetric with HistoryReadAsync); IntervalMs <= 0 → "HistoryReadProcessed requires IntervalMs > 0" (prevents division-by-zero inside the SDK's Resolution parameter); exception during SDK call → Success=false Values=[] with the message so the Proxy surfaces it as InvalidOperationException with a clean error chain. Tests — HistoryReadProcessedTests (new, 4 cases): disabled-error when historian null, rejects zero interval, maps Good sample with Value=12.34 and the Proxy-supplied AggregateColumn + IntervalMs flow unchanged through to the fake IHistorianDataSource, maps null Value bucket to 0x800E0000u BadNoData with null ValueBytes. AggregateColumnMappingTests (new, 5 cases in Proxy.Tests): theory covers all 4 supported HistoryAggregateType enum values → correct column string, and asserts Total throws NotSupportedException with a message that steers callers to Average/Minimum/Maximum/Count (the SDK's AnalogSummaryQueryResult doesn't expose a sum column — the closest is Average × ValueCount which is the responsibility of a caller-side aggregation rather than an extra IPC round-trip). InternalsVisibleTo added to Galaxy.Proxy csproj so Proxy.Tests can reach the internal MapAggregateToColumn static. Builds — Galaxy.Host (net48 x86) + Galaxy.Proxy (net10) both 0 errors, full solution 201 warnings (pre-existing) / 0 errors. Test counts — Host.Tests Unit suite: 28 pass (4 new processed + 9 PR5 historian + 15 pre-existing); Proxy.Tests Unit suite: 14 pass (5 new column-mapping + 9 pre-existing). Deferred to a later PR — ReadAtTime + ReadEvents + Health IPC surfaces (HistorianDataSource has them ported in PR 5 but they need additional contract messages and would push this PR past a comfortable review size); the alarm subsystem wire-up (OnAlarmEvent raising from MxAccessGalaxyBackend) which overlaps the ReadEventsAsync IPC work since both pull from HistorianAccess.CreateEventQuery on the SDK side; the Proxy-side quality-byte refinement where HistorianDataSource's per-sample raw quality byte gets decoded through the existing QualityMapper instead of the category-only mapping in ToWire(HistorianSample) — doesn't change correctness today since Good/Uncertain/Bad categories are all the Admin UI and OPC UA clients surface, but richer OPC DA status codes (BadNotConnected, UncertainSubNormal, etc.) are available on the wire and the Proxy could promote them before handing DataValueSnapshot to ISubscribable consumers. This PR branches off phase-2-pr5-historian because it directly extends the Historian IPC surface added there; if PR 5 merges first PR 7 fast-forwards, otherwise it needs a rebase after PR 5 lands.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 05:53:01 -04:00
Joseph Doherty
6df1a79d35 Phase 2 PR 5 — port Wonderware Historian SDK into Driver.Galaxy.Host/Backend/Historian/. The full v1 Historian.Aveva code path (HistorianDataSource + HistorianClusterEndpointPicker + IHistorianConnectionFactory + SdkHistorianConnectionFactory) now lives inside Galaxy.Host instead of the previously-required out-of-tree plugin + HistorianPluginLoader AssemblyResolve hack, and MxAccessGalaxyBackend.HistoryReadAsync — which previously returned a Phase 2 Task B.1.h follow-up placeholder — now delegates to the ported HistorianDataSource.ReadRawAsync, maps HistorianSample to GalaxyDataValue via the IPC wire shape, and reports Success=true with per-tag HistoryTagValues arrays. OPC-UA-free surface inside Galaxy.Host: the v1 code returned Opc.Ua.DataValue on the hot path, which would have required dragging OPCFoundation.NetStandard.Opc.Ua.Server into net48 x86 Galaxy.Host and bleeding OPC types across the IPC boundary — instead, the port introduces HistorianSample (Value, Quality byte, TimestampUtc) + HistorianAggregateSample (Value, TimestampUtc) POCOs that carry the raw MX quality byte through the pipe unchanged, and the OPC translation happens on the Proxy side via the existing QualityMapper that the live-read path already uses. Decision #13's IPC data-shape contract survives intact — GalaxyDataValue (TagReference + ValueBytes MessagePack + ValueMessagePackType + StatusCode + SourceTimestampUtcUnixMs + ServerTimestampUtcUnixMs) — so no Shared.Contracts wire break vs PR 4. Cluster failover preserved verbatim: HistorianClusterEndpointPicker is the thread-safe pure-logic picker ported verbatim with no SDK dependency (injected DateTime clock, per-node cooldown state, unknown-node-name tolerance, case-insensitive de-dup on configuration-order list), ConnectToAnyHealthyNode iterates the picker's healthy candidates, clones config per attempt, calls the factory, marks healthy on success / failed on exception with the failure message stored for dashboard surfacing, throws "All N healthy historian candidate(s) failed" with the last exception chained when every node exhausts. Process path + Event path use separate HistorianAccess connections (CreateHistoryQuery vs CreateEventQuery vs CreateAnalogSummaryQuery on the SDK surface) guarded by independent _connection/_eventConnection locks — a mid-query failure on one silo resets only that connection, the other stays open. Four SDK paths ported: ReadRawAsync (RetrievalMode.Full, BatchSize from config.MaxValuesPerRead, MoveNext pump, per-sample quality + value decode with the StringValue/Value fallback the v1 code did, limit-based early exit), ReadAggregateAsync (AnalogSummaryQuery + Resolution in ms, ExtractAggregateValue maps Average/Minimum/Maximum/ValueCount/First/Last/StdDev column names — the NodeId to column mapping is moved to the Proxy side since the IPC request carries a string column), ReadAtTimeAsync (per-timestamp HistoryQuery with RetrievalMode.Interpolated + BatchSize=1, returns Quality=0 / Value=null for missing samples), ReadEventsAsync (EventQuery + AddEventFilter("Source",Equal,sourceName) when sourceName is non-null, EventOrder.Ascending, EventCount = maxEvents or config.MaxValuesPerRead); GetHealthSnapshot returns the full runtime-health snapshot (TotalQueries/Successes/Failures + ConsecutiveFailures + LastSuccess/FailureTime + LastError + ProcessConnectionOpen/EventConnectionOpen + ActiveProcessNode/ActiveEventNode + per-node state list). ReadRaw is the only path wired through IPC in PR 5 (HistoryReadRequest/HistoryTagValues/HistoryReadResponse already existed in Shared.Contracts); Aggregate/AtTime/Events/Health are ported-but-not-yet-IPC-exposed — they stay internal to Galaxy.Host for PR 6+ to surface via new contract message kinds (aggregate = OPC UA HistoryReadProcessed, at-time = HistoryReadAtTime, events = HistoryReadEvents, health = admin dashboard IPC query). Galaxy.Host csproj gains aahClientManaged + aahClientCommon references with Private=false (managed wrappers) + None items for aahClient.dll + Historian.CBE.dll + Historian.DPAPI.dll + ArchestrA.CloudHistorian.Contract.dll native satellites staged alongside the host exe via CopyToOutputDirectory=PreserveNewest so aahClientManaged can P/Invoke into them at runtime without an AssemblyResolve hook (cleaner than the v1 HistorianPluginLoader.cs 180-LOC AssemblyResolve + Assembly.LoadFrom dance that existed solely because the plugin was loaded late from Host/bin/Debug/net48/Historian/). Program.cs adds BuildHistorianIfEnabled() that reads OTOPCUA_HISTORIAN_ENABLED (true or 1) + OTOPCUA_HISTORIAN_SERVER + OTOPCUA_HISTORIAN_SERVERS (comma-separated cluster list overrides single-server) + OTOPCUA_HISTORIAN_PORT (default 32568) + OTOPCUA_HISTORIAN_INTEGRATED (default true) + OTOPCUA_HISTORIAN_USER/OTOPCUA_HISTORIAN_PASS + OTOPCUA_HISTORIAN_TIMEOUT_SEC (30) + OTOPCUA_HISTORIAN_MAX_VALUES (10000) + OTOPCUA_HISTORIAN_COOLDOWN_SEC (60), returns null when disabled so MxAccessGalaxyBackend.HistoryReadAsync surfaces a clean "Historian disabled" Success=false instead of a localhost-SDK hang; server.RunAsync finally block now also casts backend to IDisposable.Dispose() so the historian SDK connections get cleanly closed on Ctrl+C. MxAccessGalaxyBackend gains an IHistorianDataSource? historian constructor parameter (defaults null to preserve existing Host.Tests call sites that don't exercise HistoryRead), implements IDisposable that forwards to _historian.Dispose(), and the pragma warning disable CS0618 is locally scoped to the ToDto(HistorianEvent) mapper since the SDK marks Id/Source/DisplayText/Severity obsolete but the replacement surface isn't available in the aahClientManaged version we bind against — every other deprecated-SDK use still surfaces as an error under TreatWarningsAsErrors. Ported from v1 Historian.Aveva unchanged: the CloneConfigWithServerName helper that preserves every config field except ServerName per attempt; the double-checked locking in EnsureConnected/EnsureEventConnected (fast path = Volatile.Read outside lock, slow path acquires lock + re-checks + disposes any raced-in-parallel connection); HandleConnectionError/HandleEventConnectionError that close the dead connection, clear the active-node tracker, MarkFailed the picker entry with the exception message so the node enters cooldown, and log the reset with node= for operator correlation; RecordSuccess/RecordFailure that bump counters under _healthLock. Tests: HistorianClusterEndpointPickerTests (7 cases) — single-node ServerName fallback when ServerNames empty, MarkFailed enters cooldown and skips, cooldown expires after window, MarkHealthy immediately clears, all-in-cooldown returns empty healthy list, Snapshot reports failure count + last error + IsHealthy, case-insensitive de-dup on duplicate hostnames. HistorianWiringTests (2 cases) — HistoryReadAsync returns "Historian disabled" Success=false when historian:null passed; HistoryReadAsync with a fake IHistorianDataSource maps the returned HistorianSample (Value=42.5, Quality=192 Good, Timestamp) to a GalaxyDataValue with StatusCode=0u + SourceTimestampUtcUnixMs matching the sample + MessagePack-encoded value bytes. InternalsVisibleTo("...Host.Tests") added to Galaxy.Host.csproj so tests can reach the internal HistorianClusterEndpointPicker. Full Galaxy.Host.Tests suite: 24 pass / 0 fail (9 new historian + 15 pre-existing MemoryWatchdog/PostMortemMmf/RecyclePolicy/StaPump/EndToEndIpc/Handshake). Full solution build: 0 errors (202 pre-existing warnings). The v1 Historian.Aveva project + Historian.Aveva.Tests still build intact because the archive PR (Stream D.1 destructive delete) is still ahead of us — PR 5 intentionally does not delete either; once PR 2+3 merge and the archive-delete PR lands, a follow-up cleanup can remove Historian.Aveva + its 4 source files + 18 test cases. Alarm subsystem wire-up (OnAlarmEvent raising from MxAccessGalaxyBackend via AlarmExtension primitives) + host-status push (OnHostStatusChanged via a ported GalaxyRuntimeProbeManager) remain PR 6 candidates; they were on the same "Task B.1.h follow-up" list and share the IPC connection-sink wiring with the historian events path — it made PR 5 scope-manageable to do Historian first since that's what has the biggest surface area (981 LOC v1 plus SDK binding) and alarms/host-status have more bespoke integration with the existing MxAccess subscription fan-out.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 01:44:04 -04:00
24 changed files with 1679 additions and 11 deletions

View File

@@ -127,6 +127,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
/// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
/// out an ordered list of eligible candidates for the data source to try in sequence.
/// </summary>
internal sealed class HistorianClusterEndpointPicker
{
private readonly Func<DateTime> _clock;
private readonly TimeSpan _cooldown;
private readonly object _lock = new object();
private readonly List<NodeEntry> _nodes;
public HistorianClusterEndpointPicker(HistorianConfiguration config)
: this(config, () => DateTime.UtcNow) { }
internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));
var names = (config.ServerNames != null && config.ServerNames.Count > 0)
? config.ServerNames
: new List<string> { config.ServerName };
_nodes = names
.Where(n => !string.IsNullOrWhiteSpace(n))
.Select(n => n.Trim())
.Distinct(StringComparer.OrdinalIgnoreCase)
.Select(n => new NodeEntry { Name = n })
.ToList();
}
public int NodeCount
{
get { lock (_lock) return _nodes.Count; }
}
public IReadOnlyList<string> GetHealthyNodes()
{
lock (_lock)
{
var now = _clock();
return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
}
}
public int HealthyNodeCount
{
get
{
lock (_lock)
{
var now = _clock();
return _nodes.Count(n => IsHealthyAt(n, now));
}
}
}
public void MarkFailed(string node, string? error)
{
lock (_lock)
{
var entry = FindEntry(node);
if (entry == null) return;
var now = _clock();
entry.FailureCount++;
entry.LastError = error;
entry.LastFailureTime = now;
entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null;
}
}
public void MarkHealthy(string node)
{
lock (_lock)
{
var entry = FindEntry(node);
if (entry == null) return;
entry.CooldownUntil = null;
}
}
public List<HistorianClusterNodeState> SnapshotNodeStates()
{
lock (_lock)
{
var now = _clock();
return _nodes.Select(n => new HistorianClusterNodeState
{
Name = n.Name,
IsHealthy = IsHealthyAt(n, now),
CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil,
FailureCount = n.FailureCount,
LastError = n.LastError,
LastFailureTime = n.LastFailureTime
}).ToList();
}
}
private static bool IsHealthyAt(NodeEntry entry, DateTime now)
{
return entry.CooldownUntil == null || entry.CooldownUntil <= now;
}
private NodeEntry? FindEntry(string node)
{
for (var i = 0; i < _nodes.Count; i++)
if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase))
return _nodes[i];
return null;
}
private sealed class NodeEntry
{
public string Name { get; set; } = "";
public DateTime? CooldownUntil { get; set; }
public int FailureCount { get; set; }
public string? LastError { get; set; }
public DateTime? LastFailureTime { get; set; }
}
}
}

View File

@@ -0,0 +1,18 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Point-in-time state of a single historian cluster node. One entry per configured node
/// appears inside <see cref="HistorianHealthSnapshot"/>.
/// </summary>
public sealed class HistorianClusterNodeState
{
public string Name { get; set; } = "";
public bool IsHealthy { get; set; }
public DateTime? CooldownUntil { get; set; }
public int FailureCount { get; set; }
public string? LastError { get; set; }
public DateTime? LastFailureTime { get; set; }
}
}

View File

@@ -0,0 +1,38 @@
using System.Collections.Generic;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Wonderware Historian SDK configuration. Populated from environment variables at Host
/// startup (see <c>Program.cs</c>) or from the Proxy's <c>DriverInstance.DriverConfig</c>
/// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
/// </summary>
public sealed class HistorianConfiguration
{
public bool Enabled { get; set; } = false;
/// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
public string ServerName { get; set; } = "localhost";
/// <summary>
/// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
/// falling through to the next on failure. A failed node is placed in cooldown for
/// <see cref="FailureCooldownSeconds"/> before being re-eligible.
/// </summary>
public List<string> ServerNames { get; set; } = new();
public int FailureCooldownSeconds { get; set; } = 60;
public bool IntegratedSecurity { get; set; } = true;
public string? UserName { get; set; }
public string? Password { get; set; }
public int Port { get; set; } = 32568;
public int CommandTimeoutSeconds { get; set; } = 30;
public int MaxValuesPerRead { get; set; } = 10000;
/// <summary>
/// Outer safety timeout applied to sync-over-async Historian operations. Must be
/// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
/// </summary>
public int RequestTimeoutSeconds { get; set; } = 60;
}
}

View File

@@ -0,0 +1,621 @@
using System;
using System.Collections.Generic;
using StringCollection = System.Collections.Specialized.StringCollection;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
/// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
/// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
/// </summary>
public sealed class HistorianDataSource : IHistorianDataSource
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
private readonly HistorianConfiguration _config;
private readonly object _connectionLock = new object();
private readonly object _eventConnectionLock = new object();
private readonly IHistorianConnectionFactory _factory;
private HistorianAccess? _connection;
private HistorianAccess? _eventConnection;
private bool _disposed;
private readonly object _healthLock = new object();
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
private DateTime? _lastSuccessTime;
private DateTime? _lastFailureTime;
private string? _lastError;
private string? _activeProcessNode;
private string? _activeEventNode;
private readonly HistorianClusterEndpointPicker _picker;
public HistorianDataSource(HistorianConfiguration config)
: this(config, new SdkHistorianConnectionFactory(), null) { }
internal HistorianDataSource(
HistorianConfiguration config,
IHistorianConnectionFactory factory,
HistorianClusterEndpointPicker? picker = null)
{
_config = config;
_factory = factory;
_picker = picker ?? new HistorianClusterEndpointPicker(config);
}
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
{
var candidates = _picker.GetHealthyNodes();
if (candidates.Count == 0)
{
var total = _picker.NodeCount;
throw new InvalidOperationException(
total == 0
? "No historian nodes configured"
: $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
}
Exception? lastException = null;
foreach (var node in candidates)
{
var attemptConfig = CloneConfigWithServerName(node);
try
{
var conn = _factory.CreateAndConnect(attemptConfig, type);
_picker.MarkHealthy(node);
return (conn, node);
}
catch (Exception ex)
{
_picker.MarkFailed(node, ex.Message);
lastException = ex;
Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
}
}
var inner = lastException?.Message ?? "(no detail)";
throw new InvalidOperationException(
$"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
lastException);
}
private HistorianConfiguration CloneConfigWithServerName(string serverName)
{
return new HistorianConfiguration
{
Enabled = _config.Enabled,
ServerName = serverName,
ServerNames = _config.ServerNames,
FailureCooldownSeconds = _config.FailureCooldownSeconds,
IntegratedSecurity = _config.IntegratedSecurity,
UserName = _config.UserName,
Password = _config.Password,
Port = _config.Port,
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
MaxValuesPerRead = _config.MaxValuesPerRead
};
}
public HistorianHealthSnapshot GetHealthSnapshot()
{
var nodeStates = _picker.SnapshotNodeStates();
var healthyCount = 0;
foreach (var n in nodeStates)
if (n.IsHealthy) healthyCount++;
lock (_healthLock)
{
return new HistorianHealthSnapshot
{
TotalQueries = _totalSuccesses + _totalFailures,
TotalSuccesses = _totalSuccesses,
TotalFailures = _totalFailures,
ConsecutiveFailures = _consecutiveFailures,
LastSuccessTime = _lastSuccessTime,
LastFailureTime = _lastFailureTime,
LastError = _lastError,
ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
ActiveProcessNode = _activeProcessNode,
ActiveEventNode = _activeEventNode,
NodeCount = nodeStates.Count,
HealthyNodeCount = healthyCount,
Nodes = nodeStates
};
}
}
private void RecordSuccess()
{
lock (_healthLock)
{
_totalSuccesses++;
_lastSuccessTime = DateTime.UtcNow;
_consecutiveFailures = 0;
_lastError = null;
}
}
private void RecordFailure(string error)
{
lock (_healthLock)
{
_totalFailures++;
_lastFailureTime = DateTime.UtcNow;
_consecutiveFailures++;
_lastError = error;
}
}
private void EnsureConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
if (Volatile.Read(ref _connection) != null) return;
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
lock (_connectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_connection != null)
{
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_connection = conn;
lock (_healthLock) _activeProcessNode = winningNode;
Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
}
}
private void HandleConnectionError(Exception? ex = null)
{
lock (_connectionLock)
{
if (_connection == null) return;
try
{
_connection.CloseConnection(out _);
_connection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
}
_connection = null;
string? failedNode;
lock (_healthLock)
{
failedNode = _activeProcessNode;
_activeProcessNode = null;
}
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
}
}
private void EnsureEventConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(HistorianDataSource));
if (Volatile.Read(ref _eventConnection) != null) return;
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
lock (_eventConnectionLock)
{
if (_disposed)
{
conn.CloseConnection(out _);
conn.Dispose();
throw new ObjectDisposedException(nameof(HistorianDataSource));
}
if (_eventConnection != null)
{
conn.CloseConnection(out _);
conn.Dispose();
return;
}
_eventConnection = conn;
lock (_healthLock) _activeEventNode = winningNode;
Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
}
}
private void HandleEventConnectionError(Exception? ex = null)
{
lock (_eventConnectionLock)
{
if (_eventConnection == null) return;
try
{
_eventConnection.CloseConnection(out _);
_eventConnection.Dispose();
}
catch (Exception disposeEx)
{
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
}
_eventConnection = null;
string? failedNode;
lock (_healthLock)
{
failedNode = _activeEventNode;
_activeEventNode = null;
}
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
}
}
public Task<List<HistorianSample>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default)
{
var results = new List<HistorianSample>();
try
{
EnsureConnected();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
RetrievalMode = HistorianRetrievalMode.Full
};
if (maxValues > 0)
args.BatchSize = (uint)maxValues;
else if (_config.MaxValuesPerRead > 0)
args.BatchSize = (uint)_config.MaxValuesPerRead;
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
RecordFailure($"raw StartQuery: {error.ErrorCode}");
HandleConnectionError();
return Task.FromResult(results);
}
var count = 0;
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
results.Add(new HistorianSample
{
Value = value,
TimestampUtc = timestamp,
Quality = (byte)(result.OpcQuality & 0xFF),
});
count++;
if (limit > 0 && count >= limit) break;
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
RecordFailure($"raw: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
tagName, results.Count, startTime, endTime);
return Task.FromResult(results);
}
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default)
{
var results = new List<HistorianAggregateSample>();
try
{
EnsureConnected();
using var query = _connection!.CreateAnalogSummaryQuery();
var args = new AnalogSummaryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = startTime,
EndDateTime = endTime,
Resolution = (ulong)intervalMs
};
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
HandleConnectionError();
return Task.FromResult(results);
}
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
var result = query.QueryResult;
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
var value = ExtractAggregateValue(result, aggregateColumn);
results.Add(new HistorianAggregateSample
{
Value = value,
TimestampUtc = timestamp,
});
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
RecordFailure($"aggregate: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
aggregateColumn, tagName, results.Count);
return Task.FromResult(results);
}
public Task<List<HistorianSample>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default)
{
var results = new List<HistorianSample>();
if (timestamps == null || timestamps.Length == 0)
return Task.FromResult(results);
try
{
EnsureConnected();
foreach (var timestamp in timestamps)
{
ct.ThrowIfCancellationRequested();
using var query = _connection!.CreateHistoryQuery();
var args = new HistoryQueryArgs
{
TagNames = new StringCollection { tagName },
StartDateTime = timestamp,
EndDateTime = timestamp,
RetrievalMode = HistorianRetrievalMode.Interpolated,
BatchSize = 1
};
if (!query.StartQuery(args, out var error))
{
results.Add(new HistorianSample
{
Value = null,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = 0, // Bad
});
continue;
}
if (query.MoveNext(out error))
{
var result = query.QueryResult;
object? value;
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
value = result.StringValue;
else
value = result.Value;
results.Add(new HistorianSample
{
Value = value,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = (byte)(result.OpcQuality & 0xFF),
});
}
else
{
results.Add(new HistorianSample
{
Value = null,
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
Quality = 0,
});
}
query.EndQuery(out _);
}
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
RecordFailure($"at-time: {ex.Message}");
HandleConnectionError(ex);
}
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
tagName, results.Count, timestamps.Length);
return Task.FromResult(results);
}
public Task<List<HistorianEventDto>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default)
{
var results = new List<HistorianEventDto>();
try
{
EnsureEventConnected();
using var query = _eventConnection!.CreateEventQuery();
var args = new EventQueryArgs
{
StartDateTime = startTime,
EndDateTime = endTime,
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
QueryType = HistorianEventQueryType.Events,
EventOrder = HistorianEventOrder.Ascending
};
if (!string.IsNullOrEmpty(sourceName))
{
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
}
if (!query.StartQuery(args, out var error))
{
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
RecordFailure($"events StartQuery: {error.ErrorCode}");
HandleEventConnectionError();
return Task.FromResult(results);
}
var count = 0;
while (query.MoveNext(out error))
{
ct.ThrowIfCancellationRequested();
results.Add(ToDto(query.QueryResult));
count++;
if (maxEvents > 0 && count >= maxEvents) break;
}
query.EndQuery(out _);
RecordSuccess();
}
catch (OperationCanceledException) { throw; }
catch (ObjectDisposedException) { throw; }
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
RecordFailure($"events: {ex.Message}");
HandleEventConnectionError(ex);
}
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
sourceName ?? "(all)", results.Count, startTime, endTime);
return Task.FromResult(results);
}
private static HistorianEventDto ToDto(HistorianEvent evt)
{
// The ArchestrA SDK marks these properties obsolete but still returns them; their
// successors aren't wired in the version we bind against. Using them is the documented
// v1 behavior — suppressed locally instead of project-wide so any non-event use of
// deprecated SDK surface still surfaces as an error.
#pragma warning disable CS0618
return new HistorianEventDto
{
Id = evt.Id,
Source = evt.Source,
EventTime = evt.EventTime,
ReceivedTime = evt.ReceivedTime,
DisplayText = evt.DisplayText,
Severity = (ushort)evt.Severity
};
#pragma warning restore CS0618
}
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
{
switch (column)
{
case "Average": return result.Average;
case "Minimum": return result.Minimum;
case "Maximum": return result.Maximum;
case "ValueCount": return result.ValueCount;
case "First": return result.First;
case "Last": return result.Last;
case "StdDev": return result.StdDev;
default: return null;
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection?.CloseConnection(out _);
_connection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK connection");
}
try
{
_eventConnection?.CloseConnection(out _);
_eventConnection?.Dispose();
}
catch (Exception ex)
{
Log.Warning(ex, "Error closing Historian SDK event connection");
}
_connection = null;
_eventConnection = null;
}
}
}

View File

@@ -0,0 +1,18 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// SDK-free representation of a Historian event record. Prevents ArchestrA types from
/// leaking beyond <c>HistorianDataSource</c>.
/// </summary>
public sealed class HistorianEventDto
{
public Guid Id { get; set; }
public string? Source { get; set; }
public DateTime EventTime { get; set; }
public DateTime ReceivedTime { get; set; }
public string? DisplayText { get; set; }
public ushort Severity { get; set; }
}
}

View File

@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
/// via an IPC health query (not wired in PR #5; deferred).
/// </summary>
public sealed class HistorianHealthSnapshot
{
public long TotalQueries { get; set; }
public long TotalSuccesses { get; set; }
public long TotalFailures { get; set; }
public int ConsecutiveFailures { get; set; }
public DateTime? LastSuccessTime { get; set; }
public DateTime? LastFailureTime { get; set; }
public string? LastError { get; set; }
public bool ProcessConnectionOpen { get; set; }
public bool EventConnectionOpen { get; set; }
public string? ActiveProcessNode { get; set; }
public string? ActiveEventNode { get; set; }
public int NodeCount { get; set; }
public int HealthyNodeCount { get; set; }
public List<HistorianClusterNodeState> Nodes { get; set; } = new();
}
}

View File

@@ -0,0 +1,30 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// OPC-UA-free representation of a single historical data point. The Host returns these
/// across the IPC boundary as <c>GalaxyDataValue</c>; the Proxy maps quality and value to
/// OPC UA <c>DataValue</c>. Raw MX quality byte is preserved so the Proxy can use the same
/// quality mapper it already uses for live reads.
/// </summary>
public sealed class HistorianSample
{
public object? Value { get; set; }
/// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
public byte Quality { get; set; }
public DateTime TimestampUtc { get; set; }
}
/// <summary>
/// Result of <see cref="IHistorianDataSource.ReadAggregateAsync"/>. When <see cref="Value"/> is
/// null the aggregate is unavailable for that bucket (Proxy maps to <c>BadNoData</c>).
/// </summary>
public sealed class HistorianAggregateSample
{
public double? Value { get; set; }
public DateTime TimestampUtc { get; set; }
}
}

View File

@@ -0,0 +1,73 @@
using System;
using System.Threading;
using ArchestrA;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that
/// control connection success, failure, and timeout behavior.
/// </summary>
internal interface IHistorianConnectionFactory
{
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
}
/// <summary>Production implementation — opens real Historian SDK connections.</summary>
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
{
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
{
var conn = new HistorianAccess();
var args = new HistorianConnectionArgs
{
ServerName = config.ServerName,
TcpPort = (ushort)config.Port,
IntegratedSecurity = config.IntegratedSecurity,
UseArchestrAUser = config.IntegratedSecurity,
ConnectionType = type,
ReadOnly = true,
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
};
if (!config.IntegratedSecurity)
{
args.UserName = config.UserName ?? string.Empty;
args.Password = config.Password ?? string.Empty;
}
if (!conn.OpenConnection(args, out var error))
{
conn.Dispose();
throw new InvalidOperationException(
$"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
}
var timeoutMs = config.CommandTimeoutSeconds * 1000;
var elapsed = 0;
while (elapsed < timeoutMs)
{
var status = new HistorianConnectionStatus();
conn.GetConnectionStatus(ref status);
if (status.ConnectedToServer)
return conn;
if (status.ErrorOccurred)
{
conn.Dispose();
throw new InvalidOperationException(
$"Historian SDK connection failed: {status.Error}");
}
Thread.Sleep(250);
elapsed += 250;
}
conn.Dispose();
throw new TimeoutException(
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
}
}
}

View File

@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
{
/// <summary>
/// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host.
/// Implementations read via the aahClient* SDK; the Proxy side maps returned samples
/// to OPC UA <c>DataValue</c>.
/// </summary>
public interface IHistorianDataSource : IDisposable
{
Task<List<HistorianSample>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default);
Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default);
Task<List<HistorianSample>> ReadAtTimeAsync(
string tagName, DateTime[] timestamps,
CancellationToken ct = default);
Task<List<HistorianEventDto>> ReadEventsAsync(
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
CancellationToken ct = default);
HistorianHealthSnapshot GetHealthSnapshot();
}
}

View File

@@ -38,6 +38,7 @@ public interface IGalaxyBackend
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}

View File

@@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
@@ -18,10 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
/// (the v1 alarm subsystem is its own subtree).
/// </summary>
public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
{
private readonly GalaxyRepository _repository;
private readonly MxAccessClient _mx;
private readonly IHistorianDataSource? _historian;
private long _nextSessionId;
private long _nextSubscriptionId;
@@ -37,10 +39,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
_repository = repository;
_mx = mx;
_historian = historian;
}
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
@@ -222,17 +225,92 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadResponse
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Tags = Array.Empty<HistoryTagValues>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
try
{
Success = false,
Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
Tags = Array.Empty<HistoryTagValues>(),
});
foreach (var reference in req.TagReferences)
{
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
tags.Add(new HistoryTagValues
{
TagReference = reference,
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
});
}
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadResponse
{
Success = false,
Error = $"Historian read failed: {ex.Message}",
Tags = tags.ToArray(),
};
}
}
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.IntervalMs <= 0)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "HistoryReadProcessed requires IntervalMs > 0",
Values = Array.Empty<GalaxyDataValue>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var samples = await _historian.ReadAggregateAsync(
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadProcessedResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadProcessedResponse
{
Success = false,
Error = $"Historian aggregate read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
public void Dispose() => _historian?.Dispose();
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
{
TagReference = reference,
@@ -243,6 +321,47 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
/// <summary>
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
/// the hop intact.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
ValueMessagePackType = 0,
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static uint MapHistorianQualityToOpcUa(byte q)
{
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
// The Proxy may refine this when it decodes the wire frame.
if (q >= 192) return 0x00000000u; // Good
if (q >= 64) return 0x40000000u; // Uncertain
return 0x80000000u; // Bad
}
/// <summary>
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
ValueMessagePackType = 0,
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{
AttributeName = row.AttributeName,

View File

@@ -85,6 +85,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{

View File

@@ -80,6 +80,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
return;
}
case MessageKind.HistoryReadProcessedRequest:
{
var resp = await backend.HistoryReadProcessedAsync(
Deserialize<HistoryReadProcessedRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);

View File

@@ -4,6 +4,7 @@ using System.Threading;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
@@ -66,9 +67,11 @@ public static class Program
pump = new StaPump("Galaxy.Sta");
pump.WaitForStartedAsync().GetAwaiter().GetResult();
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
var historian = BuildHistorianIfEnabled();
backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
mx);
mx,
historian);
break;
}
@@ -77,6 +80,7 @@ public static class Program
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
finally
{
(backend as IDisposable)?.Dispose();
mx?.Dispose();
pump?.Dispose();
}
@@ -91,4 +95,45 @@ public static class Program
}
finally { Log.CloseAndFlush(); }
}
/// <summary>
/// Builds a <see cref="HistorianDataSource"/> from the OTOPCUA_HISTORIAN_* environment
/// variables the supervisor passes at spawn time. Returns null when the historian is
/// disabled (default) so <c>MxAccessGalaxyBackend.HistoryReadAsync</c> returns a clear
/// "not configured" error instead of attempting an SDK connection to localhost.
/// </summary>
private static IHistorianDataSource? BuildHistorianIfEnabled()
{
var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED");
if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1")
return null;
var cfg = new HistorianConfiguration
{
Enabled = true,
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
};
var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS");
if (!string.IsNullOrWhiteSpace(servers))
cfg.ServerNames = new System.Collections.Generic.List<string>(
servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}",
cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port);
return new HistorianDataSource(cfg);
}
private static int TryParseInt(string envName, int defaultValue)
{
var raw = Environment.GetEnvironmentVariable(envName);
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
}
}

View File

@@ -30,11 +30,43 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests"/>
</ItemGroup>
<ItemGroup>
<Reference Include="ArchestrA.MxAccess">
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
<Private>true</Private>
</Reference>
<!-- Wonderware Historian SDK — consumed by Backend/Historian/ for HistoryReadAsync.
Previously lived in the v1 Historian.Aveva plugin; folded into Driver.Galaxy.Host
for PR #5 because this host is already Galaxy-specific. -->
<Reference Include="aahClientManaged">
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
<Reference Include="aahClientCommon">
<HintPath>..\..\lib\aahClientCommon.dll</HintPath>
<EmbedInteropTypes>false</EmbedInteropTypes>
</Reference>
</ItemGroup>
<ItemGroup>
<!-- Historian SDK native and satellite DLLs — staged beside the host exe so the
aahClientManaged wrapper can P/Invoke into them without an AssemblyResolve hook. -->
<None Include="..\..\lib\aahClient.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.CBE.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\Historian.DPAPI.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>

View File

@@ -296,10 +296,50 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public Task<HistoryReadResult> ReadProcessedAsync(
public async Task<HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
=> throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync.");
{
var client = RequireClient();
var column = MapAggregateToColumn(aggregate);
var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
MessageKind.HistoryReadProcessedRequest,
new HistoryReadProcessedRequest
{
SessionId = _sessionId,
TagReference = fullReference,
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
IntervalMs = (long)interval.TotalMilliseconds,
AggregateColumn = column,
},
MessageKind.HistoryReadProcessedResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
return new HistoryReadResult(samples, ContinuationPoint: null);
}
/// <summary>
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
/// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
/// </summary>
internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
{
HistoryAggregateType.Average => "Average",
HistoryAggregateType.Minimum => "Minimum",
HistoryAggregateType.Maximum => "Maximum",
HistoryAggregateType.Count => "ValueCount",
HistoryAggregateType.Total => throw new NotSupportedException(
"HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
"query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
};
// ---- IRediscoverable ----

View File

@@ -16,6 +16,10 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>

View File

@@ -50,6 +50,8 @@ public enum MessageKind : byte
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadProcessedRequest = 0x62,
HistoryReadProcessedResponse = 0x63,
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,

View File

@@ -26,3 +26,27 @@ public sealed class HistoryReadResponse
[Key(1)] public string? Error { get; set; }
[Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty<HistoryTagValues>();
}
/// <summary>
/// Processed (aggregated) historian read — OPC UA HistoryReadProcessed service. The
/// aggregate column is a string (e.g. "Average", "Minimum") mapped by the Proxy from the
/// OPC UA HistoryAggregateType enum so Galaxy.Host stays OPC-UA-free.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadProcessedRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long StartUtcUnixMs { get; set; }
[Key(3)] public long EndUtcUnixMs { get; set; }
[Key(4)] public long IntervalMs { get; set; }
[Key(5)] public string AggregateColumn { get; set; } = "Average";
}
[MessagePackObject]
public sealed class HistoryReadProcessedResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}

View File

@@ -0,0 +1,94 @@
using System;
using System.Linq;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
[Trait("Category", "Unit")]
public sealed class HistorianClusterEndpointPickerTests
{
private static HistorianConfiguration Config(params string[] nodes) => new()
{
ServerName = "ignored",
ServerNames = nodes.ToList(),
FailureCooldownSeconds = 60,
};
[Fact]
public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty()
{
var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() };
var p = new HistorianClusterEndpointPicker(cfg);
p.NodeCount.ShouldBe(1);
p.GetHealthyNodes().ShouldBe(new[] { "only-node" });
}
[Fact]
public void Failed_node_enters_cooldown_and_is_skipped()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBe(new[] { "b" });
}
[Fact]
public void Cooldown_expires_after_configured_window()
{
var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBe(new[] { "b" });
clock = clock.AddSeconds(61);
p.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
}
[Fact]
public void MarkHealthy_immediately_clears_cooldown()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
p.MarkFailed("a", "boom");
p.GetHealthyNodes().ShouldBeEmpty();
p.MarkHealthy("a");
p.GetHealthyNodes().ShouldBe(new[] { "a" });
}
[Fact]
public void All_nodes_in_cooldown_returns_empty_healthy_list()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
p.MarkFailed("a", "x");
p.MarkFailed("b", "y");
p.GetHealthyNodes().ShouldBeEmpty();
p.NodeCount.ShouldBe(2);
}
[Fact]
public void Snapshot_reports_failure_count_and_last_error()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
p.MarkFailed("a", "first");
p.MarkFailed("a", "second");
var snap = p.SnapshotNodeStates().Single();
snap.FailureCount.ShouldBe(2);
snap.LastError.ShouldBe("second");
snap.IsHealthy.ShouldBeFalse();
snap.CooldownUntil.ShouldNotBeNull();
}
[Fact]
public void Duplicate_hostnames_are_deduplicated_case_insensitively()
{
var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB"));
p.NodeCount.ShouldBe(2);
}
}
}

View File

@@ -0,0 +1,109 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
{
[Trait("Category", "Unit")]
public sealed class HistorianWiringTests
{
/// <summary>
/// When the Proxy sends a HistoryRead but the supervisor never enabled the historian
/// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a
/// self-explanatory error — not an exception or a hang against localhost.
/// </summary>
[Fact]
public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
{
TagReferences = new[] { "TestTag" },
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxValuesPerTag = 100,
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
resp.Tags.ShouldBeEmpty();
}
/// <summary>
/// When the historian is wired up, we expect the backend to call through and map
/// samples onto the IPC wire shape. Uses a fake <see cref="IHistorianDataSource"/>
/// that returns a single known-good sample so we can assert the mapping stays sane.
/// </summary>
[Fact]
public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
var fake = new FakeHistorianDataSource(new HistorianSample
{
Value = 42.5,
Quality = 192, // Good
TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc),
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
{
TagReferences = new[] { "TankLevel" },
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxValuesPerTag = 100,
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Tags.Length.ShouldBe(1);
resp.Tags[0].TagReference.ShouldBe("TankLevel");
resp.Tags[0].Values.Length.ShouldBe(1);
resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good
resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull();
resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe(
new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds());
}
private sealed class FakeHistorianDataSource : IHistorianDataSource
{
private readonly HistorianSample _sample;
public FakeHistorianDataSource(HistorianSample sample) => _sample = sample;
public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample> { _sample });
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}
}

View File

@@ -0,0 +1,158 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadProcessedTests
{
[Fact]
public async Task ReturnsDisabledError_When_NoHistorianConfigured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
IntervalMs = 1000,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Rejects_NonPositiveInterval()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource();
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
IntervalMs = 0,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("IntervalMs");
}
[Fact]
public async Task Maps_AggregateSample_With_Value_To_Good()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
{
Value = 12.34,
TimestampUtc = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc),
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
IntervalMs = 60_000,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0u); // Good
resp.Values[0].ValueBytes.ShouldNotBeNull();
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(12.34);
fake.LastAggregateColumn.ShouldBe("Average");
fake.LastIntervalMs.ShouldBe(60_000d);
}
[Fact]
public async Task Maps_Null_Bucket_To_BadNoData()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
{
Value = null,
TimestampUtc = DateTime.UtcNow,
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
IntervalMs = 1000,
AggregateColumn = "Minimum",
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0x800E0000u); // BadNoData
resp.Values[0].ValueBytes.ShouldBeNull();
}
private sealed class FakeHistorianDataSource : IHistorianDataSource
{
private readonly HistorianAggregateSample[] _samples;
public string? LastAggregateColumn { get; private set; }
public double LastIntervalMs { get; private set; }
public FakeHistorianDataSource(params HistorianAggregateSample[] samples) => _samples = samples;
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tag, DateTime s, DateTime e, double intervalMs, string col, CancellationToken ct)
{
LastAggregateColumn = col;
LastIntervalMs = intervalMs;
return Task.FromResult(new List<HistorianAggregateSample>(_samples));
}
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}

View File

@@ -0,0 +1,27 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
[Trait("Category", "Unit")]
public sealed class AggregateColumnMappingTests
{
[Theory]
[InlineData(HistoryAggregateType.Average, "Average")]
[InlineData(HistoryAggregateType.Minimum, "Minimum")]
[InlineData(HistoryAggregateType.Maximum, "Maximum")]
[InlineData(HistoryAggregateType.Count, "ValueCount")]
public void Maps_OpcUa_enum_to_AnalogSummary_column(HistoryAggregateType aggregate, string expected)
{
GalaxyProxyDriver.MapAggregateToColumn(aggregate).ShouldBe(expected);
}
[Fact]
public void Total_is_not_supported()
{
Should.Throw<System.NotSupportedException>(
() => GalaxyProxyDriver.MapAggregateToColumn(HistoryAggregateType.Total));
}
}