Merge pull request 'Phase 2 PR 7 — wire IHistoryProvider.ReadProcessedAsync end-to-end' (#6) from phase-2-pr7-history-processed into v2

This commit was merged in pull request #6.
This commit is contained in:
2026-04-18 06:59:02 -04:00
11 changed files with 340 additions and 2 deletions

View File

@@ -127,6 +127,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -38,6 +38,7 @@ public interface IGalaxyBackend
Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct);
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}

View File

@@ -264,6 +264,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
}
}
public async Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.IntervalMs <= 0)
return new HistoryReadProcessedResponse
{
Success = false,
Error = "HistoryReadProcessed requires IntervalMs > 0",
Values = Array.Empty<GalaxyDataValue>(),
};
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
try
{
var samples = await _historian.ReadAggregateAsync(
req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadProcessedResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadProcessedResponse
{
Success = false,
Error = $"Historian aggregate read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
@@ -305,6 +347,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
return 0x80000000u; // Bad
}
/// <summary>
/// Maps a <see cref="HistorianAggregateSample"/> (one aggregate bucket) to the IPC wire
/// shape. A null <see cref="HistorianAggregateSample.Value"/> means the aggregate was
/// unavailable for the bucket — the Proxy translates that to OPC UA <c>BadNoData</c>.
/// </summary>
private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new()
{
TagReference = reference,
ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value),
ValueMessagePackType = 0,
StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u,
SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new()
{
AttributeName = row.AttributeName,

View File

@@ -85,6 +85,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
Tags = System.Array.Empty<HistoryTagValues>(),
});
public Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(
HistoryReadProcessedRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadProcessedResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{

View File

@@ -80,6 +80,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
return;
}
case MessageKind.HistoryReadProcessedRequest:
{
var resp = await backend.HistoryReadProcessedAsync(
Deserialize<HistoryReadProcessedRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);

View File

@@ -296,10 +296,50 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public Task<HistoryReadResult> ReadProcessedAsync(
public async Task<HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
=> throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync.");
{
var client = RequireClient();
var column = MapAggregateToColumn(aggregate);
var resp = await client.CallAsync<HistoryReadProcessedRequest, HistoryReadProcessedResponse>(
MessageKind.HistoryReadProcessedRequest,
new HistoryReadProcessedRequest
{
SessionId = _sessionId,
TagReference = fullReference,
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
IntervalMs = (long)interval.TotalMilliseconds,
AggregateColumn = column,
},
MessageKind.HistoryReadProcessedResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}");
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
return new HistoryReadResult(samples, ContinuationPoint: null);
}
/// <summary>
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
/// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free.
/// </summary>
internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch
{
HistoryAggregateType.Average => "Average",
HistoryAggregateType.Minimum => "Minimum",
HistoryAggregateType.Maximum => "Maximum",
HistoryAggregateType.Count => "ValueCount",
HistoryAggregateType.Total => throw new NotSupportedException(
"HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " +
"query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."),
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
};
// ---- IRediscoverable ----

View File

@@ -16,6 +16,10 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>

View File

@@ -50,6 +50,8 @@ public enum MessageKind : byte
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadProcessedRequest = 0x62,
HistoryReadProcessedResponse = 0x63,
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,

View File

@@ -26,3 +26,27 @@ public sealed class HistoryReadResponse
[Key(1)] public string? Error { get; set; }
[Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty<HistoryTagValues>();
}
/// <summary>
/// Processed (aggregated) historian read — OPC UA HistoryReadProcessed service. The
/// aggregate column is a string (e.g. "Average", "Minimum") mapped by the Proxy from the
/// OPC UA HistoryAggregateType enum so Galaxy.Host stays OPC-UA-free.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadProcessedRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long StartUtcUnixMs { get; set; }
[Key(3)] public long EndUtcUnixMs { get; set; }
[Key(4)] public long IntervalMs { get; set; }
[Key(5)] public string AggregateColumn { get; set; } = "Average";
}
[MessagePackObject]
public sealed class HistoryReadProcessedResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}