feat(historian): config-gated SqliteStoreAndForward→Wonderware sink (AddAlarmHistorian)
This commit is contained in:
@@ -22,7 +22,9 @@ using ZB.MOM.WW.OtOpcUa.Host.Health;
|
|||||||
using ZB.MOM.WW.OtOpcUa.Host.Logging;
|
using ZB.MOM.WW.OtOpcUa.Host.Logging;
|
||||||
using ZB.MOM.WW.OtOpcUa.Host.Observability;
|
using ZB.MOM.WW.OtOpcUa.Host.Observability;
|
||||||
using ZB.MOM.WW.OtOpcUa.Host.OpcUa;
|
using ZB.MOM.WW.OtOpcUa.Host.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime.Scripting;
|
using ZB.MOM.WW.OtOpcUa.Runtime.Scripting;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime;
|
using ZB.MOM.WW.OtOpcUa.Runtime;
|
||||||
@@ -83,6 +85,18 @@ builder.Services.AddOtOpcUaCluster(builder.Configuration);
|
|||||||
if (hasDriver)
|
if (hasDriver)
|
||||||
{
|
{
|
||||||
builder.Services.AddOtOpcUaRuntime();
|
builder.Services.AddOtOpcUaRuntime();
|
||||||
|
|
||||||
|
// Config-gated durable alarm-historian sink. When the AlarmHistorian section is enabled this
|
||||||
|
// overrides the NullAlarmHistorianSink default from AddOtOpcUaRuntime (last registration wins)
|
||||||
|
// with a SqliteStoreAndForwardSink draining to the Wonderware named-pipe writer. The writer is
|
||||||
|
// injected here because the Host is the only project that references the Wonderware client —
|
||||||
|
// Runtime owns the gating + Sqlite construction, the Host supplies the concrete downstream.
|
||||||
|
builder.Services.AddAlarmHistorian(
|
||||||
|
builder.Configuration,
|
||||||
|
(opts, sp) => new WonderwareHistorianClient(
|
||||||
|
new WonderwareHistorianClientOptions(opts.PipeName, opts.SharedSecret),
|
||||||
|
sp.GetService<ILogger<WonderwareHistorianClient>>()));
|
||||||
|
|
||||||
// Bind every cross-platform driver factory before AddAkka resolves IDriverFactory — replaces
|
// Bind every cross-platform driver factory before AddAkka resolves IDriverFactory — replaces
|
||||||
// the F7-default NullDriverFactory with a real DriverFactoryRegistryAdapter so DriverHostActor
|
// the F7-default NullDriverFactory with a real DriverFactoryRegistryAdapter so DriverHostActor
|
||||||
// can materialise real IDriver instances on deploy.
|
// can materialise real IDriver instances on deploy.
|
||||||
|
|||||||
@@ -10,5 +10,11 @@
|
|||||||
"Auth": {
|
"Auth": {
|
||||||
"DisableLogin": false
|
"DisableLogin": false
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"AlarmHistorian": {
|
||||||
|
"Enabled": false,
|
||||||
|
"DatabasePath": "alarm-historian.db",
|
||||||
|
"PipeName": "OtOpcUaHistorian",
|
||||||
|
"SharedSecret": ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,32 @@
|
|||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Binds the <c>AlarmHistorian</c> configuration section that gates the durable
|
||||||
|
/// store-and-forward alarm sink. When <see cref="Enabled"/> is <c>true</c>,
|
||||||
|
/// <c>AddAlarmHistorian</c> registers a <c>SqliteStoreAndForwardSink</c> (draining to the
|
||||||
|
/// Wonderware named-pipe writer supplied by the Host) in place of the
|
||||||
|
/// <c>NullAlarmHistorianSink</c> default; otherwise the Null default survives.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AlarmHistorianOptions
|
||||||
|
{
|
||||||
|
/// <summary>The configuration section name this options class binds.</summary>
|
||||||
|
public const string SectionName = "AlarmHistorian";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When <c>true</c>, the durable SQLite store-and-forward sink is registered; when
|
||||||
|
/// <c>false</c> (the default) the no-op <c>NullAlarmHistorianSink</c> stays in place.
|
||||||
|
/// </summary>
|
||||||
|
public bool Enabled { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Filesystem path to the local SQLite store-and-forward queue database.</summary>
|
||||||
|
public string DatabasePath { get; init; } = "alarm-historian.db";
|
||||||
|
|
||||||
|
/// <summary>Named-pipe name the Wonderware historian sidecar listens on.</summary>
|
||||||
|
public string PipeName { get; init; } = "OtOpcUaHistorian";
|
||||||
|
|
||||||
|
/// <summary>Per-process shared secret the sidecar verifies in the Hello frame.</summary>
|
||||||
|
public string SharedSecret { get; init; } = "";
|
||||||
|
|
||||||
|
/// <summary>Maximum number of queued rows the drain worker forwards in a single batch.</summary>
|
||||||
|
public int BatchSize { get; init; } = 100;
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Akka.Hosting;
|
using Akka.Hosting;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@@ -48,6 +49,45 @@ public static class ServiceCollectionExtensions
|
|||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Config-gated durable alarm-historian sink. When the <c>AlarmHistorian</c> section has
|
||||||
|
/// <c>Enabled=true</c>, registers a <see cref="SqliteStoreAndForwardSink"/> (draining via the
|
||||||
|
/// <paramref name="writerFactory"/>-supplied writer) as the <see cref="IAlarmHistorianSink"/>,
|
||||||
|
/// overriding the <see cref="NullAlarmHistorianSink"/> default. Otherwise a no-op (Null stays).
|
||||||
|
/// The writer is injected so the durable downstream (Wonderware named-pipe client) can be supplied
|
||||||
|
/// by the Host, which is the only project that references it.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="services">The service collection to register with.</param>
|
||||||
|
/// <param name="configuration">The configuration carrying the <c>AlarmHistorian</c> section.</param>
|
||||||
|
/// <param name="writerFactory">
|
||||||
|
/// Factory the Host supplies to build the concrete <see cref="IAlarmHistorianWriter"/>
|
||||||
|
/// (the Wonderware named-pipe client) from the bound options + the resolving provider.
|
||||||
|
/// </param>
|
||||||
|
/// <returns>The same <paramref name="services"/> instance for chaining.</returns>
|
||||||
|
public static IServiceCollection AddAlarmHistorian(
|
||||||
|
this IServiceCollection services,
|
||||||
|
IConfiguration configuration,
|
||||||
|
Func<AlarmHistorianOptions, IServiceProvider, IAlarmHistorianWriter> writerFactory)
|
||||||
|
{
|
||||||
|
var opts = configuration.GetSection(AlarmHistorianOptions.SectionName).Get<AlarmHistorianOptions>();
|
||||||
|
if (opts is not { Enabled: true }) return services; // leave the Null default from AddOtOpcUaRuntime
|
||||||
|
|
||||||
|
services.AddSingleton<IAlarmHistorianSink>(sp =>
|
||||||
|
{
|
||||||
|
// SqliteStoreAndForwardSink takes a Serilog ILogger (not Microsoft.Extensions.Logging).
|
||||||
|
// Resolve it off the host's configured static logger so the drain worker's WARN/INFO
|
||||||
|
// lines land in the same sinks as the rest of the process.
|
||||||
|
var sink = new SqliteStoreAndForwardSink(
|
||||||
|
opts.DatabasePath,
|
||||||
|
writerFactory(opts, sp),
|
||||||
|
Serilog.Log.Logger.ForContext<SqliteStoreAndForwardSink>(),
|
||||||
|
batchSize: opts.BatchSize);
|
||||||
|
sink.StartDrainLoop(TimeSpan.FromSeconds(5));
|
||||||
|
return sink;
|
||||||
|
});
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Spawns the per-node driver-role actors on the host's <see cref="ActorSystem"/>:
|
/// Spawns the per-node driver-role actors on the host's <see cref="ActorSystem"/>:
|
||||||
/// <see cref="DriverHostActor"/> (one per node), <see cref="DbHealthProbeActor"/>
|
/// <see cref="DriverHostActor"/> (one per node), <see cref="DbHealthProbeActor"/>
|
||||||
|
|||||||
+94
@@ -0,0 +1,94 @@
|
|||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies the config-gated <c>AddAlarmHistorian</c> registration: when the
|
||||||
|
/// <c>AlarmHistorian</c> section is absent or disabled the <see cref="NullAlarmHistorianSink"/>
|
||||||
|
/// default survives; when it is enabled a real <see cref="SqliteStoreAndForwardSink"/> wins
|
||||||
|
/// (last-registration-wins over the <c>TryAddSingleton</c> Null default).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AlarmHistorianRegistrationTests
|
||||||
|
{
|
||||||
|
/// <summary>A no-op writer the factory hands the Sqlite sink; never actually invoked in these tests.</summary>
|
||||||
|
private sealed class FakeWriter : IAlarmHistorianWriter
|
||||||
|
{
|
||||||
|
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
|
||||||
|
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
|
||||||
|
=> Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(
|
||||||
|
batch.Select(_ => HistorianWriteOutcome.Ack).ToList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Seed the Null default exactly the way <c>AddOtOpcUaRuntime</c> does, then add logging.</summary>
|
||||||
|
private static ServiceCollection BaseServices()
|
||||||
|
{
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddLogging();
|
||||||
|
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IConfiguration ConfigFrom(Dictionary<string, string?> values)
|
||||||
|
=> new ConfigurationBuilder().AddInMemoryCollection(values).Build();
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Section_absent_keeps_null_sink()
|
||||||
|
{
|
||||||
|
var services = BaseServices();
|
||||||
|
var config = ConfigFrom(new Dictionary<string, string?>());
|
||||||
|
|
||||||
|
services.AddAlarmHistorian(config, (_, _) => new FakeWriter());
|
||||||
|
|
||||||
|
using var provider = services.BuildServiceProvider();
|
||||||
|
provider.GetRequiredService<IAlarmHistorianSink>().ShouldBeOfType<NullAlarmHistorianSink>();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Section_disabled_keeps_null_sink()
|
||||||
|
{
|
||||||
|
var services = BaseServices();
|
||||||
|
var config = ConfigFrom(new Dictionary<string, string?>
|
||||||
|
{
|
||||||
|
["AlarmHistorian:Enabled"] = "false",
|
||||||
|
});
|
||||||
|
|
||||||
|
services.AddAlarmHistorian(config, (_, _) => new FakeWriter());
|
||||||
|
|
||||||
|
using var provider = services.BuildServiceProvider();
|
||||||
|
provider.GetRequiredService<IAlarmHistorianSink>().ShouldBeOfType<NullAlarmHistorianSink>();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Section_enabled_registers_sqlite_sink()
|
||||||
|
{
|
||||||
|
var tempDir = Path.Combine(Path.GetTempPath(), "otopcua-alarmhist-test-" + Guid.NewGuid().ToString("N"));
|
||||||
|
Directory.CreateDirectory(tempDir);
|
||||||
|
var dbPath = Path.Combine(tempDir, "alarm-historian-test.db");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var services = BaseServices();
|
||||||
|
var config = ConfigFrom(new Dictionary<string, string?>
|
||||||
|
{
|
||||||
|
["AlarmHistorian:Enabled"] = "true",
|
||||||
|
["AlarmHistorian:DatabasePath"] = dbPath,
|
||||||
|
});
|
||||||
|
|
||||||
|
services.AddAlarmHistorian(config, (_, _) => new FakeWriter());
|
||||||
|
|
||||||
|
using (var provider = services.BuildServiceProvider())
|
||||||
|
{
|
||||||
|
provider.GetRequiredService<IAlarmHistorianSink>().ShouldBeOfType<SqliteStoreAndForwardSink>();
|
||||||
|
} // dispose stops the drain loop + releases the SQLite file handle
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
try { Directory.Delete(tempDir, recursive: true); } catch (IOException) { /* best effort */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user