feat(historian): support Total aggregate (client-side Average x interval-seconds)

This commit is contained in:
Joseph Doherty
2026-06-16 05:24:56 -04:00
parent 5c5aaef609
commit 5e27b5f708
2 changed files with 94 additions and 13 deletions
@@ -92,6 +92,13 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
}
/// <summary>Asynchronously reads processed historical data with aggregation for a tag within a time range.</summary>
/// <remarks>
/// <see cref="HistoryAggregateType.Total"/> is derived client-side as the time-weighted
/// Average × interval-seconds; Wonderware AnalogSummary exposes no Total column. The wire
/// request is issued with the Average column and each returned bucket value is scaled by
/// <c>interval.TotalSeconds</c>, preserving the bucket's status code and timestamp. All
/// other aggregates pass through unchanged.
/// </remarks>
/// <param name="fullReference">The full reference path of the tag to read.</param>
/// <param name="startUtc">The start time in UTC for the read range.</param>
/// <param name="endUtc">The end time in UTC for the read range.</param>
@@ -103,19 +110,52 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
// Total has no AnalogSummary column — request the time-weighted Average and scale
// client-side below (Total = Average × interval-seconds).
var isDerivedTotal = aggregate == HistoryAggregateType.Total;
var wireAggregate = isDerivedTotal ? HistoryAggregateType.Average : aggregate;
var req = new ReadProcessedRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
IntervalMs = interval.TotalMilliseconds,
AggregateColumn = MapAggregate(aggregate),
AggregateColumn = MapAggregate(wireAggregate),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync<ReadProcessedRequest, ReadProcessedReply>(
MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req,
r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(ToAggregateSnapshots(reply.Buckets), ContinuationPoint: null);
var buckets = isDerivedTotal
? ScaleAverageToTotal(reply.Buckets, interval.TotalSeconds)
: reply.Buckets;
return new HistoryReadResult(ToAggregateSnapshots(buckets), ContinuationPoint: null);
}
/// <summary>
/// Derives <see cref="HistoryAggregateType.Total"/> buckets from time-weighted Average
/// buckets using the time-integral identity Total = Average × interval-seconds. Null
/// (unavailable) buckets are carried through unscaled so the downstream null→BadNoData
/// mapping still fires; non-null values are multiplied by <paramref name="intervalSeconds"/>.
/// </summary>
private static HistorianAggregateSampleDto[] ScaleAverageToTotal(
HistorianAggregateSampleDto[] averages, double intervalSeconds)
{
if (averages.Length == 0) return averages;
var totals = new HistorianAggregateSampleDto[averages.Length];
for (var i = 0; i < averages.Length; i++)
{
var avg = averages[i];
totals[i] = new HistorianAggregateSampleDto
{
// Null (unavailable) average → null total (→ BadNoData downstream).
Value = avg.Value is { } v ? v * intervalSeconds : null,
TimestampUtcTicks = avg.TimestampUtcTicks,
};
}
return totals;
}
/// <summary>Asynchronously reads historical data at specific timestamps for a tag.</summary>
@@ -510,14 +550,17 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
_ => 500,
};
/// <summary>
/// Maps an OPC UA aggregate to its Wonderware AnalogSummary column name. There is no
/// Total column — <see cref="HistoryAggregateType.Total"/> is derived client-side in
/// <see cref="ReadProcessedAsync"/> by requesting Average, so it is never passed here.
/// </summary>
private static string MapAggregate(HistoryAggregateType aggregate) => aggregate switch
{
HistoryAggregateType.Average => "Average",
HistoryAggregateType.Minimum => "Minimum",
HistoryAggregateType.Maximum => "Maximum",
HistoryAggregateType.Count => "ValueCount",
HistoryAggregateType.Total => throw new NotSupportedException(
"HistoryAggregateType.Total is not supported by the Wonderware AnalogSummary query — use Average/Minimum/Maximum/Count."),
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
};
@@ -463,22 +463,60 @@ public sealed class WonderwareHistorianClientTests
}
/// <summary>
/// (5) <see cref="HistoryAggregateType.Total"/> must throw
/// <see cref="NotSupportedException"/> because Wonderware AnalogSummary has no Total
/// aggregate column.
/// (5) <see cref="HistoryAggregateType.Total"/> is derived client-side as the
/// time-weighted Average multiplied by the interval duration in seconds, because the
/// Wonderware AnalogSummary query exposes no Total column. The client must issue the
/// wire request with the Average column and scale every returned bucket value by
/// <c>interval.TotalSeconds</c>, carrying the bucket's quality and timestamp through.
/// </summary>
[Fact]
public async Task ReadProcessedAsync_TotalAggregate_ThrowsNotSupported()
public async Task ReadProcessedAsync_TotalAggregate_ReturnsAverageTimesIntervalSeconds()
{
await using var server = new FakeSidecarServer(Secret);
var bucketTs = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc);
string? requestedColumn = null;
await using var server = new FakeSidecarServer(Secret)
{
OnReadProcessed = req =>
{
// Capture the column the client asked for: Total must be requested as Average.
requestedColumn = req.AggregateColumn;
return new ReadProcessedReply
{
Success = true,
Buckets =
[
// One Good Average bucket of 2.0; with a 60s interval the derived
// Total is 2.0 * 60 = 120.0.
new HistorianAggregateSampleDto { Value = 2.0, TimestampUtcTicks = bucketTs.Ticks },
// A null (unavailable) Average bucket must stay BadNoData / null.
new HistorianAggregateSampleDto { Value = null, TimestampUtcTicks = bucketTs.AddMinutes(1).Ticks },
],
};
},
};
await server.StartAsync();
await using var client = TcpClientFor(server);
var result = await client.ReadProcessedAsync("Tank.Level",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc),
TimeSpan.FromMinutes(1), HistoryAggregateType.Total, CancellationToken.None);
await Should.ThrowAsync<NotSupportedException>(() =>
client.ReadProcessedAsync("Tag",
DateTime.UtcNow, DateTime.UtcNow, TimeSpan.FromMinutes(1),
HistoryAggregateType.Total, CancellationToken.None));
// The wire request asks for the Average column — Total has no AnalogSummary column.
requestedColumn.ShouldBe("Average");
result.Samples.Count.ShouldBe(2);
// Total = Average (2.0) x interval-seconds (60) = 120.0, quality + timestamp carried.
result.Samples[0].StatusCode.ShouldBe(0x00000000u); // Good
result.Samples[0].Value.ShouldBe(120.0);
result.Samples[0].SourceTimestampUtc.ShouldBe(bucketTs);
// Null Average bucket → still BadNoData / null after scaling.
result.Samples[1].StatusCode.ShouldBe(0x800E0000u); // BadNoData
result.Samples[1].Value.ShouldBeNull();
result.Samples[1].SourceTimestampUtc.ShouldBe(bucketTs.AddMinutes(1));
}
/// <summary>