using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.StoreAndForward.Tests;
///
/// WP-11: Tests for async replication to standby.
///
public class ReplicationServiceTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly ReplicationService _replicationService;
public ReplicationServiceTests()
{
var dbName = $"RepTests_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger.Instance);
var options = new StoreAndForwardOptions { ReplicationEnabled = true };
_replicationService = new ReplicationService(
options, NullLogger.Instance);
}
public async Task InitializeAsync() => await _storage.InitializeAsync();
public Task DisposeAsync() => Task.CompletedTask;
public void Dispose() => _keepAlive.Dispose();
[Fact]
public void ReplicateEnqueue_NoHandler_DoesNotThrow()
{
var msg = CreateMessage("rep1");
_replicationService.ReplicateEnqueue(msg);
}
[Fact]
public async Task ReplicateEnqueue_WithHandler_ForwardsOperation()
{
ReplicationOperation? captured = null;
_replicationService.SetReplicationHandler(op =>
{
captured = op;
return Task.CompletedTask;
});
var msg = CreateMessage("rep2");
_replicationService.ReplicateEnqueue(msg);
await Task.Delay(200);
Assert.NotNull(captured);
Assert.Equal(ReplicationOperationType.Add, captured!.OperationType);
Assert.Equal("rep2", captured.MessageId);
}
[Fact]
public async Task ReplicateRemove_WithHandler_ForwardsRemoveOperation()
{
ReplicationOperation? captured = null;
_replicationService.SetReplicationHandler(op =>
{
captured = op;
return Task.CompletedTask;
});
_replicationService.ReplicateRemove("rep3");
await Task.Delay(200);
Assert.NotNull(captured);
Assert.Equal(ReplicationOperationType.Remove, captured!.OperationType);
Assert.Equal("rep3", captured.MessageId);
}
[Fact]
public async Task ReplicatePark_WithHandler_ForwardsParkOperation()
{
ReplicationOperation? captured = null;
_replicationService.SetReplicationHandler(op =>
{
captured = op;
return Task.CompletedTask;
});
var msg = CreateMessage("rep4");
_replicationService.ReplicatePark(msg);
await Task.Delay(200);
Assert.NotNull(captured);
Assert.Equal(ReplicationOperationType.Park, captured!.OperationType);
}
[Fact]
public async Task ApplyReplicatedOperationAsync_Add_EnqueuesMessage()
{
var msg = CreateMessage("apply1");
var operation = new ReplicationOperation(ReplicationOperationType.Add, "apply1", msg);
await _replicationService.ApplyReplicatedOperationAsync(operation, _storage);
var retrieved = await _storage.GetMessageByIdAsync("apply1");
Assert.NotNull(retrieved);
}
[Fact]
public async Task ApplyReplicatedOperationAsync_Remove_DeletesMessage()
{
var msg = CreateMessage("apply2");
await _storage.EnqueueAsync(msg);
var operation = new ReplicationOperation(ReplicationOperationType.Remove, "apply2", null);
await _replicationService.ApplyReplicatedOperationAsync(operation, _storage);
var retrieved = await _storage.GetMessageByIdAsync("apply2");
Assert.Null(retrieved);
}
[Fact]
public async Task ApplyReplicatedOperationAsync_Park_UpdatesStatus()
{
var msg = CreateMessage("apply3");
await _storage.EnqueueAsync(msg);
var operation = new ReplicationOperation(ReplicationOperationType.Park, "apply3", msg);
await _replicationService.ApplyReplicatedOperationAsync(operation, _storage);
var retrieved = await _storage.GetMessageByIdAsync("apply3");
Assert.NotNull(retrieved);
Assert.Equal(StoreAndForwardMessageStatus.Parked, retrieved!.Status);
}
[Fact]
public void ReplicateEnqueue_WhenReplicationDisabled_DoesNothing()
{
var options = new StoreAndForwardOptions { ReplicationEnabled = false };
var service = new ReplicationService(options, NullLogger.Instance);
bool handlerCalled = false;
service.SetReplicationHandler(_ => { handlerCalled = true; return Task.CompletedTask; });
service.ReplicateEnqueue(CreateMessage("disabled1"));
Assert.False(handlerCalled);
}
[Fact]
public async Task ReplicateEnqueue_HandlerThrows_DoesNotPropagateException()
{
_replicationService.SetReplicationHandler(_ =>
throw new InvalidOperationException("standby down"));
_replicationService.ReplicateEnqueue(CreateMessage("err1"));
await Task.Delay(200);
// No exception -- fire-and-forget, best-effort
}
private static StoreAndForwardMessage CreateMessage(string id)
{
return new StoreAndForwardMessage
{
Id = id,
Category = StoreAndForwardCategory.ExternalSystem,
Target = "target",
PayloadJson = "{}",
RetryCount = 0,
MaxRetries = 50,
RetryIntervalMs = 30000,
CreatedAt = DateTimeOffset.UtcNow,
Status = StoreAndForwardMessageStatus.Pending
};
}
}