diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizedTagSubscriptionSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizedTagSubscriptionSink.cs new file mode 100644 index 00000000..8bf720f7 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizedTagSubscriptionSink.cs @@ -0,0 +1,48 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Server-side feed that keeps the continuous-historization recorder's set of historized tag refs +/// in step with the deployed address space. The AddressSpaceApplier (in the +/// OpcUaServer layer) calls this on every deploy with the add/remove DELTA of historized refs the +/// plan changes — the applier only ever sees a diff (an incremental/surgical apply carries a delta, +/// not the full set), so the recorder behind this seam keeps the full set and converges it. +/// +/// +/// The feed is non-blocking and best-effort: the production adapter is a single +/// fire-and-forget actor Tell, so it never blocks the OPC UA publish thread the applier runs +/// on, and the applier wraps the call so a faulting feed can never break a deploy. The applier +/// references this abstraction (not the Runtime recorder) so the OpcUaServer layer keeps no +/// dependency on Akka / the actor system — exactly mirroring how +/// decouples the EnsureTags provisioning hook. +/// +public interface IHistorizedTagSubscriptionSink +{ + /// + /// Converge the recorder's historized-ref interest by an add/remove delta. The refs are + /// resolved EXACTLY as the EnsureTags provisioning hook resolves them (a non-alarm historized + /// value variable's HistorianTagname override, else its driver-side FullName). + /// The recorder applies the delta to its tracked full set and re-registers mux interest only + /// when the set actually changes. + /// + /// Historized refs newly historized by this deploy (added/changed-into tags). + /// Historized refs no longer historized by this deploy (removed/changed-out tags). + void UpdateHistorizedRefs(IReadOnlyList added, IReadOnlyList removed); +} + +/// +/// No-op — the applier's safe default when continuous +/// historization is disabled or unwired (no recorder to feed). Every call is a no-op and never +/// touches an actor system. +/// +public sealed class NullHistorizedTagSubscriptionSink : IHistorizedTagSubscriptionSink +{ + /// The shared singleton instance. + public static readonly NullHistorizedTagSubscriptionSink Instance = new(); + + private NullHistorizedTagSubscriptionSink() { } + + /// + public void UpdateHistorizedRefs(IReadOnlyList added, IReadOnlyList removed) + { + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs index 8439636b..f2385dab 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs @@ -29,6 +29,7 @@ public sealed class AddressSpaceApplier private readonly IOpcUaAddressSpaceSink _sink; private readonly ILogger _logger; private readonly IHistorianProvisioning _provisioning; + private readonly IHistorizedTagSubscriptionSink _historizedSubscriptions; /// Initializes a new instance of the AddressSpaceApplier class. /// The OPC UA address space sink to apply changes to. @@ -41,16 +42,26 @@ public sealed class AddressSpaceApplier /// dispatched fire-and-forget off (which runs on the OPC UA publish actor's /// pinned thread), so it can never block or break a deploy. /// + /// + /// Optional continuous-historization feed — when an address space is (re)applied, the add/remove + /// delta of historized tag refs (resolved EXACTLY as the provisioning hook above) is pushed to the + /// recorder so its dependency-mux interest converges to the currently-historized set. Defaults + /// (a null argument) to the no-op , so every + /// existing call site compiles and behaves unchanged. The feed is a single non-blocking post off + /// and is wrapped so it can never block or break a deploy. + /// public AddressSpaceApplier( IOpcUaAddressSpaceSink sink, ILogger logger, - IHistorianProvisioning? provisioning = null) + IHistorianProvisioning? provisioning = null, + IHistorizedTagSubscriptionSink? historizedSubscriptions = null) { ArgumentNullException.ThrowIfNull(sink); ArgumentNullException.ThrowIfNull(logger); _sink = sink; _logger = logger; _provisioning = provisioning ?? NullHistorianProvisioning.Instance; + _historizedSubscriptions = historizedSubscriptions ?? NullHistorizedTagSubscriptionSink.Instance; } /// @@ -196,6 +207,11 @@ public sealed class AddressSpaceApplier // or break the deploy — Apply has already produced its outcome and returns it regardless. ProvisionHistorizedTags(plan); + // Alongside provisioning: feed the continuous-historization recorder the add/remove delta of + // historized tag refs this plan changes, so its dependency-mux interest converges to exactly the + // currently-historized set. Same non-blocking + throw-safe discipline as the provisioning hook. + FeedHistorizedRefs(plan); + return new AddressSpaceApplyOutcome(removedCount, addedCount, changedCount, rebuilt); } @@ -275,6 +291,77 @@ public sealed class AddressSpaceApplier } } + /// + /// Feed the continuous-historization recorder the add/remove delta of historized tag refs this + /// plan changes, so its dependency-mux interest converges to exactly the currently-historized set + /// after every deploy. The plan is a pure DIFF (an incremental/surgical apply carries a delta, not + /// the full set), so a delta feed is the only convergent design this hook can produce — the + /// recorder keeps the full set and re-registers it. Each ref is resolved EXACTLY as + /// / MaterialiseEquipmentTags resolve it + /// (override-or-FullName), and only non-alarm historized value variables count (native-alarm tags + /// materialise as Part 9 condition nodes, never historized value variables). Runs on the OPC UA + /// publish actor's pinned thread, so the only work here is building two small ref lists; the + /// downstream feed is a single non-blocking post behind the sink. The whole hook is wrapped so a + /// faulting feed can never block or break a deploy. + /// + /// The plan whose historized-ref changes drive the recorder's interest set. + private void FeedHistorizedRefs(AddressSpacePlan plan) + { + try + { + List? added = null; + List? removed = null; + + // Added historized value variables → new interest. + foreach (var tag in plan.AddedEquipmentTags) + { + if (HistorizedRef(tag) is { } r) (added ??= new List()).Add(r); + } + + // Removed historized value variables → drop interest. + foreach (var tag in plan.RemovedEquipmentTags) + { + if (HistorizedRef(tag) is { } r) (removed ??= new List()).Add(r); + } + + // Changed tags: the historized ref may have flipped on/off or been renamed (override/FullName + // change). Compare previous-vs-current resolved refs — an unchanged ref is a no-op. + foreach (var d in plan.ChangedEquipmentTags) + { + var prev = HistorizedRef(d.Previous); + var cur = HistorizedRef(d.Current); + if (string.Equals(prev, cur, StringComparison.Ordinal)) continue; + if (prev is not null) (removed ??= new List()).Add(prev); + if (cur is not null) (added ??= new List()).Add(cur); + } + + if (added is null && removed is null) return; + + _historizedSubscriptions.UpdateHistorizedRefs( + added ?? (IReadOnlyList)Array.Empty(), + removed ?? (IReadOnlyList)Array.Empty()); + } + catch (Exception ex) + { + // A synchronous fault in the feed (or in building the ref lists) must not break the deploy. + // Apply has already produced its outcome. + _logger.LogWarning(ex, "AddressSpaceApplier: historized-ref subscription feed faulted; deploy unaffected"); + } + } + + /// + /// Resolve the historized tag ref for EXACTLY as the provisioning hook / + /// materialiser do: a non-alarm historized value variable's HistorianTagname override, + /// else its driver-side FullName. Returns null when the tag is not a historized + /// value variable (not historized, or a native-alarm condition node). + /// + /// The equipment tag to resolve a historized ref for. + /// The resolved historian ref, or null when the tag is not a historized value variable. + private static string? HistorizedRef(EquipmentTagPlan tag) => + tag.IsHistorized && tag.Alarm is null + ? (string.IsNullOrWhiteSpace(tag.HistorianTagname) ? tag.FullName : tag.HistorianTagname) + : null; + private void SafeRebuild() { try diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ActorHistorizedTagSubscriptionSink.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ActorHistorizedTagSubscriptionSink.cs new file mode 100644 index 00000000..bb8a7c85 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ActorHistorizedTagSubscriptionSink.cs @@ -0,0 +1,40 @@ +using Akka.Actor; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; + +/// +/// adapter that bridges the address-space applier (in +/// the OpcUaServer layer) to the actor in Runtime — +/// keeping the applier free of any actor/Runtime dependency (it sees only the abstraction). Each +/// feed is a single non-blocking (a fire-and-forget mailbox post), so it +/// never blocks the OPC UA publish thread the applier runs on; the recorder converges its mux +/// interest from the delta off the actor thread. +/// +public sealed class ActorHistorizedTagSubscriptionSink : IHistorizedTagSubscriptionSink +{ + private readonly IActorRef _recorder; + + /// Initializes a new instance of the class. + /// The continuous-historization recorder actor to feed historized-ref deltas to. + public ActorHistorizedTagSubscriptionSink(IActorRef recorder) + { + ArgumentNullException.ThrowIfNull(recorder); + _recorder = recorder; + } + + /// + public void UpdateHistorizedRefs(IReadOnlyList added, IReadOnlyList removed) + { + ArgumentNullException.ThrowIfNull(added); + ArgumentNullException.ThrowIfNull(removed); + + if (added.Count == 0 && removed.Count == 0) + { + // Nothing to converge — skip the mailbox post entirely. + return; + } + + _recorder.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(added, removed)); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs index 1af30213..c667fd26 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs @@ -63,6 +63,17 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers private GetStatus() { } } + /// + /// Converge the recorder's historized-ref interest by an add/remove DELTA — sent by the + /// address-space applier (via ) + /// after every deploy. The applier only ever sees a plan diff, so it feeds a delta; the recorder + /// holds the full set and re-registers it (see ). The refs are + /// the same ones the EnsureTags provisioning hook resolves (override-or-FullName). + /// + /// Refs newly historized by this deploy. + /// Refs no longer historized by this deploy. + public sealed record UpdateHistorizedRefs(IReadOnlyList Added, IReadOnlyList Removed); + /// A point-in-time snapshot of the recorder's counters. /// Un-acked entries currently held in the durable outbox. /// Lifetime count of values appended to the outbox. @@ -155,6 +166,7 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers _currentBackoff = _minBackoff; ReceiveAsync(OnValueChangedAsync); + Receive(OnUpdateHistorizedRefs); Receive(_ => OnDrainTick()); Receive(OnDrainResult); ReceiveAsync(async _ => @@ -243,6 +255,55 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers Self.Tell(DrainTick.Instance); } + /// + /// Converge the tracked historized-ref set by the supplied add/remove delta, then — only when + /// the set actually changed — re-register interest with the mux so its fan-out matches exactly + /// the currently-historized refs. + /// + /// Convergence (grounded against ). The mux's + /// is a full-REPLACE (it drops the + /// subscriber's prior ref set and installs the new one), and its + /// drops ALL of a subscriber's interest + /// (no per-ref form). So a single RegisterInterest carrying the full tracked set + /// converges the mux to exactly that set in one message (added refs become fanned, removed + /// refs stop), and an empty set is converged with one UnregisterInterest. The delta is + /// applied removed-then-added so a ref that appears in BOTH (one tag dropped it while another + /// adopted it in the same deploy) ends up registered. + /// + /// + /// Idempotent. A delta that produces no net change to the tracked set sends NOTHING to + /// the mux — no spurious register/unregister churn. + /// + /// + private void OnUpdateHistorizedRefs(UpdateHistorizedRefs msg) + { + var next = new HashSet(_historizedSet, StringComparer.Ordinal); + next.ExceptWith(msg.Removed); + next.UnionWith(msg.Added); + if (next.SetEquals(_historizedSet)) + { + // No net change — stay idempotent (no mux churn). + return; + } + + _historizedSet.Clear(); + _historizedSet.UnionWith(next); + + if (_historizedSet.Count == 0) + { + // The mux has no per-ref unregister; drop ALL interest in one message. + _dependencyMux.Tell(new DependencyMuxActor.UnregisterInterest(Self)); + } + else + { + // RegisterInterest REPLACES the prior set at the mux, so one message converges it exactly. + _dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_historizedSet.ToList(), Self)); + } + + _log.Debug("ContinuousHistorization: historized-ref interest converged to {Count} ref(s).", + _historizedSet.Count); + } + private void OnDrainTick() { if (_draining) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 1038ea6d..fb60a6bd 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -207,10 +207,58 @@ public static class ServiceCollectionExtensions var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName); registry.Register(mux); + // Continuous-historization recorder — gated on ContinuousHistorization:Enabled AND the + // gateway-backed IHistorianValueWriter + the durable IHistorizationOutbox being registered + // (the Host registers both ONLY when historization is enabled and the ServerHistorian gateway + // is configured). The recorder taps the dependency mux's value fan-out, so it is spawned after + // (and fed) the same `mux` ref the DriverHostActor uses. It is spawned BEFORE the applier so + // the applier's historized-ref subscription sink can wrap this recorder's IActorRef and feed it + // the add/remove delta of historized refs on every deploy (closing the T18 ref-feed gap). + IActorRef? continuousRecorder = null; + var continuousOptions = resolver.GetService(); + if (continuousOptions is { Enabled: true }) + { + var valueWriter = resolver.GetService(); + var outbox = resolver.GetService(); + if (valueWriter is not null && outbox is not null) + { + // Initial ref set is EMPTY: the deployed address space (and thus the historized-ref + // set) is built later at deploy time, not here. The applier's per-deploy add/remove + // feed populates the recorder's interest from that point on. + continuousRecorder = system.ActorOf( + ContinuousHistorizationRecorder.Props( + dependencyMux: mux, + writer: valueWriter, + outbox: outbox, + historizedRefs: Array.Empty(), + drainBatchSize: continuousOptions.DrainBatchSize, + drainInterval: TimeSpan.FromSeconds(continuousOptions.DrainIntervalSeconds), + minBackoff: TimeSpan.FromSeconds(continuousOptions.MinBackoffSeconds), + maxBackoff: TimeSpan.FromSeconds(continuousOptions.MaxBackoffSeconds)), + ContinuousHistorizationRecorderActorName); + registry.Register(continuousRecorder); + } + else + { + loggerFactory.CreateLogger("ZB.MOM.WW.OtOpcUa.Runtime.ServiceCollectionExtensions") + .LogWarning("ContinuousHistorization is enabled but IHistorianValueWriter and/or IHistorizationOutbox are not registered; the recorder will not be spawned. Expected only in misconfigured deployments or test harnesses."); + } + } + + // Historized-ref subscription sink fed by the applier on every deploy. When the recorder was + // spawned, an adapter wraps its IActorRef (a non-blocking Tell of the add/remove delta); + // otherwise the Null no-op sink, so the applier behaves identically when historization is off. + IHistorizedTagSubscriptionSink historizedSubscriptions = continuousRecorder is not null + ? new ActorHistorizedTagSubscriptionSink(continuousRecorder) + : NullHistorizedTagSubscriptionSink.Instance; + // OPC UA publish actor — pinned dispatcher, owns the address-space side of the // pipeline. AddressSpaceApplier is constructed here so the actor + applier share the // same sink reference (when DeferredAddressSpaceSink swaps later, both see it). - var applier = new AddressSpaceApplier(addressSpaceSink, loggerFactory.CreateLogger()); + var applier = new AddressSpaceApplier( + addressSpaceSink, + loggerFactory.CreateLogger(), + historizedSubscriptions: historizedSubscriptions); var publishActor = system.ActorOf( OpcUaPublishActor.Props( sink: addressSpaceSink, @@ -247,46 +295,6 @@ public static class ServiceCollectionExtensions HistorianAdapterActor.Props(historianSink, roleInfo.LocalNode), HistorianAdapterActorName); registry.Register(historian); - - // Continuous-historization recorder — gated on ContinuousHistorization:Enabled AND the - // gateway-backed IHistorianValueWriter + the durable IHistorizationOutbox being registered - // (the Host registers both ONLY when historization is enabled and the ServerHistorian gateway - // is configured). The recorder taps the dependency mux's value fan-out, so it is spawned after - // (and fed) the same `mux` ref the DriverHostActor uses. - // - // HISTORIZED-REF SET — DOCUMENTED GAP (T18 minimal wiring). The deployed address space (and - // thus the set of historized tag refs) is built later at deploy time, not here at actor-spawn - // time, so there is no clean ref set to resolve in WithOtOpcUaRuntimeActors. Per the plan, T18 - // spawns the recorder with an EMPTY initial ref set and registers its key; populating the refs - // (a later SetHistorizedRefs feed driven off the deployed composition) is the remaining wiring - // and a tracked follow-on. With an empty set the recorder registers interest in nothing and - // historizes nothing until that feed lands — the actor + outbox + writer + meters are wired. - var continuousOptions = resolver.GetService(); - if (continuousOptions is { Enabled: true }) - { - var valueWriter = resolver.GetService(); - var outbox = resolver.GetService(); - if (valueWriter is not null && outbox is not null) - { - var recorder = system.ActorOf( - ContinuousHistorizationRecorder.Props( - dependencyMux: mux, - writer: valueWriter, - outbox: outbox, - historizedRefs: Array.Empty(), - drainBatchSize: continuousOptions.DrainBatchSize, - drainInterval: TimeSpan.FromSeconds(continuousOptions.DrainIntervalSeconds), - minBackoff: TimeSpan.FromSeconds(continuousOptions.MinBackoffSeconds), - maxBackoff: TimeSpan.FromSeconds(continuousOptions.MaxBackoffSeconds)), - ContinuousHistorizationRecorderActorName); - registry.Register(recorder); - } - else - { - loggerFactory.CreateLogger("ZB.MOM.WW.OtOpcUa.Runtime.ServiceCollectionExtensions") - .LogWarning("ContinuousHistorization is enabled but IHistorianValueWriter and/or IHistorizationOutbox are not registered; the recorder will not be spawned. Expected only in misconfigured deployments or test harnesses."); - } - } }); return builder; diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs index 7c89ac6f..ebc121f2 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs @@ -143,6 +143,101 @@ public sealed class AddressSpaceApplierProvisioningTests prov.Seen[0].TagName.ShouldBe("Pump1.Good"); } + /// Capturing double. Records the add/remove + /// ref deltas the applier feeds it. A flag simulates a faulting feed. + private sealed class CapturingSubscriptionSink : IHistorizedTagSubscriptionSink + { + /// Refs the applier fed as ADDED. + public List Added { get; } = new(); + + /// Refs the applier fed as REMOVED. + public List Removed { get; } = new(); + + /// When true, throws synchronously. + public bool Throw { get; init; } + + /// + public void UpdateHistorizedRefs(IReadOnlyList added, IReadOnlyList removed) + { + if (Throw) throw new InvalidOperationException("boom"); + Added.AddRange(added); + Removed.AddRange(removed); + } + } + + /// The feed pushes ONLY historized added refs, resolved (override-or-FullName) exactly like + /// the provisioning hook — non-historized tags never reach the recorder. + [Fact] + public void Apply_feeds_historized_added_refs_to_the_subscription_sink() + { + var sink = new CapturingSubscriptionSink(); + var applier = new AddressSpaceApplier( + NullOpcUaAddressSpaceSink.Instance, NullLogger.Instance, + historizedSubscriptions: sink); + + var plan = PlanWithAddedTags( + HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32"), + HistorizedTag(displayName: "Speed", historianName: null, dataType: "Int32", fullName: "40001"), + NonHistorizedTag(displayName: "Run", dataType: "Boolean")); + + applier.Apply(plan); + + sink.Added.ShouldBe(new[] { "Pump1.Temp", "40001" }, ignoreOrder: true); // override + FullName fallback + sink.Removed.ShouldBeEmpty(); + } + + /// Removed historized tags are fed as REMOVED refs; a changed tag whose historian override is + /// renamed feeds the old ref removed + the new ref added (the recorder converges the full set). + [Fact] + public void Apply_feeds_removed_and_renamed_historized_refs() + { + var sink = new CapturingSubscriptionSink(); + var applier = new AddressSpaceApplier( + NullOpcUaAddressSpaceSink.Instance, NullLogger.Instance, + historizedSubscriptions: sink); + + var removedTag = HistorizedTag(displayName: "Old", historianName: "Pump1.Old", dataType: "Float32"); + // Same TagId ("tag-T"), historian override renamed A → B (both historized) → remove A, add B. + var prev = HistorizedTag(displayName: "T", historianName: "Pump1.A", dataType: "Float32"); + var cur = HistorizedTag(displayName: "T", historianName: "Pump1.B", dataType: "Float32"); + + var plan = new AddressSpacePlan( + AddedEquipment: Array.Empty(), + RemovedEquipment: Array.Empty(), + ChangedEquipment: Array.Empty(), + AddedDrivers: Array.Empty(), + RemovedDrivers: Array.Empty(), + ChangedDrivers: Array.Empty(), + AddedAlarms: Array.Empty(), + RemovedAlarms: Array.Empty(), + ChangedAlarms: Array.Empty()) + { + RemovedEquipmentTags = new[] { removedTag }, + ChangedEquipmentTags = new[] { new AddressSpacePlan.EquipmentTagDelta(prev, cur) }, + }; + + applier.Apply(plan); + + sink.Added.ShouldBe(new[] { "Pump1.B" }, ignoreOrder: true); + sink.Removed.ShouldBe(new[] { "Pump1.Old", "Pump1.A" }, ignoreOrder: true); + } + + /// A synchronously-throwing subscription sink must NOT block or break the publish — the + /// address-space work still completes and returns its outcome. + [Fact] + public void Subscription_sink_throw_does_not_block_publish() + { + var applier = new AddressSpaceApplier( + NullOpcUaAddressSpaceSink.Instance, + NullLogger.Instance, + historizedSubscriptions: new CapturingSubscriptionSink { Throw = true }); + + var outcome = applier.Apply(PlanWithAddedTags( + HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32"))); + + outcome.RebuildCalled.ShouldBeTrue(); // address-space work still completed + } + private static EquipmentTagPlan HistorizedTag(string displayName, string? historianName, string dataType, string fullName = "ref") => new("tag-" + displayName, "eq-1", "drv", FolderPath: "", Name: displayName, DataType: dataType, FullName: fullName, Writable: false, Alarm: null, IsHistorized: true, HistorianTagname: historianName); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs index f01d4053..8672598a 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs @@ -36,6 +36,76 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit Assert.Contains("Pump1.Temp", reg.TagRefs); } + [Fact] + public void UpdateHistorizedRefs_from_empty_registers_the_added_refs() + { + var mux = CreateTestProbe(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty())); + + // PreStart registers the (empty) initial set first. + var initial = mux.ExpectMsg(); + Assert.Empty(initial.TagRefs); + + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( + new[] { "Pump1.Temp", "Pump2.Flow" }, Array.Empty())); + + var reg = mux.ExpectMsg(); + Assert.Equal(new[] { "Pump1.Temp", "Pump2.Flow" }, reg.TagRefs.OrderBy(x => x, StringComparer.Ordinal)); + } + + [Fact] + public void UpdateHistorizedRefs_converges_adding_and_removing() + { + var mux = CreateTestProbe(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty())); + mux.ExpectMsg(); // PreStart (empty) + + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( + new[] { "A", "B" }, Array.Empty())); + var first = mux.ExpectMsg(); + Assert.Equal(new[] { "A", "B" }, first.TagRefs.OrderBy(x => x, StringComparer.Ordinal)); + + // Add "C", remove "A" → converge to {B, C}. The mux's RegisterInterest is a full-REPLACE, so the + // recorder sends ONE RegisterInterest carrying exactly the converged set (C registered, A gone). + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "C" }, new[] { "A" })); + var second = mux.ExpectMsg(); + Assert.Equal(new[] { "B", "C" }, second.TagRefs.OrderBy(x => x, StringComparer.Ordinal)); + Assert.DoesNotContain("A", second.TagRefs); + } + + [Fact] + public void UpdateHistorizedRefs_is_idempotent_when_the_set_is_unchanged() + { + var mux = CreateTestProbe(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: new[] { "A", "B" })); + var initial = mux.ExpectMsg(); // PreStart {A, B} + Assert.Equal(new[] { "A", "B" }, initial.TagRefs.OrderBy(x => x, StringComparer.Ordinal)); + + // A delta with no net effect (add an already-present ref, remove an absent one) → no mux churn. + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "A" }, new[] { "Z" })); + mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public void UpdateHistorizedRefs_draining_to_empty_unregisters_all_interest() + { + var mux = CreateTestProbe(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: new[] { "A" })); + mux.ExpectMsg(); // PreStart {A} + + // The mux has no per-ref unregister; an empty converged set drops ALL interest in one message. + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(Array.Empty(), new[] { "A" })); + mux.ExpectMsg(); + } + [Fact] public async Task DependencyValueChanged_appends_to_outbox_then_drains_to_writer() {