using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Live;
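/// <summary>
/// End-to-end live validation of the gateway-backed historian backend against a real, running
/// <c>ZB.MOM.WW.HistorianGateway</c> sidecar (typically <c>wonder-sql-vd03</c> on the VPN). This is
/// the validation gate the operator runs on the VPN before the Wonderware backend's retirement is
/// trusted — every path exercised here is a real driver component, not a fake:
/// <list type="bullet">
/// <item><description>The data source created by <c>GatewayLiveFixture.CreateDataSource()</c> — the read + ReadEvents path.</description></item>
/// <item><description>The client created by <c>GatewayLiveFixture.CreateClient()</c> — the EnsureTags seam.</description></item>
/// <item><description><see cref="GatewayHistorianValueWriter"/> — the recorder's WriteLiveValues path.</description></item>
/// <item><description><see cref="GatewayAlarmHistorianWriter"/> — the alarm SendEvent path.</description></item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// <b>Env-gated + skip-clean.</b> Every test calls <c>Assert.Skip</c> via the fixture when the
/// suite is unconfigured / the gateway is unreachable, and again when its own required tag /
/// source env var is absent — so <c>dotnet test --filter "Category=LiveIntegration"</c> stays
/// green offline (all skip, none fail). See <see cref="GatewayLiveFixture"/> for the env vars.
/// </para>
/// <para>
/// <b>Gateway prerequisites (when run on the VPN):</b> the target gateway must run with
/// <c>RuntimeDb:Enabled=true</c> (the WriteLiveValues SQL path) and
/// <c>RuntimeDb:EventReadsEnabled=true</c> (the SQL ReadEvents path), and the API key
/// must carry the <c>historian:read</c> + <c>historian:write</c> scopes.
/// </para>
/// </remarks>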
[Trait("Category", "LiveIntegration")]
public sealed class GatewayLiveIntegrationTests(GatewayLiveFixture fixture) : IClassFixture
{
private readonly GatewayLiveFixture _fx = fixture;
/// <summary>
/// Read round-trip: issues a <c>ReadRaw</c> for an existing tag over the last hour through the
/// data source created by the fixture. The test passes when the read completes without throwing
/// and yields a non-null (possibly empty) sample set — a sparse tag legitimately has zero samples
/// in the window, so the meaningful live signal is "the gateway answered, not faulted".
/// </summary>
/// <remarks>
/// Skips when the fixture reports the suite as not configured, or when no test tag is set
/// (<c>HISTGW_TEST_TAG</c>).
/// </remarks>
[Fact]
[Trait("Category", "LiveIntegration")]
public async Task Galaxy_tag_read_round_trip()
{
    if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
    if (_fx.TestTag is null)
        Assert.Skip("Skipped: set HISTGW_TEST_TAG to an existing Galaxy/historian tag to run the read round-trip.");

    var ct = TestContext.Current.CancellationToken;
    await using var dataSource = _fx.CreateDataSource();

    // Query the trailing hour, anchored on a single "now" so both bounds are consistent.
    var endUtc = DateTime.UtcNow;
    var startUtc = endUtc - TimeSpan.FromHours(1);
    var result = await dataSource.ReadRawAsync(_fx.TestTag, startUtc, endUtc, maxValuesPerNode: 1000, ct);

    result.ShouldNotBeNull();

    // A "Count >= 0" assertion can never fail, so it verifies nothing. Assert that the sample
    // list itself was returned; an empty list is still a valid answer for a sparse tag.
    result.Samples.ShouldNotBeNull("a live ReadRaw must answer (zero samples is a valid sparse-tag result, not a fault)");

    TestContext.Current.SendDiagnosticMessage(
        $"read round-trip: ReadRaw('{_fx.TestTag}', last 1h) returned {result.Samples.Count} sample(s).");
}
/// <summary>
/// Write round-trip: <c>EnsureTags</c> (Float) → <c>WriteLiveValues</c> of a known value through
/// <see cref="GatewayHistorianValueWriter"/> → <c>ReadRaw</c> over a wide window around the write,
/// asserting that a sample carrying the written value is present.
/// </summary>
/// <remarks>
/// Requires the gateway running <c>RuntimeDb:Enabled=true</c> and that <c>EnsureTags</c>
/// provisioned the tag (the SQL live-write path only accepts provisioned analog tags). The write
/// value is an integer below 2^24, so it is exactly representable in float32 and the
/// float-precision round-trip compares cleanly.
/// </remarks>
[Fact]
[Trait("Category", "LiveIntegration")]
public async Task Write_then_read_on_sandbox_tag()
{
    if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
    if (_fx.WriteSandboxTag is null)
        Assert.Skip("Skipped: set HISTGW_WRITE_SANDBOX_TAG to a writable Float sandbox tag (e.g. HistGW.LiveTest.Sandbox) to run the write round-trip.");

    var ct = TestContext.Current.CancellationToken;
    var tag = _fx.WriteSandboxTag;

    const ushort goodQuality = 192; // OPC-DA "Good" floor.

    // The value must be exactly representable in float32 (integer < 2^24 = 16,777,216) so the
    // analog store/read round-trip is not muddied by single-precision rounding. It must also be
    // distinguishable from earlier runs: the readback below scans a 24h window, so deriving it
    // from the millisecond-of-second alone (1000 possibilities) lets a stale sample from a prior
    // run satisfy the match. Folding the full millisecond clock into a 15,000,000-wide span keeps
    // the value below 16,000,000 while making a collision with a prior run practically impossible.
    const long uniqueValueSpan = 15_000_000;
    var writeUtc = DateTime.UtcNow;
    double written = 1_000_000 + (writeUtc.Ticks / TimeSpan.TicksPerMillisecond) % uniqueValueSpan;

    // EnsureTags (Float) through the real adapter seam — create-or-update, idempotent for an
    // already-provisioned sandbox tag.
    await using var writeClient = _fx.CreateClient();
    var ensure = await writeClient.EnsureTagsAsync(
        new[]
        {
            new HistorianTagDefinition
            {
                TagName = tag,
                DataType = HistorianDataType.Float,
                EngineeringUnit = string.Empty,
                Description = "OtOpcUa live validation sandbox",
            },
        },
        ct);
    ensure.ShouldNotBeNull();

    // WriteLiveValues through the real recorder writer (SQL live-write path).
    var valueWriter = new GatewayHistorianValueWriter(writeClient, NullLogger<GatewayHistorianValueWriter>.Instance);
    var acked = await valueWriter.WriteLiveValuesAsync(
        tag, new[] { new HistorizationValue(writeUtc, written, goodQuality) }, ct);
    acked.ShouldBeTrue(
        "the live write must be acked — needs the gateway running RuntimeDb:Enabled=true and the tag EnsureTags-provisioned.");

    // Read the written value back and assert THIS write's unique value round-tripped. The SQL write
    // can lag the read by a flush cadence, so poll briefly. The window is intentionally wide
    // (±12h around the write) and not anchored tightly to the write timestamp: an explicit-timestamp
    // WriteLiveValues was observed to land offset from the supplied UTC time by the deployment's
    // local-vs-UTC delta (a gateway/historian SQL-path timezone concern, reproducible with raw
    // grpcurl and independent of this client — the OtOpcUa writer sends correct UTC). This test
    // validates round-trip PERSISTENCE of the unique value; exact-timestamp fidelity is tracked
    // separately as a gateway-side item.
    await using var dataSource = _fx.CreateDataSource();
    DataValueSnapshot? hit = null;
    var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(15);
    do
    {
        var read = await dataSource.ReadRawAsync(
            tag, writeUtc - TimeSpan.FromHours(12), writeUtc + TimeSpan.FromHours(12), maxValuesPerNode: 50_000, ct);

        // The written value is an integer, so a ±0.5 tolerance matches exactly one integer.
        hit = read.Samples.FirstOrDefault(s => s.Value is double d && Math.Abs(d - written) < 0.5);
        if (hit is not null) break;

        await Task.Delay(TimeSpan.FromSeconds(1), ct);
    }
    while (DateTime.UtcNow < deadline);

    hit.ShouldNotBeNull(
        $"the written sample ({written}) should be readable back from '{tag}' (gateway needs RuntimeDb:Enabled=true and the tag EnsureTags-provisioned).");

    TestContext.Current.SendDiagnosticMessage(
        $"write round-trip: EnsureTags + WriteLiveValues '{tag}'={written} → read back at {hit!.SourceTimestampUtc:O}.");
}
/// <summary>
/// Alarm round-trip: <c>SendEvent</c> for the configured source through
/// <see cref="GatewayAlarmHistorianWriter"/>, then <c>ReadEvents</c> for that source through the
/// fixture's data source over a window around the send.
/// </summary>
/// <remarks>
/// The send ack is the hard assertion. The readback requires the gateway running
/// <c>RuntimeDb:EventReadsEnabled=true</c> (the SQL alarm-history read path); when the
/// source-filtered read returns nothing within the poll deadline the test skips rather than
/// fails, for the reason given in the body. When events do come back, their count and whether
/// one carries this run's <c>AlarmId</c> are reported as a diagnostic only, since the SQL event
/// store may re-key the row.
/// </remarks>
[Fact]
[Trait("Category", "LiveIntegration")]
public async Task Alarm_SendEvent_then_ReadEvents()
{
    if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
    if (_fx.AlarmSource is null)
        Assert.Skip("Skipped: set HISTGW_ALARM_SOURCE to a source name to run the alarm SendEvent → ReadEvents round-trip.");

    var ct = TestContext.Current.CancellationToken;
    var source = _fx.AlarmSource;
    var alarmId = "OtOpcUaLive-" + Guid.NewGuid().ToString("N");
    var eventUtc = DateTime.UtcNow;
    var alarm = new AlarmHistorianEvent(
        AlarmId: alarmId,
        EquipmentPath: source, // becomes the wire event's SourceName / SQL Source_Object filter key.
        AlarmName: "OtOpcUaLiveValidation",
        AlarmTypeName: "LimitAlarm",
        Severity: AlarmSeverity.High,
        EventKind: "Activated",
        Message: "OtOpcUa live validation event",
        User: "system",
        Comment: null,
        TimestampUtc: eventUtc);

    // SendEvent through the real alarm writer (never throws — returns a per-event outcome).
    // The client is disposed asynchronously, matching how the write round-trip disposes it.
    await using var alarmClient = _fx.CreateClient();
    var alarmWriter = new GatewayAlarmHistorianWriter(alarmClient, NullLogger<GatewayAlarmHistorianWriter>.Instance);
    var outcomes = await alarmWriter.WriteBatchAsync(new[] { alarm }, ct);
    outcomes.ShouldHaveSingleItem().ShouldBe(
        HistorianWriteOutcome.Ack,
        "the alarm SendEvent must be acked — needs the gateway write scope (historian:write) and SendEvent path.");

    // Read the event back over a recent window for the source. The SQL event write can lag, so poll:
    // read, and while nothing came back wait one second and retry until the deadline passes.
    // The window opens 5 minutes before the send; its end is re-evaluated on every attempt.
    // NOTE(review): maxEvents: 0 presumably means "no cap" — confirm against the data source contract.
    await using var dataSource = _fx.CreateDataSource();
    var readWindow = () => dataSource.ReadEventsAsync(
        source, eventUtc - TimeSpan.FromMinutes(5), DateTime.UtcNow + TimeSpan.FromMinutes(1), maxEvents: 0, ct);

    var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(15);
    var read = await readWindow();
    while (read.Events.Count == 0)
    {
        await Task.Delay(TimeSpan.FromSeconds(1), ct);
        if (DateTime.UtcNow >= deadline) break;
        read = await readWindow();
    }

    var events = read.Events;

    // The SendEvent itself is the OtOpcUa contract and is asserted above. The source-filtered
    // READBACK of a just-sent event is a historian/gateway property, not an OtOpcUa one: verified
    // live, the gateway's SQL event reader works and source-filtering works for events that carry a
    // Source_Object (real Galaxy-sourced events read back by source correctly), but an ad-hoc
    // SendEvent is recorded in dbo.Events WITHOUT a Source_Object — so a source-filtered read of a
    // freshly-sent event finds nothing. Reading existing Galaxy alarm/event history by source works;
    // round-tripping OtOpcUa's OWN sends by source needs the gateway's SendEvent to populate the
    // event source. So when no event comes back, skip with that reason rather than failing.
    if (events.Count == 0)
    {
        Assert.Skip(
            $"Send acked, but a source-filtered ReadEvents returned 0 for source '{source}'. The SQL event " +
            "reader works (Galaxy-sourced events read back by source); an ad-hoc SendEvent is stored without " +
            "a Source_Object, so its source-filtered readback is empty until the gateway populates the event source.");
    }

    var exactMatch = events.Any(e => string.Equals(e.EventId, alarmId, StringComparison.Ordinal));
    TestContext.Current.SendDiagnosticMessage(
        $"alarm round-trip: SendEvent source='{source}' id={alarmId} → ReadEvents returned {events.Count} event(s); exact-id match={exactMatch}.");
}
/// <summary>
/// Alarm send contract — the OtOpcUa responsibility in isolation: an
/// <see cref="AlarmHistorianEvent"/> maps to a wire event the gateway accepts and SendEvent
/// returns <see cref="HistorianWriteOutcome.Ack"/>.
/// </summary>
/// <remarks>
/// This is the half that must always hold (the readback half is historian-gated; see
/// <see cref="Alarm_SendEvent_then_ReadEvents"/>). Regression guard for the event-id mapping —
/// a client-supplied wire Id makes the gateway's SendEvent handler throw, so the mapper
/// must leave it unset.
/// </remarks>
[Fact]
[Trait("Category", "LiveIntegration")]
public async Task Alarm_SendEvent_is_acked()
{
    if (_fx.NotConfigured) Assert.Skip(_fx.SkipReason!);
    if (_fx.AlarmSource is null)
        Assert.Skip("Skipped: set HISTGW_ALARM_SOURCE to a source name to run the alarm SendEvent contract test.");

    var ct = TestContext.Current.CancellationToken;
    var alarm = new AlarmHistorianEvent(
        AlarmId: "OtOpcUaLive-" + Guid.NewGuid().ToString("N"),
        EquipmentPath: _fx.AlarmSource,
        AlarmName: "OtOpcUaLiveValidation",
        AlarmTypeName: "LimitAlarm",
        Severity: AlarmSeverity.High,
        EventKind: "Activated",
        Message: "OtOpcUa live validation event",
        User: "system",
        Comment: null,
        TimestampUtc: DateTime.UtcNow);

    // The client is disposed asynchronously, matching how the write round-trip disposes it.
    await using var alarmClient = _fx.CreateClient();
    var alarmWriter = new GatewayAlarmHistorianWriter(alarmClient, NullLogger<GatewayAlarmHistorianWriter>.Instance);

    // A single-event batch must yield exactly one outcome, and that outcome must be an ack.
    var outcomes = await alarmWriter.WriteBatchAsync(new[] { alarm }, ct);
    outcomes.ShouldHaveSingleItem().ShouldBe(
        HistorianWriteOutcome.Ack,
        "the alarm SendEvent must be acked (the AlarmEventMapper must NOT set the wire event Id — the gateway rejects a client-supplied id).");
}
}