Phase 7 Stream D — Historian alarm sink (SQLite store-and-forward + Galaxy.Host IPC contracts)
Phase 7 plan decisions #16, #17, #19, #21 implementation. Durable local SQLite queue absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host (reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff cadence; operator acks never block on the historian being reachable. ## New project Core.AlarmHistorian (net10) - AlarmHistorianEvent — source-agnostic event shape (scripted alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource) - IAlarmHistorianSink / NullAlarmHistorianSink — interface + disabled default - IAlarmHistorianWriter — per-event outcome (Ack / RetryPlease / PermanentFail); Stream G wires the Galaxy.Host IPC client implementation - SqliteStoreAndForwardSink — full implementation: - Queue table with AttemptCount / LastError / DeadLettered columns - DrainOnceAsync serialised via SemaphoreSlim - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap) - DefaultCapacity 1,000,000 rows — overflow evicts oldest non-dead-lettered - DefaultDeadLetterRetention 30 days — sweeper purges on every drain tick - RetryDeadLettered operator action reattaches dead-letters to the regular queue - Writer-side exceptions treated as whole-batch RetryPlease (no data loss) ## New IPC contracts in Driver.Galaxy.Shared - HistorianAlarmEventRequest — batched up to 100 events/request per plan Stream D.5 - HistorianAlarmEventResponse — per-event outcome (1:1 with request order) - HistorianAlarmEventOutcomeDto enum (byte on the wire — Ack/RetryPlease/PermanentFail) - HistorianAlarmEventDto — mirrors Core.AlarmHistorian.AlarmHistorianEvent - HistorianConnectivityStatusNotification — Host pushes proactively when the SDK session drops so /alarms/historian flips red without waiting for the next drain - MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse / 0x82 HistorianConnectivityStatus ## Tests — 14/14 SqliteStoreAndForwardSinkTests covers: enqueue→drain→Ack round-trip, empty-queue no-op, RetryPlease bumps backoff + keeps row, Ack after Retry resets backoff, PermanentFail dead-letters one row without blocking neighbors, writer exception treated as whole-batch retry with error surfaced in status, capacity eviction drops oldest non-dead-lettered, dead-letters purged past retention window, RetryDeadLettered requeues, ladder caps at 60s after 10 retries, Null sink reports Disabled status, null sink swallows enqueue, ctor argument validation, disposed sink rejects enqueue. ## Totals Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms + 14 AlarmHistorian). Stream G wires this into the real Galaxy.Host IPC pipe.
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// The event shape the historian sink consumes — source-agnostic across scripted
|
||||
/// alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource per Phase 7 plan
|
||||
/// decision #15 (sink scope = all alarm sources, not just scripted). A per-alarm
|
||||
/// <c>HistorizeToAveva</c> toggle on the producer side gates which events flow.
|
||||
/// </summary>
|
||||
/// <param name="AlarmId">Stable condition identity.</param>
|
||||
/// <param name="EquipmentPath">UNS path of the Equipment node the alarm hangs under. Doubles as the "SourceNode" in Historian's alarm schema.</param>
|
||||
/// <param name="AlarmName">Human-readable alarm name.</param>
|
||||
/// <param name="AlarmTypeName">Concrete Part 9 subtype — "LimitAlarm" / "DiscreteAlarm" / "OffNormalAlarm" / "AlarmCondition". Used as the Historian "AlarmType" column.</param>
|
||||
/// <param name="Severity">Mapped to Historian's numeric priority on the sink side.</param>
|
||||
/// <param name="EventKind">
|
||||
/// Which state transition this event represents — "Activated" / "Cleared" /
|
||||
/// "Acknowledged" / "Confirmed" / "Shelved" / "Unshelved" / "Disabled" / "Enabled" /
|
||||
/// "CommentAdded". Free-form string because different alarm sources use different
|
||||
/// vocabularies; the Galaxy.Host handler maps to the historian's enum on the wire.
|
||||
/// </param>
|
||||
/// <param name="Message">Fully-rendered message text — template tokens already resolved upstream.</param>
|
||||
/// <param name="User">Operator who triggered the transition. "system" for engine-driven events (shelving expiry, predicate change).</param>
|
||||
/// <param name="Comment">Operator-supplied free-form text, if any.</param>
|
||||
/// <param name="TimestampUtc">When the transition occurred.</param>
|
||||
public sealed record AlarmHistorianEvent(
|
||||
string AlarmId,
|
||||
string EquipmentPath,
|
||||
string AlarmName,
|
||||
string AlarmTypeName,
|
||||
AlarmSeverity Severity,
|
||||
string EventKind,
|
||||
string Message,
|
||||
string User,
|
||||
string? Comment,
|
||||
DateTime TimestampUtc);
|
||||
@@ -0,0 +1,82 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// The historian sink contract — where qualifying alarm events land. Phase 7 plan
|
||||
/// decision #17: ingestion routes through Galaxy.Host's pipe so we reuse the
|
||||
/// already-loaded <c>aahClientManaged</c> DLLs without loading 32-bit native code
|
||||
/// in the main .NET 10 server. Tests use an in-memory fake; production uses
|
||||
/// <see cref="SqliteStoreAndForwardSink"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="EnqueueAsync"/> is fire-and-forget from the engine's perspective —
|
||||
/// the sink MUST NOT block the emitting thread. Production implementations
|
||||
/// (<see cref="SqliteStoreAndForwardSink"/>) persist to a local SQLite queue
|
||||
/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
|
||||
/// decision #16, failed downstream writes replay with exponential backoff;
|
||||
/// operator actions are never blocked waiting on the historian.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="GetStatus"/> exposes queue depth + drain rate + last error
|
||||
/// for the Admin UI <c>/alarms/historian</c> diagnostics page (Stream F).
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IAlarmHistorianSink
|
||||
{
|
||||
/// <summary>Durably enqueue the event. Returns as soon as the queue row is committed.</summary>
|
||||
Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Snapshot of current queue depth + drain health.</summary>
|
||||
HistorianSinkStatus GetStatus();
|
||||
}
|
||||
|
||||
/// <summary>No-op default for tests or deployments that don't historize alarms.</summary>
|
||||
public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
|
||||
{
|
||||
public static readonly NullAlarmHistorianSink Instance = new();
|
||||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public HistorianSinkStatus GetStatus() => new(
|
||||
QueueDepth: 0,
|
||||
DeadLetterDepth: 0,
|
||||
LastDrainUtc: null,
|
||||
LastSuccessUtc: null,
|
||||
LastError: null,
|
||||
DrainState: HistorianDrainState.Disabled);
|
||||
}
|
||||
|
||||
/// <summary>Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.</summary>
|
||||
public sealed record HistorianSinkStatus(
|
||||
long QueueDepth,
|
||||
long DeadLetterDepth,
|
||||
DateTime? LastDrainUtc,
|
||||
DateTime? LastSuccessUtc,
|
||||
string? LastError,
|
||||
HistorianDrainState DrainState);
|
||||
|
||||
/// <summary>Where the drain worker is in its state machine.</summary>
|
||||
public enum HistorianDrainState
|
||||
{
|
||||
Disabled,
|
||||
Idle,
|
||||
Draining,
|
||||
BackingOff,
|
||||
}
|
||||
|
||||
/// <summary>Signaled by the Galaxy.Host-side handler when it fails a batch — drain worker uses this to decide retry cadence.</summary>
|
||||
public enum HistorianWriteOutcome
|
||||
{
|
||||
/// <summary>Successfully persisted to the historian. Remove from queue.</summary>
|
||||
Ack,
|
||||
/// <summary>Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff.</summary>
|
||||
RetryPlease,
|
||||
/// <summary>Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter table.</summary>
|
||||
PermanentFail,
|
||||
}
|
||||
|
||||
/// <summary>What the drain worker delegates writes to — Stream G wires this to the Galaxy.Host IPC client.</summary>
|
||||
public interface IAlarmHistorianWriter
|
||||
{
|
||||
/// <summary>Push a batch of events to the historian. Returns one outcome per event, same order.</summary>
|
||||
Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
|
||||
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,397 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
|
||||
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
|
||||
/// via <see cref="IAlarmHistorianWriter"/> on an exponential-backoff cadence, and
|
||||
/// operator acks never block on the historian being reachable.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Queue schema:
|
||||
/// <code>
|
||||
/// CREATE TABLE Queue (
|
||||
/// RowId INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
/// AlarmId TEXT NOT NULL,
|
||||
/// EnqueuedUtc TEXT NOT NULL,
|
||||
/// PayloadJson TEXT NOT NULL,
|
||||
/// AttemptCount INTEGER NOT NULL DEFAULT 0,
|
||||
/// LastAttemptUtc TEXT NULL,
|
||||
/// LastError TEXT NULL,
|
||||
/// DeadLettered INTEGER NOT NULL DEFAULT 0
|
||||
/// );
|
||||
/// </code>
|
||||
/// Dead-lettered rows stay in place for the configured retention window (default
|
||||
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
|
||||
/// retry before the sweeper purges them. Regular queue capacity is bounded —
|
||||
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Drain runs on a shared <see cref="System.Threading.Timer"/>. Exponential
|
||||
/// backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
|
||||
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
|
||||
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
|
||||
/// still retry on their own cadence.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
{
|
||||
/// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
|
||||
public const long DefaultCapacity = 1_000_000;
|
||||
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
|
||||
|
||||
private static readonly TimeSpan[] BackoffLadder =
|
||||
[
|
||||
TimeSpan.FromSeconds(1),
|
||||
TimeSpan.FromSeconds(2),
|
||||
TimeSpan.FromSeconds(5),
|
||||
TimeSpan.FromSeconds(15),
|
||||
TimeSpan.FromSeconds(60),
|
||||
];
|
||||
|
||||
private readonly string _connectionString;
|
||||
private readonly IAlarmHistorianWriter _writer;
|
||||
private readonly ILogger _logger;
|
||||
private readonly int _batchSize;
|
||||
private readonly long _capacity;
|
||||
private readonly TimeSpan _deadLetterRetention;
|
||||
private readonly Func<DateTime> _clock;
|
||||
|
||||
private readonly SemaphoreSlim _drainGate = new(1, 1);
|
||||
private Timer? _drainTimer;
|
||||
private int _backoffIndex;
|
||||
private DateTime? _lastDrainUtc;
|
||||
private DateTime? _lastSuccessUtc;
|
||||
private string? _lastError;
|
||||
private HistorianDrainState _drainState = HistorianDrainState.Idle;
|
||||
private bool _disposed;
|
||||
|
||||
public SqliteStoreAndForwardSink(
|
||||
string databasePath,
|
||||
IAlarmHistorianWriter writer,
|
||||
ILogger logger,
|
||||
int batchSize = 100,
|
||||
long capacity = DefaultCapacity,
|
||||
TimeSpan? deadLetterRetention = null,
|
||||
Func<DateTime>? clock = null)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(databasePath))
|
||||
throw new ArgumentException("Database path required.", nameof(databasePath));
|
||||
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
|
||||
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
|
||||
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
_connectionString = $"Data Source={databasePath}";
|
||||
|
||||
InitializeSchema();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Start the background drain worker. Not started automatically so tests can
|
||||
/// drive <see cref="DrainOnceAsync"/> deterministically.
|
||||
/// </summary>
|
||||
public void StartDrainLoop(TimeSpan tickInterval)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
|
||||
_drainTimer?.Dispose();
|
||||
_drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
|
||||
null, tickInterval, tickInterval);
|
||||
}
|
||||
|
||||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
|
||||
{
|
||||
if (evt is null) throw new ArgumentNullException(nameof(evt));
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
|
||||
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
|
||||
EnforceCapacity(conn);
|
||||
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
|
||||
VALUES ($alarmId, $enqueued, $payload, 0);
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
|
||||
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
|
||||
cmd.ExecuteNonQuery();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read up to <see cref="_batchSize"/> queued rows, forward through the writer,
|
||||
/// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
|
||||
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
|
||||
/// serial execution.
|
||||
/// </summary>
|
||||
public async Task DrainOnceAsync(CancellationToken ct)
|
||||
{
|
||||
if (_disposed) return;
|
||||
if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
|
||||
try
|
||||
{
|
||||
_drainState = HistorianDrainState.Draining;
|
||||
_lastDrainUtc = _clock();
|
||||
|
||||
PurgeAgedDeadLetters();
|
||||
var (rowIds, events) = ReadBatch();
|
||||
if (rowIds.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
return;
|
||||
}
|
||||
|
||||
IReadOnlyList<HistorianWriteOutcome> outcomes;
|
||||
try
|
||||
{
|
||||
outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
|
||||
_lastError = null;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Writer-side exception — treat entire batch as RetryPlease.
|
||||
_lastError = ex.Message;
|
||||
_logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
return;
|
||||
}
|
||||
|
||||
if (outcomes.Count != events.Count)
|
||||
throw new InvalidOperationException(
|
||||
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
|
||||
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var tx = conn.BeginTransaction();
|
||||
for (var i = 0; i < outcomes.Count; i++)
|
||||
{
|
||||
var outcome = outcomes[i];
|
||||
var rowId = rowIds[i];
|
||||
switch (outcome)
|
||||
{
|
||||
case HistorianWriteOutcome.Ack:
|
||||
DeleteRow(conn, tx, rowId);
|
||||
break;
|
||||
case HistorianWriteOutcome.PermanentFail:
|
||||
DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
|
||||
break;
|
||||
case HistorianWriteOutcome.RetryPlease:
|
||||
BumpAttempt(conn, tx, rowId, "retry-please");
|
||||
break;
|
||||
}
|
||||
}
|
||||
tx.Commit();
|
||||
|
||||
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
|
||||
if (acks > 0) _lastSuccessUtc = _clock();
|
||||
|
||||
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
|
||||
{
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
else
|
||||
{
|
||||
ResetBackoff();
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_drainGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public HistorianSinkStatus GetStatus()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
|
||||
long queued;
|
||||
long deadlettered;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||
queued = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
|
||||
deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
|
||||
return new HistorianSinkStatus(
|
||||
QueueDepth: queued,
|
||||
DeadLetterDepth: deadlettered,
|
||||
LastDrainUtc: _lastDrainUtc,
|
||||
LastSuccessUtc: _lastSuccessUtc,
|
||||
LastError: _lastError,
|
||||
DrainState: _drainState);
|
||||
}
|
||||
|
||||
/// <summary>Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.</summary>
|
||||
public int RetryDeadLettered()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
|
||||
return cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
|
||||
{
|
||||
var rowIds = new List<long>();
|
||||
var events = new List<AlarmHistorianEvent>();
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
SELECT RowId, PayloadJson FROM Queue
|
||||
WHERE DeadLettered = 0
|
||||
ORDER BY RowId ASC
|
||||
LIMIT $limit
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$limit", _batchSize);
|
||||
using var reader = cmd.ExecuteReader();
|
||||
while (reader.Read())
|
||||
{
|
||||
rowIds.Add(reader.GetInt64(0));
|
||||
var payload = reader.GetString(1);
|
||||
var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
|
||||
if (evt is not null) events.Add(evt);
|
||||
}
|
||||
return (rowIds, events);
|
||||
}
|
||||
|
||||
private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id";
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = """
|
||||
UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
|
||||
WHERE RowId = $id
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$err", reason);
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = """
|
||||
UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
|
||||
WHERE RowId = $id
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$err", reason);
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void EnforceCapacity(SqliteConnection conn)
|
||||
{
|
||||
// Count non-dead-lettered rows only — dead-lettered rows retain for
|
||||
// post-mortem per the configured retention window.
|
||||
long count;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||
count = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
if (count < _capacity) return;
|
||||
|
||||
var toEvict = count - _capacity + 1;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = """
|
||||
DELETE FROM Queue
|
||||
WHERE RowId IN (
|
||||
SELECT RowId FROM Queue
|
||||
WHERE DeadLettered = 0
|
||||
ORDER BY RowId ASC
|
||||
LIMIT $n
|
||||
)
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
_logger.Warning(
|
||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
|
||||
_capacity, toEvict);
|
||||
}
|
||||
|
||||
private void PurgeAgedDeadLetters()
|
||||
{
|
||||
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
DELETE FROM Queue
|
||||
WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$cutoff", cutoff);
|
||||
var purged = cmd.ExecuteNonQuery();
|
||||
if (purged > 0)
|
||||
_logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged);
|
||||
}
|
||||
|
||||
private void InitializeSchema()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
CREATE TABLE IF NOT EXISTS Queue (
|
||||
RowId INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
AlarmId TEXT NOT NULL,
|
||||
EnqueuedUtc TEXT NOT NULL,
|
||||
PayloadJson TEXT NOT NULL,
|
||||
AttemptCount INTEGER NOT NULL DEFAULT 0,
|
||||
LastAttemptUtc TEXT NULL,
|
||||
LastError TEXT NULL,
|
||||
DeadLettered INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
|
||||
""";
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1);
|
||||
private void ResetBackoff() => _backoffIndex = 0;
|
||||
public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex];
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_drainTimer?.Dispose();
|
||||
_drainGate.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<NoWarn>$(NoWarn);CS1591</NoWarn>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.0"/>
|
||||
<PackageReference Include="Serilog" Version="4.2.0"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -60,6 +60,14 @@ public enum MessageKind : byte
|
||||
HostConnectivityStatus = 0x70,
|
||||
RuntimeStatusChange = 0x71,
|
||||
|
||||
// Phase 7 Stream D — historian alarm sink. Main server → Galaxy.Host batched
|
||||
// writes into the Aveva Historian alarm schema via the already-loaded
|
||||
// aahClientManaged DLLs. HistorianConnectivityStatus fires proactively from the
|
||||
// Host when the SDK session transitions so diagnostics flip promptly.
|
||||
HistorianAlarmEventRequest = 0x80,
|
||||
HistorianAlarmEventResponse = 0x81,
|
||||
HistorianConnectivityStatus = 0x82,
|
||||
|
||||
RecycleHostRequest = 0xF0,
|
||||
RecycleStatusResponse = 0xF1,
|
||||
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
using System;
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 Stream D — IPC contracts for routing Part 9 alarm transitions from the
|
||||
/// main .NET 10 server into Galaxy.Host's already-loaded <c>aahClientManaged</c>
|
||||
/// DLLs. Reuses the Tier-C isolation + licensing pathway rather than loading 32-bit
|
||||
/// native historian code into the main server.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Batched on the wire to amortize IPC overhead — the main server's SqliteStoreAndForwardSink
|
||||
/// ships up to 100 events per request per Phase 7 plan Stream D.5.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Per-event outcomes (Ack / RetryPlease / PermanentFail) let the drain worker
|
||||
/// dead-letter malformed events without blocking neighbors in the batch.
|
||||
/// <see cref="HistorianConnectivityStatusNotification"/> fires proactively from
|
||||
/// the Host when the SDK session drops so the /hosts + /alarms/historian Admin
|
||||
/// diagnostics pages flip to red promptly instead of waiting for the next
|
||||
/// drain cycle.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventRequest
|
||||
{
|
||||
[Key(0)] public HistorianAlarmEventDto[] Events { get; set; } = Array.Empty<HistorianAlarmEventDto>();
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventResponse
|
||||
{
|
||||
/// <summary>Per-event outcome, same order as the request.</summary>
|
||||
[Key(0)] public HistorianAlarmEventOutcomeDto[] Outcomes { get; set; } = Array.Empty<HistorianAlarmEventOutcomeDto>();
|
||||
}
|
||||
|
||||
/// <summary>Outcome enum — bytes on the wire so it stays compact.</summary>
|
||||
public enum HistorianAlarmEventOutcomeDto : byte
|
||||
{
|
||||
/// <summary>Successfully persisted to the historian — remove from queue.</summary>
|
||||
Ack = 0,
|
||||
/// <summary>Transient failure (historian disconnected, timeout, busy) — retry after backoff.</summary>
|
||||
RetryPlease = 1,
|
||||
/// <summary>Permanent failure (malformed, unrecoverable SDK error) — move to dead-letter.</summary>
|
||||
PermanentFail = 2,
|
||||
}
|
||||
|
||||
/// <summary>One alarm-transition payload. Fields mirror <c>Core.AlarmHistorian.AlarmHistorianEvent</c>.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventDto
|
||||
{
|
||||
[Key(0)] public string AlarmId { get; set; } = string.Empty;
|
||||
[Key(1)] public string EquipmentPath { get; set; } = string.Empty;
|
||||
[Key(2)] public string AlarmName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Concrete Part 9 subtype name — "LimitAlarm" / "OffNormalAlarm" / "AlarmCondition" / "DiscreteAlarm".</summary>
|
||||
[Key(3)] public string AlarmTypeName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Numeric severity the Host maps to the historian's priority scale.</summary>
|
||||
[Key(4)] public int Severity { get; set; }
|
||||
|
||||
/// <summary>Which transition this event represents — "Activated" / "Cleared" / "Acknowledged" / etc.</summary>
|
||||
[Key(5)] public string EventKind { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Pre-rendered message — template tokens resolved upstream.</summary>
|
||||
[Key(6)] public string Message { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Operator who triggered the transition. "system" for engine-driven events.</summary>
|
||||
[Key(7)] public string User { get; set; } = "system";
|
||||
|
||||
/// <summary>Operator-supplied free-form comment, if any.</summary>
|
||||
[Key(8)] public string? Comment { get; set; }
|
||||
|
||||
/// <summary>Source timestamp (UTC Unix milliseconds).</summary>
|
||||
[Key(9)] public long TimestampUtcUnixMs { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Proactive notification — Galaxy.Host pushes this when the historian SDK session
|
||||
/// transitions (connected / disconnected / degraded). The main server reflects this
|
||||
/// into the historian sink status so Admin UI surfaces the problem without the
|
||||
/// operator having to scrutinize drain cadence.
|
||||
/// </summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianConnectivityStatusNotification
|
||||
{
|
||||
[Key(0)] public string Status { get; set; } = "unknown"; // connected | disconnected | degraded
|
||||
[Key(1)] public string? Detail { get; set; }
|
||||
[Key(2)] public long ObservedAtUtcUnixMs { get; set; }
|
||||
}
|
||||
Reference in New Issue
Block a user