From 5e27b5f708d01bf4de95d3a61503aa93a32dc25a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 05:24:56 -0400 Subject: [PATCH] feat(historian): support Total aggregate (client-side Average x interval-seconds) --- .../WonderwareHistorianClient.cs | 51 +++++++++++++++-- .../WonderwareHistorianClientTests.cs | 56 ++++++++++++++++--- 2 files changed, 94 insertions(+), 13 deletions(-) diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs index c55a50ba..95517ac7 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs @@ -92,6 +92,13 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist } /// Asynchronously reads processed historical data with aggregation for a tag within a time range. + /// + /// is derived client-side as the time-weighted + /// Average × interval-seconds; Wonderware AnalogSummary exposes no Total column. The wire + /// request is issued with the Average column and each returned bucket value is scaled by + /// interval.TotalSeconds, preserving the bucket's status code and timestamp. All + /// other aggregates pass through unchanged. + /// /// The full reference path of the tag to read. /// The start time in UTC for the read range. /// The end time in UTC for the read range. @@ -103,19 +110,52 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, HistoryAggregateType aggregate, CancellationToken cancellationToken) { + // Total has no AnalogSummary column — request the time-weighted Average and scale + // client-side below (Total = Average × interval-seconds). + var isDerivedTotal = aggregate == HistoryAggregateType.Total; + var wireAggregate = isDerivedTotal ? HistoryAggregateType.Average : aggregate; + var req = new ReadProcessedRequest { TagName = fullReference, StartUtcTicks = startUtc.Ticks, EndUtcTicks = endUtc.Ticks, IntervalMs = interval.TotalMilliseconds, - AggregateColumn = MapAggregate(aggregate), + AggregateColumn = MapAggregate(wireAggregate), CorrelationId = Guid.NewGuid().ToString("N"), }; var reply = await InvokeAndClassifyAsync( MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req, r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false); - return new HistoryReadResult(ToAggregateSnapshots(reply.Buckets), ContinuationPoint: null); + + var buckets = isDerivedTotal + ? ScaleAverageToTotal(reply.Buckets, interval.TotalSeconds) + : reply.Buckets; + return new HistoryReadResult(ToAggregateSnapshots(buckets), ContinuationPoint: null); + } + + /// + /// Derives buckets from time-weighted Average + /// buckets using the time-integral identity Total = Average × interval-seconds. Null + /// (unavailable) buckets are carried through unscaled so the downstream null→BadNoData + /// mapping still fires; non-null values are multiplied by . + /// + private static HistorianAggregateSampleDto[] ScaleAverageToTotal( + HistorianAggregateSampleDto[] averages, double intervalSeconds) + { + if (averages.Length == 0) return averages; + var totals = new HistorianAggregateSampleDto[averages.Length]; + for (var i = 0; i < averages.Length; i++) + { + var avg = averages[i]; + totals[i] = new HistorianAggregateSampleDto + { + // Null (unavailable) average → null total (→ BadNoData downstream). + Value = avg.Value is { } v ? v * intervalSeconds : null, + TimestampUtcTicks = avg.TimestampUtcTicks, + }; + } + return totals; } /// Asynchronously reads historical data at specific timestamps for a tag. @@ -510,14 +550,17 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist _ => 500, }; + /// + /// Maps an OPC UA aggregate to its Wonderware AnalogSummary column name. There is no + /// Total column — is derived client-side in + /// by requesting Average, so it is never passed here. + /// private static string MapAggregate(HistoryAggregateType aggregate) => aggregate switch { HistoryAggregateType.Average => "Average", HistoryAggregateType.Minimum => "Minimum", HistoryAggregateType.Maximum => "Maximum", HistoryAggregateType.Count => "ValueCount", - HistoryAggregateType.Total => throw new NotSupportedException( - "HistoryAggregateType.Total is not supported by the Wonderware AnalogSummary query — use Average/Minimum/Maximum/Count."), _ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"), }; diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs index 1522e091..d2643fda 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs @@ -463,22 +463,60 @@ public sealed class WonderwareHistorianClientTests } /// - /// (5) must throw - /// because Wonderware AnalogSummary has no Total - /// aggregate column. + /// (5) is derived client-side as the + /// time-weighted Average multiplied by the interval duration in seconds, because the + /// Wonderware AnalogSummary query exposes no Total column. The client must issue the + /// wire request with the Average column and scale every returned bucket value by + /// interval.TotalSeconds, carrying the bucket's quality and timestamp through. /// [Fact] - public async Task ReadProcessedAsync_TotalAggregate_ThrowsNotSupported() + public async Task ReadProcessedAsync_TotalAggregate_ReturnsAverageTimesIntervalSeconds() { - await using var server = new FakeSidecarServer(Secret); + var bucketTs = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc); + string? requestedColumn = null; + + await using var server = new FakeSidecarServer(Secret) + { + OnReadProcessed = req => + { + // Capture the column the client asked for: Total must be requested as Average. + requestedColumn = req.AggregateColumn; + return new ReadProcessedReply + { + Success = true, + Buckets = + [ + // One Good Average bucket of 2.0; with a 60s interval the derived + // Total is 2.0 * 60 = 120.0. + new HistorianAggregateSampleDto { Value = 2.0, TimestampUtcTicks = bucketTs.Ticks }, + // A null (unavailable) Average bucket must stay BadNoData / null. + new HistorianAggregateSampleDto { Value = null, TimestampUtcTicks = bucketTs.AddMinutes(1).Ticks }, + ], + }; + }, + }; await server.StartAsync(); await using var client = TcpClientFor(server); + var result = await client.ReadProcessedAsync("Tank.Level", + new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), + new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc), + TimeSpan.FromMinutes(1), HistoryAggregateType.Total, CancellationToken.None); - await Should.ThrowAsync(() => - client.ReadProcessedAsync("Tag", - DateTime.UtcNow, DateTime.UtcNow, TimeSpan.FromMinutes(1), - HistoryAggregateType.Total, CancellationToken.None)); + // The wire request asks for the Average column — Total has no AnalogSummary column. + requestedColumn.ShouldBe("Average"); + + result.Samples.Count.ShouldBe(2); + + // Total = Average (2.0) x interval-seconds (60) = 120.0, quality + timestamp carried. + result.Samples[0].StatusCode.ShouldBe(0x00000000u); // Good + result.Samples[0].Value.ShouldBe(120.0); + result.Samples[0].SourceTimestampUtc.ShouldBe(bucketTs); + + // Null Average bucket → still BadNoData / null after scaling. + result.Samples[1].StatusCode.ShouldBe(0x800E0000u); // BadNoData + result.Samples[1].Value.ShouldBeNull(); + result.Samples[1].SourceTimestampUtc.ShouldBe(bucketTs.AddMinutes(1)); } ///