Compare commits
6 Commits
phase-2-pr
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04d267d1ea | ||
| 4448db8207 | |||
| d96b513bbc | |||
| 053c4e0566 | |||
|
|
f24f969a85 | ||
|
|
ca025ebe0c |
@@ -145,6 +145,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||
});
|
||||
|
||||
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||
HistoryReadEventsRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new HistoryReadEventsResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
|
||||
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||
});
|
||||
|
||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
|
||||
/// <summary>
|
||||
/// Maps a raw OPC DA quality byte (as returned by Wonderware Historian's <c>OpcQuality</c>)
|
||||
/// to an OPC UA <c>StatusCode</c> uint. Preserves specific codes (BadNotConnected,
|
||||
/// UncertainSubNormal, etc.) instead of collapsing to Good/Uncertain/Bad categories.
|
||||
/// Mirrors v1 <c>QualityMapper.MapToOpcUaStatusCode</c> without pulling in OPC UA types —
|
||||
/// the returned value is the 32-bit OPC UA <c>StatusCode</c> wire encoding that the Proxy
|
||||
/// surfaces directly as <c>DataValueSnapshot.StatusCode</c>.
|
||||
/// </summary>
|
||||
public static class HistorianQualityMapper
|
||||
{
|
||||
/// <summary>
|
||||
/// Map an 8-bit OPC DA quality byte to the corresponding OPC UA StatusCode. The byte
|
||||
/// family bits decide the category (Good >= 192, Uncertain 64-191, Bad 0-63); the
|
||||
/// low-nibble subcode selects the specific code.
|
||||
/// </summary>
|
||||
public static uint Map(byte q) => q switch
|
||||
{
|
||||
// Good family (192+)
|
||||
192 => 0x00000000u, // Good
|
||||
216 => 0x00D80000u, // Good_LocalOverride
|
||||
|
||||
// Uncertain family (64-191)
|
||||
64 => 0x40000000u, // Uncertain
|
||||
68 => 0x40900000u, // Uncertain_LastUsableValue
|
||||
80 => 0x40930000u, // Uncertain_SensorNotAccurate
|
||||
84 => 0x40940000u, // Uncertain_EngineeringUnitsExceeded
|
||||
88 => 0x40950000u, // Uncertain_SubNormal
|
||||
|
||||
// Bad family (0-63)
|
||||
0 => 0x80000000u, // Bad
|
||||
4 => 0x80890000u, // Bad_ConfigurationError
|
||||
8 => 0x808A0000u, // Bad_NotConnected
|
||||
12 => 0x808B0000u, // Bad_DeviceFailure
|
||||
16 => 0x808C0000u, // Bad_SensorFailure
|
||||
20 => 0x80050000u, // Bad_CommunicationError
|
||||
24 => 0x808D0000u, // Bad_OutOfService
|
||||
32 => 0x80320000u, // Bad_WaitingForInitialData
|
||||
|
||||
// Unknown code — fall back to the category so callers still get a sensible bucket.
|
||||
_ when q >= 192 => 0x00000000u,
|
||||
_ when q >= 64 => 0x40000000u,
|
||||
_ => 0x80000000u,
|
||||
};
|
||||
}
|
||||
@@ -40,6 +40,7 @@ public interface IGalaxyBackend
|
||||
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
||||
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
||||
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
|
||||
Task<HistoryReadEventsResponse> HistoryReadEventsAsync(HistoryReadEventsRequest req, CancellationToken ct);
|
||||
|
||||
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
@@ -40,6 +41,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||
|
||||
private readonly System.EventHandler<bool> _onConnectionStateChanged;
|
||||
private readonly GalaxyRuntimeProbeManager _probeManager;
|
||||
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
|
||||
|
||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
||||
{
|
||||
@@ -62,8 +65,39 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
});
|
||||
};
|
||||
_mx.ConnectionStateChanged += _onConnectionStateChanged;
|
||||
|
||||
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
|
||||
// which runs the state machine and raises StateChanged on transitions we care about.
|
||||
// We forward each transition through the same OnHostStatusChanged IPC event that the
|
||||
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
|
||||
// Admin UI can show per-host health independently from the top-level transport status.
|
||||
_probeManager = new GalaxyRuntimeProbeManager(
|
||||
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
|
||||
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
|
||||
_onProbeStateChanged = (_, t) =>
|
||||
{
|
||||
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
|
||||
{
|
||||
HostName = t.TagName,
|
||||
RuntimeStatus = t.NewState switch
|
||||
{
|
||||
HostRuntimeState.Running => "Running",
|
||||
HostRuntimeState.Stopped => "Stopped",
|
||||
_ => "Unknown",
|
||||
},
|
||||
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
});
|
||||
};
|
||||
_probeManager.StateChanged += _onProbeStateChanged;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
|
||||
/// <c>SyncProbesAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
|
||||
/// advise ScanState per host.
|
||||
/// </summary>
|
||||
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
|
||||
|
||||
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
@@ -103,6 +137,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
|
||||
}).ToArray();
|
||||
|
||||
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
|
||||
// so ScanState subscriptions track the current runtime set. Best-effort — probe
|
||||
// failures don't block Discover from returning, since the gateway-level signal from
|
||||
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
|
||||
// that level if any per-host probe couldn't advise.
|
||||
try
|
||||
{
|
||||
var targets = hierarchy
|
||||
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|
||||
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
|
||||
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
|
||||
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
|
||||
}
|
||||
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
|
||||
|
||||
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -360,11 +409,53 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||
HistoryReadEventsRequest req, CancellationToken ct)
|
||||
{
|
||||
if (_historian is null)
|
||||
return new HistoryReadEventsResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||
};
|
||||
|
||||
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||
|
||||
try
|
||||
{
|
||||
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
|
||||
var wire = events.Select(e => new GalaxyHistoricalEvent
|
||||
{
|
||||
EventId = e.Id.ToString(),
|
||||
SourceName = e.Source,
|
||||
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
DisplayText = e.DisplayText,
|
||||
Severity = e.Severity,
|
||||
}).ToArray();
|
||||
return new HistoryReadEventsResponse { Success = true, Events = wire };
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new HistoryReadEventsResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = $"Historian event read failed: {ex.Message}",
|
||||
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_probeManager.StateChanged -= _onProbeStateChanged;
|
||||
_probeManager.Dispose();
|
||||
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
|
||||
_historian?.Dispose();
|
||||
}
|
||||
@@ -391,19 +482,11 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
||||
TagReference = reference,
|
||||
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
|
||||
ValueMessagePackType = 0,
|
||||
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
|
||||
StatusCode = HistorianQualityMapper.Map(sample.Quality),
|
||||
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
|
||||
private static uint MapHistorianQualityToOpcUa(byte q)
|
||||
{
|
||||
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
|
||||
// The Proxy may refine this when it decodes the wire frame.
|
||||
if (q >= 192) return 0x00000000u; // Good
|
||||
if (q >= 64) return 0x40000000u; // Uncertain
|
||||
return 0x80000000u; // Bad
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
|
||||
|
||||
@@ -0,0 +1,273 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||
|
||||
/// <summary>
|
||||
/// Per-platform + per-AppEngine runtime probe. Subscribes to <c><TagName>.ScanState</c>
|
||||
/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
|
||||
/// transitions, and fires <see cref="StateChanged"/> so <see cref="Backend.MxAccessGalaxyBackend"/>
|
||||
/// can forward per-host events through the existing IPC <c>OnHostStatusChanged</c> event.
|
||||
/// Pure-logic state machine with an injected clock so it's deterministically testable —
|
||||
/// port of v1 <c>GalaxyRuntimeProbeManager</c> without the OPC UA node-manager coupling.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// State machine rules (documented in v1's <c>runtimestatus.md</c> and preserved here):
|
||||
/// <list type="bullet">
|
||||
/// <item><c>ScanState</c> is on-change-only — a stably-Running host may go hours without a
|
||||
/// callback. Running → Stopped is driven by an explicit <c>ScanState=false</c> callback,
|
||||
/// never by starvation.</item>
|
||||
/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
|
||||
/// paint every host as "just recovered" at startup, which is noise).</item>
|
||||
/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
|
||||
/// fires StateChanged because that's a first-known-bad signal operators need.</item>
|
||||
/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
|
||||
/// avoid lock inversion with caller-owned state.</item>
|
||||
/// </list>
|
||||
/// </remarks>
|
||||
public sealed class GalaxyRuntimeProbeManager : IDisposable
|
||||
{
|
||||
public const int CategoryWinPlatform = 1;
|
||||
public const int CategoryAppEngine = 3;
|
||||
public const string ProbeAttribute = ".ScanState";
|
||||
|
||||
private readonly Func<DateTime> _clock;
|
||||
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
|
||||
private readonly Func<string, Task> _unsubscribe;
|
||||
private readonly object _lock = new();
|
||||
|
||||
// probe tag → per-host state
|
||||
private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);
|
||||
// tag name → probe tag (for reverse lookup on the desired-set diff)
|
||||
private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);
|
||||
private bool _disposed;
|
||||
|
||||
/// <summary>
|
||||
/// Fires on every state transition that operators should react to. See class remarks
|
||||
/// for the rules on which transitions fire.
|
||||
/// </summary>
|
||||
public event EventHandler<HostStateTransition>? StateChanged;
|
||||
|
||||
public GalaxyRuntimeProbeManager(
|
||||
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||
Func<string, Task> unsubscribe)
|
||||
: this(subscribe, unsubscribe, () => DateTime.UtcNow) { }
|
||||
|
||||
internal GalaxyRuntimeProbeManager(
|
||||
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||
Func<string, Task> unsubscribe,
|
||||
Func<DateTime> clock)
|
||||
{
|
||||
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
|
||||
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
|
||||
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
|
||||
}
|
||||
|
||||
/// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
|
||||
public int ActiveProbeCount
|
||||
{
|
||||
get { lock (_lock) return _byProbe.Count; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Snapshot every currently-tracked host's state. One entry per probe.
|
||||
/// </summary>
|
||||
public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
return _byProbe.Select(kv => new HostProbeSnapshot(
|
||||
TagName: kv.Value.TagName,
|
||||
State: kv.Value.State,
|
||||
LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Query the current runtime state for <paramref name="tagName"/>. Returns
|
||||
/// <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
|
||||
/// </summary>
|
||||
public HostRuntimeState GetState(string tagName)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_probeByTagName.TryGetValue(tagName, out var probe)
|
||||
&& _byProbe.TryGetValue(probe, out var state))
|
||||
return state.State;
|
||||
return HostRuntimeState.Unknown;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
|
||||
/// against the currently-tracked set and advise / unadvise as needed. Idempotent:
|
||||
/// calling twice with the same set does nothing.
|
||||
/// </summary>
|
||||
public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
var desired = desiredHosts
|
||||
.Where(h => !string.IsNullOrWhiteSpace(h.TagName))
|
||||
.ToDictionary(h => h.TagName, StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
List<string> toAdvise;
|
||||
List<string> toUnadvise;
|
||||
lock (_lock)
|
||||
{
|
||||
toAdvise = desired.Keys
|
||||
.Where(tag => !_probeByTagName.ContainsKey(tag))
|
||||
.ToList();
|
||||
toUnadvise = _probeByTagName.Keys
|
||||
.Where(tag => !desired.ContainsKey(tag))
|
||||
.Select(tag => _probeByTagName[tag])
|
||||
.ToList();
|
||||
|
||||
foreach (var tag in toAdvise)
|
||||
{
|
||||
var probe = tag + ProbeAttribute;
|
||||
_probeByTagName[tag] = probe;
|
||||
_byProbe[probe] = new HostProbeState
|
||||
{
|
||||
TagName = tag,
|
||||
State = HostRuntimeState.Unknown,
|
||||
LastStateChangeUtc = _clock(),
|
||||
};
|
||||
}
|
||||
|
||||
foreach (var probe in toUnadvise)
|
||||
{
|
||||
_byProbe.Remove(probe);
|
||||
}
|
||||
|
||||
foreach (var removedTag in _probeByTagName.Keys.Where(t => !desired.ContainsKey(t)).ToList())
|
||||
{
|
||||
_probeByTagName.Remove(removedTag);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var tag in toAdvise)
|
||||
{
|
||||
var probe = tag + ProbeAttribute;
|
||||
try
|
||||
{
|
||||
await _subscribe(probe, OnProbeCallback);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Rollback on subscribe failure so a later Tick can't transition a never-advised
|
||||
// probe into a false Stopped state. Callers can re-Sync later to retry.
|
||||
lock (_lock)
|
||||
{
|
||||
_byProbe.Remove(probe);
|
||||
_probeByTagName.Remove(tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var probe in toUnadvise)
|
||||
{
|
||||
try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ }
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's
|
||||
/// SubscribeAsync delivers VTQ updates through the callback wired in <see cref="SyncAsync"/>,
|
||||
/// which calls this method under the lock to update state and fires
|
||||
/// <see cref="StateChanged"/> outside the lock for any transition that matters.
|
||||
/// </summary>
|
||||
public void OnProbeCallback(string probeTag, Vtq vtq)
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
HostStateTransition? transition = null;
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_byProbe.TryGetValue(probeTag, out var state)) return;
|
||||
|
||||
var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b;
|
||||
var now = _clock();
|
||||
var previous = state.State;
|
||||
state.LastCallbackUtc = now;
|
||||
|
||||
if (isRunning)
|
||||
{
|
||||
state.GoodUpdateCount++;
|
||||
if (previous != HostRuntimeState.Running)
|
||||
{
|
||||
state.State = HostRuntimeState.Running;
|
||||
state.LastStateChangeUtc = now;
|
||||
if (previous == HostRuntimeState.Stopped)
|
||||
{
|
||||
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
state.FailureCount++;
|
||||
if (previous != HostRuntimeState.Stopped)
|
||||
{
|
||||
state.State = HostRuntimeState.Stopped;
|
||||
state.LastStateChangeUtc = now;
|
||||
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (transition is { } t)
|
||||
{
|
||||
StateChanged?.Invoke(this, t);
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
lock (_lock)
|
||||
{
|
||||
_byProbe.Clear();
|
||||
_probeByTagName.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class HostProbeState
|
||||
{
|
||||
public string TagName { get; set; } = "";
|
||||
public HostRuntimeState State { get; set; }
|
||||
public DateTime LastStateChangeUtc { get; set; }
|
||||
public DateTime? LastCallbackUtc { get; set; }
|
||||
public long GoodUpdateCount { get; set; }
|
||||
public long FailureCount { get; set; }
|
||||
}
|
||||
}
|
||||
|
||||
public enum HostRuntimeState
|
||||
{
|
||||
Unknown,
|
||||
Running,
|
||||
Stopped,
|
||||
}
|
||||
|
||||
public sealed record HostStateTransition(
|
||||
string TagName,
|
||||
HostRuntimeState OldState,
|
||||
HostRuntimeState NewState,
|
||||
DateTime AtUtc);
|
||||
|
||||
public sealed record HostProbeSnapshot(
|
||||
string TagName,
|
||||
HostRuntimeState State,
|
||||
DateTime LastChangedUtc);
|
||||
|
||||
public readonly record struct HostProbeTarget(string TagName, int CategoryId)
|
||||
{
|
||||
public bool IsRuntimeHost =>
|
||||
CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|
||||
|| CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine;
|
||||
}
|
||||
@@ -103,6 +103,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||
});
|
||||
|
||||
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||
HistoryReadEventsRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new HistoryReadEventsResponse
|
||||
{
|
||||
Success = false,
|
||||
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
|
||||
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||
});
|
||||
|
||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||
=> Task.FromResult(new RecycleStatusResponse
|
||||
{
|
||||
|
||||
@@ -94,6 +94,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
||||
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
|
||||
return;
|
||||
}
|
||||
case MessageKind.HistoryReadEventsRequest:
|
||||
{
|
||||
var resp = await backend.HistoryReadEventsAsync(
|
||||
Deserialize<HistoryReadEventsRequest>(body), ct);
|
||||
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
|
||||
return;
|
||||
}
|
||||
case MessageKind.RecycleHostRequest:
|
||||
{
|
||||
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
||||
|
||||
@@ -54,6 +54,8 @@ public enum MessageKind : byte
|
||||
HistoryReadProcessedResponse = 0x63,
|
||||
HistoryReadAtTimeRequest = 0x64,
|
||||
HistoryReadAtTimeResponse = 0x65,
|
||||
HistoryReadEventsRequest = 0x66,
|
||||
HistoryReadEventsResponse = 0x67,
|
||||
|
||||
HostConnectivityStatus = 0x70,
|
||||
RuntimeStatusChange = 0x71,
|
||||
|
||||
@@ -71,3 +71,40 @@ public sealed class HistoryReadAtTimeResponse
|
||||
[Key(1)] public string? Error { get; set; }
|
||||
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Historical events read — OPC UA HistoryReadEvents service and Alarm & Condition
|
||||
/// history. <c>SourceName</c> null means "all sources". Distinct from the live
|
||||
/// <see cref="GalaxyAlarmEvent"/> stream because historical rows carry both
|
||||
/// <c>EventTime</c> (when the event occurred in the process) and <c>ReceivedTime</c>
|
||||
/// (when the Historian persisted it) and have no StateTransition — the Historian logs
|
||||
/// the instantaneous event, not the OPC UA alarm lifecycle.
|
||||
/// </summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistoryReadEventsRequest
|
||||
{
|
||||
[Key(0)] public long SessionId { get; set; }
|
||||
[Key(1)] public string? SourceName { get; set; }
|
||||
[Key(2)] public long StartUtcUnixMs { get; set; }
|
||||
[Key(3)] public long EndUtcUnixMs { get; set; }
|
||||
[Key(4)] public int MaxEvents { get; set; } = 1000;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class GalaxyHistoricalEvent
|
||||
{
|
||||
[Key(0)] public string EventId { get; set; } = string.Empty;
|
||||
[Key(1)] public string? SourceName { get; set; }
|
||||
[Key(2)] public long EventTimeUtcUnixMs { get; set; }
|
||||
[Key(3)] public long ReceivedTimeUtcUnixMs { get; set; }
|
||||
[Key(4)] public string? DisplayText { get; set; }
|
||||
[Key(5)] public ushort Severity { get; set; }
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class HistoryReadEventsResponse
|
||||
{
|
||||
[Key(0)] public bool Success { get; set; }
|
||||
[Key(1)] public string? Error { get; set; }
|
||||
[Key(2)] public GalaxyHistoricalEvent[] Events { get; set; } = System.Array.Empty<GalaxyHistoricalEvent>();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class GalaxyRuntimeProbeManagerTests
|
||||
{
|
||||
private sealed class FakeSubscriber
|
||||
{
|
||||
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
|
||||
public readonly ConcurrentQueue<string> UnsubCalls = new();
|
||||
public bool FailSubscribeFor { get; set; }
|
||||
public string? FailSubscribeTag { get; set; }
|
||||
|
||||
public Task Subscribe(string probe, Action<string, Vtq> cb)
|
||||
{
|
||||
if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
|
||||
throw new InvalidOperationException("subscribe refused");
|
||||
Subs[probe] = cb;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task Unsubscribe(string probe)
|
||||
{
|
||||
UnsubCalls.Enqueue(probe);
|
||||
Subs.TryRemove(probe, out _);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
|
||||
private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
|
||||
|
||||
[Fact]
|
||||
public async Task Sync_subscribes_to_ScanState_per_host()
|
||||
{
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||
|
||||
await mgr.SyncAsync(new[]
|
||||
{
|
||||
new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
|
||||
new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
|
||||
});
|
||||
|
||||
mgr.ActiveProbeCount.ShouldBe(2);
|
||||
subs.Subs.ShouldContainKey("PlatformA.ScanState");
|
||||
subs.Subs.ShouldContainKey("EngineB.ScanState");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
|
||||
{
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||
var targets = new[] { new HostProbeTarget("PlatformA", 1) };
|
||||
|
||||
await mgr.SyncAsync(targets);
|
||||
await mgr.SyncAsync(targets);
|
||||
|
||||
mgr.ActiveProbeCount.ShouldBe(1);
|
||||
subs.Subs.Count.ShouldBe(1);
|
||||
subs.UnsubCalls.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Sync_unadvises_removed_hosts()
|
||||
{
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||
|
||||
await mgr.SyncAsync(new[]
|
||||
{
|
||||
new HostProbeTarget("PlatformA", 1),
|
||||
new HostProbeTarget("PlatformB", 1),
|
||||
});
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
|
||||
mgr.ActiveProbeCount.ShouldBe(1);
|
||||
subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
|
||||
{
|
||||
var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
|
||||
mgr.ActiveProbeCount.ShouldBe(0); // rolled back
|
||||
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unknown_to_Running_does_not_fire_StateChanged()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
|
||||
|
||||
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
|
||||
transitions.Count.ShouldBe(0); // startup transition, operators don't care
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
|
||||
|
||||
transitions.Count.ShouldBe(1);
|
||||
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||
t!.TagName.ShouldBe("PlatformA");
|
||||
t.OldState.ShouldBe(HostRuntimeState.Running);
|
||||
t.NewState.ShouldBe(HostRuntimeState.Stopped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
|
||||
|
||||
transitions.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
// First callback is bad-quality — we must flag the host Stopped so operators see it.
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
|
||||
|
||||
transitions.Count.ShouldBe(1);
|
||||
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||
t!.OldState.ShouldBe(HostRuntimeState.Unknown);
|
||||
t.NewState.ShouldBe(HostRuntimeState.Stopped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
var count = 0;
|
||||
mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
|
||||
|
||||
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||
for (var i = 0; i < 5; i++)
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
|
||||
|
||||
count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unknown_callback_for_non_tracked_probe_is_dropped()
|
||||
{
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||
|
||||
mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true));
|
||||
|
||||
mgr.ActiveProbeCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Snapshot_reports_current_state_for_every_tracked_host()
|
||||
{
|
||||
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var subs = new FakeSubscriber();
|
||||
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||
|
||||
await mgr.SyncAsync(new[]
|
||||
{
|
||||
new HostProbeTarget("PlatformA", 1),
|
||||
new HostProbeTarget("EngineB", 3),
|
||||
});
|
||||
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
|
||||
subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
|
||||
|
||||
var snap = mgr.SnapshotStates();
|
||||
snap.Count.ShouldBe(2);
|
||||
snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
|
||||
snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
|
||||
{
|
||||
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
|
||||
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
|
||||
new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
|
||||
new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class HistorianQualityMapperTests
|
||||
{
|
||||
/// <summary>
|
||||
/// Rich mapping preserves specific OPC DA subcodes through the historian ToWire path.
|
||||
/// Before PR 12 the category-only fallback collapsed e.g. BadNotConnected(8) to
|
||||
/// Bad(0x80000000) so downstream OPC UA clients could not distinguish transport issues
|
||||
/// from sensor issues. After PR 12 every known subcode round-trips to its canonical
|
||||
/// uint32 StatusCode and Proxy translation stays byte-for-byte with v1 QualityMapper.
|
||||
/// </summary>
|
||||
[Theory]
|
||||
[InlineData((byte)192, 0x00000000u)] // Good
|
||||
[InlineData((byte)216, 0x00D80000u)] // Good_LocalOverride
|
||||
[InlineData((byte)64, 0x40000000u)] // Uncertain
|
||||
[InlineData((byte)68, 0x40900000u)] // Uncertain_LastUsableValue
|
||||
[InlineData((byte)80, 0x40930000u)] // Uncertain_SensorNotAccurate
|
||||
[InlineData((byte)84, 0x40940000u)] // Uncertain_EngineeringUnitsExceeded
|
||||
[InlineData((byte)88, 0x40950000u)] // Uncertain_SubNormal
|
||||
[InlineData((byte)0, 0x80000000u)] // Bad
|
||||
[InlineData((byte)4, 0x80890000u)] // Bad_ConfigurationError
|
||||
[InlineData((byte)8, 0x808A0000u)] // Bad_NotConnected
|
||||
[InlineData((byte)12, 0x808B0000u)] // Bad_DeviceFailure
|
||||
[InlineData((byte)16, 0x808C0000u)] // Bad_SensorFailure
|
||||
[InlineData((byte)20, 0x80050000u)] // Bad_CommunicationError
|
||||
[InlineData((byte)24, 0x808D0000u)] // Bad_OutOfService
|
||||
[InlineData((byte)32, 0x80320000u)] // Bad_WaitingForInitialData
|
||||
public void Maps_specific_OPC_DA_codes_to_canonical_StatusCode(byte quality, uint expected)
|
||||
{
|
||||
HistorianQualityMapper.Map(quality).ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData((byte)200)] // Good — unknown subcode in Good family
|
||||
[InlineData((byte)255)] // Good — unknown
|
||||
public void Unknown_good_family_codes_fall_back_to_plain_Good(byte q)
|
||||
{
|
||||
HistorianQualityMapper.Map(q).ShouldBe(0x00000000u);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData((byte)100)] // Uncertain — unknown subcode
|
||||
[InlineData((byte)150)] // Uncertain — unknown
|
||||
public void Unknown_uncertain_family_codes_fall_back_to_plain_Uncertain(byte q)
|
||||
{
|
||||
HistorianQualityMapper.Map(q).ShouldBe(0x40000000u);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData((byte)1)] // Bad — unknown subcode
|
||||
[InlineData((byte)50)] // Bad — unknown
|
||||
public void Unknown_bad_family_codes_fall_back_to_plain_Bad(byte q)
|
||||
{
|
||||
HistorianQualityMapper.Map(q).ShouldBe(0x80000000u);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,129 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class HistoryReadEventsTests
|
||||
{
|
||||
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? h, StaPump pump) =>
|
||||
new(
|
||||
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||
new MxAccessClient(pump, new MxProxyAdapter(), "events-test"),
|
||||
h);
|
||||
|
||||
[Fact]
|
||||
public async Task Returns_disabled_error_when_no_historian_configured()
|
||||
{
|
||||
using var pump = new StaPump("Test.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
using var backend = BuildBackend(null, pump);
|
||||
|
||||
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||
{
|
||||
SourceName = "TankA",
|
||||
StartUtcUnixMs = 0,
|
||||
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
MaxEvents = 100,
|
||||
}, CancellationToken.None);
|
||||
|
||||
resp.Success.ShouldBeFalse();
|
||||
resp.Error.ShouldContain("Historian disabled");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Maps_HistorianEventDto_to_GalaxyHistoricalEvent_wire_shape()
|
||||
{
|
||||
using var pump = new StaPump("Test.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
|
||||
var eventId = Guid.NewGuid();
|
||||
var eventTime = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||
var receivedTime = eventTime.AddMilliseconds(150);
|
||||
var fake = new FakeHistorian(new HistorianEventDto
|
||||
{
|
||||
Id = eventId,
|
||||
Source = "TankA.Level.HiHi",
|
||||
EventTime = eventTime,
|
||||
ReceivedTime = receivedTime,
|
||||
DisplayText = "HiHi alarm tripped",
|
||||
Severity = 900,
|
||||
});
|
||||
using var backend = BuildBackend(fake, pump);
|
||||
|
||||
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||
{
|
||||
SourceName = "TankA.Level.HiHi",
|
||||
StartUtcUnixMs = 0,
|
||||
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
MaxEvents = 50,
|
||||
}, CancellationToken.None);
|
||||
|
||||
resp.Success.ShouldBeTrue();
|
||||
resp.Events.Length.ShouldBe(1);
|
||||
var got = resp.Events[0];
|
||||
got.EventId.ShouldBe(eventId.ToString());
|
||||
got.SourceName.ShouldBe("TankA.Level.HiHi");
|
||||
got.DisplayText.ShouldBe("HiHi alarm tripped");
|
||||
got.Severity.ShouldBe<ushort>(900);
|
||||
got.EventTimeUtcUnixMs.ShouldBe(new DateTimeOffset(eventTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||
got.ReceivedTimeUtcUnixMs.ShouldBe(new DateTimeOffset(receivedTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||
|
||||
fake.LastSourceName.ShouldBe("TankA.Level.HiHi");
|
||||
fake.LastMaxEvents.ShouldBe(50);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Null_source_name_is_passed_through_as_all_sources()
|
||||
{
|
||||
using var pump = new StaPump("Test.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
var fake = new FakeHistorian();
|
||||
using var backend = BuildBackend(fake, pump);
|
||||
|
||||
await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||
{
|
||||
SourceName = null,
|
||||
StartUtcUnixMs = 0,
|
||||
EndUtcUnixMs = 1,
|
||||
MaxEvents = 10,
|
||||
}, CancellationToken.None);
|
||||
|
||||
fake.LastSourceName.ShouldBeNull();
|
||||
}
|
||||
|
||||
private sealed class FakeHistorian : IHistorianDataSource
|
||||
{
|
||||
private readonly HistorianEventDto[] _events;
|
||||
public string? LastSourceName { get; private set; } = "<unset>";
|
||||
public int LastMaxEvents { get; private set; }
|
||||
|
||||
public FakeHistorian(params HistorianEventDto[] events) => _events = events;
|
||||
|
||||
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||
{
|
||||
LastSourceName = src;
|
||||
LastMaxEvents = max;
|
||||
return Task.FromResult(new List<HistorianEventDto>(_events));
|
||||
}
|
||||
|
||||
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianSample>());
|
||||
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
|
||||
=> Task.FromResult(new List<HistorianSample>());
|
||||
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||
public void Dispose() { }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user