diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs index 95a626b..9c505db 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/DbBackedGalaxyBackend.cs @@ -127,6 +127,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy Tags = System.Array.Empty(), }); + public Task HistoryReadProcessedAsync( + HistoryReadProcessedRequest req, CancellationToken ct) + => Task.FromResult(new HistoryReadProcessedResponse + { + Success = false, + Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)", + Values = System.Array.Empty(), + }); + public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs index b4c0a93..5739146 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/IGalaxyBackend.cs @@ -38,6 +38,7 @@ public interface IGalaxyBackend Task AcknowledgeAlarmAsync(AlarmAckRequest req, CancellationToken ct); Task HistoryReadAsync(HistoryReadRequest req, CancellationToken ct); + Task HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct); Task RecycleAsync(RecycleHostRequest req, CancellationToken ct); } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index 7cd543a..eb38e52 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -264,6 +264,48 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable } } + public async Task HistoryReadProcessedAsync( + HistoryReadProcessedRequest req, CancellationToken ct) + { + if (_historian is null) + return new HistoryReadProcessedResponse + { + Success = false, + Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration", + Values = Array.Empty(), + }; + + if (req.IntervalMs <= 0) + return new HistoryReadProcessedResponse + { + Success = false, + Error = "HistoryReadProcessed requires IntervalMs > 0", + Values = Array.Empty(), + }; + + var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime; + var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime; + + try + { + var samples = await _historian.ReadAggregateAsync( + req.TagReference, start, end, req.IntervalMs, req.AggregateColumn, ct).ConfigureAwait(false); + + var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray(); + return new HistoryReadProcessedResponse { Success = true, Values = wire }; + } + catch (OperationCanceledException) { throw; } + catch (Exception ex) + { + return new HistoryReadProcessedResponse + { + Success = false, + Error = $"Historian aggregate read failed: {ex.Message}", + Values = Array.Empty(), + }; + } + } + public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 }); @@ -305,6 +347,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable return 0x80000000u; // Bad } + /// + /// Maps a (one aggregate bucket) to the IPC wire + /// shape. A null means the aggregate was + /// unavailable for the bucket — the Proxy translates that to OPC UA BadNoData. + /// + private static GalaxyDataValue ToWire(string reference, HistorianAggregateSample sample) => new() + { + TagReference = reference, + ValueBytes = sample.Value is null ? null : MessagePackSerializer.Serialize(sample.Value.Value), + ValueMessagePackType = 0, + StatusCode = sample.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u, + SourceTimestampUtcUnixMs = new DateTimeOffset(sample.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + ServerTimestampUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + private static GalaxyAttributeInfo MapAttribute(GalaxyAttributeRow row) => new() { AttributeName = row.AttributeName, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs index bff89fe..27da0a4 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/StubGalaxyBackend.cs @@ -85,6 +85,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend Tags = System.Array.Empty(), }); + public Task HistoryReadProcessedAsync( + HistoryReadProcessedRequest req, CancellationToken ct) + => Task.FromResult(new HistoryReadProcessedResponse + { + Success = false, + Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)", + Values = System.Array.Empty(), + }); + public Task RecycleAsync(RecycleHostRequest req, CancellationToken ct) => Task.FromResult(new RecycleStatusResponse { diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs index a406c04..7d82808 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Ipc/GalaxyFrameHandler.cs @@ -80,6 +80,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) : await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct); return; } + case MessageKind.HistoryReadProcessedRequest: + { + var resp = await backend.HistoryReadProcessedAsync( + Deserialize(body), ct); + await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct); + return; + } case MessageKind.RecycleHostRequest: { var resp = await backend.RecycleAsync(Deserialize(body), ct); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs index ee4a2d1..41086cb 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/GalaxyProxyDriver.cs @@ -296,10 +296,50 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options) return new HistoryReadResult(samples, ContinuationPoint: null); } - public Task ReadProcessedAsync( + public async Task ReadProcessedAsync( string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, HistoryAggregateType aggregate, CancellationToken cancellationToken) - => throw new NotSupportedException("Galaxy historian processed reads are not supported in v2; use ReadRawAsync."); + { + var client = RequireClient(); + var column = MapAggregateToColumn(aggregate); + + var resp = await client.CallAsync( + MessageKind.HistoryReadProcessedRequest, + new HistoryReadProcessedRequest + { + SessionId = _sessionId, + TagReference = fullReference, + StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + IntervalMs = (long)interval.TotalMilliseconds, + AggregateColumn = column, + }, + MessageKind.HistoryReadProcessedResponse, + cancellationToken); + + if (!resp.Success) + throw new InvalidOperationException($"Galaxy.Host HistoryReadProcessed failed: {resp.Error}"); + + IReadOnlyList samples = [.. resp.Values.Select(ToSnapshot)]; + return new HistoryReadResult(samples, ContinuationPoint: null); + } + + /// + /// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian + /// AnalogSummaryQuery column names consumed by HistorianDataSource.ReadAggregateAsync. + /// Kept on the Proxy side so Galaxy.Host stays OPC-UA-free. + /// + internal static string MapAggregateToColumn(HistoryAggregateType aggregate) => aggregate switch + { + HistoryAggregateType.Average => "Average", + HistoryAggregateType.Minimum => "Minimum", + HistoryAggregateType.Maximum => "Maximum", + HistoryAggregateType.Count => "ValueCount", + HistoryAggregateType.Total => throw new NotSupportedException( + "HistoryAggregateType.Total is not supported by the Wonderware Historian AnalogSummary " + + "query — use Average × Count on the caller side, or switch to Average/Minimum/Maximum/Count."), + _ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"), + }; // ---- IRediscoverable ---- diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj index 6859a4c..47dadcc 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj @@ -16,6 +16,10 @@ + + + + diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs index 9694762..2a17478 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs @@ -50,6 +50,8 @@ public enum MessageKind : byte HistoryReadRequest = 0x60, HistoryReadResponse = 0x61, + HistoryReadProcessedRequest = 0x62, + HistoryReadProcessedResponse = 0x63, HostConnectivityStatus = 0x70, RuntimeStatusChange = 0x71, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/History.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/History.cs index 6f10fe4..6990692 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/History.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/History.cs @@ -26,3 +26,27 @@ public sealed class HistoryReadResponse [Key(1)] public string? Error { get; set; } [Key(2)] public HistoryTagValues[] Tags { get; set; } = System.Array.Empty(); } + +/// +/// Processed (aggregated) historian read — OPC UA HistoryReadProcessed service. The +/// aggregate column is a string (e.g. "Average", "Minimum") mapped by the Proxy from the +/// OPC UA HistoryAggregateType enum so Galaxy.Host stays OPC-UA-free. +/// +[MessagePackObject] +public sealed class HistoryReadProcessedRequest +{ + [Key(0)] public long SessionId { get; set; } + [Key(1)] public string TagReference { get; set; } = string.Empty; + [Key(2)] public long StartUtcUnixMs { get; set; } + [Key(3)] public long EndUtcUnixMs { get; set; } + [Key(4)] public long IntervalMs { get; set; } + [Key(5)] public string AggregateColumn { get; set; } = "Average"; +} + +[MessagePackObject] +public sealed class HistoryReadProcessedResponse +{ + [Key(0)] public bool Success { get; set; } + [Key(1)] public string? Error { get; set; } + [Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty(); +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistoryReadProcessedTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistoryReadProcessedTests.cs new file mode 100644 index 0000000..7cca434 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/HistoryReadProcessedTests.cs @@ -0,0 +1,158 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests; + +[Trait("Category", "Unit")] +public sealed class HistoryReadProcessedTests +{ + [Fact] + public async Task ReturnsDisabledError_When_NoHistorianConfigured() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test"); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + historian: null); + + var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest + { + TagReference = "T", + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + IntervalMs = 1000, + AggregateColumn = "Average", + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("Historian disabled"); + } + + [Fact] + public async Task Rejects_NonPositiveInterval() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test"); + var fake = new FakeHistorianDataSource(); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest + { + TagReference = "T", + IntervalMs = 0, + AggregateColumn = "Average", + }, CancellationToken.None); + + resp.Success.ShouldBeFalse(); + resp.Error.ShouldContain("IntervalMs"); + } + + [Fact] + public async Task Maps_AggregateSample_With_Value_To_Good() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test"); + var fake = new FakeHistorianDataSource(new HistorianAggregateSample + { + Value = 12.34, + TimestampUtc = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc), + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest + { + TagReference = "T", + StartUtcUnixMs = 0, + EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + IntervalMs = 60_000, + AggregateColumn = "Average", + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Values.Length.ShouldBe(1); + resp.Values[0].StatusCode.ShouldBe(0u); // Good + resp.Values[0].ValueBytes.ShouldNotBeNull(); + MessagePackSerializer.Deserialize(resp.Values[0].ValueBytes!).ShouldBe(12.34); + fake.LastAggregateColumn.ShouldBe("Average"); + fake.LastIntervalMs.ShouldBe(60_000d); + } + + [Fact] + public async Task Maps_Null_Bucket_To_BadNoData() + { + using var pump = new StaPump("Test.Sta"); + await pump.WaitForStartedAsync(); + var mx = new MxAccessClient(pump, new MxProxyAdapter(), "processed-test"); + var fake = new FakeHistorianDataSource(new HistorianAggregateSample + { + Value = null, + TimestampUtc = DateTime.UtcNow, + }); + using var backend = new MxAccessGalaxyBackend( + new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }), + mx, + fake); + + var resp = await backend.HistoryReadProcessedAsync(new HistoryReadProcessedRequest + { + TagReference = "T", + IntervalMs = 1000, + AggregateColumn = "Minimum", + }, CancellationToken.None); + + resp.Success.ShouldBeTrue(); + resp.Values.Length.ShouldBe(1); + resp.Values[0].StatusCode.ShouldBe(0x800E0000u); // BadNoData + resp.Values[0].ValueBytes.ShouldBeNull(); + } + + private sealed class FakeHistorianDataSource : IHistorianDataSource + { + private readonly HistorianAggregateSample[] _samples; + public string? LastAggregateColumn { get; private set; } + public double LastIntervalMs { get; private set; } + + public FakeHistorianDataSource(params HistorianAggregateSample[] samples) => _samples = samples; + + public Task> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadAggregateAsync( + string tag, DateTime s, DateTime e, double intervalMs, string col, CancellationToken ct) + { + LastAggregateColumn = col; + LastIntervalMs = intervalMs; + return Task.FromResult(new List(_samples)); + } + + public Task> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct) + => Task.FromResult(new List()); + + public Task> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct) + => Task.FromResult(new List()); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + public void Dispose() { } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/AggregateColumnMappingTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/AggregateColumnMappingTests.cs new file mode 100644 index 0000000..85a094d --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests/AggregateColumnMappingTests.cs @@ -0,0 +1,27 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests; + +[Trait("Category", "Unit")] +public sealed class AggregateColumnMappingTests +{ + [Theory] + [InlineData(HistoryAggregateType.Average, "Average")] + [InlineData(HistoryAggregateType.Minimum, "Minimum")] + [InlineData(HistoryAggregateType.Maximum, "Maximum")] + [InlineData(HistoryAggregateType.Count, "ValueCount")] + public void Maps_OpcUa_enum_to_AnalogSummary_column(HistoryAggregateType aggregate, string expected) + { + GalaxyProxyDriver.MapAggregateToColumn(aggregate).ShouldBe(expected); + } + + [Fact] + public void Total_is_not_supported() + { + Should.Throw( + () => GalaxyProxyDriver.MapAggregateToColumn(HistoryAggregateType.Total)); + } +}