Compare commits

...

16 Commits

Author SHA1 Message Date
Joseph Doherty
c14624f012 Phase 2 PR 14 — alarm subsystem wire-up. Per IsAlarm=true attribute (PR 9 added the discovery flag), GalaxyAlarmTracker in Backend/Alarms/ advises the four Galaxy alarm-state attributes: .InAlarm (boolean alarm active), .Priority (int severity), .DescAttrName (human-readable description), .Acked (boolean acknowledged). Runs the OPC UA Part 9 alarm lifecycle state machine simplified for the Galaxy AlarmExtension model and raises AlarmTransition events on transitions operators must react to — Active (InAlarm false→true, default Unacknowledged), Acknowledged (Acked false→true while InAlarm still true), Inactive (InAlarm true→false). MxAccessGalaxyBackend instantiates the tracker in its constructor with delegate-based subscribe/unsubscribe/write pointers to MxAccessClient, hooks TransitionRaised to forward each transition through the existing OnAlarmEvent IPC event that PR 4 ConnectionSink wires into MessageKind.AlarmEvent frames — no new contract messages required since GalaxyAlarmEvent already exists in Shared.Contracts. Field mapping: EventId = fresh Guid.ToString('N') per transition, ObjectTagName = alarm attribute full reference, AlarmName = alarm attribute full reference, Severity = tracked Priority, StateTransition = 'Active'|'Acknowledged'|'Inactive', Message = DescAttrName or tag fallback, UtcUnixMs = transition time. DiscoverAsync caches every IsAlarm=true attribute's full reference (tag.attribute) into _discoveredAlarmTags (ConcurrentBag cleared-then-filled on every re-Discover to track Galaxy redeploys). SubscribeAlarmsAsync iterates the cache and advises each via GalaxyAlarmTracker.TrackAsync; best-effort per-alarm — a subscribe failure on one alarm doesn't abort the whole call since operators prefer partial alarm coverage to none. Tracker is internally idempotent on repeat Track calls (second invocation for same alarm tag is a no-op; already-subscribed check short-circuits before the 4 MXAccess sub calls). Subscribe-failure rollback inside TrackAsync removes the alarm state + unadvises any of the 4 that did succeed so a partial advise can't leak a phantom tracking entry. AcknowledgeAlarmAsync routes to tracker.AcknowledgeAsync which writes the operator comment to <tag>.AckMsg via MxAccessClient.WriteAsync — writes use the existing MXAccess OnWriteComplete TCS-by-handle path (PR 4 Medium 4) so a runtime-refused ack bubbles up as Success=false rather than false-positive. State-machine quirks preserved from v1: (1) initial Acked=true on subscribe does NOT fire Acknowledged (alarm at rest, pre-acknowledged — default state is Acked=true so the first subscribe callback is a no-op transition), (2) Acked false→true only fires Acknowledged when InAlarm is currently true (acking a latched-inactive alarm is not a user-visible transition), (3) Active transition clears the Acked flag in-state so the next Acked callback correctly fires Acknowledged (v1 had this buried in the ConditionState logic; we track it on the AlarmState struct directly). Priority value handled as int/short/long via type pattern match with int.MaxValue guard — Galaxy attribute category returns varying CLR types (Int32 is canonical but some older templates use Int16), and a long overflow cast to int would silently corrupt the severity. Dispose cascade in MxAccessGalaxyBackend.Dispose: alarm-tracker unsubscribe→dispose, probe-manager unsubscribe→dispose, mx.ConnectionStateChanged detach, historian dispose — same discipline PR 6 / PR 8 / PR 13 established so dangling invocation-list refs don't survive a backend recycle. #pragma warning disable CS0067 around OnAlarmEvent removed since the event is now raised. Tests (9 new, GalaxyAlarmTrackerTests): four-attribute subscribe per alarm, idempotent repeat-track, InAlarm false→true fires Active with Priority + Desc, InAlarm true→false fires Inactive, Acked false→true while InAlarm fires Acknowledged, Acked transition while InAlarm=false does not fire, AckMsg write path carries the comment, snapshot reports latest four fields, foreign probe callback for a non-tracked tag is silently dropped. Full Galaxy.Host.Tests Unit suite 84 pass / 0 fail (9 new alarm + 12 PR 13 probe + 21 PR 12 quality + 42 pre-existing). Galaxy.Host builds clean (0/0). Branches off phase-2-pr13-runtime-probe so the MxAccessGalaxyBackend constructor/Dispose chain gets the probe-manager + alarm-tracker wire-up in a coherent order; fast-forwards if PR 13 merges first.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:34:13 -04:00
Joseph Doherty
04d267d1ea Phase 2 PR 13 — port GalaxyRuntimeProbeManager state machine + wire per-platform ScanState probing. PR 8 gave operators the gateway-level transport signal (MxAccessClient.ConnectionStateChanged → OnHostStatusChanged tagged with the Wonderware client identity) — enough to detect when the entire MXAccess COM proxy dies, but silent when a specific or goes down while the gateway stays alive. This PR closes that gap: a pure-logic GalaxyRuntimeProbeManager (ported from v1's 472-LOC GalaxyRuntimeProbeManager.cs, distilled to ~240 LOC of state machine without the OPC UA node-manager entanglement) lives in Backend/Stability/, advises <TagName>.ScanState per WinPlatform + AppEngine gobject discovered during DiscoverAsync, runs the Unknown → Running → Stopped state machine with v1's documented semantics preserved verbatim, and raises a StateChanged event on transitions operators should react to. MxAccessGalaxyBackend instantiates the probe manager in the constructor with SubscribeAsync/UnsubscribeAsync delegate pointers into MxAccessClient, hooks StateChanged to forward each transition through the same OnHostStatusChanged IPC event the gateway signal uses (HostName = platform/engine TagName, RuntimeStatus = 'Running'|'Stopped'|'Unknown', LastObservedUtcUnixMs from the state-change timestamp), so Admin UI gets per-host signals flowing through the existing PR 8 wire with no additional IPC plumbing. State machine rules ported from v1 runtimestatus.md: (a) ScanState is on-change-only — a stably-Running host may go hours without a callback, so Running → Stopped is driven only by explicit ScanState=false, never by starvation; (b) Unknown → Running is a startup transition and does NOT fire StateChanged (would paint every host as 'just recovered' at startup, which is noise and can clear Bad quality set by a concurrently-stopping sibling); (c) Stopped → Running fires StateChanged for the real recovery case; (d) Running → Stopped fires StateChanged; (e) Unknown → Stopped fires StateChanged because that's the first-known-bad signal operators need when a host is down at our startup time. MxAccessGalaxyBackend.DiscoverAsync calls _probeManager.SyncAsync with the runtime-host subset of the hierarchy (CategoryId == 1 WinPlatform or 3 AppEngine) as a best-effort step after building the Discover response — probe failures are swallowed so Discover still returns the hierarchy even if a per-host advise fails; the gateway signal covers the critical rung. SyncAsync is idempotent (second call with the same set is a no-op) and handles the diff on re-Discover for tag rename / host add / host remove. Subscribe failure rolls back the host's state entry under the lock so a later probe callback for a never-advised tag can't transition a phantom state from Unknown to Stopped and fan out a false host-down signal (the same protection v1's GalaxyRuntimeProbeManager had at line 237-243 of v1 with a captured-probe-string comparison under the lock). MxAccessGalaxyBackend.Dispose unsubscribes the StateChanged handler before disposing the probe manager to prevent dangling invocation-list references across reconnects, same discipline as PR 8's ConnectionStateChanged and PR 6's SubscriptionReplayFailed. Tests (12 new GalaxyRuntimeProbeManagerTests): Sync_subscribes_to_ScanState_per_host verifies tag.ScanState subscriptions are advised per Platform/Engine; Sync_is_idempotent_on_repeat_call_with_same_set verifies no duplicate subscribes; Sync_unadvises_removed_hosts verifies the diff unadvises gone hosts; Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events covers the rollback-on-subscribe-fail guard; Unknown_to_Running_does_not_fire_StateChanged preserves the startup-noise rule; Running_to_Stopped_fires_StateChanged_with_both_states asserts OldState and NewState are both captured in the transition record; Stopped_to_Running_fires_StateChanged_for_recovery verifies the recovery case; Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal preserves the first-bad rule; Repeated_Good_Running_callbacks_do_not_fire_duplicate_events verifies the state-tracking de-dup; Unknown_callback_for_non_tracked_probe_is_dropped asserts a foreign callback is silently ignored; Snapshot_reports_current_state_for_every_tracked_host covers the dashboard query hook; IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids asserts the CategoryId filter. Galaxy.Host.Tests Unit suite 75 pass / 0 fail (12 new probe + 63 pre-existing). Galaxy.Host builds clean (0 errors / 0 warnings). Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:27:56 -04:00
4448db8207 Merge pull request 'Phase 2 PR 12 � richer historian quality mapping' (#11) from phase-2-pr12-quality-mapper into v2 2026-04-18 07:22:44 -04:00
d96b513bbc Merge pull request 'Phase 2 PR 11 � HistoryReadEvents IPC (alarm history)' (#10) from phase-2-pr11-history-events into v2 2026-04-18 07:22:33 -04:00
053c4e0566 Merge pull request 'Phase 2 PR 10 � HistoryReadAtTime IPC surface' (#9) from phase-2-pr10-history-attime into v2 2026-04-18 07:22:16 -04:00
Joseph Doherty
f24f969a85 Phase 2 PR 12 — richer historian quality mapping. Replace MxAccessGalaxyBackend's inline MapHistorianQualityToOpcUa category-only helper (192+→Good, 64-191→Uncertain, 0-63→Bad) with a new public HistorianQualityMapper.Map utility that preserves specific OPC DA subcodes — BadNotConnected(8)→0x808A0000u instead of generic Bad(0x80000000u), UncertainSubNormal(88)→0x40950000u instead of generic Uncertain, Good_LocalOverride(216)→0x00D80000u instead of generic Good, etc. Mirrors v1 QualityMapper.MapToOpcUaStatusCode byte-for-byte without pulling in OPC UA types — the function returns uint32 literals that are the canonical OPC UA StatusCode wire encoding, surfaced directly as DataValueSnapshot.StatusCode on the Proxy side with no additional translation. Unknown subcodes fall back to the family category (255→Good, 150→Uncertain, 50→Bad) so a future SDK change that adds a quality code we don't map yet still gets a sensible bucket. GalaxyDataValue wire shape unchanged (StatusCode stays uint) — this is a pure fidelity upgrade on the Host side. Downstream callers (Admin UI status dashboard, OPC UA clients receiving historian samples) can now distinguish e.g. a transport outage (BadNotConnected) from a sensor fault (BadSensorFailure) from a warm-up delay (BadWaitingForInitialData) without a second round-trip or dashboard heuristic. 21 new tests (HistorianQualityMapperTests): theory with 15 rows covering every specific mapping from the v1 QualityMapper table, plus 6 fallback tests verifying unknown-subcode codes in each family (Good/Uncertain/Bad) collapse to the family default. Galaxy.Host.Tests Unit suite 56/0 (21 new + 35 existing). Galaxy.Host builds clean (0/0). Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:11:02 -04:00
Joseph Doherty
ca025ebe0c Phase 2 PR 11 — HistoryReadEvents IPC (alarm history). New Shared.Contracts messages HistoryReadEventsRequest/Response + GalaxyHistoricalEvent DTO (MessageKind 0x66/0x67). IGalaxyBackend gains HistoryReadEventsAsync, Stub/DbBacked return canonical pending error, MxAccessGalaxyBackend delegates to _historian.ReadEventsAsync (ported in PR 5) and maps HistorianEventDto → GalaxyHistoricalEvent — Guid.ToString() for EventId wire shape, DateTime → Unix ms for both EventTime (when the event fired in the process) and ReceivedTime (when the Historian persisted it), DisplayText + Severity pass through. SourceName is string? — null means 'all sources' (passed straight through to HistorianDataSource.ReadEventsAsync which adds the AddEventFilter('Source', Equal, ...) only when non-null). Distinct from the live GalaxyAlarmEvent type because historical rows carry both timestamps and lack StateTransition (Historian logs instantaneous events, not the OPC UA Part 9 alarm lifecycle; translating to OPC UA event lifecycle is the alarm-subsystem's job). Guards: null historian → Historian-disabled error; SDK exception → Success=false with message chained. Tests (3 new): disabled-error when historian null, maps HistorianEventDto with full field set (Id/Source/EventTime/ReceivedTime/DisplayText/Severity=900) to GalaxyHistoricalEvent, null SourceName passes through unchanged (verifies the 'all sources' contract). Galaxy.Host.Tests Unit suite 34 pass / 0 fail. Galaxy.Host builds clean. Branches off phase-2-pr10-history-attime since both extend the MessageKind enum; fast-forwards if PR 10 merges first.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:08:16 -04:00
Joseph Doherty
d13f919112 Phase 2 PR 10 — HistoryReadAtTime IPC surface. New Shared.Contracts messages HistoryReadAtTimeRequest/Response (MessageKind 0x64/0x65), IGalaxyBackend gains HistoryReadAtTimeAsync, Stub/DbBacked return canonical pending error, MxAccessGalaxyBackend delegates to _historian.ReadAtTimeAsync (ported in PR 5, exposed now) — request timestamp array is flow-encoded as Unix ms to avoid MessagePack DateTime quirks then re-hydrated to DateTime on the Host side. Per-sample mapping uses the same ToWire(HistorianSample) helper as ReadRawAsync so the category→StatusCode mapping stays consistent (Quality byte 192+ → Good 0u, 64-191 → Uncertain, 0-63 → Bad 0x80000000u). Guards: null historian → "Historian disabled" (symmetric with other history paths); empty timestamp array short-circuits to Success=true, Values=[] without an SDK round-trip; SDK exception → Success=false with the message chained. Proxy-side IHistoryProvider.ReadAtTimeAsync capability doesn't exist in Core.Abstractions yet (OPC UA HistoryReadAtTime service is supported but the current IHistoryProvider only has ReadRawAsync + ReadProcessedAsync) — this PR adds the Host-side surface so a future Core.Abstractions extension can wire it through without needing another IPC change. Tests (4 new): disabled-error when historian null, empty-timestamp short-circuit without SDK call, Unix-ms↔DateTime round-trip with Good samples at two distinct timestamps, missing sample (Quality=0) maps to 0x80000000u Bad category. Galaxy.Host.Tests Unit suite: 31 pass / 0 fail (4 new at-time + 27 pre-existing). Galaxy.Host builds clean. Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:03:25 -04:00
d2ebb91cb1 Merge pull request 'Phase 2 PR 9 — thread IsAlarm discovery flag end-to-end' (#8) from phase-2-pr9-alarms into v2 2026-04-18 06:59:25 -04:00
90ce0af375 Merge pull request 'Phase 2 PR 8 — gateway-level host-status push from MxAccessGalaxyBackend' (#7) from phase-2-pr8-alarms-hoststatus into v2 2026-04-18 06:59:04 -04:00
e250356e2a Merge pull request 'Phase 2 PR 7 — wire IHistoryProvider.ReadProcessedAsync end-to-end' (#6) from phase-2-pr7-history-processed into v2 2026-04-18 06:59:02 -04:00
067ad78e06 Merge pull request 'Phase 2 PR 6 — close PR 4 monitor-loop low findings (probe leak + replay signal)' (#5) from phase-2-pr6-monitor-findings into v2 2026-04-18 06:57:57 -04:00
6cfa8d326d Merge pull request 'Phase 2 PR 4 — close 4 open MXAccess findings (push frames + reconnect + write-await + read-cancel)' (#3) from phase-2-pr4-findings into v2 2026-04-18 06:57:21 -04:00
Joseph Doherty
30ece6e22c Phase 2 PR 8 — wire gateway-level host-status push from MxAccessGalaxyBackend. PR 4 built the IPC infrastructure for OnHostStatusChanged (MessageKind.RuntimeStatusChange frame + ConnectionSink forwarding through FrameWriter) but no backend actually raised the event; the #pragma warning disable CS0067 around MxAccessGalaxyBackend.OnHostStatusChanged declared the event for interface symmetry while acknowledging the wire-up was Phase 2 follow-up. This PR closes the gateway-level signal: MxAccessClient.ConnectionStateChanged (already raised on false→true Register and true→false Unregister transitions, including the reconnect path in MonitorLoopAsync) now drives OnHostStatusChanged with a synthetic HostConnectivityStatus tagged HostName=MxAccessClient.ClientName, RuntimeStatus="Running" on reconnect + "Stopped" on disconnect, LastObservedUtcUnixMs set to the transition moment. The Admin UI's existing IHostConnectivityProbe subscriber on GalaxyProxyDriver (HostStatusChangedEventArgs) already handles the full translation — OnHostConnectivityUpdate parses "Running"/"Stopped"/"Faulted" into the Core.Abstractions HostState enum and fires OnHostStatusChanged downstream, so this single backend-side event wire-up produces an end-to-end signal with no further Proxy changes required. Per-platform and per-AppEngine ScanState probing (the 472 LOC GalaxyRuntimeProbeManager state machine in v1 that advises <Host>.ScanState on every deployed $WinPlatform + $AppEngine gobject, tracks Unknown → Running → Stopped transitions, handles the on-change-only delivery quirk of ScanState, and surfaces IsHostStopped(gobjectId) for the node manager's Read path to short-circuit on-demand reads against known-stopped runtimes) remains deferred to a follow-up PR — the gateway-level signal gives operators the top-level transport-health rung of the status ladder, which is what matters when the Galaxy COM proxy itself goes down (vs a specific platform going down). MxAccessClient.ClientName property exposes the previously-private _clientName field so the backend can tag its pushes with a stable gateway identity — operators configure this via OTOPCUA_GALAXY_CLIENT_NAME env var (default "OtOpcUa-Galaxy.Host" per Program.cs). MxAccessGalaxyBackend constructor subscribes the new _onConnectionStateChanged field before returning + Dispose unsubscribes it via _mx.ConnectionStateChanged -= _onConnectionStateChanged to prevent the backend's own dispose from leaving a dangling handler on the MxAccessClient (same shape as MxAccessClient.SubscriptionReplayFailed PR 6 dispose discipline). #pragma warning disable CS0067 removed from around OnHostStatusChanged since the event is now raised; the directive is narrowed to cover only OnAlarmEvent which stays unraised pending the alarm subsystem port (PR 9 candidate). Tests — HostStatusPushTests (new, 2 cases): ConnectionStateChanged_raises_OnHostStatusChanged_with_gateway_name fires mx.ConnectAsync → mx.DisconnectAsync and asserts two notifications in order with HostName="GatewayClient" (the clientName passed to MxAccessClient ctor), RuntimeStatus="Running" then "Stopped", LastObservedUtcUnixMs > 0; Dispose_unsubscribes_so_post_dispose_state_changes_do_not_fire_events asserts that after backend.Dispose() a subsequent mx.DisconnectAsync does not bump the count on a registered OnHostStatusChanged handler — guards against the subscription-leak regression where a lingering backend instance would accumulate cross-reconnect notifications for a dead writer. Host.Tests csproj gains a Reference to lib/ArchestrA.MxAccess.dll (identical to the reference PR 6 adds — conflict-free cherry-pick/merge since both PRs stage the same <Reference> node; git will collapse to one when either lands first). Full Galaxy.Host.Tests Unit suite: 26 pass / 0 fail (2 new host-status + 9 PR5 historian + 15 pre-existing PostMortemMmf/RecyclePolicy/StaPump/MemoryWatchdog/EndToEndIpc/Handshake). Galaxy.Host builds clean (0 errors, 0 warnings). Branch base — PR 8 is on phase-2-pr5-historian rather than phase-2-pr4-findings because the constructor path on MxAccessGalaxyBackend gained a new historian parameter in PR 5 and the Dispose implementation needs to coordinate the two unsubscribes; targeting the earlier base would leave a trivial conflict on Dispose.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 06:03:16 -04:00
Joseph Doherty
3717405aa6 Phase 2 PR 7 — wire IHistoryProvider.ReadProcessedAsync end-to-end. PR 5 ported HistorianDataSource.ReadAggregateAsync into Galaxy.Host but left it internal — GalaxyProxyDriver.ReadProcessedAsync still threw NotSupportedException, so OPC UA clients issuing HistoryReadProcessed requests against the v2 topology got rejected at the driver boundary. This PR closes that gap by adding two new Shared.Contracts messages (HistoryReadProcessedRequest/Response, MessageKind 0x62/0x63), routing them through GalaxyFrameHandler, implementing HistoryReadProcessedAsync on all three IGalaxyBackend implementations (Stub/DbBacked return the canonical "pending" Success=false, MxAccessGalaxyBackend delegates to _historian.ReadAggregateAsync), mapping HistorianAggregateSample → GalaxyDataValue at the IPC boundary (null bucket Value → BadNoData 0x800E0000u, otherwise Good 0u), and flipping GalaxyProxyDriver.ReadProcessedAsync from the NotSupported throw to a real IPC call with OPC UA HistoryAggregateType enum mapped to Wonderware AnalogSummary column name on the Proxy side (Average → "Average", Minimum → "Minimum", Maximum → "Maximum", Count → "ValueCount", Total → NotSupported since there's no direct SDK column for sum). Decision #13 IPC data-shape stays intact — HistoryReadProcessedResponse carries GalaxyDataValue[] with the same MessagePack value + OPC UA StatusCode + timestamps shape as the other history responses, so the Proxy's existing ToSnapshot helper handles the conversion without a new code path. MxAccessGalaxyBackend.HistoryReadProcessedAsync guards: null historian → "Historian disabled" (symmetric with HistoryReadAsync); IntervalMs <= 0 → "HistoryReadProcessed requires IntervalMs > 0" (prevents division-by-zero inside the SDK's Resolution parameter); exception during SDK call → Success=false Values=[] with the message so the Proxy surfaces it as InvalidOperationException with a clean error chain. Tests — HistoryReadProcessedTests (new, 4 cases): disabled-error when historian null, rejects zero interval, maps Good sample with Value=12.34 and the Proxy-supplied AggregateColumn + IntervalMs flow unchanged through to the fake IHistorianDataSource, maps null Value bucket to 0x800E0000u BadNoData with null ValueBytes. AggregateColumnMappingTests (new, 5 cases in Proxy.Tests): theory covers all 4 supported HistoryAggregateType enum values → correct column string, and asserts Total throws NotSupportedException with a message that steers callers to Average/Minimum/Maximum/Count (the SDK's AnalogSummaryQueryResult doesn't expose a sum column — the closest is Average × ValueCount which is the responsibility of a caller-side aggregation rather than an extra IPC round-trip). InternalsVisibleTo added to Galaxy.Proxy csproj so Proxy.Tests can reach the internal MapAggregateToColumn static. Builds — Galaxy.Host (net48 x86) + Galaxy.Proxy (net10) both 0 errors, full solution 201 warnings (pre-existing) / 0 errors. Test counts — Host.Tests Unit suite: 28 pass (4 new processed + 9 PR5 historian + 15 pre-existing); Proxy.Tests Unit suite: 14 pass (5 new column-mapping + 9 pre-existing). Deferred to a later PR — ReadAtTime + ReadEvents + Health IPC surfaces (HistorianDataSource has them ported in PR 5 but they need additional contract messages and would push this PR past a comfortable review size); the alarm subsystem wire-up (OnAlarmEvent raising from MxAccessGalaxyBackend) which overlaps the ReadEventsAsync IPC work since both pull from HistorianAccess.CreateEventQuery on the SDK side; the Proxy-side quality-byte refinement where HistorianDataSource's per-sample raw quality byte gets decoded through the existing QualityMapper instead of the category-only mapping in ToWire(HistorianSample) — doesn't change correctness today since Good/Uncertain/Bad categories are all the Admin UI and OPC UA clients surface, but richer OPC DA status codes (BadNotConnected, UncertainSubNormal, etc.) are available on the wire and the Proxy could promote them before handing DataValueSnapshot to ISubscribable consumers. This PR branches off phase-2-pr5-historian because it directly extends the Historian IPC surface added there; if PR 5 merges first PR 7 fast-forwards, otherwise it needs a rebase after PR 5 lands.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 05:53:01 -04:00
Joseph Doherty
1c2bf74d38 Phase 2 PR 6 — close the 2 low findings carried forward from PR 4. Low finding #1 ($Heartbeat probe handle leak in MonitorLoopAsync): the probe calls _proxy.AddItem(connectionHandle, "$Heartbeat") on every monitor tick that observes the connection is past StaleThreshold, but previously discarded the returned item handle — so every probe (one per MonitorInterval, default 5s) leaked one item handle into the MXAccess proxy's internal handle table. Fix: capture the item handle, call RemoveItem(connectionHandle, probeHandle) in the InvokeAsync's finally block so it runs on the same pump turn as the AddItem, best-effort RemoveItem swallow so a dying proxy doesn't throw secondary exceptions out of the probe path. Probe ok becomes probeHandle > 0 so any AddItem that returns 0 (MXAccess's "could not create") counts as a failed probe, matching v1 behavior. Low finding #2 (subscription replay silently swallowed per-tag failures): after a reconnect, the replay loop iterates the pre-reconnect subscription snapshot and calls SubscribeOnPumpAsync for each; previously those failures went into a bare catch { /* skip */ } so an operator had no signal when specific tags failed to re-subscribe — the first indication downstream was a quality drop on OPC UA clients. Fix: new SubscriptionReplayFailedEventArgs (TagReference + Exception) + SubscriptionReplayFailed event on MxAccessClient that fires once per tag that fails to re-subscribe, Log.Warning per failure with the reconnect counter + tag reference, and a summary log line at the end of the replay loop ("{failed} of {total} failed" or "{total} re-subscribed cleanly"). Serilog using + ILogger Log = Serilog.Log.ForContext<MxAccessClient>() added. Tests — MxAccessClientMonitorLoopTests (new file, 2 cases): Heartbeat_probe_calls_RemoveItem_for_every_AddItem constructs a CountingProxy IMxProxy that tracks AddItem/RemoveItem pair counts scoped to the "$Heartbeat" address, runs the client with MonitorInterval=150ms + StaleThreshold=50ms for 700ms, asserts HeartbeatAddCount > 1, HeartbeatAddCount == HeartbeatRemoveCount, OutstandingHeartbeatHandles == 0 after dispose; SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay uses a ReplayFailingProxy that throws on the next $Heartbeat probe (to trigger the reconnect path) and throws on the replay-time AddItem for specified tag names ("BadTag.A", "BadTag.B"), subscribes GoodTag.X + BadTag.A + BadTag.B before triggering probe failure, collects SubscriptionReplayFailed args into a ConcurrentBag, asserts exactly 2 events fired and both bad tags are represented — GoodTag.X replays cleanly so it does not fire. Host.Tests csproj gains a Reference to lib/ArchestrA.MxAccess.dll because IMxProxy's MxDataChangeHandler delegate signature mentions MXSTATUS_PROXY and the compiler resolves all delegate parameter types when a test class implements the interface, even if the test code never names the type. No regressions: full Galaxy.Host.Tests Unit suite 26 pass / 0 fail (2 new monitor-loop tests + 9 PR5 historian + 15 pre-existing PostMortemMmf/RecyclePolicy/StaPump/MemoryWatchdog/EndToEndIpc/Handshake). Galaxy.Host builds clean (0 errors, 0 warnings) — the new Serilog.Log.ForContext usage picks up the existing Serilog package ref that PR 4 pulled in for the monitor-loop infrastructure. Both findings were flagged as non-blocking for PR 4 merge and are now resolved alongside whichever merge order the reviewer picks; this PR branches off phase-2-pr4-findings so it can rebase cleanly if PR 4 lands first or be re-based onto master after PR 4 merges.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 02:06:15 -04:00
24 changed files with 2370 additions and 23 deletions

View File

@@ -0,0 +1,260 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
/// <summary>
/// Subscribes to the four Galaxy alarm attributes (<c>.InAlarm</c>, <c>.Priority</c>,
/// <c>.DescAttrName</c>, <c>.Acked</c>) per alarm-bearing attribute discovered during
/// <c>DiscoverAsync</c>. Maintains one <see cref="AlarmState"/> per alarm, raises
/// <see cref="AlarmTransition"/> on lifecycle transitions (Active / Unacknowledged /
/// Acknowledged / Inactive). Ack path writes <c>.AckMsg</c>. Pure-logic state machine
/// with delegate-based subscribe/write so it's testable against in-memory fakes.
/// </summary>
/// <remarks>
/// Transitions emitted (OPC UA Part 9 alarm lifecycle, simplified for the Galaxy model):
/// <list type="bullet">
/// <item><c>Active</c> — InAlarm false → true. Default to Unacknowledged.</item>
/// <item><c>Acknowledged</c> — Acked false → true while InAlarm is still true.</item>
/// <item><c>Inactive</c> — InAlarm true → false. If still unacknowledged the alarm
/// is marked latched-inactive-unack; next Ack transitions straight to Inactive.</item>
/// </list>
/// </remarks>
public sealed class GalaxyAlarmTracker : IDisposable
{
public const string InAlarmAttr = ".InAlarm";
public const string PriorityAttr = ".Priority";
public const string DescAttrNameAttr = ".DescAttrName";
public const string AckedAttr = ".Acked";
public const string AckMsgAttr = ".AckMsg";
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
private readonly Func<string, Task> _unsubscribe;
private readonly Func<string, object, Task<bool>> _write;
private readonly Func<DateTime> _clock;
// Alarm tag (attribute full ref, e.g. "Tank.Level.HiHi") → state.
private readonly ConcurrentDictionary<string, AlarmState> _alarms =
new(StringComparer.OrdinalIgnoreCase);
// Reverse lookup: probed tag (".InAlarm" etc.) → owning alarm tag.
private readonly ConcurrentDictionary<string, (string AlarmTag, AlarmField Field)> _probeToAlarm =
new(StringComparer.OrdinalIgnoreCase);
private bool _disposed;
public event EventHandler<AlarmTransition>? TransitionRaised;
public GalaxyAlarmTracker(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe,
Func<string, object, Task<bool>> write)
: this(subscribe, unsubscribe, write, () => DateTime.UtcNow) { }
internal GalaxyAlarmTracker(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe,
Func<string, object, Task<bool>> write,
Func<DateTime> clock)
{
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
_write = write ?? throw new ArgumentNullException(nameof(write));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}
public int TrackedAlarmCount => _alarms.Count;
/// <summary>
/// Advise the four alarm attributes for <paramref name="alarmTag"/>. Idempotent —
/// repeat calls for the same alarm tag are a no-op. Subscribe failure for any of the
/// four rolls back the alarm entry so a stale callback cannot promote a phantom.
/// </summary>
public async Task TrackAsync(string alarmTag)
{
if (_disposed || string.IsNullOrWhiteSpace(alarmTag)) return;
if (_alarms.ContainsKey(alarmTag)) return;
var state = new AlarmState { AlarmTag = alarmTag };
if (!_alarms.TryAdd(alarmTag, state)) return;
var probes = new[]
{
(Tag: alarmTag + InAlarmAttr, Field: AlarmField.InAlarm),
(Tag: alarmTag + PriorityAttr, Field: AlarmField.Priority),
(Tag: alarmTag + DescAttrNameAttr, Field: AlarmField.DescAttrName),
(Tag: alarmTag + AckedAttr, Field: AlarmField.Acked),
};
foreach (var p in probes)
{
_probeToAlarm[p.Tag] = (alarmTag, p.Field);
}
try
{
foreach (var p in probes)
{
await _subscribe(p.Tag, OnProbeCallback).ConfigureAwait(false);
}
}
catch
{
// Rollback so a partial advise doesn't leak state.
_alarms.TryRemove(alarmTag, out _);
foreach (var p in probes)
{
_probeToAlarm.TryRemove(p.Tag, out _);
try { await _unsubscribe(p.Tag).ConfigureAwait(false); } catch { }
}
throw;
}
}
/// <summary>
/// Drop every tracked alarm. Unadvises all 4 probes per alarm as best-effort.
/// </summary>
public async Task ClearAsync()
{
_alarms.Clear();
foreach (var kv in _probeToAlarm.ToList())
{
_probeToAlarm.TryRemove(kv.Key, out _);
try { await _unsubscribe(kv.Key).ConfigureAwait(false); } catch { }
}
}
/// <summary>
/// Operator ack — write the comment text into <c>&lt;alarmTag&gt;.AckMsg</c>.
/// Returns false when the runtime reports the write failed.
/// </summary>
public Task<bool> AcknowledgeAsync(string alarmTag, string comment)
{
if (_disposed || string.IsNullOrWhiteSpace(alarmTag))
return Task.FromResult(false);
return _write(alarmTag + AckMsgAttr, comment ?? string.Empty);
}
/// <summary>
/// Subscription callback entry point. Exposed for tests and for the Backend to route
/// fan-out callbacks through. Runs the state machine and fires TransitionRaised
/// outside the lock.
/// </summary>
public void OnProbeCallback(string probeTag, Vtq vtq)
{
if (_disposed) return;
if (!_probeToAlarm.TryGetValue(probeTag, out var link)) return;
if (!_alarms.TryGetValue(link.AlarmTag, out var state)) return;
AlarmTransition? transition = null;
var now = _clock();
lock (state.Lock)
{
switch (link.Field)
{
case AlarmField.InAlarm:
{
var wasActive = state.InAlarm;
var isActive = vtq.Value is bool b && b;
state.InAlarm = isActive;
state.LastUpdateUtc = now;
if (!wasActive && isActive)
{
state.Acked = false;
state.LastTransitionUtc = now;
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Active, state.Priority, state.DescAttrName, now);
}
else if (wasActive && !isActive)
{
state.LastTransitionUtc = now;
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Inactive, state.Priority, state.DescAttrName, now);
}
break;
}
case AlarmField.Priority:
if (vtq.Value is int pi) state.Priority = pi;
else if (vtq.Value is short ps) state.Priority = ps;
else if (vtq.Value is long pl && pl <= int.MaxValue) state.Priority = (int)pl;
state.LastUpdateUtc = now;
break;
case AlarmField.DescAttrName:
state.DescAttrName = vtq.Value as string;
state.LastUpdateUtc = now;
break;
case AlarmField.Acked:
{
var wasAcked = state.Acked;
var isAcked = vtq.Value is bool b && b;
state.Acked = isAcked;
state.LastUpdateUtc = now;
// Fire Acknowledged only when transitioning false→true. Don't fire on initial
// subscribe callback (wasAcked==isAcked in that case because the state starts
// with Acked=false and the initial probe is usually true for an un-active alarm).
if (!wasAcked && isAcked && state.InAlarm)
{
state.LastTransitionUtc = now;
transition = new AlarmTransition(state.AlarmTag, AlarmStateTransition.Acknowledged, state.Priority, state.DescAttrName, now);
}
break;
}
}
}
if (transition is { } t)
{
TransitionRaised?.Invoke(this, t);
}
}
public IReadOnlyList<AlarmSnapshot> SnapshotStates()
{
return _alarms.Values.Select(s =>
{
lock (s.Lock)
return new AlarmSnapshot(s.AlarmTag, s.InAlarm, s.Acked, s.Priority, s.DescAttrName);
}).ToList();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_alarms.Clear();
_probeToAlarm.Clear();
}
private sealed class AlarmState
{
public readonly object Lock = new();
public string AlarmTag = "";
public bool InAlarm;
public bool Acked = true; // default ack'd so first false→true on subscribe doesn't misfire
public int Priority;
public string? DescAttrName;
public DateTime LastUpdateUtc;
public DateTime LastTransitionUtc;
}
private enum AlarmField { InAlarm, Priority, DescAttrName, Acked }
}
public enum AlarmStateTransition { Active, Acknowledged, Inactive }
public sealed record AlarmTransition(
string AlarmTag,
AlarmStateTransition Transition,
int Priority,
string? DescAttrName,
DateTime AtUtc);
public sealed record AlarmSnapshot(
string AlarmTag,
bool InAlarm,
bool Acked,
int Priority,
string? DescAttrName);

View File

@@ -127,6 +127,33 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadEventsResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -0,0 +1,46 @@
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
/// <summary>
/// Maps a raw OPC DA quality byte (as returned by Wonderware Historian's <c>OpcQuality</c>)
/// to an OPC UA <c>StatusCode</c> uint. Preserves specific codes (BadNotConnected,
/// UncertainSubNormal, etc.) instead of collapsing to Good/Uncertain/Bad categories.
/// Mirrors v1 <c>QualityMapper.MapToOpcUaStatusCode</c> without pulling in OPC UA types —
/// the returned value is the 32-bit OPC UA <c>StatusCode</c> wire encoding that the Proxy
/// surfaces directly as <c>DataValueSnapshot.StatusCode</c>.
/// </summary>
public static class HistorianQualityMapper
{
/// <summary>
/// Map an 8-bit OPC DA quality byte to the corresponding OPC UA StatusCode. The byte
/// family bits decide the category (Good &gt;= 192, Uncertain 64-191, Bad 0-63); the
/// low-nibble subcode selects the specific code.
/// </summary>
public static uint Map(byte q) => q switch
{
// Good family (192+)
192 => 0x00000000u, // Good
216 => 0x00D80000u, // Good_LocalOverride
// Uncertain family (64-191)
64 => 0x40000000u, // Uncertain
68 => 0x40900000u, // Uncertain_LastUsableValue
80 => 0x40930000u, // Uncertain_SensorNotAccurate
84 => 0x40940000u, // Uncertain_EngineeringUnitsExceeded
88 => 0x40950000u, // Uncertain_SubNormal
// Bad family (0-63)
0 => 0x80000000u, // Bad
4 => 0x80890000u, // Bad_ConfigurationError
8 => 0x808A0000u, // Bad_NotConnected
12 => 0x808B0000u, // Bad_DeviceFailure
16 => 0x808C0000u, // Bad_SensorFailure
20 => 0x80050000u, // Bad_CommunicationError
24 => 0x808D0000u, // Bad_OutOfService
32 => 0x80320000u, // Bad_WaitingForInitialData
// Unknown code — fall back to the category so callers still get a sensible bucket.
_ when q >= 192 => 0x00000000u,
_ when q >= 64 => 0x40000000u,
_ => 0x80000000u,
};
}

View File

@@ -38,6 +38,9 @@ public interface IGalaxyBackend
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
Task<HistoryReadEventsResponse> HistoryReadEventsAsync(HistoryReadEventsRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}

View File

@@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
@@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// </summary>
public sealed class MxAccessClient : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
@@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
/// <summary>
/// Fires once per failed subscription replay after a reconnect. Carries the tag reference
/// and the exception so the backend can propagate the degradation signal (e.g. mark the
/// subscription bad on the Proxy side rather than silently losing its callback). Added for
/// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
/// operator would only find out that a specific subscription stopped updating through a
/// data-quality complaint from downstream.
/// </summary>
public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
@@ -54,6 +67,13 @@ public sealed class MxAccessClient : IDisposable
public int SubscriptionCount => _subscriptions.Count;
public int ReconnectCount => _reconnectCount;
/// <summary>
/// Wonderware client identity used when registering with the LMXProxyServer. Surfaced so
/// <see cref="Backend.MxAccessGalaxyBackend"/> can tag its <c>OnHostStatusChanged</c> IPC
/// pushes with a stable gateway name per PR 8.
/// </summary>
public string ClientName => _clientName;
/// <summary>Connects on the STA thread. Idempotent. Starts the reconnect monitor on first call.</summary>
public async Task<int> ConnectAsync()
{
@@ -117,16 +137,29 @@ public sealed class MxAccessClient : IDisposable
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
// our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item
// handle; we must RemoveItem it on the same pump turn or the long-running monitor
// leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely).
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
int probeHandle = 0;
try
{
probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
return probeHandle > 0;
}
catch { return false; }
finally
{
if (probeHandle > 0)
{
try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
catch { /* proxy is dying; best-effort cleanup */ }
}
}
});
}
catch { probeOk = false; }
@@ -155,16 +188,33 @@ public sealed class MxAccessClient : IDisposable
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
// Replay every subscription that was active before the disconnect. PR 6 low
// finding #2: surface per-tag failures — log them and raise
// SubscriptionReplayFailed so the backend can propagate the degraded state
// (previously swallowed silently; downstream quality dropped without a signal).
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
var failed = 0;
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
catch (Exception subEx)
{
failed++;
Log.Warning(subEx,
"MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}",
fullRef, _reconnectCount);
SubscriptionReplayFailed?.Invoke(this,
new SubscriptionReplayFailedEventArgs(fullRef, subEx));
}
}
if (failed > 0)
Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
else
Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch

View File

@@ -0,0 +1,20 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// <summary>
/// Fired by <see cref="MxAccessClient.SubscriptionReplayFailed"/> when a previously-active
/// subscription fails to be restored after a reconnect. The backend should treat the tag as
/// unhealthy until the next successful resubscribe.
/// </summary>
public sealed class SubscriptionReplayFailedEventArgs : EventArgs
{
public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
{
TagReference = tagReference;
Exception = exception;
}
public string TagReference { get; }
public Exception Exception { get; }
}

View File

@@ -4,9 +4,11 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
@@ -34,18 +36,99 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
_refToSubs = new(System.StringComparer.OrdinalIgnoreCase);
public event System.EventHandler<OnDataChangeNotification>? OnDataChange;
#pragma warning disable CS0067 // event not yet raised — alarm + host-status wire-up in PR #4 follow-up
public event System.EventHandler<GalaxyAlarmEvent>? OnAlarmEvent;
public event System.EventHandler<HostConnectivityStatus>? OnHostStatusChanged;
#pragma warning restore CS0067
private readonly System.EventHandler<bool> _onConnectionStateChanged;
private readonly GalaxyRuntimeProbeManager _probeManager;
private readonly System.EventHandler<HostStateTransition> _onProbeStateChanged;
private readonly GalaxyAlarmTracker _alarmTracker;
private readonly System.EventHandler<AlarmTransition> _onAlarmTransition;
// Cached during DiscoverAsync so SubscribeAlarmsAsync knows which attributes to advise.
// One entry per IsAlarm=true attribute in the last discovered hierarchy.
private readonly System.Collections.Concurrent.ConcurrentBag<string> _discoveredAlarmTags = new();
public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null)
{
_repository = repository;
_mx = mx;
_historian = historian;
// PR 8: gateway-level host-status push. When the MXAccess COM proxy transitions
// connected↔disconnected, raise OnHostStatusChanged with a synthetic host entry named
// after the Wonderware client identity so the Admin UI surfaces top-level transport
// health even before per-platform/per-engine probing lands (deferred to a later PR that
// ports v1's GalaxyRuntimeProbeManager with ScanState subscriptions).
_onConnectionStateChanged = (_, connected) =>
{
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
{
HostName = _mx.ClientName,
RuntimeStatus = connected ? "Running" : "Stopped",
LastObservedUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
});
};
_mx.ConnectionStateChanged += _onConnectionStateChanged;
// PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback,
// which runs the state machine and raises StateChanged on transitions we care about.
// We forward each transition through the same OnHostStatusChanged IPC event that the
// gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the
// Admin UI can show per-host health independently from the top-level transport status.
_probeManager = new GalaxyRuntimeProbeManager(
subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb),
unsubscribe: probe => _mx.UnsubscribeAsync(probe));
_onProbeStateChanged = (_, t) =>
{
OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus
{
HostName = t.TagName,
RuntimeStatus = t.NewState switch
{
HostRuntimeState.Running => "Running",
HostRuntimeState.Stopped => "Stopped",
_ => "Unknown",
},
LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
});
};
_probeManager.StateChanged += _onProbeStateChanged;
// PR 14: alarm subsystem. Per IsAlarm=true attribute discovered, subscribe to the four
// alarm-state attributes (.InAlarm/.Priority/.DescAttrName/.Acked), track lifecycle,
// and raise GalaxyAlarmEvent on transitions — forwarded through the existing
// OnAlarmEvent IPC event that the PR 4 ConnectionSink already wires into AlarmEvent frames.
_alarmTracker = new GalaxyAlarmTracker(
subscribe: (tag, cb) => _mx.SubscribeAsync(tag, cb),
unsubscribe: tag => _mx.UnsubscribeAsync(tag),
write: (tag, v) => _mx.WriteAsync(tag, v));
_onAlarmTransition = (_, t) => OnAlarmEvent?.Invoke(this, new GalaxyAlarmEvent
{
EventId = Guid.NewGuid().ToString("N"),
ObjectTagName = t.AlarmTag,
AlarmName = t.AlarmTag,
Severity = t.Priority,
StateTransition = t.Transition switch
{
AlarmStateTransition.Active => "Active",
AlarmStateTransition.Acknowledged => "Acknowledged",
AlarmStateTransition.Inactive => "Inactive",
_ => "Unknown",
},
Message = t.DescAttrName ?? t.AlarmTag,
UtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
});
_alarmTracker.TransitionRaised += _onAlarmTransition;
}
/// <summary>
/// Exposed for tests. Production flow: DiscoverAsync completes → backend calls
/// <c>SyncProbesAsync</c> with the runtime hosts (WinPlatform + AppEngine gobjects) to
/// advise ScanState per host.
/// </summary>
internal GalaxyRuntimeProbeManager ProbeManager => _probeManager;
public async Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest req, CancellationToken ct)
{
try
@@ -85,6 +168,34 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty<GalaxyAttributeInfo>(),
}).ToArray();
// PR 14: cache alarm-bearing attribute full refs so SubscribeAlarmsAsync can advise
// them on demand. Format matches the Galaxy reference grammar <tag>.<attr>.
var freshAlarmTags = attributes
.Where(a => a.IsAlarm)
.Select(a => nameByGobject.TryGetValue(a.GobjectId, out var tn)
? tn + "." + a.AttributeName
: null)
.Where(s => !string.IsNullOrWhiteSpace(s))
.Cast<string>()
.ToArray();
while (_discoveredAlarmTags.TryTake(out _)) { }
foreach (var t in freshAlarmTags) _discoveredAlarmTags.Add(t);
// PR 13: Sync the per-platform probe manager against the just-discovered hierarchy
// so ScanState subscriptions track the current runtime set. Best-effort — probe
// failures don't block Discover from returning, since the gateway-level signal from
// MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to
// that level if any per-host probe couldn't advise.
try
{
var targets = hierarchy
.Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|| o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
.Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
await _probeManager.SyncAsync(targets).ConfigureAwait(false);
}
catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
return new DiscoverHierarchyResponse { Success = true, Objects = objects };
}
catch (Exception ex)
@@ -222,8 +333,40 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
}
}
public Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct) => Task.CompletedTask;
public Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct) => Task.CompletedTask;
/// <summary>
/// PR 14: advise every alarm-bearing attribute's 4-attr quartet. Best-effort per-alarm —
/// a subscribe failure on one alarm doesn't abort the whole call, since operators prefer
/// partial alarm coverage to none. Idempotent on repeat calls (tracker internally
/// skips already-tracked alarms).
/// </summary>
public async Task SubscribeAlarmsAsync(AlarmSubscribeRequest req, CancellationToken ct)
{
foreach (var tag in _discoveredAlarmTags)
{
try { await _alarmTracker.TrackAsync(tag).ConfigureAwait(false); }
catch { /* swallow per-alarm — tracker rolls back its own state on failure */ }
}
}
/// <summary>
/// PR 14: route operator ack through the tracker's AckMsg write path. EventId on the
/// incoming request maps directly to the alarm full reference (Proxy-side naming
/// convention from GalaxyProxyDriver.RaiseAlarmEvent → ev.EventId).
/// </summary>
public async Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct)
{
// EventId carries a per-transition Guid.ToString("N"); there's no reverse map from
// event id to alarm tag yet, so v1's convention (ack targets the condition) is matched
// by reading the alarm name from the Comment envelope: v1 packed "<tag>|<comment>".
// Until the Proxy is updated to send the alarm tag separately, fall back to treating
// the EventId as the alarm tag — Client CLI passes it through unchanged.
var tag = req.EventId;
if (!string.IsNullOrWhiteSpace(tag))
{
try { await _alarmTracker.AcknowledgeAsync(tag, req.Comment ?? string.Empty).ConfigureAwait(false); }
catch { /* swallow — ack failures surface via MxAccessClient.WriteAsync logs */ }
}
}
public async Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct)
{
@@ -264,10 +407,136 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
}
}
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.IntervalMs <= 0)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "HistoryReadProcessed requires IntervalMs > 0",
Values = Array.Empty<GalaxyDataValue>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var samples = await _historian.ReadAggregateAsync(
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadProcessedResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadProcessedResponse
{
Success = false,
Error = $"Historian aggregate read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadAtTimeResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.TimestampsUtcUnixMs.Length == 0)
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
var timestamps = req.TimestampsUtcUnixMs
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
.ToArray();
try
{
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadAtTimeResponse
{
Success = false,
Error = $"Historian at-time read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadEventsResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
var wire = events.Select(e => new GalaxyHistoricalEvent
{
EventId = e.Id.ToString(),
SourceName = e.Source,
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
DisplayText = e.DisplayText,
Severity = e.Severity,
}).ToArray();
return new HistoryReadEventsResponse { Success = true, Events = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadEventsResponse
{
Success = false,
Error = $"Historian event read failed: {ex.Message}",
Events = Array.Empty<GalaxyHistoricalEvent>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
public void Dispose() => _historian?.Dispose();
public void Dispose()
{
_alarmTracker.TransitionRaised -= _onAlarmTransition;
_alarmTracker.Dispose();
_probeManager.StateChanged -= _onProbeStateChanged;
_probeManager.Dispose();
_mx.ConnectionStateChanged -= _onConnectionStateChanged;
_historian?.Dispose();
}
private static GalaxyDataValue ToWire(string reference, Vtq vtq) => new()
{
@@ -291,19 +560,26 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value),
ValueMessagePackType = 0,
StatusCode = MapHistorianQualityToOpcUa(sample.Quality),
StatusCode = HistorianQualityMapper.Map(sample.Quality),
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static uint MapHistorianQualityToOpcUa(byte q)
/// <summary>
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
{
// Category-only mapping — mirrors QualityMapper.MapToOpcUaStatusCode for the common ranges.
// The Proxy may refine this when it decodes the wire frame.
if (q >= 192) return 0x00000000u; // Good
if (q >= 64) return 0x40000000u; // Uncertain
return 0x80000000u; // Bad
}
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
ValueMessagePackType = 0,
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{

View File

@@ -0,0 +1,273 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
/// <summary>
/// Per-platform + per-AppEngine runtime probe. Subscribes to <c>&lt;TagName&gt;.ScanState</c>
/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
/// transitions, and fires <see cref="StateChanged"/> so <see cref="Backend.MxAccessGalaxyBackend"/>
/// can forward per-host events through the existing IPC <c>OnHostStatusChanged</c> event.
/// Pure-logic state machine with an injected clock so it's deterministically testable —
/// port of v1 <c>GalaxyRuntimeProbeManager</c> without the OPC UA node-manager coupling.
/// </summary>
/// <remarks>
/// State machine rules (documented in v1's <c>runtimestatus.md</c> and preserved here):
/// <list type="bullet">
/// <item><c>ScanState</c> is on-change-only — a stably-Running host may go hours without a
/// callback. Running → Stopped is driven by an explicit <c>ScanState=false</c> callback,
/// never by starvation.</item>
/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
/// paint every host as "just recovered" at startup, which is noise).</item>
/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
/// fires StateChanged because that's a first-known-bad signal operators need.</item>
/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
/// avoid lock inversion with caller-owned state.</item>
/// </list>
/// </remarks>
public sealed class GalaxyRuntimeProbeManager : IDisposable
{
public const int CategoryWinPlatform = 1;
public const int CategoryAppEngine = 3;
public const string ProbeAttribute = ".ScanState";
private readonly Func<DateTime> _clock;
private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
private readonly Func<string, Task> _unsubscribe;
private readonly object _lock = new();
// probe tag → per-host state
private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);
// tag name → probe tag (for reverse lookup on the desired-set diff)
private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);
private bool _disposed;
/// <summary>
/// Fires on every state transition that operators should react to. See class remarks
/// for the rules on which transitions fire.
/// </summary>
public event EventHandler<HostStateTransition>? StateChanged;
public GalaxyRuntimeProbeManager(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe)
: this(subscribe, unsubscribe, () => DateTime.UtcNow) { }
internal GalaxyRuntimeProbeManager(
Func<string, Action<string, Vtq>, Task> subscribe,
Func<string, Task> unsubscribe,
Func<DateTime> clock)
{
_subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
_unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}
/// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
public int ActiveProbeCount
{
get { lock (_lock) return _byProbe.Count; }
}
/// <summary>
/// Snapshot every currently-tracked host's state. One entry per probe.
/// </summary>
public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
{
lock (_lock)
{
return _byProbe.Select(kv => new HostProbeSnapshot(
TagName: kv.Value.TagName,
State: kv.Value.State,
LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
}
}
/// <summary>
/// Query the current runtime state for <paramref name="tagName"/>. Returns
/// <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
/// </summary>
public HostRuntimeState GetState(string tagName)
{
lock (_lock)
{
if (_probeByTagName.TryGetValue(tagName, out var probe)
&& _byProbe.TryGetValue(probe, out var state))
return state.State;
return HostRuntimeState.Unknown;
}
}
/// <summary>
/// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
/// against the currently-tracked set and advise / unadvise as needed. Idempotent:
/// calling twice with the same set does nothing.
/// </summary>
public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
{
if (_disposed) return;
var desired = desiredHosts
.Where(h => !string.IsNullOrWhiteSpace(h.TagName))
.ToDictionary(h => h.TagName, StringComparer.OrdinalIgnoreCase);
List<string> toAdvise;
List<string> toUnadvise;
lock (_lock)
{
toAdvise = desired.Keys
.Where(tag => !_probeByTagName.ContainsKey(tag))
.ToList();
toUnadvise = _probeByTagName.Keys
.Where(tag => !desired.ContainsKey(tag))
.Select(tag => _probeByTagName[tag])
.ToList();
foreach (var tag in toAdvise)
{
var probe = tag + ProbeAttribute;
_probeByTagName[tag] = probe;
_byProbe[probe] = new HostProbeState
{
TagName = tag,
State = HostRuntimeState.Unknown,
LastStateChangeUtc = _clock(),
};
}
foreach (var probe in toUnadvise)
{
_byProbe.Remove(probe);
}
foreach (var removedTag in _probeByTagName.Keys.Where(t => !desired.ContainsKey(t)).ToList())
{
_probeByTagName.Remove(removedTag);
}
}
foreach (var tag in toAdvise)
{
var probe = tag + ProbeAttribute;
try
{
await _subscribe(probe, OnProbeCallback);
}
catch
{
// Rollback on subscribe failure so a later Tick can't transition a never-advised
// probe into a false Stopped state. Callers can re-Sync later to retry.
lock (_lock)
{
_byProbe.Remove(probe);
_probeByTagName.Remove(tag);
}
}
}
foreach (var probe in toUnadvise)
{
try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ }
}
}
/// <summary>
/// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's
/// SubscribeAsync delivers VTQ updates through the callback wired in <see cref="SyncAsync"/>,
/// which calls this method under the lock to update state and fires
/// <see cref="StateChanged"/> outside the lock for any transition that matters.
/// </summary>
public void OnProbeCallback(string probeTag, Vtq vtq)
{
if (_disposed) return;
HostStateTransition? transition = null;
lock (_lock)
{
if (!_byProbe.TryGetValue(probeTag, out var state)) return;
var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b;
var now = _clock();
var previous = state.State;
state.LastCallbackUtc = now;
if (isRunning)
{
state.GoodUpdateCount++;
if (previous != HostRuntimeState.Running)
{
state.State = HostRuntimeState.Running;
state.LastStateChangeUtc = now;
if (previous == HostRuntimeState.Stopped)
{
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
}
}
}
else
{
state.FailureCount++;
if (previous != HostRuntimeState.Stopped)
{
state.State = HostRuntimeState.Stopped;
state.LastStateChangeUtc = now;
transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
}
}
}
if (transition is { } t)
{
StateChanged?.Invoke(this, t);
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
lock (_lock)
{
_byProbe.Clear();
_probeByTagName.Clear();
}
}
private sealed class HostProbeState
{
public string TagName { get; set; } = "";
public HostRuntimeState State { get; set; }
public DateTime LastStateChangeUtc { get; set; }
public DateTime? LastCallbackUtc { get; set; }
public long GoodUpdateCount { get; set; }
public long FailureCount { get; set; }
}
}
public enum HostRuntimeState
{
Unknown,
Running,
Stopped,
}
public sealed record HostStateTransition(
string TagName,
HostRuntimeState OldState,
HostRuntimeState NewState,
DateTime AtUtc);
public sealed record HostProbeSnapshot(
string TagName,
HostRuntimeState State,
DateTime LastChangedUtc);
public readonly record struct HostProbeTarget(string TagName, int CategoryId)
{
public bool IsRuntimeHost =>
CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
|| CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine;
}

View File

@@ -85,6 +85,33 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
HistoryReadEventsRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadEventsResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{

View File

@@ -80,6 +80,27 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
return;
}
case MessageKind.HistoryReadProcessedRequest:
{
var resp = await backend.HistoryReadProcessedAsync(
Deserialize<HistoryReadProcessedRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
return;
}
case MessageKind.HistoryReadAtTimeRequest:
{
var resp = await backend.HistoryReadAtTimeAsync(
Deserialize<HistoryReadAtTimeRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
return;
}
case MessageKind.HistoryReadEventsRequest:
{
var resp = await backend.HistoryReadEventsAsync(
Deserialize<HistoryReadEventsRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);

View File

@@ -297,10 +297,50 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public Task<HistoryReadResult> ReadProcessedAsync(
public async Task<HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
=> throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync.");
{
var client = RequireClient();
var column = MapAggregateToColumn(aggregate);
var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
MessageKind.HistoryReadProcessedRequest,
new HistoryReadProcessedRequest
{
SessionId = _sessionId,
TagReference = fullReference,
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
IntervalMs = (long)interval.TotalMilliseconds,
AggregateColumn = column,
},
MessageKind.HistoryReadProcessedResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
return new HistoryReadResult(samples, ContinuationPoint: null);
}
/// <summary>
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
/// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
/// </summary>
internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
{
HistoryAggregateType.Average => "Average",
HistoryAggregateType.Minimum => "Minimum",
HistoryAggregateType.Maximum => "Maximum",
HistoryAggregateType.Count => "ValueCount",
HistoryAggregateType.Total => throw new NotSupportedException(
"HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
"query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
};
// ---- IRediscoverable ----

View File

@@ -16,6 +16,10 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>

View File

@@ -48,8 +48,14 @@ public enum MessageKind : byte
AlarmEvent = 0x51,
AlarmAckRequest = 0x52,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadProcessedRequest = 0x62,
HistoryReadProcessedResponse = 0x63,
HistoryReadAtTimeRequest = 0x64,
HistoryReadAtTimeResponse = 0x65,
HistoryReadEventsRequest = 0x66,
HistoryReadEventsResponse = 0x67,
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,

View File

@@ -26,3 +26,85 @@ public sealed class HistoryReadResponse
[Key(1)] public string? Error { get; set; }
[Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty<HistoryTagValues>();
}
/// <summary>
/// Processed (aggregated) historian read — OPC UA HistoryReadProcessed service. The
/// aggregate column is a string (e.g. "Average", "Minimum") mapped by the Proxy from the
/// OPC UA HistoryAggregateType enum so Galaxy.Host stays OPC-UA-free.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadProcessedRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long StartUtcUnixMs { get; set; }
[Key(3)] public long EndUtcUnixMs { get; set; }
[Key(4)] public long IntervalMs { get; set; }
[Key(5)] public string AggregateColumn { get; set; } = "Average";
}
[MessagePackObject]
public sealed class HistoryReadProcessedResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}
/// <summary>
/// At-time historian read — OPC UA HistoryReadAtTime service. Returns one sample per
/// requested timestamp (interpolated when no exact match exists). The per-timestamp array
/// is flow-encoded as Unix milliseconds to avoid MessagePack DateTime quirks.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadAtTimeRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long[] TimestampsUtcUnixMs { get; set; } = System.Array.Empty<long>();
}
[MessagePackObject]
public sealed class HistoryReadAtTimeResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}
/// <summary>
/// Historical events read — OPC UA HistoryReadEvents service and Alarm &amp; Condition
/// history. <c>SourceName</c> null means "all sources". Distinct from the live
/// <see cref="GalaxyAlarmEvent"/> stream because historical rows carry both
/// <c>EventTime</c> (when the event occurred in the process) and <c>ReceivedTime</c>
/// (when the Historian persisted it) and have no StateTransition — the Historian logs
/// the instantaneous event, not the OPC UA alarm lifecycle.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadEventsRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string? SourceName { get; set; }
[Key(2)] public long StartUtcUnixMs { get; set; }
[Key(3)] public long EndUtcUnixMs { get; set; }
[Key(4)] public int MaxEvents { get; set; } = 1000;
}
[MessagePackObject]
public sealed class GalaxyHistoricalEvent
{
[Key(0)] public string EventId { get; set; } = string.Empty;
[Key(1)] public string? SourceName { get; set; }
[Key(2)] public long EventTimeUtcUnixMs { get; set; }
[Key(3)] public long ReceivedTimeUtcUnixMs { get; set; }
[Key(4)] public string? DisplayText { get; set; }
[Key(5)] public ushort Severity { get; set; }
}
[MessagePackObject]
public sealed class HistoryReadEventsResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyHistoricalEvent[] Events { get; set; } = System.Array.Empty<GalaxyHistoricalEvent>();
}

View File

@@ -0,0 +1,190 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Alarms;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class GalaxyAlarmTrackerTests
{
private sealed class FakeSubscriber
{
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
public readonly ConcurrentQueue<string> Unsubs = new();
public readonly ConcurrentQueue<(string Tag, object Value)> Writes = new();
public bool WriteReturns { get; set; } = true;
public Task Subscribe(string tag, Action<string, Vtq> cb)
{
Subs[tag] = cb;
return Task.CompletedTask;
}
public Task Unsubscribe(string tag)
{
Unsubs.Enqueue(tag);
Subs.TryRemove(tag, out _);
return Task.CompletedTask;
}
public Task<bool> Write(string tag, object value)
{
Writes.Enqueue((tag, value));
return Task.FromResult(WriteReturns);
}
}
private static Vtq Bool(bool v) => new(v, DateTime.UtcNow, 192);
private static Vtq Int(int v) => new(v, DateTime.UtcNow, 192);
private static Vtq Str(string v) => new(v, DateTime.UtcNow, 192);
[Fact]
public async Task Track_subscribes_to_four_alarm_attributes()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
await t.TrackAsync("Tank.Level.HiHi");
fake.Subs.ShouldContainKey("Tank.Level.HiHi.InAlarm");
fake.Subs.ShouldContainKey("Tank.Level.HiHi.Priority");
fake.Subs.ShouldContainKey("Tank.Level.HiHi.DescAttrName");
fake.Subs.ShouldContainKey("Tank.Level.HiHi.Acked");
t.TrackedAlarmCount.ShouldBe(1);
}
[Fact]
public async Task Track_is_idempotent_on_repeat_call()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
await t.TrackAsync("Alarm.A");
await t.TrackAsync("Alarm.A");
t.TrackedAlarmCount.ShouldBe(1);
fake.Subs.Count.ShouldBe(4); // 4 sub calls, not 8
}
[Fact]
public async Task InAlarm_false_to_true_fires_Active_transition()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
var transitions = new ConcurrentQueue<AlarmTransition>();
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
await t.TrackAsync("Alarm.A");
fake.Subs["Alarm.A.Priority"]("Alarm.A.Priority", Int(500));
fake.Subs["Alarm.A.DescAttrName"]("Alarm.A.DescAttrName", Str("TankLevelHiHi"));
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
transitions.Count.ShouldBe(1);
transitions.TryDequeue(out var tr).ShouldBeTrue();
tr!.Transition.ShouldBe(AlarmStateTransition.Active);
tr.Priority.ShouldBe(500);
tr.DescAttrName.ShouldBe("TankLevelHiHi");
}
[Fact]
public async Task InAlarm_true_to_false_fires_Inactive_transition()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
var transitions = new ConcurrentQueue<AlarmTransition>();
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
await t.TrackAsync("Alarm.A");
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(false));
transitions.Count.ShouldBe(2);
transitions.TryDequeue(out _);
transitions.TryDequeue(out var tr).ShouldBeTrue();
tr!.Transition.ShouldBe(AlarmStateTransition.Inactive);
}
[Fact]
public async Task Acked_false_to_true_fires_Acknowledged_while_InAlarm_is_true()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
var transitions = new ConcurrentQueue<AlarmTransition>();
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
await t.TrackAsync("Alarm.A");
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true)); // Active, clears Acked flag
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true)); // Acknowledged
transitions.Count.ShouldBe(2);
transitions.TryDequeue(out _);
transitions.TryDequeue(out var tr).ShouldBeTrue();
tr!.Transition.ShouldBe(AlarmStateTransition.Acknowledged);
}
[Fact]
public async Task Acked_transition_while_InAlarm_is_false_does_not_fire()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
var transitions = new ConcurrentQueue<AlarmTransition>();
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
await t.TrackAsync("Alarm.A");
// Initial Acked=true on subscribe (alarm is at rest, pre-ack'd) — should not fire.
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true));
transitions.Count.ShouldBe(0);
}
[Fact]
public async Task Acknowledge_writes_AckMsg_with_comment()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
await t.TrackAsync("Alarm.A");
var ok = await t.AcknowledgeAsync("Alarm.A", "acknowledged by operator");
ok.ShouldBeTrue();
fake.Writes.Count.ShouldBe(1);
fake.Writes.TryDequeue(out var w).ShouldBeTrue();
w.Tag.ShouldBe("Alarm.A.AckMsg");
w.Value.ShouldBe("acknowledged by operator");
}
[Fact]
public async Task Snapshot_reports_latest_fields()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
await t.TrackAsync("Alarm.A");
fake.Subs["Alarm.A.InAlarm"]("Alarm.A.InAlarm", Bool(true));
fake.Subs["Alarm.A.Priority"]("Alarm.A.Priority", Int(900));
fake.Subs["Alarm.A.DescAttrName"]("Alarm.A.DescAttrName", Str("MyAlarm"));
fake.Subs["Alarm.A.Acked"]("Alarm.A.Acked", Bool(true));
var snap = t.SnapshotStates();
snap.Count.ShouldBe(1);
snap[0].InAlarm.ShouldBeTrue();
snap[0].Acked.ShouldBeTrue();
snap[0].Priority.ShouldBe(900);
snap[0].DescAttrName.ShouldBe("MyAlarm");
}
[Fact]
public async Task Foreign_probe_callback_is_dropped()
{
var fake = new FakeSubscriber();
using var t = new GalaxyAlarmTracker(fake.Subscribe, fake.Unsubscribe, fake.Write);
var transitions = new ConcurrentQueue<AlarmTransition>();
t.TransitionRaised += (_, tr) => transitions.Enqueue(tr);
// No TrackAsync was called — this callback is foreign and should be silently ignored.
t.OnProbeCallback("Unknown.InAlarm", Bool(true));
transitions.Count.ShouldBe(0);
}
}

View File

@@ -0,0 +1,231 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class GalaxyRuntimeProbeManagerTests
{
private sealed class FakeSubscriber
{
public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
public readonly ConcurrentQueue<string> UnsubCalls = new();
public bool FailSubscribeFor { get; set; }
public string? FailSubscribeTag { get; set; }
public Task Subscribe(string probe, Action<string, Vtq> cb)
{
if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("subscribe refused");
Subs[probe] = cb;
return Task.CompletedTask;
}
public Task Unsubscribe(string probe)
{
UnsubCalls.Enqueue(probe);
Subs.TryRemove(probe, out _);
return Task.CompletedTask;
}
}
private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
[Fact]
public async Task Sync_subscribes_to_ScanState_per_host()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
});
mgr.ActiveProbeCount.ShouldBe(2);
subs.Subs.ShouldContainKey("PlatformA.ScanState");
subs.Subs.ShouldContainKey("EngineB.ScanState");
}
[Fact]
public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
var targets = new[] { new HostProbeTarget("PlatformA", 1) };
await mgr.SyncAsync(targets);
await mgr.SyncAsync(targets);
mgr.ActiveProbeCount.ShouldBe(1);
subs.Subs.Count.ShouldBe(1);
subs.UnsubCalls.Count.ShouldBe(0);
}
[Fact]
public async Task Sync_unadvises_removed_hosts()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", 1),
new HostProbeTarget("PlatformB", 1),
});
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
mgr.ActiveProbeCount.ShouldBe(1);
subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
}
[Fact]
public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
{
var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
mgr.ActiveProbeCount.ShouldBe(0); // rolled back
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
}
[Fact]
public async Task Unknown_to_Running_does_not_fire_StateChanged()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
transitions.Count.ShouldBe(0); // startup transition, operators don't care
}
[Fact]
public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
transitions.Count.ShouldBe(1);
transitions.TryDequeue(out var t).ShouldBeTrue();
t!.TagName.ShouldBe("PlatformA");
t.OldState.ShouldBe(HostRuntimeState.Running);
t.NewState.ShouldBe(HostRuntimeState.Stopped);
}
[Fact]
public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
transitions.Count.ShouldBe(2);
}
[Fact]
public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var transitions = new ConcurrentQueue<HostStateTransition>();
mgr.StateChanged += (_, t) => transitions.Enqueue(t);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
// First callback is bad-quality — we must flag the host Stopped so operators see it.
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
transitions.Count.ShouldBe(1);
transitions.TryDequeue(out var t).ShouldBeTrue();
t!.OldState.ShouldBe(HostRuntimeState.Unknown);
t.NewState.ShouldBe(HostRuntimeState.Stopped);
}
[Fact]
public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
var count = 0;
mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
for (var i = 0; i < 5; i++)
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
}
[Fact]
public async Task Unknown_callback_for_non_tracked_probe_is_dropped()
{
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true));
mgr.ActiveProbeCount.ShouldBe(0);
}
[Fact]
public async Task Snapshot_reports_current_state_for_every_tracked_host()
{
var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var subs = new FakeSubscriber();
using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
await mgr.SyncAsync(new[]
{
new HostProbeTarget("PlatformA", 1),
new HostProbeTarget("EngineB", 3),
});
subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
var snap = mgr.SnapshotStates();
snap.Count.ShouldBe(2);
snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
}
[Fact]
public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
{
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
}
}

View File

@@ -0,0 +1,61 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistorianQualityMapperTests
{
/// <summary>
/// Rich mapping preserves specific OPC DA subcodes through the historian ToWire path.
/// Before PR 12 the category-only fallback collapsed e.g. BadNotConnected(8) to
/// Bad(0x80000000) so downstream OPC UA clients could not distinguish transport issues
/// from sensor issues. After PR 12 every known subcode round-trips to its canonical
/// uint32 StatusCode and Proxy translation stays byte-for-byte with v1 QualityMapper.
/// </summary>
[Theory]
[InlineData((byte)192, 0x00000000u)] // Good
[InlineData((byte)216, 0x00D80000u)] // Good_LocalOverride
[InlineData((byte)64, 0x40000000u)] // Uncertain
[InlineData((byte)68, 0x40900000u)] // Uncertain_LastUsableValue
[InlineData((byte)80, 0x40930000u)] // Uncertain_SensorNotAccurate
[InlineData((byte)84, 0x40940000u)] // Uncertain_EngineeringUnitsExceeded
[InlineData((byte)88, 0x40950000u)] // Uncertain_SubNormal
[InlineData((byte)0, 0x80000000u)] // Bad
[InlineData((byte)4, 0x80890000u)] // Bad_ConfigurationError
[InlineData((byte)8, 0x808A0000u)] // Bad_NotConnected
[InlineData((byte)12, 0x808B0000u)] // Bad_DeviceFailure
[InlineData((byte)16, 0x808C0000u)] // Bad_SensorFailure
[InlineData((byte)20, 0x80050000u)] // Bad_CommunicationError
[InlineData((byte)24, 0x808D0000u)] // Bad_OutOfService
[InlineData((byte)32, 0x80320000u)] // Bad_WaitingForInitialData
public void Maps_specific_OPC_DA_codes_to_canonical_StatusCode(byte quality, uint expected)
{
HistorianQualityMapper.Map(quality).ShouldBe(expected);
}
[Theory]
[InlineData((byte)200)] // Good — unknown subcode in Good family
[InlineData((byte)255)] // Good — unknown
public void Unknown_good_family_codes_fall_back_to_plain_Good(byte q)
{
HistorianQualityMapper.Map(q).ShouldBe(0x00000000u);
}
[Theory]
[InlineData((byte)100)] // Uncertain — unknown subcode
[InlineData((byte)150)] // Uncertain — unknown
public void Unknown_uncertain_family_codes_fall_back_to_plain_Uncertain(byte q)
{
HistorianQualityMapper.Map(q).ShouldBe(0x40000000u);
}
[Theory]
[InlineData((byte)1)] // Bad — unknown subcode
[InlineData((byte)50)] // Bad — unknown
public void Unknown_bad_family_codes_fall_back_to_plain_Bad(byte q)
{
HistorianQualityMapper.Map(q).ShouldBe(0x80000000u);
}
}

View File

@@ -0,0 +1,147 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadAtTimeTests
{
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? historian, StaPump pump) =>
new(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
new MxAccessClient(pump, new MxProxyAdapter(), "attime-test"),
historian);
[Fact]
public async Task Returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
using var backend = BuildBackend(null, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L, 2L },
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Empty_timestamp_list_short_circuits_to_success_with_no_values()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var fake = new FakeHistorian();
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = Array.Empty<long>(),
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.ShouldBeEmpty();
fake.Calls.ShouldBe(0); // no round-trip to SDK for empty timestamp list
}
[Fact]
public async Task Timestamps_survive_Unix_ms_round_trip_to_DateTime()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var t1 = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2026, 4, 18, 10, 5, 0, DateTimeKind.Utc);
var fake = new FakeHistorian(
new HistorianSample { Value = 100.0, Quality = 192, TimestampUtc = t1 },
new HistorianSample { Value = 101.5, Quality = 192, TimestampUtc = t2 });
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "TankLevel",
TimestampsUtcUnixMs = new[]
{
new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds(),
new DateTimeOffset(t2, TimeSpan.Zero).ToUnixTimeMilliseconds(),
},
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(2);
resp.Values[0].SourceTimestampUtcUnixMs.ShouldBe(new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds());
resp.Values[0].StatusCode.ShouldBe(0u); // Good (quality 192)
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(100.0);
fake.Calls.ShouldBe(1);
fake.LastTimestamps.Length.ShouldBe(2);
fake.LastTimestamps[0].ShouldBe(t1);
fake.LastTimestamps[1].ShouldBe(t2);
}
[Fact]
public async Task Missing_sample_maps_to_Bad_category()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
// Quality=0 means no sample at that timestamp per HistorianDataSource.ReadAtTimeAsync.
var fake = new FakeHistorian(new HistorianSample
{
Value = null,
Quality = 0,
TimestampUtc = DateTime.UtcNow,
});
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L },
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0x80000000u); // Bad category
resp.Values[0].ValueBytes.ShouldBeNull();
}
private sealed class FakeHistorian : IHistorianDataSource
{
private readonly HistorianSample[] _samples;
public int Calls { get; private set; }
public DateTime[] LastTimestamps { get; private set; } = Array.Empty<DateTime>();
public FakeHistorian(params HistorianSample[] samples) => _samples = samples;
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
{
Calls++;
LastTimestamps = ts;
return Task.FromResult(new List<HistorianSample>(_samples));
}
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}

View File

@@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadEventsTests
{
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? h, StaPump pump) =>
new(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
new MxAccessClient(pump, new MxProxyAdapter(), "events-test"),
h);
[Fact]
public async Task Returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
using var backend = BuildBackend(null, pump);
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = "TankA",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxEvents = 100,
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Maps_HistorianEventDto_to_GalaxyHistoricalEvent_wire_shape()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var eventId = Guid.NewGuid();
var eventTime = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var receivedTime = eventTime.AddMilliseconds(150);
var fake = new FakeHistorian(new HistorianEventDto
{
Id = eventId,
Source = "TankA.Level.HiHi",
EventTime = eventTime,
ReceivedTime = receivedTime,
DisplayText = "HiHi alarm tripped",
Severity = 900,
});
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = "TankA.Level.HiHi",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
MaxEvents = 50,
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Events.Length.ShouldBe(1);
var got = resp.Events[0];
got.EventId.ShouldBe(eventId.ToString());
got.SourceName.ShouldBe("TankA.Level.HiHi");
got.DisplayText.ShouldBe("HiHi alarm tripped");
got.Severity.ShouldBe<ushort>(900);
got.EventTimeUtcUnixMs.ShouldBe(new DateTimeOffset(eventTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
got.ReceivedTimeUtcUnixMs.ShouldBe(new DateTimeOffset(receivedTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
fake.LastSourceName.ShouldBe("TankA.Level.HiHi");
fake.LastMaxEvents.ShouldBe(50);
}
[Fact]
public async Task Null_source_name_is_passed_through_as_all_sources()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var fake = new FakeHistorian();
using var backend = BuildBackend(fake, pump);
await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
{
SourceName = null,
StartUtcUnixMs = 0,
EndUtcUnixMs = 1,
MaxEvents = 10,
}, CancellationToken.None);
fake.LastSourceName.ShouldBeNull();
}
private sealed class FakeHistorian : IHistorianDataSource
{
private readonly HistorianEventDto[] _events;
public string? LastSourceName { get; private set; } = "<unset>";
public int LastMaxEvents { get; private set; }
public FakeHistorian(params HistorianEventDto[] events) => _events = events;
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
{
LastSourceName = src;
LastMaxEvents = max;
return Task.FromResult(new List<HistorianEventDto>(_events));
}
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}

View File

@@ -0,0 +1,158 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadProcessedTests
{
[Fact]
public async Task ReturnsDisabledError_When_NoHistorianConfigured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
IntervalMs = 1000,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Rejects_NonPositiveInterval()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource();
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
IntervalMs = 0,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("IntervalMs");
}
[Fact]
public async Task Maps_AggregateSample_With_Value_To_Good()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
{
Value = 12.34,
TimestampUtc = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc),
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
StartUtcUnixMs = 0,
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
IntervalMs = 60_000,
AggregateColumn = "Average",
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0u); // Good
resp.Values[0].ValueBytes.ShouldNotBeNull();
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(12.34);
fake.LastAggregateColumn.ShouldBe("Average");
fake.LastIntervalMs.ShouldBe(60_000d);
}
[Fact]
public async Task Maps_Null_Bucket_To_BadNoData()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test");
var fake = new FakeHistorianDataSource(new HistorianAggregateSample
{
Value = null,
TimestampUtc = DateTime.UtcNow,
});
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
fake);
var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest
{
TagReference = "T",
IntervalMs = 1000,
AggregateColumn = "Minimum",
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0x800E0000u); // BadNoData
resp.Values[0].ValueBytes.ShouldBeNull();
}
private sealed class FakeHistorianDataSource : IHistorianDataSource
{
private readonly HistorianAggregateSample[] _samples;
public string? LastAggregateColumn { get; private set; }
public double LastIntervalMs { get; private set; }
public FakeHistorianDataSource(params HistorianAggregateSample[] samples) => _samples = samples;
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(
string tag, DateTime s, DateTime e, double intervalMs, string col, CancellationToken ct)
{
LastAggregateColumn = col;
LastIntervalMs = intervalMs;
return Task.FromResult(new List<HistorianAggregateSample>(_samples));
}
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}

View File

@@ -0,0 +1,91 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HostStatusPushTests
{
/// <summary>
/// PR 8 — when MxAccessClient.ConnectionStateChanged fires false→true→false,
/// MxAccessGalaxyBackend raises OnHostStatusChanged once per transition with
/// HostName=ClientName, RuntimeStatus="Running"/"Stopped", and a timestamp.
/// This is the gateway-level signal; per-platform ScanState probes are deferred.
/// </summary>
[Fact]
public async Task ConnectionStateChanged_raises_OnHostStatusChanged_with_gateway_name()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var proxy = new FakeProxy();
var mx = new MxAccessClient(pump, proxy, "GatewayClient", new MxAccessClientOptions { AutoReconnect = false });
using var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var notifications = new ConcurrentQueue<HostConnectivityStatus>();
backend.OnHostStatusChanged += (_, s) => notifications.Enqueue(s);
await mx.ConnectAsync();
await mx.DisconnectAsync();
notifications.Count.ShouldBe(2);
notifications.TryDequeue(out var first).ShouldBeTrue();
first!.HostName.ShouldBe("GatewayClient");
first.RuntimeStatus.ShouldBe("Running");
first.LastObservedUtcUnixMs.ShouldBeGreaterThan(0);
notifications.TryDequeue(out var second).ShouldBeTrue();
second!.HostName.ShouldBe("GatewayClient");
second.RuntimeStatus.ShouldBe("Stopped");
}
[Fact]
public async Task Dispose_unsubscribes_so_post_dispose_state_changes_do_not_fire_events()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var proxy = new FakeProxy();
var mx = new MxAccessClient(pump, proxy, "GatewayClient", new MxAccessClientOptions { AutoReconnect = false });
var backend = new MxAccessGalaxyBackend(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
mx,
historian: null);
var count = 0;
backend.OnHostStatusChanged += (_, _) => Interlocked.Increment(ref count);
await mx.ConnectAsync();
count.ShouldBe(1);
backend.Dispose();
await mx.DisconnectAsync();
count.ShouldBe(1); // no second notification after Dispose
}
private sealed class FakeProxy : IMxProxy
{
private int _next = 1;
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string __) => Interlocked.Increment(ref _next);
public void RemoveItem(int _, int __) { }
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
}
}

View File

@@ -0,0 +1,173 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class MxAccessClientMonitorLoopTests
{
/// <summary>
/// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
/// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
/// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
/// eventually exhaust the runtime proxy's handle table.
/// </summary>
[Fact]
public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
{
using var pump = new StaPump("Monitor.Sta");
await pump.WaitForStartedAsync();
var proxy = new CountingProxy();
var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(150),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
await client.ConnectAsync();
// Wait past StaleThreshold, then let several monitor cycles fire.
await Task.Delay(700);
client.Dispose();
// One Heartbeat probe fires per monitor tick once the connection looks stale.
proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);
// Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount);
proxy.OutstandingHeartbeatHandles.ShouldBe(0);
}
/// <summary>
/// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
/// SubscriptionReplayFailed so the backend can propagate the degradation, not get
/// silently eaten.
/// </summary>
[Fact]
public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
{
using var pump = new StaPump("Replay.Sta");
await pump.WaitForStartedAsync();
var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(120),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
client.SubscriptionReplayFailed += (_, e) => failures.Add(e);
await client.ConnectAsync();
await client.SubscribeAsync("GoodTag.X", (_, _) => { });
await client.SubscribeAsync("BadTag.A", (_, _) => { });
await client.SubscribeAsync("BadTag.B", (_, _) => { });
proxy.TriggerProbeFailureOnNextCall();
// Wait for the monitor loop to probe → fail → reconnect → replay.
await Task.Delay(800);
client.Dispose();
failures.Count.ShouldBe(2);
var names = new HashSet<string>();
foreach (var f in failures) names.Add(f.TagReference);
names.ShouldContain("BadTag.A");
names.ShouldContain("BadTag.B");
}
// ----- test doubles -----
private sealed class CountingProxy : IMxProxy
{
private int _next = 1;
private readonly ConcurrentDictionary<int, string> _live = new();
public int HeartbeatAddCount;
public int HeartbeatRemoveCount;
public int OutstandingHeartbeatHandles => _live.Count;
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
var h = Interlocked.Increment(ref _next);
_live[h] = address;
if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount);
return h;
}
public void RemoveItem(int _, int itemHandle)
{
if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat")
Interlocked.Increment(ref HeartbeatRemoveCount);
}
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
/// <summary>
/// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
/// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
/// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
/// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
/// per failing tag.
/// </summary>
private sealed class ReplayFailingProxy : IMxProxy
{
private int _next = 1;
private readonly HashSet<string> _failOnReplay;
private int _probeFailOnce;
private readonly ConcurrentDictionary<string, bool> _replayedOnce = new(StringComparer.OrdinalIgnoreCase);
public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
{
_failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
}
public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
throw new InvalidOperationException("simulated probe failure");
// Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address))
throw new InvalidOperationException($"simulated replay failure for {address}");
if (_failOnReplay.Contains(address)) _replayedOnce[address] = true;
return Interlocked.Increment(ref _next);
}
public void RemoveItem(int _, int __) { }
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
}

View File

@@ -24,6 +24,11 @@
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
<Reference Include="System.ServiceProcess"/>
<!-- IMxProxy's delegate signatures mention ArchestrA.MxAccess.MXSTATUS_PROXY, so tests
implementing the interface must resolve that type at compile time. -->
<Reference Include="ArchestrA.MxAccess">
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>

View File

@@ -0,0 +1,27 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
[Trait("Category", "Unit")]
public sealed class AggregateColumnMappingTests
{
[Theory]
[InlineData(HistoryAggregateType.Average, "Average")]
[InlineData(HistoryAggregateType.Minimum, "Minimum")]
[InlineData(HistoryAggregateType.Maximum, "Maximum")]
[InlineData(HistoryAggregateType.Count, "ValueCount")]
public void Maps_OpcUa_enum_to_AnalogSummary_column(HistoryAggregateType aggregate, string expected)
{
GalaxyProxyDriver.MapAggregateToColumn(aggregate).ShouldBe(expected);
}
[Fact]
public void Total_is_not_supported()
{
Should.Throw<System.NotSupportedException>(
() => GalaxyProxyDriver.MapAggregateToColumn(HistoryAggregateType.Total));
}
}