feat(historian-gateway): FasterLog historization outbox (PerEntry/Periodic, drop-oldest)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 17:20:06 -04:00
parent 1a6eb7efe6
commit 555bd477f1
8 changed files with 536 additions and 0 deletions
+1
View File
@@ -56,6 +56,7 @@
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="10.0.7" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.7" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.SqlServer" Version="10.0.7" />
<PackageVersion Include="Microsoft.FASTER.Core" Version="2.6.5" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.7" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="10.0.7" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.7" />
@@ -0,0 +1,14 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
/// <summary>
/// Per-append durability cadence for the historization outbox. Local to the OtOpcUa abstraction
/// layer (deliberately decoupled from the gateway's internal store-forward commit-mode type).
/// </summary>
public enum HistorizationCommitMode
{
/// <summary>fsync the log before each <c>AppendAsync</c> returns — safest, no loss window.</summary>
PerEntry,
/// <summary>Batch commits onto a background timer — higher throughput, a bounded worst-case loss window.</summary>
Periodic,
}
@@ -0,0 +1,18 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
/// <summary>
/// One durable record buffered by the continuous-historization outbox before it is written to
/// the historian. Carries the minimal payload the SQL analog live-value write path can ingest:
/// a numeric value, a quality code, and a UTC timestamp keyed by tag.
/// </summary>
/// <param name="Id">Stable identifier used to ack (remove) the entry once written. Unique per append.</param>
/// <param name="Tag">Fully-qualified historian tag name the value is recorded against.</param>
/// <param name="NumericValue">The coerced numeric sample value (the SQL write path is numeric-only).</param>
/// <param name="Quality">OPC-UA-derived quality code (e.g. 192 = Good) carried through to the historian.</param>
/// <param name="TimestampUtc">UTC source timestamp of the sample.</param>
public sealed record HistorizationOutboxEntry(
Guid Id,
string Tag,
double NumericValue,
ushort Quality,
DateTime TimestampUtc);
@@ -0,0 +1,40 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
/// <summary>
/// Durable, crash-safe FIFO buffer the continuous-historization recorder appends sampled values
/// to <em>before</em> acking the writer, so nothing is lost if the process dies mid-drain. An
/// implementation guarantees: appended entries survive an unclean restart up to its commit
/// cadence; <see cref="PeekBatchAsync"/> returns entries in append (FIFO) order; and
/// <see cref="RemoveAsync"/> durably reclaims an acked entry. A capacity-bounded implementation
/// drops the oldest entry on overflow and reflects it in <see cref="DroppedCount"/>.
/// </summary>
public interface IHistorizationOutbox : IDisposable
{
/// <summary>Lifetime count of entries dropped because an append would have exceeded capacity.</summary>
long DroppedCount { get; }
/// <summary>Appends <paramref name="entry"/> to the tail of the durable buffer.</summary>
/// <param name="entry">The value record to buffer.</param>
/// <param name="ct">Cancellation token.</param>
ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct);
/// <summary>
/// Returns up to <paramref name="max"/> oldest un-acked entries in FIFO order without removing
/// them. Removal happens via <see cref="RemoveAsync"/> once each entry is durably written.
/// </summary>
/// <param name="max">Maximum number of entries to return; must be positive.</param>
/// <param name="ct">Cancellation token.</param>
ValueTask<IReadOnlyList<HistorizationOutboxEntry>> PeekBatchAsync(int max, CancellationToken ct);
/// <summary>
/// Durably removes the entry identified by <paramref name="id"/> (and any older entries ahead
/// of it in FIFO order), advancing the buffer head. A no-op when the id is unknown.
/// </summary>
/// <param name="id">The <see cref="HistorizationOutboxEntry.Id"/> to ack.</param>
/// <param name="ct">Cancellation token.</param>
ValueTask RemoveAsync(Guid id, CancellationToken ct);
/// <summary>Current number of un-acked entries held in the buffer.</summary>
/// <param name="ct">Cancellation token.</param>
ValueTask<int> CountAsync(CancellationToken ct);
}
@@ -0,0 +1,278 @@
using FASTER.core;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
/// <summary>
/// Crash-safe, FIFO <see cref="IHistorizationOutbox"/> backed by a single FasterLog (append-only
/// persistent log) under <c>&lt;directory&gt;/hlog.log</c>. Maps the outbox onto FasterLog:
/// append → <see cref="FasterLog.EnqueueAsync(byte[], CancellationToken)"/>; peek → forward scan
/// from the logical head; remove → <see cref="FasterLog.TruncateUntil"/> (head advance + reclaim)
/// + commit. In-memory FIFO state (entry id → log start address) is rebuilt from the committed log
/// by a one-pass startup scan, so acked truncations survive an unclean restart.
/// </summary>
/// <remarks>
/// Mirrors the gateway's <c>FasterLogOutboxStore</c> and adds a bounded-capacity drop-oldest
/// policy: when an append would exceed <c>capacity</c>, the head is advanced past the oldest
/// entry (truncate + commit) and <see cref="DroppedCount"/> is incremented. Assumes serialized
/// appends (the recorder actor processes messages sequentially); the lock protects the in-memory
/// index, and FasterLog itself tolerates concurrent enqueue/scan.
/// </remarks>
public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox
{
private readonly record struct LiveEntry(Guid Id, long Start);
private readonly ManagedLocalStorageDevice _device;
private readonly FasterLog _log;
private readonly Lock _state = new();
private readonly HistorizationCommitMode _commitMode;
private readonly int _capacity;
// Periodic-mode auto-commit machinery (null under PerEntry). The CTS stops the loop, the timer
// paces it, and the loop Task is retained so Dispose can await it (never leaving an unobserved Task).
private readonly CancellationTokenSource? _periodicCommitCts;
private readonly PeriodicTimer? _periodicCommitTimer;
private readonly Task? _periodicCommitLoop;
private bool _disposed;
// FIFO of live (appended-but-not-acked) entries with their FasterLog start addresses, plus an id
// index for O(1) remove. All three (+ _nextScanAddress, _droppedCount) are read/written under _state.
private readonly LinkedList<LiveEntry> _live = new();
private readonly Dictionary<Guid, LinkedListNode<LiveEntry>> _index = new();
private long _nextScanAddress; // authoritative logical head; peeks scan from here
private long _droppedCount;
/// <summary>
/// Opens (or recovers) the FasterLog-backed outbox under <paramref name="directory"/>.
/// </summary>
/// <param name="directory">Directory holding the FasterLog segment + commit files.</param>
/// <param name="commitMode">
/// <see cref="HistorizationCommitMode.PerEntry"/> fsyncs before each append returns;
/// <see cref="HistorizationCommitMode.Periodic"/> commits on a background timer every
/// <paramref name="commitIntervalMs"/> ms.
/// </param>
/// <param name="commitIntervalMs">Periodic-mode commit cadence in ms; must be positive when Periodic.</param>
/// <param name="capacity">
/// Maximum un-acked entries before drop-oldest kicks in; <c>0</c> (default) means unbounded.
/// </param>
public FasterLogHistorizationOutbox(
string directory,
HistorizationCommitMode commitMode = HistorizationCommitMode.PerEntry,
int commitIntervalMs = 100,
int capacity = 0)
{
ArgumentException.ThrowIfNullOrWhiteSpace(directory);
ArgumentOutOfRangeException.ThrowIfNegative(capacity);
if (commitMode == HistorizationCommitMode.Periodic)
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(commitIntervalMs);
Directory.CreateDirectory(directory);
_commitMode = commitMode;
_capacity = capacity;
_device = new ManagedLocalStorageDevice(Path.Combine(directory, "hlog.log"));
_log = new FasterLog(new FasterLogSettings { LogDevice = _device });
RecoverState(); // sets _nextScanAddress + rebuilds _live/_index from the committed log
if (_commitMode == HistorizationCommitMode.Periodic)
{
_periodicCommitCts = new CancellationTokenSource();
_periodicCommitTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(commitIntervalMs));
// Started after RecoverState so it never races a half-recovered instance.
_periodicCommitLoop = RunPeriodicCommitLoopAsync(_periodicCommitTimer, _periodicCommitCts.Token);
}
}
/// <inheritdoc />
public long DroppedCount
{
get
{
lock (_state)
{
return _droppedCount;
}
}
}
/// <inheritdoc />
public async ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(entry);
byte[] payload = HistorizationOutboxEntrySerializer.Serialize(entry);
long startAddress = await _log.EnqueueAsync(payload, ct).ConfigureAwait(false);
if (_commitMode == HistorizationCommitMode.PerEntry)
{
// PerEntry: durable before returning. Periodic skips this — the background timer (and
// Dispose) commit on their cadence (accepted throughput/latency trade-off).
await _log.CommitAsync(ct).ConfigureAwait(false);
}
long? truncateTo = null;
lock (_state)
{
LinkedListNode<LiveEntry> node = _live.AddLast(new LiveEntry(entry.Id, startAddress));
_index[entry.Id] = node;
// Drop-oldest on overflow. The new head is the start address of whatever entry survives
// at the front (or the tail if the log emptied); truncate to the furthest such address.
while (_capacity > 0 && _live.Count > _capacity)
{
LinkedListNode<LiveEntry> oldest = _live.First!;
truncateTo = oldest.Next?.Value.Start ?? _log.TailAddress;
_index.Remove(oldest.Value.Id);
_live.RemoveFirst();
_droppedCount++;
}
if (truncateTo is long head)
_nextScanAddress = head;
}
if (truncateTo is long truncateAddr)
{
_log.TruncateUntil(truncateAddr);
await _log.CommitAsync(ct).ConfigureAwait(false); // make the drop durable
}
}
/// <inheritdoc />
public ValueTask<IReadOnlyList<HistorizationOutboxEntry>> PeekBatchAsync(int max, CancellationToken ct)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(max);
var batch = new List<HistorizationOutboxEntry>(Math.Min(max, 64));
lock (_state)
{
using FasterLogScanIterator iter = _log.Scan(_nextScanAddress, _log.TailAddress, recover: false);
while (batch.Count < max && iter.GetNext(out byte[] bytes, out int len, out _, out _))
{
batch.Add(HistorizationOutboxEntrySerializer.Deserialize(bytes.AsSpan(0, len)));
}
}
return ValueTask.FromResult<IReadOnlyList<HistorizationOutboxEntry>>(batch);
}
/// <inheritdoc />
public async ValueTask RemoveAsync(Guid id, CancellationToken ct)
{
long truncateTo;
lock (_state)
{
if (!_index.TryGetValue(id, out LinkedListNode<LiveEntry>? node))
return; // unknown / already removed -> defensive no-op
truncateTo = node.Next?.Value.Start ?? _log.TailAddress;
// FIFO ack: remove the target plus any older entries still ahead of it.
while (_live.First is { } first)
{
bool isTarget = ReferenceEquals(first, node);
_index.Remove(first.Value.Id);
_live.RemoveFirst();
if (isTarget)
break;
}
_nextScanAddress = truncateTo;
}
_log.TruncateUntil(truncateTo);
await _log.CommitAsync(ct).ConfigureAwait(false); // make the head advance durable
}
/// <inheritdoc />
public ValueTask<int> CountAsync(CancellationToken ct)
{
lock (_state)
{
return ValueTask.FromResult(_live.Count);
}
}
// Rebuild the in-memory FIFO index from the committed log after a restart. The FasterLog ctor has
// already recovered BeginAddress/TailAddress from the on-disk commit metadata, so scanning
// [BeginAddress, TailAddress) yields exactly the untruncated (un-acked) records in FIFO order, and
// BeginAddress is the recovered logical head.
//
// CTOR-ONLY: called once before the instance is published and before the periodic-commit loop
// starts. It unconditionally seeds _nextScanAddress/_live/_index, so it must NEVER run post-ctor.
private void RecoverState()
{
_nextScanAddress = _log.BeginAddress;
using FasterLogScanIterator iter = _log.Scan(_log.BeginAddress, _log.TailAddress, recover: false);
while (iter.GetNext(out byte[] bytes, out int len, out long currentAddress, out _))
{
HistorizationOutboxEntry entry = HistorizationOutboxEntrySerializer.Deserialize(bytes.AsSpan(0, len));
LinkedListNode<LiveEntry> node = _live.AddLast(new LiveEntry(entry.Id, currentAddress));
_index[entry.Id] = node;
}
}
// Periodic-mode auto-commit: best-effort _log.Commit every interval until cancelled. Commit
// failures are swallowed so the loop survives transient errors; the per-remove/per-drop commits
// and Dispose's final spin-wait commit still bound durability.
private async Task RunPeriodicCommitLoopAsync(PeriodicTimer timer, CancellationToken cancellationToken)
{
try
{
while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
{
try
{
_log.Commit(spinWait: false);
}
catch (FasterException)
{
// Transient/teardown commit failure — keep ticking.
}
}
}
catch (OperationCanceledException)
{
// Expected on Dispose: the CTS cancelled the wait. Normal teardown.
}
}
/// <summary>
/// Stops the periodic-commit loop (Periodic mode), flushes a final commit (best-effort), and
/// releases the log + device. Idempotent.
/// </summary>
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
// Stop the periodic loop BEFORE the final commit so we don't race the loop's Commit against
// the teardown commit / log dispose. Await the loop (it absorbs cancellation) so no Task leaks.
if (_periodicCommitCts is not null)
{
_periodicCommitCts.Cancel();
try
{
_periodicCommitLoop?.GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
// Cancellation is the expected stop signal — not an error.
}
_periodicCommitTimer?.Dispose();
_periodicCommitCts.Dispose();
}
try
{
_log.Commit(spinWait: true);
}
catch (FasterException)
{
// Best-effort final commit on teardown: already-committed enqueues remain durable.
}
_log.Dispose();
_device.Dispose();
}
}
@@ -0,0 +1,59 @@
using System.Buffers.Binary;
using System.Text;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
/// <summary>
/// Compact, allocation-light little-endian binary (de)serializer for
/// <see cref="HistorizationOutboxEntry"/> records persisted to the FasterLog outbox. The entry is
/// all primitives, so a fixed binary layout is smaller and faster than JSON and avoids any
/// reflection at the durable boundary.
/// </summary>
/// <remarks>
/// Layout (little-endian): <c>Guid(16) | tagByteLen:int32(4) | tagUtf8(n) | value:double(8) |
/// quality:uint16(2) | timestamp:int64(8)</c>. The timestamp is <see cref="DateTime.ToBinary"/>,
/// which round-trips <see cref="DateTimeKind"/>.
/// </remarks>
internal static class HistorizationOutboxEntrySerializer
{
/// <summary>Serializes <paramref name="entry"/> to a fixed-layout little-endian byte array.</summary>
public static byte[] Serialize(HistorizationOutboxEntry entry)
{
ArgumentNullException.ThrowIfNull(entry);
int tagLen = Encoding.UTF8.GetByteCount(entry.Tag);
var buffer = new byte[16 + 4 + tagLen + 8 + 2 + 8];
Span<byte> span = buffer;
entry.Id.TryWriteBytes(span[..16]);
BinaryPrimitives.WriteInt32LittleEndian(span.Slice(16, 4), tagLen);
Encoding.UTF8.GetBytes(entry.Tag, span.Slice(20, tagLen));
int p = 20 + tagLen;
BinaryPrimitives.WriteDoubleLittleEndian(span.Slice(p, 8), entry.NumericValue);
p += 8;
BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(p, 2), entry.Quality);
p += 2;
BinaryPrimitives.WriteInt64LittleEndian(span.Slice(p, 8), entry.TimestampUtc.ToBinary());
return buffer;
}
/// <summary>Reconstructs a <see cref="HistorizationOutboxEntry"/> from its serialized bytes.</summary>
public static HistorizationOutboxEntry Deserialize(ReadOnlySpan<byte> span)
{
var id = new Guid(span[..16]);
int tagLen = BinaryPrimitives.ReadInt32LittleEndian(span.Slice(16, 4));
string tag = Encoding.UTF8.GetString(span.Slice(20, tagLen));
int p = 20 + tagLen;
double value = BinaryPrimitives.ReadDoubleLittleEndian(span.Slice(p, 8));
p += 8;
ushort quality = BinaryPrimitives.ReadUInt16LittleEndian(span.Slice(p, 2));
p += 2;
long timestamp = BinaryPrimitives.ReadInt64LittleEndian(span.Slice(p, 8));
return new HistorizationOutboxEntry(id, tag, value, quality, DateTime.FromBinary(timestamp));
}
}
@@ -22,6 +22,9 @@
<PackageReference Include="ZB.MOM.WW.HistorianGateway.Client" />
<PackageReference Include="ZB.MOM.WW.HistorianGateway.Contracts" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<!-- FasterLog-backed durable historization outbox (Recorder/FasterLogHistorizationOutbox).
Pure-managed FasterLog; same package the gateway's store-forward outbox uses. -->
<PackageReference Include="Microsoft.FASTER.Core" />
</ItemGroup>
<ItemGroup>
@@ -0,0 +1,123 @@
using System.Linq;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Recorder;
/// <summary>
/// Durability + FIFO contract tests for the FasterLog-backed historization outbox. The
/// remove-then-reopen (restart durability) and drop-oldest (capacity) cases are load-bearing —
/// the outbox is the durable boundary the continuous-historization recorder acks against.
/// </summary>
public sealed class FasterLogHistorizationOutboxTests : IDisposable
{
private readonly List<string> _dirs = new();
private string NewTempDir()
{
var dir = Path.Combine(Path.GetTempPath(), "histgw-outbox-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);
_dirs.Add(dir);
return dir;
}
private static HistorizationOutboxEntry E(string tag, double v) =>
new(Guid.NewGuid(), tag, v, 192, new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc));
[Fact]
public async Task Append_then_peek_returns_fifo()
{
var dir = NewTempDir();
using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry);
await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken);
await o.AppendAsync(E("B", 2), TestContext.Current.CancellationToken);
var batch = await o.PeekBatchAsync(10, TestContext.Current.CancellationToken);
Assert.Equal(new[] { "A", "B" }, batch.Select(b => b.Tag));
Assert.Equal(2, await o.CountAsync(TestContext.Current.CancellationToken));
}
[Fact]
public async Task Remove_truncates_and_survives_restart()
{
var dir = NewTempDir();
Guid keep;
{
using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry);
var a = E("A", 1);
var b = E("B", 2);
keep = b.Id;
await o.AppendAsync(a, TestContext.Current.CancellationToken);
await o.AppendAsync(b, TestContext.Current.CancellationToken);
await o.PeekBatchAsync(10, TestContext.Current.CancellationToken);
await o.RemoveAsync(a.Id, TestContext.Current.CancellationToken); // ack A
}
using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry);
Assert.Equal(1, await reopened.CountAsync(TestContext.Current.CancellationToken)); // only B survives
var batch = await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken);
Assert.Equal(keep, batch[0].Id);
}
[Fact]
public async Task Capacity_full_drops_oldest_and_counts()
{
var dir = NewTempDir();
using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry, capacity: 2);
await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken);
await o.AppendAsync(E("B", 2), TestContext.Current.CancellationToken);
await o.AppendAsync(E("C", 3), TestContext.Current.CancellationToken); // overflow -> drop oldest (A)
Assert.Equal(2, await o.CountAsync(TestContext.Current.CancellationToken));
Assert.Equal(1, o.DroppedCount);
var tags = (await o.PeekBatchAsync(10, TestContext.Current.CancellationToken)).Select(b => b.Tag).ToArray();
Assert.DoesNotContain("A", tags);
}
[Fact]
public async Task Periodic_mode_commits_and_recovers()
{
var dir = NewTempDir();
var a = E("A", 1);
var b = E("B", 2);
{
using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.Periodic, commitIntervalMs: 20);
await o.AppendAsync(a, TestContext.Current.CancellationToken);
await o.AppendAsync(b, TestContext.Current.CancellationToken);
// Dispose flushes a final commit, making the periodic-mode appends durable.
}
using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.Periodic, commitIntervalMs: 20);
Assert.Equal(2, await reopened.CountAsync(TestContext.Current.CancellationToken));
var batch = await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken);
Assert.Equal(new[] { a.Id, b.Id }, batch.Select(e => e.Id));
}
[Fact]
public async Task Remove_unknown_id_is_noop()
{
var dir = NewTempDir();
using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry);
await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken);
await o.RemoveAsync(Guid.NewGuid(), TestContext.Current.CancellationToken); // never appended -> no-op
Assert.Equal(1, await o.CountAsync(TestContext.Current.CancellationToken));
}
public void Dispose()
{
foreach (var dir in _dirs)
{
try
{
Directory.Delete(dir, recursive: true);
}
catch (IOException)
{
// Best-effort cleanup; a lingering OS handle must not fail the test run.
}
catch (UnauthorizedAccessException)
{
// Best-effort cleanup.
}
}
}
}