3 Commits

Author SHA1 Message Date
Joseph Doherty 6f1f6b8467 fix(health): replicate site health reports between central nodes
CentralHealthAggregator is a per-node hosted singleton, but site health
reports flow through ClusterClient which round-robins each report to one
central node only. The other node's aggregator never saw those reports
and marked sites offline at the 60s threshold — sites constantly flapped
between online and offline on the monitoring page.

On receive, the active CentralCommunicationActor now republishes a
SiteHealthReportReplica wrapper on a DistributedPubSub topic. Both
central nodes subscribe to the topic and process replicas through a
dedicated path that updates the local aggregator without re-broadcasting
(avoids fan-out loops). The aggregator's existing sequence-number
idempotency makes self-delivery a cheap no-op.

DistributedPubSubExtensionProvider is now listed in the HOCON
`akka.extensions` block so the mediator is initialised at cluster
start, eliminating a race where the first Subscribe arrived before the
extension was loaded.
2026-05-13 06:20:07 -04:00
Joseph Doherty d9caa3dd7e fix(ui/shared-scripts): show real param count and return type on cards
The card badges were stuck on the pre-migration data shape: the param
counter only handled flat arrays (now JSON Schema objects), and the
return badge said "returns" regardless of the actual type. Count
`properties` for object schemas with array fallback, and label the
return badge with the schema's `type` (or `T[]` for arrays).
2026-05-13 05:52:53 -04:00
Joseph Doherty 352c93d5a2 fix(alarms): surface composed-member attributes across flatten/validate/UI
Three layers were each blind to nested composition in different ways:

- FlatteningPipeline only loaded compositions for templates in the parent's
  inheritance chain, so depth-2 composed attributes (e.g.
  Pump.AlarmSensor.SensorReading) never materialized. Walk composed chains
  breadth-first so the flattener's nested step has the data it needs.

- InstanceConfigure's alarm trigger picker was fed only direct, non-locked
  attributes, hiding inherited and composed-member paths. Feed it the full
  flattened attribute list via FlatteningPipeline.

- ValidationService.ExtractAttributeNameFromTriggerConfig only recognized
  "attributeName", silently passing alarms still using the legacy
  "attribute" key. Accept both keys, matching FlatteningService,
  AlarmActor, and AlarmTriggerConfigCodec.
2026-05-13 05:33:32 -04:00
7 changed files with 184 additions and 26 deletions
@@ -7,10 +7,12 @@
@using ScadaLink.Commons.Types.Enums
@using ScadaLink.TemplateEngine.Flattening
@using ScadaLink.TemplateEngine.Services
@using ScadaLink.DeploymentManager
@attribute [Authorize(Policy = AuthorizationPolicies.RequireDeployment)]
@inject ITemplateEngineRepository TemplateEngineRepository
@inject ISiteRepository SiteRepository
@inject InstanceService InstanceService
@inject IFlatteningPipeline FlatteningPipeline
@inject AuthenticationStateProvider AuthStateProvider
@inject NavigationManager NavigationManager
@@ -353,6 +355,12 @@
private string? _editingError;
private IReadOnlyList<AlarmAttributeChoice> _editingAvailableAttributes = Array.Empty<AlarmAttributeChoice>();
// Cached flattened attribute list (direct + inherited + composed members,
// path-qualified canonical names). Populated once after the instance loads
// and fed to the alarm trigger editor so composed-member paths like
// "AlarmSensor.SensorReading" resolve in the picker.
private IReadOnlyList<AlarmAttributeChoice> _flattenedAttributes = Array.Empty<AlarmAttributeChoice>();
// Area
private List<Area> _siteAreas = new();
private int _reassignAreaId;
@@ -405,6 +413,8 @@
{
_existingAlarmOverrides[o.AlarmCanonicalName] = o;
}
_flattenedAttributes = await BuildFlattenedAttributesAsync();
}
catch (Exception ex)
{
@@ -547,9 +557,7 @@
: (existing?.TriggerConfigurationOverride ?? alarm.TriggerConfiguration);
_editingPriorityText = existing?.PriorityLevelOverride?.ToString();
_editingAvailableAttributes = _overrideAttrs
.Select(a => new AlarmAttributeChoice(a.Name, MapDataType(a.DataType), "Direct"))
.ToList();
_editingAvailableAttributes = _flattenedAttributes;
}
private void CancelEditOverride()
@@ -689,6 +697,47 @@
_ => "Object"
};
/// <summary>
/// String overload of the data-type mapping, for the textual form carried by
/// <see cref="Commons.Types.Flattening.ResolvedAttribute.DataType"/>.
/// Text that names a <see cref="DataType"/> member is routed through the enum
/// overload; anything else is returned unchanged.
/// </summary>
private static string MapDataType(string dt)
{
    // Unrecognised text is passed through as-is rather than being coerced.
    if (!Enum.TryParse<DataType>(dt, out var dataType))
    {
        return dt;
    }

    return MapDataType(dataType);
}
/// <summary>
/// Produces the attribute choices offered by the alarm trigger picker.
/// The list comes from the flattened configuration, so inherited attributes
/// and composed-member paths (e.g. <c>AlarmSensor.SensorReading</c>) are
/// listed next to direct ones. If flattening reports a failure or throws,
/// the direct-only list is returned instead.
/// </summary>
private async Task<IReadOnlyList<AlarmAttributeChoice>> BuildFlattenedAttributesAsync()
{
    // Direct-only list, materialised before the pipeline call so it reflects
    // the attributes as they were when this method was entered.
    IReadOnlyList<AlarmAttributeChoice> directOnly = _overrideAttrs
        .Select(attr => new AlarmAttributeChoice(attr.Name, MapDataType(attr.DataType), "Direct"))
        .ToList();

    try
    {
        var pipelineResult = await FlatteningPipeline.FlattenAndValidateAsync(Id);
        if (pipelineResult.IsFailure)
        {
            return directOnly;
        }

        var choices = new List<AlarmAttributeChoice>();
        foreach (var resolved in pipelineResult.Value.Configuration.Attributes)
        {
            choices.Add(new AlarmAttributeChoice(
                resolved.CanonicalName,
                MapDataType(resolved.DataType),
                OriginLabel(resolved.Source)));
        }

        return choices;
    }
    catch
    {
        // Best-effort: any flattening error degrades to the direct-only list.
        return directOnly;
    }

    // Collapses the attribute source to one of the three picker labels.
    static string OriginLabel(string? source) => source switch
    {
        "Composed" => "Composed",
        "Inherited" => "Inherited",
        _ => "Direct" // Template / Override
    };
}
// ── Area ────────────────────────────────────────────────
private async Task ReassignArea()
@@ -55,7 +55,8 @@
var preview = s.Code.Length > 80
? s.Code[..80] + "…"
: s.Code;
var paramCount = CountJsonArrayEntries(s.ParameterDefinitions);
var paramCount = CountSchemaProperties(s.ParameterDefinitions);
var returnLabel = DescribeReturnType(s.ReturnDefinition);
<div class="col-lg-6 col-12" @key="s.Id">
<div class="card h-100">
<div class="card-body">
@@ -87,14 +88,9 @@
<div>
<span class="badge bg-light text-dark me-1">@paramCount params</span>
@if (!string.IsNullOrWhiteSpace(s.ReturnDefinition))
{
<span class="badge bg-light text-dark">returns</span>
}
else
{
<span class="badge bg-light text-dark">void</span>
}
<span class="badge bg-light text-dark">
@(returnLabel == "void" ? "void" : $"returns {returnLabel}")
</span>
</div>
</div>
</div>
@@ -174,19 +170,54 @@
}
/// <summary>
/// Best-effort count of JSON array entries by tallying top-level objects.
/// Returns 0 if the parameter definition is null/empty/malformed.
/// Counts the parameters declared in either format: a JSON Schema object
/// (<c>{"type":"object","properties":{...}}</c>) or the legacy flat array.
/// Returns 0 for null/empty/malformed input.
/// </summary>
private static int CountJsonArrayEntries(string? json)
private static int CountSchemaProperties(string? json)
{
if (string.IsNullOrWhiteSpace(json)) return 0;
try
{
using var doc = System.Text.Json.JsonDocument.Parse(json);
if (doc.RootElement.ValueKind == System.Text.Json.JsonValueKind.Array)
return doc.RootElement.GetArrayLength();
var root = doc.RootElement;
if (root.ValueKind == System.Text.Json.JsonValueKind.Object
&& root.TryGetProperty("properties", out var props)
&& props.ValueKind == System.Text.Json.JsonValueKind.Object)
{
return props.EnumerateObject().Count();
}
if (root.ValueKind == System.Text.Json.JsonValueKind.Array)
return root.GetArrayLength();
}
catch { /* fall through */ }
return 0;
}
/// <summary>
/// Produces a short human label for a script's return type from its JSON
/// Schema definition: "string", "integer", "object", "string[]", etc.
/// </summary>
/// <param name="json">The return definition as JSON Schema text; may be null or empty.</param>
/// <returns>
/// "void" for null/empty/malformed input or a non-object root; "object" when
/// the schema has no usable <c>type</c>; <c>T[]</c> for an array schema whose
/// <c>items</c> is an object declaring a <c>type</c>; otherwise the declared
/// type name. When <c>type</c> is an array of names (e.g.
/// <c>["string","null"]</c>), the first name other than "null" is used.
/// </returns>
private static string DescribeReturnType(string? json)
{
    if (string.IsNullOrWhiteSpace(json)) return "void";

    try
    {
        using var doc = System.Text.Json.JsonDocument.Parse(json);
        var root = doc.RootElement;
        if (root.ValueKind != System.Text.Json.JsonValueKind.Object) return "void";
        if (!root.TryGetProperty("type", out var typeEl)) return "object";

        var type = ReadTypeName(typeEl);
        if (type == "array"
            && root.TryGetProperty("items", out var items)
            && items.ValueKind == System.Text.Json.JsonValueKind.Object
            && items.TryGetProperty("type", out var itemTypeEl))
        {
            return $"{ReadTypeName(itemTypeEl)}[]";
        }

        return type;
    }
    catch (System.Text.Json.JsonException)
    {
        // Unparseable JSON: there is no usable definition to describe.
        return "void";
    }

    // Reads a schema "type" value without throwing. JsonElement.GetString()
    // raises InvalidOperationException for non-string kinds, which previously
    // caused union types to be mislabelled as "void".
    static string ReadTypeName(System.Text.Json.JsonElement typeElement)
    {
        switch (typeElement.ValueKind)
        {
            case System.Text.Json.JsonValueKind.String:
                return typeElement.GetString() ?? "object";

            case System.Text.Json.JsonValueKind.Array:
                foreach (var candidate in typeElement.EnumerateArray())
                {
                    if (candidate.ValueKind != System.Text.Json.JsonValueKind.String) continue;

                    var name = candidate.GetString();
                    if (name != null && name != "null") return name;
                }
                return "object";

            default:
                return "object";
        }
    }
}
}
@@ -21,3 +21,13 @@ public record SiteHealthReport(
IReadOnlyDictionary<string, TagQualityCounts>? DataConnectionTagQuality = null,
int ParkedMessageCount = 0,
IReadOnlyList<NodeStatus>? ClusterNodes = null);
/// <summary>
/// Broadcast wrapper used between central nodes to keep per-node
/// CentralHealthAggregator state in sync. ClusterClient load-balances each
/// incoming SiteHealthReport to one central node; that node re-publishes
/// this wrapper on a DistributedPubSub topic so the peer node's aggregator
/// also processes the report (idempotently — sequence numbers guard against
/// double-counting).
/// </summary>
/// <param name="Report">The site health report being replicated to the peer node.</param>
public record SiteHealthReportReplica(SiteHealthReport Report);
@@ -1,6 +1,7 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
@@ -65,6 +66,13 @@ public class CentralCommunicationActor : ReceiveActor
private ICancelable? _refreshSchedule;
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. See
/// <see cref="SiteHealthReportReplica"/> for the protocol rationale.
/// </summary>
private const string HealthReportTopic = "site-health-replica";
public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
{
_serviceProvider = serviceProvider;
@@ -79,6 +87,8 @@ public class CentralCommunicationActor : ReceiveActor
// Health monitoring: heartbeats and health reports from sites
Receive<HeartbeatMessage>(HandleHeartbeat);
Receive<SiteHealthReport>(HandleSiteHealthReport);
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
// Connection state changes
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
@@ -94,7 +104,32 @@ public class CentralCommunicationActor : ReceiveActor
Context.Parent.Tell(heartbeat);
}
/// <summary>
/// Handles a report delivered directly from a site (via ClusterClient):
/// process locally, then fan out to the peer central node so its
/// aggregator stays in sync.
/// </summary>
/// <param name="report">The health report received from a site.</param>
private void HandleSiteHealthReport(SiteHealthReport report)
{
    // Apply locally first so this node's state is updated even when the
    // fan-out below is unavailable.
    ProcessLocally(report);

    try
    {
        DistributedPubSub.Get(Context.System).Mediator.Tell(
            new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
    }
    catch (Exception ex)
    {
        // Best-effort: non-clustered hosts (TestKit) have no mediator. Log at
        // Debug instead of swallowing silently, so a replication outage on a
        // real cluster leaves a trace without flooding the log per report.
        _log.Debug("DistributedPubSub publish failed — health report not replicated to peer: {0}", ex.Message);
    }
}
/// <summary>
/// Applies a report to the local aggregator without re-broadcasting.
/// Used for both site-originated reports and peer-replicated ones — the
/// aggregator is idempotent via sequence-number comparison.
/// </summary>
private void ProcessLocally(SiteHealthReport report)
{
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
if (aggregator != null)
@@ -265,6 +300,20 @@ public class CentralCommunicationActor : ReceiveActor
{
_log.Info("CentralCommunicationActor started");
// Subscribe to the peer-replication topic so we receive health reports
// delivered to the other central node and keep our local aggregator
// in sync (ClusterClient load-balances reports across nodes).
// Tolerant of non-clustered hosts (TestKit) where the extension is absent.
try
{
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Subscribe(HealthReportTopic, Self));
}
catch (Exception ex)
{
_log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
}
// Schedule periodic refresh of site addresses from the database
_refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
TimeSpan.Zero,
@@ -50,23 +50,34 @@ public class FlatteningPipeline : IFlatteningPipeline
if (templateChain.Count == 0)
return Result<FlatteningPipelineResult>.Failure("Template chain is empty.");
// Build composition maps
// Build composition maps, walking nested compositions so the flattener
// can resolve composed-of-composed attributes / alarms / scripts (e.g.
// a parent that composes Pump where Pump itself composes AlarmSensor
// produces "Pump.AlarmSensor.SensorReading").
var compositionMap = new Dictionary<int, IReadOnlyList<Commons.Entities.Templates.TemplateComposition>>();
var composedChains = new Dictionary<int, IReadOnlyList<Commons.Entities.Templates.Template>>();
var processedTemplateIds = new HashSet<int>();
var pendingChains = new Queue<IReadOnlyList<Commons.Entities.Templates.Template>>();
foreach (var template in templateChain)
pendingChains.Enqueue(templateChain);
while (pendingChains.Count > 0)
{
var compositions = await _templateRepo.GetCompositionsByTemplateIdAsync(template.Id, cancellationToken);
if (compositions.Count > 0)
var chain = pendingChains.Dequeue();
foreach (var template in chain)
{
if (!processedTemplateIds.Add(template.Id)) continue;
var compositions = await _templateRepo.GetCompositionsByTemplateIdAsync(template.Id, cancellationToken);
if (compositions.Count == 0) continue;
compositionMap[template.Id] = compositions;
foreach (var comp in compositions)
{
if (!composedChains.ContainsKey(comp.ComposedTemplateId))
{
composedChains[comp.ComposedTemplateId] =
await BuildTemplateChainAsync(comp.ComposedTemplateId, cancellationToken);
}
if (composedChains.ContainsKey(comp.ComposedTemplateId)) continue;
var composedChain = await BuildTemplateChainAsync(comp.ComposedTemplateId, cancellationToken);
composedChains[comp.ComposedTemplateId] = composedChain;
pendingChains.Enqueue(composedChain);
}
}
}
@@ -69,6 +69,9 @@ public class AkkaHostedService : IHostedService
var hocon = $@"
akka {{
extensions = [
""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
]
actor {{
provider = cluster
}}
@@ -220,11 +220,16 @@ public class ValidationService
internal static string? ExtractAttributeNameFromTriggerConfig(string triggerConfigJson)
{
// Accept both keys to stay consistent with FlatteningService.PrefixTriggerAttribute,
// AlarmActor.ParseEvalConfig and AlarmTriggerConfigCodec. Old data may still use
// "attribute"; the UI codec writes the canonical "attributeName".
try
{
using var doc = JsonDocument.Parse(triggerConfigJson);
if (doc.RootElement.TryGetProperty("attributeName", out var prop))
return prop.GetString();
if (doc.RootElement.TryGetProperty("attribute", out var legacyProp))
return legacyProp.GetString();
}
catch (JsonException)
{