diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index e0a29a8..e93f817 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -3992,6 +3992,106 @@ mod tests { handle.await.unwrap(); } + /// F54 — end-to-end smoke test that combines the existing + /// `write_value_with_handle_inserts_into_pending_ops` test with + /// the existing `router_populates_operation_status_context_from_pending_ops_fifo` + /// test: issue a REAL write through the public API, then drive a + /// REAL `WRITE_COMPLETE_OK` frame through the SAME `pending_ops` + /// the write populated, and confirm the resulting + /// [`OperationStatus`] carries the correlation id the write + /// returned. This is the closest thing to a live verification we + /// can do without an AVEVA endpoint — it pins the + /// `Session::write_value_with_handle` ↔ `callback_router` + /// integration boundary that the .NET-style consumer-facing + /// `OnWriteComplete` event ultimately depends on. + #[tokio::test] + async fn write_handle_correlates_with_router_emitted_status() { + // 1. Build a Session via the test harness (no live wire calls + // on the upstream NMX server are needed beyond the existing + // `unauthenticated_server` infrastructure). + let (addr, server_handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // 2. Issue the write — get the correlation id back from the + // public handle-returning API. This populates pending_ops + // naturally (no manual insertion). + let write_handle = session + .write_value_with_handle("TestObj.TestInt", WriteValue::Int32(42)) + .await + .unwrap(); + let cid = write_handle.correlation_id; + + // Sanity check the entry landed. + { + let guard = session.inner.pending_ops.lock().await; + assert_eq!(guard.len(), 1, "write should leave one outstanding op"); + let ctx = guard.by_id.get(&cid).expect("entry under returned cid"); + assert_eq!(ctx.op_kind, OperationKind::Write); + } + + // 3. Spin up a parallel callback_router pointing at the SAME + // pending_ops + operation_status_tx that the live exporter + // would feed. Inject a real `WRITE_COMPLETE_OK` frame. + // (We can't intercept the live exporter's internal + // event_tx, so we run the router function directly with + // a fake event source — but with the production + // `pending_ops` Arc so the FIFO-pop sees the real entry.) + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, _callback_rx) = broadcast::channel(8); + let (operation_status_tx, mut operation_status_rx) = + broadcast::channel::>(8); + let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops_arc = Arc::clone(&session.inner.pending_ops); + + let router_h = tokio::spawn(callback_router( + event_rx, + callback_tx, + operation_status_tx, + recovery_active, + pending_ops_arc, + )); + + // 4. Drive the proven 5-byte WRITE_COMPLETE_OK frame. + let inner = [0x00, 0x00, 0x50, 0x80, 0x00]; + let body = wrap_op_status_envelope(&inner); + event_tx + .send(CallbackEvent::CallbackInvoked { opnum: 4, body }) + .unwrap(); + + // 5. Receive on the operation_status broadcast and assert the + // context was populated from the pending_ops entry the + // write API created. This is the F54 contract end-to-end: + // write API → pending_ops → router → typed OperationStatus + // with context populated → consumer-visible event. + let event = tokio::time::timeout( + std::time::Duration::from_secs(1), + operation_status_rx.recv(), + ) + .await + .expect("router timed out") + .expect("broadcast recv error"); + + let ctx = event.context.clone().expect("context populated by router"); + assert_eq!(ctx.correlation_id, cid, "router uses the same cid the write returned"); + assert_eq!(ctx.op_kind, OperationKind::Write); + assert_eq!(ctx.reference.as_deref(), Some("TestObj.TestInt")); + assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK); + + // F54 one-shot: pending_ops is now empty. + { + let guard = session.inner.pending_ops.lock().await; + assert!(guard.by_id.is_empty(), "router popped the entry"); + } + + drop(event_tx); + let _ = router_h.await; + server_handle.await.unwrap(); + } + /// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for /// buffered subscriptions, mirroring the .NET reference's /// `if (!subscription.IsBuffered)` guard at