OtOpcUa ↔ HistorianGateway Integration — Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

Goal: Make HistorianGateway the sole historian read/write backend for the OtOpcUa OPC UA server — replacing the bespoke Wonderware TCP/ArchestrA sidecar — via a new gateway-backed IHistorianDataSource + IAlarmHistorianWriter + IHistorianProvisioning, a continuous-historization recorder, and DI factory swaps.

Architecture: A new ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway driver project consumes the Gitea-feed ZB.MOM.WW.HistorianGateway.Client package (gRPC over historian_gateway.v1) behind a thin IHistorianGatewayClient seam so every adapter is unit-testable against a fake. Pure, matrix-guarded mappers (design §6) translate OPC UA / driver shapes to gateway proto shapes and back; the read adapter swaps into the existing AddServerHistorian factory (the independently-shippable read cutover), the alarm writer swaps into AddAlarmHistorian (behind the existing SqliteStoreAndForwardSink), and a new Akka recorder taps DependencyMuxActor value changes into a crash-safe FasterLog outbox that drains to WriteLiveValues. An EnsureTags provisioning hook fires non-blocking from AddressSpaceApplier.Apply().

Tech Stack: .NET 10, OPC Foundation UA .NET Standard, Akka.NET actors, Microsoft.FASTER.Core (FasterLog outbox), ZB.MOM.WW.HistorianGateway.Client (gRPC), xUnit.


Prerequisites & Constraints

Cross-plan dependency (hard gate). This plan DEPENDS ON the sibling gateway-client plan (docs/plans/2026-06-26-historian-gateway-client.md) having already published ZB.MOM.WW.HistorianGateway.Client and ZB.MOM.WW.HistorianGateway.Contracts to the Gitea feed (dohertj2-gitea). Phase A consumes those packages; do not start until they resolve. Confirm with: dotnet package search ZB.MOM.WW.HistorianGateway.Client --source dohertj2-gitea.

Gateway deployment prerequisites (no code here — runtime config of the target gateway). The gateway OtOpcUa points at MUST run with:

  • RuntimeDb:Enabled=true — enables the WriteLiveValues SQL live path (continuous historization).
  • RuntimeDb:EventReadsEnabled=true — enables ReadEvents from Runtime.dbo.Events (alarm-history reads; C2 SQL workaround).
  • An API key carrying scopes historian:read, historian:write, historian:tags:write.

The source-name filter on the SQL ReadEvents path is delivered by the gateway-client plan (its Task 14 / design §5.3); until it lands, ReadEventsAsync(sourceName, …) is time-range-only and filters client-side. T11 notes this.

Engineering constraints (every task).

  • Zero new warnings. Directory.Build.props enforces Nullable=enable, Platforms=x64, PlatformTarget=x64, and treats new warnings as build breaks. Fix, never suppress.
  • Secrets via environment only. The gateway API key, any connection strings, and TLS material are supplied via env vars (e.g. ServerHistorian__ApiKey). The committed appsettings defaults are blank or dev-only placeholders. Never commit a real key.
  • Redaction. No hostnames, credentials, API keys, or tag values in error messages or logs by default. Map gateway/gRPC exceptions to safe messages — mirror the existing WonderwareHistorianClient posture.
  • Gitea, not GitHub. origin is Gitea; gh will not work. Use git push to origin; open PRs via the Gitea API with GITEA_TOKEN.
  • Branch isolation. OtOpcUa's working tree is currently DIRTY from unrelated in-flight work. This plan runs on its OWN fresh branch off OtOpcUa main:
    cd ~/Desktop/OtOpcUa
    git stash list   # confirm you understand existing state; do NOT touch it
    git fetch origin
    git switch -c feat/historian-gateway-backend origin/main
    
    Do not entangle with the dirty tree. If origin/main is unavailable, branch off the local main after confirming it is clean of the in-flight work.
  • Commit-message trailer. Every commit message ends with: Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
  • Plan relocation. This file (and its .tasks.json) live in the HistorianGateway repo while authored; when execution starts they relocate into ~/Desktop/OtOpcUa/docs/plans/. All OtOpcUa paths below are absolute under ~/Desktop/OtOpcUa.
  • Build/test anchors. Solution is ~/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx. Build: dotnet build ZB.MOM.WW.OtOpcUa.slnx. Test a single class: dotnet test --filter "FullyQualifiedName~<ClassName>".

Reference implementations to mirror (read before touching the matching task).

  • Quality→OPC-UA-StatusCode: src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/QualityMapper.cs (port the switch verbatim; the gateway HistorianSample.opc_quality is the same OPC DA quality byte).
  • Aggregate-mode mapping + Total handling + AlignAtTime + null→BadNoData: …Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs (MapAggregate, ReadProcessedAsync, AlignAtTimeSnapshots, ToAggregateSnapshots).
  • Health-counter discipline (single _healthLock, TotalSuccesses+TotalFailures==TotalQueries): same file, GetHealthSnapshot / RecordOutcome.
  • FasterLog outbox: ~/Desktop/HistorianGateway/src/ZB.MOM.WW.HistorianGateway.Server/Historian/FasterLogOutboxStore.cs (PerEntry/Periodic commit, append/peek/remove-truncate, RecoverState).

The IHistorianGatewayClient seam. All OtOpcUa adapters depend on a thin interface IHistorianGatewayClient (defined in T1, in the Gateway driver project) that exposes exactly the gateway operations OtOpcUa needs, in proto-typed terms:

// uses ZB.MOM.WW.HistorianGateway.Contracts.Grpc types from the Contracts package
public interface IHistorianGatewayClient : IAsyncDisposable
{
    IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken ct);
    IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken ct);
    Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken ct);
    IAsyncEnumerable<HistorianEvent> ReadEventsAsync(string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken ct);
    Task<WriteAck> WriteLiveValuesAsync(string tag, IReadOnlyList<HistorianLiveValue> values, CancellationToken ct);
    Task<WriteAck> SendEventAsync(HistorianEvent evt, CancellationToken ct);
    Task<TagOperationResults> EnsureTagsAsync(IReadOnlyList<HistorianTagDefinition> definitions, CancellationToken ct);
    Task<bool> ProbeAsync(CancellationToken ct);
    Task<ConnectionStatus> GetConnectionStatusAsync(CancellationToken ct);
}

The concrete HistorianGatewayClientAdapter : IHistorianGatewayClient (wrapping the package's HistorianGatewayClient) lands in T10 where the factory wires it. Every other adapter/recorder is TDD'd against an in-memory FakeHistorianGatewayClient test double.


Task 1: Consume the gateway packages + scaffold the Gateway driver project

Classification: small Estimated implement time: ~5 min Parallelizable with: none (every later task builds on this project)

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/NuGet.config
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/IHistorianGatewayClient.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.csproj
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/FakeHistorianGatewayClient.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ProjectSmokeTests.cs
  • Modify /Users/dohertj2/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx

Step 1: failing test (ProjectSmokeTests.cs):

using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
public sealed class ProjectSmokeTests
{
    [Fact]
    public void GatewayClientSeam_IsReferenceable()
    {
        // Compiles only if the project references the Contracts package and the seam exists.
        var t = typeof(IHistorianGatewayClient);
        Assert.Equal("IHistorianGatewayClient", t.Name);
    }
}

FakeHistorianGatewayClient.cs implements IHistorianGatewayClient with settable result fields + recorded-call lists (the reusable double for all later tasks): each method returns from a public Func/queue field defaulting to empty, and records its arguments.
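The double described above could follow a pattern like this. It is a minimal sketch, assuming a hypothetical one-method seam (IReadSeam) with string samples standing in for the proto HistorianSample type. RawSamples, ThrowOnRead, LastReadRawTag, and LastReadRawMaxValues are the member names the later tests use; everything else is illustrative.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, trimmed-down seam: one streaming read, strings instead of proto samples.
public interface IReadSeam
{
    IAsyncEnumerable<string> ReadRawAsync(string tag, int maxValues, CancellationToken ct);
}

public sealed class FakeReadSeam : IReadSeam
{
    // Settable result, defaulting to empty so an unconfigured fake never throws.
    public IReadOnlyList<string> RawSamples { get; set; } = Array.Empty<string>();

    // Flip to simulate a gateway failure.
    public bool ThrowOnRead { get; set; }

    // Recorded arguments from the most recent call.
    public string? LastReadRawTag { get; private set; }
    public int LastReadRawMaxValues { get; private set; }

    public async IAsyncEnumerable<string> ReadRawAsync(
        string tag, int maxValues, [EnumeratorCancellation] CancellationToken ct)
    {
        LastReadRawTag = tag;
        LastReadRawMaxValues = maxValues;
        await Task.Yield(); // keeps the iterator genuinely asynchronous
        if (ThrowOnRead) throw new InvalidOperationException("fake read failure");
        foreach (var sample in RawSamples)
        {
            ct.ThrowIfCancellationRequested();
            yield return sample;
        }
    }
}
```

Because the method is an async iterator, the arguments are recorded when enumeration starts rather than when the method is called. Adapters that always drain the stream will not notice the difference.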

Step 2: run, expect FAIL: dotnet test ZB.MOM.WW.OtOpcUa.slnx --filter "FullyQualifiedName~ProjectSmokeTests" → FAILS to compile/restore (project/package/seam absent).

Step 3: implement

  1. In NuGet.config, add to the dohertj2-gitea <packageSource> block:
    <package pattern="ZB.MOM.WW.HistorianGateway.Contracts" />
    <package pattern="ZB.MOM.WW.HistorianGateway.Client" />
    
  2. New …Driver.Historian.Gateway.csproj (mirror a sibling driver csproj's TFM/props; do NOT set its own Nullable/Platforms; Directory.Build.props supplies them):
    • <PackageReference Include="ZB.MOM.WW.HistorianGateway.Client" Version="0.1.0" /> (resolve the actual published version with dotnet package search; the Client transitively brings .Contracts).
    • <ProjectReference> to Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions and Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.
  3. IHistorianGatewayClient.cs — the interface exactly as in Prerequisites (proto-typed; using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;).
  4. Test csproj: references the Gateway driver project + Microsoft.NET.Test.Sdk, xunit, xunit.runner.visualstudio (copy versions from a sibling *.Tests.csproj).
  5. FakeHistorianGatewayClient as described.
  6. Add both projects to the solution: dotnet sln ZB.MOM.WW.OtOpcUa.slnx add src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.csproj.

Step 4: run, expect PASS — same filter → PASS; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean (0 warnings).

Step 5: commit

git add NuGet.config ZB.MOM.WW.OtOpcUa.slnx src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests
git commit -m "feat(historian-gateway): scaffold Gateway driver project + consume client package

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 2: HistoryAggregateType → RetrievalMode mapper (matrix-guarded)

Classification: small Estimated implement time: ~4 min Parallelizable with: Task 3, Task 4, Task 5, Task 6

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/AggregateModeMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/AggregateModeMapperTests.cs

Step 1: failing test — exhaustive per-member + matrix guard:

using ZB.MOM.WW.OtOpcUa.Core.Abstractions;        // HistoryAggregateType
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;  // RetrievalMode
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Mapping;
public sealed class AggregateModeMapperTests
{
    [Theory]
    [InlineData(HistoryAggregateType.Average, RetrievalMode.TimeWeightedAverage)]
    [InlineData(HistoryAggregateType.Minimum, RetrievalMode.MinimumWithTime)]
    [InlineData(HistoryAggregateType.Maximum, RetrievalMode.MaximumWithTime)]
    [InlineData(HistoryAggregateType.Total,   RetrievalMode.Integral)]
    [InlineData(HistoryAggregateType.Count,   RetrievalMode.Counter)]
    public void Maps_each_aggregate(HistoryAggregateType a, RetrievalMode expected)
        => Assert.Equal(expected, AggregateModeMapper.ToRetrievalMode(a));

    [Fact] // matrix guard: a new HistoryAggregateType member must fail here
    public void Every_aggregate_member_is_mapped()
    {
        foreach (var a in Enum.GetValues<HistoryAggregateType>())
            _ = AggregateModeMapper.ToRetrievalMode(a); // must not throw for any defined member
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~AggregateModeMapperTests" → FAILS (mapper absent).

Step 3: implement: AggregateModeMapper.ToRetrievalMode(HistoryAggregateType) is a switch expression mapping the five members per the table; the _ => arm throws ArgumentOutOfRangeException (so a future enum member fails the matrix guard instead of silently mis-mapping). Verify against WonderwareHistorianClient.MapAggregate: Average/Min/Max line up; the Wonderware path had no native Total (derived Average×interval) and used ValueCount for Count. The gateway's native RetrievalMode.Integral (Total) and RetrievalMode.Counter (Count) replace those client-side workarounds, so the gateway path is a strict improvement. Add an XML doc note recording that Total/Count are now native modes (no client-side scaling).
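The mapper could look roughly like the following. This is a sketch only: the two enums are declared locally as stand-ins (an assumption so the block compiles alone), whereas the real ones come from Core.Abstractions and the Contracts package and may carry more members.

```csharp
using System;

// Stand-in enums (assumption): the real types live in Core.Abstractions / Contracts.Grpc.
public enum HistoryAggregateType { Average, Minimum, Maximum, Total, Count }
public enum RetrievalMode { TimeWeightedAverage, MinimumWithTime, MaximumWithTime, Integral, Counter }

public static class AggregateModeMapper
{
    /// <summary>
    /// Total and Count map to native gateway modes, so no client-side scaling is applied.
    /// </summary>
    public static RetrievalMode ToRetrievalMode(HistoryAggregateType aggregate) => aggregate switch
    {
        HistoryAggregateType.Average => RetrievalMode.TimeWeightedAverage,
        HistoryAggregateType.Minimum => RetrievalMode.MinimumWithTime,
        HistoryAggregateType.Maximum => RetrievalMode.MaximumWithTime,
        HistoryAggregateType.Total   => RetrievalMode.Integral,
        HistoryAggregateType.Count   => RetrievalMode.Counter,
        // A member added later falls through to here and trips the matrix guard.
        _ => throw new ArgumentOutOfRangeException(
            nameof(aggregate), aggregate, "Unmapped HistoryAggregateType."),
    };
}
```

Throwing from the discard arm, rather than returning a default mode, is what lets the Every_aggregate_member_is_mapped test detect an unmapped member.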

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): HistoryAggregateType->RetrievalMode mapper (matrix-guarded)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 3: DriverDataType → HistorianDataType mapper with write-gap fallbacks (matrix-guarded)

Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 2, Task 4, Task 5, Task 6

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/HistorianTypeMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/HistorianTypeMapperTests.cs

Step 1: failing test — per-member mapping + deferred-throws + matrix guard:

using ZB.MOM.WW.OtOpcUa.Core.Abstractions;        // DriverDataType
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;  // HistorianDataType
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Mapping;
public sealed class HistorianTypeMapperTests
{
    [Theory]
    [InlineData(DriverDataType.Boolean, HistorianDataType.Int1)]
    [InlineData(DriverDataType.Int16,   HistorianDataType.Int2)]
    [InlineData(DriverDataType.Int32,   HistorianDataType.Int4)]
    [InlineData(DriverDataType.Int64,   HistorianDataType.Int8)]
    [InlineData(DriverDataType.UInt16,  HistorianDataType.Uint4)]  // fallback: UInt2 write deferred upstream
    [InlineData(DriverDataType.UInt32,  HistorianDataType.Uint4)]
    [InlineData(DriverDataType.UInt64,  HistorianDataType.Uint8)]
    [InlineData(DriverDataType.Float32, HistorianDataType.Float)]
    [InlineData(DriverDataType.Float64, HistorianDataType.Double)]
    public void Maps_writable_numeric_types(DriverDataType d, HistorianDataType expected)
        => Assert.Equal(expected, HistorianTypeMapper.ToHistorianDataType(d));

    [Theory]
    [InlineData(DriverDataType.String)]
    [InlineData(DriverDataType.DateTime)]
    [InlineData(DriverDataType.Reference)]
    public void Deferred_types_throw_NotSupported_with_clear_message(DriverDataType d)
    {
        var ex = Assert.Throws<NotSupportedException>(() => HistorianTypeMapper.ToHistorianDataType(d));
        Assert.Contains("not historized in v1", ex.Message);  // human-actionable, no tag value leaked
    }

    [Fact] // matrix guard: a new DriverDataType member must be classified (mapped or explicitly deferred)
    public void Every_DriverDataType_member_is_classified()
    {
        foreach (var d in Enum.GetValues<DriverDataType>())
        {
            try { _ = HistorianTypeMapper.ToHistorianDataType(d); }
            catch (NotSupportedException) { /* explicitly deferred — acceptable */ }
            // any OTHER exception (e.g. ArgumentOutOfRangeException from an unhandled new member) fails the test
        }
    }
}

Note: protobuf C# generates HistorianDataType.Uint4 / Uint8 (single capital U). Confirm the generated casing against the resolved Contracts package and align the InlineData.

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~HistorianTypeMapperTests" → FAILS.

Step 3: implement: HistorianTypeMapper.ToHistorianDataType(DriverDataType) is a switch per the §6 table; the three deferred members throw NotSupportedException with a message like $"DriverDataType.{d} is not historized in v1 (string/datetime/reference writes are deferred; gated on the analog SQL write path)." (no tag name/value in the message). The default arm throws ArgumentOutOfRangeException referencing the unmapped member, not NotSupportedException: the matrix guard deliberately swallows NotSupportedException as "explicitly deferred", so only a different exception type lets it catch a future enum addition. Add a same-file bool IsHistorizable(DriverDataType) helper (true for the nine writable members); T15's provisioning hook uses it to skip deferred types without catching exceptions.
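One possible shape for the mapper and its helper is sketched below. The enums are local stand-ins (an assumption; the real DriverDataType and HistorianDataType may have more members). The sketch also assumes the unclassified arm throws ArgumentOutOfRangeException, because the guard test's catch (NotSupportedException) would otherwise swallow it.

```csharp
using System;

// Stand-in enums (assumption): real definitions come from Core.Abstractions / Contracts.Grpc.
public enum DriverDataType
{
    Boolean, Int16, Int32, Int64, UInt16, UInt32, UInt64, Float32, Float64,
    String, DateTime, Reference,
}
public enum HistorianDataType { Int1, Int2, Int4, Int8, Uint4, Uint8, Float, Double }

public static class HistorianTypeMapper
{
    // True only for members that have a writable historian type.
    public static bool IsHistorizable(DriverDataType d) => d is
        DriverDataType.Boolean or DriverDataType.Int16 or DriverDataType.Int32 or
        DriverDataType.Int64 or DriverDataType.UInt16 or DriverDataType.UInt32 or
        DriverDataType.UInt64 or DriverDataType.Float32 or DriverDataType.Float64;

    public static HistorianDataType ToHistorianDataType(DriverDataType d) => d switch
    {
        DriverDataType.Boolean => HistorianDataType.Int1,
        DriverDataType.Int16   => HistorianDataType.Int2,
        DriverDataType.Int32   => HistorianDataType.Int4,
        DriverDataType.Int64   => HistorianDataType.Int8,
        DriverDataType.UInt16  => HistorianDataType.Uint4, // fallback: widened to the 4-byte unsigned type
        DriverDataType.UInt32  => HistorianDataType.Uint4,
        DriverDataType.UInt64  => HistorianDataType.Uint8,
        DriverDataType.Float32 => HistorianDataType.Float,
        DriverDataType.Float64 => HistorianDataType.Double,
        // Explicitly deferred: message names the type only, never a tag or value.
        DriverDataType.String or DriverDataType.DateTime or DriverDataType.Reference
            => throw new NotSupportedException(
                $"DriverDataType.{d} is not historized in v1 (string/datetime/reference writes are deferred)."),
        // Unclassified member: a different exception type so the guard test fails loudly.
        _ => throw new ArgumentOutOfRangeException(nameof(d), d, "Unclassified DriverDataType."),
    };
}
```

A caller such as the provisioning hook can then filter with IsHistorizable first and call ToHistorianDataType only for members that pass, which keeps exceptions out of the normal path.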

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): DriverDataType->HistorianDataType mapper + write-gap fallbacks (matrix-guarded)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 4: HistorianSample/HistorianAggregateSample → DataValueSnapshot + quality mapper

Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 2, Task 3, Task 5, Task 6

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/SampleMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/GatewayQualityMapperTests.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/SampleMapperTests.cs

Step 1: failing test — quality parity (port the Wonderware table) + sample/aggregate shape:

public sealed class GatewayQualityMapperTests
{
    [Theory]
    [InlineData(192, 0x00000000u)] // Good
    [InlineData(216, 0x00D80000u)] // Good_LocalOverride
    [InlineData(64,  0x40000000u)] // Uncertain
    [InlineData(0,   0x80000000u)] // Bad
    [InlineData(8,   0x808A0000u)] // Bad_NotConnected
    [InlineData(255, 0x00000000u)] // >=192 bucket
    [InlineData(100, 0x40000000u)] // >=64 bucket
    [InlineData(1,   0x80000000u)] // bad bucket
    public void Maps_opc_quality_byte(byte q, uint expected)
        => Assert.Equal(expected, GatewayQualityMapper.Map(q));
}

public sealed class SampleMapperTests
{
    [Fact]
    public void Numeric_sample_maps_value_quality_and_timestamps()
    {
        var s = new HistorianSample { Tag = "T", NumericValue = 12.5,
            Quality = 192, OpcQuality = 192, Timestamp = Ts(2026,1,1,0,0,0) };
        var snap = SampleMapper.ToSnapshot(s);
        Assert.Equal(12.5, Assert.IsType<double>(snap.Value));
        Assert.Equal(0x00000000u, snap.StatusCode);
        Assert.Equal(DateTimeKind.Utc, snap.SourceTimestampUtc!.Value.Kind);
    }

    [Fact]
    public void String_sample_carries_string_value()
    {
        var s = new HistorianSample { Tag = "T", StringValue = "abc", OpcQuality = 192, Timestamp = Ts(2026,1,1,0,0,0) };
        Assert.Equal("abc", SampleMapper.ToSnapshot(s).Value);
    }

    [Fact]
    public void Aggregate_null_value_is_BadNoData()
    {
        var a = new HistorianAggregateSample { Tag = "T", /* Value unset */ EndTime = Ts(2026,1,1,0,0,0) };
        var snap = SampleMapper.ToAggregateSnapshot(a);
        Assert.Equal(0x800E0000u, snap.StatusCode); // BadNoData
        Assert.Null(snap.Value);
    }
    // Ts(...) builds a Google.Protobuf.WellKnownTypes.Timestamp from UTC parts.
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~SampleMapperTests|FullyQualifiedName~GatewayQualityMapperTests" → FAILS.

Step 3: implement

  • GatewayQualityMapper.Map(byte): byte-identical port of …Wonderware.Client/Internal/QualityMapper.cs (copy the switch verbatim; add an XML doc cross-reference to that origin so a future quality-table change stays in parity).
  • SampleMapper.ToSnapshot(HistorianSample) → DataValueSnapshot(Value, StatusCode, SourceTimestampUtc, ServerTimestampUtc):
    • Value: s.NumericValue (when present via proto3 optional HasNumericValue) boxed as double, else s.StringValue when present, else null.
    • StatusCode: GatewayQualityMapper.Map((byte)s.OpcQuality) (prefer opc_quality; if zero/unset and quality carries the OPC-DA byte, fall back to quality — match whatever the gateway populates; document the choice).
    • SourceTimestampUtc: s.Timestamp.ToDateTime() (UTC kind).
    • ServerTimestampUtc: DateTime.UtcNow.
  • SampleMapper.ToAggregateSnapshot(HistorianAggregateSample) → null aggregate value ⇒ StatusCode 0x800E0000 (BadNoData), non-null ⇒ Good (0x00000000) with the value; SourceTimestampUtc ← the bucket end/start timestamp (match the Wonderware ToAggregateSnapshots convention — it stamps the bucket timestamp). Provide IReadOnlyList<> batch helpers too.

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): sample/aggregate->DataValueSnapshot + quality mapper (Wonderware parity)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 5: HistorianEvent → HistoricalEvent mapper (+ severity from properties)

Classification: small Estimated implement time: ~4 min Parallelizable with: Task 2, Task 3, Task 4, Task 6

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/EventMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/EventMapperTests.cs

Step 1: failing test:

public sealed class EventMapperTests
{
    [Fact]
    public void Maps_core_fields_and_times()
    {
        var e = new HistorianEvent { Id = "E1", SourceName = "Pump1",
            EventTime = Ts(2026,1,1,0,0,0), ReceivedTime = Ts(2026,1,1,0,0,5) };
        e.Properties["Message"] = "High temp";
        e.Properties["Severity"] = "700";
        var h = EventMapper.ToHistoricalEvent(e);
        Assert.Equal("E1", h.EventId);
        Assert.Equal("Pump1", h.SourceName);
        Assert.Equal("High temp", h.Message);
        Assert.Equal((ushort)700, h.Severity);
        Assert.Equal(DateTimeKind.Utc, h.EventTimeUtc.Kind);
    }

    [Theory]
    [InlineData("Priority", "999", 999)]
    [InlineData("Severity", "0", 1)]     // clamp to OPC UA min 1
    [InlineData("Severity", "5000", 1000)] // clamp to OPC UA max 1000
    [InlineData(null, null, 1)]          // missing → default min severity
    public void Severity_parsed_and_clamped(string? key, string? val, int expected)
    {
        var e = new HistorianEvent { Id = "E", EventTime = Ts(2026,1,1,0,0,0), ReceivedTime = Ts(2026,1,1,0,0,0) };
        if (key is not null) e.Properties[key] = val!;
        Assert.Equal((ushort)expected, EventMapper.ToHistoricalEvent(e).Severity);
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~EventMapperTests" → FAILS.

Step 3: implement: EventMapper.ToHistoricalEvent(HistorianEvent):

  • EventId ← e.Id; SourceName ← e.SourceName; EventTimeUtc ← e.EventTime.ToDateTime(); ReceivedTimeUtc ← e.ReceivedTime.ToDateTime().
  • Message ← Properties["Message"] if present, else e.Type (best-effort render); never null-crash.
  • Severity ← parse Properties["Severity"] else Properties["Priority"]; clamp to [1,1000]; missing/unparseable ⇒ 1. Return (ushort).
  • Add a batch helper IReadOnlyList<HistoricalEvent> ToHistoricalEvents(IEnumerable<HistorianEvent>).
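The severity rule in the list above can be sketched in isolation. The helper name ParseSeverity and the plain dictionary parameter are assumptions for illustration; the sketch also assumes that an unparseable Severity value yields 1 rather than falling back to Priority, which the plan leaves open.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class SeveritySketch
{
    // Hypothetical helper: reads "Severity", else "Priority", and clamps to [1, 1000].
    public static ushort ParseSeverity(IReadOnlyDictionary<string, string> properties)
    {
        // Prefer "Severity"; consult "Priority" only when "Severity" is absent.
        if (!properties.TryGetValue("Severity", out var raw) &&
            !properties.TryGetValue("Priority", out raw))
        {
            return 1; // neither key present: default to the minimum severity
        }

        // Invariant culture so parsing does not depend on the host locale.
        return int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var n)
            ? (ushort)Math.Clamp(n, 1, 1000)
            : (ushort)1; // unparseable (including overflow): minimum severity
    }
}
```

Clamping on an int before the cast to ushort avoids wrap-around for negative or very large inputs.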

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): HistorianEvent->HistoricalEvent mapper (+ clamped severity)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 6: AlarmHistorianEvent → HistorianEvent mapper (SendEvent)

Classification: small Estimated implement time: ~4 min Parallelizable with: Task 2, Task 3, Task 4, Task 5

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/AlarmEventMapper.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/AlarmEventMapperTests.cs

Step 1: failing test:

using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;   // AlarmHistorianEvent
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;      // AlarmSeverity
public sealed class AlarmEventMapperTests
{
    [Fact]
    public void Maps_source_time_type_and_rich_properties()
    {
        var a = new AlarmHistorianEvent("A1","Area/Line/Pump1","HiHi","LimitAlarm",
            AlarmSeverity.High, "Activated", "Temp high", "operator1", "ack note",
            new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
        var e = AlarmEventMapper.ToHistorianEvent(a);
        Assert.Equal("Area/Line/Pump1", e.SourceName);
        Assert.Equal("LimitAlarm", e.Type);
        Assert.Equal(new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc), e.EventTime.ToDateTime());
        Assert.Equal("HiHi", e.Properties["AlarmName"]);
        Assert.Equal("Activated", e.Properties["EventKind"]);
        Assert.Equal("High", e.Properties["Severity"]);
        Assert.Equal("operator1", e.Properties["User"]);
        Assert.Equal("ack note", e.Properties["Comment"]);
        Assert.Equal("Temp high", e.Properties["Message"]);
    }

    [Fact]
    public void Null_comment_is_omitted_not_null()
    {
        var a = new AlarmHistorianEvent("A","S","N","DiscreteAlarm",AlarmSeverity.Low,"Cleared","m","system",null,
            new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
        Assert.False(AlarmEventMapper.ToHistorianEvent(a).Properties.ContainsKey("Comment"));
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~AlarmEventMapperTests" → FAILS.

Step 3: implement: AlarmEventMapper.ToHistorianEvent(AlarmHistorianEvent):

  • Id ← a.AlarmId (or Guid.NewGuid().ToString("N") if blank); SourceName ← a.EquipmentPath; EventTime ← Timestamp.FromDateTime(DateTime.SpecifyKind(a.TimestampUtc, DateTimeKind.Utc)); ReceivedTime ← same as event time (server re-stamps on the SQL path); Type ← a.AlarmTypeName.
  • Properties map: AlarmName, EventKind, Severity (a.Severity.ToString()), User, Message; add Comment only when non-null. Proto map<string,string> values must be non-null — never insert a null.
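The "omit rather than insert null" rule for the properties map might be sketched as follows. The Build helper and its flat parameter list are hypothetical; the real mapper reads these values from AlarmHistorianEvent and writes into the proto map, for which a plain Dictionary stands in here.

```csharp
#nullable enable
using System.Collections.Generic;

public static class AlarmPropertiesSketch
{
    // Hypothetical helper: builds the string-to-string property bag for one alarm event.
    public static Dictionary<string, string> Build(
        string alarmName, string eventKind, string severity,
        string user, string message, string? comment)
    {
        var props = new Dictionary<string, string>
        {
            ["AlarmName"] = alarmName,
            ["EventKind"] = eventKind,
            ["Severity"] = severity,
            ["User"] = user,
            ["Message"] = message,
        };

        // Optional field: add the key only when there is a value to store.
        if (comment is not null)
        {
            props["Comment"] = comment;
        }

        return props;
    }
}
```

Readers of the map can then treat a missing Comment key as "no comment" without a separate null check.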

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): AlarmHistorianEvent->HistorianEvent mapper (SendEvent properties)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 7: GatewayHistorianDataSource — ReadRaw / ReadProcessed / ReadAtTime

Classification: standard Estimated implement time: ~5 min Parallelizable with: none (consumes T2 + T4; gates T8/T10/T11)

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs

Step 1: failing test — against FakeHistorianGatewayClient:

public sealed class GatewayHistorianDataSourceTests
{
    [Fact]
    public async Task ReadRaw_maps_samples_and_passes_args()
    {
        var fake = new FakeHistorianGatewayClient();
        fake.RawSamples = new[] {
            new HistorianSample { Tag="T", NumericValue=1.0, OpcQuality=192, Timestamp=Ts(2026,1,1,0,0,0) },
            new HistorianSample { Tag="T", NumericValue=2.0, OpcQuality=0,   Timestamp=Ts(2026,1,1,0,0,1) },
        };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        var r = await ds.ReadRawAsync("T", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 100, default);
        Assert.Equal(2, r.Samples.Count);
        Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // Bad from quality 0
        Assert.Equal("T", fake.LastReadRawTag);
        Assert.Equal(100, fake.LastReadRawMaxValues);
    }

    [Fact]
    public async Task ReadProcessed_uses_aggregate_mode_mapping()
    {
        var fake = new FakeHistorianGatewayClient();
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        await ds.ReadProcessedAsync("T", default, default, TimeSpan.FromSeconds(60), HistoryAggregateType.Minimum, default);
        Assert.Equal(RetrievalMode.MinimumWithTime, fake.LastAggregateMode);
        Assert.Equal(TimeSpan.FromSeconds(60), fake.LastAggregateInterval);
    }

    [Fact]
    public async Task ReadAtTime_aligns_one_snapshot_per_timestamp_with_gaps_Bad()
    {
        var fake = new FakeHistorianGatewayClient();
        var t0 = new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc);
        var t1 = t0.AddSeconds(1);
        fake.AtTimeSamples = new[] { new HistorianSample { NumericValue=5.0, OpcQuality=192, Timestamp=Timestamp.FromDateTime(t0) } };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        var r = await ds.ReadAtTimeAsync("T", new[]{ t0, t1 }, default);
        Assert.Equal(2, r.Samples.Count);          // exactly one per requested ts, in order
        Assert.Equal(5.0, r.Samples[0].Value);
        Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // gap → Bad at requested ts
    }

    [Fact]
    public async Task Empty_window_is_not_a_fault()
    {
        var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        var r = await ds.ReadRawAsync("T", default, default, 10, default);
        Assert.Empty(r.Samples);                    // GoodNoData-empty, no throw
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~GatewayHistorianDataSourceTests" → FAILS.

Step 3: implement: GatewayHistorianDataSource : IHistorianDataSource (ctor takes IHistorianGatewayClient + ILogger<>):

  • ReadRawAsync: clamp maxValuesPerNode to int ((int)Math.Min(maxValuesPerNode, int.MaxValue)); drain the client.ReadRawAsync IAsyncEnumerable → SampleMapper.ToSnapshot; return new HistoryReadResult(list, ContinuationPoint: null).
  • ReadProcessedAsync: AggregateModeMapper.ToRetrievalMode(aggregate) → client.ReadAggregateAsync(...); map via SampleMapper.ToAggregateSnapshot. (No client-side Total scaling: Integral is native; delete the Wonderware workaround.)
  • ReadAtTimeAsync: call client.ReadAtTimeAsync; align exactly one snapshot per requested timestamp in order, gaps → Bad (0x80000000) stamped at the requested time — port WonderwareHistorianClient.AlignAtTimeSnapshots verbatim (index returned samples by Timestamp.ToDateTime().Ticks).
  • Record health counters on every read via a private RecordOutcome mirroring the Wonderware single-lock discipline (used by T8). Map a thrown gateway exception to a recorded failure + rethrow (the node manager turns it into a Bad result; never crash the host).
  • Dispose() disposes the client (bridge the IAsyncDisposable like the Wonderware Dispose).
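The at-time alignment rule (one snapshot per requested timestamp, in request order, gaps marked Bad) can be illustrated on its own. This is a sketch under assumptions: Snapshot is a hypothetical stand-in for DataValueSnapshot, and the real AlignAtTimeSnapshots in the Wonderware client may differ in detail.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for DataValueSnapshot.
public sealed record Snapshot(object? Value, uint StatusCode, DateTime SourceTimestampUtc);

public static class AtTimeAlignment
{
    private const uint Bad = 0x80000000u;

    public static IReadOnlyList<Snapshot> Align(
        IReadOnlyList<DateTime> requestedUtc, IEnumerable<Snapshot> returned)
    {
        // Index returned samples by ticks so lookup is an exact timestamp match.
        var byTicks = new Dictionary<long, Snapshot>();
        foreach (var sample in returned)
        {
            byTicks[sample.SourceTimestampUtc.Ticks] = sample; // last one wins on duplicates
        }

        // Walk the request list so output order and count follow the request, not the response.
        var aligned = new List<Snapshot>(requestedUtc.Count);
        foreach (var ts in requestedUtc)
        {
            aligned.Add(byTicks.TryGetValue(ts.Ticks, out var hit)
                ? hit
                : new Snapshot(null, Bad, ts)); // gap: Bad, stamped at the requested time
        }
        return aligned;
    }
}
```

Driving the loop from the requested timestamps is what guarantees the result has exactly one entry per request even when the gateway returns fewer samples or returns them out of order.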

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): GatewayHistorianDataSource read paths (raw/processed/at-time)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 8: GetHealthSnapshot from Probe / GetConnectionStatus

Classification: small Estimated implement time: ~4 min Parallelizable with: none (extends T7's class)

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs

Step 1: failing test:

public sealed class GatewayHealthSnapshotTests
{
    [Fact]
    public async Task Counters_track_success_and_failure()
    {
        var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        await ds.ReadRawAsync("T", default, default, 1, default);
        fake.ThrowOnRead = true;
        await Assert.ThrowsAnyAsync<Exception>(() => ds.ReadRawAsync("T", default, default, 1, default));
        var h = ds.GetHealthSnapshot();
        Assert.Equal(2, h.TotalQueries);
        Assert.Equal(1, h.TotalSuccesses);
        Assert.Equal(1, h.TotalFailures);
        Assert.Equal(1, h.ConsecutiveFailures);
        Assert.Equal(h.TotalQueries, h.TotalSuccesses + h.TotalFailures); // invariant
    }

    [Fact]
    public async Task Connection_state_reflects_GetConnectionStatus_flags()
    {
        var fake = new FakeHistorianGatewayClient {
            ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 } }; // Process|Event
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        await ds.RefreshConnectionStateAsync(default); // internal probe used by health hosted-service
        var h = ds.GetHealthSnapshot();
        Assert.True(h.ProcessConnectionOpen);
        Assert.True(h.EventConnectionOpen);
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~GatewayHealthSnapshotTests" → FAILS.

Step 3: implement

  • Add the six health counter fields under one _healthLock + RecordOutcome(bool, string?) (copy the Wonderware discipline so TotalSuccesses + TotalFailures == TotalQueries always holds). Wire RecordOutcome into every read method from T7.
  • Add a lightweight, non-blocking cached connection state: RefreshConnectionStateAsync(ct) calls client.GetConnectionStatusAsync (and/or ProbeAsync) and caches ProcessConnectionOpen = ConnectedToServer && (ConnectionKind & 1) != 0 and EventConnectionOpen = ConnectedToServer && (ConnectionKind & 2) != 0. GetHealthSnapshot() is pure observation: it returns the cached flags + counters and never blocks on I/O (per the interface contract). ActiveProcessNode/ActiveEventNode/Nodes are null/empty (the gateway is non-clustered to us, mirroring the Wonderware client's Finding 010 posture).
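A minimal sketch of the single-lock counter discipline and the cached connection flags described above. HealthCountersSketch and its member names are hypothetical, and it tracks only a subset of the six counters; the point is that every mutation and the snapshot read happen under the same lock, so TotalSuccesses + TotalFailures == TotalQueries holds for any observer.

```csharp
using System;

// Hypothetical illustration of the counter discipline; not the real GatewayHistorianDataSource.
public sealed class HealthCountersSketch
{
    private readonly object _healthLock = new object();
    private long _totalQueries, _totalSuccesses, _totalFailures;
    private int _consecutiveFailures;
    private string _lastError;
    private bool _processOpen, _eventOpen;

    public void RecordOutcome(bool success, string error)
    {
        lock (_healthLock)
        {
            _totalQueries++;
            if (success) { _totalSuccesses++; _consecutiveFailures = 0; }
            else { _totalFailures++; _consecutiveFailures++; _lastError = error; }
        }
    }

    // Called by the async refresh after the gateway replies; bit 0 = Process, bit 1 = Event.
    public void CacheConnectionState(bool connectedToServer, int connectionKind)
    {
        lock (_healthLock)
        {
            _processOpen = connectedToServer && (connectionKind & 1) != 0;
            _eventOpen = connectedToServer && (connectionKind & 2) != 0;
        }
    }

    // Pure observation: reads cached state only, never performs I/O.
    public (long Total, long Ok, long Failed, int Consecutive, bool ProcessOpen, bool EventOpen) Snapshot()
    {
        lock (_healthLock)
            return (_totalQueries, _totalSuccesses, _totalFailures, _consecutiveFailures, _processOpen, _eventOpen);
    }
}
```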

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): GetHealthSnapshot via Probe/GetConnectionStatus (counter discipline)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 9: Reshape ServerHistorianOptions to gateway form

Classification: high-risk Estimated implement time: ~5 min Parallelizable with: Task 2 through Task 8 (different project/area)

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ServerHistorianOptionsTests.cs (place in the existing Runtime test project; confirm its path with find tests -name "*Runtime.Tests.csproj")

Step 1: failing test:

public sealed class ServerHistorianOptionsTests
{
    [Fact]
    public void Disabled_yields_no_warnings()
        => Assert.Empty(new ServerHistorianOptions { Enabled = false }.Validate());

    [Fact]
    public void Enabled_without_endpoint_warns()
    {
        var w = new ServerHistorianOptions { Enabled = true, Endpoint = "", ApiKey = "histgw_x_y" }.Validate();
        Assert.Contains(w, m => m.Contains("Endpoint"));
    }

    [Fact]
    public void Enabled_without_apikey_warns()
    {
        var w = new ServerHistorianOptions { Enabled = true, Endpoint = "https://h:5222", ApiKey = "" }.Validate();
        Assert.Contains(w, m => m.Contains("ApiKey"));
    }

    [Fact]
    public void Valid_config_is_clean()
        => Assert.Empty(new ServerHistorianOptions { Enabled = true, Endpoint = "https://h:5222", ApiKey = "histgw_x_y" }.Validate());
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~ServerHistorianOptionsTests" → FAILS (new members absent).

Step 3: implement — reshape ServerHistorianOptions:

  • Add: string Endpoint (e.g. https://host:5222), string ApiKey = "" (supplied via env ServerHistorian__ApiKey), bool UseTls = true, bool AllowUntrustedServerCertificate = false, string? CaCertificatePath, TimeSpan CallTimeout = TimeSpan.FromSeconds(30).
  • Remove: Host, Port, SharedSecret, ServerCertThumbprint (Wonderware-specific). Keep Enabled and MaxTieClusterOverfetch (still used by the node manager's HistoryRead-Raw tie-cluster paging — unchanged).
  • Validate(): warn when enabled and Endpoint blank; warn when enabled and ApiKey blank ("the gateway gRPC surface will reject unauthenticated calls"); keep the MaxTieClusterOverfetch <= 0 warning. No secret values in warning text.
  • Update the class XML doc to describe the gateway backend (not the Wonderware sidecar).
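The Validate() rules above might look roughly like this. It is a sketch only: ServerHistorianOptionsSketch is a hypothetical reduced copy of the options class, the int type and default of MaxTieClusterOverfetch are assumptions, and whether the overfetch warning is gated on Enabled in the existing class is not confirmed here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reduced options class; only the members Validate() inspects are shown.
public sealed class ServerHistorianOptionsSketch
{
    public bool Enabled { get; set; }
    public string Endpoint { get; set; } = "";
    public string ApiKey { get; set; } = "";
    public int MaxTieClusterOverfetch { get; set; } = 1000; // type and default are assumed

    public List<string> Validate()
    {
        var warnings = new List<string>();
        if (!Enabled) return warnings; // disabled: nothing to check

        if (string.IsNullOrWhiteSpace(Endpoint))
            warnings.Add("ServerHistorian:Endpoint is blank; the gateway cannot be reached.");

        // Names the key only; the secret value never appears in warning text.
        if (string.IsNullOrWhiteSpace(ApiKey))
            warnings.Add("ServerHistorian:ApiKey is blank; the gateway gRPC surface will reject unauthenticated calls.");

        if (MaxTieClusterOverfetch <= 0)
            warnings.Add("ServerHistorian:MaxTieClusterOverfetch must be positive.");

        return warnings;
    }
}
```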

High-risk: this is a config contract change. Any deployment appsettings ServerHistorian block changes shape (T21 documents the new keys + provides a migration note). The build will surface every stale reference to the removed members; the only in-tree consumers are the Program.cs factory (T10) and the (to-be-retired) Wonderware wiring.

Step 4: run, expect PASS — filter passes; dotnet build of the Runtime project clean. (Program.cs will not compile until T10 — that is expected and handled there; build the Runtime + Tests projects in isolation for this task: dotnet build src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj.)

Step 5: commit

git commit -am "feat(historian-gateway): reshape ServerHistorianOptions to gateway form (Endpoint/ApiKey/Tls)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 10: Swap AddServerHistorian factory in Program.cs (READ CUTOVER)

Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none (the read cutover gate)

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/HistorianGatewayClientAdapter.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianServiceCollectionExtensions.cs (a Host-callable factory helper)
  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/HistorianGatewayClientAdapterTests.cs

Step 1: failing test — adapter construction + factory shape (no live gateway needed):

public sealed class HistorianGatewayClientAdapterTests
{
    [Fact]
    public void Adapter_constructs_from_options_without_dialing()
    {
        // Constructing the channel must not perform network I/O (lazy connect).
        var opts = new ServerHistorianOptions { Enabled = true, Endpoint = "https://localhost:5222", ApiKey = "histgw_x_y" };
        using var adapter = HistorianGatewayClientAdapter.Create(opts, NullLoggerFactory.Instance);
        Assert.NotNull(adapter);
    }

    [Fact]
    public void Factory_builds_GatewayHistorianDataSource()
    {
        var opts = new ServerHistorianOptions { Enabled = true, Endpoint = "https://localhost:5222", ApiKey = "histgw_x_y" };
        var ds = GatewayHistorian.CreateDataSource(opts, services: new ServiceCollection().BuildServiceProvider());
        Assert.IsType<GatewayHistorianDataSource>(ds);
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~HistorianGatewayClientAdapterTests" → FAILS.

Step 3: implement

  1. HistorianGatewayClientAdapter : IHistorianGatewayClient — wraps the package's HistorianGatewayClient. Create(ServerHistorianOptions, ILoggerFactory) builds the underlying client from HistorianGatewayClientOptions (Endpoint, ApiKey, UseTls, AllowUntrustedServerCertificate, CaCertificatePath, CallTimeout) — match the exact options surface the published Client package exposes (the sibling client plan). Each interface method forwards to the matching client wrapper (streaming → IAsyncEnumerable, unary → Task). Channel construction is lazy (no network I/O in the ctor). Map the client's typed exceptions through unchanged (the data source records them as failures).
  2. GatewayHistorian.CreateDataSource(ServerHistorianOptions, IServiceProvider) static factory: new GatewayHistorianDataSource(HistorianGatewayClientAdapter.Create(opts, sp.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance), sp.GetService<ILogger<GatewayHistorianDataSource>>() ?? NullLogger<…>.Instance).
  3. In Program.cs (hasDriver block, the AddServerHistorian call ~lines 115-122): replace the WonderwareHistorianClient factory lambda with (opts, sp) => GatewayHistorian.CreateDataSource(opts, sp). Update the using from …Driver.Historian.Wonderware.Client to …Driver.Historian.Gateway (the AddAlarmHistorian Wonderware lambda stays for now — T13 swaps it). Add the Host <ProjectReference> to the Gateway driver project in ZB.MOM.WW.OtOpcUa.Host.csproj.

Step 4: run, expect PASS — adapter tests pass; dotnet build ZB.MOM.WW.OtOpcUa.slnx is clean (Program.cs compiles against the reshaped options); the existing server-historian / node-manager HistoryRead tests stay green: dotnet test --filter "FullyQualifiedName~HistoryRead|FullyQualifiedName~ServerHistorian".

Step 5: commit

git commit -am "feat(historian-gateway): read cutover — AddServerHistorian builds GatewayHistorianDataSource

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

This is the read cutover gate. After this commit, OPC UA HistoryRead (raw/processed/at-time) for Galaxy + non-Galaxy tags flows through the gateway. Use case 1 is shippable here, before any write code.


Task 11: ReadEventsAsync (alarm history) on the data source

Classification: small Estimated implement time: ~4 min Parallelizable with: none (extends T7 class; consumes T5)

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayReadEventsTests.cs

Step 1: failing test:

public sealed class GatewayReadEventsTests
{
    [Fact]
    public async Task ReadEvents_maps_and_passes_source_filter()
    {
        var fake = new FakeHistorianGatewayClient {
            Events = new[] { new HistorianEvent { Id="E1", SourceName="Pump1",
                EventTime=Ts(2026,1,1,0,0,0), ReceivedTime=Ts(2026,1,1,0,0,0) } } };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, default);
        Assert.Single(r.Events);
        Assert.Equal("E1", r.Events[0].EventId);
        Assert.Equal("Pump1", fake.LastReadEventsSourceName);
    }

    [Fact]
    public async Task ReadEvents_truncation_sets_continuation_point()
    {
        var fake = new FakeHistorianGatewayClient {
            Events = Enumerable.Range(0,5).Select(i => new HistorianEvent { Id=$"E{i}",
                EventTime=Ts(2026,1,1,0,0,i), ReceivedTime=Ts(2026,1,1,0,0,i) }).ToArray() };
        var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
        var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, default);
        Assert.Equal(3, r.Events.Count);
        Assert.NotNull(r.ContinuationPoint); // cap truncated → non-null per Core.Abstractions-009
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~GatewayReadEventsTests" → FAILS.

Step 3: implement ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, ct):

  • Treat maxEvents <= 0 as the backend-default sentinel (do not pass a cap; let the gateway apply its EventReadMaxRows). When maxEvents > 0, stop draining at maxEvents and set a non-null ContinuationPoint iff the source produced at least one more event (truncation signal, Core.Abstractions-009); otherwise null.
  • Map via EventMapper.ToHistoricalEvents. Record health outcome.
  • XML doc note: this path depends on the gateway being deployed with RuntimeDb:EventReadsEnabled=true; the source-name server filter is delivered by the gateway-client plan's SQL-ReadEvents enhancement — until it lands, sourceName is passed through but the gateway may return the full window and the adapter must not assume server-side filtering. (For v1 correctness, when sourceName is non-null, defensively filter the mapped events by SourceName client-side as well; remove once the server filter is confirmed.)
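The truncation rule above (stop at maxEvents, and report a continuation only if the source had at least one more event) can be sketched with a generic drain helper. EventDrainSketch, DrainAsync, and CountTo are hypothetical names for illustration; the real method would map events and build the continuation point from the Truncated flag.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class EventDrainSketch
{
    // maxEvents <= 0 means "no client-side cap" (the backend applies its own default).
    public static async Task<(List<T> Items, bool Truncated)> DrainAsync<T>(
        IAsyncEnumerable<T> source, int maxEvents, CancellationToken ct)
    {
        var items = new List<T>();
        await foreach (var e in source.WithCancellation(ct))
        {
            // The cap is already full and another event arrived: that is the truncation signal.
            if (maxEvents > 0 && items.Count == maxEvents)
                return (items, true);
            items.Add(e);
        }
        return (items, false); // source ended at or before the cap
    }

    // Demo source standing in for the gateway's event stream.
    public static async IAsyncEnumerable<int> CountTo(int n)
    {
        for (var i = 0; i < n; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

Reading one event past the cap costs a single extra item but avoids a false continuation point when the source holds exactly maxEvents events.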

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): ReadEventsAsync alarm-history via gateway ReadEvents (+ truncation signal)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 12: GatewayAlarmHistorianWriter : IAlarmHistorianWriter

Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 14, Task 16 (independent files; all in Phase D)

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs

Step 1: failing test — outcome mapping from gRPC status (uses the fake's per-call SendEvent behaviour):

public sealed class GatewayAlarmHistorianWriterTests
{
    private static AlarmHistorianEvent Evt(string id) => new(id,"Area/Pump","N","LimitAlarm",
        AlarmSeverity.High,"Activated","m","u",null,new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));

    [Fact]
    public async Task All_acked_when_SendEvent_succeeds()
    {
        var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = true } };
        var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
        var outcomes = await w.WriteBatchAsync(new[]{ Evt("A"), Evt("B") }, default);
        Assert.All(outcomes, o => Assert.Equal(HistorianWriteOutcome.Ack, o));
    }

    [Fact]
    public async Task Unavailable_is_RetryPlease()
    {
        var fake = new FakeHistorianGatewayClient { SendEventThrows = new RpcException(new Status(StatusCode.Unavailable, "down")) };
        var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
        var outcomes = await w.WriteBatchAsync(new[]{ Evt("A") }, default);
        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
    }

    [Fact]
    public async Task InvalidArgument_is_PermanentFail()
    {
        var fake = new FakeHistorianGatewayClient { SendEventThrows = new RpcException(new Status(StatusCode.InvalidArgument, "malformed")) };
        var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
        var outcomes = await w.WriteBatchAsync(new[]{ Evt("A") }, default);
        Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
    }

    [Fact]
    public async Task Empty_batch_returns_empty() =>
        Assert.Empty(await new GatewayAlarmHistorianWriter(new FakeHistorianGatewayClient(),
            NullLogger<GatewayAlarmHistorianWriter>.Instance).WriteBatchAsync(Array.Empty<AlarmHistorianEvent>(), default));
}

If the published Client wraps RpcException in a typed hierarchy (…UnavailableException, …AuthorizationException), assert on those types instead — align with the client plan's exception design (§9). The fake should be configurable to throw whichever the real client surfaces.

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~GatewayAlarmHistorianWriterTests" → FAILS.

Step 3: implement GatewayAlarmHistorianWriter : IAlarmHistorianWriter: per event, AlarmEventMapper.ToHistorianEvent → client.SendEventAsync; map the result to HistorianWriteOutcome:

  • success ack ⇒ Ack.
  • transient gRPC (Unavailable, DeadlineExceeded, ResourceExhausted, Aborted, Internal) or the client's …UnavailableException ⇒ RetryPlease.
  • permanent gRPC (InvalidArgument, FailedPrecondition, OutOfRange, Unimplemented) ⇒ PermanentFail (so the drain worker dead-letters poison events instead of looping to the cap; mirrors the Wonderware PerEventStatus==2 boundary).
  • Unauthenticated/PermissionDenied ⇒ RetryPlease (a key fix re-enables the batch; do not dead-letter on an auth blip).
  • Returns one outcome per event, same order. Empty batch ⇒ []. Never throws out of WriteBatchAsync — it sits behind SqliteStoreAndForwardSink, which expects per-event outcomes.
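The status-to-outcome table above reduces to a single switch. In this sketch StatusCodeSketch and OutcomeSketch are local stand-ins (assumptions) for Grpc.Core.StatusCode and HistorianWriteOutcome so the example compiles without either package. The default arm for unlisted codes is also an assumption: the plan does not specify it, and RetryPlease is chosen here only because it never dead-letters an event.

```csharp
using System;

// Stand-ins for Grpc.Core.StatusCode and HistorianWriteOutcome (assumptions for illustration).
public enum StatusCodeSketch
{
    OK, Unavailable, DeadlineExceeded, ResourceExhausted, Aborted, Internal,
    InvalidArgument, FailedPrecondition, OutOfRange, Unimplemented,
    Unauthenticated, PermissionDenied, Unknown
}

public enum OutcomeSketch { Ack, RetryPlease, PermanentFail }

public static class OutcomeMappingSketch
{
    public static OutcomeSketch FromStatus(StatusCodeSketch code)
    {
        switch (code)
        {
            case StatusCodeSketch.OK:
                return OutcomeSketch.Ack;

            // Poison events: retrying cannot succeed, so let the drain worker dead-letter them.
            case StatusCodeSketch.InvalidArgument:
            case StatusCodeSketch.FailedPrecondition:
            case StatusCodeSketch.OutOfRange:
            case StatusCodeSketch.Unimplemented:
                return OutcomeSketch.PermanentFail;

            // Transient faults and auth problems: keep the event queued for a later attempt.
            case StatusCodeSketch.Unavailable:
            case StatusCodeSketch.DeadlineExceeded:
            case StatusCodeSketch.ResourceExhausted:
            case StatusCodeSketch.Aborted:
            case StatusCodeSketch.Internal:
            case StatusCodeSketch.Unauthenticated:
            case StatusCodeSketch.PermissionDenied:
                return OutcomeSketch.RetryPlease;

            // Unlisted codes: assumed retryable (not specified by the plan).
            default:
                return OutcomeSketch.RetryPlease;
        }
    }
}
```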

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): GatewayAlarmHistorianWriter — SendEvent + gRPC->outcome mapping

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 13: Swap AddAlarmHistorian factory in Program.cs

Classification: high-risk Estimated implement time: ~3 min Parallelizable with: none

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs

Step 1: failing test — reuse the existing Host integration/boot test (find it: find tests -name "*Host*Tests*.cs" | head); assert the resolved IAlarmHistorianWriter is the gateway writer when AlarmHistorian:Enabled=true. If no such test exists, add a minimal DI-resolution test in the Host test project that builds the service collection with the gateway writer factory and asserts the type. (Keep it offline — the writer ctor must not dial.)

Step 2: run, expect FAIL — run that test → FAILS (still wired to WonderwareHistorianClient).

Step 3: implement — in Program.cs AddAlarmHistorian(...) call (~lines 101-108): replace the WonderwareHistorianClient writer lambda with one that builds new GatewayAlarmHistorianWriter(HistorianGatewayClientAdapter.Create(<gateway options>, sp.GetService<ILoggerFactory>()...), sp.GetService<ILogger<GatewayAlarmHistorianWriter>>()...). The alarm-write path must point at the same gateway endpoint/key as the read path — source the connection from ServerHistorianOptions (single gateway) rather than the Wonderware-shaped AlarmHistorianOptions host/port. Reuse a single IHistorianGatewayClient/adapter instance across read + alarm-write where practical (resolve a shared singleton in DI to avoid two channels); if simplest, register IHistorianGatewayClient as a singleton built from ServerHistorianOptions and have both factories consume it. Remove the now-dead using …Wonderware.Client.

Step 4: run, expect PASS — the test passes; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean.

Step 5: commit

git commit -am "feat(historian-gateway): alarm-write cutover — AddAlarmHistorian drains to GatewayAlarmHistorianWriter

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 14: IHistorianProvisioning + GatewayTagProvisioner (EnsureTags)

Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 12, Task 16

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianProvisioning.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs

Step 1: failing test:

public sealed class GatewayTagProvisionerTests
{
    [Fact]
    public async Task Ensures_numeric_tags_with_mapped_type()
    {
        var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
        var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
        var reqs = new[] {
            new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, "degC", "Temp"),
            new HistorianTagProvisionRequest("Pump1.Run",  DriverDataType.Boolean,  null, null),
        };
        var result = await p.EnsureTagsAsync(reqs, default);
        Assert.Equal(2, fake.LastEnsureDefinitions.Count);
        Assert.Equal(HistorianDataType.Float,  fake.LastEnsureDefinitions[0].DataType);
        Assert.Equal(HistorianDataType.Int1,   fake.LastEnsureDefinitions[1].DataType);
        Assert.Equal(2, result.Requested);
        Assert.Equal(0, result.Skipped);
    }

    [Fact]
    public async Task Deferred_types_are_skipped_not_sent()
    {
        var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
        var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
        var result = await p.EnsureTagsAsync(new[] {
            new HistorianTagProvisionRequest("Pump1.Name", DriverDataType.String, null, null) }, default);
        Assert.Empty(fake.LastEnsureDefinitions);  // String is deferred → never sent
        Assert.Equal(1, result.Skipped);
    }

    [Fact]
    public async Task Gateway_failure_is_swallowed_and_counted_not_thrown()
    {
        var fake = new FakeHistorianGatewayClient { EnsureTagsThrows = new Exception("boom") };
        var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
        var result = await p.EnsureTagsAsync(new[] {
            new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null) }, default);
        Assert.Equal(1, result.Failed);  // non-blocking: no throw
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~GatewayTagProvisionerTests" → FAILS.

Step 3: implement

  • IHistorianProvisioning (Core.Abstractions, alongside IHistorianDataSource): Task<HistorianProvisionResult> EnsureTagsAsync(IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct). Define HistorianTagProvisionRequest(string TagName, DriverDataType DataType, string? EngineeringUnit, string? Description) and HistorianProvisionResult(int Requested, int Ensured, int Skipped, int Failed) records here too. Add a NullHistorianProvisioning no-op (returns all-zero) so the Applier has a safe default.
  • GatewayTagProvisioner : IHistorianProvisioning (Gateway driver): for each request, gate on HistorianTypeMapper.IsHistorizable(DataType) — non-historizable ⇒ count Skipped, log Debug, never build a definition; otherwise build HistorianTagDefinition { TagName, DataType = HistorianTypeMapper.ToHistorianDataType(...), EngineeringUnit, Description }. Batch all historizable definitions into one client.EnsureTagsAsync call. Non-blocking semantics: wrap the call in try/catch — any exception ⇒ count the batch as Failed, log a Warning (no tag values), return. Returns the result record. Never throws.
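The skip/batch/swallow flow of the provisioner can be sketched as below. ProvisionerSketch and its delegate parameters are hypothetical: isHistorizable stands in for HistorianTypeMapper.IsHistorizable and ensureBatch for client.EnsureTagsAsync. Two choices are assumptions rather than plan requirements: a successful batch counts every sent tag as Ensured, and an all-skipped request list makes no gateway call at all.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProvisionerSketch
{
    public static async Task<(int Requested, int Ensured, int Skipped, int Failed)> EnsureAsync(
        IReadOnlyList<(string Tag, string DataType)> requests,
        Func<string, bool> isHistorizable,
        Func<IReadOnlyList<string>, Task> ensureBatch)
    {
        var toSend = new List<string>();
        var skipped = 0;
        foreach (var r in requests)
        {
            if (isHistorizable(r.DataType)) toSend.Add(r.Tag);
            else skipped++; // deferred type: counted, never sent
        }

        if (toSend.Count == 0)
            return (requests.Count, 0, skipped, 0); // assumed: no call for an empty batch

        try
        {
            await ensureBatch(toSend); // one batched call for all historizable tags
            return (requests.Count, toSend.Count, skipped, 0);
        }
        catch (Exception)
        {
            // Non-blocking semantics: the failure is counted, never rethrown.
            return (requests.Count, 0, skipped, toSend.Count);
        }
    }
}
```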

Step 4: run, expect PASS.

Step 5: commit

git commit -am "feat(historian-gateway): IHistorianProvisioning + GatewayTagProvisioner (EnsureTags, non-blocking)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 15: Hook provisioning into AddressSpaceApplier.Apply()

Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/ZB.MOM.WW.OtOpcUa.OpcUaServer.csproj (add Core.Abstractions ProjectReference)
  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs (confirm the OpcUaServer test project path with find tests -name "*OpcUaServer.Tests.csproj")

Step 1: failing test — synthetic plan with mixed historized/non-historized tags + a capturing IHistorianProvisioning:

public sealed class AddressSpaceApplierProvisioningTests
{
    private sealed class CapturingProvisioner : IHistorianProvisioning
    {
        public List<HistorianTagProvisionRequest> Seen = new();
        public bool Throw;
        public Task<HistorianProvisionResult> EnsureTagsAsync(IReadOnlyList<HistorianTagProvisionRequest> r, CancellationToken ct)
        {
            if (Throw) throw new Exception("boom");
            Seen.AddRange(r);
            return Task.FromResult(new HistorianProvisionResult(r.Count, r.Count, 0, 0));
        }
    }

    [Fact]
    public void Apply_provisions_only_historized_added_tags()
    {
        var prov = new CapturingProvisioner();
        var applier = new AddressSpaceApplier(NullSink, NullLogger, prov);
        var plan = PlanWith(
            Tag("Pump1.Temp", historized:true,  historianName:"Pump1.Temp", type:"Float32"),
            Tag("Pump1.Run",  historized:false, historianName:null,         type:"Boolean"));
        applier.Apply(plan);
        Assert.Single(prov.Seen);
        Assert.Equal("Pump1.Temp", prov.Seen[0].TagName); // resolved historian name
    }

    [Fact]
    public void Provisioner_throw_does_not_block_publish()
    {
        var applier = new AddressSpaceApplier(NullSink, NullLogger, new CapturingProvisioner { Throw = true });
        var outcome = applier.Apply(PlanWith(Tag("Pump1.Temp", historized:true, historianName:"Pump1.Temp", type:"Float32")));
        Assert.True(outcome.RebuildCalled); // address-space work still completed
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~AddressSpaceApplierProvisioningTests" → FAILS (ctor arity / hook absent).

Step 3: implement

  • Add <ProjectReference> from ZB.MOM.WW.OtOpcUa.OpcUaServer.csproj to Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions (it is a leaf abstractions project — OpcUaServer already references Commons; no cycle).
  • Add an IHistorianProvisioning ctor param to AddressSpaceApplier defaulting to NullHistorianProvisioning.Instance (preserves every existing call site — including the one in ServiceCollectionExtensions.WithOtOpcUaRuntimeActors that constructs the applier).
  • In Apply(plan), after the address-space work (rebuild/surgical passes) completes, iterate plan.AddedEquipmentTags.Where(t => t.IsHistorized):
    • resolve the historian name exactly as the materialiser does: string.IsNullOrWhiteSpace(t.HistorianTagname) ? t.FullName : t.HistorianTagname.
    • parse t.DataType (string) → DriverDataType via Enum.TryParse; an unparseable type ⇒ skip + log Debug.
    • build HistorianTagProvisionRequest(historianName, dataType, EngineeringUnit: null, Description: t.Name).
    • Fire-and-forget the provisioning off the apply path so it NEVER blocks the publish: call provisioner.EnsureTagsAsync(requests, CancellationToken.None) and observe the task on a continuation that logs result.Failed/Skipped (and swallows any escaped exception). The synchronous Apply returns its normal AddressSpaceApplyOutcome regardless. Wrap the whole hook in try/catch so a hook fault cannot break a deploy.

High-risk: this runs on the OPC UA publish actor's pinned thread. Keep the hook's synchronous portion to building the request list only; the gateway round-trip must be fully detached (do not .Wait()/.Result). Tests assert publish completes even when the provisioner throws synchronously.
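A minimal sketch of the detached hook described above, assuming the provisioning call is passed in as a delegate. DetachedProvisioningSketch and FireAndForget are hypothetical names, and the delegate's int result stands in for result.Failed. The returned Task exists only so a test can await the continuation; Apply itself would discard it.

```csharp
using System;
using System.Threading.Tasks;

public static class DetachedProvisioningSketch
{
    public static Task FireAndForget(Func<Task<int>> ensureTags, Action<string> log)
    {
        try
        {
            // Starts the call; a synchronous throw is caught by the surrounding try/catch.
            Task<int> pending = ensureTags();

            // Observe the outcome on a continuation; never Wait()/Result on the caller's thread.
            return pending.ContinueWith(t =>
            {
                if (t.IsFaulted)
                    log("provisioning faulted: " + t.Exception.GetBaseException().Message);
                else if (t.IsCanceled)
                    log("provisioning canceled");
                else if (t.Result > 0)
                    log("provisioning failed for " + t.Result + " tag(s)");
            }, TaskScheduler.Default);
        }
        catch (Exception ex)
        {
            log("provisioning hook threw synchronously: " + ex.Message);
            return Task.CompletedTask;
        }
    }
}
```

Reading t.Exception inside the continuation marks the fault as observed, so a late gateway failure does not surface later as an unobserved task exception.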

Step 4: run, expect PASS — provisioning tests pass; the existing AddressSpaceApplier test suite stays green (dotnet test --filter "FullyQualifiedName~AddressSpaceApplier"); dotnet build clean.

Step 5: commit

git commit -am "feat(historian-gateway): EnsureTags provisioning hook in AddressSpaceApplier (non-blocking)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 16: FasterLog outbox store for the recorder

Classification: high-risk Estimated implement time: ~5 min Parallelizable with: Task 12, Task 14

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj (add Microsoft.FASTER.Core 2.6.5)
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/HistorizationOutboxEntry.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/IHistorizationOutbox.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs

Step 1: failing test — append/peek/remove, restart durability, full-drop:

public sealed class FasterLogHistorizationOutboxTests
{
    private static HistorizationOutboxEntry E(string tag, double v) =>
        new(Guid.NewGuid(), tag, v, 192, new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));

    [Fact]
    public async Task Append_then_peek_returns_fifo()
    {
        var dir = NewTempDir();
        using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
        await o.AppendAsync(E("A",1), default);
        await o.AppendAsync(E("B",2), default);
        var batch = await o.PeekBatchAsync(10, default);
        Assert.Equal(new[]{"A","B"}, batch.Select(b => b.Tag));
        Assert.Equal(2, await o.CountAsync(default));
    }

    [Fact]
    public async Task Remove_truncates_and_survives_restart()
    {
        var dir = NewTempDir();
        Guid keep;
        {
            using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
            var a = E("A",1); var b = E("B",2); keep = b.Id;
            await o.AppendAsync(a, default); await o.AppendAsync(b, default);
            await o.PeekBatchAsync(10, default);
            await o.RemoveAsync(a.Id, default);   // ack A
        }
        using var reopened = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
        Assert.Equal(1, await reopened.CountAsync(default));      // only B survives
        var batch = await reopened.PeekBatchAsync(10, default);
        Assert.Equal(keep, batch[0].Id);
    }

    [Fact]
    public async Task Capacity_full_drops_oldest_and_counts()
    {
        var dir = NewTempDir();
        using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry, capacity: 2);
        await o.AppendAsync(E("A",1), default);
        await o.AppendAsync(E("B",2), default);
        await o.AppendAsync(E("C",3), default);  // overflow → drop oldest (A)
        Assert.Equal(2, await o.CountAsync(default));
        Assert.Equal(1, o.DroppedCount);
        var tags = (await o.PeekBatchAsync(10, default)).Select(b => b.Tag).ToArray();
        Assert.DoesNotContain("A", tags);
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~FasterLogHistorizationOutboxTests" → FAILS.

Step 3: implement: mirror the gateway's FasterLogOutboxStore (read it first):

  • HistorizationOutboxEntry(Guid Id, string Tag, double NumericValue, ushort Quality, DateTime TimestampUtc) + a compact binary serializer (BinaryWriter or the same approach the gateway's OutboxEntrySerializer uses).
  • IHistorizationOutbox (Append/PeekBatch/Remove/Count, DroppedCount, IDisposable).
  • FasterLogHistorizationOutbox: ManagedLocalStorageDevice under <dir>/hlog.log; FasterLog; PerEntry commits before AppendAsync returns, Periodic drives a PeriodicTimer commit loop; RemoveAsync → TruncateUntil(window[id]) + CommitAsync; RecoverState() in the ctor rebuilds count + head from the committed log (restart durability). Add the capacity/drop-oldest behavior the gateway store lacks: a capacity ctor arg; when an append would exceed it, advance the head past the oldest entry (truncate one) and increment DroppedCount (the recorder surfaces it as a metric in T17/T18). Use StoreForwardCommitMode from the gateway's Configuration namespace or define a local enum HistorizationCommitMode { PerEntry, Periodic } to avoid a cross-package coupling. Prefer the local enum (the gateway's type is internal to its Server project); in that case the Step 1 tests' StoreForwardCommitMode.PerEntry arguments become HistorizationCommitMode.PerEntry.

High-risk: this is the durable boundary. The restart-durability + truncate-correctness tests are load-bearing; do not skip them.
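For the compact binary serializer mentioned in Step 3, a BinaryWriter round-trip could look like this. It is a sketch under assumptions: OutboxEntrySketch mirrors the HistorizationOutboxEntry fields as a plain class, the field order and encoding are illustrative, and it is not the gateway's OutboxEntrySerializer format.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical mirror of HistorizationOutboxEntry(Id, Tag, NumericValue, Quality, TimestampUtc).
public sealed class OutboxEntrySketch
{
    public Guid Id { get; set; }
    public string Tag { get; set; }
    public double NumericValue { get; set; }
    public ushort Quality { get; set; }
    public DateTime TimestampUtc { get; set; }
}

public static class OutboxEntrySerializerSketch
{
    public static byte[] Serialize(OutboxEntrySketch e)
    {
        using (var ms = new MemoryStream())
        {
            using (var w = new BinaryWriter(ms, Encoding.UTF8, leaveOpen: true))
            {
                w.Write(e.Id.ToByteArray());   // 16 bytes
                w.Write(e.Tag);                // length-prefixed UTF-8
                w.Write(e.NumericValue);       // 8 bytes
                w.Write(e.Quality);            // 2 bytes
                w.Write(e.TimestampUtc.Ticks); // 8 bytes; Kind is restored as UTC on read
            }
            return ms.ToArray();
        }
    }

    public static OutboxEntrySketch Deserialize(byte[] bytes)
    {
        using (var r = new BinaryReader(new MemoryStream(bytes), Encoding.UTF8))
        {
            // Fields are read in exactly the order they were written.
            var id = new Guid(r.ReadBytes(16));
            var tag = r.ReadString();
            var value = r.ReadDouble();
            var quality = r.ReadUInt16();
            var ticks = r.ReadInt64();
            return new OutboxEntrySketch
            {
                Id = id, Tag = tag, NumericValue = value, Quality = quality,
                TimestampUtc = new DateTime(ticks, DateTimeKind.Utc)
            };
        }
    }
}
```

Storing ticks rather than a formatted timestamp keeps the payload fixed-width apart from the tag name, which helps when sizing FasterLog pages for the outbox.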

Step 4: run, expect PASS.

Step 5: commit

git add -A && git commit -m "feat(historian-gateway): FasterLog historization outbox (PerEntry/Periodic, drop-oldest)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 17: ContinuousHistorizationRecorder actor

Classification: high-risk | Estimated implement time: ~5 min | Parallelizable with: none (consumes T16 + the client)

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs

The recorder lives in Runtime because it references DependencyMuxActor and DriverInstanceActor.AttributeValuePublished, both of which are in Runtime. It consumes the gateway write path and the outbox through abstractions so that Runtime does not take a hard dependency on the Gateway driver project. Decision: introduce IHistorianValueWriter (single method Task<bool> WriteLiveValuesAsync(string tag, IReadOnlyList<(DateTime? ts, double value, ushort quality)> values, CancellationToken ct)) and move IHistorizationOutbox into Core.Abstractions/Historian, where Runtime can reference both. The Gateway driver implements them: GatewayHistorianValueWriter adapts IHistorianGatewayClient.WriteLiveValuesAsync. This keeps Runtime free of the gRPC package.

Files (revised to honour the layering decision):

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianValueWriter.cs (+ move IHistorizationOutbox + HistorizationOutboxEntry here; the FasterLog impl from T16 stays in the Gateway driver project implementing this interface)
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/GatewayHistorianValueWriter.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs
  • Create the recorder test file above.

Step 1: failing test — Akka TestKit with fake mux + fake writer + real/fake outbox:

public sealed class ContinuousHistorizationRecorderTests : TestKit
{
    [Fact]
    public void Registers_interest_for_historized_refs_on_start()
    {
        var mux = CreateTestProbe();
        var writer = new FakeValueWriter();
        var outbox = new InMemoryOutbox();
        Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox,
            historizedRefs: new[]{ "Pump1.Temp" }));
        var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
        Assert.Contains("Pump1.Temp", reg.TagRefs);
    }

    [Fact]
    public async Task AttributeValuePublished_appends_to_outbox_then_drains_to_writer()
    {
        var mux = CreateTestProbe();
        var writer = new FakeValueWriter { Succeed = true };
        var outbox = new InMemoryOutbox();
        var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Temp" }));
        rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Temp", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
        await AwaitAssertAsync(() => Assert.Contains(writer.Written, w => w.Tag=="Pump1.Temp" && w.Value==42.0));
        await AwaitAssertAsync(async () => Assert.Equal(0, await outbox.CountAsync(default))); // acked → truncated
    }

    [Fact]
    public async Task Writer_failure_keeps_entry_for_retry()
    {
        var mux = CreateTestProbe();
        var writer = new FakeValueWriter { Succeed = false };
        var outbox = new InMemoryOutbox();
        var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Temp" }));
        rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Temp", 7.0, OpcUaQuality.Good, DateTime.UtcNow));
        await AwaitAssertAsync(async () => Assert.Equal(1, await outbox.CountAsync(default))); // not acked → retained
    }

    [Fact]
    public void Non_numeric_value_is_dropped_with_metric()
    {
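        var mux = CreateTestProbe(); var writer = new FakeValueWriter { Succeed = true }; var outbox = new InMemoryOutbox();
        var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Name" }));
        rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Name", "text", OpcUaQuality.Good, DateTime.UtcNow));
        // string value: SQL analog write path can't carry it → dropped, not appended
        // Follow with a numeric sentinel. Mailbox ordering means the string was handled first,
        // so asserting on the sentinel avoids a vacuous "nothing written yet" pass.
        rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Name", 1.0, OpcUaQuality.Good, DateTime.UtcNow));
        AwaitAssert(() =>
        {
            Assert.NotEmpty(writer.Written);
            Assert.All(writer.Written, w => Assert.Equal(1.0, w.Value));
        });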
    }
}

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~ContinuousHistorizationRecorderTests" → FAILS.

Step 3: implement ContinuousHistorizationRecorder : ReceiveActor:

  • Props(IActorRef dependencyMux, IHistorianValueWriter writer, IHistorizationOutbox outbox, IReadOnlyList<string> historizedRefs, ...options).
  • PreStart: dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(historizedRefs, Self)). Two fan-out paths are possible. (a) Per the mux contract, register interest and handle VirtualTagActor.DependencyValueChanged, which is how the mux fans AttributeValuePublished back to interested actors. (b) Have DriverHostActor forward AttributeValuePublished to the recorder directly, which is only needed if the recorder must see ALL historized refs regardless of vtag interest. Choose (a), the RegisterInterest → DependencyValueChanged path, to reuse the existing mux without touching DriverHostActor; the first test asserts the register message. Match the actual fan-out before finalizing: the remaining Step 1 tests Tell AttributeValuePublished directly, so either handle that message as well or change those tests to send DependencyValueChanged.
  • On a value-change message for a historized ref: coerce the value to numeric (double); a non-numeric/non-coercible value (string/null) is dropped + metered (the SQL analog WriteLiveValues path is numeric-only — design §6/§7, §11 risk 2). Append a HistorizationOutboxEntry to the outbox (the durable boundary). On outbox-full, the store drops oldest + increments DroppedCount.
  • A background drain (self-scheduled tick via Context.System.Scheduler / Timers): PeekBatchAsync(batchSize) → group by tag → writer.WriteLiveValuesAsync(tag, values); on success RemoveAsync each acked id; on failure leave queued and back off (exponential, capped) before the next tick. Never let a drain exception kill the actor (supervised try/catch; log Warning, back off).
  • Surface counters (queued depth, dropped, last drain success) for T18's meter/health.
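The numeric-only gate described in the list above can be isolated as a pure function, which makes it testable without an actor. This is a sketch under assumptions: NumericCoercion is a hypothetical name, strings are never parsed, and bool and non-finite doubles (NaN, infinity) are rejected here even though the plan does not say how those should be treated.

```csharp
#nullable enable
using System;
using System.Globalization;

// Hypothetical helper for the recorder's numeric-only gate.
public static class NumericCoercion
{
    // Returns true with a finite double for numeric inputs.
    // Returns false (caller drops + meters) for null, string, and anything else.
    public static bool TryCoerce(object? value, out double result)
    {
        result = 0;
        switch (value)
        {
            case null:
                return false;
            case string:
                return false; // never parsed: the write path is numeric-only
            case double d:
                result = d;
                break;
            case float f:
                result = f;
                break;
            case sbyte or byte or short or ushort or int or uint or long or ulong or decimal:
                result = Convert.ToDouble(value, CultureInfo.InvariantCulture);
                break;
            default:
                return false; // assumption: bool, char, DateTime and so on are rejected
        }

        // Assumption: NaN and infinities are treated as non-historizable.
        if (!double.IsFinite(result))
        {
            result = 0;
            return false;
        }
        return true;
    }
}
```

In the recorder, a false return would increment the dropped-value metric and skip the outbox append, so nothing non-numeric ever crosses the durable boundary.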

High-risk: actor + durable recorder + data contract. The append-before-ack ordering, the numeric-only gate, and the failure-retains-entry behavior are the load-bearing invariants. Use Akka.TestKit.Xunit2.
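For the capped exponential backoff between failed drains, a small pure helper keeps the schedule out of the actor's message handlers. The sketch below assumes a hypothetical DrainBackoff class and a doubling schedule; the plan only says "exponential, capped", so the base and cap values are for the options to decide.

```csharp
using System;

// Hypothetical helper: delay = baseDelay * 2^consecutiveFailures, capped at maxDelay.
public static class DrainBackoff
{
    public static TimeSpan NextDelay(int consecutiveFailures, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (baseDelay <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(baseDelay));
        if (maxDelay < baseDelay) throw new ArgumentOutOfRangeException(nameof(maxDelay));

        var failures = Math.Max(0, consecutiveFailures);
        // Work in double so a large failure count saturates at the cap instead of overflowing.
        var scaledMs = baseDelay.TotalMilliseconds * Math.Pow(2, failures);
        return TimeSpan.FromMilliseconds(Math.Min(scaledMs, maxDelay.TotalMilliseconds));
    }
}
```

The recorder would reset its failure counter to zero after a successful drain and pass the result of NextDelay to its next self-scheduled tick.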

Step 4: run, expect PASS.

Step 5: commit

git add -A && git commit -m "feat(historian-gateway): ContinuousHistorizationRecorder actor (outbox->WriteLiveValues, backoff)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 18: Wire the recorder into DI + hosted lifecycle

Classification: standard | Estimated implement time: ~5 min | Parallelizable with: none

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationOptions.cs
  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs (spawn the recorder in WithOtOpcUaRuntimeActors, gated on options)
  • Modify /Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs (bind options + register IHistorianValueWriter + IHistorizationOutbox from the gateway, gated)
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationOptionsTests.cs

Step 1: failing test — options validation + actor-spawn gating:

public sealed class ContinuousHistorizationOptionsTests
{
    [Fact] public void Disabled_no_warnings() => Assert.Empty(new ContinuousHistorizationOptions{Enabled=false}.Validate());
    [Fact] public void Enabled_requires_outbox_path()
        => Assert.Contains(new ContinuousHistorizationOptions{Enabled=true, OutboxPath=""}.Validate(), m => m.Contains("OutboxPath"));
    [Fact] public void Periodic_requires_positive_interval()
        => Assert.Contains(new ContinuousHistorizationOptions{Enabled=true, OutboxPath="x", CommitMode="Periodic", CommitIntervalMs=0}.Validate(), m => m.Contains("CommitIntervalMs"));
}

Plus a Runtime spawn test (mirror existing WithOtOpcUaRuntimeActors tests): when ContinuousHistorization:Enabled=false, the recorder actor is NOT spawned; when enabled with a fake writer/outbox registered, it IS (registry.Get<ContinuousHistorizationRecorderKey>() resolves).

Step 2: run, expect FAIL: dotnet test --filter "FullyQualifiedName~ContinuousHistorizationOptions" → FAILS.

Step 3: implement

  • ContinuousHistorizationOptions (SectionName="ContinuousHistorization"): Enabled, OutboxPath (directory; required when enabled — production absolute path), CommitMode (PerEntry/Periodic), CommitIntervalMs, DrainBatchSize (default 64), DrainIntervalSeconds, Capacity, RetryBackoff. Validate() warns on the gated cases above.
  • In WithOtOpcUaRuntimeActors: resolve IHistorianValueWriter + IHistorizationOutbox and gate the spawn on ContinuousHistorizationOptions.Enabled. The historized-ref set should come from the deployed composition; for v1 the recorder can register interest dynamically as tags deploy, but the minimal wiring only registers the actor and lets a later SetHistorizedRefs message feed it. Keep T18 scope to: bind options, build the outbox from OutboxPath, spawn the recorder when enabled, register its key.
  • In Program.cs (hasDriver block): bind+validate the options; when enabled, register IHistorizationOutbox → FasterLogHistorizationOutbox(opts.OutboxPath, …) and IHistorianValueWriter → GatewayHistorianValueWriter over the shared IHistorianGatewayClient (the singleton from T13). Add a meter/observable-gauge for outbox depth + dropped count (mirror the existing observability registration), and feed the recorder's health into /healthz if a historian health hook exists.
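The Validate() contract that the Step 1 tests pin down can be sketched as follows. Only the properties the tests touch are shown, plus DrainBatchSize with its stated default; the "PerEntry" default for CommitMode and the exact warning wording are assumptions.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public sealed class ContinuousHistorizationOptions
{
    public const string SectionName = "ContinuousHistorization";

    public bool Enabled { get; set; }
    public string OutboxPath { get; set; } = "";
    public string CommitMode { get; set; } = "PerEntry"; // assumed default
    public int CommitIntervalMs { get; set; }
    public int DrainBatchSize { get; set; } = 64;

    // Returns human-readable warnings; an empty list means the options are usable.
    public IReadOnlyList<string> Validate()
    {
        var warnings = new List<string>();
        if (!Enabled) return warnings; // nothing to check while the feature is off

        if (string.IsNullOrWhiteSpace(OutboxPath))
            warnings.Add($"{SectionName}:OutboxPath is required when Enabled=true.");

        if (string.Equals(CommitMode, "Periodic", StringComparison.OrdinalIgnoreCase)
            && CommitIntervalMs <= 0)
            warnings.Add($"{SectionName}:CommitIntervalMs must be positive when CommitMode=Periodic.");

        return warnings;
    }
}
```

Returning warnings instead of throwing lets Program.cs decide whether a bad configuration should log and disable the recorder or fail startup.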

Step 4: run, expect PASS — option + spawn tests pass; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean; full offline suite green: dotnet test ZB.MOM.WW.OtOpcUa.slnx.

Step 5: commit

git add -A && git commit -m "feat(historian-gateway): wire ContinuousHistorizationRecorder into DI + hosted lifecycle + meters

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 19: Retire the Wonderware historian projects

Classification: high-risk | Estimated implement time: ~5 min | Parallelizable with: none (must be last code task)

Files (remove from solution + delete):

  • /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware
  • /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client
  • /Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts
  • /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests
  • /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests
  • Modify /Users/dohertj2/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx
  • Grep-and-remove any remaining using …Wonderware… / ProjectReference …Wonderware… (notably Host.csproj).

Leave /Users/dohertj2/Desktop/OtOpcUa/code-reviews/Driver.Historian.Wonderware* untouched — those are review artifacts, not projects.

Step 1: failing test: a guard test asserting that no Wonderware assembly is loaded. It uses reflection only, so it compiles both before and after the removal:

public sealed class WonderwareRetirementTests
{
    [Fact]
    public void No_Wonderware_historian_assembly_is_loaded()
    {
        var loaded = AppDomain.CurrentDomain.GetAssemblies().Select(a => a.GetName().Name);
        Assert.DoesNotContain(loaded, n => n is not null && n.Contains("Wonderware", StringComparison.OrdinalIgnoreCase));
    }
}

(Place in the Gateway driver test project. The test passes trivially whenever nothing has loaded a Wonderware assembly, including before removal, so it documents intent rather than gating the change. The real gate is the build: after deleting, dotnet build must still be clean with no dangling references.)

Step 2: run, expect FAIL — first run dotnet build ZB.MOM.WW.OtOpcUa.slnx BEFORE deletion to confirm it is currently green with both backends, then delete; the meaningful failure is a dangling reference build break if any using/ProjectReference survives.

Step 3: implement

  1. dotnet sln ZB.MOM.WW.OtOpcUa.slnx remove <each of the 5 project paths>.
  2. git rm -r <each of the 5 directories>.
  3. grep -rn "Wonderware" src/ tests/ --include=*.cs --include=*.csproj and remove every remaining reference (Host.csproj ProjectReference, any stray using). AlarmHistorianOptions Wonderware-shaped fields (Host/Port/SharedSecret) — if now unused by the gateway alarm-write wiring (T13 sources connection from ServerHistorian), prune them; otherwise leave and note in T21.

Step 4: run, expect PASS: dotnet build ZB.MOM.WW.OtOpcUa.slnx clean (0 warnings, no dangling refs); full suite green: dotnet test ZB.MOM.WW.OtOpcUa.slnx.

Step 5: commit

git rm -r --ignore-unmatch src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests
git add -A && git commit -m "refactor(historian-gateway): retire Wonderware historian projects (gateway is sole backend)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 20: Env-gated live validation vs wonder-sql-vd03

Classification: small | Estimated implement time: ~5 min | Parallelizable with: none

Files:

  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Live/GatewayLiveIntegrationTests.cs
  • Create /Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Live/GatewayLiveFixture.cs

Step 1: failing test[Trait("Category","LiveIntegration")] tests that skip when env vars are absent:

public sealed class GatewayLiveIntegrationTests
{
    // Reads HISTGW_GATEWAY_ENDPOINT + HISTGW_GATEWAY_APIKEY + HISTGW_TEST_TAG (Galaxy tag),
    // HISTGW_WRITE_SANDBOX_TAG (HistGW.LiveTest.*), HISTGW_ALARM_SOURCE. Skips if endpoint/key absent.
    [SkippableFact, Trait("Category","LiveIntegration")]
    public async Task Galaxy_tag_read_round_trip() { Skip.If(Fixture.NotConfigured); /* ReadRaw last 1h, assert >=0 samples, no throw */ }

    [SkippableFact, Trait("Category","LiveIntegration")]
    public async Task Write_then_read_on_sandbox_tag() { Skip.If(Fixture.NotConfigured); /* EnsureTags(Float) → WriteLiveValues → ReadRaw → sample present */ }

    [SkippableFact, Trait("Category","LiveIntegration")]
    public async Task Alarm_SendEvent_then_ReadEvents() { Skip.If(Fixture.NotConfigured); /* SendEvent → ReadEvents(source) → event present (needs RuntimeDb:EventReadsEnabled) */ }
}

Step 2: run, expect FAIL/SKIP: dotnet test --filter "Category=LiveIntegration" with no env → all SKIP (green). With env set + VPN up → FAIL until the live path works.

Step 3: implement — the fixture builds a real HistorianGatewayClientAdapter from env, the three round-trips exercise the read/write/alarm paths against the live gateway → wonder-sql-vd03. Mirror the HistorianGateway repo's GatewayIntegrationFixture env-gating convention (skip+log when unset). VPN-gated: wonder-sql-vd03 is reachable only on VPN — if the endpoint is configured but unreachable, prompt the user to connect VPN (do not hang).
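The env-gating half of the fixture can be sketched without the gateway client. The environment variable names come from the test class comment; the FromLookup factory and the property names are hypothetical, and building the real HistorianGatewayClientAdapter is deliberately left out because its constructor is not shown in this plan.

```csharp
#nullable enable
using System;

// Sketch of the gating logic only. The real fixture would also build the
// gateway client adapter from Endpoint + ApiKey (omitted here).
public sealed class GatewayLiveFixture
{
    // xUnit class fixtures need exactly one public constructor.
    public GatewayLiveFixture() : this(Environment.GetEnvironmentVariable) { }

    private GatewayLiveFixture(Func<string, string?> lookup)
    {
        Endpoint = Read(lookup, "HISTGW_GATEWAY_ENDPOINT");
        ApiKey = Read(lookup, "HISTGW_GATEWAY_APIKEY");
        TestTag = Read(lookup, "HISTGW_TEST_TAG");
        WriteSandboxTag = Read(lookup, "HISTGW_WRITE_SANDBOX_TAG");
        AlarmSource = Read(lookup, "HISTGW_ALARM_SOURCE");
    }

    // Hypothetical factory so the gating can be tested without touching process env.
    public static GatewayLiveFixture FromLookup(Func<string, string?> lookup) => new(lookup);

    public string? Endpoint { get; }
    public string? ApiKey { get; }
    public string? TestTag { get; }
    public string? WriteSandboxTag { get; }
    public string? AlarmSource { get; }

    // Tests skip when either the endpoint or the API key is absent.
    public bool NotConfigured => Endpoint is null || ApiKey is null;

    private static string? Read(Func<string, string?> lookup, string name)
    {
        var value = lookup(name);
        return string.IsNullOrWhiteSpace(value) ? null : value.Trim();
    }
}
```

Treating blank values as unset means an empty HISTGW_GATEWAY_APIKEY in a CI environment skips the live tests instead of producing an authentication failure.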

Step 4: run, expect PASS — offline: all SKIP. On VPN with env + the gateway running RuntimeDb:Enabled=true + RuntimeDb:EventReadsEnabled=true: the three round-trips PASS (this is the real-world validation gate before T19's retirement is trusted).

Step 5: commit

git add -A && git commit -m "test(historian-gateway): env-gated live validation vs wonder-sql-vd03 (read/write/alarm round-trips)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Task 21: Documentation

Classification: small | Estimated implement time: ~5 min | Parallelizable with: none

Files:

  • Modify /Users/dohertj2/Desktop/OtOpcUa/CLAUDE.md
  • Modify /Users/dohertj2/Desktop/OtOpcUa/README.md (if present)
  • Modify the relevant appsettings*.json (the ServerHistorian + new ContinuousHistorization blocks) under src/Server/ZB.MOM.WW.OtOpcUa.Host/

Step 1: failing test — none (docs). Verification is review + a grep guard: the docs must not reference the retired Wonderware backend as current, and must document the new config keys.

Step 2: run, expect FAIL: grep -n "Wonderware" CLAUDE.md shows stale references (the backend section still describes the sidecar).

Step 3: implement

  • CLAUDE.md: historian backend is now HistorianGateway (gRPC client package); document the ServerHistorian keys (Endpoint/ApiKey/UseTls/AllowUntrustedServerCertificate/CaCertificatePath/CallTimeout), the ContinuousHistorization section, the IHistorianProvisioning EnsureTags hook, the alarm SendEvent path + ReadEvents dependency on gateway RuntimeDb:EventReadsEnabled=true, and that the Wonderware projects were retired. Note the gateway-side prerequisites (RuntimeDb flags + API-key scopes).
  • appsettings: a commented ServerHistorian example (blank ApiKey — env-supplied) + a ContinuousHistorization example (disabled by default, absolute OutboxPath note for production).
  • Migration note: deployments must rename their old ServerHistorian keys (Host/Port/SharedSecret/ServerCertThumbprint → Endpoint/ApiKey/UseTls/CaCertificatePath) and supply ServerHistorian__ApiKey via env.

Step 4: run, expect PASS: grep -n "Wonderware" CLAUDE.md returns only retirement-history mentions; the new keys are documented.

Step 5: commit

git commit -am "docs(historian-gateway): document gateway backend, config keys, EnsureTags hook, Wonderware retirement

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"

Execution order & parallelism

  • Phase A (T1) runs first — every later task depends on the project + consumed package.
  • Phase B mappers (T2-T6) are all pure/no-I/O and fully parallelizable with each other once T1 lands.
  • Phase C (T7→T8→T9→T10→T11) follows B. T7 consumes T2+T4; T9 (options reshape) can proceed in parallel with B/T7 (different area). T10 is the read cutover gate — it depends on T7 (data source) + T9 (options) and makes use case 1 shippable on its own. T8 and T11 extend T7's class (sequential with it).
  • Phase D (T12-T18) follows the read cutover (T10). T12 (alarm writer), T14 (provisioner), T16 (outbox) are mutually parallelizable; T13 depends on T12; T15 depends on T14; T17 depends on T16 (+ client write seam); T18 depends on T17.
  • Phase E: T20 (live validation) runs after the gateway path is fully wired (read cutover + alarm + recorder). T19 (Wonderware retirement) lands LAST — only after D is green AND T20's live round-trips pass (do not delete the proven-out backend until the replacement is live-validated). T21 (docs) follows the retirement so it documents the final state.
  • The whole live-validation phase is gated on VPN reachability of wonder-sql-vd03 and on the gateway being deployed with RuntimeDb:Enabled=true + RuntimeDb:EventReadsEnabled=true.

Live/VPN + verify-live risks (design §11)

Settle these during T10/T11/T17/T20 against wonder-sql-vd03 (prompt the user to connect VPN if unreachable):

  1. Galaxy-tag → historian-tag identity. Does OtOpcUa's resolved historianTagname (FullName or the HistorianTagname override, typically tag_name.Attribute) match the AVEVA historian's stored tag name? Confirm early in T20's read round-trip — a mismatch surfaces as empty reads, not errors.
  2. UInt16 / String / DateTime write gaps. Continuous historization is numeric-analog only in v1. UInt16→UInt4 is a documented fallback; String/DateTime/Reference are deferred and throw NotSupported (provisioning skips them, the recorder drops non-numeric values + meters) — never silent. T3/T14/T17 encode this; T21 documents it.
  3. Alarm-history reads depend on the gateway's SQL event path. ReadEventsAsync requires the gateway deployed with RuntimeDb:EventReadsEnabled=true; the source-name server filter is delivered by the gateway-client plan (§5.3) — until it lands, T11 filters client-side and must not assume server filtering.
  4. WriteLiveValues requires gateway RuntimeDb:Enabled=true AND an EnsureTags-provisioned tag. The provisioning hook (T15) must have run for a tag before the recorder's writes (T17) land — provisioning is fire-and-forget, so the first few values for a brand-new historized tag may be rejected until EnsureTags completes; the outbox retains + retries them (no loss). Validate the ordering in T20's write→read round-trip.
  5. received_time UTC semantics. On the SQL event/value paths, inherit whatever the gateway's feat/sql-readevents work establishes for local-vs-UTC and EventTimeUTCOffsetMins. Stamp all OtOpcUa-side timestamps UTC (DateTimeKind.Utc) and assert kind in the mappers (T4/T5/T6).
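The "assert kind in the mappers" rule from item 5 can be a one-line guard shared by T4/T5/T6. A minimal sketch, assuming a hypothetical UtcGuard helper; it rejects Local and Unspecified kinds instead of converting them, so a mis-stamped timestamp surfaces at the mapper rather than as shifted data in the historian.

```csharp
using System;

// Hypothetical guard for mapper inputs: fail loudly on non-UTC timestamps.
public static class UtcGuard
{
    public static DateTime RequireUtc(DateTime value, string paramName)
        => value.Kind == DateTimeKind.Utc
            ? value
            : throw new ArgumentException(
                $"Expected DateTimeKind.Utc but got {value.Kind}.", paramName);
}
```

A mapper would call UtcGuard.RequireUtc(sample.TimestampUtc, nameof(sample)) before converting to the proto timestamp.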