Files
scadalink-design/tests/ScadaLink.HealthMonitoring.Tests/HealthReportSenderTests.cs

341 lines
13 KiB
C#

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.StoreAndForward;
namespace ScadaLink.HealthMonitoring.Tests;
public class HealthReportSenderTests
{
/// <summary>
/// In-memory <see cref="IHealthReportTransport"/> test double: every report passed
/// to <see cref="Send"/> is appended to <see cref="SentReports"/> so tests can
/// inspect what the sender emitted.
/// </summary>
private sealed class FakeTransport : IHealthReportTransport
{
    /// <summary>Reports received so far, in the order <see cref="Send"/> was called.</summary>
    public List<SiteHealthReport> SentReports { get; } = [];

    /// <summary>Records <paramref name="report"/> instead of transmitting it anywhere.</summary>
    public void Send(SiteHealthReport report) => SentReports.Add(report);
}
/// <summary>
/// <see cref="ISiteIdentityProvider"/> test double with a settable site id that
/// defaults to <c>"test-site"</c>.
/// </summary>
private sealed class FakeSiteIdentityProvider : ISiteIdentityProvider
{
    /// <summary>Site id exposed by this provider; tests may override it via an initializer.</summary>
    public string SiteId { get; set; } = "test-site";
}
/// <summary>
/// In-memory <see cref="ILogger{T}"/> that records every log call, letting tests
/// verify that non-fatal failures are reported (HealthMonitoring-010) instead of
/// being silently dropped.
/// </summary>
private sealed class CapturingLogger<T> : ILogger<T>
{
    /// <summary>No-op scope handed out by <see cref="BeginScope{TState}"/>.</summary>
    private sealed class NullScope : IDisposable
    {
        public static readonly NullScope Instance = new();

        public void Dispose()
        {
        }
    }

    /// <summary>One recorded log call: level, formatted message and the exception, if any.</summary>
    public sealed record Entry(LogLevel Level, string Message, Exception? Exception);

    /// <summary>
    /// Recorded log calls. The list instance itself is the lock object used while
    /// appending, so readers should lock on it as well.
    /// </summary>
    public List<Entry> Entries { get; } = new();

    /// <summary>Scopes are not tracked; the shared no-op scope is returned.</summary>
    public IDisposable BeginScope<TState>(TState state) where TState : notnull
    {
        return NullScope.Instance;
    }

    /// <summary>Reports every level as enabled.</summary>
    public bool IsEnabled(LogLevel logLevel)
    {
        return true;
    }

    /// <summary>Formats the message and appends it to <see cref="Entries"/> under the lock.</summary>
    public void Log<TState>(
        LogLevel logLevel,
        EventId eventId,
        TState state,
        Exception? exception,
        Func<TState, Exception?, string> formatter)
    {
        lock (Entries)
        {
            var rendered = formatter(state, exception);
            Entries.Add(new Entry(logLevel, rendered, exception));
        }
    }
}
/// <summary>
/// <see cref="IClusterNodeProvider"/> stub that reports itself as primary and
/// fails every cluster-node query with an <see cref="InvalidOperationException"/>.
/// </summary>
private sealed class ThrowingClusterNodeProvider : IClusterNodeProvider
{
    public bool SelfIsPrimary
    {
        get { return true; }
    }

    public IReadOnlyList<NodeStatus> GetClusterNodes()
    {
        throw new InvalidOperationException("cluster query failed");
    }
}
/// <summary>
/// Runs the sender briefly with a 50 ms report interval and checks that at least
/// two reports were sent, that each sequence number is strictly greater than the
/// one before it, and that every report carries the configured site id.
/// </summary>
[Fact]
public async Task SendsReportsWithMonotonicSequenceNumbers()
{
    var fakeTransport = new FakeTransport();
    var healthCollector = new SiteHealthCollector();
    healthCollector.SetActiveNode(true);

    var monitoringOptions = new HealthMonitoringOptions
    {
        ReportInterval = TimeSpan.FromMilliseconds(50)
    };
    var identity = new FakeSiteIdentityProvider { SiteId = "site-A" };

    var sender = new HealthReportSender(
        healthCollector,
        fakeTransport,
        Options.Create(monitoringOptions),
        NullLogger<HealthReportSender>.Instance,
        identity);

    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    try
    {
        await sender.StartAsync(cts.Token);
        await Task.Delay(280, CancellationToken.None);
        await sender.StopAsync(CancellationToken.None);
    }
    catch (OperationCanceledException)
    {
        // Cancellation while starting/stopping is tolerated; assertions follow.
    }

    // Several report intervals elapsed, so more than one report is expected.
    var reports = fakeTransport.SentReports;
    Assert.True(
        reports.Count >= 2,
        $"Expected at least 2 reports, got {reports.Count}");

    // Walk the reports in order: compare each with its predecessor, then check the site id.
    for (var index = 0; index < reports.Count; index++)
    {
        var current = reports[index];
        if (index > 0)
        {
            var previous = reports[index - 1];
            Assert.True(
                current.SequenceNumber > previous.SequenceNumber,
                $"Sequence numbers not strictly increasing at index {index}");
        }

        Assert.Equal("site-A", current.SiteId);
    }
}
/// <summary>
/// Verifies that the first report emitted after start-up carries a sequence number
/// no smaller than the Unix-millisecond clock reading taken immediately before the
/// sender was constructed.
/// </summary>
[Fact]
public async Task FirstReportSequenceExceedsStartupUnixMs()
{
    // Reports are seeded with Unix-ms at construction so a freshly-active
    // node always sorts after the prior active. Verify the first emitted
    // sequence is at least the startup epoch.
    var transport = new FakeTransport();
    var collector = new SiteHealthCollector();
    collector.SetActiveNode(true);
    var options = Options.Create(new HealthMonitoringOptions
    {
        ReportInterval = TimeSpan.FromMilliseconds(50)
    });

    // Captured before construction so it is a lower bound for the seed.
    var beforeCtor = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    var sender = new HealthReportSender(
        collector,
        transport,
        options,
        NullLogger<HealthReportSender>.Instance,
        new FakeSiteIdentityProvider());

    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(150));
    try
    {
        await sender.StartAsync(cts.Token);
        await Task.Delay(120, CancellationToken.None);
        await sender.StopAsync(CancellationToken.None);
    }
    catch (OperationCanceledException) { }

    // NotEmpty gives a descriptive collection failure instead of a bare
    // "expected True" when no report was produced.
    Assert.NotEmpty(transport.SentReports);

    var firstSequence = transport.SentReports[0].SequenceNumber;
    Assert.True(
        firstSequence >= beforeCtor,
        $"First sequence {firstSequence} should be >= startup epoch {beforeCtor}");
}
/// <summary>
/// Verifies that every emitted report has a <c>ReportTimestamp</c> lying between
/// the instants captured just before start and just after stop, and that the
/// timestamp's offset is zero (UTC).
/// </summary>
[Fact]
public async Task ReportsIncludeUtcTimestamp()
{
    var transport = new FakeTransport();
    var collector = new SiteHealthCollector();
    collector.SetActiveNode(true);
    var options = Options.Create(new HealthMonitoringOptions
    {
        ReportInterval = TimeSpan.FromMilliseconds(50)
    });
    var sender = new HealthReportSender(
        collector,
        transport,
        options,
        NullLogger<HealthReportSender>.Instance,
        new FakeSiteIdentityProvider());

    // Lower bound of the window in which report timestamps must fall.
    var before = DateTimeOffset.UtcNow;
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(150));
    try
    {
        await sender.StartAsync(cts.Token);
        await Task.Delay(120, CancellationToken.None);
        await sender.StopAsync(CancellationToken.None);
    }
    catch (OperationCanceledException) { }

    // Upper bound of the window, taken after the sender has been stopped.
    var after = DateTimeOffset.UtcNow;

    // NotEmpty gives a descriptive collection failure instead of a bare
    // "expected True" when no report was produced.
    Assert.NotEmpty(transport.SentReports);
    foreach (var report in transport.SentReports)
    {
        Assert.InRange(report.ReportTimestamp, before, after);
        Assert.Equal(TimeSpan.Zero, report.ReportTimestamp.Offset);
    }
}
/// <summary>
/// HealthMonitoring-001 regression: the documented "store-and-forward buffer
/// depth" metric (pending messages by category) must actually be populated in
/// the emitted report. Previously SetStoreAndForwardDepths had no callers, so
/// StoreAndForwardBufferDepths was always empty. The sender must query the S&amp;F
/// engine's per-category depth API and include it alongside the parked count.
/// </summary>
[Fact]
public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage()
{
    // A uniquely named shared-cache in-memory database keeps this test isolated.
    var dbName = $"HealthSfDepth_{Guid.NewGuid():N}";
    var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";

    // Keep one connection alive so the in-memory DB persists for the test.
    using var keepAlive = new SqliteConnection(connStr);
    keepAlive.Open();

    var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
    await storage.InitializeAsync();

    // Two pending ExternalSystem messages and one pending Notification message.
    await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem));
    await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem));
    await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification));

    var transport = new FakeTransport();
    var collector = new SiteHealthCollector();
    collector.SetActiveNode(true);
    var options = Options.Create(new HealthMonitoringOptions
    {
        ReportInterval = TimeSpan.FromMilliseconds(50)
    });
    var sender = new HealthReportSender(
        collector,
        transport,
        options,
        NullLogger<HealthReportSender>.Instance,
        new FakeSiteIdentityProvider(),
        sfStorage: storage);

    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    try
    {
        await sender.StartAsync(cts.Token);
        await Task.Delay(250, CancellationToken.None);
        await sender.StopAsync(CancellationToken.None);
    }
    catch (OperationCanceledException) { }

    // NotEmpty gives a descriptive collection failure instead of a bare
    // "expected True" when no report was produced.
    Assert.NotEmpty(transport.SentReports);

    // Inspect the most recent report: depths are keyed by category name, and a
    // category with no pending messages must not appear at all.
    var depths = transport.SentReports[^1].StoreAndForwardBufferDepths;
    Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]);
    Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]);
    Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite)));
}
/// <summary>
/// Builds a store-and-forward message in the <c>Pending</c> status with the given
/// id and category; all remaining fields are fixed placeholder values.
/// </summary>
/// <param name="id">Message id to assign.</param>
/// <param name="category">Category the message is filed under.</param>
/// <returns>The newly created pending message.</returns>
private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category)
{
    var message = new StoreAndForwardMessage
    {
        Id = id,
        Category = category,
        Target = "target",
        PayloadJson = "{}",
        RetryCount = 0,
        MaxRetries = 50,
        RetryIntervalMs = 30_000,
        CreatedAt = DateTimeOffset.UtcNow,
        Status = StoreAndForwardMessageStatus.Pending
    };

    return message;
}
/// <summary>
/// With no time provider passed to the constructor, the sender's initial sequence
/// number must fall inside the Unix-millisecond window measured around construction.
/// </summary>
[Fact]
public void InitialSequenceNumberSeededWithUnixMs()
{
    var fakeTransport = new FakeTransport();
    var healthCollector = new SiteHealthCollector();
    var defaultOptions = Options.Create(new HealthMonitoringOptions());

    // Bracket the constructor call with two clock readings.
    var lowerBoundMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    var sender = new HealthReportSender(
        healthCollector,
        fakeTransport,
        defaultOptions,
        NullLogger<HealthReportSender>.Instance,
        new FakeSiteIdentityProvider());
    var upperBoundMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    var seed = sender.CurrentSequenceNumber;
    Assert.InRange(seed, lowerBoundMs, upperBoundMs);
}
/// <summary>
/// HealthMonitoring-010 regression: when refreshing cluster nodes throws, the
/// report must still be sent, and the failure must be logged as a warning that
/// carries the exception rather than vanishing in a bare <c>catch {}</c>, so
/// that persistent degradation can be diagnosed.
/// </summary>
[Fact]
public async Task ClusterNodeRefreshFailure_IsLoggedNotSwallowed()
{
    var fakeTransport = new FakeTransport();
    var healthCollector = new SiteHealthCollector();
    healthCollector.SetActiveNode(true);
    var capturingLogger = new CapturingLogger<HealthReportSender>();

    var monitoringOptions = new HealthMonitoringOptions
    {
        ReportInterval = TimeSpan.FromMilliseconds(50)
    };

    var sender = new HealthReportSender(
        healthCollector,
        fakeTransport,
        Options.Create(monitoringOptions),
        capturingLogger,
        new FakeSiteIdentityProvider(),
        clusterNodeProvider: new ThrowingClusterNodeProvider());

    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    try
    {
        await sender.StartAsync(cts.Token);
        await Task.Delay(250, CancellationToken.None);
        await sender.StopAsync(CancellationToken.None);
    }
    catch (OperationCanceledException)
    {
        // Cancellation while starting/stopping is tolerated; assertions follow.
    }

    // Reporting keeps going even though the cluster query throws...
    Assert.NotEmpty(fakeTransport.SentReports);

    // ...and the failure shows up as a warning with the exception attached.
    // Snapshot under the same lock the logger takes when appending.
    List<CapturingLogger<HealthReportSender>.Entry> warnings;
    lock (capturingLogger.Entries)
    {
        warnings = capturingLogger.Entries.FindAll(
            entry => entry is { Level: LogLevel.Warning, Exception: InvalidOperationException });
    }

    Assert.NotEmpty(warnings);
    Assert.Contains(
        warnings,
        warning => warning.Message.Contains("cluster nodes", StringComparison.OrdinalIgnoreCase));
}
/// <summary>
/// HealthMonitoring-006 regression: the initial sequence number has to come from
/// the injected <see cref="TimeProvider"/> instead of a direct
/// <c>DateTimeOffset.UtcNow</c> read at field initialization, which makes the
/// Unix-ms seeding deterministic under test and the clock dependency explicit.
/// </summary>
[Fact]
public void SequenceNumberSeed_UsesInjectedTimeProvider()
{
    var frozenNow = new DateTimeOffset(2026, 5, 16, 12, 0, 0, TimeSpan.Zero);
    var clock = new TestTimeProvider(frozenNow);
    var healthCollector = new SiteHealthCollector();
    var fakeTransport = new FakeTransport();
    var defaultOptions = Options.Create(new HealthMonitoringOptions());

    var sender = new HealthReportSender(
        healthCollector,
        fakeTransport,
        defaultOptions,
        NullLogger<HealthReportSender>.Instance,
        new FakeSiteIdentityProvider(),
        timeProvider: clock);

    // The seed must equal the fixed instant expressed in Unix milliseconds.
    var expectedSeed = frozenNow.ToUnixTimeMilliseconds();
    Assert.Equal(expectedSeed, sender.CurrentSequenceNumber);
}
}