diff --git a/Directory.Packages.props b/Directory.Packages.props index 3c258375..a069781b 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -56,6 +56,7 @@ + diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationCommitMode.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationCommitMode.cs new file mode 100644 index 00000000..b6577321 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationCommitMode.cs @@ -0,0 +1,14 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +/// +/// Per-append durability cadence for the historization outbox. Local to the OtOpcUa abstraction +/// layer (deliberately decoupled from the gateway's internal store-forward commit-mode type). +/// +public enum HistorizationCommitMode +{ + /// fsync the log before each AppendAsync returns — safest, no loss window. + PerEntry, + + /// Batch commits onto a background timer — higher throughput, a bounded worst-case loss window. + Periodic, +} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationOutboxEntry.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationOutboxEntry.cs new file mode 100644 index 00000000..f7b9ef20 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/HistorizationOutboxEntry.cs @@ -0,0 +1,18 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +/// +/// One durable record buffered by the continuous-historization outbox before it is written to +/// the historian. Carries the minimal payload the SQL analog live-value write path can ingest: +/// a numeric value, a quality code, and a UTC timestamp keyed by tag. +/// +/// Stable identifier used to ack (remove) the entry once written. Unique per append. +/// Fully-qualified historian tag name the value is recorded against. +/// The coerced numeric sample value (the SQL write path is numeric-only). +/// OPC-UA-derived quality code (e.g. 192 = Good) carried through to the historian. +/// UTC source timestamp of the sample. +public sealed record HistorizationOutboxEntry( + Guid Id, + string Tag, + double NumericValue, + ushort Quality, + DateTime TimestampUtc); diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizationOutbox.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizationOutbox.cs new file mode 100644 index 00000000..5f44ce27 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorizationOutbox.cs @@ -0,0 +1,40 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +/// +/// Durable, crash-safe FIFO buffer the continuous-historization recorder appends sampled values +/// to before acking the writer, so nothing is lost if the process dies mid-drain. An +/// implementation guarantees: appended entries survive an unclean restart up to its commit +/// cadence; returns entries in append (FIFO) order; and +/// durably reclaims an acked entry. A capacity-bounded implementation +/// drops the oldest entry on overflow and reflects it in . +/// +public interface IHistorizationOutbox : IDisposable +{ + /// Lifetime count of entries dropped because an append would have exceeded capacity. + long DroppedCount { get; } + + /// Appends to the tail of the durable buffer. + /// The value record to buffer. + /// Cancellation token. + ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct); + + /// + /// Returns up to oldest un-acked entries in FIFO order without removing + /// them. Removal happens via once each entry is durably written. + /// + /// Maximum number of entries to return; must be positive. + /// Cancellation token. + ValueTask> PeekBatchAsync(int max, CancellationToken ct); + + /// + /// Durably removes the entry identified by (and any older entries ahead + /// of it in FIFO order), advancing the buffer head. A no-op when the id is unknown. + /// + /// The to ack. + /// Cancellation token. + ValueTask RemoveAsync(Guid id, CancellationToken ct); + + /// Current number of un-acked entries held in the buffer. + /// Cancellation token. + ValueTask CountAsync(CancellationToken ct); +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs new file mode 100644 index 00000000..06e33890 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs @@ -0,0 +1,278 @@ +using FASTER.core; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder; + +/// +/// Crash-safe, FIFO backed by a single FasterLog (append-only +/// persistent log) under <directory>/hlog.log. Maps the outbox onto FasterLog: +/// append → ; peek → forward scan +/// from the logical head; remove → (head advance + reclaim) +/// + commit. In-memory FIFO state (entry id → log start address) is rebuilt from the committed log +/// by a one-pass startup scan, so acked truncations survive an unclean restart. +/// +/// +/// Mirrors the gateway's FasterLogOutboxStore and adds a bounded-capacity drop-oldest +/// policy: when an append would exceed capacity, the head is advanced past the oldest +/// entry (truncate + commit) and is incremented. Assumes serialized +/// appends (the recorder actor processes messages sequentially); the lock protects the in-memory +/// index, and FasterLog itself tolerates concurrent enqueue/scan. +/// +public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox +{ + private readonly record struct LiveEntry(Guid Id, long Start); + + private readonly ManagedLocalStorageDevice _device; + private readonly FasterLog _log; + private readonly Lock _state = new(); + private readonly HistorizationCommitMode _commitMode; + private readonly int _capacity; + + // Periodic-mode auto-commit machinery (null under PerEntry). The CTS stops the loop, the timer + // paces it, and the loop Task is retained so Dispose can await it (never leaving an unobserved Task). + private readonly CancellationTokenSource? _periodicCommitCts; + private readonly PeriodicTimer? _periodicCommitTimer; + private readonly Task? _periodicCommitLoop; + private bool _disposed; + + // FIFO of live (appended-but-not-acked) entries with their FasterLog start addresses, plus an id + // index for O(1) remove. All three (+ _nextScanAddress, _droppedCount) are read/written under _state. + private readonly LinkedList _live = new(); + private readonly Dictionary> _index = new(); + private long _nextScanAddress; // authoritative logical head; peeks scan from here + private long _droppedCount; + + /// + /// Opens (or recovers) the FasterLog-backed outbox under . + /// + /// Directory holding the FasterLog segment + commit files. + /// + /// fsyncs before each append returns; + /// commits on a background timer every + /// ms. + /// + /// Periodic-mode commit cadence in ms; must be positive when Periodic. + /// + /// Maximum un-acked entries before drop-oldest kicks in; 0 (default) means unbounded. + /// + public FasterLogHistorizationOutbox( + string directory, + HistorizationCommitMode commitMode = HistorizationCommitMode.PerEntry, + int commitIntervalMs = 100, + int capacity = 0) + { + ArgumentException.ThrowIfNullOrWhiteSpace(directory); + ArgumentOutOfRangeException.ThrowIfNegative(capacity); + if (commitMode == HistorizationCommitMode.Periodic) + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(commitIntervalMs); + + Directory.CreateDirectory(directory); + _commitMode = commitMode; + _capacity = capacity; + _device = new ManagedLocalStorageDevice(Path.Combine(directory, "hlog.log")); + _log = new FasterLog(new FasterLogSettings { LogDevice = _device }); + RecoverState(); // sets _nextScanAddress + rebuilds _live/_index from the committed log + + if (_commitMode == HistorizationCommitMode.Periodic) + { + _periodicCommitCts = new CancellationTokenSource(); + _periodicCommitTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(commitIntervalMs)); + // Started after RecoverState so it never races a half-recovered instance. + _periodicCommitLoop = RunPeriodicCommitLoopAsync(_periodicCommitTimer, _periodicCommitCts.Token); + } + } + + /// + public long DroppedCount + { + get + { + lock (_state) + { + return _droppedCount; + } + } + } + + /// + public async ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct) + { + ArgumentNullException.ThrowIfNull(entry); + + byte[] payload = HistorizationOutboxEntrySerializer.Serialize(entry); + long startAddress = await _log.EnqueueAsync(payload, ct).ConfigureAwait(false); + if (_commitMode == HistorizationCommitMode.PerEntry) + { + // PerEntry: durable before returning. Periodic skips this — the background timer (and + // Dispose) commit on their cadence (accepted throughput/latency trade-off). + await _log.CommitAsync(ct).ConfigureAwait(false); + } + + long? truncateTo = null; + lock (_state) + { + LinkedListNode node = _live.AddLast(new LiveEntry(entry.Id, startAddress)); + _index[entry.Id] = node; + + // Drop-oldest on overflow. The new head is the start address of whatever entry survives + // at the front (or the tail if the log emptied); truncate to the furthest such address. + while (_capacity > 0 && _live.Count > _capacity) + { + LinkedListNode oldest = _live.First!; + truncateTo = oldest.Next?.Value.Start ?? _log.TailAddress; + _index.Remove(oldest.Value.Id); + _live.RemoveFirst(); + _droppedCount++; + } + + if (truncateTo is long head) + _nextScanAddress = head; + } + + if (truncateTo is long truncateAddr) + { + _log.TruncateUntil(truncateAddr); + await _log.CommitAsync(ct).ConfigureAwait(false); // make the drop durable + } + } + + /// + public ValueTask> PeekBatchAsync(int max, CancellationToken ct) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(max); + + var batch = new List(Math.Min(max, 64)); + lock (_state) + { + using FasterLogScanIterator iter = _log.Scan(_nextScanAddress, _log.TailAddress, recover: false); + while (batch.Count < max && iter.GetNext(out byte[] bytes, out int len, out _, out _)) + { + batch.Add(HistorizationOutboxEntrySerializer.Deserialize(bytes.AsSpan(0, len))); + } + } + + return ValueTask.FromResult>(batch); + } + + /// + public async ValueTask RemoveAsync(Guid id, CancellationToken ct) + { + long truncateTo; + lock (_state) + { + if (!_index.TryGetValue(id, out LinkedListNode? node)) + return; // unknown / already removed -> defensive no-op + + truncateTo = node.Next?.Value.Start ?? _log.TailAddress; + // FIFO ack: remove the target plus any older entries still ahead of it. + while (_live.First is { } first) + { + bool isTarget = ReferenceEquals(first, node); + _index.Remove(first.Value.Id); + _live.RemoveFirst(); + if (isTarget) + break; + } + + _nextScanAddress = truncateTo; + } + + _log.TruncateUntil(truncateTo); + await _log.CommitAsync(ct).ConfigureAwait(false); // make the head advance durable + } + + /// + public ValueTask CountAsync(CancellationToken ct) + { + lock (_state) + { + return ValueTask.FromResult(_live.Count); + } + } + + // Rebuild the in-memory FIFO index from the committed log after a restart. The FasterLog ctor has + // already recovered BeginAddress/TailAddress from the on-disk commit metadata, so scanning + // [BeginAddress, TailAddress) yields exactly the untruncated (un-acked) records in FIFO order, and + // BeginAddress is the recovered logical head. + // + // CTOR-ONLY: called once before the instance is published and before the periodic-commit loop + // starts. It unconditionally seeds _nextScanAddress/_live/_index, so it must NEVER run post-ctor. + private void RecoverState() + { + _nextScanAddress = _log.BeginAddress; + + using FasterLogScanIterator iter = _log.Scan(_log.BeginAddress, _log.TailAddress, recover: false); + while (iter.GetNext(out byte[] bytes, out int len, out long currentAddress, out _)) + { + HistorizationOutboxEntry entry = HistorizationOutboxEntrySerializer.Deserialize(bytes.AsSpan(0, len)); + LinkedListNode node = _live.AddLast(new LiveEntry(entry.Id, currentAddress)); + _index[entry.Id] = node; + } + } + + // Periodic-mode auto-commit: best-effort _log.Commit every interval until cancelled. Commit + // failures are swallowed so the loop survives transient errors; the per-remove/per-drop commits + // and Dispose's final spin-wait commit still bound durability. + private async Task RunPeriodicCommitLoopAsync(PeriodicTimer timer, CancellationToken cancellationToken) + { + try + { + while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false)) + { + try + { + _log.Commit(spinWait: false); + } + catch (FasterException) + { + // Transient/teardown commit failure — keep ticking. + } + } + } + catch (OperationCanceledException) + { + // Expected on Dispose: the CTS cancelled the wait. Normal teardown. + } + } + + /// + /// Stops the periodic-commit loop (Periodic mode), flushes a final commit (best-effort), and + /// releases the log + device. Idempotent. + /// + public void Dispose() + { + if (_disposed) + return; + _disposed = true; + + // Stop the periodic loop BEFORE the final commit so we don't race the loop's Commit against + // the teardown commit / log dispose. Await the loop (it absorbs cancellation) so no Task leaks. + if (_periodicCommitCts is not null) + { + _periodicCommitCts.Cancel(); + try + { + _periodicCommitLoop?.GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + // Cancellation is the expected stop signal — not an error. + } + + _periodicCommitTimer?.Dispose(); + _periodicCommitCts.Dispose(); + } + + try + { + _log.Commit(spinWait: true); + } + catch (FasterException) + { + // Best-effort final commit on teardown: already-committed enqueues remain durable. + } + + _log.Dispose(); + _device.Dispose(); + } +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/HistorizationOutboxEntrySerializer.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/HistorizationOutboxEntrySerializer.cs new file mode 100644 index 00000000..108909b4 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/HistorizationOutboxEntrySerializer.cs @@ -0,0 +1,59 @@ +using System.Buffers.Binary; +using System.Text; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder; + +/// +/// Compact, allocation-light little-endian binary (de)serializer for +/// records persisted to the FasterLog outbox. The entry is +/// all primitives, so a fixed binary layout is smaller and faster than JSON and avoids any +/// reflection at the durable boundary. +/// +/// +/// Layout (little-endian): Guid(16) | tagByteLen:int32(4) | tagUtf8(n) | value:double(8) | +/// quality:uint16(2) | timestamp:int64(8). The timestamp is , +/// which round-trips . +/// +internal static class HistorizationOutboxEntrySerializer +{ + /// Serializes to a fixed-layout little-endian byte array. + public static byte[] Serialize(HistorizationOutboxEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + + int tagLen = Encoding.UTF8.GetByteCount(entry.Tag); + var buffer = new byte[16 + 4 + tagLen + 8 + 2 + 8]; + Span span = buffer; + + entry.Id.TryWriteBytes(span[..16]); + BinaryPrimitives.WriteInt32LittleEndian(span.Slice(16, 4), tagLen); + Encoding.UTF8.GetBytes(entry.Tag, span.Slice(20, tagLen)); + + int p = 20 + tagLen; + BinaryPrimitives.WriteDoubleLittleEndian(span.Slice(p, 8), entry.NumericValue); + p += 8; + BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(p, 2), entry.Quality); + p += 2; + BinaryPrimitives.WriteInt64LittleEndian(span.Slice(p, 8), entry.TimestampUtc.ToBinary()); + + return buffer; + } + + /// Reconstructs a from its serialized bytes. + public static HistorizationOutboxEntry Deserialize(ReadOnlySpan span) + { + var id = new Guid(span[..16]); + int tagLen = BinaryPrimitives.ReadInt32LittleEndian(span.Slice(16, 4)); + string tag = Encoding.UTF8.GetString(span.Slice(20, tagLen)); + + int p = 20 + tagLen; + double value = BinaryPrimitives.ReadDoubleLittleEndian(span.Slice(p, 8)); + p += 8; + ushort quality = BinaryPrimitives.ReadUInt16LittleEndian(span.Slice(p, 2)); + p += 2; + long timestamp = BinaryPrimitives.ReadInt64LittleEndian(span.Slice(p, 8)); + + return new HistorizationOutboxEntry(id, tag, value, quality, DateTime.FromBinary(timestamp)); + } +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj index 8660ec4f..262241a0 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj @@ -22,6 +22,9 @@ + + diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs new file mode 100644 index 00000000..bc4d1213 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs @@ -0,0 +1,123 @@ +using System.Linq; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests.Recorder; + +/// +/// Durability + FIFO contract tests for the FasterLog-backed historization outbox. The +/// remove-then-reopen (restart durability) and drop-oldest (capacity) cases are load-bearing — +/// the outbox is the durable boundary the continuous-historization recorder acks against. +/// +public sealed class FasterLogHistorizationOutboxTests : IDisposable +{ + private readonly List _dirs = new(); + + private string NewTempDir() + { + var dir = Path.Combine(Path.GetTempPath(), "histgw-outbox-" + Guid.NewGuid().ToString("N")); + Directory.CreateDirectory(dir); + _dirs.Add(dir); + return dir; + } + + private static HistorizationOutboxEntry E(string tag, double v) => + new(Guid.NewGuid(), tag, v, 192, new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + + [Fact] + public async Task Append_then_peek_returns_fifo() + { + var dir = NewTempDir(); + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken); + await o.AppendAsync(E("B", 2), TestContext.Current.CancellationToken); + var batch = await o.PeekBatchAsync(10, TestContext.Current.CancellationToken); + Assert.Equal(new[] { "A", "B" }, batch.Select(b => b.Tag)); + Assert.Equal(2, await o.CountAsync(TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task Remove_truncates_and_survives_restart() + { + var dir = NewTempDir(); + Guid keep; + { + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + var a = E("A", 1); + var b = E("B", 2); + keep = b.Id; + await o.AppendAsync(a, TestContext.Current.CancellationToken); + await o.AppendAsync(b, TestContext.Current.CancellationToken); + await o.PeekBatchAsync(10, TestContext.Current.CancellationToken); + await o.RemoveAsync(a.Id, TestContext.Current.CancellationToken); // ack A + } + + using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + Assert.Equal(1, await reopened.CountAsync(TestContext.Current.CancellationToken)); // only B survives + var batch = await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken); + Assert.Equal(keep, batch[0].Id); + } + + [Fact] + public async Task Capacity_full_drops_oldest_and_counts() + { + var dir = NewTempDir(); + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry, capacity: 2); + await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken); + await o.AppendAsync(E("B", 2), TestContext.Current.CancellationToken); + await o.AppendAsync(E("C", 3), TestContext.Current.CancellationToken); // overflow -> drop oldest (A) + Assert.Equal(2, await o.CountAsync(TestContext.Current.CancellationToken)); + Assert.Equal(1, o.DroppedCount); + var tags = (await o.PeekBatchAsync(10, TestContext.Current.CancellationToken)).Select(b => b.Tag).ToArray(); + Assert.DoesNotContain("A", tags); + } + + [Fact] + public async Task Periodic_mode_commits_and_recovers() + { + var dir = NewTempDir(); + var a = E("A", 1); + var b = E("B", 2); + { + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.Periodic, commitIntervalMs: 20); + await o.AppendAsync(a, TestContext.Current.CancellationToken); + await o.AppendAsync(b, TestContext.Current.CancellationToken); + // Dispose flushes a final commit, making the periodic-mode appends durable. + } + + using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.Periodic, commitIntervalMs: 20); + Assert.Equal(2, await reopened.CountAsync(TestContext.Current.CancellationToken)); + var batch = await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken); + Assert.Equal(new[] { a.Id, b.Id }, batch.Select(e => e.Id)); + } + + [Fact] + public async Task Remove_unknown_id_is_noop() + { + var dir = NewTempDir(); + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + await o.AppendAsync(E("A", 1), TestContext.Current.CancellationToken); + await o.RemoveAsync(Guid.NewGuid(), TestContext.Current.CancellationToken); // never appended -> no-op + Assert.Equal(1, await o.CountAsync(TestContext.Current.CancellationToken)); + } + + public void Dispose() + { + foreach (var dir in _dirs) + { + try + { + Directory.Delete(dir, recursive: true); + } + catch (IOException) + { + // Best-effort cleanup; a lingering OS handle must not fail the test run. + } + catch (UnauthorizedAccessException) + { + // Best-effort cleanup. + } + } + } +}