Phase 7 follow-up #247 — Galaxy.Host historian writer + SQLite sink activation

Closes the historian leg of Phase 7. Scripted alarm transitions now batch-flow
through the existing Galaxy.Host pipe + queue durably in a local SQLite store-
and-forward when Galaxy is the registered driver, instead of being dropped into
NullAlarmHistorianSink.

## GalaxyHistorianWriter (Driver.Galaxy.Proxy.Ipc)

IAlarmHistorianWriter implementation. Translates AlarmHistorianEvent →
HistorianAlarmEventDto (Stream D contract), batches via the existing
GalaxyIpcClient.CallAsync round-trip on MessageKind.HistorianAlarmEventRequest /
Response, maps per-event HistorianAlarmEventOutcomeDto bytes back to
HistorianWriteOutcome (Ack/RetryPlease/PermanentFail) so the SQLite drain
worker knows what to ack vs dead-letter vs retry. Empty-batch fast path.
Pipe-level transport faults (broken pipe, host crash) bubble up as
GalaxyIpcException which the SQLite sink's drain worker translates to
whole-batch RetryPlease per its catch contract.

## GalaxyProxyDriver implements IAlarmHistorianWriter

Marker interface lets Phase7Composer discover it via type check at compose
time. WriteBatchAsync delegates to a thin GalaxyHistorianWriter wrapping the
driver's existing _client. Throws InvalidOperationException if InitializeAsync
hasn't connected yet — the SQLite drain worker treats that as a transient
batch failure and retries.

## Phase7Composer.ResolveHistorianSink

Replaces the injected sink dep when any registered driver implements
IAlarmHistorianWriter. Constructs SqliteStoreAndForwardSink at
%ProgramData%/OtOpcUa/alarm-historian-queue.db (falls back to %TEMP% when
ProgramData unavailable, e.g. dev), starts the 2s drain timer, owns the sink
disposable for clean teardown. When no driver provides the writer, keeps the
NullAlarmHistorianSink wired by Program.cs (#246).

DisposeAsync now also disposes the owned SQLite sink in the right order:
bridge → engines → owned sink → injected fallback.

## Tests — 7 new GalaxyHistorianWriterMappingTests

ToDto round-trips every field; preserves null Comment; per-byte outcome enum
mapping (Ack / RetryPlease / PermanentFail) via [Theory]; unknown byte throws;
ctor null-guard. The IPC round-trip itself is covered by the live Host suite
(task #240) which constructs a real pipe.

Server.Phase7 tests: 34/34 still pass; Galaxy.Proxy tests: 25/25 (+7 = 32 total).

## Phase 7 production wiring chain — COMPLETE
-  #243 composition kernel
-  #245 scripted-alarm IReadable adapter
-  #244 driver bridge
-  #246 Program.cs wire-in
-  #247 this — Galaxy.Host historian writer + SQLite sink activation

What unblocks now: task #240 live OPC UA E2E smoke. With a Galaxy driver
registered, scripted alarm transitions flow end-to-end through the engine →
SQLite queue → drain worker → Galaxy.Host IPC → Aveva Historian alarm schema.
Without Galaxy, NullSink keeps the engines functional and the queue dormant.
This commit is contained in:
Joseph Doherty
2026-04-20 22:18:39 -04:00
parent 42f3b17c4a
commit bb10ba7108
5 changed files with 248 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;
@@ -22,6 +23,7 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
IHistoryProvider,
IRediscoverable,
IHostConnectivityProbe,
IAlarmHistorianWriter,
IDisposable
{
private GalaxyIpcClient? _client;
@@ -511,6 +513,23 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
_ => AlarmSeverity.Critical,
};
/// <summary>
/// Phase 7 follow-up #247 — IAlarmHistorianWriter implementation. Forwards alarm
/// batches to Galaxy.Host over the existing IPC channel, reusing the connection
/// the driver already established for data-plane traffic. Throws
/// <see cref="InvalidOperationException"/> when called before
/// <see cref="InitializeAsync"/> has connected the client; the SQLite drain worker
/// translates that to whole-batch RetryPlease per its catch contract.
/// </summary>
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
if (_client is null)
throw new InvalidOperationException(
"GalaxyProxyDriver IPC client not connected — historian writes rejected until InitializeAsync completes");
return new GalaxyHistorianWriter(_client).WriteBatchAsync(batch, cancellationToken);
}
public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult();
}

View File

@@ -0,0 +1,90 @@
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
/// <summary>
/// Phase 7 follow-up (task #247) — bridges <see cref="SqliteStoreAndForwardSink"/>'s
/// drain worker to <c>Driver.Galaxy.Host</c> over the existing <see cref="GalaxyIpcClient"/>
/// pipe. Translates <see cref="AlarmHistorianEvent"/> batches into the
/// <see cref="HistorianAlarmEventDto"/> wire format the Host expects + maps per-event
/// <see cref="HistorianAlarmEventOutcomeDto"/> responses back to
/// <see cref="HistorianWriteOutcome"/> so the SQLite queue knows what to ack /
/// dead-letter / retry.
/// </summary>
/// <remarks>
/// <para>
/// Reuses the IPC channel <see cref="GalaxyProxyDriver"/> already opens for the
/// Galaxy data plane — no second pipe to <c>Driver.Galaxy.Host</c>, no separate
/// auth handshake. The IPC client's call gate serializes historian batches with
/// driver Reads/Writes/Subscribes; historian batches are infrequent (every few
/// seconds at most under the SQLite sink's drain cadence) so the contention is
/// negligible compared to per-tag-read pressure.
/// </para>
/// <para>
/// Pipe-level transport faults (broken pipe, host crash) bubble up as
/// <see cref="GalaxyIpcException"/> which the SQLite sink's drain worker catches +
/// translates to a whole-batch RetryPlease per the
/// <see cref="SqliteStoreAndForwardSink"/> docstring — failed events stay queued
/// for the next drain tick after backoff.
/// </para>
/// </remarks>
public sealed class GalaxyHistorianWriter : IAlarmHistorianWriter
{
private readonly GalaxyIpcClient _client;
public GalaxyHistorianWriter(GalaxyIpcClient client)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
}
public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(batch);
if (batch.Count == 0) return [];
var request = new HistorianAlarmEventRequest
{
Events = batch.Select(ToDto).ToArray(),
};
var response = await _client.CallAsync<HistorianAlarmEventRequest, HistorianAlarmEventResponse>(
requestKind: MessageKind.HistorianAlarmEventRequest,
request: request,
expectedResponseKind: MessageKind.HistorianAlarmEventResponse,
ct: cancellationToken).ConfigureAwait(false);
if (response.Outcomes.Length != batch.Count)
throw new InvalidOperationException(
$"Galaxy.Host returned {response.Outcomes.Length} outcomes for a batch of {batch.Count} — protocol mismatch");
var outcomes = new HistorianWriteOutcome[response.Outcomes.Length];
for (var i = 0; i < response.Outcomes.Length; i++)
outcomes[i] = MapOutcome(response.Outcomes[i]);
return outcomes;
}
internal static HistorianAlarmEventDto ToDto(AlarmHistorianEvent e) => new()
{
AlarmId = e.AlarmId,
EquipmentPath = e.EquipmentPath,
AlarmName = e.AlarmName,
AlarmTypeName = e.AlarmTypeName,
Severity = (int)e.Severity,
EventKind = e.EventKind,
Message = e.Message,
User = e.User,
Comment = e.Comment,
TimestampUtcUnixMs = new DateTimeOffset(e.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
};
internal static HistorianWriteOutcome MapOutcome(HistorianAlarmEventOutcomeDto wire) => wire switch
{
HistorianAlarmEventOutcomeDto.Ack => HistorianWriteOutcome.Ack,
HistorianAlarmEventOutcomeDto.RetryPlease => HistorianWriteOutcome.RetryPlease,
HistorianAlarmEventOutcomeDto.PermanentFail => HistorianWriteOutcome.PermanentFail,
_ => throw new InvalidOperationException($"Unknown HistorianAlarmEventOutcomeDto byte {(byte)wire}"),
};
}

View File

@@ -14,6 +14,7 @@
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
</ItemGroup>
<ItemGroup>

View File

@@ -47,6 +47,9 @@ public sealed class Phase7Composer : IAsyncDisposable
private DriverSubscriptionBridge? _bridge;
private Phase7ComposedSources _sources = Phase7ComposedSources.Empty;
// Sink we constructed in PrepareAsync (vs. the injected fallback). Held so
// DisposeAsync can flush + tear down the SQLite drain timer.
private SqliteStoreAndForwardSink? _ownedSink;
private bool _disposed;
public Phase7Composer(
@@ -96,13 +99,20 @@ public sealed class Phase7Composer : IAsyncDisposable
var upstream = new CachedTagUpstreamSource();
// Phase 7 follow-up #247 — if any registered driver implements IAlarmHistorianWriter
// (today: GalaxyProxyDriver), wrap it in a SqliteStoreAndForwardSink at
// %ProgramData%/OtOpcUa/alarm-historian-queue.db with the 2s drain cadence the
// sink's docstring recommends. Otherwise fall back to the injected sink (Null in
// the default registration).
var historianSink = ResolveHistorianSink();
_sources = Phase7EngineComposer.Compose(
scripts: scripts,
virtualTags: virtualTags,
scriptedAlarms: scriptedAlarms,
upstream: upstream,
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: _historianSink,
historianSink: historianSink,
rootScriptLogger: _scriptLogger,
loggerFactory: _loggerFactory);
@@ -121,6 +131,47 @@ public sealed class Phase7Composer : IAsyncDisposable
return _sources;
}
private IAlarmHistorianSink ResolveHistorianSink()
{
IAlarmHistorianWriter? writer = null;
foreach (var driverId in _driverHost.RegisteredDriverIds)
{
if (_driverHost.GetDriver(driverId) is IAlarmHistorianWriter w)
{
writer = w;
_logger.LogInformation(
"Phase 7 historian sink: driver {Driver} provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink",
driverId);
break;
}
}
if (writer is null)
{
_logger.LogInformation(
"Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using {Sink}",
_historianSink.GetType().Name);
return _historianSink;
}
var queueRoot = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData);
if (string.IsNullOrEmpty(queueRoot)) queueRoot = Path.GetTempPath();
var queueDir = Path.Combine(queueRoot, "OtOpcUa");
Directory.CreateDirectory(queueDir);
var queuePath = Path.Combine(queueDir, "alarm-historian-queue.db");
var sinkLogger = _loggerFactory.CreateLogger<SqliteStoreAndForwardSink>();
// SqliteStoreAndForwardSink wants a Serilog logger for warn-on-eviction emissions;
// bridge the Microsoft logger via Serilog's null-safe path until the sink's
// dependency surface is reshaped (covered as part of release-readiness).
var serilogShim = _scriptLogger.ForContext("HistorianQueuePath", queuePath);
_ownedSink = new SqliteStoreAndForwardSink(
databasePath: queuePath,
writer: writer,
logger: serilogShim);
_ownedSink.StartDrainLoop(TimeSpan.FromSeconds(2));
return _ownedSink;
}
/// <summary>
/// For each registered driver that exposes <see cref="Core.Abstractions.ISubscribable"/>,
/// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent.
@@ -178,6 +229,9 @@ public sealed class Phase7Composer : IAsyncDisposable
try { d.Dispose(); }
catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); }
}
// Owned SQLite sink: dispose first so the drain timer stops + final batch flushes
// before we release the writer-bearing driver via DriverHost.DisposeAsync upstream.
_ownedSink?.Dispose();
if (_historianSink is IDisposable disposableSink) disposableSink.Dispose();
}
}