c1ce5833e9
v2-ci / build (push) Failing after 51s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Materialised SystemPlatform/Galaxy variables previously stayed BadWaitingForInitialData because nothing told the driver to subscribe (OpcUaPublishActor TODO 'on a future SubscribeBulk pass') and published values were only forwarded to the VirtualTag mux, never the OPC UA sink. DriverHostActor now, after each apply, groups the deployment's galaxy tag MXAccess refs by driver and sends DriverInstanceActor.SetDesiredSubscriptions; the actor retains the set and (re)subscribes on every Connected entry, so values resume after reconnects/redeploys (closes the F8b/#113 gap). Published values are also forwarded to OpcUaPublishActor as AttributeValueUpdate (NodeId == galaxy MxAccessRef) so the materialised variable shows live data. Verified live in docker-dev: galaxy TestMachine_001 tags go Good with a changing TestChangingInt. +1 unit test.
518 lines
23 KiB
C#
518 lines
23 KiB
C#
using Akka.Actor;
|
|
using Akka.Event;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|
|
|
// Singleton message delivered by the periodic "health-poll" timer. Declared file-scoped so the
// type name stays unique to this file.
file sealed class HealthPollTick
{
    private HealthPollTick()
    {
    }

    public static readonly HealthPollTick Instance = new HealthPollTick();
}
|
|
|
|
/// <summary>
|
|
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
|
|
///
|
|
/// <list type="bullet">
|
|
/// <item><c>Connecting</c> — calling <see cref="IDriver.InitializeAsync"/>.</item>
|
|
/// <item><c>Connected</c> — initialised; serving Read/Write/Subscribe requests.</item>
|
|
/// <item><c>Reconnecting</c> — disconnect observed; periodic retry of Initialize.</item>
|
|
/// <item><c>Failed</c> — terminal until parent restarts the actor.</item>
|
|
/// </list>
|
|
///
|
|
/// While <c>Connected</c> the actor applies <see cref="ApplyDelta"/> through
/// <see cref="IDriver.ReinitializeAsync"/>, serves <see cref="WriteAttribute"/>, <see cref="Subscribe"/> and
/// <see cref="Unsubscribe"/> requests, and forwards driver data-change events to its parent as
/// <see cref="AttributeValuePublished"/>.
|
|
/// </summary>
|
|
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|
{
|
|
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
|
|
|
|
public sealed record InitializeRequested(string DriverConfigJson);
|
|
public sealed record InitializeSucceeded;
|
|
public sealed record InitializeFailed(string Reason);
|
|
public sealed record DisconnectObserved(string Reason);
|
|
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
|
|
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
|
|
public sealed record WriteAttribute(string TagId, object Value);
|
|
public sealed record WriteAttributeResult(bool Success, string? Reason);
|
|
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
|
|
/// <summary>
|
|
/// Sets the set of references this driver should keep subscribed for the lifetime of the
|
|
/// current deployment. Unlike the one-shot <see cref="Subscribe"/>, the desired set is
|
|
/// retained and (re)established automatically every time the actor (re)enters
|
|
/// <c>Connected</c> — closing the F8b/#113 "re-subscribe across reconnects" gap and giving
|
|
/// <see cref="DriverHostActor"/> a single message to drive the SubscribeBulk pass after an
|
|
/// apply. Sending an empty set clears the desired subscription.
|
|
/// </summary>
|
|
public sealed record SetDesiredSubscriptions(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
|
|
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);
|
|
public sealed record SubscriptionFailed(string Reason);
|
|
public sealed record Unsubscribe;
|
|
/// <summary>
|
|
/// Sent by <see cref="DriverHostActor"/> when the AdminUI issues a Reconnect operation.
|
|
/// Pushes the actor out of <c>Connected</c> into <c>Reconnecting</c> so the transport is
|
|
/// re-established without fully stopping and respawning the actor. Safe to send in any
|
|
/// state — a no-op when already Reconnecting or Connecting.
|
|
/// </summary>
|
|
public sealed record ForceReconnect;
|
|
/// <summary>Published to the actor's parent whenever the subscribed IDriver fires
|
|
/// <see cref="ISubscribable.OnDataChange"/>. The parent forwards to OpcUaPublishActor.</summary>
|
|
public sealed record AttributeValuePublished(string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
|
|
private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);
|
|
public sealed class RetryConnect
|
|
{
|
|
public static readonly RetryConnect Instance = new();
|
|
private RetryConnect() { }
|
|
}
|
|
|
|
/// <summary>Interval between periodic health-poll heartbeats sent to the snapshot store.</summary>
|
|
public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);
|
|
|
|
private readonly IDriver _driver;
|
|
private readonly string _driverInstanceId;
|
|
private readonly string _clusterId;
|
|
private readonly IDriverHealthPublisher _healthPublisher;
|
|
private readonly TimeSpan _reconnectInterval;
|
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
|
private string? _currentConfigJson;
|
|
|
|
/// <summary>
|
|
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
|
|
/// No lock needed — every read/write site runs inside an Akka message handler, which is
|
|
/// single-threaded per actor instance.
|
|
/// </summary>
|
|
private readonly Queue<DateTime> _faultTimestamps = new();
|
|
|
|
/// <summary>Active subscription handle (null when not subscribed). At most one subscription is held
/// per actor. The handle is cleared when the actor leaves <c>Connected</c>; the set retained from
/// <see cref="SetDesiredSubscriptions"/> is re-subscribed on the next entry into <c>Connected</c>.</summary>
|
|
private ISubscriptionHandle? _subscriptionHandle;
|
|
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
|
|
|
|
/// <summary>The references the host wants kept subscribed (set by <see cref="SetDesiredSubscriptions"/>).
|
|
/// Re-applied on every entry into <c>Connected</c> so values resume after a reconnect or redeploy.</summary>
|
|
private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();
|
|
private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);
|
|
|
|
/// <summary>
|
|
/// Gets or sets the timer scheduler for scheduling reconnection attempts.
|
|
/// </summary>
|
|
public ITimerScheduler Timers { get; set; } = null!;
|
|
|
|
/// <summary>
/// Builds the <see cref="Akka.Actor.Props"/> used to spawn a <see cref="DriverInstanceActor"/>,
/// substituting defaults for every optional argument that was left null.
/// </summary>
/// <param name="driver">The driver instance to wrap.</param>
/// <param name="reconnectInterval">Interval between reconnection attempts; <see cref="DefaultReconnectInterval"/> when null.</param>
/// <param name="startStubbed">If true, the actor starts in stub mode for testing or unavailable platforms.</param>
/// <param name="healthPublisher">Health publisher; <see cref="NullDriverHealthPublisher.Instance"/> when null, so tests
/// and stub paths don't need to provide one.</param>
/// <param name="clusterId">Cluster identifier forwarded with health publishes; an empty string when null
/// (e.g. in unit tests).</param>
/// <returns>Props that construct the actor with the resolved arguments.</returns>
public static Props Props(
    IDriver driver,
    TimeSpan? reconnectInterval = null,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null)
{
    var interval = reconnectInterval ?? DefaultReconnectInterval;
    var publisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    var cluster = clusterId ?? string.Empty;

    return Akka.Actor.Props.Create(
        () => new DriverInstanceActor(driver, interval, startStubbed, publisher, cluster));
}
|
|
|
|
/// <summary>
/// Decides whether a driver should boot in DEV-STUB mode. Only the two v1 in-process driver
/// types are ever stubbed:
/// <list type="bullet">
///   <item><c>"Galaxy"</c> — legacy MXAccess COM proxy (retired in PR 7.2; gated for any
///   leftover DriverInstance rows that still reference the old type name).</item>
///   <item><c>"Historian.Wonderware"</c> — Wonderware Historian sidecar over Windows-only
///   named pipes.</item>
/// </list>
/// Those types are stubbed when the host is not Windows, or when the roles include
/// <c>"dev"</c>. Every other driver type — including the v2 <c>"GalaxyMxGateway"</c> driver,
/// which talks gRPC to an external mxaccessgw process — is never stubbed.
/// </summary>
/// <param name="driverType">The type identifier of the driver.</param>
/// <param name="roles">Operational roles configured for this instance.</param>
/// <returns>True when the driver should start stubbed; otherwise false.</returns>
public static bool ShouldStub(string driverType, IEnumerable<string> roles)
{
    var windowsOnly = driverType switch
    {
        "Galaxy" or "Historian.Wonderware" => true,
        _ => false,
    };

    if (windowsOnly && !OperatingSystem.IsWindows())
    {
        return true;
    }

    // The role lookup runs before the driver-type test, as it always has.
    return roles.Contains("dev") && windowsOnly;
}
|
|
|
|
/// <summary>
/// Initializes a new instance of the <see cref="DriverInstanceActor"/> class, records a spawn
/// telemetry event, and selects the initial behaviour: <c>Stubbed</c> when
/// <paramref name="startStubbed"/> is true, otherwise <c>Connecting</c>.
/// </summary>
/// <param name="driver">The driver instance to wrap and manage.</param>
/// <param name="reconnectInterval">Interval between reconnection attempts.</param>
/// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
/// <param name="healthPublisher">Sink for health-change notifications; when null,
/// <see cref="NullDriverHealthPublisher.Instance"/> is used.</param>
/// <param name="clusterId">Cluster identifier passed to the health publisher; null is stored as an empty string.</param>
public DriverInstanceActor(
    IDriver driver,
    TimeSpan reconnectInterval,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null)
{
    _driver = driver;
    _driverInstanceId = driver.DriverInstanceId;
    _clusterId = clusterId ?? string.Empty;
    _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    _reconnectInterval = reconnectInterval;

    // Stub spawns are tagged separately so they can be told apart from real spawns.
    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
        new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
        new KeyValuePair<string, object?>("driver_type", driver.DriverType));

    if (startStubbed)
    {
        // Use the actor's existing logger rather than creating a second adapter.
        _log.Info("[DEV-STUB] driver={Name} type={Type}", _driverInstanceId, driver.DriverType);
        Become(Stubbed);
        return;
    }

    Become(Connecting);
}
|
|
|
|
/// <inheritdoc />
protected override void PreStart()
{
    // Publish once right away so the AdminUI snapshot store has this driver's state before any
    // state transition fires.
    PublishHealthSnapshot();

    // Then keep polling on a fixed interval so the snapshot is re-evaluated for long-lived drivers.
    const string timerKey = "health-poll";
    Timers.StartPeriodicTimer(timerKey, HealthPollTick.Instance, HealthPollInterval);
}
|
|
|
|
/// <summary>
/// Behaviour for a stubbed driver: the standard message contracts are accepted without
/// touching the wrapped driver's connection. Apply and write requests are answered with a
/// deterministic success tagged <c>"stubbed"</c>; lifecycle messages are ignored.
/// </summary>
private void Stubbed()
{
    // Requests that expect a reply.
    Receive<ApplyDelta>(delta => Sender.Tell(new ApplyResult(true, "stubbed", delta.Correlation)));
    Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));

    // Lifecycle messages: a stubbed driver never initialises, disconnects or reconnects.
    Receive<InitializeRequested>(_ => { });
    Receive<DisconnectObserved>(_ => { });
    Receive<ForceReconnect>(_ => { });

    // The desired set is still recorded, and health is still polled.
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
|
|
|
|
/// <summary>
/// Behaviour while the first initialise attempt is pending. Success moves to
/// <c>Connected</c> and re-applies the desired subscriptions; failure records a fault and
/// moves to <c>Reconnecting</c>.
/// </summary>
private void Connecting()
{
    Receive<InitializeRequested>(request => InitializeAsync(request.DriverConfigJson));
    Receive<InitializeSucceeded>(_ => OnInitialized());
    Receive<InitializeFailed>(failure => OnInitializeFailed(failure.Reason));

    // Only remembered here; applied once Connected is entered.
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);

    // Already connecting, so a forced reconnect has nothing to do.
    Receive<ForceReconnect>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());

    void OnInitialized()
    {
        _log.Info("DriverInstance {Id}: connected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
    }

    void OnInitializeFailed(string reason)
    {
        _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, reason);
        RecordFault();
        Become(Reconnecting);
        PublishHealthSnapshot();
    }
}
|
|
|
|
/// <summary>
/// Behaviour while the driver is initialised. Serves apply, write, subscribe and unsubscribe
/// requests, forwards data-change events, and drops to <c>Reconnecting</c> on an observed
/// disconnect or an admin-forced reconnect.
/// </summary>
private void Connected()
{
    // Request handlers that await the driver.
    ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
    ReceiveAsync<WriteAttribute>(HandleWriteAsync);
    ReceiveAsync<Subscribe>(HandleSubscribeAsync);
    ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());

    Receive<DisconnectObserved>(lost =>
    {
        _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
            _driverInstanceId, lost.Reason);
        LeaveConnected(countAsFault: true);
    });

    Receive<ForceReconnect>(_ =>
    {
        _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
        LeaveConnected(countAsFault: false);
    });

    Receive<SetDesiredSubscriptions>(desired =>
    {
        StoreDesiredSubscriptions(desired);

        if (_desiredRefs.Count == 0)
        {
            // An empty desired set clears a live subscription, if there is one.
            if (_subscriptionHandle is not null)
            {
                Self.Tell(new Unsubscribe());
            }

            return;
        }

        // Apply immediately through the one-shot Subscribe path.
        Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
    });

    Receive<DataChangeForward>(OnDataChangeForward);
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());

    // Shared exit path: unhook the data-change handler, optionally count a fault, then switch
    // behaviour and publish the new health state.
    void LeaveConnected(bool countAsFault)
    {
        DetachSubscription();
        if (countAsFault)
        {
            RecordFault();
        }

        Become(Reconnecting);
        PublishHealthSnapshot();
    }
}
|
|
|
|
/// <summary>
/// Behaviour after the connection is lost or a connect attempt failed. Entering this
/// behaviour starts a periodic <see cref="RetryConnect"/> timer; each tick re-runs
/// initialisation with the last known configuration until one attempt succeeds.
/// </summary>
private void Reconnecting()
{
    Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);

    // "{}" is the fallback configuration when none has been recorded yet.
    Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
    Receive<InitializeSucceeded>(_ => OnReconnected());

    // A failed attempt needs no action; the periodic timer triggers the next one.
    Receive<InitializeFailed>(_ => { });

    // Only remembered here; applied once Connected is entered.
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);

    // Already reconnecting, so a forced reconnect has nothing to do.
    Receive<ForceReconnect>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());

    void OnReconnected()
    {
        Timers.Cancel("retry-connect");
        _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
    }
}
|
|
|
|
/// <summary>
/// Remembers <paramref name="driverConfigJson"/> as the current configuration and starts a
/// background initialise attempt. Returns immediately; the outcome arrives later as an
/// <see cref="InitializeSucceeded"/> or <see cref="InitializeFailed"/> message to this actor.
/// </summary>
/// <param name="driverConfigJson">Driver configuration passed to <see cref="IDriver.InitializeAsync"/>.</param>
private void InitializeAsync(string driverConfigJson)
{
    _currentConfigJson = driverConfigJson;

    // Captured up front because the attempt runs on a thread-pool task, not inside a message handler.
    var actor = Self;
    _ = Task.Run(() => AttemptAsync());

    async Task AttemptAsync()
    {
        object outcome;
        try
        {
            await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
            outcome = new InitializeSucceeded();
        }
        catch (Exception ex)
        {
            outcome = new InitializeFailed(ex.Message);
        }

        actor.Tell(outcome);
    }
}
|
|
|
|
/// <summary>
/// Applies a new configuration by calling <see cref="IDriver.ReinitializeAsync"/> and replies
/// to the sender with an <see cref="ApplyResult"/> carrying the request's correlation id.
/// The stored configuration is only replaced when the driver call succeeds.
/// </summary>
/// <param name="msg">The apply request.</param>
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
    // Read the sender before awaiting.
    var origin = Sender;
    ApplyResult outcome;

    try
    {
        await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
        _currentConfigJson = msg.DriverConfigJson;
        outcome = new ApplyResult(true, null, msg.Correlation);
    }
    catch (Exception ex)
    {
        outcome = new ApplyResult(false, ex.Message, msg.Correlation);
    }

    origin.Tell(outcome);
}
|
|
|
|
/// <summary>
/// Writes a single tag value through the driver's <see cref="IWritable"/> implementation and
/// replies to the sender with a <see cref="WriteAttributeResult"/>. The write is cancelled
/// after five seconds so a stalled backend still produces an answer.
/// </summary>
/// <param name="msg">The tag id and value to write.</param>
private async Task HandleWriteAsync(WriteAttribute msg)
{
    // Read the sender before awaiting.
    var origin = Sender;

    if (_driver is not IWritable target)
    {
        origin.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
        return;
    }

    var batch = new[] { new WriteRequest(msg.TagId, msg.Value) };

    // Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
    // retry off by default, but a stalled call still needs an answer.
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    WriteAttributeResult reply;
    try
    {
        var outcomes = await target.WriteAsync(batch, timeout.Token).ConfigureAwait(false);

        if (outcomes is { Count: 1 } && IsGoodStatus(outcomes[0].StatusCode))
        {
            reply = new WriteAttributeResult(true, null);
        }
        else
        {
            // Report the first status code; 0xFFFFFFFF stands in when no result came back.
            var code = outcomes is { Count: > 0 } ? outcomes[0].StatusCode : 0xFFFFFFFF;
            reply = new WriteAttributeResult(false, $"StatusCode=0x{code:X8}");
        }
    }
    catch (OperationCanceledException)
    {
        reply = new WriteAttributeResult(false, "write timeout");
    }
    catch (Exception ex)
    {
        reply = new WriteAttributeResult(false, ex.Message);
    }

    origin.Tell(reply);
}
|
|
|
|
/// <summary>
/// Handles a one-shot <see cref="Subscribe"/>. Any existing subscription is dropped first;
/// then the data-change handler is hooked onto <see cref="ISubscribable.OnDataChange"/> and
/// the driver is asked to subscribe, bounded by a ten-second timeout. The sender receives
/// <see cref="SubscriptionEstablished"/> on success or <see cref="SubscriptionFailed"/> on
/// failure; on failure the handler is unhooked again.
/// </summary>
/// <param name="msg">The references to subscribe and the requested publishing interval.</param>
private async Task HandleSubscribeAsync(Subscribe msg)
{
    // Capture Sender and Self before the first await. Both resolve through the ambient actor
    // context, which is not available on a continuation resumed via ConfigureAwait(false), so
    // reading them after the unsubscribe await below would throw whenever that await really
    // suspends (i.e. on every re-subscribe over an existing handle).
    var replyTo = Sender;
    var self = Self;

    if (_driver is not ISubscribable subscribable)
    {
        replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
        return;
    }

    if (_subscriptionHandle is not null)
    {
        // Subscribe-twice — drop the prior subscription before establishing the new one.
        await UnsubscribeAsync().ConfigureAwait(false);
    }

    try
    {
        // Data-change events are raised by the driver, so they are relayed to this actor as
        // messages instead of being processed in the callback.
        _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
        subscribable.OnDataChange += _dataChangeHandler;

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        _subscriptionHandle = await subscribable
            .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
            .ConfigureAwait(false);

        replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
        _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
            _driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
    }
    catch (Exception ex)
    {
        // Unhook the handler attached above so a failed subscribe leaves nothing behind.
        DetachSubscription();
        _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
        replyTo.Tell(new SubscriptionFailed(ex.Message));
    }
}
|
|
|
|
/// <summary>
/// Drops the active subscription, if any. The driver's unsubscribe call is bounded by a
/// five-second timeout and any exception it throws is logged and ignored; local subscription
/// state is cleared in every case.
/// </summary>
private async Task UnsubscribeAsync()
{
    var handle = _subscriptionHandle;
    if (handle is null || _driver is not ISubscribable source)
    {
        // Nothing to unsubscribe at the driver; just clear local state.
        DetachSubscription();
        return;
    }

    try
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await source.UnsubscribeAsync(handle, timeout.Token).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
    }
    finally
    {
        DetachSubscription();
    }
}
|
|
|
|
/// <summary>Unhooks the data-change handler from the driver and clears both the handler and the
/// subscription handle. Used by the unsubscribe path, by <see cref="PostStop"/>, and when leaving
/// <c>Connected</c> for <c>Reconnecting</c>, so a stale handler cannot keep pushing data-change
/// events to this actor.</summary>
private void DetachSubscription()
{
    if (_dataChangeHandler is { } handler && _driver is ISubscribable source)
    {
        source.OnDataChange -= handler;
    }

    // Cleared unconditionally, even when there was nothing to unhook.
    _subscriptionHandle = null;
    _dataChangeHandler = null;
}
|
|
|
|
/// <summary>Remembers the references and publishing interval the host wants kept subscribed. The
/// live subscription is not touched here; <see cref="ResubscribeDesired"/> applies the stored set.</summary>
/// <param name="msg">The desired references and publishing interval.</param>
private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
{
    (_desiredRefs, _desiredInterval) = msg;
}
|
|
|
|
/// <summary>Re-applies the stored desired subscription by sending this actor a one-shot
/// <see cref="Subscribe"/>. Does nothing when the desired set is empty.</summary>
private void ResubscribeDesired()
{
    if (_desiredRefs.Count == 0)
    {
        return;
    }

    // Goes through the normal Subscribe handler, which drops any prior handle first.
    var request = new Subscribe(_desiredRefs, _desiredInterval);
    Self.Tell(request);
}
|
|
|
|
/// <summary>Converts a relayed driver data-change into an <see cref="AttributeValuePublished"/> and
/// sends it to the parent actor. The source timestamp is used when present, otherwise the server
/// timestamp.</summary>
/// <param name="msg">The reference and value snapshot relayed from the driver callback.</param>
private void OnDataChangeForward(DataChangeForward msg)
{
    var snapshot = msg.Snapshot;
    var published = new AttributeValuePublished(
        msg.FullReference,
        snapshot.Value,
        QualityFromStatus(snapshot.StatusCode),
        snapshot.SourceTimestampUtc ?? snapshot.ServerTimestampUtc);

    Context.Parent.Tell(published);
}
|
|
|
|
/// <summary>Maps an OPC UA status code onto the three-state <see cref="OpcUaQuality"/> using its
/// top two (severity) bits: 00 = Good, 01 = Uncertain, 10/11 = Bad.</summary>
/// <param name="statusCode">The 32-bit status code.</param>
/// <returns>The quality that corresponds to the code's severity bits.</returns>
private static OpcUaQuality QualityFromStatus(uint statusCode)
{
    var severity = statusCode >> 30;

    if (severity == 0)
    {
        return OpcUaQuality.Good;
    }

    if (severity == 1)
    {
        return OpcUaQuality.Uncertain;
    }

    return OpcUaQuality.Bad;
}
|
|
|
|
/// <summary>Returns true when both severity bits (the top two bits) of the status code are clear.</summary>
private static bool IsGoodStatus(uint statusCode)
{
    const uint severityMask = 0xC000_0000u;
    return (statusCode & severityMask) == 0u;
}
|
|
|
|
/// <summary>Appends the current UTC time to the fault history that feeds the 5-minute sliding counter.</summary>
private void RecordFault() => _faultTimestamps.Enqueue(DateTime.UtcNow);
|
|
|
|
/// <summary>Discards recorded faults older than five minutes and returns how many remain.</summary>
/// <returns>The number of fault transitions recorded within the last five minutes.</returns>
private int ErrorCount5Min()
{
    var cutoff = DateTime.UtcNow.AddMinutes(-5);

    // Timestamps are enqueued in arrival order, so expired entries are dropped from the front.
    while (_faultTimestamps.TryPeek(out var oldest) && oldest < cutoff)
    {
        _faultTimestamps.Dequeue();
    }

    return _faultTimestamps.Count;
}
|
|
|
|
/// <summary>
/// Polls <see cref="IDriver.GetHealth"/> and forwards the snapshot to the health publisher.
/// Called on every observable state change and by the periodic <see cref="HealthPollTick"/>.
/// Deduplicates: if the (state, lastSuccess, lastError, errorCount) tuple matches the last
/// successful publish, the call is a no-op, which stops identical snapshots being re-published
/// on every poll when nothing has changed. A snapshot only counts as published once the
/// publisher call has returned, so a failed publish is retried on the next call. Exceptions
/// from the driver or the publisher are logged and swallowed.
/// </summary>
private void PublishHealthSnapshot()
{
    try
    {
        var health = _driver.GetHealth();
        var errorCount = ErrorCount5Min();
        var fingerprint = (health.State, health.LastSuccessfulRead, health.LastError, errorCount);
        if (_lastPublishedFingerprint is { } prev && prev.Equals(fingerprint))
        {
            return;
        }

        _healthPublisher.Publish(_clusterId, _driverInstanceId, health, errorCount);

        // Remember the fingerprint only after Publish returns. Storing it beforehand would mark
        // a snapshot as published even when Publish threw, and every identical retry would then
        // be suppressed by the check above.
        _lastPublishedFingerprint = fingerprint;
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId);
    }
}
|
|
|
|
/// <summary>Fingerprint of the last <see cref="PublishHealthSnapshot"/> call; null until first publish.</summary>
|
|
private (DriverState State, DateTime? LastSuccess, string? LastError, int ErrorCount)? _lastPublishedFingerprint;
|
|
|
|
/// <inheritdoc />
protected override void PostStop()
{
    // Unhook the data-change handler first so the driver cannot call back into a stopped actor.
    DetachSubscription();

    try
    {
        // PostStop is synchronous, so the shutdown task is waited on here.
        var shutdown = _driver.ShutdownAsync(CancellationToken.None);
        shutdown.GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId);
    }

    // The stop event is counted whether or not shutdown threw.
    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
        new KeyValuePair<string, object?>("event", "stop"),
        new KeyValuePair<string, object?>("driver_type", _driver.DriverType));
}
|
|
}
|