Closes Stream C per docs/v2/implementation/phase-6-1-resilience-and-observability.md.

Core.Observability (new namespace):
- DriverHealthReport — pure-function aggregation over DriverHealthSnapshot list. Empty fleet = Healthy. Any Faulted = Faulted. Any Unknown/Initializing (no Faulted) = NotReady. Any Degraded or Reconnecting (no Faulted, no NotReady) = Degraded. Else Healthy. HttpStatus(verdict) maps to the Stream C.1 state matrix: Healthy/Degraded → 200, NotReady/Faulted → 503.
- LogContextEnricher — Serilog LogContext wrapper. Push(id, type, capability, correlationId) returns an IDisposable scope; inner log calls carry DriverInstanceId / DriverType / CapabilityName / CorrelationId structured properties automatically. NewCorrelationId = 12-hex-char GUID slice for cases where no OPC UA RequestHeader.RequestHandle is in flight.

CapabilityInvoker — now threads LogContextEnricher around every ExecuteAsync / ExecuteWriteAsync call site. OtOpcUaServer passes driver.DriverType through so logs correlate to the driver type too. Every capability call emits structured fields per the Stream C.4 compliance check.

Server.Observability:
- HealthEndpointsHost — standalone HttpListener on http://localhost:4841/ (loopback avoids Windows URL-ACL elevation; remote probing via reverse proxy or explicit netsh urlacl grant). Routes:
  /healthz → 200 when (configDbReachable OR usingStaleConfig); 503 otherwise. Body: status, uptimeSeconds, configDbReachable, usingStaleConfig.
  /readyz → DriverHealthReport.Aggregate + HttpStatus mapping. Body: verdict, drivers[], degradedDrivers[], uptimeSeconds.
  anything else → 404.
  Disposal cooperative with the HttpListener shutdown.
- OpcUaApplicationHost starts the health host after the OPC UA server comes up and disposes it on shutdown. New OpcUaServerOptions knobs: HealthEndpointsEnabled (default true), HealthEndpointsPrefix (default http://localhost:4841/).

Program.cs:
- Serilog pipeline adds Enrich.FromLogContext + opt-in JSON file sink via `Serilog:WriteJson = true` appsetting. Uses Serilog.Formatting.Compact's CompactJsonFormatter (one JSON object per line — SIEMs like Splunk, Datadog, Graylog ingest without a regex parser).

Server.Tests:
- Existing 3 OpcUaApplicationHost integration tests now set HealthEndpointsEnabled=false to avoid port :4841 collisions under parallel execution.
- New HealthEndpointsHostTests (9): /healthz healthy empty fleet; stale-config returns 200 with flag; unreachable+no-cache returns 503; /readyz empty/Healthy/Faulted/Degraded/Initializing drivers return correct status and bodies; unknown path → 404. Uses ephemeral ports via Interlocked counter.

Core.Tests:
- DriverHealthReportTests (8): empty fleet, all-healthy, any-Faulted trumps, any-NotReady without Faulted, Degraded without Faulted/NotReady, HttpStatus per-verdict theory.
- LogContextEnricherTests (8): all 4 properties attach; scope disposes cleanly; NewCorrelationId shape; null/whitespace driverInstanceId throws.
- CapabilityInvokerEnrichmentTests (2): inner logs carry structured properties; no context leak outside the call site.

Full solution dotnet test: 1016 passing (baseline 906, +110 for Phase 6.1 so far across Streams A+B+C). Pre-existing Client.CLI Subscribe flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
357 lines
17 KiB
C#
357 lines
17 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Opc.Ua;
|
|
using Opc.Ua.Client;
|
|
using Opc.Ua.Configuration;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
|
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
|
using ZB.MOM.WW.OtOpcUa.Server.Security;
|
|
// Core.Abstractions.HistoryReadResult (driver-side samples) collides with Opc.Ua.HistoryReadResult
|
|
// (service-layer per-node result). Alias the driver type so the stub's interface implementations
|
|
// are unambiguous.
|
|
using DriverHistoryReadResult = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
|
|
|
|
/// <summary>
|
|
/// End-to-end test that a real OPC UA client's HistoryRead service reaches a fake driver's
|
|
/// <see cref="IHistoryProvider"/> via <see cref="DriverNodeManager"/>'s
|
|
/// <c>HistoryReadRawModified</c> / <c>HistoryReadProcessed</c> / <c>HistoryReadAtTime</c> /
|
|
/// <c>HistoryReadEvents</c> overrides. Boots the full OPC UA stack + a stub
|
|
/// <see cref="IHistoryProvider"/> driver, opens a client session, issues each HistoryRead
|
|
/// variant, and asserts the client receives the expected per-kind payload.
|
|
/// </summary>
|
|
[Trait("Category", "Integration")]
|
|
public sealed class HistoryReadIntegrationTests : IAsyncLifetime
|
|
{
|
|
private static readonly int Port = 48600 + Random.Shared.Next(0, 99);
|
|
private readonly string _endpoint = $"opc.tcp://localhost:{Port}/OtOpcUaHistoryTest";
|
|
private readonly string _pkiRoot = Path.Combine(Path.GetTempPath(), $"otopcua-history-test-{Guid.NewGuid():N}");
|
|
|
|
private DriverHost _driverHost = null!;
|
|
private OpcUaApplicationHost _server = null!;
|
|
private HistoryDriver _driver = null!;
|
|
|
|
public async ValueTask InitializeAsync()
|
|
{
|
|
_driverHost = new DriverHost();
|
|
_driver = new HistoryDriver();
|
|
await _driverHost.RegisterAsync(_driver, "{}", CancellationToken.None);
|
|
|
|
var options = new OpcUaServerOptions
|
|
{
|
|
EndpointUrl = _endpoint,
|
|
ApplicationName = "OtOpcUaHistoryTest",
|
|
ApplicationUri = "urn:OtOpcUa:Server:HistoryTest",
|
|
PkiStoreRoot = _pkiRoot,
|
|
AutoAcceptUntrustedClientCertificates = true, HealthEndpointsEnabled = false,
|
|
};
|
|
|
|
_server = new OpcUaApplicationHost(options, _driverHost, new DenyAllUserAuthenticator(),
|
|
NullLoggerFactory.Instance, NullLogger<OpcUaApplicationHost>.Instance);
|
|
await _server.StartAsync(CancellationToken.None);
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
await _server.DisposeAsync();
|
|
await _driverHost.DisposeAsync();
|
|
try { Directory.Delete(_pkiRoot, recursive: true); } catch { /* best-effort */ }
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HistoryReadRaw_round_trips_driver_samples_to_the_client()
|
|
{
|
|
using var session = await OpenSessionAsync();
|
|
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
|
|
var nodeId = new NodeId("raw.var", nsIndex);
|
|
|
|
// The Opc.Ua client exposes HistoryRead via Session.HistoryRead. We construct a
|
|
// ReadRawModifiedDetails (IsReadModified=false → raw path) and a single
|
|
// HistoryReadValueId targeting the driver-backed variable.
|
|
var details = new ReadRawModifiedDetails
|
|
{
|
|
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
|
EndTime = new DateTime(2024, 1, 1, 0, 0, 10, DateTimeKind.Utc),
|
|
NumValuesPerNode = 100,
|
|
IsReadModified = false,
|
|
ReturnBounds = false,
|
|
};
|
|
var extObj = new ExtensionObject(details);
|
|
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
|
|
|
|
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
|
|
out var results, out _);
|
|
|
|
results.Count.ShouldBe(1);
|
|
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good, $"HistoryReadRaw returned {results[0].StatusCode}");
|
|
var hd = (HistoryData)ExtensionObject.ToEncodeable(results[0].HistoryData);
|
|
hd.DataValues.Count.ShouldBe(_driver.RawSamplesReturned, "one DataValue per driver sample");
|
|
hd.DataValues[0].Value.ShouldBe(_driver.FirstRawValue);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HistoryReadProcessed_maps_Average_aggregate_and_routes_to_ReadProcessedAsync()
|
|
{
|
|
using var session = await OpenSessionAsync();
|
|
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
|
|
var nodeId = new NodeId("proc.var", nsIndex);
|
|
|
|
var details = new ReadProcessedDetails
|
|
{
|
|
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
|
EndTime = new DateTime(2024, 1, 1, 0, 1, 0, DateTimeKind.Utc),
|
|
ProcessingInterval = 10_000, // 10s buckets
|
|
AggregateType = [ObjectIds.AggregateFunction_Average],
|
|
};
|
|
var extObj = new ExtensionObject(details);
|
|
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
|
|
|
|
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
|
|
out var results, out _);
|
|
|
|
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
_driver.LastProcessedAggregate.ShouldBe(HistoryAggregateType.Average,
|
|
"MapAggregate must translate ObjectIds.AggregateFunction_Average → driver enum");
|
|
_driver.LastProcessedInterval.ShouldBe(TimeSpan.FromSeconds(10));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HistoryReadProcessed_returns_BadAggregateNotSupported_for_unmapped_aggregate()
|
|
{
|
|
using var session = await OpenSessionAsync();
|
|
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
|
|
var nodeId = new NodeId("proc.var", nsIndex);
|
|
|
|
var details = new ReadProcessedDetails
|
|
{
|
|
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
|
EndTime = new DateTime(2024, 1, 1, 0, 1, 0, DateTimeKind.Utc),
|
|
ProcessingInterval = 10_000,
|
|
// TimeAverage is a valid OPC UA aggregate NodeId but not one the driver implements —
|
|
// the override returns BadAggregateNotSupported per Part 13 rather than coercing.
|
|
AggregateType = [ObjectIds.AggregateFunction_TimeAverage],
|
|
};
|
|
var extObj = new ExtensionObject(details);
|
|
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
|
|
|
|
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
|
|
out var results, out _);
|
|
|
|
results[0].StatusCode.Code.ShouldBe(StatusCodes.BadAggregateNotSupported);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HistoryReadAtTime_forwards_timestamp_list_to_driver()
|
|
{
|
|
using var session = await OpenSessionAsync();
|
|
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
|
|
var nodeId = new NodeId("atTime.var", nsIndex);
|
|
|
|
var t1 = new DateTime(2024, 3, 1, 10, 0, 0, DateTimeKind.Utc);
|
|
var t2 = new DateTime(2024, 3, 1, 10, 0, 30, DateTimeKind.Utc);
|
|
var details = new ReadAtTimeDetails { ReqTimes = new DateTimeCollection { t1, t2 } };
|
|
var extObj = new ExtensionObject(details);
|
|
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
|
|
|
|
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
|
|
out var results, out _);
|
|
|
|
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
_driver.LastAtTimeRequestedTimes.ShouldNotBeNull();
|
|
_driver.LastAtTimeRequestedTimes!.Count.ShouldBe(2);
|
|
_driver.LastAtTimeRequestedTimes[0].ShouldBe(t1);
|
|
_driver.LastAtTimeRequestedTimes[1].ShouldBe(t2);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task HistoryReadEvents_returns_HistoryEvent_with_BaseEventType_field_list()
|
|
{
|
|
using var session = await OpenSessionAsync();
|
|
// Events target the driver-root notifier (not a specific variable) which is the
|
|
// conventional pattern for alarm-history browse.
|
|
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
|
|
var nodeId = new NodeId("history-driver", nsIndex);
|
|
|
|
// EventFilter must carry at least one SelectClause or the stack rejects it as
|
|
// BadEventFilterInvalid before our override runs — empty filters are spec-forbidden.
|
|
// We populate the standard BaseEventType selectors any real client would send; my
|
|
// override's BuildHistoryEvent ignores the specific clauses and emits the canonical
|
|
// field list anyway (the richer "respect exact SelectClauses" behavior is on the PR 38
|
|
// follow-up list).
|
|
var filter = new EventFilter();
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.EventId);
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.SourceName);
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Message);
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Severity);
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Time);
|
|
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.ReceiveTime);
|
|
|
|
var details = new ReadEventDetails
|
|
{
|
|
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
|
EndTime = new DateTime(2024, 12, 31, 0, 0, 0, DateTimeKind.Utc),
|
|
NumValuesPerNode = 10,
|
|
Filter = filter,
|
|
};
|
|
var extObj = new ExtensionObject(details);
|
|
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
|
|
|
|
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
|
|
out var results, out _);
|
|
|
|
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
var he = (HistoryEvent)ExtensionObject.ToEncodeable(results[0].HistoryData);
|
|
he.Events.Count.ShouldBe(_driver.EventsReturned);
|
|
he.Events[0].EventFields.Count.ShouldBe(6, "BaseEventType default field layout is 6 entries");
|
|
}
|
|
|
|
private async Task<ISession> OpenSessionAsync()
|
|
{
|
|
var cfg = new ApplicationConfiguration
|
|
{
|
|
ApplicationName = "OtOpcUaHistoryTestClient",
|
|
ApplicationUri = "urn:OtOpcUa:HistoryTestClient",
|
|
ApplicationType = ApplicationType.Client,
|
|
SecurityConfiguration = new SecurityConfiguration
|
|
{
|
|
ApplicationCertificate = new CertificateIdentifier
|
|
{
|
|
StoreType = CertificateStoreType.Directory,
|
|
StorePath = Path.Combine(_pkiRoot, "client-own"),
|
|
SubjectName = "CN=OtOpcUaHistoryTestClient",
|
|
},
|
|
TrustedIssuerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-issuers") },
|
|
TrustedPeerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-trusted") },
|
|
RejectedCertificateStore = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-rejected") },
|
|
AutoAcceptUntrustedCertificates = true,
|
|
AddAppCertToTrustedStore = true,
|
|
},
|
|
TransportConfigurations = new TransportConfigurationCollection(),
|
|
TransportQuotas = new TransportQuotas { OperationTimeout = 15000 },
|
|
ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
|
|
};
|
|
await cfg.Validate(ApplicationType.Client);
|
|
cfg.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true;
|
|
|
|
var instance = new ApplicationInstance { ApplicationConfiguration = cfg, ApplicationType = ApplicationType.Client };
|
|
await instance.CheckApplicationInstanceCertificate(true, CertificateFactory.DefaultKeySize);
|
|
|
|
var selected = CoreClientUtils.SelectEndpoint(cfg, _endpoint, useSecurity: false);
|
|
var endpointConfig = EndpointConfiguration.Create(cfg);
|
|
var configuredEndpoint = new ConfiguredEndpoint(null, selected, endpointConfig);
|
|
|
|
return await Session.Create(cfg, configuredEndpoint, false, "OtOpcUaHistoryTestClientSession", 60000,
|
|
new UserIdentity(new AnonymousIdentityToken()), null);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Stub driver that implements <see cref="IHistoryProvider"/> so the service dispatch
|
|
/// can be verified without bringing up a real Galaxy or Historian. Captures the last-
|
|
/// seen arguments so tests can assert what the service handler forwarded.
|
|
/// </summary>
|
|
private sealed class HistoryDriver : IDriver, ITagDiscovery, IReadable, IHistoryProvider
|
|
{
|
|
public string DriverInstanceId => "history-driver";
|
|
public string DriverType => "HistoryStub";
|
|
|
|
public int RawSamplesReturned => 3;
|
|
public int FirstRawValue => 100;
|
|
public int EventsReturned => 2;
|
|
|
|
public HistoryAggregateType? LastProcessedAggregate { get; private set; }
|
|
public TimeSpan? LastProcessedInterval { get; private set; }
|
|
public IReadOnlyList<DateTime>? LastAtTimeRequestedTimes { get; private set; }
|
|
|
|
public Task InitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
|
|
public Task ReinitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
|
|
public Task ShutdownAsync(CancellationToken ct) => Task.CompletedTask;
|
|
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
|
|
public long GetMemoryFootprint() => 0;
|
|
public Task FlushOptionalCachesAsync(CancellationToken ct) => Task.CompletedTask;
|
|
|
|
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken ct)
|
|
{
|
|
// Every variable must be Historized for HistoryRead to route — the node-manager's
|
|
// stack base class checks the bit before dispatching.
|
|
builder.Variable("raw", "raw",
|
|
new DriverAttributeInfo("raw.var", DriverDataType.Int32, false, null,
|
|
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
|
|
builder.Variable("proc", "proc",
|
|
new DriverAttributeInfo("proc.var", DriverDataType.Float64, false, null,
|
|
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
|
|
builder.Variable("atTime", "atTime",
|
|
new DriverAttributeInfo("atTime.var", DriverDataType.Int32, false, null,
|
|
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
|
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
|
|
{
|
|
var now = DateTime.UtcNow;
|
|
IReadOnlyList<DataValueSnapshot> r =
|
|
[.. fullReferences.Select(_ => new DataValueSnapshot(0, 0u, now, now))];
|
|
return Task.FromResult(r);
|
|
}
|
|
|
|
public Task<DriverHistoryReadResult> ReadRawAsync(
|
|
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
var samples = new List<DataValueSnapshot>();
|
|
for (var i = 0; i < RawSamplesReturned; i++)
|
|
{
|
|
samples.Add(new DataValueSnapshot(
|
|
Value: FirstRawValue + i,
|
|
StatusCode: StatusCodes.Good,
|
|
SourceTimestampUtc: startUtc.AddSeconds(i),
|
|
ServerTimestampUtc: startUtc.AddSeconds(i)));
|
|
}
|
|
return Task.FromResult(new DriverHistoryReadResult(samples, null));
|
|
}
|
|
|
|
public Task<DriverHistoryReadResult> ReadProcessedAsync(
|
|
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
|
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
|
{
|
|
LastProcessedAggregate = aggregate;
|
|
LastProcessedInterval = interval;
|
|
return Task.FromResult(new DriverHistoryReadResult(
|
|
[new DataValueSnapshot(1.0, StatusCodes.Good, startUtc, startUtc)],
|
|
null));
|
|
}
|
|
|
|
public Task<DriverHistoryReadResult> ReadAtTimeAsync(
|
|
string fullReference, IReadOnlyList<DateTime> timestampsUtc,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
LastAtTimeRequestedTimes = timestampsUtc;
|
|
var samples = timestampsUtc
|
|
.Select(t => new DataValueSnapshot(42, StatusCodes.Good, t, t))
|
|
.ToArray();
|
|
return Task.FromResult(new DriverHistoryReadResult(samples, null));
|
|
}
|
|
|
|
public Task<HistoricalEventsResult> ReadEventsAsync(
|
|
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
var events = new List<HistoricalEvent>();
|
|
for (var i = 0; i < EventsReturned; i++)
|
|
{
|
|
events.Add(new HistoricalEvent(
|
|
EventId: $"e{i}",
|
|
SourceName: sourceName,
|
|
EventTimeUtc: startUtc.AddHours(i),
|
|
ReceivedTimeUtc: startUtc.AddHours(i).AddSeconds(1),
|
|
Message: $"Event {i}",
|
|
Severity: (ushort)(500 + i)));
|
|
}
|
|
return Task.FromResult(new HistoricalEventsResult(events, null));
|
|
}
|
|
}
|
|
}
|