diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WaitForAttribute.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WaitForAttribute.cs new file mode 100644 index 00000000..39409759 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WaitForAttribute.cs @@ -0,0 +1,63 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance; + +/// +/// Request to wait, event-driven, until an attribute reaches a value (or any +/// value satisfying a predicate), bounded by a timeout — the backing protocol for +/// the script-facing Attributes.WaitAsync helper. +/// +/// +/// Site-local only. The optional is a non-serializable +/// in-process delegate, so this message MUST flow only within a single site node's +/// actor system (script execution → Instance Actor). It is never sent across the +/// ClusterClient / gRPC boundary. The value-equality form () +/// would serialize, but the routed/inbound variant is deliberately out of scope here. +/// +/// +/// Per-wait correlation id; keys the waiter registry and the timeout self-message. +/// The instance this wait targets. +/// The attribute to watch — already scope-resolved by the accessor. +/// +/// The codec-encoded target value (AttributeValueCodec.Encode(target)). A +/// match compares the codec-encoded form of the current value against this string. +/// When both this and are null the wait matches on ANY change. +/// +/// +/// Site-local predicate tested against the raw (decoded) current value. Mutually +/// exclusive with — null when the encoded target is used. +/// +/// How long to wait before self-evicting with a timeout reply. +/// When the request was issued (UTC). +public record WaitForAttributeRequest( + string CorrelationId, + string InstanceName, + string AttributeName, + string? TargetValueEncoded, + Func? Predicate, + TimeSpan Timeout, + DateTimeOffset OccurredAtUtc); + +/// +/// Reply to a . Exactly one of +/// / is set on the happy paths; +/// is populated only on the defensive cap-exceeded path. +/// +/// Echoes the request's correlation id. +/// True when the attribute reached the target/predicate within the timeout. +/// The matched value (null on timeout / error). +/// The attribute quality at match time (empty on timeout / error). +/// True when the timeout fired before a match. +/// Non-null only when the wait was refused (e.g. per-instance waiter cap exceeded). +public record WaitForAttributeResponse( + string CorrelationId, + bool Matched, + object? Value, + string Quality, + bool TimedOut, + string? ErrorMessage = null); + +/// +/// Internal self-message scheduled by the Instance Actor to fire a waiter's +/// timeout. Site-local only; never crosses a cluster boundary. +/// +/// The waiter whose timeout fired. +public record WaitForAttributeTimeout(string CorrelationId); diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmExecutionActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmExecutionActor.cs index e8fa34b6..3c041a48 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmExecutionActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmExecutionActor.cs @@ -113,7 +113,12 @@ public class AlarmExecutionActor : ReceiveActor // context's id as its ParentExecutionId — null today, so the // run is a root, but the plumbing exists for a future // firing id. - parentExecutionId: parentExecutionId); + parentExecutionId: parentExecutionId, + // WaitForAttribute (spec §4.4): thread the alarm on-trigger + // script's per-script execution-timeout token so a + // Attributes.WaitAsync inside an on-trigger script is bounded + // by the same script deadline. + scriptTimeoutToken: cts.Token); var globals = new ScriptGlobals { diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs index 55b4e781..59643389 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs @@ -68,6 +68,18 @@ public class InstanceActor : ReceiveActor // mirroring the rest of the actor's by-name dictionaries). private readonly Dictionary _resolvedAttributeByName = new(); + // WaitForAttribute (spec §4.2): one-shot waiter registry keyed by the + // request CorrelationId. Each entry holds the watched attribute name, the + // match test (decoded target equality OR a site-local predicate), the + // original Sender to reply to, and the scheduled-timeout handle so a match + // can cancel it. Single-threaded actor access — no locking needed. + private readonly Dictionary _attributeWaiters = new(); + + // WaitForAttribute: defensive per-instance cap so a script leaking waiters + // in a loop cannot grow the registry without bound. Exceeding it refuses the + // wait with an error reply rather than registering. + private const int MaxAttributeWaiters = 100; + // DCL manager actor reference for subscribing to tag values private readonly IActorRef? _dclManager; // Maps each tag path to every attribute canonical name that references it. @@ -170,6 +182,12 @@ public class InstanceActor : ReceiveActor // WP-22/23: Handle attribute value changes from DCL (Tell pattern) Receive(HandleAttributeValueChanged); + // WaitForAttribute (spec §4.2): event-driven "wait for value" waiter + // registration + its scheduled-timeout self-message. Both flow only + // site-locally (the predicate variant carries a non-serializable delegate). + Receive(HandleWaitForAttribute); + Receive(HandleWaitForAttributeTimeout); + // Handle tag value updates from DCL — convert to AttributeValueChanged Receive(HandleTagValueUpdate); Receive(_ => { }); // Ack from DCL subscribe — no action needed @@ -519,6 +537,80 @@ public class InstanceActor : ReceiveActor PublishAndNotifyChildren(changed); } + /// + /// WaitForAttribute (spec §4.2): registers a one-shot event-driven waiter for + /// an attribute to reach a value (encoded-equality), satisfy a site-local + /// predicate, or change at all. The current-value fast-path and the + /// change-handling in both run on + /// this single-threaded actor, so a value that flips between "read current" + /// and "register" cannot be missed (spec §5). + /// + private void HandleWaitForAttribute(WaitForAttributeRequest req) + { + // Capture the sender immediately — Sender is invalid once we schedule / + // return and a later message arrives. + var replyer = Sender; + + // Build the match test: explicit predicate wins; else null encoded target + // means "any change"; else compare the codec-encoded current value to the + // encoded target (avoids needing the attribute's DataType to decode). + Func test; + if (req.Predicate is not null) + { + test = req.Predicate; + } + else if (req.TargetValueEncoded is null) + { + test = _ => true; + } + else + { + var target = req.TargetValueEncoded; + test = v => string.Equals( + AttributeValueCodec.Encode(v), target, StringComparison.Ordinal); + } + + // Fast path: the current value already satisfies the test → reply now. + if (_attributes.TryGetValue(req.AttributeName, out var current) && test(current)) + { + _attributeQualities.TryGetValue(req.AttributeName, out var quality); + replyer.Tell(new WaitForAttributeResponse( + req.CorrelationId, Matched: true, current, quality ?? "Good", TimedOut: false)); + return; + } + + // Defensive cap: refuse rather than register if the instance already has + // too many concurrent waiters (guards against a script leaking waiters). + if (_attributeWaiters.Count >= MaxAttributeWaiters) + { + replyer.Tell(new WaitForAttributeResponse( + req.CorrelationId, Matched: false, null, "", TimedOut: false, + ErrorMessage: "Too many concurrent attribute waiters on this instance")); + return; + } + + // Register and schedule the self-evicting timeout (NativeAlarmActor idiom). + var handle = Context.System.Scheduler.ScheduleTellOnceCancelable( + req.Timeout, Self, new WaitForAttributeTimeout(req.CorrelationId), Self); + + _attributeWaiters[req.CorrelationId] = + new PendingWait(req.AttributeName, test, replyer, handle); + } + + /// + /// WaitForAttribute (spec §4.2): the scheduled timeout fired for a waiter that + /// never matched. If still registered (a match would have removed + canceled + /// it), reply TimedOut and evict it. + /// + private void HandleWaitForAttributeTimeout(WaitForAttributeTimeout msg) + { + if (_attributeWaiters.Remove(msg.CorrelationId, out var pending)) + { + pending.Replyer.Tell(new WaitForAttributeResponse( + msg.CorrelationId, Matched: false, null, "", TimedOut: true)); + } + } + /// /// Handles tag value updates from DCL. Maps the tag path back to the attribute /// canonical name and converts to an AttributeValueChanged for unified processing. @@ -924,6 +1016,41 @@ public class InstanceActor : ReceiveActor { alarmActor.Tell(changed); } + + // WaitForAttribute (spec §4.2): re-evaluate any waiters on THIS attribute. + // PublishAndNotifyChildren is THE single choke point for every value change + // — both the DCL ingest path (HandleAttributeValueChanged) and the static + // write path (HandleSetStaticAttributeCore) call it AFTER updating + // _attributes, so changed.Value is the just-applied current value. Iterate a + // snapshot so satisfied waiters can be removed during the loop; each match + // cancels its scheduled timeout (so no stray WaitForAttributeTimeout follows) + // and replies Matched=true. + ResolveMatchedWaiters(changed); + } + + /// + /// WaitForAttribute (spec §4.2): fires every registered waiter on + /// 's attribute whose test now passes against the + /// just-applied value — cancelling its timeout, replying Matched, and removing + /// it from the registry. A no-op when there are no waiters. + /// + private void ResolveMatchedWaiters(AttributeValueChanged changed) + { + if (_attributeWaiters.Count == 0) + return; + + var matched = _attributeWaiters + .Where(kvp => kvp.Value.AttributeName == changed.AttributeName + && kvp.Value.Test(changed.Value)) + .ToList(); + + foreach (var (cid, pending) in matched) + { + pending.Timeout.Cancel(); + pending.Replyer.Tell(new WaitForAttributeResponse( + cid, Matched: true, changed.Value, changed.Quality, TimedOut: false)); + _attributeWaiters.Remove(cid); + } } /// @@ -1202,4 +1329,17 @@ public class InstanceActor : ReceiveActor /// Internal message for async override loading result. /// internal record LoadOverridesResult(Dictionary Overrides, string? Error); + + /// + /// WaitForAttribute (spec §4.2): one registered, not-yet-satisfied waiter. + /// + /// The attribute this waiter watches (scope-resolved). + /// The match test (decoded-target equality OR site-local predicate OR any-change). + /// The original sender to reply to on match / timeout. + /// The scheduled timeout handle, canceled on match. + private sealed record PendingWait( + string AttributeName, + Func Test, + IActorRef Replyer, + ICancelable Timeout); } diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/ScriptExecutionActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/ScriptExecutionActor.cs index 6cc43423..99f34e23 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/ScriptExecutionActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/ScriptExecutionActor.cs @@ -221,7 +221,12 @@ public class ScriptExecutionActor : ReceiveActor // M2.12 (#25): thread the singleton site event logger so // recursion-limit violations at CallScript/CallShared emit a // script Error site event in addition to ILogger.LogError. - siteEventLogger: siteEventLogger); + siteEventLogger: siteEventLogger, + // WaitForAttribute (spec §4.3/§4.4): thread the per-script + // execution-timeout token so Attributes.WaitAsync's Ask is + // bounded by the script's own ExecutionTimeoutSeconds — a + // shorter script deadline wins over the wait's own timeout. + scriptTimeoutToken: cts.Token); var globals = new ScriptGlobals { diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs index bd78a5b5..165a446f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs @@ -73,6 +73,35 @@ public class AttributeAccessor /// A task that represents the asynchronous operation. public Task SetAsync(string key, object? value) => _ctx.SetAttribute(Resolve(key), AttributeValueCodec.Encode(value) ?? string.Empty); + + /// + /// WaitForAttribute (spec §3-§5): waits event-driven until the attribute equals + /// (value-equality, codec-normalized), bounded by + /// . Returns true if matched within the timeout, + /// false on timeout (no throw). Honors the script's execution-timeout token. + /// Scope/composition path resolution () is applied just like + /// / . + /// + /// The attribute key (scope-resolved before the wait is registered). + /// The value to wait for (codec-encoded for comparison). + /// How long to wait before returning false. + /// true on match within the timeout; false on timeout. + public Task WaitAsync(string key, object? targetValue, TimeSpan timeout) + => _ctx.WaitAttribute(Resolve(key), AttributeValueCodec.Encode(targetValue), null, timeout); + + /// + /// WaitForAttribute (spec §3-§5): predicate form — waits event-driven until + /// returns true for the attribute's current + /// value, bounded by . Site-local only (the predicate + /// is an in-process delegate). Returns true if matched within the timeout, + /// false on timeout (no throw). Scope/composition path resolution applies. + /// + /// The attribute key (scope-resolved before the wait is registered). + /// The site-local predicate tested against the current value. + /// How long to wait before returning false. + /// true on match within the timeout; false on timeout. + public Task WaitAsync(string key, Func predicate, TimeSpan timeout) + => _ctx.WaitAttribute(Resolve(key), null, predicate, timeout); } /// diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs index 6e214e86..d37a91a7 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -46,6 +46,16 @@ public class ScriptRuntimeContext private readonly ILogger _logger; private readonly string _instanceName; + /// + /// WaitForAttribute (spec §4.3): the per-script execution-timeout token from + /// the owning ScriptExecutionActor/AlarmExecutionActor + /// (cts.Token). Bounds the Attributes.WaitAsync Ask so a script + /// that hits its own ExecutionTimeoutSeconds abandons the wait. Defaults + /// to for contexts that do not thread one + /// (legacy callers / tests / the alarm path when it has no CTS). + /// + private readonly CancellationToken _scriptTimeoutToken; + /// /// WP-13: External system client for ExternalSystem.Call/CachedCall. /// @@ -194,6 +204,13 @@ public class ScriptRuntimeContext /// ILogger.LogError + throw. When null the existing behaviour is /// unchanged; all existing callers and tests remain source-compatible. /// + /// + /// WaitForAttribute (spec §4.3): the per-script execution-timeout token + /// (cts.Token on the owning execution actor) used to bound + /// Attributes.WaitAsync. Defaults to + /// for callers / tests that do not + /// thread one — those waits are bounded only by their own timeout. + /// public ScriptRuntimeContext( IActorRef instanceActor, IActorRef self, @@ -215,7 +232,8 @@ public class ScriptRuntimeContext Guid? executionId = null, Guid? parentExecutionId = null, string? sourceNode = null, - ISiteEventLogger? siteEventLogger = null) + ISiteEventLogger? siteEventLogger = null, + CancellationToken scriptTimeoutToken = default) { _instanceActor = instanceActor; _self = self; @@ -245,6 +263,9 @@ public class ScriptRuntimeContext _parentExecutionId = parentExecutionId; // M2.12 (#25): optional — null when not wired (tests / AlarmExecutionActor). _siteEventLogger = siteEventLogger; + // WaitForAttribute (spec §4.3): default(CancellationToken) == None when + // not threaded in — the WaitAsync Ask is then bounded only by its own timeout. + _scriptTimeoutToken = scriptTimeoutToken; } /// @@ -297,7 +318,11 @@ public class ScriptRuntimeContext // …parented to THIS run's execution id (the spawner). parentExecutionId: _executionId, sourceNode: _sourceNode, - siteEventLogger: _siteEventLogger); + siteEventLogger: _siteEventLogger, + // WaitForAttribute (spec §4.3): an inline shared-script call shares the + // parent run's execution-timeout token so a WaitAsync inside the shared + // script is bounded by the SAME script deadline. + scriptTimeoutToken: _scriptTimeoutToken); } /// @@ -360,6 +385,42 @@ public class ScriptRuntimeContext return response.Value; } + /// + /// WaitForAttribute (spec §3-§5): waits event-driven for an attribute to reach + /// a value (encoded-equality), satisfy a site-local predicate, or change at all, + /// bounded by . Returns true if matched within + /// the timeout, false on timeout — NEVER throws on timeout. The backing + /// Attributes.WaitAsync for the accessor. + /// + /// + /// The Ask is bounded by the script's own execution-timeout token (§4.3): a + /// script that hits its ExecutionTimeoutSeconds abandons the wait. The + /// Ask timeout is the wait timeout plus a small slack + /// so the InstanceActor's own scheduled timeout reply is the authoritative path + /// for the false/timed-out outcome, not the Ask deadline. + /// + /// + /// The scope-resolved attribute name to wait on. + /// + /// The codec-encoded target value; null (with null ) + /// means "any change". + /// + /// Site-local predicate; null when the encoded target is used. + /// How long to wait before returning false. + /// true on match within the timeout; false on timeout. + public async Task WaitAttribute( + string name, string? targetValueEncoded, Func? predicate, TimeSpan timeout) + { + var cid = Guid.NewGuid().ToString(); + var req = new WaitForAttributeRequest( + cid, _instanceName, name, targetValueEncoded, predicate, timeout, DateTimeOffset.UtcNow); + + var resp = await _instanceActor.Ask( + req, timeout + _askTimeout, _scriptTimeoutToken); + + return resp.Matched; + } + /// /// Sets an attribute value. For data-connected attributes the Instance Actor /// forwards the write to the DCL, which writes the physical device; the diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/InstanceActorWaitForAttributeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/InstanceActorWaitForAttributeTests.cs new file mode 100644 index 00000000..c9db26dc --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/InstanceActorWaitForAttributeTests.cs @@ -0,0 +1,341 @@ +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts; +using System.Text.Json; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors; + +/// +/// Tests for the event-driven WaitForAttribute one-shot waiter registry in +/// (Attributes.WaitAsync spec §3-§5). Covers the +/// fast-path, change-match, timeout, no-leak (timeout-canceled-on-match), and +/// predicate-overload acceptance criteria. +/// +public class InstanceActorWaitForAttributeTests : TestKit, IDisposable +{ + private readonly SiteStorageService _storage; + private readonly ScriptCompilationService _compilationService; + private readonly SharedScriptLibrary _sharedScriptLibrary; + private readonly SiteRuntimeOptions _options; + private readonly string _dbFile; + + public InstanceActorWaitForAttributeTests() + { + _dbFile = Path.Combine(Path.GetTempPath(), $"instance-waitfor-test-{Guid.NewGuid():N}.db"); + _storage = new SiteStorageService( + $"Data Source={_dbFile}", + NullLogger.Instance); + _storage.InitializeAsync().GetAwaiter().GetResult(); + _compilationService = new ScriptCompilationService( + NullLogger.Instance); + _sharedScriptLibrary = new SharedScriptLibrary( + _compilationService, NullLogger.Instance); + _options = new SiteRuntimeOptions(); + } + + private IActorRef CreateInstanceActor(string instanceName, FlattenedConfiguration config) + { + return ActorOf(Props.Create(() => new InstanceActor( + instanceName, + JsonSerializer.Serialize(config), + _storage, + _compilationService, + _sharedScriptLibrary, + null, // no stream manager in tests + _options, + NullLogger.Instance))); + } + + void IDisposable.Dispose() + { + Shutdown(); + try { File.Delete(_dbFile); } catch { /* cleanup */ } + } + + // ── 1. Fast-path: attribute already at target ──────────────────────────── + + /// + /// Acceptance §7.1: when the attribute already equals the target at the time + /// the waiter registers, the actor must reply immediately with Matched=true + /// (carrying the current value), without scheduling a timeout. + /// + [Fact] + public void WaitForAttribute_FastPath_AlreadyAtTarget_RepliesMatchedImmediately() + { + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute { CanonicalName = "Flag", Value = "true", DataType = "Boolean" } + ] + }; + + var actor = CreateInstanceActor("Pump1", config); + + actor.Tell(new WaitForAttributeRequest( + "wfa-fast", "Pump1", "Flag", + "true", null, TimeSpan.FromSeconds(30), DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(response.Matched); + Assert.False(response.TimedOut); + Assert.Equal("wfa-fast", response.CorrelationId); + Assert.Equal("true", response.Value?.ToString()); + } + + // ── 2. Change-match: register first, then drive a value change ─────────── + + /// + /// Acceptance §7.1/§7.4: registering when the value does NOT match, then + /// driving the attribute to the target value (via a DCL TagValueUpdate) must + /// produce a single Matched=true reply carrying the new value. + /// + [Fact] + public void WaitForAttribute_ChangeMatch_RepliesMatchedWithNewValue() + { + const string tag = "ns=3;s=Recipe.Processed"; + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute + { + CanonicalName = "Processed", Value = "false", DataType = "Boolean", + DataSourceReference = tag, BoundDataConnectionName = "PLC" + } + ] + }; + + var dcl = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => new InstanceActor( + "Pump1", + JsonSerializer.Serialize(config), + _storage, + _compilationService, + _sharedScriptLibrary, + null, + _options, + NullLogger.Instance, + dcl.Ref))); + + dcl.ExpectMsg(TimeSpan.FromSeconds(5)); + + // Register: current value "false" does not match the target. The value + // arrives from the DCL as a boolean true, whose codec-encoded form is + // "True" — so the target must be encoded the same way the accessor would + // (AttributeValueCodec.Encode(true)), NOT the literal string "true". + var target = ZB.MOM.WW.ScadaBridge.Commons.Types.AttributeValueCodec.Encode(true); + actor.Tell(new WaitForAttributeRequest( + "wfa-change", "Pump1", "Processed", + target, null, TimeSpan.FromSeconds(30), DateTimeOffset.UtcNow)); + + // No reply yet — the value has not changed to the target. + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // Drive the value to the target through the DCL ingest path. + actor.Tell(new TagValueUpdate("PLC", tag, true, QualityCode.Good, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(response.Matched); + Assert.False(response.TimedOut); + Assert.Equal("wfa-change", response.CorrelationId); + Assert.Equal(true, response.Value); + Assert.Equal("Good", response.Quality); + } + + // ── 3. Timeout: value never matches ────────────────────────────────────── + + /// + /// Acceptance §7.2: when the attribute never reaches the target within the + /// timeout, the actor replies Matched=false, TimedOut=true (no throw). + /// + [Fact] + public void WaitForAttribute_Timeout_RepliesNotMatchedTimedOut() + { + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute { CanonicalName = "Flag", Value = "false", DataType = "Boolean" } + ] + }; + + var actor = CreateInstanceActor("Pump1", config); + + actor.Tell(new WaitForAttributeRequest( + "wfa-timeout", "Pump1", "Flag", + "true", null, TimeSpan.FromMilliseconds(300), DateTimeOffset.UtcNow)); + + // The scheduled timeout fires; allow a tolerant deadline. + var response = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.False(response.Matched); + Assert.True(response.TimedOut); + Assert.Equal("wfa-timeout", response.CorrelationId); + } + + // ── 4. No-leak: timeout canceled on match (no second reply) ────────────── + + /// + /// Acceptance §7.5: after a successful change-match, the scheduled timeout + /// must have been canceled and the waiter removed — so NO second (timeout) + /// response arrives after the match. + /// + [Fact] + public void WaitForAttribute_Match_CancelsTimeout_NoSecondReply() + { + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute { CanonicalName = "Flag", Value = "false", DataType = "Boolean" } + ] + }; + + var actor = CreateInstanceActor("Pump1", config); + + // Register with a short timeout, then match BEFORE it would fire. + actor.Tell(new WaitForAttributeRequest( + "wfa-noleak", "Pump1", "Flag", + "true", null, TimeSpan.FromMilliseconds(500), DateTimeOffset.UtcNow)); + + // Drive the static value to the target; the actor publishes via + // HandleAttributeValueChanged, satisfying the waiter. + actor.Tell(new SetStaticAttributeCommand( + "set-flag", "Pump1", "Flag", "true", DateTimeOffset.UtcNow)); + + // First reply: the match. (A SetStaticAttributeResponse also arrives for + // the set command — filter for the WaitForAttributeResponse.) + var matched = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(matched.Matched); + Assert.False(matched.TimedOut); + + // The set command's own ack — drain it so the no-msg assert below is clean. + ExpectMsg(TimeSpan.FromSeconds(5)); + + // No second WaitForAttributeResponse (the timeout was canceled) for longer + // than the original 500ms timeout window. + ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + + // ── 5. Predicate overload ──────────────────────────────────────────────── + + /// + /// Acceptance §7 (predicate form): registering with a site-local predicate and + /// then flipping the value so the predicate passes must produce Matched=true. + /// + [Fact] + public void WaitForAttribute_PredicateOverload_MatchesOnPredicatePass() + { + const string tag = "ns=3;s=Level"; + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute + { + CanonicalName = "Level", Value = "0", DataType = "Int32", + DataSourceReference = tag, BoundDataConnectionName = "PLC" + } + ] + }; + + var dcl = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => new InstanceActor( + "Pump1", + JsonSerializer.Serialize(config), + _storage, + _compilationService, + _sharedScriptLibrary, + null, + _options, + NullLogger.Instance, + dcl.Ref))); + + dcl.ExpectMsg(TimeSpan.FromSeconds(5)); + + // Predicate: value > 50 (current is 0, so no immediate match). + Func predicate = v => + v is not null && int.TryParse(v.ToString(), out var n) && n > 50; + + actor.Tell(new WaitForAttributeRequest( + "wfa-pred", "Pump1", "Level", + null, predicate, TimeSpan.FromSeconds(30), DateTimeOffset.UtcNow)); + + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // A value below the threshold must NOT satisfy the predicate. + actor.Tell(new TagValueUpdate("PLC", tag, 25, QualityCode.Good, DateTimeOffset.UtcNow)); + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // A value above the threshold satisfies it. + actor.Tell(new TagValueUpdate("PLC", tag, 75, QualityCode.Good, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(response.Matched); + Assert.False(response.TimedOut); + Assert.Equal(75, response.Value); + } + + // ── 6. "any change" (null target + null predicate) ─────────────────────── + + /// + /// Spec §4.1: a null TargetValueEncoded + null Predicate means "wait for any + /// change" — the next value update on that attribute matches. + /// + [Fact] + public void WaitForAttribute_AnyChange_MatchesOnNextUpdate() + { + const string tag = "ns=3;s=Speed"; + var config = new FlattenedConfiguration + { + InstanceUniqueName = "Pump1", + Attributes = + [ + new ResolvedAttribute + { + CanonicalName = "Speed", Value = "0", DataType = "Int32", + DataSourceReference = tag, BoundDataConnectionName = "PLC" + } + ] + }; + + var dcl = CreateTestProbe(); + var actor = ActorOf(Props.Create(() => new InstanceActor( + "Pump1", + JsonSerializer.Serialize(config), + _storage, + _compilationService, + _sharedScriptLibrary, + null, + _options, + NullLogger.Instance, + dcl.Ref))); + + dcl.ExpectMsg(TimeSpan.FromSeconds(5)); + + // "any change" registers with a non-trivial timeout. The fast-path uses + // `_ => true`, so a currently-present attribute matches immediately. + actor.Tell(new WaitForAttributeRequest( + "wfa-any", "Pump1", "Speed", + null, null, TimeSpan.FromSeconds(30), DateTimeOffset.UtcNow)); + + // Speed=0 is already present, so the "any change" test (_ => true) matches + // immediately on the fast path. + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(response.Matched); + Assert.False(response.TimedOut); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/ScopeAccessorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/ScopeAccessorTests.cs index 048a2449..91848839 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/ScopeAccessorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Scripts/ScopeAccessorTests.cs @@ -1,3 +1,8 @@ +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Scripts; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts; @@ -137,3 +142,81 @@ public class ScopeAccessorTests Assert.Equal("[1,2,3]", encoded); } } + +/// +/// WaitAsync (spec §3-§5, acceptance §7.6) scope-resolution tests. Unlike the +/// path-arithmetic tests above, these route a real +/// against a TestProbe standing in for the Instance Actor, so they need a live +/// ActorSystem — hence a TestKit-derived class. They assert that +/// Attributes.WaitAsync applies +/// (the composition prefix) to the key BEFORE the request is sent to the actor — +/// the same contract Get/Set obey. +/// +public class AttributeAccessorWaitAsyncTests : TestKit, IDisposable +{ + private ScriptRuntimeContext MakeContext(IActorRef instanceActor) => + new( + instanceActor, + instanceActor, + sharedScriptLibrary: null!, + currentCallDepth: 0, + maxCallDepth: 10, + askTimeout: TimeSpan.FromSeconds(2), + instanceName: "Pump1", + logger: NullLogger.Instance); + + void IDisposable.Dispose() => Shutdown(); + + [Fact] + public void WaitAsync_Value_AppliesScopeResolution_BeforeSendingRequest() + { + var probe = CreateTestProbe(); + var ctx = MakeContext(probe.Ref); + + // Composed scope "TempSensor" — Resolve("Flag") => "TempSensor.Flag". + var acc = new AttributeAccessor(ctx, "TempSensor"); + + // Fire-and-forget; the assertion is on the message the actor receives. + _ = acc.WaitAsync("Flag", true, TimeSpan.FromSeconds(30)); + + var req = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("TempSensor.Flag", req.AttributeName); + // The value overload encodes the target via AttributeValueCodec.Encode and + // sends a null predicate. bool true encodes to "True" (capital T). + Assert.Equal(AttributeValueCodec.Encode(true), req.TargetValueEncoded); + Assert.Equal("True", req.TargetValueEncoded); + Assert.Null(req.Predicate); + Assert.Equal("Pump1", req.InstanceName); + } + + [Fact] + public void WaitAsync_Predicate_AppliesScopeResolution_AndSendsPredicate() + { + var probe = CreateTestProbe(); + var ctx = MakeContext(probe.Ref); + + var acc = new AttributeAccessor(ctx, "Motor.TempSensor"); + + Func predicate = _ => true; + _ = acc.WaitAsync("Level", predicate, TimeSpan.FromSeconds(30)); + + var req = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("Motor.TempSensor.Level", req.AttributeName); + // The predicate overload sends the delegate and a null encoded target. + Assert.Null(req.TargetValueEncoded); + Assert.NotNull(req.Predicate); + } + + [Fact] + public void WaitAsync_RootScope_LeavesKeyBare() + { + var probe = CreateTestProbe(); + var ctx = MakeContext(probe.Ref); + + var acc = new AttributeAccessor(ctx, ""); + _ = acc.WaitAsync("Flag", true, TimeSpan.FromSeconds(30)); + + var req = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("Flag", req.AttributeName); + } +}