64e3fbe035
v2-ci / build (push) Failing after 1m43s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Adds <summary>, <param>, <typeparam>, and <inheritdoc/> tags to public members surfaced by commentchecker — resolves 5,847 of 5,869 issues (99.6%) across three /fixdocs passes.
64 lines
2.6 KiB
C#
64 lines
2.6 KiB
C#
using Akka.Actor;
|
|
using Akka.Event;
|
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
|
|
|
/// <summary>
|
|
/// Thin actor wrapper around <see cref="IAlarmHistorianSink"/>. Engine code (ScriptedAlarmActor,
|
|
/// Galaxy native alarm bridge, AB CIP ALMD reader) tells <see cref="AlarmHistorianEvent"/>s to this
|
|
/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register
|
|
/// <see cref="SqliteStoreAndForwardSink"/> against <c>IAlarmHistorianSink</c>; the sink owns the
|
|
/// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond
|
|
/// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking
|
|
/// them on disk I/O or pipe handshakes.
|
|
///
|
|
/// Query queue depth + drain health via <see cref="GetStatus"/>.
|
|
/// </summary>
|
|
public sealed class HistorianAdapterActor : ReceiveActor
|
|
{
|
|
public sealed record GetStatus
|
|
{
|
|
public static readonly GetStatus Instance = new();
|
|
}
|
|
|
|
private readonly IAlarmHistorianSink _sink;
|
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
|
|
|
/// <summary>Creates the props for a HistorianAdapterActor instance.</summary>
|
|
/// <param name="sink">The alarm historian sink implementation, or null to use a null sink.</param>
|
|
/// <returns>Props configured for creating a HistorianAdapterActor.</returns>
|
|
public static Props Props(IAlarmHistorianSink? sink = null) =>
|
|
Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance));
|
|
|
|
/// <summary>Initializes a new instance of the HistorianAdapterActor class.</summary>
|
|
/// <param name="sink">The alarm historian sink to forward enqueued events to.</param>
|
|
public HistorianAdapterActor(IAlarmHistorianSink sink)
|
|
{
|
|
_sink = sink;
|
|
|
|
Receive<AlarmHistorianEvent>(evt =>
|
|
{
|
|
// Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously
|
|
// inside EnqueueAsync (it returns once the row is committed), so we don't block on
|
|
// network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state.
|
|
_ = EnqueueAsync(evt);
|
|
});
|
|
|
|
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
|
|
}
|
|
|
|
private async Task EnqueueAsync(AlarmHistorianEvent evt)
|
|
{
|
|
try
|
|
{
|
|
await _sink.EnqueueAsync(evt, CancellationToken.None);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}",
|
|
evt.AlarmId, evt.TimestampUtc);
|
|
}
|
|
}
|
|
}
|