The store-and-forward retry loop emits the per-attempt and terminal cached audit rows (ApiCallCached/DbWriteCached Attempted, CachedResolve) via CachedCallLifecycleBridge from a CachedCallAttemptContext, not from the script context. The ExecutionId rollout (Task 4) already threaded ExecutionId and SourceScript through this path; ParentExecutionId — the spawning inbound-API request's ExecutionId — was not, so those retry-loop rows had ParentExecutionId = null even for an inbound-API-routed run.

Thread it additively as a sibling at every carry point ExecutionId passes through:
- StoreAndForwardMessage gains ParentExecutionId (Guid?).
- StoreAndForwardStorage adds a nullable parent_execution_id column via the same idempotent PRAGMA-probed ALTER TABLE migration; rows persisted by an older build read back null (back-compat). The defensive Guid.TryParse read helper (ParseExecutionId) is renamed ParseGuidColumn and reused for both columns so a corrupt value cannot abort the retry sweep.
- StoreAndForwardService.EnqueueAsync gains an optional parentExecutionId param, stamped onto the buffered message and surfaced on the CachedCallAttemptContext built in the retry loop.
- CachedCallAttemptContext gains ParentExecutionId.
- CachedCallLifecycleBridge.BuildPacket sets AuditEvent.ParentExecutionId from the context, beside the existing ExecutionId.
- IExternalSystemClient.CachedCallAsync / IDatabaseGateway.CachedWriteAsync gain an optional parentExecutionId param; ScriptRuntimeContext's CachedCall / CachedWrite helpers pass _parentExecutionId.

All threading is additive — ParentExecutionId is Guid? everywhere, null for non-routed runs, and old buffered S&F rows still deserialize with the new field null.
456 lines
18 KiB
C#
456 lines
18 KiB
C#
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.StoreAndForward.Tests;
|
|
|
|
/// <summary>
|
|
/// Audit Log #23 — M3 Bundle E Tasks E4 + E5: the store-and-forward retry
|
|
/// loop invokes <see cref="ICachedCallLifecycleObserver"/> after every
|
|
/// cached-call attempt. The observer is given a
|
|
/// <see cref="CachedCallAttemptContext"/> derived from the underlying
|
|
/// <see cref="StoreAndForwardMessage"/>; the audit bridge then materialises
|
|
/// the right <c>CachedCallTelemetry</c> packet (Attempted on every retry,
|
|
/// CachedResolve on terminal transitions). Tests run with
|
|
/// <c>DefaultRetryInterval=Zero</c> so the timer-driven retry sweep is
|
|
/// short-circuited by directly invoking
|
|
/// <see cref="StoreAndForwardService.RetryPendingMessagesAsync"/>.
|
|
/// </summary>
|
|
public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable
|
|
{
|
|
// Connection opened in the constructor and held until Dispose(); it keeps the
// named shared-cache in-memory SQLite database alive for the whole test.
private readonly SqliteConnection _keepAlive;

// Storage layer under test, pointed at the same in-memory database.
private readonly StoreAndForwardStorage _storage;

// Service whose retry sweep the tests drive directly.
private readonly StoreAndForwardService _service;

// Options handed to the service (zero retry interval, 3 default retries).
private readonly StoreAndForwardOptions _options;

// Observer test double that records the notifications the service emits.
private readonly CapturingObserver _observer;

/// <summary>
/// Builds a fresh storage + service pair over a uniquely named in-memory
/// SQLite database and wires in the capturing observer.
/// </summary>
public CachedCallAttemptEmissionTests()
{
    // A new GUID per instance gives every test its own database name.
    var connectionString =
        $"Data Source=E4Tests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";

    _keepAlive = new SqliteConnection(connectionString);
    _keepAlive.Open();

    _storage = new StoreAndForwardStorage(
        connectionString,
        NullLogger<StoreAndForwardStorage>.Instance);

    _observer = new CapturingObserver();

    _options = new StoreAndForwardOptions
    {
        DefaultRetryInterval = TimeSpan.Zero,
        DefaultMaxRetries = 3,
        RetryTimerInterval = TimeSpan.FromMinutes(10),
    };

    _service = new StoreAndForwardService(
        _storage,
        _options,
        NullLogger<StoreAndForwardService>.Instance,
        replication: null,
        cachedCallObserver: _observer,
        siteId: "site-77");
}
|
|
|
|
/// <summary>Async test setup: awaits the storage layer's own initialisation.</summary>
public async Task InitializeAsync()
{
    await _storage.InitializeAsync();
}
|
|
/// <summary>No asynchronous teardown is performed; returns an already-completed task.</summary>
public Task DisposeAsync()
{
    return Task.CompletedTask;
}
|
|
/// <summary>Disposes the keep-alive SQLite connection opened in the constructor.</summary>
public void Dispose()
{
    _keepAlive.Dispose();
}
|
|
|
|
/// <summary>
/// Test double for <see cref="ICachedCallLifecycleObserver"/>: records every
/// context it is notified with so tests can assert on the emitted lifecycle
/// sequence, or fails every notification when <see cref="ThrowOnNotify"/> is set.
/// </summary>
private sealed class CapturingObserver : ICachedCallLifecycleObserver
{
    // Dedicated private lock object. Locking on the publicly exposed
    // Notifications list would let any other holder of that reference
    // contend on (or deadlock against) the same monitor.
    private readonly object _gate = new();

    /// <summary>Contexts received so far, in arrival order.</summary>
    public List<CachedCallAttemptContext> Notifications { get; } = new();

    /// <summary>
    /// When non-null, every notification returns a task faulted with this
    /// exception and nothing is recorded.
    /// </summary>
    public Exception? ThrowOnNotify { get; set; }

    /// <summary>
    /// Records <paramref name="context"/>, or returns a faulted task when
    /// <see cref="ThrowOnNotify"/> is set.
    /// </summary>
    /// <param name="context">The attempt context supplied by the caller.</param>
    /// <param name="ct">Accepted to satisfy the interface; not observed here.</param>
    /// <returns>A completed task, or a task faulted with <see cref="ThrowOnNotify"/>.</returns>
    public Task OnAttemptCompletedAsync(CachedCallAttemptContext context, CancellationToken ct = default)
    {
        if (ThrowOnNotify is { } failure)
        {
            return Task.FromException(failure);
        }

        lock (_gate)
        {
            Notifications.Add(context);
        }

        return Task.CompletedTask;
    }
}
|
|
|
|
/// <summary>
/// Enqueues one message with a fixed JSON payload and a freshly generated
/// <see cref="TrackedOperationId"/> as its message id, with immediate delivery
/// disabled and a zero retry interval.
/// </summary>
/// <param name="category">Store-and-forward category to enqueue under.</param>
/// <param name="target">Target name passed through to the service.</param>
/// <param name="maxRetries">Maximum retry count passed through to the service.</param>
/// <returns>The tracked operation id that was used as the message id.</returns>
private async Task<TrackedOperationId> EnqueueBufferedAsync(
    StoreAndForwardCategory category, string target, int maxRetries = 3)
{
    // Bundle E3 contract: the S&F message id is the TrackedOperationId.
    var operationId = TrackedOperationId.New();
    var messageId = operationId.ToString();

    await _service.EnqueueAsync(
        category,
        target,
        """{"payload":"x"}""",
        originInstanceName: "Plant.Pump42",
        maxRetries: maxRetries,
        retryInterval: TimeSpan.Zero,
        attemptImmediateDelivery: false,
        messageId: messageId);

    return operationId;
}
|
|
|
|
// ── Task E4: per-attempt observer notifications ──
|
|
|
|
[Fact]
public async Task Attempt_FailWithHttp500_EmitsAttemptedTelemetry()
{
    // Arrange: the ExternalSystem delivery handler throws on every call and
    // one cached call is buffered for the retry sweep.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => throw new HttpRequestException("HTTP 500 from ERP"));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5);

    // Act: run a single sweep.
    await _service.RetryPendingMessagesAsync();

    // Assert: exactly one notification, fully describing the failed attempt.
    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(operationId, attempt.TrackedOperationId);
    Assert.Equal("ApiOutbound", attempt.Channel);
    Assert.Equal("ERP", attempt.Target);
    Assert.Equal("site-77", attempt.SourceSite);
    Assert.Equal(CachedCallAttemptOutcome.TransientFailure, attempt.Outcome);
    Assert.Equal(1, attempt.RetryCount);
    Assert.Contains("HTTP 500", attempt.LastError);
    Assert.Equal("Plant.Pump42", attempt.SourceInstanceId);
}
|
|
|
|
[Fact]
public async Task Attempt_Success_EmitsDeliveredOutcome()
{
    // Arrange: the handler reports success, so the buffered ExternalSystem
    // call is delivered on its first sweep.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP");

    // Act
    await _service.RetryPendingMessagesAsync();

    // Assert: one Delivered notification with no error text.
    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(operationId, attempt.TrackedOperationId);
    Assert.Equal(CachedCallAttemptOutcome.Delivered, attempt.Outcome);
    Assert.Null(attempt.LastError);
}
|
|
|
|
[Fact]
public async Task Attempt_PermanentFailure_EmitsPermanentFailureOutcome()
{
    // Arrange: a handler that completes with false (rather than throwing).
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(false));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP");

    // Act
    await _service.RetryPendingMessagesAsync();

    // Assert: the single notification is reported as a permanent failure.
    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(operationId, attempt.TrackedOperationId);
    Assert.Equal(CachedCallAttemptOutcome.PermanentFailure, attempt.Outcome);
}
|
|
|
|
[Fact]
public async Task Attempt_CachedDbWrite_EmitsDbOutboundChannel()
{
    // Arrange: a successfully delivered cached database write.
    const string database = "myDb";
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.CachedDbWrite,
        _ => Task.FromResult(true));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.CachedDbWrite, database);

    // Act
    await _service.RetryPendingMessagesAsync();

    // Assert: the CachedDbWrite category is reported on the "DbOutbound" channel.
    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(operationId, attempt.TrackedOperationId);
    Assert.Equal("DbOutbound", attempt.Channel);
    Assert.Equal(database, attempt.Target);
}
|
|
|
|
[Fact]
public async Task Attempt_NotificationCategory_NoObserverNotification()
{
    // Notification-category messages are not cached calls (they are forwarded
    // to central by a separate forwarder), so the cached-call observer must
    // stay silent when the sweep delivers one.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.Notification,
        _ => Task.FromResult(true));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.Notification,
        "alerts",
        """{"subject":"x"}""",
        originInstanceName: "Plant.Pump42",
        attemptImmediateDelivery: false);

    await _service.RetryPendingMessagesAsync();

    Assert.Empty(_observer.Notifications);
}
|
|
|
|
[Fact]
public async Task Attempt_MessageIdNotAGuid_NoObserverNotification()
{
    // Messages buffered before the M3 rollout may carry an id that cannot be
    // parsed as a TrackedOperationId. For such a message the observer is
    // silently skipped, so that S&F bookkeeping is left intact; the sweep must
    // still run to completion without notifying anyone.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.ExternalSystem,
        "ERP",
        """{}""",
        originInstanceName: "Plant.Pump42",
        attemptImmediateDelivery: false,
        messageId: "not-a-valid-guid-id");

    await _service.RetryPendingMessagesAsync();

    Assert.Empty(_observer.Notifications);
}
|
|
|
|
// ── Task E5: terminal-state observer notifications ──
|
|
|
|
[Fact]
public async Task Terminal_Delivered_EmitsResolveWithDeliveredStatus()
{
    // The S&F layer raises ONE notification per attempt. For a successful
    // retry that single Delivered notification is what the audit bridge fans
    // out into both the Attempted-Delivered row and the terminal
    // CachedResolve(Delivered) row.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP");

    await _service.RetryPendingMessagesAsync();

    var terminal = Assert.Single(_observer.Notifications);
    Assert.Equal(operationId, terminal.TrackedOperationId);
    Assert.Equal(CachedCallAttemptOutcome.Delivered, terminal.Outcome);
}
|
|
|
|
[Fact]
public async Task Terminal_Parked_OnMaxRetries_EmitsParkedMaxRetries()
{
    // Arrange: the handler throws on every attempt; the message allows 2 retries.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => throw new HttpRequestException("Connection refused"));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 2);

    // Act: two sweeps, so RetryCount climbs to 2 and the second sweep parks it.
    for (var sweep = 0; sweep < 2; sweep++)
    {
        await _service.RetryPendingMessagesAsync();
    }

    // Assert: a transient failure followed by the terminal parked outcome.
    var observed = _observer.Notifications;
    Assert.Equal(2, observed.Count);
    Assert.Equal(CachedCallAttemptOutcome.TransientFailure, observed[0].Outcome);
    Assert.Equal(CachedCallAttemptOutcome.ParkedMaxRetries, observed[1].Outcome);
    Assert.Equal(operationId, observed[1].TrackedOperationId);
}
|
|
|
|
[Fact]
public async Task Lifecycle_RetryFail_RetrySucceed_EmitsExpectedSequence()
{
    // Arrange: the handler throws on its first invocation only, then succeeds.
    var invocations = 0;
    _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ =>
    {
        if (++invocations == 1)
        {
            throw new HttpRequestException("transient");
        }

        return Task.FromResult(true);
    });
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5);

    // Act: one sweep per handler invocation.
    for (var sweep = 0; sweep < 2; sweep++)
    {
        await _service.RetryPendingMessagesAsync();
    }

    // Assert: TransientFailure (retry count 1) followed by Delivered.
    var observed = _observer.Notifications;
    Assert.Equal(2, observed.Count);
    Assert.Equal(CachedCallAttemptOutcome.TransientFailure, observed[0].Outcome);
    Assert.Equal(1, observed[0].RetryCount);
    Assert.Equal(CachedCallAttemptOutcome.Delivered, observed[1].Outcome);
    Assert.Equal(operationId, observed[1].TrackedOperationId);
}
|
|
|
|
// ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ──
|
|
|
|
[Fact]
public async Task Attempt_CarriesExecutionIdAndSourceScript_FromBufferedMessage()
{
    // The ExecutionId and SourceScript given at enqueue time must come back on
    // the CachedCallAttemptContext the retry sweep hands to the observer, so
    // the audit bridge can stamp them on the retry-loop cached rows.
    const string sourceScript = "Plant.Pump42/OnTick";
    var executionId = Guid.NewGuid();
    var operationId = TrackedOperationId.New();

    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => throw new HttpRequestException("HTTP 503"));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.ExternalSystem,
        "ERP",
        """{"payload":"x"}""",
        originInstanceName: "Plant.Pump42",
        maxRetries: 5,
        retryInterval: TimeSpan.Zero,
        attemptImmediateDelivery: false,
        messageId: operationId.ToString(),
        executionId: executionId,
        sourceScript: sourceScript);

    await _service.RetryPendingMessagesAsync();

    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(executionId, attempt.ExecutionId);
    Assert.Equal(sourceScript, attempt.SourceScript);
}
|
|
|
|
[Fact]
public async Task Attempt_NullExecutionIdAndSourceScript_SurfaceAsNull()
{
    // Back-compat: a message enqueued without ExecutionId / SourceScript (the
    // legacy enqueue path used by EnqueueBufferedAsync) must surface both as
    // null on the context rather than throwing.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));

    // The returned tracked id is not needed by this test, so it is not captured.
    await EnqueueBufferedAsync(StoreAndForwardCategory.ExternalSystem, "ERP");

    await _service.RetryPendingMessagesAsync();

    var notification = Assert.Single(_observer.Notifications);
    Assert.Null(notification.ExecutionId);
    Assert.Null(notification.SourceScript);
}
|
|
|
|
[Fact]
public async Task TerminalResolve_CarriesExecutionIdAndSourceScript()
{
    // The terminal Delivered notification must carry the same threaded
    // provenance so the CachedResolve audit row can be correlated.
    const string sourceScript = "Plant.Tank/OnAlarm";
    var executionId = Guid.NewGuid();
    var operationId = TrackedOperationId.New();

    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.CachedDbWrite,
        _ => Task.FromResult(true));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.CachedDbWrite,
        "myDb",
        """{"payload":"x"}""",
        originInstanceName: "Plant.Tank",
        maxRetries: 3,
        retryInterval: TimeSpan.Zero,
        attemptImmediateDelivery: false,
        messageId: operationId.ToString(),
        executionId: executionId,
        sourceScript: sourceScript);

    await _service.RetryPendingMessagesAsync();

    var terminal = Assert.Single(_observer.Notifications);
    Assert.Equal(CachedCallAttemptOutcome.Delivered, terminal.Outcome);
    Assert.Equal(executionId, terminal.ExecutionId);
    Assert.Equal(sourceScript, terminal.SourceScript);
}
|
|
|
|
// ── Audit Log #23 (ParentExecutionId Task 6): ParentExecutionId ──
|
|
|
|
[Fact]
public async Task Attempt_CarriesParentExecutionId_FromBufferedMessage()
{
    // A cached call enqueued from an inbound-API-routed script run carries the
    // spawning execution's ParentExecutionId. The retry sweep must surface it
    // on the CachedCallAttemptContext, beside ExecutionId, so the audit bridge
    // can stamp it on the retry-loop cached rows.
    var parentExecutionId = Guid.NewGuid();
    var operationId = TrackedOperationId.New();

    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => throw new HttpRequestException("HTTP 503"));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.ExternalSystem,
        "ERP",
        """{"payload":"x"}""",
        originInstanceName: "Plant.Pump42",
        maxRetries: 5,
        retryInterval: TimeSpan.Zero,
        attemptImmediateDelivery: false,
        messageId: operationId.ToString(),
        parentExecutionId: parentExecutionId);

    await _service.RetryPendingMessagesAsync();

    var attempt = Assert.Single(_observer.Notifications);
    Assert.Equal(parentExecutionId, attempt.ParentExecutionId);
}
|
|
|
|
[Fact]
public async Task Attempt_NullParentExecutionId_SurfacesAsNull()
{
    // Non-routed run: the originating script was not spawned by an inbound-API
    // request, so no ParentExecutionId is supplied at enqueue time. It must
    // surface as null on the context rather than throwing.
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));

    // The returned tracked id is not needed by this test, so it is not captured.
    await EnqueueBufferedAsync(StoreAndForwardCategory.ExternalSystem, "ERP");

    await _service.RetryPendingMessagesAsync();

    var notification = Assert.Single(_observer.Notifications);
    Assert.Null(notification.ParentExecutionId);
}
|
|
|
|
[Fact]
public async Task TerminalResolve_CarriesParentExecutionId()
{
    // The terminal Delivered notification must also carry the threaded
    // ParentExecutionId, so the CachedResolve audit row correlates back to the
    // spawning inbound-API execution.
    var parentExecutionId = Guid.NewGuid();
    var operationId = TrackedOperationId.New();

    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.CachedDbWrite,
        _ => Task.FromResult(true));

    await _service.EnqueueAsync(
        StoreAndForwardCategory.CachedDbWrite,
        "myDb",
        """{"payload":"x"}""",
        originInstanceName: "Plant.Tank",
        maxRetries: 3,
        retryInterval: TimeSpan.Zero,
        attemptImmediateDelivery: false,
        messageId: operationId.ToString(),
        parentExecutionId: parentExecutionId);

    await _service.RetryPendingMessagesAsync();

    var terminal = Assert.Single(_observer.Notifications);
    Assert.Equal(CachedCallAttemptOutcome.Delivered, terminal.Outcome);
    Assert.Equal(parentExecutionId, terminal.ParentExecutionId);
}
|
|
|
|
// ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ──
|
|
|
|
[Fact]
public async Task Observer_Throws_DoesNotCorruptRetryCount()
{
    // Arrange: the observer fails every notification, while delivery succeeds.
    _observer.ThrowOnNotify = new InvalidOperationException("simulated audit failure");
    _service.RegisterDeliveryHandler(
        StoreAndForwardCategory.ExternalSystem,
        _ => Task.FromResult(true));
    var operationId = await EnqueueBufferedAsync(
        StoreAndForwardCategory.ExternalSystem, "ERP");

    // Act: the observer is best-effort, so the sweep itself must not throw.
    await _service.RetryPendingMessagesAsync();

    // Assert: the handler returned true, so the delivered message should no
    // longer be retrievable from storage.
    var storedMessageId = operationId.ToString();
    Assert.Null(await _storage.GetMessageByIdAsync(storedMessageId));
}
|
|
}
|