Compare commits

...

1 Commits

Author SHA1 Message Date
Joseph Doherty
d13f919112 Phase 2 PR 10 — HistoryReadAtTime IPC surface. New Shared.Contracts messages HistoryReadAtTimeRequest/Response (MessageKind 0x64/0x65), IGalaxyBackend gains HistoryReadAtTimeAsync, Stub/DbBacked return canonical pending error, MxAccessGalaxyBackend delegates to _historian.ReadAtTimeAsync (ported in PR 5, exposed now) — request timestamp array is flow-encoded as Unix ms to avoid MessagePack DateTime quirks then re-hydrated to DateTime on the Host side. Per-sample mapping uses the same ToWire(HistorianSample) helper as ReadRawAsync so the category→StatusCode mapping stays consistent (Quality byte 192+ → Good 0u, 64-191 → Uncertain, 0-63 → Bad 0x80000000u). Guards: null historian → "Historian disabled" (symmetric with other history paths); empty timestamp array short-circuits to Success=true, Values=[] without an SDK round-trip; SDK exception → Success=false with the message chained. Proxy-side IHistoryProvider.ReadAtTimeAsync capability doesn't exist in Core.Abstractions yet (OPC UA HistoryReadAtTime service is supported but the current IHistoryProvider only has ReadRawAsync + ReadProcessedAsync) — this PR adds the Host-side surface so a future Core.Abstractions extension can wire it through without needing another IPC change. Tests (4 new): disabled-error when historian null, empty-timestamp short-circuit without SDK call, Unix-ms↔DateTime round-trip with Good samples at two distinct timestamps, missing sample (Quality=0) maps to 0x80000000u Bad category. Galaxy.Host.Tests Unit suite: 31 pass / 0 fail (4 new at-time + 27 pre-existing). Galaxy.Host builds clean. Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:03:25 -04:00
8 changed files with 234 additions and 2 deletions

View File

@@ -136,6 +136,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -39,6 +39,7 @@ public interface IGalaxyBackend
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
}

View File

@@ -324,6 +324,42 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
}
}
public async Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
{
if (_historian is null)
return new HistoryReadAtTimeResponse
{
Success = false,
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
Values = Array.Empty<GalaxyDataValue>(),
};
if (req.TimestampsUtcUnixMs.Length == 0)
return new HistoryReadAtTimeResponse { Success = true, Values = Array.Empty<GalaxyDataValue>() };
var timestamps = req.TimestampsUtcUnixMs
.Select(ms => DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime)
.ToArray();
try
{
var samples = await _historian.ReadAtTimeAsync(req.TagReference, timestamps, ct).ConfigureAwait(false);
var wire = samples.Select(s => ToWire(req.TagReference, s)).ToArray();
return new HistoryReadAtTimeResponse { Success = true, Values = wire };
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
return new HistoryReadAtTimeResponse
{
Success = false,
Error = $"Historian at-time read failed: {ex.Message}",
Values = Array.Empty<GalaxyDataValue>(),
};
}
}
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });

View File

@@ -94,6 +94,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(
HistoryReadAtTimeRequest req, CancellationToken ct)
=> Task.FromResult(new HistoryReadAtTimeResponse
{
Success = false,
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
Values = System.Array.Empty<GalaxyDataValue>(),
});
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
=> Task.FromResult(new RecycleStatusResponse
{

View File

@@ -87,6 +87,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
return;
}
case MessageKind.HistoryReadAtTimeRequest:
{
var resp = await backend.HistoryReadAtTimeAsync(
Deserialize<HistoryReadAtTimeRequest>(body), ct);
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
return;
}
case MessageKind.RecycleHostRequest:
{
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);

View File

@@ -48,10 +48,12 @@ public enum MessageKind : byte
AlarmEvent = 0x51,
AlarmAckRequest = 0x52,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadRequest = 0x60,
HistoryReadResponse = 0x61,
HistoryReadProcessedRequest = 0x62,
HistoryReadProcessedResponse = 0x63,
HistoryReadAtTimeRequest = 0x64,
HistoryReadAtTimeResponse = 0x65,
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,

View File

@@ -50,3 +50,24 @@ public sealed class HistoryReadProcessedResponse
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}
/// <summary>
/// At-time historian read — OPC UA HistoryReadAtTime service. Returns one sample per
/// requested timestamp (interpolated when no exact match exists). The per-timestamp array
/// is flow-encoded as Unix milliseconds to avoid MessagePack DateTime quirks.
/// </summary>
[MessagePackObject]
public sealed class HistoryReadAtTimeRequest
{
[Key(0)] public long SessionId { get; set; }
[Key(1)] public string TagReference { get; set; } = string.Empty;
[Key(2)] public long[] TimestampsUtcUnixMs { get; set; } = System.Array.Empty<long>();
}
[MessagePackObject]
public sealed class HistoryReadAtTimeResponse
{
[Key(0)] public bool Success { get; set; }
[Key(1)] public string? Error { get; set; }
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
}

View File

@@ -0,0 +1,147 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class HistoryReadAtTimeTests
{
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? historian, StaPump pump) =>
new(
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
new MxAccessClient(pump, new MxProxyAdapter(), "attime-test"),
historian);
[Fact]
public async Task Returns_disabled_error_when_no_historian_configured()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
using var backend = BuildBackend(null, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L, 2L },
}, CancellationToken.None);
resp.Success.ShouldBeFalse();
resp.Error.ShouldContain("Historian disabled");
}
[Fact]
public async Task Empty_timestamp_list_short_circuits_to_success_with_no_values()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var fake = new FakeHistorian();
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = Array.Empty<long>(),
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.ShouldBeEmpty();
fake.Calls.ShouldBe(0); // no round-trip to SDK for empty timestamp list
}
[Fact]
public async Task Timestamps_survive_Unix_ms_round_trip_to_DateTime()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
var t1 = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2026, 4, 18, 10, 5, 0, DateTimeKind.Utc);
var fake = new FakeHistorian(
new HistorianSample { Value = 100.0, Quality = 192, TimestampUtc = t1 },
new HistorianSample { Value = 101.5, Quality = 192, TimestampUtc = t2 });
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "TankLevel",
TimestampsUtcUnixMs = new[]
{
new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds(),
new DateTimeOffset(t2, TimeSpan.Zero).ToUnixTimeMilliseconds(),
},
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(2);
resp.Values[0].SourceTimestampUtcUnixMs.ShouldBe(new DateTimeOffset(t1, TimeSpan.Zero).ToUnixTimeMilliseconds());
resp.Values[0].StatusCode.ShouldBe(0u); // Good (quality 192)
MessagePackSerializer.Deserialize<double>(resp.Values[0].ValueBytes!).ShouldBe(100.0);
fake.Calls.ShouldBe(1);
fake.LastTimestamps.Length.ShouldBe(2);
fake.LastTimestamps[0].ShouldBe(t1);
fake.LastTimestamps[1].ShouldBe(t2);
}
[Fact]
public async Task Missing_sample_maps_to_Bad_category()
{
using var pump = new StaPump("Test.Sta");
await pump.WaitForStartedAsync();
// Quality=0 means no sample at that timestamp per HistorianDataSource.ReadAtTimeAsync.
var fake = new FakeHistorian(new HistorianSample
{
Value = null,
Quality = 0,
TimestampUtc = DateTime.UtcNow,
});
using var backend = BuildBackend(fake, pump);
var resp = await backend.HistoryReadAtTimeAsync(new HistoryReadAtTimeRequest
{
TagReference = "T",
TimestampsUtcUnixMs = new[] { 1L },
}, CancellationToken.None);
resp.Success.ShouldBeTrue();
resp.Values.Length.ShouldBe(1);
resp.Values[0].StatusCode.ShouldBe(0x80000000u); // Bad category
resp.Values[0].ValueBytes.ShouldBeNull();
}
private sealed class FakeHistorian : IHistorianDataSource
{
private readonly HistorianSample[] _samples;
public int Calls { get; private set; }
public DateTime[] LastTimestamps { get; private set; } = Array.Empty<DateTime>();
public FakeHistorian(params HistorianSample[] samples) => _samples = samples;
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
{
Calls++;
LastTimestamps = ts;
return Task.FromResult(new List<HistorianSample>(_samples));
}
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianSample>());
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
=> Task.FromResult(new List<HistorianAggregateSample>());
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
=> Task.FromResult(new List<HistorianEventDto>());
public HistorianHealthSnapshot GetHealthSnapshot() => new();
public void Dispose() { }
}
}