feat(notif-outbox): carry + persist SourceNode end-to-end via NotificationSubmit

Site: inject INodeIdentityProvider where NotificationSubmit is built; stamp
SourceNode = NodeName at construction.

Central: NotificationOutboxActor.HandleSubmit copies submit.SourceNode onto
the Notification row; the repository INSERT persists it (EF tracked-entity
insert flows it through automatically; raw-SQL extension if not).

After this commit, every Notifications row carries the originating site
node-a/node-b in SourceNode. Existing notifications submitted pre-feature
remain NULL.
This commit is contained in:
Joseph Doherty
2026-05-23 17:28:23 -04:00
parent e6341580b3
commit d1fcab490c
5 changed files with 166 additions and 11 deletions

View File

@@ -957,6 +957,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
SourceInstanceId = msg.SourceInstanceId,
SourceScript = msg.SourceScript,
// SourceNode (SourceNode-stamping Task 13): the cluster node on which the
// notification was emitted (node-a/node-b for site rows). Stamped by the
// emitting site from INodeIdentityProvider and carried, inside the
// serialized payload, through the S&F buffer to central. EF tracked-entity
// insert flows it through to the Notifications.SourceNode column. Null on
// submissions buffered before the field existed.
SourceNode = msg.SourceNode,
// OriginExecutionId (Audit Log #23): the originating script execution's id,
// carried from the site so the dispatcher can echo it onto NotifyDeliver rows.
OriginExecutionId = msg.OriginExecutionId,

View File

@@ -124,6 +124,13 @@ public class ScriptExecutionActor : ReceiveActor
// to the no-emission path (the underlying S&F handoff still
// happens and a TrackedOperationId is still returned).
ICachedCallTelemetryForwarder? cachedForwarder = null;
// SourceNode-stamping (Tasks 13/14): the local node name
// resolved from INodeIdentityProvider — node-a/node-b on site
// hosts. Null in tests / hosts that haven't registered the
// provider, in which case NotificationSubmit.SourceNode and
// SiteCallOperational.SourceNode stay null and central
// persists the rows with SourceNode NULL.
string? sourceNode = null;
if (serviceProvider != null)
{
@@ -136,6 +143,7 @@ public class ScriptExecutionActor : ReceiveActor
auditWriter = serviceScope.ServiceProvider.GetService<IAuditWriter>();
operationTrackingStore = serviceScope.ServiceProvider.GetService<IOperationTrackingStore>();
cachedForwarder = serviceScope.ServiceProvider.GetService<ICachedCallTelemetryForwarder>();
sourceNode = serviceScope.ServiceProvider.GetService<INodeIdentityProvider>()?.NodeName;
}
var context = new ScriptRuntimeContext(
@@ -175,7 +183,13 @@ public class ScriptExecutionActor : ReceiveActor
// id for an inbound-API-routed call. The routed script still
// mints its own fresh ExecutionId — this records the spawner.
// Null for normal (tag-change / timer) runs.
parentExecutionId: parentExecutionId);
parentExecutionId: parentExecutionId,
// SourceNode-stamping (Tasks 13/14): the local node name
// (node-a/node-b on a site) — threaded down so Notify.Send
// and the four cached-call telemetry constructors can stamp
// it onto NotificationSubmit.SourceNode and
// SiteCallOperational.SourceNode respectively.
sourceNode: sourceNode);
var globals = new ScriptGlobals
{

View File

@@ -71,6 +71,19 @@ public class ScriptRuntimeContext
/// </summary>
private readonly string _siteId;
/// <summary>
/// SourceNode-stamping (Task 13/14): the cluster node name supplied by
/// <c>INodeIdentityProvider</c> on the local host — <c>node-a</c>/<c>node-b</c>
/// for site nodes. Stamped onto <c>NotificationSubmit.SourceNode</c> by
/// <see cref="NotifyTarget.Send"/> and onto <c>SiteCallOperational.SourceNode</c>
/// by the four <see cref="ExternalSystemHelper"/> / <see cref="DatabaseHelper"/>
/// cached-call telemetry construction sites so central can persist it on the
/// <c>Notifications</c> / <c>SiteCalls</c> rows. Null when no provider is
/// wired (legacy hosts / tests) — the helper construction sites pass null
/// through verbatim, leaving the central row's SourceNode as NULL too.
/// </summary>
private readonly string? _sourceNode;
/// <summary>
/// Notification Outbox (FU3): identifier of the script currently executing in this
/// context — stamped onto <c>NotificationSubmit.SourceScript</c> for the central
@@ -162,7 +175,8 @@ public class ScriptRuntimeContext
IOperationTrackingStore? operationTrackingStore = null,
ICachedCallTelemetryForwarder? cachedForwarder = null,
Guid? executionId = null,
Guid? parentExecutionId = null)
Guid? parentExecutionId = null,
string? sourceNode = null)
{
_instanceActor = instanceActor;
_self = self;
@@ -181,6 +195,11 @@ public class ScriptRuntimeContext
_auditWriter = auditWriter;
_operationTrackingStore = operationTrackingStore;
_cachedForwarder = cachedForwarder;
// SourceNode-stamping (Task 13/14): the local node name read from
// INodeIdentityProvider at the ScriptExecutionActor; null when no
// provider was wired so the downstream callsites pass null through
// verbatim — leaving central SourceNode as NULL.
_sourceNode = sourceNode;
_executionId = executionId ?? Guid.NewGuid();
// Audit Log #23 (ParentExecutionId): stored verbatim — no `?? NewGuid()`
// fallback. A non-routed run legitimately has no parent and stays null.
@@ -335,7 +354,10 @@ public class ScriptRuntimeContext
_executionId, _auditWriter,
// Audit Log #23 (ParentExecutionId): the spawning execution's id,
// threaded alongside _executionId. Null for non-routed runs.
_parentExecutionId);
_parentExecutionId,
// SourceNode-stamping (Task 13): the local node name (node-a/node-b),
// threaded so NotifyTarget.Send can stamp it onto NotificationSubmit.
_sourceNode);
/// <summary>
/// Audit Log #23 (M3): site-local tracking-status API for cached operations.
@@ -1328,6 +1350,14 @@ public class ScriptRuntimeContext
/// </summary>
private readonly IAuditWriter? _auditWriter;
/// <summary>
/// SourceNode-stamping (Task 13): the cluster node name on which this
/// script is executing — <c>node-a</c>/<c>node-b</c>. Stamped onto
/// <c>NotificationSubmit.SourceNode</c> by <see cref="NotifyTarget.Send"/>
/// so central can persist it on the <c>Notifications</c> row.
/// </summary>
private readonly string? _sourceNode;
// Parameter ordering: executionId sits immediately after the ILogger,
// consistent with the other audit-threaded ctors. parentExecutionId is
// a trailing optional param.
@@ -1341,7 +1371,8 @@ public class ScriptRuntimeContext
ILogger logger,
Guid executionId,
IAuditWriter? auditWriter = null,
Guid? parentExecutionId = null)
Guid? parentExecutionId = null,
string? sourceNode = null)
{
_storeAndForward = storeAndForward;
_siteCommunicationActor = siteCommunicationActor;
@@ -1353,6 +1384,7 @@ public class ScriptRuntimeContext
_executionId = executionId;
_auditWriter = auditWriter;
_parentExecutionId = parentExecutionId;
_sourceNode = sourceNode;
}
/// <summary>
@@ -1370,7 +1402,10 @@ public class ScriptRuntimeContext
_auditWriter,
// Audit Log #23 (ParentExecutionId): the spawning execution's
// id, threaded alongside _executionId. Null for non-routed runs.
_parentExecutionId);
_parentExecutionId,
// SourceNode-stamping (Task 13): the local node name, stamped
// onto NotificationSubmit.SourceNode in Send().
_sourceNode);
}
/// <summary>
@@ -1467,6 +1502,15 @@ public class ScriptRuntimeContext
/// </summary>
private readonly IAuditWriter? _auditWriter;
/// <summary>
/// SourceNode-stamping (Task 13): the cluster node name on which this
/// script is executing (<c>node-a</c>/<c>node-b</c>). Stamped onto the
/// <c>NotificationSubmit.SourceNode</c> field in <see cref="Send"/> so
/// the central <c>NotificationOutboxActor</c> can persist it on the
/// <c>Notifications</c> row.
/// </summary>
private readonly string? _sourceNode;
internal NotifyTarget(
string listName,
StoreAndForwardService? storeAndForward,
@@ -1476,7 +1520,8 @@ public class ScriptRuntimeContext
ILogger logger,
Guid executionId,
IAuditWriter? auditWriter = null,
Guid? parentExecutionId = null)
Guid? parentExecutionId = null,
string? sourceNode = null)
{
_listName = listName;
_storeAndForward = storeAndForward;
@@ -1487,6 +1532,7 @@ public class ScriptRuntimeContext
_executionId = executionId;
_auditWriter = auditWriter;
_parentExecutionId = parentExecutionId;
_sourceNode = sourceNode;
}
/// <summary>
@@ -1539,7 +1585,13 @@ public class ScriptRuntimeContext
// for an inbound-API-routed execution, null otherwise. It rides through
// the S&F buffer to central, where the dispatcher echoes it onto the
// NotifyDeliver rows so the central rows carry the routed run's parent id.
OriginParentExecutionId: _parentExecutionId);
OriginParentExecutionId: _parentExecutionId,
// SourceNode-stamping (Task 13): the cluster node name on which this
// notification was emitted (node-a/node-b). Stamped from the local
// INodeIdentityProvider via ScriptExecutionActor. Rides inside the
// serialized payload through the S&F buffer to central, where
// NotificationOutboxActor persists it on the Notifications row.
SourceNode: _sourceNode);
var payloadJson = JsonSerializer.Serialize(payload);

View File

@@ -42,7 +42,8 @@ public class NotificationOutboxActorIngestTests : TestKit
private static NotificationSubmit MakeSubmit(
string? notificationId = null,
Guid? originExecutionId = null,
Guid? originParentExecutionId = null)
Guid? originParentExecutionId = null,
string? sourceNode = null)
{
return new NotificationSubmit(
NotificationId: notificationId ?? Guid.NewGuid().ToString(),
@@ -54,7 +55,8 @@ public class NotificationOutboxActorIngestTests : TestKit
SourceScript: "AlarmScript",
SiteEnqueuedAt: new DateTimeOffset(2026, 5, 19, 8, 30, 0, TimeSpan.Zero),
OriginExecutionId: originExecutionId,
OriginParentExecutionId: originParentExecutionId);
OriginParentExecutionId: originParentExecutionId,
SourceNode: sourceNode);
}
[Fact]
@@ -192,4 +194,43 @@ public class NotificationOutboxActorIngestTests : TestKit
Assert.NotNull(ack.Error);
Assert.Contains("database unavailable", ack.Error);
}
[Fact]
public void NotificationSubmit_CopiesSourceNode_OntoPersistedNotification()
{
// SourceNode-stamping (Task 13): the originating site's node name (node-a/node-b)
// rides on the NotificationSubmit and must be persisted on the Notification row so
// central observers (KPIs, audit drill-ins, ops dashboards) can see which node
// emitted the notification.
_repository.InsertIfNotExistsAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>())
.Returns(true);
var submit = MakeSubmit(sourceNode: "node-a");
var actor = CreateActor();
actor.Tell(submit, TestActor);
ExpectMsg<NotificationSubmitAck>();
_repository.Received(1).InsertIfNotExistsAsync(
Arg.Is<Notification>(n => n.SourceNode == "node-a"),
Arg.Any<CancellationToken>());
}
[Fact]
public void NotificationSubmit_NullSourceNode_PersistsNull()
{
// Submissions from a host that didn't wire INodeIdentityProvider, or from
// pre-SourceNode-stamping clients, carry null SourceNode — the central row must
// persist NULL rather than fall back to a placeholder.
_repository.InsertIfNotExistsAsync(Arg.Any<Notification>(), Arg.Any<CancellationToken>())
.Returns(true);
var submit = MakeSubmit(sourceNode: null);
var actor = CreateActor();
actor.Tell(submit, TestActor);
ExpectMsg<NotificationSubmitAck>();
_repository.Received(1).InsertIfNotExistsAsync(
Arg.Is<Notification>(n => n.SourceNode == null),
Arg.Any<CancellationToken>());
}
}

View File

@@ -62,7 +62,8 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable
IActorRef siteCommunicationActor,
string? sourceScript = null,
Guid? executionId = null,
Guid? parentExecutionId = null)
Guid? parentExecutionId = null,
string? sourceNode = null)
{
return new ScriptRuntimeContext.NotifyHelper(
_saf,
@@ -74,7 +75,8 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable
NullLogger.Instance,
executionId ?? Guid.NewGuid(),
auditWriter: null,
parentExecutionId: parentExecutionId);
parentExecutionId: parentExecutionId,
sourceNode: sourceNode);
}
[Fact]
@@ -197,6 +199,45 @@ public class NotifyHelperTests : TestKit, IAsyncLifetime, IDisposable
Assert.Null(payload!.OriginParentExecutionId);
}
[Fact]
public async Task Send_StampsSourceNode_OnTheNotificationSubmitPayload()
{
// SourceNode-stamping (Task 13): when the helper is wired with the
// local INodeIdentityProvider's NodeName, Notify.Send must stamp it
// onto the NotificationSubmit so it rides inside the serialized S&F
// payload to central, where NotificationOutboxActor persists it on
// the Notifications row.
var commProbe = CreateTestProbe();
var notify = CreateHelper(commProbe.Ref, sourceNode: "node-a");
var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped");
var buffered = await _saf.GetMessageByIdAsync(notificationId);
Assert.NotNull(buffered);
var payload = JsonSerializer.Deserialize<NotificationSubmit>(buffered!.PayloadJson);
Assert.NotNull(payload);
Assert.Equal("node-a", payload!.SourceNode);
}
[Fact]
public async Task Send_NoNodeIdentity_LeavesSourceNodeNull()
{
// Hosts that don't wire INodeIdentityProvider (legacy / tests) pass
// null through. The NotificationSubmit payload's SourceNode stays
// null so the central Notifications row persists NULL rather than
// falling back to a placeholder.
var commProbe = CreateTestProbe();
var notify = CreateHelper(commProbe.Ref, sourceNode: null);
var notificationId = await notify.To("Operators").Send("Pump alarm", "Pump 3 tripped");
var buffered = await _saf.GetMessageByIdAsync(notificationId);
Assert.NotNull(buffered);
var payload = JsonSerializer.Deserialize<NotificationSubmit>(buffered!.PayloadJson);
Assert.NotNull(payload);
Assert.Null(payload!.SourceNode);
}
[Fact]
public async Task Send_WhenHelperHasNoSourceScript_LeavesSourceScriptNull()
{