Compare commits
3 Commits
8e5c8e29f7
...
337a691629
| Author | SHA1 | Date | |
|---|---|---|---|
| 337a691629 | |||
| b06e3ae740 | |||
| f57f61deac |
@@ -77,16 +77,16 @@
|
||||
{"id": 65, "subject": "Task 65: New v2 docs (Architecture-v2, Cluster, ControlPlane, Runtime)", "status": "completed", "classification": "standard", "estMinutes": 5, "parallelizableWith": [62,63,64], "blockedBy": [57], "commit": "1689901"},
|
||||
{"id": "F1", "subject": "Follow-up: AuthEndpoints integration tests against fused Host", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F2"], "blockedBy": [53], "commit": "463512d", "origin": "Deviation from Task 29 (commit 38ea0c5) — deferred until Task 53 wires AddOtOpcUaAuth/MapOtOpcUaAuth in Program. Add WebApplicationFactory<OtOpcUa.Host.Program> tests for /auth/login (204/401/503), /auth/ping (401/200), /auth/token (200+JWT), /auth/logout (204+cookie clear) using a stub ILdapAuthService.", "deviation": "Used HostBuilder + TestServer directly (Security.Tests/AuthEndpointsIntegrationTests) instead of WebApplicationFactory<Program> — Host needs Akka cluster bootstrap that's out of scope for this contract test. Cluster-mode auth coverage belongs in Task 58."},
|
||||
{"id": "F2", "subject": "Follow-up: Replace JwtBearer BuildServiceProvider antipattern with IPostConfigureOptions", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F1"], "blockedBy": [], "commit": "45a8c79", "origin": "Deviation from Task 26 (commit 207fc6a) — AddOtOpcUaAuth uses services.BuildServiceProvider().CreateScope() inside .AddJwtBearer lambda (ASP0000). Refactor to IPostConfigureOptions<JwtBearerOptions> so validation parameters resolve lazily from the real request provider."},
|
||||
{"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "pending", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."},
|
||||
{"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "pending", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."},
|
||||
{"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."},
|
||||
{"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "completed", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "commit": "f57f61d", "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."},
|
||||
{"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "commit": "f57f61d", "deviation": "Moot — F3 deleted WrapDetails entirely (EventId/CorrelationId now live in dedicated columns).", "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."},
|
||||
{"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "5cfbe8b", "deviation": "Delivered by Task 59 — DeployHappyPathTests.StartDeployment_seals_after_both_nodes_apply exercises the exact 'dispatch to N driver nodes, all ack, seals' flow via the real 2-node TwoNodeClusterHarness rather than a multi-system TestKit. Cleaner because it tests the production code path end-to-end.", "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."},
|
||||
{"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action<object> broadcast so tests can replace it with a probe; un-skip both tests."},
|
||||
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "pending", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands."},
|
||||
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers."},
|
||||
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted."},
|
||||
{"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."},
|
||||
{"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."},
|
||||
{"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "pending", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."},
|
||||
{"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."},
|
||||
{"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "pending", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."},
|
||||
{"id": "F14", "subject": "Follow-up: Migrate side-effecting Phase7Composer (EquipmentNodeWalker, trace logs, node cache)", "status": "pending", "classification": "standard", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 47 — pure version covers the projection. Walker + alarm sink registration + cache mutation stay in legacy until split into Phase7Applier."},
|
||||
{"id": "F15", "subject": "Follow-up: Migrate 47 legacy Admin Blazor components into AdminUI library", "status": "pending", "classification": "high-risk", "estMinutes": 180, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 48 — only MapAdminUI scaffold + 1 new page (Deployments). 47 pages stay in legacy Admin (accepted-broken) until Task 56."},
|
||||
|
||||
@@ -22,4 +22,16 @@ public sealed class ConfigAuditLog
|
||||
public long? GenerationId { get; set; }
|
||||
|
||||
public string? DetailsJson { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Stable per-event identifier from <c>AuditEvent.EventId</c>. Filtered unique index on
|
||||
/// this column gives cross-restart idempotency for the batched AuditWriterActor: a flush
|
||||
/// that retries after a process crash can re-send the same EventId without producing a
|
||||
/// duplicate row. Nullable so pre-v2 rows backfill cleanly.
|
||||
/// </summary>
|
||||
public Guid? EventId { get; set; }
|
||||
|
||||
/// <summary>Correlation ID from <c>AuditEvent.CorrelationId</c> so an audit row joins to its
|
||||
/// originating request/workflow. Nullable for the same backfill reason as <see cref="EventId"/>.</summary>
|
||||
public Guid? CorrelationId { get; set; }
|
||||
}
|
||||
|
||||
+1755
File diff suppressed because it is too large
Load Diff
+50
@@ -0,0 +1,50 @@
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore.Migrations;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public partial class AddConfigAuditLogEventIdColumns : Migration
|
||||
{
|
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.AddColumn<Guid>(
|
||||
name: "CorrelationId",
|
||||
table: "ConfigAuditLog",
|
||||
type: "uniqueidentifier",
|
||||
nullable: true);
|
||||
|
||||
migrationBuilder.AddColumn<Guid>(
|
||||
name: "EventId",
|
||||
table: "ConfigAuditLog",
|
||||
type: "uniqueidentifier",
|
||||
nullable: true);
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "UX_ConfigAuditLog_EventId",
|
||||
table: "ConfigAuditLog",
|
||||
column: "EventId",
|
||||
unique: true,
|
||||
filter: "[EventId] IS NOT NULL");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.DropIndex(
|
||||
name: "UX_ConfigAuditLog_EventId",
|
||||
table: "ConfigAuditLog");
|
||||
|
||||
migrationBuilder.DropColumn(
|
||||
name: "CorrelationId",
|
||||
table: "ConfigAuditLog");
|
||||
|
||||
migrationBuilder.DropColumn(
|
||||
name: "EventId",
|
||||
table: "ConfigAuditLog");
|
||||
}
|
||||
}
|
||||
}
|
||||
+11
@@ -165,9 +165,15 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<Guid?>("CorrelationId")
|
||||
.HasColumnType("uniqueidentifier");
|
||||
|
||||
b.Property<string>("DetailsJson")
|
||||
.HasColumnType("nvarchar(max)");
|
||||
|
||||
b.Property<Guid?>("EventId")
|
||||
.HasColumnType("uniqueidentifier");
|
||||
|
||||
b.Property<string>("EventType")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
@@ -192,6 +198,11 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
|
||||
b.HasKey("AuditId");
|
||||
|
||||
b.HasIndex("EventId")
|
||||
.IsUnique()
|
||||
.HasDatabaseName("UX_ConfigAuditLog_EventId")
|
||||
.HasFilter("[EventId] IS NOT NULL");
|
||||
|
||||
b.HasIndex("GenerationId")
|
||||
.HasDatabaseName("IX_ConfigAuditLog_Generation")
|
||||
.HasFilter("[GenerationId] IS NOT NULL");
|
||||
|
||||
@@ -413,6 +413,8 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
e.Property(x => x.ClusterId).HasMaxLength(64);
|
||||
e.Property(x => x.NodeId).HasMaxLength(64);
|
||||
e.Property(x => x.DetailsJson).HasColumnType("nvarchar(max)");
|
||||
e.Property(x => x.EventId);
|
||||
e.Property(x => x.CorrelationId);
|
||||
|
||||
e.HasIndex(x => new { x.ClusterId, x.Timestamp })
|
||||
.IsDescending(false, true)
|
||||
@@ -420,6 +422,14 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
e.HasIndex(x => x.GenerationId)
|
||||
.HasFilter("[GenerationId] IS NOT NULL")
|
||||
.HasDatabaseName("IX_ConfigAuditLog_Generation");
|
||||
// Filtered unique index gives cross-restart idempotency for the AuditWriterActor:
|
||||
// a retry of an already-flushed batch will hit this constraint and the catch in
|
||||
// FlushBuffer drops the duplicate insert. Nullable + filter so legacy backfill rows
|
||||
// (EventId=NULL) don't collide.
|
||||
e.HasIndex(x => x.EventId)
|
||||
.IsUnique()
|
||||
.HasFilter("[EventId] IS NOT NULL")
|
||||
.HasDatabaseName("UX_ConfigAuditLog_EventId");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -14,9 +14,11 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||
/// - <see cref="FlushInterval"/> elapses with a non-empty buffer.
|
||||
/// - <c>PreRestart</c> / <c>PostStop</c> (supervisor swap or coordinated shutdown).
|
||||
///
|
||||
/// Dedup is in-buffer only — once a batch is flushed, the actor accepts a duplicate
|
||||
/// <see cref="AuditEvent.EventId"/> as a new row. True cross-restart idempotency needs an
|
||||
/// EventId column with a unique index on <c>ConfigAuditLog</c>; tracked as follow-up F3.
|
||||
/// Dedup is two-layer: in-buffer (the <see cref="Dictionary{TKey, TValue}"/> below collapses
|
||||
/// duplicate EventIds before flush) and at the database via the filtered unique index
|
||||
/// <c>UX_ConfigAuditLog_EventId</c> (cross-restart safety — a retry of an already-flushed
|
||||
/// batch hits the constraint and we drop the duplicate insert without losing the rest of
|
||||
/// the batch).
|
||||
/// </summary>
|
||||
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
@@ -70,7 +72,9 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
Principal = evt.Actor,
|
||||
EventType = $"{evt.Category}:{evt.Action}",
|
||||
NodeId = evt.SourceNode.Value,
|
||||
DetailsJson = WrapDetails(evt),
|
||||
DetailsJson = evt.DetailsJson,
|
||||
EventId = evt.EventId,
|
||||
CorrelationId = evt.CorrelationId.Value,
|
||||
});
|
||||
}
|
||||
db.SaveChanges();
|
||||
@@ -82,17 +86,6 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Wraps caller-supplied details with the EventId + CorrelationId so audit consumers can
|
||||
/// reconstruct the original message. Until ConfigAuditLog gains a first-class EventId column
|
||||
/// (follow-up F3), this is the only place these correlation IDs are persisted.
|
||||
/// </summary>
|
||||
private static string WrapDetails(AuditEvent evt)
|
||||
{
|
||||
var details = evt.DetailsJson ?? "null";
|
||||
return $"{{\"eventId\":\"{evt.EventId:N}\",\"correlationId\":\"{evt.CorrelationId.Value:N}\",\"details\":{details}}}";
|
||||
}
|
||||
|
||||
protected override void PreRestart(Exception reason, object message)
|
||||
{
|
||||
FlushBuffer();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Net.Sockets;
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
@@ -8,7 +9,13 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
||||
/// <summary>
|
||||
/// Periodically pings a peer node's OPC UA endpoint (<c>opc.tcp://peer:4840</c>) and publishes
|
||||
/// the result on the cluster's redundancy-state input topic so the admin <c>RedundancyStateActor</c>
|
||||
/// can react. Real OPC UA probe call is staged for follow-up F12.
|
||||
/// can react.
|
||||
///
|
||||
/// The probe is a plain TCP connect to the OPC UA port with a short timeout — enough to detect
|
||||
/// "is the OPC UA server process up and accepting connections?" A full secure-channel handshake
|
||||
/// (Hello / Acknowledge) needs the OPC UA Client SDK and a session/PKI setup, which is more than
|
||||
/// what the redundancy calculator needs. Upgrade to a real Hello probe if a deeper liveness signal
|
||||
/// is ever required.
|
||||
/// </summary>
|
||||
public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
@@ -17,38 +24,87 @@ public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers
|
||||
public const string RedundancyStateTopic = "redundancy-state";
|
||||
|
||||
public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10);
|
||||
public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(2);
|
||||
public const int DefaultOpcUaPort = 4840;
|
||||
|
||||
public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok);
|
||||
public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } }
|
||||
|
||||
private readonly NodeId _peer;
|
||||
private readonly TimeSpan _interval;
|
||||
private readonly TimeSpan _connectTimeout;
|
||||
private readonly int _opcUaPort;
|
||||
private readonly Action<object>? _broadcastOverride;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
public ITimerScheduler Timers { get; set; } = null!;
|
||||
|
||||
public static Props Props(NodeId peer, TimeSpan? interval = null, Action<object>? broadcast = null) =>
|
||||
Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(peer, interval ?? DefaultProbeInterval, broadcast));
|
||||
public static Props Props(
|
||||
NodeId peer,
|
||||
TimeSpan? interval = null,
|
||||
TimeSpan? connectTimeout = null,
|
||||
int opcUaPort = DefaultOpcUaPort,
|
||||
Action<object>? broadcast = null) =>
|
||||
Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(
|
||||
peer,
|
||||
interval ?? DefaultProbeInterval,
|
||||
connectTimeout ?? DefaultConnectTimeout,
|
||||
opcUaPort,
|
||||
broadcast));
|
||||
|
||||
public PeerOpcUaProbeActor(NodeId peer, TimeSpan interval, Action<object>? broadcastOverride)
|
||||
public PeerOpcUaProbeActor(
|
||||
NodeId peer,
|
||||
TimeSpan interval,
|
||||
TimeSpan connectTimeout,
|
||||
int opcUaPort,
|
||||
Action<object>? broadcastOverride)
|
||||
{
|
||||
_peer = peer;
|
||||
_interval = interval;
|
||||
_connectTimeout = connectTimeout;
|
||||
_opcUaPort = opcUaPort;
|
||||
_broadcastOverride = broadcastOverride;
|
||||
|
||||
Receive<Tick>(_ => RunProbe());
|
||||
ReceiveAsync<Tick>(_ => RunProbeAsync());
|
||||
}
|
||||
|
||||
protected override void PreStart() =>
|
||||
Timers.StartPeriodicTimer("probe", Tick.Instance, _interval);
|
||||
|
||||
private void RunProbe()
|
||||
private async Task RunProbeAsync()
|
||||
{
|
||||
// F12: actual opc.tcp ping. Assume Ok=true until the probe is wired.
|
||||
var msg = new OpcUaProbeResult(_peer, Ok: true);
|
||||
var host = ExtractHost(_peer);
|
||||
var ok = await TryTcpConnectAsync(host, _opcUaPort, _connectTimeout);
|
||||
var msg = new OpcUaProbeResult(_peer, ok);
|
||||
if (_broadcastOverride is not null) _broadcastOverride(msg);
|
||||
else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg));
|
||||
_log.Debug("PeerOpcUaProbe: pinged {Peer} (probe staged for F12)", _peer);
|
||||
_log.Debug("PeerOpcUaProbe: pinged {Peer} ({Host}:{Port}) → ok={Ok}",
|
||||
_peer, host, _opcUaPort, ok);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="NodeId"/> values are derived as <c>host:port</c> (see <c>ClusterRoleInfo</c>).
|
||||
/// Strip the port suffix to get the hostname for the TCP connect.
|
||||
/// </summary>
|
||||
private static string ExtractHost(NodeId nodeId)
|
||||
{
|
||||
var s = nodeId.Value;
|
||||
var colon = s.LastIndexOf(':');
|
||||
return colon > 0 ? s[..colon] : s;
|
||||
}
|
||||
|
||||
private static async Task<bool> TryTcpConnectAsync(string host, int port, TimeSpan timeout)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var client = new TcpClient();
|
||||
using var cts = new CancellationTokenSource(timeout);
|
||||
await client.ConnectAsync(host, port, cts.Token);
|
||||
return client.Connected;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Details_wrapper_embeds_eventId_and_correlationId()
|
||||
public void EventId_and_CorrelationId_are_persisted_to_dedicated_columns()
|
||||
{
|
||||
var dbFactory = NewInMemoryDbFactory();
|
||||
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||
@@ -92,9 +92,9 @@ public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
||||
|
||||
using var db = dbFactory.CreateDbContext();
|
||||
var row = db.ConfigAuditLogs.Single();
|
||||
row.DetailsJson.ShouldNotBeNull();
|
||||
row.DetailsJson.ShouldContain(eventId.ToString("N"));
|
||||
row.DetailsJson.ShouldContain("\"correlationId\":");
|
||||
row.EventId.ShouldBe(eventId);
|
||||
row.CorrelationId.ShouldNotBeNull();
|
||||
row.DetailsJson.ShouldBe("{\"field\":\"value\"}");
|
||||
row.EventType.ShouldBe("Config:Edit");
|
||||
row.NodeId.ShouldBe("node-a");
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
@@ -24,16 +26,38 @@ public sealed class HealthProbeActorTests : RuntimeActorTestBase
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PeerOpcUaProbeActor_publishes_probe_result_at_each_tick()
|
||||
public void PeerOpcUaProbeActor_reports_Ok_true_against_a_live_listener()
|
||||
{
|
||||
using var listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
|
||||
var received = new System.Collections.Generic.List<object>();
|
||||
var actor = Sys.ActorOf(PeerOpcUaProbeActor.Props(
|
||||
NodeId.Parse("peer-1"),
|
||||
Sys.ActorOf(PeerOpcUaProbeActor.Props(
|
||||
NodeId.Parse($"127.0.0.1:{port}"),
|
||||
interval: TimeSpan.FromMilliseconds(50),
|
||||
connectTimeout: TimeSpan.FromMilliseconds(500),
|
||||
opcUaPort: port,
|
||||
broadcast: msg => received.Add(msg)));
|
||||
|
||||
AwaitCondition(() => received.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().ShouldNotBeEmpty();
|
||||
AwaitCondition(() => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => r.Ok),
|
||||
TimeSpan.FromSeconds(3));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PeerOpcUaProbeActor_reports_Ok_false_against_an_unreachable_endpoint()
|
||||
{
|
||||
// Port 1 is reserved (tcpmux) and almost never bound on dev machines, so the connect fails fast.
|
||||
var received = new System.Collections.Generic.List<object>();
|
||||
Sys.ActorOf(PeerOpcUaProbeActor.Props(
|
||||
NodeId.Parse("127.0.0.1:1"),
|
||||
interval: TimeSpan.FromMilliseconds(50),
|
||||
connectTimeout: TimeSpan.FromMilliseconds(300),
|
||||
opcUaPort: 1,
|
||||
broadcast: msg => received.Add(msg)));
|
||||
|
||||
AwaitCondition(() => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => !r.Ok),
|
||||
TimeSpan.FromSeconds(3));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
||||
Reference in New Issue
Block a user