fix(host): resolve Host-005..011 — async startup, HOCON escaping, port-conflict check, dead-config cleanup, migration retry, log-level wiring; Host-002 flagged
This commit is contained in:
@@ -54,58 +54,20 @@ public class AkkaHostedService : IHostedService
|
||||
/// </summary>
|
||||
public ActorSystem? ActorSystem => _actorSystem;
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var seedNodesStr = string.Join(",",
|
||||
_clusterOptions.SeedNodes.Select(s => $"\"{s}\""));
|
||||
|
||||
// For site nodes, include a site-specific role (e.g., "site-SiteA") alongside the base role
|
||||
var roles = BuildRoles();
|
||||
var rolesStr = string.Join(",", roles.Select(r => $"\"{r}\""));
|
||||
|
||||
// WP-3: Transport heartbeat explicitly configured from CommunicationOptions (not framework defaults)
|
||||
var transportHeartbeatSec = _communicationOptions.TransportHeartbeatInterval.TotalSeconds;
|
||||
var transportFailureSec = _communicationOptions.TransportFailureThreshold.TotalSeconds;
|
||||
|
||||
var hocon = $@"
|
||||
akka {{
|
||||
extensions = [
|
||||
""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
|
||||
]
|
||||
actor {{
|
||||
provider = cluster
|
||||
}}
|
||||
remote {{
|
||||
dot-netty.tcp {{
|
||||
hostname = ""{_nodeOptions.NodeHostname}""
|
||||
port = {_nodeOptions.RemotingPort}
|
||||
}}
|
||||
transport-failure-detector {{
|
||||
heartbeat-interval = {transportHeartbeatSec:F0}s
|
||||
acceptable-heartbeat-pause = {transportFailureSec:F0}s
|
||||
}}
|
||||
}}
|
||||
cluster {{
|
||||
seed-nodes = [{seedNodesStr}]
|
||||
roles = [{rolesStr}]
|
||||
min-nr-of-members = {_clusterOptions.MinNrOfMembers}
|
||||
split-brain-resolver {{
|
||||
active-strategy = {_clusterOptions.SplitBrainResolverStrategy}
|
||||
stable-after = {_clusterOptions.StableAfter.TotalSeconds:F0}s
|
||||
keep-oldest {{
|
||||
down-if-alone = on
|
||||
}}
|
||||
}}
|
||||
failure-detector {{
|
||||
heartbeat-interval = {_clusterOptions.HeartbeatInterval.TotalSeconds:F0}s
|
||||
acceptable-heartbeat-pause = {_clusterOptions.FailureDetectionThreshold.TotalSeconds:F0}s
|
||||
}}
|
||||
run-coordinated-shutdown-when-down = on
|
||||
}}
|
||||
coordinated-shutdown {{
|
||||
run-by-clr-shutdown-hook = on
|
||||
}}
|
||||
}}";
|
||||
// Host-006: HOCON is assembled in a dedicated builder that quotes/escapes every
|
||||
// interpolated value, so a hostname, seed node or strategy containing a quote,
|
||||
// backslash or whitespace cannot corrupt the configuration document.
|
||||
var hocon = BuildHocon(_nodeOptions, _clusterOptions, roles,
|
||||
transportHeartbeatSec, transportFailureSec);
|
||||
|
||||
var config = ConfigurationFactory.ParseString(hocon);
|
||||
_actorSystem = ActorSystem.Create("scadalink", config);
|
||||
@@ -135,10 +97,78 @@ akka {{
|
||||
}
|
||||
else if (_nodeOptions.Role.Equals("Site", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
RegisterSiteActors();
|
||||
await RegisterSiteActorsAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
/// <summary>
/// Builds the Akka HOCON configuration document used to create the actor system.
/// Every interpolated string value (hostname, seed nodes, roles, split-brain
/// strategy) is routed through <see cref="QuoteHocon"/> so it is emitted as a
/// quoted HOCON string rather than raw text (Host-006). Numeric values are
/// formatted with the invariant culture so the document does not depend on the
/// host's regional settings.
/// </summary>
/// <param name="nodeOptions">Supplies the remoting hostname and port.</param>
/// <param name="clusterOptions">
/// Supplies the seed nodes, minimum member count, split-brain-resolver strategy
/// and stable-after time, and the cluster failure-detector timings.
/// </param>
/// <param name="roles">Cluster roles to advertise for this node.</param>
/// <param name="transportHeartbeatSec">Transport failure-detector heartbeat interval, in seconds.</param>
/// <param name="transportFailureSec">Transport failure-detector acceptable heartbeat pause, in seconds.</param>
/// <returns>The HOCON text, ready to be parsed into an Akka configuration.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="nodeOptions"/>, <paramref name="clusterOptions"/> or
/// <paramref name="roles"/> is <c>null</c>.
/// </exception>
public static string BuildHocon(
    NodeOptions nodeOptions,
    ClusterOptions clusterOptions,
    IEnumerable<string> roles,
    double transportHeartbeatSec,
    double transportFailureSec)
{
    if (nodeOptions is null)
    {
        throw new ArgumentNullException(nameof(nodeOptions));
    }

    if (clusterOptions is null)
    {
        throw new ArgumentNullException(nameof(clusterOptions));
    }

    if (roles is null)
    {
        throw new ArgumentNullException(nameof(roles));
    }

    // Renders a duration given in seconds as a HOCON duration token. Whole-second
    // values keep the "<n>s" form; anything else is emitted in milliseconds so a
    // sub-second setting (e.g. 500 ms) is not rounded to "0s"/"1s".
    static string Duration(double seconds)
    {
        var culture = System.Globalization.CultureInfo.InvariantCulture;
        var millis = Math.Round(seconds * 1000d, MidpointRounding.AwayFromZero);
        return millis % 1000d == 0d
            ? (millis / 1000d).ToString("F0", culture) + "s"
            : millis.ToString("F0", culture) + "ms";
    }

    var seedNodesStr = string.Join(",",
        clusterOptions.SeedNodes.Select(QuoteHocon));
    var rolesStr = string.Join(",", roles.Select(QuoteHocon));

    var transportHeartbeat = Duration(transportHeartbeatSec);
    var transportPause = Duration(transportFailureSec);
    var stableAfter = Duration(clusterOptions.StableAfter.TotalSeconds);
    var clusterHeartbeat = Duration(clusterOptions.HeartbeatInterval.TotalSeconds);
    var clusterPause = Duration(clusterOptions.FailureDetectionThreshold.TotalSeconds);

    // FormattableString.Invariant formats the remaining numeric holes (port,
    // min-nr-of-members) culture-independently.
    return FormattableString.Invariant($@"
akka {{
  extensions = [
    ""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
  ]
  actor {{
    provider = cluster
  }}
  remote {{
    dot-netty.tcp {{
      hostname = {QuoteHocon(nodeOptions.NodeHostname)}
      port = {nodeOptions.RemotingPort}
    }}
    transport-failure-detector {{
      heartbeat-interval = {transportHeartbeat}
      acceptable-heartbeat-pause = {transportPause}
    }}
  }}
  cluster {{
    seed-nodes = [{seedNodesStr}]
    roles = [{rolesStr}]
    min-nr-of-members = {clusterOptions.MinNrOfMembers}
    split-brain-resolver {{
      active-strategy = {QuoteHocon(clusterOptions.SplitBrainResolverStrategy)}
      stable-after = {stableAfter}
      keep-oldest {{
        down-if-alone = on
      }}
    }}
    failure-detector {{
      heartbeat-interval = {clusterHeartbeat}
      acceptable-heartbeat-pause = {clusterPause}
    }}
    run-coordinated-shutdown-when-down = on
  }}
  coordinated-shutdown {{
    run-by-clr-shutdown-hook = on
  }}
}}");
}
|
||||
|
||||
/// <summary>
/// Renders a value as a HOCON double-quoted string. Backslashes, double quotes
/// and control characters (newline, carriage return, tab, backspace, form feed
/// and any other character below U+0020) are escaped JSON-style, so the
/// resulting token cannot break out of its string literal or span lines.
/// </summary>
/// <param name="value">The raw value; <c>null</c> is rendered as an empty string.</param>
/// <returns>The value wrapped in double quotes with all unsafe characters escaped.</returns>
private static string QuoteHocon(string? value)
{
    var text = value ?? string.Empty;
    var builder = new System.Text.StringBuilder(text.Length + 2);

    builder.Append('"');
    foreach (var ch in text)
    {
        switch (ch)
        {
            case '\\':
                builder.Append("\\\\");
                break;
            case '"':
                builder.Append("\\\"");
                break;
            case '\n':
                builder.Append("\\n");
                break;
            case '\r':
                builder.Append("\\r");
                break;
            case '\t':
                builder.Append("\\t");
                break;
            case '\b':
                builder.Append("\\b");
                break;
            case '\f':
                builder.Append("\\f");
                break;
            default:
                if (ch < ' ')
                {
                    // Remaining control characters have no short escape; use \uXXXX.
                    builder.Append("\\u")
                           .Append(((int)ch).ToString("x4", System.Globalization.CultureInfo.InvariantCulture));
                }
                else
                {
                    builder.Append(ch);
                }

                break;
        }
    }

    builder.Append('"');
    return builder.ToString();
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
@@ -218,7 +248,7 @@ akka {{
|
||||
/// The singleton is scoped to the site-specific cluster role so it runs on exactly
|
||||
/// one node within this site's cluster.
|
||||
/// </summary>
|
||||
private void RegisterSiteActors()
|
||||
private async Task RegisterSiteActorsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var siteRole = $"site-{_nodeOptions.SiteId}";
|
||||
var storage = _serviceProvider.GetRequiredService<SiteStorageService>();
|
||||
@@ -341,8 +371,11 @@ akka {{
|
||||
if (storeAndForwardService != null)
|
||||
{
|
||||
// Initialize SQLite schema and start the retry timer. Must complete before
|
||||
// any actor or HTTP handler touches the service.
|
||||
storeAndForwardService.StartAsync().GetAwaiter().GetResult();
|
||||
// any actor or HTTP handler touches the service. Host-005: awaited rather
|
||||
// than blocked via GetAwaiter().GetResult() — no thread-pool starvation /
|
||||
// sync-context deadlock risk, and exceptions surface as their original type.
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
await storeAndForwardService.StartAsync();
|
||||
|
||||
// Register the store-and-forward delivery handlers so buffered
|
||||
// ExternalSystem calls, cached DB writes and notifications are actually
|
||||
@@ -413,7 +446,22 @@ akka {{
|
||||
contacts.Count, _nodeOptions.SiteId);
|
||||
}
|
||||
|
||||
// Gate gRPC subscriptions until the actor system and SiteStreamManager are initialized
|
||||
// Gate gRPC subscriptions until the actor system and SiteStreamManager are
|
||||
// initialized (REQ-HOST-7).
|
||||
//
|
||||
// Host-009: SetReady asserts a deliberately narrow contract. By this point the
|
||||
// actor system exists, SiteStreamManager.Initialize has run, and every
|
||||
// role actor (SiteCommunicationActor, deployment-manager singleton,
|
||||
// SiteReplicationActor, the ClusterClient) has been created with ActorOf —
|
||||
// creation and the registration Tells are synchronous and strictly ordered.
|
||||
// What is NOT guaranteed is completion of each actor's PreStart or the
|
||||
// ClusterClient's initial-contact handshake with central: those are
|
||||
// intentionally asynchronous. Gating readiness on the central handshake would
|
||||
// be wrong — a site must come up and stream locally even while central is
|
||||
// briefly unreachable. gRPC readiness therefore guarantees "the site actor
|
||||
// graph exists and can accept subscription streams", not "the cluster
|
||||
// handshake has completed". Streams opened before SetReady are already
|
||||
// rejected by SiteStreamGrpcServer with StatusCode.Unavailable.
|
||||
var grpcServer = _serviceProvider.GetService<ScadaLink.Communication.Grpc.SiteStreamGrpcServer>();
|
||||
grpcServer?.SetReady(_actorSystem!);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user