fix(store-and-forward): resolve StoreAndForward-004,005,010,013 — accurate handler-contract doc, conditional sweep writes, reset LastAttemptAt on parked retry, test coverage
This commit is contained in:
@@ -0,0 +1,173 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ScadaLink.Commons.Messages.RemoteQuery;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.StoreAndForward.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-013: tests for the <see cref="ParkedMessageHandlerActor"/> actor
|
||||
/// bridge — the Query/Retry/Discard request-to-response mapping and the
|
||||
/// <c>ExtractMethodName</c> payload-JSON parsing (including the malformed-JSON branch).
|
||||
/// </summary>
|
||||
public class ParkedMessageHandlerActorTests : TestKit, IAsyncLifetime, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _keepAlive;
|
||||
private readonly StoreAndForwardStorage _storage;
|
||||
private readonly StoreAndForwardService _service;
|
||||
|
||||
public ParkedMessageHandlerActorTests()
|
||||
{
|
||||
var connStr = $"Data Source=ActorTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
|
||||
_keepAlive = new SqliteConnection(connStr);
|
||||
_keepAlive.Open();
|
||||
|
||||
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||
|
||||
var options = new StoreAndForwardOptions
|
||||
{
|
||||
DefaultRetryInterval = TimeSpan.Zero,
|
||||
DefaultMaxRetries = 1,
|
||||
RetryTimerInterval = TimeSpan.FromMinutes(10),
|
||||
ReplicationEnabled = false,
|
||||
};
|
||||
|
||||
_service = new StoreAndForwardService(
|
||||
_storage, options, NullLogger<StoreAndForwardService>.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync() => await _storage.InitializeAsync();
|
||||
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing) _keepAlive.Dispose();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
/// <summary>Enqueues a message and parks it via the retry sweep (MaxRetries = 1).</summary>
|
||||
private async Task<string> ParkMessageAsync(StoreAndForwardCategory category, string payloadJson)
|
||||
{
|
||||
_service.RegisterDeliveryHandler(category, _ => throw new HttpRequestException("always fails"));
|
||||
var result = await _service.EnqueueAsync(category, "target", payloadJson, maxRetries: 1);
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
return result.MessageId;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Query_ReturnsParkedEntries_WithExtractedMethodName()
|
||||
{
|
||||
await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem,
|
||||
"""{"MethodName":"StartPump","Args":{}}""");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageQueryRequest("corr-1", "site-1", 1, 50, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageQueryResponse>();
|
||||
Assert.True(response.Success);
|
||||
Assert.Equal("corr-1", response.CorrelationId);
|
||||
Assert.Equal("site-1", response.SiteId);
|
||||
Assert.Single(response.Messages);
|
||||
Assert.Equal("StartPump", response.Messages[0].MethodName);
|
||||
Assert.Equal(StoreAndForwardCategory.ExternalSystem, response.Messages[0].Category);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Query_NotificationPayload_UsesSubjectAsMethodName()
|
||||
{
|
||||
await ParkMessageAsync(StoreAndForwardCategory.Notification,
|
||||
"""{"Subject":"Tank overflow","Body":"..."}""");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageQueryRequest("corr-2", "site-1", 1, 50, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageQueryResponse>();
|
||||
Assert.True(response.Success);
|
||||
Assert.Equal("Tank overflow", response.Messages[0].MethodName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Query_MalformedJsonPayload_FallsBackToCategoryName()
|
||||
{
|
||||
await ParkMessageAsync(StoreAndForwardCategory.CachedDbWrite, "not-valid-json{");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageQueryRequest("corr-3", "site-1", 1, 50, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageQueryResponse>();
|
||||
Assert.True(response.Success);
|
||||
// Malformed JSON must not throw — ExtractMethodName falls back to the category.
|
||||
Assert.Equal(StoreAndForwardCategory.CachedDbWrite.ToString(), response.Messages[0].MethodName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Query_PayloadWithoutMethodNameOrSubject_FallsBackToCategoryName()
|
||||
{
|
||||
await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{"Unrelated":"value"}""");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageQueryRequest("corr-4", "site-1", 1, 50, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageQueryResponse>();
|
||||
Assert.Equal(StoreAndForwardCategory.ExternalSystem.ToString(), response.Messages[0].MethodName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Retry_ParkedMessage_ReturnsSuccess()
|
||||
{
|
||||
var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}""");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageRetryRequest("corr-5", "site-1", messageId, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageRetryResponse>();
|
||||
Assert.True(response.Success);
|
||||
Assert.Equal("corr-5", response.CorrelationId);
|
||||
Assert.Null(response.ErrorMessage);
|
||||
|
||||
var msg = await _storage.GetMessageByIdAsync(messageId);
|
||||
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Retry_UnknownMessage_ReturnsFailureWithMessage()
|
||||
{
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageRetryRequest("corr-6", "site-1", "does-not-exist", DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageRetryResponse>();
|
||||
Assert.False(response.Success);
|
||||
Assert.Equal("corr-6", response.CorrelationId);
|
||||
Assert.NotNull(response.ErrorMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Discard_ParkedMessage_ReturnsSuccessAndRemovesMessage()
|
||||
{
|
||||
var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}""");
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageDiscardRequest("corr-7", "site-1", messageId, DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageDiscardResponse>();
|
||||
Assert.True(response.Success);
|
||||
Assert.Equal("corr-7", response.CorrelationId);
|
||||
|
||||
var msg = await _storage.GetMessageByIdAsync(messageId);
|
||||
Assert.Null(msg);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Discard_UnknownMessage_ReturnsFailureWithMessage()
|
||||
{
|
||||
var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1")));
|
||||
actor.Tell(new ParkedMessageDiscardRequest("corr-8", "site-1", "does-not-exist", DateTimeOffset.UtcNow));
|
||||
|
||||
var response = ExpectMsg<ParkedMessageDiscardResponse>();
|
||||
Assert.False(response.Success);
|
||||
Assert.NotNull(response.ErrorMessage);
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Akka.TestKit.Xunit2" />
|
||||
<PackageReference Include="coverlet.collector" />
|
||||
<PackageReference Include="Microsoft.Data.Sqlite" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
||||
|
||||
@@ -177,6 +177,49 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
|
||||
Assert.Equal(1, msg.RetryCount); // one sweep retry recorded
|
||||
}
|
||||
|
||||
// ── StoreAndForward-005: sweep-vs-management race hardening ──
|
||||
|
||||
[Fact]
|
||||
public async Task RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped()
|
||||
{
|
||||
// StoreAndForward-005: the retry sweep's state-changing writes must be
|
||||
// conditional on the status it observed, so a concurrent operator action that
|
||||
// moved the row out of Pending (e.g. between the sweep's snapshot load and its
|
||||
// park write) is not silently overwritten by the sweep's stale view.
|
||||
var result = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
|
||||
attemptImmediateDelivery: false, maxRetries: 1);
|
||||
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
async msg =>
|
||||
{
|
||||
// Simulate an operator action winning the race: the row leaves Pending
|
||||
// (here: parked) while the sweep is still mid-delivery. The sweep would
|
||||
// otherwise unconditionally re-write this row from its stale snapshot.
|
||||
var parkedOutFromUnderTheSweep = new StoreAndForwardMessage
|
||||
{
|
||||
Id = msg.Id, Category = msg.Category, Target = msg.Target,
|
||||
PayloadJson = msg.PayloadJson, RetryCount = 7,
|
||||
MaxRetries = msg.MaxRetries, RetryIntervalMs = msg.RetryIntervalMs,
|
||||
CreatedAt = msg.CreatedAt, LastAttemptAt = DateTimeOffset.UtcNow,
|
||||
Status = StoreAndForwardMessageStatus.Parked,
|
||||
LastError = "operator/other writer"
|
||||
};
|
||||
await _storage.UpdateMessageAsync(parkedOutFromUnderTheSweep);
|
||||
throw new HttpRequestException("transient — sweep will try to park");
|
||||
});
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
|
||||
// The sweep observed Pending; the row is now Parked with the other writer's
|
||||
// RetryCount (7), not the sweep's (1). The sweep's conditional write was skipped.
|
||||
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
|
||||
Assert.NotNull(msg);
|
||||
Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
|
||||
Assert.Equal(7, msg.RetryCount);
|
||||
Assert.Equal("operator/other writer", msg.LastError);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage()
|
||||
{
|
||||
|
||||
@@ -106,6 +106,35 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
|
||||
Assert.All(forRetry, m => Assert.Equal(StoreAndForwardMessageStatus.Pending, m.Status));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetMessagesForRetryAsync_NonZeroInterval_ExcludesNotYetDueIncludesDue()
|
||||
{
|
||||
// StoreAndForward-013: exercise the julianday elapsed-time comparison with a
|
||||
// non-zero retry interval. A message attempted just now must NOT be due; one
|
||||
// attempted long ago must be due.
|
||||
var notDue = CreateMessage("notdue", StoreAndForwardCategory.ExternalSystem);
|
||||
notDue.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
|
||||
notDue.LastAttemptAt = DateTimeOffset.UtcNow;
|
||||
await _storage.EnqueueAsync(notDue);
|
||||
|
||||
var due = CreateMessage("due", StoreAndForwardCategory.ExternalSystem);
|
||||
due.RetryIntervalMs = (long)TimeSpan.FromMinutes(5).TotalMilliseconds;
|
||||
due.LastAttemptAt = DateTimeOffset.UtcNow.AddHours(-2);
|
||||
await _storage.EnqueueAsync(due);
|
||||
|
||||
var neverAttempted = CreateMessage("never", StoreAndForwardCategory.ExternalSystem);
|
||||
neverAttempted.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
|
||||
neverAttempted.LastAttemptAt = null;
|
||||
await _storage.EnqueueAsync(neverAttempted);
|
||||
|
||||
var forRetry = await _storage.GetMessagesForRetryAsync();
|
||||
var ids = forRetry.Select(m => m.Id).ToHashSet();
|
||||
|
||||
Assert.DoesNotContain("notdue", ids);
|
||||
Assert.Contains("due", ids);
|
||||
Assert.Contains("never", ids);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetParkedMessagesAsync_ReturnsParkedOnly()
|
||||
{
|
||||
@@ -136,6 +165,31 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
|
||||
Assert.Equal(0, retrieved.RetryCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RetryParkedMessageAsync_ClearsLastAttemptAt_SoMessageIsImmediatelyDue()
|
||||
{
|
||||
// StoreAndForward-010: a re-queued parked message must be unambiguously due
|
||||
// for the next sweep regardless of its (stale) last_attempt_at. Use a large
|
||||
// retry interval so a leftover timestamp would otherwise exclude the message.
|
||||
var msg = CreateMessage("requeue1", StoreAndForwardCategory.ExternalSystem);
|
||||
msg.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
|
||||
msg.LastAttemptAt = DateTimeOffset.UtcNow; // recent attempt
|
||||
msg.Status = StoreAndForwardMessageStatus.Parked;
|
||||
await _storage.EnqueueAsync(msg);
|
||||
await _storage.UpdateMessageAsync(msg);
|
||||
|
||||
var requeued = await _storage.RetryParkedMessageAsync("requeue1");
|
||||
Assert.True(requeued);
|
||||
|
||||
var retrieved = await _storage.GetMessageByIdAsync("requeue1");
|
||||
Assert.Null(retrieved!.LastAttemptAt);
|
||||
|
||||
// It must appear in the retry-due set even though the configured interval
|
||||
// (1 hour) has not elapsed since the original attempt.
|
||||
var due = await _storage.GetMessagesForRetryAsync();
|
||||
Assert.Contains(due, m => m.Id == "requeue1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DiscardParkedMessageAsync_RemovesMessage()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user