Compare commits
7 Commits
phase-2-pr
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c14624f012 | ||
|
|
04d267d1ea | ||
| 4448db8207 | |||
| d96b513bbc | |||
| 053c4e0566 | |||
|
|
ca025ebe0c | ||
|
|
d13f919112 |
@@ -0,0 +1,260 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribes to the four Galaxy alarm attributes (<c>.InAlarm</c>, <c>.Priority</c>,
|
||||||
|
/// <c>.DescAttrName</c>, <c>.Acked</c>) per alarm-bearing attribute discovered during
|
||||||
|
/// <c>DiscoverAsync</c>. Maintains one <see cref="AlarmState"/> per alarm, raises
|
||||||
|
/// <see cref="AlarmTransition"/> on lifecycle transitions (Active / Unacknowledged /
|
||||||
|
/// Acknowledged / Inactive). Ack path writes <c>.AckMsg</c>. Pure-logic state machine
|
||||||
|
/// with delegate-based subscribe/write so it's testable against in-memory fakes.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Transitions emitted (OPC UA Part 9 alarm lifecycle, simplified for the Galaxy model):
|
||||||
|
/// <list type="bullet">
|
||||||
|
/// <item><c>Active</c> — InAlarm false → true. Default to Unacknowledged.</item>
|
||||||
|
/// <item><c>Acknowledged</c> — Acked false → true while InAlarm is still true.</item>
|
||||||
|
/// <item><c>Inactive</c> — InAlarm true → false. If still unacknowledged the alarm
|
||||||
|
/// is marked latched-inactive-unack; next Ack transitions straight to Inactive.</item>
|
||||||
|
/// </list>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class GalaxyAlarmTracker : IDisposable
|
||||||
|
{
|
||||||
|
public const string InAlarmAttr = ".InAlarm";
|
||||||
|
public const string PriorityAttr = ".Priority";
|
||||||
|
public const string DescAttrNameAttr = ".DescAttrName";
|
||||||
|
public const string AckedAttr = ".Acked";
|
||||||
|
public const string AckMsgAttr = ".AckMsg";
|
||||||
|
|
||||||
|
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
|
||||||
|
private readonly Func<string, Task> _unsubscribe;
|
||||||
|
private readonly Func<string, object, Task<bool>> _write;
|
||||||
|
private readonly Func<DateTime> _clock;
|
||||||
|
|
||||||
|
// Alarm tag (attribute full ref, e.g. "Tank.Level.HiHi") → state.
|
||||||
|
private readonly ConcurrentDictionary<string, AlarmState> _alarms =
|
||||||
|
new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
|
// Reverse lookup: probed tag (".InAlarm" etc.) → owning alarm tag.
|
||||||
|
private readonly ConcurrentDictionary<string, (string AlarmTag, AlarmField Field)> _probeToAlarm =
|
||||||
|
new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public event EventHandler<AlarmTransition>? TransitionRaised;
|
||||||
|
|
||||||
|
public GalaxyAlarmTracker(
|
||||||
|
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||||
|
Func<string, Task> unsubscribe,
|
||||||
|
Func<string, object, Task<bool>> write)
|
||||||
|
: this(subscribe, unsubscribe, write, () => DateTime.UtcNow) { }
|
||||||
|
|
||||||
|
internal GalaxyAlarmTracker(
|
||||||
|
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||||
|
Func<string, Task> unsubscribe,
|
||||||
|
Func<string, object, Task<bool>> write,
|
||||||
|
Func<DateTime> clock)
|
||||||
|
{
|
||||||
|
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
|
||||||
|
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
|
||||||
|
_write = write ?? throw new ArgumentNullException(nameof(write));
|
||||||
|
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int TrackedAlarmCount => _alarms.Count;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Advise the four alarm attributes for <paramref name="alarmTag"/>. Idempotent —
|
||||||
|
/// repeat calls for the same alarm tag are a no-op. Subscribe failure for any of the
|
||||||
|
/// four rolls back the alarm entry so a stale callback cannot promote a phantom.
|
||||||
|
/// </summary>
|
||||||
|
public async Task TrackAsync(string alarmTag)
|
||||||
|
{
|
||||||
|
if (_disposed || string.IsNullOrWhiteSpace(alarmTag)) return;
|
||||||
|
if (_alarms.ContainsKey(alarmTag)) return;
|
||||||
|
|
||||||
|
var state = new AlarmState { AlarmTag = alarmTag };
|
||||||
|
if (!_alarms.TryAdd(alarmTag, state)) return;
|
||||||
|
|
||||||
|
var probes = new[]
|
||||||
|
{
|
||||||
|
(Tag: alarmTag + InAlarmAttr, Field: AlarmField.InAlarm),
|
||||||
|
(Tag: alarmTag + PriorityAttr, Field: AlarmField.Priority),
|
||||||
|
(Tag: alarmTag + DescAttrNameAttr, Field: AlarmField.DescAttrName),
|
||||||
|
(Tag: alarmTag + AckedAttr, Field: AlarmField.Acked),
|
||||||
|
};
|
||||||
|
|
||||||
|
foreach (var p in probes)
|
||||||
|
{
|
||||||
|
_probeToAlarm[p.Tag] = (alarmTag, p.Field);
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
foreach (var p in probes)
|
||||||
|
{
|
||||||
|
await _subscribe(p.Tag, OnProbeCallback).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Rollback so a partial advise doesn't leak state.
|
||||||
|
_alarms.TryRemove(alarmTag, out _);
|
||||||
|
foreach (var p in probes)
|
||||||
|
{
|
||||||
|
_probeToAlarm.TryRemove(p.Tag, out _);
|
||||||
|
try { await _unsubscribe(p.Tag).ConfigureAwait(false); } catch { }
|
||||||
|
}
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drop every tracked alarm. Unadvises all 4 probes per alarm as best-effort.
|
||||||
|
/// </summary>
|
||||||
|
public async Task ClearAsync()
|
||||||
|
{
|
||||||
|
_alarms.Clear();
|
||||||
|
foreach (var kv in _probeToAlarm.ToList())
|
||||||
|
{
|
||||||
|
_probeToAlarm.TryRemove(kv.Key, out _);
|
||||||
|
try { await _unsubscribe(kv.Key).ConfigureAwait(false); } catch { }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Operator ack — write the comment text into <c><alarmTag>.AckMsg</c>.
|
||||||
|
/// Returns false when the runtime reports the write failed.
|
||||||
|
/// </summary>
|
||||||
|
public Task<bool> AcknowledgeAsync(string alarmTag, string comment)
|
||||||
|
{
|
||||||
|
if (_disposed || string.IsNullOrWhiteSpace(alarmTag))
|
||||||
|
return Task.FromResult(false);
|
||||||
|
return _write(alarmTag + AckMsgAttr, comment ?? string.Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscription callback entry point. Exposed for tests and for the Backend to route
|
||||||
|
/// fan-out callbacks through. Runs the state machine and fires TransitionRaised
|
||||||
|
/// outside the lock.
|
||||||
|
/// </summary>
|
||||||
|
public void OnProbeCallback(string probeTag, Vtq vtq)
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
if (!_probeToAlarm.TryGetValue(probeTag, out var link)) return;
|
||||||
|
if (!_alarms.TryGetValue(link.AlarmTag, out var state)) return;
|
||||||
|
|
||||||
|
AlarmTransition? transition = null;
|
||||||
|
var now = _clock();
|
||||||
|
|
||||||
|
lock (state.Lock)
|
||||||
|
{
|
||||||
|
switch (link.Field)
|
||||||
|
{
|
||||||
|
case AlarmField.InAlarm:
|
||||||
|
{
|
||||||
|
var wasActive = state.InAlarm;
|
||||||
|
var isActive = vtq.Value is bool b && b;
|
||||||
|
state.InAlarm = isActive;
|
||||||
|
state.LastUpdateUtc = now;
|
||||||
|
if (!wasActive && isActive)
|
||||||
|
{
|
||||||
|
state.Acked = false;
|
||||||
|
state.LastTransitionUtc = now;
|
||||||
|
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Active, state.Priority, state.DescAttrName, now);
|
||||||
|
}
|
||||||
|
else if (wasActive && !isActive)
|
||||||
|
{
|
||||||
|
state.LastTransitionUtc = now;
|
||||||
|
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Inactive, state.Priority, state.DescAttrName, now);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case AlarmField.Priority:
|
||||||
|
if (vtq.Value is int pi) state.Priority = pi;
|
||||||
|
else if (vtq.Value is short ps) state.Priority = ps;
|
||||||
|
else if (vtq.Value is long pl && pl <= int.MaxValue) state.Priority = (int)pl;
|
||||||
|
state.LastUpdateUtc = now;
|
||||||
|
break;
|
||||||
|
case AlarmField.DescAttrName:
|
||||||
|
state.DescAttrName = vtq.Value as string;
|
||||||
|
state.LastUpdateUtc = now;
|
||||||
|
break;
|
||||||
|
case AlarmField.Acked:
|
||||||
|
{
|
||||||
|
var wasAcked = state.Acked;
|
||||||
|
var isAcked = vtq.Value is bool b && b;
|
||||||
|
state.Acked = isAcked;
|
||||||
|
state.LastUpdateUtc = now;
|
||||||
|
// Fire Acknowledged only when transitioning false→true. Don't fire on initial
|
||||||
|
// subscribe callback (wasAcked==isAcked in that case because the state starts
|
||||||
|
// with Acked=false and the initial probe is usually true for an un-active alarm).
|
||||||
|
if (!wasAcked && isAcked && state.InAlarm)
|
||||||
|
{
|
||||||
|
state.LastTransitionUtc = now;
|
||||||
|
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Acknowledged, state.Priority, state.DescAttrName, now);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (transition is { } t)
|
||||||
|
{
|
||||||
|
TransitionRaised?.Invoke(this, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<AlarmSnapshot> SnapshotStates()
|
||||||
|
{
|
||||||
|
return _alarms.Values.Select(s =>
|
||||||
|
{
|
||||||
|
lock (s.Lock)
|
||||||
|
return new AlarmSnapshot(s.AlarmTag, s.InAlarm, s.Acked, s.Priority, s.DescAttrName);
|
||||||
|
}).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
_alarms.Clear();
|
||||||
|
_probeToAlarm.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class AlarmState
|
||||||
|
{
|
||||||
|
public readonly object Lock = new();
|
||||||
|
public string AlarmTag = "";
|
||||||
|
public bool InAlarm;
|
||||||
|
public bool Acked = true; // default ack'd so first false→true on subscribe doesn't misfire
|
||||||
|
public int Priority;
|
||||||
|
public string? DescAttrName;
|
||||||
|
public DateTime LastUpdateUtc;
|
||||||
|
public DateTime LastTransitionUtc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum AlarmField { InAlarm, Priority, DescAttrName, Acked }
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum AlarmStateTransition { Active, Acknowledged, Inactive }
|
||||||
|
|
||||||
|
public sealed record AlarmTransition(
|
||||||
|
string AlarmTag,
|
||||||
|
AlarmStateTransition Transition,
|
||||||
|
int Priority,
|
||||||
|
string? DescAttrName,
|
||||||
|
DateTime AtUtc);
|
||||||
|
|
||||||
|
public sealed record AlarmSnapshot(
|
||||||
|
string AlarmTag,
|
||||||
|
bool InAlarm,
|
||||||
|
bool Acked,
|
||||||
|
int Priority,
|
||||||
|
string? DescAttrName);
|
||||||
@@ -136,6 +136,24 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
|||||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
|
||||||
|
HistoryReadAtTimeRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadAtTimeResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
|
||||||
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
|
||||||
|
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
|
|||||||
@@ -39,6 +39,8 @@ public interface IGalaxyBackend
|
|||||||
|
|
||||||
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
||||||
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
||||||
|
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
|
||||||
|
Task<HistoryReadEventsResponse> HistoryReadEventsAsync(HistoryReadEventsRequest req, CancellationToken ct);
|
||||||
|
|
||||||
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,9 +4,11 @@ using System.Linq;
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using MessagePack;
|
using MessagePack;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
@@ -34,12 +36,18 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
|
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
|
||||||
#pragma warning disable CS0067 // alarm wire-up deferred to PR 9
|
|
||||||
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
|
||||||
#pragma warning restore CS0067
|
|
||||||
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
|
||||||
|
|
||||||
private readonly System.EventHandler<bool> _onConnectionStateChanged;
|
private readonly System.EventHandler<bool> _onConnectionStateChanged;
|
||||||
|
private readonly GalaxyRuntimeProbeManager _probeManager;
|
||||||
|
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
|
||||||
|
private readonly GalaxyAlarmTracker _alarmTracker;
|
||||||
|
private readonly System.EventHandler<AlarmTransition> _onAlarmTransition;
|
||||||
|
|
||||||
|
// Cached during DiscoverAsync so SubscribeAlarmsAsync knows which attributes to advise.
|
||||||
|
// One entry per IsAlarm=true attribute in the last discovered hierarchy.
|
||||||
|
private readonly System.Collections.Concurrent.ConcurrentBag<string> _discoveredAlarmTags = new();
|
||||||
|
|
||||||
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
|
||||||
{
|
{
|
||||||
@@ -62,8 +70,65 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
_mx.ConnectionStateChanged += _onConnectionStateChanged;
|
_mx.ConnectionStateChanged += _onConnectionStateChanged;
|
||||||
|
|
||||||
|
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
|
||||||
|
// which runs the state machine and raises StateChanged on transitions we care about.
|
||||||
|
// We forward each transition through the same OnHostStatusChanged IPC event that the
|
||||||
|
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
|
||||||
|
// Admin UI can show per-host health independently from the top-level transport status.
|
||||||
|
_probeManager = new GalaxyRuntimeProbeManager(
|
||||||
|
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
|
||||||
|
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
|
||||||
|
_onProbeStateChanged = (_, t) =>
|
||||||
|
{
|
||||||
|
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
|
||||||
|
{
|
||||||
|
HostName = t.TagName,
|
||||||
|
RuntimeStatus = t.NewState switch
|
||||||
|
{
|
||||||
|
HostRuntimeState.Running => "Running",
|
||||||
|
HostRuntimeState.Stopped => "Stopped",
|
||||||
|
_ => "Unknown",
|
||||||
|
},
|
||||||
|
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
});
|
||||||
|
};
|
||||||
|
_probeManager.StateChanged += _onProbeStateChanged;
|
||||||
|
|
||||||
|
// PR 14: alarm subsystem. Per IsAlarm=true attribute discovered, subscribe to the four
|
||||||
|
// alarm-state attributes (.InAlarm/.Priority/.DescAttrName/.Acked), track lifecycle,
|
||||||
|
// and raise GalaxyAlarmEvent on transitions — forwarded through the existing
|
||||||
|
// OnAlarmEvent IPC event that the PR 4 ConnectionSink already wires into AlarmEvent frames.
|
||||||
|
_alarmTracker = new GalaxyAlarmTracker(
|
||||||
|
subscribe: (tag, cb) => _mx.SubscribeAsync(tag, cb),
|
||||||
|
unsubscribe: tag => _mx.UnsubscribeAsync(tag),
|
||||||
|
write: (tag, v) => _mx.WriteAsync(tag, v));
|
||||||
|
_onAlarmTransition = (_, t) => OnAlarmEvent?.Invoke(this, new GalaxyAlarmEvent
|
||||||
|
{
|
||||||
|
EventId = Guid.NewGuid().ToString("N"),
|
||||||
|
ObjectTagName = t.AlarmTag,
|
||||||
|
AlarmName = t.AlarmTag,
|
||||||
|
Severity = t.Priority,
|
||||||
|
StateTransition = t.Transition switch
|
||||||
|
{
|
||||||
|
AlarmStateTransition.Active => "Active",
|
||||||
|
AlarmStateTransition.Acknowledged => "Acknowledged",
|
||||||
|
AlarmStateTransition.Inactive => "Inactive",
|
||||||
|
_ => "Unknown",
|
||||||
|
},
|
||||||
|
Message = t.DescAttrName ?? t.AlarmTag,
|
||||||
|
UtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
});
|
||||||
|
_alarmTracker.TransitionRaised += _onAlarmTransition;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
|
||||||
|
/// <c>SyncProbesAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
|
||||||
|
/// advise ScanState per host.
|
||||||
|
/// </summary>
|
||||||
|
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
|
||||||
|
|
||||||
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@@ -103,6 +168,34 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
|
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
|
||||||
}).ToArray();
|
}).ToArray();
|
||||||
|
|
||||||
|
// PR 14: cache alarm-bearing attribute full refs so SubscribeAlarmsAsync can advise
|
||||||
|
// them on demand. Format matches the Galaxy reference grammar <tag>.<attr>.
|
||||||
|
var freshAlarmTags = attributes
|
||||||
|
.Where(a => a.IsAlarm)
|
||||||
|
.Select(a => nameByGobject.TryGetValue(a.GobjectId, out var tn)
|
||||||
|
? tn + "." + a.AttributeName
|
||||||
|
: null)
|
||||||
|
.Where(s => !string.IsNullOrWhiteSpace(s))
|
||||||
|
.Cast<string>()
|
||||||
|
.ToArray();
|
||||||
|
while (_discoveredAlarmTags.TryTake(out _)) { }
|
||||||
|
foreach (var t in freshAlarmTags) _discoveredAlarmTags.Add(t);
|
||||||
|
|
||||||
|
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
|
||||||
|
// so ScanState subscriptions track the current runtime set. Best-effort — probe
|
||||||
|
// failures don't block Discover from returning, since the gateway-level signal from
|
||||||
|
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
|
||||||
|
// that level if any per-host probe couldn't advise.
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var targets = hierarchy
|
||||||
|
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|
||||||
|
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
|
||||||
|
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
|
||||||
|
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
|
||||||
|
|
||||||
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
|
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@@ -240,8 +333,40 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
|
/// <summary>
|
||||||
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
|
/// PR 14: advise every alarm-bearing attribute's 4-attr quartet. Best-effort per-alarm —
|
||||||
|
/// a subscribe failure on one alarm doesn't abort the whole call, since operators prefer
|
||||||
|
/// partial alarm coverage to none. Idempotent on repeat calls (tracker internally
|
||||||
|
/// skips already-tracked alarms).
|
||||||
|
/// </summary>
|
||||||
|
public async Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
foreach (var tag in _discoveredAlarmTags)
|
||||||
|
{
|
||||||
|
try { await _alarmTracker.TrackAsync(tag).ConfigureAwait(false); }
|
||||||
|
catch { /* swallow per-alarm — tracker rolls back its own state on failure */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// PR 14: route operator ack through the tracker's AckMsg write path. EventId on the
|
||||||
|
/// incoming request maps directly to the alarm full reference (Proxy-side naming
|
||||||
|
/// convention from GalaxyProxyDriver.RaiseAlarmEvent → ev.EventId).
|
||||||
|
/// </summary>
|
||||||
|
public async Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
// EventId carries a per-transition Guid.ToString("N"); there's no reverse map from
|
||||||
|
// event id to alarm tag yet, so v1's convention (ack targets the condition) is matched
|
||||||
|
// by reading the alarm name from the Comment envelope: v1 packed "<tag>|<comment>".
|
||||||
|
// Until the Proxy is updated to send the alarm tag separately, fall back to treating
|
||||||
|
// the EventId as the alarm tag — Client CLI passes it through unchanged.
|
||||||
|
var tag = req.EventId;
|
||||||
|
if (!string.IsNullOrWhiteSpace(tag))
|
||||||
|
{
|
||||||
|
try { await _alarmTracker.AcknowledgeAsync(tag, req.Comment ?? string.Empty).ConfigureAwait(false); }
|
||||||
|
catch { /* swallow — ack failures surface via MxAccessClient.WriteAsync logs */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
|
||||||
{
|
{
|
||||||
@@ -324,11 +449,91 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
|
||||||
|
HistoryReadAtTimeRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_historian is null)
|
||||||
|
return new HistoryReadAtTimeResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||||
|
Values = Array.Empty<GalaxyDataValue>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (req.TimestampsUtcUnixMs.Length == 0)
|
||||||
|
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
|
||||||
|
|
||||||
|
var timestamps = req.TimestampsUtcUnixMs
|
||||||
|
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
|
||||||
|
.ToArray();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
|
||||||
|
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
|
||||||
|
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new HistoryReadAtTimeResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = $"Historian at-time read failed: {ex.Message}",
|
||||||
|
Values = Array.Empty<GalaxyDataValue>(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_historian is null)
|
||||||
|
return new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||||
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||||
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
|
||||||
|
var wire = events.Select(e => new GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
EventId = e.Id.ToString(),
|
||||||
|
SourceName = e.Source,
|
||||||
|
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
DisplayText = e.DisplayText,
|
||||||
|
Severity = e.Severity,
|
||||||
|
}).ToArray();
|
||||||
|
return new HistoryReadEventsResponse { Success = true, Events = wire };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = $"Historian event read failed: {ex.Message}",
|
||||||
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
|
_alarmTracker.TransitionRaised -= _onAlarmTransition;
|
||||||
|
_alarmTracker.Dispose();
|
||||||
|
_probeManager.StateChanged -= _onProbeStateChanged;
|
||||||
|
_probeManager.Dispose();
|
||||||
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
|
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
|
||||||
_historian?.Dispose();
|
_historian?.Dispose();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,273 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-platform + per-AppEngine runtime probe. Subscribes to <c><TagName>.ScanState</c>
|
||||||
|
/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
|
||||||
|
/// transitions, and fires <see cref="StateChanged"/> so <see cref="Backend.MxAccessGalaxyBackend"/>
|
||||||
|
/// can forward per-host events through the existing IPC <c>OnHostStatusChanged</c> event.
|
||||||
|
/// Pure-logic state machine with an injected clock so it's deterministically testable —
|
||||||
|
/// port of v1 <c>GalaxyRuntimeProbeManager</c> without the OPC UA node-manager coupling.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// State machine rules (documented in v1's <c>runtimestatus.md</c> and preserved here):
|
||||||
|
/// <list type="bullet">
|
||||||
|
/// <item><c>ScanState</c> is on-change-only — a stably-Running host may go hours without a
|
||||||
|
/// callback. Running → Stopped is driven by an explicit <c>ScanState=false</c> callback,
|
||||||
|
/// never by starvation.</item>
|
||||||
|
/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
|
||||||
|
/// paint every host as "just recovered" at startup, which is noise).</item>
|
||||||
|
/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
|
||||||
|
/// fires StateChanged because that's a first-known-bad signal operators need.</item>
|
||||||
|
/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
|
||||||
|
/// avoid lock inversion with caller-owned state.</item>
|
||||||
|
/// </list>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class GalaxyRuntimeProbeManager : IDisposable
|
||||||
|
{
|
||||||
|
public const int CategoryWinPlatform = 1;
|
||||||
|
public const int CategoryAppEngine = 3;
|
||||||
|
public const string ProbeAttribute = ".ScanState";
|
||||||
|
|
||||||
|
private readonly Func<DateTime> _clock;
|
||||||
|
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
|
||||||
|
private readonly Func<string, Task> _unsubscribe;
|
||||||
|
private readonly object _lock = new();
|
||||||
|
|
||||||
|
// probe tag → per-host state
|
||||||
|
private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
// tag name → probe tag (for reverse lookup on the desired-set diff)
|
||||||
|
private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fires on every state transition that operators should react to. See class remarks
|
||||||
|
/// for the rules on which transitions fire.
|
||||||
|
/// </summary>
|
||||||
|
public event EventHandler<HostStateTransition>? StateChanged;
|
||||||
|
|
||||||
|
public GalaxyRuntimeProbeManager(
|
||||||
|
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||||
|
Func<string, Task> unsubscribe)
|
||||||
|
: this(subscribe, unsubscribe, () => DateTime.UtcNow) { }
|
||||||
|
|
||||||
|
internal GalaxyRuntimeProbeManager(
|
||||||
|
Func<string, Action<string, Vtq>, Task> subscribe,
|
||||||
|
Func<string, Task> unsubscribe,
|
||||||
|
Func<DateTime> clock)
|
||||||
|
{
|
||||||
|
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
|
||||||
|
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
|
||||||
|
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
|
||||||
|
public int ActiveProbeCount
|
||||||
|
{
|
||||||
|
get { lock (_lock) return _byProbe.Count; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Snapshot every currently-tracked host's state. One entry per probe.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
return _byProbe.Select(kv => new HostProbeSnapshot(
|
||||||
|
TagName: kv.Value.TagName,
|
||||||
|
State: kv.Value.State,
|
||||||
|
LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Query the current runtime state for <paramref name="tagName"/>. Returns
|
||||||
|
/// <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
|
||||||
|
/// </summary>
|
||||||
|
public HostRuntimeState GetState(string tagName)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (_probeByTagName.TryGetValue(tagName, out var probe)
|
||||||
|
&& _byProbe.TryGetValue(probe, out var state))
|
||||||
|
return state.State;
|
||||||
|
return HostRuntimeState.Unknown;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
|
||||||
|
/// against the currently-tracked set and advise / unadvise as needed. Idempotent:
|
||||||
|
/// calling twice with the same set does nothing.
|
||||||
|
/// </summary>
|
||||||
|
public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
|
||||||
|
var desired = desiredHosts
|
||||||
|
.Where(h => !string.IsNullOrWhiteSpace(h.TagName))
|
||||||
|
.ToDictionary(h => h.TagName, StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
|
List<string> toAdvise;
|
||||||
|
List<string> toUnadvise;
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
toAdvise = desired.Keys
|
||||||
|
.Where(tag => !_probeByTagName.ContainsKey(tag))
|
||||||
|
.ToList();
|
||||||
|
toUnadvise = _probeByTagName.Keys
|
||||||
|
.Where(tag => !desired.ContainsKey(tag))
|
||||||
|
.Select(tag => _probeByTagName[tag])
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var tag in toAdvise)
|
||||||
|
{
|
||||||
|
var probe = tag + ProbeAttribute;
|
||||||
|
_probeByTagName[tag] = probe;
|
||||||
|
_byProbe[probe] = new HostProbeState
|
||||||
|
{
|
||||||
|
TagName = tag,
|
||||||
|
State = HostRuntimeState.Unknown,
|
||||||
|
LastStateChangeUtc = _clock(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var probe in toUnadvise)
|
||||||
|
{
|
||||||
|
_byProbe.Remove(probe);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var removedTag in _probeByTagName.Keys.Where(t => !desired.ContainsKey(t)).ToList())
|
||||||
|
{
|
||||||
|
_probeByTagName.Remove(removedTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var tag in toAdvise)
|
||||||
|
{
|
||||||
|
var probe = tag + ProbeAttribute;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _subscribe(probe, OnProbeCallback);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Rollback on subscribe failure so a later Tick can't transition a never-advised
|
||||||
|
// probe into a false Stopped state. Callers can re-Sync later to retry.
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
_byProbe.Remove(probe);
|
||||||
|
_probeByTagName.Remove(tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var probe in toUnadvise)
|
||||||
|
{
|
||||||
|
try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's
|
||||||
|
/// SubscribeAsync delivers VTQ updates through the callback wired in <see cref="SyncAsync"/>,
|
||||||
|
/// which calls this method under the lock to update state and fires
|
||||||
|
/// <see cref="StateChanged"/> outside the lock for any transition that matters.
|
||||||
|
/// </summary>
|
||||||
|
public void OnProbeCallback(string probeTag, Vtq vtq)
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
|
||||||
|
HostStateTransition? transition = null;
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (!_byProbe.TryGetValue(probeTag, out var state)) return;
|
||||||
|
|
||||||
|
var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b;
|
||||||
|
var now = _clock();
|
||||||
|
var previous = state.State;
|
||||||
|
state.LastCallbackUtc = now;
|
||||||
|
|
||||||
|
if (isRunning)
|
||||||
|
{
|
||||||
|
state.GoodUpdateCount++;
|
||||||
|
if (previous != HostRuntimeState.Running)
|
||||||
|
{
|
||||||
|
state.State = HostRuntimeState.Running;
|
||||||
|
state.LastStateChangeUtc = now;
|
||||||
|
if (previous == HostRuntimeState.Stopped)
|
||||||
|
{
|
||||||
|
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state.FailureCount++;
|
||||||
|
if (previous != HostRuntimeState.Stopped)
|
||||||
|
{
|
||||||
|
state.State = HostRuntimeState.Stopped;
|
||||||
|
state.LastStateChangeUtc = now;
|
||||||
|
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (transition is { } t)
|
||||||
|
{
|
||||||
|
StateChanged?.Invoke(this, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
_byProbe.Clear();
|
||||||
|
_probeByTagName.Clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class HostProbeState
|
||||||
|
{
|
||||||
|
public string TagName { get; set; } = "";
|
||||||
|
public HostRuntimeState State { get; set; }
|
||||||
|
public DateTime LastStateChangeUtc { get; set; }
|
||||||
|
public DateTime? LastCallbackUtc { get; set; }
|
||||||
|
public long GoodUpdateCount { get; set; }
|
||||||
|
public long FailureCount { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum HostRuntimeState
|
||||||
|
{
|
||||||
|
Unknown,
|
||||||
|
Running,
|
||||||
|
Stopped,
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed record HostStateTransition(
|
||||||
|
string TagName,
|
||||||
|
HostRuntimeState OldState,
|
||||||
|
HostRuntimeState NewState,
|
||||||
|
DateTime AtUtc);
|
||||||
|
|
||||||
|
public sealed record HostProbeSnapshot(
|
||||||
|
string TagName,
|
||||||
|
HostRuntimeState State,
|
||||||
|
DateTime LastChangedUtc);
|
||||||
|
|
||||||
|
public readonly record struct HostProbeTarget(string TagName, int CategoryId)
|
||||||
|
{
|
||||||
|
public bool IsRuntimeHost =>
|
||||||
|
CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|
||||||
|
|| CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine;
|
||||||
|
}
|
||||||
@@ -94,6 +94,24 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
|||||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
|
||||||
|
HistoryReadAtTimeRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadAtTimeResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
|
||||||
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
|
||||||
|
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse
|
=> Task.FromResult(new RecycleStatusResponse
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -87,6 +87,20 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
|||||||
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
|
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
case MessageKind.HistoryReadAtTimeRequest:
|
||||||
|
{
|
||||||
|
var resp = await backend.HistoryReadAtTimeAsync(
|
||||||
|
Deserialize<HistoryReadAtTimeRequest>(body), ct);
|
||||||
|
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case MessageKind.HistoryReadEventsRequest:
|
||||||
|
{
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(
|
||||||
|
Deserialize<HistoryReadEventsRequest>(body), ct);
|
||||||
|
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
|
||||||
|
return;
|
||||||
|
}
|
||||||
case MessageKind.RecycleHostRequest:
|
case MessageKind.RecycleHostRequest:
|
||||||
{
|
{
|
||||||
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
||||||
|
|||||||
@@ -48,10 +48,14 @@ public enum MessageKind : byte
|
|||||||
AlarmEvent = 0x51,
|
AlarmEvent = 0x51,
|
||||||
AlarmAckRequest = 0x52,
|
AlarmAckRequest = 0x52,
|
||||||
|
|
||||||
HistoryReadRequest = 0x60,
|
HistoryReadRequest = 0x60,
|
||||||
HistoryReadResponse = 0x61,
|
HistoryReadResponse = 0x61,
|
||||||
HistoryReadProcessedRequest = 0x62,
|
HistoryReadProcessedRequest = 0x62,
|
||||||
HistoryReadProcessedResponse = 0x63,
|
HistoryReadProcessedResponse = 0x63,
|
||||||
|
HistoryReadAtTimeRequest = 0x64,
|
||||||
|
HistoryReadAtTimeResponse = 0x65,
|
||||||
|
HistoryReadEventsRequest = 0x66,
|
||||||
|
HistoryReadEventsResponse = 0x67,
|
||||||
|
|
||||||
HostConnectivityStatus = 0x70,
|
HostConnectivityStatus = 0x70,
|
||||||
RuntimeStatusChange = 0x71,
|
RuntimeStatusChange = 0x71,
|
||||||
|
|||||||
@@ -50,3 +50,61 @@ public sealed class HistoryReadProcessedResponse
|
|||||||
[Key(1)] public string? Error { get; set; }
|
[Key(1)] public string? Error { get; set; }
|
||||||
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// At-time historian read — OPC UA HistoryReadAtTime service. Returns one sample per
|
||||||
|
/// requested timestamp (interpolated when no exact match exists). The per-timestamp array
|
||||||
|
/// is flow-encoded as Unix milliseconds to avoid MessagePack DateTime quirks.
|
||||||
|
/// </summary>
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
[Key(0)] public long SessionId { get; set; }
|
||||||
|
[Key(1)] public string TagReference { get; set; } = string.Empty;
|
||||||
|
[Key(2)] public long[] TimestampsUtcUnixMs { get; set; } = System.Array.Empty<long>();
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadAtTimeResponse
|
||||||
|
{
|
||||||
|
[Key(0)] public bool Success { get; set; }
|
||||||
|
[Key(1)] public string? Error { get; set; }
|
||||||
|
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Historical events read — OPC UA HistoryReadEvents service and Alarm & Condition
|
||||||
|
/// history. <c>SourceName</c> null means "all sources". Distinct from the live
|
||||||
|
/// <see cref="GalaxyAlarmEvent"/> stream because historical rows carry both
|
||||||
|
/// <c>EventTime</c> (when the event occurred in the process) and <c>ReceivedTime</c>
|
||||||
|
/// (when the Historian persisted it) and have no StateTransition — the Historian logs
|
||||||
|
/// the instantaneous event, not the OPC UA alarm lifecycle.
|
||||||
|
/// </summary>
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
[Key(0)] public long SessionId { get; set; }
|
||||||
|
[Key(1)] public string? SourceName { get; set; }
|
||||||
|
[Key(2)] public long StartUtcUnixMs { get; set; }
|
||||||
|
[Key(3)] public long EndUtcUnixMs { get; set; }
|
||||||
|
[Key(4)] public int MaxEvents { get; set; } = 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
[Key(0)] public string EventId { get; set; } = string.Empty;
|
||||||
|
[Key(1)] public string? SourceName { get; set; }
|
||||||
|
[Key(2)] public long EventTimeUtcUnixMs { get; set; }
|
||||||
|
[Key(3)] public long ReceivedTimeUtcUnixMs { get; set; }
|
||||||
|
[Key(4)] public string? DisplayText { get; set; }
|
||||||
|
[Key(5)] public ushort Severity { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
[Key(0)] public bool Success { get; set; }
|
||||||
|
[Key(1)] public string? Error { get; set; }
|
||||||
|
[Key(2)] public GalaxyHistoricalEvent[] Events { get; set; } = System.Array.Empty<GalaxyHistoricalEvent>();
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,190 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class GalaxyAlarmTrackerTests
|
||||||
|
{
|
||||||
|
private sealed class FakeSubscriber
|
||||||
|
{
|
||||||
|
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
|
||||||
|
public readonly ConcurrentQueue<string> Unsubs = new();
|
||||||
|
public readonly ConcurrentQueue<(string Tag, object Value)> Writes = new();
|
||||||
|
public bool WriteReturns { get; set; } = true;
|
||||||
|
|
||||||
|
public Task Subscribe(string tag, Action<string, Vtq> cb)
|
||||||
|
{
|
||||||
|
Subs[tag] = cb;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
public Task Unsubscribe(string tag)
|
||||||
|
{
|
||||||
|
Unsubs.Enqueue(tag);
|
||||||
|
Subs.TryRemove(tag, out _);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
public Task<bool> Write(string tag, object value)
|
||||||
|
{
|
||||||
|
Writes.Enqueue((tag, value));
|
||||||
|
return Task.FromResult(WriteReturns);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Vtq Bool(bool v) => new(v, DateTime.UtcNow, 192);
|
||||||
|
private static Vtq Int(int v) => new(v, DateTime.UtcNow, 192);
|
||||||
|
private static Vtq Str(string v) => new(v, DateTime.UtcNow, 192);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Track_subscribes_to_four_alarm_attributes()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
|
||||||
|
await t.TrackAsync("Tank.Level.HiHi");
|
||||||
|
|
||||||
|
fake.Subs.ShouldContainKey("Tank.Level.HiHi.InAlarm");
|
||||||
|
fake.Subs.ShouldContainKey("Tank.Level.HiHi.Priority");
|
||||||
|
fake.Subs.ShouldContainKey("Tank.Level.HiHi.DescAttrName");
|
||||||
|
fake.Subs.ShouldContainKey("Tank.Level.HiHi.Acked");
|
||||||
|
t.TrackedAlarmCount.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Track_is_idempotent_on_repeat_call()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
|
||||||
|
t.TrackedAlarmCount.ShouldBe(1);
|
||||||
|
fake.Subs.Count.ShouldBe(4); // 4 sub calls, not 8
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InAlarm_false_to_true_fires_Active_transition()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
var transitions = new ConcurrentQueue<AlarmTransition>();
|
||||||
|
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
|
||||||
|
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
fake.Subs["Alarm.A.Priority"]("Alarm.A.Priority", Int(500));
|
||||||
|
fake.Subs["Alarm.A.DescAttrName"]("Alarm.A.DescAttrName", Str("TankLevelHiHi"));
|
||||||
|
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(1);
|
||||||
|
transitions.TryDequeue(out var tr).ShouldBeTrue();
|
||||||
|
tr!.Transition.ShouldBe(AlarmStateTransition.Active);
|
||||||
|
tr.Priority.ShouldBe(500);
|
||||||
|
tr.DescAttrName.ShouldBe("TankLevelHiHi");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InAlarm_true_to_false_fires_Inactive_transition()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
var transitions = new ConcurrentQueue<AlarmTransition>();
|
||||||
|
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
|
||||||
|
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
|
||||||
|
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(false));
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(2);
|
||||||
|
transitions.TryDequeue(out _);
|
||||||
|
transitions.TryDequeue(out var tr).ShouldBeTrue();
|
||||||
|
tr!.Transition.ShouldBe(AlarmStateTransition.Inactive);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Acked_false_to_true_fires_Acknowledged_while_InAlarm_is_true()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
var transitions = new ConcurrentQueue<AlarmTransition>();
|
||||||
|
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
|
||||||
|
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true)); // Active, clears Acked flag
|
||||||
|
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true)); // Acknowledged
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(2);
|
||||||
|
transitions.TryDequeue(out _);
|
||||||
|
transitions.TryDequeue(out var tr).ShouldBeTrue();
|
||||||
|
tr!.Transition.ShouldBe(AlarmStateTransition.Acknowledged);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Acked_transition_while_InAlarm_is_false_does_not_fire()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
var transitions = new ConcurrentQueue<AlarmTransition>();
|
||||||
|
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
|
||||||
|
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
// Initial Acked=true on subscribe (alarm is at rest, pre-ack'd) — should not fire.
|
||||||
|
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true));
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Acknowledge_writes_AckMsg_with_comment()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
|
||||||
|
var ok = await t.AcknowledgeAsync("Alarm.A", "acknowledged by operator");
|
||||||
|
|
||||||
|
ok.ShouldBeTrue();
|
||||||
|
fake.Writes.Count.ShouldBe(1);
|
||||||
|
fake.Writes.TryDequeue(out var w).ShouldBeTrue();
|
||||||
|
w.Tag.ShouldBe("Alarm.A.AckMsg");
|
||||||
|
w.Value.ShouldBe("acknowledged by operator");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Snapshot_reports_latest_fields()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
await t.TrackAsync("Alarm.A");
|
||||||
|
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
|
||||||
|
fake.Subs["Alarm.A.Priority"]("Alarm.A.Priority", Int(900));
|
||||||
|
fake.Subs["Alarm.A.DescAttrName"]("Alarm.A.DescAttrName", Str("MyAlarm"));
|
||||||
|
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true));
|
||||||
|
|
||||||
|
var snap = t.SnapshotStates();
|
||||||
|
snap.Count.ShouldBe(1);
|
||||||
|
snap[0].InAlarm.ShouldBeTrue();
|
||||||
|
snap[0].Acked.ShouldBeTrue();
|
||||||
|
snap[0].Priority.ShouldBe(900);
|
||||||
|
snap[0].DescAttrName.ShouldBe("MyAlarm");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Foreign_probe_callback_is_dropped()
|
||||||
|
{
|
||||||
|
var fake = new FakeSubscriber();
|
||||||
|
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
|
||||||
|
var transitions = new ConcurrentQueue<AlarmTransition>();
|
||||||
|
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
|
||||||
|
|
||||||
|
// No TrackAsync was called — this callback is foreign and should be silently ignored.
|
||||||
|
t.OnProbeCallback("Unknown.InAlarm", Bool(true));
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,231 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class GalaxyRuntimeProbeManagerTests
|
||||||
|
{
|
||||||
|
private sealed class FakeSubscriber
|
||||||
|
{
|
||||||
|
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
|
||||||
|
public readonly ConcurrentQueue<string> UnsubCalls = new();
|
||||||
|
public bool FailSubscribeFor { get; set; }
|
||||||
|
public string? FailSubscribeTag { get; set; }
|
||||||
|
|
||||||
|
public Task Subscribe(string probe, Action<string, Vtq> cb)
|
||||||
|
{
|
||||||
|
if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
|
||||||
|
throw new InvalidOperationException("subscribe refused");
|
||||||
|
Subs[probe] = cb;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Unsubscribe(string probe)
|
||||||
|
{
|
||||||
|
UnsubCalls.Enqueue(probe);
|
||||||
|
Subs.TryRemove(probe, out _);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
|
||||||
|
private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Sync_subscribes_to_ScanState_per_host()
|
||||||
|
{
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[]
|
||||||
|
{
|
||||||
|
new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
|
||||||
|
new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
|
||||||
|
});
|
||||||
|
|
||||||
|
mgr.ActiveProbeCount.ShouldBe(2);
|
||||||
|
subs.Subs.ShouldContainKey("PlatformA.ScanState");
|
||||||
|
subs.Subs.ShouldContainKey("EngineB.ScanState");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
|
||||||
|
{
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||||
|
var targets = new[] { new HostProbeTarget("PlatformA", 1) };
|
||||||
|
|
||||||
|
await mgr.SyncAsync(targets);
|
||||||
|
await mgr.SyncAsync(targets);
|
||||||
|
|
||||||
|
mgr.ActiveProbeCount.ShouldBe(1);
|
||||||
|
subs.Subs.Count.ShouldBe(1);
|
||||||
|
subs.UnsubCalls.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Sync_unadvises_removed_hosts()
|
||||||
|
{
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[]
|
||||||
|
{
|
||||||
|
new HostProbeTarget("PlatformA", 1),
|
||||||
|
new HostProbeTarget("PlatformB", 1),
|
||||||
|
});
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
|
||||||
|
mgr.ActiveProbeCount.ShouldBe(1);
|
||||||
|
subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
|
||||||
|
{
|
||||||
|
var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
|
||||||
|
mgr.ActiveProbeCount.ShouldBe(0); // rolled back
|
||||||
|
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unknown_to_Running_does_not_fire_StateChanged()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||||
|
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
|
||||||
|
|
||||||
|
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
|
||||||
|
transitions.Count.ShouldBe(0); // startup transition, operators don't care
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||||
|
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(1);
|
||||||
|
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||||
|
t!.TagName.ShouldBe("PlatformA");
|
||||||
|
t.OldState.ShouldBe(HostRuntimeState.Running);
|
||||||
|
t.NewState.ShouldBe(HostRuntimeState.Stopped);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||||
|
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
var transitions = new ConcurrentQueue<HostStateTransition>();
|
||||||
|
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
// First callback is bad-quality — we must flag the host Stopped so operators see it.
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(1);
|
||||||
|
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||||
|
t!.OldState.ShouldBe(HostRuntimeState.Unknown);
|
||||||
|
t.NewState.ShouldBe(HostRuntimeState.Stopped);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
var count = 0;
|
||||||
|
mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
|
||||||
|
for (var i = 0; i < 5; i++)
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
|
||||||
|
|
||||||
|
count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unknown_callback_for_non_tracked_probe_is_dropped()
|
||||||
|
{
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
|
||||||
|
|
||||||
|
mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true));
|
||||||
|
|
||||||
|
mgr.ActiveProbeCount.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Snapshot_reports_current_state_for_every_tracked_host()
|
||||||
|
{
|
||||||
|
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var subs = new FakeSubscriber();
|
||||||
|
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
|
||||||
|
|
||||||
|
await mgr.SyncAsync(new[]
|
||||||
|
{
|
||||||
|
new HostProbeTarget("PlatformA", 1),
|
||||||
|
new HostProbeTarget("EngineB", 3),
|
||||||
|
});
|
||||||
|
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
|
||||||
|
subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
|
||||||
|
|
||||||
|
var snap = mgr.SnapshotStates();
|
||||||
|
snap.Count.ShouldBe(2);
|
||||||
|
snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
|
||||||
|
snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
|
||||||
|
{
|
||||||
|
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
|
||||||
|
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
|
||||||
|
new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
|
||||||
|
new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,147 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MessagePack;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistoryReadAtTimeTests
|
||||||
|
{
|
||||||
|
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? historian, StaPump pump) =>
|
||||||
|
new(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
new MxAccessClient(pump, new MxProxyAdapter(), "attime-test"),
|
||||||
|
historian);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Returns_disabled_error_when_no_historian_configured()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
using var backend = BuildBackend(null, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
TimestampsUtcUnixMs = new[] { 1L, 2L },
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("Historian disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Empty_timestamp_list_short_circuits_to_success_with_no_values()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var fake = new FakeHistorian();
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
TimestampsUtcUnixMs = Array.Empty<long>(),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Values.ShouldBeEmpty();
|
||||||
|
fake.Calls.ShouldBe(0); // no round-trip to SDK for empty timestamp list
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Timestamps_survive_Unix_ms_round_trip_to_DateTime()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var t1 = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var t2 = new DateTime(2026, 4, 18, 10, 5, 0, DateTimeKind.Utc);
|
||||||
|
var fake = new FakeHistorian(
|
||||||
|
new HistorianSample { Value = 100.0, Quality = 192, TimestampUtc = t1 },
|
||||||
|
new HistorianSample { Value = 101.5, Quality = 192, TimestampUtc = t2 });
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
TagReference = "TankLevel",
|
||||||
|
TimestampsUtcUnixMs = new[]
|
||||||
|
{
|
||||||
|
new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
new DateTimeOffset(t2, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
},
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Values.Length.ShouldBe(2);
|
||||||
|
resp.Values[0].SourceTimestampUtcUnixMs.ShouldBe(new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
resp.Values[0].StatusCode.ShouldBe(0u); // Good (quality 192)
|
||||||
|
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(100.0);
|
||||||
|
|
||||||
|
fake.Calls.ShouldBe(1);
|
||||||
|
fake.LastTimestamps.Length.ShouldBe(2);
|
||||||
|
fake.LastTimestamps[0].ShouldBe(t1);
|
||||||
|
fake.LastTimestamps[1].ShouldBe(t2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Missing_sample_maps_to_Bad_category()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
// Quality=0 means no sample at that timestamp per HistorianDataSource.ReadAtTimeAsync.
|
||||||
|
var fake = new FakeHistorian(new HistorianSample
|
||||||
|
{
|
||||||
|
Value = null,
|
||||||
|
Quality = 0,
|
||||||
|
TimestampUtc = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
TagReference = "T",
|
||||||
|
TimestampsUtcUnixMs = new[] { 1L },
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Values.Length.ShouldBe(1);
|
||||||
|
resp.Values[0].StatusCode.ShouldBe(0x80000000u); // Bad category
|
||||||
|
resp.Values[0].ValueBytes.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHistorian : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private readonly HistorianSample[] _samples;
|
||||||
|
public int Calls { get; private set; }
|
||||||
|
public DateTime[] LastTimestamps { get; private set; } = Array.Empty<DateTime>();
|
||||||
|
|
||||||
|
public FakeHistorian(params HistorianSample[] samples) => _samples = samples;
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
|
||||||
|
{
|
||||||
|
Calls++;
|
||||||
|
LastTimestamps = ts;
|
||||||
|
return Task.FromResult(new List<HistorianSample>(_samples));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianEventDto>());
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,129 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistoryReadEventsTests
|
||||||
|
{
|
||||||
|
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? h, StaPump pump) =>
|
||||||
|
new(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
new MxAccessClient(pump, new MxProxyAdapter(), "events-test"),
|
||||||
|
h);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Returns_disabled_error_when_no_historian_configured()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
using var backend = BuildBackend(null, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = "TankA",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxEvents = 100,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("Historian disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Maps_HistorianEventDto_to_GalaxyHistoricalEvent_wire_shape()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
|
||||||
|
var eventId = Guid.NewGuid();
|
||||||
|
var eventTime = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var receivedTime = eventTime.AddMilliseconds(150);
|
||||||
|
var fake = new FakeHistorian(new HistorianEventDto
|
||||||
|
{
|
||||||
|
Id = eventId,
|
||||||
|
Source = "TankA.Level.HiHi",
|
||||||
|
EventTime = eventTime,
|
||||||
|
ReceivedTime = receivedTime,
|
||||||
|
DisplayText = "HiHi alarm tripped",
|
||||||
|
Severity = 900,
|
||||||
|
});
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = "TankA.Level.HiHi",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxEvents = 50,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Events.Length.ShouldBe(1);
|
||||||
|
var got = resp.Events[0];
|
||||||
|
got.EventId.ShouldBe(eventId.ToString());
|
||||||
|
got.SourceName.ShouldBe("TankA.Level.HiHi");
|
||||||
|
got.DisplayText.ShouldBe("HiHi alarm tripped");
|
||||||
|
got.Severity.ShouldBe<ushort>(900);
|
||||||
|
got.EventTimeUtcUnixMs.ShouldBe(new DateTimeOffset(eventTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
got.ReceivedTimeUtcUnixMs.ShouldBe(new DateTimeOffset(receivedTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
|
||||||
|
fake.LastSourceName.ShouldBe("TankA.Level.HiHi");
|
||||||
|
fake.LastMaxEvents.ShouldBe(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Null_source_name_is_passed_through_as_all_sources()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var fake = new FakeHistorian();
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = null,
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = 1,
|
||||||
|
MaxEvents = 10,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
fake.LastSourceName.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHistorian : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private readonly HistorianEventDto[] _events;
|
||||||
|
public string? LastSourceName { get; private set; } = "<unset>";
|
||||||
|
public int LastMaxEvents { get; private set; }
|
||||||
|
|
||||||
|
public FakeHistorian(params HistorianEventDto[] events) => _events = events;
|
||||||
|
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
{
|
||||||
|
LastSourceName = src;
|
||||||
|
LastMaxEvents = max;
|
||||||
|
return Task.FromResult(new List<HistorianEventDto>(_events));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user