test(historian-gateway): env-gated live validation vs wonder-sql-vd03 (read/write/alarm round-trips)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
+204
@@ -0,0 +1,204 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Live;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end live validation of the gateway-backed historian backend against a real, running
|
||||
/// <c>ZB.MOM.WW.HistorianGateway</c> sidecar (typically <c>wonder-sql-vd03</c> on the VPN). This is
|
||||
/// the validation gate the operator runs on the VPN before the Wonderware backend's retirement is
|
||||
/// trusted — every path exercised here is a <em>real</em> driver component, not a fake:
|
||||
/// <list type="bullet">
|
||||
/// <item><see cref="GatewayHistorianDataSource"/> — the read + <c>ReadEvents</c> path.</item>
|
||||
/// <item><see cref="HistorianGatewayClientAdapter"/> — the <c>EnsureTags</c> seam.</item>
|
||||
/// <item><see cref="GatewayHistorianValueWriter"/> — the recorder's <c>WriteLiveValues</c> path.</item>
|
||||
/// <item><see cref="GatewayAlarmHistorianWriter"/> — the alarm <c>SendEvent</c> path.</item>
|
||||
/// </list>
|
||||
/// <para>
|
||||
/// <b>Env-gated + skip-clean.</b> Every test calls <c>Assert.Skip</c> via the fixture when the
|
||||
/// suite is unconfigured / the gateway is unreachable, and again when its own required tag /
|
||||
/// source env var is absent — so <c>dotnet test --filter "Category=LiveIntegration"</c> stays
|
||||
/// green offline (all skip, none fail). See <see cref="GatewayLiveFixture"/> for the env vars.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Gateway prerequisites</b> (when run on the VPN): the target gateway must run with
|
||||
/// <c>RuntimeDb:Enabled=true</c> (the <c>WriteLiveValues</c> SQL path) and
|
||||
/// <c>RuntimeDb:EventReadsEnabled=true</c> (the SQL <c>ReadEvents</c> path), and the API key
|
||||
/// must carry the <c>historian:read</c> + <c>historian:write</c> scopes.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
[Trait("Category", "LiveIntegration")]
|
||||
public sealed class GatewayLiveIntegrationTests(GatewayLiveFixture fixture) : IClassFixture<GatewayLiveFixture>
|
||||
{
|
||||
private readonly GatewayLiveFixture _fx = fixture;
|
||||
|
||||
/// <summary>
|
||||
/// Read round-trip — <c>ReadRaw</c> for an existing tag over the last hour through the real
|
||||
/// <see cref="GatewayHistorianDataSource"/>. Asserts the read completes without throwing and
|
||||
/// returns a (possibly empty) sample set: a sparse tag legitimately has zero samples in the
|
||||
/// window, so the meaningful live signal is "the gateway answered, not faulted".
|
||||
/// </summary>
|
||||
[Fact]
|
||||
[Trait("Category", "LiveIntegration")]
|
||||
public async Task Galaxy_tag_read_round_trip()
|
||||
{
|
||||
if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
|
||||
if (_fx.TestTag is null)
|
||||
Assert.Skip("Skipped: set HISTGW_TEST_TAG to an existing Galaxy/historian tag to run the read round-trip.");
|
||||
|
||||
var ct = TestContext.Current.CancellationToken;
|
||||
await using var dataSource = _fx.CreateDataSource();
|
||||
|
||||
var endUtc = DateTime.UtcNow;
|
||||
var startUtc = endUtc - TimeSpan.FromHours(1);
|
||||
|
||||
var result = await dataSource.ReadRawAsync(_fx.TestTag, startUtc, endUtc, maxValuesPerNode: 1000, ct);
|
||||
|
||||
result.ShouldNotBeNull();
|
||||
result.Samples.Count.ShouldBeGreaterThanOrEqualTo(0, "a live ReadRaw must answer (zero samples is a valid sparse-tag result, not a fault)");
|
||||
|
||||
TestContext.Current.SendDiagnosticMessage(
|
||||
$"read round-trip: ReadRaw('{_fx.TestTag}', last 1h) returned {result.Samples.Count} sample(s).");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Write round-trip — <c>EnsureTags</c> (Float) → <c>WriteLiveValues</c> (a known value via the
|
||||
/// real recorder writer) → <c>ReadRaw</c> the recent window and assert the written sample is
|
||||
/// present. Requires the gateway running <c>RuntimeDb:Enabled=true</c> and that <c>EnsureTags</c>
|
||||
/// provisioned the tag (the SQL live-write path only accepts provisioned analog tags). The write
|
||||
/// value is an exact-in-float integer so the float-precision round-trip compares cleanly.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
[Trait("Category", "LiveIntegration")]
|
||||
public async Task Write_then_read_on_sandbox_tag()
|
||||
{
|
||||
if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
|
||||
if (_fx.WriteSandboxTag is null)
|
||||
Assert.Skip("Skipped: set HISTGW_WRITE_SANDBOX_TAG to a writable Float sandbox tag (e.g. HistGW.LiveTest.Sandbox) to run the write round-trip.");
|
||||
|
||||
var ct = TestContext.Current.CancellationToken;
|
||||
var tag = _fx.WriteSandboxTag;
|
||||
|
||||
// A value that is exactly representable in float32 (integer < 2^24) so the analog
|
||||
// store/read round-trip is not muddied by single-precision rounding. The millisecond
|
||||
// component keeps consecutive runs from colliding on the same value.
|
||||
const ushort goodQuality = 192; // OPC-DA "Good" floor.
|
||||
var writeUtc = DateTime.UtcNow;
|
||||
double written = 1_000_000 + writeUtc.Millisecond;
|
||||
|
||||
// EnsureTags (Float) through the real adapter seam — create-or-update, idempotent for an
|
||||
// already-provisioned sandbox tag.
|
||||
await using var writeClient = _fx.CreateClient();
|
||||
var ensure = await writeClient.EnsureTagsAsync(
|
||||
new[]
|
||||
{
|
||||
new HistorianTagDefinition
|
||||
{
|
||||
TagName = tag,
|
||||
DataType = HistorianDataType.Float,
|
||||
EngineeringUnit = string.Empty,
|
||||
Description = "OtOpcUa live validation sandbox",
|
||||
},
|
||||
},
|
||||
ct);
|
||||
ensure.ShouldNotBeNull();
|
||||
|
||||
// WriteLiveValues through the real recorder writer (SQL live-write path).
|
||||
var valueWriter = new GatewayHistorianValueWriter(writeClient, NullLogger<GatewayHistorianValueWriter>.Instance);
|
||||
var acked = await valueWriter.WriteLiveValuesAsync(
|
||||
tag, new[] { new HistorizationValue(writeUtc, written, goodQuality) }, ct);
|
||||
acked.ShouldBeTrue(
|
||||
"the live write must be acked — needs the gateway running RuntimeDb:Enabled=true and the tag EnsureTags-provisioned.");
|
||||
|
||||
// Read the written value back over a recent window. The SQL write can lag the read by a flush
|
||||
// cadence, so poll briefly rather than asserting on the first read.
|
||||
await using var dataSource = _fx.CreateDataSource();
|
||||
DataValueSnapshot? hit = null;
|
||||
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(15);
|
||||
do
|
||||
{
|
||||
var read = await dataSource.ReadRawAsync(
|
||||
tag, writeUtc - TimeSpan.FromMinutes(5), DateTime.UtcNow + TimeSpan.FromMinutes(1), maxValuesPerNode: 10_000, ct);
|
||||
hit = read.Samples.FirstOrDefault(s => s.Value is double d && Math.Abs(d - written) < 0.5);
|
||||
if (hit is not null) break;
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), ct);
|
||||
}
|
||||
while (DateTime.UtcNow < deadline);
|
||||
|
||||
hit.ShouldNotBeNull(
|
||||
$"the written sample ({written}) should be readable back from '{tag}' within the recent window (gateway needs RuntimeDb:Enabled=true).");
|
||||
|
||||
TestContext.Current.SendDiagnosticMessage(
|
||||
$"write round-trip: EnsureTags + WriteLiveValues '{tag}'={written} → read back at {hit!.SourceTimestampUtc:O}.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Alarm round-trip — <c>SendEvent</c> for the configured source through the real
|
||||
/// <see cref="GatewayAlarmHistorianWriter"/>, then <c>ReadEvents</c> the recent window for that
|
||||
/// source through the real <see cref="GatewayHistorianDataSource"/> and assert the event is
|
||||
/// present. Requires the gateway running <c>RuntimeDb:EventReadsEnabled=true</c> (the SQL
|
||||
/// alarm-history read path). Presence is asserted as "at least one event for the source surfaced
|
||||
/// in the post-send window" (the data source filters by source); the exact AlarmId / message
|
||||
/// match is surfaced as a diagnostic, since the SQL event store may re-key the row.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
[Trait("Category", "LiveIntegration")]
|
||||
public async Task Alarm_SendEvent_then_ReadEvents()
|
||||
{
|
||||
if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
|
||||
if (_fx.AlarmSource is null)
|
||||
Assert.Skip("Skipped: set HISTGW_ALARM_SOURCE to a source name to run the alarm SendEvent → ReadEvents round-trip.");
|
||||
|
||||
var ct = TestContext.Current.CancellationToken;
|
||||
var source = _fx.AlarmSource;
|
||||
var alarmId = "OtOpcUaLive-" + Guid.NewGuid().ToString("N");
|
||||
var eventUtc = DateTime.UtcNow;
|
||||
|
||||
var alarm = new AlarmHistorianEvent(
|
||||
AlarmId: alarmId,
|
||||
EquipmentPath: source, // becomes the wire event's SourceName / SQL Source_Object filter key.
|
||||
AlarmName: "OtOpcUaLiveValidation",
|
||||
AlarmTypeName: "LimitAlarm",
|
||||
Severity: AlarmSeverity.High,
|
||||
EventKind: "Activated",
|
||||
Message: "OtOpcUa live validation event",
|
||||
User: "system",
|
||||
Comment: null,
|
||||
TimestampUtc: eventUtc);
|
||||
|
||||
// SendEvent through the real alarm writer (never throws — returns a per-event outcome).
|
||||
using var alarmClient = _fx.CreateClient();
|
||||
var alarmWriter = new GatewayAlarmHistorianWriter(alarmClient, NullLogger<GatewayAlarmHistorianWriter>.Instance);
|
||||
var outcomes = await alarmWriter.WriteBatchAsync(new[] { alarm }, ct);
|
||||
outcomes.ShouldHaveSingleItem().ShouldBe(
|
||||
HistorianWriteOutcome.Ack,
|
||||
"the alarm SendEvent must be acked — needs the gateway write scope (historian:write) and SendEvent path.");
|
||||
|
||||
// Read the event back over a recent window for the source. The SQL event write can lag, so poll.
|
||||
await using var dataSource = _fx.CreateDataSource();
|
||||
IReadOnlyList<HistoricalEvent> events = Array.Empty<HistoricalEvent>();
|
||||
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(15);
|
||||
do
|
||||
{
|
||||
var read = await dataSource.ReadEventsAsync(
|
||||
source, eventUtc - TimeSpan.FromMinutes(5), DateTime.UtcNow + TimeSpan.FromMinutes(1), maxEvents: 0, ct);
|
||||
events = read.Events;
|
||||
if (events.Count > 0) break;
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), ct);
|
||||
}
|
||||
while (DateTime.UtcNow < deadline);
|
||||
|
||||
events.Count.ShouldBeGreaterThan(0,
|
||||
$"the SendEvent for source '{source}' should be readable back via ReadEvents (gateway needs RuntimeDb:EventReadsEnabled=true).");
|
||||
|
||||
var exactMatch = events.Any(e => string.Equals(e.EventId, alarmId, StringComparison.Ordinal));
|
||||
TestContext.Current.SendDiagnosticMessage(
|
||||
$"alarm round-trip: SendEvent source='{source}' id={alarmId} → ReadEvents returned {events.Count} event(s); exact-id match={exactMatch}.");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user