From 945ccd0b85036e8ae47b4e8456e991db576d85cd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 10 Jun 2026 14:20:02 -0400 Subject: [PATCH] feat(scripted-alarms): DependencyMuxTagUpstreamSource (T7) Concrete ITagUpstreamSource the scripted-alarm host actor pushes DependencyValueChanged values into and ScriptedAlarmEngine reads/subscribes from. Thread-safe: ConcurrentDictionary value cache + per-path ImmutableList observer lists with atomic add/remove and capture-then-invoke fan-out. ReadTag of an unknown path returns a Bad-quality (0x80000000) snapshot stamped via the injected clock. Adds the Core.ScriptedAlarms project reference Runtime needs to see the interface. --- .../DependencyMuxTagUpstreamSource.cs | 160 ++++++++++++++++++ .../ZB.MOM.WW.OtOpcUa.Runtime.csproj | 4 + .../DependencyMuxTagUpstreamSourceTests.cs | 145 ++++++++++++++++ 3 files changed, 309 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs new file mode 100644 index 00000000..5302fe40 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs @@ -0,0 +1,160 @@ +using System.Collections.Concurrent; +using System.Collections.Immutable; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; + +/// +/// Thread-safe the scripted-alarm host actor pushes +/// tag values INTO and the reads / subscribes FROM. +/// In the live runtime the host actor translates each Akka +/// DependencyValueChanged message into a +/// call; the engine sees those values synchronously through and +/// reactively through . +/// +/// +/// +/// A caches the latest snapshot +/// per path so can answer synchronously (the engine's +/// startup-recovery + read-cache-refill paths both call it). Per-path observer +/// lists are held as an immutable list inside a +/// so subscribe / unsubscribe mutate via atomic compare-and-swap and +/// can capture-then-invoke a stable +/// snapshot — a concurrent unsubscribe can never corrupt an in-flight fan-out. +/// +/// +public sealed class DependencyMuxTagUpstreamSource : ITagUpstreamSource +{ + // OPC UA Part 4 StatusCode for an outright Bad value (severity 10, bit 31 set). The + // codebase has no shared Core.Abstractions constant — concrete producers (GalaxyDriver, + // the Wonderware historian client) inline this same 0x80000000u, and the engine's own + // AreInputsReady gate tests exactly this bit — so an "unknown path" snapshot uses it too. + private const uint StatusBad = 0x80000000u; + + private readonly ConcurrentDictionary _cache + = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary>> _observers + = new(StringComparer.Ordinal); + private readonly Func _clock; + + /// + /// Initializes a new . + /// + /// + /// Optional function supplying the current UTC time, used to stamp the Bad-quality + /// snapshot returned for an unknown path. Defaults to , + /// mirroring how takes its clock. + /// + public DependencyMuxTagUpstreamSource(Func? clock = null) + => _clock = clock ?? (() => DateTime.UtcNow); + + /// + /// Update the cached snapshot for and then notify every + /// observer currently subscribed to that path. NOT part of + /// — the host actor calls this from its + /// DependencyValueChanged handler. + /// + /// The tag path whose value changed. + /// The new value snapshot. + public void Push(string path, DataValueSnapshot snapshot) + { + ArgumentNullException.ThrowIfNull(path); + ArgumentNullException.ThrowIfNull(snapshot); + + // Cache first so any observer that re-reads the path inside its callback sees the + // value it is being notified about, not the prior one. + _cache[path] = snapshot; + + // Capture the immutable observer snapshot, then invoke outside any lock: a + // concurrent Subscribe/Dispose swaps the dictionary entry atomically and cannot + // mutate the list we are iterating. + if (_observers.TryGetValue(path, out var observers)) + { + foreach (var observer in observers) + observer(path, snapshot); + } + } + + /// + public DataValueSnapshot ReadTag(string path) + { + ArgumentNullException.ThrowIfNull(path); + if (_cache.TryGetValue(path, out var snapshot)) + return snapshot; + + // No value has been pushed for this path yet — return a Bad-quality placeholder so + // the engine's cold-start guard holds the prior condition rather than NRE-ing on a + // null value cast. + var now = _clock(); + return new DataValueSnapshot(Value: null, StatusCode: StatusBad, SourceTimestampUtc: null, ServerTimestampUtc: now); + } + + /// + public IDisposable SubscribeTag(string path, Action observer) + { + ArgumentNullException.ThrowIfNull(path); + ArgumentNullException.ThrowIfNull(observer); + + _observers.AddOrUpdate( + path, + _ => ImmutableList.Create(observer), + (_, existing) => existing.Add(observer)); + + return new Subscription(this, path, observer); + } + + private void Unsubscribe(string path, Action observer) + { + // Atomically remove exactly this observer. Robust to dispose-after-already-removed + // (the path entry may be gone, or the observer may already have been pulled). + while (_observers.TryGetValue(path, out var existing)) + { + var updated = existing.Remove(observer); + if (ReferenceEquals(updated, existing)) + return; // observer not present — nothing to do + + if (updated.IsEmpty) + { + // Drop the empty entry, but only if it hasn't changed under us. The + // KeyValuePair overload removes atomically iff both key AND value still match. + if (_observers.TryRemove(new KeyValuePair>>(path, existing))) + return; + // Lost the race — another mutation swapped the entry; retry. + continue; + } + + if (_observers.TryUpdate(path, updated, existing)) + return; + // Lost the CAS — another subscribe/unsubscribe won; retry with the fresh list. + } + } + + /// + /// Per-observer subscription handle. deregisters exactly the + /// observer it was created for and is idempotent — calling it after the observer has + /// already been removed is a no-op. + /// + private sealed class Subscription : IDisposable + { + private DependencyMuxTagUpstreamSource? _owner; + private readonly string _path; + private readonly Action _observer; + + public Subscription(DependencyMuxTagUpstreamSource owner, string path, Action observer) + { + _owner = owner; + _path = path; + _observer = observer; + } + + public void Dispose() + { + // Swap _owner to null first so a double-dispose can't deregister twice (the + // second call sees null). Interlocked makes the guard safe under concurrent + // Dispose calls on the same handle. + var owner = Interlocked.Exchange(ref _owner, null); + owner?.Unsubscribe(_path, _observer); + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj index 42a358d8..ce69137b 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj @@ -25,6 +25,10 @@ --> + + diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs new file mode 100644 index 00000000..d26b5381 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs @@ -0,0 +1,145 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms; + +/// +/// Unit tests for — the host-actor-fed +/// ITagUpstreamSource the scripted-alarm engine reads + subscribes through. +/// +public sealed class DependencyMuxTagUpstreamSourceTests +{ + private static DataValueSnapshot Good(object? value, DateTime ts) + => new(value, 0u, ts, ts); + + [Fact] + public void Push_then_ReadTag_returns_the_pushed_snapshot() + { + var src = new DependencyMuxTagUpstreamSource(); + var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc); + var snap = Good(42, now); + + src.Push("a/b/c", snap); + + src.ReadTag("a/b/c").ShouldBeSameAs(snap); + } + + [Fact] + public void ReadTag_of_unknown_path_returns_a_Bad_quality_snapshot() + { + var fixedNow = new DateTime(2026, 06, 10, 9, 30, 0, DateTimeKind.Utc); + var src = new DependencyMuxTagUpstreamSource(clock: () => fixedNow); + + var snap = src.ReadTag("never/pushed"); + + snap.Value.ShouldBeNull(); + // Bad: OPC UA Part 4 StatusCode bit 31 set (severity 10). + (snap.StatusCode & 0x80000000u).ShouldNotBe(0u); + snap.StatusCode.ShouldNotBe(0u); + // Timestamp comes from the injected clock. + snap.ServerTimestampUtc.ShouldBe(fixedNow); + } + + [Fact] + public void SubscribeTag_observer_fires_on_next_push_to_same_path_only() + { + var src = new DependencyMuxTagUpstreamSource(); + var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc); + var received = new List<(string Path, DataValueSnapshot Snap)>(); + + using var sub = src.SubscribeTag("tag/x", (p, s) => received.Add((p, s))); + + // A push to a DIFFERENT path must NOT fire this observer. + src.Push("tag/other", Good(1, now)); + received.ShouldBeEmpty(); + + var snap = Good(7, now); + src.Push("tag/x", snap); + + received.Count.ShouldBe(1); + received[0].Path.ShouldBe("tag/x"); + received[0].Snap.ShouldBeSameAs(snap); + } + + [Fact] + public void Multiple_observers_on_same_path_all_fire() + { + var src = new DependencyMuxTagUpstreamSource(); + var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc); + var a = 0; + var b = 0; + + using var subA = src.SubscribeTag("tag/y", (_, _) => a++); + using var subB = src.SubscribeTag("tag/y", (_, _) => b++); + + src.Push("tag/y", Good(1, now)); + + a.ShouldBe(1); + b.ShouldBe(1); + } + + [Fact] + public void Disposing_a_subscription_stops_delivery_to_that_observer_only() + { + var src = new DependencyMuxTagUpstreamSource(); + var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc); + var a = 0; + var b = 0; + + var subA = src.SubscribeTag("tag/z", (_, _) => a++); + using var subB = src.SubscribeTag("tag/z", (_, _) => b++); + + src.Push("tag/z", Good(1, now)); + a.ShouldBe(1); + b.ShouldBe(1); + + subA.Dispose(); + + src.Push("tag/z", Good(2, now)); + a.ShouldBe(1); // disposed — no further delivery + b.ShouldBe(2); // still subscribed + + // Dispose-after-already-removed must be a no-op, not throw. + Should.NotThrow(() => subA.Dispose()); + } + + [Fact] + public async Task Concurrent_push_and_subscribe_does_not_throw() + { + var src = new DependencyMuxTagUpstreamSource(); + var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc); + var subs = new List(); + var subsLock = new object(); + Exception? failure = null; + + var pusher = Task.Run(() => + { + try + { + for (var i = 0; i < 2000; i++) + src.Push("hot/path", Good(i, now)); + } + catch (Exception ex) { failure = ex; } + }); + + var subscriber = Task.Run(() => + { + try + { + for (var i = 0; i < 2000; i++) + { + var d = src.SubscribeTag("hot/path", (_, _) => { }); + lock (subsLock) subs.Add(d); + } + } + catch (Exception ex) { failure = ex; } + }); + + await Task.WhenAll(pusher, subscriber); + + failure.ShouldBeNull(); + foreach (var d in subs) d.Dispose(); + } +}