Compare commits
3 Commits
164d914ba8
...
6f1f6b8467
| Author | SHA1 | Date | |
|---|---|---|---|
| 6f1f6b8467 | |||
| d9caa3dd7e | |||
| 352c93d5a2 |
@@ -7,10 +7,12 @@
|
||||
@using ScadaLink.Commons.Types.Enums
|
||||
@using ScadaLink.TemplateEngine.Flattening
|
||||
@using ScadaLink.TemplateEngine.Services
|
||||
@using ScadaLink.DeploymentManager
|
||||
@attribute [Authorize(Policy = AuthorizationPolicies.RequireDeployment)]
|
||||
@inject ITemplateEngineRepository TemplateEngineRepository
|
||||
@inject ISiteRepository SiteRepository
|
||||
@inject InstanceService InstanceService
|
||||
@inject IFlatteningPipeline FlatteningPipeline
|
||||
@inject AuthenticationStateProvider AuthStateProvider
|
||||
@inject NavigationManager NavigationManager
|
||||
|
||||
@@ -353,6 +355,12 @@
|
||||
private string? _editingError;
|
||||
private IReadOnlyList<AlarmAttributeChoice> _editingAvailableAttributes = Array.Empty<AlarmAttributeChoice>();
|
||||
|
||||
// Cached flattened attribute list (direct + inherited + composed members,
|
||||
// path-qualified canonical names). Populated once after the instance loads
|
||||
// and fed to the alarm trigger editor so composed-member paths like
|
||||
// "AlarmSensor.SensorReading" resolve in the picker.
|
||||
private IReadOnlyList<AlarmAttributeChoice> _flattenedAttributes = Array.Empty<AlarmAttributeChoice>();
|
||||
|
||||
// Area
|
||||
private List<Area> _siteAreas = new();
|
||||
private int _reassignAreaId;
|
||||
@@ -405,6 +413,8 @@
|
||||
{
|
||||
_existingAlarmOverrides[o.AlarmCanonicalName] = o;
|
||||
}
|
||||
|
||||
_flattenedAttributes = await BuildFlattenedAttributesAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -547,9 +557,7 @@
|
||||
: (existing?.TriggerConfigurationOverride ?? alarm.TriggerConfiguration);
|
||||
|
||||
_editingPriorityText = existing?.PriorityLevelOverride?.ToString();
|
||||
_editingAvailableAttributes = _overrideAttrs
|
||||
.Select(a => new AlarmAttributeChoice(a.Name, MapDataType(a.DataType), "Direct"))
|
||||
.ToList();
|
||||
_editingAvailableAttributes = _flattenedAttributes;
|
||||
}
|
||||
|
||||
private void CancelEditOverride()
|
||||
@@ -689,6 +697,47 @@
|
||||
_ => "Object"
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Same mapping for the string form emitted by <see cref="Commons.Types.Flattening.ResolvedAttribute.DataType"/>.
|
||||
/// </summary>
|
||||
private static string MapDataType(string dt) =>
|
||||
Enum.TryParse<DataType>(dt, out var parsed) ? MapDataType(parsed) : dt;
|
||||
|
||||
/// <summary>
|
||||
/// Builds the alarm picker choice list from the flattened configuration so
|
||||
/// composed-member paths (e.g. <c>AlarmSensor.SensorReading</c>) and
|
||||
/// inherited attributes appear alongside direct ones. Falls back to the
|
||||
/// direct-only list if flattening fails for any reason.
|
||||
/// </summary>
|
||||
private async Task<IReadOnlyList<AlarmAttributeChoice>> BuildFlattenedAttributesAsync()
|
||||
{
|
||||
var fallback = (IReadOnlyList<AlarmAttributeChoice>)_overrideAttrs
|
||||
.Select(a => new AlarmAttributeChoice(a.Name, MapDataType(a.DataType), "Direct"))
|
||||
.ToList();
|
||||
|
||||
try
|
||||
{
|
||||
var flat = await FlatteningPipeline.FlattenAndValidateAsync(Id);
|
||||
if (flat.IsFailure) return fallback;
|
||||
|
||||
return flat.Value.Configuration.Attributes
|
||||
.Select(a => new AlarmAttributeChoice(
|
||||
a.CanonicalName,
|
||||
MapDataType(a.DataType),
|
||||
a.Source switch
|
||||
{
|
||||
"Composed" => "Composed",
|
||||
"Inherited" => "Inherited",
|
||||
_ => "Direct" // Template / Override
|
||||
}))
|
||||
.ToList();
|
||||
}
|
||||
catch
|
||||
{
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Area ────────────────────────────────────────────────
|
||||
|
||||
private async Task ReassignArea()
|
||||
|
||||
@@ -55,7 +55,8 @@
|
||||
var preview = s.Code.Length > 80
|
||||
? s.Code[..80] + "…"
|
||||
: s.Code;
|
||||
var paramCount = CountJsonArrayEntries(s.ParameterDefinitions);
|
||||
var paramCount = CountSchemaProperties(s.ParameterDefinitions);
|
||||
var returnLabel = DescribeReturnType(s.ReturnDefinition);
|
||||
<div class="col-lg-6 col-12" @key="s.Id">
|
||||
<div class="card h-100">
|
||||
<div class="card-body">
|
||||
@@ -87,14 +88,9 @@
|
||||
|
||||
<div>
|
||||
<span class="badge bg-light text-dark me-1">@paramCount params</span>
|
||||
@if (!string.IsNullOrWhiteSpace(s.ReturnDefinition))
|
||||
{
|
||||
<span class="badge bg-light text-dark">returns</span>
|
||||
}
|
||||
else
|
||||
{
|
||||
<span class="badge bg-light text-dark">void</span>
|
||||
}
|
||||
<span class="badge bg-light text-dark">
|
||||
@(returnLabel == "void" ? "void" : $"returns {returnLabel}")
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -174,19 +170,54 @@
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Best-effort count of JSON array entries by tallying top-level objects.
|
||||
/// Returns 0 if the parameter definition is null/empty/malformed.
|
||||
/// Counts the parameters declared in either format: a JSON Schema object
|
||||
/// (<c>{"type":"object","properties":{...}}</c>) or the legacy flat array.
|
||||
/// Returns 0 for null/empty/malformed input.
|
||||
/// </summary>
|
||||
private static int CountJsonArrayEntries(string? json)
|
||||
private static int CountSchemaProperties(string? json)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(json)) return 0;
|
||||
try
|
||||
{
|
||||
using var doc = System.Text.Json.JsonDocument.Parse(json);
|
||||
if (doc.RootElement.ValueKind == System.Text.Json.JsonValueKind.Array)
|
||||
return doc.RootElement.GetArrayLength();
|
||||
var root = doc.RootElement;
|
||||
if (root.ValueKind == System.Text.Json.JsonValueKind.Object
|
||||
&& root.TryGetProperty("properties", out var props)
|
||||
&& props.ValueKind == System.Text.Json.JsonValueKind.Object)
|
||||
{
|
||||
return props.EnumerateObject().Count();
|
||||
}
|
||||
if (root.ValueKind == System.Text.Json.JsonValueKind.Array)
|
||||
return root.GetArrayLength();
|
||||
}
|
||||
catch { /* fall through */ }
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Produces a short human label for a script's return type from its JSON
|
||||
/// Schema definition: "string", "integer", "object", "string[]", etc.
|
||||
/// Treats null/empty/malformed input as "void".
|
||||
/// </summary>
|
||||
private static string DescribeReturnType(string? json)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(json)) return "void";
|
||||
try
|
||||
{
|
||||
using var doc = System.Text.Json.JsonDocument.Parse(json);
|
||||
var root = doc.RootElement;
|
||||
if (root.ValueKind != System.Text.Json.JsonValueKind.Object) return "void";
|
||||
if (!root.TryGetProperty("type", out var typeEl)) return "object";
|
||||
var type = typeEl.GetString() ?? "object";
|
||||
if (type == "array"
|
||||
&& root.TryGetProperty("items", out var items)
|
||||
&& items.ValueKind == System.Text.Json.JsonValueKind.Object
|
||||
&& items.TryGetProperty("type", out var itemTypeEl))
|
||||
{
|
||||
return $"{itemTypeEl.GetString() ?? "object"}[]";
|
||||
}
|
||||
return type;
|
||||
}
|
||||
catch { return "void"; }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,3 +21,13 @@ public record SiteHealthReport(
|
||||
IReadOnlyDictionary<string, TagQualityCounts>? DataConnectionTagQuality = null,
|
||||
int ParkedMessageCount = 0,
|
||||
IReadOnlyList<NodeStatus>? ClusterNodes = null);
|
||||
|
||||
/// <summary>
|
||||
/// Broadcast wrapper used between central nodes to keep per-node
|
||||
/// CentralHealthAggregator state in sync. ClusterClient load-balances each
|
||||
/// incoming SiteHealthReport to one central node; that node re-publishes
|
||||
/// this wrapper on a DistributedPubSub topic so the peer node's aggregator
|
||||
/// also processes the report (idempotently — sequence numbers guard against
|
||||
/// double-counting).
|
||||
/// </summary>
|
||||
public record SiteHealthReportReplica(SiteHealthReport Report);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
using System.Collections.Immutable;
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.Client;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
@@ -65,6 +66,13 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
|
||||
private ICancelable? _refreshSchedule;
|
||||
|
||||
/// <summary>
|
||||
/// DistributedPubSub topic used to fan health reports out to the peer
|
||||
/// central node so both per-node aggregators stay in sync. See
|
||||
/// <see cref="SiteHealthReportReplica"/> for the protocol rationale.
|
||||
/// </summary>
|
||||
private const string HealthReportTopic = "site-health-replica";
|
||||
|
||||
public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
@@ -79,6 +87,8 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
// Health monitoring: heartbeats and health reports from sites
|
||||
Receive<HeartbeatMessage>(HandleHeartbeat);
|
||||
Receive<SiteHealthReport>(HandleSiteHealthReport);
|
||||
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
|
||||
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
|
||||
|
||||
// Connection state changes
|
||||
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
|
||||
@@ -94,7 +104,32 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
Context.Parent.Tell(heartbeat);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a report delivered directly from a site (via ClusterClient):
|
||||
/// process locally, then fan out to the peer central node so its
|
||||
/// aggregator stays in sync.
|
||||
/// </summary>
|
||||
private void HandleSiteHealthReport(SiteHealthReport report)
|
||||
{
|
||||
ProcessLocally(report);
|
||||
|
||||
try
|
||||
{
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(
|
||||
new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
|
||||
}
|
||||
catch
|
||||
{
|
||||
// No-op in non-clustered hosts (TestKit).
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Applies a report to the local aggregator without re-broadcasting.
|
||||
/// Used for both site-originated reports and peer-replicated ones — the
|
||||
/// aggregator is idempotent via sequence-number comparison.
|
||||
/// </summary>
|
||||
private void ProcessLocally(SiteHealthReport report)
|
||||
{
|
||||
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
|
||||
if (aggregator != null)
|
||||
@@ -265,6 +300,20 @@ public class CentralCommunicationActor : ReceiveActor
|
||||
{
|
||||
_log.Info("CentralCommunicationActor started");
|
||||
|
||||
// Subscribe to the peer-replication topic so we receive health reports
|
||||
// delivered to the other central node and keep our local aggregator
|
||||
// in sync (ClusterClient load-balances reports across nodes).
|
||||
// Tolerant of non-clustered hosts (TestKit) where the extension is absent.
|
||||
try
|
||||
{
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(
|
||||
new Subscribe(HealthReportTopic, Self));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
|
||||
}
|
||||
|
||||
// Schedule periodic refresh of site addresses from the database
|
||||
_refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
|
||||
TimeSpan.Zero,
|
||||
|
||||
@@ -50,23 +50,34 @@ public class FlatteningPipeline : IFlatteningPipeline
|
||||
if (templateChain.Count == 0)
|
||||
return Result<FlatteningPipelineResult>.Failure("Template chain is empty.");
|
||||
|
||||
// Build composition maps
|
||||
// Build composition maps, walking nested compositions so the flattener
|
||||
// can resolve composed-of-composed attributes / alarms / scripts (e.g.
|
||||
// a parent that composes Pump where Pump itself composes AlarmSensor
|
||||
// produces "Pump.AlarmSensor.SensorReading").
|
||||
var compositionMap = new Dictionary<int, IReadOnlyList<Commons.Entities.Templates.TemplateComposition>>();
|
||||
var composedChains = new Dictionary<int, IReadOnlyList<Commons.Entities.Templates.Template>>();
|
||||
var processedTemplateIds = new HashSet<int>();
|
||||
var pendingChains = new Queue<IReadOnlyList<Commons.Entities.Templates.Template>>();
|
||||
|
||||
foreach (var template in templateChain)
|
||||
pendingChains.Enqueue(templateChain);
|
||||
while (pendingChains.Count > 0)
|
||||
{
|
||||
var compositions = await _templateRepo.GetCompositionsByTemplateIdAsync(template.Id, cancellationToken);
|
||||
if (compositions.Count > 0)
|
||||
var chain = pendingChains.Dequeue();
|
||||
foreach (var template in chain)
|
||||
{
|
||||
if (!processedTemplateIds.Add(template.Id)) continue;
|
||||
|
||||
var compositions = await _templateRepo.GetCompositionsByTemplateIdAsync(template.Id, cancellationToken);
|
||||
if (compositions.Count == 0) continue;
|
||||
|
||||
compositionMap[template.Id] = compositions;
|
||||
foreach (var comp in compositions)
|
||||
{
|
||||
if (!composedChains.ContainsKey(comp.ComposedTemplateId))
|
||||
{
|
||||
composedChains[comp.ComposedTemplateId] =
|
||||
await BuildTemplateChainAsync(comp.ComposedTemplateId, cancellationToken);
|
||||
}
|
||||
if (composedChains.ContainsKey(comp.ComposedTemplateId)) continue;
|
||||
|
||||
var composedChain = await BuildTemplateChainAsync(comp.ComposedTemplateId, cancellationToken);
|
||||
composedChains[comp.ComposedTemplateId] = composedChain;
|
||||
pendingChains.Enqueue(composedChain);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,9 @@ public class AkkaHostedService : IHostedService
|
||||
|
||||
var hocon = $@"
|
||||
akka {{
|
||||
extensions = [
|
||||
""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
|
||||
]
|
||||
actor {{
|
||||
provider = cluster
|
||||
}}
|
||||
|
||||
@@ -220,11 +220,16 @@ public class ValidationService
|
||||
|
||||
internal static string? ExtractAttributeNameFromTriggerConfig(string triggerConfigJson)
|
||||
{
|
||||
// Accept both keys to stay consistent with FlatteningService.PrefixTriggerAttribute,
|
||||
// AlarmActor.ParseEvalConfig and AlarmTriggerConfigCodec. Old data may still use
|
||||
// "attribute"; the UI codec writes the canonical "attributeName".
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(triggerConfigJson);
|
||||
if (doc.RootElement.TryGetProperty("attributeName", out var prop))
|
||||
return prop.GetString();
|
||||
if (doc.RootElement.TryGetProperty("attribute", out var legacyProp))
|
||||
return legacyProp.GetString();
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user