diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx
index 0bfe61c..2c304cb 100644
--- a/ZB.MOM.WW.OtOpcUa.slnx
+++ b/ZB.MOM.WW.OtOpcUa.slnx
@@ -6,6 +6,7 @@
+
@@ -32,6 +33,7 @@
+
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs
new file mode 100644
index 0000000..8c383b0
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs
@@ -0,0 +1,36 @@
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
+
+///
+/// The event shape the historian sink consumes — source-agnostic across scripted
+/// alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource per Phase 7 plan
+/// decision #15 (sink scope = all alarm sources, not just scripted). A per-alarm
+/// HistorizeToAveva toggle on the producer side gates which events flow.
+///
+/// Stable condition identity.
+/// UNS path of the Equipment node the alarm hangs under. Doubles as the "SourceNode" in Historian's alarm schema.
+/// Human-readable alarm name.
+/// Concrete Part 9 subtype — "LimitAlarm" / "DiscreteAlarm" / "OffNormalAlarm" / "AlarmCondition". Used as the Historian "AlarmType" column.
+/// Mapped to Historian's numeric priority on the sink side.
+///
+/// Which state transition this event represents — "Activated" / "Cleared" /
+/// "Acknowledged" / "Confirmed" / "Shelved" / "Unshelved" / "Disabled" / "Enabled" /
+/// "CommentAdded". Free-form string because different alarm sources use different
+/// vocabularies; the Galaxy.Host handler maps to the historian's enum on the wire.
+///
+/// Fully-rendered message text — template tokens already resolved upstream.
+/// Operator who triggered the transition. "system" for engine-driven events (shelving expiry, predicate change).
+/// Operator-supplied free-form text, if any.
+/// When the transition occurred.
+public sealed record AlarmHistorianEvent(
+ string AlarmId,
+ string EquipmentPath,
+ string AlarmName,
+ string AlarmTypeName,
+ AlarmSeverity Severity,
+ string EventKind,
+ string Message,
+ string User,
+ string? Comment,
+ DateTime TimestampUtc);
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs
new file mode 100644
index 0000000..87fc9a2
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs
@@ -0,0 +1,82 @@
+namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
+
+///
+/// The historian sink contract — where qualifying alarm events land. Phase 7 plan
+/// decision #17: ingestion routes through Galaxy.Host's pipe so we reuse the
+/// already-loaded aahClientManaged DLLs without loading 32-bit native code
+/// in the main .NET 10 server. Tests use an in-memory fake; production uses
+/// .
+///
+///
+///
+/// is fire-and-forget from the engine's perspective —
+/// the sink MUST NOT block the emitting thread. Production implementations
+/// () persist to a local SQLite queue
+/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
+/// decision #16, failed downstream writes replay with exponential backoff;
+/// operator actions are never blocked waiting on the historian.
+///
+///
+/// exposes queue depth + drain rate + last error
+/// for the Admin UI /alarms/historian diagnostics page (Stream F).
+///
+///
+public interface IAlarmHistorianSink
+{
+ /// Durably enqueue the event. Returns as soon as the queue row is committed.
+ Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);
+
+ /// Snapshot of current queue depth + drain health.
+ HistorianSinkStatus GetStatus();
+}
+
+/// No-op default for tests or deployments that don't historize alarms.
+public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
+{
+ public static readonly NullAlarmHistorianSink Instance = new();
+ public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) => Task.CompletedTask;
+ public HistorianSinkStatus GetStatus() => new(
+ QueueDepth: 0,
+ DeadLetterDepth: 0,
+ LastDrainUtc: null,
+ LastSuccessUtc: null,
+ LastError: null,
+ DrainState: HistorianDrainState.Disabled);
+}
+
+/// Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.
+public sealed record HistorianSinkStatus(
+ long QueueDepth,
+ long DeadLetterDepth,
+ DateTime? LastDrainUtc,
+ DateTime? LastSuccessUtc,
+ string? LastError,
+ HistorianDrainState DrainState);
+
+/// Where the drain worker is in its state machine.
+public enum HistorianDrainState
+{
+ Disabled,
+ Idle,
+ Draining,
+ BackingOff,
+}
+
+/// Signaled by the Galaxy.Host-side handler when it fails a batch — drain worker uses this to decide retry cadence.
+public enum HistorianWriteOutcome
+{
+ /// Successfully persisted to the historian. Remove from queue.
+ Ack,
+ /// Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff.
+ RetryPlease,
+ /// Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter table.
+ PermanentFail,
+}
+
+/// What the drain worker delegates writes to — Stream G wires this to the Galaxy.Host IPC client.
+public interface IAlarmHistorianWriter
+{
+ /// Push a batch of events to the historian. Returns one outcome per event, same order.
+ Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
+ IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken);
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
new file mode 100644
index 0000000..1b56ba5
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
@@ -0,0 +1,397 @@
+using System.Text.Json;
+using Microsoft.Data.Sqlite;
+using Serilog;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
+
+///
+/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
+/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
+/// via on an exponential-backoff cadence, and
+/// operator acks never block on the historian being reachable.
+///
+///
+///
+/// Queue schema:
+///
+/// CREATE TABLE Queue (
+/// RowId INTEGER PRIMARY KEY AUTOINCREMENT,
+/// AlarmId TEXT NOT NULL,
+/// EnqueuedUtc TEXT NOT NULL,
+/// PayloadJson TEXT NOT NULL,
+/// AttemptCount INTEGER NOT NULL DEFAULT 0,
+/// LastAttemptUtc TEXT NULL,
+/// LastError TEXT NULL,
+/// DeadLettered INTEGER NOT NULL DEFAULT 0
+/// );
+///
+/// Dead-lettered rows stay in place for the configured retention window (default
+/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
+/// retry before the sweeper purges them. Regular queue capacity is bounded —
+/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
+///
+///
+/// Drain runs on a shared . Exponential
+/// backoff on : 1s → 2s → 5s →
+/// 15s → 60s cap. rows flip
+/// the DeadLettered flag on the individual row; neighbors in the batch
+/// still retry on their own cadence.
+///
+///
+public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
+{
+ /// Default queue capacity — oldest non-dead-lettered rows evicted past this.
+ public const long DefaultCapacity = 1_000_000;
+ public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
+
+ private static readonly TimeSpan[] BackoffLadder =
+ [
+ TimeSpan.FromSeconds(1),
+ TimeSpan.FromSeconds(2),
+ TimeSpan.FromSeconds(5),
+ TimeSpan.FromSeconds(15),
+ TimeSpan.FromSeconds(60),
+ ];
+
+ private readonly string _connectionString;
+ private readonly IAlarmHistorianWriter _writer;
+ private readonly ILogger _logger;
+ private readonly int _batchSize;
+ private readonly long _capacity;
+ private readonly TimeSpan _deadLetterRetention;
+ private readonly Func<DateTime> _clock;
+
+ private readonly SemaphoreSlim _drainGate = new(1, 1);
+ private Timer? _drainTimer;
+ private int _backoffIndex;
+ private DateTime? _lastDrainUtc;
+ private DateTime? _lastSuccessUtc;
+ private string? _lastError;
+ private HistorianDrainState _drainState = HistorianDrainState.Idle;
+ private bool _disposed;
+
+ public SqliteStoreAndForwardSink(
+ string databasePath,
+ IAlarmHistorianWriter writer,
+ ILogger logger,
+ int batchSize = 100,
+ long capacity = DefaultCapacity,
+ TimeSpan? deadLetterRetention = null,
+ Func<DateTime>? clock = null)
+ {
+ if (string.IsNullOrWhiteSpace(databasePath))
+ throw new ArgumentException("Database path required.", nameof(databasePath));
+ _writer = writer ?? throw new ArgumentNullException(nameof(writer));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
+ _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
+ _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
+ _clock = clock ?? (() => DateTime.UtcNow);
+ _connectionString = $"Data Source={databasePath}";
+
+ InitializeSchema();
+ }
+
+ ///
+ /// Start the background drain worker. Not started automatically so tests can
+ /// drive deterministically.
+ ///
+ public void StartDrainLoop(TimeSpan tickInterval)
+ {
+ if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
+ _drainTimer?.Dispose();
+ _drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
+ null, tickInterval, tickInterval);
+ }
+
+ public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
+ {
+ if (evt is null) throw new ArgumentNullException(nameof(evt));
+ if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
+
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+
+ EnforceCapacity(conn);
+
+ using var cmd = conn.CreateCommand();
+ cmd.CommandText = """
+ INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
+ VALUES ($alarmId, $enqueued, $payload, 0);
+ """;
+ cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
+ cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
+ cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
+ cmd.ExecuteNonQuery();
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Read up to queued rows, forward through the writer,
+ /// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
+ /// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
+ /// serial execution.
+ ///
+ public async Task DrainOnceAsync(CancellationToken ct)
+ {
+ if (_disposed) return;
+ if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
+ try
+ {
+ _drainState = HistorianDrainState.Draining;
+ _lastDrainUtc = _clock();
+
+ PurgeAgedDeadLetters();
+ var (rowIds, events) = ReadBatch();
+ if (rowIds.Count == 0)
+ {
+ _drainState = HistorianDrainState.Idle;
+ return;
+ }
+
+ IReadOnlyList<HistorianWriteOutcome> outcomes;
+ try
+ {
+ outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
+ _lastError = null;
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ // Writer-side exception — treat entire batch as RetryPlease.
+ _lastError = ex.Message;
+ _logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
+ BumpBackoff();
+ _drainState = HistorianDrainState.BackingOff;
+ return;
+ }
+
+ if (outcomes.Count != events.Count)
+ throw new InvalidOperationException(
+ $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
+
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var tx = conn.BeginTransaction();
+ for (var i = 0; i < outcomes.Count; i++)
+ {
+ var outcome = outcomes[i];
+ var rowId = rowIds[i];
+ switch (outcome)
+ {
+ case HistorianWriteOutcome.Ack:
+ DeleteRow(conn, tx, rowId);
+ break;
+ case HistorianWriteOutcome.PermanentFail:
+ DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
+ break;
+ case HistorianWriteOutcome.RetryPlease:
+ BumpAttempt(conn, tx, rowId, "retry-please");
+ break;
+ }
+ }
+ tx.Commit();
+
+ var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
+ if (acks > 0) _lastSuccessUtc = _clock();
+
+ if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
+ {
+ BumpBackoff();
+ _drainState = HistorianDrainState.BackingOff;
+ }
+ else
+ {
+ ResetBackoff();
+ _drainState = HistorianDrainState.Idle;
+ }
+ }
+ finally
+ {
+ _drainGate.Release();
+ }
+ }
+
+ public HistorianSinkStatus GetStatus()
+ {
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+
+ long queued;
+ long deadlettered;
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
+ queued = (long)(cmd.ExecuteScalar() ?? 0L);
+ }
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
+ deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
+ }
+
+ return new HistorianSinkStatus(
+ QueueDepth: queued,
+ DeadLetterDepth: deadlettered,
+ LastDrainUtc: _lastDrainUtc,
+ LastSuccessUtc: _lastSuccessUtc,
+ LastError: _lastError,
+ DrainState: _drainState);
+ }
+
+ /// Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.
+ public int RetryDeadLettered()
+ {
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var cmd = conn.CreateCommand();
+ cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
+ return cmd.ExecuteNonQuery();
+ }
+
+ private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
+ {
+ var rowIds = new List<long>();
+ var events = new List<AlarmHistorianEvent>();
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var cmd = conn.CreateCommand();
+ cmd.CommandText = """
+ SELECT RowId, PayloadJson FROM Queue
+ WHERE DeadLettered = 0
+ ORDER BY RowId ASC
+ LIMIT $limit
+ """;
+ cmd.Parameters.AddWithValue("$limit", _batchSize);
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rowIds.Add(reader.GetInt64(0));
+ var payload = reader.GetString(1);
+ var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
+ if (evt is not null) events.Add(evt);
+ }
+ return (rowIds, events);
+ }
+
+ private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
+ {
+ using var cmd = conn.CreateCommand();
+ cmd.Transaction = tx;
+ cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id";
+ cmd.Parameters.AddWithValue("$id", rowId);
+ cmd.ExecuteNonQuery();
+ }
+
+ private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
+ {
+ using var cmd = conn.CreateCommand();
+ cmd.Transaction = tx;
+ cmd.CommandText = """
+ UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
+ WHERE RowId = $id
+ """;
+ cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
+ cmd.Parameters.AddWithValue("$err", reason);
+ cmd.Parameters.AddWithValue("$id", rowId);
+ cmd.ExecuteNonQuery();
+ }
+
+ private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
+ {
+ using var cmd = conn.CreateCommand();
+ cmd.Transaction = tx;
+ cmd.CommandText = """
+ UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
+ WHERE RowId = $id
+ """;
+ cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
+ cmd.Parameters.AddWithValue("$err", reason);
+ cmd.Parameters.AddWithValue("$id", rowId);
+ cmd.ExecuteNonQuery();
+ }
+
+ private void EnforceCapacity(SqliteConnection conn)
+ {
+ // Count non-dead-lettered rows only — dead-lettered rows retain for
+ // post-mortem per the configured retention window.
+ long count;
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
+ count = (long)(cmd.ExecuteScalar() ?? 0L);
+ }
+ if (count < _capacity) return;
+
+ var toEvict = count - _capacity + 1;
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = """
+ DELETE FROM Queue
+ WHERE RowId IN (
+ SELECT RowId FROM Queue
+ WHERE DeadLettered = 0
+ ORDER BY RowId ASC
+ LIMIT $n
+ )
+ """;
+ cmd.Parameters.AddWithValue("$n", toEvict);
+ cmd.ExecuteNonQuery();
+ }
+ _logger.Warning(
+ "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
+ _capacity, toEvict);
+ }
+
+ private void PurgeAgedDeadLetters()
+ {
+ var cutoff = (_clock() - _deadLetterRetention).ToString("O");
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var cmd = conn.CreateCommand();
+ cmd.CommandText = """
+ DELETE FROM Queue
+ WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff
+ """;
+ cmd.Parameters.AddWithValue("$cutoff", cutoff);
+ var purged = cmd.ExecuteNonQuery();
+ if (purged > 0)
+ _logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged);
+ }
+
+ private void InitializeSchema()
+ {
+ using var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var cmd = conn.CreateCommand();
+ cmd.CommandText = """
+ CREATE TABLE IF NOT EXISTS Queue (
+ RowId INTEGER PRIMARY KEY AUTOINCREMENT,
+ AlarmId TEXT NOT NULL,
+ EnqueuedUtc TEXT NOT NULL,
+ PayloadJson TEXT NOT NULL,
+ AttemptCount INTEGER NOT NULL DEFAULT 0,
+ LastAttemptUtc TEXT NULL,
+ LastError TEXT NULL,
+ DeadLettered INTEGER NOT NULL DEFAULT 0
+ );
+ CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
+ """;
+ cmd.ExecuteNonQuery();
+ }
+
+ private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1);
+ private void ResetBackoff() => _backoffIndex = 0;
+ public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex];
+
+ public void Dispose()
+ {
+ if (_disposed) return;
+ _disposed = true;
+ _drainTimer?.Dispose();
+ _drainGate.Dispose();
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj
new file mode 100644
index 0000000..175475d
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj
@@ -0,0 +1,32 @@
+
+
+
+ net10.0
+ enable
+ enable
+ latest
+ true
+ true
+ $(NoWarn);CS1591
+ ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs
index 09db862..193d771 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs
@@ -60,6 +60,14 @@ public enum MessageKind : byte
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,
+ // Phase 7 Stream D — historian alarm sink. Main server → Galaxy.Host batched
+ // writes into the Aveva Historian alarm schema via the already-loaded
+ // aahClientManaged DLLs. HistorianConnectivityStatus fires proactively from the
+ // Host when the SDK session transitions so diagnostics flip promptly.
+ HistorianAlarmEventRequest = 0x80,
+ HistorianAlarmEventResponse = 0x81,
+ HistorianConnectivityStatus = 0x82,
+
RecycleHostRequest = 0xF0,
RecycleStatusResponse = 0xF1,
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs
new file mode 100644
index 0000000..6719cdd
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs
@@ -0,0 +1,92 @@
+using System;
+using MessagePack;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
+
+///
+/// Phase 7 Stream D — IPC contracts for routing Part 9 alarm transitions from the
+/// main .NET 10 server into Galaxy.Host's already-loaded aahClientManaged
+/// DLLs. Reuses the Tier-C isolation + licensing pathway rather than loading 32-bit
+/// native historian code into the main server.
+///
+///
+///
+/// Batched on the wire to amortize IPC overhead — the main server's SqliteStoreAndForwardSink
+/// ships up to 100 events per request per Phase 7 plan Stream D.5.
+///
+///
+/// Per-event outcomes (Ack / RetryPlease / PermanentFail) let the drain worker
+/// dead-letter malformed events without blocking neighbors in the batch.
+/// fires proactively from
+/// the Host when the SDK session drops so the /hosts + /alarms/historian Admin
+/// diagnostics pages flip to red promptly instead of waiting for the next
+/// drain cycle.
+///
+///
+[MessagePackObject]
+public sealed class HistorianAlarmEventRequest
+{
+ [Key(0)] public HistorianAlarmEventDto[] Events { get; set; } = Array.Empty<HistorianAlarmEventDto>();
+}
+
+[MessagePackObject]
+public sealed class HistorianAlarmEventResponse
+{
+ /// Per-event outcome, same order as the request.
+ [Key(0)] public HistorianAlarmEventOutcomeDto[] Outcomes { get; set; } = Array.Empty<HistorianAlarmEventOutcomeDto>();
+}
+
+/// Outcome enum — bytes on the wire so it stays compact.
+public enum HistorianAlarmEventOutcomeDto : byte
+{
+ /// Successfully persisted to the historian — remove from queue.
+ Ack = 0,
+ /// Transient failure (historian disconnected, timeout, busy) — retry after backoff.
+ RetryPlease = 1,
+ /// Permanent failure (malformed, unrecoverable SDK error) — move to dead-letter.
+ PermanentFail = 2,
+}
+
+/// One alarm-transition payload. Fields mirror Core.AlarmHistorian.AlarmHistorianEvent.
+[MessagePackObject]
+public sealed class HistorianAlarmEventDto
+{
+ [Key(0)] public string AlarmId { get; set; } = string.Empty;
+ [Key(1)] public string EquipmentPath { get; set; } = string.Empty;
+ [Key(2)] public string AlarmName { get; set; } = string.Empty;
+
+ /// Concrete Part 9 subtype name — "LimitAlarm" / "OffNormalAlarm" / "AlarmCondition" / "DiscreteAlarm".
+ [Key(3)] public string AlarmTypeName { get; set; } = string.Empty;
+
+ /// Numeric severity the Host maps to the historian's priority scale.
+ [Key(4)] public int Severity { get; set; }
+
+ /// Which transition this event represents — "Activated" / "Cleared" / "Acknowledged" / etc.
+ [Key(5)] public string EventKind { get; set; } = string.Empty;
+
+ /// Pre-rendered message — template tokens resolved upstream.
+ [Key(6)] public string Message { get; set; } = string.Empty;
+
+ /// Operator who triggered the transition. "system" for engine-driven events.
+ [Key(7)] public string User { get; set; } = "system";
+
+ /// Operator-supplied free-form comment, if any.
+ [Key(8)] public string? Comment { get; set; }
+
+ /// Source timestamp (UTC Unix milliseconds).
+ [Key(9)] public long TimestampUtcUnixMs { get; set; }
+}
+
+///
+/// Proactive notification — Galaxy.Host pushes this when the historian SDK session
+/// transitions (connected / disconnected / degraded). The main server reflects this
+/// into the historian sink status so Admin UI surfaces the problem without the
+/// operator having to scrutinize drain cadence.
+///
+[MessagePackObject]
+public sealed class HistorianConnectivityStatusNotification
+{
+ [Key(0)] public string Status { get; set; } = "unknown"; // connected | disconnected | degraded
+ [Key(1)] public string? Detail { get; set; }
+ [Key(2)] public long ObservedAtUtcUnixMs { get; set; }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
new file mode 100644
index 0000000..7ed87f1
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
@@ -0,0 +1,286 @@
+using Serilog;
+using Serilog.Core;
+using Serilog.Events;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests;
+
+///
+/// Verifies the durable SQLite store-and-forward queue behind the historian sink:
+/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail,
+/// capacity eviction, and retention-based dead-letter purge.
+///
+[Trait("Category", "Unit")]
+public sealed class SqliteStoreAndForwardSinkTests : IDisposable
+{
+ private readonly string _dbPath;
+ private readonly ILogger _log;
+
+ public SqliteStoreAndForwardSinkTests()
+ {
+ _dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite");
+ _log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger();
+ }
+
+ public void Dispose()
+ {
+ try { if (File.Exists(_dbPath)) File.Delete(_dbPath); } catch { }
+ }
+
+ private sealed class FakeWriter : IAlarmHistorianWriter
+ {
+ public Queue<HistorianWriteOutcome> NextOutcomePerEvent { get; } = new();
+ public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack;
+ public List<IReadOnlyList<AlarmHistorianEvent>> Batches { get; } = [];
+ public Exception? ThrowOnce { get; set; }
+
+ public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
+ IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
+ {
+ if (ThrowOnce is not null)
+ {
+ var e = ThrowOnce;
+ ThrowOnce = null;
+ throw e;
+ }
+ Batches.Add(batch);
+ var outcomes = new List<HistorianWriteOutcome>();
+ for (var i = 0; i < batch.Count; i++)
+ outcomes.Add(NextOutcomePerEvent.Count > 0 ? NextOutcomePerEvent.Dequeue() : DefaultOutcome);
+ return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
+ }
+ }
+
+ private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new(
+ AlarmId: alarmId,
+ EquipmentPath: "/Site/Line1/Cell",
+ AlarmName: "HighTemp",
+ AlarmTypeName: "LimitAlarm",
+ Severity: AlarmSeverity.High,
+ EventKind: "Activated",
+ Message: "temp exceeded",
+ User: "system",
+ Comment: null,
+ TimestampUtc: ts ?? DateTime.UtcNow);
+
+ [Fact]
+ public async Task EnqueueThenDrain_Ack_removes_row()
+ {
+ var writer = new FakeWriter();
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ sink.GetStatus().QueueDepth.ShouldBe(1);
+
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ writer.Batches.Count.ShouldBe(1);
+ writer.Batches[0].Count.ShouldBe(1);
+ writer.Batches[0][0].AlarmId.ShouldBe("A1");
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(0);
+ status.DeadLetterDepth.ShouldBe(0);
+ status.LastSuccessUtc.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public async Task Drain_with_empty_queue_is_noop()
+ {
+ var writer = new FakeWriter();
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ writer.Batches.ShouldBeEmpty();
+ sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle);
+ }
+
+ [Fact]
+ public async Task RetryPlease_bumps_backoff_and_keeps_row()
+ {
+ var writer = new FakeWriter();
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ var before = sink.CurrentBackoff;
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ sink.CurrentBackoff.ShouldBeGreaterThan(before);
+ sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry");
+ sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff);
+ }
+
+ [Fact]
+ public async Task Ack_after_Retry_resets_backoff()
+ {
+ var writer = new FakeWriter();
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1));
+
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1));
+ sink.GetStatus().QueueDepth.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task PermanentFail_dead_letters_one_row_only()
+ {
+ var writer = new FakeWriter();
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
+ await sink.EnqueueAsync(Event("good"), CancellationToken.None);
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(0, "good row acked");
+ status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered");
+ }
+
+ [Fact]
+ public async Task Writer_exception_treated_as_retry_for_whole_batch()
+ {
+ var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") };
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(1);
+ status.LastError.ShouldBe("pipe broken");
+ status.DrainState.ShouldBe(HistorianDrainState.BackingOff);
+
+ // Next drain after the writer recovers should Ack.
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().QueueDepth.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task Capacity_eviction_drops_oldest_nondeadlettered_row()
+ {
+ var writer = new FakeWriter();
+ using var sink = new SqliteStoreAndForwardSink(
+ _dbPath, writer, _log, batchSize: 100, capacity: 3);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ await sink.EnqueueAsync(Event("A2"), CancellationToken.None);
+ await sink.EnqueueAsync(Event("A3"), CancellationToken.None);
+ // A4 enqueue must evict the oldest (A1).
+ await sink.EnqueueAsync(Event("A4"), CancellationToken.None);
+
+ sink.GetStatus().QueueDepth.ShouldBe(3);
+
+ await sink.DrainOnceAsync(CancellationToken.None);
+ var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray();
+ drained.ShouldNotContain("A1");
+ drained.ShouldContain("A2");
+ drained.ShouldContain("A3");
+ drained.ShouldContain("A4");
+ }
+
+ [Fact]
+ public async Task Deadlettered_rows_are_purged_past_retention()
+ {
+ var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ DateTime clock = now;
+
+ var writer = new FakeWriter();
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
+ using var sink = new SqliteStoreAndForwardSink(
+ _dbPath, writer, _log, deadLetterRetention: TimeSpan.FromDays(30),
+ clock: () => clock);
+
+ await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().DeadLetterDepth.ShouldBe(1);
+
+ // Advance past retention + tick drain (which runs PurgeAgedDeadLetters).
+ clock = now.AddDays(31);
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention");
+ }
+
+ [Fact]
+ public async Task RetryDeadLettered_requeues_for_retry()
+ {
+ var writer = new FakeWriter();
+ writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().DeadLetterDepth.ShouldBe(1);
+
+ var revived = sink.RetryDeadLettered();
+ revived.ShouldBe(1);
+
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(1);
+ status.DeadLetterDepth.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task Backoff_ladder_caps_at_60s()
+ {
+ var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+
+ // 10 retry rounds — ladder should cap at 60s.
+ for (var i = 0; i < 10; i++)
+ await sink.DrainOnceAsync(CancellationToken.None);
+
+ sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
+ }
+
+ [Fact]
+ public void NullAlarmHistorianSink_reports_disabled_status()
+ {
+ var s = NullAlarmHistorianSink.Instance.GetStatus();
+ s.DrainState.ShouldBe(HistorianDrainState.Disabled);
+ s.QueueDepth.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task NullAlarmHistorianSink_swallows_enqueue()
+ {
+ // Should not throw or persist anything.
+ await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None);
+ }
+
+ [Fact]
+ public void Ctor_rejects_bad_args()
+ {
+ var w = new FakeWriter();
+ Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink("", w, _log));
+ Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log));
+ Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
+ Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
+ Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
+ }
+
+ [Fact]
+ public async Task Disposed_sink_rejects_enqueue()
+ {
+ var writer = new FakeWriter();
+ var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+ sink.Dispose();
+
+ await Should.ThrowAsync<ObjectDisposedException>(
+ () => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
+ }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj
new file mode 100644
index 0000000..5f3cc4f
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj
@@ -0,0 +1,31 @@
+
+
+
+ net10.0
+ enable
+ enable
+ false
+ true
+ ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
+
+