Phase 7 plan decisions #16, #17, #19, #21 implementation.

Durable local SQLite queue absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host (reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff cadence; operator acks never block on the historian being reachable.

## New project Core.AlarmHistorian (net10)

- AlarmHistorianEvent: source-agnostic event shape (scripted alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource)
- IAlarmHistorianSink / NullAlarmHistorianSink: interface + disabled default
- IAlarmHistorianWriter: per-event outcome (Ack / RetryPlease / PermanentFail); Stream G wires the Galaxy.Host IPC client implementation
- SqliteStoreAndForwardSink (full implementation):
  - Queue table with AttemptCount / LastError / DeadLettered columns
  - DrainOnceAsync serialised via SemaphoreSlim
  - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap)
  - DefaultCapacity 1,000,000 rows; overflow evicts the oldest non-dead-lettered row
  - DefaultDeadLetterRetention 30 days; sweeper purges on every drain tick
  - RetryDeadLettered operator action reattaches dead-letters to the regular queue
  - Writer-side exceptions treated as whole-batch RetryPlease (no data loss)

## New IPC contracts in Driver.Galaxy.Shared

- HistorianAlarmEventRequest: batched up to 100 events/request per plan Stream D.5
- HistorianAlarmEventResponse: per-event outcome (1:1 with request order)
- HistorianAlarmEventOutcomeDto enum (byte on the wire: Ack / RetryPlease / PermanentFail)
- HistorianAlarmEventDto: mirrors Core.AlarmHistorian.AlarmHistorianEvent
- HistorianConnectivityStatusNotification: Host pushes proactively when the SDK session drops, so /alarms/historian flips red without waiting for the next drain
- MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse / 0x82 HistorianConnectivityStatus

## Tests (14/14)

SqliteStoreAndForwardSinkTests covers:

- enqueue→drain→Ack round-trip
- empty-queue no-op
- RetryPlease bumps backoff + keeps row
- Ack after Retry resets backoff
- PermanentFail dead-letters one row without blocking neighbors
- writer exception treated as whole-batch retry, with error surfaced in status
- capacity eviction drops oldest non-dead-lettered
- dead-letters purged past retention window
- RetryDeadLettered requeues
- ladder caps at 60s after 10 retries
- Null sink reports Disabled status
- null sink swallows enqueue
- ctor argument validation
- disposed sink rejects enqueue

## Totals

Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms + 14 AlarmHistorian).

Stream G wires this into the real Galaxy.Host IPC pipe.
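The BackoffLadder bullet describes a fixed table of delays rather than a pure doubling. Below is a minimal sketch of how such a ladder could be tracked. It assumes the drain worker records one retry per failed tick and resets on Ack. `BackoffLadderSketch`, `RecordRetry`, `RecordAck`, and `CurrentDelay` are hypothetical names, not the sink's actual API.

```csharp
using System;

// Hypothetical sketch: models the 1s -> 2s -> 5s -> 15s -> 60s ladder from the PR text.
// The real BackoffLadder inside SqliteStoreAndForwardSink may be structured differently.
public sealed class BackoffLadderSketch
{
    // Ladder steps taken from the PR description; the last entry is the cap.
    private static readonly TimeSpan[] Steps =
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(5),
        TimeSpan.FromSeconds(15),
        TimeSpan.FromSeconds(60),
    };

    // 0 = healthy (no pending retry); n > 0 = sitting on Steps[n - 1].
    private int _rung;

    // Extra delay the drain worker would wait before its next attempt.
    public TimeSpan CurrentDelay => _rung == 0 ? TimeSpan.Zero : Steps[_rung - 1];

    // RetryPlease (or a writer exception): climb one rung, saturating at the 60s cap.
    public void RecordRetry()
    {
        if (_rung < Steps.Length) _rung++;
    }

    // Ack: drop straight back to the healthy state.
    public void RecordAck() => _rung = 0;
}
```

The rung saturates at the top step instead of counting forever. A long historian outage therefore just stays at the 60s cap, which is the behaviour the "ladder caps at 60s after 10 retries" test checks.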
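The DefaultCapacity and DefaultDeadLetterRetention rules are easier to follow with the eviction order made explicit. This sketch models them in memory and assumes three things:

- Dead-letters are a flag on the same queue table (per the DeadLettered column).
- Retention is measured from the moment a row was dead-lettered.
- Capacity counts every row.

`QueueRowSketch`, `QueueHousekeepingSketch`, and `DeadLetteredUtc` are hypothetical. The real sink applies these rules in SQLite.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: mirrors the AttemptCount / LastError / DeadLettered columns
// named in the PR, plus an assumed DeadLetteredUtc used for the retention sweep.
public sealed class QueueRowSketch
{
    public long Id { get; init; }
    public int AttemptCount { get; set; }
    public string? LastError { get; set; }
    public bool DeadLettered { get; set; }
    public DateTime? DeadLetteredUtc { get; set; }
}

// In-memory stand-in for the SQLite queue table's housekeeping rules.
public sealed class QueueHousekeepingSketch
{
    private readonly List<QueueRowSketch> _rows = new(); // oldest first
    private readonly int _capacity;
    private readonly TimeSpan _deadLetterRetention;
    private long _nextId = 1;

    public QueueHousekeepingSketch(int capacity, TimeSpan deadLetterRetention)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
        _deadLetterRetention = deadLetterRetention;
    }

    public IReadOnlyList<QueueRowSketch> Rows => _rows;

    public long Enqueue()
    {
        if (_rows.Count >= _capacity)
        {
            // Overflow: evict the oldest live row; dead-letters are kept for the operator.
            // Assumption: if every row is dead-lettered, the new row is admitted anyway.
            int oldestLive = _rows.FindIndex(r => !r.DeadLettered);
            if (oldestLive >= 0) _rows.RemoveAt(oldestLive);
        }
        var row = new QueueRowSketch { Id = _nextId++ };
        _rows.Add(row);
        return row.Id;
    }

    // PermanentFail: flag the row instead of deleting it.
    public void DeadLetter(long id, string error, DateTime nowUtc)
    {
        var row = _rows.Single(r => r.Id == id);
        row.AttemptCount++;
        row.LastError = error;
        row.DeadLettered = true;
        row.DeadLetteredUtc = nowUtc;
    }

    // Sweeper, run on every drain tick: purge dead-letters older than the retention window.
    public int PurgeExpiredDeadLetters(DateTime nowUtc) =>
        _rows.RemoveAll(r => r.DeadLettered
            && r.DeadLetteredUtc is DateTime at
            && nowUtc - at > _deadLetterRetention);

    // Operator action (RetryDeadLettered): put every dead-letter back into the regular queue.
    public int RetryDeadLettered()
    {
        int requeued = 0;
        foreach (var row in _rows.Where(r => r.DeadLettered))
        {
            row.DeadLettered = false;
            row.DeadLetteredUtc = null;
            requeued++;
        }
        return requeued;
    }
}
```

Evicting only live rows means a burst of new alarms can never silently push out a dead-letter. Dead-letters leave the queue only through the retention sweep or an operator retry.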
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;

/// <summary>
/// The historian sink contract: the destination for qualifying alarm events. Per Phase 7
/// plan decision #17, ingestion routes through Galaxy.Host's pipe so we reuse the
/// already-loaded <c>aahClientManaged</c> DLLs without loading 32-bit native code
/// in the main .NET 10 server. Tests use an in-memory fake; production uses
/// <see cref="SqliteStoreAndForwardSink"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="EnqueueAsync"/> is fire-and-forget from the engine's perspective:
/// the sink MUST NOT block the emitting thread. Production implementations
/// (<see cref="SqliteStoreAndForwardSink"/>) persist to a local SQLite queue
/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
/// decision #16, failed downstream writes replay with exponential backoff;
/// operator actions are never blocked waiting on the historian.
/// </para>
/// <para>
/// <see cref="GetStatus"/> exposes queue depth, dead-letter depth, last drain and last
/// success timestamps, last error, and drain state for the Admin UI
/// <c>/alarms/historian</c> diagnostics page (Stream F).
/// </para>
/// </remarks>
public interface IAlarmHistorianSink
{
    /// <summary>Durably enqueue the event. Returns as soon as the queue row is committed.</summary>
    Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);

    /// <summary>Snapshot of current queue depth + drain health.</summary>
    HistorianSinkStatus GetStatus();
}

/// <summary>No-op default for tests or deployments that don't historize alarms.</summary>
public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
{
    public static readonly NullAlarmHistorianSink Instance = new();
    public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) => Task.CompletedTask;
    public HistorianSinkStatus GetStatus() => new(
        QueueDepth: 0,
        DeadLetterDepth: 0,
        LastDrainUtc: null,
        LastSuccessUtc: null,
        LastError: null,
        DrainState: HistorianDrainState.Disabled);
}

/// <summary>Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.</summary>
public sealed record HistorianSinkStatus(
    long QueueDepth,
    long DeadLetterDepth,
    DateTime? LastDrainUtc,
    DateTime? LastSuccessUtc,
    string? LastError,
    HistorianDrainState DrainState);
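
/// <summary>Where the drain worker is in its state machine.</summary>
public enum HistorianDrainState
{
    /// <summary>No historian configured (e.g. <see cref="NullAlarmHistorianSink"/>); nothing is queued or drained.</summary>
    Disabled,
    /// <summary>Running with no batch in flight.</summary>
    Idle,
    /// <summary>A batch is currently being sent to the historian.</summary>
    Draining,
    /// <summary>A drain failed transiently; waiting out the backoff delay before retrying.</summary>
    BackingOff,
}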

/// <summary>Per-event result from the Galaxy.Host-side handler; the drain worker uses it to remove, retry, or dead-letter each queued row.</summary>
public enum HistorianWriteOutcome
{
    /// <summary>Successfully persisted to the historian. Remove from queue.</summary>
    Ack,
    /// <summary>Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff.</summary>
    RetryPlease,
    /// <summary>Permanent failure (malformed event, unrecoverable SDK error). Dead-letter the row; it stays out of the drain until an operator retries it or the retention window purges it.</summary>
    PermanentFail,
}

/// <summary>What the drain worker delegates writes to; Stream G wires this to the Galaxy.Host IPC client.</summary>
public interface IAlarmHistorianWriter
{
    /// <summary>
    /// Push a batch of events to the historian. Returns one outcome per event, in the same
    /// order as <paramref name="batch"/>. A thrown exception is treated as
    /// <see cref="HistorianWriteOutcome.RetryPlease"/> for the whole batch, so no events are lost.
    /// </summary>
    Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
        IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken);
}
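The `IAlarmHistorianWriter` contract can be illustrated with a single drain step. The sketch applies each returned outcome to its row by position and turns a writer exception into a whole-batch retry. It rests on several assumptions:

- The queue is a plain in-memory list.
- A delegate stands in for `WriteBatchAsync`.
- Events are reduced to string payloads.
- An outcome-count mismatch is handled like a writer failure.

`PendingRow`, `DrainStepSketch`, and `WriteBatch` are hypothetical and are not the production `SqliteStoreAndForwardSink`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Redeclared so the sketch compiles on its own; same three values as the contract above.
public enum HistorianWriteOutcome { Ack, RetryPlease, PermanentFail }

// Hypothetical queued row; the real sink stores these in SQLite.
public sealed class PendingRow
{
    public PendingRow(long id, string payload) { Id = id; Payload = payload; }
    public long Id { get; }
    public string Payload { get; }
    public int AttemptCount { get; set; }
    public string? LastError { get; set; }
    public bool DeadLettered { get; set; }
}

public static class DrainStepSketch
{
    // Stand-in for IAlarmHistorianWriter.WriteBatchAsync, with string payloads
    // instead of AlarmHistorianEvent.
    public delegate Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatch(
        IReadOnlyList<string> batch, CancellationToken ct);

    // Sends one batch of live rows and applies outcomes positionally.
    // Returns true if anything asked for a retry, so the caller can climb its backoff ladder.
    public static async Task<bool> DrainOnceAsync(
        List<PendingRow> queue, WriteBatch write, int maxBatch, CancellationToken ct)
    {
        var batch = queue.Where(r => !r.DeadLettered).Take(maxBatch).ToList();
        if (batch.Count == 0) return false; // empty queue: no-op

        IReadOnlyList<HistorianWriteOutcome> outcomes;
        try
        {
            outcomes = await write(batch.Select(r => r.Payload).ToList(), ct);
            // Assumption: a count mismatch breaks the 1:1 contract, so treat it as a writer failure.
            if (outcomes.Count != batch.Count)
                throw new InvalidOperationException(
                    $"Writer returned {outcomes.Count} outcomes for {batch.Count} events.");
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Writer exception = whole-batch RetryPlease: every row stays queued, error is recorded.
            foreach (var row in batch) { row.AttemptCount++; row.LastError = ex.Message; }
            return true;
        }

        bool anyRetry = false;
        for (int i = 0; i < batch.Count; i++)
        {
            var row = batch[i];
            switch (outcomes[i])
            {
                case HistorianWriteOutcome.Ack:
                    queue.Remove(row); // persisted downstream: drop from the queue
                    break;
                case HistorianWriteOutcome.RetryPlease:
                    row.AttemptCount++;
                    anyRetry = true;
                    break;
                case HistorianWriteOutcome.PermanentFail:
                    row.AttemptCount++;
                    row.DeadLettered = true; // only this row; neighbours are unaffected
                    break;
            }
        }
        return anyRetry;
    }
}
```

Positional matching is why the writer must return exactly one outcome per event in request order. A PermanentFail flags only its own row, so the rest of the batch keeps draining.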