207 lines
8.1 KiB
C#
207 lines
8.1 KiB
C#
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.StoreAndForward.Tests;
|
|
|
|
/// <summary>
/// StoreAndForward-001: checks that buffer operations carried out by the
/// <see cref="StoreAndForwardService"/> on the active node (add / remove / park,
/// plus the operator-driven discard and requeue) are handed to the
/// <see cref="ReplicationService"/>, so that a failover to the standby does not
/// lose the buffer.
/// </summary>
/// <remarks>
/// Every test instance runs against its own uniquely named, shared-cache,
/// in-memory SQLite database. Operations passed to the replication handler are
/// captured in <c>_replicated</c> and inspected by the tests.
/// </remarks>
public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
{
    // Held open for the whole test: a shared in-memory SQLite database only
    // exists while at least one connection to it remains open.
    private readonly SqliteConnection _keepAlive;

    private readonly StoreAndForwardStorage _storage;

    private readonly StoreAndForwardService _service;

    // Operations captured by the replication handler. The handler may run on a
    // different thread than the test, so every access is guarded by a lock on
    // the list itself.
    private readonly List<ReplicationOperation> _replicated = new();

    /// <summary>
    /// Wires a storage instance, a replication service whose handler records
    /// every operation it receives, and the service under test.
    /// </summary>
    public StoreAndForwardReplicationTests()
    {
        var connectionString = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";

        _keepAlive = new SqliteConnection(connectionString);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(
            connectionString,
            NullLogger<StoreAndForwardStorage>.Instance);

        // NOTE(review): zero retry interval / long timer interval presumably make
        // retries immediately due while keeping any background timer out of the
        // way, so tests drive retries explicitly — confirm against the service.
        var settings = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 1,
            RetryTimerInterval = TimeSpan.FromMinutes(10),
            ReplicationEnabled = true,
        };

        var replicator = new ReplicationService(settings, NullLogger<ReplicationService>.Instance);
        replicator.SetReplicationHandler(operation =>
        {
            lock (_replicated)
            {
                _replicated.Add(operation);
            }

            return Task.CompletedTask;
        });

        _service = new StoreAndForwardService(
            _storage,
            settings,
            NullLogger<StoreAndForwardService>.Instance,
            replicator);
    }

    /// <summary>Initializes the storage before each test runs.</summary>
    public async Task InitializeAsync()
    {
        await _storage.InitializeAsync();
    }

    /// <summary>No asynchronous teardown is needed; cleanup happens in <see cref="Dispose"/>.</summary>
    public Task DisposeAsync()
    {
        return Task.CompletedTask;
    }

    /// <summary>Closes the keep-alive connection to the in-memory database.</summary>
    public void Dispose()
    {
        _keepAlive.Dispose();
    }

    /// <summary>
    /// Polls the captured operations until at least <paramref name="count"/> have
    /// arrived, checking up to 100 times with a 20 ms pause after each miss.
    /// Replication is not awaited by the service calls, hence the polling.
    /// </summary>
    /// <param name="count">Minimum number of captured operations to wait for.</param>
    /// <returns>
    /// A snapshot copy of the captured operations. If the limit is reached first,
    /// the snapshot may hold fewer than <paramref name="count"/> entries; callers
    /// assert on the content.
    /// </returns>
    private async Task<List<ReplicationOperation>> WaitForReplicationAsync(int count)
    {
        var checksLeft = 100;
        while (checksLeft-- > 0)
        {
            lock (_replicated)
            {
                if (_replicated.Count >= count)
                {
                    return _replicated.ToList();
                }
            }

            await Task.Delay(20);
        }

        // Timed out: hand back whatever has been captured so far.
        lock (_replicated)
        {
            return _replicated.ToList();
        }
    }

    /// <summary>
    /// Registers an ExternalSystem delivery handler that throws on every call,
    /// so delivery never succeeds.
    /// </summary>
    private void RegisterAlwaysFailingHandler()
    {
        _service.RegisterDeliveryHandler(
            StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));
    }

    /// <summary>
    /// With no delivery handler registered, an enqueued message is expected to be
    /// buffered and an Add for that message to be replicated.
    /// </summary>
    [Fact]
    public async Task BufferingAMessage_ReplicatesAnAddOperation()
    {
        var enqueued = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            "api",
            """{}""");

        Assert.True(enqueued.WasBuffered);

        var captured = await WaitForReplicationAsync(1);

        Assert.Contains(
            captured,
            op => op.OperationType == ReplicationOperationType.Add
                && op.MessageId == enqueued.MessageId);
    }

    /// <summary>
    /// A handler that throws on its first call and succeeds afterwards: once the
    /// pending messages are retried, both an Add and a Remove for the message are
    /// expected to have been replicated.
    /// </summary>
    [Fact]
    public async Task SuccessfulRetry_ReplicatesARemoveOperation()
    {
        var attempts = 0;
        _service.RegisterDeliveryHandler(
            StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                attempts++;
                if (attempts == 1)
                {
                    throw new HttpRequestException("transient");
                }

                return Task.FromResult(true);
            });

        var enqueued = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            "api",
            """{}""");
        await _service.RetryPendingMessagesAsync();

        var captured = await WaitForReplicationAsync(2);

        Assert.Contains(captured, op => op.OperationType == ReplicationOperationType.Add);
        Assert.Contains(
            captured,
            op => op.OperationType == ReplicationOperationType.Remove
                && op.MessageId == enqueued.MessageId);
    }

    /// <summary>
    /// With a handler that always throws and <c>maxRetries: 1</c>, a retry pass is
    /// expected to replicate a Park for the message.
    /// </summary>
    [Fact]
    public async Task ParkedMessage_ReplicatesAParkOperation()
    {
        RegisterAlwaysFailingHandler();

        var enqueued = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            "api",
            """{}""",
            maxRetries: 1);
        await _service.RetryPendingMessagesAsync();

        var captured = await WaitForReplicationAsync(2);

        Assert.Contains(
            captured,
            op => op.OperationType == ReplicationOperationType.Park
                && op.MessageId == enqueued.MessageId);
    }

    /// <summary>
    /// StoreAndForward-016: when an operator discards a parked message, a Remove
    /// must be replicated so the standby deletes its copy too; otherwise the
    /// discarded message would show up in the parked list again after a failover.
    /// </summary>
    [Fact]
    public async Task DiscardingAParkedMessage_ReplicatesARemoveOperation()
    {
        RegisterAlwaysFailingHandler();

        var enqueued = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            "api",
            """{}""",
            maxRetries: 1);
        await _service.RetryPendingMessagesAsync(); // message ends up parked

        // Let the two earlier operations arrive before discarding.
        await WaitForReplicationAsync(2);

        var wasDiscarded = await _service.DiscardParkedMessageAsync(enqueued.MessageId);
        Assert.True(wasDiscarded);

        var captured = await WaitForReplicationAsync(3);

        Assert.Contains(
            captured,
            op => op.OperationType == ReplicationOperationType.Remove
                && op.MessageId == enqueued.MessageId);
    }

    /// <summary>
    /// StoreAndForward-016: when an operator retries a parked message, a Requeue
    /// carrying the message in Pending status must be replicated; otherwise the
    /// standby's copy stays Parked and the retry is lost across a failover.
    /// </summary>
    [Fact]
    public async Task RetryingAParkedMessage_ReplicatesARequeueOperation()
    {
        RegisterAlwaysFailingHandler();

        var enqueued = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem,
            "api",
            """{}""",
            maxRetries: 1);
        await _service.RetryPendingMessagesAsync(); // message ends up parked

        // Let the two earlier operations arrive before retrying.
        await WaitForReplicationAsync(2);

        var wasRetried = await _service.RetryParkedMessageAsync(enqueued.MessageId);
        Assert.True(wasRetried);

        var captured = await WaitForReplicationAsync(3);

        // At most one Requeue may exist for this message (SingleOrDefault throws otherwise).
        var requeue = captured.SingleOrDefault(
            op => op.OperationType == ReplicationOperationType.Requeue
                && op.MessageId == enqueued.MessageId);

        Assert.NotNull(requeue);
        Assert.NotNull(requeue!.Message);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, requeue.Message!.Status);
    }

    /// <summary>
    /// StoreAndForward-016: applying a replicated Requeue on the standby moves the
    /// stored row back to Pending with a retry count of zero, matching the state
    /// carried by the operation.
    /// </summary>
    [Fact]
    public async Task ApplyReplicatedOperation_Requeue_MovesStandbyRowBackToPending()
    {
        var standbyReplication = new ReplicationService(
            new StoreAndForwardOptions { ReplicationEnabled = true },
            NullLogger<ReplicationService>.Instance);

        // Seed the store with a row that is already parked.
        var parkedRow = new StoreAndForwardMessage
        {
            Id = "requeue1",
            Category = StoreAndForwardCategory.ExternalSystem,
            Target = "api",
            PayloadJson = "{}",
            RetryCount = 5,
            MaxRetries = 1,
            RetryIntervalMs = 0,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Parked,
        };
        await _storage.EnqueueAsync(parkedRow);

        // Same message, but reset to Pending with no retries used.
        var requeuedState = new StoreAndForwardMessage
        {
            Id = parkedRow.Id,
            Category = parkedRow.Category,
            Target = parkedRow.Target,
            PayloadJson = parkedRow.PayloadJson,
            RetryCount = 0,
            MaxRetries = parkedRow.MaxRetries,
            RetryIntervalMs = parkedRow.RetryIntervalMs,
            CreatedAt = parkedRow.CreatedAt,
            Status = StoreAndForwardMessageStatus.Pending,
        };
        var requeueOperation = new ReplicationOperation(
            ReplicationOperationType.Requeue,
            parkedRow.Id,
            requeuedState);

        await standbyReplication.ApplyReplicatedOperationAsync(requeueOperation, _storage);

        var reloaded = await _storage.GetMessageByIdAsync(parkedRow.Id);

        Assert.NotNull(reloaded);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, reloaded!.Status);
        Assert.Equal(0, reloaded.RetryCount);
    }
}
|