Compare commits
3 Commits
phase-2-st
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3717405aa6 | ||
|
|
6df1a79d35 | ||
|
|
caa9cb86f6 |
91
docs/v2/implementation/pr-4-body.md
Normal file
91
docs/v2/implementation/pr-4-body.md
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
# PR 4 — Phase 2 follow-up: close the 4 open MXAccess findings
|
||||||
|
|
||||||
|
**Source**: `phase-2-pr4-findings` (branched from `phase-2-stream-d`)
|
||||||
|
**Target**: `v2`
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
Closes the 4 high/medium open findings carried forward in `exit-gate-phase-2-final.md`:
|
||||||
|
|
||||||
|
- **High 1 — `ReadAsync` subscription-leak on cancel.** One-shot read now wraps the
|
||||||
|
subscribe→first-OnDataChange→unsubscribe pattern in a `try/finally` so the per-tag
|
||||||
|
callback is always detached, and if the read installed the underlying MXAccess
|
||||||
|
subscription itself (no other caller had it), it tears it down on the way out.
|
||||||
|
- **High 2 — No reconnect loop on the MXAccess COM connection.** New
|
||||||
|
`MxAccessClientOptions { AutoReconnect, MonitorInterval, StaleThreshold }` + a background
|
||||||
|
`MonitorLoopAsync` that watches a stale-activity threshold + probes the proxy via a
|
||||||
|
no-op COM call, then reconnects-with-replay (re-Register, re-AddItem every active
|
||||||
|
subscription) when the proxy is dead. Liveness signal: every `OnDataChange` callback bumps
|
||||||
|
`_lastObservedActivityUtc`. Defaults match v1 monitor cadence (5s poll, 60s stale).
|
||||||
|
`ReconnectCount` exposed for diagnostics; `ConnectionStateChanged` event for downstream
|
||||||
|
consumers (the supervisor on the Proxy side already surfaces this through its
|
||||||
|
HeartbeatMonitor, but the Host-side event lets local logging/metrics hook in).
|
||||||
|
- **Medium 3 — `MxAccessGalaxyBackend.SubscribeAsync` doesn't push OnDataChange frames back to
|
||||||
|
the Proxy.** New `IGalaxyBackend.OnDataChange` / `OnAlarmEvent` / `OnHostStatusChanged`
|
||||||
|
events that the new `GalaxyFrameHandler.AttachConnection` subscribes per-connection and
|
||||||
|
forwards as outbound `OnDataChangeNotification` / `AlarmEvent` /
|
||||||
|
`RuntimeStatusChange` frames through the connection's `FrameWriter`. `MxAccessGalaxyBackend`
|
||||||
|
fans out per-tag value changes to every `SubscriptionId` that's listening to that tag
|
||||||
|
(multiple Proxy subs may share a Galaxy attribute — single COM subscription, multi-fan-out
|
||||||
|
on the wire). Stub + DbBacked backends declare the events with `#pragma warning disable
|
||||||
|
CS0067` (treat-warnings-as-errors would otherwise fail on never-raised events that exist
|
||||||
|
only to satisfy the interface).
|
||||||
|
- **Medium 4 — `WriteValuesAsync` doesn't await `OnWriteComplete`.** New
|
||||||
|
`WriteAsync(...)` overload returns `bool` after awaiting the OnWriteComplete callback via
|
||||||
|
the v1-style `TaskCompletionSource`-keyed-by-item-handle pattern in `_pendingWrites`.
|
||||||
|
`MxAccessGalaxyBackend.WriteValuesAsync` now reports per-tag `Bad_InternalError` when the
|
||||||
|
runtime rejected the write, instead of false-positive `Good`.
|
||||||
|
|
||||||
|
## Pipe server change
|
||||||
|
|
||||||
|
`IFrameHandler` gains `AttachConnection(FrameWriter writer): IDisposable` so the handler can
|
||||||
|
register backend event sinks on each accepted connection and detach them at disconnect. The
|
||||||
|
`PipeServer.RunOneConnectionAsync` calls it after the Hello handshake and disposes it in the
|
||||||
|
finally of the per-connection scope. `StubFrameHandler` returns `IFrameHandler.NoopAttachment.Instance`
|
||||||
|
(net48 doesn't support default interface methods, so the empty-attach lives as a public nested
|
||||||
|
class).
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
**`dotnet test ZB.MOM.WW.OtOpcUa.slnx`**: **460 pass / 7 skip (E2E on admin shell) / 1
|
||||||
|
pre-existing baseline failure**. No regressions. The Driver.Galaxy.Host unit tests + 5 live
|
||||||
|
ZB smoke + 3 live MXAccess COM smoke all pass unchanged.
|
||||||
|
|
||||||
|
## Test plan for reviewers
|
||||||
|
|
||||||
|
- [ ] `dotnet build` clean
|
||||||
|
- [ ] `dotnet test` shows 460/7-skip/1-baseline
|
||||||
|
- [ ] Spot-check `MxAccessClient.MonitorLoopAsync` against v1's `MxAccessClient.Monitor`
|
||||||
|
partial (`src/ZB.MOM.WW.OtOpcUa.Host/MxAccess/MxAccessClient.Monitor.cs`) — same
|
||||||
|
polling cadence, same probe-then-reconnect-with-replay shape
|
||||||
|
- [ ] Read `GalaxyFrameHandler.ConnectionSink.Dispose` and confirm event handlers are
|
||||||
|
detached on connection close (no leaked invocation list refs)
|
||||||
|
- [ ] `WriteValuesAsync` returning `Bad_InternalError` on a runtime-rejected write is the
|
||||||
|
correct shape — confirm against the v1 `MxAccessClient.ReadWrite.cs` pattern
|
||||||
|
|
||||||
|
## What's NOT in this PR
|
||||||
|
|
||||||
|
- Wonderware Historian SDK plugin port (Task B.1.h) — separate PR, larger scope.
|
||||||
|
- Alarm subsystem wire-up (`MxAccessGalaxyBackend.SubscribeAlarmsAsync` is still a no-op).
|
||||||
|
`OnAlarmEvent` is declared on the backend interface and pushed by the frame handler when
|
||||||
|
raised; `MxAccessGalaxyBackend` just doesn't raise it yet (waits for the alarm-tracking
|
||||||
|
port from v1's `AlarmObjectFilter` + Galaxy alarm primitives).
|
||||||
|
- Host-status push (`OnHostStatusChanged`) — declared on the interface and pushed by the
|
||||||
|
frame handler; `MxAccessGalaxyBackend` doesn't raise it (the Galaxy.Host's
|
||||||
|
`HostConnectivityProbe` from v1 needs porting too, scoped under the Historian PR).
|
||||||
|
|
||||||
|
## Adversarial review
|
||||||
|
|
||||||
|
Quick pass over the PR 4 deltas. No new findings beyond:
|
||||||
|
|
||||||
|
- **Low 1** — `MonitorLoopAsync`'s `$Heartbeat` probe item-handle is leaked
|
||||||
|
(`AddItem` succeeds, never `RemoveItem`'d). Cosmetic — the probe item is internal to
|
||||||
|
the COM connection, dies with `Unregister` at disconnect/recycle. Worth a follow-up
|
||||||
|
to call `RemoveItem` after the probe succeeds.
|
||||||
|
- **Low 2** — Replay loop in `MonitorLoopAsync` swallows per-subscription failures. If
|
||||||
|
Galaxy permanently rejects a previously-valid reference (rare but possible after a
|
||||||
|
re-deploy), the user gets silent data loss for that one subscription. The stub-handler-
|
||||||
|
unaware operator wouldn't notice. Worth surfacing as a `ConnectionStateChanged(false)
|
||||||
|
→ ConnectionStateChanged(true)` payload that includes the replay-failures list.
|
||||||
|
|
||||||
|
Both are low-priority follow-ups, not PR 4 blockers.
|
||||||
@@ -21,6 +21,13 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
|||||||
private long _nextSessionId;
|
private long _nextSessionId;
|
||||||
private long _nextSubscriptionId;
|
private long _nextSubscriptionId;
|
||||||
|
|
||||||
|
// DB-only backend doesn't have a runtime data plane; never raises events.
|
||||||
|
#pragma warning disable CS0067
|
||||||
|
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||||
|
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||||
|
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||||
|
#pragma warning restore CS0067
|
||||||
|
|
||||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||||
{
|
{
|
||||||
var id = Interlocked.Increment(ref _nextSessionId);
|
var id = Interlocked.Increment(ref _nextSessionId);
|
||||||
@@ -120,6 +127,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
|||||||
Tags = System.Array.Empty<HistoryTagValues>(),
|
Tags = System.Array.Empty<HistoryTagValues>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
|
||||||
|
HistoryReadProcessedRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
|
||||||
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Thread-safe, pure-logic endpoint picker for the Wonderware Historian cluster. Tracks which
|
||||||
|
/// configured nodes are healthy, places failed nodes in a time-bounded cooldown, and hands
|
||||||
|
/// out an ordered list of eligible candidates for the data source to try in sequence.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class HistorianClusterEndpointPicker
|
||||||
|
{
|
||||||
|
private readonly Func<DateTime> _clock;
|
||||||
|
private readonly TimeSpan _cooldown;
|
||||||
|
private readonly object _lock = new object();
|
||||||
|
private readonly List<NodeEntry> _nodes;
|
||||||
|
|
||||||
|
public HistorianClusterEndpointPicker(HistorianConfiguration config)
|
||||||
|
: this(config, () => DateTime.UtcNow) { }
|
||||||
|
|
||||||
|
internal HistorianClusterEndpointPicker(HistorianConfiguration config, Func<DateTime> clock)
|
||||||
|
{
|
||||||
|
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
|
||||||
|
_cooldown = TimeSpan.FromSeconds(Math.Max(0, config.FailureCooldownSeconds));
|
||||||
|
|
||||||
|
var names = (config.ServerNames != null && config.ServerNames.Count > 0)
|
||||||
|
? config.ServerNames
|
||||||
|
: new List<string> { config.ServerName };
|
||||||
|
|
||||||
|
_nodes = names
|
||||||
|
.Where(n => !string.IsNullOrWhiteSpace(n))
|
||||||
|
.Select(n => n.Trim())
|
||||||
|
.Distinct(StringComparer.OrdinalIgnoreCase)
|
||||||
|
.Select(n => new NodeEntry { Name = n })
|
||||||
|
.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int NodeCount
|
||||||
|
{
|
||||||
|
get { lock (_lock) return _nodes.Count; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<string> GetHealthyNodes()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
var now = _clock();
|
||||||
|
return _nodes.Where(n => IsHealthyAt(n, now)).Select(n => n.Name).ToList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int HealthyNodeCount
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
var now = _clock();
|
||||||
|
return _nodes.Count(n => IsHealthyAt(n, now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkFailed(string node, string? error)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
var entry = FindEntry(node);
|
||||||
|
if (entry == null) return;
|
||||||
|
|
||||||
|
var now = _clock();
|
||||||
|
entry.FailureCount++;
|
||||||
|
entry.LastError = error;
|
||||||
|
entry.LastFailureTime = now;
|
||||||
|
entry.CooldownUntil = _cooldown.TotalMilliseconds > 0 ? now + _cooldown : (DateTime?)null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkHealthy(string node)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
var entry = FindEntry(node);
|
||||||
|
if (entry == null) return;
|
||||||
|
entry.CooldownUntil = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HistorianClusterNodeState> SnapshotNodeStates()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
var now = _clock();
|
||||||
|
return _nodes.Select(n => new HistorianClusterNodeState
|
||||||
|
{
|
||||||
|
Name = n.Name,
|
||||||
|
IsHealthy = IsHealthyAt(n, now),
|
||||||
|
CooldownUntil = IsHealthyAt(n, now) ? null : n.CooldownUntil,
|
||||||
|
FailureCount = n.FailureCount,
|
||||||
|
LastError = n.LastError,
|
||||||
|
LastFailureTime = n.LastFailureTime
|
||||||
|
}).ToList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool IsHealthyAt(NodeEntry entry, DateTime now)
|
||||||
|
{
|
||||||
|
return entry.CooldownUntil == null || entry.CooldownUntil <= now;
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeEntry? FindEntry(string node)
|
||||||
|
{
|
||||||
|
for (var i = 0; i < _nodes.Count; i++)
|
||||||
|
if (string.Equals(_nodes[i].Name, node, StringComparison.OrdinalIgnoreCase))
|
||||||
|
return _nodes[i];
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class NodeEntry
|
||||||
|
{
|
||||||
|
public string Name { get; set; } = "";
|
||||||
|
public DateTime? CooldownUntil { get; set; }
|
||||||
|
public int FailureCount { get; set; }
|
||||||
|
public string? LastError { get; set; }
|
||||||
|
public DateTime? LastFailureTime { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Point-in-time state of a single historian cluster node. One entry per configured node
|
||||||
|
/// appears inside <see cref="HistorianHealthSnapshot"/>.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianClusterNodeState
|
||||||
|
{
|
||||||
|
public string Name { get; set; } = "";
|
||||||
|
public bool IsHealthy { get; set; }
|
||||||
|
public DateTime? CooldownUntil { get; set; }
|
||||||
|
public int FailureCount { get; set; }
|
||||||
|
public string? LastError { get; set; }
|
||||||
|
public DateTime? LastFailureTime { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Wonderware Historian SDK configuration. Populated from environment variables at Host
|
||||||
|
/// startup (see <c>Program.cs</c>) or from the Proxy's <c>DriverInstance.DriverConfig</c>
|
||||||
|
/// section passed during OpenSession. Kept OPC-UA-free — the Proxy side owns UA translation.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianConfiguration
|
||||||
|
{
|
||||||
|
public bool Enabled { get; set; } = false;
|
||||||
|
|
||||||
|
/// <summary>Single-node fallback when <see cref="ServerNames"/> is empty.</summary>
|
||||||
|
public string ServerName { get; set; } = "localhost";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Ordered cluster nodes. When non-empty, the data source tries each in order on connect,
|
||||||
|
/// falling through to the next on failure. A failed node is placed in cooldown for
|
||||||
|
/// <see cref="FailureCooldownSeconds"/> before being re-eligible.
|
||||||
|
/// </summary>
|
||||||
|
public List<string> ServerNames { get; set; } = new();
|
||||||
|
|
||||||
|
public int FailureCooldownSeconds { get; set; } = 60;
|
||||||
|
public bool IntegratedSecurity { get; set; } = true;
|
||||||
|
public string? UserName { get; set; }
|
||||||
|
public string? Password { get; set; }
|
||||||
|
public int Port { get; set; } = 32568;
|
||||||
|
public int CommandTimeoutSeconds { get; set; } = 30;
|
||||||
|
public int MaxValuesPerRead { get; set; } = 10000;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Outer safety timeout applied to sync-over-async Historian operations. Must be
|
||||||
|
/// comfortably larger than <see cref="CommandTimeoutSeconds"/>.
|
||||||
|
/// </summary>
|
||||||
|
public int RequestTimeoutSeconds { get; set; } = 60;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,621 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using StringCollection = System.Collections.Specialized.StringCollection;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using ArchestrA;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Reads historical data from the Wonderware Historian via the aahClientManaged SDK.
|
||||||
|
/// OPC-UA-free — emits <see cref="HistorianSample"/>/<see cref="HistorianAggregateSample"/>
|
||||||
|
/// which the Proxy maps to OPC UA <c>DataValue</c> on its side of the IPC.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianDataSource : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
|
||||||
|
|
||||||
|
private readonly HistorianConfiguration _config;
|
||||||
|
private readonly object _connectionLock = new object();
|
||||||
|
private readonly object _eventConnectionLock = new object();
|
||||||
|
private readonly IHistorianConnectionFactory _factory;
|
||||||
|
private HistorianAccess? _connection;
|
||||||
|
private HistorianAccess? _eventConnection;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
private readonly object _healthLock = new object();
|
||||||
|
private long _totalSuccesses;
|
||||||
|
private long _totalFailures;
|
||||||
|
private int _consecutiveFailures;
|
||||||
|
private DateTime? _lastSuccessTime;
|
||||||
|
private DateTime? _lastFailureTime;
|
||||||
|
private string? _lastError;
|
||||||
|
private string? _activeProcessNode;
|
||||||
|
private string? _activeEventNode;
|
||||||
|
|
||||||
|
private readonly HistorianClusterEndpointPicker _picker;
|
||||||
|
|
||||||
|
public HistorianDataSource(HistorianConfiguration config)
|
||||||
|
: this(config, new SdkHistorianConnectionFactory(), null) { }
|
||||||
|
|
||||||
|
internal HistorianDataSource(
|
||||||
|
HistorianConfiguration config,
|
||||||
|
IHistorianConnectionFactory factory,
|
||||||
|
HistorianClusterEndpointPicker? picker = null)
|
||||||
|
{
|
||||||
|
_config = config;
|
||||||
|
_factory = factory;
|
||||||
|
_picker = picker ?? new HistorianClusterEndpointPicker(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
private (HistorianAccess Connection, string Node) ConnectToAnyHealthyNode(HistorianConnectionType type)
|
||||||
|
{
|
||||||
|
var candidates = _picker.GetHealthyNodes();
|
||||||
|
if (candidates.Count == 0)
|
||||||
|
{
|
||||||
|
var total = _picker.NodeCount;
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
total == 0
|
||||||
|
? "No historian nodes configured"
|
||||||
|
: $"All {total} historian nodes are in cooldown — no healthy endpoints to connect to");
|
||||||
|
}
|
||||||
|
|
||||||
|
Exception? lastException = null;
|
||||||
|
foreach (var node in candidates)
|
||||||
|
{
|
||||||
|
var attemptConfig = CloneConfigWithServerName(node);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var conn = _factory.CreateAndConnect(attemptConfig, type);
|
||||||
|
_picker.MarkHealthy(node);
|
||||||
|
return (conn, node);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_picker.MarkFailed(node, ex.Message);
|
||||||
|
lastException = ex;
|
||||||
|
Log.Warning(ex, "Historian node {Node} failed during connect attempt; trying next candidate", node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var inner = lastException?.Message ?? "(no detail)";
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"All {candidates.Count} healthy historian candidate(s) failed during connect: {inner}",
|
||||||
|
lastException);
|
||||||
|
}
|
||||||
|
|
||||||
|
private HistorianConfiguration CloneConfigWithServerName(string serverName)
|
||||||
|
{
|
||||||
|
return new HistorianConfiguration
|
||||||
|
{
|
||||||
|
Enabled = _config.Enabled,
|
||||||
|
ServerName = serverName,
|
||||||
|
ServerNames = _config.ServerNames,
|
||||||
|
FailureCooldownSeconds = _config.FailureCooldownSeconds,
|
||||||
|
IntegratedSecurity = _config.IntegratedSecurity,
|
||||||
|
UserName = _config.UserName,
|
||||||
|
Password = _config.Password,
|
||||||
|
Port = _config.Port,
|
||||||
|
CommandTimeoutSeconds = _config.CommandTimeoutSeconds,
|
||||||
|
MaxValuesPerRead = _config.MaxValuesPerRead
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot()
|
||||||
|
{
|
||||||
|
var nodeStates = _picker.SnapshotNodeStates();
|
||||||
|
var healthyCount = 0;
|
||||||
|
foreach (var n in nodeStates)
|
||||||
|
if (n.IsHealthy) healthyCount++;
|
||||||
|
|
||||||
|
lock (_healthLock)
|
||||||
|
{
|
||||||
|
return new HistorianHealthSnapshot
|
||||||
|
{
|
||||||
|
TotalQueries = _totalSuccesses + _totalFailures,
|
||||||
|
TotalSuccesses = _totalSuccesses,
|
||||||
|
TotalFailures = _totalFailures,
|
||||||
|
ConsecutiveFailures = _consecutiveFailures,
|
||||||
|
LastSuccessTime = _lastSuccessTime,
|
||||||
|
LastFailureTime = _lastFailureTime,
|
||||||
|
LastError = _lastError,
|
||||||
|
ProcessConnectionOpen = Volatile.Read(ref _connection) != null,
|
||||||
|
EventConnectionOpen = Volatile.Read(ref _eventConnection) != null,
|
||||||
|
ActiveProcessNode = _activeProcessNode,
|
||||||
|
ActiveEventNode = _activeEventNode,
|
||||||
|
NodeCount = nodeStates.Count,
|
||||||
|
HealthyNodeCount = healthyCount,
|
||||||
|
Nodes = nodeStates
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void RecordSuccess()
|
||||||
|
{
|
||||||
|
lock (_healthLock)
|
||||||
|
{
|
||||||
|
_totalSuccesses++;
|
||||||
|
_lastSuccessTime = DateTime.UtcNow;
|
||||||
|
_consecutiveFailures = 0;
|
||||||
|
_lastError = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void RecordFailure(string error)
|
||||||
|
{
|
||||||
|
lock (_healthLock)
|
||||||
|
{
|
||||||
|
_totalFailures++;
|
||||||
|
_lastFailureTime = DateTime.UtcNow;
|
||||||
|
_consecutiveFailures++;
|
||||||
|
_lastError = error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void EnsureConnected()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||||
|
|
||||||
|
if (Volatile.Read(ref _connection) != null) return;
|
||||||
|
|
||||||
|
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Process);
|
||||||
|
|
||||||
|
lock (_connectionLock)
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
conn.CloseConnection(out _);
|
||||||
|
conn.Dispose();
|
||||||
|
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_connection != null)
|
||||||
|
{
|
||||||
|
conn.CloseConnection(out _);
|
||||||
|
conn.Dispose();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_connection = conn;
|
||||||
|
lock (_healthLock) _activeProcessNode = winningNode;
|
||||||
|
Log.Information("Historian SDK connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleConnectionError(Exception? ex = null)
|
||||||
|
{
|
||||||
|
lock (_connectionLock)
|
||||||
|
{
|
||||||
|
if (_connection == null) return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_connection.CloseConnection(out _);
|
||||||
|
_connection.Dispose();
|
||||||
|
}
|
||||||
|
catch (Exception disposeEx)
|
||||||
|
{
|
||||||
|
Log.Debug(disposeEx, "Error disposing Historian SDK connection during error recovery");
|
||||||
|
}
|
||||||
|
|
||||||
|
_connection = null;
|
||||||
|
string? failedNode;
|
||||||
|
lock (_healthLock)
|
||||||
|
{
|
||||||
|
failedNode = _activeProcessNode;
|
||||||
|
_activeProcessNode = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||||
|
Log.Warning(ex, "Historian SDK connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void EnsureEventConnected()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||||
|
|
||||||
|
if (Volatile.Read(ref _eventConnection) != null) return;
|
||||||
|
|
||||||
|
var (conn, winningNode) = ConnectToAnyHealthyNode(HistorianConnectionType.Event);
|
||||||
|
|
||||||
|
lock (_eventConnectionLock)
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
conn.CloseConnection(out _);
|
||||||
|
conn.Dispose();
|
||||||
|
throw new ObjectDisposedException(nameof(HistorianDataSource));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_eventConnection != null)
|
||||||
|
{
|
||||||
|
conn.CloseConnection(out _);
|
||||||
|
conn.Dispose();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_eventConnection = conn;
|
||||||
|
lock (_healthLock) _activeEventNode = winningNode;
|
||||||
|
Log.Information("Historian SDK event connection opened to {Server}:{Port}", winningNode, _config.Port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleEventConnectionError(Exception? ex = null)
|
||||||
|
{
|
||||||
|
lock (_eventConnectionLock)
|
||||||
|
{
|
||||||
|
if (_eventConnection == null) return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_eventConnection.CloseConnection(out _);
|
||||||
|
_eventConnection.Dispose();
|
||||||
|
}
|
||||||
|
catch (Exception disposeEx)
|
||||||
|
{
|
||||||
|
Log.Debug(disposeEx, "Error disposing Historian SDK event connection during error recovery");
|
||||||
|
}
|
||||||
|
|
||||||
|
_eventConnection = null;
|
||||||
|
string? failedNode;
|
||||||
|
lock (_healthLock)
|
||||||
|
{
|
||||||
|
failedNode = _activeEventNode;
|
||||||
|
_activeEventNode = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failedNode != null) _picker.MarkFailed(failedNode, ex?.Message ?? "mid-query failure");
|
||||||
|
Log.Warning(ex, "Historian SDK event connection reset (node={Node})", failedNode ?? "(unknown)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(
|
||||||
|
string tagName, DateTime startTime, DateTime endTime, int maxValues,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
var results = new List<HistorianSample>();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
EnsureConnected();
|
||||||
|
|
||||||
|
using var query = _connection!.CreateHistoryQuery();
|
||||||
|
var args = new HistoryQueryArgs
|
||||||
|
{
|
||||||
|
TagNames = new StringCollection { tagName },
|
||||||
|
StartDateTime = startTime,
|
||||||
|
EndDateTime = endTime,
|
||||||
|
RetrievalMode = HistorianRetrievalMode.Full
|
||||||
|
};
|
||||||
|
|
||||||
|
if (maxValues > 0)
|
||||||
|
args.BatchSize = (uint)maxValues;
|
||||||
|
else if (_config.MaxValuesPerRead > 0)
|
||||||
|
args.BatchSize = (uint)_config.MaxValuesPerRead;
|
||||||
|
|
||||||
|
if (!query.StartQuery(args, out var error))
|
||||||
|
{
|
||||||
|
Log.Warning("Historian SDK raw query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||||
|
RecordFailure($"raw StartQuery: {error.ErrorCode}");
|
||||||
|
HandleConnectionError();
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
var count = 0;
|
||||||
|
var limit = maxValues > 0 ? maxValues : _config.MaxValuesPerRead;
|
||||||
|
|
||||||
|
while (query.MoveNext(out error))
|
||||||
|
{
|
||||||
|
ct.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
|
var result = query.QueryResult;
|
||||||
|
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
object? value;
|
||||||
|
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||||
|
value = result.StringValue;
|
||||||
|
else
|
||||||
|
value = result.Value;
|
||||||
|
|
||||||
|
results.Add(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = value,
|
||||||
|
TimestampUtc = timestamp,
|
||||||
|
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||||
|
});
|
||||||
|
|
||||||
|
count++;
|
||||||
|
if (limit > 0 && count >= limit) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
query.EndQuery(out _);
|
||||||
|
RecordSuccess();
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (ObjectDisposedException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "HistoryRead raw failed for {Tag}", tagName);
|
||||||
|
RecordFailure($"raw: {ex.Message}");
|
||||||
|
HandleConnectionError(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
|
||||||
|
tagName, results.Count, startTime, endTime);
|
||||||
|
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||||
|
string tagName, DateTime startTime, DateTime endTime,
|
||||||
|
double intervalMs, string aggregateColumn,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
var results = new List<HistorianAggregateSample>();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
EnsureConnected();
|
||||||
|
|
||||||
|
using var query = _connection!.CreateAnalogSummaryQuery();
|
||||||
|
var args = new AnalogSummaryQueryArgs
|
||||||
|
{
|
||||||
|
TagNames = new StringCollection { tagName },
|
||||||
|
StartDateTime = startTime,
|
||||||
|
EndDateTime = endTime,
|
||||||
|
Resolution = (ulong)intervalMs
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!query.StartQuery(args, out var error))
|
||||||
|
{
|
||||||
|
Log.Warning("Historian SDK aggregate query start failed for {Tag}: {Error}", tagName, error.ErrorCode);
|
||||||
|
RecordFailure($"aggregate StartQuery: {error.ErrorCode}");
|
||||||
|
HandleConnectionError();
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (query.MoveNext(out error))
|
||||||
|
{
|
||||||
|
ct.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
|
var result = query.QueryResult;
|
||||||
|
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||||
|
var value = ExtractAggregateValue(result, aggregateColumn);
|
||||||
|
|
||||||
|
results.Add(new HistorianAggregateSample
|
||||||
|
{
|
||||||
|
Value = value,
|
||||||
|
TimestampUtc = timestamp,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
query.EndQuery(out _);
|
||||||
|
RecordSuccess();
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (ObjectDisposedException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "HistoryRead aggregate failed for {Tag}", tagName);
|
||||||
|
RecordFailure($"aggregate: {ex.Message}");
|
||||||
|
HandleConnectionError(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
|
||||||
|
aggregateColumn, tagName, results.Count);
|
||||||
|
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(
|
||||||
|
string tagName, DateTime[] timestamps,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
var results = new List<HistorianSample>();
|
||||||
|
|
||||||
|
if (timestamps == null || timestamps.Length == 0)
|
||||||
|
return Task.FromResult(results);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
EnsureConnected();
|
||||||
|
|
||||||
|
foreach (var timestamp in timestamps)
|
||||||
|
{
|
||||||
|
ct.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
|
using var query = _connection!.CreateHistoryQuery();
|
||||||
|
var args = new HistoryQueryArgs
|
||||||
|
{
|
||||||
|
TagNames = new StringCollection { tagName },
|
||||||
|
StartDateTime = timestamp,
|
||||||
|
EndDateTime = timestamp,
|
||||||
|
RetrievalMode = HistorianRetrievalMode.Interpolated,
|
||||||
|
BatchSize = 1
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!query.StartQuery(args, out var error))
|
||||||
|
{
|
||||||
|
results.Add(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = null,
|
||||||
|
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||||
|
Quality = 0, // Bad
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.MoveNext(out error))
|
||||||
|
{
|
||||||
|
var result = query.QueryResult;
|
||||||
|
object? value;
|
||||||
|
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||||
|
value = result.StringValue;
|
||||||
|
else
|
||||||
|
value = result.Value;
|
||||||
|
|
||||||
|
results.Add(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = value,
|
||||||
|
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||||
|
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
results.Add(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = null,
|
||||||
|
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||||
|
Quality = 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
query.EndQuery(out _);
|
||||||
|
}
|
||||||
|
RecordSuccess();
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (ObjectDisposedException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "HistoryRead at-time failed for {Tag}", tagName);
|
||||||
|
RecordFailure($"at-time: {ex.Message}");
|
||||||
|
HandleConnectionError(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.Debug("HistoryRead at-time: {Tag} returned {Count} values for {Timestamps} timestamps",
|
||||||
|
tagName, results.Count, timestamps.Length);
|
||||||
|
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(
|
||||||
|
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
var results = new List<HistorianEventDto>();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
EnsureEventConnected();
|
||||||
|
|
||||||
|
using var query = _eventConnection!.CreateEventQuery();
|
||||||
|
var args = new EventQueryArgs
|
||||||
|
{
|
||||||
|
StartDateTime = startTime,
|
||||||
|
EndDateTime = endTime,
|
||||||
|
EventCount = maxEvents > 0 ? (uint)maxEvents : (uint)_config.MaxValuesPerRead,
|
||||||
|
QueryType = HistorianEventQueryType.Events,
|
||||||
|
EventOrder = HistorianEventOrder.Ascending
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!string.IsNullOrEmpty(sourceName))
|
||||||
|
{
|
||||||
|
query.AddEventFilter("Source", HistorianComparisionType.Equal, sourceName, out _);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!query.StartQuery(args, out var error))
|
||||||
|
{
|
||||||
|
Log.Warning("Historian SDK event query start failed: {Error}", error.ErrorCode);
|
||||||
|
RecordFailure($"events StartQuery: {error.ErrorCode}");
|
||||||
|
HandleEventConnectionError();
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
var count = 0;
|
||||||
|
while (query.MoveNext(out error))
|
||||||
|
{
|
||||||
|
ct.ThrowIfCancellationRequested();
|
||||||
|
results.Add(ToDto(query.QueryResult));
|
||||||
|
count++;
|
||||||
|
if (maxEvents > 0 && count >= maxEvents) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
query.EndQuery(out _);
|
||||||
|
RecordSuccess();
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (ObjectDisposedException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "HistoryRead events failed for source {Source}", sourceName ?? "(all)");
|
||||||
|
RecordFailure($"events: {ex.Message}");
|
||||||
|
HandleEventConnectionError(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.Debug("HistoryRead events: source={Source} returned {Count} events ({Start} to {End})",
|
||||||
|
sourceName ?? "(all)", results.Count, startTime, endTime);
|
||||||
|
|
||||||
|
return Task.FromResult(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HistorianEventDto ToDto(HistorianEvent evt)
|
||||||
|
{
|
||||||
|
// The ArchestrA SDK marks these properties obsolete but still returns them; their
|
||||||
|
// successors aren't wired in the version we bind against. Using them is the documented
|
||||||
|
// v1 behavior — suppressed locally instead of project-wide so any non-event use of
|
||||||
|
// deprecated SDK surface still surfaces as an error.
|
||||||
|
#pragma warning disable CS0618
|
||||||
|
return new HistorianEventDto
|
||||||
|
{
|
||||||
|
Id = evt.Id,
|
||||||
|
Source = evt.Source,
|
||||||
|
EventTime = evt.EventTime,
|
||||||
|
ReceivedTime = evt.ReceivedTime,
|
||||||
|
DisplayText = evt.DisplayText,
|
||||||
|
Severity = (ushort)evt.Severity
|
||||||
|
};
|
||||||
|
#pragma warning restore CS0618
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
|
||||||
|
{
|
||||||
|
switch (column)
|
||||||
|
{
|
||||||
|
case "Average": return result.Average;
|
||||||
|
case "Minimum": return result.Minimum;
|
||||||
|
case "Maximum": return result.Maximum;
|
||||||
|
case "ValueCount": return result.ValueCount;
|
||||||
|
case "First": return result.First;
|
||||||
|
case "Last": return result.Last;
|
||||||
|
case "StdDev": return result.StdDev;
|
||||||
|
default: return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_connection?.CloseConnection(out _);
|
||||||
|
_connection?.Dispose();
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "Error closing Historian SDK connection");
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_eventConnection?.CloseConnection(out _);
|
||||||
|
_eventConnection?.Dispose();
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Log.Warning(ex, "Error closing Historian SDK event connection");
|
||||||
|
}
|
||||||
|
|
||||||
|
_connection = null;
|
||||||
|
_eventConnection = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// SDK-free representation of a Historian event record. Prevents ArchestrA types from
|
||||||
|
/// leaking beyond <c>HistorianDataSource</c>.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianEventDto
|
||||||
|
{
|
||||||
|
public Guid Id { get; set; }
|
||||||
|
public string? Source { get; set; }
|
||||||
|
public DateTime EventTime { get; set; }
|
||||||
|
public DateTime ReceivedTime { get; set; }
|
||||||
|
public string? DisplayText { get; set; }
|
||||||
|
public ushort Severity { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Point-in-time runtime health of the historian subsystem — consumed by the status dashboard
|
||||||
|
/// via an IPC health query (not wired in PR #5; deferred).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianHealthSnapshot
|
||||||
|
{
|
||||||
|
public long TotalQueries { get; set; }
|
||||||
|
public long TotalSuccesses { get; set; }
|
||||||
|
public long TotalFailures { get; set; }
|
||||||
|
public int ConsecutiveFailures { get; set; }
|
||||||
|
public DateTime? LastSuccessTime { get; set; }
|
||||||
|
public DateTime? LastFailureTime { get; set; }
|
||||||
|
public string? LastError { get; set; }
|
||||||
|
public bool ProcessConnectionOpen { get; set; }
|
||||||
|
public bool EventConnectionOpen { get; set; }
|
||||||
|
public string? ActiveProcessNode { get; set; }
|
||||||
|
public string? ActiveEventNode { get; set; }
|
||||||
|
public int NodeCount { get; set; }
|
||||||
|
public int HealthyNodeCount { get; set; }
|
||||||
|
public List<HistorianClusterNodeState> Nodes { get; set; } = new();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// OPC-UA-free representation of a single historical data point. The Host returns these
|
||||||
|
/// across the IPC boundary as <c>GalaxyDataValue</c>; the Proxy maps quality and value to
|
||||||
|
/// OPC UA <c>DataValue</c>. Raw MX quality byte is preserved so the Proxy can use the same
|
||||||
|
/// quality mapper it already uses for live reads.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianSample
|
||||||
|
{
|
||||||
|
public object? Value { get; set; }
|
||||||
|
|
||||||
|
/// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
|
||||||
|
public byte Quality { get; set; }
|
||||||
|
|
||||||
|
public DateTime TimestampUtc { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Result of <see cref="IHistorianDataSource.ReadAggregateAsync"/>. When <see cref="Value"/> is
|
||||||
|
/// null the aggregate is unavailable for that bucket (Proxy maps to <c>BadNoData</c>).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianAggregateSample
|
||||||
|
{
|
||||||
|
public double? Value { get; set; }
|
||||||
|
public DateTime TimestampUtc { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using ArchestrA;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Creates and opens Historian SDK connections. Extracted so tests can inject fakes that
|
||||||
|
/// control connection success, failure, and timeout behavior.
|
||||||
|
/// </summary>
|
||||||
|
internal interface IHistorianConnectionFactory
|
||||||
|
{
|
||||||
|
HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Production implementation — opens real Historian SDK connections.</summary>
|
||||||
|
internal sealed class SdkHistorianConnectionFactory : IHistorianConnectionFactory
|
||||||
|
{
|
||||||
|
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
|
||||||
|
{
|
||||||
|
var conn = new HistorianAccess();
|
||||||
|
|
||||||
|
var args = new HistorianConnectionArgs
|
||||||
|
{
|
||||||
|
ServerName = config.ServerName,
|
||||||
|
TcpPort = (ushort)config.Port,
|
||||||
|
IntegratedSecurity = config.IntegratedSecurity,
|
||||||
|
UseArchestrAUser = config.IntegratedSecurity,
|
||||||
|
ConnectionType = type,
|
||||||
|
ReadOnly = true,
|
||||||
|
PacketTimeout = (uint)(config.CommandTimeoutSeconds * 1000)
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!config.IntegratedSecurity)
|
||||||
|
{
|
||||||
|
args.UserName = config.UserName ?? string.Empty;
|
||||||
|
args.Password = config.Password ?? string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!conn.OpenConnection(args, out var error))
|
||||||
|
{
|
||||||
|
conn.Dispose();
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Failed to open Historian SDK connection to {config.ServerName}:{config.Port}: {error.ErrorCode}");
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeoutMs = config.CommandTimeoutSeconds * 1000;
|
||||||
|
var elapsed = 0;
|
||||||
|
while (elapsed < timeoutMs)
|
||||||
|
{
|
||||||
|
var status = new HistorianConnectionStatus();
|
||||||
|
conn.GetConnectionStatus(ref status);
|
||||||
|
|
||||||
|
if (status.ConnectedToServer)
|
||||||
|
return conn;
|
||||||
|
|
||||||
|
if (status.ErrorOccurred)
|
||||||
|
{
|
||||||
|
conn.Dispose();
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Historian SDK connection failed: {status.Error}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.Sleep(250);
|
||||||
|
elapsed += 250;
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.Dispose();
|
||||||
|
throw new TimeoutException(
|
||||||
|
$"Historian SDK connection to {config.ServerName}:{config.Port} timed out after {config.CommandTimeoutSeconds}s");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// OPC-UA-free surface for the Wonderware Historian subsystem inside Galaxy.Host.
|
||||||
|
/// Implementations read via the aahClient* SDK; the Proxy side maps returned samples
|
||||||
|
/// to OPC UA <c>DataValue</c>.
|
||||||
|
/// </summary>
|
||||||
|
public interface IHistorianDataSource : IDisposable
|
||||||
|
{
|
||||||
|
Task<List<HistorianSample>> ReadRawAsync(
|
||||||
|
string tagName, DateTime startTime, DateTime endTime, int maxValues,
|
||||||
|
CancellationToken ct = default);
|
||||||
|
|
||||||
|
Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||||
|
string tagName, DateTime startTime, DateTime endTime,
|
||||||
|
double intervalMs, string aggregateColumn,
|
||||||
|
CancellationToken ct = default);
|
||||||
|
|
||||||
|
Task<List<HistorianSample>> ReadAtTimeAsync(
|
||||||
|
string tagName, DateTime[] timestamps,
|
||||||
|
CancellationToken ct = default);
|
||||||
|
|
||||||
|
Task<List<HistorianEventDto>> ReadEventsAsync(
|
||||||
|
string? sourceName, DateTime startTime, DateTime endTime, int maxEvents,
|
||||||
|
CancellationToken ct = default);
|
||||||
|
|
||||||
|
HistorianHealthSnapshot GetHealthSnapshot();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,6 +14,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public interface IGalaxyBackend
|
public interface IGalaxyBackend
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Server-pushed events the backend raises asynchronously (data-change, alarm,
|
||||||
|
/// host-status). The frame handler subscribes once on connect and forwards each
|
||||||
|
/// event to the Proxy as a typed <see cref="MessageKind"/> notification.
|
||||||
|
/// </summary>
|
||||||
|
event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||||
|
event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||||
|
event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||||
|
|
||||||
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
|
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct);
|
||||||
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);
|
Task CloseSessionAsync(CloseSessionRequest req, CancellationToken ct);
|
||||||
|
|
||||||
@@ -29,6 +38,7 @@ public interface IGalaxyBackend
|
|||||||
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
|
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
|
||||||
|
|
||||||
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
||||||
|
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
||||||
|
|
||||||
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using ArchestrA.MxAccess;
|
using ArchestrA.MxAccess;
|
||||||
@@ -20,6 +21,7 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
private readonly StaPump _pump;
|
private readonly StaPump _pump;
|
||||||
private readonly IMxProxy _proxy;
|
private readonly IMxProxy _proxy;
|
||||||
private readonly string _clientName;
|
private readonly string _clientName;
|
||||||
|
private readonly MxAccessClientOptions _options;
|
||||||
|
|
||||||
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
|
// Galaxy attribute reference → MXAccess item handle (set on first Subscribe/Read).
|
||||||
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
|
private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
|
||||||
@@ -30,21 +32,32 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
|
|
||||||
private int _connectionHandle;
|
private int _connectionHandle;
|
||||||
private bool _connected;
|
private bool _connected;
|
||||||
|
private DateTime _lastObservedActivityUtc = DateTime.UtcNow;
|
||||||
|
private CancellationTokenSource? _monitorCts;
|
||||||
|
private int _reconnectCount;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName)
|
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
|
||||||
|
public event EventHandler<bool>? ConnectionStateChanged;
|
||||||
|
|
||||||
|
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
|
||||||
{
|
{
|
||||||
_pump = pump;
|
_pump = pump;
|
||||||
_proxy = proxy;
|
_proxy = proxy;
|
||||||
_clientName = clientName;
|
_clientName = clientName;
|
||||||
|
_options = options ?? new MxAccessClientOptions();
|
||||||
_proxy.OnDataChange += OnDataChange;
|
_proxy.OnDataChange += OnDataChange;
|
||||||
_proxy.OnWriteComplete += OnWriteComplete;
|
_proxy.OnWriteComplete += OnWriteComplete;
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool IsConnected => _connected;
|
public bool IsConnected => _connected;
|
||||||
public int SubscriptionCount => _subscriptions.Count;
|
public int SubscriptionCount => _subscriptions.Count;
|
||||||
|
public int ReconnectCount => _reconnectCount;
|
||||||
|
|
||||||
/// <summary>Connects on the STA thread. Idempotent.</summary>
|
/// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
|
||||||
public Task<int> ConnectAsync() => _pump.InvokeAsync(() =>
|
public async Task<int> ConnectAsync()
|
||||||
|
{
|
||||||
|
var handle = await _pump.InvokeAsync(() =>
|
||||||
{
|
{
|
||||||
if (_connected) return _connectionHandle;
|
if (_connected) return _connectionHandle;
|
||||||
_connectionHandle = _proxy.Register(_clientName);
|
_connectionHandle = _proxy.Register(_clientName);
|
||||||
@@ -52,7 +65,23 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
return _connectionHandle;
|
return _connectionHandle;
|
||||||
});
|
});
|
||||||
|
|
||||||
public Task DisconnectAsync() => _pump.InvokeAsync(() =>
|
ConnectionStateChanged?.Invoke(this, true);
|
||||||
|
|
||||||
|
if (_options.AutoReconnect && _monitorCts is null)
|
||||||
|
{
|
||||||
|
_monitorCts = new CancellationTokenSource();
|
||||||
|
_ = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
|
||||||
|
}
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DisconnectAsync()
|
||||||
|
{
|
||||||
|
_monitorCts?.Cancel();
|
||||||
|
_monitorCts = null;
|
||||||
|
|
||||||
|
await _pump.InvokeAsync(() =>
|
||||||
{
|
{
|
||||||
if (!_connected) return;
|
if (!_connected) return;
|
||||||
try { _proxy.Unregister(_connectionHandle); }
|
try { _proxy.Unregister(_connectionHandle); }
|
||||||
@@ -64,6 +93,88 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
ConnectionStateChanged?.Invoke(this, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Background loop that watches for connection liveness signals and triggers
|
||||||
|
/// reconnect-with-replay when the connection appears dead. Per Phase 2 high finding #2:
|
||||||
|
/// v1's MxAccessClient.Monitor pattern lifted into the new pump-based client. Uses
|
||||||
|
/// observed-activity timestamp + optional probe-tag subscription. Without an explicit
|
||||||
|
/// probe tag, falls back to "no data change in N seconds + no successful read in N
|
||||||
|
/// seconds = unhealthy" — same shape as v1.
|
||||||
|
/// </summary>
|
||||||
|
private async Task MonitorLoopAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try { await Task.Delay(_options.MonitorInterval, ct); }
|
||||||
|
catch (OperationCanceledException) { break; }
|
||||||
|
|
||||||
|
if (!_connected || _disposed) continue;
|
||||||
|
|
||||||
|
var idle = DateTime.UtcNow - _lastObservedActivityUtc;
|
||||||
|
if (idle <= _options.StaleThreshold) continue;
|
||||||
|
|
||||||
|
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
|
||||||
|
// our reconnect signal.
|
||||||
|
bool probeOk;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
probeOk = await _pump.InvokeAsync(() =>
|
||||||
|
{
|
||||||
|
// AddItem on the connection handle is cheap and round-trips through COM.
|
||||||
|
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
|
||||||
|
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
|
||||||
|
catch { return false; }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch { probeOk = false; }
|
||||||
|
|
||||||
|
if (probeOk)
|
||||||
|
{
|
||||||
|
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connection appears dead — reconnect-with-replay.
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _pump.InvokeAsync(() =>
|
||||||
|
{
|
||||||
|
try { _proxy.Unregister(_connectionHandle); } catch { /* dead anyway */ }
|
||||||
|
_connected = false;
|
||||||
|
});
|
||||||
|
ConnectionStateChanged?.Invoke(this, false);
|
||||||
|
|
||||||
|
await _pump.InvokeAsync(() =>
|
||||||
|
{
|
||||||
|
_connectionHandle = _proxy.Register(_clientName);
|
||||||
|
_connected = true;
|
||||||
|
});
|
||||||
|
_reconnectCount++;
|
||||||
|
ConnectionStateChanged?.Invoke(this, true);
|
||||||
|
|
||||||
|
// Replay every subscription that was active before the disconnect.
|
||||||
|
var snapshot = _addressToHandle.Keys.ToArray();
|
||||||
|
_addressToHandle.Clear();
|
||||||
|
_handleToAddress.Clear();
|
||||||
|
foreach (var fullRef in snapshot)
|
||||||
|
{
|
||||||
|
try { await SubscribeOnPumpAsync(fullRef); }
|
||||||
|
catch { /* skip — operator can re-subscribe */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Reconnect failed; back off and retry on the next tick.
|
||||||
|
_connected = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// One-shot read implemented as a transient subscribe + unsubscribe.
|
/// One-shot read implemented as a transient subscribe + unsubscribe.
|
||||||
/// <c>LMXProxyServer</c> doesn't expose a synchronous read, so the canonical pattern
|
/// <c>LMXProxyServer</c> doesn't expose a synchronous read, so the canonical pattern
|
||||||
@@ -79,26 +190,72 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
|
|
||||||
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
|
// Stash the one-shot handler before sending the subscribe, then remove it after firing.
|
||||||
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
|
_subscriptions.AddOrUpdate(fullReference, oneShot, (_, existing) => Combine(existing, oneShot));
|
||||||
|
var addedToReadOnlyAttribute = !_addressToHandle.ContainsKey(fullReference);
|
||||||
|
|
||||||
var itemHandle = await SubscribeOnPumpAsync(fullReference);
|
try
|
||||||
|
{
|
||||||
|
await SubscribeOnPumpAsync(fullReference);
|
||||||
|
|
||||||
using var _ = ct.Register(() => tcs.TrySetCanceled());
|
using var _ = ct.Register(() => tcs.TrySetCanceled());
|
||||||
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
|
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
|
||||||
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
|
if (raceTask != tcs.Task) throw new TimeoutException($"MXAccess read of {fullReference} timed out after {timeout}");
|
||||||
|
|
||||||
// Detach the one-shot handler.
|
return await tcs.Task;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
// High 1 — always detach the one-shot handler, even on cancellation/timeout/throw.
|
||||||
|
// If we were the one who added the underlying MXAccess subscription (no other
|
||||||
|
// caller had it), tear it down too so we don't leak a probe item handle.
|
||||||
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
|
_subscriptions.AddOrUpdate(fullReference, _ => default!, (_, existing) => Remove(existing, oneShot));
|
||||||
|
if (addedToReadOnlyAttribute)
|
||||||
|
{
|
||||||
|
try { await UnsubscribeAsync(fullReference); }
|
||||||
|
catch { /* shutdown-best-effort */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Writes <paramref name="value"/> to the runtime and AWAITS the OnWriteComplete
|
||||||
|
/// callback so the caller learns the actual write status. Per Phase 2 medium finding #4
|
||||||
|
/// in <c>exit-gate-phase-2.md</c>: the previous fire-and-forget version returned a
|
||||||
|
/// false-positive Good even when the runtime rejected the write post-callback.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<bool> WriteAsync(string fullReference, object value,
|
||||||
|
int securityClassification = 0, TimeSpan? timeout = null)
|
||||||
|
{
|
||||||
|
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
||||||
|
var actualTimeout = timeout ?? TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
var itemHandle = await _pump.InvokeAsync(() => ResolveItem(fullReference));
|
||||||
|
|
||||||
|
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
if (!_pendingWrites.TryAdd(itemHandle, tcs))
|
||||||
|
{
|
||||||
|
// A prior write to the same item handle is still pending — uncommon but possible
|
||||||
|
// if the caller spammed writes. Replace it: the older TCS observes a Cancelled task.
|
||||||
|
if (_pendingWrites.TryRemove(itemHandle, out var prior))
|
||||||
|
prior.TrySetCanceled();
|
||||||
|
_pendingWrites[itemHandle] = tcs;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _pump.InvokeAsync(() =>
|
||||||
|
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification));
|
||||||
|
|
||||||
|
var raceTask = await Task.WhenAny(tcs.Task, Task.Delay(actualTimeout));
|
||||||
|
if (raceTask != tcs.Task)
|
||||||
|
throw new TimeoutException($"MXAccess write of {fullReference} timed out after {actualTimeout}");
|
||||||
|
|
||||||
return await tcs.Task;
|
return await tcs.Task;
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
public Task WriteAsync(string fullReference, object value, int securityClassification = 0) =>
|
|
||||||
_pump.InvokeAsync(() =>
|
|
||||||
{
|
{
|
||||||
if (!_connected) throw new InvalidOperationException("MxAccessClient not connected");
|
_pendingWrites.TryRemove(itemHandle, out _);
|
||||||
var itemHandle = ResolveItem(fullReference);
|
}
|
||||||
_proxy.Write(_connectionHandle, itemHandle, value, securityClassification);
|
}
|
||||||
});
|
|
||||||
|
|
||||||
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
|
public async Task SubscribeAsync(string fullReference, Action<string, Vtq> callback)
|
||||||
{
|
{
|
||||||
@@ -148,6 +305,9 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
{
|
{
|
||||||
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
|
if (!_handleToAddress.TryGetValue(phItemHandle, out var fullRef)) return;
|
||||||
|
|
||||||
|
// Liveness: any data-change event is proof the connection is alive.
|
||||||
|
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||||
|
|
||||||
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
|
var ts = pftItemTimeStamp is DateTime dt ? dt.ToUniversalTime() : DateTime.UtcNow;
|
||||||
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
|
var quality = (byte)Math.Min(255, Math.Max(0, pwItemQuality));
|
||||||
var vtq = new Vtq(pvItemValue, ts, quality);
|
var vtq = new Vtq(pvItemValue, ts, quality);
|
||||||
@@ -169,10 +329,30 @@ public sealed class MxAccessClient : IDisposable
|
|||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
|
_disposed = true;
|
||||||
|
_monitorCts?.Cancel();
|
||||||
|
|
||||||
try { DisconnectAsync().GetAwaiter().GetResult(); }
|
try { DisconnectAsync().GetAwaiter().GetResult(); }
|
||||||
catch { /* swallow */ }
|
catch { /* swallow */ }
|
||||||
|
|
||||||
_proxy.OnDataChange -= OnDataChange;
|
_proxy.OnDataChange -= OnDataChange;
|
||||||
_proxy.OnWriteComplete -= OnWriteComplete;
|
_proxy.OnWriteComplete -= OnWriteComplete;
|
||||||
|
_monitorCts?.Dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tunables for <see cref="MxAccessClient"/>'s reconnect monitor. Defaults match the v1
|
||||||
|
/// monitor's polling cadence so behavior is consistent across the lift.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class MxAccessClientOptions
|
||||||
|
{
|
||||||
|
/// <summary>Whether to start the background monitor at connect time.</summary>
|
||||||
|
public bool AutoReconnect { get; init; } = true;
|
||||||
|
|
||||||
|
/// <summary>How often the monitor wakes up to check liveness.</summary>
|
||||||
|
public TimeSpan MonitorInterval { get; init; } = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
/// <summary>If no data-change activity in this window, the monitor probes the connection.</summary>
|
||||||
|
public TimeSpan StaleThreshold { get; init; } = TimeSpan.FromSeconds(60);
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ using System.Threading;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using MessagePack;
|
using MessagePack;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
@@ -18,20 +19,31 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
|||||||
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
|
/// MxAccess <c>AlarmExtension</c> primitives but the wire-up is also Phase 2 follow-up
|
||||||
/// (the v1 alarm subsystem is its own subtree).
|
/// (the v1 alarm subsystem is its own subtree).
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||||
{
|
{
|
||||||
private readonly GalaxyRepository _repository;
|
private readonly GalaxyRepository _repository;
|
||||||
private readonly MxAccessClient _mx;
|
private readonly MxAccessClient _mx;
|
||||||
|
private readonly IHistorianDataSource? _historian;
|
||||||
private long _nextSessionId;
|
private long _nextSessionId;
|
||||||
private long _nextSubscriptionId;
|
private long _nextSubscriptionId;
|
||||||
|
|
||||||
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
|
// Active SubscriptionId → MXAccess full reference list — so Unsubscribe can find them.
|
||||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, IReadOnlyList<string>> _subs = new();
|
||||||
|
// Reverse lookup: tag reference → subscription IDs subscribed to it (one tag may belong to many).
|
||||||
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, System.Collections.Concurrent.ConcurrentBag<long>>
|
||||||
|
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx)
|
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||||
|
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
|
||||||
|
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||||
|
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||||
|
#pragma warning restore CS0067
|
||||||
|
|
||||||
|
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
||||||
{
|
{
|
||||||
_repository = repository;
|
_repository = repository;
|
||||||
_mx = mx;
|
_mx = mx;
|
||||||
|
_historian = historian;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||||
@@ -120,8 +132,13 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
|||||||
? null
|
? null
|
||||||
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
|
: MessagePackSerializer.Deserialize<object>(w.ValueBytes);
|
||||||
|
|
||||||
await _mx.WriteAsync(w.TagReference, value!);
|
var ok = await _mx.WriteAsync(w.TagReference, value!);
|
||||||
results.Add(new WriteValueResult { TagReference = w.TagReference, StatusCode = 0 });
|
results.Add(new WriteValueResult
|
||||||
|
{
|
||||||
|
TagReference = w.TagReference,
|
||||||
|
StatusCode = ok ? 0u : 0x80020000u, // Good or Bad_InternalError
|
||||||
|
Error = ok ? null : "MXAccess runtime reported write failure",
|
||||||
|
});
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -137,12 +154,16 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// For each requested tag, register a subscription that publishes back via the
|
|
||||||
// shared MXAccess data-change handler. The OnDataChange push frame to the Proxy
|
|
||||||
// is wired in the upcoming subscription-push pass; for now the value is captured
|
|
||||||
// for the first ReadAsync to hit it (so the subscribe surface itself is functional).
|
|
||||||
foreach (var tag in req.TagReferences)
|
foreach (var tag in req.TagReferences)
|
||||||
await _mx.SubscribeAsync(tag, (_, __) => { /* push-frame plumbing in next iteration */ });
|
{
|
||||||
|
_refToSubs.AddOrUpdate(tag,
|
||||||
|
_ => new System.Collections.Concurrent.ConcurrentBag<long> { sid },
|
||||||
|
(_, bag) => { bag.Add(sid); return bag; });
|
||||||
|
|
||||||
|
// The MXAccess SubscribeAsync only takes one callback per tag; the same callback
|
||||||
|
// fires for every active subscription of that tag — we fan out by SubscriptionId.
|
||||||
|
await _mx.SubscribeAsync(tag, OnTagValueChanged);
|
||||||
|
}
|
||||||
|
|
||||||
_subs[sid] = req.TagReferences;
|
_subs[sid] = req.TagReferences;
|
||||||
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
|
return new SubscribeResponse { Success = true, SubscriptionId = sid, ActualIntervalMs = req.RequestedIntervalMs };
|
||||||
@@ -157,23 +178,139 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
|||||||
{
|
{
|
||||||
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
|
if (!_subs.TryRemove(req.SubscriptionId, out var refs)) return;
|
||||||
foreach (var r in refs)
|
foreach (var r in refs)
|
||||||
|
{
|
||||||
|
// Drop this subscription from the reverse map; only unsubscribe from MXAccess if no
|
||||||
|
// other subscription is still listening (multiple Proxy subs may share a tag).
|
||||||
|
_refToSubs.TryGetValue(r, out var bag);
|
||||||
|
if (bag is not null)
|
||||||
|
{
|
||||||
|
var remaining = new System.Collections.Concurrent.ConcurrentBag<long>(
|
||||||
|
bag.Where(id => id != req.SubscriptionId));
|
||||||
|
if (remaining.IsEmpty)
|
||||||
|
{
|
||||||
|
_refToSubs.TryRemove(r, out _);
|
||||||
await _mx.UnsubscribeAsync(r);
|
await _mx.UnsubscribeAsync(r);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_refToSubs[r] = remaining;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fires for every value change on any subscribed Galaxy attribute. Wraps the value in
|
||||||
|
/// a <see cref="GalaxyDataValue"/> and raises <see cref="OnDataChange"/> once per
|
||||||
|
/// subscription that includes this tag — the IPC sink translates that into outbound
|
||||||
|
/// <c>OnDataChangeNotification</c> frames.
|
||||||
|
/// </summary>
|
||||||
|
private void OnTagValueChanged(string fullReference, MxAccess.Vtq vtq)
|
||||||
|
{
|
||||||
|
if (!_refToSubs.TryGetValue(fullReference, out var bag) || bag.IsEmpty) return;
|
||||||
|
|
||||||
|
var wireValue = ToWire(fullReference, vtq);
|
||||||
|
// Emit one notification per active SubscriptionId for this tag — the Proxy fans out to
|
||||||
|
// each ISubscribable consumer based on the SubscriptionId in the payload.
|
||||||
|
foreach (var sid in bag.Distinct())
|
||||||
|
{
|
||||||
|
OnDataChange?.Invoke(this, new OnDataChangeNotification
|
||||||
|
{
|
||||||
|
SubscriptionId = sid,
|
||||||
|
Values = new[] { wireValue },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||||
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
||||||
|
|
||||||
public Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new HistoryReadResponse
|
{
|
||||||
|
if (_historian is null)
|
||||||
|
return new HistoryReadResponse
|
||||||
{
|
{
|
||||||
Success = false,
|
Success = false,
|
||||||
Error = "Wonderware Historian plugin loader not yet wired (Phase 2 Task B.1.h follow-up)",
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||||
Tags = Array.Empty<HistoryTagValues>(),
|
Tags = Array.Empty<HistoryTagValues>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||||
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||||
|
var tags = new List<HistoryTagValues>(req.TagReferences.Length);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
foreach (var reference in req.TagReferences)
|
||||||
|
{
|
||||||
|
var samples = await _historian.ReadRawAsync(reference, start, end, (int)req.MaxValuesPerTag, ct).ConfigureAwait(false);
|
||||||
|
tags.Add(new HistoryTagValues
|
||||||
|
{
|
||||||
|
TagReference = reference,
|
||||||
|
Values = samples.Select(s => ToWire(reference, s)).ToArray(),
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
return new HistoryReadResponse { Success = true, Tags = tags.ToArray() };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new HistoryReadResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = $"Historian read failed: {ex.Message}",
|
||||||
|
Tags = tags.ToArray(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
|
||||||
|
HistoryReadProcessedRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_historian is null)
|
||||||
|
return new HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||||
|
Values = Array.Empty<GalaxyDataValue>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (req.IntervalMs <= 0)
|
||||||
|
return new HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "HistoryReadProcessed requires IntervalMs > 0",
|
||||||
|
Values = Array.Empty<GalaxyDataValue>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||||
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var samples = await _historian.ReadAggregateAsync(
|
||||||
|
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
|
||||||
|
|
||||||
|
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
|
||||||
|
return new HistoryReadProcessedResponse { Success = true, Values = wire };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = $"Historian aggregate read failed: {ex.Message}",
|
||||||
|
Values = Array.Empty<GalaxyDataValue>(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
|
public void Dispose() => _historian?.Dispose();
|
||||||
|
|
||||||
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
|
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
|
||||||
{
|
{
|
||||||
TagReference = reference,
|
TagReference = reference,
|
||||||
@@ -184,6 +321,47 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend
|
|||||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maps a <see cref="HistorianSample"/> (raw historian row, OPC-UA-free) to the IPC wire
|
||||||
|
/// shape. The Proxy decodes the MessagePack value and maps <see cref="HistorianSample.Quality"/>
|
||||||
|
/// through <c>QualityMapper</c> on its side of the pipe — we keep the raw byte here so
|
||||||
|
/// rich OPC DA status codes (e.g. <c>BadNotConnected</c>, <c>UncertainSubNormal</c>) survive
|
||||||
|
/// the hop intact.
|
||||||
|
/// </summary>
|
||||||
|
private static GalaxyDataValue ToWire(string reference, HistorianSample sample) => new()
|
||||||
|
{
|
||||||
|
TagReference = reference,
|
||||||
|
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
|
||||||
|
ValueMessagePackType = 0,
|
||||||
|
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
|
||||||
|
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
};
|
||||||
|
|
||||||
|
private static uint MapHistorianQualityToOpcUa(byte q)
|
||||||
|
{
|
||||||
|
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
|
||||||
|
// The Proxy may refine this when it decodes the wire frame.
|
||||||
|
if (q >= 192) return 0x00000000u; // Good
|
||||||
|
if (q >= 64) return 0x40000000u; // Uncertain
|
||||||
|
return 0x80000000u; // Bad
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
|
||||||
|
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
|
||||||
|
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
|
||||||
|
/// </summary>
|
||||||
|
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
|
||||||
|
{
|
||||||
|
TagReference = reference,
|
||||||
|
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
|
||||||
|
ValueMessagePackType = 0,
|
||||||
|
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
|
||||||
|
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
};
|
||||||
|
|
||||||
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
|
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
|
||||||
{
|
{
|
||||||
AttributeName = row.AttributeName,
|
AttributeName = row.AttributeName,
|
||||||
|
|||||||
@@ -15,6 +15,13 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
|||||||
private long _nextSessionId;
|
private long _nextSessionId;
|
||||||
private long _nextSubscriptionId;
|
private long _nextSubscriptionId;
|
||||||
|
|
||||||
|
// Stub backend never raises events — implements the interface members for symmetry.
|
||||||
|
#pragma warning disable CS0067
|
||||||
|
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||||
|
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||||
|
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||||
|
#pragma warning restore CS0067
|
||||||
|
|
||||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||||
{
|
{
|
||||||
var id = Interlocked.Increment(ref _nextSessionId);
|
var id = Interlocked.Increment(ref _nextSessionId);
|
||||||
@@ -78,6 +85,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
|||||||
Tags = System.Array.Empty<HistoryTagValues>(),
|
Tags = System.Array.Empty<HistoryTagValues>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
|
||||||
|
HistoryReadProcessedRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
|
||||||
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse
|
=> Task.FromResult(new RecycleStatusResponse
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -80,6 +80,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
|||||||
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
|
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
case MessageKind.HistoryReadProcessedRequest:
|
||||||
|
{
|
||||||
|
var resp = await backend.HistoryReadProcessedAsync(
|
||||||
|
Deserialize<HistoryReadProcessedRequest>(body), ct);
|
||||||
|
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
|
||||||
|
return;
|
||||||
|
}
|
||||||
case MessageKind.RecycleHostRequest:
|
case MessageKind.RecycleHostRequest:
|
||||||
{
|
{
|
||||||
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
||||||
@@ -99,9 +106,64 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
|
||||||
|
/// The returned disposable unsubscribes when the connection closes — without it the
|
||||||
|
/// backend's static event invocation list would accumulate dead writer references and
|
||||||
|
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
|
||||||
|
/// </summary>
|
||||||
|
public IDisposable AttachConnection(FrameWriter writer)
|
||||||
|
{
|
||||||
|
var sink = new ConnectionSink(backend, writer, logger);
|
||||||
|
sink.Attach();
|
||||||
|
return sink;
|
||||||
|
}
|
||||||
|
|
||||||
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
|
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
|
||||||
|
|
||||||
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
|
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
|
||||||
=> writer.WriteAsync(MessageKind.ErrorResponse,
|
=> writer.WriteAsync(MessageKind.ErrorResponse,
|
||||||
new ErrorResponse { Code = code, Message = message }, ct);
|
new ErrorResponse { Code = code, Message = message }, ct);
|
||||||
|
|
||||||
|
private sealed class ConnectionSink : IDisposable
|
||||||
|
{
|
||||||
|
private readonly IGalaxyBackend _backend;
|
||||||
|
private readonly FrameWriter _writer;
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
private EventHandler<OnDataChangeNotification>? _onData;
|
||||||
|
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
|
||||||
|
private EventHandler<HostConnectivityStatus>? _onHost;
|
||||||
|
|
||||||
|
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
|
||||||
|
{
|
||||||
|
_backend = backend; _writer = writer; _logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Attach()
|
||||||
|
{
|
||||||
|
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
|
||||||
|
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
|
||||||
|
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
|
||||||
|
new RuntimeStatusChangeNotification { Status = e });
|
||||||
|
_backend.OnDataChange += _onData;
|
||||||
|
_backend.OnAlarmEvent += _onAlarm;
|
||||||
|
_backend.OnHostStatusChanged += _onHost;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Push<T>(MessageKind kind, T payload)
|
||||||
|
{
|
||||||
|
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
|
||||||
|
// ObjectDisposedException because the dispose path will detach this sink shortly.
|
||||||
|
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
|
||||||
|
catch (ObjectDisposedException) { }
|
||||||
|
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_onData is not null) _backend.OnDataChange -= _onData;
|
||||||
|
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
|
||||||
|
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,6 +98,8 @@ public sealed class PipeServer : IDisposable
|
|||||||
new HelloAck { Accepted = true, HostName = Environment.MachineName },
|
new HelloAck { Accepted = true, HostName = Environment.MachineName },
|
||||||
linked.Token).ConfigureAwait(false);
|
linked.Token).ConfigureAwait(false);
|
||||||
|
|
||||||
|
using var attachment = handler.AttachConnection(writer);
|
||||||
|
|
||||||
while (!linked.Token.IsCancellationRequested)
|
while (!linked.Token.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
||||||
@@ -157,4 +159,19 @@ public sealed class PipeServer : IDisposable
|
|||||||
public interface IFrameHandler
|
public interface IFrameHandler
|
||||||
{
|
{
|
||||||
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
|
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Called once per accepted connection after the Hello handshake. Lets the handler
|
||||||
|
/// attach server-pushed event sinks (data-change, alarm, host-status) to the
|
||||||
|
/// connection's <paramref name="writer"/>. Returns an <see cref="IDisposable"/> the
|
||||||
|
/// pipe server disposes when the connection closes — backends use it to unsubscribe.
|
||||||
|
/// Implementations that don't push events can return <see cref="NoopAttachment"/>.
|
||||||
|
/// </summary>
|
||||||
|
IDisposable AttachConnection(FrameWriter writer);
|
||||||
|
|
||||||
|
public sealed class NoopAttachment : IDisposable
|
||||||
|
{
|
||||||
|
public static readonly NoopAttachment Instance = new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using System;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using MessagePack;
|
using MessagePack;
|
||||||
@@ -27,4 +28,6 @@ public sealed class StubFrameHandler : IFrameHandler
|
|||||||
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
|
new ErrorResponse { Code = "not-implemented", Message = $"Kind {kind} is stubbed — MXAccess lift deferred" },
|
||||||
ct);
|
ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ using System.Threading;
|
|||||||
using Serilog;
|
using Serilog;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
@@ -66,9 +67,11 @@ public static class Program
|
|||||||
pump = new StaPump("Galaxy.Sta");
|
pump = new StaPump("Galaxy.Sta");
|
||||||
pump.WaitForStartedAsync().GetAwaiter().GetResult();
|
pump.WaitForStartedAsync().GetAwaiter().GetResult();
|
||||||
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
|
mx = new MxAccessClient(pump, new MxProxyAdapter(), clientName);
|
||||||
|
var historian = BuildHistorianIfEnabled();
|
||||||
backend = new MxAccessGalaxyBackend(
|
backend = new MxAccessGalaxyBackend(
|
||||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = zbConn }),
|
||||||
mx);
|
mx,
|
||||||
|
historian);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -77,6 +80,7 @@ public static class Program
|
|||||||
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
|
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
(backend as IDisposable)?.Dispose();
|
||||||
mx?.Dispose();
|
mx?.Dispose();
|
||||||
pump?.Dispose();
|
pump?.Dispose();
|
||||||
}
|
}
|
||||||
@@ -91,4 +95,45 @@ public static class Program
|
|||||||
}
|
}
|
||||||
finally { Log.CloseAndFlush(); }
|
finally { Log.CloseAndFlush(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds a <see cref="HistorianDataSource"/> from the OTOPCUA_HISTORIAN_* environment
|
||||||
|
/// variables the supervisor passes at spawn time. Returns null when the historian is
|
||||||
|
/// disabled (default) so <c>MxAccessGalaxyBackend.HistoryReadAsync</c> returns a clear
|
||||||
|
/// "not configured" error instead of attempting an SDK connection to localhost.
|
||||||
|
/// </summary>
|
||||||
|
private static IHistorianDataSource? BuildHistorianIfEnabled()
|
||||||
|
{
|
||||||
|
var enabled = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED");
|
||||||
|
if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase) && enabled != "1")
|
||||||
|
return null;
|
||||||
|
|
||||||
|
var cfg = new HistorianConfiguration
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
|
||||||
|
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
|
||||||
|
IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
|
||||||
|
UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
|
||||||
|
Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
|
||||||
|
CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
|
||||||
|
MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
|
||||||
|
FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
|
||||||
|
};
|
||||||
|
|
||||||
|
var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS");
|
||||||
|
if (!string.IsNullOrWhiteSpace(servers))
|
||||||
|
cfg.ServerNames = new System.Collections.Generic.List<string>(
|
||||||
|
servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
|
||||||
|
|
||||||
|
Log.Information("Historian enabled — {NodeCount} configured node(s), port={Port}",
|
||||||
|
cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port);
|
||||||
|
return new HistorianDataSource(cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int TryParseInt(string envName, int defaultValue)
|
||||||
|
{
|
||||||
|
var raw = Environment.GetEnvironmentVariable(envName);
|
||||||
|
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,11 +30,43 @@
|
|||||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests"/>
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Reference Include="ArchestrA.MxAccess">
|
<Reference Include="ArchestrA.MxAccess">
|
||||||
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
|
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
|
||||||
<Private>true</Private>
|
<Private>true</Private>
|
||||||
</Reference>
|
</Reference>
|
||||||
|
<!-- Wonderware Historian SDK — consumed by Backend/Historian/ for HistoryReadAsync.
|
||||||
|
Previously lived in the v1 Historian.Aveva plugin; folded into Driver.Galaxy.Host
|
||||||
|
for PR #5 because this host is already Galaxy-specific. -->
|
||||||
|
<Reference Include="aahClientManaged">
|
||||||
|
<HintPath>..\..\lib\aahClientManaged.dll</HintPath>
|
||||||
|
<EmbedInteropTypes>false</EmbedInteropTypes>
|
||||||
|
</Reference>
|
||||||
|
<Reference Include="aahClientCommon">
|
||||||
|
<HintPath>..\..\lib\aahClientCommon.dll</HintPath>
|
||||||
|
<EmbedInteropTypes>false</EmbedInteropTypes>
|
||||||
|
</Reference>
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<!-- Historian SDK native and satellite DLLs — staged beside the host exe so the
|
||||||
|
aahClientManaged wrapper can P/Invoke into them without an AssemblyResolve hook. -->
|
||||||
|
<None Include="..\..\lib\aahClient.dll">
|
||||||
|
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||||
|
</None>
|
||||||
|
<None Include="..\..\lib\Historian.CBE.dll">
|
||||||
|
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||||
|
</None>
|
||||||
|
<None Include="..\..\lib\Historian.DPAPI.dll">
|
||||||
|
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||||
|
</None>
|
||||||
|
<None Include="..\..\lib\ArchestrA.CloudHistorian.Contract.dll">
|
||||||
|
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||||
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -296,10 +296,50 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
|
|||||||
return new HistoryReadResult(samples, ContinuationPoint: null);
|
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<HistoryReadResult> ReadProcessedAsync(
|
public async Task<HistoryReadResult> ReadProcessedAsync(
|
||||||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||||||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||||||
=> throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync.");
|
{
|
||||||
|
var client = RequireClient();
|
||||||
|
var column = MapAggregateToColumn(aggregate);
|
||||||
|
|
||||||
|
var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
|
||||||
|
MessageKind.HistoryReadProcessedRequest,
|
||||||
|
new HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
SessionId = _sessionId,
|
||||||
|
TagReference = fullReference,
|
||||||
|
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
IntervalMs = (long)interval.TotalMilliseconds,
|
||||||
|
AggregateColumn = column,
|
||||||
|
},
|
||||||
|
MessageKind.HistoryReadProcessedResponse,
|
||||||
|
cancellationToken);
|
||||||
|
|
||||||
|
if (!resp.Success)
|
||||||
|
throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");
|
||||||
|
|
||||||
|
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
|
||||||
|
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
|
||||||
|
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
|
||||||
|
/// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
|
||||||
|
/// </summary>
|
||||||
|
internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
|
||||||
|
{
|
||||||
|
HistoryAggregateType.Average => "Average",
|
||||||
|
HistoryAggregateType.Minimum => "Minimum",
|
||||||
|
HistoryAggregateType.Maximum => "Maximum",
|
||||||
|
HistoryAggregateType.Count => "ValueCount",
|
||||||
|
HistoryAggregateType.Total => throw new NotSupportedException(
|
||||||
|
"HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
|
||||||
|
"query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
|
||||||
|
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
|
||||||
|
};
|
||||||
|
|
||||||
// ---- IRediscoverable ----
|
// ---- IRediscoverable ----
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,10 @@
|
|||||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests"/>
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ public enum MessageKind : byte
|
|||||||
|
|
||||||
HistoryReadRequest = 0x60,
|
HistoryReadRequest = 0x60,
|
||||||
HistoryReadResponse = 0x61,
|
HistoryReadResponse = 0x61,
|
||||||
|
HistoryReadProcessedRequest = 0x62,
|
||||||
|
HistoryReadProcessedResponse = 0x63,
|
||||||
|
|
||||||
HostConnectivityStatus = 0x70,
|
HostConnectivityStatus = 0x70,
|
||||||
RuntimeStatusChange = 0x71,
|
RuntimeStatusChange = 0x71,
|
||||||
|
|||||||
@@ -26,3 +26,27 @@ public sealed class HistoryReadResponse
|
|||||||
[Key(1)] public string? Error { get; set; }
|
[Key(1)] public string? Error { get; set; }
|
||||||
[Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty<HistoryTagValues>();
|
[Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty<HistoryTagValues>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processed (aggregated) historian read — OPC UA HistoryReadProcessed service. The
|
||||||
|
/// aggregate column is a string (e.g. "Average", "Minimum") mapped by the Proxy from the
|
||||||
|
/// OPC UA HistoryAggregateType enum so Galaxy.Host stays OPC-UA-free.
|
||||||
|
/// </summary>
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
[Key(0)] public long SessionId { get; set; }
|
||||||
|
[Key(1)] public string TagReference { get; set; } = string.Empty;
|
||||||
|
[Key(2)] public long StartUtcUnixMs { get; set; }
|
||||||
|
[Key(3)] public long EndUtcUnixMs { get; set; }
|
||||||
|
[Key(4)] public long IntervalMs { get; set; }
|
||||||
|
[Key(5)] public string AggregateColumn { get; set; } = "Average";
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadProcessedResponse
|
||||||
|
{
|
||||||
|
[Key(0)] public bool Success { get; set; }
|
||||||
|
[Key(1)] public string? Error { get; set; }
|
||||||
|
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||||
|
{
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistorianClusterEndpointPickerTests
|
||||||
|
{
|
||||||
|
private static HistorianConfiguration Config(params string[] nodes) => new()
|
||||||
|
{
|
||||||
|
ServerName = "ignored",
|
||||||
|
ServerNames = nodes.ToList(),
|
||||||
|
FailureCooldownSeconds = 60,
|
||||||
|
};
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Single_node_config_falls_back_to_ServerName_when_ServerNames_empty()
|
||||||
|
{
|
||||||
|
var cfg = new HistorianConfiguration { ServerName = "only-node", ServerNames = new() };
|
||||||
|
var p = new HistorianClusterEndpointPicker(cfg);
|
||||||
|
p.NodeCount.ShouldBe(1);
|
||||||
|
p.GetHealthyNodes().ShouldBe(new[] { "only-node" });
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Failed_node_enters_cooldown_and_is_skipped()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
|
||||||
|
|
||||||
|
p.MarkFailed("a", "boom");
|
||||||
|
p.GetHealthyNodes().ShouldBe(new[] { "b" });
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Cooldown_expires_after_configured_window()
|
||||||
|
{
|
||||||
|
var clock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => clock);
|
||||||
|
p.MarkFailed("a", "boom");
|
||||||
|
p.GetHealthyNodes().ShouldBe(new[] { "b" });
|
||||||
|
|
||||||
|
clock = clock.AddSeconds(61);
|
||||||
|
p.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MarkHealthy_immediately_clears_cooldown()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
|
||||||
|
p.MarkFailed("a", "boom");
|
||||||
|
p.GetHealthyNodes().ShouldBeEmpty();
|
||||||
|
p.MarkHealthy("a");
|
||||||
|
p.GetHealthyNodes().ShouldBe(new[] { "a" });
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void All_nodes_in_cooldown_returns_empty_healthy_list()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("a", "b"), () => now);
|
||||||
|
p.MarkFailed("a", "x");
|
||||||
|
p.MarkFailed("b", "y");
|
||||||
|
p.GetHealthyNodes().ShouldBeEmpty();
|
||||||
|
p.NodeCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Snapshot_reports_failure_count_and_last_error()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("a"), () => now);
|
||||||
|
p.MarkFailed("a", "first");
|
||||||
|
p.MarkFailed("a", "second");
|
||||||
|
|
||||||
|
var snap = p.SnapshotNodeStates().Single();
|
||||||
|
snap.FailureCount.ShouldBe(2);
|
||||||
|
snap.LastError.ShouldBe("second");
|
||||||
|
snap.IsHealthy.ShouldBeFalse();
|
||||||
|
snap.CooldownUntil.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Duplicate_hostnames_are_deduplicated_case_insensitively()
|
||||||
|
{
|
||||||
|
var p = new HistorianClusterEndpointPicker(Config("NodeA", "nodea", "NodeB"));
|
||||||
|
p.NodeCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,109 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||||
|
{
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistorianWiringTests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// When the Proxy sends a HistoryRead but the supervisor never enabled the historian
|
||||||
|
/// (OTOPCUA_HISTORIAN_ENABLED unset), we expect a clean Success=false with a
|
||||||
|
/// self-explanatory error — not an exception or a hang against localhost.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task HistoryReadAsync_returns_disabled_error_when_no_historian_configured()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
historian: null);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
|
||||||
|
{
|
||||||
|
TagReferences = new[] { "TestTag" },
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxValuesPerTag = 100,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("Historian disabled");
|
||||||
|
resp.Tags.ShouldBeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the historian is wired up, we expect the backend to call through and map
|
||||||
|
/// samples onto the IPC wire shape. Uses a fake <see cref="IHistorianDataSource"/>
|
||||||
|
/// that returns a single known-good sample so we can assert the mapping stays sane.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task HistoryReadAsync_maps_sample_to_GalaxyDataValue()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "HistorianWiringTests");
|
||||||
|
var fake = new FakeHistorianDataSource(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = 42.5,
|
||||||
|
Quality = 192, // Good
|
||||||
|
TimestampUtc = new DateTime(2026, 4, 18, 9, 0, 0, DateTimeKind.Utc),
|
||||||
|
});
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
fake);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAsync(new HistoryReadRequest
|
||||||
|
{
|
||||||
|
TagReferences = new[] { "TankLevel" },
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxValuesPerTag = 100,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Tags.Length.ShouldBe(1);
|
||||||
|
resp.Tags[0].TagReference.ShouldBe("TankLevel");
|
||||||
|
resp.Tags[0].Values.Length.ShouldBe(1);
|
||||||
|
resp.Tags[0].Values[0].StatusCode.ShouldBe(0u); // Good
|
||||||
|
resp.Tags[0].Values[0].ValueBytes.ShouldNotBeNull();
|
||||||
|
resp.Tags[0].Values[0].SourceTimestampUtcUnixMs.ShouldBe(
|
||||||
|
new DateTimeOffset(2026, 4, 18, 9, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHistorianDataSource : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private readonly HistorianSample _sample;
|
||||||
|
public FakeHistorianDataSource(HistorianSample sample) => _sample = sample;
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample> { _sample });
|
||||||
|
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] ts, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianEventDto>());
|
||||||
|
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,158 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MessagePack;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistoryReadProcessedTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task ReturnsDisabledError_When_NoHistorianConfigured()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
historian: null);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
IntervalMs = 1000,
|
||||||
|
AggregateColumn = "Average",
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("Historian disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Rejects_NonPositiveInterval()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
|
||||||
|
var fake = new FakeHistorianDataSource();
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
fake);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
IntervalMs = 0,
|
||||||
|
AggregateColumn = "Average",
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("IntervalMs");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Maps_AggregateSample_With_Value_To_Good()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
|
||||||
|
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
|
||||||
|
{
|
||||||
|
Value = 12.34,
|
||||||
|
TimestampUtc = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc),
|
||||||
|
});
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
fake);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
IntervalMs = 60_000,
|
||||||
|
AggregateColumn = "Average",
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Values.Length.ShouldBe(1);
|
||||||
|
resp.Values[0].StatusCode.ShouldBe(0u); // Good
|
||||||
|
resp.Values[0].ValueBytes.ShouldNotBeNull();
|
||||||
|
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(12.34);
|
||||||
|
fake.LastAggregateColumn.ShouldBe("Average");
|
||||||
|
fake.LastIntervalMs.ShouldBe(60_000d);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Maps_Null_Bucket_To_BadNoData()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
|
||||||
|
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
|
||||||
|
{
|
||||||
|
Value = null,
|
||||||
|
TimestampUtc = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
using var backend = new MxAccessGalaxyBackend(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
mx,
|
||||||
|
fake);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
IntervalMs = 1000,
|
||||||
|
AggregateColumn = "Minimum",
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Values.Length.ShouldBe(1);
|
||||||
|
resp.Values[0].StatusCode.ShouldBe(0x800E0000u); // BadNoData
|
||||||
|
resp.Values[0].ValueBytes.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHistorianDataSource : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private readonly HistorianAggregateSample[] _samples;
|
||||||
|
public string? LastAggregateColumn { get; private set; }
|
||||||
|
public double LastIntervalMs { get; private set; }
|
||||||
|
|
||||||
|
public FakeHistorianDataSource(params HistorianAggregateSample[] samples) => _samples = samples;
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
|
||||||
|
string tag, DateTime s, DateTime e, double intervalMs, string col, CancellationToken ct)
|
||||||
|
{
|
||||||
|
LastAggregateColumn = col;
|
||||||
|
LastIntervalMs = intervalMs;
|
||||||
|
return Task.FromResult(new List<HistorianAggregateSample>(_samples));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianEventDto>());
|
||||||
|
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class AggregateColumnMappingTests
|
||||||
|
{
|
||||||
|
[Theory]
|
||||||
|
[InlineData(HistoryAggregateType.Average, "Average")]
|
||||||
|
[InlineData(HistoryAggregateType.Minimum, "Minimum")]
|
||||||
|
[InlineData(HistoryAggregateType.Maximum, "Maximum")]
|
||||||
|
[InlineData(HistoryAggregateType.Count, "ValueCount")]
|
||||||
|
public void Maps_OpcUa_enum_to_AnalogSummary_column(HistoryAggregateType aggregate, string expected)
|
||||||
|
{
|
||||||
|
GalaxyProxyDriver.MapAggregateToColumn(aggregate).ShouldBe(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Total_is_not_supported()
|
||||||
|
{
|
||||||
|
Should.Throw<System.NotSupportedException>(
|
||||||
|
() => GalaxyProxyDriver.MapAggregateToColumn(HistoryAggregateType.Total));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user