fix(store-and-forward): resolve StoreAndForward-015..017 — document maxRetries=0 contract, replicate operator retry/discard, real category in activity log
This commit is contained in:
@@ -105,4 +105,102 @@ public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
|
||||
Assert.Contains(ops, o =>
|
||||
o.OperationType == ReplicationOperationType.Park && o.MessageId == result.MessageId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-016: an operator discarding a parked message must replicate
|
||||
/// a Remove so the standby's copy is also deleted (otherwise the discarded
|
||||
/// message reappears in the parked list after a failover).
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task DiscardingAParkedMessage_ReplicatesARemoveOperation()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => throw new HttpRequestException("always fails"));
|
||||
|
||||
var result = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
|
||||
await _service.RetryPendingMessagesAsync(); // -> parked
|
||||
await WaitForReplicationAsync(2);
|
||||
|
||||
var discarded = await _service.DiscardParkedMessageAsync(result.MessageId);
|
||||
Assert.True(discarded);
|
||||
|
||||
var ops = await WaitForReplicationAsync(3);
|
||||
Assert.Contains(ops, o =>
|
||||
o.OperationType == ReplicationOperationType.Remove && o.MessageId == result.MessageId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-016: an operator retrying a parked message must replicate a
|
||||
/// Requeue so the standby's copy moves back to Pending (otherwise it stays
|
||||
/// Parked on the standby and the operator's retry is lost across a failover).
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task RetryingAParkedMessage_ReplicatesARequeueOperation()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ => throw new HttpRequestException("always fails"));
|
||||
|
||||
var result = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
|
||||
await _service.RetryPendingMessagesAsync(); // -> parked
|
||||
await WaitForReplicationAsync(2);
|
||||
|
||||
var retried = await _service.RetryParkedMessageAsync(result.MessageId);
|
||||
Assert.True(retried);
|
||||
|
||||
var ops = await WaitForReplicationAsync(3);
|
||||
var requeue = ops.SingleOrDefault(o =>
|
||||
o.OperationType == ReplicationOperationType.Requeue && o.MessageId == result.MessageId);
|
||||
Assert.NotNull(requeue);
|
||||
Assert.NotNull(requeue!.Message);
|
||||
Assert.Equal(StoreAndForwardMessageStatus.Pending, requeue.Message!.Status);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-016: the standby applies a Requeue by moving its row back to
|
||||
/// Pending with retry_count = 0, mirroring the active node's local state.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ApplyReplicatedOperation_Requeue_MovesStandbyRowBackToPending()
|
||||
{
|
||||
var replication = new ReplicationService(
|
||||
new StoreAndForwardOptions { ReplicationEnabled = true },
|
||||
NullLogger<ReplicationService>.Instance);
|
||||
|
||||
var parked = new StoreAndForwardMessage
|
||||
{
|
||||
Id = "requeue1",
|
||||
Category = StoreAndForwardCategory.ExternalSystem,
|
||||
Target = "api",
|
||||
PayloadJson = "{}",
|
||||
RetryCount = 5,
|
||||
MaxRetries = 1,
|
||||
RetryIntervalMs = 0,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
Status = StoreAndForwardMessageStatus.Parked,
|
||||
};
|
||||
await _storage.EnqueueAsync(parked);
|
||||
|
||||
var requeued = new StoreAndForwardMessage
|
||||
{
|
||||
Id = parked.Id,
|
||||
Category = parked.Category,
|
||||
Target = parked.Target,
|
||||
PayloadJson = parked.PayloadJson,
|
||||
RetryCount = 0,
|
||||
MaxRetries = parked.MaxRetries,
|
||||
RetryIntervalMs = parked.RetryIntervalMs,
|
||||
CreatedAt = parked.CreatedAt,
|
||||
Status = StoreAndForwardMessageStatus.Pending,
|
||||
};
|
||||
await replication.ApplyReplicatedOperationAsync(
|
||||
new ReplicationOperation(ReplicationOperationType.Requeue, parked.Id, requeued),
|
||||
_storage);
|
||||
|
||||
var row = await _storage.GetMessageByIdAsync(parked.Id);
|
||||
Assert.NotNull(row);
|
||||
Assert.Equal(StoreAndForwardMessageStatus.Pending, row!.Status);
|
||||
Assert.Equal(0, row.RetryCount);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,6 +267,60 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
|
||||
Assert.Equal(0, msg.RetryCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-017: the Retry activity-log entry must carry the parked
|
||||
/// message's true category, not a hard-coded ExternalSystem.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task RetryParkedMessageAsync_ActivityUsesMessageRealCategory()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification,
|
||||
_ => throw new HttpRequestException("fail"));
|
||||
|
||||
var result = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.Notification, "ops-list", """{}""",
|
||||
maxRetries: 1);
|
||||
await _service.RetryPendingMessagesAsync(); // -> parked
|
||||
|
||||
var categories = new List<StoreAndForwardCategory>();
|
||||
_service.OnActivity += (action, category, _) =>
|
||||
{
|
||||
if (action == "Retry") categories.Add(category);
|
||||
};
|
||||
|
||||
var retried = await _service.RetryParkedMessageAsync(result.MessageId);
|
||||
Assert.True(retried);
|
||||
|
||||
Assert.Equal(new[] { StoreAndForwardCategory.Notification }, categories);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-017: the Discard activity-log entry must carry the parked
|
||||
/// message's true category, not a hard-coded ExternalSystem.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task DiscardParkedMessageAsync_ActivityUsesMessageRealCategory()
|
||||
{
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite,
|
||||
_ => throw new HttpRequestException("fail"));
|
||||
|
||||
var result = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.CachedDbWrite, "site-db", """{}""",
|
||||
maxRetries: 1);
|
||||
await _service.RetryPendingMessagesAsync(); // -> parked
|
||||
|
||||
var categories = new List<StoreAndForwardCategory>();
|
||||
_service.OnActivity += (action, category, _) =>
|
||||
{
|
||||
if (action == "Discard") categories.Add(category);
|
||||
};
|
||||
|
||||
var discarded = await _service.DiscardParkedMessageAsync(result.MessageId);
|
||||
Assert.True(discarded);
|
||||
|
||||
Assert.Equal(new[] { StoreAndForwardCategory.CachedDbWrite }, categories);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DiscardParkedMessageAsync_PermanentlyRemoves()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user