feat(auditlog): thread ParentExecutionId through S&F for retry-loop cached rows

The store-and-forward retry loop emits the per-attempt and terminal cached
audit rows (ApiCallCached/DbWriteCached Attempted, CachedResolve) via
CachedCallLifecycleBridge from a CachedCallAttemptContext, not from the
script context. The ExecutionId rollout (Task 4) already threaded ExecutionId
and SourceScript through this path; ParentExecutionId — the spawning
inbound-API request's ExecutionId — was not, so those retry-loop rows had
ParentExecutionId = null even for an inbound-API-routed run.

Thread it additively as a sibling at every carry point ExecutionId passes
through:

- StoreAndForwardMessage gains ParentExecutionId (Guid?).
- StoreAndForwardStorage adds a nullable parent_execution_id column via the
  same idempotent PRAGMA-probed ALTER TABLE migration; rows persisted by an
  older build read back null (back-compat). The defensive Guid.TryParse read
  helper (ParseExecutionId) is renamed ParseGuidColumn and reused for both
  columns so a corrupt value cannot abort the retry sweep.
- StoreAndForwardService.EnqueueAsync gains an optional parentExecutionId
  param, stamped onto the buffered message and surfaced on the
  CachedCallAttemptContext built in the retry loop.
- CachedCallAttemptContext gains ParentExecutionId.
- CachedCallLifecycleBridge.BuildPacket sets AuditEvent.ParentExecutionId
  from the context, beside the existing ExecutionId.
- IExternalSystemClient.CachedCallAsync / IDatabaseGateway.CachedWriteAsync
  gain an optional parentExecutionId param; ScriptRuntimeContext's CachedCall
  / CachedWrite helpers pass _parentExecutionId.

All threading is additive — ParentExecutionId is Guid? everywhere, null for
non-routed runs, and old buffered S&F rows still deserialize with the new
field null.
This commit is contained in:
Joseph Doherty
2026-05-21 17:58:11 -04:00
parent 150ba5e63f
commit c00603e2a4
15 changed files with 581 additions and 51 deletions

View File

@@ -33,7 +33,8 @@ public class CachedCallLifecycleBridgeTests
string? lastError = null,
int? httpStatus = null,
Guid? executionId = null,
string? sourceScript = null) =>
string? sourceScript = null,
Guid? parentExecutionId = null) =>
new(
TrackedOperationId: _id,
Channel: channel,
@@ -48,7 +49,8 @@ public class CachedCallLifecycleBridgeTests
DurationMs: 42,
SourceInstanceId: "Plant.Pump42",
ExecutionId: executionId,
SourceScript: sourceScript);
SourceScript: sourceScript,
ParentExecutionId: parentExecutionId);
[Fact]
public async Task TransientFailure_EmitsOneAttemptedRow_NoResolve()
@@ -259,4 +261,70 @@ public class CachedCallLifecycleBridgeTests
Assert.Null(captured!.Audit.ExecutionId);
Assert.Null(captured.Audit.SourceScript);
}
// ── Audit Log #23 (ParentExecutionId Task 6): ParentExecutionId ──
[Fact]
public async Task RetryLoopAttemptedRow_CarriesParentExecutionId_FromContext()
{
// Task 6: the ParentExecutionId threaded through the S&F buffer (the
// inbound-API run that spawned the originating script) arrives on the
// CachedCallAttemptContext; the bridge must stamp it onto the
// per-attempt ApiCallCached row beside ExecutionId.
var parentExecutionId = Guid.NewGuid();
var captured = new List<CachedCallTelemetry>();
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var sut = CreateSut();
await sut.OnAttemptCompletedAsync(Ctx(
CachedCallAttemptOutcome.TransientFailure,
parentExecutionId: parentExecutionId));
var packet = Assert.Single(captured);
Assert.Equal(AuditKind.ApiCallCached, packet.Audit.Kind);
Assert.Equal(parentExecutionId, packet.Audit.ParentExecutionId);
}
[Fact]
public async Task RetryLoopCachedResolveRow_CarriesParentExecutionId_FromContext()
{
// The terminal CachedResolve row must also carry the threaded
// ParentExecutionId so the whole retry-loop lifecycle correlates back
// to the spawning inbound-API execution.
var parentExecutionId = Guid.NewGuid();
var captured = new List<CachedCallTelemetry>();
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var sut = CreateSut();
await sut.OnAttemptCompletedAsync(Ctx(
CachedCallAttemptOutcome.Delivered,
channel: "DbOutbound",
parentExecutionId: parentExecutionId));
Assert.Equal(2, captured.Count);
var resolve = Assert.Single(captured, p => p.Audit.Kind == AuditKind.CachedResolve);
Assert.Equal(parentExecutionId, resolve.Audit.ParentExecutionId);
var attempted = Assert.Single(captured, p => p.Audit.Kind == AuditKind.DbWriteCached);
Assert.Equal(parentExecutionId, attempted.Audit.ParentExecutionId);
}
[Fact]
public async Task RetryLoopRow_NullParentExecutionId_RemainsNull()
{
// Back-compat / non-routed run: the originating script was not spawned
// by an inbound-API request, so ParentExecutionId is null; the bridge
// must leave the audit row's ParentExecutionId null rather than throwing.
CachedCallTelemetry? captured = null;
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured = t), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var sut = CreateSut();
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
Assert.NotNull(captured);
Assert.Null(captured!.Audit.ParentExecutionId);
}
}

View File

@@ -75,7 +75,7 @@ public class DatabaseCachedWriteEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
@@ -116,7 +116,7 @@ public class DatabaseCachedWriteEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
@@ -145,7 +145,7 @@ public class DatabaseCachedWriteEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
@@ -167,7 +167,7 @@ public class DatabaseCachedWriteEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
@@ -181,7 +181,7 @@ public class DatabaseCachedWriteEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
trackedId,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
}
@@ -205,7 +205,7 @@ public class DatabaseCachedWriteEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
@@ -221,7 +221,79 @@ public class DatabaseCachedWriteEmissionTests
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.Is<Guid?>(id => id == TestExecutionId),
It.Is<string?>(s => s == SourceScript)),
It.Is<string?>(s => s == SourceScript),
It.IsAny<Guid?>()),
Times.Once);
}
/// <summary>
/// Audit Log #23 (ParentExecutionId Task 6): the helper → gateway hop for
/// <c>ParentExecutionId</c>. A cached write enqueued from an inbound-API-
/// routed script run must forward the runtime context's
/// <c>ParentExecutionId</c> verbatim into
/// <see cref="IDatabaseGateway.CachedWriteAsync"/> so the buffered retry
/// loop later stamps it onto its audit rows.
/// </summary>
[Fact]
public async Task CachedWrite_ThreadsParentExecutionId_IntoGateway()
{
var parentExecutionId = Guid.NewGuid();
var gateway = new Mock<IDatabaseGateway>();
gateway
.Setup(g => g.CachedWriteAsync(
"myDb", "INSERT INTO t VALUES (1)",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId);
await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
gateway.Verify(g => g.CachedWriteAsync(
"myDb", "INSERT INTO t VALUES (1)",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(),
It.Is<Guid?>(id => id == parentExecutionId)),
Times.Once);
}
/// <summary>
/// Audit Log #23 (ParentExecutionId Task 6): a non-routed run threads a
/// <c>null</c> ParentExecutionId into the gateway — the additive default.
/// </summary>
[Fact]
public async Task CachedWrite_NonRoutedRun_ThreadsNullParentExecutionId_IntoGateway()
{
var gateway = new Mock<IDatabaseGateway>();
gateway
.Setup(g => g.CachedWriteAsync(
"myDb", "INSERT INTO t VALUES (1)",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder();
var helper = CreateHelper(gateway.Object, forwarder);
await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
gateway.Verify(g => g.CachedWriteAsync(
"myDb", "INSERT INTO t VALUES (1)",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(),
It.Is<Guid?>(id => id == null)),
Times.Once);
}
@@ -236,7 +308,7 @@ public class DatabaseCachedWriteEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.Returns(Task.CompletedTask);
var forwarder = new CapturingForwarder
{
@@ -253,7 +325,7 @@ public class DatabaseCachedWriteEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
trackedId,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
}
}

View File

@@ -78,7 +78,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
@@ -121,7 +121,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false));
var forwarder = new CapturingForwarder();
@@ -158,7 +158,7 @@ public class ExternalSystemCachedCallEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
@@ -178,7 +178,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
id1,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
client.Verify(c => c.CachedCallAsync(
"ERP", "GetOrder",
@@ -186,7 +186,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
id2,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
}
@@ -210,7 +210,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
@@ -226,7 +226,79 @@ public class ExternalSystemCachedCallEmissionTests
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.Is<Guid?>(id => id == TestExecutionId),
It.Is<string?>(s => s == SourceScript)),
It.Is<string?>(s => s == SourceScript),
It.IsAny<Guid?>()),
Times.Once);
}
/// <summary>
/// Audit Log #23 (ParentExecutionId Task 6): the helper → gateway hop for
/// <c>ParentExecutionId</c>. A cached call enqueued from an inbound-API-
/// routed script run must forward the runtime context's
/// <c>ParentExecutionId</c> verbatim into
/// <see cref="IExternalSystemClient.CachedCallAsync"/> so the buffered
/// retry loop later stamps it onto its audit rows.
/// </summary>
[Fact]
public async Task CachedCall_ThreadsParentExecutionId_IntoClient()
{
var parentExecutionId = Guid.NewGuid();
var client = new Mock<IExternalSystemClient>();
client
.Setup(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
var helper = CreateHelper(client.Object, forwarder, parentExecutionId);
await helper.CachedCall("ERP", "GetOrder");
client.Verify(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(),
It.Is<Guid?>(id => id == parentExecutionId)),
Times.Once);
}
/// <summary>
/// Audit Log #23 (ParentExecutionId Task 6): a non-routed run threads a
/// <c>null</c> ParentExecutionId into the client — the additive default.
/// </summary>
[Fact]
public async Task CachedCall_NonRoutedRun_ThreadsNullParentExecutionId_IntoClient()
{
var client = new Mock<IExternalSystemClient>();
client
.Setup(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
var helper = CreateHelper(client.Object, forwarder);
await helper.CachedCall("ERP", "GetOrder");
client.Verify(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(),
It.Is<Guid?>(id => id == null)),
Times.Once);
}
@@ -241,7 +313,7 @@ public class ExternalSystemCachedCallEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder
{
@@ -261,7 +333,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
trackedId,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
}
@@ -276,7 +348,7 @@ public class ExternalSystemCachedCallEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
@@ -303,7 +375,7 @@ public class ExternalSystemCachedCallEmissionTests
It.IsAny<string?>(),
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var helper = CreateHelper(client.Object, forwarder: null);
@@ -316,7 +388,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
trackedId,
It.IsAny<Guid?>(), It.IsAny<string?>()),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()),
Times.Once);
}
@@ -346,7 +418,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
// WasBuffered=false — the immediate HTTP attempt succeeded; S&F
// is bypassed entirely.
.ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false));
@@ -412,7 +484,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false));
var forwarder = new CapturingForwarder();
@@ -442,7 +514,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(
false, null, "Permanent error: HTTP 422 bad payload", WasBuffered: false));
var forwarder = new CapturingForwarder();
@@ -485,7 +557,7 @@ public class ExternalSystemCachedCallEmissionTests
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>()))
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
// S&F took ownership — Attempted + Resolve come from the
// CachedCallLifecycleBridge driven by the retry loop, not the helper.
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));

View File

@@ -357,6 +357,83 @@ public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable
Assert.Equal("Plant.Tank/OnAlarm", notification.SourceScript);
}
// ── Audit Log #23 (ParentExecutionId Task 6): ParentExecutionId ──
[Fact]
public async Task Attempt_CarriesParentExecutionId_FromBufferedMessage()
{
// A cached call enqueued from an inbound-API-routed script run carries
// the spawning execution's ParentExecutionId. The retry sweep must
// surface it on the CachedCallAttemptContext beside ExecutionId so the
// audit bridge can stamp it on the retry-loop cached rows.
var parentExecutionId = Guid.NewGuid();
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("HTTP 503"));
var trackedId = TrackedOperationId.New();
await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem,
"ERP",
"""{"payload":"x"}""",
originInstanceName: "Plant.Pump42",
maxRetries: 5,
retryInterval: TimeSpan.Zero,
attemptImmediateDelivery: false,
messageId: trackedId.ToString(),
parentExecutionId: parentExecutionId);
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Equal(parentExecutionId, notification.ParentExecutionId);
}
[Fact]
public async Task Attempt_NullParentExecutionId_SurfacesAsNull()
{
// Non-routed run: the originating script was not spawned by an
// inbound-API request, so no ParentExecutionId is threaded. It must
// surface as null on the context, not throw.
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
var trackedId = await EnqueueBufferedAsync(
StoreAndForwardCategory.ExternalSystem, "ERP");
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Null(notification.ParentExecutionId);
}
[Fact]
public async Task TerminalResolve_CarriesParentExecutionId()
{
// The terminal Delivered notification must also carry the threaded
// ParentExecutionId so the CachedResolve audit row correlates back to
// the spawning inbound-API execution.
var parentExecutionId = Guid.NewGuid();
_service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite,
_ => Task.FromResult(true));
var trackedId = TrackedOperationId.New();
await _service.EnqueueAsync(
StoreAndForwardCategory.CachedDbWrite,
"myDb",
"""{"payload":"x"}""",
originInstanceName: "Plant.Tank",
maxRetries: 3,
retryInterval: TimeSpan.Zero,
attemptImmediateDelivery: false,
messageId: trackedId.ToString(),
parentExecutionId: parentExecutionId);
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome);
Assert.Equal(parentExecutionId, notification.ParentExecutionId);
}
// ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ──
[Fact]

View File

@@ -452,6 +452,141 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
Assert.Equal(message.ExecutionId, retrieved!.ExecutionId);
}
// ── Audit Log #23 (ParentExecutionId Task 6): parent_execution_id ──
[Fact]
public async Task EnqueueAsync_RoundTripsParentExecutionId()
{
// A cached call buffered from an inbound-API-routed script run carries
// the spawning execution's ParentExecutionId; it must survive a persist
// + read-back so the retry loop can stamp it on audit rows.
var parentExecutionId = Guid.NewGuid();
var message = CreateMessage("parent1", StoreAndForwardCategory.ExternalSystem);
message.ParentExecutionId = parentExecutionId;
await _storage.EnqueueAsync(message);
var retrieved = await _storage.GetMessageByIdAsync("parent1");
Assert.NotNull(retrieved);
Assert.Equal(parentExecutionId, retrieved!.ParentExecutionId);
}
[Fact]
public async Task EnqueueAsync_NullParentExecutionId_RoundTripsAsNull()
{
// A non-routed run supplies no ParentExecutionId — it must round-trip
// as null rather than throwing or coercing.
var message = CreateMessage("noparent1", StoreAndForwardCategory.ExternalSystem);
Assert.Null(message.ParentExecutionId);
await _storage.EnqueueAsync(message);
var retrieved = await _storage.GetMessageByIdAsync("noparent1");
Assert.NotNull(retrieved);
Assert.Null(retrieved!.ParentExecutionId);
}
[Fact]
public async Task ParentExecutionId_SurvivesRetrySweepRead()
{
// The retry sweep reads due rows via GetMessagesForRetryAsync; the new
// parent_execution_id field must be present on that read path too — it
// is the path that feeds the CachedCallAttemptContext.
var parentExecutionId = Guid.NewGuid();
var message = CreateMessage("psweep1", StoreAndForwardCategory.CachedDbWrite);
message.ParentExecutionId = parentExecutionId;
message.LastAttemptAt = null; // due immediately
await _storage.EnqueueAsync(message);
var due = await _storage.GetMessagesForRetryAsync();
var row = Assert.Single(due, m => m.Id == "psweep1");
Assert.Equal(parentExecutionId, row.ParentExecutionId);
}
[Fact]
public async Task LegacyRowWithoutParentExecutionIdColumn_ReadsBackAsNull()
{
// Back-compat: a row persisted by a build that pre-dates the
// parent_execution_id column must still deserialize, with
// ParentExecutionId reading back as null. Simulate the pre-Task-6
// schema (which already has execution_id / source_script from the
// ExecutionId rollout) by recreating the table without
// parent_execution_id, inserting directly, then running InitializeAsync
// which ALTER-adds the column.
await using (var setup = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared"))
{
await setup.OpenAsync();
await using var drop = setup.CreateCommand();
drop.CommandText = @"
DROP TABLE IF EXISTS sf_messages;
CREATE TABLE sf_messages (
id TEXT PRIMARY KEY,
category INTEGER NOT NULL,
target TEXT NOT NULL,
payload_json TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 50,
retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
created_at TEXT NOT NULL,
last_attempt_at TEXT,
status INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
origin_instance TEXT,
execution_id TEXT,
source_script TEXT
);
INSERT INTO sf_messages (id, category, target, payload_json, created_at, status)
VALUES ('plegacy1', 0, 'ERP', '{}', '2026-01-01T00:00:00.0000000+00:00', 0);";
await drop.ExecuteNonQueryAsync();
}
// InitializeAsync must additively ALTER-in parent_execution_id without
// disturbing the pre-existing legacy row.
await _storage.InitializeAsync();
var retrieved = await _storage.GetMessageByIdAsync("plegacy1");
Assert.NotNull(retrieved);
Assert.Equal("plegacy1", retrieved!.Id);
Assert.Null(retrieved.ParentExecutionId);
}
[Fact]
public async Task MalformedParentExecutionId_ReadsBackAsNull_DoesNotAbortRetrySweep()
{
// Defensive read path: a corrupt (non-null, non-GUID) parent_execution_id
// must be treated as "no parent execution id" rather than throwing
// FormatException — a single bad row must not abort the whole
// GetMessagesForRetryAsync sweep.
var goodParent = Guid.NewGuid();
var good = CreateMessage("pgood1", StoreAndForwardCategory.ExternalSystem);
good.ParentExecutionId = goodParent;
good.LastAttemptAt = null; // due immediately
await _storage.EnqueueAsync(good);
var bad = CreateMessage("pbad1", StoreAndForwardCategory.ExternalSystem);
bad.ParentExecutionId = Guid.NewGuid();
bad.LastAttemptAt = null; // due immediately
await _storage.EnqueueAsync(bad);
await using (var conn = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared"))
{
await conn.OpenAsync();
await using var corrupt = conn.CreateCommand();
corrupt.CommandText =
"UPDATE sf_messages SET parent_execution_id = 'not-a-guid' WHERE id = 'pbad1';";
await corrupt.ExecuteNonQueryAsync();
}
var due = await _storage.GetMessagesForRetryAsync();
Assert.Null(Assert.Single(due, m => m.Id == "pbad1").ParentExecutionId);
Assert.Equal(goodParent, Assert.Single(due, m => m.Id == "pgood1").ParentExecutionId);
var retrieved = await _storage.GetMessageByIdAsync("pbad1");
Assert.NotNull(retrieved);
Assert.Null(retrieved!.ParentExecutionId);
}
private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category)
{
return new StoreAndForwardMessage