test(historian-sidecar): complete PR C.1 test coverage for AahClientManagedAlarmEventWriter
Add the spec-required 100/1000-event batching tests and cluster-failover
tests that were missing from the existing C.1 suite:
- AahClientManagedAlarmEventWriterTests: add Large_batch_all_ack_returns_all_true
(batchSize 100 + 1000) and Large_batch_alternating_outcomes_are_positionally_correct
(batchSize 100 + 1000) to satisfy the "1 / 100 / 1000 events" spec requirement;
add Backend_retry_then_succeed_simulates_cluster_failover to cover the
RetryPlease-then-Ack sequence at the IPC layer (unit-level stand-in for the
rig-gated live cluster-failover path).
- SdkAlarmHistorianWriteBackendTests (new file): unit tests that pin the
placeholder backend's RetryPlease-for-every-slot contract (preserves queued
events while D.1 is unresolved); plus two Skip("rig-required") integration
tests covering the live SDK single-event roundtrip and cluster failover via
HistorianClusterEndpointPicker — remove the Skip in PR D.1.
Feasibility note: aahClientManaged.dll IS present in lib/ and referenced in
the csproj; the SDK call site is isolated behind IAlarmHistorianWriteBackend
in SdkAlarmHistorianWriteBackend.WriteBatchAsync (single method, D.1 seam).
The full AahClientManagedAlarmEventWriter implementation was already complete.
Build: 0 errors, 0 warnings.
Tests: 64 passed, 2 skipped (rig-gated), 0 failed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -108,6 +108,92 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
result.ShouldBe(new[] { false });
|
||||
}
|
||||
|
||||
[Theory]
[InlineData(100)]
[InlineData(1000)]
public async Task Large_batch_all_ack_returns_all_true(int batchSize)
{
    // Spec: "1 / 100 / 1000 events through a fake aahClientManaged writer;
    // assert per-row outcome list parallel to input order."

    // Arrange: a fake backend that acknowledges every event it is handed.
    var ackingBackend = new RecordingBackend(
        incoming => incoming.Select(_ => AlarmHistorianWriteOutcome.Ack).ToArray());
    var sut = new AahClientManagedAlarmEventWriter(ackingBackend);

    var generated =
        from i in Enumerable.Range(0, batchSize)
        select Event($"E{i}");
    var payload = generated.ToArray();

    // Act: push the whole batch through the writer in one call.
    var acked = await sut.WriteAsync(payload, CancellationToken.None);

    // Assert: one result per input event, every one true, and the backend
    // was invoked exactly once.
    acked.Length.ShouldBe(batchSize);
    acked.ShouldAllBe(flag => flag);
    ackingBackend.Calls.ShouldBe(1);
}
|
||||
|
||||
[Theory]
[InlineData(100)]
[InlineData(1000)]
public async Task Large_batch_alternating_outcomes_are_positionally_correct(int batchSize)
{
    // Verifies that per-row outcome ordering is preserved for large batches:
    // the fake backend alternates Ack / RetryPlease by slot index, so this test
    // fails if the writer indexes the outcomes incorrectly when mapping them
    // back onto the input positions.
    var alternatingBackend = new RecordingBackend(incoming =>
        incoming
            .Select((_, slot) => slot % 2 == 0
                ? AlarmHistorianWriteOutcome.Ack
                : AlarmHistorianWriteOutcome.RetryPlease)
            .ToArray());
    var sut = new AahClientManagedAlarmEventWriter(alternatingBackend);

    var payload = Enumerable.Range(0, batchSize)
        .Select(i => Event($"E{i}"))
        .ToArray();

    var flags = await sut.WriteAsync(payload, CancellationToken.None);

    flags.Length.ShouldBe(batchSize);

    // Even slots were Ack'd (true); odd slots were RetryPlease (false).
    foreach (var i in Enumerable.Range(0, flags.Length))
    {
        var expected = i % 2 == 0;
        flags[i].ShouldBe(expected, $"slot {i}: expected {expected}");
    }
}
|
||||
|
||||
[Fact]
public async Task Backend_retry_then_succeed_simulates_cluster_failover()
{
    // Spec: "Cluster failover: primary node returns BadCommunicationError;
    // picker rotates to secondary; assert eventual success."
    //
    // The real cluster-failover path is internal to SdkAlarmHistorianWriteBackend
    // (which is rig-gated) and is exercised at the HistorianClusterEndpointPicker
    // level in HistorianClusterEndpointPickerTests. This test covers only the
    // AahClientManagedAlarmEventWriter side: a backend that answers RetryPlease
    // on its first invocation (standing in for a primary-node failure) and Ack
    // on every later invocation (standing in for secondary-node success), to
    // confirm the outcome is propagated correctly across two separate drain ticks.
    var invocations = 0;
    var flakyBackend = new RecordingBackend(incoming =>
    {
        invocations++;

        // Invocation 1 simulates a communication error (RetryPlease for every
        // slot); any later invocation simulates the rotated-to secondary (Ack).
        var outcome = invocations == 1
            ? AlarmHistorianWriteOutcome.RetryPlease
            : AlarmHistorianWriteOutcome.Ack;

        return incoming.Select(_ => outcome).ToArray();
    });
    var sut = new AahClientManagedAlarmEventWriter(flakyBackend);
    var payload = new[] { Event("E1"), Event("E2") };

    // First drain tick: primary "fails" → all RetryPlease (false at IPC layer).
    var afterFirstTick = await sut.WriteAsync(payload, CancellationToken.None);
    afterFirstTick.ShouldBe(new[] { false, false });

    // Second drain tick: secondary succeeds → all Ack (true at IPC layer).
    var afterSecondTick = await sut.WriteAsync(payload, CancellationToken.None);
    afterSecondTick.ShouldBe(new[] { true, true });

    flakyBackend.Calls.ShouldBe(2);
}
|
||||
|
||||
[Theory]
|
||||
// hresult 0 + clean → Ack
|
||||
[InlineData(0, false, false, AlarmHistorianWriteOutcome.Ack)]
|
||||
|
||||
@@ -0,0 +1,164 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests
|
||||
{
|
||||
/// <summary>
/// PR C.1 — pins the <see cref="SdkAlarmHistorianWriteBackend"/> contract:
/// <list type="bullet">
///   <item><description>
///     Unit: the placeholder backend returns <see cref="AlarmHistorianWriteOutcome.RetryPlease"/>
///     for every slot so the lmxopcua-side store-and-forward sink retains events rather than
///     dropping them while D.1 is unresolved.
///   </description></item>
///   <item><description>
///     Integration (rig-gated): once D.1 pins the live SDK entry point the Skip attribute is
///     removed. The live test writes a synthetic batch to a real AVEVA Historian and asserts
///     the cluster picker rotates from a broken primary to a healthy secondary.
///   </description></item>
/// </list>
/// </summary>
/// <remarks>
/// NOTE(review): the class-level <c>Category=Unit</c> trait below also tags the two rig-gated
/// tests. That is harmless while they are skipped, but when the Skip is removed in PR D.1 they
/// will be selected by a <c>Category=Unit</c> filter — confirm whether they should move to a
/// separate class or carry their own category at that point.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class SdkAlarmHistorianWriteBackendTests
{
    /// <summary>Environment variable read for the rig Historian host name.</summary>
    private const string ServerEnvVar = "OTOPCUA_HISTORIAN_SERVER";

    /// <summary>Environment variable read for the rig Historian port.</summary>
    private const string PortEnvVar = "OTOPCUA_HISTORIAN_PORT";

    /// <summary>Host used when <see cref="ServerEnvVar"/> is not set.</summary>
    private const string FallbackServer = "localhost";

    /// <summary>Port used when <see cref="PortEnvVar"/> is unset or not a valid integer.</summary>
    private const int FallbackPort = 32568;

    // ── Placeholder-mode tests (no rig required) ─────────────────────────

    [Fact]
    public async Task Placeholder_returns_RetryPlease_for_every_slot_so_queue_is_preserved()
    {
        // The SDK call-site in SdkAlarmHistorianWriteBackend is not yet pinned (PR D.1).
        // Until D.1 swaps in the live call, the backend must return RetryPlease for every
        // event so the lmxopcua-side SqliteStoreAndForwardSink retains the rows instead of
        // dropping them — same effect as the NullAlarmHistorianSink fallback, but each
        // slot is individually addressable for the drain worker.
        var backend = CreatePlaceholderBackend();

        var events = new[]
        {
            AlarmEvent("E1"),
            AlarmEvent("E2"),
            AlarmEvent("E3"),
        };
        var outcomes = await backend.WriteBatchAsync(events, CancellationToken.None);

        outcomes.Length.ShouldBe(events.Length);
        outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease);
    }

    [Fact]
    public async Task Placeholder_returns_empty_array_for_empty_batch()
    {
        // An empty input batch must produce an empty outcome array.
        var backend = CreatePlaceholderBackend();

        var outcomes = await backend.WriteBatchAsync(Array.Empty<AlarmHistorianEventDto>(), CancellationToken.None);

        outcomes.ShouldBeEmpty();
    }

    [Fact]
    public async Task Placeholder_returns_same_count_as_input_for_large_batch()
    {
        // Guards against an off-by-one error in the placeholder array allocation —
        // WriteBatchAsync must always return exactly as many outcomes as input events.
        var backend = CreatePlaceholderBackend();

        var batch = Enumerable.Range(0, 1000).Select(i => AlarmEvent($"E{i}")).ToArray();
        var outcomes = await backend.WriteBatchAsync(batch, CancellationToken.None);

        // Compare against the batch itself so the size is stated in exactly one place.
        outcomes.Length.ShouldBe(batch.Length);
        outcomes.ShouldAllBe(o => o == AlarmHistorianWriteOutcome.RetryPlease);
    }

    // ── Rig-gated integration tests ───────────────────────────────────────
    //
    // The tests below need a live AVEVA Historian install and are gated with
    // Skip="rig-required". Once PR D.1 pins the SDK entry point, remove the
    // Skip attribute and add them to the integration test run profile.

    [Fact(Skip = "rig-required: needs a live AVEVA Historian + aahClientManaged SDK — enable in PR D.1")]
    public async Task Live_single_event_roundtrip_returns_Ack()
    {
        // Spec (PR C.1, Tests): "1 / 100 / 1000 events through a fake aahClientManaged
        // writer; assert per-row outcome list parallel to input order."
        //
        // This slice exercises the *live* SDK path. The fake-backend variant at
        // AahClientManagedAlarmEventWriterTests covers the same assertion without the rig.
        var cfg = BuildRigConfig();
        var backend = new SdkAlarmHistorianWriteBackend(cfg);

        var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-E1") }, CancellationToken.None);

        outcomes.Length.ShouldBe(1);
        outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
    }

    [Fact(Skip = "rig-required: needs a live AVEVA Historian cluster (two nodes) — enable in PR D.1")]
    public async Task Live_cluster_failover_primary_bad_rotates_to_secondary()
    {
        // Spec (PR C.1, Tests): "Cluster failover: primary node returns
        // BadCommunicationError; picker rotates to secondary; assert eventual success."
        //
        // Configure the first server name to point at a deliberately unreachable node
        // and the second to the real Historian; the picker should mark the first node
        // failed and succeed via the second.
        var cfg = new HistorianConfiguration
        {
            Enabled = true,
            ServerNames = new List<string>
            {
                "invalid-primary-node-deliberately-unreachable",
                RigServerName(),
            },
            Port = TryParseInt(PortEnvVar, FallbackPort),
            IntegratedSecurity = true,
            FailureCooldownSeconds = 5,
            CommandTimeoutSeconds = 10,
        };
        var backend = new SdkAlarmHistorianWriteBackend(cfg);

        var outcomes = await backend.WriteBatchAsync(new[] { AlarmEvent("rig-failover-E1") }, CancellationToken.None);

        // The backend must succeed (Ack) via the secondary even though the primary was bad.
        outcomes.Length.ShouldBe(1);
        outcomes[0].ShouldBe(AlarmHistorianWriteOutcome.Ack);
    }

    // ── Helpers ───────────────────────────────────────────────────────────

    /// <summary>
    /// Builds the backend used by the placeholder-mode tests: an enabled configuration
    /// whose server name is the dummy value "placeholder-test".
    /// </summary>
    private static SdkAlarmHistorianWriteBackend CreatePlaceholderBackend()
    {
        var cfg = new HistorianConfiguration { ServerName = "placeholder-test", Enabled = true };
        return new SdkAlarmHistorianWriteBackend(cfg);
    }

    /// <summary>
    /// Creates a synthetic alarm event DTO with fixed test field values.
    /// </summary>
    /// <param name="id">Value assigned to <c>EventId</c>.</param>
    /// <returns>A new DTO whose event time is the current UTC time in ticks.</returns>
    private static AlarmHistorianEventDto AlarmEvent(string id) => new AlarmHistorianEventDto
    {
        EventId = id,
        SourceName = "TestSource",
        ConditionId = "TestSource.Level.HiHi",
        AlarmType = "AnalogLimitAlarm.HiHi",
        Message = "C.1 integration test alarm",
        Severity = 500,
        EventTimeUtcTicks = DateTime.UtcNow.Ticks,
        AckComment = null,
    };

    /// <summary>
    /// Builds the single-node configuration for the rig-gated tests, taking the host and
    /// port from the environment and falling back to localhost:32568.
    /// </summary>
    private static HistorianConfiguration BuildRigConfig() => new HistorianConfiguration
    {
        Enabled = true,
        ServerName = RigServerName(),
        Port = TryParseInt(PortEnvVar, FallbackPort),
        IntegratedSecurity = true,
        CommandTimeoutSeconds = 30,
        FailureCooldownSeconds = 60,
    };

    /// <summary>
    /// Returns the rig Historian host from <see cref="ServerEnvVar"/>, or
    /// <see cref="FallbackServer"/> when the variable is not set.
    /// </summary>
    private static string RigServerName() =>
        Environment.GetEnvironmentVariable(ServerEnvVar) ?? FallbackServer;

    /// <summary>
    /// Reads an integer from an environment variable.
    /// </summary>
    /// <param name="envName">Name of the environment variable to read.</param>
    /// <param name="defaultValue">Value returned when the variable is unset or not a valid integer.</param>
    /// <returns>The parsed value, or <paramref name="defaultValue"/> on failure.</returns>
    private static int TryParseInt(string envName, int defaultValue)
    {
        var raw = Environment.GetEnvironmentVariable(envName);

        // Environment values are machine-readable, so parse with the invariant culture
        // rather than whatever culture the test host happens to run under.
        return int.TryParse(
            raw,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var parsed)
            ? parsed
            : defaultValue;
    }
}
|
||||
}
|
||||
Reference in New Issue
Block a user