Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
86 KiB
OtOpcUa ↔ HistorianGateway Integration — Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
Goal: Make HistorianGateway the sole historian read/write backend for the OtOpcUa OPC UA server — replacing the bespoke Wonderware TCP/ArchestrA sidecar — via a new gateway-backed IHistorianDataSource + IAlarmHistorianWriter + IHistorianProvisioning, a continuous-historization recorder, and DI factory swaps.
Architecture: A new ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway driver project consumes the Gitea-feed ZB.MOM.WW.HistorianGateway.Client package (gRPC over historian_gateway.v1) behind a thin IHistorianGatewayClient seam so every adapter is unit-testable against a fake. Pure, matrix-guarded mappers (design §6) translate OPC UA / driver shapes to gateway proto shapes and back; the read adapter swaps into the existing AddServerHistorian factory (the independently-shippable read cutover), the alarm writer swaps into AddAlarmHistorian (behind the existing SqliteStoreAndForwardSink), and a new Akka recorder taps DependencyMuxActor value changes into a crash-safe FasterLog outbox that drains to WriteLiveValues. An EnsureTags provisioning hook fires non-blocking from AddressSpaceApplier.Apply().
Tech Stack: .NET 10, OPC Foundation UA .NET Standard, Akka.NET actors, Microsoft.FASTER.Core (FasterLog outbox), ZB.MOM.WW.HistorianGateway.Client (gRPC), xUnit.
Prerequisites & Constraints
Cross-plan dependency (hard gate). This plan DEPENDS ON the sibling gateway-client plan
(docs/plans/2026-06-26-historian-gateway-client.md) having already published
ZB.MOM.WW.HistorianGateway.Client and ZB.MOM.WW.HistorianGateway.Contracts to the Gitea feed
(dohertj2-gitea). Phase A consumes those packages; do not start until they resolve. Confirm with:
dotnet package search ZB.MOM.WW.HistorianGateway.Client --source dohertj2-gitea.
Gateway deployment prerequisites (no code here — runtime config of the target gateway). The gateway OtOpcUa points at MUST run with:
RuntimeDb:Enabled=true— enables theWriteLiveValuesSQL live path (continuous historization).RuntimeDb:EventReadsEnabled=true— enablesReadEventsfromRuntime.dbo.Events(alarm-history reads; C2 SQL workaround).- An API key carrying scopes
historian:read,historian:write,historian:tags:write.
The source-name filter on the SQL ReadEvents path is delivered by the gateway-client plan (its Task 14 / design §5.3); until it lands, ReadEventsAsync(sourceName, …) is time-range-only and filters client-side. T11 notes this.
Engineering constraints (every task).
- Zero new warnings.
Directory.Build.propsenforcesNullable=enable,Platforms=x64,PlatformTarget=x64, and treats new warnings as build breaks. Fix, never suppress. - Secrets via environment only. The gateway API key, any connection strings, and TLS material are supplied via env vars (e.g.
ServerHistorian__ApiKey). The committed appsettings defaults are blank or dev-only placeholders. Never commit a real key. - Redaction. No hostnames, credentials, API keys, or tag values in error messages or logs by default. Map gateway/gRPC exceptions to safe messages — mirror the existing
WonderwareHistorianClientposture. - Gitea, not GitHub.
originis Gitea;ghwill not work. Usegit pushtoorigin; open PRs via the Gitea API withGITEA_TOKEN. - Branch isolation. OtOpcUa's working tree is currently DIRTY from unrelated in-flight work. This plan runs on its OWN fresh branch off OtOpcUa
main:Do not entangle with the dirty tree. Ifcd ~/Desktop/OtOpcUa git stash list # confirm you understand existing state; do NOT touch it git fetch origin git switch -c feat/historian-gateway-backend origin/mainorigin/mainis unavailable, branch off the localmainafter confirming it is clean of the in-flight work. - Commit-message trailer. Every commit message ends with:
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii - Plan relocation. This file (and its
.tasks.json) live in the HistorianGateway repo while authored; when execution starts they relocate into~/Desktop/OtOpcUa/docs/plans/. All OtOpcUa paths below are absolute under~/Desktop/OtOpcUa. - Build/test anchors. Solution is
~/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx. Build:dotnet build ZB.MOM.WW.OtOpcUa.slnx. Test a single class:dotnet test --filter "FullyQualifiedName~<ClassName>".
Reference implementations to mirror (read before touching the matching task).
- Quality→OPC-UA-StatusCode:
src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/QualityMapper.cs(port the switch verbatim; the gatewayHistorianSample.opc_qualityis the same OPC DA quality byte). - Aggregate-mode mapping + Total handling + AlignAtTime + null→BadNoData:
…Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs(MapAggregate,ReadProcessedAsync,AlignAtTimeSnapshots,ToAggregateSnapshots). - Health-counter discipline (single
_healthLock,TotalSuccesses+TotalFailures==TotalQueries): same file,GetHealthSnapshot/RecordOutcome. - FasterLog outbox:
~/Desktop/HistorianGateway/src/ZB.MOM.WW.HistorianGateway.Server/Historian/FasterLogOutboxStore.cs(PerEntry/Periodic commit, append/peek/remove-truncate, RecoverState).
The IHistorianGatewayClient seam. All OtOpcUa adapters depend on a thin interface
IHistorianGatewayClient (defined in T1, in the Gateway driver project) that exposes exactly the
gateway operations OtOpcUa needs, in proto-typed terms:
// uses ZB.MOM.WW.HistorianGateway.Contracts.Grpc types from the Contracts package
public interface IHistorianGatewayClient : IAsyncDisposable
{
IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken ct);
IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken ct);
Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken ct);
IAsyncEnumerable<HistorianEvent> ReadEventsAsync(string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken ct);
Task<WriteAck> WriteLiveValuesAsync(string tag, IReadOnlyList<HistorianLiveValue> values, CancellationToken ct);
Task<WriteAck> SendEventAsync(HistorianEvent evt, CancellationToken ct);
Task<TagOperationResults> EnsureTagsAsync(IReadOnlyList<HistorianTagDefinition> definitions, CancellationToken ct);
Task<bool> ProbeAsync(CancellationToken ct);
Task<ConnectionStatus> GetConnectionStatusAsync(CancellationToken ct);
}
The concrete HistorianGatewayClientAdapter : IHistorianGatewayClient (wrapping the package's
HistorianGatewayClient) lands in T10 where the factory wires it. Every other adapter/recorder is
TDD'd against an in-memory FakeHistorianGatewayClient test double.
Task 1: Consume the gateway packages + scaffold the Gateway driver project
Classification: small Estimated implement time: ~5 min Parallelizable with: none (every later task builds on this project)
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/NuGet.config - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/IHistorianGatewayClient.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.csproj - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/FakeHistorianGatewayClient.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ProjectSmokeTests.cs - Modify
/Users/dohertj2/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx
Step 1: failing test — ProjectSmokeTests.cs:
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
public sealed class ProjectSmokeTests
{
[Fact]
public void GatewayClientSeam_IsReferenceable()
{
// Compiles only if the project references the Contracts package and the seam exists.
var t = typeof(IHistorianGatewayClient);
Assert.Equal("IHistorianGatewayClient", t.Name);
}
}
FakeHistorianGatewayClient.cs implements IHistorianGatewayClient with settable result fields + recorded-call lists (the reusable double for all later tasks): each method returns from a public Func/queue field defaulting to empty, and records its arguments.
Step 2: run, expect FAIL — dotnet test ZB.MOM.WW.OtOpcUa.slnx --filter "FullyQualifiedName~ProjectSmokeTests" → FAILS to compile/restore (project/package/seam absent).
Step 3: implement
- In
NuGet.config, add to thedohertj2-gitea<packageSource>block:<package pattern="ZB.MOM.WW.HistorianGateway.Contracts" /> <package pattern="ZB.MOM.WW.HistorianGateway.Client" /> - New
…Driver.Historian.Gateway.csproj(mirror a sibling driver csproj's TFM/props; do NOT set its ownNullable/Platforms—Directory.Build.propssupplies them):<PackageReference Include="ZB.MOM.WW.HistorianGateway.Client" Version="0.1.0" />(resolve the actual published version withdotnet package search; the Client transitively brings.Contracts).<ProjectReference>toCore/ZB.MOM.WW.OtOpcUa.Core.AbstractionsandCore/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.
IHistorianGatewayClient.cs— the interface exactly as in Prerequisites (proto-typed;using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;).- Test csproj: references the Gateway driver project +
Microsoft.NET.Test.Sdk,xunit,xunit.runner.visualstudio(copy versions from a sibling*.Tests.csproj). FakeHistorianGatewayClientas described.- Add both projects to the solution:
dotnet sln ZB.MOM.WW.OtOpcUa.slnx add src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.csproj.
Step 4: run, expect PASS — same filter → PASS; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean (0 warnings).
Step 5: commit
git add NuGet.config ZB.MOM.WW.OtOpcUa.slnx src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests
git commit -m "feat(historian-gateway): scaffold Gateway driver project + consume client package
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 2: HistoryAggregateType → RetrievalMode mapper (matrix-guarded)
Classification: small Estimated implement time: ~4 min Parallelizable with: Task 3, Task 4, Task 5, Task 6
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/AggregateModeMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/AggregateModeMapperTests.cs
Step 1: failing test — exhaustive per-member + matrix guard:
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; // HistoryAggregateType
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; // RetrievalMode
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Mapping;
public sealed class AggregateModeMapperTests
{
[Theory]
[InlineData(HistoryAggregateType.Average, RetrievalMode.TimeWeightedAverage)]
[InlineData(HistoryAggregateType.Minimum, RetrievalMode.MinimumWithTime)]
[InlineData(HistoryAggregateType.Maximum, RetrievalMode.MaximumWithTime)]
[InlineData(HistoryAggregateType.Total, RetrievalMode.Integral)]
[InlineData(HistoryAggregateType.Count, RetrievalMode.Counter)]
public void Maps_each_aggregate(HistoryAggregateType a, RetrievalMode expected)
=> Assert.Equal(expected, AggregateModeMapper.ToRetrievalMode(a));
[Fact] // matrix guard: a new HistoryAggregateType member must fail here
public void Every_aggregate_member_is_mapped()
{
foreach (var a in Enum.GetValues<HistoryAggregateType>())
_ = AggregateModeMapper.ToRetrievalMode(a); // must not throw for any defined member
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~AggregateModeMapperTests" → FAILS (mapper absent).
Step 3: implement — AggregateModeMapper.ToRetrievalMode(HistoryAggregateType): a switch expression mapping the five members per the table; the _ => arm throws ArgumentOutOfRangeException (so a future enum member fails the matrix guard, not silently mis-maps). Verify against WonderwareHistorianClient.MapAggregate: Average/Min/Max line up; the Wonderware path had no native Total (derived Average×interval) and used ValueCount for Count — the gateway's native RetrievalMode.Integral (Total) and RetrievalMode.Counter (Count) replace those client-side workarounds, so the gateway path is a strict improvement. Add an XML doc note recording that Total/Count are now native modes (no client-side scaling).
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): HistoryAggregateType->RetrievalMode mapper (matrix-guarded)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 3: DriverDataType → HistorianDataType mapper with write-gap fallbacks (matrix-guarded)
Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 2, Task 4, Task 5, Task 6
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/HistorianTypeMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/HistorianTypeMapperTests.cs
Step 1: failing test — per-member mapping + deferred-throws + matrix guard:
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; // DriverDataType
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; // HistorianDataType
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Mapping;
public sealed class HistorianTypeMapperTests
{
[Theory]
[InlineData(DriverDataType.Boolean, HistorianDataType.Int1)]
[InlineData(DriverDataType.Int16, HistorianDataType.Int2)]
[InlineData(DriverDataType.Int32, HistorianDataType.Int4)]
[InlineData(DriverDataType.Int64, HistorianDataType.Int8)]
[InlineData(DriverDataType.UInt16, HistorianDataType.Uint4)] // fallback: UInt2 write deferred upstream
[InlineData(DriverDataType.UInt32, HistorianDataType.Uint4)]
[InlineData(DriverDataType.UInt64, HistorianDataType.Uint8)]
[InlineData(DriverDataType.Float32, HistorianDataType.Float)]
[InlineData(DriverDataType.Float64, HistorianDataType.Double)]
public void Maps_writable_numeric_types(DriverDataType d, HistorianDataType expected)
=> Assert.Equal(expected, HistorianTypeMapper.ToHistorianDataType(d));
[Theory]
[InlineData(DriverDataType.String)]
[InlineData(DriverDataType.DateTime)]
[InlineData(DriverDataType.Reference)]
public void Deferred_types_throw_NotSupported_with_clear_message(DriverDataType d)
{
var ex = Assert.Throws<NotSupportedException>(() => HistorianTypeMapper.ToHistorianDataType(d));
Assert.Contains("not historized in v1", ex.Message); // human-actionable, no tag value leaked
}
[Fact] // matrix guard: a new DriverDataType member must be classified (mapped or explicitly deferred)
public void Every_DriverDataType_member_is_classified()
{
foreach (var d in Enum.GetValues<DriverDataType>())
{
try { _ = HistorianTypeMapper.ToHistorianDataType(d); }
catch (NotSupportedException) { /* explicitly deferred — acceptable */ }
// any OTHER exception (e.g. ArgumentOutOfRangeException from an unhandled new member) fails the test
}
}
}
Note: protobuf C# generates
HistorianDataType.Uint4/Uint8(single capital U). Confirm the generated casing against the resolved Contracts package and align theInlineData.
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~HistorianTypeMapperTests" → FAILS.
Step 3: implement — HistorianTypeMapper.ToHistorianDataType(DriverDataType): switch per the §6 table; the three deferred members throw NotSupportedException with a message like $"DriverDataType.{d} is not historized in v1 (string/datetime/reference writes are deferred — gated on the analog SQL write path)." (no tag name/value in the message). The default arm also throws NotSupportedException referencing the unmapped member, so the matrix guard catches a future enum addition. Add a same-file bool IsHistorizable(DriverDataType) helper (true for the nine numeric members) — T15's provisioning hook uses it to skip deferred types without catching exceptions.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): DriverDataType->HistorianDataType mapper + write-gap fallbacks (matrix-guarded)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 4: HistorianSample/HistorianAggregateSample → DataValueSnapshot + quality mapper
Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 2, Task 3, Task 5, Task 6
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/SampleMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/GatewayQualityMapperTests.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/SampleMapperTests.cs
Step 1: failing test — quality parity (port the Wonderware table) + sample/aggregate shape:
public sealed class GatewayQualityMapperTests
{
[Theory]
[InlineData(192, 0x00000000u)] // Good
[InlineData(216, 0x00D80000u)] // Good_LocalOverride
[InlineData(64, 0x40000000u)] // Uncertain
[InlineData(0, 0x80000000u)] // Bad
[InlineData(8, 0x808A0000u)] // Bad_NotConnected
[InlineData(255, 0x00000000u)] // >=192 bucket
[InlineData(100, 0x40000000u)] // >=64 bucket
[InlineData(1, 0x80000000u)] // bad bucket
public void Maps_opc_quality_byte(byte q, uint expected)
=> Assert.Equal(expected, GatewayQualityMapper.Map(q));
}
public sealed class SampleMapperTests
{
[Fact]
public void Numeric_sample_maps_value_quality_and_timestamps()
{
var s = new HistorianSample { Tag = "T", NumericValue = 12.5,
Quality = 192, OpcQuality = 192, Timestamp = Ts(2026,1,1,0,0,0) };
var snap = SampleMapper.ToSnapshot(s);
Assert.Equal(12.5, Assert.IsType<double>(snap.Value));
Assert.Equal(0x00000000u, snap.StatusCode);
Assert.Equal(DateTimeKind.Utc, snap.SourceTimestampUtc!.Value.Kind);
}
[Fact]
public void String_sample_carries_string_value()
{
var s = new HistorianSample { Tag = "T", StringValue = "abc", OpcQuality = 192, Timestamp = Ts(2026,1,1,0,0,0) };
Assert.Equal("abc", SampleMapper.ToSnapshot(s).Value);
}
[Fact]
public void Aggregate_null_value_is_BadNoData()
{
var a = new HistorianAggregateSample { Tag = "T", /* Value unset */ EndTime = Ts(2026,1,1,0,0,0) };
var snap = SampleMapper.ToAggregateSnapshot(a);
Assert.Equal(0x800E0000u, snap.StatusCode); // BadNoData
Assert.Null(snap.Value);
}
// Ts(...) builds a Google.Protobuf.WellKnownTypes.Timestamp from UTC parts.
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~SampleMapperTests|FullyQualifiedName~GatewayQualityMapperTests" → FAILS.
Step 3: implement
GatewayQualityMapper.Map(byte)— byte-identical port of…Wonderware.Client/Internal/QualityMapper.cs(copy the switch verbatim; add an XML doc cross-reference to that origin so a future quality-table change stays in parity).SampleMapper.ToSnapshot(HistorianSample)→DataValueSnapshot(Value, StatusCode, SourceTimestampUtc, ServerTimestampUtc):Value:s.NumericValue(when present via proto3 optionalHasNumericValue) boxed asdouble, elses.StringValuewhen present, elsenull.StatusCode:GatewayQualityMapper.Map((byte)s.OpcQuality)(preferopc_quality; if zero/unset andqualitycarries the OPC-DA byte, fall back toquality— match whatever the gateway populates; document the choice).SourceTimestampUtc:s.Timestamp.ToDateTime()(UTC kind).ServerTimestampUtc:DateTime.UtcNow.
SampleMapper.ToAggregateSnapshot(HistorianAggregateSample)→ null aggregate value ⇒StatusCode 0x800E0000(BadNoData), non-null ⇒ Good (0x00000000) with the value;SourceTimestampUtc← the bucket end/start timestamp (match the WonderwareToAggregateSnapshotsconvention — it stamps the bucket timestamp). ProvideIReadOnlyList<>batch helpers too.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): sample/aggregate->DataValueSnapshot + quality mapper (Wonderware parity)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 5: HistorianEvent → HistoricalEvent mapper (+ severity from properties)
Classification: small Estimated implement time: ~4 min Parallelizable with: Task 2, Task 3, Task 4, Task 6
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/EventMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/EventMapperTests.cs
Step 1: failing test:
public sealed class EventMapperTests
{
[Fact]
public void Maps_core_fields_and_times()
{
var e = new HistorianEvent { Id = "E1", SourceName = "Pump1",
EventTime = Ts(2026,1,1,0,0,0), ReceivedTime = Ts(2026,1,1,0,0,5) };
e.Properties["Message"] = "High temp";
e.Properties["Severity"] = "700";
var h = EventMapper.ToHistoricalEvent(e);
Assert.Equal("E1", h.EventId);
Assert.Equal("Pump1", h.SourceName);
Assert.Equal("High temp", h.Message);
Assert.Equal((ushort)700, h.Severity);
Assert.Equal(DateTimeKind.Utc, h.EventTimeUtc.Kind);
}
[Theory]
[InlineData("Priority", "999", 999)]
[InlineData("Severity", "0", 1)] // clamp to OPC UA min 1
[InlineData("Severity", "5000", 1000)] // clamp to OPC UA max 1000
[InlineData(null, null, 1)] // missing → default min severity
public void Severity_parsed_and_clamped(string? key, string? val, int expected)
{
var e = new HistorianEvent { Id = "E", EventTime = Ts(2026,1,1,0,0,0), ReceivedTime = Ts(2026,1,1,0,0,0) };
if (key is not null) e.Properties[key] = val!;
Assert.Equal((ushort)expected, EventMapper.ToHistoricalEvent(e).Severity);
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~EventMapperTests" → FAILS.
Step 3: implement — EventMapper.ToHistoricalEvent(HistorianEvent):
EventId←e.Id;SourceName←e.SourceName;EventTimeUtc←e.EventTime.ToDateTime();ReceivedTimeUtc←e.ReceivedTime.ToDateTime().Message←Properties["Message"]if present, elsee.Type(best-effort render); never null-crash.Severity← parseProperties["Severity"]elseProperties["Priority"]; clamp to[1,1000]; missing/unparseable ⇒1. Return(ushort).- Add a batch helper
IReadOnlyList<HistoricalEvent> ToHistoricalEvents(IEnumerable<HistorianEvent>).
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): HistorianEvent->HistoricalEvent mapper (+ clamped severity)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 6: AlarmHistorianEvent → HistorianEvent mapper (SendEvent)
Classification: small Estimated implement time: ~4 min Parallelizable with: Task 2, Task 3, Task 4, Task 5
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/AlarmEventMapper.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Mapping/AlarmEventMapperTests.cs
Step 1: failing test:
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; // AlarmHistorianEvent
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; // AlarmSeverity
public sealed class AlarmEventMapperTests
{
[Fact]
public void Maps_source_time_type_and_rich_properties()
{
var a = new AlarmHistorianEvent("A1","Area/Line/Pump1","HiHi","LimitAlarm",
AlarmSeverity.High, "Activated", "Temp high", "operator1", "ack note",
new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
var e = AlarmEventMapper.ToHistorianEvent(a);
Assert.Equal("Area/Line/Pump1", e.SourceName);
Assert.Equal("LimitAlarm", e.Type);
Assert.Equal(new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc), e.EventTime.ToDateTime());
Assert.Equal("HiHi", e.Properties["AlarmName"]);
Assert.Equal("Activated", e.Properties["EventKind"]);
Assert.Equal("High", e.Properties["Severity"]);
Assert.Equal("operator1", e.Properties["User"]);
Assert.Equal("ack note", e.Properties["Comment"]);
Assert.Equal("Temp high", e.Properties["Message"]);
}
[Fact]
public void Null_comment_is_omitted_not_null()
{
var a = new AlarmHistorianEvent("A","S","N","DiscreteAlarm",AlarmSeverity.Low,"Cleared","m","system",null,
new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
Assert.False(AlarmEventMapper.ToHistorianEvent(a).Properties.ContainsKey("Comment"));
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~AlarmEventMapperTests" → FAILS.
Step 3: implement — AlarmEventMapper.ToHistorianEvent(AlarmHistorianEvent):
Id←a.AlarmId(orGuid.NewGuid().ToString("N")if blank);SourceName←a.EquipmentPath;EventTime←Timestamp.FromDateTime(DateTime.SpecifyKind(a.TimestampUtc, DateTimeKind.Utc));ReceivedTime← same as event time (server re-stamps on the SQL path);Type←a.AlarmTypeName.Propertiesmap:AlarmName,EventKind,Severity(a.Severity.ToString()),User,Message; addCommentonly when non-null. Protomap<string,string>values must be non-null — never insert a null.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): AlarmHistorianEvent->HistorianEvent mapper (SendEvent properties)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 7: GatewayHistorianDataSource — ReadRaw / ReadProcessed / ReadAtTime
Classification: standard Estimated implement time: ~5 min Parallelizable with: none (consumes T2 + T4; gates T8/T10/T11)
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs
Step 1: failing test — against FakeHistorianGatewayClient:
public sealed class GatewayHistorianDataSourceTests
{
[Fact]
public async Task ReadRaw_maps_samples_and_passes_args()
{
var fake = new FakeHistorianGatewayClient();
fake.RawSamples = new[] {
new HistorianSample { Tag="T", NumericValue=1.0, OpcQuality=192, Timestamp=Ts(2026,1,1,0,0,0) },
new HistorianSample { Tag="T", NumericValue=2.0, OpcQuality=0, Timestamp=Ts(2026,1,1,0,0,1) },
};
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var r = await ds.ReadRawAsync("T", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 100, default);
Assert.Equal(2, r.Samples.Count);
Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // Bad from quality 0
Assert.Equal("T", fake.LastReadRawTag);
Assert.Equal(100, fake.LastReadRawMaxValues);
}
[Fact]
public async Task ReadProcessed_uses_aggregate_mode_mapping()
{
var fake = new FakeHistorianGatewayClient();
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.ReadProcessedAsync("T", default, default, TimeSpan.FromSeconds(60), HistoryAggregateType.Minimum, default);
Assert.Equal(RetrievalMode.MinimumWithTime, fake.LastAggregateMode);
Assert.Equal(TimeSpan.FromSeconds(60), fake.LastAggregateInterval);
}
[Fact]
public async Task ReadAtTime_aligns_one_snapshot_per_timestamp_with_gaps_Bad()
{
var fake = new FakeHistorianGatewayClient();
var t0 = new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc);
var t1 = t0.AddSeconds(1);
fake.AtTimeSamples = new[] { new HistorianSample { NumericValue=5.0, OpcQuality=192, Timestamp=Timestamp.FromDateTime(t0) } };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var r = await ds.ReadAtTimeAsync("T", new[]{ t0, t1 }, default);
Assert.Equal(2, r.Samples.Count); // exactly one per requested ts, in order
Assert.Equal(5.0, r.Samples[0].Value);
Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // gap → Bad at requested ts
}
[Fact]
public async Task Empty_window_is_not_a_fault()
{
var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var r = await ds.ReadRawAsync("T", default, default, 10, default);
Assert.Empty(r.Samples); // GoodNoData-empty, no throw
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~GatewayHistorianDataSourceTests" → FAILS.
Step 3: implement — GatewayHistorianDataSource : IHistorianDataSource (ctor takes IHistorianGatewayClient + ILogger<>):
ReadRawAsync: clampmaxValuesPerNodetoint((int)Math.Min(maxValuesPerNode, int.MaxValue)); drainclient.ReadRawAsyncIAsyncEnumerable→SampleMapper.ToSnapshot; returnnew HistoryReadResult(list, ContinuationPoint: null).ReadProcessedAsync:AggregateModeMapper.ToRetrievalMode(aggregate)→client.ReadAggregateAsync(...); map viaSampleMapper.ToAggregateSnapshot. (No client-side Total scaling —Integralis native; delete the Wonderware workaround.)ReadAtTimeAsync: callclient.ReadAtTimeAsync; align exactly one snapshot per requested timestamp in order, gaps → Bad (0x80000000) stamped at the requested time — portWonderwareHistorianClient.AlignAtTimeSnapshotsverbatim (index returned samples byTimestamp.ToDateTime().Ticks).- Record health counters on every read via a private
RecordOutcomemirroring the Wonderware single-lock discipline (used by T8). Map a thrown gateway exception to a recorded failure + rethrow (the node manager turns it into a Bad result; never crash the host). Dispose()disposes the client (bridge theIAsyncDisposablelike the WonderwareDispose).
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): GatewayHistorianDataSource read paths (raw/processed/at-time)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 8: GetHealthSnapshot from Probe / GetConnectionStatus
Classification: small Estimated implement time: ~4 min Parallelizable with: none (extends T7's class)
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs
Step 1: failing test:
public sealed class GatewayHealthSnapshotTests
{
[Fact]
public async Task Counters_track_success_and_failure()
{
var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.ReadRawAsync("T", default, default, 1, default);
fake.ThrowOnRead = true;
await Assert.ThrowsAnyAsync<Exception>(() => ds.ReadRawAsync("T", default, default, 1, default));
var h = ds.GetHealthSnapshot();
Assert.Equal(2, h.TotalQueries);
Assert.Equal(1, h.TotalSuccesses);
Assert.Equal(1, h.TotalFailures);
Assert.Equal(1, h.ConsecutiveFailures);
Assert.Equal(h.TotalQueries, h.TotalSuccesses + h.TotalFailures); // invariant
}
[Fact]
public void Connection_state_reflects_GetConnectionStatus_flags()
{
var fake = new FakeHistorianGatewayClient {
ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 } }; // Process|Event
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
ds.RefreshConnectionStateAsync(default).GetAwaiter().GetResult(); // internal probe used by health hosted-service
var h = ds.GetHealthSnapshot();
Assert.True(h.ProcessConnectionOpen);
Assert.True(h.EventConnectionOpen);
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~GatewayHealthSnapshotTests" → FAILS.
Step 3: implement
- Add the six health counter fields under one
_healthLock+RecordOutcome(bool, string?)(copy the Wonderware discipline soTotalSuccesses + TotalFailures == TotalQueriesalways holds). WireRecordOutcomeinto every read method from T7. - Add a lightweight, non-blocking cached connection state:
RefreshConnectionStateAsync(ct)callsclient.GetConnectionStatusAsync(and/orProbeAsync) and cachesProcessConnectionOpen←ConnectedToServer && (ConnectionKind & 1)!=0,EventConnectionOpen←ConnectedToServer && (ConnectionKind & 2)!=0.GetHealthSnapshot()is pure observation — returns the cached flags + counters; it never blocks on I/O (per the interface contract).ActiveProcessNode/ActiveEventNode/Nodesare null/empty (the gateway is non-clustered to us, mirroring the Wonderware client's Finding 010 posture).
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): GetHealthSnapshot via Probe/GetConnectionStatus (counter discipline)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 9: Reshape ServerHistorianOptions to gateway form
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: Task 2–Task 8 (different project/area)
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ServerHistorianOptionsTests.cs(place in the existing Runtime test project; confirm its path withfind tests -name "*Runtime.Tests.csproj")
Step 1: failing test:
public sealed class ServerHistorianOptionsTests
{
[Fact]
public void Disabled_yields_no_warnings()
=> Assert.Empty(new ServerHistorianOptions { Enabled = false }.Validate());
[Fact]
public void Enabled_without_endpoint_warns()
{
var w = new ServerHistorianOptions { Enabled = true, Endpoint = "", ApiKey = "histgw_x_y" }.Validate();
Assert.Contains(w, m => m.Contains("Endpoint"));
}
[Fact]
public void Enabled_without_apikey_warns()
{
var w = new ServerHistorianOptions { Enabled = true, Endpoint = "https://h:5222", ApiKey = "" }.Validate();
Assert.Contains(w, m => m.Contains("ApiKey"));
}
[Fact]
public void Valid_config_is_clean()
=> Assert.Empty(new ServerHistorianOptions { Enabled = true, Endpoint = "https://h:5222", ApiKey = "histgw_x_y" }.Validate());
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~ServerHistorianOptionsTests" → FAILS (new members absent).
Step 3: implement — reshape ServerHistorianOptions:
- Add:
string Endpoint(e.g.https://host:5222),string ApiKey = ""(supplied via envServerHistorian__ApiKey),bool UseTls = true,bool AllowUntrustedServerCertificate = false,string? CaCertificatePath,TimeSpan CallTimeout = TimeSpan.FromSeconds(30). - Remove:
Host,Port,SharedSecret,ServerCertThumbprint(Wonderware-specific). KeepEnabledandMaxTieClusterOverfetch(still used by the node manager's HistoryRead-Raw tie-cluster paging — unchanged). Validate(): warn when enabled andEndpointblank; warn when enabled andApiKeyblank ("the gateway gRPC surface will reject unauthenticated calls"); keep theMaxTieClusterOverfetch <= 0warning. No secret values in warning text.- Update the class XML doc to describe the gateway backend (not the Wonderware sidecar).
High-risk: this is a config contract change. Any deployment
appsettingsServerHistorianblock changes shape (T21 documents the new keys + provides a migration note). The build will surface every stale reference to the removed members — the only in-tree consumer is the Program.cs factory (T10) and the (to-be-retired) Wonderware wiring.
Step 4: run, expect PASS — filter passes; dotnet build of the Runtime project clean. (Program.cs will not compile until T10 — that is expected and handled there; build the Runtime + Tests projects in isolation for this task: dotnet build src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ZB.MOM.WW.OtOpcUa.Runtime.csproj.)
Step 5: commit
git commit -am "feat(historian-gateway): reshape ServerHistorianOptions to gateway form (Endpoint/ApiKey/Tls)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 10: Swap AddServerHistorian factory in Program.cs (READ CUTOVER)
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none (the read cutover gate)
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/HistorianGatewayClientAdapter.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianServiceCollectionExtensions.cs(a Host-callable factory helper) - Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/HistorianGatewayClientAdapterTests.cs
Step 1: failing test — adapter construction + factory shape (no live gateway needed):
public sealed class HistorianGatewayClientAdapterTests
{
[Fact]
public void Adapter_constructs_from_options_without_dialing()
{
// Constructing the channel must not perform network I/O (lazy connect).
var opts = new ServerHistorianOptions { Enabled = true, Endpoint = "https://localhost:5222", ApiKey = "histgw_x_y" };
using var adapter = GatewayHistorianClientAdapter.Create(opts, NullLoggerFactory.Instance);
Assert.NotNull(adapter);
}
[Fact]
public void Factory_builds_GatewayHistorianDataSource()
{
var opts = new ServerHistorianOptions { Enabled = true, Endpoint = "https://localhost:5222", ApiKey = "histgw_x_y" };
var ds = GatewayHistorian.CreateDataSource(opts, services: new ServiceCollection().BuildServiceProvider());
Assert.IsType<GatewayHistorianDataSource>(ds);
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~HistorianGatewayClientAdapterTests" → FAILS.
Step 3: implement
HistorianGatewayClientAdapter : IHistorianGatewayClient— wraps the package'sHistorianGatewayClient.Create(ServerHistorianOptions, ILoggerFactory)builds the underlying client fromHistorianGatewayClientOptions(Endpoint,ApiKey,UseTls,AllowUntrustedServerCertificate,CaCertificatePath,CallTimeout) — match the exact options surface the published Client package exposes (the sibling client plan). Each interface method forwards to the matching client wrapper (streaming →IAsyncEnumerable, unary →Task). Channel construction is lazy (no network I/O in the ctor). Map the client's typed exceptions through unchanged (the data source records them as failures).GatewayHistorian.CreateDataSource(ServerHistorianOptions, IServiceProvider)static factory:new GatewayHistorianDataSource(HistorianGatewayClientAdapter.Create(opts, sp.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance), sp.GetService<ILogger<GatewayHistorianDataSource>>() ?? NullLogger<…>.Instance).- In
Program.cs(hasDriverblock, theAddServerHistoriancall ~lines 115-122): replace theWonderwareHistorianClientfactory lambda with(opts, sp) => GatewayHistorian.CreateDataSource(opts, sp). Update theusingfrom…Driver.Historian.Wonderware.Clientto…Driver.Historian.Gateway(theAddAlarmHistorianWonderware lambda stays for now — T13 swaps it). Add the Host<ProjectReference>to the Gateway driver project inZB.MOM.WW.OtOpcUa.Host.csproj.
Step 4: run, expect PASS — adapter tests pass; dotnet build ZB.MOM.WW.OtOpcUa.slnx is clean (Program.cs compiles against the reshaped options); the existing server-historian / node-manager HistoryRead tests stay green: dotnet test --filter "FullyQualifiedName~HistoryRead|FullyQualifiedName~ServerHistorian".
Step 5: commit
git commit -am "feat(historian-gateway): read cutover — AddServerHistorian builds GatewayHistorianDataSource
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
This is the read cutover gate. After this commit, OPC UA HistoryRead (raw/processed/at-time) for Galaxy + non-Galaxy tags flows through the gateway. Use case 1 is shippable here, before any write code.
Task 11: ReadEventsAsync (alarm history) on the data source
Classification: small Estimated implement time: ~4 min Parallelizable with: none (extends T7 class; consumes T5)
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayReadEventsTests.cs
Step 1: failing test:
public sealed class GatewayReadEventsTests
{
[Fact]
public async Task ReadEvents_maps_and_passes_source_filter()
{
var fake = new FakeHistorianGatewayClient {
Events = new[] { new HistorianEvent { Id="E1", SourceName="Pump1",
EventTime=Ts(2026,1,1,0,0,0), ReceivedTime=Ts(2026,1,1,0,0,0) } } };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, default);
Assert.Single(r.Events);
Assert.Equal("E1", r.Events[0].EventId);
Assert.Equal("Pump1", fake.LastReadEventsSourceName);
}
[Fact]
public async Task ReadEvents_truncation_sets_continuation_point()
{
var fake = new FakeHistorianGatewayClient {
Events = Enumerable.Range(0,5).Select(i => new HistorianEvent { Id=$"E{i}",
EventTime=Ts(2026,1,1,0,0,i), ReceivedTime=Ts(2026,1,1,0,0,i) }).ToArray() };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, default);
Assert.Equal(3, r.Events.Count);
Assert.NotNull(r.ContinuationPoint); // cap truncated → non-null per Core.Abstractions-009
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~GatewayReadEventsTests" → FAILS.
Step 3: implement — ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, ct):
- Treat
maxEvents <= 0as the backend-default sentinel (do not pass a cap; let the gateway apply itsEventReadMaxRows). WhenmaxEvents > 0, stop draining atmaxEventsand set a non-nullContinuationPointiff the source produced at least one more event (truncation signal, Core.Abstractions-009); otherwise null. - Map via
EventMapper.ToHistoricalEvents. Record health outcome. - XML doc note: this path depends on the gateway being deployed with
RuntimeDb:EventReadsEnabled=true; the source-name server filter is delivered by the gateway-client plan's SQL-ReadEvents enhancement — until it lands,sourceNameis passed through but the gateway may return the full window and the adapter must not assume server-side filtering. (For v1 correctness, whensourceNameis non-null, defensively filter the mapped events bySourceNameclient-side as well; remove once the server filter is confirmed.)
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): ReadEventsAsync alarm-history via gateway ReadEvents (+ truncation signal)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 12: GatewayAlarmHistorianWriter : IAlarmHistorianWriter
Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 14, Task 16 (independent files; all in Phase D)
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs
Step 1: failing test — outcome mapping from gRPC status (uses the fake's per-call SendEvent behaviour):
public sealed class GatewayAlarmHistorianWriterTests
{
private static AlarmHistorianEvent Evt(string id) => new(id,"Area/Pump","N","LimitAlarm",
AlarmSeverity.High,"Activated","m","u",null,new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
[Fact]
public async Task All_acked_when_SendEvent_succeeds()
{
var fake = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = true } };
var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
var outcomes = await w.WriteBatchAsync(new[]{ Evt("A"), Evt("B") }, default);
Assert.All(outcomes, o => Assert.Equal(HistorianWriteOutcome.Ack, o));
}
[Fact]
public async Task Unavailable_is_RetryPlease()
{
var fake = new FakeHistorianGatewayClient { SendEventThrows = new RpcException(new Status(StatusCode.Unavailable, "down")) };
var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
var outcomes = await w.WriteBatchAsync(new[]{ Evt("A") }, default);
Assert.Equal(HistorianWriteOutcome.RetryPlease, outcomes[0]);
}
[Fact]
public async Task InvalidArgument_is_PermanentFail()
{
var fake = new FakeHistorianGatewayClient { SendEventThrows = new RpcException(new Status(StatusCode.InvalidArgument, "malformed")) };
var w = new GatewayAlarmHistorianWriter(fake, NullLogger<GatewayAlarmHistorianWriter>.Instance);
var outcomes = await w.WriteBatchAsync(new[]{ Evt("A") }, default);
Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
}
[Fact]
public async Task Empty_batch_returns_empty() =>
Assert.Empty(await new GatewayAlarmHistorianWriter(new FakeHistorianGatewayClient(),
NullLogger<GatewayAlarmHistorianWriter>.Instance).WriteBatchAsync(Array.Empty<AlarmHistorianEvent>(), default));
}
If the published Client wraps
RpcExceptionin a typed hierarchy (…UnavailableException,…AuthorizationException), assert on those types instead — align with the client plan's exception design (§9). The fake should be configurable to throw whichever the real client surfaces.
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~GatewayAlarmHistorianWriterTests" → FAILS.
Step 3: implement — GatewayAlarmHistorianWriter : IAlarmHistorianWriter: per event, AlarmEventMapper.ToHistorianEvent → client.SendEventAsync; map result to HistorianWriteOutcome:
- success ack ⇒
Ack. - transient gRPC (
Unavailable,DeadlineExceeded,ResourceExhausted,Aborted,Internal) or the client's…UnavailableException⇒RetryPlease. - permanent gRPC (
InvalidArgument,FailedPrecondition,OutOfRange,Unimplemented) ⇒PermanentFail(so the drain worker dead-letters poison events instead of looping to the cap — mirrors the WonderwarePerEventStatus==2boundary). Unauthenticated/PermissionDenied⇒RetryPlease(a key fix re-enables the batch; do not dead-letter on an auth blip).- Returns one outcome per event, same order. Empty batch ⇒
[]. Never throws out ofWriteBatchAsync— it sits behindSqliteStoreAndForwardSink, which expects per-event outcomes.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): GatewayAlarmHistorianWriter — SendEvent + gRPC->outcome mapping
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 13: Swap AddAlarmHistorian factory in Program.cs
Classification: high-risk Estimated implement time: ~3 min Parallelizable with: none
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs
Step 1: failing test — reuse the existing Host integration/boot test (find it: find tests -name "*Host*Tests*.cs" | head); assert the resolved IAlarmHistorianWriter is the gateway writer when AlarmHistorian:Enabled=true. If no such test exists, add a minimal DI-resolution test in the Host test project that builds the service collection with the gateway writer factory and asserts the type. (Keep it offline — the writer ctor must not dial.)
Step 2: run, expect FAIL — run that test → FAILS (still wired to WonderwareHistorianClient).
Step 3: implement — in Program.cs AddAlarmHistorian(...) call (~lines 101-108): replace the WonderwareHistorianClient writer lambda with one that builds new GatewayAlarmHistorianWriter(HistorianGatewayClientAdapter.Create(<gateway options>, sp.GetService<ILoggerFactory>()...), sp.GetService<ILogger<GatewayAlarmHistorianWriter>>()...). The alarm-write path must point at the same gateway endpoint/key as the read path — source the connection from ServerHistorianOptions (single gateway) rather than the Wonderware-shaped AlarmHistorianOptions host/port. Reuse a single IHistorianGatewayClient/adapter instance across read + alarm-write where practical (resolve a shared singleton in DI to avoid two channels); if simplest, register IHistorianGatewayClient as a singleton built from ServerHistorianOptions and have both factories consume it. Remove the now-dead using …Wonderware.Client.
Step 4: run, expect PASS — the test passes; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean.
Step 5: commit
git commit -am "feat(historian-gateway): alarm-write cutover — AddAlarmHistorian drains to GatewayAlarmHistorianWriter
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 14: IHistorianProvisioning + GatewayTagProvisioner (EnsureTags)
Classification: standard Estimated implement time: ~5 min Parallelizable with: Task 12, Task 16
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianProvisioning.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs
Step 1: failing test:
public sealed class GatewayTagProvisionerTests
{
[Fact]
public async Task Ensures_numeric_tags_with_mapped_type()
{
var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
var reqs = new[] {
new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, "degC", "Temp"),
new HistorianTagProvisionRequest("Pump1.Run", DriverDataType.Boolean, null, null),
};
var result = await p.EnsureTagsAsync(reqs, default);
Assert.Equal(2, fake.LastEnsureDefinitions.Count);
Assert.Equal(HistorianDataType.Float, fake.LastEnsureDefinitions[0].DataType);
Assert.Equal(HistorianDataType.Int1, fake.LastEnsureDefinitions[1].DataType);
Assert.Equal(2, result.Requested);
Assert.Equal(0, result.Skipped);
}
[Fact]
public async Task Deferred_types_are_skipped_not_sent()
{
var fake = new FakeHistorianGatewayClient { EnsureTagsResult = new TagOperationResults() };
var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
var result = await p.EnsureTagsAsync(new[] {
new HistorianTagProvisionRequest("Pump1.Name", DriverDataType.String, null, null) }, default);
Assert.Empty(fake.LastEnsureDefinitions); // String is deferred → never sent
Assert.Equal(1, result.Skipped);
}
[Fact]
public async Task Gateway_failure_is_swallowed_and_counted_not_thrown()
{
var fake = new FakeHistorianGatewayClient { EnsureTagsThrows = new Exception("boom") };
var p = new GatewayTagProvisioner(fake, NullLogger<GatewayTagProvisioner>.Instance);
var result = await p.EnsureTagsAsync(new[] {
new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null) }, default);
Assert.Equal(1, result.Failed); // non-blocking: no throw
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~GatewayTagProvisionerTests" → FAILS.
Step 3: implement
IHistorianProvisioning(Core.Abstractions, alongsideIHistorianDataSource):Task<HistorianProvisionResult> EnsureTagsAsync(IReadOnlyList<HistorianTagProvisionRequest> requests, CancellationToken ct). DefineHistorianTagProvisionRequest(string TagName, DriverDataType DataType, string? EngineeringUnit, string? Description)andHistorianProvisionResult(int Requested, int Ensured, int Skipped, int Failed)records here too. Add aNullHistorianProvisioningno-op (returns all-zero) so the Applier has a safe default.GatewayTagProvisioner : IHistorianProvisioning(Gateway driver): for each request, gate onHistorianTypeMapper.IsHistorizable(DataType)— non-historizable ⇒ countSkipped, log Debug, never build a definition; otherwise buildHistorianTagDefinition { TagName, DataType = HistorianTypeMapper.ToHistorianDataType(...), EngineeringUnit, Description }. Batch all historizable definitions into oneclient.EnsureTagsAsynccall. Non-blocking semantics: wrap the call in try/catch — any exception ⇒ count the batch asFailed, log a Warning (no tag values), return. Returns the result record. Never throws.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): IHistorianProvisioning + GatewayTagProvisioner (EnsureTags, non-blocking)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 15: Hook provisioning into AddressSpaceApplier.Apply()
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/ZB.MOM.WW.OtOpcUa.OpcUaServer.csproj(addCore.AbstractionsProjectReference) - Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/AddressSpaceApplier.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/AddressSpaceApplierProvisioningTests.cs(confirm the OpcUaServer test project path withfind tests -name "*OpcUaServer.Tests.csproj")
Step 1: failing test — synthetic plan with mixed historized/non-historized tags + a capturing IHistorianProvisioning:
public sealed class AddressSpaceApplierProvisioningTests
{
private sealed class CapturingProvisioner : IHistorianProvisioning
{
public List<HistorianTagProvisionRequest> Seen = new();
public bool Throw;
public Task<HistorianProvisionResult> EnsureTagsAsync(IReadOnlyList<HistorianTagProvisionRequest> r, CancellationToken ct)
{
if (Throw) throw new Exception("boom");
Seen.AddRange(r);
return Task.FromResult(new HistorianProvisionResult(r.Count, r.Count, 0, 0));
}
}
[Fact]
public void Apply_provisions_only_historized_added_tags()
{
var prov = new CapturingProvisioner();
var applier = new AddressSpaceApplier(NullSink, NullLogger, prov);
var plan = PlanWith(
Tag("Pump1.Temp", historized:true, historianName:"Pump1.Temp", type:"Float32"),
Tag("Pump1.Run", historized:false, historianName:null, type:"Boolean"));
applier.Apply(plan);
Assert.Single(prov.Seen);
Assert.Equal("Pump1.Temp", prov.Seen[0].TagName); // resolved historian name
}
[Fact]
public void Provisioner_throw_does_not_block_publish()
{
var applier = new AddressSpaceApplier(NullSink, NullLogger, new CapturingProvisioner { Throw = true });
var outcome = applier.Apply(PlanWith(Tag("Pump1.Temp", historized:true, historianName:"Pump1.Temp", type:"Float32")));
Assert.True(outcome.RebuildCalled); // address-space work still completed
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~AddressSpaceApplierProvisioningTests" → FAILS (ctor arity / hook absent).
Step 3: implement
- Add
<ProjectReference>fromZB.MOM.WW.OtOpcUa.OpcUaServer.csprojtoCore/ZB.MOM.WW.OtOpcUa.Core.Abstractions(it is a leaf abstractions project — OpcUaServer already references Commons; no cycle). - Add an
IHistorianProvisioningctor param toAddressSpaceApplierdefaulting toNullHistorianProvisioning.Instance(preserves every existing call site — including the one inServiceCollectionExtensions.WithOtOpcUaRuntimeActorsthat constructs the applier). - In
Apply(plan), after the address-space work (rebuild/surgical passes) completes, iterateplan.AddedEquipmentTags.Where(t => t.IsHistorized):- resolve the historian name exactly as the materialiser does:
string.IsNullOrWhiteSpace(t.HistorianTagname) ? t.FullName : t.HistorianTagname. - parse
t.DataType(string) →DriverDataTypeviaEnum.TryParse; an unparseable type ⇒ skip + log Debug. - build
HistorianTagProvisionRequest(historianName, dataType, EngineeringUnit: null, Description: t.Name). - Fire-and-forget the provisioning off the apply path so it NEVER blocks the publish: call
provisioner.EnsureTagsAsync(requests, CancellationToken.None)and observe the task on a continuation that logsresult.Failed/Skipped(and swallows any escaped exception). The synchronousApplyreturns its normalAddressSpaceApplyOutcomeregardless. Wrap the whole hook in try/catch so a hook fault cannot break a deploy.
- resolve the historian name exactly as the materialiser does:
High-risk: this runs on the OPC UA publish actor's pinned thread. Keep the hook's synchronous portion to building the request list only; the gateway round-trip must be fully detached (do not
.Wait()/.Result). Tests assert publish completes even when the provisioner throws synchronously.
Step 4: run, expect PASS — provisioning tests pass; the existing AddressSpaceApplier test suite stays green (dotnet test --filter "FullyQualifiedName~AddressSpaceApplier"); dotnet build clean.
Step 5: commit
git commit -am "feat(historian-gateway): EnsureTags provisioning hook in AddressSpaceApplier (non-blocking)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 16: FasterLog outbox store for the recorder
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: Task 12, Task 14
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj(addMicrosoft.FASTER.Core2.6.5) - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/HistorizationOutboxEntry.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/IHistorizationOutbox.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs
Step 1: failing test — append/peek/remove, restart durability, full-drop:
public sealed class FasterLogHistorizationOutboxTests
{
private static HistorizationOutboxEntry E(string tag, double v) =>
new(Guid.NewGuid(), tag, v, 192, new DateTime(2026,1,1,0,0,0,DateTimeKind.Utc));
[Fact]
public async Task Append_then_peek_returns_fifo()
{
var dir = NewTempDir();
using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
await o.AppendAsync(E("A",1), default);
await o.AppendAsync(E("B",2), default);
var batch = await o.PeekBatchAsync(10, default);
Assert.Equal(new[]{"A","B"}, batch.Select(b => b.Tag));
Assert.Equal(2, await o.CountAsync(default));
}
[Fact]
public async Task Remove_truncates_and_survives_restart()
{
var dir = NewTempDir();
Guid keep;
{
using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
var a = E("A",1); var b = E("B",2); keep = b.Id;
await o.AppendAsync(a, default); await o.AppendAsync(b, default);
await o.PeekBatchAsync(10, default);
await o.RemoveAsync(a.Id, default); // ack A
}
using var reopened = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry);
Assert.Equal(1, await reopened.CountAsync(default)); // only B survives
var batch = await reopened.PeekBatchAsync(10, default);
Assert.Equal(keep, batch[0].Id);
}
[Fact]
public async Task Capacity_full_drops_oldest_and_counts()
{
var dir = NewTempDir();
using var o = new FasterLogHistorizationOutbox(dir, StoreForwardCommitMode.PerEntry, capacity: 2);
await o.AppendAsync(E("A",1), default);
await o.AppendAsync(E("B",2), default);
await o.AppendAsync(E("C",3), default); // overflow → drop oldest (A)
Assert.Equal(2, await o.CountAsync(default));
Assert.Equal(1, o.DroppedCount);
var tags = (await o.PeekBatchAsync(10, default)).Select(b => b.Tag).ToArray();
Assert.DoesNotContain("A", tags);
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~FasterLogHistorizationOutboxTests" → FAILS.
Step 3: implement — mirror the gateway's FasterLogOutboxStore (read it first):
HistorizationOutboxEntry(Guid Id, string Tag, double NumericValue, ushort Quality, DateTime TimestampUtc)+ a compact binary serializer (BinaryWriter or the same approach the gateway'sOutboxEntrySerializeruses).IHistorizationOutbox(Append/PeekBatch/Remove/Count,DroppedCount,IDisposable).FasterLogHistorizationOutbox:ManagedLocalStorageDeviceunder<dir>/hlog.log;FasterLog; PerEntry commits beforeAppendAsyncreturns, Periodic drives aPeriodicTimercommit loop;RemoveAsync⇒TruncateUntil(window[id])+CommitAsync;RecoverState()in the ctor rebuilds count + head from the committed log (restart durability). Add the capacity/drop-oldest behavior the gateway store lacks: acapacityctor arg; when an append would exceed it, advance the head past the oldest entry (truncate one) and incrementDroppedCount(the recorder surfaces it as a metric in T17/T18). UseStoreForwardCommitModefrom the gateway's Configuration namespace or define a localenum HistorizationCommitMode { PerEntry, Periodic }to avoid a cross-package coupling — prefer the local enum (the gateway's type is internal to its Server project).
High-risk: this is the durable boundary. The restart-durability + truncate-correctness tests are load-bearing; do not skip them.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): FasterLog historization outbox (PerEntry/Periodic, drop-oldest)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 17: ContinuousHistorizationRecorder actor
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none (consumes T16 + the client)
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs
The recorder lives in Runtime (it references
DependencyMuxActor+DriverInstanceActor.AttributeValuePublished, both in Runtime), and consumesIHistorianGatewayClient+IHistorizationOutboxvia abstractions so Runtime does not take a hard dependency on the Gateway driver project. Define the two seams the recorder needs (IHistorianGatewayClientwrite subset,IHistorizationOutbox) such that Runtime can reference them — either moveIHistorizationOutbox+ aIHistorianValueWriter(justWriteLiveValuesAsync) intoCore.Abstractions/Historian, and have the Gateway driver implement them. Decision: introduceIHistorianValueWriter(single methodTask<bool> WriteLiveValuesAsync(string tag, IReadOnlyList<(DateTime? ts, double value, ushort quality)> values, CancellationToken ct)) andIHistorizationOutboxinCore.Abstractions/Historian;GatewayHistorianValueWriter(Gateway driver) adaptsIHistorianGatewayClient.WriteLiveValuesAsync. This keeps Runtime free of the gRPC package.
Files (revised to honour the layering decision):
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianValueWriter.cs(+ moveIHistorizationOutbox+HistorizationOutboxEntryhere; the FasterLog impl from T16 stays in the Gateway driver project implementing this interface) - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/GatewayHistorianValueWriter.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs - Create the recorder test file above.
Step 1: failing test — Akka TestKit with fake mux + fake writer + real/fake outbox:
public sealed class ContinuousHistorizationRecorderTests : TestKit
{
[Fact]
public void Registers_interest_for_historized_refs_on_start()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter();
var outbox = new InMemoryOutbox();
Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox,
historizedRefs: new[]{ "Pump1.Temp" }));
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Contains("Pump1.Temp", reg.TagRefs);
}
[Fact]
public async Task AttributeValuePublished_appends_to_outbox_then_drains_to_writer()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter { Succeed = true };
var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Temp" }));
rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Temp", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
await AwaitAssertAsync(() => Assert.Contains(writer.Written, w => w.Tag=="Pump1.Temp" && w.Value==42.0));
await AwaitAssertAsync(async () => Assert.Equal(0, await outbox.CountAsync(default))); // acked → truncated
}
[Fact]
public async Task Writer_failure_keeps_entry_for_retry()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter { Succeed = false };
var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Temp" }));
rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Temp", 7.0, OpcUaQuality.Good, DateTime.UtcNow));
await AwaitAssertAsync(async () => Assert.Equal(1, await outbox.CountAsync(default))); // not acked → retained
}
[Fact]
public void Non_numeric_value_is_dropped_with_metric()
{
var mux = CreateTestProbe(); var writer = new FakeValueWriter(); var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(mux.Ref, writer, outbox, new[]{ "Pump1.Name" }));
rec.Tell(new DriverInstanceActor.AttributeValuePublished("drv","Pump1.Name", "text", OpcUaQuality.Good, DateTime.UtcNow));
// string value: SQL analog write path can't carry it → dropped, not appended
AwaitAssert(() => Assert.Empty(writer.Written));
}
}
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~ContinuousHistorizationRecorderTests" → FAILS.
Step 3: implement — ContinuousHistorizationRecorder : ReceiveActor:
Props(IActorRef dependencyMux, IHistorianValueWriter writer, IHistorizationOutbox outbox, IReadOnlyList<string> historizedRefs, ...options).PreStart:dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(historizedRefs, Self)). (The mux fansAttributeValuePublishedfor those refs back asDependencyValueChanged; or wire the recorder to receiveAttributeValuePublisheddirectly fromDriverHostActor's forward — match the actual fan-out. Per the mux contract, register interest and handleVirtualTagActor.DependencyValueChanged; if the recorder must see ALL historized refs regardless of vtag interest, instead haveDriverHostActorforwardAttributeValuePublishedto the recorder. Choose theRegisterInterest→DependencyValueChangedpath to reuse the existing mux without touchingDriverHostActor; the test asserts the register message.)- On a value-change message for a historized ref: coerce the value to numeric (
double); a non-numeric/non-coercible value (string/null) is dropped + metered (the SQL analogWriteLiveValuespath is numeric-only — design §6/§7, §11 risk 2). Append aHistorizationOutboxEntryto the outbox (the durable boundary). On outbox-full, the store drops oldest + incrementsDroppedCount. - A background drain (self-scheduled tick via
Context.System.Scheduler/Timers):PeekBatchAsync(batchSize)→ group by tag →writer.WriteLiveValuesAsync(tag, values); on successRemoveAsynceach acked id; on failure leave queued and back off (exponential, capped) before the next tick. Never let a drain exception kill the actor (supervised try/catch; log Warning, back off). - Surface counters (queued depth, dropped, last drain success) for T18's meter/health.
High-risk: actor + durable recorder + data contract. The append-before-ack ordering, the numeric-only gate, and the failure-retains-entry behavior are the load-bearing invariants. Use
Akka.TestKit.Xunit2.
Step 4: run, expect PASS.
Step 5: commit
git commit -am "feat(historian-gateway): ContinuousHistorizationRecorder actor (outbox->WriteLiveValues, backoff)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 18: Wire the recorder into DI + hosted lifecycle
Classification: standard Estimated implement time: ~5 min Parallelizable with: none
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationOptions.cs - Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs(spawn the recorder inWithOtOpcUaRuntimeActors, gated on options) - Modify
/Users/dohertj2/Desktop/OtOpcUa/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs(bind options + registerIHistorianValueWriter+IHistorizationOutboxfrom the gateway, gated) - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationOptionsTests.cs
Step 1: failing test — options validation + actor-spawn gating:
public sealed class ContinuousHistorizationOptionsTests
{
[Fact] public void Disabled_no_warnings() => Assert.Empty(new ContinuousHistorizationOptions{Enabled=false}.Validate());
[Fact] public void Enabled_requires_outbox_path()
=> Assert.Contains(new ContinuousHistorizationOptions{Enabled=true, OutboxPath=""}.Validate(), m => m.Contains("OutboxPath"));
[Fact] public void Periodic_requires_positive_interval()
=> Assert.Contains(new ContinuousHistorizationOptions{Enabled=true, OutboxPath="x", CommitMode="Periodic", CommitIntervalMs=0}.Validate(), m => m.Contains("CommitIntervalMs"));
}
Plus a Runtime spawn test (mirror existing WithOtOpcUaRuntimeActors tests): when ContinuousHistorization:Enabled=false, the recorder actor is NOT spawned; when enabled with a fake writer/outbox registered, it IS (registry.Get<ContinuousHistorizationRecorderKey>() resolves).
Step 2: run, expect FAIL — dotnet test --filter "FullyQualifiedName~ContinuousHistorizationOptions" → FAILS.
Step 3: implement
ContinuousHistorizationOptions(SectionName="ContinuousHistorization"):Enabled,OutboxPath(directory; required when enabled — production absolute path),CommitMode(PerEntry/Periodic),CommitIntervalMs,DrainBatchSize(default 64),DrainIntervalSeconds,Capacity,RetryBackoff.Validate()warns on the gated cases above.- In
WithOtOpcUaRuntimeActors: resolveIHistorianValueWriter+IHistorizationOutbox(+ the historized-ref set — source it from the deployed composition; for v1, the recorder can register interest dynamically as tags deploy, but the minimal wiring registers the actor and lets a laterSetHistorizedRefsmessage feed it. Keep T18 scope to: bind options, build the outbox fromOutboxPath, spawn the recorder when enabled, register its key.). Gate the spawn onContinuousHistorizationOptions.Enabled. - In
Program.cs(hasDriverblock): bind+validate the options; when enabled, registerIHistorizationOutbox⇒FasterLogHistorizationOutbox(opts.OutboxPath, …)andIHistorianValueWriter⇒GatewayHistorianValueWriterover the sharedIHistorianGatewayClient(the singleton from T13). Add a meter/observable-gauge for outbox depth + dropped count (mirror the existing observability registration), and feed the recorder's health into/healthzif a historian health hook exists.
Step 4: run, expect PASS — option + spawn tests pass; dotnet build ZB.MOM.WW.OtOpcUa.slnx clean; full offline suite green: dotnet test ZB.MOM.WW.OtOpcUa.slnx.
Step 5: commit
git commit -am "feat(historian-gateway): wire ContinuousHistorizationRecorder into DI + hosted lifecycle + meters
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 19: Retire the Wonderware historian projects
Classification: high-risk Estimated implement time: ~5 min Parallelizable with: none (must be last code task)
Files (remove from solution + delete):
/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Users/dohertj2/Desktop/OtOpcUa/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests- Modify
/Users/dohertj2/Desktop/OtOpcUa/ZB.MOM.WW.OtOpcUa.slnx - Grep-and-remove any remaining
using …Wonderware…/ProjectReference …Wonderware…(notablyHost.csproj).
Leave
/Users/dohertj2/Desktop/OtOpcUa/code-reviews/Driver.Historian.Wonderware*untouched — those are review artifacts, not projects.
Step 1: failing test — a guard test asserting the Wonderware types are gone (compiles only after removal):
public sealed class WonderwareRetirementTests
{
[Fact]
public void No_Wonderware_historian_assembly_is_loaded()
{
var loaded = AppDomain.CurrentDomain.GetAssemblies().Select(a => a.GetName().Name);
Assert.DoesNotContain(loaded, n => n is not null && n.Contains("Wonderware", StringComparison.OrdinalIgnoreCase));
}
}
(Place in the Gateway driver test project. Before removal this passes trivially only if nothing loaded it — instead make the gate the build: after deleting, dotnet build must still be clean with no dangling references. The test documents intent.)
Step 2: run, expect FAIL — first run dotnet build ZB.MOM.WW.OtOpcUa.slnx BEFORE deletion to confirm it is currently green with both backends, then delete; the meaningful failure is a dangling reference build break if any using/ProjectReference survives.
Step 3: implement
dotnet sln ZB.MOM.WW.OtOpcUa.slnx remove <each of the 5 project paths>.git rm -r <each of the 5 directories>.grep -rn "Wonderware" src/ tests/ --include=*.cs --include=*.csprojand remove every remaining reference (Host.csprojProjectReference, any strayusing).AlarmHistorianOptionsWonderware-shaped fields (Host/Port/SharedSecret) — if now unused by the gateway alarm-write wiring (T13 sources connection fromServerHistorian), prune them; otherwise leave and note in T21.
Step 4: run, expect PASS — dotnet build ZB.MOM.WW.OtOpcUa.slnx clean (0 warnings, no dangling refs); full suite green: dotnet test ZB.MOM.WW.OtOpcUa.slnx.
Step 5: commit
git rm -r src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests
git commit -am "refactor(historian-gateway): retire Wonderware historian projects (gateway is sole backend)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 20: Env-gated live validation vs wonder-sql-vd03
Classification: small Estimated implement time: ~5 min Parallelizable with: none
Files:
- Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Live/GatewayLiveIntegrationTests.cs - Create
/Users/dohertj2/Desktop/OtOpcUa/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Live/GatewayLiveFixture.cs
Step 1: failing test — [Trait("Category","LiveIntegration")] tests that skip when env vars are absent:
public sealed class GatewayLiveIntegrationTests
{
// Reads HISTGW_GATEWAY_ENDPOINT + HISTGW_GATEWAY_APIKEY + HISTGW_TEST_TAG (Galaxy tag),
// HISTGW_WRITE_SANDBOX_TAG (HistGW.LiveTest.*), HISTGW_ALARM_SOURCE. Skips if endpoint/key absent.
[SkippableFact, Trait("Category","LiveIntegration")]
public async Task Galaxy_tag_read_round_trip() { Skip.If(Fixture.NotConfigured); /* ReadRaw last 1h, assert >=0 samples, no throw */ }
[SkippableFact, Trait("Category","LiveIntegration")]
public async Task Write_then_read_on_sandbox_tag() { /* EnsureTags(Float) → WriteLiveValues → ReadRaw → sample present */ }
[SkippableFact, Trait("Category","LiveIntegration")]
public async Task Alarm_SendEvent_then_ReadEvents() { /* SendEvent → ReadEvents(source) → event present (needs RuntimeDb:EventReadsEnabled) */ }
}
Step 2: run, expect FAIL/SKIP — dotnet test --filter "Category=LiveIntegration" with no env → all SKIP (green). With env set + VPN up → FAIL until the live path works.
Step 3: implement — the fixture builds a real HistorianGatewayClientAdapter from env, the three round-trips exercise the read/write/alarm paths against the live gateway → wonder-sql-vd03. Mirror the HistorianGateway repo's GatewayIntegrationFixture env-gating convention (skip+log when unset). VPN-gated: wonder-sql-vd03 is reachable only on VPN — if the endpoint is configured but unreachable, prompt the user to connect VPN (do not hang).
Step 4: run, expect PASS — offline: all SKIP. On VPN with env + the gateway running RuntimeDb:Enabled=true + RuntimeDb:EventReadsEnabled=true: the three round-trips PASS (this is the real-world validation gate before T19's retirement is trusted).
Step 5: commit
git commit -am "test(historian-gateway): env-gated live validation vs wonder-sql-vd03 (read/write/alarm round-trips)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Task 21: Documentation
Classification: small Estimated implement time: ~5 min Parallelizable with: none
Files:
- Modify
/Users/dohertj2/Desktop/OtOpcUa/CLAUDE.md - Modify
/Users/dohertj2/Desktop/OtOpcUa/README.md(if present) - Modify the relevant
appsettings*.json(theServerHistorian+ newContinuousHistorizationblocks) undersrc/Server/ZB.MOM.WW.OtOpcUa.Host/
Step 1: failing test — none (docs). Verification is review + a grep guard: the docs must not reference the retired Wonderware backend as current, and must document the new config keys.
Step 2: run, expect FAIL — grep -n "Wonderware" CLAUDE.md shows stale references (the backend section still describes the sidecar).
Step 3: implement
CLAUDE.md: historian backend is now HistorianGateway (gRPC client package); document theServerHistoriankeys (Endpoint/ApiKey/UseTls/AllowUntrustedServerCertificate/CaCertificatePath/CallTimeout), theContinuousHistorizationsection, theIHistorianProvisioningEnsureTags hook, the alarm SendEvent path +ReadEventsdependency on gatewayRuntimeDb:EventReadsEnabled=true, and that the Wonderware projects were retired. Note the gateway-side prerequisites (RuntimeDb flags + API-key scopes).appsettings: a commentedServerHistorianexample (blankApiKey— env-supplied) + aContinuousHistorizationexample (disabled by default, absoluteOutboxPathnote for production).- Migration note: deployments must rename their old
ServerHistoriankeys (Host/Port/SharedSecret/ServerCertThumbprint→Endpoint/ApiKey/UseTls/CaCertificatePath) and supplyServerHistorian__ApiKeyvia env.
Step 4: run, expect PASS — grep -n "Wonderware" CLAUDE.md returns only retirement-history mentions; the new keys are documented.
Step 5: commit
git commit -am "docs(historian-gateway): document gateway backend, config keys, EnsureTags hook, Wonderware retirement
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii"
Execution order & parallelism
- Phase A (T1) runs first — every later task depends on the project + consumed package.
- Phase B mappers (T2–T6) are all pure/no-I/O and fully parallelizable with each other once T1 lands.
- Phase C (T7→T8→T9→T10→T11) follows B. T7 consumes T2+T4; T9 (options reshape) can proceed in parallel with B/T7 (different area). T10 is the read cutover gate — it depends on T7 (data source) + T9 (options) and makes use case 1 shippable on its own. T8 and T11 extend T7's class (sequential with it).
- Phase D (T12–T18) follows the read cutover (T10). T12 (alarm writer), T14 (provisioner), T16 (outbox) are mutually parallelizable; T13 depends on T12; T15 depends on T14; T17 depends on T16 (+ client write seam); T18 depends on T17.
- Phase E: T20 (live validation) runs after the gateway path is fully wired (read cutover + alarm + recorder). T19 (Wonderware retirement) lands LAST — only after D is green AND T20's live round-trips pass (do not delete the proven-out backend until the replacement is live-validated). T21 (docs) follows the retirement so it documents the final state.
- The whole live-validation phase is gated on VPN reachability of
wonder-sql-vd03and on the gateway being deployed withRuntimeDb:Enabled=true+RuntimeDb:EventReadsEnabled=true.
Live/VPN + verify-live risks (design §11)
Settle these during T10/T11/T17/T20 against wonder-sql-vd03 (prompt the user to connect VPN if unreachable):
- Galaxy-tag → historian-tag identity. Does OtOpcUa's resolved
historianTagname(FullNameor theHistorianTagnameoverride, typicallytag_name.Attribute) match the AVEVA historian's stored tag name? Confirm early in T20's read round-trip — a mismatch surfaces as empty reads, not errors. - UInt16 / String / DateTime write gaps. Continuous historization is numeric-analog only in v1.
UInt16→UInt4is a documented fallback;String/DateTime/Referenceare deferred and throwNotSupported(provisioning skips them, the recorder drops non-numeric values + meters) — never silent. T3/T14/T17 encode this; T21 documents it. - Alarm-history reads depend on the gateway's SQL event path.
ReadEventsAsyncrequires the gateway deployed withRuntimeDb:EventReadsEnabled=true; the source-name server filter is delivered by the gateway-client plan (§5.3) — until it lands, T11 filters client-side and must not assume server filtering. WriteLiveValuesrequires gatewayRuntimeDb:Enabled=trueAND anEnsureTags-provisioned tag. The provisioning hook (T15) must have run for a tag before the recorder's writes (T17) land — provisioning is fire-and-forget, so the first few values for a brand-new historized tag may be rejected untilEnsureTagscompletes; the outbox retains + retries them (no loss). Validate the ordering in T20's write→read round-trip.received_timeUTC semantics. On the SQL event/value paths, inherit whatever the gateway'sfeat/sql-readeventswork establishes for local-vs-UTC andEventTimeUTCOffsetMins. Stamp all OtOpcUa-side timestamps UTC (DateTimeKind.Utc) and assert kind in the mappers (T4/T5/T6).