[F54 test] end-to-end smoke: write_with_handle ↔ callback_router boundary
rust / build / test / clippy / fmt (push) Has been cancelled
rust / cargo public-api drift check (F41) (push) Has been cancelled

Adds `write_handle_correlates_with_router_emitted_status` — the
closest-to-live test we can write without an AVEVA endpoint, pinning
the F54 boundary the C# `OnWriteComplete` callback ultimately depends
on.

The existing tests cover the layers individually:
- `write_value_with_handle_inserts_into_pending_ops` — write API
  populates pending_ops with the right correlation id.
- `router_populates_operation_status_context_from_pending_ops_fifo`
  — callback_router consumes a frame + the registry, emits a typed
  OperationStatus with context attached.
- `drain_routes_write_status_to_on_write_complete` (mxaccess-compat)
  — drain function routes Write op_kind to on_write_complete_tx.

What was missing: a test that combines the public `write_value_with_
handle` API with a real callback_router invocation against the SAME
`pending_ops` Arc the write populated. The new test:

1. Builds a Session via `connect_test_session`.
2. Calls `session.write_value_with_handle("TestObj.TestInt", ...)` —
   gets a real `WriteHandle { correlation_id }` and a real entry in
   `pending_ops` (no manual insertion).
3. Spins a parallel `callback_router` over the SESSION's
   `pending_ops` Arc + a fake event_tx (the live exporter's
   internal channel isn't reachable from tests; this is the
   established workaround pattern from
   `router_task_decodes_callback_invoked_into_broadcast`).
4. Injects the proven `WRITE_COMPLETE_OK` 5-byte frame.
5. Asserts the emitted `OperationStatus.context.correlation_id`
   equals the cid the write returned, that op_kind is Write, that
   reference is the original tag string, and that
   `pending_ops` is now empty (one-shot popped).

This closes the integration-test gap the user flagged. Live AVEVA
verification still falls under F49.

Workspace 823 → 824 tests; clippy + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 07:57:15 -04:00
parent 4ff511bbed
commit 04c10babfb
+100
View File
@@ -3992,6 +3992,106 @@ mod tests {
handle.await.unwrap();
}
/// F54 — end-to-end smoke test that combines the existing
/// `write_value_with_handle_inserts_into_pending_ops` test with
/// the existing `router_populates_operation_status_context_from_pending_ops_fifo`
/// test: issue a REAL write through the public API, then drive a
/// REAL `WRITE_COMPLETE_OK` frame through the SAME `pending_ops`
/// the write populated, and confirm the resulting
/// [`OperationStatus`] carries the correlation id the write
/// returned. This is the closest thing to a live verification we
/// can do without an AVEVA endpoint — it pins the
/// `Session::write_value_with_handle` ↔ `callback_router`
/// integration boundary that the .NET-style consumer-facing
/// `OnWriteComplete` event ultimately depends on.
#[tokio::test]
async fn write_handle_correlates_with_router_emitted_status() {
// 1. Build a Session via the test harness (no live wire calls
// on the upstream NMX server are needed beyond the existing
// `unauthenticated_server` infrastructure).
let (addr, server_handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
// 2. Issue the write — get the correlation id back from the
// public handle-returning API. This populates pending_ops
// naturally (no manual insertion).
let write_handle = session
.write_value_with_handle("TestObj.TestInt", WriteValue::Int32(42))
.await
.unwrap();
let cid = write_handle.correlation_id;
// Sanity check the entry landed.
{
let guard = session.inner.pending_ops.lock().await;
assert_eq!(guard.len(), 1, "write should leave one outstanding op");
let ctx = guard.by_id.get(&cid).expect("entry under returned cid");
assert_eq!(ctx.op_kind, OperationKind::Write);
}
// 3. Spin up a parallel callback_router pointing at the SAME
// pending_ops + operation_status_tx that the live exporter
// would feed. Inject a real `WRITE_COMPLETE_OK` frame.
// (We can't intercept the live exporter's internal
// event_tx, so we run the router function directly with
// a fake event source — but with the production
// `pending_ops` Arc so the FIFO-pop sees the real entry.)
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, _callback_rx) = broadcast::channel(8);
let (operation_status_tx, mut operation_status_rx) =
broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let pending_ops_arc = Arc::clone(&session.inner.pending_ops);
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
pending_ops_arc,
));
// 4. Drive the proven 5-byte WRITE_COMPLETE_OK frame.
let inner = [0x00, 0x00, 0x50, 0x80, 0x00];
let body = wrap_op_status_envelope(&inner);
event_tx
.send(CallbackEvent::CallbackInvoked { opnum: 4, body })
.unwrap();
// 5. Receive on the operation_status broadcast and assert the
// context was populated from the pending_ops entry the
// write API created. This is the F54 contract end-to-end:
// write API → pending_ops → router → typed OperationStatus
// with context populated → consumer-visible event.
let event = tokio::time::timeout(
std::time::Duration::from_secs(1),
operation_status_rx.recv(),
)
.await
.expect("router timed out")
.expect("broadcast recv error");
let ctx = event.context.clone().expect("context populated by router");
assert_eq!(ctx.correlation_id, cid, "router uses the same cid the write returned");
assert_eq!(ctx.op_kind, OperationKind::Write);
assert_eq!(ctx.reference.as_deref(), Some("TestObj.TestInt"));
assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK);
// F54 one-shot: pending_ops is now empty.
{
let guard = session.inner.pending_ops.lock().await;
assert!(guard.by_id.is_empty(), "router popped the entry");
}
drop(event_tx);
let _ = router_h.await;
server_handle.await.unwrap();
}
/// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for
/// buffered subscriptions, mirroring the .NET reference's
/// `if (!subscription.IsBuffered)` guard at