feat(historian-gateway): feed historized refs to the recorder on deploy (close continuous-historization ref-feed gap)
v2-ci / build (pull_request) Failing after 39s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (pull_request) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (pull_request) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (pull_request) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (pull_request) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (pull_request) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (pull_request) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (pull_request) Has been skipped

The ContinuousHistorizationRecorder was spawned with an EMPTY historized-ref
set, so it registered interest in nothing and historized nothing. This feeds it
the currently-historized tag refs on every address-space deploy/redeploy so its
DependencyMuxActor interest converges to exactly the historized set (the same
refs the EnsureTags provisioning hook resolves: override-or-FullName).

Design — delta convergence (the plan is a pure DIFF):
- New seam IHistorizedTagSubscriptionSink (Core.Abstractions/Historian) with a
  Null no-op singleton, mirroring how IHistorianProvisioning decouples the T15
  hook. AddressSpaceApplier gains a DEFAULTED ctor param (Null sink) so all ~80
  existing call sites + the production site compile unchanged.
- Apply() only ever sees a plan diff (an incremental/surgical apply carries a
  delta, not the full set), so the applier feeds an add/remove DELTA computed
  from AddedEquipmentTags / RemovedEquipmentTags / ChangedEquipmentTags. The
  recorder keeps the full set and re-registers it. The feed is a single
  non-blocking Tell behind the sink, wrapped in try/catch so a faulting feed
  never blocks or breaks a deploy (same discipline as the provisioning hook).
- Recorder.UpdateHistorizedRefs(added, removed) converges the tracked set, then
  — only when it actually changed — sends ONE RegisterInterest with the full set
  (the mux's RegisterInterest is a full-REPLACE) or one UnregisterInterest when
  it drains to empty (the mux has no per-ref unregister). An unchanged delta is
  a no-op (no mux churn).
- DI: the recorder is now spawned BEFORE the applier so the adapter
  (ActorHistorizedTagSubscriptionSink) can wrap its IActorRef; the Null sink is
  used when continuous historization is off/unwired.

Tests: recorder convergence (add-from-empty, add+remove converge, idempotent,
drain-to-empty unregisters); applier feeds resolved added refs, removed+renamed
deltas, and survives a throwing sink. Build clean (0 warnings on touched
projects); Runtime/OpcUaServer/Gateway/AdminUI suites green.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 23:21:18 -04:00
parent 2124f21ab6
commit 2982cc4bb5
7 changed files with 451 additions and 42 deletions
@@ -143,6 +143,101 @@ public sealed class AddressSpaceApplierProvisioningTests
prov.Seen[0].TagName.ShouldBe("Pump1.Good");
}
/// <summary>Capturing <see cref="IHistorizedTagSubscriptionSink"/> double. Records the add/remove
/// ref deltas the applier feeds it. A <see cref="Throw"/> flag simulates a faulting feed.</summary>
private sealed class CapturingSubscriptionSink : IHistorizedTagSubscriptionSink
{
/// <summary>Refs the applier fed as ADDED.</summary>
public List<string> Added { get; } = new();
/// <summary>Refs the applier fed as REMOVED.</summary>
public List<string> Removed { get; } = new();
/// <summary>When true, <see cref="UpdateHistorizedRefs"/> throws synchronously.</summary>
public bool Throw { get; init; }
/// <inheritdoc />
public void UpdateHistorizedRefs(IReadOnlyList<string> added, IReadOnlyList<string> removed)
{
if (Throw) throw new InvalidOperationException("boom");
Added.AddRange(added);
Removed.AddRange(removed);
}
}
/// <summary>The feed pushes ONLY historized added refs, resolved (override-or-FullName) exactly like
/// the provisioning hook — non-historized tags never reach the recorder.</summary>
[Fact]
public void Apply_feeds_historized_added_refs_to_the_subscription_sink()
{
var sink = new CapturingSubscriptionSink();
var applier = new AddressSpaceApplier(
NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance,
historizedSubscriptions: sink);
var plan = PlanWithAddedTags(
HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32"),
HistorizedTag(displayName: "Speed", historianName: null, dataType: "Int32", fullName: "40001"),
NonHistorizedTag(displayName: "Run", dataType: "Boolean"));
applier.Apply(plan);
sink.Added.ShouldBe(new[] { "Pump1.Temp", "40001" }, ignoreOrder: true); // override + FullName fallback
sink.Removed.ShouldBeEmpty();
}
/// <summary>Removed historized tags are fed as REMOVED refs; a changed tag whose historian override is
/// renamed feeds the old ref removed + the new ref added (the recorder converges the full set).</summary>
[Fact]
public void Apply_feeds_removed_and_renamed_historized_refs()
{
var sink = new CapturingSubscriptionSink();
var applier = new AddressSpaceApplier(
NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance,
historizedSubscriptions: sink);
var removedTag = HistorizedTag(displayName: "Old", historianName: "Pump1.Old", dataType: "Float32");
// Same TagId ("tag-T"), historian override renamed A → B (both historized) → remove A, add B.
var prev = HistorizedTag(displayName: "T", historianName: "Pump1.A", dataType: "Float32");
var cur = HistorizedTag(displayName: "T", historianName: "Pump1.B", dataType: "Float32");
var plan = new AddressSpacePlan(
AddedEquipment: Array.Empty<EquipmentNode>(),
RemovedEquipment: Array.Empty<EquipmentNode>(),
ChangedEquipment: Array.Empty<AddressSpacePlan.EquipmentDelta>(),
AddedDrivers: Array.Empty<DriverInstancePlan>(),
RemovedDrivers: Array.Empty<DriverInstancePlan>(),
ChangedDrivers: Array.Empty<AddressSpacePlan.DriverDelta>(),
AddedAlarms: Array.Empty<ScriptedAlarmPlan>(),
RemovedAlarms: Array.Empty<ScriptedAlarmPlan>(),
ChangedAlarms: Array.Empty<AddressSpacePlan.AlarmDelta>())
{
RemovedEquipmentTags = new[] { removedTag },
ChangedEquipmentTags = new[] { new AddressSpacePlan.EquipmentTagDelta(prev, cur) },
};
applier.Apply(plan);
sink.Added.ShouldBe(new[] { "Pump1.B" }, ignoreOrder: true);
sink.Removed.ShouldBe(new[] { "Pump1.Old", "Pump1.A" }, ignoreOrder: true);
}
/// <summary>A synchronously-throwing subscription sink must NOT block or break the publish — the
/// address-space work still completes and <see cref="AddressSpaceApplier.Apply"/> returns its outcome.</summary>
[Fact]
public void Subscription_sink_throw_does_not_block_publish()
{
var applier = new AddressSpaceApplier(
NullOpcUaAddressSpaceSink.Instance,
NullLogger<AddressSpaceApplier>.Instance,
historizedSubscriptions: new CapturingSubscriptionSink { Throw = true });
var outcome = applier.Apply(PlanWithAddedTags(
HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32")));
outcome.RebuildCalled.ShouldBeTrue(); // address-space work still completed
}
private static EquipmentTagPlan HistorizedTag(string displayName, string? historianName, string dataType, string fullName = "ref")
=> new("tag-" + displayName, "eq-1", "drv", FolderPath: "", Name: displayName, DataType: dataType, FullName: fullName,
Writable: false, Alarm: null, IsHistorized: true, HistorianTagname: historianName);
@@ -36,6 +36,76 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
Assert.Contains("Pump1.Temp", reg.TagRefs);
}
[Fact]
public void UpdateHistorizedRefs_from_empty_registers_the_added_refs()
{
var mux = CreateTestProbe();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty<string>()));
// PreStart registers the (empty) initial set first.
var initial = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Empty(initial.TagRefs);
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
new[] { "Pump1.Temp", "Pump2.Flow" }, Array.Empty<string>()));
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Equal(new[] { "Pump1.Temp", "Pump2.Flow" }, reg.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
}
[Fact]
public void UpdateHistorizedRefs_converges_adding_and_removing()
{
var mux = CreateTestProbe();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: Array.Empty<string>()));
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart (empty)
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(
new[] { "A", "B" }, Array.Empty<string>()));
var first = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Equal(new[] { "A", "B" }, first.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
// Add "C", remove "A" → converge to {B, C}. The mux's RegisterInterest is a full-REPLACE, so the
// recorder sends ONE RegisterInterest carrying exactly the converged set (C registered, A gone).
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "C" }, new[] { "A" }));
var second = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Equal(new[] { "B", "C" }, second.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
Assert.DoesNotContain("A", second.TagRefs);
}
[Fact]
public void UpdateHistorizedRefs_is_idempotent_when_the_set_is_unchanged()
{
var mux = CreateTestProbe();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: new[] { "A", "B" }));
var initial = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart {A, B}
Assert.Equal(new[] { "A", "B" }, initial.TagRefs.OrderBy(x => x, StringComparer.Ordinal));
// A delta with no net effect (add an already-present ref, remove an absent one) → no mux churn.
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(new[] { "A" }, new[] { "Z" }));
mux.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
[Fact]
public void UpdateHistorizedRefs_draining_to_empty_unregisters_all_interest()
{
var mux = CreateTestProbe();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, new FakeValueWriter(), new InMemoryOutbox(), historizedRefs: new[] { "A" }));
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(); // PreStart {A}
// The mux has no per-ref unregister; an empty converged set drops ALL interest in one message.
rec.Tell(new ContinuousHistorizationRecorder.UpdateHistorizedRefs(Array.Empty<string>(), new[] { "A" }));
mux.ExpectMsg<DependencyMuxActor.UnregisterInterest>();
}
[Fact]
public async Task DependencyValueChanged_appends_to_outbox_then_drains_to_writer()
{