9ab1c00265
Fixes the 8 findings from the 2026-06-24 re-review (commit c42bb485), with a
regression test per Medium finding:
- DataConnectionLayer-029 (Med): HandleAlarmSubscribeCompleted now mirrors the
tag-path re-check — if a feed is already stored for the source, release the
redundant just-created subscription instead of overwriting + leaking the first
one (the double-subscribe window DCL-023 reopened). +regression test.
- InboundAPI-031 (Med): remove WaitForAttribute's local 5s grace backstop (tighter
than the CommunicationService Ask's timeout+IntegrationTimeout round-trip budget,
so a slow-but-valid timed-out 'false' got cancelled into a 500). Link only the
client-abort + explicit caller tokens; the lower layer owns the backstop. +test.
- SiteRuntime-032 (Med): derive the deployed count from an authoritative set of
deployed config names (HashSet) instead of a map-presence-gated int, so deleting
a DISABLED instance decrements correctly (SiteRuntime-029's gate leaked it).
+deploy->disable->delete regression test.
- StoreAndForward-028 (Med): reset _bufferedCount in StopAsync alongside the
register-guard so a same-instance Stop->Start re-seeds from a clean base (no ~2N
gauge double-count). +restart regression test.
- AuditLog-017 (Low): test the OnIngestAsync scope-resolution guard (actor survives,
replies empty, counts the failure) — no longer unpinned.
- CentralUI-037 / ScriptAnalysis-009 / SiteRuntime-033 (Low): doc-comment + spec
fixes (Database-throws in the inbound sandbox; baseReferences param wording;
native-alarm cap return-to-normal + per-condition NativeAlarmDropped eviction).
Targeted suites green: SiteRuntime 5, StoreAndForward 6, InboundAPI 31,
DataConnectionLayer 10, AuditLog 5, ScriptAnalysis 40, CentralUI ScriptAnalysis 52.
390 lines
21 KiB
C#
390 lines
21 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using NSubstitute;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
|
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
|
|
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
|
|
|
|
/// <summary>Task-10: native alarm subscribe + source-ref routing + unavailable signal.</summary>
|
|
public class DataConnectionActorAlarmTests : TestKit
|
|
{
|
|
/// <summary>Substitute health collector passed to every <c>DataConnectionActor</c> built by these tests.</summary>
private readonly ISiteHealthCollector _health = Substitute.For<ISiteHealthCollector>();

/// <summary>Substitute connection factory passed to every <c>DataConnectionActor</c> built by these tests.</summary>
private readonly IDataConnectionFactory _factory = Substitute.For<IDataConnectionFactory>();

/// <summary>
/// Actor options shared by all tests: 100 ms reconnect interval, 200 ms
/// tag-resolution retry interval and a 5 s write timeout.
/// </summary>
private readonly DataConnectionOptions _options = new DataConnectionOptions
{
    ReconnectInterval = TimeSpan.FromMilliseconds(100),
    TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
    WriteTimeout = TimeSpan.FromSeconds(5),
};
|
|
|
|
/// <summary>
/// Builds a <c>Raise</c> transition of alarm type <c>"AnalogLimit.Hi"</c> for the
/// given source reference and source object.
/// </summary>
/// <param name="sourceRef">Source reference placed on the transition.</param>
/// <param name="sourceObj">Source object reference placed on the transition.</param>
/// <returns>The transition produced by the four-argument overload.</returns>
private static NativeAlarmTransition Raise(string sourceRef, string sourceObj)
{
    // Spell out the kind the four-argument overload would otherwise default to.
    return Raise(sourceRef, sourceObj, "AnalogLimit.Hi", AlarmTransitionKind.Raise);
}
|
|
|
|
/// <summary>
/// Builds a <see cref="NativeAlarmTransition"/> with the supplied source refs, alarm
/// type name and transition kind; every other field is a fixed test value.
/// </summary>
/// <param name="sourceRef">Source reference placed on the transition.</param>
/// <param name="sourceObj">Source object reference placed on the transition.</param>
/// <param name="typeName">Alarm type name placed on the transition.</param>
/// <param name="kind">Transition kind; defaults to <see cref="AlarmTransitionKind.Raise"/>.</param>
/// <returns>A new transition timestamped with <see cref="DateTimeOffset.UtcNow"/>.</returns>
private static NativeAlarmTransition Raise(string sourceRef, string sourceObj, string typeName,
    AlarmTransitionKind kind = AlarmTransitionKind.Raise)
{
    // Fixed condition state reused by every transition built through this helper.
    var conditionState = new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 500);

    return new NativeAlarmTransition(
        sourceRef,
        sourceObj,
        typeName,
        kind,
        conditionState,
        "Process", "hi", "hi", "", "", null, DateTimeOffset.UtcNow, "92", "90");
}
|
|
|
|
/// <summary>
/// Creates a substitute adapter that implements both <see cref="IDataConnection"/> and
/// <see cref="IAlarmSubscribableConnection"/>. Connecting completes immediately; every
/// alarm subscribe returns the id <c>"alarm-sub-1"</c> and records the callback it was given.
/// </summary>
/// <returns>
/// The adapter plus an accessor returning the most recently recorded callback
/// (<c>null</c> until a subscribe has been issued).
/// </returns>
private static (IDataConnection Adapter, Func<AlarmTransitionCallback?> Cb) BuildAlarmAdapter()
{
    AlarmTransitionCallback? captured = null;

    var connection = Substitute.For<IDataConnection, IAlarmSubscribableConnection>();
    connection
        .ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    // Same substitute instance, viewed through its alarm-capable interface.
    var alarmFeed = (IAlarmSubscribableConnection)connection;
    alarmFeed
        .SubscribeAlarmsAsync(
            Arg.Any<string>(),
            Arg.Any<string?>(),
            Arg.Do<AlarmTransitionCallback>(handler => captured = handler),
            Arg.Any<CancellationToken>())
        .Returns(Task.FromResult("alarm-sub-1"));

    return (connection, () => captured);
}
|
|
|
|
/// <summary>
/// After a successful alarm subscribe for source "Tank01", a transition pushed through
/// the adapter's alarm callback is received by the test kit as a
/// <c>NativeAlarmTransitionUpdate</c> for that source object.
/// </summary>
[Fact]
public void SubscribeAlarms_RoutesTransitionToInstanceSubscriber()
{
    // Use the shared adapter builder instead of repeating its setup inline, so this
    // test stays in step with the other subscribe tests in this class.
    var (adapter, getCb) = BuildAlarmAdapter();
    var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
        "conn", adapter, _options, _health, _factory, "OpcUa")));

    actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    ExpectMsg<SubscribeAlarmsResponse>(m => m.Success);

    // The callback is only captured once the actor has called SubscribeAlarmsAsync.
    var cb = getCb();
    Assert.NotNull(cb);

    cb!(Raise("Tank01.Hi", "Tank01"));
    ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.SourceObjectReference == "Tank01");
}
|
|
|
|
/// <summary>
/// An adapter that does not implement <c>IAlarmSubscribableConnection</c> must make the
/// actor answer an alarm subscribe with a failed response carrying an error message.
/// </summary>
[Fact]
public void SubscribeAlarms_OnNonAlarmCapableAdapter_RepliesFailure()
{
    // Plain IDataConnection only: deliberately NOT alarm-capable.
    var plainConnection = Substitute.For<IDataConnection>();
    plainConnection
        .ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var props = Props.Create(() => new DataConnectionActor(
        "conn", plainConnection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow);
    sut.Tell(request);

    ExpectMsg<SubscribeAlarmsResponse>(reply => !reply.Success && reply.ErrorMessage != null);
}
|
|
|
|
// ── M2.4 (#8): conditionFilter is now applied client-side in the actor ──
|
|
|
|
/// <summary>
/// With the condition filter "AnalogLimit.Hi,AnalogLimit.Lo", a transition of an
/// unlisted type produces no message while a listed type is delivered.
/// </summary>
[Fact]
public void SubscribeAlarms_WithTypeFilter_DeliversOnlyMatchingTypes()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
        "AnalogLimit.Hi,AnalogLimit.Lo", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // "AnalogLimit.HiHi" is not in the filter list: nothing may arrive.
    onTransition!.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(250));

    // "AnalogLimit.Hi" is listed: the update must arrive.
    onTransition.Invoke(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.AlarmTypeName == "AnalogLimit.Hi");
}
|
|
|
|
/// <summary>
/// With a <c>null</c> condition filter every alarm type raised for the subscribed
/// source is delivered.
/// </summary>
[Fact]
public void SubscribeAlarms_WithNullFilter_DeliversAllTypes()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01", null, DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // Two unrelated type names; both must pass because no filter was supplied.
    onTransition!.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.AlarmTypeName == "AnalogLimit.HiHi");

    onTransition.Invoke(Raise("Tank01.Lo", "Tank01", "DiscreteAlarm"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.AlarmTypeName == "DiscreteAlarm");
}
|
|
|
|
/// <summary>
/// Filter entries are matched regardless of letter case and surrounding whitespace:
/// listed types are delivered, an unlisted type is not.
/// </summary>
[Fact]
public void SubscribeAlarms_FilterMatch_IgnoresCaseAndWhitespace()
{
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    // Filter entries are padded with spaces/tabs and use different casing on purpose.
    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
        " analoglimit.hi ,\tDISCRETEALARM ", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // Casing differs from the filter entry but must still match.
    onTransition!.Invoke(Raise("Tank01.Hi", "Tank01", "AnalogLimit.Hi"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.AlarmTypeName == "AnalogLimit.Hi");

    onTransition.Invoke(Raise("Tank01.Disc", "Tank01", "DiscreteAlarm"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.AlarmTypeName == "DiscreteAlarm");

    // Not listed in the filter at all: must be dropped.
    onTransition.Invoke(Raise("Tank01.HiHi", "Tank01", "AnalogLimit.HiHi"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(250));
}
|
|
|
|
/// <summary>
/// For an "MxGateway" connection the feed is gateway-wide, so the actor itself must
/// gate by source and by type: only the matching source/type combination is delivered.
/// </summary>
[Fact]
public void SubscribeAlarms_GatewayWideFeed_IsFilteredClientSide()
{
    // MxGateway offers no server-side filtering: the adapter opens a single
    // gateway-wide feed and the actor is the authoritative gate. A filtered source
    // must see only its own matching types although the feed carries everything.
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "MxGateway"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor",
        "HighTemp", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // Matching type but a different source object: rejected by source routing.
    onTransition!.Invoke(Raise("Pump.Fault", "Pump", "HighTemp"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(200));

    // Matching source but an unlisted type: rejected by the client-side type gate.
    onTransition.Invoke(Raise("Reactor.LowTemp", "Reactor", "LowTemp"));
    ExpectNoMsg(TimeSpan.FromMilliseconds(200));

    // Matching source and matching type: delivered.
    onTransition.Invoke(Raise("Reactor.HighTemp", "Reactor", "HighTemp"));
    ExpectMsg<NativeAlarmTransitionUpdate>(update =>
        update.Transition.SourceObjectReference == "Reactor"
        && update.Transition.AlarmTypeName == "HighTemp");
}
|
|
|
|
/// <summary>
/// A <c>SnapshotComplete</c> sentinel, which carries an empty alarm type name, must
/// still be forwarded when the subscription has a type filter.
/// </summary>
[Fact]
public void SubscribeAlarms_WithFilter_StillForwardsSnapshotCompleteSentinel()
{
    // The SnapshotComplete framing sentinel has an empty AlarmTypeName; it has to
    // survive the type gate so the NativeAlarmActor's snapshot swap can complete.
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "OpcUa"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Tank01",
        "AnalogLimit.Hi", DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // This sentinel uses the subscribed source ("Tank01") for both refs, so source
    // routing is not what is under test here; only the empty type name, which does
    // not match the "AnalogLimit.Hi" filter, could cause it to be dropped.
    var sentinelState = new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0);
    var sentinel = new NativeAlarmTransition("Tank01", "Tank01", "", AlarmTransitionKind.SnapshotComplete,
        sentinelState,
        "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    onTransition!.Invoke(sentinel);

    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
|
|
|
|
/// <summary>
/// A <c>SnapshotComplete</c> sentinel with empty source refs must reach a subscriber
/// registered for the specific prefix source "Reactor.".
/// </summary>
[Fact]
public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource()
{
    // Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits the sentinel
    // with EMPTY SourceReference / SourceObjectReference. For a specific (prefix)
    // source such as "Reactor." the per-source match ("".StartsWith("Reactor.") is
    // false) used to drop it, leaving the buffered snapshot stranded in the
    // NativeAlarmActor, so statically-active conditions (delivered only in the
    // snapshot) never surfaced. The sentinel has to be broadcast to every subscriber
    // so the snapshot swap can complete.
    var (connection, callbackAccessor) = BuildAlarmAdapter();
    var props = Props.Create(() => new DataConnectionActor(
        "conn", connection, _options, _health, _factory, "MxGateway"));
    var sut = Sys.ActorOf(props);

    var request = new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow);
    sut.Tell(request);
    ExpectMsg<SubscribeAlarmsResponse>(reply => reply.Success);

    var onTransition = callbackAccessor();
    Assert.NotNull(onTransition);

    // An active alarm below the source arrives as a Snapshot entry and is routed by prefix.
    var activeState = new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400);
    var snapshotEntry = new NativeAlarmTransition(
        "Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst",
        AlarmTransitionKind.Snapshot,
        activeState,
        "Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    onTransition!.Invoke(snapshotEntry);
    ExpectMsg<NativeAlarmTransitionUpdate>(update =>
        update.Transition.Kind == AlarmTransitionKind.Snapshot
        && update.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm");

    // The sentinel as the real mapper emits it: every ref empty. It must still be
    // delivered to the "Reactor." subscriber.
    var sentinelState = new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0);
    var sentinel = new NativeAlarmTransition("", "", "", AlarmTransitionKind.SnapshotComplete,
        sentinelState,
        "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
    onTransition.Invoke(sentinel);
    ExpectMsg<NativeAlarmTransitionUpdate>(update => update.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
|
|
|
|
// ── DataConnectionLayer-023: mid-flight alarm unsubscribe must release the adapter feed ──
|
|
|
|
/// <summary>
/// Regression test for DataConnectionLayer-023: when the last subscriber unsubscribes
/// while the adapter alarm subscribe is still in flight, the late completion must be
/// released via <c>UnsubscribeAlarmsAsync</c> and must not leave a subscription id behind.
/// </summary>
[Fact]
public async Task DCL023_UnsubscribeDuringInFlightAlarmSubscribe_ReleasesAdapterFeed_AndKeepsStateClean()
{
    // Background (DataConnectionLayer-023): the native-alarm subscribe path never
    // inherited the DCL-021 obsolete-completion guard. If the last subscriber
    // unsubscribed while the adapter alarm subscribe was in flight,
    // HandleUnsubscribeAlarms could not tear the feed down (the subscription id was
    // not stored yet) and the late AlarmSubscribeCompleted unconditionally stored
    // _alarmSubscriptionIds[source] = id, leaving an orphaned device-side feed that
    // streamed transitions to nobody for the adapter's lifetime. After the fix the
    // unsubscribe clears the in-flight marker, and the late completion is recognised
    // as orphaned (no subscriber remains) and released via UnsubscribeAlarmsAsync.
    var assertTimeout = TimeSpan.FromSeconds(5);
    var subscribeStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var releaseSubscribe = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

    var adapter = Substitute.For<IDataConnection, IAlarmSubscribableConnection>();
    adapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    var alarmable = (IAlarmSubscribableConnection)adapter;

    // Park the adapter subscribe so the UnsubscribeAlarmsRequest is processed first.
    alarmable.SubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<string?>(),
            Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            subscribeStarted.TrySetResult();
            return releaseSubscribe.Task;
        });
    alarmable.UnsubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
        "conn", adapter, _options, _health, _factory, "OpcUa")));

    // Subscribe a source; the adapter call parks, so no SubscribeAlarmsResponse is
    // expected yet (it only follows the subscribe completion).
    actor.Tell(new SubscribeAlarmsRequest("c1", "instA", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    await subscribeStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));

    // The last subscriber unsubscribes while the alarm subscribe is still in flight.
    actor.Tell(new UnsubscribeAlarmsRequest("unsub-c1", "instA", "conn", "Tank01", DateTimeOffset.UtcNow));

    // Nothing observable from the test marks the unsubscribe as processed (the feed
    // id is not stored yet, so no adapter call is made), so a short settle delay is
    // kept as the ordering barrier before the parked subscribe is released.
    await Task.Delay(150);

    // Release the parked subscribe: its completion is now orphaned.
    releaseSubscribe.SetResult("alarm-sub-orphan");

    // Poll for the release instead of sleeping a fixed 300 ms and asserting once: a
    // loaded machine can take longer, and a fast one does not need to wait at all.
    AwaitAssert(
        () =>
        {
            _ = alarmable.Received(1).UnsubscribeAlarmsAsync(
                Arg.Is<string>(s => s == "alarm-sub-orphan"), Arg.Any<CancellationToken>());
        },
        assertTimeout);

    // No leaked subscription id may remain: a fresh subscribe to the same source must
    // open a NEW adapter feed (a retained orphan id would have short-circuited it).
    var resubStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    alarmable.SubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<string?>(),
            Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            resubStarted.TrySetResult();
            return Task.FromResult("alarm-sub-2");
        });

    actor.Tell(new SubscribeAlarmsRequest("c2", "instB", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    await resubStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));
    ExpectMsg<SubscribeAlarmsResponse>(m => m.Success, TimeSpan.FromSeconds(5));

    // Two distinct adapter subscribes in total (the orphaned one + the fresh one).
    await alarmable.Received(2).SubscribeAlarmsAsync(
        "Tank01", Arg.Any<string?>(), Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>());

    // Re-check after the extra round-trip: the orphaned feed was released exactly
    // once, not merely "at least once by the time the poll first succeeded".
    await alarmable.Received(1).UnsubscribeAlarmsAsync(
        Arg.Is<string>(s => s == "alarm-sub-orphan"), Arg.Any<CancellationToken>());
}
|
|
|
|
// ── DataConnectionLayer-029: a re-subscribe during an orphaned in-flight subscribe
|
|
// must not leak a duplicate adapter feed ──
|
|
|
|
/// <summary>
/// Regression test for DataConnectionLayer-029: when a re-subscribe races an orphaned
/// in-flight subscribe, two adapter feeds are created; the second (redundant) one must
/// be released and the first must remain the tracked feed.
/// </summary>
[Fact]
public async Task DCL029_ResubscribeDuringOrphanedInFlightSubscribe_ReleasesDuplicateFeed_NoLeak()
{
    // Background (DataConnectionLayer-029): the DCL-023 fix clears the in-flight
    // marker on unsubscribe, which reopens a double-subscribe window. Unsubscribe
    // (last subscriber, subId not stored yet), then a fresh subscribe for the SAME
    // source sees neither a stored id nor an in-flight marker and issues a SECOND
    // adapter feed, so both completions fire. The DCL-023 orphan guard does not
    // trigger on either completion (the re-subscribe re-added the subscriber), so the
    // completion handler used to OVERWRITE _alarmSubscriptionIds with the second id,
    // leaking the first feed. After DCL-029 the handler mirrors the tag-path re-check:
    // when a feed is already stored, the redundant completion releases its own feed.
    var assertTimeout = TimeSpan.FromSeconds(5);
    var sub1Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var sub1Release = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    var sub2Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var sub2Release = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

    var adapter = Substitute.For<IDataConnection, IAlarmSubscribableConnection>();
    adapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    var alarmable = (IAlarmSubscribableConnection)adapter;

    // First adapter subscribe parks on sub1Release, every later one on sub2Release.
    var calls = 0;
    alarmable.SubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<string?>(),
            Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            if (Interlocked.Increment(ref calls) == 1)
            {
                sub1Started.TrySetResult();
                return sub1Release.Task;
            }

            sub2Started.TrySetResult();
            return sub2Release.Task;
        });
    alarmable.UnsubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);

    var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
        "conn", adapter, _options, _health, _factory, "OpcUa")));

    // (1) Subscribe A: adapter subscribe #1 parks, in-flight={Tank01}.
    actor.Tell(new SubscribeAlarmsRequest("c1", "instA", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    await sub1Started.Task.WaitAsync(TimeSpan.FromSeconds(5));

    // (2) Last subscriber unsubscribes while subscribe #1 is in flight: clears the
    //     in-flight marker (DCL-023); subId#1 is not stored yet so nothing is torn down.
    //     No adapter call marks this step, so a short settle delay stays as the barrier.
    actor.Tell(new UnsubscribeAlarmsRequest("unsub-c1", "instA", "conn", "Tank01", DateTimeOffset.UtcNow));
    await Task.Delay(150);

    // (3) Fresh subscribe for the SAME source before #1 completes: neither a stored id
    //     nor an in-flight marker exists, so the actor issues a SECOND adapter subscribe.
    actor.Tell(new SubscribeAlarmsRequest("c2", "instB", "conn", "Tank01", null, DateTimeOffset.UtcNow));
    await sub2Started.Task.WaitAsync(TimeSpan.FromSeconds(5));

    // (4) Complete subscribe #1: a subscriber exists again (B), so the orphan guard
    //     does NOT fire and subId#1 is stored as the live feed. Storing an id is not
    //     observable from here, so the delay is kept to make sure completion #1 is
    //     handled before completion #2 is released.
    sub1Release.SetResult("alarm-sub-1");
    await Task.Delay(150);

    // (5) Complete subscribe #2: a feed is already stored, so this redundant
    //     completion must release its own feed (#2) instead of overwriting subId#1.
    sub2Release.SetResult("alarm-sub-2");

    // Poll for the release of the duplicate instead of sleeping a fixed 300 ms and
    // asserting once; this tolerates slow agents and returns early on fast ones.
    AwaitAssert(
        () => { _ = alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-2", Arg.Any<CancellationToken>()); },
        assertTimeout);

    // The first feed (#1) is retained at this point.
    await alarmable.DidNotReceive().UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any<CancellationToken>());

    // The retained feed (#1) is what a later unsubscribe tears down, proving subId#1,
    // not the duplicate, is the id _alarmSubscriptionIds actually tracks (no leak).
    actor.Tell(new UnsubscribeAlarmsRequest("unsub-c2", "instB", "conn", "Tank01", DateTimeOffset.UtcNow));
    AwaitAssert(
        () => { _ = alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any<CancellationToken>()); },
        assertTimeout);

    // Re-check after the final round-trip: the duplicate was released exactly once.
    await alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-2", Arg.Any<CancellationToken>());
}
|
|
}
|