feat(scripted-alarms): DependencyMuxTagUpstreamSource (T7)

Concrete ITagUpstreamSource the scripted-alarm host actor pushes
DependencyValueChanged values into and ScriptedAlarmEngine reads/subscribes
from. Thread-safe: ConcurrentDictionary value cache + per-path ImmutableList
observer lists with atomic add/remove and capture-then-invoke fan-out.
ReadTag of an unknown path returns a Bad-quality (0x80000000) snapshot stamped
via the injected clock. Adds the Core.ScriptedAlarms project reference Runtime
needs to see the interface.
This commit is contained in:
Joseph Doherty
2026-06-10 14:20:02 -04:00
parent b5748288df
commit 945ccd0b85
3 changed files with 309 additions and 0 deletions
@@ -0,0 +1,160 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
/// <summary>
/// Thread-safe <see cref="ITagUpstreamSource"/> the scripted-alarm host actor pushes
/// tag values INTO and the <see cref="ScriptedAlarmEngine"/> reads / subscribes FROM.
/// In the live runtime the host actor translates each Akka
/// <c>DependencyValueChanged</c> message into a <see cref="Push(string, DataValueSnapshot)"/>
/// call; the engine sees those values synchronously through <see cref="ReadTag"/> and
/// reactively through <see cref="SubscribeTag"/>.
/// </summary>
/// <remarks>
/// <para>
/// A <see cref="ConcurrentDictionary{TKey, TValue}"/> caches the latest snapshot
/// per path so <see cref="ReadTag"/> can answer synchronously (the engine's
/// startup-recovery + read-cache-refill paths both call it). Per-path observer
/// lists are held as an immutable list inside a <see cref="ConcurrentDictionary{TKey, TValue}"/>
/// so subscribe / unsubscribe mutate via atomic compare-and-swap and
/// <see cref="Push(string, DataValueSnapshot)"/> can capture-then-invoke a stable
/// snapshot — a concurrent unsubscribe can never corrupt an in-flight fan-out.
/// </para>
/// </remarks>
public sealed class DependencyMuxTagUpstreamSource : ITagUpstreamSource
{
// OPC UA Part 4 StatusCode for an outright Bad value (severity 10, bit 31 set). The
// codebase has no shared Core.Abstractions constant — concrete producers (GalaxyDriver,
// the Wonderware historian client) inline this same 0x80000000u, and the engine's own
// AreInputsReady gate tests exactly this bit — so an "unknown path" snapshot uses it too.
private const uint StatusBad = 0x80000000u;
private readonly ConcurrentDictionary<string, DataValueSnapshot> _cache
= new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, ImmutableList<Action<string, DataValueSnapshot>>> _observers
= new(StringComparer.Ordinal);
private readonly Func<DateTime> _clock;
/// <summary>
/// Initializes a new <see cref="DependencyMuxTagUpstreamSource"/>.
/// </summary>
/// <param name="clock">
/// Optional function supplying the current UTC time, used to stamp the Bad-quality
/// snapshot returned for an unknown path. Defaults to <see cref="DateTime.UtcNow"/>,
/// mirroring how <see cref="ScriptedAlarmEngine"/> takes its clock.
/// </param>
public DependencyMuxTagUpstreamSource(Func<DateTime>? clock = null)
=> _clock = clock ?? (() => DateTime.UtcNow);
/// <summary>
/// Update the cached snapshot for <paramref name="path"/> and then notify every
/// observer currently subscribed to that path. NOT part of
/// <see cref="ITagUpstreamSource"/> — the host actor calls this from its
/// <c>DependencyValueChanged</c> handler.
/// </summary>
/// <param name="path">The tag path whose value changed.</param>
/// <param name="snapshot">The new value snapshot.</param>
public void Push(string path, DataValueSnapshot snapshot)
{
ArgumentNullException.ThrowIfNull(path);
ArgumentNullException.ThrowIfNull(snapshot);
// Cache first so any observer that re-reads the path inside its callback sees the
// value it is being notified about, not the prior one.
_cache[path] = snapshot;
// Capture the immutable observer snapshot, then invoke outside any lock: a
// concurrent Subscribe/Dispose swaps the dictionary entry atomically and cannot
// mutate the list we are iterating.
if (_observers.TryGetValue(path, out var observers))
{
foreach (var observer in observers)
observer(path, snapshot);
}
}
/// <inheritdoc/>
public DataValueSnapshot ReadTag(string path)
{
ArgumentNullException.ThrowIfNull(path);
if (_cache.TryGetValue(path, out var snapshot))
return snapshot;
// No value has been pushed for this path yet — return a Bad-quality placeholder so
// the engine's cold-start guard holds the prior condition rather than NRE-ing on a
// null value cast.
var now = _clock();
return new DataValueSnapshot(Value: null, StatusCode: StatusBad, SourceTimestampUtc: null, ServerTimestampUtc: now);
}
/// <inheritdoc/>
public IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer)
{
ArgumentNullException.ThrowIfNull(path);
ArgumentNullException.ThrowIfNull(observer);
_observers.AddOrUpdate(
path,
_ => ImmutableList.Create(observer),
(_, existing) => existing.Add(observer));
return new Subscription(this, path, observer);
}
private void Unsubscribe(string path, Action<string, DataValueSnapshot> observer)
{
// Atomically remove exactly this observer. Robust to dispose-after-already-removed
// (the path entry may be gone, or the observer may already have been pulled).
while (_observers.TryGetValue(path, out var existing))
{
var updated = existing.Remove(observer);
if (ReferenceEquals(updated, existing))
return; // observer not present — nothing to do
if (updated.IsEmpty)
{
// Drop the empty entry, but only if it hasn't changed under us. The
// KeyValuePair overload removes atomically iff both key AND value still match.
if (_observers.TryRemove(new KeyValuePair<string, ImmutableList<Action<string, DataValueSnapshot>>>(path, existing)))
return;
// Lost the race — another mutation swapped the entry; retry.
continue;
}
if (_observers.TryUpdate(path, updated, existing))
return;
// Lost the CAS — another subscribe/unsubscribe won; retry with the fresh list.
}
}
/// <summary>
/// Per-observer subscription handle. <see cref="Dispose"/> deregisters exactly the
/// observer it was created for and is idempotent — calling it after the observer has
/// already been removed is a no-op.
/// </summary>
private sealed class Subscription : IDisposable
{
private DependencyMuxTagUpstreamSource? _owner;
private readonly string _path;
private readonly Action<string, DataValueSnapshot> _observer;
public Subscription(DependencyMuxTagUpstreamSource owner, string path, Action<string, DataValueSnapshot> observer)
{
_owner = owner;
_path = path;
_observer = observer;
}
public void Dispose()
{
// Swap _owner to null first so a double-dispose can't deregister twice (the
// second call sees null). Interlocked makes the guard safe under concurrent
// Dispose calls on the same handle.
var owner = Interlocked.Exchange(ref _owner, null);
owner?.Unsubscribe(_path, _observer);
}
}
}
@@ -25,6 +25,10 @@
-->
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
<!-- ITagUpstreamSource (the scripted-alarm engine's value-feed seam) lives here;
DependencyMuxTagUpstreamSource implements it so the host actor can push
DependencyValueChanged values into the engine. -->
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.csproj"/>
<!-- IScriptLogPublisher lives in Core.Scripting; DpsScriptLogPublisher implements it
here so the concrete Akka DPS routing stays out of the Core layer. -->
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
@@ -0,0 +1,145 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
/// <summary>
/// Unit tests for <see cref="DependencyMuxTagUpstreamSource"/> — the host-actor-fed
/// <c>ITagUpstreamSource</c> the scripted-alarm engine reads + subscribes through.
/// </summary>
public sealed class DependencyMuxTagUpstreamSourceTests
{
private static DataValueSnapshot Good(object? value, DateTime ts)
=> new(value, 0u, ts, ts);
[Fact]
public void Push_then_ReadTag_returns_the_pushed_snapshot()
{
var src = new DependencyMuxTagUpstreamSource();
var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
var snap = Good(42, now);
src.Push("a/b/c", snap);
src.ReadTag("a/b/c").ShouldBeSameAs(snap);
}
[Fact]
public void ReadTag_of_unknown_path_returns_a_Bad_quality_snapshot()
{
var fixedNow = new DateTime(2026, 06, 10, 9, 30, 0, DateTimeKind.Utc);
var src = new DependencyMuxTagUpstreamSource(clock: () => fixedNow);
var snap = src.ReadTag("never/pushed");
snap.Value.ShouldBeNull();
// Bad: OPC UA Part 4 StatusCode bit 31 set (severity 10).
(snap.StatusCode & 0x80000000u).ShouldNotBe(0u);
snap.StatusCode.ShouldNotBe(0u);
// Timestamp comes from the injected clock.
snap.ServerTimestampUtc.ShouldBe(fixedNow);
}
[Fact]
public void SubscribeTag_observer_fires_on_next_push_to_same_path_only()
{
var src = new DependencyMuxTagUpstreamSource();
var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
var received = new List<(string Path, DataValueSnapshot Snap)>();
using var sub = src.SubscribeTag("tag/x", (p, s) => received.Add((p, s)));
// A push to a DIFFERENT path must NOT fire this observer.
src.Push("tag/other", Good(1, now));
received.ShouldBeEmpty();
var snap = Good(7, now);
src.Push("tag/x", snap);
received.Count.ShouldBe(1);
received[0].Path.ShouldBe("tag/x");
received[0].Snap.ShouldBeSameAs(snap);
}
[Fact]
public void Multiple_observers_on_same_path_all_fire()
{
var src = new DependencyMuxTagUpstreamSource();
var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
var a = 0;
var b = 0;
using var subA = src.SubscribeTag("tag/y", (_, _) => a++);
using var subB = src.SubscribeTag("tag/y", (_, _) => b++);
src.Push("tag/y", Good(1, now));
a.ShouldBe(1);
b.ShouldBe(1);
}
[Fact]
public void Disposing_a_subscription_stops_delivery_to_that_observer_only()
{
var src = new DependencyMuxTagUpstreamSource();
var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
var a = 0;
var b = 0;
var subA = src.SubscribeTag("tag/z", (_, _) => a++);
using var subB = src.SubscribeTag("tag/z", (_, _) => b++);
src.Push("tag/z", Good(1, now));
a.ShouldBe(1);
b.ShouldBe(1);
subA.Dispose();
src.Push("tag/z", Good(2, now));
a.ShouldBe(1); // disposed — no further delivery
b.ShouldBe(2); // still subscribed
// Dispose-after-already-removed must be a no-op, not throw.
Should.NotThrow(() => subA.Dispose());
}
[Fact]
public async Task Concurrent_push_and_subscribe_does_not_throw()
{
var src = new DependencyMuxTagUpstreamSource();
var now = new DateTime(2026, 06, 10, 12, 0, 0, DateTimeKind.Utc);
var subs = new List<IDisposable>();
var subsLock = new object();
Exception? failure = null;
var pusher = Task.Run(() =>
{
try
{
for (var i = 0; i < 2000; i++)
src.Push("hot/path", Good(i, now));
}
catch (Exception ex) { failure = ex; }
});
var subscriber = Task.Run(() =>
{
try
{
for (var i = 0; i < 2000; i++)
{
var d = src.SubscribeTag("hot/path", (_, _) => { });
lock (subsLock) subs.Add(d);
}
}
catch (Exception ex) { failure = ex; }
});
await Task.WhenAll(pusher, subscriber);
failure.ShouldBeNull();
foreach (var d in subs) d.Dispose();
}
}