diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianValueWriter.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianValueWriter.cs new file mode 100644 index 00000000..9f915a95 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/Historian/IHistorianValueWriter.cs @@ -0,0 +1,35 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +/// +/// One numeric sample the continuous-historization recorder drains to the historian's SQL +/// live-value write path. Carries the minimal payload that path can ingest: an optional UTC +/// timestamp, the coerced numeric value, and an OPC-DA quality byte. +/// +/// +/// UTC source timestamp of the sample, or null to defer to the historian's server-stamped +/// current-time path (the SQL writer uses SYSDATETIME() when the timestamp is absent). +/// +/// The coerced numeric value (the SQL analog write path is numeric-only). +/// OPC-DA-derived quality code carried to the historian (192 = Good). +public readonly record struct HistorizationValue(DateTime? TimestampUtc, double Value, ushort Quality); + +/// +/// Seam over the historian's live-value write path used by the continuous-historization recorder. +/// Lives in the abstraction layer so the Runtime recorder depends on it without taking a hard +/// reference on the gRPC gateway driver; the gateway driver supplies the concrete adapter +/// (GatewayHistorianValueWriter). +/// +public interface IHistorianValueWriter +{ + /// + /// Writes a batch of live values for a single tag through the historian's SQL live-write path. + /// Implementations are expected to be non-throwing: a transport/gateway error is surfaced as a + /// false result so the recorder retains the entries and retries, rather than as an + /// exception. + /// + /// Fully-qualified historian tag the values are recorded against. + /// The numeric samples to write, in append order. + /// Cancellation token. + /// true on a successful (or durably-queued) gateway ack; false on a retryable failure. + Task WriteLiveValuesAsync(string tag, IReadOnlyList values, CancellationToken ct); +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/GatewayHistorianValueWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/GatewayHistorianValueWriter.cs new file mode 100644 index 00000000..af80f717 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/GatewayHistorianValueWriter.cs @@ -0,0 +1,89 @@ +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder; + +/// +/// Adapts the gateway client's WriteLiveValues RPC to the Runtime recorder's +/// seam. Maps each onto a +/// proto (numeric value + quality, with an optional timestamp — +/// a null timestamp leaves the proto field unset so the gateway's SQL writer server-stamps the +/// current time) and folds the returned to a single retry/ack boolean. +/// +/// +/// +/// Non-throwing by contract. The recorder's drain loop stays simple by treating the +/// writer as never throwing: any gateway/transport error (and a non-success, non-queued ack) +/// is mapped to false so the recorder retains the outbox entries and retries. Only the +/// failure category (the exception type name) is logged — never tag values, hostnames, or +/// credentials. +/// +/// +/// A success ack OR a store-forward-queued ack maps to true: a value the gateway +/// durably queued must not be re-drained. +/// +/// +public sealed class GatewayHistorianValueWriter : IHistorianValueWriter +{ + private readonly IHistorianGatewayClient _client; + private readonly ILogger _logger; + + /// Creates the writer over a gateway client seam. + /// The gateway client used for the WriteLiveValues write path. + /// Logger for failure-category diagnostics (never logs value content). + public GatewayHistorianValueWriter(IHistorianGatewayClient client, ILogger logger) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task WriteLiveValuesAsync( + string tag, IReadOnlyList values, CancellationToken ct) + { + ArgumentException.ThrowIfNullOrEmpty(tag); + ArgumentNullException.ThrowIfNull(values); + + if (values.Count == 0) + { + // Nothing to write is a trivially-successful ack — the recorder treats it as drained. + return true; + } + + try + { + var liveValues = new List(values.Count); + foreach (HistorizationValue value in values) + { + var live = new HistorianLiveValue + { + NumericValue = value.Value, + Quality = value.Quality, + }; + + if (value.TimestampUtc is { } timestampUtc) + { + // Timestamp.FromDateTime requires Utc kind; coerce defensively. A null timestamp + // leaves the proto field unset -> the gateway's SQL writer server-stamps now. + live.Timestamp = Timestamp.FromDateTime(DateTime.SpecifyKind(timestampUtc, DateTimeKind.Utc)); + } + + liveValues.Add(live); + } + + WriteAck ack = await _client.WriteLiveValuesAsync(tag, liveValues, ct).ConfigureAwait(false); + return ack.Success || ack.Queued; + } + catch (Exception exception) + { + // NEVER throw out of the writer — the recorder's drain expects a bool so its retain/retry + // logic stays simple. Log only the failure category (no value content, hostnames, or creds). + _logger.LogDebug( + "WriteLiveValues failed ({Exception}); recorder will retain and retry.", + exception.GetType().Name); + return false; + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs new file mode 100644 index 00000000..de0f76d0 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs @@ -0,0 +1,353 @@ +using Akka.Actor; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; + +/// +/// Continuous-historization engine for non-Galaxy (driver) tags. Registers interest with the +/// per-node for the configured historized tag refs, then taps the +/// values the mux fans for those refs: +/// value → coerce to numeric → append to the durable (the crash +/// boundary) → a self-paced drain writes the outbox through 's +/// live-value path and acks (truncates) on success. +/// +/// +/// +/// Message path (grounded against the real actors). The mux fans +/// — which carries only +/// (TagId, Value, TimestampUtc) and no quality — to every registered subscriber. +/// The recorder reuses that existing fan-out (no change to DriverHostActor), so values +/// arrive Good-quality by the same convention the scripted-alarm host uses (the mux only +/// forwards driver-published values); historized samples are recorded with +/// . +/// +/// +/// Numeric-only gate. The SQL analog live-value write path is numeric-only, so a +/// non-coercible value (string/null/reference) is dropped and metered rather than appended. +/// +/// +/// Non-blocking drain. Appends run on the actor thread (an awaited +/// so appends stay serialized — the durable +/// boundary completes before the next message), but the drain runs off the actor thread and +/// pipes its outcome back to Self () so the mailbox is never +/// blocked on the gateway write. A drain never throws into the actor: the drain task catches +/// everything and reports a failed , which backs the next attempt off +/// (exponential, capped). +/// +/// +public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers +{ + /// OPC-DA quality byte recorded for mux-fanned values (the mux drops quality; driver-published + /// values are Good by the same convention the scripted-alarm host applies). + public const ushort GoodQuality = 192; + + private const string DrainTimerKey = "drain"; + + /// Self-tick that triggers a drain attempt (from the timer or a fresh append). + private sealed class DrainTick + { + public static readonly DrainTick Instance = new(); + private DrainTick() { } + } + + /// Outcome of one off-thread drain pass, piped back to the actor thread. + private sealed record DrainResult(bool Success, int Acked, int Attempted); + + /// Ask the recorder for its current counters (used by the meter/health wiring). + public sealed class GetStatus + { + /// The singleton request instance. + public static readonly GetStatus Instance = new(); + private GetStatus() { } + } + + /// A point-in-time snapshot of the recorder's counters. + /// Un-acked entries currently held in the durable outbox. + /// Lifetime count of values appended to the outbox. + /// Lifetime count of values dropped for not being numeric-coercible. + /// Lifetime count of entries the outbox dropped on capacity overflow. + /// Whether the most recent drain pass acked cleanly. + public sealed record RecorderStatus( + int QueuedDepth, + long TotalRecorded, + long DroppedNonNumeric, + long OutboxDropped, + bool LastDrainSucceeded); + + private readonly IActorRef _dependencyMux; + private readonly IHistorianValueWriter _writer; + private readonly IHistorizationOutbox _outbox; + private readonly IReadOnlyList _historizedRefs; + private readonly HashSet _historizedSet; + private readonly int _drainBatchSize; + private readonly TimeSpan _drainInterval; + private readonly TimeSpan _minBackoff; + private readonly TimeSpan _maxBackoff; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly CancellationTokenSource _lifetime = new(); + + private bool _draining; + private TimeSpan _currentBackoff; + private DateTime _nextAllowedDrainUtc = DateTime.MinValue; + private long _totalRecorded; + private long _droppedNonNumeric; + private bool _lastDrainSucceeded = true; + + /// Gets or sets the timer scheduler (set by Akka via ). + public ITimerScheduler Timers { get; set; } = null!; + + /// Builds props for the recorder. + /// The per-node dependency mux to register historized-ref interest with. + /// The live-value write seam the drain pushes batches through. + /// The durable FIFO outbox values are appended to before they are written. + /// The fully-qualified tag refs to historize. + /// Max entries peeked + written per drain pass (must be positive). + /// Steady drain cadence; also the post-success reschedule. + /// Initial retry backoff after a failed drain. + /// Cap on the exponential retry backoff. + /// Props for the actor. + public static Props Props( + IActorRef dependencyMux, + IHistorianValueWriter writer, + IHistorizationOutbox outbox, + IReadOnlyList historizedRefs, + int drainBatchSize = 64, + TimeSpan? drainInterval = null, + TimeSpan? minBackoff = null, + TimeSpan? maxBackoff = null) => + Akka.Actor.Props.Create(() => new ContinuousHistorizationRecorder( + dependencyMux, writer, outbox, historizedRefs, drainBatchSize, drainInterval, minBackoff, maxBackoff)); + + /// Initializes a new instance of the class. + /// The per-node dependency mux to register historized-ref interest with. + /// The live-value write seam the drain pushes batches through. + /// The durable FIFO outbox values are appended to before they are written. + /// The fully-qualified tag refs to historize. + /// Max entries peeked + written per drain pass (must be positive). + /// Steady drain cadence; also the post-success reschedule. + /// Initial retry backoff after a failed drain. + /// Cap on the exponential retry backoff. + public ContinuousHistorizationRecorder( + IActorRef dependencyMux, + IHistorianValueWriter writer, + IHistorizationOutbox outbox, + IReadOnlyList historizedRefs, + int drainBatchSize = 64, + TimeSpan? drainInterval = null, + TimeSpan? minBackoff = null, + TimeSpan? maxBackoff = null) + { + _dependencyMux = dependencyMux ?? throw new ArgumentNullException(nameof(dependencyMux)); + _writer = writer ?? throw new ArgumentNullException(nameof(writer)); + _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox)); + _historizedRefs = historizedRefs ?? throw new ArgumentNullException(nameof(historizedRefs)); + _historizedSet = new HashSet(_historizedRefs, StringComparer.Ordinal); + _drainBatchSize = drainBatchSize > 0 ? drainBatchSize : 64; + _drainInterval = drainInterval is { } di && di > TimeSpan.Zero ? di : TimeSpan.FromSeconds(2); + _minBackoff = minBackoff is { } mb && mb > TimeSpan.Zero ? mb : TimeSpan.FromSeconds(1); + _maxBackoff = maxBackoff is { } xb && xb > TimeSpan.Zero ? xb : TimeSpan.FromSeconds(30); + _currentBackoff = _minBackoff; + + ReceiveAsync(OnValueChangedAsync); + Receive(_ => OnDrainTick()); + Receive(OnDrainResult); + ReceiveAsync(async _ => Sender.Tell(await BuildStatusAsync().ConfigureAwait(false))); + } + + /// + protected override void PreStart() + { + // Register interest for the historized refs so the mux fans their DependencyValueChanged to us. + _dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_historizedRefs, Self)); + // Seed the steady drain cadence; appends also nudge a prompt drain (see OnValueChangedAsync). + Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, _drainInterval); + base.PreStart(); + } + + /// + protected override void PostStop() + { + _lifetime.Cancel(); + _lifetime.Dispose(); + base.PostStop(); + } + + private async Task OnValueChangedAsync(VirtualTagActor.DependencyValueChanged msg) + { + // Defensive: only historize refs we registered interest for (the mux already scopes to these). + if (!_historizedSet.Contains(msg.TagId)) + { + return; + } + + if (!TryCoerceToDouble(msg.Value, out double numeric)) + { + // The SQL analog live-value path is numeric-only — drop non-coercible values (string/null/ + // reference) and meter the drop. No value content is logged. + _droppedNonNumeric++; + _log.Debug("ContinuousHistorization: dropped non-numeric value for a historized ref."); + return; + } + + var entry = new HistorizationOutboxEntry( + Guid.NewGuid(), + msg.TagId, + numeric, + GoodQuality, + DateTime.SpecifyKind(msg.TimestampUtc, DateTimeKind.Utc)); + + // Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered + // captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount. + await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false); + _totalRecorded++; + + // Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours + // any active backoff window, so this never bypasses a failure cooldown. + Self.Tell(DrainTick.Instance); + } + + private void OnDrainTick() + { + if (_draining) + { + // A drain is already in flight; it reschedules itself on completion. + return; + } + + if (DateTime.UtcNow < _nextAllowedDrainUtc) + { + // Inside a post-failure backoff cooldown — the scheduled timer will fire the retry. + return; + } + + _draining = true; + DrainOnceAsync(_lifetime.Token) + .PipeTo(Self, success: result => result, failure: ToFailedDrain); + } + + // Defensive failure mapper: DrainOnceAsync catches its own exceptions, so this only fires if the + // PipeTo plumbing itself faults. Map to a failed drain so the actor backs off rather than dies. + private static object ToFailedDrain(Exception _) => new DrainResult(false, 0, 0); + + private async Task DrainOnceAsync(CancellationToken ct) + { + try + { + IReadOnlyList batch = await _outbox.PeekBatchAsync(_drainBatchSize, ct) + .ConfigureAwait(false); + if (batch.Count == 0) + { + return new DrainResult(true, 0, 0); + } + + // Write per-tag batches. The outbox's RemoveAsync truncates the FIFO head THROUGH the acked + // id, so partial per-tag acking could drop un-written older entries; ack all-or-nothing — + // only when every tag's batch wrote do we truncate the whole peeked prefix (one commit). + var allOk = true; + foreach (IGrouping group in + batch.GroupBy(e => e.Tag, StringComparer.Ordinal)) + { + var values = group + .Select(e => new HistorizationValue(e.TimestampUtc, e.NumericValue, e.Quality)) + .ToList(); + + bool ok; + try + { + ok = await _writer.WriteLiveValuesAsync(group.Key, values, ct).ConfigureAwait(false); + } + catch + { + // The writer is non-throwing by contract; guard defensively so one tag's fault + // cannot fail the whole drain task. + ok = false; + } + + if (!ok) + { + allOk = false; + } + } + + if (!allOk) + { + // Leave the entire peeked prefix queued for the next (backed-off) attempt. + return new DrainResult(false, 0, batch.Count); + } + + // Ack the whole prefix by truncating through the last entry (FIFO head advance + commit). + await _outbox.RemoveAsync(batch[^1].Id, ct).ConfigureAwait(false); + return new DrainResult(true, batch.Count, batch.Count); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Shutdown mid-drain: not a fault. Entries stay durable and drain next startup. + return new DrainResult(true, 0, 0); + } + catch (Exception exception) + { + // A drain exception must NEVER kill the actor. Log the category only (no value content) and + // report a failed pass so the next attempt backs off. + _log.Warning("ContinuousHistorization: drain pass failed ({Exception}); will retry.", + exception.GetType().Name); + return new DrainResult(false, 0, 0); + } + } + + private void OnDrainResult(DrainResult result) + { + _draining = false; + _lastDrainSucceeded = result.Success; + + if (result.Success) + { + // Healthy: reset backoff, clear the cooldown, and keep a steady cadence to catch stragglers + // appended during this pass. + _currentBackoff = _minBackoff; + _nextAllowedDrainUtc = DateTime.MinValue; + Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, _drainInterval); + return; + } + + // Failed: schedule the retry after the current backoff, then grow it (capped) for the next. + TimeSpan delay = _currentBackoff; + _nextAllowedDrainUtc = DateTime.UtcNow + delay; + _currentBackoff = Min(TimeSpan.FromTicks(_currentBackoff.Ticks * 2), _maxBackoff); + Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, delay); + } + + private async Task BuildStatusAsync() + { + int depth = await _outbox.CountAsync(_lifetime.Token).ConfigureAwait(false); + return new RecorderStatus( + depth, + _totalRecorded, + _droppedNonNumeric, + _outbox.DroppedCount, + _lastDrainSucceeded); + } + + private static TimeSpan Min(TimeSpan a, TimeSpan b) => a < b ? a : b; + + private static bool TryCoerceToDouble(object? value, out double result) + { + switch (value) + { + case double d: result = d; return true; + case float f: result = f; return true; + case int i: result = i; return true; + case long l: result = l; return true; + case short s: result = s; return true; + case ushort us: result = us; return true; + case uint ui: result = ui; return true; + case ulong ul: result = ul; return true; + case byte b: result = b; return true; + case sbyte sb: result = sb; return true; + case decimal dec: result = (double)dec; return true; + case bool bo: result = bo ? 1d : 0d; return true; // Boolean historizes as Int1 (1/0). + default: result = 0d; return false; // string / null / DateTime / reference -> dropped. + } + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs new file mode 100644 index 00000000..42ee87fe --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs @@ -0,0 +1,204 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian; + +/// +/// Verifies the : it registers historized-ref +/// interest with the dependency mux on start, appends mux-fanned +/// values to the durable outbox then drains +/// them to the live-value writer, retains entries when the writer fails, and drops non-numeric +/// values (the SQL analog write path is numeric-only). +/// +/// +/// Adapted from the plan's Task 17: the recorder handles the REAL fan-out message the mux emits — +/// (TagId/Value/TimestampUtc, no quality) — +/// not DriverInstanceActor.AttributeValuePublished. The mux drops quality, so the recorder +/// records Good-quality (the same convention the scripted-alarm host uses for mux values). +/// +public sealed class ContinuousHistorizationRecorderTests : TestKit +{ + [Fact] + public void Registers_interest_for_historized_refs_on_start() + { + var mux = CreateTestProbe(); + var writer = new FakeValueWriter(); + var outbox = new InMemoryOutbox(); + + Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, historizedRefs: new[] { "Pump1.Temp" })); + + var reg = mux.ExpectMsg(); + Assert.Contains("Pump1.Temp", reg.TagRefs); + } + + [Fact] + public async Task DependencyValueChanged_appends_to_outbox_then_drains_to_writer() + { + var mux = CreateTestProbe(); + var writer = new FakeValueWriter { Succeed = true }; + var outbox = new InMemoryOutbox(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, new[] { "Pump1.Temp" })); + + rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 42.0, DateTime.UtcNow)); + + await AwaitAssertAsync(() => + Assert.Contains(writer.Snapshot(), w => w.Tag == "Pump1.Temp" && w.Value == 42.0)); + await AwaitAssertAsync(async () => + Assert.Equal(0, await outbox.CountAsync(default))); // acked -> truncated + } + + [Fact] + public async Task Writer_failure_keeps_entry_for_retry() + { + var mux = CreateTestProbe(); + var writer = new FakeValueWriter { Succeed = false }; + var outbox = new InMemoryOutbox(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, new[] { "Pump1.Temp" })); + + rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 7.0, DateTime.UtcNow)); + + await AwaitAssertAsync(async () => + Assert.Equal(1, await outbox.CountAsync(default))); // not acked -> retained for retry + } + + [Fact] + public async Task Non_numeric_value_is_dropped_with_metric() + { + var mux = CreateTestProbe(); + var writer = new FakeValueWriter(); + var outbox = new InMemoryOutbox(); + + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, new[] { "Pump1.Name" })); + + rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Name", "text", DateTime.UtcNow)); + + // A string value can't ride the SQL analog write path -> dropped (metered), never appended. + await AwaitAssertAsync(async () => + { + var status = await rec.Ask( + ContinuousHistorizationRecorder.GetStatus.Instance, TimeSpan.FromSeconds(3)); + Assert.Equal(1, status.DroppedNonNumeric); + Assert.Equal(0, status.QueuedDepth); + }); + Assert.Empty(writer.Snapshot()); + } + + /// In-memory double: records every value written and + /// returns as the ack. Thread-safe — the recorder drains off the actor thread. + private sealed class FakeValueWriter : IHistorianValueWriter + { + private readonly Lock _gate = new(); + private readonly List _written = new(); + + public bool Succeed { get; init; } = true; + + public IReadOnlyList Snapshot() + { + lock (_gate) + { + return _written.ToArray(); + } + } + + public Task WriteLiveValuesAsync( + string tag, IReadOnlyList values, CancellationToken ct) + { + lock (_gate) + { + foreach (HistorizationValue v in values) + { + _written.Add(new WrittenValue(tag, v.Value, v.Quality, v.TimestampUtc)); + } + } + + return Task.FromResult(Succeed); + } + } + + private sealed record WrittenValue(string Tag, double Value, ushort Quality, DateTime? TimestampUtc); + + /// In-memory double honouring the FIFO-truncate + /// contract (remove the id plus any older entries + /// ahead of it) and the optional drop-oldest capacity. Thread-safe. + private sealed class InMemoryOutbox : IHistorizationOutbox + { + private readonly Lock _gate = new(); + private readonly List _entries = new(); + private readonly int _capacity; + private long _dropped; + + public InMemoryOutbox(int capacity = 0) => _capacity = capacity; + + public long DroppedCount + { + get + { + lock (_gate) + { + return _dropped; + } + } + } + + public ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct) + { + lock (_gate) + { + _entries.Add(entry); + while (_capacity > 0 && _entries.Count > _capacity) + { + _entries.RemoveAt(0); // drop oldest on overflow + _dropped++; + } + } + + return ValueTask.CompletedTask; + } + + public ValueTask> PeekBatchAsync(int max, CancellationToken ct) + { + lock (_gate) + { + IReadOnlyList batch = _entries.Take(max).ToArray(); + return ValueTask.FromResult(batch); + } + } + + public ValueTask RemoveAsync(Guid id, CancellationToken ct) + { + lock (_gate) + { + int idx = _entries.FindIndex(e => e.Id == id); + if (idx >= 0) + { + // FIFO ack: remove the target plus everything ahead of it in the buffer. + _entries.RemoveRange(0, idx + 1); + } + } + + return ValueTask.CompletedTask; + } + + public ValueTask CountAsync(CancellationToken ct) + { + lock (_gate) + { + return ValueTask.FromResult(_entries.Count); + } + } + + public void Dispose() + { + } + } +}