From 04c10babfb2ccc6d07a1fcc71d190aa9a6bf522a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 6 May 2026 07:57:15 -0400 Subject: [PATCH] =?UTF-8?q?[F54=20test]=20end-to-end=20smoke:=20write=5Fwi?= =?UTF-8?q?th=5Fhandle=20=E2=86=94=20callback=5Frouter=20boundary?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `write_handle_correlates_with_router_emitted_status` — the closest-to-live test we can write without an AVEVA endpoint, pinning the F54 boundary the C# `OnWriteComplete` callback ultimately depends on. The existing tests cover the layers individually: - `write_value_with_handle_inserts_into_pending_ops` — write API populates pending_ops with the right correlation id. - `router_populates_operation_status_context_from_pending_ops_fifo` — callback_router consumes a frame + the registry, emits a typed OperationStatus with context attached. - `drain_routes_write_status_to_on_write_complete` (mxaccess-compat) — drain function routes Write op_kind to on_write_complete_tx. What was missing: a test that combines the public `write_value_with_ handle` API with a real callback_router invocation against the SAME `pending_ops` Arc the write populated. The new test: 1. Builds a Session via `connect_test_session`. 2. Calls `session.write_value_with_handle("TestObj.TestInt", ...)` — gets a real `WriteHandle { correlation_id }` and a real entry in `pending_ops` (no manual insertion). 3. Spins a parallel `callback_router` over the SESSION's `pending_ops` Arc + a fake event_tx (the live exporter's internal channel isn't reachable from tests; this is the established workaround pattern from `router_task_decodes_callback_invoked_into_broadcast`). 4. Injects the proven `WRITE_COMPLETE_OK` 5-byte frame. 5. Asserts the emitted `OperationStatus.context.correlation_id` equals the cid the write returned, that op_kind is Write, that reference is the original tag string, and that `pending_ops` is now empty (one-shot popped). This closes the integration-test gap the user flagged. Live AVEVA verification still falls under F49. Workspace 823 → 824 tests; clippy + rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/crates/mxaccess/src/session.rs | 100 ++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index e0a29a8..e93f817 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -3992,6 +3992,106 @@ mod tests { handle.await.unwrap(); } + /// F54 — end-to-end smoke test that combines the existing + /// `write_value_with_handle_inserts_into_pending_ops` test with + /// the existing `router_populates_operation_status_context_from_pending_ops_fifo` + /// test: issue a REAL write through the public API, then drive a + /// REAL `WRITE_COMPLETE_OK` frame through the SAME `pending_ops` + /// the write populated, and confirm the resulting + /// [`OperationStatus`] carries the correlation id the write + /// returned. This is the closest thing to a live verification we + /// can do without an AVEVA endpoint — it pins the + /// `Session::write_value_with_handle` ↔ `callback_router` + /// integration boundary that the .NET-style consumer-facing + /// `OnWriteComplete` event ultimately depends on. + #[tokio::test] + async fn write_handle_correlates_with_router_emitted_status() { + // 1. Build a Session via the test harness (no live wire calls + // on the upstream NMX server are needed beyond the existing + // `unauthenticated_server` infrastructure). + let (addr, server_handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // 2. Issue the write — get the correlation id back from the + // public handle-returning API. This populates pending_ops + // naturally (no manual insertion). + let write_handle = session + .write_value_with_handle("TestObj.TestInt", WriteValue::Int32(42)) + .await + .unwrap(); + let cid = write_handle.correlation_id; + + // Sanity check the entry landed. + { + let guard = session.inner.pending_ops.lock().await; + assert_eq!(guard.len(), 1, "write should leave one outstanding op"); + let ctx = guard.by_id.get(&cid).expect("entry under returned cid"); + assert_eq!(ctx.op_kind, OperationKind::Write); + } + + // 3. Spin up a parallel callback_router pointing at the SAME + // pending_ops + operation_status_tx that the live exporter + // would feed. Inject a real `WRITE_COMPLETE_OK` frame. + // (We can't intercept the live exporter's internal + // event_tx, so we run the router function directly with + // a fake event source — but with the production + // `pending_ops` Arc so the FIFO-pop sees the real entry.) + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, _callback_rx) = broadcast::channel(8); + let (operation_status_tx, mut operation_status_rx) = + broadcast::channel::>(8); + let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops_arc = Arc::clone(&session.inner.pending_ops); + + let router_h = tokio::spawn(callback_router( + event_rx, + callback_tx, + operation_status_tx, + recovery_active, + pending_ops_arc, + )); + + // 4. Drive the proven 5-byte WRITE_COMPLETE_OK frame. + let inner = [0x00, 0x00, 0x50, 0x80, 0x00]; + let body = wrap_op_status_envelope(&inner); + event_tx + .send(CallbackEvent::CallbackInvoked { opnum: 4, body }) + .unwrap(); + + // 5. Receive on the operation_status broadcast and assert the + // context was populated from the pending_ops entry the + // write API created. This is the F54 contract end-to-end: + // write API → pending_ops → router → typed OperationStatus + // with context populated → consumer-visible event. + let event = tokio::time::timeout( + std::time::Duration::from_secs(1), + operation_status_rx.recv(), + ) + .await + .expect("router timed out") + .expect("broadcast recv error"); + + let ctx = event.context.clone().expect("context populated by router"); + assert_eq!(ctx.correlation_id, cid, "router uses the same cid the write returned"); + assert_eq!(ctx.op_kind, OperationKind::Write); + assert_eq!(ctx.reference.as_deref(), Some("TestObj.TestInt")); + assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK); + + // F54 one-shot: pending_ops is now empty. + { + let guard = session.inner.pending_ops.lock().await; + assert!(guard.by_id.is_empty(), "router popped the entry"); + } + + drop(event_tx); + let _ = router_h.await; + server_handle.await.unwrap(); + } + /// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for /// buffered subscriptions, mirroring the .NET reference's /// `if (!subscription.IsBuffered)` guard at