Merge pull request 'Phase 2 PR 11 � HistoryReadEvents IPC (alarm history)' (#10) from phase-2-pr11-history-events into v2
This commit was merged in pull request #10.
This commit is contained in:
@@ -145,6 +145,15 @@ public sealed class DbBackedGalaxyBackend(GalaxyRepository repository) : IGalaxy
|
|||||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "MXAccess + Historian code lift pending (Phase 2 Task B.1)",
|
||||||
|
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ public interface IGalaxyBackend
|
|||||||
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
Task<HistoryReadResponse> HistoryReadAsync(HistoryReadRequest req, CancellationToken ct);
|
||||||
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
Task<HistoryReadProcessedResponse> HistoryReadProcessedAsync(HistoryReadProcessedRequest req, CancellationToken ct);
|
||||||
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
|
Task<HistoryReadAtTimeResponse> HistoryReadAtTimeAsync(HistoryReadAtTimeRequest req, CancellationToken ct);
|
||||||
|
Task<HistoryReadEventsResponse> HistoryReadEventsAsync(HistoryReadEventsRequest req, CancellationToken ct);
|
||||||
|
|
||||||
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -360,6 +360,46 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_historian is null)
|
||||||
|
return new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "Historian disabled — no OTOPCUA_HISTORIAN_ENABLED configuration",
|
||||||
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
var start = DateTimeOffset.FromUnixTimeMilliseconds(req.StartUtcUnixMs).UtcDateTime;
|
||||||
|
var end = DateTimeOffset.FromUnixTimeMilliseconds(req.EndUtcUnixMs).UtcDateTime;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var events = await _historian.ReadEventsAsync(req.SourceName, start, end, req.MaxEvents, ct).ConfigureAwait(false);
|
||||||
|
var wire = events.Select(e => new GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
EventId = e.Id.ToString(),
|
||||||
|
SourceName = e.Source,
|
||||||
|
EventTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.EventTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
ReceivedTimeUtcUnixMs = new DateTimeOffset(DateTime.SpecifyKind(e.ReceivedTime, DateTimeKind.Utc), TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
DisplayText = e.DisplayText,
|
||||||
|
Severity = e.Severity,
|
||||||
|
}).ToArray();
|
||||||
|
return new HistoryReadEventsResponse { Success = true, Events = wire };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = $"Historian event read failed: {ex.Message}",
|
||||||
|
Events = Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
=> Task.FromResult(new RecycleStatusResponse { Accepted = true, GraceSeconds = 15 });
|
||||||
|
|
||||||
|
|||||||
@@ -103,6 +103,15 @@ public sealed class StubGalaxyBackend : IGalaxyBackend
|
|||||||
Values = System.Array.Empty<GalaxyDataValue>(),
|
Values = System.Array.Empty<GalaxyDataValue>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
public Task<HistoryReadEventsResponse> HistoryReadEventsAsync(
|
||||||
|
HistoryReadEventsRequest req, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
Success = false,
|
||||||
|
Error = "stub: MXAccess code lift pending (Phase 2 Task B.1)",
|
||||||
|
Events = System.Array.Empty<GalaxyHistoricalEvent>(),
|
||||||
|
});
|
||||||
|
|
||||||
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
public Task<RecycleStatusResponse> RecycleAsync(RecycleHostRequest req, CancellationToken ct)
|
||||||
=> Task.FromResult(new RecycleStatusResponse
|
=> Task.FromResult(new RecycleStatusResponse
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -94,6 +94,13 @@ public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) :
|
|||||||
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
|
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
case MessageKind.HistoryReadEventsRequest:
|
||||||
|
{
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(
|
||||||
|
Deserialize<HistoryReadEventsRequest>(body), ct);
|
||||||
|
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
|
||||||
|
return;
|
||||||
|
}
|
||||||
case MessageKind.RecycleHostRequest:
|
case MessageKind.RecycleHostRequest:
|
||||||
{
|
{
|
||||||
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
||||||
|
|||||||
@@ -54,6 +54,8 @@ public enum MessageKind : byte
|
|||||||
HistoryReadProcessedResponse = 0x63,
|
HistoryReadProcessedResponse = 0x63,
|
||||||
HistoryReadAtTimeRequest = 0x64,
|
HistoryReadAtTimeRequest = 0x64,
|
||||||
HistoryReadAtTimeResponse = 0x65,
|
HistoryReadAtTimeResponse = 0x65,
|
||||||
|
HistoryReadEventsRequest = 0x66,
|
||||||
|
HistoryReadEventsResponse = 0x67,
|
||||||
|
|
||||||
HostConnectivityStatus = 0x70,
|
HostConnectivityStatus = 0x70,
|
||||||
RuntimeStatusChange = 0x71,
|
RuntimeStatusChange = 0x71,
|
||||||
|
|||||||
@@ -71,3 +71,40 @@ public sealed class HistoryReadAtTimeResponse
|
|||||||
[Key(1)] public string? Error { get; set; }
|
[Key(1)] public string? Error { get; set; }
|
||||||
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
[Key(2)] public GalaxyDataValue[] Values { get; set; } = System.Array.Empty<GalaxyDataValue>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Historical events read — OPC UA HistoryReadEvents service and Alarm & Condition
|
||||||
|
/// history. <c>SourceName</c> null means "all sources". Distinct from the live
|
||||||
|
/// <see cref="GalaxyAlarmEvent"/> stream because historical rows carry both
|
||||||
|
/// <c>EventTime</c> (when the event occurred in the process) and <c>ReceivedTime</c>
|
||||||
|
/// (when the Historian persisted it) and have no StateTransition — the Historian logs
|
||||||
|
/// the instantaneous event, not the OPC UA alarm lifecycle.
|
||||||
|
/// </summary>
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
[Key(0)] public long SessionId { get; set; }
|
||||||
|
[Key(1)] public string? SourceName { get; set; }
|
||||||
|
[Key(2)] public long StartUtcUnixMs { get; set; }
|
||||||
|
[Key(3)] public long EndUtcUnixMs { get; set; }
|
||||||
|
[Key(4)] public int MaxEvents { get; set; } = 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
[Key(0)] public string EventId { get; set; } = string.Empty;
|
||||||
|
[Key(1)] public string? SourceName { get; set; }
|
||||||
|
[Key(2)] public long EventTimeUtcUnixMs { get; set; }
|
||||||
|
[Key(3)] public long ReceivedTimeUtcUnixMs { get; set; }
|
||||||
|
[Key(4)] public string? DisplayText { get; set; }
|
||||||
|
[Key(5)] public ushort Severity { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
[MessagePackObject]
|
||||||
|
public sealed class HistoryReadEventsResponse
|
||||||
|
{
|
||||||
|
[Key(0)] public bool Success { get; set; }
|
||||||
|
[Key(1)] public string? Error { get; set; }
|
||||||
|
[Key(2)] public GalaxyHistoricalEvent[] Events { get; set; } = System.Array.Empty<GalaxyHistoricalEvent>();
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistoryReadEventsTests
|
||||||
|
{
|
||||||
|
private static MxAccessGalaxyBackend BuildBackend(IHistorianDataSource? h, StaPump pump) =>
|
||||||
|
new(
|
||||||
|
new GalaxyRepository(new GalaxyRepositoryOptions { ConnectionString = "Server=.;Database=ZB;Integrated Security=True;" }),
|
||||||
|
new MxAccessClient(pump, new MxProxyAdapter(), "events-test"),
|
||||||
|
h);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Returns_disabled_error_when_no_historian_configured()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
using var backend = BuildBackend(null, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = "TankA",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxEvents = 100,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeFalse();
|
||||||
|
resp.Error.ShouldContain("Historian disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Maps_HistorianEventDto_to_GalaxyHistoricalEvent_wire_shape()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
|
||||||
|
var eventId = Guid.NewGuid();
|
||||||
|
var eventTime = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
var receivedTime = eventTime.AddMilliseconds(150);
|
||||||
|
var fake = new FakeHistorian(new HistorianEventDto
|
||||||
|
{
|
||||||
|
Id = eventId,
|
||||||
|
Source = "TankA.Level.HiHi",
|
||||||
|
EventTime = eventTime,
|
||||||
|
ReceivedTime = receivedTime,
|
||||||
|
DisplayText = "HiHi alarm tripped",
|
||||||
|
Severity = 900,
|
||||||
|
});
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
var resp = await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = "TankA.Level.HiHi",
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||||
|
MaxEvents = 50,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
resp.Success.ShouldBeTrue();
|
||||||
|
resp.Events.Length.ShouldBe(1);
|
||||||
|
var got = resp.Events[0];
|
||||||
|
got.EventId.ShouldBe(eventId.ToString());
|
||||||
|
got.SourceName.ShouldBe("TankA.Level.HiHi");
|
||||||
|
got.DisplayText.ShouldBe("HiHi alarm tripped");
|
||||||
|
got.Severity.ShouldBe<ushort>(900);
|
||||||
|
got.EventTimeUtcUnixMs.ShouldBe(new DateTimeOffset(eventTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
got.ReceivedTimeUtcUnixMs.ShouldBe(new DateTimeOffset(receivedTime, TimeSpan.Zero).ToUnixTimeMilliseconds());
|
||||||
|
|
||||||
|
fake.LastSourceName.ShouldBe("TankA.Level.HiHi");
|
||||||
|
fake.LastMaxEvents.ShouldBe(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Null_source_name_is_passed_through_as_all_sources()
|
||||||
|
{
|
||||||
|
using var pump = new StaPump("Test.Sta");
|
||||||
|
await pump.WaitForStartedAsync();
|
||||||
|
var fake = new FakeHistorian();
|
||||||
|
using var backend = BuildBackend(fake, pump);
|
||||||
|
|
||||||
|
await backend.HistoryReadEventsAsync(new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SourceName = null,
|
||||||
|
StartUtcUnixMs = 0,
|
||||||
|
EndUtcUnixMs = 1,
|
||||||
|
MaxEvents = 10,
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
fake.LastSourceName.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHistorian : IHistorianDataSource
|
||||||
|
{
|
||||||
|
private readonly HistorianEventDto[] _events;
|
||||||
|
public string? LastSourceName { get; private set; } = "<unset>";
|
||||||
|
public int LastMaxEvents { get; private set; }
|
||||||
|
|
||||||
|
public FakeHistorian(params HistorianEventDto[] events) => _events = events;
|
||||||
|
|
||||||
|
public Task<List<HistorianEventDto>> ReadEventsAsync(string? src, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
{
|
||||||
|
LastSourceName = src;
|
||||||
|
LastMaxEvents = max;
|
||||||
|
return Task.FromResult(new List<HistorianEventDto>(_events));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<List<HistorianSample>> ReadRawAsync(string tag, DateTime s, DateTime e, int max, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tag, DateTime s, DateTime e, double ms, string col, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianAggregateSample>());
|
||||||
|
public Task<List<HistorianSample>> ReadAtTimeAsync(string tag, DateTime[] ts, CancellationToken ct)
|
||||||
|
=> Task.FromResult(new List<HistorianSample>());
|
||||||
|
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user