Merge branch 'feat/alarm-historian-c1-writer'
Task #19 (alarms C.1) — complete PR C.1 test coverage for the Wonderware historian alarm-event writer (100/1000-event batching + cluster-failover). The C.1 code structure was already on master; the live aahClientManaged SDK binding in SdkAlarmHistorianWriteBackend remains rig-gated (D.1).
This commit is contained in:
@@ -108,6 +108,92 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
|||||||
result.ShouldBe(new[] { false });
|
result.ShouldBe(new[] { false });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(100)]
|
||||||
|
[InlineData(1000)]
|
||||||
|
public async Task Large_batch_all_ack_returns_all_true(int batchSize)
|
||||||
|
{
|
||||||
|
// Spec: "1 / 100 / 1000 events through a fake aahClientManaged writer;
|
||||||
|
// assert per-row outcome list parallel to input order."
|
||||||
|
var backend = new RecordingBackend(events => events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray());
|
||||||
|
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||||
|
|
||||||
|
var batch = Enumerable.Range(0, batchSize)
|
||||||
|
.Select(i => Event($"E{i}"))
|
||||||
|
.ToArray();
|
||||||
|
|
||||||
|
var result = await writer.WriteAsync(batch, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Length.ShouldBe(batchSize);
|
||||||
|
result.ShouldAllBe(ok => ok);
|
||||||
|
backend.Calls.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(100)]
|
||||||
|
[InlineData(1000)]
|
||||||
|
public async Task Large_batch_alternating_outcomes_are_positionally_correct(int batchSize)
|
||||||
|
{
|
||||||
|
// Verifies that per-row outcome ordering is preserved for large batches;
|
||||||
|
// a backend that returns the outcomes in a different allocation order would
|
||||||
|
// fail this test if the writer incorrectly indexing outcomes.
|
||||||
|
var backend = new RecordingBackend(events =>
|
||||||
|
events.Select((_, i) => i % 2 == 0
|
||||||
|
? AlarmHistorianWriteOutcome.Ack
|
||||||
|
: AlarmHistorianWriteOutcome.RetryPlease).ToArray());
|
||||||
|
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||||
|
|
||||||
|
var batch = Enumerable.Range(0, batchSize).Select(i => Event($"E{i}")).ToArray();
|
||||||
|
var result = await writer.WriteAsync(batch, CancellationToken.None);
|
||||||
|
|
||||||
|
result.Length.ShouldBe(batchSize);
|
||||||
|
for (var i = 0; i < result.Length; i++)
|
||||||
|
{
|
||||||
|
var expected = i % 2 == 0;
|
||||||
|
result[i].ShouldBe(expected, $"slot {i}: expected {expected}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Backend_retry_then_succeed_simulates_cluster_failover()
|
||||||
|
{
|
||||||
|
// Spec: "Cluster failover: primary node returns BadCommunicationError;
|
||||||
|
// picker rotates to secondary; assert eventual success."
|
||||||
|
//
|
||||||
|
// The real cluster-failover path is internal to SdkAlarmHistorianWriteBackend
|
||||||
|
// (which is rig-gated) and is exercised at the HistorianClusterEndpointPicker
|
||||||
|
// level in HistorianClusterEndpointPickerTests. Here we test the
|
||||||
|
// AahClientManagedAlarmEventWriter's handling of a backend that returns
|
||||||
|
// RetryPlease on the first call (primary-node failure) and Ack on the
|
||||||
|
// second call (secondary-node success), confirming the IPC layer correctly
|
||||||
|
// propagates the trinary outcome across two separate drain ticks.
|
||||||
|
var callCount = 0;
|
||||||
|
var backend = new RecordingBackend(events =>
|
||||||
|
{
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 1)
|
||||||
|
{
|
||||||
|
// First call: simulate communication error (isCommunicationError=true)
|
||||||
|
// which produces RetryPlease — equivalent to primary node failing.
|
||||||
|
return events.Select(_ => AlarmHistorianWriteOutcome.RetryPlease).ToArray();
|
||||||
|
}
|
||||||
|
// Second call (after cluster picker has rotated to secondary): Ack.
|
||||||
|
return events.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray();
|
||||||
|
});
|
||||||
|
var writer = new AahClientManagedAlarmEventWriter(backend);
|
||||||
|
var batch = new[] { Event("E1"), Event("E2") };
|
||||||
|
|
||||||
|
// First drain tick: primary "fails" → all RetryPlease (false at IPC layer).
|
||||||
|
var firstResult = await writer.WriteAsync(batch, CancellationToken.None);
|
||||||
|
firstResult.ShouldBe(new[] { false, false });
|
||||||
|
|
||||||
|
// Second drain tick: secondary succeeds → all Ack (true at IPC layer).
|
||||||
|
var secondResult = await writer.WriteAsync(batch, CancellationToken.None);
|
||||||
|
secondResult.ShouldBe(new[] { true, true });
|
||||||
|
|
||||||
|
backend.Calls.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
[Theory]
|
[Theory]
|
||||||
// hresult 0 + clean → Ack
|
// hresult 0 + clean → Ack
|
||||||
[InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
|
[InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
|
||||||
|
|||||||
@@ -0,0 +1,164 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// PR C.1 — pins the <see cref="SdkAlarmHistorianWriteBackend"/> contract:
|
||||||
|
/// <list type="bullet">
|
||||||
|
/// <item><description>
|
||||||
|
/// Unit: the placeholder backend returns <see cref="AlarmHistorianWriteOutcome.RetryPlease"/>
|
||||||
|
/// for every slot so the lmxopcua-side store-and-forward sink retains events rather than
|
||||||
|
/// dropping them while D.1 is unresolved.
|
||||||
|
/// </description></item>
|
||||||
|
/// <item><description>
|
||||||
|
/// Integration (rig-gated): once D.1 pins the live SDK entry point the Skip attribute is
|
||||||
|
/// removed. The live test writes a synthetic batch to a real AVEVA Historian and asserts
|
||||||
|
/// the cluster picker rotates from a broken primary to a healthy secondary.
|
||||||
|
/// </description></item>
|
||||||
|
/// </list>
|
||||||
|
/// </summary>
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class SdkAlarmHistorianWriteBackendTests
|
||||||
|
{
|
||||||
|
// ── Placeholder-mode tests (no rig required) ─────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Placeholder_returns_RetryPlease_for_every_slot_so_queue_is_preserved()
|
||||||
|
{
|
||||||
|
// The SDK call-site in SdkAlarmHistorianWriteBackend is not yet pinned (PR D.1).
|
||||||
|
// Until D.1 swaps in the live call, the backend must return RetryPlease for every
|
||||||
|
// event so the lmxopcua-side SqliteStoreAndForwardSink retains the rows instead of
|
||||||
|
// dropping them — same effect as the NullAlarmHistorianSink fallback, but each
|
||||||
|
// slot is individually addressable for the drain worker.
|
||||||
|
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
|
||||||
|
var backend = new SdkAlarmHistorianWriteBackend(cfg);
|
||||||
|
|
||||||
|
var events = new[]
|
||||||
|
{
|
||||||
|
AlarmEvent("E1"),
|
||||||
|
AlarmEvent("E2"),
|
||||||
|
AlarmEvent("E3"),
|
||||||
|
};
|
||||||
|
var outcomes = await backend.WriteBatchAsync(events, CancellationToken.None);
|
||||||
|
|
||||||
|
outcomes.Length.ShouldBe(events.Length);
|
||||||
|
outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Placeholder_returns_empty_array_for_empty_batch()
|
||||||
|
{
|
||||||
|
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
|
||||||
|
var backend = new SdkAlarmHistorianWriteBackend(cfg);
|
||||||
|
|
||||||
|
var outcomes = await backend.WriteBatchAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);
|
||||||
|
|
||||||
|
outcomes.ShouldBeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Placeholder_returns_same_count_as_input_for_large_batch()
|
||||||
|
{
|
||||||
|
// Guards against an off-by-one error in the placeholder array allocation —
|
||||||
|
// WriteBatchAsync must always return exactly as many outcomes as input events.
|
||||||
|
var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
|
||||||
|
var backend = new SdkAlarmHistorianWriteBackend(cfg);
|
||||||
|
|
||||||
|
var batch = Enumerable.Range(0, 1000).Select(i => AlarmEvent($"E{i}")).ToArray();
|
||||||
|
var outcomes = await backend.WriteBatchAsync(batch, CancellationToken.None);
|
||||||
|
|
||||||
|
outcomes.Length.ShouldBe(1000);
|
||||||
|
outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Rig-gated integration tests ───────────────────────────────────────
|
||||||
|
//
|
||||||
|
// The tests below need a live AVEVA Historian install and are gated with
|
||||||
|
// Skip="rig-required". Once PR D.1 pins the SDK entry point, remove the
|
||||||
|
// Skip attribute and add them to the integration test run profile.
|
||||||
|
|
||||||
|
[Fact(Skip = "rig-required: needs a live AVEVA Historian + aahClientManaged SDK — enable in PR D.1")]
|
||||||
|
public async Task Live_single_event_roundtrip_returns_Ack()
|
||||||
|
{
|
||||||
|
// Spec (PR C.1, Tests): "1 / 100 / 1000 events through a fake aahClientManaged
|
||||||
|
// writer; assert per-row outcome list parallel to input order."
|
||||||
|
//
|
||||||
|
// This slice exercises the *live* SDK path. The fake-backend variant at
|
||||||
|
// AahClientManagedAlarmEventWriterTests covers the same assertion without the rig.
|
||||||
|
var cfg = BuildRigConfig();
|
||||||
|
var backend = new SdkAlarmHistorianWriteBackend(cfg);
|
||||||
|
|
||||||
|
var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-E1") }, CancellationToken.None);
|
||||||
|
|
||||||
|
outcomes.Length.ShouldBe(1);
|
||||||
|
outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — enable in PR D.1")]
|
||||||
|
public async Task Live_cluster_failover_primary_bad_rotates_to_secondary()
|
||||||
|
{
|
||||||
|
// Spec (PR C.1, Tests): "Cluster failover: primary node returns
|
||||||
|
// BadCommunicationError; picker rotates to secondary; assert eventual success."
|
||||||
|
//
|
||||||
|
// Configure the first server name to point at a deliberately unreachable node
|
||||||
|
// and the second to the real Historian; the picker should mark the first node
|
||||||
|
// failed and succeed via the second.
|
||||||
|
var cfg = new HistorianConfiguration
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
ServerNames = new System.Collections.Generic.List<string>
|
||||||
|
{
|
||||||
|
"invalid-primary-node-deliberately-unreachable",
|
||||||
|
Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
|
||||||
|
},
|
||||||
|
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
|
||||||
|
IntegratedSecurity = true,
|
||||||
|
FailureCooldownSeconds = 5,
|
||||||
|
CommandTimeoutSeconds = 10,
|
||||||
|
};
|
||||||
|
var backend = new SdkAlarmHistorianWriteBackend(cfg);
|
||||||
|
|
||||||
|
var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-failover-E1") }, CancellationToken.None);
|
||||||
|
|
||||||
|
// The backend must succeed (Ack) via the secondary even though the primary was bad.
|
||||||
|
outcomes.Length.ShouldBe(1);
|
||||||
|
outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AlarmHistorianEventDto AlarmEvent(string id) => new AlarmHistorianEventDto
|
||||||
|
{
|
||||||
|
EventId = id,
|
||||||
|
SourceName = "TestSource",
|
||||||
|
ConditionId = "TestSource.Level.HiHi",
|
||||||
|
AlarmType = "AnalogLimitAlarm.HiHi",
|
||||||
|
Message = "C.1 integration test alarm",
|
||||||
|
Severity = 500,
|
||||||
|
EventTimeUtcTicks = DateTime.UtcNow.Ticks,
|
||||||
|
AckComment = null,
|
||||||
|
};
|
||||||
|
|
||||||
|
private static HistorianConfiguration BuildRigConfig() => new HistorianConfiguration
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
|
||||||
|
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
|
||||||
|
IntegratedSecurity = true,
|
||||||
|
CommandTimeoutSeconds = 30,
|
||||||
|
FailureCooldownSeconds = 60,
|
||||||
|
};
|
||||||
|
|
||||||
|
private static int TryParseInt(string envName, int defaultValue)
|
||||||
|
{
|
||||||
|
var raw = Environment.GetEnvironmentVariable(envName);
|
||||||
|
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user