feat(phase7): wire RingBufferHistoryWriter as production IHistoryWriter for virtual tags (Gap 5)

Closes Phase 7 Gap 5: VirtualTagEngine called IHistoryWriter.Record per evaluation
when Historize=true but Phase7EngineComposer always passed NullHistoryWriter, so
virtual-tag history was computed but never persisted.

The fix:
- New RingBufferHistoryWriter implements both IHistoryWriter (write port for the
  evaluation pipeline) and IHistorianDataSource (read port for IHistoryRouter so
  OPC UA HistoryRead on virtual-tag nodes resolves here). Maintains one bounded
  ring buffer (1000 samples, configurable) per tag path; Record() is O(1) and
  never blocks evaluation.
- Phase7EngineComposer.Compose now accepts IHistoryRouter? and, when any
  VirtualTagDefinition.Historize=true, creates a RingBufferHistoryWriter, passes
  it to VirtualTagEngine as historyWriter, adds it to the disposables list, and
  registers it under the "virtual:" prefix in the router for HistoryRead dispatch.
- Phase7Composer accepts IHistoryRouter? from DI (already registered as singleton
  in Program.cs) and threads it through to Phase7EngineComposer.Compose.
- NullHistoryWriter remains as fallback when no tags request historization.
- 16 new unit tests in RingBufferHistoryWriterTests.cs cover ring-buffer semantics,
  eviction, per-tag isolation, ReadRawAsync windowing, IHistorianDataSource stubs,
  router registration, and the Historize=false / null-router fallback paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 05:49:11 -04:00
parent ca149ce907
commit bc8ff7a5fe
4 changed files with 592 additions and 4 deletions

View File

@@ -7,6 +7,7 @@ using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Server.History;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
@@ -42,6 +43,7 @@ public sealed class Phase7Composer : IAsyncDisposable
private readonly DriverEquipmentContentRegistry _equipmentRegistry;
private readonly IAlarmHistorianSink _historianSink;
private readonly IAlarmHistorianWriter? _injectedWriter;
private readonly IHistoryRouter? _historyRouter;
private readonly ILoggerFactory _loggerFactory;
private readonly Serilog.ILogger _scriptLogger;
private readonly ILogger<Phase7Composer> _logger;
@@ -61,13 +63,15 @@ public sealed class Phase7Composer : IAsyncDisposable
ILoggerFactory loggerFactory,
Serilog.ILogger scriptLogger,
ILogger<Phase7Composer> logger,
IAlarmHistorianWriter? injectedWriter = null)
IAlarmHistorianWriter? injectedWriter = null,
IHistoryRouter? historyRouter = null)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_driverHost = driverHost ?? throw new ArgumentNullException(nameof(driverHost));
_equipmentRegistry = equipmentRegistry ?? throw new ArgumentNullException(nameof(equipmentRegistry));
_historianSink = historianSink ?? throw new ArgumentNullException(nameof(historianSink));
_injectedWriter = injectedWriter;
_historyRouter = historyRouter;
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_scriptLogger = scriptLogger ?? throw new ArgumentNullException(nameof(scriptLogger));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -117,7 +121,8 @@ public sealed class Phase7Composer : IAsyncDisposable
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: historianSink,
rootScriptLogger: _scriptLogger,
loggerFactory: _loggerFactory);
loggerFactory: _loggerFactory,
historyRouter: _historyRouter);
_logger.LogInformation(
"Phase 7: composed engines from generation {Gen} — {Vt} virtual tag(s), {Al} scripted alarm(s), {Sc} script(s)",

View File

@@ -5,6 +5,7 @@ using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
using ZB.MOM.WW.OtOpcUa.Server.History;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
@@ -32,6 +33,14 @@ namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// </remarks>
public static class Phase7EngineComposer
{
/// <summary>
/// Prefix used when registering the virtual-tag ring-buffer history source in
/// <see cref="IHistoryRouter"/>. All virtual-tag UNS paths are prefixed with this
/// string so the router resolves reads to the <see cref="RingBufferHistoryWriter"/>
/// rather than a driver-owned historian.
/// </summary>
public const string VirtualTagHistoryPrefix = "virtual:";
public static Phase7ComposedSources Compose(
IReadOnlyList<Script> scripts,
IReadOnlyList<VirtualTag> virtualTags,
@@ -40,7 +49,8 @@ public static class Phase7EngineComposer
IAlarmStateStore alarmStateStore,
IAlarmHistorianSink historianSink,
Serilog.ILogger rootScriptLogger,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
IHistoryRouter? historyRouter = null)
{
ArgumentNullException.ThrowIfNull(scripts);
ArgumentNullException.ThrowIfNull(virtualTags);
@@ -64,10 +74,40 @@ public static class Phase7EngineComposer
// Engines take Serilog.ILogger — each engine gets its own so rolling-file emissions
// stay keyed to the right source in the scripts-*.log.
VirtualTagSource? vtSource = null;
RingBufferHistoryWriter? ringWriter = null;
if (virtualTags.Count > 0)
{
var vtDefs = ProjectVirtualTags(virtualTags, scriptById).ToList();
var vtEngine = new VirtualTagEngine(upstream, scriptLoggerFactory, rootScriptLogger);
// Gap 5 closure (task #28): wire a real IHistoryWriter when any tag has
// Historize=true. RingBufferHistoryWriter is a bounded in-process ring buffer
// that also implements IHistorianDataSource so OPC UA HistoryRead on virtual
// nodes resolves here. Register with the IHistoryRouter under a dedicated
// prefix so the DriverNodeManager's history dispatch finds it.
IHistoryWriter historyWriter = NullHistoryWriter.Instance;
var hasHistorize = vtDefs.Any(d => d.Historize);
if (hasHistorize)
{
ringWriter = new RingBufferHistoryWriter();
historyWriter = ringWriter;
disposables.Add(ringWriter);
if (historyRouter is not null)
{
try
{
historyRouter.Register(VirtualTagHistoryPrefix, ringWriter);
}
catch (InvalidOperationException)
{
// Already registered (e.g. engine reload). Leave the existing entry —
// both registrations refer to the same in-process buffer type.
}
}
}
var vtEngine = new VirtualTagEngine(upstream, scriptLoggerFactory, rootScriptLogger,
historyWriter: historyWriter);
vtEngine.Load(vtDefs);
vtSource = new VirtualTagSource(vtEngine);
disposables.Add(vtEngine);

View File

@@ -0,0 +1,235 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Production <see cref="IHistoryWriter"/> for virtual-tag evaluations (Gap 5 closure,
/// task #28). Every evaluation result from a <c>Historize=true</c> virtual tag is
/// appended to an in-process per-tag ring buffer. The same instance also implements
/// <see cref="IHistorianDataSource"/> so it can be registered on the server-level
/// <see cref="ZB.MOM.WW.OtOpcUa.Server.History.IHistoryRouter"/> — OPC UA HistoryRead
/// requests for virtual-tag nodes then resolve here.
/// </summary>
/// <remarks>
/// <para>
/// Design rationale:
/// <list type="bullet">
/// <item><description>
/// No new external process or DB is needed: the ring buffer lives in-process
/// and survives across evaluation cycles for the lifetime of the server.
/// </description></item>
/// <item><description>
/// <see cref="Record"/> is synchronous and O(1) — it never blocks the
/// evaluation pipeline. Per <see cref="IHistoryWriter"/> contract the caller
/// treats it as fire-and-forget.
/// </description></item>
/// <item><description>
/// The buffer is bounded by <see cref="MaxSamplesPerTag"/> (default 1 000).
/// Older samples are evicted silently when the tag writes past the limit, which
/// matches the "hot ring" semantics typical of in-process historian buffers:
/// the most-recent N seconds of data is always accessible for HistoryRead even
/// in the absence of a persistent historian.
/// </description></item>
/// <item><description>
/// Thread safety: <see cref="Record"/> uses an interlocked write pointer so
/// concurrent evaluation callbacks (rare but possible during cascade + timer
/// races) don't corrupt the buffer.
/// </description></item>
/// </list>
/// </para>
/// <para>
/// <see cref="ReadRawAsync"/> supports the OPC UA HistoryRead service's raw-values
/// mode. Processed (aggregate), at-time, and event read modes return empty results
/// with no error — virtual tags are scalar real-time values, not time-series stored
/// at external resolution.
/// </para>
/// <para>
/// Lifecycle: <see cref="Dispose"/> clears every buffer. The caller
/// (<see cref="Phase7Composer.DisposeAsync"/>) disposes this instance after the
/// engine that writes to it has been disposed, so no further <see cref="Record"/>
/// calls can arrive after dispose.
/// </para>
/// </remarks>
public sealed class RingBufferHistoryWriter : IHistoryWriter, IHistorianDataSource
{
/// <summary>Maximum samples retained per tag path. Older samples are evicted when full.</summary>
public const int MaxSamplesPerTag = 1_000;
private readonly ConcurrentDictionary<string, TagRingBuffer> _buffers =
new(StringComparer.Ordinal);
private readonly int _capacity;
private bool _disposed;
/// <param name="capacity">
/// Per-tag sample capacity. Tests inject a smaller value to keep fixtures
/// small; production uses the default <see cref="MaxSamplesPerTag"/>.
/// </param>
public RingBufferHistoryWriter(int capacity = MaxSamplesPerTag)
{
if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be ≥ 1.");
_capacity = capacity;
}
// ===== IHistoryWriter =====
/// <summary>
/// Records one evaluation result. Called by <see cref="VirtualTagEngine"/> on every
/// evaluation when <c>Historize=true</c>. O(1), never blocks.
/// </summary>
public void Record(string path, DataValueSnapshot value)
{
if (_disposed) return; // graceful shutdown — silently drop
var buffer = _buffers.GetOrAdd(path, _ => new TagRingBuffer(_capacity));
buffer.Write(value);
}
// ===== IHistorianDataSource =====
/// <summary>
/// Returns samples in the ring buffer whose source timestamp falls within
/// [<paramref name="startUtc"/>, <paramref name="endUtc"/>), newest-first.
/// <paramref name="maxValuesPerNode"/> caps the result count.
/// </summary>
public Task<HistoryReadResult> ReadRawAsync(
string fullReference,
DateTime startUtc,
DateTime endUtc,
uint maxValuesPerNode,
CancellationToken cancellationToken)
{
if (!_buffers.TryGetValue(fullReference, out var buffer))
return Task.FromResult(new HistoryReadResult([], null));
var all = buffer.Snapshot();
var limit = (int)Math.Min(maxValuesPerNode, (uint)all.Length);
var result = new List<DataValueSnapshot>(limit);
foreach (var snap in all)
{
if (result.Count >= limit) break;
var ts = snap.SourceTimestampUtc ?? snap.ServerTimestampUtc;
if (ts >= startUtc && ts < endUtc)
result.Add(snap);
}
return Task.FromResult(new HistoryReadResult(result, null));
}
/// <inheritdoc />
/// <remarks>
/// Virtual tags do not carry aggregate history — returns an empty result rather than
/// failing so OPC UA clients that request processed history receive a graceful empty
/// response instead of a Bad status for the whole node.
/// </remarks>
public Task<HistoryReadResult> ReadProcessedAsync(
string fullReference,
DateTime startUtc,
DateTime endUtc,
TimeSpan interval,
HistoryAggregateType aggregate,
CancellationToken cancellationToken)
=> Task.FromResult(new HistoryReadResult([], null));
/// <inheritdoc />
public Task<HistoryReadResult> ReadAtTimeAsync(
string fullReference,
IReadOnlyList<DateTime> timestampsUtc,
CancellationToken cancellationToken)
=> Task.FromResult(new HistoryReadResult([], null));
/// <inheritdoc />
public Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName,
DateTime startUtc,
DateTime endUtc,
int maxEvents,
CancellationToken cancellationToken)
=> Task.FromResult(new HistoricalEventsResult([], null));
/// <inheritdoc />
public HistorianHealthSnapshot GetHealthSnapshot() => new(
TotalQueries: 0,
TotalSuccesses: 0,
TotalFailures: 0,
ConsecutiveFailures: 0,
LastSuccessTime: null,
LastFailureTime: null,
LastError: null,
ProcessConnectionOpen: true,
EventConnectionOpen: false,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
/// <summary>
/// Number of distinct tag paths that have received at least one recorded sample.
/// Exposed for diagnostics and tests.
/// </summary>
public int TagCount => _buffers.Count;
/// <summary>
/// Returns a snapshot of all samples currently in the buffer for <paramref name="path"/>,
/// or an empty array when the path has no recorded samples. Exposed for diagnostics.
/// </summary>
public DataValueSnapshot[] GetSnapshots(string path)
=> _buffers.TryGetValue(path, out var buf) ? buf.Snapshot() : [];
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_buffers.Clear();
}
// ===== Inner ring buffer =====
/// <summary>
/// Bounded FIFO ring buffer with O(1) write. Reads return a snapshot (array copy)
/// of all current samples in insertion order, oldest first.
/// </summary>
private sealed class TagRingBuffer
{
private readonly DataValueSnapshot?[] _slots;
private int _head; // next write position (wraps)
private int _count; // how many valid entries (≤ capacity)
public TagRingBuffer(int capacity)
{
_slots = new DataValueSnapshot[capacity];
}
public void Write(DataValueSnapshot value)
{
lock (_slots)
{
_slots[_head] = value;
_head = (_head + 1) % _slots.Length;
if (_count < _slots.Length) _count++;
}
}
/// <summary>
/// Returns all current samples in insertion order (oldest → newest), as a
/// snapshot array. Thread-safe: takes the lock for the copy.
/// </summary>
public DataValueSnapshot[] Snapshot()
{
lock (_slots)
{
if (_count == 0) return [];
var result = new DataValueSnapshot[_count];
// The oldest entry is at (_head - _count + capacity) % capacity.
var start = (_head - _count + _slots.Length) % _slots.Length;
for (var i = 0; i < _count; i++)
{
result[i] = _slots[(start + i) % _slots.Length]!;
}
return result;
}
}
}
}