From 0e989c867d9db4e84ad013a1afc0f1885ad2adf7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 17 Jun 2026 12:13:02 -0400 Subject: [PATCH] feat(siteruntime): event-driven Attributes.WriteBatchAndWaitAsync (batched DCL write + trigger + existing WaitForAttribute waiter) + compile mirror --- .../DataConnection/WriteTagBatchRequest.cs | 40 ++++++ .../Messages/Instance/WriteAttributeBatch.cs | 43 +++++++ .../Actors/DataConnectionActor.cs | 66 ++++++++++ .../Actors/DataConnectionManagerActor.cs | 21 ++++ .../ScriptCompileSurface.cs | 3 + .../Actors/InstanceActor.cs | 118 ++++++++++++++++++ .../Scripts/ScopeAccessors.cs | 34 +++++ .../Scripts/ScriptRuntimeContext.cs | 38 ++++++ 8 files changed, 363 insertions(+) create mode 100644 src/ZB.MOM.WW.ScadaBridge.Commons/Messages/DataConnection/WriteTagBatchRequest.cs create mode 100644 src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WriteAttributeBatch.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/DataConnection/WriteTagBatchRequest.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/DataConnection/WriteTagBatchRequest.cs new file mode 100644 index 00000000..8c55f4d2 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/DataConnection/WriteTagBatchRequest.cs @@ -0,0 +1,40 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; + +/// +/// Request to write a SET of device tags through the DCL in ONE batch round-trip, +/// optionally followed by a trigger-flag write. Composes the adapter's +/// +/// (the batch) with a single WriteAsync (the trigger) so the script-facing +/// Attributes.WriteBatchAndWaitAsync helper replaces N sequential per-attribute +/// writes with one gateway call. The wait half is the EXISTING event-driven +/// WaitForAttribute waiter — it is NOT part of this DCL message; failures are +/// returned synchronously to the calling Instance Actor. +/// +/// Per-write correlation id; echoed on the response. +/// The data connection that owns every tag in the batch (and the trigger). +/// Device tag path → value to write in the single batch round-trip. +/// Optional device tag path of a flag written AFTER the batch succeeds; null to skip. +/// Value to write to (ignored when it is null/empty). +/// When the request was issued (UTC). +public record WriteTagBatchRequest( + string CorrelationId, + string ConnectionName, + IReadOnlyDictionary Values, // device tagPath -> value + string? TriggerTagPath, // optional flag written AFTER the batch + object? TriggerValue, + DateTimeOffset Timestamp); + +/// +/// Response for a . is true only +/// when the whole batch AND the optional trigger committed; otherwise +/// describes the first failing leg. +/// +/// Echoes the request's correlation id. +/// True when the batch (and trigger, if any) all committed. +/// Non-null on failure — the aggregated batch error, the trigger error, a timeout, or an adapter exception. +/// When the response was produced (UTC). +public record WriteTagBatchResponse( + string CorrelationId, + bool Success, + string? ErrorMessage, + DateTimeOffset Timestamp); diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WriteAttributeBatch.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WriteAttributeBatch.cs new file mode 100644 index 00000000..b836246b --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Instance/WriteAttributeBatch.cs @@ -0,0 +1,43 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Instance; + +/// +/// Request to write a SET of data-sourced attributes on one instance to the device in a +/// single DCL batch round-trip, optionally followed by a trigger-flag attribute write. +/// The Instance Actor resolves each attribute's data binding (connection + device tag +/// path), decodes List values, enforces single-connection scope, and forwards one +/// WriteTagBatchRequest to the DCL. This is the write half of the script-facing +/// Attributes.WriteBatchAndWaitAsync helper; the wait half reuses the existing +/// event-driven WaitForAttribute waiter. +/// +/// +/// Site-local only. Values are carried codec-ENCODED (strings), so this message +/// would serialize — but the helper composing it issues the wait via the in-process +/// predicate-capable WaitForAttribute path, so the whole flow stays within one +/// site node's actor system (script execution → Instance Actor → DCL). +/// +/// +/// Per-write correlation id; echoed on the response. +/// The instance whose attributes are written. +/// Canonical (scope-resolved) attribute name → codec-encoded value. +/// Optional canonical attribute name of a flag written AFTER the batch; null to skip. +/// Codec-encoded value for . +/// When the request was issued (UTC). +public record WriteAttributeBatchRequest( + string CorrelationId, + string InstanceName, + IReadOnlyDictionary AttributeEncodedValues, // canonical attr name -> codec-encoded value + string? TriggerAttribute, + string? TriggerEncodedValue, + DateTimeOffset OccurredAtUtc); + +/// +/// Reply to a . is true only +/// when the DCL batch (and trigger, if any) all committed. +/// +/// Echoes the request's correlation id. +/// True when the batch (and trigger, if any) all committed at the device. +/// Non-null on failure — an unresolved/non-data-sourced attribute, a multi-connection batch, a list-decode failure, or the DCL error/timeout. +public record WriteAttributeBatchResponse( + string CorrelationId, + bool Success, + string? ErrorMessage); diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs index 24b71a04..60ad161a 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -245,6 +245,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers break; case SubscribeTagsRequest: case WriteTagRequest: + case WriteTagBatchRequest: case UnsubscribeTagsRequest: case SubscribeAlarmsRequest: case UnsubscribeAlarmsRequest: @@ -331,6 +332,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case WriteTagRequest req: HandleWrite(req); break; + case WriteTagBatchRequest req: + HandleWriteBatch(req); + break; case TagValueReceived tvr: HandleTagValueReceived(tvr); break; @@ -457,6 +461,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers break; case SubscribeTagsRequest: case WriteTagRequest: + case WriteTagBatchRequest: case SubscribeAlarmsRequest: Stash.Stash(); break; @@ -1059,6 +1064,67 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers }).PipeTo(sender); } + /// + /// Batch write counterpart of . Writes every value in + /// to the device in ONE adapter + /// WriteBatchAsync round-trip, then (if present) writes the trigger flag with + /// a single WriteAsync. Both legs share one + /// budget (DataConnectionLayer-005). Any failed value in the batch, a failed trigger, + /// the timeout, or an adapter exception is translated into a failed + /// returned synchronously to the caller — never a + /// dropped reply. NOTE: this deliberately composes the batch + trigger primitives and + /// uses the EXISTING event-driven WaitForAttribute waiter for the wait half; it does + /// NOT call the adapter's poll-based WriteBatchAndWaitAsync. + /// + private void HandleWriteBatch(WriteTagBatchRequest request) + { + _log.Debug("[{0}] Batch-writing {1} tag(s)", _connectionName, request.Values.Count); + var sender = Sender; + var cts = new CancellationTokenSource(_options.WriteTimeout); + + async Task RunAsync() + { + try + { + var results = await _adapter.WriteBatchAsync( + new Dictionary(request.Values), cts.Token); + var failed = results.Values.Where(r => !r.Success).Select(r => r.ErrorMessage).ToList(); + if (failed.Count > 0) + return new WriteTagBatchResponse( + request.CorrelationId, false, + "Batch write failed: " + string.Join("; ", failed), DateTimeOffset.UtcNow); + + if (!string.IsNullOrEmpty(request.TriggerTagPath)) + { + var tr = await _adapter.WriteAsync(request.TriggerTagPath, request.TriggerValue, cts.Token); + if (!tr.Success) + return new WriteTagBatchResponse( + request.CorrelationId, false, + "Trigger write failed: " + tr.ErrorMessage, DateTimeOffset.UtcNow); + } + + return new WriteTagBatchResponse(request.CorrelationId, true, null, DateTimeOffset.UtcNow); + } + catch (OperationCanceledException) + { + return new WriteTagBatchResponse( + request.CorrelationId, false, + $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow); + } + catch (Exception ex) + { + return new WriteTagBatchResponse( + request.CorrelationId, false, ex.GetBaseException().Message, DateTimeOffset.UtcNow); + } + finally + { + cts.Dispose(); + } + } + + RunAsync().PipeTo(sender); + } + // ── OPC UA Tag Browser (interactive design-time query) ── /// diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionManagerActor.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionManagerActor.cs index 5069c1b1..a46b020e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionManagerActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionManagerActor.cs @@ -46,6 +46,7 @@ public class DataConnectionManagerActor : ReceiveActor Receive(HandleRouteAlarms); Receive(HandleRouteAlarms); Receive(HandleRouteWrite); + Receive(HandleRouteWriteBatch); Receive(HandleRemoveConnection); Receive(HandleGetAllHealthReports); Receive(HandleBrowse); @@ -141,6 +142,26 @@ public class DataConnectionManagerActor : ReceiveActor } } + /// + /// Routes a to the child + /// that owns the named connection — the batch + /// counterpart of . The manager owns only the + /// unknown-connection failure (the same split as every other routed message); + /// the child resolves connected/not-connected and the per-write outcomes. + /// + private void HandleRouteWriteBatch(WriteTagBatchRequest request) + { + if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) + actor.Forward(request); + else + { + _log.Warning("No connection actor for {0}", request.ConnectionName); + Sender.Tell(new WriteTagBatchResponse( + request.CorrelationId, false, + $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow)); + } + } + /// /// Routes a from the central UI's OPC UA /// Tag Browser to the child that owns the diff --git a/src/ZB.MOM.WW.ScadaBridge.ScriptAnalysis/ScriptCompileSurface.cs b/src/ZB.MOM.WW.ScadaBridge.ScriptAnalysis/ScriptCompileSurface.cs index 03390ae1..3e3911fc 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ScriptAnalysis/ScriptCompileSurface.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ScriptAnalysis/ScriptCompileSurface.cs @@ -193,6 +193,9 @@ public sealed class ScriptCompileSurface /// Mirrors AttributeAccessor.WaitForAsync. public Task WaitForAsync(string key, object? targetValue, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly); public Task WaitForAsync(string key, Func predicate, TimeSpan timeout, bool requireGoodQuality = false) => throw new NotSupportedException(CompileOnly); + + /// Mirrors AttributeAccessor.WriteBatchAndWaitAsync. + public Task WriteBatchAndWaitAsync(IReadOnlyDictionary values, string flagKey, object? flagValue, string responseKey, object? responseValue, TimeSpan timeout) => throw new NotSupportedException(CompileOnly); } /// Compile-only mirror of ChildrenAccessor. diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs index c640db9b..d6cea648 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/InstanceActor.cs @@ -188,6 +188,12 @@ public class InstanceActor : ReceiveActor Receive(HandleWaitForAttribute); Receive(HandleWaitForAttributeTimeout); + // Batch write + (event-driven) wait: resolves a set of data-sourced + // attributes to one DCL connection and forwards a single WriteTagBatchRequest. + // Backs the script-facing Attributes.WriteBatchAndWaitAsync helper; the wait + // half is the WaitForAttribute waiter above. + Receive(HandleWriteAttributeBatch); + // Handle tag value updates from DCL — convert to AttributeValueChanged Receive(HandleTagValueUpdate); Receive(_ => { }); // Ack from DCL subscribe — no action needed @@ -499,6 +505,118 @@ public class InstanceActor : ReceiveActor }).PipeTo(caller); } + /// + /// Batch write: resolves a SET of data-sourced attributes (and an optional trigger + /// attribute) to their device bindings, enforces a single data connection across the + /// whole batch, decodes List values (same rule as ), + /// and forwards ONE to the DCL — replacing N + /// sequential per-attribute writes with a single gateway round-trip. The write outcome + /// is returned synchronously to the caller; the EVENT-DRIVEN wait for the response + /// attribute is performed separately by the caller via the existing WaitForAttribute + /// waiter (this handler does not wait). Resolution mirrors the data-sourced + /// SetAttribute path: an attribute is data-sourced only when it resolves AND has both a + /// and a + /// . + /// + private void HandleWriteAttributeBatch(WriteAttributeBatchRequest request) + { + var caller = Sender; + var cid = request.CorrelationId; + + if (_dclManager == null) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, "Data Connection Layer not available for write.")); + return; + } + + var values = new Dictionary(); + string? connName = null; + + foreach (var kv in request.AttributeEncodedValues) + { + if (!_resolvedAttributeByName.TryGetValue(kv.Key, out var resolved) + || string.IsNullOrEmpty(resolved.DataSourceReference) + || string.IsNullOrEmpty(resolved.BoundDataConnectionName)) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, $"Attribute '{kv.Key}' is not a data-sourced attribute.")); + return; + } + + if (connName == null) + connName = resolved.BoundDataConnectionName; + else if (!string.Equals(connName, resolved.BoundDataConnectionName, StringComparison.Ordinal)) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, "Batch write spans multiple data connections; not supported.")); + return; + } + + object? wv = kv.Value; + // MV: a data-sourced List attribute's encoded value is the canonical JSON + // array string — decode it to a typed List so the DCL/Variant write + // produces a real array (same poison-rejection rule as HandleSetDataAttribute). + if (IsListAttribute(resolved) && !string.IsNullOrWhiteSpace(kv.Value)) + { + var decoded = DecodeAttributeValue(resolved, kv.Value); + if (decoded == null) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, $"Invalid list value for attribute '{kv.Key}'")); + return; + } + wv = decoded; + } + + values[resolved.DataSourceReference!] = wv; + } + + string? triggerPath = null; + object? triggerVal = null; + if (!string.IsNullOrEmpty(request.TriggerAttribute)) + { + if (!_resolvedAttributeByName.TryGetValue(request.TriggerAttribute, out var tr) + || string.IsNullOrEmpty(tr.DataSourceReference) + || string.IsNullOrEmpty(tr.BoundDataConnectionName)) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, $"Trigger attribute '{request.TriggerAttribute}' is not data-sourced.")); + return; + } + + if (connName != null && !string.Equals(connName, tr.BoundDataConnectionName, StringComparison.Ordinal)) + { + caller.Tell(new WriteAttributeBatchResponse( + cid, false, "Trigger attribute is on a different data connection.")); + return; + } + + connName ??= tr.BoundDataConnectionName; + triggerPath = tr.DataSourceReference; + triggerVal = request.TriggerEncodedValue; + } + + if (connName == null) + { + caller.Tell(new WriteAttributeBatchResponse(cid, false, "No attributes to write.")); + return; + } + + var dclReq = new WriteTagBatchRequest( + cid, connName!, values, triggerPath, triggerVal, DateTimeOffset.UtcNow); + + // Ask the DCL and pipe the batch outcome back to the original caller. The DCL + // bounds its own write with WriteTimeout; this Ask is bounded a bit longer so a + // device timeout is surfaced as a DCL failure rather than an Ask timeout. + _dclManager.Ask(dclReq, TimeSpan.FromSeconds(30)) + .ContinueWith(t => t.IsCompletedSuccessfully + ? new WriteAttributeBatchResponse(cid, t.Result.Success, t.Result.ErrorMessage) + : new WriteAttributeBatchResponse( + cid, false, t.Exception?.GetBaseException().Message ?? "DCL batch write timed out")) + .PipeTo(caller); + } + /// /// WP-15: Routes script call requests to the appropriate Script Actor. /// Uses Ask pattern (WP-22). diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs index 35d29d7c..2e1fa43f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScopeAccessors.cs @@ -74,6 +74,40 @@ public class AttributeAccessor public Task SetAsync(string key, object? value) => _ctx.SetAttribute(Resolve(key), AttributeValueCodec.Encode(value) ?? string.Empty); + /// + /// Writes a SET of data-sourced attributes to the device in ONE gateway batch + /// round-trip, then writes =, + /// then waits EVENT-DRIVEN (reusing the existing WaitForAttribute waiter — not a + /// poll) for to reach , + /// bounded by . Replaces N sequential per-attribute writes + /// with one batched call before the wait. All keys are scope-resolved () + /// and all values codec-encoded just like the other accessors. Returns true if + /// the response was observed within the timeout, false on timeout (no throw on + /// timeout); throws if the batch/trigger + /// write itself fails. + /// + /// Attribute key → value to batch-write (keys scope-resolved, values codec-encoded). + /// Trigger attribute key written AFTER the batch. + /// Value to write to the trigger. + /// Attribute key to wait on. + /// Target value to wait for (null ⇒ any change). + /// How long to wait before returning false. + /// true on response match within the timeout; false on timeout. + public Task WriteBatchAndWaitAsync( + IReadOnlyDictionary values, string flagKey, object? flagValue, + string responseKey, object? responseValue, TimeSpan timeout) + { + var encoded = new Dictionary(values.Count); + foreach (var kv in values) + encoded[Resolve(kv.Key)] = AttributeValueCodec.Encode(kv.Value); + + return _ctx.WriteBatchAndWait( + encoded, + Resolve(flagKey), AttributeValueCodec.Encode(flagValue), + Resolve(responseKey), AttributeValueCodec.Encode(responseValue), + timeout); + } + /// /// WaitForAttribute (spec §3-§5): waits event-driven until the attribute equals /// (value-equality, codec-normalized), bounded by diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs index 7ace9074..054bc31f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -525,6 +525,44 @@ public class ScriptRuntimeContext } } + /// + /// Writes a SET of data-sourced attributes to the device in ONE DCL batch round-trip, + /// then writes a trigger flag, then waits EVENT-DRIVEN (reusing the existing + /// waiter — not a poll) for + /// to reach , bounded by . + /// Replaces N sequential per-attribute writes with a single gateway call before the + /// wait. Throws if the write/trigger leg fails + /// (resolution error, multi-connection batch, device error); returns the wait result + /// otherwise (true = matched, false = timeout, never throws on timeout). + /// + /// Scope-resolved attribute name → codec-encoded value to batch-write. + /// Scope-resolved trigger attribute name written AFTER the batch. + /// Codec-encoded value for the trigger. + /// Scope-resolved attribute to wait on. + /// Codec-encoded target value (null ⇒ any change). + /// How long to wait for the response before returning false. + /// true on response match within the timeout; false on timeout. + public async Task WriteBatchAndWait( + IReadOnlyDictionary encodedValues, string triggerAttr, string? triggerEncoded, + string responseAttr, string? responseEncoded, TimeSpan timeout) + { + var cid = Guid.NewGuid().ToString(); + var batchReq = new WriteAttributeBatchRequest( + cid, _instanceName, encodedValues, triggerAttr, triggerEncoded, DateTimeOffset.UtcNow); + + // 35s: the InstanceActor's DCL Ask is internally bounded at 30s, so allow a small + // margin so the DCL's own typed failure/timeout reply is the one we observe rather + // than an AskTimeoutException here. Honors the script execution-timeout token. + var batchResp = await _instanceActor.Ask( + batchReq, TimeSpan.FromSeconds(35), _scriptTimeoutToken); + + if (!batchResp.Success) + throw new InvalidOperationException( + $"WriteBatchAndWait write failed: {batchResp.ErrorMessage}"); + + return await WaitAttribute(responseAttr, responseEncoded, null, timeout); + } + /// /// Calls a sibling script on the same instance by name (Ask pattern). /// WP-20: Enforces recursion limit.