refactor(driver-pages): address post-review follow-ups
- DriverInstanceSpec carries ClusterId from the deployment artifact; DriverHostActor threads the real cluster identity into DriverInstanceActor instead of the local NodeId. Old pre-PR artifacts without a ClusterId field fall back to the NodeId so in-flight deployments keep working. - DriverHostActor.ChildEntry holds the full DriverInstanceSpec (was only carrying DriverType + LastConfigJson). Restart respawns preserve RowId, Name, Enabled, ClusterId — no placeholder values. - Drop the unnecessary _faultLock on DriverInstanceActor — every read/write site runs inside an Akka message handler which is single-threaded per actor instance. - DriverStatusPanel.DisposeAsync awaits Timer.DisposeAsync so an in-flight 5s tick can't invoke StateHasChanged on a component whose hub has already been torn down.
This commit is contained in:
+5
-3
@@ -270,10 +270,12 @@
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_timer?.Dispose();
|
||||
// Drain the timer first so an in-flight callback can't invoke StateHasChanged on
|
||||
// a component that's already releasing its hub. System.Threading.Timer implements
|
||||
// IAsyncDisposable in .NET 6+; the async dispose awaits any in-flight callback.
|
||||
if (_timer is not null) await _timer.DisposeAsync();
|
||||
_opResultClearTimer?.Dispose();
|
||||
if (_hub is not null)
|
||||
await _hub.DisposeAsync();
|
||||
if (_hub is not null) await _hub.DisposeAsync();
|
||||
}
|
||||
|
||||
// Map DriverState string → chip CSS class using the 4 defined theme variants.
|
||||
|
||||
@@ -15,7 +15,8 @@ public sealed record DriverInstanceSpec(
|
||||
string Name,
|
||||
string DriverType,
|
||||
bool Enabled,
|
||||
string DriverConfig);
|
||||
string DriverConfig,
|
||||
string? ClusterId = null);
|
||||
|
||||
public static class DeploymentArtifact
|
||||
{
|
||||
@@ -66,6 +67,7 @@ public static class DeploymentArtifact
|
||||
var type = el.TryGetProperty("DriverType", out var typeEl) ? typeEl.GetString() : null;
|
||||
var enabled = !el.TryGetProperty("Enabled", out var enEl) || enEl.GetBoolean();
|
||||
var config = el.TryGetProperty("DriverConfig", out var cfgEl) ? cfgEl.GetString() : null;
|
||||
var clusterId = el.TryGetProperty("ClusterId", out var clEl) ? clEl.GetString() : null;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(id) || string.IsNullOrWhiteSpace(type)) return null;
|
||||
|
||||
@@ -75,7 +77,8 @@ public static class DeploymentArtifact
|
||||
Name: name ?? id!,
|
||||
DriverType: type!,
|
||||
Enabled: enabled,
|
||||
DriverConfig: config ?? "{}");
|
||||
DriverConfig: config ?? "{}",
|
||||
ClusterId: clusterId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -55,7 +55,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
|
||||
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
||||
|
||||
private sealed record ChildEntry(IActorRef Actor, string DriverType, string LastConfigJson, bool Stubbed);
|
||||
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
|
||||
{
|
||||
// Convenience accessors for sites that don't need the full spec.
|
||||
public string DriverType => Spec.DriverType;
|
||||
public string LastConfigJson => Spec.DriverConfig;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public ITimerScheduler Timers { get; set; } = null!;
|
||||
@@ -374,7 +379,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
|
||||
IActorRef child;
|
||||
var clusterId = _localNode.Value;
|
||||
// Prefer the real ClusterId from the deployment artifact; fall back to the local node
|
||||
// identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
|
||||
// ClusterId was added to DriverInstanceSpec).
|
||||
var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;
|
||||
if (stub)
|
||||
{
|
||||
child = Context.ActorOf(
|
||||
@@ -397,7 +405,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
|
||||
}
|
||||
|
||||
_children[spec.DriverInstanceId] = new ChildEntry(child, spec.DriverType, spec.DriverConfig, stub);
|
||||
_children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
|
||||
_log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
|
||||
_localNode, spec.DriverType, spec.DriverInstanceId, stub);
|
||||
}
|
||||
@@ -406,7 +414,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
if (!_children.TryGetValue(spec.DriverInstanceId, out var entry)) return;
|
||||
entry.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));
|
||||
_children[spec.DriverInstanceId] = entry with { LastConfigJson = spec.DriverConfig };
|
||||
// Store the full new spec — a delta can change Name, Enabled, ClusterId, etc. in addition to config.
|
||||
_children[spec.DriverInstanceId] = entry with { Spec = spec };
|
||||
_log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
|
||||
}
|
||||
|
||||
@@ -467,14 +476,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
Context.Stop(entry.Actor);
|
||||
_children.Remove(msg.DriverInstanceId);
|
||||
|
||||
// Respawn using the same spec that was applied during the last reconcile.
|
||||
SpawnChild(new DriverInstanceSpec(
|
||||
DriverInstanceRowId: Guid.Empty,
|
||||
DriverInstanceId: msg.DriverInstanceId,
|
||||
Name: msg.DriverInstanceId,
|
||||
DriverType: entry.DriverType,
|
||||
Enabled: true,
|
||||
DriverConfig: entry.LastConfigJson));
|
||||
// Respawn from the same spec the last reconcile used — preserves RowId, Name, ClusterId.
|
||||
SpawnChild(entry.Spec);
|
||||
}
|
||||
|
||||
private void HandleReconnectDriver(ReconnectDriver msg)
|
||||
|
||||
@@ -72,9 +72,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private string? _currentConfigJson;
|
||||
|
||||
/// <summary>Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.</summary>
|
||||
/// <summary>
|
||||
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
|
||||
/// No lock needed — every read/write site runs inside an Akka message handler, which is
|
||||
/// single-threaded per actor instance.
|
||||
/// </summary>
|
||||
private readonly Queue<DateTime> _faultTimestamps = new();
|
||||
private readonly object _faultLock = new();
|
||||
|
||||
/// <summary>Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
|
||||
/// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
|
||||
@@ -412,28 +415,19 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
|
||||
private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0;
|
||||
|
||||
/// <summary>
|
||||
/// Records a transition into a Faulted / error state for the 5-minute sliding counter.
|
||||
/// Thread-safe: called from actor message-handling (single-threaded) but guard is cheap.
|
||||
/// </summary>
|
||||
/// <summary>Records a transition into a Faulted / error state for the 5-minute sliding counter.</summary>
|
||||
private void RecordFault()
|
||||
{
|
||||
lock (_faultLock)
|
||||
{
|
||||
_faultTimestamps.Enqueue(DateTime.UtcNow);
|
||||
}
|
||||
_faultTimestamps.Enqueue(DateTime.UtcNow);
|
||||
}
|
||||
|
||||
/// <summary>Returns how many fault transitions occurred in the last 5 minutes.</summary>
|
||||
private int ErrorCount5Min()
|
||||
{
|
||||
var cutoff = DateTime.UtcNow.AddMinutes(-5);
|
||||
lock (_faultLock)
|
||||
{
|
||||
while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff)
|
||||
_faultTimestamps.Dequeue();
|
||||
return _faultTimestamps.Count;
|
||||
}
|
||||
while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff)
|
||||
_faultTimestamps.Dequeue();
|
||||
return _faultTimestamps.Count;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
Reference in New Issue
Block a user