fix(siteruntime): harden WaitAsync — no spurious match on quality republish, guard throwing predicate, Ask-timeout returns false

This commit is contained in:
Joseph Doherty
2026-06-17 08:44:03 -04:00
parent 75ffa09b8f
commit 04e97f4a87
5 changed files with 390 additions and 32 deletions
@@ -571,12 +571,35 @@ public class InstanceActor : ReceiveActor
}
// Fast path: the current value already satisfies the test → reply now.
if (_attributes.TryGetValue(req.AttributeName, out var current) && test(current))
// A script-supplied predicate (or the codec-equality lambda) runs on the
// actor thread; guard it so a throwing predicate cannot crash the actor or
// leak a never-resolved waiter. On throw: reply non-matched + ErrorMessage
// and return WITHOUT registering (no timeout scheduled).
if (_attributes.TryGetValue(req.AttributeName, out var current))
{
_attributeQualities.TryGetValue(req.AttributeName, out var quality);
replyer.Tell(new WaitForAttributeResponse(
req.CorrelationId, Matched: true, current, quality ?? "Good", TimedOut: false));
return;
bool fastMatch;
try
{
fastMatch = test(current);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"WaitForAttribute predicate threw on the fast-path for {Instance}.{Attribute}; refusing the wait",
_instanceUniqueName, req.AttributeName);
replyer.Tell(new WaitForAttributeResponse(
req.CorrelationId, Matched: false, null, null, TimedOut: false,
ErrorMessage: "Wait predicate threw: " + ex.Message));
return;
}
if (fastMatch)
{
_attributeQualities.TryGetValue(req.AttributeName, out var quality);
replyer.Tell(new WaitForAttributeResponse(
req.CorrelationId, Matched: true, current, quality ?? "Good", TimedOut: false));
return;
}
}
// Defensive cap: refuse rather than register if the instance already has
@@ -584,7 +607,7 @@ public class InstanceActor : ReceiveActor
if (_attributeWaiters.Count >= MaxAttributeWaiters)
{
replyer.Tell(new WaitForAttributeResponse(
req.CorrelationId, Matched: false, null, "", TimedOut: false,
req.CorrelationId, Matched: false, null, null, TimedOut: false,
ErrorMessage: "Too many concurrent attribute waiters on this instance"));
return;
}
@@ -607,7 +630,7 @@ public class InstanceActor : ReceiveActor
if (_attributeWaiters.Remove(msg.CorrelationId, out var pending))
{
pending.Replyer.Tell(new WaitForAttributeResponse(
msg.CorrelationId, Matched: false, null, "", TimedOut: true));
msg.CorrelationId, Matched: false, null, null, TimedOut: true));
}
}
@@ -648,9 +671,14 @@ public class InstanceActor : ReceiveActor
_attributeQualities[attrName] = "Bad";
_attributeTimestamps[attrName] = update.Timestamp;
var currentValue = _attributes.GetValueOrDefault(attrName);
// WaitForAttribute (spec §4.2): quality-only republish — the
// stored value is UNCHANGED (we publish the OLD currentValue, only
// the quality flips to Bad). Do NOT evaluate waiters, or an
// "any-change" / unchanged-value-equality waiter would fire on a
// non-change.
PublishAndNotifyChildren(new AttributeValueChanged(
_instanceUniqueName, update.TagPath, attrName,
currentValue, "Bad", update.Timestamp));
currentValue, "Bad", update.Timestamp), evaluateWaiters: false);
}
continue;
}
@@ -1000,7 +1028,17 @@ public class InstanceActor : ReceiveActor
/// Publishes attribute change to stream and notifies child Script/Alarm actors.
/// WP-22: Tell for attribute notifications (fire-and-forget, never blocks).
/// </summary>
private void PublishAndNotifyChildren(AttributeValueChanged changed)
/// <param name="changed">The attribute change to publish.</param>
/// <param name="evaluateWaiters">
/// WaitForAttribute (spec §4.2): when <c>true</c> (the default), registered
/// <c>Attributes.WaitAsync</c> waiters on this attribute are re-evaluated against
/// <paramref name="changed"/>'s value. Pass <c>false</c> on republish/quality-only
/// paths that do NOT assign a new value to <c>_attributes[name]</c> (e.g. the
/// List-coerce-failure Bad-quality republish, which publishes the OLD value) —
/// otherwise an "any-change" waiter (or a waiter whose target equals the unchanged
/// value) would spuriously fire even though nothing actually changed.
/// </param>
private void PublishAndNotifyChildren(AttributeValueChanged changed, bool evaluateWaiters = true)
{
// WP-23: Publish to site-wide stream
_streamManager?.PublishAttributeValueChanged(changed);
@@ -1017,15 +1055,19 @@ public class InstanceActor : ReceiveActor
alarmActor.Tell(changed);
}
// WaitForAttribute (spec §4.2): re-evaluate any waiters on THIS attribute.
// PublishAndNotifyChildren is THE single choke point for every value change
// — both the DCL ingest path (HandleAttributeValueChanged) and the static
// write path (HandleSetStaticAttributeCore) call it AFTER updating
// _attributes, so changed.Value is the just-applied current value. Iterate a
// snapshot so satisfied waiters can be removed during the loop; each match
// cancels its scheduled timeout (so no stray WaitForAttributeTimeout follows)
// and replies Matched=true.
ResolveMatchedWaiters(changed);
// WaitForAttribute (spec §4.2): re-evaluate any waiters on THIS attribute
// but ONLY when this publish reflects a real value change (evaluateWaiters).
// The genuine value-change paths (HandleAttributeValueChanged, the scalar
// DCL update path, HandleSetStaticAttributeCore) call it AFTER assigning
// _attributes[name], so changed.Value is the just-applied current value.
// Republish/quality-only paths (List-coerce-failure Bad-quality, which
// publishes the OLD value) pass evaluateWaiters:false so an "any-change" or
// unchanged-value-equality waiter does not spuriously fire (spec §4.2).
// Iterate a snapshot so satisfied waiters can be removed during the loop;
// each match cancels its scheduled timeout (so no stray WaitForAttributeTimeout
// follows) and replies Matched=true.
if (evaluateWaiters)
ResolveMatchedWaiters(changed);
}
/// <summary>
@@ -1033,19 +1075,50 @@ public class InstanceActor : ReceiveActor
/// <paramref name="changed"/>'s attribute whose test now passes against the
/// just-applied value — cancelling its timeout, replying Matched, and removing
/// it from the registry. A no-op when there are no waiters.
///
/// <para>
/// Each waiter's match test runs inside a per-waiter try/catch: a throwing
/// script-supplied predicate (or codec lambda) must NOT abort the loop and
/// strand sibling waiters on the same attribute, nor leave the throwing waiter
/// registered with a live scheduled timeout. On throw we cancel that waiter's
/// timeout, reply non-matched + ErrorMessage, remove it, and continue.
/// </para>
/// </summary>
private void ResolveMatchedWaiters(AttributeValueChanged changed)
{
if (_attributeWaiters.Count == 0)
return;
var matched = _attributeWaiters
.Where(kvp => kvp.Value.AttributeName == changed.AttributeName
&& kvp.Value.Test(changed.Value))
// Snapshot the candidate waiters on THIS attribute. Iterating a snapshot
// (and NOT evaluating the test inside the LINQ filter) keeps removal mid-loop
// safe and ensures one throwing test cannot abort materialization for siblings.
var candidates = _attributeWaiters
.Where(kvp => kvp.Value.AttributeName == changed.AttributeName)
.ToList();
foreach (var (cid, pending) in matched)
foreach (var (cid, pending) in candidates)
{
bool matched;
try
{
matched = pending.Test(changed.Value);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"WaitForAttribute predicate threw while resolving waiter {CorrelationId} on {Instance}.{Attribute}; evicting it",
cid, _instanceUniqueName, changed.AttributeName);
pending.Timeout.Cancel();
pending.Replyer.Tell(new WaitForAttributeResponse(
cid, Matched: false, null, null, TimedOut: false,
ErrorMessage: "Wait predicate threw: " + ex.Message));
_attributeWaiters.Remove(cid);
continue;
}
if (!matched)
continue;
pending.Timeout.Cancel();
pending.Replyer.Tell(new WaitForAttributeResponse(
cid, Matched: true, changed.Value, changed.Quality, TimedOut: false));
@@ -81,9 +81,24 @@ public class AttributeAccessor
/// <c>false</c> on timeout (no throw). Honors the script's execution-timeout token.
/// Scope/composition path resolution (<see cref="Resolve"/>) is applied just like
/// <see cref="GetAsync"/> / <see cref="SetAsync"/>.
///
/// <para>
/// <b>Quality-agnostic by default (spec §4.2):</b> matching tests the VALUE, not
/// the quality — a value arriving at Bad quality still satisfies the wait. A
/// quality-gated ("Good"-only) mode is a planned enhancement, deferred per spec §4.2.
/// </para>
///
/// <para>
/// Passing a <b>null</b> <paramref name="targetValue"/> means "match on any change":
/// the wait then matches the next value the attribute receives — and matches
/// IMMEDIATELY (fast-path) if the attribute already holds any value at registration.
/// </para>
/// </summary>
/// <param name="key">The attribute key (scope-resolved before the wait is registered).</param>
/// <param name="targetValue">The value to wait for (codec-encoded for comparison).</param>
/// <param name="targetValue">
/// The value to wait for (codec-encoded for comparison); <c>null</c> means
/// "match on any change" (matches immediately if the attribute already has a value).
/// </param>
/// <param name="timeout">How long to wait before returning false.</param>
/// <returns><c>true</c> on match within the timeout; <c>false</c> on timeout.</returns>
public Task<bool> WaitAsync(string key, object? targetValue, TimeSpan timeout)
@@ -95,6 +110,13 @@ public class AttributeAccessor
/// value, bounded by <paramref name="timeout"/>. Site-local only (the predicate
/// is an in-process delegate). Returns <c>true</c> if matched within the timeout,
/// <c>false</c> on timeout (no throw). Scope/composition path resolution applies.
///
/// <para>
/// <b>Quality-agnostic by default (spec §4.2):</b> the predicate is tested against
/// the VALUE, regardless of quality — a value arriving at Bad quality still
/// satisfies the wait if the predicate passes. A quality-gated ("Good"-only) mode
/// is a planned enhancement, deferred per spec §4.2.
/// </para>
/// </summary>
/// <param name="key">The attribute key (scope-resolved before the wait is registered).</param>
/// <param name="predicate">The site-local predicate tested against the current value.</param>
@@ -399,6 +399,21 @@ public class ScriptRuntimeContext
/// so the InstanceActor's own scheduled timeout reply is the authoritative path
/// for the false/timed-out outcome, not the Ask deadline.
/// </para>
///
/// <para>
/// <b>Quality-agnostic by default (spec §4.2):</b> a value arriving at Bad
/// quality still satisfies the wait — the match tests the value, not the quality.
/// A quality-gated ("Good"-only) mode is a planned enhancement, deferred per spec §4.2.
/// </para>
///
/// <para>
/// <b>Never throws on timeout.</b> An <see cref="Akka.Actor.AskTimeoutException"/>
/// (the pathological case where the InstanceActor's authoritative timeout reply
/// never arrives — actor stopped/restarted) is caught and surfaced as <c>false</c>,
/// matching the timeout contract. An <see cref="OperationCanceledException"/> /
/// <see cref="TaskCanceledException"/> from the script-deadline token is NOT caught
/// — it propagates to abort the script (intended §4.3 behaviour).
/// </para>
/// </summary>
/// <param name="name">The scope-resolved attribute name to wait on.</param>
/// <param name="targetValueEncoded">
@@ -415,10 +430,24 @@ public class ScriptRuntimeContext
var req = new WaitForAttributeRequest(
cid, _instanceName, name, targetValueEncoded, predicate, timeout, DateTimeOffset.UtcNow);
var resp = await _instanceActor.Ask<WaitForAttributeResponse>(
req, timeout + _askTimeout, _scriptTimeoutToken);
try
{
var resp = await _instanceActor.Ask<WaitForAttributeResponse>(
req, timeout + _askTimeout, _scriptTimeoutToken);
return resp.Matched;
return resp.Matched;
}
catch (AskTimeoutException)
{
// Pathological: the InstanceActor's own scheduled timeout reply never
// arrived (e.g. the actor stopped/restarted under us). The helper's
// contract is "false on timeout, never throw" — so swallow and return
// false rather than leaking the Ask exception to the script.
// OperationCanceledException / TaskCanceledException from the
// script-deadline token are deliberately NOT caught here: they must
// propagate to abort the script (§4.3).
return false;
}
}
/// <summary>