diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs index 13ad3092..8c589e6f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs @@ -98,10 +98,13 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers private readonly IHistorizationOutbox _outbox; /// The tracked historized tags, keyed by mux ref ( — the - /// driver ref the mux fans by, and the key mux interest is registered under) → the historian name the - /// value is written under (). A HistorianTagname - /// override is the only case the two diverge; in the common case they are equal. - private readonly Dictionary _refMap; + /// driver ref the mux fans by, and the key mux interest is registered under) → the SET of historian + /// names that ref's value is written under (). It is a SET, + /// not a single name, because one driver ref can back SEVERAL historized equipment tags via aliasing + /// (identical machines sharing a register), each with its own HistorianTagname — the mux fans + /// one value per ref and every aliased historian tag must receive it. In the common case the set holds + /// a single name equal to the mux ref (no override, no alias). + private readonly Dictionary> _refMap; private readonly int _drainBatchSize; private readonly TimeSpan _drainInterval; private readonly TimeSpan _minBackoff; @@ -167,10 +170,10 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers ArgumentNullException.ThrowIfNull(historizedRefs); // The ctor seed is the no-override identity (mux ref == historian name); production seeds an EMPTY // set and converges via UpdateHistorizedRefs on each deploy (which carries diverging override pairs). - _refMap = new Dictionary(StringComparer.Ordinal); + _refMap = new Dictionary>(StringComparer.Ordinal); foreach (string r in historizedRefs) { - _refMap[r] = r; + AddRef(r, r); } _drainBatchSize = drainBatchSize > 0 ? drainBatchSize : 64; @@ -218,10 +221,11 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers private async Task OnValueChangedAsync(VirtualTagActor.DependencyValueChanged msg) { // The mux fans values keyed by the driver ref (msg.TagId == the mux ref). Only historize refs we - // registered interest for, and resolve the HISTORIAN NAME to write under — which differs from the - // mux ref when a HistorianTagname override is set. (The mux already scopes to our registered refs; - // this is also the defensive filter.) - if (!_refMap.TryGetValue(msg.TagId, out string? historianName)) + // registered interest for, and resolve the HISTORIAN NAME(s) to write under — which differ from the + // mux ref under a HistorianTagname override, and there can be MORE THAN ONE when several historized + // equipment tags alias the same driver ref. (The mux already scopes to our registered refs; this is + // also the defensive filter.) + if (!_refMap.TryGetValue(msg.TagId, out HashSet? historianNames) || historianNames.Count == 0) { return; } @@ -235,40 +239,49 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers return; } - // Record under the historian name (override-or-FullName), NOT the mux ref — the outbox entry + - // the drain's WriteLiveValues target the historian tag the EnsureTags hook provisioned. - var entry = new HistorizationOutboxEntry( - Guid.NewGuid(), - historianName, - numeric, - GoodQuality, - DateTime.SpecifyKind(msg.TimestampUtc, DateTimeKind.Utc)); + var timestampUtc = DateTime.SpecifyKind(msg.TimestampUtc, DateTimeKind.Utc); - // Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered - // captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount. - try + // One outbox entry PER historian name: a single mux fan-out must reach every aliased historized tag + // (each its own historian name), not just one — collapsing to one name here would silently drop the + // rest. Snapshot the names first: although the mailbox is suspended across the awaits below (so the + // set cannot be mutated concurrently), iterating a snapshot is robust and clearer. + foreach (string historianName in historianNames.ToList()) { - await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false); - } - catch (OperationCanceledException) when (_lifetime.IsCancellationRequested) - { - // Normal shutdown raced the append — not a fault. Drop quietly. - return; - } - catch (Exception ex) - { - // A durable-boundary failure (e.g. a PerEntry fsync hitting disk-full / I/O error) must NEVER - // propagate out of the handler — that would trip Akka supervision into a restart, and under a - // persistent disk fault the actor would restart-loop (re-register → next value → append fails - // → restart → …). Mirror the drain path's catch-all: meter the failure (category only, no - // value content), drop this value, and stay alive. Do NOT record it or nudge the drain. - _outboxAppendFailures++; - _log.Warning("ContinuousHistorization: outbox append failed ({Exception}); value dropped.", - ex.GetType().Name); - return; - } + // Record under the historian name (override-or-FullName), NOT the mux ref — the outbox entry + + // the drain's WriteLiveValues target the historian tag the EnsureTags hook provisioned. + var entry = new HistorizationOutboxEntry( + Guid.NewGuid(), + historianName, + numeric, + GoodQuality, + timestampUtc); - _totalRecorded++; + // Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered + // captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount. + try + { + await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (_lifetime.IsCancellationRequested) + { + // Normal shutdown raced the append — not a fault. Drop quietly (abandon the rest too). + return; + } + catch (Exception ex) + { + // A durable-boundary failure (e.g. a PerEntry fsync hitting disk-full / I/O error) must + // NEVER propagate out of the handler — that would trip Akka supervision into a restart, and + // under a persistent disk fault the actor would restart-loop (re-register → next value → + // append fails → restart → …). Mirror the drain path's catch-all: meter the failure + // (category only, no value content), drop THIS name's value, and continue with the rest. + _outboxAppendFailures++; + _log.Warning("ContinuousHistorization: outbox append failed ({Exception}); value dropped.", + ex.GetType().Name); + continue; + } + + _totalRecorded++; + } // Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours // any active backoff window, so this never bypasses a failure cooldown. @@ -303,21 +316,31 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers var beforeKeys = new HashSet(_refMap.Keys, StringComparer.Ordinal); // Apply removed-then-added so a ref present in BOTH (a HistorianTagname override changed while the - // mux ref / FullName stayed the same) ends mapped to its NEW historian name. + // mux ref / FullName stayed the same) ends mapped to its NEW historian name. Each (muxRef, name) + // pair is removed/added INDIVIDUALLY from the per-ref name set, so aliased tags sharing a mux ref + // are converged independently (removing one alias's name leaves the others fanning). foreach (HistorizedTagRef r in msg.Removed) { - _refMap.Remove(r.MuxRef); + if (_refMap.TryGetValue(r.MuxRef, out HashSet? names)) + { + names.Remove(r.HistorianName); + if (names.Count == 0) + { + _refMap.Remove(r.MuxRef); + } + } } foreach (HistorizedTagRef r in msg.Added) { - _refMap[r.MuxRef] = r.HistorianName; + AddRef(r.MuxRef, r.HistorianName); } if (_refMap.Keys.ToHashSet(StringComparer.Ordinal).SetEquals(beforeKeys)) { - // Mux key-set unchanged (no-op delta, or an override-only rename) — the map (write targets) is - // already updated; stay idempotent at the mux (no register/unregister churn). + // Mux key-set unchanged (no-op delta, an override-only rename, or an alias add/remove that + // leaves the ref still fanned) — the per-ref name sets (write targets) are already updated; + // stay idempotent at the mux (no register/unregister churn). return; } @@ -333,10 +356,24 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers _dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_refMap.Keys.ToList(), Self)); } - _log.Debug("ContinuousHistorization: historized-ref interest converged to {Count} ref(s).", + _log.Debug("ContinuousHistorization: historized-ref interest converged to {Count} mux ref(s).", _refMap.Count); } + /// Adds a (mux ref → historian name) mapping to the tracked set, creating the per-ref + /// name set on first use. Idempotent — adding the same pair twice is a no-op (a + /// add). + private void AddRef(string muxRef, string historianName) + { + if (!_refMap.TryGetValue(muxRef, out HashSet? names)) + { + names = new HashSet(StringComparer.Ordinal); + _refMap[muxRef] = names; + } + + names.Add(historianName); + } + private void OnDrainTick() { if (_draining) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs index d8621346..c5bed574 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs @@ -143,15 +143,16 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit } [Fact] - public void Override_rename_with_same_mux_ref_updates_target_without_mux_churn() + public async Task Override_rename_with_same_mux_ref_updates_target_without_mux_churn() { // An override changing while the driver FullName (mux ref) stays the same: removed+added carry the // SAME mux ref with different historian names. The recorder must update the write target but NOT // re-register the mux (the fanned key-set is unchanged). var mux = CreateTestProbe(); + var writer = new FakeValueWriter { Succeed = true }; var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( - mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty())); + mux.Ref, writer, new InMemoryOutbox(), historizedRefs: Array.Empty())); mux.ExpectMsg(); // PreStart (empty) rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( @@ -164,6 +165,75 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit new[] { new HistorizedTagRef("Area/Pump1", "HIST.New") }, new[] { new HistorizedTagRef("Area/Pump1", "HIST.Old") })); mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // ...and the WRITE TARGET actually updated: a value now historizes under HIST.New, never HIST.Old. + rec.Tell(new VirtualTagActor.DependencyValueChanged("Area/Pump1", 9.0, DateTime.UtcNow)); + await AwaitAssertAsync(() => + Assert.Contains(writer.Snapshot(), w => w.Tag == "HIST.New" && w.Value == 9.0)); + Assert.DoesNotContain(writer.Snapshot(), w => w.Tag == "HIST.Old"); + } + + [Fact] + public async Task Aliased_refs_with_distinct_overrides_each_get_the_value() + { + // Two historized tags share ONE driver ref ("Area/Pump1" — identical machines sharing a register), + // each with its OWN HistorianTagname. The mux fans a SINGLE value for the shared ref; BOTH historian + // tags must be written. A muxRef→single-name map would silently drop one (the FU-3 review's Critical). + var mux = CreateTestProbe(); + var writer = new FakeValueWriter { Succeed = true }; + var outbox = new InMemoryOutbox(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, historizedRefs: Array.Empty())); + mux.ExpectMsg(); // PreStart (empty) + + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( + new[] + { + new HistorizedTagRef("Area/Pump1", "HIST.MachineA.Temp"), + new HistorizedTagRef("Area/Pump1", "HIST.MachineB.Temp"), + }, + Array.Empty())); + + // ONE mux ref is registered (both aliases share it). + var reg = mux.ExpectMsg(); + Assert.Equal(new[] { "Area/Pump1" }, reg.TagRefs); + + rec.Tell(new VirtualTagActor.DependencyValueChanged("Area/Pump1", 71.0, DateTime.UtcNow)); + + // BOTH historian names receive the single fanned value. + await AwaitAssertAsync(() => + { + var snap = writer.Snapshot(); + Assert.Contains(snap, w => w.Tag == "HIST.MachineA.Temp" && w.Value == 71.0); + Assert.Contains(snap, w => w.Tag == "HIST.MachineB.Temp" && w.Value == 71.0); + }); + } + + [Fact] + public void Removing_one_alias_keeps_the_shared_mux_ref_registered_for_the_other() + { + // Removing ONE alias's historian name from a shared mux ref must NOT drop the mux registration — + // the ref is still fanned for the surviving alias (key-set unchanged → no mux churn). + var mux = CreateTestProbe(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty())); + mux.ExpectMsg(); // PreStart (empty) + + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( + new[] + { + new HistorizedTagRef("Area/Pump1", "HIST.A"), + new HistorizedTagRef("Area/Pump1", "HIST.B"), + }, + Array.Empty())); + mux.ExpectMsg(); // {Area/Pump1} + + // Drop only HIST.A — Area/Pump1 still fans for HIST.B, so the key-set is unchanged: no mux churn. + rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs( + Array.Empty(), new[] { new HistorizedTagRef("Area/Pump1", "HIST.A") })); + mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); } [Fact]