fix(alarms): subscribe native alarms to un-gate the IAlarmSource feed
Phase B native alarms never fired end-to-end: GalaxyDriver suppresses OnAlarmEvent until an alarm subscription exists (_alarmSubscriptions.Count > 0), but the runtime only attached the OnAlarmEvent handler and never called SubscribeAlarmsAsync — so the central feed stayed gated and no transition reached the Part 9 condition / /alerts. Unit tests passed because they inject through the IAlarmSource seam directly; the deferred live /run surfaced it. DriverHostActor computes per-driver alarm refs (alarm-bearing tags' FullNames) and hands them via SetDesiredSubscriptions; DriverInstanceActor calls SubscribeAlarmsAsync for IAlarmSource drivers on Connected entry and whenever alarm refs are pushed while Connected (the deploy path), idempotent via a cached handle reset on detach so reconnect re-subscribes.
This commit is contained in:
@@ -854,6 +854,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
.ToArray(),
|
||||
StringComparer.Ordinal);
|
||||
|
||||
// Native-alarm subscription set: the alarm-bearing tags' FullNames (= the driver's
|
||||
// ConditionId/AlarmFullReference). An IAlarmSource driver suppresses OnAlarmEvent until at least one
|
||||
// alarm subscription exists (e.g. GalaxyDriver gates its central feed on _alarmSubscriptions), so the
|
||||
// instance actor must SubscribeAlarmsAsync these refs to un-gate the feed. Routing stays by
|
||||
// ConditionId in ForwardNativeAlarm; this set just opens (and scopes) the subscription.
|
||||
var alarmRefsByDriver = composition.EquipmentTags
|
||||
.Where(t => t.Alarm is not null)
|
||||
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
|
||||
.ToDictionary(
|
||||
g => g.Key,
|
||||
g => (IReadOnlyList<string>)g.Select(t => t.FullName)
|
||||
.Distinct(StringComparer.Ordinal)
|
||||
.ToArray(),
|
||||
StringComparer.Ordinal);
|
||||
|
||||
// Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors
|
||||
// VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to
|
||||
// the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux
|
||||
@@ -904,7 +919,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
foreach (var (driverId, entry) in _children)
|
||||
{
|
||||
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
|
||||
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
|
||||
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
|
||||
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
|
||||
total += refs.Count;
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,18 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// <see cref="DriverHostActor"/> a single message to drive the SubscribeBulk pass after an
|
||||
/// apply. Sending an empty set clears the desired subscription.
|
||||
/// </summary>
|
||||
public sealed record SetDesiredSubscriptions(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
|
||||
/// <param name="AlarmReferences">
|
||||
/// The native-alarm references (alarm-bearing equipment-tag FullNames = the driver's
|
||||
/// <c>ConditionId</c>/AlarmFullReference) this driver should keep an alarm subscription open for.
|
||||
/// An <see cref="IAlarmSource"/> driver suppresses <see cref="IAlarmSource.OnAlarmEvent"/> until at
|
||||
/// least one alarm subscription exists, so the actor calls
|
||||
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> with this set to un-gate the native feed. Empty
|
||||
/// (or null) means the driver has no alarm tags. Defaults to null so non-alarm callers are unchanged.
|
||||
/// </param>
|
||||
public sealed record SetDesiredSubscriptions(
|
||||
IReadOnlyList<string> FullReferences,
|
||||
TimeSpan PublishingInterval,
|
||||
IReadOnlyList<string>? AlarmReferences = null);
|
||||
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);
|
||||
public sealed record SubscriptionFailed(string Reason);
|
||||
public sealed record Unsubscribe;
|
||||
@@ -69,6 +80,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// to the materialised Part 9 condition. Parallels <see cref="AttributeValuePublished"/>.</summary>
|
||||
public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args);
|
||||
private sealed record NativeAlarmRaised(AlarmEventArgs Args);
|
||||
/// <summary>Self-sent on Connected entry / when alarm refs are (re)pushed, to establish the native-alarm
|
||||
/// subscription that un-gates an <see cref="IAlarmSource"/> driver's feed. Handled async so the
|
||||
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> call is bounded + off the synchronous handlers.</summary>
|
||||
private sealed record SubscribeAlarms;
|
||||
public sealed class RetryConnect
|
||||
{
|
||||
public static readonly RetryConnect Instance = new();
|
||||
@@ -112,6 +127,17 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();
|
||||
private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);
|
||||
|
||||
/// <summary>The native-alarm references the host wants kept subscribed (set by
|
||||
/// <see cref="SetDesiredSubscriptions"/>). Re-applied on every <c>Connected</c> entry so the alarm
|
||||
/// feed is re-un-gated after a reconnect/redeploy.</summary>
|
||||
private IReadOnlyList<string> _desiredAlarmRefs = Array.Empty<string>();
|
||||
|
||||
/// <summary>The active native-alarm subscription handle for an <see cref="IAlarmSource"/> driver, or
|
||||
/// null when none is established. Reset on <see cref="DetachAlarmSource"/> so the next Connected entry
|
||||
/// re-subscribes against the freshly re-initialised driver; the null check makes the subscribe
|
||||
/// idempotent across repeated <see cref="SetDesiredSubscriptions"/> pushes.</summary>
|
||||
private IAlarmSubscriptionHandle? _alarmSubscriptionHandle;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the timer scheduler for scheduling reconnection attempts.
|
||||
/// </summary>
|
||||
@@ -237,6 +263,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
SubscribeDesiredAlarms();
|
||||
});
|
||||
Receive<InitializeFailed>(msg =>
|
||||
{
|
||||
@@ -291,7 +318,13 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
StoreDesiredSubscriptions(msg);
|
||||
if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
|
||||
else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
|
||||
// Native-alarm analogue: un-gate the IAlarmSource feed when alarm tags are (now) present. The
|
||||
// common live path — a deploy delivers SetDesiredSubscriptions while the driver is already
|
||||
// Connected — flows through HERE, so the alarm subscribe must happen on this message, not only
|
||||
// on Connected entry.
|
||||
SubscribeDesiredAlarms();
|
||||
});
|
||||
ReceiveAsync<SubscribeAlarms>(HandleSubscribeAlarmsAsync);
|
||||
Receive<DataChangeForward>(OnDataChangeForward);
|
||||
// Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent;
|
||||
// project it to the parent the same way DataChangeForward projects AttributeValuePublished.
|
||||
@@ -325,6 +358,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
SubscribeDesiredAlarms();
|
||||
});
|
||||
// A failure here is a no-op regardless of generation — the retry timer keeps trying the
|
||||
// current config; only a (generation-matched) InitializeSucceeded transitions state.
|
||||
@@ -524,6 +558,44 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
if (_driver is IAlarmSource src && _alarmEventHandler is not null)
|
||||
src.OnAlarmEvent -= _alarmEventHandler;
|
||||
_alarmEventHandler = null;
|
||||
// Drop the handle so the next Connected entry re-subscribes against the freshly re-initialised
|
||||
// driver (its old alarm-subscription set was cleared on reconnect). The desired alarm refs persist.
|
||||
_alarmSubscriptionHandle = null;
|
||||
}
|
||||
|
||||
/// <summary>Upper bound for a single native-alarm subscribe or unsubscribe call against the driver.</summary>
private static readonly TimeSpan AlarmSubscriptionTimeout = TimeSpan.FromSeconds(10);

/// <summary>The alarm-reference set the live <see cref="_alarmSubscriptionHandle"/> was opened with.
/// Compared against <see cref="_desiredAlarmRefs"/> so that a redeploy carrying a DIFFERENT alarm set
/// re-scopes the subscription, while a redeploy of the same set stays a no-op. Only meaningful while
/// the handle is non-null.</summary>
private IReadOnlyList<string> _subscribedAlarmRefs = Array.Empty<string>();

/// <summary>Bring the native-alarm subscription in line with the desired alarm refs. An
/// <see cref="IAlarmSource"/> driver suppresses <see cref="IAlarmSource.OnAlarmEvent"/> until at least one
/// alarm subscription exists, so this is what un-gates the native feed. Self-sends the async
/// <see cref="SubscribeAlarms"/> message; the actual driver call happens in
/// <see cref="HandleSubscribeAlarmsAsync"/>. A no-op when the driver is not an <see cref="IAlarmSource"/>
/// or when the established subscription already matches the desired set.</summary>
private void SubscribeDesiredAlarms()
{
    if (_driver is not IAlarmSource)
    {
        return;
    }

    if (AlarmSubscriptionNeedsReconcile())
    {
        Self.Tell(new SubscribeAlarms());
    }
}

/// <summary>True when the driver-side alarm subscription does not reflect <see cref="_desiredAlarmRefs"/>:
/// either nothing is subscribed although alarm refs are desired, or a subscription exists for a different
/// (possibly now empty) set.</summary>
private bool AlarmSubscriptionNeedsReconcile() =>
    _alarmSubscriptionHandle is null
        ? _desiredAlarmRefs.Count > 0
        : !SameAlarmRefs(_subscribedAlarmRefs, _desiredAlarmRefs);

/// <summary>Ordinal, order-sensitive comparison of two alarm-reference lists. An order-only difference is
/// reported as a change; the cost is one redundant re-subscribe, never a missed one.</summary>
private static bool SameAlarmRefs(IReadOnlyList<string> left, IReadOnlyList<string> right)
{
    if (ReferenceEquals(left, right))
    {
        return true;
    }

    if (left.Count != right.Count)
    {
        return false;
    }

    for (var i = 0; i < left.Count; i++)
    {
        if (!string.Equals(left[i], right[i], StringComparison.Ordinal))
        {
            return false;
        }
    }

    return true;
}

/// <summary>Reconciles the driver's native-alarm subscription with <see cref="_desiredAlarmRefs"/>.
/// <list type="bullet">
/// <item>Nothing subscribed, refs desired: calls <see cref="IAlarmSource.SubscribeAlarmsAsync"/> and caches
/// the handle.</item>
/// <item>Subscribed for a different set: opens the new subscription FIRST, then releases the superseded
/// one, so the driver never drops to zero alarm subscriptions (which would re-gate the feed).</item>
/// <item>Subscribed but no refs desired any more: releases the subscription.</item>
/// </list>
/// The guard is re-evaluated here because the desired set or the handle may have changed while this
/// message was queued. Every driver call is bounded by <see cref="AlarmSubscriptionTimeout"/>. Failures are
/// logged, not thrown; because the recorded state still mismatches the desired set, the next
/// <see cref="SubscribeDesiredAlarms"/> call (Connected entry or the next desired-set push) retries.</summary>
private async Task HandleSubscribeAlarmsAsync(SubscribeAlarms _)
{
    if (_driver is not IAlarmSource src || !AlarmSubscriptionNeedsReconcile())
    {
        return;
    }

    var refs = _desiredAlarmRefs;
    var previous = _alarmSubscriptionHandle;

    if (refs.Count > 0)
    {
        using var subscribeCts = new CancellationTokenSource(AlarmSubscriptionTimeout);
        try
        {
            var handle = await src.SubscribeAlarmsAsync(refs, subscribeCts.Token);
            _alarmSubscriptionHandle = handle;
            _subscribedAlarmRefs = refs;
            _log.Info("DriverInstance {Id}: native-alarm subscription established for {Count} alarm ref(s) ({Diag})",
                _driverInstanceId, refs.Count, handle.DiagnosticId);
        }
        catch (Exception ex)
        {
            // Keep whatever was established before: a stale-scoped subscription still keeps the feed open.
            _log.Warning(ex,
                previous is null
                    ? "DriverInstance {Id}: native-alarm subscription failed — feed stays gated until reconnect"
                    : "DriverInstance {Id}: native-alarm re-subscription failed — keeping the previous alarm subscription",
                _driverInstanceId);
            return;
        }
    }
    else
    {
        // No alarm tags are desired any more; forget the handle before releasing it below.
        _alarmSubscriptionHandle = null;
        _subscribedAlarmRefs = Array.Empty<string>();
    }

    if (previous is null)
    {
        return;
    }

    // Best-effort release of the superseded subscription; a failure here must not undo the new state.
    using var releaseCts = new CancellationTokenSource(AlarmSubscriptionTimeout);
    try
    {
        await src.UnsubscribeAlarmsAsync(previous, releaseCts.Token);
        _log.Info("DriverInstance {Id}: released superseded native-alarm subscription ({Diag})",
            _driverInstanceId, previous.DiagnosticId);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: releasing the superseded native-alarm subscription failed",
            _driverInstanceId);
    }
}
|
||||
|
||||
/// <summary>Records the host's desired subscription set without touching the live subscription.
|
||||
@@ -532,6 +604,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
_desiredRefs = msg.FullReferences;
|
||||
_desiredInterval = msg.PublishingInterval;
|
||||
_desiredAlarmRefs = msg.AlarmReferences ?? Array.Empty<string>();
|
||||
}
|
||||
|
||||
/// <summary>Re-establish the desired subscription after (re)connecting. Self-sends the one-shot
|
||||
|
||||
+51
-2
@@ -64,6 +64,42 @@ public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase
|
||||
.FullReference.ShouldBe("tag-z");
|
||||
}
|
||||
|
||||
/// <summary>
/// The native-alarm path is gated at the driver: an <see cref="IAlarmSource"/> suppresses
/// <c>OnAlarmEvent</c> until at least one alarm subscription exists (e.g. GalaxyDriver gates its
/// central feed). When the host pushes a <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>
/// carrying native-alarm refs to a <c>Connected</c> driver — the live deploy path — the actor must
/// call <see cref="IAlarmSource.SubscribeAlarmsAsync"/> with those refs to un-gate the feed, and must
/// NOT re-subscribe when the same set is redeployed (the feed is already un-gated).
/// </summary>
[Fact]
public async Task Alarm_subscription_is_established_when_alarm_refs_are_pushed_while_connected()
{
    var driver = new AlarmSourceStubDriver();
    var parent = CreateTestProbe();
    var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));

    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.AlarmSubscriberCount == 1, TimeSpan.FromSeconds(2)); // reached Connected
    driver.SubscribeAlarmsCallCount.ShouldBe(0, "no alarm subscription before any alarm refs are pushed");

    // A deploy delivers the desired set with a native-alarm ref while the driver is already Connected.
    actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
        Array.Empty<string>(), TimeSpan.FromMilliseconds(100), new[] { "Temp.HiHi" }));

    AwaitCondition(() => driver.SubscribeAlarmsCallCount == 1, TimeSpan.FromSeconds(2));
    driver.LastAlarmRefs.ShouldBe(new[] { "Temp.HiHi" });

    // Redeploying the same alarm set must NOT re-subscribe (idempotent — already un-gated).
    actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
        Array.Empty<string>(), TimeSpan.FromMilliseconds(100), new[] { "Temp.HiHi" }));

    // Flush the mailbox with TWO data-Subscribe round-trips before asserting. The first reply proves the
    // second SetDesiredSubscriptions was handled, but any SubscribeAlarms self-message that handler might
    // (wrongly) have sent would be queued BEHIND the first Subscribe. The second round-trip is enqueued
    // after it, so once it replies, such a message has been processed and a regression is observable.
    for (var roundTrip = 0; roundTrip < 2; roundTrip++)
    {
        await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
            new DriverInstanceActor.Subscribe(new[] { "tag-z" }, TimeSpan.FromMilliseconds(100)),
            TimeSpan.FromSeconds(3));
    }

    driver.SubscribeAlarmsCallCount.ShouldBe(1, "an already-established alarm subscription is not re-issued");
}
|
||||
|
||||
/// <summary>
|
||||
/// A driver that is NOT an <see cref="IAlarmSource"/> (only <see cref="ISubscribable"/>) connects
|
||||
/// and serves data changes normally — <c>AttachAlarmSource</c> is a safe no-op (no crash) and no
|
||||
@@ -266,10 +302,23 @@ public sealed class DriverInstanceActorNativeAlarmTests : RuntimeActorTestBase
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
|
||||
Task.CompletedTask;
|
||||
|
||||
private int _subscribeAlarmsCallCount;
|
||||
|
||||
/// <summary>Number of <see cref="SubscribeAlarmsAsync"/> calls — proves the actor established the
|
||||
/// native-alarm subscription that un-gates the feed (incremented on the actor dispatcher thread).</summary>
|
||||
public int SubscribeAlarmsCallCount => Volatile.Read(ref _subscribeAlarmsCallCount);
|
||||
|
||||
/// <summary>The node set handed to the most recent <see cref="SubscribeAlarmsAsync"/> call.</summary>
|
||||
public IReadOnlyList<string>? LastAlarmRefs { get; private set; }
|
||||
|
||||
/// <summary>Subscribes to alarm events for the specified node set.</summary>
|
||||
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<IAlarmSubscriptionHandle>(new StubAlarmHandle());
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||
{
|
||||
LastAlarmRefs = sourceNodeIds;
|
||||
Interlocked.Increment(ref _subscribeAlarmsCallCount);
|
||||
return Task.FromResult<IAlarmSubscriptionHandle>(new StubAlarmHandle());
|
||||
}
|
||||
|
||||
/// <summary>Cancels an alarm subscription.</summary>
|
||||
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) =>
|
||||
|
||||
Reference in New Issue
Block a user