feat(auditlog): thread ExecutionId through S&F for retry-loop cached rows

The store-and-forward retry loop emits the per-attempt and terminal cached
audit rows (ApiCallCached/DbWriteCached Attempted, CachedResolve) via
CachedCallLifecycleBridge from a CachedCallAttemptContext, not from the
script context. ExecutionId (and SourceScript) were not threaded through the
S&F buffer, so those rows had ExecutionId = null and SourceScript = null.

Thread both, additively, from the cached-call enqueue path:

- StoreAndForwardMessage gains ExecutionId (Guid?) / SourceScript (string?).
- StoreAndForwardStorage adds nullable execution_id / source_script columns
  via an idempotent PRAGMA-probed ALTER TABLE migration; rows persisted by
  an older build read back null (back-compat).
- StoreAndForwardService.EnqueueAsync gains optional executionId /
  sourceScript params, stamped onto the buffered message and surfaced on the
  CachedCallAttemptContext built in the retry loop.
- CachedCallAttemptContext gains ExecutionId / SourceScript.
- CachedCallLifecycleBridge.BuildPacket sets AuditEvent.ExecutionId and
  AuditEvent.SourceScript from the context (replacing the hard-coded
  SourceScript = null and its now-stale comment).
- IExternalSystemClient.CachedCallAsync / IDatabaseGateway.CachedWriteAsync
  gain optional executionId / sourceScript params; ScriptRuntimeContext's
  CachedCall / CachedWrite helpers pass _executionId / _sourceScript.

Script-side cached rows (CachedSubmit, immediate Attempted+Resolve) are
unchanged. All threading is additive — old buffered S&F rows still
deserialize and process with the new fields null.
This commit is contained in:
Joseph Doherty
2026-05-21 15:18:35 -04:00
parent 0149ce6180
commit 6f5a35f222
15 changed files with 505 additions and 40 deletions

View File

@@ -277,6 +277,86 @@ public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable
Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId);
}
// ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ──
[Fact]
public async Task Attempt_CarriesExecutionIdAndSourceScript_FromBufferedMessage()
{
// A buffered cached call carries the originating script execution's
// ExecutionId + SourceScript. The retry sweep must surface both on the
// CachedCallAttemptContext handed to the observer so the audit bridge
// can stamp them on the retry-loop cached rows.
var executionId = Guid.NewGuid();
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("HTTP 503"));
var trackedId = TrackedOperationId.New();
await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem,
"ERP",
"""{"payload":"x"}""",
originInstanceName: "Plant.Pump42",
maxRetries: 5,
retryInterval: TimeSpan.Zero,
attemptImmediateDelivery: false,
messageId: trackedId.ToString(),
executionId: executionId,
sourceScript: "Plant.Pump42/OnTick");
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Equal(executionId, notification.ExecutionId);
Assert.Equal("Plant.Pump42/OnTick", notification.SourceScript);
}
[Fact]
public async Task Attempt_NullExecutionIdAndSourceScript_SurfaceAsNull()
{
// Back-compat: a row buffered without ExecutionId / SourceScript (legacy
// enqueue path) must surface them as null on the context, not throw.
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => Task.FromResult(true));
var trackedId = await EnqueueBufferedAsync(
StoreAndForwardCategory.ExternalSystem, "ERP");
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Null(notification.ExecutionId);
Assert.Null(notification.SourceScript);
}
[Fact]
public async Task TerminalResolve_CarriesExecutionIdAndSourceScript()
{
// The terminal Delivered notification must also carry the threaded
// provenance so the CachedResolve audit row is correlated.
var executionId = Guid.NewGuid();
_service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite,
_ => Task.FromResult(true));
var trackedId = TrackedOperationId.New();
await _service.EnqueueAsync(
StoreAndForwardCategory.CachedDbWrite,
"myDb",
"""{"payload":"x"}""",
originInstanceName: "Plant.Tank",
maxRetries: 3,
retryInterval: TimeSpan.Zero,
attemptImmediateDelivery: false,
messageId: trackedId.ToString(),
executionId: executionId,
sourceScript: "Plant.Tank/OnAlarm");
await _service.RetryPendingMessagesAsync();
var notification = Assert.Single(_observer.Notifications);
Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome);
Assert.Equal(executionId, notification.ExecutionId);
Assert.Equal("Plant.Tank/OnAlarm", notification.SourceScript);
}
// ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ──
[Fact]

View File

@@ -293,6 +293,125 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
Assert.True(count >= 1);
}
// ── Audit Log #23 (ExecutionId Task 4): execution_id / source_script ──
[Fact]
public async Task EnqueueAsync_RoundTripsExecutionIdAndSourceScript()
{
// A cached call buffered on a transient failure carries the originating
// script execution's ExecutionId + SourceScript; both must survive a
// persist + read-back so the retry loop can stamp them on audit rows.
var executionId = Guid.NewGuid();
var message = CreateMessage("exec1", StoreAndForwardCategory.ExternalSystem);
message.ExecutionId = executionId;
message.SourceScript = "Plant.Pump42/OnTick";
await _storage.EnqueueAsync(message);
var retrieved = await _storage.GetMessageByIdAsync("exec1");
Assert.NotNull(retrieved);
Assert.Equal(executionId, retrieved!.ExecutionId);
Assert.Equal("Plant.Pump42/OnTick", retrieved.SourceScript);
}
[Fact]
public async Task EnqueueAsync_NullExecutionIdAndSourceScript_RoundTripAsNull()
{
// Non-cached-call enqueues (notifications) supply neither field — they
// must round-trip as null rather than throwing or coercing.
var message = CreateMessage("noexec1", StoreAndForwardCategory.Notification);
Assert.Null(message.ExecutionId);
Assert.Null(message.SourceScript);
await _storage.EnqueueAsync(message);
var retrieved = await _storage.GetMessageByIdAsync("noexec1");
Assert.NotNull(retrieved);
Assert.Null(retrieved!.ExecutionId);
Assert.Null(retrieved.SourceScript);
}
[Fact]
public async Task ExecutionIdAndSourceScript_SurviveRetrySweepRead()
{
// The retry sweep reads due rows via GetMessagesForRetryAsync; the new
// fields must be present on that read path too (it is the path that
// feeds the CachedCallAttemptContext).
var executionId = Guid.NewGuid();
var message = CreateMessage("sweep1", StoreAndForwardCategory.CachedDbWrite);
message.ExecutionId = executionId;
message.SourceScript = "Plant.Tank/OnAlarm";
message.LastAttemptAt = null; // due immediately
await _storage.EnqueueAsync(message);
var due = await _storage.GetMessagesForRetryAsync();
var row = Assert.Single(due, m => m.Id == "sweep1");
Assert.Equal(executionId, row.ExecutionId);
Assert.Equal("Plant.Tank/OnAlarm", row.SourceScript);
}
[Fact]
public async Task LegacyRowWithoutNewColumns_ReadsBackAsNull()
{
// Back-compat: a row persisted by a build that pre-dates the
// execution_id / source_script columns must still deserialize, with
// ExecutionId / SourceScript reading back as null. Simulate the legacy
// schema by dropping the table and recreating it without the columns,
// inserting directly, then running InitializeAsync (which ALTER-adds
// the columns) and reading the row back.
await using (var setup = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared"))
{
await setup.OpenAsync();
await using var drop = setup.CreateCommand();
drop.CommandText = @"
DROP TABLE IF EXISTS sf_messages;
CREATE TABLE sf_messages (
id TEXT PRIMARY KEY,
category INTEGER NOT NULL,
target TEXT NOT NULL,
payload_json TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 50,
retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
created_at TEXT NOT NULL,
last_attempt_at TEXT,
status INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
origin_instance TEXT
);
INSERT INTO sf_messages (id, category, target, payload_json, created_at, status)
VALUES ('legacy1', 0, 'ERP', '{}', '2026-01-01T00:00:00.0000000+00:00', 0);";
await drop.ExecuteNonQueryAsync();
}
// InitializeAsync must additively ALTER-in the new columns without
// disturbing the pre-existing legacy row.
await _storage.InitializeAsync();
var retrieved = await _storage.GetMessageByIdAsync("legacy1");
Assert.NotNull(retrieved);
Assert.Equal("legacy1", retrieved!.Id);
Assert.Null(retrieved.ExecutionId);
Assert.Null(retrieved.SourceScript);
}
[Fact]
public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist()
{
// The additive ALTER must not fail on a second InitializeAsync call
// (SQLite has no ADD COLUMN IF NOT EXISTS — the probe must skip it).
await _storage.InitializeAsync();
await _storage.InitializeAsync();
var message = CreateMessage("idem1", StoreAndForwardCategory.ExternalSystem);
message.ExecutionId = Guid.NewGuid();
await _storage.EnqueueAsync(message);
var retrieved = await _storage.GetMessageByIdAsync("idem1");
Assert.NotNull(retrieved);
Assert.Equal(message.ExecutionId, retrieved!.ExecutionId);
}
private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category)
{
return new StoreAndForwardMessage