diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs
new file mode 100644
index 00000000..5302fe40
--- /dev/null
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/DependencyMuxTagUpstreamSource.cs
@@ -0,0 +1,160 @@
+using System.Collections.Concurrent;
+using System.Collections.Immutable;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+
+///
+/// Thread-safe the scripted-alarm host actor pushes
+/// tag values INTO and the reads / subscribes FROM.
+/// In the live runtime the host actor translates each Akka
+/// DependencyValueChanged message into a
+/// call; the engine sees those values synchronously through and
+/// reactively through .
+///
+///
+///
+/// A caches the latest snapshot
+/// per path so can answer synchronously (the engine's
+/// startup-recovery + read-cache-refill paths both call it). Per-path observer
+/// lists are held as an immutable list inside a
+/// so subscribe / unsubscribe mutate via atomic compare-and-swap and
+/// can capture-then-invoke a stable
+/// snapshot — a concurrent unsubscribe can never corrupt an in-flight fan-out.
+///
+///
+public sealed class DependencyMuxTagUpstreamSource : ITagUpstreamSource
+{
+ // OPC UA Part 4 StatusCode for an outright Bad value (severity 10, bit 31 set). The
+ // codebase has no shared Core.Abstractions constant — concrete producers (GalaxyDriver,
+ // the Wonderware historian client) inline this same 0x80000000u, and the engine's own
+ // AreInputsReady gate tests exactly this bit — so an "unknown path" snapshot uses it too.
+ private const uint StatusBad = 0x80000000u;
+
+ private readonly ConcurrentDictionary _cache
+ = new(StringComparer.Ordinal);
+ private readonly ConcurrentDictionary>> _observers
+ = new(StringComparer.Ordinal);
+ private readonly Func _clock;
+
+ ///
+ /// Initializes a new .
+ ///
+ ///
+ /// Optional function supplying the current UTC time, used to stamp the Bad-quality
+ /// snapshot returned for an unknown path. Defaults to ,
+ /// mirroring how takes its clock.
+ ///
+ public DependencyMuxTagUpstreamSource(Func? clock = null)
+ => _clock = clock ?? (() => DateTime.UtcNow);
+
+ ///
+ /// Update the cached snapshot for and then notify every
+ /// observer currently subscribed to that path. NOT part of
+ /// — the host actor calls this from its
+ /// DependencyValueChanged handler.
+ ///
+ /// The tag path whose value changed.
+ /// The new value snapshot.
+ public void Push(string path, DataValueSnapshot snapshot)
+ {
+ ArgumentNullException.ThrowIfNull(path);
+ ArgumentNullException.ThrowIfNull(snapshot);
+
+ // Cache first so any observer that re-reads the path inside its callback sees the
+ // value it is being notified about, not the prior one.
+ _cache[path] = snapshot;
+
+ // Capture the immutable observer snapshot, then invoke outside any lock: a
+ // concurrent Subscribe/Dispose swaps the dictionary entry atomically and cannot
+ // mutate the list we are iterating.
+ if (_observers.TryGetValue(path, out var observers))
+ {
+ foreach (var observer in observers)
+ observer(path, snapshot);
+ }
+ }
+
+ ///
+ public DataValueSnapshot ReadTag(string path)
+ {
+ ArgumentNullException.ThrowIfNull(path);
+ if (_cache.TryGetValue(path, out var snapshot))
+ return snapshot;
+
+ // No value has been pushed for this path yet — return a Bad-quality placeholder so
+ // the engine's cold-start guard holds the prior condition rather than NRE-ing on a
+ // null value cast.
+ var now = _clock();
+ return new DataValueSnapshot(Value: null, StatusCode: StatusBad, SourceTimestampUtc: null, ServerTimestampUtc: now);
+ }
+
+ ///
+ public IDisposable SubscribeTag(string path, Action observer)
+ {
+ ArgumentNullException.ThrowIfNull(path);
+ ArgumentNullException.ThrowIfNull(observer);
+
+ _observers.AddOrUpdate(
+ path,
+ _ => ImmutableList.Create(observer),
+ (_, existing) => existing.Add(observer));
+
+ return new Subscription(this, path, observer);
+ }
+
+ private void Unsubscribe(string path, Action observer)
+ {
+ // Atomically remove exactly this observer. Robust to dispose-after-already-removed
+ // (the path entry may be gone, or the observer may already have been pulled).
+ while (_observers.TryGetValue(path, out var existing))
+ {
+ var updated = existing.Remove(observer);
+ if (ReferenceEquals(updated, existing))
+ return; // observer not present — nothing to do
+
+ if (updated.IsEmpty)
+ {
+ // Drop the empty entry, but only if it hasn't changed under us. The
+ // KeyValuePair overload removes atomically iff both key AND value still match.
+ if (_observers.TryRemove(new KeyValuePair>>(path, existing)))
+ return;
+ // Lost the race — another mutation swapped the entry; retry.
+ continue;
+ }
+
+ if (_observers.TryUpdate(path, updated, existing))
+ return;
+ // Lost the CAS — another subscribe/unsubscribe won; retry with the fresh list.
+ }
+ }
+
+ ///
+ /// Per-observer subscription handle. deregisters exactly the
+ /// observer it was created for and is idempotent — calling it after the observer has
+ /// already been removed is a no-op.
+ ///
+ private sealed class Subscription : IDisposable
+ {
+ private DependencyMuxTagUpstreamSource? _owner;
+ private readonly string _path;
+ private readonly Action _observer;
+
+ public Subscription(DependencyMuxTagUpstreamSource owner, string path, Action observer)
+ {
+ _owner = owner;
+ _path = path;
+ _observer = observer;
+ }
+
+ public void Dispose()
+ {
+ // Swap _owner to null first so a double-dispose can't deregister twice (the
+ // second call sees null). Interlocked makes the guard safe under concurrent
+ // Dispose calls on the same handle.
+ var owner = Interlocked.Exchange(ref _owner, null);
+ owner?.Unsubscribe(_path, _observer);
+ }
+ }
+}
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
index 42a358d8..ce69137b 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj
@@ -25,6 +25,10 @@
-->
+
+
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs
new file mode 100644
index 00000000..d26b5381
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/DependencyMuxTagUpstreamSourceTests.cs
@@ -0,0 +1,145 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
+
+///
+/// Unit tests for — the host-actor-fed
+/// ITagUpstreamSource the scripted-alarm engine reads + subscribes through.
+///
+public sealed class DependencyMuxTagUpstreamSourceTests
+{
+ private static DataValueSnapshot Good(object? value, DateTime ts)
+ => new(value, 0u, ts, ts);
+
+ [Fact]
+ public void Push_then_ReadTag_returns_the_pushed_snapshot()
+ {
+ var src = new DependencyMuxTagUpstreamSource();
+ var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
+ var snap = Good(42, now);
+
+ src.Push("a/b/c", snap);
+
+ src.ReadTag("a/b/c").ShouldBeSameAs(snap);
+ }
+
+ [Fact]
+ public void ReadTag_of_unknown_path_returns_a_Bad_quality_snapshot()
+ {
+ var fixedNow = new DateTime(2026, 06, 10, 9, 30, 0, DateTimeKind.Utc);
+ var src = new DependencyMuxTagUpstreamSource(clock: () => fixedNow);
+
+ var snap = src.ReadTag("never/pushed");
+
+ snap.Value.ShouldBeNull();
+ // Bad: OPC UA Part 4 StatusCode bit 31 set (severity 10).
+ (snap.StatusCode & 0x80000000u).ShouldNotBe(0u);
+ snap.StatusCode.ShouldNotBe(0u);
+ // Timestamp comes from the injected clock.
+ snap.ServerTimestampUtc.ShouldBe(fixedNow);
+ }
+
+ [Fact]
+ public void SubscribeTag_observer_fires_on_next_push_to_same_path_only()
+ {
+ var src = new DependencyMuxTagUpstreamSource();
+ var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
+ var received = new List<(string Path, DataValueSnapshot Snap)>();
+
+ using var sub = src.SubscribeTag("tag/x", (p, s) => received.Add((p, s)));
+
+ // A push to a DIFFERENT path must NOT fire this observer.
+ src.Push("tag/other", Good(1, now));
+ received.ShouldBeEmpty();
+
+ var snap = Good(7, now);
+ src.Push("tag/x", snap);
+
+ received.Count.ShouldBe(1);
+ received[0].Path.ShouldBe("tag/x");
+ received[0].Snap.ShouldBeSameAs(snap);
+ }
+
+ [Fact]
+ public void Multiple_observers_on_same_path_all_fire()
+ {
+ var src = new DependencyMuxTagUpstreamSource();
+ var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
+ var a = 0;
+ var b = 0;
+
+ using var subA = src.SubscribeTag("tag/y", (_, _) => a++);
+ using var subB = src.SubscribeTag("tag/y", (_, _) => b++);
+
+ src.Push("tag/y", Good(1, now));
+
+ a.ShouldBe(1);
+ b.ShouldBe(1);
+ }
+
+ [Fact]
+ public void Disposing_a_subscription_stops_delivery_to_that_observer_only()
+ {
+ var src = new DependencyMuxTagUpstreamSource();
+ var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
+ var a = 0;
+ var b = 0;
+
+ var subA = src.SubscribeTag("tag/z", (_, _) => a++);
+ using var subB = src.SubscribeTag("tag/z", (_, _) => b++);
+
+ src.Push("tag/z", Good(1, now));
+ a.ShouldBe(1);
+ b.ShouldBe(1);
+
+ subA.Dispose();
+
+ src.Push("tag/z", Good(2, now));
+ a.ShouldBe(1); // disposed — no further delivery
+ b.ShouldBe(2); // still subscribed
+
+ // Dispose-after-already-removed must be a no-op, not throw.
+ Should.NotThrow(() => subA.Dispose());
+ }
+
+ [Fact]
+ public async Task Concurrent_push_and_subscribe_does_not_throw()
+ {
+ var src = new DependencyMuxTagUpstreamSource();
+ var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
+ var subs = new List();
+ var subsLock = new object();
+ Exception? failure = null;
+
+ var pusher = Task.Run(() =>
+ {
+ try
+ {
+ for (var i = 0; i < 2000; i++)
+ src.Push("hot/path", Good(i, now));
+ }
+ catch (Exception ex) { failure = ex; }
+ });
+
+ var subscriber = Task.Run(() =>
+ {
+ try
+ {
+ for (var i = 0; i < 2000; i++)
+ {
+ var d = src.SubscribeTag("hot/path", (_, _) => { });
+ lock (subsLock) subs.Add(d);
+ }
+ }
+ catch (Exception ex) { failure = ex; }
+ });
+
+ await Task.WhenAll(pusher, subscriber);
+
+ failure.ShouldBeNull();
+ foreach (var d in subs) d.Dispose();
+ }
+}