fix(historian-gateway): historize under the historian name, not the mux ref, when HistorianTagname overrides (FU-3)
The continuous-historization recorder conflated two identifiers into one string: the dependency mux fans DependencyValueChanged keyed by the driver FullName (the mux ref), but a value must be historized under the resolved historian name (HistorianTagname override, else FullName). In the common no-override case the two are equal, so it worked; with an override they diverge and the recorder registered mux interest under a key the mux never fans — that tag's values were never captured (and, had they been, would have been written under the mux ref). Carry BOTH identifiers through the seam: a new HistorizedTagRef(MuxRef, HistorianName) record on IHistorizedTagSubscriptionSink. The applier resolves MuxRef = FullName and HistorianName = override-or-FullName. The recorder now keeps a muxRef->historianName map: it registers/filters mux interest by MuxRef but writes the outbox entry (and drains) under HistorianName. The convergence handler re-registers the mux only when the registered key-set changes, so an override-only rename (same FullName) updates the write target without mux churn. Tests: a divergent-override recorder test (interest by mux ref, value written under the override name, never the mux ref) + an override-rename no-churn test; the applier feed tests now assert the full (mux ref, historian name) pairs. Runtime 348/0, OpcUaServer 327/0; 0 warnings. Closes FU-3. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
+38
-7
@@ -1,5 +1,35 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// A single historized tag the recorder tracks, carrying BOTH identifiers it needs — kept distinct
|
||||
/// because a <c>HistorianTagname</c> override makes them diverge:
|
||||
/// <list type="bullet">
|
||||
/// <item>
|
||||
/// <see cref="MuxRef"/> — the driver-published reference the per-node dependency mux fans its
|
||||
/// <c>DependencyValueChanged</c> by (the tag's driver-side <c>FullName</c>). The recorder
|
||||
/// registers mux interest and matches incoming values by THIS.
|
||||
/// </item>
|
||||
/// <item>
|
||||
/// <see cref="HistorianName"/> — the resolved historian tag name the value is written under (a
|
||||
/// non-alarm historized value variable's <c>HistorianTagname</c> override, else its
|
||||
/// <c>FullName</c>) — the SAME name the EnsureTags provisioning hook ensures.
|
||||
/// </item>
|
||||
/// </list>
|
||||
/// In the common (no-override) case the two are the same string; an override is the only case they
|
||||
/// diverge, and conflating them would silently drop that tag's values (interest registered under a
|
||||
/// key the mux never fans).
|
||||
/// </summary>
|
||||
/// <param name="MuxRef">The driver ref the mux fans by (and the key the recorder registers interest under).</param>
|
||||
/// <param name="HistorianName">The resolved historian tag name the value is historized under.</param>
|
||||
public sealed record HistorizedTagRef(string MuxRef, string HistorianName)
|
||||
{
|
||||
/// <summary>The no-override identity: the mux ref and historian name are the same string (the tag has
|
||||
/// no <c>HistorianTagname</c> override, so it historizes under its own driver <c>FullName</c>).</summary>
|
||||
/// <param name="reference">The driver ref that serves as both the mux key and the historian name.</param>
|
||||
/// <returns>A ref whose <see cref="MuxRef"/> and <see cref="HistorianName"/> are equal.</returns>
|
||||
public static HistorizedTagRef ForSelf(string reference) => new(reference, reference);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Server-side feed that keeps the continuous-historization recorder's set of historized tag refs
|
||||
/// in step with the deployed address space. The <c>AddressSpaceApplier</c> (in the
|
||||
@@ -18,15 +48,16 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
public interface IHistorizedTagSubscriptionSink
|
||||
{
|
||||
/// <summary>
|
||||
/// Converge the recorder's historized-ref interest by an add/remove delta. The refs are
|
||||
/// resolved EXACTLY as the EnsureTags provisioning hook resolves them (a non-alarm historized
|
||||
/// value variable's <c>HistorianTagname</c> override, else its driver-side <c>FullName</c>).
|
||||
/// The recorder applies the delta to its tracked full set and re-registers mux interest only
|
||||
/// when the set actually changes.
|
||||
/// Converge the recorder's historized-ref interest by an add/remove delta. Each ref carries both
|
||||
/// its <see cref="HistorizedTagRef.MuxRef"/> (the driver ref the mux fans by) and its
|
||||
/// <see cref="HistorizedTagRef.HistorianName"/> (the resolved override-or-FullName the value is
|
||||
/// historized under) — the same name the EnsureTags provisioning hook ensures. The recorder
|
||||
/// applies the delta to its tracked full set and re-registers mux interest (keyed by
|
||||
/// <see cref="HistorizedTagRef.MuxRef"/>) only when the registered key-set actually changes.
|
||||
/// </summary>
|
||||
/// <param name="added">Historized refs newly historized by this deploy (added/changed-into tags).</param>
|
||||
/// <param name="removed">Historized refs no longer historized by this deploy (removed/changed-out tags).</param>
|
||||
void UpdateHistorizedRefs(IReadOnlyList<string> added, IReadOnlyList<string> removed);
|
||||
void UpdateHistorizedRefs(IReadOnlyList<HistorizedTagRef> added, IReadOnlyList<HistorizedTagRef> removed);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -42,7 +73,7 @@ public sealed class NullHistorizedTagSubscriptionSink : IHistorizedTagSubscripti
|
||||
private NullHistorizedTagSubscriptionSink() { }
|
||||
|
||||
/// <inheritdoc />
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<string> added, IReadOnlyList<string> removed)
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<HistorizedTagRef> added, IReadOnlyList<HistorizedTagRef> removed)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,37 +309,38 @@ public sealed class AddressSpaceApplier
|
||||
{
|
||||
try
|
||||
{
|
||||
List<string>? added = null;
|
||||
List<string>? removed = null;
|
||||
List<HistorizedTagRef>? added = null;
|
||||
List<HistorizedTagRef>? removed = null;
|
||||
|
||||
// Added historized value variables → new interest.
|
||||
foreach (var tag in plan.AddedEquipmentTags)
|
||||
{
|
||||
if (HistorizedRef(tag) is { } r) (added ??= new List<string>()).Add(r);
|
||||
if (HistorizedRef(tag) is { } r) (added ??= new List<HistorizedTagRef>()).Add(r);
|
||||
}
|
||||
|
||||
// Removed historized value variables → drop interest.
|
||||
foreach (var tag in plan.RemovedEquipmentTags)
|
||||
{
|
||||
if (HistorizedRef(tag) is { } r) (removed ??= new List<string>()).Add(r);
|
||||
if (HistorizedRef(tag) is { } r) (removed ??= new List<HistorizedTagRef>()).Add(r);
|
||||
}
|
||||
|
||||
// Changed tags: the historized ref may have flipped on/off or been renamed (override/FullName
|
||||
// change). Compare previous-vs-current resolved refs — an unchanged ref is a no-op.
|
||||
// change). Compare previous-vs-current resolved ref PAIRS (record equality compares both the
|
||||
// mux ref and the historian name) — an unchanged pair is a no-op.
|
||||
foreach (var d in plan.ChangedEquipmentTags)
|
||||
{
|
||||
var prev = HistorizedRef(d.Previous);
|
||||
var cur = HistorizedRef(d.Current);
|
||||
if (string.Equals(prev, cur, StringComparison.Ordinal)) continue;
|
||||
if (prev is not null) (removed ??= new List<string>()).Add(prev);
|
||||
if (cur is not null) (added ??= new List<string>()).Add(cur);
|
||||
if (prev == cur) continue;
|
||||
if (prev is not null) (removed ??= new List<HistorizedTagRef>()).Add(prev);
|
||||
if (cur is not null) (added ??= new List<HistorizedTagRef>()).Add(cur);
|
||||
}
|
||||
|
||||
if (added is null && removed is null) return;
|
||||
|
||||
_historizedSubscriptions.UpdateHistorizedRefs(
|
||||
added ?? (IReadOnlyList<string>)Array.Empty<string>(),
|
||||
removed ?? (IReadOnlyList<string>)Array.Empty<string>());
|
||||
added ?? (IReadOnlyList<HistorizedTagRef>)Array.Empty<HistorizedTagRef>(),
|
||||
removed ?? (IReadOnlyList<HistorizedTagRef>)Array.Empty<HistorizedTagRef>());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -350,16 +351,21 @@ public sealed class AddressSpaceApplier
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resolve the historized tag ref for <paramref name="tag"/> EXACTLY as the provisioning hook /
|
||||
/// materialiser do: a non-alarm historized value variable's <c>HistorianTagname</c> override,
|
||||
/// else its driver-side <c>FullName</c>. Returns <c>null</c> when the tag is not a historized
|
||||
/// Resolve the historized tag ref for <paramref name="tag"/> as a
|
||||
/// <see cref="HistorizedTagRef"/> carrying BOTH identifiers the recorder needs: the
|
||||
/// <c>MuxRef</c> = the driver-side <c>FullName</c> the dependency mux fans values by, and the
|
||||
/// <c>HistorianName</c> = the value the EnsureTags hook / materialiser write under (a non-alarm
|
||||
/// historized value variable's <c>HistorianTagname</c> override, else its <c>FullName</c>). The
|
||||
/// two diverge ONLY when an override is set. Returns <c>null</c> when the tag is not a historized
|
||||
/// value variable (not historized, or a native-alarm condition node).
|
||||
/// </summary>
|
||||
/// <param name="tag">The equipment tag to resolve a historized ref for.</param>
|
||||
/// <returns>The resolved historian ref, or <c>null</c> when the tag is not a historized value variable.</returns>
|
||||
private static string? HistorizedRef(EquipmentTagPlan tag) =>
|
||||
/// <returns>The resolved historized ref pair, or <c>null</c> when the tag is not a historized value variable.</returns>
|
||||
private static HistorizedTagRef? HistorizedRef(EquipmentTagPlan tag) =>
|
||||
tag.IsHistorized && tag.Alarm is null
|
||||
? (string.IsNullOrWhiteSpace(tag.HistorianTagname) ? tag.FullName : tag.HistorianTagname)
|
||||
? new HistorizedTagRef(
|
||||
tag.FullName,
|
||||
string.IsNullOrWhiteSpace(tag.HistorianTagname) ? tag.FullName : tag.HistorianTagname)
|
||||
: null;
|
||||
|
||||
private void SafeRebuild()
|
||||
|
||||
+1
-1
@@ -24,7 +24,7 @@ public sealed class ActorHistorizedTagSubscriptionSink : IHistorizedTagSubscript
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<string> added, IReadOnlyList<string> removed)
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<HistorizedTagRef> added, IReadOnlyList<HistorizedTagRef> removed)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(added);
|
||||
ArgumentNullException.ThrowIfNull(removed);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
@@ -67,12 +68,14 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
/// Converge the recorder's historized-ref interest by an add/remove DELTA — sent by the
|
||||
/// address-space applier (via <see cref="ZB.MOM.WW.OtOpcUa.Core.Abstractions.IHistorizedTagSubscriptionSink"/>)
|
||||
/// after every deploy. The applier only ever sees a plan diff, so it feeds a delta; the recorder
|
||||
/// holds the full set and re-registers it (see <see cref="OnUpdateHistorizedRefs"/>). The refs are
|
||||
/// the same ones the EnsureTags provisioning hook resolves (override-or-FullName).
|
||||
/// holds the full set and re-registers it (see <see cref="OnUpdateHistorizedRefs"/>). Each ref
|
||||
/// carries both its mux key (<see cref="HistorizedTagRef.MuxRef"/>, the driver ref the mux fans by)
|
||||
/// and the resolved historian name (<see cref="HistorizedTagRef.HistorianName"/>,
|
||||
/// override-or-FullName — the same name the EnsureTags provisioning hook ensures).
|
||||
/// </summary>
|
||||
/// <param name="Added">Refs newly historized by this deploy.</param>
|
||||
/// <param name="Removed">Refs no longer historized by this deploy.</param>
|
||||
public sealed record UpdateHistorizedRefs(IReadOnlyList<string> Added, IReadOnlyList<string> Removed);
|
||||
public sealed record UpdateHistorizedRefs(IReadOnlyList<HistorizedTagRef> Added, IReadOnlyList<HistorizedTagRef> Removed);
|
||||
|
||||
/// <summary>A point-in-time snapshot of the recorder's counters.</summary>
|
||||
/// <param name="QueuedDepth">Un-acked entries currently held in the durable outbox.</param>
|
||||
@@ -93,8 +96,12 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
private readonly IActorRef _dependencyMux;
|
||||
private readonly IHistorianValueWriter _writer;
|
||||
private readonly IHistorizationOutbox _outbox;
|
||||
private readonly IReadOnlyList<string> _historizedRefs;
|
||||
private readonly HashSet<string> _historizedSet;
|
||||
|
||||
/// <summary>The tracked historized tags, keyed by mux ref (<see cref="HistorizedTagRef.MuxRef"/> — the
|
||||
/// driver ref the mux fans by, and the key mux interest is registered under) → the historian name the
|
||||
/// value is written under (<see cref="HistorizedTagRef.HistorianName"/>). A <c>HistorianTagname</c>
|
||||
/// override is the only case the two diverge; in the common case they are equal.</summary>
|
||||
private readonly Dictionary<string, string> _refMap;
|
||||
private readonly int _drainBatchSize;
|
||||
private readonly TimeSpan _drainInterval;
|
||||
private readonly TimeSpan _minBackoff;
|
||||
@@ -157,8 +164,15 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
_dependencyMux = dependencyMux ?? throw new ArgumentNullException(nameof(dependencyMux));
|
||||
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||
_outbox = outbox ?? throw new ArgumentNullException(nameof(outbox));
|
||||
_historizedRefs = historizedRefs ?? throw new ArgumentNullException(nameof(historizedRefs));
|
||||
_historizedSet = new HashSet<string>(_historizedRefs, StringComparer.Ordinal);
|
||||
ArgumentNullException.ThrowIfNull(historizedRefs);
|
||||
// The ctor seed is the no-override identity (mux ref == historian name); production seeds an EMPTY
|
||||
// set and converges via UpdateHistorizedRefs on each deploy (which carries diverging override pairs).
|
||||
_refMap = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
foreach (string r in historizedRefs)
|
||||
{
|
||||
_refMap[r] = r;
|
||||
}
|
||||
|
||||
_drainBatchSize = drainBatchSize > 0 ? drainBatchSize : 64;
|
||||
_drainInterval = drainInterval is { } di && di > TimeSpan.Zero ? di : TimeSpan.FromSeconds(2);
|
||||
_minBackoff = minBackoff is { } mb && mb > TimeSpan.Zero ? mb : TimeSpan.FromSeconds(1);
|
||||
@@ -182,8 +196,9 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
/// <inheritdoc />
|
||||
protected override void PreStart()
|
||||
{
|
||||
// Register interest for the historized refs so the mux fans their DependencyValueChanged to us.
|
||||
_dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_historizedRefs, Self));
|
||||
// Register interest by mux ref (the key the mux fans DependencyValueChanged by) so it fans those
|
||||
// tags' values to us. Historian-name overrides are tracked separately for the write side.
|
||||
_dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_refMap.Keys.ToList(), Self));
|
||||
// Seed the steady drain cadence; appends also nudge a prompt drain (see OnValueChangedAsync).
|
||||
Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, _drainInterval);
|
||||
base.PreStart();
|
||||
@@ -202,8 +217,11 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
|
||||
private async Task OnValueChangedAsync(VirtualTagActor.DependencyValueChanged msg)
|
||||
{
|
||||
// Defensive: only historize refs we registered interest for (the mux already scopes to these).
|
||||
if (!_historizedSet.Contains(msg.TagId))
|
||||
// The mux fans values keyed by the driver ref (msg.TagId == the mux ref). Only historize refs we
|
||||
// registered interest for, and resolve the HISTORIAN NAME to write under — which differs from the
|
||||
// mux ref when a HistorianTagname override is set. (The mux already scopes to our registered refs;
|
||||
// this is also the defensive filter.)
|
||||
if (!_refMap.TryGetValue(msg.TagId, out string? historianName))
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -217,9 +235,11 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
return;
|
||||
}
|
||||
|
||||
// Record under the historian name (override-or-FullName), NOT the mux ref — the outbox entry +
|
||||
// the drain's WriteLiveValues target the historian tag the EnsureTags hook provisioned.
|
||||
var entry = new HistorizationOutboxEntry(
|
||||
Guid.NewGuid(),
|
||||
msg.TagId,
|
||||
historianName,
|
||||
numeric,
|
||||
GoodQuality,
|
||||
DateTime.SpecifyKind(msg.TimestampUtc, DateTimeKind.Utc));
|
||||
@@ -277,31 +297,44 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
||||
/// </summary>
|
||||
private void OnUpdateHistorizedRefs(UpdateHistorizedRefs msg)
|
||||
{
|
||||
var next = new HashSet<string>(_historizedSet, StringComparer.Ordinal);
|
||||
next.ExceptWith(msg.Removed);
|
||||
next.UnionWith(msg.Added);
|
||||
if (next.SetEquals(_historizedSet))
|
||||
// Snapshot the registered mux key-set BEFORE applying the delta, so we re-register only when the
|
||||
// set the mux fans by actually changes (an override-only rename updates the WRITE target but not
|
||||
// which refs the mux fans — no mux churn for those).
|
||||
var beforeKeys = new HashSet<string>(_refMap.Keys, StringComparer.Ordinal);
|
||||
|
||||
// Apply removed-then-added so a ref present in BOTH (a HistorianTagname override changed while the
|
||||
// mux ref / FullName stayed the same) ends mapped to its NEW historian name.
|
||||
foreach (HistorizedTagRef r in msg.Removed)
|
||||
{
|
||||
// No net change — stay idempotent (no mux churn).
|
||||
_refMap.Remove(r.MuxRef);
|
||||
}
|
||||
|
||||
foreach (HistorizedTagRef r in msg.Added)
|
||||
{
|
||||
_refMap[r.MuxRef] = r.HistorianName;
|
||||
}
|
||||
|
||||
if (_refMap.Keys.ToHashSet(StringComparer.Ordinal).SetEquals(beforeKeys))
|
||||
{
|
||||
// Mux key-set unchanged (no-op delta, or an override-only rename) — the map (write targets) is
|
||||
// already updated; stay idempotent at the mux (no register/unregister churn).
|
||||
return;
|
||||
}
|
||||
|
||||
_historizedSet.Clear();
|
||||
_historizedSet.UnionWith(next);
|
||||
|
||||
if (_historizedSet.Count == 0)
|
||||
if (_refMap.Count == 0)
|
||||
{
|
||||
// The mux has no per-ref unregister; drop ALL interest in one message.
|
||||
_dependencyMux.Tell(new DependencyMuxActor.UnregisterInterest(Self));
|
||||
}
|
||||
else
|
||||
{
|
||||
// RegisterInterest REPLACES the prior set at the mux, so one message converges it exactly.
|
||||
_dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_historizedSet.ToList(), Self));
|
||||
// RegisterInterest REPLACES the prior set at the mux, so one message (carrying the mux keys)
|
||||
// converges it exactly.
|
||||
_dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_refMap.Keys.ToList(), Self));
|
||||
}
|
||||
|
||||
_log.Debug("ContinuousHistorization: historized-ref interest converged to {Count} ref(s).",
|
||||
_historizedSet.Count);
|
||||
_refMap.Count);
|
||||
}
|
||||
|
||||
private void OnDrainTick()
|
||||
|
||||
+20
-8
@@ -147,17 +147,17 @@ public sealed class AddressSpaceApplierProvisioningTests
|
||||
/// ref deltas the applier feeds it. A <see cref="Throw"/> flag simulates a faulting feed.</summary>
|
||||
private sealed class CapturingSubscriptionSink : IHistorizedTagSubscriptionSink
|
||||
{
|
||||
/// <summary>Refs the applier fed as ADDED.</summary>
|
||||
public List<string> Added { get; } = new();
|
||||
/// <summary>Ref pairs the applier fed as ADDED (mux ref + historian name).</summary>
|
||||
public List<HistorizedTagRef> Added { get; } = new();
|
||||
|
||||
/// <summary>Refs the applier fed as REMOVED.</summary>
|
||||
public List<string> Removed { get; } = new();
|
||||
/// <summary>Ref pairs the applier fed as REMOVED.</summary>
|
||||
public List<HistorizedTagRef> Removed { get; } = new();
|
||||
|
||||
/// <summary>When true, <see cref="UpdateHistorizedRefs"/> throws synchronously.</summary>
|
||||
public bool Throw { get; init; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<string> added, IReadOnlyList<string> removed)
|
||||
public void UpdateHistorizedRefs(IReadOnlyList<HistorizedTagRef> added, IReadOnlyList<HistorizedTagRef> removed)
|
||||
{
|
||||
if (Throw) throw new InvalidOperationException("boom");
|
||||
Added.AddRange(added);
|
||||
@@ -182,7 +182,13 @@ public sealed class AddressSpaceApplierProvisioningTests
|
||||
|
||||
applier.Apply(plan);
|
||||
|
||||
sink.Added.ShouldBe(new[] { "Pump1.Temp", "40001" }, ignoreOrder: true); // override + FullName fallback
|
||||
// Each ref carries BOTH identifiers: the override tag feeds (mux ref = FullName "ref", historian
|
||||
// name = override "Pump1.Temp"); the no-override tag feeds (FullName "40001" as both).
|
||||
sink.Added.ShouldBe(new[]
|
||||
{
|
||||
new HistorizedTagRef("ref", "Pump1.Temp"),
|
||||
new HistorizedTagRef("40001", "40001"),
|
||||
}, ignoreOrder: true);
|
||||
sink.Removed.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
@@ -218,8 +224,14 @@ public sealed class AddressSpaceApplierProvisioningTests
|
||||
|
||||
applier.Apply(plan);
|
||||
|
||||
sink.Added.ShouldBe(new[] { "Pump1.B" }, ignoreOrder: true);
|
||||
sink.Removed.ShouldBe(new[] { "Pump1.Old", "Pump1.A" }, ignoreOrder: true);
|
||||
// All three tags default FullName "ref" (the mux ref); the override rename changes only the
|
||||
// historian name, so the changed tag feeds removed (ref, Pump1.A) + added (ref, Pump1.B).
|
||||
sink.Added.ShouldBe(new[] { new HistorizedTagRef("ref", "Pump1.B") }, ignoreOrder: true);
|
||||
sink.Removed.ShouldBe(new[]
|
||||
{
|
||||
new HistorizedTagRef("ref", "Pump1.Old"),
|
||||
new HistorizedTagRef("ref", "Pump1.A"),
|
||||
}, ignoreOrder: true);
|
||||
}
|
||||
|
||||
/// <summary>A synchronously-throwing subscription sink must NOT block or break the publish — the
|
||||
|
||||
+65
-5
@@ -1,6 +1,7 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
@@ -49,7 +50,8 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
||||
Assert.Empty(initial.TagRefs);
|
||||
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { "Pump1.Temp", "Pump2.Flow" }, Array.Empty<string>()));
|
||||
new[] { HistorizedTagRef.ForSelf("Pump1.Temp"), HistorizedTagRef.ForSelf("Pump2.Flow") },
|
||||
Array.Empty<HistorizedTagRef>()));
|
||||
|
||||
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
Assert.Equal(new[] { "Pump1.Temp", "Pump2.Flow" }, reg.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
|
||||
@@ -65,13 +67,14 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
||||
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart (empty)
|
||||
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { "A", "B" }, Array.Empty<string>()));
|
||||
new[] { HistorizedTagRef.ForSelf("A"), HistorizedTagRef.ForSelf("B") }, Array.Empty<HistorizedTagRef>()));
|
||||
var first = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
Assert.Equal(new[] { "A", "B" }, first.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
|
||||
|
||||
// Add "C", remove "A" → converge to {B, C}. The mux's RegisterInterest is a full-REPLACE, so the
|
||||
// recorder sends ONE RegisterInterest carrying exactly the converged set (C registered, A gone).
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "C" }, new[] { "A" }));
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { HistorizedTagRef.ForSelf("C") }, new[] { HistorizedTagRef.ForSelf("A") }));
|
||||
var second = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
Assert.Equal(new[] { "B", "C" }, second.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
|
||||
Assert.DoesNotContain("A", second.TagRefs);
|
||||
@@ -88,7 +91,8 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
||||
Assert.Equal(new[] { "A", "B" }, initial.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
|
||||
|
||||
// A delta with no net effect (add an already-present ref, remove an absent one) → no mux churn.
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "A" }, new[] { "Z" }));
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { HistorizedTagRef.ForSelf("A") }, new[] { HistorizedTagRef.ForSelf("Z") }));
|
||||
mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
@@ -102,10 +106,66 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
||||
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart {A}
|
||||
|
||||
// The mux has no per-ref unregister; an empty converged set drops ALL interest in one message.
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(Array.Empty<string>(), new[] { "A" }));
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
Array.Empty<HistorizedTagRef>(), new[] { HistorizedTagRef.ForSelf("A") }));
|
||||
mux.ExpectMsg<DependencyMuxActor.UnregisterInterest>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task HistorianTagname_override_registers_by_mux_ref_but_writes_under_historian_name()
|
||||
{
|
||||
// FU-3: a HistorianTagname override makes the mux ref (driver FullName the mux fans by) DIVERGE
|
||||
// from the historian name (the value is stored under). Interest MUST be registered under the mux
|
||||
// ref (else the mux never fans this tag), and the value MUST be written under the override name.
|
||||
var mux = CreateTestProbe();
|
||||
var writer = new FakeValueWriter { Succeed = true };
|
||||
var outbox = new InMemoryOutbox();
|
||||
|
||||
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
|
||||
mux.Ref, writer, outbox, historizedRefs: Array.Empty<string>()));
|
||||
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart (empty)
|
||||
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { new HistorizedTagRef("Area/Line/Pump1", "HIST.Pump1.Temp") },
|
||||
Array.Empty<HistorizedTagRef>()));
|
||||
|
||||
// Interest is registered under the MUX REF (the driver ref the mux keys fan-out by), NOT the name.
|
||||
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
Assert.Equal(new[] { "Area/Line/Pump1" }, reg.TagRefs);
|
||||
|
||||
// The mux fans the value keyed by the driver ref; the recorder must historize it under the OVERRIDE.
|
||||
rec.Tell(new VirtualTagActor.DependencyValueChanged("Area/Line/Pump1", 55.0, DateTime.UtcNow));
|
||||
|
||||
await AwaitAssertAsync(() =>
|
||||
Assert.Contains(writer.Snapshot(), w => w.Tag == "HIST.Pump1.Temp" && w.Value == 55.0));
|
||||
// ...and NEVER under the mux ref (the pre-fix behaviour wrote under msg.TagId == the mux ref).
|
||||
Assert.DoesNotContain(writer.Snapshot(), w => w.Tag == "Area/Line/Pump1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Override_rename_with_same_mux_ref_updates_target_without_mux_churn()
|
||||
{
|
||||
// An override changing while the driver FullName (mux ref) stays the same: removed+added carry the
|
||||
// SAME mux ref with different historian names. The recorder must update the write target but NOT
|
||||
// re-register the mux (the fanned key-set is unchanged).
|
||||
var mux = CreateTestProbe();
|
||||
|
||||
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
|
||||
mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty<string>()));
|
||||
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart (empty)
|
||||
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { new HistorizedTagRef("Area/Pump1", "HIST.Old") }, Array.Empty<HistorizedTagRef>()));
|
||||
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
Assert.Equal(new[] { "Area/Pump1" }, reg.TagRefs);
|
||||
|
||||
// Override renamed: same mux ref, new historian name. Key-set unchanged → no further mux message.
|
||||
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
|
||||
new[] { new HistorizedTagRef("Area/Pump1", "HIST.New") },
|
||||
new[] { new HistorizedTagRef("Area/Pump1", "HIST.Old") }));
|
||||
mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DependencyValueChanged_appends_to_outbox_then_drains_to_writer()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user