feat(historian-gateway): read cutover — AddServerHistorian builds GatewayHistorianDataSource

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 17:07:59 -04:00
parent 1d5fa8230e
commit 36f7c3c5bf
7 changed files with 213 additions and 9 deletions
@@ -0,0 +1,42 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
/// <summary>
/// Host-callable factory that builds the gateway-backed server-side HistoryRead data source. The
/// Host's <c>AddServerHistorian</c> wiring supplies <see cref="CreateDataSource"/> as its
/// <c>Func&lt;ServerHistorianOptions, IServiceProvider, IHistorianDataSource&gt;</c>, keeping the
/// concrete package-client dependency inside this driver project (the Host references only the
/// driver, not the package client directly).
/// </summary>
public static class GatewayHistorian
{
/// <summary>
/// Builds a <see cref="GatewayHistorianDataSource"/> over a lazily connected
/// <see cref="HistorianGatewayClientAdapter"/> mapped from the bound
/// <see cref="ServerHistorianOptions"/>. Resolves an <see cref="ILoggerFactory"/> and the data
/// source's <see cref="ILogger{TCategoryName}"/> from <paramref name="services"/>, falling back to
/// the null implementations when absent (e.g. minimal test providers). Performs no network I/O —
/// the underlying channel dials on first use.
/// </summary>
/// <param name="options">The bound <c>ServerHistorian</c> configuration.</param>
/// <param name="services">The resolving service provider (used only to locate logging services).</param>
/// <returns>The gateway-backed <see cref="IHistorianDataSource"/>.</returns>
public static IHistorianDataSource CreateDataSource(ServerHistorianOptions options, IServiceProvider services)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(services);
var loggerFactory = services.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance;
var logger = services.GetService<ILogger<GatewayHistorianDataSource>>()
?? NullLogger<GatewayHistorianDataSource>.Instance;
return new GatewayHistorianDataSource(
HistorianGatewayClientAdapter.Create(options, loggerFactory),
logger);
}
}
@@ -0,0 +1,126 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.HistorianGateway.Client;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
/// <summary>
/// Concrete <see cref="IHistorianGatewayClient"/> backed by the published
/// <see cref="HistorianGatewayClient"/> package client. Each seam method forwards directly to the
/// matching client wrapper — both sides speak the same generated <c>historian_gateway.v1</c> proto
/// types, so no shape translation happens here. The package client's typed exception hierarchy
/// (<c>HistorianGatewayUnavailableException</c> et al.) is allowed to surface unchanged; the
/// <see cref="GatewayHistorianDataSource"/> records it as a health failure and the node manager
/// turns it into a Bad HistoryRead result.
/// </summary>
/// <remarks>
/// <para>
/// <b>Lazy channel.</b> <see cref="Create"/> calls <see cref="HistorianGatewayClient.Create"/>,
/// which constructs a <c>GrpcChannel</c> over a <c>SocketsHttpHandler</c> without opening a
/// connection — the first RPC dials. Constructing the adapter therefore performs no network I/O,
/// which the offline seam tests rely on (they build from bogus endpoints and must not connect).
/// </para>
/// </remarks>
public sealed class HistorianGatewayClientAdapter : IHistorianGatewayClient, IDisposable
{
private readonly HistorianGatewayClient _inner;
private HistorianGatewayClientAdapter(HistorianGatewayClient inner) => _inner = inner;
/// <summary>
/// Builds an adapter over a freshly created package client mapped from the bound
/// <see cref="ServerHistorianOptions"/>. No connection is opened (lazy channel).
/// </summary>
/// <param name="options">The bound <c>ServerHistorian</c> configuration (endpoint, key, TLS posture).</param>
/// <param name="loggerFactory">Logger factory threaded into the package client's channel diagnostics.</param>
/// <returns>A ready-to-use adapter whose underlying channel has not yet dialed the gateway.</returns>
public static HistorianGatewayClientAdapter Create(ServerHistorianOptions options, ILoggerFactory loggerFactory)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(loggerFactory);
var clientOptions = new HistorianGatewayClientOptions
{
Endpoint = new Uri(options.Endpoint),
ApiKey = options.ApiKey,
UseTls = options.UseTls,
CaCertificatePath = options.CaCertificatePath,
// INVERTED mapping: ServerHistorianOptions.AllowUntrustedServerCertificate (opt-in to accept
// a self-signed cert) is the negation of the client's RequireCertificateValidation. Allowing
// an untrusted cert == not requiring validation; a pinned CaCertificatePath always verifies.
RequireCertificateValidation = !options.AllowUntrustedServerCertificate,
DefaultCallTimeout = options.CallTimeout,
LoggerFactory = loggerFactory,
};
return new HistorianGatewayClientAdapter(HistorianGatewayClient.Create(clientOptions));
}
/// <inheritdoc />
public IAsyncEnumerable<HistorianSample> ReadRawAsync(
string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken ct) =>
_inner.ReadRawAsync(tag, startUtc, endUtc, maxValues, ct);
/// <inheritdoc />
public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(
string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken ct) =>
_inner.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, ct);
/// <inheritdoc />
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(
string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken ct) =>
_inner.ReadAtTimeAsync(tag, timestampsUtc, ct);
/// <inheritdoc />
/// <remarks>
/// <paramref name="sourceName"/> is rendered into the gateway's one server-filterable predicate —
/// a <c>Source_Object</c> <see cref="HistorianEventComparison.Equal"/> filter the SQL ReadEvents
/// path binds as <c>WHERE Source_Object = @source</c>. A <c>null</c> source passes a null filter
/// (full window). <paramref name="maxEvents"/> is intentionally ignored here: the gateway wire
/// contract carries no per-call cap, so the cap is enforced upstream by
/// <see cref="GatewayHistorianDataSource"/> via early stream termination.
/// </remarks>
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken ct)
{
HistorianEventFilter? filter = sourceName is null
? null
: new HistorianEventFilter
{
PropertyName = "Source_Object",
Comparison = HistorianEventComparison.Equal,
Value = sourceName,
};
return _inner.ReadEventsAsync(startUtc, endUtc, filter, ct);
}
/// <inheritdoc />
public Task<WriteAck> WriteLiveValuesAsync(
string tag, IReadOnlyList<HistorianLiveValue> values, CancellationToken ct) =>
_inner.WriteLiveValuesAsync(tag, values, ct);
/// <inheritdoc />
public Task<WriteAck> SendEventAsync(HistorianEvent evt, CancellationToken ct) =>
_inner.SendEventAsync(evt, ct);
/// <inheritdoc />
public Task<TagOperationResults> EnsureTagsAsync(
IReadOnlyList<HistorianTagDefinition> definitions, CancellationToken ct) =>
_inner.EnsureTagsAsync(definitions, ct);
/// <inheritdoc />
public Task<bool> ProbeAsync(CancellationToken ct) => _inner.ProbeAsync(ct);
/// <inheritdoc />
public Task<ConnectionStatus> GetConnectionStatusAsync(CancellationToken ct) =>
_inner.GetConnectionStatusAsync(ct);
/// <summary>Disposes the underlying package client (and its channel). Prefer <see cref="DisposeAsync"/>.</summary>
public void Dispose() => _inner.Dispose();
/// <summary>Asynchronously disposes the underlying package client (and its channel).</summary>
/// <returns>A task that completes when the client has been disposed.</returns>
public ValueTask DisposeAsync() => _inner.DisposeAsync();
}
@@ -11,6 +11,11 @@
<ItemGroup>
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
<!-- Runtime owns ServerHistorianOptions (the bound ServerHistorian config). The read-cutover
factory (GatewayHistorian.CreateDataSource / HistorianGatewayClientAdapter.Create) maps
those options onto the package client, so the driver references it. Runtime references no
driver, so this is a diamond (Host -> {Runtime, Gateway}, Gateway -> Runtime), not a cycle. -->
<ProjectReference Include="..\..\Server\ZB.MOM.WW.OtOpcUa.Runtime\ZB.MOM.WW.OtOpcUa.Runtime.csproj"/>
</ItemGroup>
<ItemGroup>
+5 -9
View File
@@ -22,6 +22,7 @@ using ZB.MOM.WW.OtOpcUa.Host.Health;
using ZB.MOM.WW.OtOpcUa.Host.Logging;
using ZB.MOM.WW.OtOpcUa.Host.Observability;
using ZB.MOM.WW.OtOpcUa.Host.OpcUa;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
@@ -109,17 +110,12 @@ if (hasDriver)
// Config-gated server-side HistoryRead backend. When the ServerHistorian section is enabled this
// overrides the NullHistorianDataSource default from AddOtOpcUaRuntime (last registration wins) with
// a read-only WonderwareHistorianClient the node manager's HistoryRead overrides block-bridge to.
// The client is supplied here because the Host is the only project that references the Wonderware
// client — Runtime owns the gating, the Host supplies the concrete read downstream.
// a read-only HistorianGateway-backed data source the node manager's HistoryRead overrides
// block-bridge to. The factory lives in the Gateway driver (which owns the package-client adapter
// and the ServerHistorianOptions -> client-options mapping); Runtime owns the gating.
builder.Services.AddServerHistorian(
builder.Configuration,
(opts, sp) => new WonderwareHistorianClient(
new WonderwareHistorianClientOptions(opts.Host, opts.Port, opts.SharedSecret)
{
UseTls = opts.UseTls, ServerCertThumbprint = opts.ServerCertThumbprint,
},
sp.GetService<ILogger<WonderwareHistorianClient>>()));
(opts, sp) => GatewayHistorian.CreateDataSource(opts, sp));
// Bind every cross-platform driver factory before AddAkka resolves IDriverFactory — replaces
// the F7-default NullDriverFactory with a real DriverFactoryRegistryAdapter so DriverHostActor
@@ -61,6 +61,7 @@
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.AbLegacy\ZB.MOM.WW.OtOpcUa.Driver.AbLegacy.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.FOCAS\ZB.MOM.WW.OtOpcUa.Driver.FOCAS.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.Galaxy\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway\ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client\ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.Modbus\ZB.MOM.WW.OtOpcUa.Driver.Modbus.csproj"/>
<ProjectReference Include="..\..\Drivers\ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient\ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.csproj"/>