// Listing metadata carried over from the file viewer: C#, 230 lines, 8.5 KiB.
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using ScadaLink.Commons.Messages.Health;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.StoreAndForward;
|
|
|
|
namespace ScadaLink.HealthMonitoring.Tests;
|
|
|
|
/// <summary>
/// Tests for <see cref="HealthReportSender"/>: sequence numbering, site id,
/// UTC timestamps and store-and-forward buffer depths of the emitted reports.
/// </summary>
/// <remarks>
/// Most tests drive the sender against the real clock: they start it, wait a
/// fixed number of milliseconds, then stop it (see <see cref="RunSenderAsync"/>).
/// Report-count assertions therefore depend on wall-clock scheduling.
/// </remarks>
public class HealthReportSenderTests
{
    /// <summary>Report interval used by every test that actually runs the sender.</summary>
    private static readonly TimeSpan FastReportInterval = TimeSpan.FromMilliseconds(50);

    /// <summary>Transport double that records every report passed to <see cref="Send"/>.</summary>
    private sealed class FakeTransport : IHealthReportTransport
    {
        /// <summary>Reports received so far, in the order <see cref="Send"/> was called.</summary>
        public List<SiteHealthReport> SentReports { get; } = [];

        /// <summary>Appends <paramref name="report"/> to <see cref="SentReports"/>.</summary>
        public void Send(SiteHealthReport report) => SentReports.Add(report);
    }

    /// <summary>Identity provider double with a settable site id (default <c>"test-site"</c>).</summary>
    private sealed class FakeSiteIdentityProvider : ISiteIdentityProvider
    {
        public string SiteId { get; set; } = "test-site";
    }

    /// <summary>Creates a collector that has been flagged as the active node.</summary>
    private static SiteHealthCollector CreateActiveCollector()
    {
        var collector = new SiteHealthCollector();
        collector.SetActiveNode(true);
        return collector;
    }

    /// <summary>Creates options whose report interval is <see cref="FastReportInterval"/>.</summary>
    private static IOptions<HealthMonitoringOptions> CreateFastOptions() =>
        Options.Create(new HealthMonitoringOptions { ReportInterval = FastReportInterval });

    /// <summary>
    /// Starts <paramref name="sender"/>, lets it run for <paramref name="runForMs"/>
    /// milliseconds, then stops it.
    /// </summary>
    /// <param name="sender">The sender under test.</param>
    /// <param name="runForMs">How long to wait between start and stop.</param>
    /// <param name="safetyTimeoutMs">
    /// Lifetime of the cancellation token handed to <c>StartAsync</c>; each caller
    /// passes a value slightly larger than <paramref name="runForMs"/>.
    /// </param>
    private static async Task RunSenderAsync(HealthReportSender sender, int runForMs, int safetyTimeoutMs)
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(safetyTimeoutMs));
        try
        {
            await sender.StartAsync(cts.Token);
            await Task.Delay(runForMs, CancellationToken.None);
            await sender.StopAsync(CancellationToken.None);
        }
        catch (OperationCanceledException)
        {
            // Deliberately best-effort: a cancellation surfacing from start/stop
            // (e.g. if the safety timeout fires first) is not itself a failure;
            // the assertions made by the calling test decide the outcome.
        }
    }

    /// <summary>
    /// Every emitted report carries the configured site id and a sequence number
    /// strictly greater than the previous report's.
    /// </summary>
    [Fact]
    public async Task SendsReportsWithMonotonicSequenceNumbers()
    {
        var transport = new FakeTransport();
        var collector = CreateActiveCollector();
        var options = CreateFastOptions();

        var sender = new HealthReportSender(
            collector,
            transport,
            options,
            NullLogger<HealthReportSender>.Instance,
            new FakeSiteIdentityProvider { SiteId = "site-A" });

        await RunSenderAsync(sender, runForMs: 280, safetyTimeoutMs: 300);

        // Should have sent several reports
        var reports = transport.SentReports;
        Assert.True(reports.Count >= 2,
            $"Expected at least 2 reports, got {reports.Count}");

        // Verify strictly-monotonic sequence numbers and matching site id
        for (int i = 0; i < reports.Count; i++)
        {
            if (i > 0)
            {
                Assert.True(
                    reports[i].SequenceNumber > reports[i - 1].SequenceNumber,
                    $"Sequence numbers not strictly increasing at index {i}");
            }

            Assert.Equal("site-A", reports[i].SiteId);
        }
    }

    /// <summary>
    /// The first emitted sequence number is at least the Unix-millisecond time
    /// captured just before the sender was constructed.
    /// </summary>
    [Fact]
    public async Task FirstReportSequenceExceedsStartupUnixMs()
    {
        // Reports are seeded with Unix-ms at construction so a freshly-active
        // node always sorts after the prior active. Verify the first emitted
        // sequence is at least the startup epoch.
        var transport = new FakeTransport();
        var collector = CreateActiveCollector();
        var options = CreateFastOptions();

        var beforeCtor = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var sender = new HealthReportSender(
            collector,
            transport,
            options,
            NullLogger<HealthReportSender>.Instance,
            new FakeSiteIdentityProvider());

        await RunSenderAsync(sender, runForMs: 120, safetyTimeoutMs: 150);

        Assert.NotEmpty(transport.SentReports);
        var firstSequence = transport.SentReports[0].SequenceNumber;
        Assert.True(
            firstSequence >= beforeCtor,
            $"First sequence {firstSequence} should be >= startup epoch {beforeCtor}");
    }

    /// <summary>
    /// Every emitted report has a timestamp with zero UTC offset that falls
    /// between the instants captured immediately before and after the run.
    /// </summary>
    [Fact]
    public async Task ReportsIncludeUtcTimestamp()
    {
        var transport = new FakeTransport();
        var collector = CreateActiveCollector();
        var options = CreateFastOptions();

        var sender = new HealthReportSender(
            collector,
            transport,
            options,
            NullLogger<HealthReportSender>.Instance,
            new FakeSiteIdentityProvider());

        var before = DateTimeOffset.UtcNow;
        await RunSenderAsync(sender, runForMs: 120, safetyTimeoutMs: 150);
        var after = DateTimeOffset.UtcNow;

        Assert.NotEmpty(transport.SentReports);
        foreach (var report in transport.SentReports)
        {
            Assert.InRange(report.ReportTimestamp, before, after);
            Assert.Equal(TimeSpan.Zero, report.ReportTimestamp.Offset);
        }
    }

    /// <summary>
    /// HealthMonitoring-001 regression: the documented "store-and-forward buffer
    /// depth" metric (pending messages by category) must actually be populated in
    /// the emitted report. Previously SetStoreAndForwardDepths had no callers, so
    /// StoreAndForwardBufferDepths was always empty. The sender must query the S&amp;F
    /// engine's per-category depth API and include it alongside the parked count.
    /// </summary>
    [Fact]
    public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage()
    {
        var dbName = $"HealthSfDepth_{Guid.NewGuid():N}";
        var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";

        // Keep one connection alive so the in-memory DB persists for the test.
        using var keepAlive = new SqliteConnection(connStr);
        keepAlive.Open();

        var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
        await storage.InitializeAsync();

        // Two pending ExternalSystem messages and one pending Notification message.
        await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem));
        await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem));
        await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification));

        var transport = new FakeTransport();
        var collector = CreateActiveCollector();
        var options = CreateFastOptions();

        var sender = new HealthReportSender(
            collector,
            transport,
            options,
            NullLogger<HealthReportSender>.Instance,
            new FakeSiteIdentityProvider(),
            sfStorage: storage);

        await RunSenderAsync(sender, runForMs: 250, safetyTimeoutMs: 300);

        Assert.NotEmpty(transport.SentReports);

        // Inspect the most recent report; categories with nothing enqueued
        // (CachedDbWrite here) are expected to be absent rather than zero.
        var depths = transport.SentReports[^1].StoreAndForwardBufferDepths;
        Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]);
        Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]);
        Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite)));
    }

    /// <summary>
    /// Builds a <see cref="StoreAndForwardMessage"/> in the Pending status with
    /// zero retries so far, for the given id and category.
    /// </summary>
    /// <param name="id">Message id.</param>
    /// <param name="category">Store-and-forward category the message is filed under.</param>
    private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category) =>
        new()
        {
            Id = id,
            Category = category,
            Target = "target",
            PayloadJson = "{}",
            RetryCount = 0,
            MaxRetries = 50,
            RetryIntervalMs = 30_000,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Pending
        };

    /// <summary>
    /// Immediately after construction, and without starting the sender, the
    /// current sequence number lies between the Unix-millisecond times captured
    /// just before and just after the constructor call.
    /// </summary>
    [Fact]
    public void InitialSequenceNumberSeededWithUnixMs()
    {
        var transport = new FakeTransport();
        var collector = new SiteHealthCollector();
        var options = Options.Create(new HealthMonitoringOptions());

        var beforeCtor = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var sender = new HealthReportSender(
            collector,
            transport,
            options,
            NullLogger<HealthReportSender>.Instance,
            new FakeSiteIdentityProvider());
        var afterCtor = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        Assert.InRange(sender.CurrentSequenceNumber, beforeCtor, afterCtor);
    }
}
|