Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty
04d267d1ea Phase 2 PR 13 — port GalaxyRuntimeProbeManager state machine + wire per-platform ScanState probing. PR 8 gave operators the gateway-level transport signal (MxAccessClient.ConnectionStateChanged → OnHostStatusChanged tagged with the Wonderware client identity) — enough to detect when the entire MXAccess COM proxy dies, but silent when a specific or goes down while the gateway stays alive. This PR closes that gap: a pure-logic GalaxyRuntimeProbeManager (ported from v1's 472-LOC GalaxyRuntimeProbeManager.cs, distilled to ~240 LOC of state machine without the OPC UA node-manager entanglement) lives in Backend/Stability/, advises <TagName>.ScanState per WinPlatform + AppEngine gobject discovered during DiscoverAsync, runs the Unknown → Running → Stopped state machine with v1's documented semantics preserved verbatim, and raises a StateChanged event on transitions operators should react to. MxAccessGalaxyBackend instantiates the probe manager in the constructor with SubscribeAsync/UnsubscribeAsync delegate pointers into MxAccessClient, hooks StateChanged to forward each transition through the same OnHostStatusChanged IPC event the gateway signal uses (HostName = platform/engine TagName, RuntimeStatus = 'Running'|'Stopped'|'Unknown', LastObservedUtcUnixMs from the state-change timestamp), so Admin UI gets per-host signals flowing through the existing PR 8 wire with no additional IPC plumbing. State machine rules ported from v1 runtimestatus.md: (a) ScanState is on-change-only — a stably-Running host may go hours without a callback, so Running → Stopped is driven only by explicit ScanState=false, never by starvation; (b) Unknown → Running is a startup transition and does NOT fire StateChanged (would paint every host as 'just recovered' at startup, which is noise and can clear Bad quality set by a concurrently-stopping sibling); (c) Stopped → Running fires StateChanged for the real recovery case; (d) Running → Stopped fires StateChanged; (e) Unknown → Stopped fires StateChanged because that's the first-known-bad signal operators need when a host is down at our startup time. MxAccessGalaxyBackend.DiscoverAsync calls _probeManager.SyncAsync with the runtime-host subset of the hierarchy (CategoryId == 1 WinPlatform or 3 AppEngine) as a best-effort step after building the Discover response — probe failures are swallowed so Discover still returns the hierarchy even if a per-host advise fails; the gateway signal covers the critical rung. SyncAsync is idempotent (second call with the same set is a no-op) and handles the diff on re-Discover for tag rename / host add / host remove. Subscribe failure rolls back the host's state entry under the lock so a later probe callback for a never-advised tag can't transition a phantom state from Unknown to Stopped and fan out a false host-down signal (the same protection v1's GalaxyRuntimeProbeManager had at line 237-243 of v1 with a captured-probe-string comparison under the lock). MxAccessGalaxyBackend.Dispose unsubscribes the StateChanged handler before disposing the probe manager to prevent dangling invocation-list references across reconnects, same discipline as PR 8's ConnectionStateChanged and PR 6's SubscriptionReplayFailed. Tests (12 new GalaxyRuntimeProbeManagerTests): Sync_subscribes_to_ScanState_per_host verifies tag.ScanState subscriptions are advised per Platform/Engine; Sync_is_idempotent_on_repeat_call_with_same_set verifies no duplicate subscribes; Sync_unadvises_removed_hosts verifies the diff unadvises gone hosts; Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events covers the rollback-on-subscribe-fail guard; Unknown_to_Running_does_not_fire_StateChanged preserves the startup-noise rule; Running_to_Stopped_fires_StateChanged_with_both_states asserts OldState and NewState are both captured in the transition record; Stopped_to_Running_fires_StateChanged_for_recovery verifies the recovery case; Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal preserves the first-bad rule; Repeated_Good_Running_callbacks_do_not_fire_duplicate_events verifies the state-tracking de-dup; Unknown_callback_for_non_tracked_probe_is_dropped asserts a foreign callback is silently ignored; Snapshot_reports_current_state_for_every_tracked_host covers the dashboard query hook; IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids asserts the CategoryId filter. Galaxy.Host.Tests Unit suite 75 pass / 0 fail (12 new probe + 63 pre-existing). Galaxy.Host builds clean (0 errors / 0 warnings). Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:27:56 -04:00
4448db8207 Merge pull request 'Phase 2 PR 12 � richer historian quality mapping' (#11) from phase-2-pr12-quality-mapper into v2 2026-04-18 07:22:44 -04:00
d96b513bbc Merge pull request 'Phase 2 PR 11 � HistoryReadEvents IPC (alarm history)' (#10) from phase-2-pr11-history-events into v2 2026-04-18 07:22:33 -04:00
053c4e0566 Merge pull request 'Phase 2 PR 10 � HistoryReadAtTime IPC surface' (#9) from phase-2-pr10-history-attime into v2 2026-04-18 07:22:16 -04:00
Joseph Doherty
ca025ebe0c Phase 2 PR 11 — HistoryReadEvents IPC (alarm history). New Shared.Contracts messages HistoryReadEventsRequest/Response + GalaxyHistoricalEvent DTO (MessageKind 0x66/0x67). IGalaxyBackend gains HistoryReadEventsAsync, Stub/DbBacked return canonical pending error, MxAccessGalaxyBackend delegates to _historian.ReadEventsAsync (ported in PR 5) and maps HistorianEventDto → GalaxyHistoricalEvent — Guid.ToString() for EventId wire shape, DateTime → Unix ms for both EventTime (when the event fired in the process) and ReceivedTime (when the Historian persisted it), DisplayText + Severity pass through. SourceName is string? — null means 'all sources' (passed straight through to HistorianDataSource.ReadEventsAsync which adds the AddEventFilter('Source', Equal, ...) only when non-null). Distinct from the live GalaxyAlarmEvent type because historical rows carry both timestamps and lack StateTransition (Historian logs instantaneous events, not the OPC UA Part 9 alarm lifecycle; translating to OPC UA event lifecycle is the alarm-subsystem's job). Guards: null historian → Historian-disabled error; SDK exception → Success=false with message chained. Tests (3 new): disabled-error when historian null, maps HistorianEventDto with full field set (Id/Source/EventTime/ReceivedTime/DisplayText/Severity=900) to GalaxyHistoricalEvent, null SourceName passes through unchanged (verifies the 'all sources' contract). Galaxy.Host.Tests Unit suite 34 pass / 0 fail. Galaxy.Host builds clean. Branches off phase-2-pr10-history-attime since both extend the MessageKind enum; fast-forwards if PR 10 merges first.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:08:16 -04:00
Joseph Doherty
d13f919112 Phase 2 PR 10 — HistoryReadAtTime IPC surface. New Shared.Contracts messages HistoryReadAtTimeRequest/Response (MessageKind 0x64/0x65), IGalaxyBackend gains HistoryReadAtTimeAsync, Stub/DbBacked return canonical pending error, MxAccessGalaxyBackend delegates to _historian.ReadAtTimeAsync (ported in PR 5, exposed now) — request timestamp array is flow-encoded as Unix ms to avoid MessagePack DateTime quirks then re-hydrated to DateTime on the Host side. Per-sample mapping uses the same ToWire(HistorianSample) helper as ReadRawAsync so the category→StatusCode mapping stays consistent (Quality byte 192+ → Good 0u, 64-191 → Uncertain, 0-63 → Bad 0x80000000u). Guards: null historian → "Historian disabled" (symmetric with other history paths); empty timestamp array short-circuits to Success=true, Values=[] without an SDK round-trip; SDK exception → Success=false with the message chained. Proxy-side IHistoryProvider.ReadAtTimeAsync capability doesn't exist in Core.Abstractions yet (OPC UA HistoryReadAtTime service is supported but the current IHistoryProvider only has ReadRawAsync + ReadProcessedAsync) — this PR adds the Host-side surface so a future Core.Abstractions extension can wire it through without needing another IPC change. Tests (4 new): disabled-error when historian null, empty-timestamp short-circuit without SDK call, Unix-ms↔DateTime round-trip with Good samples at two distinct timestamps, missing sample (Quality=0) maps to 0x80000000u Bad category. Galaxy.Host.Tests Unit suite: 31 pass / 0 fail (4 new at-time + 27 pre-existing). Galaxy.Host builds clean. Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:03:25 -04:00
11 changed files with 1023 additions and 2 deletions

View File

@@ -136,6 +136,24 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadEventsResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -39,6 +39,8 @@ public interface IGalaxyBackend
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
Task<HistoryReadEventsResponse> HistoryReadEventsAsync(HistoryReadEventsRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}

View File

@@ -7,6 +7,7 @@ using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
@@ -40,6 +41,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
private readonly System.EventHandler<bool> _onConnectionStateChanged;
private readonly GalaxyRuntimeProbeManager _probeManager;
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
@@ -62,8 +65,39 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
});
};
_mx.ConnectionStateChanged += _onConnectionStateChanged;
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
// which runs the state machine and raises StateChanged on transitions we care about.
// We forward each transition through the same OnHostStatusChanged IPC event that the
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
// Admin UI can show per-host health independently from the top-level transport status.
_probeManager = new GalaxyRuntimeProbeManager(
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
_onProbeStateChanged = (_, t) =>
{
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
{
HostName = t.TagName,
RuntimeStatus = t.NewState switch
{
HostRuntimeState.Running => "Running",
HostRuntimeState.Stopped => "Stopped",
_ => "Unknown",
},
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
});
};
_probeManager.StateChanged += _onProbeStateChanged;
}
/// <summary>
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
/// <c>SyncProbesAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
/// advise ScanState per host.
/// </summary>
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
try
@@ -103,6 +137,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
}).ToArray();
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
// so ScanState subscriptions track the current runtime set. Best-effort — probe
// failures don't block Discover from returning, since the gateway-level signal from
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
// that level if any per-host probe couldn't advise.
try
{
var targets = hierarchy
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
}
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
}
catch (Exception ex)
@@ -324,11 +373,89 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
}
}
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadAtTimeResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.TimestampsUtcUnixMs.Length == 0)
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
var timestamps = req.TimestampsUtcUnixMs
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
.ToArray();
try
{
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadAtTimeResponse
{
Success = false,
Error = $"Historian at-time read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadEventsResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
var wire = events.Select(e => new GalaxyHistoricalEvent
{
EventId = e.Id.ToString(),
SourceName = e.Source,
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
DisplayText = e.DisplayText,
Severity = e.Severity,
}).ToArray();
return new HistoryReadEventsResponse { Success = true, Events = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadEventsResponse
{
Success = false,
Error = $"Historian event read failed: {ex.Message}",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
public void Dispose()
{
_probeManager.StateChanged -= _onProbeStateChanged;
_probeManager.Dispose();
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
_historian?.Dispose();
}

View File

@@ -0,0 +1,273 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
/// <summary>
/// Per-platform + per-AppEngine runtime probe. Subscribes to <c>&lt;TagName&gt;.ScanState</c>
/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
/// transitions, and fires <see cref="StateChanged"/> so <see cref="Backend.MxAccessGalaxyBackend"/>
/// can forward per-host events through the existing IPC <c>OnHostStatusChanged</c> event.
/// Pure-logic state machine with an injected clock so it's deterministically testable —
/// port of v1 <c>GalaxyRuntimeProbeManager</c> without the OPC UA node-manager coupling.
/// </summary>
/// <remarks>
/// State machine rules (documented in v1's <c>runtimestatus.md</c> and preserved here):
/// <list type="bullet">
/// <item><c>ScanState</c> is on-change-only — a stably-Running host may go hours without a
/// callback. Running → Stopped is driven by an explicit <c>ScanState=false</c> callback,
/// never by starvation.</item>
/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
/// paint every host as "just recovered" at startup, which is noise).</item>
/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
/// fires StateChanged because that's a first-known-bad signal operators need.</item>
/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
/// avoid lock inversion with caller-owned state.</item>
/// </list>
/// </remarks>
public sealed class GalaxyRuntimeProbeManager : IDisposable
{
public const int CategoryWinPlatform = 1;
public const int CategoryAppEngine = 3;
public const string ProbeAttribute = ".ScanState";
private readonly Func<DateTime> _clock;
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
private readonly Func<string, Task> _unsubscribe;
private readonly object _lock = new();
// probe tag → per-host state
private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);
// tag name → probe tag (for reverse lookup on the desired-set diff)
private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);
private bool _disposed;
/// <summary>
/// Fires on every state transition that operators should react to. See class remarks
/// for the rules on which transitions fire.
/// </summary>
public event EventHandler<HostStateTransition>? StateChanged;
public GalaxyRuntimeProbeManager(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe)
: this(subscribe, unsubscribe, () => DateTime.UtcNow) { }
internal GalaxyRuntimeProbeManager(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe,
Func<DateTime> clock)
{
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}
/// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
public int ActiveProbeCount
{
get { lock (_lock) return _byProbe.Count; }
}
/// <summary>
/// Snapshot every currently-tracked host's state. One entry per probe.
/// </summary>
public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
{
lock (_lock)
{
return _byProbe.Select(kv => new HostProbeSnapshot(
TagName: kv.Value.TagName,
State: kv.Value.State,
LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
}
}
/// <summary>
/// Query the current runtime state for <paramref name="tagName"/>. Returns
/// <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
/// </summary>
public HostRuntimeState GetState(string tagName)
{
lock (_lock)
{
if (_probeByTagName.TryGetValue(tagName, out var probe)
&& _byProbe.TryGetValue(probe, out var state))
return state.State;
return HostRuntimeState.Unknown;
}
}
/// <summary>
/// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
/// against the currently-tracked set and advise / unadvise as needed. Idempotent:
/// calling twice with the same set does nothing.
/// </summary>
public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
{
if (_disposed) return;
var desired = desiredHosts
.Where(h => !string.IsNullOrWhiteSpace(h.TagName))
.ToDictionary(h => h.TagName, StringComparer.OrdinalIgnoreCase);
List<string> toAdvise;
List<string> toUnadvise;
lock (_lock)
{
toAdvise = desired.Keys
.Where(tag => !_probeByTagName.ContainsKey(tag))
.ToList();
toUnadvise = _probeByTagName.Keys
.Where(tag => !desired.ContainsKey(tag))
.Select(tag => _probeByTagName[tag])
.ToList();
foreach (var tag in toAdvise)
{
var probe = tag + ProbeAttribute;
_probeByTagName[tag] = probe;
_byProbe[probe] = new HostProbeState
{
TagName = tag,
State = HostRuntimeState.Unknown,
LastStateChangeUtc = _clock(),
};
}
foreach (var probe in toUnadvise)
{
_byProbe.Remove(probe);
}
foreach (var removedTag in _probeByTagName.Keys.Where(t => !desired.ContainsKey(t)).ToList())
{
_probeByTagName.Remove(removedTag);
}
}
foreach (var tag in toAdvise)
{
var probe = tag + ProbeAttribute;
try
{
await _subscribe(probe, OnProbeCallback);
}
catch
{
// Rollback on subscribe failure so a later Tick can't transition a never-advised
// probe into a false Stopped state. Callers can re-Sync later to retry.
lock (_lock)
{
_byProbe.Remove(probe);
_probeByTagName.Remove(tag);
}
}
}
foreach (var probe in toUnadvise)
{
try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ }
}
}
/// <summary>
/// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's
/// SubscribeAsync delivers VTQ updates through the callback wired in <see cref="SyncAsync"/>,
/// which calls this method under the lock to update state and fires
/// <see cref="StateChanged"/> outside the lock for any transition that matters.
/// </summary>
public void OnProbeCallback(string probeTag, Vtq vtq)
{
if (_disposed) return;
HostStateTransition? transition = null;
lock (_lock)
{
if (!_byProbe.TryGetValue(probeTag, out var state)) return;
var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b;
var now = _clock();
var previous = state.State;
state.LastCallbackUtc = now;
if (isRunning)
{
state.GoodUpdateCount++;
if (previous != HostRuntimeState.Running)
{
state.State = HostRuntimeState.Running;
state.LastStateChangeUtc = now;
if (previous == HostRuntimeState.Stopped)
{
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
}
}
}
else
{
state.FailureCount++;
if (previous != HostRuntimeState.Stopped)
{
state.State = HostRuntimeState.Stopped;
state.LastStateChangeUtc = now;
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
}
}
}
if (transition is { } t)
{
StateChanged?.Invoke(this, t);
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
lock (_lock)
{
_byProbe.Clear();
_probeByTagName.Clear();
}
}
private sealed class HostProbeState
{
public string TagName { get; set; } = "";
public HostRuntimeState State { get; set; }
public DateTime LastStateChangeUtc { get; set; }
public DateTime? LastCallbackUtc { get; set; }
public long GoodUpdateCount { get; set; }
public long FailureCount { get; set; }
}
}
public enum HostRuntimeState
{
Unknown,
Running,
Stopped,
}
public sealed record HostStateTransition(
string TagName,
HostRuntimeState OldState,
HostRuntimeState NewState,
DateTime AtUtc);
public sealed record HostProbeSnapshot(
string TagName,
HostRuntimeState State,
DateTime LastChangedUtc);
public readonly record struct HostProbeTarget(string TagName, int CategoryId)
{
public bool IsRuntimeHost =>
CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|| CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine;
}

View File

@@ -94,6 +94,24 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadEventsResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{

View File

@@ -87,6 +87,20 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
return;
}
case MessageKind.HistoryReadAtTimeRequest:
{
var resp = await backend.HistoryReadAtTimeAsync(
Deserialize<HistoryReadAtTimeRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
return;
}
case MessageKind.HistoryReadEventsRequest:
{
var resp = await backend.HistoryReadEventsAsync(
Deserialize<HistoryReadEventsRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);

View File

@@ -48,10 +48,14 @@ public enum MessageKind : byte
AlarmEvent = 0x51,
AlarmAckRequest = 0x52,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadProcessedRequest = 0x62,
HistoryReadProcessedResponse = 0x63,
HistoryReadAtTimeRequest = 0x64,
HistoryReadAtTimeResponse = 0x65,
HistoryReadEventsRequest = 0x66,
HistoryReadEventsResponse = 0x67,
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,

View File

@@ -50,3 +50,61 @@ public sealed class HistoryReadProcessedResponse
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}
/// <summary>
/// At-time historian read — OPC UA HistoryReadAtTime service. Returns one sample per
/// requested timestamp (interpolated when no exact match exists). The per-timestamp array
/// is flow-encoded as Unix milliseconds to avoid MessagePack DateTime quirks.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadAtTimeRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long[] TimestampsUtcUnixMs { get; set; } = System.Array.Empty<long>();
}
[MessagePackObject]
public sealed class HistoryReadAtTimeResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}
/// <summary>
/// Historical events read — OPC UA HistoryReadEvents service and Alarm &amp; Condition
/// history. <c>SourceName</c> null means "all sources". Distinct from the live
/// <see cref="GalaxyAlarmEvent"/> stream because historical rows carry both
/// <c>EventTime</c> (when the event occurred in the process) and <c>ReceivedTime</c>
/// (when the Historian persisted it) and have no StateTransition — the Historian logs
/// the instantaneous event, not the OPC UA alarm lifecycle.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadEventsRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string? SourceName { get; set; }
[Key(2)] public long StartUtcUnixMs { get; set; }
[Key(3)] public long EndUtcUnixMs { get; set; }
[Key(4)] public int MaxEvents { get; set; } = 1000;
}
[MessagePackObject]
public sealed class GalaxyHistoricalEvent
{
[Key(0)] public string EventId { get; set; } = string.Empty;
[Key(1)] public string? SourceName { get; set; }
[Key(2)] public long EventTimeUtcUnixMs { get; set; }
[Key(3)] public long ReceivedTimeUtcUnixMs { get; set; }
[Key(4)] public string? DisplayText { get; set; }
[Key(5)] public ushort Severity { get; set; }
}
[MessagePackObject]
public sealed class HistoryReadEventsResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyHistoricalEvent[] Events { get; set; } = System.Array.Empty<GalaxyHistoricalEvent>();
}

View File

@@ -0,0 +1,231 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class GalaxyRuntimeProbeManagerTests
{
private sealed class FakeSubscriber
{
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
public readonly ConcurrentQueue<string> UnsubCalls = new();
public bool FailSubscribeFor { get; set; }
public string? FailSubscribeTag { get; set; }
public Task Subscribe(string probe, Action<string, Vtq> cb)
{
if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("subscribe refused");
Subs[probe] = cb;
return Task.CompletedTask;
}
public Task Unsubscribe(string probe)
{
UnsubCalls.Enqueue(probe);
Subs.TryRemove(probe, out _);
return Task.CompletedTask;
}
}
private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
[Fact]
public async Task Sync_subscribes_to_ScanState_per_host()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
});
mgr.ActiveProbeCount.ShouldBe(2);
subs.Subs.ShouldContainKey("PlatformA.ScanState");
subs.Subs.ShouldContainKey("EngineB.ScanState");
}
[Fact]
public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
var targets = new[] { new HostProbeTarget("PlatformA", 1) };
await mgr.SyncAsync(targets);
await mgr.SyncAsync(targets);
mgr.ActiveProbeCount.ShouldBe(1);
subs.Subs.Count.ShouldBe(1);
subs.UnsubCalls.Count.ShouldBe(0);
}
[Fact]
public async Task Sync_unadvises_removed_hosts()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", 1),
new HostProbeTarget("PlatformB", 1),
});
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
mgr.ActiveProbeCount.ShouldBe(1);
subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
}
[Fact]
public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
{
var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
mgr.ActiveProbeCount.ShouldBe(0); // rolled back
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
}
[Fact]
public async Task Unknown_to_Running_does_not_fire_StateChanged()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
transitions.Count.ShouldBe(0); // startup transition, operators don't care
}
[Fact]
public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
transitions.Count.ShouldBe(1);
transitions.TryDequeue(out var t).ShouldBeTrue();
t!.TagName.ShouldBe("PlatformA");
t.OldState.ShouldBe(HostRuntimeState.Running);
t.NewState.ShouldBe(HostRuntimeState.Stopped);
}
[Fact]
public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
transitions.Count.ShouldBe(2);
}
[Fact]
public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
// First callback is bad-quality — we must flag the host Stopped so operators see it.
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
transitions.Count.ShouldBe(1);
transitions.TryDequeue(out var t).ShouldBeTrue();
t!.OldState.ShouldBe(HostRuntimeState.Unknown);
t.NewState.ShouldBe(HostRuntimeState.Stopped);
}
[Fact]
public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var count = 0;
mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
for (var i = 0; i < 5; i++)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
}
[Fact]
public async Task Unknown_callback_for_non_tracked_probe_is_dropped()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true));
mgr.ActiveProbeCount.ShouldBe(0);
}
[Fact]
public async Task Snapshot_reports_current_state_for_every_tracked_host()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", 1),
new HostProbeTarget("EngineB", 3),
});
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
var snap = mgr.SnapshotStates();
snap.Count.ShouldBe(2);
snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
}
[Fact]
public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
{
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
}
}

View File

@@ -0,0 +1,147 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadAtTimeTests
{
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? historian, StaPump pump) =>
new(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
new MxAccessClient(pump, new MxProxyAdapter(), "attime-test"),
historian);
[Fact]
public async Task Returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
using var backend = BuildBackend(null, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L, 2L },
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Empty_timestamp_list_short_circuits_to_success_with_no_values()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var fake = new FakeHistorian();
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = Array.Empty<long>(),
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.ShouldBeEmpty();
fake.Calls.ShouldBe(0); // no round-trip to SDK for empty timestamp list
}
[Fact]
public async Task Timestamps_survive_Unix_ms_round_trip_to_DateTime()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var t1 = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2026, 4, 18, 10, 5, 0, DateTimeKind.Utc);
var fake = new FakeHistorian(
new HistorianSample { Value = 100.0, Quality = 192, TimestampUtc = t1 },
new HistorianSample { Value = 101.5, Quality = 192, TimestampUtc = t2 });
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "TankLevel",
TimestampsUtcUnixMs = new[]
{
new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds(),
new DateTimeOffset(t2, TimeSpan.Zero).ToUnixTimeMilliseconds(),
},
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(2);
resp.Values[0].SourceTimestampUtcUnixMs.ShouldBe(new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds());
resp.Values[0].StatusCode.ShouldBe(0u); // Good (quality 192)
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(100.0);
fake.Calls.ShouldBe(1);
fake.LastTimestamps.Length.ShouldBe(2);
fake.LastTimestamps[0].ShouldBe(t1);
fake.LastTimestamps[1].ShouldBe(t2);
}
[Fact]
public async Task Missing_sample_maps_to_Bad_category()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
// Quality=0 means no sample at that timestamp per HistorianDataSource.ReadAtTimeAsync.
var fake = new FakeHistorian(new HistorianSample
{
Value = null,
Quality = 0,
TimestampUtc = DateTime.UtcNow,
});
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L },
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0x80000000u); // Bad category
resp.Values[0].ValueBytes.ShouldBeNull();
}
private sealed class FakeHistorian : IHistorianDataSource
{
private readonly HistorianSample[] _samples;
public int Calls { get; private set; }
public DateTime[] LastTimestamps { get; private set; } = Array.Empty<DateTime>();
public FakeHistorian(params HistorianSample[] samples) => _samples = samples;
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
{
Calls++;
LastTimestamps = ts;
return Task.FromResult(new List<HistorianSample>(_samples));
}
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}

View File

@@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadEventsTests
{
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? h, StaPump pump) =>
new(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
new MxAccessClient(pump, new MxProxyAdapter(), "events-test"),
h);
[Fact]
public async Task Returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
using var backend = BuildBackend(null, pump);
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = "TankA",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxEvents = 100,
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Maps_HistorianEventDto_to_GalaxyHistoricalEvent_wire_shape()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var eventId = Guid.NewGuid();
var eventTime = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var receivedTime = eventTime.AddMilliseconds(150);
var fake = new FakeHistorian(new HistorianEventDto
{
Id = eventId,
Source = "TankA.Level.HiHi",
EventTime = eventTime,
ReceivedTime = receivedTime,
DisplayText = "HiHi alarm tripped",
Severity = 900,
});
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = "TankA.Level.HiHi",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxEvents = 50,
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Events.Length.ShouldBe(1);
var got = resp.Events[0];
got.EventId.ShouldBe(eventId.ToString());
got.SourceName.ShouldBe("TankA.Level.HiHi");
got.DisplayText.ShouldBe("HiHi alarm tripped");
got.Severity.ShouldBe<ushort>(900);
got.EventTimeUtcUnixMs.ShouldBe(new DateTimeOffset(eventTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
got.ReceivedTimeUtcUnixMs.ShouldBe(new DateTimeOffset(receivedTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
fake.LastSourceName.ShouldBe("TankA.Level.HiHi");
fake.LastMaxEvents.ShouldBe(50);
}
[Fact]
public async Task Null_source_name_is_passed_through_as_all_sources()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var fake = new FakeHistorian();
using var backend = BuildBackend(fake, pump);
await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = null,
StartUtcUnixMs = 0,
EndUtcUnixMs = 1,
MaxEvents = 10,
}, CancellationToken.None);
fake.LastSourceName.ShouldBeNull();
}
private sealed class FakeHistorian : IHistorianDataSource
{
private readonly HistorianEventDto[] _events;
public string? LastSourceName { get; private set; } = "<unset>";
public int LastMaxEvents { get; private set; }
public FakeHistorian(params HistorianEventDto[] events) => _events = events;
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
{
LastSourceName = src;
LastMaxEvents = max;
return Task.FromResult(new List<HistorianEventDto>(_events));
}
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}