fix(high-severity): close 8 of 10 open High findings across 8 modules
Comm-016: delete dead HandleConnectionStateChanged + _debugSubscriptions / _inProgressDeployments tracking + ConnectionStateChanged message record. Disconnect detection is owned by the transport layers (gRPC keepalive PING ~25s; Ask-timeout at CommunicationService). Updates the Component-Communication.md design doc to make that explicit. SnF-018: NotificationForwarder.DeliverAsync now discards a corrupt buffered payload (Warning log + return true) instead of returning false and parking the row — honoring the design's "notifications do not park" invariant. DM-018: reconciliation no longer force-sets Enabled, preserving an intentional Disabled state after central failover. ESG-018: DeliverBufferedAsync (both ExternalSystemClient + DatabaseGateway) catches JsonException and returns false, turning a corrupt buffered row into a parked operation instead of a retry-forever poison message. InboundAPI-022: register ActiveNodeGate as IActiveNodeGate in the Central DI branch so standby-node gating is actually wired up in production. NS-019: remove orphaned NotificationDeliveryService / INotificationDeliveryService / NotificationResult; central notification delivery now lives entirely in NotificationOutbox. SEL-016: normalise From/To filters to UTC before ISO-string compare so non-UTC DateTimeOffset clients no longer get spuriously excluded events. TE-017: include Description on attributes/alarms and a HashableConnections projection (protocol, endpoint JSON, failover count) in the revision hash and DiffService; staleness detection now catches description-only and connection-endpoint edits. Transport-001 and Transport-002 (also High) remain Open — they're being handled in a follow-up batch because both touch BundleImporter.cs and must serialise.
This commit is contained in:
@@ -60,17 +60,18 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();
|
||||
|
||||
/// <summary>
|
||||
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
|
||||
/// Used to kill debug streams on site disconnection (WP-5).
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, (string SiteId, IActorRef Subscriber)> _debugSubscriptions = new();
|
||||
|
||||
/// <summary>
|
||||
/// Tracks in-progress deployments: deploymentId → siteId.
|
||||
/// On central failover, in-progress deployments are treated as failed (WP-5).
|
||||
/// </summary>
|
||||
private readonly Dictionary<string, string> _inProgressDeployments = new();
|
||||
// Communication-016: the previous _debugSubscriptions / _inProgressDeployments
|
||||
// dictionaries existed solely to support a documented "synchronous kill streams +
|
||||
// mark deployments failed on site disconnect" workflow triggered by
|
||||
// ConnectionStateChanged. No production code ever emitted that message — only
|
||||
// the unit test did — so the workflow was dead from end to end. Disconnect
|
||||
// detection is owned by the underlying transports: the gRPC keepalive PING
|
||||
// signals stream interruption in ~25s (handled by DebugStreamBridgeActor's own
|
||||
// reconnection logic), and an Ask round-trip for a deploy times out at the
|
||||
// CommunicationService layer (caller sees failure). The tracking dicts +
|
||||
// ConnectionStateChanged record + HandleConnectionStateChanged handler are
|
||||
// removed; see docs/requirements/Component-Communication.md "Connection
|
||||
// Failure Behavior" for the keepalive-based contract that survives.
|
||||
|
||||
private ICancelable? _refreshSchedule;
|
||||
|
||||
@@ -165,9 +166,6 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
|
||||
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
|
||||
|
||||
// Connection state changes
|
||||
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
|
||||
|
||||
// Route enveloped messages to sites
|
||||
Receive<SiteEnvelope>(HandleSiteEnvelope);
|
||||
|
||||
@@ -335,44 +333,10 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Reacts to a <see cref="ConnectionStateChanged"/> message for a site.
/// When <c>msg.IsConnected</c> is false: logs a warning, sends a
/// <c>DebugStreamTerminated</c> to every subscriber tracked in
/// <c>_debugSubscriptions</c> for that site and drops those entries, then
/// logs and drops every <c>_inProgressDeployments</c> entry for that site.
/// When <c>msg.IsConnected</c> is true: only logs that the site connected.
/// </summary>
/// <param name="msg">State change carrying <c>SiteId</c>, <c>IsConnected</c> and <c>Timestamp</c>.</param>
private void HandleConnectionStateChanged(ConnectionStateChanged msg)
|
||||
{
|
||||
if (!msg.IsConnected)
|
||||
{
|
||||
_log.Warning("Site {0} disconnected at {1}", msg.SiteId, msg.Timestamp);
|
||||
|
||||
// WP-5: Kill active debug streams for the disconnected site
|
||||
// Materialise the matches with ToList() first: the loop below removes
// entries from _debugSubscriptions, which must not happen while the
// dictionary itself is being enumerated.
var toRemove = _debugSubscriptions
|
||||
.Where(kvp => kvp.Value.SiteId == msg.SiteId)
|
||||
.ToList();
|
||||
|
||||
foreach (var kvp in toRemove)
|
||||
{
|
||||
_log.Info("Killing debug stream {0} for disconnected site {1}", kvp.Key, msg.SiteId);
|
||||
// kvp.Key is the subscription's correlation id; the subscriber is told which stream ended.
kvp.Value.Subscriber.Tell(new DebugStreamTerminated(msg.SiteId, kvp.Key));
|
||||
_debugSubscriptions.Remove(kvp.Key);
|
||||
}
|
||||
|
||||
// WP-5: Mark in-progress deployments as failed
|
||||
// In this method "failed" amounts to a warning log plus dropping the
// tracking entry; no message is sent to any actor from this loop.
var failedDeployments = _inProgressDeployments
|
||||
.Where(kvp => kvp.Value == msg.SiteId)
|
||||
.Select(kvp => kvp.Key)
|
||||
.ToList();
|
||||
|
||||
foreach (var deploymentId in failedDeployments)
|
||||
{
|
||||
_log.Warning("Deployment {0} to site {1} treated as failed due to disconnection",
|
||||
deploymentId, msg.SiteId);
|
||||
_inProgressDeployments.Remove(deploymentId);
|
||||
}
|
||||
|
||||
// Note: Do NOT stop the ClusterClient — it handles reconnection internally
|
||||
}
|
||||
else
|
||||
{
|
||||
// Reconnect path: informational log only, no tracking state is touched.
_log.Info("Site {0} connected at {1}", msg.SiteId, msg.Timestamp);
|
||||
}
|
||||
}
|
||||
// Communication-016: HandleConnectionStateChanged removed — no production
|
||||
// caller emitted ConnectionStateChanged, so the workflow ran only in tests.
|
||||
// Disconnect detection is owned by the transport layers (gRPC keepalive +
|
||||
// ClusterClient/Ask timeout).
|
||||
|
||||
private void HandleSiteEnvelope(SiteEnvelope envelope)
|
||||
{
|
||||
@@ -385,9 +349,6 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
return;
|
||||
}
|
||||
|
||||
// Track debug subscriptions for cleanup on disconnect
|
||||
TrackMessageForCleanup(envelope);
|
||||
|
||||
// Route via ClusterClient — Sender is preserved for Ask response routing
|
||||
entry.Client.Tell(
|
||||
new ClusterClient.Send("/user/site-communication", envelope.Message),
|
||||
@@ -485,23 +446,8 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
_log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
|
||||
}
|
||||
|
||||
/// <summary>
/// Updates the actor's tracking dictionaries based on the type of
/// <c>envelope.Message</c>: a debug-view subscribe adds an entry to
/// <c>_debugSubscriptions</c>, a debug-view unsubscribe removes it, and a
/// deploy command records its target site in <c>_inProgressDeployments</c>.
/// Any other message type falls through the switch and is ignored.
/// </summary>
/// <param name="envelope">The site-addressed envelope whose payload is inspected.</param>
private void TrackMessageForCleanup(SiteEnvelope envelope)
|
||||
{
|
||||
switch (envelope.Message)
|
||||
{
|
||||
case Commons.Messages.DebugView.SubscribeDebugViewRequest sub:
|
||||
// Keyed by correlation id; Sender is captured as the subscriber for this stream.
// The indexer assignment overwrites any existing entry with the same correlation id.
_debugSubscriptions[sub.CorrelationId] = (envelope.SiteId, Sender);
|
||||
break;
|
||||
|
||||
case Commons.Messages.DebugView.UnsubscribeDebugViewRequest unsub:
|
||||
// Dictionary.Remove is a no-op when the correlation id is not tracked.
_debugSubscriptions.Remove(unsub.CorrelationId);
|
||||
break;
|
||||
|
||||
case Commons.Messages.Deployment.DeployInstanceCommand deploy:
|
||||
// Maps deployment id -> site id. Nothing in this method removes the
// entry when a deployment completes.
_inProgressDeployments[deploy.DeploymentId] = envelope.SiteId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Communication-016: TrackMessageForCleanup removed — the dicts it fed
|
||||
// existed solely to support the dead ConnectionStateChanged workflow.
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override SupervisorStrategy SupervisorStrategy()
|
||||
@@ -547,11 +493,8 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
/// <inheritdoc />
|
||||
protected override void PostStop()
|
||||
{
|
||||
// NOTE(review): two stop messages are logged back-to-back below. This looks
// like the pre-change and post-change lines of a diff shown together —
// confirm which single line is current before relying on this text.
_log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
|
||||
_log.Info("CentralCommunicationActor stopped");
|
||||
// _refreshSchedule is declared nullable, hence the null-conditional cancel.
_refreshSchedule?.Cancel();
|
||||
// On central failover, all in-progress deployments are failed
|
||||
// Here that means the in-memory tracking dictionaries are simply emptied;
// nothing is sent to other actors from this method.
_inProgressDeployments.Clear();
|
||||
_debugSubscriptions.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user