docs+code: close Theme 1 — 24 design-doc / XML-doc drift findings

Doc/XML-comment drift + small adherence fixes across 17 modules. Highlights:
- Host-017: site CoordinatedShutdown ordering — SiteStreamGrpcServer gains
  CancelAllStreams() (refuse new streams, cancel active), wired into
  Program.cs site branch via ApplicationStopping.
- InboundAPI-021: ParentExecutionId now travels on RouteToGet/SetAttributes
  symmetric with RouteToCallRequest; RouteHelper stamps from _parentExecutionId.
- ClusterInfra-012: ClusterOptionsValidator now requires both seed nodes.
- Comm-018: SiteCommunicationActor.HeartbeatMessage.IsActive derived from
  cluster leader check (was hardcoded true).
- DM-020: reconciliation audit row attributes the current user, not prior deployer.
- SEL-019: EventLogPurgeService early-exits on standby via active-node check.
- Plus comment/XML-doc accuracy fixes across AuditLog, ConfigurationDatabase,
  NotificationOutbox, SiteRuntime, SiteCallAudit; doc refreshes for Component-
  Commons / -ManagementService / -CLI / -ExternalSystemGateway / -HealthMonitoring
  / -Transport / -ConfigurationDatabase; CD-023 index-name doc alignment.

11 new regression tests (RouteHelper x4, SiteStreamGrpcServer x2,
ClusterOptionsValidator x1, SiteCommunicationActor x1, DeploymentService x1,
EventLogPurgeService x3). Build clean (0 warnings); InboundAPI/Communication/
Host suites all green. README regenerated: 112 open (was 136).
This commit is contained in:
Joseph Doherty
2026-05-28 06:28:31 -04:00
parent e3ca9af1be
commit 487859bff0
51 changed files with 940 additions and 188 deletions
@@ -26,10 +26,12 @@ namespace ScadaLink.AuditLog.Central;
/// <para>
/// Per Bundle D's brief, audit-write failures must NEVER abort the user-facing
/// action. The actor wraps each repository call in its own try/catch so a
/// single bad row cannot cause the rest of the batch to be lost; the actor's
/// <see cref="SupervisorStrategy"/> uses <c>Resume</c> so a thrown exception
/// inside <c>ReceiveAsync</c> does not restart the actor (which would also
/// reset any in-flight state).
/// single bad row cannot cause the rest of the batch to be lost that
/// per-row catch is what keeps this actor alive across handler throws, not
/// the supervisor strategy. The <see cref="SupervisorStrategy"/> override
/// returns the Akka default decider (Restart for most exceptions) and
/// governs children only; this actor has no children today, so the override
/// is a forward-compat placeholder.
/// </para>
/// <para>
/// Two constructors exist for a deliberate reason: Bundle D's tests inject a
@@ -35,9 +35,11 @@ namespace ScadaLink.AuditLog.Central;
/// <b>Continue-on-error.</b> A single boundary that throws (transient SQL
/// failure, contention with backup, missing object) must NOT prevent the
/// other eligible boundaries from being purged on the same tick. Per-boundary
/// work runs inside its own try/catch; the actor's
/// <see cref="SupervisorStrategy"/> uses Resume so any leaked exception keeps
/// the singleton alive for the next tick.
/// work runs inside its own try/catch that per-boundary catch is what
/// keeps the singleton alive across handler throws. The
/// <see cref="SupervisorStrategy"/> override returns the Akka default
/// decider (Restart) and governs children only; this actor has no children
/// today, so the override is a forward-compat placeholder.
/// </para>
/// <para>
/// <b>DI scopes.</b> <see cref="IAuditLogRepository"/> is a scoped EF Core
@@ -48,10 +48,13 @@ namespace ScadaLink.AuditLog.Central;
/// <para>
/// <b>Failure isolation.</b> A single site that throws (DNS, transport,
/// repository write) must NOT prevent other sites from being polled on the
/// same tick. The per-site work runs inside its own try/catch; the actor's
/// supervisor strategy keeps it alive across any leaked exception with
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultDecider"/>'s Restart
/// semantics — restart resets the in-memory cursors, but as noted above that's
/// same tick. The per-site work runs inside its own try/catch that
/// per-site catch is what keeps the actor running across handler throws.
/// The <see cref="SupervisorStrategy"/> override returns
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultDecider"/> (Restart
/// semantics) and governs children only; this actor has no children today,
/// so the override is a forward-compat placeholder. If it ever did fire,
/// restart would reset the in-memory cursors — but as noted above that's
/// a safe (over-pull, idempotent) recovery.
/// </para>
/// <para>
@@ -706,9 +706,13 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
lock (_writeLock)
{
if (_disposed) return;
// Stop accepting new events. Setting _disposed first ensures any
// FlushBatch entered after we mark disposed will fault its pending
// events rather than touching the about-to-close connection.
// Stop accepting new events. Completing the channel writer is the
// shutdown signal: WriteAsync calls observe the completion and
// fault, and the writer loop drains any already-buffered items
// before exiting. _disposed is intentionally NOT set here — it
// flips only after the loop has fully drained (second lock block
// below), so FlushBatch's existing _disposed check guards the
// post-drain window when the connection is about to close.
_writeQueue.Writer.TryComplete();
writerLoop = _writerLoop;
}
@@ -27,9 +27,18 @@ public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
{
var failures = new List<string>();
if (options.SeedNodes is null || options.SeedNodes.Count == 0)
if (options.SeedNodes is null || options.SeedNodes.Count < 2)
{
failures.Add("ClusterOptions.SeedNodes must contain at least one seed node.");
// CI-012: design doc states "both nodes are seed nodes — each node lists
// both itself and its partner" so a properly-configured deployment lists
// two. Accepting a single-seed configuration silently defeats the
// "no startup ordering dependency" guarantee called out by
// Component-ClusterInfrastructure.md (Node Configuration).
failures.Add(
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
+ "both nodes are seed nodes); a single-seed configuration defeats "
+ "the no-startup-ordering-dependency guarantee.");
}
if (string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
@@ -8,12 +8,13 @@ namespace ScadaLink.Commons.Interfaces.Transport;
/// Re-entrancy / thread-safety: mutating <see cref="BundleImportId"/> is NOT
/// thread-safe. The service is registered scoped, and the assumed usage is a
/// single Blazor Server circuit (or single API request) at a time — within that
/// scope <see cref="BundleImporter.ApplyAsync"/> is the sole writer, and the
/// audit service is the sole reader, in a strictly sequential await chain.
/// Callers that perform concurrent imports within a shared scope (e.g. two
/// <c>ApplyAsync</c> calls awaited via <c>Task.WhenAll</c> on the same circuit)
/// MUST serialize access externally — there is no internal lock and the last
/// writer wins, which would cross-contaminate audit rows between imports.
/// scope <c>BundleImporter.ApplyAsync</c> (in the Transport component) is the
/// sole writer, and the audit service is the sole reader, in a strictly
/// sequential await chain. Callers that perform concurrent imports within a
/// shared scope (e.g. two <c>ApplyAsync</c> calls awaited via
/// <c>Task.WhenAll</c> on the same circuit) MUST serialize access externally —
/// there is no internal lock and the last writer wins, which would
/// cross-contaminate audit rows between imports.
/// </para>
/// </summary>
public interface IAuditCorrelationContext
@@ -33,11 +33,19 @@ public record RouteToCallResponse(
/// <summary>
/// Request to read attribute(s) from a remote instance.
/// </summary>
/// <param name="ParentExecutionId">
/// Audit Log #23 (ParentExecutionId): mirrors <see cref="RouteToCallRequest.ParentExecutionId"/>.
/// For an inbound-API-routed read this is the inbound request's per-request execution id;
/// future site-side audit emission for routed reads can stamp it as <c>ParentExecutionId</c>
/// so the inbound→site execution-tree link survives the read path. Additive trailing
/// member — null for the Central UI sandbox path or for callers built before the field existed.
/// </param>
public record RouteToGetAttributesRequest(
string CorrelationId,
string InstanceUniqueName,
IReadOnlyList<string> AttributeNames,
DateTimeOffset Timestamp);
DateTimeOffset Timestamp,
Guid? ParentExecutionId = null);
/// <summary>
/// Response containing attribute values from a remote instance.
@@ -52,11 +60,20 @@ public record RouteToGetAttributesResponse(
/// <summary>
/// Request to write attribute(s) on a remote instance.
/// </summary>
/// <param name="ParentExecutionId">
/// Audit Log #23 (ParentExecutionId): mirrors <see cref="RouteToCallRequest.ParentExecutionId"/>.
/// For an inbound-API-routed write this is the inbound request's per-request execution id;
/// site-side audit emission for the underlying device / static-attribute write can stamp
/// it as <c>ParentExecutionId</c> so the inbound→site execution-tree link survives the
/// write path. Additive trailing member — null for the Central UI sandbox path or for
/// callers built before the field existed.
/// </param>
public record RouteToSetAttributesRequest(
string CorrelationId,
string InstanceUniqueName,
IReadOnlyDictionary<string, string> AttributeValues,
DateTimeOffset Timestamp);
DateTimeOffset Timestamp,
Guid? ParentExecutionId = null);
/// <summary>
/// Response confirming attribute writes on a remote instance.
@@ -1,4 +1,5 @@
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
@@ -27,6 +28,15 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
private readonly string _siteId;
private readonly CommunicationOptions _options;
/// <summary>
/// Communication-018: predicate that returns <c>true</c> when this node is
/// the active member of the local site cluster (used to stamp
/// <see cref="HeartbeatMessage.IsActive"/>). Production builds default to
/// the Akka <see cref="Cluster"/> leader check; tests inject a stub so they
/// do not need a real cluster.
/// </summary>
private readonly Func<bool> _isActiveCheck;
/// <summary>
/// Reference to the local Deployment Manager singleton proxy.
/// </summary>
@@ -54,14 +64,23 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
/// <param name="siteId">The site identifier included in outbound messages.</param>
/// <param name="options">Communication options including heartbeat interval and transport settings.</param>
/// <param name="deploymentManagerProxy">Local reference to the Deployment Manager singleton proxy.</param>
/// <param name="isActiveCheck">
/// Communication-018: optional override returning <c>true</c> when this node
/// is the active member of the site cluster. <c>null</c> uses the real
/// Akka <see cref="Cluster"/> leader check (the default for production
/// wiring); tests pass a stub so they do not need to load Akka.Cluster
/// into the <c>TestKit</c> ActorSystem.
/// </param>
public SiteCommunicationActor(
string siteId,
CommunicationOptions options,
IActorRef deploymentManagerProxy)
IActorRef deploymentManagerProxy,
Func<bool>? isActiveCheck = null)
{
_siteId = siteId;
_options = options;
_deploymentManagerProxy = deploymentManagerProxy;
_isActiveCheck = isActiveCheck ?? DefaultIsActiveCheck;
// Registration
Receive<RegisterCentralClient>(msg =>
@@ -360,16 +379,60 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
return;
var hostname = Environment.MachineName;
// Communication-018: stamp HeartbeatMessage.IsActive with this node's
// true active/standby role rather than hard-coding `true`. The field is
// part of the wire contract (additive-only-evolution) so a future
// central health dashboard can distinguish "active node down, standby
// up" from "site fully offline" without a new message type.
bool isActive;
try
{
isActive = _isActiveCheck();
}
catch (Exception ex)
{
// Defensive: never let a cluster-state read failure abort the
// heartbeat itself (heartbeats are health signal — their absence is
// already meaningful). Fall back to the safest non-claiming value:
// standby. Logged at Debug because this path normally only fires
// during ActorSystem warm-up.
_log.Debug(ex,
"Active-node check threw while sending heartbeat for site {0}; reporting IsActive=false",
_siteId);
isActive = false;
}
var heartbeat = new HeartbeatMessage(
_siteId,
hostname,
IsActive: true,
IsActive: isActive,
DateTimeOffset.UtcNow);
_centralClient.Tell(
new ClusterClient.Send("/user/central-communication", heartbeat), Self);
}
/// <summary>
/// Communication-018: default active-node check used when no override is
/// supplied. Mirrors <c>ActiveNodeGate</c> in the Host (and
/// <c>ActiveNodeHealthCheck</c>): the node is the active member of the
/// site cluster when it is the current cluster leader AND its own
/// <see cref="MemberStatus"/> is <see cref="MemberStatus.Up"/>. Any other
/// state (still joining, leaving, no leader yet) reports standby —
/// safe-by-default, matching the standby case.
/// </summary>
private bool DefaultIsActiveCheck()
{
var cluster = Cluster.Get(Context.System);
var self = cluster.SelfMember;
if (self.Status != MemberStatus.Up)
return false;
var leader = cluster.State.Leader;
return leader != null && leader == self.Address;
}
// ── Internal messages ──
internal record SendHeartbeat;
@@ -25,6 +25,11 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
private readonly int _maxConcurrentStreams;
private readonly TimeSpan _maxStreamLifetime;
private volatile bool _ready;
// Host-017 / REQ-HOST-7: flipped by CancelAllStreams() when the host enters
// CoordinatedShutdown so SubscribeInstance refuses new streams with
// Unavailable before the actor system tears down. Strictly monotonic — once
// true, never reset (the server is single-lifetime per host).
private volatile bool _shuttingDown;
private long _actorCounter;
// Audit Log (#23 M2): central-side ingest actor proxy. Set by the host
// after the cluster singleton starts (see Bundle E wiring). When null the
@@ -131,6 +136,40 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
_siteAuditQueue = queue;
}
/// <summary>
/// Host-017 / REQ-HOST-7: signals the gRPC server to begin its part of the
/// site shutdown sequence — refuse new <see cref="SubscribeInstance"/>
/// streams with <see cref="StatusCode.Unavailable"/> and cancel every
/// active stream so its <c>await foreach</c> observes
/// <see cref="OperationCanceledException"/> and the response stream
/// completes with <c>Cancelled</c> on the client. Idempotent — safe to call
/// more than once. Invoked from the site host's
/// <c>IHostApplicationLifetime.ApplicationStopping</c> callback BEFORE
/// Akka's <c>CoordinatedShutdown</c> runs, so in-flight clients get a
/// clean cancellation they can reconnect on rather than a silent stream
/// that only times out via gRPC keepalive.
/// </summary>
public void CancelAllStreams()
{
_shuttingDown = true;
foreach (var entry in _activeStreams.Values)
{
try
{
entry.Cts.Cancel();
}
catch (ObjectDisposedException)
{
// Already cleaned up by its own finally — nothing to do.
}
}
}
/// <summary>
/// Host-017: exposed for test assertions on the shutdown state.
/// </summary>
internal bool IsShuttingDown => _shuttingDown;
/// <summary>
/// Number of currently active streaming subscriptions. Exposed for diagnostics.
/// </summary>
@@ -151,6 +190,11 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
if (!_ready)
throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));
// Host-017 / REQ-HOST-7: refuse new subscriptions during shutdown so
// CoordinatedShutdown can quiesce without racing fresh streams.
if (_shuttingDown)
throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));
// Communication-014: correlation_id arrives off the wire on a public gRPC
// endpoint and is used (below) to compose an Akka actor name. Akka actor names
// have a restricted character set — a id containing '/', whitespace, or other
@@ -6,11 +6,12 @@ using ScadaLink.Commons.Interfaces.Repositories;
namespace ScadaLink.ConfigurationDatabase.Repositories;
/// <summary>
/// EF Core implementation of IDeploymentManagerRepository.
/// Provides storage/query of deployed configuration snapshots per instance,
/// current deployment status, and optimistic concurrency on deployment status records.
///
/// WP-24: Stub level sufficient for diff/staleness support.
/// EF Core implementation of <see cref="IDeploymentManagerRepository"/> covering
/// the deployment pipeline's persistence surface: <c>DeploymentRecord</c> CRUD
/// (with optimistic concurrency via <c>DeploymentRecord.RowVersion</c>),
/// <c>SystemArtifactDeploymentRecord</c> CRUD, <c>DeployedConfigSnapshot</c> CRUD,
/// and a Restrict-FK-aware <see cref="DeleteInstanceAsync"/> that explicitly
/// clears dependent deployment-record rows before removing an instance.
/// </summary>
public class DeploymentManagerRepository : IDeploymentManagerRepository
{
@@ -170,7 +170,7 @@ public class DeploymentService
// Idempotency"). A clean prior Success or a fresh first-time deploy
// skips this extra round-trip.
var reconciled = await TryReconcileWithSiteAsync(
instance, revisionHash, configJson, cancellationToken);
instance, revisionHash, configJson, user, cancellationToken);
if (reconciled != null)
return Result<DeploymentRecord>.Success(reconciled);
@@ -617,6 +617,7 @@ public class DeploymentService
Instance instance,
string targetRevisionHash,
string configJson,
string currentUser,
CancellationToken cancellationToken)
{
var prior = await _repository.GetCurrentDeploymentStatusAsync(instance.Id, cancellationToken);
@@ -695,9 +696,19 @@ public class DeploymentService
instance, prior.DeploymentId, targetRevisionHash, configJson,
forceEnabledState: false, cancellationToken);
await _auditService.LogAsync(prior.DeployedBy, "DeployReconciled", "Instance",
// DeploymentManager-020: attribute the audit row to the user driving
// THIS redeploy (the caller of DeployInstanceAsync), not the user
// who issued the original timed-out / stuck deployment. The original
// deployer is preserved in the detail object so forensics can still
// see who launched the run that reconciliation rescued.
await _auditService.LogAsync(currentUser, "DeployReconciled", "Instance",
instance.Id.ToString(), instance.UniqueName,
new { DeploymentId = prior.DeploymentId, RevisionHash = targetRevisionHash },
new
{
DeploymentId = prior.DeploymentId,
RevisionHash = targetRevisionHash,
OriginalDeployer = prior.DeployedBy
},
cancellationToken);
return prior;
+12
View File
@@ -271,6 +271,18 @@ try
// Map gRPC service — resolves the singleton SiteStreamGrpcServer from DI
app.MapGrpcService<ScadaLink.Communication.Grpc.SiteStreamGrpcServer>();
// Host-017 / REQ-HOST-7: site-shutdown ordering. ApplicationStopping
// fires BEFORE IHostedService.StopAsync runs, so the gRPC server
// refuses new streams (Unavailable) and cancels every active stream
// here — clients observe a clean Cancelled and reconnect — and only
// THEN does AkkaHostedService run CoordinatedShutdown and tear down
// actors. Without this hand-off, in-flight streams go silent and only
// time out via gRPC keepalive (~25 s), violating the documented
// four-step sequence.
var siteLifetime = app.Services.GetRequiredService<Microsoft.Extensions.Hosting.IHostApplicationLifetime>();
var siteGrpcServer = app.Services.GetRequiredService<ScadaLink.Communication.Grpc.SiteStreamGrpcServer>();
siteLifetime.ApplicationStopping.Register(() => siteGrpcServer.CancelAllStreams());
await app.RunAsync();
}
else
+14 -2
View File
@@ -179,8 +179,14 @@ public class RouteTarget
var siteId = await ResolveSiteAsync(token);
var correlationId = Guid.NewGuid().ToString();
// Audit Log #23 (ParentExecutionId): mirrors the Call path — stamp the
// spawning inbound request's ExecutionId so future site-side audit
// emission for routed reads can record this read's parent. Symmetric
// with RouteToCallRequest so script authors get the same correlation
// across Call / GetAttributes / SetAttributes.
var request = new RouteToGetAttributesRequest(
correlationId, _instanceCode, attributeNames.ToList(), DateTimeOffset.UtcNow);
correlationId, _instanceCode, attributeNames.ToList(), DateTimeOffset.UtcNow,
_parentExecutionId);
var response = await _instanceRouter.RouteToGetAttributesAsync(siteId, request, token);
@@ -222,8 +228,14 @@ public class RouteTarget
var siteId = await ResolveSiteAsync(token);
var correlationId = Guid.NewGuid().ToString();
// Audit Log #23 (ParentExecutionId): mirrors the Call path — stamp the
// spawning inbound request's ExecutionId so future site-side audit
// emission for routed writes can record this write's parent. Symmetric
// with RouteToCallRequest so script authors get the same correlation
// across Call / GetAttributes / SetAttributes.
var request = new RouteToSetAttributesRequest(
correlationId, _instanceCode, attributeValues, DateTimeOffset.UtcNow);
correlationId, _instanceCode, attributeValues, DateTimeOffset.UtcNow,
_parentExecutionId);
var response = await _instanceRouter.RouteToSetAttributesAsync(siteId, request, token);
@@ -6,21 +6,46 @@ namespace ScadaLink.NotificationOutbox;
/// </summary>
public class NotificationOutboxOptions
{
/// <summary>Interval between dispatch sweeps that pick up pending notifications for delivery.</summary>
/// <summary>
/// Interval between dispatch sweeps that pick up pending notifications for delivery.
/// Default: 10 seconds.
/// </summary>
public TimeSpan DispatchInterval { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>Maximum number of notifications claimed for delivery in a single dispatch sweep.</summary>
/// <summary>
/// Maximum number of notifications claimed for delivery in a single dispatch sweep.
/// Caps per-sweep batch size to bound memory and per-iteration latency.
/// Default: 100.
/// </summary>
public int DispatchBatchSize { get; set; } = 100;
/// <summary>Age past which an in-progress notification is considered stuck and re-claimed.</summary>
/// <summary>
/// Age past which a still-<c>Pending</c>/<c>Retrying</c> notification is counted as
/// "stuck" on the KPI tile and the per-row badge in the Central UI.
/// Display-only: rows older than this threshold are flagged in KPIs/UI; there is no
/// automatic re-claim, requeue, or escalation — the dispatcher behaviour is unaffected.
/// Default: 10 minutes.
/// </summary>
public TimeSpan StuckAgeThreshold { get; set; } = TimeSpan.FromMinutes(10);
/// <summary>Retention period for notifications in a terminal state before they are purged.</summary>
/// <summary>
/// Retention period for notifications in a terminal state (<c>Delivered</c>,
/// <c>Parked</c>, <c>Discarded</c>) before they are purged from the Notifications table.
/// Default: 365 days.
/// </summary>
public TimeSpan TerminalRetention { get; set; } = TimeSpan.FromDays(365);
/// <summary>Interval between background purge sweeps of terminal notifications.</summary>
/// <summary>
/// Interval between background purge sweeps of terminal notifications older than
/// <see cref="TerminalRetention"/>.
/// Default: 1 day.
/// </summary>
public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromDays(1);
/// <summary>Trailing window used to compute the delivered-notifications throughput KPI.</summary>
/// <summary>
/// Trailing window used to compute the "delivered (last interval)" throughput KPI
/// surfaced on the Health dashboard and the Notification Outbox page.
/// Default: 1 minute.
/// </summary>
public TimeSpan DeliveredKpiWindow { get; set; } = TimeSpan.FromMinutes(1);
}
@@ -23,10 +23,14 @@ namespace ScadaLink.SiteCallAudit;
/// </summary>
/// <remarks>
/// <para>
/// Query, detail and KPIs (Task 4) and the central→site Retry/Discard relay
/// (Task 5 — the relay handlers live in this actor) are implemented; only
/// reconciliation remains deferred (per CLAUDE.md scope discipline — it lands
/// in a later follow-up).
/// Implemented: direct <see cref="UpsertSiteCallCommand"/> telemetry ingest,
/// query, detail and KPI handlers (Task 4), and the central→site Retry/Discard
/// relay (Task 5 — the relay handlers live in this actor). Deferred (per
/// CLAUDE.md scope discipline — both land in a later follow-up): the periodic
/// per-site reconciliation puller that backfills lost telemetry, and the daily
/// terminal-row purge scheduler (the repository exposes
/// <c>PurgeTerminalAsync</c> but nothing in this module currently invokes it
/// on a schedule).
/// </para>
/// <para>
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
@@ -556,6 +560,13 @@ public class SiteCallAuditActor : ReceiveActor
// SiteUnreachable is never produced from a ParkedOperationActionAck —
// unreachable responses are built by UnreachableRetry/UnreachableDiscard
// before any ack is classified, so this arm is unreachable by construction.
// We deliberately return ack.ErrorMessage (rather than throwing) to keep
// AckErrorMessage total and side-effect-free: site-unreachable is classified
// as transient by the upstream relay path (which has already constructed the
// SiteUnreachable response and detail text via SiteUnreachableMessage), so a
// defensive fall-through here just surfaces whatever error text the ack
// carries and lets the caller schedule a retry. Throwing would turn a benign
// refactor invariant violation into a relay-path crash.
SiteCallRelayOutcome.SiteUnreachable => ack.ErrorMessage,
_ => throw new ArgumentOutOfRangeException(
nameof(outcome), outcome, "unknown SiteCallRelayOutcome"),
@@ -4,6 +4,21 @@ using Microsoft.Extensions.Options;
namespace ScadaLink.SiteEventLogging;
/// <summary>
/// SiteEventLogging-019: predicate the <see cref="EventLogPurgeService"/>
/// consults at the top of every purge tick to decide whether THIS node should
/// run the daily purge. The design states "a daily background job runs on the
/// active node and deletes all events older than 30 days"; the standby's local
/// SQLite receives no writes, so purging there is harmless but unnecessary —
/// and silently doing it anyway diverges from the design.
///
/// Registration is the Host's responsibility (it knows the cluster topology);
/// when no implementation is registered the purge service defaults to "always
/// active" so non-clustered hosts and unit tests are unaffected — backward
/// compatible with the prior "run on every host" behaviour.
/// </summary>
public delegate bool SiteEventLogActiveNodeCheck();
/// <summary>
/// Background service that periodically purges old events from the SQLite event log.
/// Enforces both time-based retention (default 30 days) and storage cap (default 1GB).
@@ -17,15 +32,24 @@ public class EventLogPurgeService : BackgroundService
private readonly SiteEventLogger _eventLogger;
private readonly SiteEventLogOptions _options;
private readonly ILogger<EventLogPurgeService> _logger;
private readonly SiteEventLogActiveNodeCheck _isActiveNode;
/// <summary>Initializes a new instance of <see cref="EventLogPurgeService"/>.</summary>
/// <param name="eventLogger">The concrete event logger providing lock-guarded database access.</param>
/// <param name="options">Site event log options (retention days, storage cap, purge interval).</param>
/// <param name="logger">Logger instance.</param>
/// <param name="isActiveNode">
/// SiteEventLogging-019: optional active-node check. When <c>null</c>, the
/// service runs the purge on every tick (preserves the pre-fix behaviour
/// for non-clustered hosts and existing tests). When supplied — e.g. by
/// the Host on a site node — each tick early-exits on the standby so the
/// daily purge runs only on the active node, matching the design.
/// </param>
public EventLogPurgeService(
SiteEventLogger eventLogger,
IOptions<SiteEventLogOptions> options,
ILogger<EventLogPurgeService> logger)
ILogger<EventLogPurgeService> logger,
SiteEventLogActiveNodeCheck? isActiveNode = null)
{
// Depend on the concrete recorder directly: purge must funnel database access
// through its lock-guarded WithConnection. Taking ISiteEventLogger and
@@ -33,6 +57,7 @@ public class EventLogPurgeService : BackgroundService
_eventLogger = eventLogger;
_options = options.Value;
_logger = logger;
_isActiveNode = isActiveNode ?? (static () => true);
}
/// <inheritdoc />
@@ -58,6 +83,31 @@ public class EventLogPurgeService : BackgroundService
{
try
{
// SiteEventLogging-019: gate every tick on the active-node check.
// The standby's local SQLite receives no writes, so purging there
// is harmless but unnecessary; the design (Component-SiteEventLogging
// → Storage) explicitly states the purge runs on the active node.
// Defensive try/catch: a transient cluster-state read failure must
// not stop the purge loop — fall back to running the purge (the
// pre-fix behaviour was "always run", which is harmless on standby).
bool isActive;
try
{
isActive = _isActiveNode();
}
catch (Exception checkEx)
{
_logger.LogDebug(checkEx,
"Active-node check threw during purge tick; running purge to be safe");
isActive = true;
}
if (!isActive)
{
_logger.LogDebug("Skipping event log purge tick — this node is not the active site member");
return;
}
PurgeByRetention();
PurgeByStorageCap();
}
@@ -1,4 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ScadaLink.SiteEventLogging;
@@ -18,7 +21,19 @@ public static class ServiceCollectionExtensions
services.AddSingleton<SiteEventLogger>();
services.AddSingleton<ISiteEventLogger>(sp => sp.GetRequiredService<SiteEventLogger>());
services.AddSingleton<IEventLogQueryService, EventLogQueryService>();
services.AddHostedService<EventLogPurgeService>();
// SiteEventLogging-019: the purge service still registers on every host
// node, but it consults an optional SiteEventLogActiveNodeCheck on each
// tick and early-exits on the standby. The Host registers the real
// active-node check on site nodes; tests and non-clustered hosts leave
// it unregistered, and the purge defaults to "always run" (the
// pre-fix behaviour). Building the service via a factory so the
// optional delegate flows from DI rather than the constructor default.
services.AddHostedService(sp => new EventLogPurgeService(
sp.GetRequiredService<SiteEventLogger>(),
sp.GetRequiredService<IOptions<SiteEventLogOptions>>(),
sp.GetRequiredService<ILogger<EventLogPurgeService>>(),
sp.GetService<SiteEventLogActiveNodeCheck>()));
return services;
}
@@ -3,32 +3,40 @@ using ScadaLink.StoreAndForward;
namespace ScadaLink.SiteRuntime.Messages;
/// <summary>
/// Outbound messages — sent by local DeploymentManagerActor/S&amp;F service
/// to the local SiteReplicationActor for forwarding to the peer node.
/// </summary>
// Outbound messages — sent by local DeploymentManagerActor/S&F service
// to the local SiteReplicationActor for forwarding to the peer node.
/// <summary>Outbound: replicate a deployed instance config (create or update) to the peer node.</summary>
public record ReplicateConfigDeploy(
string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled);
/// <summary>Outbound: replicate removal of a deployed instance config to the peer node.</summary>
public record ReplicateConfigRemove(string InstanceName);
/// <summary>Outbound: replicate an instance enabled/disabled flag change to the peer node.</summary>
public record ReplicateConfigSetEnabled(string InstanceName, bool IsEnabled);
/// <summary>Outbound: replicate a system-wide artifact deployment (shared scripts, external systems, etc.) to the peer node.</summary>
public record ReplicateArtifacts(DeployArtifactsCommand Command);
/// <summary>Outbound: replicate a store-and-forward buffer mutation (enqueue/dequeue/park/etc.) to the peer node.</summary>
public record ReplicateStoreAndForward(ReplicationOperation Operation);
/// <summary>
/// Inbound messages — received from the peer's SiteReplicationActor
/// and applied to local SQLite storage.
/// </summary>
// Inbound messages — received from the peer's SiteReplicationActor
// and applied to local SQLite storage.
/// <summary>Inbound: apply a peer-replicated instance config (create or update) to local SQLite.</summary>
public record ApplyConfigDeploy(
string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled);
/// <summary>Inbound: apply peer-replicated removal of a deployed instance config to local SQLite.</summary>
public record ApplyConfigRemove(string InstanceName);
/// <summary>Inbound: apply a peer-replicated instance enabled/disabled flag change to local SQLite.</summary>
public record ApplyConfigSetEnabled(string InstanceName, bool IsEnabled);
/// <summary>Inbound: apply a peer-replicated system-wide artifact deployment to local SQLite.</summary>
public record ApplyArtifacts(DeployArtifactsCommand Command);
/// <summary>Inbound: apply a peer-replicated store-and-forward buffer mutation to the local buffer.</summary>
public record ApplyStoreAndForward(ReplicationOperation Operation);