feat(siteruntime): event-driven Attributes.WriteBatchAndWaitAsync (batched DCL write + trigger + existing WaitForAttribute waiter) + compile mirror

This commit is contained in:
Joseph Doherty
2026-06-17 12:13:02 -04:00
parent e8db6c71a8
commit 0e989c867d
8 changed files with 363 additions and 0 deletions
@@ -0,0 +1,40 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
/// <summary>
/// Request to write a SET of device tags through the DCL in ONE batch round-trip,
/// optionally followed by a trigger-flag write. Composes the adapter's
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol.IDataConnection.WriteBatchAsync"/>
/// (the batch) with a single <c>WriteAsync</c> (the trigger) so the script-facing
/// <c>Attributes.WriteBatchAndWaitAsync</c> helper replaces N sequential per-attribute
/// writes with one gateway call. The wait half is the EXISTING event-driven
/// <c>WaitForAttribute</c> waiter — it is NOT part of this DCL message; failures are
/// returned synchronously to the calling Instance Actor.
/// </summary>
/// <param name="CorrelationId">Per-write correlation id; echoed on the response.</param>
/// <param name="ConnectionName">The data connection that owns every tag in the batch (and the trigger).</param>
/// <param name="Values">Device tag path → value to write in the single batch round-trip.</param>
/// <param name="TriggerTagPath">Optional device tag path of a flag written AFTER the batch succeeds; null to skip.</param>
/// <param name="TriggerValue">Value to write to <paramref name="TriggerTagPath"/> (ignored when it is null/empty).</param>
/// <param name="Timestamp">When the request was issued (UTC).</param>
public record WriteTagBatchRequest(
string CorrelationId,
string ConnectionName,
IReadOnlyDictionary<string, object?> Values, // device tagPath -> value
string? TriggerTagPath, // optional flag written AFTER the batch
object? TriggerValue,
DateTimeOffset Timestamp);
/// <summary>
/// Response for a <see cref="WriteTagBatchRequest"/>. <see cref="Success"/> is true only
/// when the whole batch AND the optional trigger committed; otherwise
/// <see cref="ErrorMessage"/> describes the first failing leg.
/// </summary>
/// <param name="CorrelationId">Echoes the request's correlation id.</param>
/// <param name="Success">True when the batch (and trigger, if any) all committed.</param>
/// <param name="ErrorMessage">Non-null on failure — the aggregated batch error, the trigger error, a timeout, or an adapter exception.</param>
/// <param name="Timestamp">When the response was produced (UTC).</param>
public record WriteTagBatchResponse(
string CorrelationId,
bool Success,
string? ErrorMessage,
DateTimeOffset Timestamp);
@@ -0,0 +1,43 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance;
/// <summary>
/// Request to write a SET of data-sourced attributes on one instance to the device in a
/// single DCL batch round-trip, optionally followed by a trigger-flag attribute write.
/// The Instance Actor resolves each attribute's data binding (connection + device tag
/// path), decodes List values, enforces single-connection scope, and forwards one
/// <c>WriteTagBatchRequest</c> to the DCL. This is the write half of the script-facing
/// <c>Attributes.WriteBatchAndWaitAsync</c> helper; the wait half reuses the existing
/// event-driven <c>WaitForAttribute</c> waiter.
///
/// <para>
/// <b>Site-local only.</b> Values are carried codec-ENCODED (strings), so this message
/// would serialize — but the helper composing it issues the wait via the in-process
/// predicate-capable <c>WaitForAttribute</c> path, so the whole flow stays within one
/// site node's actor system (script execution → Instance Actor → DCL).
/// </para>
/// </summary>
/// <param name="CorrelationId">Per-write correlation id; echoed on the response.</param>
/// <param name="InstanceName">The instance whose attributes are written.</param>
/// <param name="AttributeEncodedValues">Canonical (scope-resolved) attribute name → codec-encoded value.</param>
/// <param name="TriggerAttribute">Optional canonical attribute name of a flag written AFTER the batch; null to skip.</param>
/// <param name="TriggerEncodedValue">Codec-encoded value for <paramref name="TriggerAttribute"/>.</param>
/// <param name="OccurredAtUtc">When the request was issued (UTC).</param>
public record WriteAttributeBatchRequest(
string CorrelationId,
string InstanceName,
IReadOnlyDictionary<string, string?> AttributeEncodedValues, // canonical attr name -> codec-encoded value
string? TriggerAttribute,
string? TriggerEncodedValue,
DateTimeOffset OccurredAtUtc);
/// <summary>
/// Reply to a <see cref="WriteAttributeBatchRequest"/>. <see cref="Success"/> is true only
/// when the DCL batch (and trigger, if any) all committed.
/// </summary>
/// <param name="CorrelationId">Echoes the request's correlation id.</param>
/// <param name="Success">True when the batch (and trigger, if any) all committed at the device.</param>
/// <param name="ErrorMessage">Non-null on failure — an unresolved/non-data-sourced attribute, a multi-connection batch, a list-decode failure, or the DCL error/timeout.</param>
public record WriteAttributeBatchResponse(
string CorrelationId,
bool Success,
string? ErrorMessage);
@@ -245,6 +245,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
break; break;
case SubscribeTagsRequest: case SubscribeTagsRequest:
case WriteTagRequest: case WriteTagRequest:
case WriteTagBatchRequest:
case UnsubscribeTagsRequest: case UnsubscribeTagsRequest:
case SubscribeAlarmsRequest: case SubscribeAlarmsRequest:
case UnsubscribeAlarmsRequest: case UnsubscribeAlarmsRequest:
@@ -331,6 +332,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case WriteTagRequest req: case WriteTagRequest req:
HandleWrite(req); HandleWrite(req);
break; break;
case WriteTagBatchRequest req:
HandleWriteBatch(req);
break;
case TagValueReceived tvr: case TagValueReceived tvr:
HandleTagValueReceived(tvr); HandleTagValueReceived(tvr);
break; break;
@@ -457,6 +461,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
break; break;
case SubscribeTagsRequest: case SubscribeTagsRequest:
case WriteTagRequest: case WriteTagRequest:
case WriteTagBatchRequest:
case SubscribeAlarmsRequest: case SubscribeAlarmsRequest:
Stash.Stash(); Stash.Stash();
break; break;
@@ -1059,6 +1064,67 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}).PipeTo(sender); }).PipeTo(sender);
} }
/// <summary>
/// Batch write counterpart of <see cref="HandleWrite"/>. Writes every value in
/// <see cref="WriteTagBatchRequest.Values"/> to the device in ONE adapter
/// <c>WriteBatchAsync</c> round-trip, then (if present) writes the trigger flag with
/// a single <c>WriteAsync</c>. Both legs share one <see cref="DataConnectionOptions.WriteTimeout"/>
/// budget (DataConnectionLayer-005). Any failed value in the batch, a failed trigger,
/// the timeout, or an adapter exception is translated into a failed
/// <see cref="WriteTagBatchResponse"/> returned synchronously to the caller — never a
/// dropped reply. NOTE: this deliberately composes the batch + trigger primitives and
/// uses the EXISTING event-driven WaitForAttribute waiter for the wait half; it does
/// NOT call the adapter's poll-based <c>WriteBatchAndWaitAsync</c>.
/// </summary>
private void HandleWriteBatch(WriteTagBatchRequest request)
{
_log.Debug("[{0}] Batch-writing {1} tag(s)", _connectionName, request.Values.Count);
var sender = Sender;
var cts = new CancellationTokenSource(_options.WriteTimeout);
async Task<WriteTagBatchResponse> RunAsync()
{
try
{
var results = await _adapter.WriteBatchAsync(
new Dictionary<string, object?>(request.Values), cts.Token);
var failed = results.Values.Where(r => !r.Success).Select(r => r.ErrorMessage).ToList();
if (failed.Count > 0)
return new WriteTagBatchResponse(
request.CorrelationId, false,
"Batch write failed: " + string.Join("; ", failed), DateTimeOffset.UtcNow);
if (!string.IsNullOrEmpty(request.TriggerTagPath))
{
var tr = await _adapter.WriteAsync(request.TriggerTagPath, request.TriggerValue, cts.Token);
if (!tr.Success)
return new WriteTagBatchResponse(
request.CorrelationId, false,
"Trigger write failed: " + tr.ErrorMessage, DateTimeOffset.UtcNow);
}
return new WriteTagBatchResponse(request.CorrelationId, true, null, DateTimeOffset.UtcNow);
}
catch (OperationCanceledException)
{
return new WriteTagBatchResponse(
request.CorrelationId, false,
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
}
catch (Exception ex)
{
return new WriteTagBatchResponse(
request.CorrelationId, false, ex.GetBaseException().Message, DateTimeOffset.UtcNow);
}
finally
{
cts.Dispose();
}
}
RunAsync().PipeTo(sender);
}
// ── OPC UA Tag Browser (interactive design-time query) ── // ── OPC UA Tag Browser (interactive design-time query) ──
/// <summary> /// <summary>
@@ -46,6 +46,7 @@ public class DataConnectionManagerActor : ReceiveActor
Receive<SubscribeAlarmsRequest>(HandleRouteAlarms); Receive<SubscribeAlarmsRequest>(HandleRouteAlarms);
Receive<UnsubscribeAlarmsRequest>(HandleRouteAlarms); Receive<UnsubscribeAlarmsRequest>(HandleRouteAlarms);
Receive<WriteTagRequest>(HandleRouteWrite); Receive<WriteTagRequest>(HandleRouteWrite);
Receive<WriteTagBatchRequest>(HandleRouteWriteBatch);
Receive<RemoveConnectionCommand>(HandleRemoveConnection); Receive<RemoveConnectionCommand>(HandleRemoveConnection);
Receive<GetAllHealthReports>(HandleGetAllHealthReports); Receive<GetAllHealthReports>(HandleGetAllHealthReports);
Receive<BrowseNodeCommand>(HandleBrowse); Receive<BrowseNodeCommand>(HandleBrowse);
@@ -141,6 +142,26 @@ public class DataConnectionManagerActor : ReceiveActor
} }
} }
/// <summary>
/// Routes a <see cref="WriteTagBatchRequest"/> to the child
/// <see cref="DataConnectionActor"/> that owns the named connection — the batch
/// counterpart of <see cref="HandleRouteWrite"/>. The manager owns only the
/// unknown-connection failure (the same split as every other routed message);
/// the child resolves connected/not-connected and the per-write outcomes.
/// </summary>
private void HandleRouteWriteBatch(WriteTagBatchRequest request)
{
if (_connectionActors.TryGetValue(request.ConnectionName, out var actor))
actor.Forward(request);
else
{
_log.Warning("No connection actor for {0}", request.ConnectionName);
Sender.Tell(new WriteTagBatchResponse(
request.CorrelationId, false,
$"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow));
}
}
/// <summary> /// <summary>
/// Routes a <see cref="BrowseNodeCommand"/> from the central UI's OPC UA /// Routes a <see cref="BrowseNodeCommand"/> from the central UI's OPC UA
/// Tag Browser to the child <see cref="DataConnectionActor"/> that owns the /// Tag Browser to the child <see cref="DataConnectionActor"/> that owns the
@@ -193,6 +193,9 @@ public sealed class ScriptCompileSurface
/// <summary>Mirrors <c>AttributeAccessor.WaitForAsync</c>.</summary> /// <summary>Mirrors <c>AttributeAccessor.WaitForAsync</c>.</summary>
public Task<WaitResult> WaitForAsync(string key, object? targetValue, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly); public Task<WaitResult> WaitForAsync(string key, object? targetValue, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly);
public Task<WaitResult> WaitForAsync(string key, Func<object?, bool> predicate, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly); public Task<WaitResult> WaitForAsync(string key, Func<object?, bool> predicate, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly);
/// <summary>Mirrors <c>AttributeAccessor.WriteBatchAndWaitAsync</c>.</summary>
public Task<bool> WriteBatchAndWaitAsync(IReadOnlyDictionary<string, object?> values, string flagKey, object? flagValue, string responseKey, object? responseValue, TimeSpan timeout) => throw new NotSupportedException(CompileOnly);
} }
/// <summary>Compile-only mirror of <c>ChildrenAccessor</c>.</summary> /// <summary>Compile-only mirror of <c>ChildrenAccessor</c>.</summary>
@@ -188,6 +188,12 @@ public class InstanceActor : ReceiveActor
Receive<WaitForAttributeRequest>(HandleWaitForAttribute); Receive<WaitForAttributeRequest>(HandleWaitForAttribute);
Receive<WaitForAttributeTimeout>(HandleWaitForAttributeTimeout); Receive<WaitForAttributeTimeout>(HandleWaitForAttributeTimeout);
// Batch write + (event-driven) wait: resolves a set of data-sourced
// attributes to one DCL connection and forwards a single WriteTagBatchRequest.
// Backs the script-facing Attributes.WriteBatchAndWaitAsync helper; the wait
// half is the WaitForAttribute waiter above.
Receive<WriteAttributeBatchRequest>(HandleWriteAttributeBatch);
// Handle tag value updates from DCL — convert to AttributeValueChanged // Handle tag value updates from DCL — convert to AttributeValueChanged
Receive<TagValueUpdate>(HandleTagValueUpdate); Receive<TagValueUpdate>(HandleTagValueUpdate);
Receive<SubscribeTagsResponse>(_ => { }); // Ack from DCL subscribe — no action needed Receive<SubscribeTagsResponse>(_ => { }); // Ack from DCL subscribe — no action needed
@@ -499,6 +505,118 @@ public class InstanceActor : ReceiveActor
}).PipeTo(caller); }).PipeTo(caller);
} }
/// <summary>
/// Batch write: resolves a SET of data-sourced attributes (and an optional trigger
/// attribute) to their device bindings, enforces a single data connection across the
/// whole batch, decodes List values (same rule as <see cref="HandleSetDataAttribute"/>),
/// and forwards ONE <see cref="WriteTagBatchRequest"/> to the DCL — replacing N
/// sequential per-attribute writes with a single gateway round-trip. The write outcome
/// is returned synchronously to the caller; the EVENT-DRIVEN wait for the response
/// attribute is performed separately by the caller via the existing WaitForAttribute
/// waiter (this handler does not wait). Resolution mirrors the data-sourced
/// SetAttribute path: an attribute is data-sourced only when it resolves AND has both a
/// <see cref="ResolvedAttribute.DataSourceReference"/> and a
/// <see cref="ResolvedAttribute.BoundDataConnectionName"/>.
/// </summary>
private void HandleWriteAttributeBatch(WriteAttributeBatchRequest request)
{
var caller = Sender;
var cid = request.CorrelationId;
if (_dclManager == null)
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, "Data Connection Layer not available for write."));
return;
}
var values = new Dictionary<string, object?>();
string? connName = null;
foreach (var kv in request.AttributeEncodedValues)
{
if (!_resolvedAttributeByName.TryGetValue(kv.Key, out var resolved)
|| string.IsNullOrEmpty(resolved.DataSourceReference)
|| string.IsNullOrEmpty(resolved.BoundDataConnectionName))
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, $"Attribute '{kv.Key}' is not a data-sourced attribute."));
return;
}
if (connName == null)
connName = resolved.BoundDataConnectionName;
else if (!string.Equals(connName, resolved.BoundDataConnectionName, StringComparison.Ordinal))
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, "Batch write spans multiple data connections; not supported."));
return;
}
object? wv = kv.Value;
// MV: a data-sourced List attribute's encoded value is the canonical JSON
// array string — decode it to a typed List<T> so the DCL/Variant write
// produces a real array (same poison-rejection rule as HandleSetDataAttribute).
if (IsListAttribute(resolved) && !string.IsNullOrWhiteSpace(kv.Value))
{
var decoded = DecodeAttributeValue(resolved, kv.Value);
if (decoded == null)
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, $"Invalid list value for attribute '{kv.Key}'"));
return;
}
wv = decoded;
}
values[resolved.DataSourceReference!] = wv;
}
string? triggerPath = null;
object? triggerVal = null;
if (!string.IsNullOrEmpty(request.TriggerAttribute))
{
if (!_resolvedAttributeByName.TryGetValue(request.TriggerAttribute, out var tr)
|| string.IsNullOrEmpty(tr.DataSourceReference)
|| string.IsNullOrEmpty(tr.BoundDataConnectionName))
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, $"Trigger attribute '{request.TriggerAttribute}' is not data-sourced."));
return;
}
if (connName != null && !string.Equals(connName, tr.BoundDataConnectionName, StringComparison.Ordinal))
{
caller.Tell(new WriteAttributeBatchResponse(
cid, false, "Trigger attribute is on a different data connection."));
return;
}
connName ??= tr.BoundDataConnectionName;
triggerPath = tr.DataSourceReference;
triggerVal = request.TriggerEncodedValue;
}
if (connName == null)
{
caller.Tell(new WriteAttributeBatchResponse(cid, false, "No attributes to write."));
return;
}
var dclReq = new WriteTagBatchRequest(
cid, connName!, values, triggerPath, triggerVal, DateTimeOffset.UtcNow);
// Ask the DCL and pipe the batch outcome back to the original caller. The DCL
// bounds its own write with WriteTimeout; this Ask is bounded a bit longer so a
// device timeout is surfaced as a DCL failure rather than an Ask timeout.
_dclManager.Ask<WriteTagBatchResponse>(dclReq, TimeSpan.FromSeconds(30))
.ContinueWith(t => t.IsCompletedSuccessfully
? new WriteAttributeBatchResponse(cid, t.Result.Success, t.Result.ErrorMessage)
: new WriteAttributeBatchResponse(
cid, false, t.Exception?.GetBaseException().Message ?? "DCL batch write timed out"))
.PipeTo(caller);
}
/// <summary> /// <summary>
/// WP-15: Routes script call requests to the appropriate Script Actor. /// WP-15: Routes script call requests to the appropriate Script Actor.
/// Uses Ask pattern (WP-22). /// Uses Ask pattern (WP-22).
@@ -74,6 +74,40 @@ public class AttributeAccessor
public Task SetAsync(string key, object? value) public Task SetAsync(string key, object? value)
=> _ctx.SetAttribute(Resolve(key), AttributeValueCodec.Encode(value) ?? string.Empty); => _ctx.SetAttribute(Resolve(key), AttributeValueCodec.Encode(value) ?? string.Empty);
/// <summary>
/// Writes a SET of data-sourced attributes to the device in ONE gateway batch
/// round-trip, then writes <paramref name="flagKey"/>=<paramref name="flagValue"/>,
/// then waits EVENT-DRIVEN (reusing the existing <c>WaitForAttribute</c> waiter — not a
/// poll) for <paramref name="responseKey"/> to reach <paramref name="responseValue"/>,
/// bounded by <paramref name="timeout"/>. Replaces N sequential per-attribute writes
/// with one batched call before the wait. All keys are scope-resolved (<see cref="Resolve"/>)
/// and all values codec-encoded just like the other accessors. Returns <c>true</c> if
/// the response was observed within the timeout, <c>false</c> on timeout (no throw on
/// timeout); throws <see cref="System.InvalidOperationException"/> if the batch/trigger
/// write itself fails.
/// </summary>
/// <param name="values">Attribute key → value to batch-write (keys scope-resolved, values codec-encoded).</param>
/// <param name="flagKey">Trigger attribute key written AFTER the batch.</param>
/// <param name="flagValue">Value to write to the trigger.</param>
/// <param name="responseKey">Attribute key to wait on.</param>
/// <param name="responseValue">Target value to wait for (<c>null</c> ⇒ any change).</param>
/// <param name="timeout">How long to wait before returning false.</param>
/// <returns><c>true</c> on response match within the timeout; <c>false</c> on timeout.</returns>
public Task<bool> WriteBatchAndWaitAsync(
IReadOnlyDictionary<string, object?> values, string flagKey, object? flagValue,
string responseKey, object? responseValue, TimeSpan timeout)
{
var encoded = new Dictionary<string, string?>(values.Count);
foreach (var kv in values)
encoded[Resolve(kv.Key)] = AttributeValueCodec.Encode(kv.Value);
return _ctx.WriteBatchAndWait(
encoded,
Resolve(flagKey), AttributeValueCodec.Encode(flagValue),
Resolve(responseKey), AttributeValueCodec.Encode(responseValue),
timeout);
}
/// <summary> /// <summary>
/// WaitForAttribute (spec §3-§5): waits event-driven until the attribute equals /// WaitForAttribute (spec §3-§5): waits event-driven until the attribute equals
/// <paramref name="targetValue"/> (value-equality, codec-normalized), bounded by /// <paramref name="targetValue"/> (value-equality, codec-normalized), bounded by
@@ -525,6 +525,44 @@ public class ScriptRuntimeContext
} }
} }
/// <summary>
/// Writes a SET of data-sourced attributes to the device in ONE DCL batch round-trip,
/// then writes a trigger flag, then waits EVENT-DRIVEN (reusing the existing
/// <see cref="WaitAttribute"/> waiter — not a poll) for <paramref name="responseAttr"/>
/// to reach <paramref name="responseEncoded"/>, bounded by <paramref name="timeout"/>.
/// Replaces N sequential per-attribute writes with a single gateway call before the
/// wait. Throws <see cref="InvalidOperationException"/> if the write/trigger leg fails
/// (resolution error, multi-connection batch, device error); returns the wait result
/// otherwise (true = matched, false = timeout, never throws on timeout).
/// </summary>
/// <param name="encodedValues">Scope-resolved attribute name → codec-encoded value to batch-write.</param>
/// <param name="triggerAttr">Scope-resolved trigger attribute name written AFTER the batch.</param>
/// <param name="triggerEncoded">Codec-encoded value for the trigger.</param>
/// <param name="responseAttr">Scope-resolved attribute to wait on.</param>
/// <param name="responseEncoded">Codec-encoded target value (null ⇒ any change).</param>
/// <param name="timeout">How long to wait for the response before returning false.</param>
/// <returns><c>true</c> on response match within the timeout; <c>false</c> on timeout.</returns>
public async Task<bool> WriteBatchAndWait(
IReadOnlyDictionary<string, string?> encodedValues, string triggerAttr, string? triggerEncoded,
string responseAttr, string? responseEncoded, TimeSpan timeout)
{
var cid = Guid.NewGuid().ToString();
var batchReq = new WriteAttributeBatchRequest(
cid, _instanceName, encodedValues, triggerAttr, triggerEncoded, DateTimeOffset.UtcNow);
// 35s: the InstanceActor's DCL Ask is internally bounded at 30s, so allow a small
// margin so the DCL's own typed failure/timeout reply is the one we observe rather
// than an AskTimeoutException here. Honors the script execution-timeout token.
var batchResp = await _instanceActor.Ask<WriteAttributeBatchResponse>(
batchReq, TimeSpan.FromSeconds(35), _scriptTimeoutToken);
if (!batchResp.Success)
throw new InvalidOperationException(
$"WriteBatchAndWait write failed: {batchResp.ErrorMessage}");
return await WaitAttribute(responseAttr, responseEncoded, null, timeout);
}
/// <summary> /// <summary>
/// Calls a sibling script on the same instance by name (Ask pattern). /// Calls a sibling script on the same instance by name (Ask pattern).
/// WP-20: Enforces recursion limit. /// WP-20: Enforces recursion limit.