feat(snf): per-attempt and terminal cached-call lifecycle observer (#23 M3)
Hook the store-and-forward retry loop so the audit pipeline can emit
per-attempt + terminal telemetry under the original TrackedOperationId
(Bundle E Tasks E4 + E5).
New seam:
* ICachedCallLifecycleObserver + CachedCallAttemptContext in
Commons.Interfaces.Services. Outcome enum
(Delivered / TransientFailure / PermanentFailure / ParkedMaxRetries)
is S&F-vocabulary; the bridge living in ScadaLink.AuditLog (Bundle F)
will map it to the AuditKind/AuditStatus pair when building the
CachedCallTelemetry packet.
* StoreAndForwardService gains an optional cachedCallObserver
constructor parameter + siteId. RetryMessageAsync fires the observer
exactly once per attempt with the appropriate outcome:
- handler returns true -> Delivered
- handler returns false -> PermanentFailure (and parks)
- handler throws + retries remaining -> TransientFailure
- handler throws + max retries hit -> ParkedMaxRetries (and parks)
Hook is best-effort: a thrown observer is logged + swallowed so a
failing audit pipeline can never be misclassified as a transient
delivery failure or corrupt the retry-count bookkeeping (alog.md §7).
Only cached-call categories (ExternalSystem, CachedDbWrite) generate
notifications — Notification category has its own central-side
audit pipeline (Notification Outbox / #21).
Pre-M3 callers that didn't thread a TrackedOperationId into the S&F
message id are silently skipped — the observer requires a parseable id
by contract. New S&F callers stamp the id as messageId (Bundle E3).
Bundle E tasks E4 + E5.
This commit is contained in:
@@ -0,0 +1,298 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.StoreAndForward.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Audit Log #23 — M3 Bundle E Tasks E4 + E5: the store-and-forward retry
|
||||
/// loop invokes <see cref="ICachedCallLifecycleObserver"/> after every
|
||||
/// cached-call attempt. The observer is given a
|
||||
/// <see cref="CachedCallAttemptContext"/> derived from the underlying
|
||||
/// <see cref="StoreAndForwardMessage"/>; the audit bridge then materialises
|
||||
/// the right <c>CachedCallTelemetry</c> packet (Attempted on every retry,
|
||||
/// CachedResolve on terminal transitions). Tests run with
|
||||
/// <c>DefaultRetryInterval=Zero</c> so the timer-driven retry sweep is
|
||||
/// short-circuited by directly invoking
|
||||
/// <see cref="StoreAndForwardService.RetryPendingMessagesAsync"/>.
|
||||
/// </summary>
|
||||
public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _keepAlive;
|
||||
private readonly StoreAndForwardStorage _storage;
|
||||
private readonly StoreAndForwardService _service;
|
||||
private readonly StoreAndForwardOptions _options;
|
||||
private readonly CapturingObserver _observer;
|
||||
|
||||
public CachedCallAttemptEmissionTests()
|
||||
{
|
||||
var dbName = $"E4Tests_{Guid.NewGuid():N}";
|
||||
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||
_keepAlive = new SqliteConnection(connStr);
|
||||
_keepAlive.Open();
|
||||
|
||||
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||
|
||||
_options = new StoreAndForwardOptions
|
||||
{
|
||||
DefaultRetryInterval = TimeSpan.Zero,
|
||||
DefaultMaxRetries = 3,
|
||||
RetryTimerInterval = TimeSpan.FromMinutes(10),
|
||||
};
|
||||
|
||||
_observer = new CapturingObserver();
|
||||
|
||||
_service = new StoreAndForwardService(
|
||||
_storage,
|
||||
_options,
|
||||
NullLogger<StoreAndForwardService>.Instance,
|
||||
replication: null,
|
||||
cachedCallObserver: _observer,
|
||||
siteId: "site-77");
|
||||
}
|
||||
|
||||
public async Task InitializeAsync() => await _storage.InitializeAsync();
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
public void Dispose() => _keepAlive.Dispose();
|
||||
|
||||
/// <summary>
|
||||
/// Captures every observer notification so tests can assert on the
|
||||
/// emitted lifecycle sequence.
|
||||
/// </summary>
|
||||
private sealed class CapturingObserver : ICachedCallLifecycleObserver
|
||||
{
|
||||
public List<CachedCallAttemptContext> Notifications { get; } = new();
|
||||
public Exception? ThrowOnNotify { get; set; }
|
||||
|
||||
public Task OnAttemptCompletedAsync(CachedCallAttemptContext context, CancellationToken ct = default)
|
||||
{
|
||||
if (ThrowOnNotify != null)
|
||||
{
|
||||
return Task.FromException(ThrowOnNotify);
|
||||
}
|
||||
lock (Notifications)
|
||||
{
|
||||
Notifications.Add(context);
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<TrackedOperationId> EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory category, string target, int maxRetries = 3)
|
||||
{
|
||||
// The TrackedOperationId is the S&F message id (Bundle E3 contract).
|
||||
var trackedId = TrackedOperationId.New();
|
||||
await _service.EnqueueAsync(
|
||||
category,
|
||||
target,
|
||||
"""{"payload":"x"}""",
|
||||
originInstanceName: "Plant.Pump42",
|
||||
maxRetries: maxRetries,
|
||||
retryInterval: TimeSpan.Zero,
|
||||
attemptImmediateDelivery: false,
|
||||
messageId: trackedId.ToString());
|
||||
return trackedId;
|
||||
}
|
||||
|
||||
// ── Task E4: per-attempt observer notifications ──
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_FailWithHttp500_EmitsAttemptedTelemetry()
|
||||
{
|
||||
// ExternalSystem cached call buffered, retry sweep encounters a
|
||||
// transient failure on the first attempt.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => throw new HttpRequestException("HTTP 500 from ERP"));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5);
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
var notification = Assert.Single(_observer.Notifications);
|
||||
Assert.Equal(trackedId, notification.TrackedOperationId);
|
||||
Assert.Equal("ApiOutbound", notification.Channel);
|
||||
Assert.Equal("ERP", notification.Target);
|
||||
Assert.Equal("site-77", notification.SourceSite);
|
||||
Assert.Equal(CachedCallAttemptOutcome.TransientFailure, notification.Outcome);
|
||||
Assert.Equal(1, notification.RetryCount);
|
||||
Assert.Contains("HTTP 500", notification.LastError);
|
||||
Assert.Equal("Plant.Pump42", notification.SourceInstanceId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_Success_EmitsDeliveredOutcome()
|
||||
{
|
||||
// ExternalSystem cached call buffered, retry sweep delivers the
|
||||
// message successfully on its first attempt.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => Task.FromResult(true));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP");
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
var notification = Assert.Single(_observer.Notifications);
|
||||
Assert.Equal(trackedId, notification.TrackedOperationId);
|
||||
Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome);
|
||||
Assert.Null(notification.LastError);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_PermanentFailure_EmitsPermanentFailureOutcome()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => Task.FromResult(false));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP");
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
var notification = Assert.Single(_observer.Notifications);
|
||||
Assert.Equal(trackedId, notification.TrackedOperationId);
|
||||
Assert.Equal(CachedCallAttemptOutcome.PermanentFailure, notification.Outcome);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_CachedDbWrite_EmitsDbOutboundChannel()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite,
|
||||
_ => Task.FromResult(true));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.CachedDbWrite, "myDb");
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
var notification = Assert.Single(_observer.Notifications);
|
||||
Assert.Equal(trackedId, notification.TrackedOperationId);
|
||||
Assert.Equal("DbOutbound", notification.Channel);
|
||||
Assert.Equal("myDb", notification.Target);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_NotificationCategory_NoObserverNotification()
|
||||
{
|
||||
// Notifications are NOT cached calls — they're forwarded to central via
|
||||
// a separate forwarder. The observer must not fire for Notification
|
||||
// category messages.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification,
|
||||
_ => Task.FromResult(true));
|
||||
await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.Notification,
|
||||
"alerts",
|
||||
"""{"subject":"x"}""",
|
||||
originInstanceName: "Plant.Pump42",
|
||||
attemptImmediateDelivery: false);
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
Assert.Empty(_observer.Notifications);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Attempt_MessageIdNotAGuid_NoObserverNotification()
|
||||
{
|
||||
// Pre-M3 cached calls (no TrackedOperationId threaded in) use a random
|
||||
// GUID-N message id from S&F itself. We should still emit (M3 expects
|
||||
// post-rollout these are tracked) — BUT pre-rollout messages can have
|
||||
// a non-parseable id, in which case the observer is silently skipped
|
||||
// to keep S&F bookkeeping intact.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => Task.FromResult(true));
|
||||
await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem,
|
||||
"ERP",
|
||||
"""{}""",
|
||||
originInstanceName: "Plant.Pump42",
|
||||
attemptImmediateDelivery: false,
|
||||
messageId: "not-a-valid-guid-id");
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
Assert.Empty(_observer.Notifications);
|
||||
}
|
||||
|
||||
// ── Task E5: terminal-state observer notifications ──
|
||||
|
||||
[Fact]
|
||||
public async Task Terminal_Delivered_EmitsResolveWithDeliveredStatus()
|
||||
{
|
||||
// A successful retry produces a single Delivered observer notification
|
||||
// — the audit bridge maps this to both an Attempted-Delivered audit row
|
||||
// and the terminal CachedResolve(Delivered) row. The S&F layer fires
|
||||
// ONE notification per attempt and lets the bridge fan out as needed.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => Task.FromResult(true));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP");
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
var notification = Assert.Single(_observer.Notifications);
|
||||
Assert.Equal(trackedId, notification.TrackedOperationId);
|
||||
Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Terminal_Parked_OnMaxRetries_EmitsParkedMaxRetries()
|
||||
{
|
||||
// Configure handler to throw transient every time.
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => throw new HttpRequestException("Connection refused"));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 2);
|
||||
|
||||
// Two sweeps -> RetryCount climbs to 2 -> parked on the second sweep.
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
Assert.Equal(2, _observer.Notifications.Count);
|
||||
Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome);
|
||||
Assert.Equal(CachedCallAttemptOutcome.ParkedMaxRetries, _observer.Notifications[1].Outcome);
|
||||
Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Lifecycle_RetryFail_RetrySucceed_EmitsExpectedSequence()
|
||||
{
|
||||
var calls = 0;
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ =>
|
||||
{
|
||||
calls++;
|
||||
if (calls == 1) throw new HttpRequestException("transient");
|
||||
return Task.FromResult(true);
|
||||
});
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5);
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
Assert.Equal(2, _observer.Notifications.Count);
|
||||
Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome);
|
||||
Assert.Equal(1, _observer.Notifications[0].RetryCount);
|
||||
Assert.Equal(CachedCallAttemptOutcome.Delivered, _observer.Notifications[1].Outcome);
|
||||
Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId);
|
||||
}
|
||||
|
||||
// ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ──
|
||||
|
||||
[Fact]
|
||||
public async Task Observer_Throws_DoesNotCorruptRetryCount()
|
||||
{
|
||||
_observer.ThrowOnNotify = new InvalidOperationException("simulated audit failure");
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => Task.FromResult(true));
|
||||
var trackedId = await EnqueueBufferedAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "ERP");
|
||||
|
||||
// Must not throw — observer is best-effort.
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
// The message was delivered (handler returned true) so it should be gone.
|
||||
var msg = await _storage.GetMessageByIdAsync(trackedId.ToString());
|
||||
Assert.Null(msg);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user