Files
scadalink-design/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs

463 lines
18 KiB
C#

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.StoreAndForward.Tests;
/// <summary>
/// WP-10/12/13/14: Tests for the StoreAndForwardService retry engine and management.
/// </summary>
public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardService _service;
private readonly StoreAndForwardOptions _options;
public StoreAndForwardServiceTests()
{
var dbName = $"SvcTests_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
_options = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 3,
RetryTimerInterval = TimeSpan.FromMinutes(10)
};
_service = new StoreAndForwardService(
_storage, _options, NullLogger<StoreAndForwardService>.Instance);
}
public async Task InitializeAsync() => await _storage.InitializeAsync();
public Task DisposeAsync() => Task.CompletedTask;
public void Dispose() => _keepAlive.Dispose();
// ── WP-10: Immediate delivery ──
[Fact]
public async Task EnqueueAsync_ImmediateDeliverySuccess_ReturnsAcceptedNotBuffered()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api.example.com",
"""{"method":"Test"}""", "Pump1");
Assert.True(result.Accepted);
Assert.False(result.WasBuffered);
}
[Fact]
public async Task EnqueueAsync_PermanentFailure_ReturnsNotAccepted()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(false));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api.example.com",
"""{"method":"Test"}""");
Assert.False(result.Accepted);
Assert.False(result.WasBuffered);
}
[Fact]
public async Task EnqueueAsync_TransientFailure_BuffersForRetry()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("Connection refused"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api.example.com",
"""{"method":"Test"}""", "Pump1");
Assert.True(result.Accepted);
Assert.True(result.WasBuffered);
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
// StoreAndForward-003: RetryCount counts sweep retries only; the immediate
// attempt is attempt 0, so a freshly buffered message has RetryCount 0.
Assert.Equal(0, msg.RetryCount);
}
[Fact]
public async Task EnqueueAsync_NoHandler_BuffersForLater()
{
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.Notification, "alerts@company.com",
"""{"subject":"Alert"}""");
Assert.True(result.Accepted);
Assert.True(result.WasBuffered);
}
// ── WP-10: Retry engine ──
[Fact]
public async Task RetryPendingMessagesAsync_SuccessfulRetry_RemovesMessage()
{
int callCount = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ =>
{
callCount++;
if (callCount == 1) throw new HttpRequestException("fail");
return Task.FromResult(true);
});
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(result.WasBuffered);
await _service.RetryPendingMessagesAsync();
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Null(msg);
}
[Fact]
public async Task RetryPendingMessagesAsync_MaxRetriesReached_ParksMessage()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("always fails"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
maxRetries: 2);
// StoreAndForward-003: MaxRetries bounds sweep retries (not the immediate
// attempt), so a message with MaxRetries=2 needs two retry sweeps to park.
await _service.RetryPendingMessagesAsync();
var afterFirst = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Pending, afterFirst!.Status);
await _service.RetryPendingMessagesAsync();
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
}
// ── StoreAndForward-003: retry-count accounting ──
[Fact]
public async Task RetryPendingMessagesAsync_MaxRetriesOne_PerformsExactlyOneRetryBeforeParking()
{
// The immediate attempt is attempt 0; MaxRetries=1 must allow exactly one
// retry sweep before parking. The pre-fix off-by-one parked with zero retries.
var attempts = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => { Interlocked.Increment(ref attempts); throw new HttpRequestException("always fails"); });
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
maxRetries: 1);
// After the immediate failed attempt the message is buffered, not parked.
var buffered = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Pending, buffered!.Status);
Assert.Equal(1, attempts); // only the immediate attempt so far
await _service.RetryPendingMessagesAsync();
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
Assert.Equal(2, attempts); // immediate attempt + exactly one retry
Assert.Equal(1, msg.RetryCount); // one sweep retry recorded
}
// ── StoreAndForward-005: sweep-vs-management race hardening ──
[Fact]
public async Task RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped()
{
// StoreAndForward-005: the retry sweep's state-changing writes must be
// conditional on the status it observed, so a concurrent operator action that
// moved the row out of Pending (e.g. between the sweep's snapshot load and its
// park write) is not silently overwritten by the sweep's stale view.
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
attemptImmediateDelivery: false, maxRetries: 1);
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
async msg =>
{
// Simulate an operator action winning the race: the row leaves Pending
// (here: parked) while the sweep is still mid-delivery. The sweep would
// otherwise unconditionally re-write this row from its stale snapshot.
var parkedOutFromUnderTheSweep = new StoreAndForwardMessage
{
Id = msg.Id, Category = msg.Category, Target = msg.Target,
PayloadJson = msg.PayloadJson, RetryCount = 7,
MaxRetries = msg.MaxRetries, RetryIntervalMs = msg.RetryIntervalMs,
CreatedAt = msg.CreatedAt, LastAttemptAt = DateTimeOffset.UtcNow,
Status = StoreAndForwardMessageStatus.Parked,
LastError = "operator/other writer"
};
await _storage.UpdateMessageAsync(parkedOutFromUnderTheSweep);
throw new HttpRequestException("transient — sweep will try to park");
});
await _service.RetryPendingMessagesAsync();
// The sweep observed Pending; the row is now Parked with the other writer's
// RetryCount (7), not the sweep's (1). The sweep's conditional write was skipped.
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
Assert.Equal(7, msg.RetryCount);
Assert.Equal("operator/other writer", msg.LastError);
}
[Fact]
public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage()
{
int callCount = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ =>
{
callCount++;
if (callCount == 1) throw new HttpRequestException("transient");
return Task.FromResult(false);
});
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
await _service.RetryPendingMessagesAsync();
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
}
// ── WP-12: Parked message management ──
[Fact]
public async Task RetryParkedMessageAsync_MovesBackToQueue()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
maxRetries: 1);
await _service.RetryPendingMessagesAsync();
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
var retried = await _service.RetryParkedMessageAsync(result.MessageId);
Assert.True(retried);
msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
Assert.Equal(0, msg.RetryCount);
}
[Fact]
public async Task DiscardParkedMessageAsync_PermanentlyRemoves()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
maxRetries: 1);
await _service.RetryPendingMessagesAsync();
var discarded = await _service.DiscardParkedMessageAsync(result.MessageId);
Assert.True(discarded);
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Null(msg);
}
[Fact]
public async Task GetParkedMessagesAsync_ReturnsPaginatedResults()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
for (int i = 0; i < 3; i++)
{
await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, $"api{i}", """{}""",
maxRetries: 1);
}
await _service.RetryPendingMessagesAsync();
var (messages, total) = await _service.GetParkedMessagesAsync(
StoreAndForwardCategory.ExternalSystem, 1, 2);
Assert.Equal(2, messages.Count);
Assert.True(total >= 3);
}
// ── WP-13: Messages survive instance deletion ──
[Fact]
public async Task MessagesForInstance_SurviveAfterDeletion()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""", "Pump1");
await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api2", """{}""", "Pump1");
var count = await _service.GetMessageCountForInstanceAsync("Pump1");
Assert.Equal(2, count);
}
// ── WP-14: Health metrics ──
[Fact]
public async Task GetBufferDepthAsync_ReturnsCorrectDepth()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
_service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification,
_ => throw new HttpRequestException("fail"));
await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api1", """{}""");
await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api2", """{}""");
await _service.EnqueueAsync(StoreAndForwardCategory.Notification, "email", """{}""");
var depth = await _service.GetBufferDepthAsync();
Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2);
Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.Notification) >= 1);
}
[Fact]
public async Task OnActivity_RaisedOnEnqueue()
{
var activities = new List<string>();
_service.OnActivity += (action, _, _) => activities.Add(action);
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.Contains("Delivered", activities);
}
[Fact]
public async Task OnActivity_RaisedOnBuffer()
{
var activities = new List<string>();
_service.OnActivity += (action, _, _) => activities.Add(action);
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.Contains("Queued", activities);
}
// ── StoreAndForward-009: faulting activity subscriber must not corrupt delivery ──
[Fact]
public async Task EnqueueAsync_ImmediateDeliverySuccess_FaultingActivitySubscriber_StillReportsDelivered()
{
// StoreAndForward-009: a throwing OnActivity subscriber (e.g. the site event
// log) must not be misclassified as a transient delivery failure. Pre-fix the
// subscriber's exception escaped RaiseActivity, was caught by EnqueueAsync's
// transient-failure handler, and a successfully delivered message was buffered.
_service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up");
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(result.Accepted);
Assert.False(result.WasBuffered); // delivered, NOT buffered
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Null(msg); // nothing left in the buffer
}
[Fact]
public async Task RetryMessageAsync_FaultingActivitySubscriber_DoesNotIncrementRetryCount()
{
// StoreAndForward-009: a throwing subscriber raised after a successful retry
// delivery must not be caught by the retry-failure handler and counted as a
// transient failure.
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
attemptImmediateDelivery: false, maxRetries: 5);
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
_service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up");
await _service.RetryPendingMessagesAsync();
// The retry succeeded; the message must be gone, not re-buffered with a bumped count.
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Null(msg);
}
// ── WP-10: Per-source-entity retry settings ──
[Fact]
public async Task EnqueueAsync_CustomRetrySettings_Respected()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("fail"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
maxRetries: 100,
retryInterval: TimeSpan.FromSeconds(60));
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.Equal(100, msg!.MaxRetries);
Assert.Equal(60000, msg.RetryIntervalMs);
}
// ── attemptImmediateDelivery: false — caller already attempted delivery ──
[Fact]
public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler()
{
// A caller that has already made its own delivery attempt passes
// attemptImmediateDelivery: false so the request is not dispatched twice.
var handlerCalls = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => { Interlocked.Increment(ref handlerCalls); return Task.FromResult(true); });
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
attemptImmediateDelivery: false);
Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time
Assert.True(result.WasBuffered);
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
// StoreAndForward-003: the caller's own attempt is attempt 0; RetryCount
// counts only sweep retries, so a freshly buffered message has RetryCount 0.
Assert.Equal(0, msg.RetryCount);
}
}