fix(site-runtime): resolve SiteRuntime-001/002/003 — route data-sourced writes to DCL, real per-attribute API results, race-free redeploy
This commit is contained in:
@@ -39,6 +39,12 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
private readonly ISiteHealthCollector? _healthCollector;
|
||||
private readonly IServiceProvider? _serviceProvider;
|
||||
private readonly Dictionary<string, IActorRef> _instanceActors = new();
|
||||
/// <summary>
|
||||
/// Tracks Instance Actors that are terminating as part of a redeployment, keyed by
|
||||
/// the terminating actor ref. The buffered command is applied once <see cref="Terminated"/>
|
||||
/// confirms the child has fully stopped (SiteRuntime-003).
|
||||
/// </summary>
|
||||
private readonly Dictionary<IActorRef, PendingRedeploy> _pendingRedeploys = new();
|
||||
private int _totalDeployedCount;
|
||||
|
||||
public ITimerScheduler Timers { get; set; } = null!;
|
||||
@@ -94,6 +100,10 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
|
||||
// Internal deploy persistence result
|
||||
Receive<DeployPersistenceResult>(HandleDeployPersistenceResult);
|
||||
|
||||
// Terminated signal — drains a buffered redeployment once the previous
|
||||
// Instance Actor has fully stopped (SiteRuntime-003).
|
||||
Receive<Terminated>(HandleTerminated);
|
||||
}
|
||||
|
||||
protected override void PreStart()
|
||||
@@ -211,6 +221,13 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
/// <summary>
|
||||
/// Handles a new deployment: stores config in SQLite, clears previous static overrides,
|
||||
/// and creates or replaces the Instance Actor.
|
||||
///
|
||||
/// Redeployment of an already-running instance must wait for the previous Instance
|
||||
/// Actor to fully terminate (including PostStop on its descendants) before the
|
||||
/// replacement is created — otherwise <c>Context.ActorOf</c> can collide on
|
||||
/// the still-registered child name. Instead of guessing with a fixed timer, the
|
||||
/// terminating child is watched and the in-flight command is buffered until the
|
||||
/// <see cref="Terminated"/> signal arrives.
|
||||
/// </summary>
|
||||
private void HandleDeploy(DeployInstanceCommand command)
|
||||
{
|
||||
@@ -219,28 +236,54 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
"Deploying instance {Instance}, deploymentId={DeploymentId}",
|
||||
instanceName, command.DeploymentId);
|
||||
|
||||
// Stop existing actor if present (redeployment replaces)
|
||||
// Redeployment replaces a running instance. Watch + stop the existing actor
|
||||
// and buffer this command until its Terminated signal confirms the child
|
||||
// (and its whole subtree) has fully stopped and freed its actor name.
|
||||
if (_instanceActors.TryGetValue(instanceName, out var existing))
|
||||
{
|
||||
Context.Stop(existing);
|
||||
_instanceActors.Remove(instanceName);
|
||||
// Wait for the child to be removed from the children collection
|
||||
// by yielding and retrying — Context.Stop is processed before the next message
|
||||
Context.System.Scheduler.ScheduleTellOnce(
|
||||
TimeSpan.FromMilliseconds(500), Self, command, Sender);
|
||||
_pendingRedeploys[existing] = new PendingRedeploy(command, Sender);
|
||||
Context.Watch(existing);
|
||||
Context.Stop(existing);
|
||||
UpdateInstanceCounts();
|
||||
return;
|
||||
}
|
||||
|
||||
// Fresh deployment — no existing actor to replace.
|
||||
ApplyDeployment(command, Sender, isRedeploy: false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Recreates an Instance Actor once its predecessor has fully terminated during a
|
||||
/// redeployment, draining the buffered <see cref="DeployInstanceCommand"/>.
|
||||
/// </summary>
|
||||
private void HandleTerminated(Terminated terminated)
{
    // Look up the redeployment buffered under the stopped actor's ref and remove
    // it in the same call. A Terminated signal with no buffered entry is ignored.
    if (_pendingRedeploys.Remove(terminated.ActorRef, out var buffered))
    {
        // Apply the buffered command as a redeploy, passing along the sender
        // that was stored with it rather than the current message's sender.
        ApplyDeployment(buffered.Command, buffered.OriginalSender, isRedeploy: true);
    }
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates the Instance Actor, persists the config, and replies to the deployer.
|
||||
/// A redeployment is an update of an existing instance, so the deployed-instance
|
||||
/// counter is only incremented for genuinely new deployments.
|
||||
/// </summary>
|
||||
private void ApplyDeployment(DeployInstanceCommand command, IActorRef sender, bool isRedeploy)
|
||||
{
|
||||
var instanceName = command.InstanceUniqueName;
|
||||
|
||||
// Ensure DCL connections exist for any data-sourced attributes
|
||||
EnsureDclConnections(command.FlattenedConfigurationJson);
|
||||
|
||||
// Create the Instance Actor immediately (no existing actor to replace)
|
||||
// Create the Instance Actor immediately
|
||||
CreateInstanceActor(instanceName, command.FlattenedConfigurationJson);
|
||||
_totalDeployedCount++;
|
||||
if (!isRedeploy)
|
||||
_totalDeployedCount++;
|
||||
UpdateInstanceCounts();
|
||||
|
||||
// Persist to SQLite and clear static overrides asynchronously
|
||||
var sender = Sender;
|
||||
Task.Run(async () =>
|
||||
{
|
||||
await _storage.StoreDeployedConfigAsync(
|
||||
@@ -614,9 +657,11 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
|
||||
/// <summary>
|
||||
/// Writes attribute values on a deployed instance for a Route.To().SetAttribute(s)
|
||||
/// call (or a central Test Run bound to the instance). Writes are Tell'd to the
|
||||
/// Instance Actor — serialized through its mailbox — and acknowledged optimistically,
|
||||
/// matching the fire-and-forget semantics of Instance.SetAttribute.
|
||||
/// call (or a central Test Run bound to the instance). Each write is Ask'd to the
|
||||
/// Instance Actor, which routes data-sourced attributes through the DCL and static
|
||||
/// attributes to a persisted override. The response reflects the real per-attribute
|
||||
/// outcome (a non-existent attribute or a failed device write reports failure),
|
||||
/// rather than an unconditional optimistic ack.
|
||||
/// </summary>
|
||||
private void RouteInboundApiSetAttributes(RouteToSetAttributesRequest request)
|
||||
{
|
||||
@@ -629,14 +674,33 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var (name, value) in request.AttributeValues)
|
||||
{
|
||||
instanceActor.Tell(new SetStaticAttributeCommand(
|
||||
request.CorrelationId, request.InstanceUniqueName, name, value, DateTimeOffset.UtcNow));
|
||||
}
|
||||
var sender = Sender;
|
||||
var correlationId = request.CorrelationId;
|
||||
var asks = request.AttributeValues
|
||||
.Select(kvp => instanceActor.Ask<SetStaticAttributeResponse>(
|
||||
new SetStaticAttributeCommand(
|
||||
correlationId, request.InstanceUniqueName, kvp.Key, kvp.Value, DateTimeOffset.UtcNow),
|
||||
TimeSpan.FromSeconds(30)))
|
||||
.ToArray();
|
||||
|
||||
Sender.Tell(new RouteToSetAttributesResponse(
|
||||
request.CorrelationId, true, null, DateTimeOffset.UtcNow));
|
||||
Task.WhenAll(asks).ContinueWith(t =>
|
||||
{
|
||||
if (!t.IsCompletedSuccessfully)
|
||||
return new RouteToSetAttributesResponse(
|
||||
correlationId, false,
|
||||
t.Exception?.GetBaseException().Message ?? "Attribute write timed out",
|
||||
DateTimeOffset.UtcNow);
|
||||
|
||||
var failures = t.Result
|
||||
.Where(r => !r.Success)
|
||||
.Select(r => $"{r.AttributeName}: {r.ErrorMessage}")
|
||||
.ToArray();
|
||||
|
||||
return failures.Length == 0
|
||||
? new RouteToSetAttributesResponse(correlationId, true, null, DateTimeOffset.UtcNow)
|
||||
: new RouteToSetAttributesResponse(
|
||||
correlationId, false, string.Join("; ", failures), DateTimeOffset.UtcNow);
|
||||
}).PipeTo(sender);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -789,4 +853,9 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
|
||||
EnableInstanceCommand Command, DeployedInstance? Config, string? Error, IActorRef OriginalSender);
|
||||
internal record DeployPersistenceResult(
|
||||
string DeploymentId, string InstanceName, bool Success, string? Error, IActorRef OriginalSender);
|
||||
|
||||
/// <summary>
|
||||
/// A redeployment command buffered until the previous Instance Actor terminates.
|
||||
/// </summary>
|
||||
/// <param name="Command">The deploy command to apply once the previous Instance Actor has terminated.</param>
/// <param name="OriginalSender">The sender captured when the command was buffered, passed on when the command is applied.</param>
internal record PendingRedeploy(DeployInstanceCommand Command, IActorRef OriginalSender);
|
||||
}
|
||||
|
||||
@@ -198,10 +198,44 @@ public class InstanceActor : ReceiveActor
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates a static attribute in memory and persists the override to SQLite.
|
||||
/// Handles an attribute write (<c>Instance.SetAttribute</c> / Inbound API).
|
||||
/// WP-24: State mutation serialized through this actor's mailbox.
|
||||
///
|
||||
/// The write is routed by the attribute's data binding:
|
||||
/// * Data-sourced attribute → forwards a <see cref="WriteTagRequest"/> to the
|
||||
/// DCL, which writes the physical device. The in-memory value is NOT
|
||||
/// optimistically updated and NO static override is persisted — the
|
||||
/// confirmed device value arrives later via the subscription. Success or
|
||||
/// failure of the device write is returned to the caller.
|
||||
/// * Static attribute → updates the in-memory value and persists the override
|
||||
/// to SQLite.
|
||||
///
|
||||
/// Either way the caller receives a <see cref="SetStaticAttributeResponse"/>.
|
||||
/// </summary>
|
||||
private void HandleSetStaticAttribute(SetStaticAttributeCommand command)
{
    // Find the attribute's entry in the flattened configuration (null when there
    // is no configuration or no attribute with this canonical name).
    var target = _configuration?.Attributes
        .FirstOrDefault(attr => attr.CanonicalName == command.AttributeName);

    // An attribute is treated as data-sourced only when it carries BOTH a data
    // source reference and a bound data connection name. Anything else takes the
    // static path.
    // NOTE(review): a name that is not present in the configuration also lands
    // on the static path — confirm that is the intended handling.
    if (target == null
        || string.IsNullOrEmpty(target.DataSourceReference)
        || string.IsNullOrEmpty(target.BoundDataConnectionName))
    {
        HandleSetStaticAttributeCore(command);
        return;
    }

    HandleSetDataAttribute(command, target);
}
|
||||
|
||||
/// <summary>
|
||||
/// Static attribute write: updates in-memory state, publishes the change,
|
||||
/// persists the override to SQLite, and replies with success.
|
||||
/// </summary>
|
||||
private void HandleSetStaticAttributeCore(SetStaticAttributeCommand command)
|
||||
{
|
||||
_attributes[command.AttributeName] = command.Value;
|
||||
|
||||
@@ -216,8 +250,7 @@ public class InstanceActor : ReceiveActor
|
||||
|
||||
PublishAndNotifyChildren(changed);
|
||||
|
||||
// Persist asynchronously -- fire and forget since the actor is the source of truth
|
||||
// and SetAttribute is called from scripts via Tell (no response consumer).
|
||||
// Persist asynchronously -- fire and forget since the actor is the source of truth.
|
||||
var instanceName = _instanceUniqueName;
|
||||
var attributeName = command.AttributeName;
|
||||
var logger = _logger;
|
||||
@@ -230,6 +263,58 @@ public class InstanceActor : ReceiveActor
|
||||
instanceName,
|
||||
attributeName);
|
||||
}, TaskContinuationOptions.OnlyOnFaulted);
|
||||
|
||||
Sender.Tell(new SetStaticAttributeResponse(
|
||||
command.CorrelationId, _instanceUniqueName, command.AttributeName,
|
||||
true, null, DateTimeOffset.UtcNow));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Data-sourced attribute write: forwards a write request to the DCL and pipes
|
||||
/// the device write result back to the caller. The in-memory value is left
|
||||
/// untouched (it is refreshed by the subscription when the device confirms);
|
||||
/// no static override is persisted for a data-sourced attribute.
|
||||
/// </summary>
|
||||
private void HandleSetDataAttribute(SetStaticAttributeCommand command, ResolvedAttribute resolved)
{
    // Snapshot the sender and identifiers into locals up front; the reply built
    // in the continuation below uses only these captured values.
    var replyTo = Sender;
    var requestId = command.CorrelationId;
    var attribute = command.AttributeName;
    var instance = _instanceUniqueName;

    // Builds the reply for this write, timestamped at the moment it is created.
    SetStaticAttributeResponse Reply(bool succeeded, string? error) =>
        new SetStaticAttributeResponse(
            requestId, instance, attribute, succeeded, error, DateTimeOffset.UtcNow);

    // Without a DCL manager the write cannot be routed: log and report failure.
    if (_dclManager == null)
    {
        _logger.LogWarning(
            "SetAttribute on data-sourced attribute {Instance}.{Attribute} cannot be routed — no DCL manager configured",
            instance, attribute);
        replyTo.Tell(Reply(false, "Data Connection Layer not available for write."));
        return;
    }

    var writeRequest = new WriteTagRequest(
        requestId,
        resolved.BoundDataConnectionName!,
        resolved.DataSourceReference!,
        command.Value,
        DateTimeOffset.UtcNow);

    // Ask the DCL with a 30-second timeout. The continuation maps either the
    // DCL's reply or an unsuccessful Ask onto a SetStaticAttributeResponse,
    // which is then piped to the original sender.
    var pendingWrite = _dclManager.Ask<WriteTagResponse>(writeRequest, TimeSpan.FromSeconds(30));

    pendingWrite
        .ContinueWith(completed =>
        {
            if (!completed.IsCompletedSuccessfully)
            {
                // No exception on the task means there is no message to relay;
                // fall back to the timeout text.
                return Reply(
                    false,
                    completed.Exception?.GetBaseException().Message ?? "DCL write timed out");
            }

            var outcome = completed.Result;
            return Reply(outcome.Success, outcome.ErrorMessage);
        })
        .PipeTo(replyTo);
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -25,17 +25,17 @@ public class AttributeAccessor
|
||||
|
||||
public object? this[string key]
|
||||
{
|
||||
// Both reads and writes block on the actor Ask; the write also blocks
|
||||
// on the DCL round-trip for data-connected attributes. The async
|
||||
// variants (GetAsync/SetAsync) are preferred where awaiting is possible.
|
||||
get => _ctx.GetAttribute(Resolve(key)).GetAwaiter().GetResult();
|
||||
set => _ctx.SetAttribute(Resolve(key), value?.ToString() ?? string.Empty);
|
||||
set => _ctx.SetAttribute(Resolve(key), value?.ToString() ?? string.Empty).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
public Task<object?> GetAsync(string key) => _ctx.GetAttribute(Resolve(key));
|
||||
|
||||
public Task SetAsync(string key, object? value)
|
||||
{
|
||||
_ctx.SetAttribute(Resolve(key), value?.ToString() ?? string.Empty);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
=> _ctx.SetAttribute(Resolve(key), value?.ToString() ?? string.Empty);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -99,18 +99,31 @@ public class ScriptRuntimeContext
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets an attribute value. For data-connected attributes, forwards to DCL via Instance Actor.
|
||||
/// For static attributes, updates in-memory and persists to SQLite via Instance Actor.
|
||||
/// All mutations serialized through the Instance Actor mailbox.
|
||||
/// Sets an attribute value. For data-connected attributes the Instance Actor
|
||||
/// forwards the write to the DCL, which writes the physical device; the
|
||||
/// in-memory value is not optimistically updated. For static attributes the
|
||||
/// Instance Actor updates the in-memory value and persists the override to
|
||||
/// SQLite. All mutations are serialized through the Instance Actor mailbox.
|
||||
///
|
||||
/// The write is awaited so that a device-write failure on a data-connected
|
||||
/// attribute is surfaced to the calling script (when the returned task is awaited) as an
|
||||
/// <see cref="InvalidOperationException"/>.
|
||||
/// </summary>
|
||||
public void SetAttribute(string attributeName, string value)
|
||||
public async Task SetAttribute(string attributeName, string value)
|
||||
{
|
||||
var correlationId = Guid.NewGuid().ToString();
|
||||
var command = new SetStaticAttributeCommand(
|
||||
correlationId, _instanceName, attributeName, value, DateTimeOffset.UtcNow);
|
||||
|
||||
// Tell (fire-and-forget) — mutation serialized through Instance Actor
|
||||
_instanceActor.Tell(command);
|
||||
// Ask — mutation serialized through the Instance Actor mailbox; the reply
|
||||
// carries the device-write outcome for data-connected attributes.
|
||||
var response = await _instanceActor.Ask<SetStaticAttributeResponse>(command, _askTimeout);
|
||||
|
||||
if (!response.Success)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"SetAttribute('{attributeName}') failed: {response.ErrorMessage}");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
Reference in New Issue
Block a user