From d1fcab490c1069bb0f9ea83e95844c2aa552cc69 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 23 May 2026 17:28:23 -0400 Subject: [PATCH] feat(notif-outbox): carry + persist SourceNode end-to-end via NotificationSubmit Site: inject INodeIdentityProvider where NotificationSubmit is built; stamp SourceNode = NodeName at construction. Central: NotificationOutboxActor.HandleSubmit copies submit.SourceNode onto the Notification row; the repository INSERT persists it (EF tracked-entity insert flows it through automatically; raw-SQL extension if not). After this commit, every Notifications row carries the originating site node-a/node-b in SourceNode. Existing notifications submitted pre-feature remain NULL. --- .../NotificationOutboxActor.cs | 7 ++ .../Actors/ScriptExecutionActor.cs | 16 ++++- .../Scripts/ScriptRuntimeContext.cs | 64 +++++++++++++++++-- .../NotificationOutboxActorIngestTests.cs | 45 ++++++++++++- .../Scripts/NotifyHelperTests.cs | 45 ++++++++++++- 5 files changed, 166 insertions(+), 11 deletions(-) diff --git a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs index 2c3284d..4d2cc85 100644 --- a/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs +++ b/src/ScadaLink.NotificationOutbox/NotificationOutboxActor.cs @@ -957,6 +957,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers { SourceInstanceId = msg.SourceInstanceId, SourceScript = msg.SourceScript, + // SourceNode (SourceNode-stamping Task 13): the cluster node on which the + // notification was emitted (node-a/node-b for site rows). Stamped by the + // emitting site from INodeIdentityProvider and carried, inside the + // serialized payload, through the S&F buffer to central. EF tracked-entity + // insert flows it through to the Notifications.SourceNode column. Null on + // submissions buffered before the field existed. + SourceNode = msg.SourceNode, // OriginExecutionId (Audit Log #23): the originating script execution's id, // carried from the site so the dispatcher can echo it onto NotifyDeliver rows. OriginExecutionId = msg.OriginExecutionId, diff --git a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs index c0f517c..b16fbf8 100644 --- a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs @@ -124,6 +124,13 @@ public class ScriptExecutionActor : ReceiveActor // to the no-emission path (the underlying S&F handoff still // happens and a TrackedOperationId is still returned). ICachedCallTelemetryForwarder? cachedForwarder = null; + // SourceNode-stamping (Tasks 13/14): the local node name + // resolved from INodeIdentityProvider — node-a/node-b on site + // hosts. Null in tests / hosts that haven't registered the + // provider, in which case NotificationSubmit.SourceNode and + // SiteCallOperational.SourceNode stay null and central + // persists the rows with SourceNode NULL. + string? sourceNode = null; if (serviceProvider != null) { @@ -136,6 +143,7 @@ public class ScriptExecutionActor : ReceiveActor auditWriter = serviceScope.ServiceProvider.GetService(); operationTrackingStore = serviceScope.ServiceProvider.GetService(); cachedForwarder = serviceScope.ServiceProvider.GetService(); + sourceNode = serviceScope.ServiceProvider.GetService()?.NodeName; } var context = new ScriptRuntimeContext( @@ -175,7 +183,13 @@ public class ScriptExecutionActor : ReceiveActor // id for an inbound-API-routed call. The routed script still // mints its own fresh ExecutionId — this records the spawner. // Null for normal (tag-change / timer) runs. - parentExecutionId: parentExecutionId); + parentExecutionId: parentExecutionId, + // SourceNode-stamping (Tasks 13/14): the local node name + // (node-a/node-b on a site) — threaded down so Notify.Send + // and the four cached-call telemetry constructors can stamp + // it onto NotificationSubmit.SourceNode and + // SiteCallOperational.SourceNode respectively. + sourceNode: sourceNode); var globals = new ScriptGlobals { diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index 8f8314a..554acb1 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -71,6 +71,19 @@ public class ScriptRuntimeContext /// private readonly string _siteId; + /// + /// SourceNode-stamping (Task 13/14): the cluster node name supplied by + /// INodeIdentityProvider on the local host — node-a/node-b + /// for site nodes. Stamped onto NotificationSubmit.SourceNode by + /// and onto SiteCallOperational.SourceNode + /// by the four / + /// cached-call telemetry construction sites so central can persist it on the + /// Notifications / SiteCalls rows. Null when no provider is + /// wired (legacy hosts / tests) — the helper construction sites pass null + /// through verbatim, leaving the central row's SourceNode as NULL too. + /// + private readonly string? _sourceNode; + /// /// Notification Outbox (FU3): identifier of the script currently executing in this /// context — stamped onto NotificationSubmit.SourceScript for the central @@ -162,7 +175,8 @@ public class ScriptRuntimeContext IOperationTrackingStore? operationTrackingStore = null, ICachedCallTelemetryForwarder? cachedForwarder = null, Guid? executionId = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { _instanceActor = instanceActor; _self = self; @@ -181,6 +195,11 @@ public class ScriptRuntimeContext _auditWriter = auditWriter; _operationTrackingStore = operationTrackingStore; _cachedForwarder = cachedForwarder; + // SourceNode-stamping (Task 13/14): the local node name read from + // INodeIdentityProvider at the ScriptExecutionActor; null when no + // provider was wired so the downstream callsites pass null through + // verbatim — leaving central SourceNode as NULL. + _sourceNode = sourceNode; _executionId = executionId ?? Guid.NewGuid(); // Audit Log #23 (ParentExecutionId): stored verbatim — no `?? NewGuid()` // fallback. A non-routed run legitimately has no parent and stays null. @@ -335,7 +354,10 @@ public class ScriptRuntimeContext _executionId, _auditWriter, // Audit Log #23 (ParentExecutionId): the spawning execution's id, // threaded alongside _executionId. Null for non-routed runs. - _parentExecutionId); + _parentExecutionId, + // SourceNode-stamping (Task 13): the local node name (node-a/node-b), + // threaded so NotifyTarget.Send can stamp it onto NotificationSubmit. + _sourceNode); /// /// Audit Log #23 (M3): site-local tracking-status API for cached operations. @@ -1328,6 +1350,14 @@ public class ScriptRuntimeContext /// private readonly IAuditWriter? _auditWriter; + /// + /// SourceNode-stamping (Task 13): the cluster node name on which this + /// script is executing — node-a/node-b. Stamped onto + /// NotificationSubmit.SourceNode by + /// so central can persist it on the Notifications row. + /// + private readonly string? _sourceNode; + // Parameter ordering: executionId sits immediately after the ILogger, // consistent with the other audit-threaded ctors. parentExecutionId is // a trailing optional param. @@ -1341,7 +1371,8 @@ public class ScriptRuntimeContext ILogger logger, Guid executionId, IAuditWriter? auditWriter = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { _storeAndForward = storeAndForward; _siteCommunicationActor = siteCommunicationActor; @@ -1353,6 +1384,7 @@ public class ScriptRuntimeContext _executionId = executionId; _auditWriter = auditWriter; _parentExecutionId = parentExecutionId; + _sourceNode = sourceNode; } /// @@ -1370,7 +1402,10 @@ public class ScriptRuntimeContext _auditWriter, // Audit Log #23 (ParentExecutionId): the spawning execution's // id, threaded alongside _executionId. Null for non-routed runs. - _parentExecutionId); + _parentExecutionId, + // SourceNode-stamping (Task 13): the local node name, stamped + // onto NotificationSubmit.SourceNode in Send(). + _sourceNode); } /// @@ -1467,6 +1502,15 @@ public class ScriptRuntimeContext /// private readonly IAuditWriter? _auditWriter; + /// + /// SourceNode-stamping (Task 13): the cluster node name on which this + /// script is executing (node-a/node-b). Stamped onto the + /// NotificationSubmit.SourceNode field in so + /// the central NotificationOutboxActor can persist it on the + /// Notifications row. + /// + private readonly string? _sourceNode; + internal NotifyTarget( string listName, StoreAndForwardService? storeAndForward, @@ -1476,7 +1520,8 @@ public class ScriptRuntimeContext ILogger logger, Guid executionId, IAuditWriter? auditWriter = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { _listName = listName; _storeAndForward = storeAndForward; @@ -1487,6 +1532,7 @@ public class ScriptRuntimeContext _executionId = executionId; _auditWriter = auditWriter; _parentExecutionId = parentExecutionId; + _sourceNode = sourceNode; } /// @@ -1539,7 +1585,13 @@ public class ScriptRuntimeContext // for an inbound-API-routed execution, null otherwise. It rides through // the S&F buffer to central, where the dispatcher echoes it onto the // NotifyDeliver rows so the central rows carry the routed run's parent id. - OriginParentExecutionId: _parentExecutionId); + OriginParentExecutionId: _parentExecutionId, + // SourceNode-stamping (Task 13): the cluster node name on which this + // notification was emitted (node-a/node-b). Stamped from the local + // INodeIdentityProvider via ScriptExecutionActor. Rides inside the + // serialized payload through the S&F buffer to central, where + // NotificationOutboxActor persists it on the Notifications row. + SourceNode: _sourceNode); var payloadJson = JsonSerializer.Serialize(payload); diff --git a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs index 37812d9..b55be50 100644 --- a/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs +++ b/tests/ScadaLink.NotificationOutbox.Tests/NotificationOutboxActorIngestTests.cs @@ -42,7 +42,8 @@ public class NotificationOutboxActorIngestTests : TestKit private static NotificationSubmit MakeSubmit( string? notificationId = null, Guid? originExecutionId = null, - Guid? originParentExecutionId = null) + Guid? originParentExecutionId = null, + string? sourceNode = null) { return new NotificationSubmit( NotificationId: notificationId ?? Guid.NewGuid().ToString(), @@ -54,7 +55,8 @@ public class NotificationOutboxActorIngestTests : TestKit SourceScript: "AlarmScript", SiteEnqueuedAt: new DateTimeOffset(2026, 5, 19, 8, 30, 0, TimeSpan.Zero), OriginExecutionId: originExecutionId, - OriginParentExecutionId: originParentExecutionId); + OriginParentExecutionId: originParentExecutionId, + SourceNode: sourceNode); } [Fact] @@ -192,4 +194,43 @@ public class NotificationOutboxActorIngestTests : TestKit Assert.NotNull(ack.Error); Assert.Contains("database unavailable", ack.Error); } + + [Fact] + public void NotificationSubmit_CopiesSourceNode_OntoPersistedNotification() + { + // SourceNode-stamping (Task 13): the originating site's node name (node-a/node-b) + // rides on the NotificationSubmit and must be persisted on the Notification row so + // central observers (KPIs, audit drill-ins, ops dashboards) can see which node + // emitted the notification. + _repository.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()) + .Returns(true); + var submit = MakeSubmit(sourceNode: "node-a"); + var actor = CreateActor(); + + actor.Tell(submit, TestActor); + + ExpectMsg(); + _repository.Received(1).InsertIfNotExistsAsync( + Arg.Is(n => n.SourceNode == "node-a"), + Arg.Any()); + } + + [Fact] + public void NotificationSubmit_NullSourceNode_PersistsNull() + { + // Submissions from a host that didn't wire INodeIdentityProvider, or from + // pre-SourceNode-stamping clients, carry null SourceNode — the central row must + // persist NULL rather than fall back to a placeholder. + _repository.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()) + .Returns(true); + var submit = MakeSubmit(sourceNode: null); + var actor = CreateActor(); + + actor.Tell(submit, TestActor); + + ExpectMsg(); + _repository.Received(1).InsertIfNotExistsAsync( + Arg.Is(n => n.SourceNode == null), + Arg.Any()); + } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs index d4f5eca..7cd558b 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/NotifyHelperTests.cs @@ -62,7 +62,8 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable IActorRef siteCommunicationActor, string? sourceScript = null, Guid? executionId = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { return new ScriptRuntimeContext.NotifyHelper( _saf, @@ -74,7 +75,8 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable NullLogger.Instance, executionId ?? Guid.NewGuid(), auditWriter: null, - parentExecutionId: parentExecutionId); + parentExecutionId: parentExecutionId, + sourceNode: sourceNode); } [Fact] @@ -197,6 +199,45 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable Assert.Null(payload!.OriginParentExecutionId); } + [Fact] + public async Task Send_StampsSourceNode_OnTheNotificationSubmitPayload() + { + // SourceNode-stamping (Task 13): when the helper is wired with the + // local INodeIdentityProvider's NodeName, Notify.Send must stamp it + // onto the NotificationSubmit so it rides inside the serialized S&F + // payload to central, where NotificationOutboxActor persists it on + // the Notifications row. + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref, sourceNode: "node-a"); + + var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); + + var buffered = await _saf.GetMessageByIdAsync(notificationId); + Assert.NotNull(buffered); + var payload = JsonSerializer.Deserialize(buffered!.PayloadJson); + Assert.NotNull(payload); + Assert.Equal("node-a", payload!.SourceNode); + } + + [Fact] + public async Task Send_NoNodeIdentity_LeavesSourceNodeNull() + { + // Hosts that don't wire INodeIdentityProvider (legacy / tests) pass + // null through. The NotificationSubmit payload's SourceNode stays + // null so the central Notifications row persists NULL rather than + // falling back to a placeholder. + var commProbe = CreateTestProbe(); + var notify = CreateHelper(commProbe.Ref, sourceNode: null); + + var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped"); + + var buffered = await _saf.GetMessageByIdAsync(notificationId); + Assert.NotNull(buffered); + var payload = JsonSerializer.Deserialize(buffered!.PayloadJson); + Assert.NotNull(payload); + Assert.Null(payload!.SourceNode); + } + [Fact] public async Task Send_WhenHelperHasNoSourceScript_LeavesSourceScriptNull() {