refactor(driver-pages): address Phase 6/8 deep-review findings
v2-ci / build (push) Failing after 32s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
v2-ci / build (push) Failing after 32s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
- Topic-name drift fix: DriverHealthChanged.TopicName and DriverControlTopic.Name now live on the message contracts in Commons. AkkaDriverHealthPublisher, DriverStatusSignalRBridge, DriverHostActor, and AdminOperationsActor all delegate to the single constant so a rename can't silently desynchronise publisher and subscriber. - DriverStatusPanel._opResultClearTimer switched from System.Timers.Timer to System.Threading.Timer + awaited DisposeAsync. Prevents an in-flight 8s clear-callback from invoking StateHasChanged on a component whose hub has already been released. - PublishHealthSnapshot deduplicates against the last published (state, lastSuccess, lastError, errorCount) fingerprint. The 30s heartbeat no longer floods the SignalR layer with identical Healthy snapshots — newly-joined clients still warm up via the snapshot store on JoinDriver.
This commit is contained in:
@@ -1,5 +1,16 @@
|
|||||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Shared DPS topic for driver-control commands (<see cref="RestartDriver"/>,
|
||||||
|
/// <see cref="ReconnectDriver"/>). Publishers (AdminOperationsActor) and subscribers
|
||||||
|
/// (DriverHostActor) reference this single constant so renames can't silently
|
||||||
|
/// desynchronise.
|
||||||
|
/// </summary>
|
||||||
|
public static class DriverControlTopic
|
||||||
|
{
|
||||||
|
public const string Name = "driver-control";
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// AdminUI → AdminOperationsActor: restart the driver actor for one instance.
|
/// AdminUI → AdminOperationsActor: restart the driver actor for one instance.
|
||||||
/// A restart fully stops and respawns the actor — loses in-memory state, may briefly
|
/// A restart fully stops and respawns the actor — loses in-memory state, may briefly
|
||||||
|
|||||||
@@ -20,4 +20,12 @@ public sealed record DriverHealthChanged(
|
|||||||
DateTime? LastSuccessfulReadUtc,
|
DateTime? LastSuccessfulReadUtc,
|
||||||
string? LastError,
|
string? LastError,
|
||||||
int ErrorCount5Min,
|
int ErrorCount5Min,
|
||||||
DateTime PublishedUtc);
|
DateTime PublishedUtc)
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// DPS topic name. Both the runtime <c>AkkaDriverHealthPublisher</c> and the AdminUI
|
||||||
|
/// <c>DriverStatusSignalRBridge</c> reference this single constant so renames can't
|
||||||
|
/// silently desynchronise publisher and subscriber.
|
||||||
|
/// </summary>
|
||||||
|
public const string TopicName = "driver-health";
|
||||||
|
}
|
||||||
|
|||||||
+10
-11
@@ -157,7 +157,7 @@
|
|||||||
private bool _showRestartConfirm;
|
private bool _showRestartConfirm;
|
||||||
private string? _opResultMessage;
|
private string? _opResultMessage;
|
||||||
private bool _opResultOk;
|
private bool _opResultOk;
|
||||||
private System.Timers.Timer? _opResultClearTimer;
|
private System.Threading.Timer? _opResultClearTimer;
|
||||||
|
|
||||||
protected override async Task OnInitializedAsync()
|
protected override async Task OnInitializedAsync()
|
||||||
{
|
{
|
||||||
@@ -257,24 +257,23 @@
|
|||||||
{
|
{
|
||||||
_opResultOk = ok;
|
_opResultOk = ok;
|
||||||
_opResultMessage = message;
|
_opResultMessage = message;
|
||||||
// Auto-clear the result chip after 8 s.
|
// Auto-clear the result chip after 8 s. System.Threading.Timer is used (not
|
||||||
|
// System.Timers.Timer) so DisposeAsync can drain any in-flight callback.
|
||||||
_opResultClearTimer?.Dispose();
|
_opResultClearTimer?.Dispose();
|
||||||
_opResultClearTimer = new System.Timers.Timer(8_000) { AutoReset = false };
|
_opResultClearTimer = new System.Threading.Timer(_ =>
|
||||||
_opResultClearTimer.Elapsed += async (_, _) =>
|
|
||||||
{
|
{
|
||||||
_opResultMessage = null;
|
_opResultMessage = null;
|
||||||
await InvokeAsync(StateHasChanged);
|
InvokeAsync(StateHasChanged);
|
||||||
};
|
}, null, TimeSpan.FromSeconds(8), Timeout.InfiniteTimeSpan);
|
||||||
_opResultClearTimer.Start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
{
|
{
|
||||||
// Drain the timer first so an in-flight callback can't invoke StateHasChanged on
|
// Drain BOTH timers first so an in-flight callback can't invoke StateHasChanged on
|
||||||
// a component that's already releasing its hub. System.Threading.Timer implements
|
// a component whose hub has already been released. System.Threading.Timer's async
|
||||||
// IAsyncDisposable in .NET 6+; the async dispose awaits any in-flight callback.
|
// dispose awaits any in-flight callback (.NET 6+).
|
||||||
if (_timer is not null) await _timer.DisposeAsync();
|
if (_timer is not null) await _timer.DisposeAsync();
|
||||||
_opResultClearTimer?.Dispose();
|
if (_opResultClearTimer is not null) await _opResultClearTimer.DisposeAsync();
|
||||||
if (_hub is not null) await _hub.DisposeAsync();
|
if (_hub is not null) await _hub.DisposeAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class DriverStatusSignalRBridge : ReceiveActor
|
public sealed class DriverStatusSignalRBridge : ReceiveActor
|
||||||
{
|
{
|
||||||
public const string TopicName = "driver-health";
|
public const string TopicName = DriverHealthChanged.TopicName;
|
||||||
|
|
||||||
private readonly IHubContext<DriverStatusHub> _hub;
|
private readonly IHubContext<DriverStatusHub> _hub;
|
||||||
private readonly IDriverStatusSnapshotStore _store;
|
private readonly IDriverStatusSnapshotStore _store;
|
||||||
|
|||||||
@@ -178,7 +178,7 @@ public sealed class AdminOperationsActor : ReceiveActor
|
|||||||
{
|
{
|
||||||
// Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
|
// Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
|
||||||
// Only the host that owns the instance will act; others ignore it (id not found in _children).
|
// Only the host that owns the instance will act; others ignore it (id not found in _children).
|
||||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("driver-control", msg));
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, msg));
|
||||||
|
|
||||||
await using var db = await _dbFactory.CreateDbContextAsync();
|
await using var db = await _dbFactory.CreateDbContextAsync();
|
||||||
db.ConfigEdits.Add(new ConfigEdit
|
db.ConfigEdits.Add(new ConfigEdit
|
||||||
@@ -208,7 +208,7 @@ public sealed class AdminOperationsActor : ReceiveActor
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Broadcast to every DriverHostActor; only the one owning the instance reacts.
|
// Broadcast to every DriverHostActor; only the one owning the instance reacts.
|
||||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("driver-control", msg));
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, msg));
|
||||||
|
|
||||||
await using var db = await _dbFactory.CreateDbContextAsync();
|
await using var db = await _dbFactory.CreateDbContextAsync();
|
||||||
db.ConfigEdits.Add(new ConfigEdit
|
db.ConfigEdits.Add(new ConfigEdit
|
||||||
|
|||||||
@@ -12,8 +12,9 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class AkkaDriverHealthPublisher : IDriverHealthPublisher
|
public sealed class AkkaDriverHealthPublisher : IDriverHealthPublisher
|
||||||
{
|
{
|
||||||
/// <summary>The DistributedPubSub topic name for driver-health snapshots.</summary>
|
/// <summary>The DistributedPubSub topic name for driver-health snapshots — single source
|
||||||
public const string TopicName = "driver-health";
|
/// of truth on the message contract itself.</summary>
|
||||||
|
public const string TopicName = DriverHealthChanged.TopicName;
|
||||||
|
|
||||||
private readonly ActorSystem _system;
|
private readonly ActorSystem _system;
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
{
|
{
|
||||||
public const string DeploymentsTopic = "deployments";
|
public const string DeploymentsTopic = "deployments";
|
||||||
public const string DeploymentAcksTopic = "deployment-acks";
|
public const string DeploymentAcksTopic = "deployment-acks";
|
||||||
public const string DriverControlTopic = "driver-control";
|
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
|
||||||
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
|
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
|
||||||
|
|
||||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||||
|
|||||||
@@ -434,13 +434,22 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
/// Polls <see cref="IDriver.GetHealth"/> and forwards the snapshot to the health publisher.
|
/// Polls <see cref="IDriver.GetHealth"/> and forwards the snapshot to the health publisher.
|
||||||
/// Called on every observable state change and by the periodic <see cref="HealthPollTick"/>
|
/// Called on every observable state change and by the periodic <see cref="HealthPollTick"/>
|
||||||
/// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
|
/// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
|
||||||
|
/// Deduplicates: if the resulting (state, lastSuccess, lastError, errorCount) tuple matches
|
||||||
|
/// the last publish, this call is a no-op. Stops flood-publishing identical Healthy snapshots
|
||||||
|
/// every 30s when nothing has changed. Newly-joined SignalR clients still get the current
|
||||||
|
/// snapshot via <c>DriverStatusHub.JoinDriver</c> which reads the store directly.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private void PublishHealthSnapshot()
|
private void PublishHealthSnapshot()
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var health = _driver.GetHealth();
|
var health = _driver.GetHealth();
|
||||||
_healthPublisher.Publish(_clusterId, _driverInstanceId, health, ErrorCount5Min());
|
var errorCount = ErrorCount5Min();
|
||||||
|
var fingerprint = (health.State, health.LastSuccessfulRead, health.LastError, errorCount);
|
||||||
|
if (_lastPublishedFingerprint is { } prev && prev.Equals(fingerprint))
|
||||||
|
return;
|
||||||
|
_lastPublishedFingerprint = fingerprint;
|
||||||
|
_healthPublisher.Publish(_clusterId, _driverInstanceId, health, errorCount);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -448,6 +457,9 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Fingerprint of the last <see cref="PublishHealthSnapshot"/> call; null until first publish.</summary>
|
||||||
|
private (DriverState State, DateTime? LastSuccess, string? LastError, int ErrorCount)? _lastPublishedFingerprint;
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
protected override void PostStop()
|
protected override void PostStop()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user