feat(health.akka): cluster health check with configurable status policy
This commit is contained in:
@@ -0,0 +1,51 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
|
||||
namespace ZB.MOM.WW.Health.Akka;
|
||||
|
||||
/// <summary>
/// Reports the health of the local Akka cluster node by translating its membership status into a
/// <see cref="HealthStatus"/> with the supplied <see cref="AkkaClusterStatusPolicy"/>.
/// Intended for registration under the <see cref="ZbHealthTags.Ready"/> tag
/// (recommended <c>[ready, active]</c>).
/// </summary>
/// <remarks>
/// The <see cref="ActorSystem"/> is looked up from the service provider on every check instead of
/// being captured at construction time. When no <see cref="ActorSystem"/> is registered — for
/// example because Akka has not been initialised yet — the check answers
/// <see cref="HealthStatus.Degraded"/> instead of throwing, which makes it safe to register
/// ahead of Akka start-up.
/// </remarks>
public sealed class AkkaClusterHealthCheck : IHealthCheck
{
    private readonly IServiceProvider _serviceProvider;
    private readonly AkkaClusterStatusPolicy _policy;

    /// <summary>Creates a new <see cref="AkkaClusterHealthCheck"/>.</summary>
    /// <param name="serviceProvider">
    /// Provider used to resolve the <see cref="ActorSystem"/> each time the check runs. A missing
    /// registration is reported as Degraded rather than as an error.
    /// </param>
    /// <param name="policy">Mapping from cluster member status to health status.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="serviceProvider"/> or <paramref name="policy"/> is <see langword="null"/>.
    /// </exception>
    public AkkaClusterHealthCheck(IServiceProvider serviceProvider, AkkaClusterStatusPolicy policy)
    {
        if (serviceProvider is null)
        {
            throw new ArgumentNullException(nameof(serviceProvider));
        }

        if (policy is null)
        {
            throw new ArgumentNullException(nameof(policy));
        }

        _serviceProvider = serviceProvider;
        _policy = policy;
    }

    /// <inheritdoc />
    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        // No ActorSystem registered (yet): report Degraded instead of failing the probe.
        if (_serviceProvider.GetService<ActorSystem>() is not { } actorSystem)
        {
            return Task.FromResult(HealthCheckResult.Degraded("ActorSystem not yet available."));
        }

        // Read this node's own membership status and let the policy decide what it means.
        MemberStatus memberStatus = Cluster.Get(actorSystem).SelfMember.Status;
        var result = new HealthCheckResult(
            _policy.Evaluate(memberStatus),
            $"Akka cluster member status: {memberStatus}");

        return Task.FromResult(result);
    }
}
|
||||
@@ -0,0 +1,56 @@
|
||||
using Akka.Cluster;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
|
||||
namespace ZB.MOM.WW.Health.Akka;
|
||||
|
||||
/// <summary>
/// Side-effect-free translation of an Akka <see cref="MemberStatus"/> into a
/// <see cref="HealthStatus"/>.
/// </summary>
/// <remarks>
/// <para>
/// The policy is a thin wrapper around a <see cref="Func{T, TResult}"/> taking a
/// <see cref="MemberStatus"/> and returning a <see cref="HealthStatus"/>, which keeps the decision
/// logic deterministic and easy to cover with table-driven tests;
/// <see cref="AkkaClusterHealthCheck"/> merely feeds it the live cluster status. Two named presets
/// reconcile the differing ScadaBridge and OtOpcUa implementations; build a custom instance when a
/// project needs its own mapping.
/// </para>
/// </remarks>
public sealed class AkkaClusterStatusPolicy
{
    private readonly Func<MemberStatus, HealthStatus> _evaluate;

    /// <summary>Creates a new <see cref="AkkaClusterStatusPolicy"/>.</summary>
    /// <param name="evaluate">Function that maps a member status to a health status.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="evaluate"/> is <see langword="null"/>.
    /// </exception>
    public AkkaClusterStatusPolicy(Func<MemberStatus, HealthStatus> evaluate)
    {
        if (evaluate is null)
        {
            throw new ArgumentNullException(nameof(evaluate));
        }

        _evaluate = evaluate;
    }

    /// <summary>Runs the policy against the given member status.</summary>
    /// <param name="status">Akka cluster member status of the local node.</param>
    /// <returns>The <see cref="HealthStatus"/> the policy assigns to <paramref name="status"/>.</returns>
    public HealthStatus Evaluate(MemberStatus status)
    {
        return _evaluate(status);
    }

    /// <summary>
    /// ScadaBridge origin: <c>Up</c>/<c>Joining</c> → Healthy, <c>Leaving</c>/<c>Exiting</c> →
    /// Degraded, every other status → Unhealthy. The convergence target for all projects.
    /// </summary>
    public static AkkaClusterStatusPolicy Default { get; } = new(MapDefault);

    /// <summary>
    /// OtOpcUa origin: <c>Up</c> → Healthy, any non-<c>Up</c> status (including
    /// <c>Leaving</c>/<c>Exiting</c>/<c>Down</c>) → Degraded. Kept for backward compatibility
    /// during OtOpcUa's migration.
    /// </summary>
    /// <remarks>
    /// The original OtOpcUa check searched the reachable member set for self with
    /// <c>Status == Up</c>; in any other state the search missed self and the result collapsed to
    /// Degraded. This preset reproduces that outcome: only <see cref="MemberStatus.Up"/> is Healthy.
    /// </remarks>
    public static AkkaClusterStatusPolicy OtOpcUaCompat { get; } = new(MapOtOpcUaCompat);

    /// <summary>Mapping behind <see cref="Default"/>.</summary>
    private static HealthStatus MapDefault(MemberStatus status)
    {
        switch (status)
        {
            case MemberStatus.Up:
            case MemberStatus.Joining:
                return HealthStatus.Healthy;

            case MemberStatus.Leaving:
            case MemberStatus.Exiting:
                return HealthStatus.Degraded;

            default:
                return HealthStatus.Unhealthy;
        }
    }

    /// <summary>Mapping behind <see cref="OtOpcUaCompat"/>.</summary>
    private static HealthStatus MapOtOpcUaCompat(MemberStatus status)
    {
        if (status != MemberStatus.Up)
        {
            return HealthStatus.Degraded;
        }

        return HealthStatus.Healthy;
    }
}
|
||||
@@ -0,0 +1,75 @@
|
||||
using Akka.Cluster;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
using ZB.MOM.WW.Health.Akka;
|
||||
|
||||
namespace ZB.MOM.WW.Health.Akka.Tests;
|
||||
|
||||
/// <summary>
/// Table-driven tests for the pure status-mapping function inside <see cref="AkkaClusterStatusPolicy"/>.
/// The two presets (<see cref="AkkaClusterStatusPolicy.Default"/> and
/// <see cref="AkkaClusterStatusPolicy.OtOpcUaCompat"/>) are the convergence targets for ScadaBridge
/// and OtOpcUa respectively; each listed <see cref="MemberStatus"/> is pinned to its expected result
/// so a drift in either preset fails loudly. Also covers the startup-safety null-guard on
/// <see cref="AkkaClusterHealthCheck"/>.
/// </summary>
public sealed class AkkaClusterStatusPolicyTests
{
    /// <summary>Expected <see cref="AkkaClusterStatusPolicy.Default"/> result per member status.</summary>
    public static IEnumerable<object[]> DefaultCases() => new[]
    {
        new object[] { MemberStatus.Up, HealthStatus.Healthy },
        new object[] { MemberStatus.Joining, HealthStatus.Healthy },
        new object[] { MemberStatus.Leaving, HealthStatus.Degraded },
        new object[] { MemberStatus.Exiting, HealthStatus.Degraded },
        new object[] { MemberStatus.WeaklyUp, HealthStatus.Unhealthy },
        new object[] { MemberStatus.Down, HealthStatus.Unhealthy },
        new object[] { MemberStatus.Removed, HealthStatus.Unhealthy },
    };

    [Theory]
    [MemberData(nameof(DefaultCases))]
    public void Default_MapsEveryStatus(MemberStatus status, HealthStatus expected)
    {
        Assert.Equal(expected, AkkaClusterStatusPolicy.Default.Evaluate(status));
    }

    /// <summary>Expected <see cref="AkkaClusterStatusPolicy.OtOpcUaCompat"/> result per member status.</summary>
    public static IEnumerable<object[]> OtOpcUaCompatCases() => new[]
    {
        new object[] { MemberStatus.Up, HealthStatus.Healthy },
        new object[] { MemberStatus.Joining, HealthStatus.Degraded },
        new object[] { MemberStatus.Leaving, HealthStatus.Degraded },
        new object[] { MemberStatus.Exiting, HealthStatus.Degraded },
        new object[] { MemberStatus.WeaklyUp, HealthStatus.Degraded },
        new object[] { MemberStatus.Down, HealthStatus.Degraded },
        new object[] { MemberStatus.Removed, HealthStatus.Degraded },
    };

    [Theory]
    [MemberData(nameof(OtOpcUaCompatCases))]
    public void OtOpcUaCompat_OnlyUpIsHealthy(MemberStatus status, HealthStatus expected)
    {
        Assert.Equal(expected, AkkaClusterStatusPolicy.OtOpcUaCompat.Evaluate(status));
    }

    [Fact]
    public void CustomPolicy_UsesSuppliedFunc()
    {
        var policy = new AkkaClusterStatusPolicy(_ => HealthStatus.Unhealthy);

        Assert.Equal(HealthStatus.Unhealthy, policy.Evaluate(MemberStatus.Up));
    }

    [Fact]
    public async Task HealthCheck_NoActorSystem_ReturnsDegraded()
    {
        // The provider is disposable; release it when the test ends instead of leaking it.
        await using var provider = new ServiceCollection().BuildServiceProvider();
        var check = new AkkaClusterHealthCheck(provider, AkkaClusterStatusPolicy.Default);

        var result = await check.CheckHealthAsync(NewContext(check));

        Assert.Equal(HealthStatus.Degraded, result.Status);
    }

    /// <summary>Builds a <see cref="HealthCheckContext"/> whose registration wraps <paramref name="check"/>.</summary>
    private static HealthCheckContext NewContext(IHealthCheck check) => new()
    {
        Registration = new HealthCheckRegistration("akka-cluster", check, HealthStatus.Unhealthy, tags: null),
    };
}
|
||||
+133
@@ -0,0 +1,133 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
using ZB.MOM.WW.Health.EntityFrameworkCore;
|
||||
|
||||
namespace ZB.MOM.WW.Health.EntityFrameworkCore.Tests;
|
||||
|
||||
/// <summary>
/// Verifies <see cref="DatabaseHealthCheck{TContext}"/> against a real SQLite database (in-memory,
/// connection kept open) so the <c>CanConnectAsync</c> semantics exercise an actual provider:
/// reachable → Healthy, unopenable connection → Unhealthy (no throw escapes), a custom
/// <see cref="DatabaseHealthCheckOptions{TContext}.ProbeQuery"/> that queries → Healthy, and a
/// throwing <c>ProbeQuery</c> → Unhealthy. Both the <see cref="IDbContextFactory{TContext}"/> and
/// the scoped-<c>TContext</c> resolution paths are covered.
/// </summary>
public sealed class DatabaseHealthCheckTests
{
    /// <summary>A minimal context with one entity, used purely to drive provider behaviour.</summary>
    private sealed class WidgetContext : DbContext
    {
        public WidgetContext(DbContextOptions<WidgetContext> options) : base(options) { }

        public DbSet<Widget> Widgets => Set<Widget>();
    }

    /// <summary>The single entity mapped by <see cref="WidgetContext"/>.</summary>
    private sealed class Widget
    {
        public int Id { get; set; }
    }

    /// <summary>
    /// Builds a <see cref="HealthCheckContext"/> for the check under test. The registration's
    /// factory throws if invoked, because the tests call the check instance directly.
    /// </summary>
    private static HealthCheckContext NewContext() => new()
    {
        Registration = new HealthCheckRegistration(
            "database",
            sp => throw new InvalidOperationException("not used"),
            HealthStatus.Unhealthy,
            tags: null),
    };

    /// <summary>
    /// Builds a provider whose <see cref="WidgetContext"/> is backed by the supplied SQLite
    /// connection, which this method opens, and creates the schema. When
    /// <paramref name="useFactory"/> is true the context is registered via
    /// <c>AddDbContextFactory</c>; otherwise via <c>AddDbContext</c> (scoped).
    /// </summary>
    /// <param name="connection">Connection to open and hand to the SQLite provider.</param>
    /// <param name="useFactory">Selects factory registration over scoped registration.</param>
    /// <returns>The built provider; the caller owns it and is responsible for disposing it.</returns>
    private static ServiceProvider BuildProvider(SqliteConnection connection, bool useFactory)
    {
        connection.Open();

        var services = new ServiceCollection();
        if (useFactory)
        {
            services.AddDbContextFactory<WidgetContext>(o => o.UseSqlite(connection));
        }
        else
        {
            services.AddDbContext<WidgetContext>(o => o.UseSqlite(connection));
        }

        var provider = services.BuildServiceProvider();

        using var scope = provider.CreateScope();
        scope.ServiceProvider.GetRequiredService<WidgetContext>().Database.EnsureCreated();

        return provider;
    }

    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    public async Task ReachableContext_Healthy(bool useFactory)
    {
        using var connection = new SqliteConnection("DataSource=:memory:");
        // Declared after the connection so the provider is disposed first.
        await using var provider = BuildProvider(connection, useFactory);

        var check = new DatabaseHealthCheck<WidgetContext>(provider);

        var result = await check.CheckHealthAsync(NewContext(), CancellationToken.None);

        Assert.Equal(HealthStatus.Healthy, result.Status);
    }

    [Fact]
    public async Task UnopenableConnection_Unhealthy_NoThrow()
    {
        // Point the context at a file path that cannot be opened (parent directory does not exist).
        var bogusPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"), "missing", "db.sqlite");

        var services = new ServiceCollection();
        services.AddDbContext<WidgetContext>(o => o.UseSqlite($"DataSource={bogusPath};Mode=ReadWrite"));
        await using var provider = services.BuildServiceProvider();

        var check = new DatabaseHealthCheck<WidgetContext>(provider);

        var result = await check.CheckHealthAsync(NewContext(), CancellationToken.None);

        Assert.Equal(HealthStatus.Unhealthy, result.Status);
    }

    [Fact]
    public async Task CustomProbeQuery_RunsQuery_Healthy()
    {
        using var connection = new SqliteConnection("DataSource=:memory:");
        await using var provider = BuildProvider(connection, useFactory: true);

        var options = new DatabaseHealthCheckOptions<WidgetContext>
        {
            ProbeQuery = (ctx, ct) => ctx.Widgets.AsNoTracking().AnyAsync(ct),
        };
        var check = new DatabaseHealthCheck<WidgetContext>(provider, options);

        var result = await check.CheckHealthAsync(NewContext(), CancellationToken.None);

        Assert.Equal(HealthStatus.Healthy, result.Status);
    }

    [Fact]
    public async Task ProbeQueryThrows_Unhealthy()
    {
        using var connection = new SqliteConnection("DataSource=:memory:");
        await using var provider = BuildProvider(connection, useFactory: false);

        var options = new DatabaseHealthCheckOptions<WidgetContext>
        {
            ProbeQuery = (_, _) => throw new InvalidOperationException("boom"),
        };
        var check = new DatabaseHealthCheck<WidgetContext>(provider, options);

        var result = await check.CheckHealthAsync(NewContext(), CancellationToken.None);

        Assert.Equal(HealthStatus.Unhealthy, result.Status);
    }
}
|
||||
Reference in New Issue
Block a user