035bde0562
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
219 lines
8.3 KiB
C#
219 lines
8.3 KiB
C#
using Grpc.Core;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Xunit;
|
|
using ZB.MOM.WW.HistorianGateway.Client;
|
|
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
|
|
|
|
/// <summary>
/// Exercises <see cref="GatewayAlarmHistorianWriter"/>.WriteBatchAsync against a
/// <see cref="FakeHistorianGatewayClient"/> and checks which <see cref="HistorianWriteOutcome"/>
/// is reported for each kind of SendEvent result (ack, queued ack, thrown exception, cancellation).
/// </summary>
public sealed class GatewayAlarmHistorianWriterTests
{
    /// <summary>Creates a fixed sample alarm event; only <paramref name="id"/> varies between calls.</summary>
    private static AlarmHistorianEvent Evt(string id)
    {
        var fixedUtc = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        return new AlarmHistorianEvent(
            id, "Area/Pump", "N", "LimitAlarm",
            AlarmSeverity.High, "Activated", "m", "u", null,
            fixedUtc);
    }

    /// <summary>Creates the writer under test on top of the supplied fake client, with a no-op logger.</summary>
    private static GatewayAlarmHistorianWriter Writer(FakeHistorianGatewayClient client)
    {
        var logger = NullLogger<GatewayAlarmHistorianWriter>.Instance;
        return new GatewayAlarmHistorianWriter(client, logger);
    }

    /// <summary>
    /// Writes a one-event batch through <paramref name="client"/> and asserts that the first
    /// reported outcome equals <paramref name="expected"/>.
    /// </summary>
    private static async Task AssertSingleEventOutcomeAsync(
        FakeHistorianGatewayClient client,
        HistorianWriteOutcome expected)
    {
        var results = await Writer(client).WriteBatchAsync(
            new[] { Evt("A") },
            TestContext.Current.CancellationToken);

        Assert.Equal(expected, results[0]);
    }

    /// <summary>
    /// Configures a fake client whose SendEvent throws <paramref name="thrown"/> and asserts the
    /// outcome reported for a one-event batch.
    /// </summary>
    private static Task AssertThrownMapsToAsync(Exception thrown, HistorianWriteOutcome expected) =>
        AssertSingleEventOutcomeAsync(
            new FakeHistorianGatewayClient { SendEventThrows = thrown },
            expected);

    [Fact]
    public async Task All_acked_when_SendEvent_succeeds()
    {
        var client = new FakeHistorianGatewayClient { SendEventResult = new WriteAck { Success = true } };
        var batch = new[] { Evt("A"), Evt("B") };

        var results = await Writer(client).WriteBatchAsync(batch, TestContext.Current.CancellationToken);

        Assert.All(results, result => Assert.Equal(HistorianWriteOutcome.Ack, result));
        // Events are sent individually (one SendEvent each), so one poison event
        // cannot take the rest of the batch down with it.
        Assert.Equal(2, client.SendEventCallCount);
        Assert.Equal(2, results.Count);
    }

    [Fact]
    public Task Queued_ack_is_treated_as_Ack()
    {
        // Queued = true means the gateway has durably accepted the event into its
        // store-and-forward queue, so it must not be drained again.
        var client = new FakeHistorianGatewayClient
        {
            SendEventResult = new WriteAck { Success = false, Queued = true },
        };

        return AssertSingleEventOutcomeAsync(client, HistorianWriteOutcome.Ack);
    }

    // ---- Typed exception hierarchy of the published client (what production throws) -------------

    [Fact]
    public Task Typed_Unavailable_is_RetryPlease() =>
        AssertThrownMapsToAsync(
            new HistorianGatewayUnavailableException("down"),
            HistorianWriteOutcome.RetryPlease);

    [Fact]
    public Task Typed_Authentication_is_RetryPlease() =>
        AssertThrownMapsToAsync(
            new HistorianGatewayAuthenticationException("bad key"),
            HistorianWriteOutcome.RetryPlease);

    [Fact]
    public Task Typed_Authorization_is_RetryPlease() =>
        AssertThrownMapsToAsync(
            new HistorianGatewayAuthorizationException("no scope"),
            HistorianWriteOutcome.RetryPlease);

    [Fact]
    public Task Base_typed_exception_with_inner_permanent_status_is_PermanentFail()
    {
        // For a permanent gRPC code such as InvalidArgument the published client throws the base
        // HistorianGatewayException and keeps the original RpcException as its InnerException.
        var rpcFailure = new RpcException(new Status(StatusCode.InvalidArgument, "malformed"));

        return AssertThrownMapsToAsync(
            new HistorianGatewayException("malformed", rpcFailure),
            HistorianWriteOutcome.PermanentFail);
    }

    [Fact]
    public Task Bare_base_typed_exception_is_PermanentFail() =>
        // With no inner status to classify, the default is PermanentFail so the drain
        // cannot loop forever on the same event.
        AssertThrownMapsToAsync(
            new HistorianGatewayException("unclassifiable"),
            HistorianWriteOutcome.PermanentFail);

    // ---- Raw RpcException path (defensive) ------------------------------------------------------

    [Fact]
    public Task Raw_Unavailable_is_RetryPlease() =>
        AssertThrownMapsToAsync(
            new RpcException(new Status(StatusCode.Unavailable, "down")),
            HistorianWriteOutcome.RetryPlease);

    [Theory]
    [InlineData(StatusCode.DeadlineExceeded)]
    [InlineData(StatusCode.ResourceExhausted)]
    [InlineData(StatusCode.Aborted)]
    [InlineData(StatusCode.Internal)]
    [InlineData(StatusCode.Unauthenticated)]
    [InlineData(StatusCode.PermissionDenied)]
    public Task Raw_transient_or_auth_status_is_RetryPlease(StatusCode code) =>
        AssertThrownMapsToAsync(
            new RpcException(new Status(code, "x")),
            HistorianWriteOutcome.RetryPlease);

    [Theory]
    [InlineData(StatusCode.InvalidArgument)]
    [InlineData(StatusCode.FailedPrecondition)]
    [InlineData(StatusCode.OutOfRange)]
    [InlineData(StatusCode.Unimplemented)]
    public Task Raw_permanent_status_is_PermanentFail(StatusCode code) =>
        AssertThrownMapsToAsync(
            new RpcException(new Status(code, "x")),
            HistorianWriteOutcome.PermanentFail);

    [Fact]
    public Task Unknown_exception_is_PermanentFail() =>
        AssertThrownMapsToAsync(
            new InvalidOperationException("boom"),
            HistorianWriteOutcome.PermanentFail);

    [Fact]
    public async Task Cancellation_mid_drain_is_RetryPlease_not_PermanentFail()
    {
        // Simulates shutdown in the middle of a drain. An already-cancelled token must not
        // dead-letter the in-flight events (that would be silent data loss): every outcome has to
        // be RetryPlease so the events stay queued for the next startup, WriteBatchAsync must not
        // throw, and the gateway must not be called at all (the writer short-circuits up front).
        using var cancelled = new CancellationTokenSource();
        await cancelled.CancelAsync();
        var client = new FakeHistorianGatewayClient { SendEventThrows = new OperationCanceledException() };
        var batch = new[] { Evt("A"), Evt("B") };

        var results = await Writer(client).WriteBatchAsync(batch, cancelled.Token);

        Assert.Equal(2, results.Count);
        Assert.All(results, result => Assert.Equal(HistorianWriteOutcome.RetryPlease, result));
        Assert.Equal(0, client.SendEventCallCount);
    }

    [Fact]
    public async Task Empty_batch_returns_empty()
    {
        var writer = Writer(new FakeHistorianGatewayClient());

        var results = await writer.WriteBatchAsync(
            Array.Empty<AlarmHistorianEvent>(),
            TestContext.Current.CancellationToken);

        Assert.Empty(results);
    }

    [Fact]
    public void Dispose_with_async_only_client_does_not_throw()
    {
        // The fake client is IAsyncDisposable but not IDisposable, so the `as IDisposable`
        // guard inside GatewayAlarmHistorianWriter.Dispose() has to quietly do nothing
        // instead of throwing when the cast yields no IDisposable.
        var client = new FakeHistorianGatewayClient();
        var writer = Writer(client);

        var thrown = Record.Exception(() => ((IDisposable)writer).Dispose());

        Assert.Null(thrown);
        // Synchronous Dispose must not fall back to DisposeAsync on the async-only fake.
        Assert.Equal(0, client.DisposeCallCount);
    }
}
|