feat(historian-gateway): IHistorianProvisioning + GatewayTagProvisioner (EnsureTags, non-blocking)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -0,0 +1,73 @@
|
|||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Server-side historian tag provisioning — ensures the historian knows about the tags the
|
||||||
|
/// address space historizes before values are written. Registered alongside
|
||||||
|
/// <see cref="IHistorianDataSource"/> and invoked by the address-space applier when historized
|
||||||
|
/// nodes are (re)applied.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Provisioning is best-effort and <b>non-blocking</b>: an unreachable or erroring historian
|
||||||
|
/// never fails an address-space apply. Implementations return a
|
||||||
|
/// <see cref="HistorianProvisionResult"/> tally instead of throwing, so the applier can surface
|
||||||
|
/// a count without taking the server down. Non-historizable types are skipped (counted in
|
||||||
|
/// <see cref="HistorianProvisionResult.Skipped"/>), not failed.
|
||||||
|
/// </remarks>
|
||||||
|
public interface IHistorianProvisioning
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Ensures the supplied historian tags exist (create-or-update). Never throws; a transport or
|
||||||
|
/// backend failure is reported via <see cref="HistorianProvisionResult.Failed"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="requests">The tags to ensure, with their driver data type and optional metadata.</param>
|
||||||
|
/// <param name="ct">A cancellation token for the operation.</param>
|
||||||
|
/// <returns>A tally of how the requests were handled.</returns>
|
||||||
|
Task<HistorianProvisionResult> EnsureTagsAsync(
|
||||||
|
IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A single historian tag to ensure — the driver-agnostic shape the applier hands to
|
||||||
|
/// <see cref="IHistorianProvisioning.EnsureTagsAsync"/>. A backend maps
|
||||||
|
/// <see cref="DataType"/> onto its native tag type and skips types it cannot historize.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="TagName">The full reference / tag name to ensure in the historian.</param>
|
||||||
|
/// <param name="DataType">The driver-agnostic data type, used to select the historian tag type.</param>
|
||||||
|
/// <param name="EngineeringUnit">Optional engineering unit (e.g. <c>degC</c>); <c>null</c> when unknown.</param>
|
||||||
|
/// <param name="Description">Optional human-readable description; <c>null</c> when unknown.</param>
|
||||||
|
public sealed record HistorianTagProvisionRequest(
|
||||||
|
string TagName,
|
||||||
|
DriverDataType DataType,
|
||||||
|
string? EngineeringUnit,
|
||||||
|
string? Description);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The tally returned by <see cref="IHistorianProvisioning.EnsureTagsAsync"/>. The buckets
|
||||||
|
/// partition the input: <c>Requested == Ensured + Skipped + Failed</c>.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="Requested">Total tags submitted.</param>
|
||||||
|
/// <param name="Ensured">Tags the historian acknowledged as created or already present.</param>
|
||||||
|
/// <param name="Skipped">Tags whose data type is not historizable on the backend (never sent).</param>
|
||||||
|
/// <param name="Failed">Tags that were sent but the backend did not acknowledge (incl. a swallowed transport error).</param>
|
||||||
|
public sealed record HistorianProvisionResult(
|
||||||
|
int Requested,
|
||||||
|
int Ensured,
|
||||||
|
int Skipped,
|
||||||
|
int Failed);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// No-op <see cref="IHistorianProvisioning"/> — the applier's safe default when no historian
|
||||||
|
/// backend is registered. Every call returns an all-zero tally and never touches a backend.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class NullHistorianProvisioning : IHistorianProvisioning
|
||||||
|
{
|
||||||
|
/// <summary>The shared singleton instance.</summary>
|
||||||
|
public static readonly NullHistorianProvisioning Instance = new();
|
||||||
|
|
||||||
|
private NullHistorianProvisioning() { }
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<HistorianProvisionResult> EnsureTagsAsync(
|
||||||
|
IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct) =>
|
||||||
|
Task.FromResult(new HistorianProvisionResult(0, 0, 0, 0));
|
||||||
|
}
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// <see cref="IHistorianProvisioning"/> backed by the HistorianGateway <c>EnsureTags</c> path.
|
||||||
|
/// Non-historizable driver types are skipped (never built into a definition); the historizable
|
||||||
|
/// ones are mapped via <see cref="HistorianTypeMapper"/> and batched into a single
|
||||||
|
/// <c>EnsureTags</c> call.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <b>Non-blocking.</b> A historian that is unreachable or errors must never fail an address-space
|
||||||
|
/// apply, so the gateway call is wrapped in a catch-all: any exception counts the whole sent batch
|
||||||
|
/// as <see cref="HistorianProvisionResult.Failed"/> and returns. The method never throws and never
|
||||||
|
/// logs tag values, hostnames, or credentials.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class GatewayTagProvisioner : IHistorianProvisioning
|
||||||
|
{
|
||||||
|
private readonly IHistorianGatewayClient _client;
|
||||||
|
private readonly ILogger<GatewayTagProvisioner> _logger;
|
||||||
|
|
||||||
|
/// <summary>Creates the provisioner over a gateway client seam.</summary>
|
||||||
|
/// <param name="client">The gateway client used for the <c>EnsureTags</c> path.</param>
|
||||||
|
/// <param name="logger">Logger for skip/failure diagnostics (never logs tag values).</param>
|
||||||
|
public GatewayTagProvisioner(IHistorianGatewayClient client, ILogger<GatewayTagProvisioner> logger)
|
||||||
|
{
|
||||||
|
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||||
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<HistorianProvisionResult> EnsureTagsAsync(
|
||||||
|
IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(requests);
|
||||||
|
|
||||||
|
var definitions = new List<HistorianTagDefinition>(requests.Count);
|
||||||
|
var skipped = 0;
|
||||||
|
|
||||||
|
foreach (var request in requests)
|
||||||
|
{
|
||||||
|
if (!HistorianTypeMapper.IsHistorizable(request.DataType))
|
||||||
|
{
|
||||||
|
skipped++;
|
||||||
|
// Log only the (non-sensitive) data type — never the tag name.
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Skipping provisioning of a non-historizable tag of type {DataType}.", request.DataType);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
definitions.Add(new HistorianTagDefinition
|
||||||
|
{
|
||||||
|
TagName = request.TagName,
|
||||||
|
DataType = HistorianTypeMapper.ToHistorianDataType(request.DataType),
|
||||||
|
// Proto string fields are non-nullable — coalesce absent metadata to empty.
|
||||||
|
EngineeringUnit = request.EngineeringUnit ?? string.Empty,
|
||||||
|
Description = request.Description ?? string.Empty,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var results = await _client.EnsureTagsAsync(definitions, ct).ConfigureAwait(false);
|
||||||
|
var ensured = results.Results.Count(r => r.Success);
|
||||||
|
var failed = Math.Max(0, definitions.Count - ensured);
|
||||||
|
return new HistorianProvisionResult(requests.Count, ensured, skipped, failed);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
// Non-blocking: a failed EnsureTags never fails the apply. Count the whole sent batch as
|
||||||
|
// Failed and return; log only the failure category (no tag values).
|
||||||
|
_logger.LogWarning(
|
||||||
|
"EnsureTags failed for {Count} historian tag(s) ({Exception}); provisioning deferred.",
|
||||||
|
definitions.Count, exception.GetType().Name);
|
||||||
|
return new HistorianProvisionResult(requests.Count, Ensured: 0, Skipped: skipped, Failed: definitions.Count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+114
@@ -0,0 +1,114 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
|
||||||
|
|
||||||
|
public sealed class GatewayTagProvisionerTests
|
||||||
|
{
|
||||||
|
private static GatewayTagProvisioner Provisioner(FakeHistorianGatewayClient fake) =>
|
||||||
|
new(fake, NullLogger<GatewayTagProvisioner>.Instance);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Ensures_numeric_tags_with_mapped_type()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
|
||||||
|
var p = Provisioner(fake);
|
||||||
|
var reqs = new[]
|
||||||
|
{
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, "degC", "Temp"),
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Run", DriverDataType.Boolean, null, null),
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = await p.EnsureTagsAsync(reqs, TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
Assert.NotNull(fake.LastEnsureDefinitions);
|
||||||
|
Assert.Equal(2, fake.LastEnsureDefinitions!.Count);
|
||||||
|
Assert.Equal(HistorianDataType.Float, fake.LastEnsureDefinitions[0].DataType);
|
||||||
|
Assert.Equal(HistorianDataType.Int1, fake.LastEnsureDefinitions[1].DataType);
|
||||||
|
Assert.Equal(2, result.Requested);
|
||||||
|
Assert.Equal(0, result.Skipped);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Maps_metadata_and_coalesces_null_metadata_to_empty()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
|
||||||
|
var p = Provisioner(fake);
|
||||||
|
|
||||||
|
await p.EnsureTagsAsync(
|
||||||
|
new[]
|
||||||
|
{
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, "degC", "Temp"),
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Run", DriverDataType.Boolean, null, null),
|
||||||
|
},
|
||||||
|
TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
var defs = fake.LastEnsureDefinitions!;
|
||||||
|
Assert.Equal("Pump1.Temp", defs[0].TagName);
|
||||||
|
Assert.Equal("degC", defs[0].EngineeringUnit);
|
||||||
|
Assert.Equal("Temp", defs[0].Description);
|
||||||
|
// Proto string fields are non-nullable — null metadata must coalesce to empty.
|
||||||
|
Assert.Equal(string.Empty, defs[1].EngineeringUnit);
|
||||||
|
Assert.Equal(string.Empty, defs[1].Description);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Deferred_types_are_skipped_not_sent()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
|
||||||
|
var p = Provisioner(fake);
|
||||||
|
|
||||||
|
var result = await p.EnsureTagsAsync(
|
||||||
|
new[] { new HistorianTagProvisionRequest("Pump1.Name", DriverDataType.String, null, null) },
|
||||||
|
TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
Assert.Empty(fake.LastEnsureDefinitions!); // String is deferred → never built into a definition
|
||||||
|
Assert.Equal(1, result.Requested);
|
||||||
|
Assert.Equal(1, result.Skipped);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Gateway_failure_is_swallowed_and_counted_not_thrown()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient { EnsureTagsThrows = new Exception("boom") };
|
||||||
|
var p = Provisioner(fake);
|
||||||
|
|
||||||
|
var result = await p.EnsureTagsAsync(
|
||||||
|
new[] { new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null) },
|
||||||
|
TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
Assert.Equal(1, result.Failed); // non-blocking: no throw
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Ensured_count_reflects_successful_results()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient
|
||||||
|
{
|
||||||
|
EnsureTagsResult = new TagOperationResults
|
||||||
|
{
|
||||||
|
Results =
|
||||||
|
{
|
||||||
|
new TagOperationResult { Name = "Pump1.Temp", Success = true },
|
||||||
|
new TagOperationResult { Name = "Pump1.Run", Success = false, Error = "x" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
var p = Provisioner(fake);
|
||||||
|
|
||||||
|
var result = await p.EnsureTagsAsync(
|
||||||
|
new[]
|
||||||
|
{
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null),
|
||||||
|
new HistorianTagProvisionRequest("Pump1.Run", DriverDataType.Boolean, null, null),
|
||||||
|
},
|
||||||
|
TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
Assert.Equal(2, result.Requested);
|
||||||
|
Assert.Equal(1, result.Ensured);
|
||||||
|
Assert.Equal(0, result.Skipped);
|
||||||
|
Assert.Equal(1, result.Failed);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user