v2 mxgw migration — Phase 1+2+3.1 wiring (7 PRs)

Foundational PRs from lmx_mxgw_impl.md, all green. Bodies only — DI/wiring
deferred to PR 1+2.W (combined wire-up) and PR 3.W.

PR 1.1 — IHistorianDataSource lifted to Core.Abstractions/Historian/
  Reuses existing DataValueSnapshot + HistoricalEvent shapes; sidecar (PR
  3.4) translates byte-quality → uint StatusCode internally.

PR 1.2 — IHistoryRouter + HistoryRouter on the server
  Longest-prefix-match resolution, case-insensitive, ObjectDisposed-guarded,
  swallow-on-shutdown disposal of misbehaving sources.

PR 1.3 — DriverNodeManager.HistoryRead* dispatch through IHistoryRouter
  Per-tag resolution with LegacyDriverHistoryAdapter wrapping
  `_driver as IHistoryProvider` so existing tests + drivers keep working
  until PR 7.2 retires the fallback.

PR 2.1 — AlarmConditionInfo extended with five sub-attribute refs
  InAlarmRef / PriorityRef / DescAttrNameRef / AckedRef / AckMsgWriteRef.
  Optional defaulted parameters preserve all existing 3-arg call sites.

PR 2.2 — AlarmConditionService state machine in Server/Alarms/
  Driver-agnostic port of GalaxyAlarmTracker. Sub-attribute refs come from
  AlarmConditionInfo, values arrive as DataValueSnapshot, ack writes route
  through IAlarmAcknowledger. State machine preserves Active/Acknowledged/
  Inactive transitions, Acked-on-active reset, post-disposal silence.

PR 2.3 — DriverNodeManager wires AlarmConditionService
  MarkAsAlarmCondition registers each alarm-bearing variable with the
  service; DriverWritableAcknowledger routes ack-message writes through
  the driver's IWritable + CapabilityInvoker. Service-raised transitions
  route via OnAlarmServiceTransition → matching ConditionSink. Legacy
  IAlarmSource path unchanged for null service.

PR 3.1 — Driver.Historian.Wonderware shell project (net48 x86)
  Console host shell + smoke test; SDK references + code lift come in
  PR 3.2.

Tests: 9 (PR 1.1) + 5 (PR 2.1) + 10 (PR 1.2) + 19 (PR 2.2) + 1 (PR 3.1)
all pass. Existing AlarmSubscribeIntegrationTests + HistoryReadIntegrationTests
unchanged.

Plan + audit docs (lmx_backend.md, lmx_mxgw.md, lmx_mxgw_impl.md)
included so parallel subagent worktrees can read them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-29 14:03:36 -04:00
parent 012c42a846
commit ef22a61c39
21 changed files with 3553 additions and 70 deletions

View File

@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Point-in-time state of a single historian cluster node, included inside
/// <see cref="HistorianHealthSnapshot.Nodes"/> when the backend is clustered.
/// </summary>
/// <param name="Name">Node identifier — backend-specific (typically a hostname).</param>
/// <param name="IsHealthy">True when the node is currently considered usable for reads.</param>
/// <param name="CooldownUntil">When the next retry against an unhealthy node is allowed; null when no cooldown is active.</param>
/// <param name="FailureCount">Consecutive failures observed against this node since the last success.</param>
/// <param name="LastError">Diagnostic text from the last failure against this node; null when no failures.</param>
/// <param name="LastFailureTime">UTC of the last failure against this node; null when no failures.</param>
public sealed record HistorianClusterNodeState(
string Name,
bool IsHealthy,
DateTime? CooldownUntil,
int FailureCount,
string? LastError,
DateTime? LastFailureTime);

View File

@@ -0,0 +1,32 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Point-in-time runtime health of a historian data source. Returned by
/// <see cref="IHistorianDataSource.GetHealthSnapshot"/> and projected onto the
/// server status dashboard.
/// </summary>
/// <param name="TotalQueries">Lifetime count of read calls received.</param>
/// <param name="TotalSuccesses">Subset of <paramref name="TotalQueries"/> that completed without error.</param>
/// <param name="TotalFailures">Subset of <paramref name="TotalQueries"/> that ended in error.</param>
/// <param name="ConsecutiveFailures">Failures since the last success — non-zero means the source is currently degraded.</param>
/// <param name="LastSuccessTime">UTC of the most recent successful read; null if none yet.</param>
/// <param name="LastFailureTime">UTC of the most recent failed read; null if none yet.</param>
/// <param name="LastError">Diagnostic text from the most recent failure; null when no failures recorded.</param>
/// <param name="ProcessConnectionOpen">True when the source's process-data connection is currently established.</param>
/// <param name="EventConnectionOpen">True when the source's event-data connection is currently established. Some backends share one connection — implementations may report the same value here as <paramref name="ProcessConnectionOpen"/>.</param>
/// <param name="ActiveProcessNode">Cluster node currently serving process reads; null when no node is active or the backend is non-clustered.</param>
/// <param name="ActiveEventNode">Cluster node currently serving event reads; null when no node is active or the backend is non-clustered.</param>
/// <param name="Nodes">Per-cluster-node state. Empty when the backend is non-clustered.</param>
public sealed record HistorianHealthSnapshot(
long TotalQueries,
long TotalSuccesses,
long TotalFailures,
int ConsecutiveFailures,
DateTime? LastSuccessTime,
DateTime? LastFailureTime,
string? LastError,
bool ProcessConnectionOpen,
bool EventConnectionOpen,
string? ActiveProcessNode,
string? ActiveEventNode,
IReadOnlyList<HistorianClusterNodeState> Nodes);

View File

@@ -0,0 +1,74 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Server-side historian data source. Registered with the server's history router
/// and resolved per OPC UA namespace, independent of any driver's lifecycle.
/// </summary>
/// <remarks>
/// Distinct from <see cref="IHistoryProvider"/>:
/// <list type="bullet">
/// <item><see cref="IHistoryProvider"/> is a *driver capability* — the server
/// dispatches to it via the driver instance.</item>
/// <item><see cref="IHistorianDataSource"/> is a *server registration* — the
/// server resolves it via namespace and calls it directly, so a single
/// historian (e.g. Wonderware) can serve many drivers' nodes, and drivers can
/// restart without dropping history availability.</item>
/// </list>
/// All values returned use the shared <see cref="DataValueSnapshot"/> /
/// <see cref="HistoricalEvent"/> shapes; backend-specific quality / type encodings
/// are translated to OPC UA <c>StatusCode</c> uints inside the data source.
/// </remarks>
public interface IHistorianDataSource : IDisposable
{
/// <summary>
/// Read raw historical samples for a single tag over a time range.
/// </summary>
Task<HistoryReadResult> ReadRawAsync(
string fullReference,
DateTime startUtc,
DateTime endUtc,
uint maxValuesPerNode,
CancellationToken cancellationToken);
/// <summary>
/// Read processed (interval-bucketed) samples — average / min / max / count / etc.
/// A bucket with no source data returns a sample whose
/// <see cref="DataValueSnapshot.StatusCode"/> indicates BadNoData.
/// </summary>
Task<HistoryReadResult> ReadProcessedAsync(
string fullReference,
DateTime startUtc,
DateTime endUtc,
TimeSpan interval,
HistoryAggregateType aggregate,
CancellationToken cancellationToken);
/// <summary>
/// Read one sample per requested timestamp — OPC UA HistoryReadAtTime service.
/// Implementations interpolate or return prior-boundary samples per their
/// backend's policy. The returned list MUST be the same length and order as
/// <paramref name="timestampsUtc"/>; gaps are returned as Bad-quality snapshots.
/// </summary>
Task<HistoryReadResult> ReadAtTimeAsync(
string fullReference,
IReadOnlyList<DateTime> timestampsUtc,
CancellationToken cancellationToken);
/// <summary>
/// Read historical alarm / event records — OPC UA HistoryReadEvents service.
/// Distinct from any live event stream; sources here come from the historian's
/// event log. <paramref name="sourceName"/> is null to return all sources.
/// </summary>
Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName,
DateTime startUtc,
DateTime endUtc,
int maxEvents,
CancellationToken cancellationToken);
/// <summary>
/// Point-in-time health snapshot for diagnostics and dashboards. Pure
/// observation; never blocks on backend I/O.
/// </summary>
HistorianHealthSnapshot GetHealthSnapshot();
}

View File

@@ -62,10 +62,41 @@ public interface IVariableHandle
/// <param name="SourceName">Human-readable alarm name used for the <c>SourceName</c> event field.</param>
/// <param name="InitialSeverity">Severity at address-space build time; updates arrive via <see cref="IAlarmConditionSink"/>.</param>
/// <param name="InitialDescription">Initial description; updates arrive via <see cref="IAlarmConditionSink"/>.</param>
/// <param name="InAlarmRef">
/// Driver-side full reference for the boolean attribute that toggles when the
/// alarm condition becomes active. Consumed by the server-level alarm-condition
/// service to subscribe to active/inactive transitions. Null when the driver
/// reports alarm transitions through some other channel.
/// </param>
/// <param name="PriorityRef">
/// Driver-side full reference for the integer attribute carrying the alarm's
/// current priority / severity. Live updates flow through the same subscription
/// pipeline as <paramref name="InAlarmRef"/>. Null when the driver does not
/// expose live priority changes.
/// </param>
/// <param name="DescAttrNameRef">
/// Driver-side full reference for the string attribute carrying the human-readable
/// description / message. Null when the driver does not expose a live description.
/// </param>
/// <param name="AckedRef">
/// Driver-side full reference for the boolean attribute that toggles when the
/// alarm is acknowledged. Null when acknowledgement is not observable on the
/// driver side.
/// </param>
/// <param name="AckMsgWriteRef">
/// Driver-side full reference the server writes to acknowledge the condition,
/// typically the alarm's <c>.AckMsg</c> attribute. Null when the driver does not
/// accept acknowledgement writes (or routes them through a separate API).
/// </param>
public sealed record AlarmConditionInfo(
string SourceName,
AlarmSeverity InitialSeverity,
string? InitialDescription);
string? InitialDescription,
string? InAlarmRef = null,
string? PriorityRef = null,
string? DescAttrNameRef = null,
string? AckedRef = null,
string? AckMsgWriteRef = null);
/// <summary>
/// Sink a concrete address-space builder returns from <see cref="IVariableHandle.MarkAsAlarmCondition"/>.

View File

@@ -0,0 +1,58 @@
using System;
using System.Threading;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware;
/// <summary>
/// Entry point for the Wonderware Historian sidecar host. PR 3.1 only scaffolds the
/// console host shell — pipe server wiring and SDK access are added in PR 3.3 and
/// PR 3.2 respectively. The host reads the pipe name, allowed-SID, and shared secret
/// from environment variables (passed by the supervisor at spawn time per
/// <c>driver-stability.md</c>) and validates them up front so misconfiguration fails
/// loudly rather than silently degrading.
/// </summary>
public static class Program
{
public static int Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.WriteTo.File(
@"%ProgramData%\OtOpcUa\historian-wonderware-.log".Replace("%ProgramData%", Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData)),
rollingInterval: RollingInterval.Day)
.CreateLogger();
try
{
var pipeName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PIPE")
?? throw new InvalidOperationException("OTOPCUA_HISTORIAN_PIPE not set — supervisor must pass the sidecar pipe name");
var allowedSidValue = Environment.GetEnvironmentVariable("OTOPCUA_ALLOWED_SID")
?? throw new InvalidOperationException("OTOPCUA_ALLOWED_SID not set — supervisor must pass the server principal SID");
var sharedSecret = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SECRET")
?? throw new InvalidOperationException("OTOPCUA_HISTORIAN_SECRET not set — supervisor must pass the per-process secret at spawn time");
// Touch the secret so a future trim/AOT pass cannot strip the read; the value is
// consumed for real in PR 3.3 when the pipe handshake is wired in.
_ = sharedSecret.Length;
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
Log.Information("Wonderware historian sidecar starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
// PR 3.1 has no pipe server yet. Block until Ctrl-C so NSSM/the supervisor sees a
// long-running process and the smoke harness can exercise the host lifecycle.
cts.Token.WaitHandle.WaitOne();
Log.Information("Wonderware historian sidecar stopping cleanly");
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex, "Wonderware historian sidecar fatal");
return 2;
}
finally { Log.CloseAndFlush(); }
}
}

View File

@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net48</TargetFramework>
<!-- x86 to match the in-process bitness expectations of the Wonderware Historian SDK
that PR 3.2 will lift in. Mirrors Driver.Galaxy.Host's bitness for consistency. -->
<PlatformTarget>x86</PlatformTarget>
<Prefer32Bit>true</Prefer32Bit>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware</RootNamespace>
<AssemblyName>OtOpcUa.Driver.Historian.Wonderware</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="4.2.0"/>
<PackageReference Include="Serilog.Sinks.File" Version="7.0.0"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests"/>
</ItemGroup>
</Project>

View File

@@ -0,0 +1,289 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.Alarms;
/// <summary>
/// Server-level alarm-condition state machine. Tracks one entry per registered
/// condition; consumes value changes from the four sub-attribute references in
/// <see cref="AlarmConditionInfo"/> (InAlarm / Priority / Description / Acked) and
/// raises <see cref="TransitionRaised"/> on Active / Acknowledged / Inactive
/// transitions per OPC UA Part 9 (simplified). Operator acknowledgement routes
/// through <see cref="IAlarmAcknowledger"/> against <c>AckMsgWriteRef</c>.
/// </summary>
/// <remarks>
/// This is the driver-agnostic replacement for <c>GalaxyAlarmTracker</c>. The
/// service does not own subscription lifecycle — PR 2.3 will wire DriverNodeManager
/// to subscribe through the driver's <c>ISubscribable</c> and forward value changes
/// here via <see cref="OnValueChanged"/>. Keeping the service free of subscription
/// plumbing makes it trivially testable and lets future drivers feed it from any
/// value source (in-process, gRPC, named pipe).
/// </remarks>
public sealed class AlarmConditionService : IDisposable
{
private readonly Func<DateTime> _clock;
// ConditionId → state.
private readonly ConcurrentDictionary<string, AlarmConditionState> _conditions =
new(StringComparer.OrdinalIgnoreCase);
// Sub-attribute full ref → (conditionId, which field). Multiple conditions may
// observe the same sub-attribute (rare but legal); the value is a list to support
// fan-out on a single value change.
private readonly ConcurrentDictionary<string, List<(string ConditionId, AlarmField Field)>> _refToCondition =
new(StringComparer.OrdinalIgnoreCase);
private readonly object _refMapLock = new();
private bool _disposed;
/// <summary>
/// Fired when a registered condition transitions Active / Acknowledged / Inactive.
/// Handlers must be cheap; the event is raised on whatever thread feeds
/// <see cref="OnValueChanged"/> and blocks the value-change pipeline.
/// </summary>
public event EventHandler<AlarmConditionTransition>? TransitionRaised;
public AlarmConditionService() : this(() => DateTime.UtcNow) { }
/// <summary>Test seam — inject a fixed clock for deterministic transition timestamps.</summary>
internal AlarmConditionService(Func<DateTime> clock)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}
/// <summary>Number of currently tracked conditions. Diagnostic only.</summary>
public int TrackedCount => _conditions.Count;
/// <summary>
/// Register a condition. Idempotent — repeat calls for the same
/// <paramref name="conditionId"/> are a no-op. The acker is captured for the
/// condition's lifetime; pass null when the driver does not accept acks.
/// </summary>
public void Track(string conditionId, AlarmConditionInfo info, IAlarmAcknowledger? acker = null)
{
ObjectDisposedException.ThrowIf(_disposed, this);
ArgumentException.ThrowIfNullOrWhiteSpace(conditionId);
ArgumentNullException.ThrowIfNull(info);
var state = new AlarmConditionState(conditionId, info, acker);
if (!_conditions.TryAdd(conditionId, state)) return;
lock (_refMapLock)
{
AddRefMapping(info.InAlarmRef, conditionId, AlarmField.InAlarm);
AddRefMapping(info.PriorityRef, conditionId, AlarmField.Priority);
AddRefMapping(info.DescAttrNameRef, conditionId, AlarmField.DescAttrName);
AddRefMapping(info.AckedRef, conditionId, AlarmField.Acked);
}
}
/// <summary>Deregister a condition. No-op when not tracked.</summary>
public void Untrack(string conditionId)
{
if (_disposed) return;
if (!_conditions.TryRemove(conditionId, out var state)) return;
lock (_refMapLock)
{
RemoveRefMapping(state.Info.InAlarmRef, conditionId);
RemoveRefMapping(state.Info.PriorityRef, conditionId);
RemoveRefMapping(state.Info.DescAttrNameRef, conditionId);
RemoveRefMapping(state.Info.AckedRef, conditionId);
}
}
/// <summary>
/// Returns the set of sub-attribute references the service currently needs
/// subscribed. Callers wire one subscription per ref through the driver's
/// <see cref="ISubscribable"/>; PR 2.3 owns that wiring.
/// </summary>
public IReadOnlyCollection<string> GetSubscribedReferences()
{
lock (_refMapLock) return [.. _refToCondition.Keys];
}
/// <summary>
/// Operator acknowledgement entry point. Returns false when the condition is
/// not tracked, the condition has no acker registered, the condition has no
/// <c>AckMsgWriteRef</c>, or the acker reports the write failed.
/// </summary>
public Task<bool> AcknowledgeAsync(string conditionId, string comment, CancellationToken cancellationToken = default)
{
if (_disposed || !_conditions.TryGetValue(conditionId, out var state))
return Task.FromResult(false);
if (state.Acker is null || string.IsNullOrEmpty(state.Info.AckMsgWriteRef))
return Task.FromResult(false);
return state.Acker.WriteAckMessageAsync(state.Info.AckMsgWriteRef, comment ?? string.Empty, cancellationToken);
}
/// <summary>
/// Snapshot every tracked condition's current state. Diagnostic / dashboard use only.
/// </summary>
public IReadOnlyList<AlarmConditionSnapshot> Snapshot()
{
return [.. _conditions.Values.Select(s =>
{
lock (s.Lock)
return new AlarmConditionSnapshot(s.ConditionId, s.InAlarm, s.Acked, s.Priority, s.Description);
})];
}
/// <summary>
/// Feed a value change for one of the registered sub-attribute references.
/// The service runs the state machine and raises <see cref="TransitionRaised"/>
/// when the change produces a lifecycle transition. Unknown references are
/// silently dropped — the caller may register and unregister concurrently with
/// value-change delivery, and a stale callback for a recently-untracked
/// condition must not throw.
/// </summary>
public void OnValueChanged(string fullReference, DataValueSnapshot value)
{
if (_disposed) return;
if (string.IsNullOrEmpty(fullReference)) return;
List<(string ConditionId, AlarmField Field)>? targets;
lock (_refMapLock)
{
if (!_refToCondition.TryGetValue(fullReference, out targets) || targets.Count == 0) return;
// Snapshot under lock; the state machine runs outside.
targets = [.. targets];
}
var now = _clock();
foreach (var (conditionId, field) in targets)
{
if (!_conditions.TryGetValue(conditionId, out var state)) continue;
AlarmConditionTransition? transition = null;
lock (state.Lock)
{
transition = ApplyValue(state, field, value, now);
}
if (transition is { } t)
{
TransitionRaised?.Invoke(this, t);
}
}
}
/// <summary>
/// Apply one value change to one condition. Returns a transition when the
/// change crosses a state boundary; null otherwise. Caller holds <c>state.Lock</c>.
/// </summary>
private static AlarmConditionTransition? ApplyValue(
AlarmConditionState state, AlarmField field, DataValueSnapshot value, DateTime now)
{
AlarmConditionTransition? transition = null;
state.LastUpdateUtc = now;
switch (field)
{
case AlarmField.InAlarm:
{
var wasActive = state.InAlarm;
var isActive = value.Value is bool b && b;
state.InAlarm = isActive;
if (!wasActive && isActive)
{
// Reset Acked on every active transition so a re-alarm requires fresh ack.
state.Acked = false;
transition = new AlarmConditionTransition(
state.ConditionId, AlarmStateTransition.Active,
state.Priority, state.Description, now);
}
else if (wasActive && !isActive)
{
transition = new AlarmConditionTransition(
state.ConditionId, AlarmStateTransition.Inactive,
state.Priority, state.Description, now);
}
break;
}
case AlarmField.Priority:
state.Priority = CoercePriority(value.Value, state.Priority);
break;
case AlarmField.DescAttrName:
state.Description = value.Value as string;
break;
case AlarmField.Acked:
{
var wasAcked = state.Acked;
var isAcked = value.Value is bool b && b;
state.Acked = isAcked;
// Only fire Acknowledged on false → true while still active. The first
// post-Track callback often arrives with isAcked == wasAcked (state starts
// Acked=true so an initially-quiet alarm doesn't misfire).
if (!wasAcked && isAcked && state.InAlarm)
{
transition = new AlarmConditionTransition(
state.ConditionId, AlarmStateTransition.Acknowledged,
state.Priority, state.Description, now);
}
break;
}
}
return transition;
}
private static int CoercePriority(object? raw, int fallback) => raw switch
{
int i => i,
short s => s,
long l when l <= int.MaxValue => (int)l,
byte b => b,
ushort us => us,
uint ui when ui <= int.MaxValue => (int)ui,
_ => fallback,
};
private void AddRefMapping(string? fullRef, string conditionId, AlarmField field)
{
if (string.IsNullOrEmpty(fullRef)) return;
if (!_refToCondition.TryGetValue(fullRef, out var list))
{
list = [];
_refToCondition[fullRef] = list;
}
list.Add((conditionId, field));
}
private void RemoveRefMapping(string? fullRef, string conditionId)
{
if (string.IsNullOrEmpty(fullRef)) return;
if (!_refToCondition.TryGetValue(fullRef, out var list)) return;
list.RemoveAll(t => string.Equals(t.ConditionId, conditionId, StringComparison.OrdinalIgnoreCase));
if (list.Count == 0) _refToCondition.TryRemove(fullRef, out _);
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_conditions.Clear();
lock (_refMapLock) _refToCondition.Clear();
}
private enum AlarmField { InAlarm, Priority, DescAttrName, Acked }
/// <summary>Per-condition mutable state. Access guarded by <see cref="Lock"/>.</summary>
private sealed class AlarmConditionState(string conditionId, AlarmConditionInfo info, IAlarmAcknowledger? acker)
{
public readonly object Lock = new();
public string ConditionId { get; } = conditionId;
public AlarmConditionInfo Info { get; } = info;
public IAlarmAcknowledger? Acker { get; } = acker;
public bool InAlarm;
// Default Acked=true so the first post-Track callback (.Acked=true on a quiet
// alarm) doesn't misfire as a transition. Active sets it back to false.
public bool Acked = true;
public int Priority;
public string? Description;
public DateTime LastUpdateUtc;
}
}

View File

@@ -0,0 +1,44 @@
namespace ZB.MOM.WW.OtOpcUa.Server.Alarms;
/// <summary>
/// Lifecycle transition for an alarm condition. Mirrors OPC UA Part 9 alarm states
/// simplified to the active / acknowledged / inactive triplet that every driver in
/// the repo exposes today.
/// </summary>
public enum AlarmStateTransition
{
/// <summary>InAlarm flipped false → true. Default to unacknowledged.</summary>
Active,
/// <summary>Acked flipped false → true while the alarm is still active.</summary>
Acknowledged,
/// <summary>InAlarm flipped true → false.</summary>
Inactive,
}
/// <summary>
/// One alarm-state transition raised by <see cref="AlarmConditionService.TransitionRaised"/>.
/// </summary>
/// <param name="ConditionId">Stable identifier the caller registered the condition under (typically the driver's alarm full reference).</param>
/// <param name="Transition">Which state the alarm transitioned to.</param>
/// <param name="Priority">Latest known priority. 0 when no priority sub-attribute was registered or no value has been observed yet.</param>
/// <param name="Description">Latest known description text; null when not registered or not yet observed.</param>
/// <param name="AtUtc">Server-clock UTC of the value change that produced this transition.</param>
public sealed record AlarmConditionTransition(
string ConditionId,
AlarmStateTransition Transition,
int Priority,
string? Description,
DateTime AtUtc);
/// <summary>
/// Read-only snapshot of an alarm condition's current state. Used for diagnostics
/// and dashboards; not part of the live transition stream.
/// </summary>
public sealed record AlarmConditionSnapshot(
string ConditionId,
bool InAlarm,
bool Acked,
int Priority,
string? Description);

View File

@@ -0,0 +1,23 @@
namespace ZB.MOM.WW.OtOpcUa.Server.Alarms;
/// <summary>
/// Strategy for routing operator acknowledgement writes back to the underlying driver.
/// Decouples <see cref="AlarmConditionService"/> from any specific driver's write API
/// so the service can be tested without a real driver and reused across drivers with
/// different write paths.
/// </summary>
/// <remarks>
/// PR 2.3 supplies a default implementation that writes through the driver's
/// <c>IWritable.WriteAsync</c> using the <c>AckMsgWriteRef</c> from
/// <c>AlarmConditionInfo</c>. Drivers that route acks differently (e.g. a dedicated
/// RPC) can supply a custom implementation when registering the condition.
/// </remarks>
public interface IAlarmAcknowledger
{
/// <summary>
/// Writes the operator's <paramref name="comment"/> to <paramref name="ackMsgWriteRef"/>.
/// Returns true on driver-reported success, false otherwise. Implementations should
/// propagate cancellation but never throw on a write that the driver cleanly rejects.
/// </summary>
Task<bool> WriteAckMessageAsync(string ackMsgWriteRef, string comment, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,71 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.History;
/// <summary>
/// Default <see cref="IHistoryRouter"/> implementation.
/// </summary>
public sealed class HistoryRouter : IHistoryRouter
{
private readonly ConcurrentDictionary<string, IHistorianDataSource> _registry =
new(StringComparer.OrdinalIgnoreCase);
private bool _disposed;
/// <inheritdoc />
public void Register(string fullReferencePrefix, IHistorianDataSource source)
{
ObjectDisposedException.ThrowIf(_disposed, this);
ArgumentNullException.ThrowIfNull(fullReferencePrefix);
ArgumentNullException.ThrowIfNull(source);
if (!_registry.TryAdd(fullReferencePrefix, source))
{
throw new InvalidOperationException(
$"A historian data source is already registered for prefix '{fullReferencePrefix}'.");
}
}
/// <inheritdoc />
public IHistorianDataSource? Resolve(string fullReference)
{
ObjectDisposedException.ThrowIf(_disposed, this);
ArgumentNullException.ThrowIfNull(fullReference);
// Longest-prefix match. Sources are typically a handful per server, so a linear
// scan is fine and avoids building a trie for a low-cardinality registry.
IHistorianDataSource? best = null;
var bestPrefixLength = -1;
foreach (var (prefix, source) in _registry)
{
if (fullReference.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)
&& prefix.Length > bestPrefixLength)
{
best = source;
bestPrefixLength = prefix.Length;
}
}
return best;
}
/// <summary>
/// Disposes every registered source and prevents further registrations or
/// resolutions. Sources may not all be disposable — null-safe disposal pattern.
/// </summary>
public void Dispose()
{
if (_disposed) return;
_disposed = true;
foreach (var source in _registry.Values)
{
try { source.Dispose(); }
catch { /* best-effort — server shutdown should not throw on a misbehaving source */ }
}
_registry.Clear();
}
}

View File

@@ -0,0 +1,37 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.History;
/// <summary>
/// Server-level routing of OPC UA HistoryRead service calls to a registered
/// <see cref="IHistorianDataSource"/>. One router per server instance; sources are
/// registered at startup keyed by a driver-side full-reference prefix (typically the
/// driver instance id).
/// </summary>
/// <remarks>
/// <para>
/// The router decouples history availability from the driver lifecycle: a driver
/// can restart (or be temporarily disconnected) without taking history offline,
/// and a single historian can serve nodes from multiple drivers.
/// </para>
/// <para>
/// Resolution is by longest-prefix match so a per-driver source registered under
/// <c>"galaxy"</c> wins over a fallback registered under empty string.
/// </para>
/// </remarks>
public interface IHistoryRouter : IDisposable
{
/// <summary>
/// Resolves a full reference to its registered data source, or null when no source
/// covers it.
/// </summary>
IHistorianDataSource? Resolve(string fullReference);
/// <summary>
/// Registers a data source for full references that start with
/// <paramref name="fullReferencePrefix"/>. Throws when the prefix is already
/// registered — duplicate registrations indicate a startup-config bug rather than
/// a runtime concern.
/// </summary>
void Register(string fullReferencePrefix, IHistorianDataSource source);
}

View File

@@ -5,6 +5,8 @@ using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Authorization;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Alarms;
using ZB.MOM.WW.OtOpcUa.Server.History;
using ZB.MOM.WW.OtOpcUa.Server.Security;
using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
// Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation
@@ -85,10 +87,31 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
private readonly IReadable? _virtualReadable;
private readonly IReadable? _scriptedAlarmReadable;
// PR 1.3 — server-level history routing. When non-null + a source is registered for
// the requested full reference, the four HistoryRead* overrides dispatch through the
// router. Otherwise we fall back to the legacy `_driver as IHistoryProvider` path
// wrapped in a thin adapter, so existing tests and drivers that still implement
// IHistoryProvider directly keep working until PR 1.W flips DI to register the
// legacy path inside the router.
private readonly IHistoryRouter? _historyRouter;
private LegacyDriverHistoryAdapter? _legacyHistoryAdapter;
// PR 2.3 — server-level alarm-condition state machine. When non-null, every
// MarkAsAlarmCondition call also registers the condition with the service so the
// server runs the Active/Acknowledged/Inactive transitions itself instead of
// relying on the driver's own tracker. _conditionSinks maps conditionId →
// ConditionSink so service-raised transitions reach the right OPC UA AlarmCondition
// sibling. Legacy IAlarmSource path keeps working in parallel until PR 7.2.
private readonly AlarmConditionService? _alarmService;
private readonly Dictionary<string, ConditionSink> _conditionSinks = new(StringComparer.OrdinalIgnoreCase);
private EventHandler<AlarmConditionTransition>? _alarmTransitionHandler;
public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration,
IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger,
AuthorizationGate? authzGate = null, NodeScopeResolver? scopeResolver = null,
IReadable? virtualReadable = null, IReadable? scriptedAlarmReadable = null)
IReadable? virtualReadable = null, IReadable? scriptedAlarmReadable = null,
IHistoryRouter? historyRouter = null,
AlarmConditionService? alarmService = null)
: base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}")
{
_driver = driver;
@@ -100,7 +123,117 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
_scopeResolver = scopeResolver;
_virtualReadable = virtualReadable;
_scriptedAlarmReadable = scriptedAlarmReadable;
_historyRouter = historyRouter;
_alarmService = alarmService;
_logger = logger;
if (_alarmService is not null)
{
_alarmTransitionHandler = OnAlarmServiceTransition;
_alarmService.TransitionRaised += _alarmTransitionHandler;
}
}
/// <summary>
/// Routes <see cref="AlarmConditionService.TransitionRaised"/> to the matching
/// <see cref="ConditionSink"/> registered during <c>MarkAsAlarmCondition</c>. Translates
/// <see cref="AlarmConditionTransition"/> into the legacy <see cref="AlarmEventArgs"/>
/// shape the existing sink consumes — the sink's switch on <c>AlarmType</c> string
/// ("Active" / "Acknowledged" / "Inactive") is preserved so PR 2.3 doesn't perturb the
/// OPC UA Part 9 state mapping. Stale transitions for an untracked condition are
/// silently dropped.
/// </summary>
private void OnAlarmServiceTransition(object? sender, AlarmConditionTransition t)
{
ConditionSink? sink;
lock (Lock)
{
_conditionSinks.TryGetValue(t.ConditionId, out sink);
}
if (sink is null) return;
var transitionName = t.Transition switch
{
AlarmStateTransition.Active => "Active",
AlarmStateTransition.Acknowledged => "Acknowledged",
AlarmStateTransition.Inactive => "Inactive",
_ => "Unknown",
};
sink.OnTransition(new AlarmEventArgs(
SubscriptionHandle: null!,
SourceNodeId: t.ConditionId,
ConditionId: t.ConditionId,
AlarmType: transitionName,
Message: t.Description ?? t.ConditionId,
Severity: MapPriorityToSeverity(t.Priority),
SourceTimestampUtc: t.AtUtc));
}
/// <summary>
/// Maps the integer priority Galaxy carries on <c>.Priority</c> (typically 1-1000) to
/// the four-bucket <see cref="AlarmSeverity"/> the OPC UA condition sibling consumes.
/// Mirrors the legacy <c>GalaxyProxyDriver.MapSeverity</c> bucketing.
/// </summary>
private static AlarmSeverity MapPriorityToSeverity(int priority) => priority switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 800 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
/// <summary>
/// Default <see cref="IAlarmAcknowledger"/> bound to a driver's <see cref="IWritable"/>.
/// Writes the operator comment to the alarm's <c>.AckMsg</c> sub-attribute via the same
/// dispatcher OnWriteValue uses so the resilience pipeline gates the call. Returns
/// false when the driver doesn't implement <see cref="IWritable"/> — alarms whose
/// drivers can't write are tracked but cannot be acknowledged through this path.
/// </summary>
private sealed class DriverWritableAcknowledger(
IWritable? writable, CapabilityInvoker invoker, string driverInstanceId) : IAlarmAcknowledger
{
public async Task<bool> WriteAckMessageAsync(
string ackMsgWriteRef, string comment, CancellationToken cancellationToken)
{
if (writable is null || string.IsNullOrEmpty(ackMsgWriteRef)) return false;
var request = new DriverWriteRequest(
FullReference: ackMsgWriteRef,
Value: comment ?? string.Empty);
try
{
// Ack writes are not idempotent — repeating an ack would re-trigger the
// driver-side acknowledgement state change. False matches the OnWriteValue
// default path for non-Idempotent attributes.
var results = await invoker.ExecuteWriteAsync(
driverInstanceId,
isIdempotent: false,
async ct => await writable.WriteAsync(new[] { request }, ct).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
return results.Count > 0 && results[0].StatusCode == 0;
}
catch
{
return false;
}
}
}
/// <summary>
/// Detach from the alarm service before the base disposes. The service is shared across
/// drivers, so leaking the handler keeps a dead DriverNodeManager pinned in memory and
/// dispatches transitions to a sink that's no longer wired to any OPC UA node.
/// </summary>
protected override void Dispose(bool disposing)
{
if (disposing && _alarmService is not null && _alarmTransitionHandler is not null)
{
_alarmService.TransitionRaised -= _alarmTransitionHandler;
_alarmTransitionHandler = null;
}
base.Dispose(disposing);
}
protected override NodeStateCollection LoadPredefinedNodes(ISystemContext context) => new();
@@ -644,7 +777,22 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
// Without this the Report fires but has no subscribers to deliver to.
_owner.AddRootNotifier(alarm);
return new ConditionSink(_owner, alarm);
var sink = new ConditionSink(_owner, alarm);
// PR 2.3 — when the server-level alarm-condition service is wired, register
// this condition with it so the state machine runs server-side. The sink-map
// entry routes future TransitionRaised events back to this OPC UA node.
// Conditions whose info lacks an InAlarmRef can't be observed without driver
// help — those still rely on the legacy IAlarmSource path until PR 7.2.
if (_owner._alarmService is not null && !string.IsNullOrEmpty(info.InAlarmRef))
{
_owner._conditionSinks[FullReference] = sink;
var acker = new DriverWritableAcknowledger(
_owner._writable, _owner._invoker, _owner._driver.DriverInstanceId);
_owner._alarmService.Track(FullReference, info, acker);
}
return sink;
}
}
@@ -808,29 +956,97 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
internal bool TryGetVariable(string fullRef, out BaseDataVariableState? v)
=> _variablesByFullRef.TryGetValue(fullRef, out v!);
// ===================== HistoryRead service handlers (LMX #1, PR 38) =====================
// ===================== HistoryRead service handlers (LMX #1, PR 38; PR 1.3 routing) =====================
//
// Wires the driver's IHistoryProvider capability (PR 35 added ReadAtTimeAsync / ReadEventsAsync
// alongside the PR 19 ReadRawAsync / ReadProcessedAsync) to the OPC UA HistoryRead service.
// CustomNodeManager2 has four protected per-kind hooks; the base dispatches to the right one
// based on the concrete HistoryReadDetails subtype. Each hook is sync-returning-void — the
// per-driver async calls are bridged via GetAwaiter().GetResult(), matching the pattern
// OnReadValue / OnWriteValue already use in this class so HistoryRead doesn't introduce a
// different sync-over-async convention.
// Wires HistoryRead to the server-level IHistoryRouter (PR 1.2). For each tag:
// (1) the router resolves the longest-matching IHistorianDataSource registration —
// when a server-registered source covers the namespace it wins; (2) when the router
// doesn't match (or no router is configured), we fall back to the driver's own
// IHistoryProvider capability via a thin adapter, preserving the legacy behavior tests
// rely on. PR 1.W will register the legacy adapter inside the router as well, at
// which point this fallback can be deleted.
//
// Per-node routing: every HistoryReadValueId in nodesToRead has a NodeHandle in
// nodesToProcess; the NodeHandle's NodeId.Identifier is the driver-side full reference
// (set during Variable() registration) so we can dispatch straight to IHistoryProvider
// without a second lookup. Nodes without IHistoryProvider backing (drivers that don't
// implement the capability) surface BadHistoryOperationUnsupported per slot and the
// rest of the batch continues — same failure-isolation pattern as OnWriteValue.
//
// Continuation-point handling is pass-through only in this PR: the driver returns null
// from its ContinuationPoint field today so the outer result's ContinuationPoint stays
// empty. Full Session.SaveHistoryContinuationPoint plumbing is a follow-up when a driver
// actually needs paging — the dispatch shape doesn't change, only the result-population.
// Continuation-point handling is pass-through only: the source returns null from its
// ContinuationPoint today so the outer result's ContinuationPoint stays empty. Proper
// Session.SaveHistoryContinuationPoint plumbing is a follow-up when a source actually
// needs paging — the dispatch shape doesn't change, only the result-population.
private IHistoryProvider? History => _driver as IHistoryProvider;
/// <summary>
/// Resolves the historian data source for a given driver full reference. Returns
/// null when neither the router nor the legacy IHistoryProvider path can serve it.
/// </summary>
/// <param name="fullRef">
/// Full reference, or null for driver-root event-history queries (event reads can
/// target a notifier rather than a specific variable). Null fullRef skips router
/// lookup and goes straight to the legacy fallback so today's "all events in the
/// driver namespace" path keeps working.
/// </param>
private IHistorianDataSource? ResolveHistory(string? fullRef)
{
if (fullRef is not null
&& _historyRouter?.Resolve(fullRef) is { } routed)
{
return routed;
}
if (_driver is IHistoryProvider legacy)
{
return _legacyHistoryAdapter ??= new LegacyDriverHistoryAdapter(legacy);
}
return null;
}
/// <summary>
/// Wraps a driver's <see cref="IHistoryProvider"/> as an
/// <see cref="IHistorianDataSource"/> so the four HistoryRead* methods can dispatch
/// through one interface regardless of resolution path. PR 1.W's legacy
/// auto-registration uses the same adapter; PR 7.2 deletes both once
/// IHistoryProvider stops being a driver capability.
/// </summary>
// OTOPCUA0001 (UnwrappedCapabilityCallAnalyzer) flags every direct IHistoryProvider call
// that isn't lexically inside a CapabilityInvoker.ExecuteAsync lambda. The adapter's
// pass-throughs are direct calls — but the four HistoryRead* call sites that own the
// adapter ARE inside ExecuteAsync lambdas, so the wrapping is preserved at runtime.
// Suppress here rather than at every call site.
#pragma warning disable OTOPCUA0001
private sealed class LegacyDriverHistoryAdapter(IHistoryProvider provider) : IHistorianDataSource
{
// HistoryReadResult is unqualified-ambiguous in this file (Core.Abstractions vs.
// Opc.Ua); fully qualify on the adapter signatures so the file's existing var-based
// dispatch sites stay readable.
public Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
=> provider.ReadRawAsync(fullReference, startUtc, endUtc, maxValuesPerNode, cancellationToken);
public Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
=> provider.ReadProcessedAsync(fullReference, startUtc, endUtc, interval, aggregate, cancellationToken);
public Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
=> provider.ReadAtTimeAsync(fullReference, timestampsUtc, cancellationToken);
public Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
=> provider.ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken);
// Legacy IHistoryProvider has no health surface. Return an "unknown but reachable"
// snapshot so dashboards don't show the data source as broken.
public HistorianHealthSnapshot GetHealthSnapshot()
=> new(0, 0, 0, 0, null, null, null,
ProcessConnectionOpen: true, EventConnectionOpen: true,
ActiveProcessNode: null, ActiveEventNode: null,
Nodes: []);
// Legacy lifecycle is the driver's responsibility — disposing the adapter must
// not dispose the driver out from under DriverNodeManager.
public void Dispose() { }
}
#pragma warning restore OTOPCUA0001
protected override void HistoryReadRawModified(
ServerSystemContext context, ReadRawModifiedDetails details, TimestampsToReturn timestamps,
@@ -838,12 +1054,6 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// IsReadModified=true requests a "modifications" history (who changed the data, when
// it was re-written). The driver side has no modifications store — surface that
// explicitly rather than silently returning raw data, which would mislead the client.
@@ -868,6 +1078,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
continue;
}
var source = ResolveHistory(fullRef);
if (source is null)
{
WriteUnsupported(results, errors, i);
continue;
}
if (_authzGate is not null && _scopeResolver is not null)
{
var historyScope = _scopeResolver.Resolve(fullRef);
@@ -883,7 +1100,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
ResolveHostFor(fullRef),
async ct => await History.ReadRawAsync(
async ct => await source.ReadRawAsync(
fullRef,
details.StartTime,
details.EndTime,
@@ -912,12 +1129,6 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// AggregateType is one NodeId shared across every item in the batch — map once.
var aggregate = MapAggregate(details.AggregateType?.FirstOrDefault());
if (aggregate is null)
@@ -930,10 +1141,6 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
var fullRef = ResolveFullRef(handle);
if (fullRef is null)
@@ -942,6 +1149,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
continue;
}
var source = ResolveHistory(fullRef);
if (source is null)
{
WriteUnsupported(results, errors, i);
continue;
}
if (_authzGate is not null && _scopeResolver is not null)
{
var historyScope = _scopeResolver.Resolve(fullRef);
@@ -957,7 +1171,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
ResolveHostFor(fullRef),
async ct => await History.ReadProcessedAsync(
async ct => await source.ReadProcessedAsync(
fullRef,
details.StartTime,
details.EndTime,
@@ -987,20 +1201,10 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
var requestedTimes = (IReadOnlyList<DateTime>)(details.ReqTimes?.ToArray() ?? Array.Empty<DateTime>());
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
var fullRef = ResolveFullRef(handle);
if (fullRef is null)
@@ -1009,6 +1213,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
continue;
}
var source = ResolveHistory(fullRef);
if (source is null)
{
WriteUnsupported(results, errors, i);
continue;
}
if (_authzGate is not null && _scopeResolver is not null)
{
var historyScope = _scopeResolver.Resolve(fullRef);
@@ -1024,7 +1235,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
ResolveHostFor(fullRef),
async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false),
async ct => await source.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
@@ -1048,34 +1259,30 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// SourceName filter extraction is deferred — EventFilter SelectClauses + WhereClause
// handling is a dedicated concern (proper per-select-clause Variant population + where
// filter evaluation). This PR treats the event query as "all events in range for the
// node's source" and populates only the standard BaseEventType fields. Richer filter
// handling is a follow-up; clients issuing empty/default filters get the right answer
// today which covers the common alarm-history browse case.
// handling is a dedicated concern. This PR treats the event query as "all events in
// range for the node's source" and populates only the standard BaseEventType fields.
var maxEvents = (int)details.NumValuesPerNode;
if (maxEvents <= 0) maxEvents = 1000;
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
// Event history queries may target a notifier object (e.g. the driver-root folder)
// rather than a specific variable — in that case we pass sourceName=null to mean
// "all sources in the driver's namespace" per the IHistoryProvider contract.
// rather than a specific variable — in that case fullRef is null and we pass
// sourceName=null to the source meaning "all sources in this source's namespace."
var fullRef = ResolveFullRef(handle);
// ResolveHistory tolerates null fullRef — for notifier queries the router is
// skipped and the legacy fallback handles "all sources" reads.
var source = ResolveHistory(fullRef);
if (source is null)
{
WriteUnsupported(results, errors, i);
continue;
}
// fullRef is null for event-history queries that target a notifier (driver root).
// Those are cluster-wide reads + need a different scope shape; skip the gate here
// and let the driver-level authz handle them. Non-null path gets per-node gating.
@@ -1094,7 +1301,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
fullRef is null ? _driver.DriverInstanceId : ResolveHostFor(fullRef),
async ct => await History.ReadEventsAsync(
async ct => await source.ReadEventsAsync(
sourceName: fullRef,
startUtc: details.StartTime,
endUtc: details.EndTime,