feat(historian-gateway): EnsureTags provisioning hook in AddressSpaceApplier (non-blocking)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
|
|
||||||
@@ -27,16 +28,29 @@ public sealed class AddressSpaceApplier
|
|||||||
{
|
{
|
||||||
private readonly IOpcUaAddressSpaceSink _sink;
|
private readonly IOpcUaAddressSpaceSink _sink;
|
||||||
private readonly ILogger<AddressSpaceApplier> _logger;
|
private readonly ILogger<AddressSpaceApplier> _logger;
|
||||||
|
private readonly IHistorianProvisioning _provisioning;
|
||||||
|
|
||||||
/// <summary>Initializes a new instance of the AddressSpaceApplier class.</summary>
|
/// <summary>Initializes a new instance of the AddressSpaceApplier class.</summary>
|
||||||
/// <param name="sink">The OPC UA address space sink to apply changes to.</param>
|
/// <param name="sink">The OPC UA address space sink to apply changes to.</param>
|
||||||
/// <param name="logger">The logger instance.</param>
|
/// <param name="logger">The logger instance.</param>
|
||||||
public AddressSpaceApplier(IOpcUaAddressSpaceSink sink, ILogger<AddressSpaceApplier> logger)
|
/// <param name="provisioning">
|
||||||
|
/// Optional historian tag provisioner — when an address space is (re)built, historized added
|
||||||
|
/// tags are auto-ensured in the historian via <see cref="IHistorianProvisioning.EnsureTagsAsync"/>.
|
||||||
|
/// Defaults (a <c>null</c> argument) to the no-op <see cref="NullHistorianProvisioning"/>, so every
|
||||||
|
/// existing two-argument call site compiles and behaves unchanged. The provisioning round-trip is
|
||||||
|
/// dispatched fire-and-forget off <see cref="Apply"/> (which runs on the OPC UA publish actor's
|
||||||
|
/// pinned thread), so it can never block or break a deploy.
|
||||||
|
/// </param>
|
||||||
|
public AddressSpaceApplier(
|
||||||
|
IOpcUaAddressSpaceSink sink,
|
||||||
|
ILogger<AddressSpaceApplier> logger,
|
||||||
|
IHistorianProvisioning? provisioning = null)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(sink);
|
ArgumentNullException.ThrowIfNull(sink);
|
||||||
ArgumentNullException.ThrowIfNull(logger);
|
ArgumentNullException.ThrowIfNull(logger);
|
||||||
_sink = sink;
|
_sink = sink;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_provisioning = provisioning ?? NullHistorianProvisioning.Instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -177,9 +191,88 @@ public sealed class AddressSpaceApplier
|
|||||||
"AddressSpaceApplier: applied plan (added={Added}, removed={Removed}, changed={Changed}, surgicalTags={Surgical}, renamedFolders={Renamed}, rebuild={Rebuild})",
|
"AddressSpaceApplier: applied plan (added={Added}, removed={Removed}, changed={Changed}, surgicalTags={Surgical}, renamedFolders={Renamed}, rebuild={Rebuild})",
|
||||||
addedCount, removedCount, changedCount, rebuilt ? 0 : surgicalTagDeltas.Count, rebuilt ? 0 : renamedFolders.Count, rebuilt);
|
addedCount, removedCount, changedCount, rebuilt ? 0 : surgicalTagDeltas.Count, rebuilt ? 0 : renamedFolders.Count, rebuilt);
|
||||||
|
|
||||||
|
// After the address-space work has completed, auto-provision the historian for the added
|
||||||
|
// historized tags. This is fully detached (fire-and-forget) and wrapped so it can NEVER block
|
||||||
|
// or break the deploy — Apply has already produced its outcome and returns it regardless.
|
||||||
|
ProvisionHistorizedTags(plan);
|
||||||
|
|
||||||
return new AddressSpaceApplyOutcome(removedCount, addedCount, changedCount, rebuilt);
|
return new AddressSpaceApplyOutcome(removedCount, addedCount, changedCount, rebuilt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Auto-provision the historian for the added historized equipment tags. Runs on the OPC UA
|
||||||
|
/// publish actor's pinned thread, so the synchronous portion is kept to building the request
|
||||||
|
/// list only and the gateway round-trip is dispatched fire-and-forget. The whole hook is wrapped
|
||||||
|
/// in try/catch — a synchronously-throwing provisioner (or any request-building fault) is
|
||||||
|
/// swallowed so it cannot break a deploy.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="plan">The plan whose added historized tags to ensure in the historian.</param>
|
||||||
|
private void ProvisionHistorizedTags(AddressSpacePlan plan)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
List<HistorianTagProvisionRequest>? requests = null;
|
||||||
|
foreach (var tag in plan.AddedEquipmentTags)
|
||||||
|
{
|
||||||
|
// Only historized value variables are provisioned. Native-alarm tags materialise as
|
||||||
|
// Part 9 condition nodes (never historized value variables) — the materialiser resolves
|
||||||
|
// a historian tagname only for the non-alarm branch, so mirror that and skip them.
|
||||||
|
if (!tag.IsHistorized || tag.Alarm is not null) continue;
|
||||||
|
|
||||||
|
// Parse the driver-agnostic data type from the tag's DataType string. An unparseable
|
||||||
|
// type is skipped (logged at Debug) rather than faulting the hook.
|
||||||
|
if (!Enum.TryParse<DriverDataType>(tag.DataType, ignoreCase: true, out var dataType))
|
||||||
|
{
|
||||||
|
_logger.LogDebug(
|
||||||
|
"AddressSpaceApplier: skipping historian provisioning for an added historized tag whose data type '{DataType}' is not a DriverDataType",
|
||||||
|
tag.DataType);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the historian name EXACTLY as MaterialiseEquipmentTags does: a null/blank
|
||||||
|
// override falls back to the driver-side FullName.
|
||||||
|
var historianName = string.IsNullOrWhiteSpace(tag.HistorianTagname) ? tag.FullName : tag.HistorianTagname;
|
||||||
|
(requests ??= new List<HistorianTagProvisionRequest>()).Add(
|
||||||
|
new HistorianTagProvisionRequest(historianName, dataType, EngineeringUnit: null, Description: tag.Name));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (requests is null) return;
|
||||||
|
|
||||||
|
// Fire-and-forget OFF the apply path. Never await/.Wait()/.Result here — Apply must return
|
||||||
|
// its outcome without blocking on the gateway. The continuation observes the task so a
|
||||||
|
// faulted provisioning never becomes an unobserved exception, and logs the tally.
|
||||||
|
var provisionCount = requests.Count;
|
||||||
|
var dispatch = _provisioning.EnsureTagsAsync(requests, CancellationToken.None);
|
||||||
|
_ = dispatch.ContinueWith(
|
||||||
|
t =>
|
||||||
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(t.Exception?.GetBaseException(),
|
||||||
|
"AddressSpaceApplier: historian provisioning of {Count} tag(s) faulted; deploy unaffected", provisionCount);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var result = t.Result;
|
||||||
|
if (result.Failed > 0 || result.Skipped > 0)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"AddressSpaceApplier: historian provisioning completed (requested={Requested}, ensured={Ensured}, skipped={Skipped}, failed={Failed})",
|
||||||
|
result.Requested, result.Ensured, result.Skipped, result.Failed);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
CancellationToken.None,
|
||||||
|
TaskContinuationOptions.None,
|
||||||
|
TaskScheduler.Default);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// A synchronous fault (e.g. the provisioner throws before returning a task) must not break
|
||||||
|
// the deploy. Apply has already produced its outcome.
|
||||||
|
_logger.LogWarning(ex, "AddressSpaceApplier: historian provisioning hook faulted synchronously; deploy unaffected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void SafeRebuild()
|
private void SafeRebuild()
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||||
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Commons\ZB.MOM.WW.OtOpcUa.Commons.csproj"/>
|
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Commons\ZB.MOM.WW.OtOpcUa.Commons.csproj"/>
|
||||||
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
|
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|||||||
+167
@@ -0,0 +1,167 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// T15 — verifies the non-blocking historian-provisioning hook in
|
||||||
|
/// <see cref="AddressSpaceApplier.Apply"/>. The hook fires AFTER the address-space work and
|
||||||
|
/// dispatches <see cref="IHistorianProvisioning.EnsureTagsAsync"/> fire-and-forget, so a slow or
|
||||||
|
/// throwing provisioner can never block or break a deploy on the OPC UA publish actor's pinned
|
||||||
|
/// thread.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AddressSpaceApplierProvisioningTests
|
||||||
|
{
|
||||||
|
/// <summary>Capturing <see cref="IHistorianProvisioning"/> double. Records the requests it was
|
||||||
|
/// handed and signals a <see cref="TaskCompletionSource"/> when invoked, so a test can await the
|
||||||
|
/// fire-and-forget dispatch deterministically (never poll/sleep). A <see cref="Throw"/> flag
|
||||||
|
/// simulates a synchronous provisioner fault.</summary>
|
||||||
|
private sealed class CapturingProvisioner : IHistorianProvisioning
|
||||||
|
{
|
||||||
|
private readonly TaskCompletionSource _called = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
|
||||||
|
/// <summary>Gets the requests the hook handed to <see cref="EnsureTagsAsync"/>.</summary>
|
||||||
|
public List<HistorianTagProvisionRequest> Seen { get; } = new();
|
||||||
|
|
||||||
|
/// <summary>When true, <see cref="EnsureTagsAsync"/> throws synchronously (a fault before any await).</summary>
|
||||||
|
public bool Throw { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Completes once <see cref="EnsureTagsAsync"/> has been invoked.</summary>
|
||||||
|
public Task Called => _called.Task;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<HistorianProvisionResult> EnsureTagsAsync(
|
||||||
|
IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (Throw)
|
||||||
|
{
|
||||||
|
_called.TrySetResult();
|
||||||
|
throw new InvalidOperationException("boom");
|
||||||
|
}
|
||||||
|
|
||||||
|
Seen.AddRange(requests);
|
||||||
|
_called.TrySetResult();
|
||||||
|
return Task.FromResult(new HistorianProvisionResult(requests.Count, requests.Count, 0, 0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>The hook provisions ONLY historized added tags, with the resolved historian name
|
||||||
|
/// (override when set, else the driver-side FullName).</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Apply_provisions_only_historized_added_tags()
|
||||||
|
{
|
||||||
|
var prov = new CapturingProvisioner();
|
||||||
|
var applier = new AddressSpaceApplier(NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance, prov);
|
||||||
|
|
||||||
|
// Leaf display name "Temp"; historian override "Pump1.Temp".
|
||||||
|
var plan = PlanWithAddedTags(
|
||||||
|
HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32"),
|
||||||
|
NonHistorizedTag(displayName: "Run", dataType: "Boolean"));
|
||||||
|
|
||||||
|
var outcome = applier.Apply(plan);
|
||||||
|
|
||||||
|
outcome.RebuildCalled.ShouldBeTrue();
|
||||||
|
|
||||||
|
// Fire-and-forget: await the capturing double's signal so the assertion is deterministic.
|
||||||
|
await prov.Called.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
|
||||||
|
prov.Seen.Count.ShouldBe(1);
|
||||||
|
prov.Seen[0].TagName.ShouldBe("Pump1.Temp"); // resolved historian name (override)
|
||||||
|
prov.Seen[0].DataType.ShouldBe(DriverDataType.Float32);
|
||||||
|
prov.Seen[0].Description.ShouldBe("Temp"); // leaf display name
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A null/blank historian-name override resolves to the driver-side FullName — mirroring
|
||||||
|
/// the materialiser's resolution exactly.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Apply_resolves_historian_name_from_fullname_when_override_blank()
|
||||||
|
{
|
||||||
|
var prov = new CapturingProvisioner();
|
||||||
|
var applier = new AddressSpaceApplier(NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance, prov);
|
||||||
|
|
||||||
|
// IsHistorized but no override → historian name falls back to FullName ("40001").
|
||||||
|
var plan = PlanWithAddedTags(
|
||||||
|
HistorizedTag(displayName: "Speed", historianName: null, dataType: "Int32", fullName: "40001"));
|
||||||
|
|
||||||
|
applier.Apply(plan);
|
||||||
|
|
||||||
|
await prov.Called.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
|
||||||
|
prov.Seen.Count.ShouldBe(1);
|
||||||
|
prov.Seen[0].TagName.ShouldBe("40001");
|
||||||
|
prov.Seen[0].DataType.ShouldBe(DriverDataType.Int32);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A synchronously-throwing provisioner must NOT block or break the publish: the
|
||||||
|
/// synchronous <see cref="AddressSpaceApplier.Apply"/> still completes its address-space work and
|
||||||
|
/// returns its normal outcome.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Provisioner_throw_does_not_block_publish()
|
||||||
|
{
|
||||||
|
var applier = new AddressSpaceApplier(
|
||||||
|
NullOpcUaAddressSpaceSink.Instance,
|
||||||
|
NullLogger<AddressSpaceApplier>.Instance,
|
||||||
|
new CapturingProvisioner { Throw = true });
|
||||||
|
|
||||||
|
var outcome = applier.Apply(PlanWithAddedTags(
|
||||||
|
HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32")));
|
||||||
|
|
||||||
|
outcome.RebuildCalled.ShouldBeTrue(); // address-space work still completed
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>The default ctor (no provisioner) binds the no-op <see cref="NullHistorianProvisioning"/>
|
||||||
|
/// and never faults a deploy — preserving every existing call site.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Default_ctor_uses_null_provisioning_and_does_not_throw()
|
||||||
|
{
|
||||||
|
var applier = new AddressSpaceApplier(NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance);
|
||||||
|
|
||||||
|
var outcome = applier.Apply(PlanWithAddedTags(
|
||||||
|
HistorizedTag(displayName: "Temp", historianName: "Pump1.Temp", dataType: "Float32")));
|
||||||
|
|
||||||
|
outcome.RebuildCalled.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>An added historized tag whose DataType string is not a <see cref="DriverDataType"/> is
|
||||||
|
/// skipped (no request) — the hook never throws on an unparseable type.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Apply_skips_added_tag_with_unparseable_datatype()
|
||||||
|
{
|
||||||
|
var prov = new CapturingProvisioner();
|
||||||
|
var applier = new AddressSpaceApplier(NullOpcUaAddressSpaceSink.Instance, NullLogger<AddressSpaceApplier>.Instance, prov);
|
||||||
|
|
||||||
|
// "Float" is NOT a DriverDataType member (the members are Float32/Float64); it must be skipped.
|
||||||
|
var plan = PlanWithAddedTags(
|
||||||
|
HistorizedTag(displayName: "Bad", historianName: "Pump1.Bad", dataType: "Float"),
|
||||||
|
HistorizedTag(displayName: "Good", historianName: "Pump1.Good", dataType: "Float32"));
|
||||||
|
|
||||||
|
applier.Apply(plan);
|
||||||
|
|
||||||
|
await prov.Called.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
|
||||||
|
prov.Seen.Count.ShouldBe(1);
|
||||||
|
prov.Seen[0].TagName.ShouldBe("Pump1.Good");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EquipmentTagPlan HistorizedTag(string displayName, string? historianName, string dataType, string fullName = "ref")
|
||||||
|
=> new("tag-" + displayName, "eq-1", "drv", FolderPath: "", Name: displayName, DataType: dataType, FullName: fullName,
|
||||||
|
Writable: false, Alarm: null, IsHistorized: true, HistorianTagname: historianName);
|
||||||
|
|
||||||
|
private static EquipmentTagPlan NonHistorizedTag(string displayName, string dataType)
|
||||||
|
=> new("tag-" + displayName, "eq-1", "drv", FolderPath: "", Name: displayName, DataType: dataType, FullName: "ref",
|
||||||
|
Writable: false, Alarm: null, IsHistorized: false, HistorianTagname: null);
|
||||||
|
|
||||||
|
private static AddressSpacePlan PlanWithAddedTags(params EquipmentTagPlan[] tags) => new(
|
||||||
|
AddedEquipment: Array.Empty<EquipmentNode>(),
|
||||||
|
RemovedEquipment: Array.Empty<EquipmentNode>(),
|
||||||
|
ChangedEquipment: Array.Empty<AddressSpacePlan.EquipmentDelta>(),
|
||||||
|
AddedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||||
|
RemovedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||||
|
ChangedDrivers: Array.Empty<AddressSpacePlan.DriverDelta>(),
|
||||||
|
AddedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||||
|
RemovedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||||
|
ChangedAlarms: Array.Empty<AddressSpacePlan.AlarmDelta>())
|
||||||
|
{
|
||||||
|
AddedEquipmentTags = tags,
|
||||||
|
};
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user