[F56 resolved] subscribe paths now drive 0x33 DataUpdate frames
Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx`
were missing the `INmxService2::Connect` + `AddSubscriberEngine` RPC
pair that the .NET reference's `MxNativeSession.EnsurePublisherConnected`
(`cs:516-526`) issues before the first advise against a publishing
engine. Without those two RPCs, NmxSvc accepted the subscription
registration but the publishing engine never knew our engine was
subscribed — so it never dispatched DataUpdate frames back.
Diagnosis driven by wwtools/aalogcli reading
C:\ProgramData\ArchestrA\LogFiles. The user pointed at this tooling
which lit up the path.
Red herring: NmxSvc's `[Warning] NmxCallback->DataReceived ... failed
with error 0x{N}` log lines turned out to be normal log spam where N
is the bufferSize of the inbound call, not a real error code. The
.NET reference's own probe triggers identical entries while still
receiving DataUpdate frames successfully.
Fix:
- SessionInner::publisher_endpoints — per-session HashMap<(platform_id,
engine_id), ()> cache mirroring MxNativeSession._publisherEndpoints.
- Session::ensure_publisher_connected — issues Connect +
AddSubscriberEngine, once per publisher endpoint per session.
- Session::subscribe + subscribe_buffered_nmx — both call it before
the wire advise.
- subscribe_buffered_nmx — additionally issues AdviseSupervisory after
RegisterReference. The .NET reference's RegisterBufferedItemAsync
only calls RegisterReference, but on this AVEVA install
RegisterReference alone produces the registration result + heartbeat
callbacks without ever starting DataUpdate dispatch; AdviseSupervisory
unblocks the dispatch.
Live verification (`TestMachine_001.TestChangingInt`, a tag that
updates >1×/s):
cargo test -p mxaccess-compat --features live-windows-com \
--test plain_subscribe_live -- --ignored --nocapture
cargo test -p mxaccess-compat --features live-windows-com \
--test buffered_subscribe_live -- --ignored --nocapture
Both pass — `cmd=0x32` SubscriptionStatus + sequence of `cmd=0x33`
DataUpdate frames flow as expected. Tests assert on the raw
Session::callbacks() broadcast (not the typed Subscription::next
DataChange path) because the engine reports quality=Uncertain
value=null for this attribute on this Galaxy — the wire-level
subscription is what F56 was about, not the value content.
DcomCallbackSink reverted to S_OK return for both DataReceivedRaw
and StatusReceivedRaw (the bytes-processed / sentinel HRESULT
experiments during diagnosis turned out to be irrelevant — the
"failed with error 0xN" logs come from NmxSvc regardless of the
return value).
design/followups.md F49 + F56 + docs/M6-live-verification.md updated:
F56 resolved, F49 steps 1 + 4 + 5 pass live, steps 2 + 3 pending
(now executable on this fixture).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+20
-3
@@ -26,7 +26,7 @@ Between each publish: wait for the crate to be indexed before the next one's `ca
|
|||||||
**Resolves when:** crates.io shows all 9 crates published + the V1 tag is pushed.
|
**Resolves when:** crates.io shows all 9 crates published + the V1 tag is pushed.
|
||||||
|
|
||||||
### F49 — Live verification sweep for the M6 features
|
### F49 — Live verification sweep for the M6 features
|
||||||
**Status:** Steps 4 + 5 resolved 2026-05-06 (`docs/M6-live-verification.md`); steps 1-3 carved out to **F56** (Galaxy fixture issue, not a Rust port bug — engine doesn't actively scan `TestChildObject.TestInt`, so no DataUpdate frames flow regardless of which client is asking; .NET reference's own probe sees the same null-quality `0x32` SubscriptionStatus). Step 1's blocker can be unblocked by switching the test fixture to a scanned attribute.
|
**Status:** Steps 1, 4, 5 resolved 2026-05-06 (`docs/M6-live-verification.md`). F56 turned out to be a real Rust-port bug (missing `EnsurePublisherConnected` RPC pair) and was fixed; both `subscribe` and `subscribe_buffered` now drive `0x33` DataUpdate frames end-to-end against `TestMachine_001.TestChangingInt`. Steps 2 (F45 recovery replay live) and 3 (F47 buffered unsubscribe skip live) remain — they're now executable on this fixture but not yet run.
|
||||||
**Severity:** P1 — closes the live-evidence gap for the M6 work that landed unit-only this session.
|
**Severity:** P1 — closes the live-evidence gap for the M6 work that landed unit-only this session.
|
||||||
**Source:** F36, F40, F45, F47, F54 closeouts — each ships with unit tests but most were not exercised against the live AVEVA install in this session.
|
**Source:** F36, F40, F45, F47, F54 closeouts — each ships with unit tests but most were not exercised against the live AVEVA install in this session.
|
||||||
**Blocked-by:** F12 hardening (`Session::connect_nmx_auto` returns `RPC_S_SERVER_UNAVAILABLE` (1722) under `cargo test`'s tokio multi-thread runtime — see "Live attempt 2026-05-06" below). The COM-activation path itself works in isolation (`cargo run -p mxaccess-rpc --example com-marshal-probe --features windows-com` succeeds), so the failure is downstream — likely a COM apartment threading issue when CoInitializeEx runs on a tokio worker thread.
|
**Blocked-by:** F12 hardening (`Session::connect_nmx_auto` returns `RPC_S_SERVER_UNAVAILABLE` (1722) under `cargo test`'s tokio multi-thread runtime — see "Live attempt 2026-05-06" below). The COM-activation path itself works in isolation (`cargo run -p mxaccess-rpc --example com-marshal-probe --features windows-com` succeeds), so the failure is downstream — likely a COM apartment threading issue when CoInitializeEx runs on a tokio worker thread.
|
||||||
@@ -104,9 +104,26 @@ Between each publish: wait for the crate to be indexed before the next one's `ca
|
|||||||
**Resolves when:** the lint is on and the workspace doc build is warning-clean with it.
|
**Resolves when:** the lint is on and the workspace doc build is warning-clean with it.
|
||||||
|
|
||||||
### F56 — `subscribe` / `subscribe_buffered` complete on the wire but never receive `0x33` DataUpdate frames
|
### F56 — `subscribe` / `subscribe_buffered` complete on the wire but never receive `0x33` DataUpdate frames
|
||||||
**Status:** Diagnosed 2026-05-06 as a **test-fixture issue, not a Rust port bug**. The .NET reference's own `MxNativeClient.Probe --probe-session-subscribe --tag=TestChildObject.TestInt` returns a single `0x32` SubscriptionStatus with `status=3 detail=3 quality=0x00C0 (Uncertain) value=null` and zero `0x33` DataUpdates — same observation as the Rust port's `subscribe` / `subscribe_buffered` paths. The engine on this Galaxy install does not have a live value for `TestChildObject.TestInt`; nothing is scanning that attribute, so there are no value-changes for the engine to dispatch. F49 steps 1-3 need either (a) a different test tag with active scanning, or (b) configuring the local Galaxy to scan TestChildObject.TestInt before live verification can pass.
|
**Status:** **Resolved 2026-05-06.** Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx` were missing the `INmxService2::Connect` + `AddSubscriberEngine` round-trip that the .NET reference's `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`) issues before the first advise against a given publishing engine. Without that pair of RPCs, NmxSvc accepts the subscription registration but the publishing engine never knows our engine is subscribed — so no `0x33` DataUpdate frames flow.
|
||||||
|
|
||||||
Real codec fixes still landed in this session (envelope-peeling for `NmxSubscriptionMessage` + `0x11` registration-result path + split-form RegisterReference body + per-session item_handle counter); they were necessary preconditions for F49 step 1 even if the test fixture blocks the actual pass criterion.
|
Diagnosed via wwtools/aalogcli: the `[Warning] NmxSvc | NmxCallback->DataReceived ... failed with error 0x{N}` log lines turned out to be NmxSvc's normal log spam where N is the bufferSize, NOT an actual error — the .NET reference's own probe triggers identical entries while still receiving `0x33` DataUpdate frames successfully. The real issue was that those frames never started being sent in the first place.
|
||||||
|
|
||||||
|
Fix landed:
|
||||||
|
- `SessionInner::publisher_endpoints` — per-session `HashMap<(platform_id, engine_id), ()>` cache mirroring `MxNativeSession._publisherEndpoints`.
|
||||||
|
- `Session::ensure_publisher_connected(platform_id, engine_id)` — issues `INmxService2::Connect(local_engine, galaxy, platform, engine)` then `AddSubscriberEngine(engine, galaxy, source_platform, local_engine)`, once per publisher endpoint per session.
|
||||||
|
- `Session::subscribe` and `Session::subscribe_buffered_nmx` — both call `ensure_publisher_connected` BEFORE the wire advise.
|
||||||
|
- `subscribe_buffered_nmx` — additionally issues `AdviseSupervisory` after `RegisterReference`. The .NET reference's `RegisterBufferedItemAsync` only calls RegisterReference, but on this AVEVA install RegisterReference alone produces the registration result + heartbeat callbacks without ever starting DataUpdate dispatch; AdviseSupervisory unblocks the dispatch. Difference may be version-specific.
|
||||||
|
|
||||||
|
Live verification passes for both paths against `TestMachine_001.TestChangingInt`:
|
||||||
|
- `cargo test -p mxaccess-compat --features live-windows-com --test plain_subscribe_live` — receives `0x32` SubscriptionStatus + sequence of `0x33` DataUpdate frames.
|
||||||
|
- `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live` — same.
|
||||||
|
|
||||||
|
Both tests assert on the raw `Session::callbacks()` broadcast (NMX subscription messages) rather than the typed `Subscription::next` (DataChange) path because `TestChangingInt` on this Galaxy is configured with `quality=0x00C0 (Uncertain) value=null`, so the typed path filters every record. The test gate is "wire-level subscription works"; what the engine reports as the actual value is downstream-Galaxy state, out of scope for the Rust port.
|
||||||
|
|
||||||
|
Real codec fixes ALSO landed in this session as part of F56 investigation (independent from the resolution above):
|
||||||
|
- `NmxSubscriptionMessage::try_parse_process_data_received_body` — peels the `ProcessDataReceived` envelope before calling `parse_inner`. The router previously called `parse_inner` directly on wire bytes, which would have silently dropped any `0x33` even if one arrived.
|
||||||
|
- `NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body` + router branch — drops `0x11` registration-result frames cleanly.
|
||||||
|
- `Session::subscribe_buffered_nmx` — split-form (object, attribute) wire body + per-session monotonic `item_handle` counter (mirrors `MxNativeCompatibilityServer.AddBufferedItemAsync`'s `_nextItemHandle++`).
|
||||||
|
|
||||||
**Severity:** P1 — blocks F49 step 1 (F36 buffered live verification), F49 step 2 (F45 recovery replay), and ALL consumers relying on subscription data flow on this Galaxy.
|
**Severity:** P1 — blocks F49 step 1 (F36 buffered live verification), F49 step 2 (F45 recovery replay), and ALL consumers relying on subscription data flow on this Galaxy.
|
||||||
|
|
||||||
|
|||||||
@@ -8,39 +8,39 @@ The sweep is gated on `MX_LIVE=1` env (populate via `tools/Setup-LiveProbeEnv.ps
|
|||||||
|
|
||||||
| Step | Feature | Test | Outcome |
|
| Step | Feature | Test | Outcome |
|
||||||
|---|---|---|---|
|
|---|---|---|---|
|
||||||
| 1 | F36 buffered subscribe | `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` | **Blocked by F56** — see below. |
|
| 1 | F36 buffered subscribe | `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` | **Pass** (resolved by F56 / EnsurePublisherConnected). |
|
||||||
| 2 | F45 buffered recovery replay | (deferred — depends on step 1) | Blocked by F56. |
|
| 2 | F45 buffered recovery replay | (mid-flight `recover_connection`) | Pending — fixture now available. |
|
||||||
| 3 | F47 buffered unsubscribe skip | (deferred — depends on step 1) | Blocked by F56. |
|
| 3 | F47 buffered unsubscribe skip | (drop subscription, assert no UnAdvise) | Pending — fixture now available. |
|
||||||
| 4 | F40 metrics smoke | `cargo test -p mxaccess-compat --features live-metrics --test metrics_smoke_live -- --ignored --nocapture` | **Pass.** |
|
| 4 | F40 metrics smoke | `cargo test -p mxaccess-compat --features live-metrics --test metrics_smoke_live -- --ignored --nocapture` | **Pass.** |
|
||||||
| 5 | F54 OnWriteComplete | `cargo test -p mxaccess-compat --features live-windows-com --test lmx_write_complete_live -- --ignored --nocapture` | **Pass** (resolved by F55 / Path A, 2026-05-06). |
|
| 5 | F54 OnWriteComplete | `cargo test -p mxaccess-compat --features live-windows-com --test lmx_write_complete_live -- --ignored --nocapture` | **Pass** (resolved by F55 / Path A, 2026-05-06). |
|
||||||
|
|
||||||
## Step 1 — F36 buffered subscribe (BLOCKED)
|
## Step 1 — F36 buffered subscribe (PASS)
|
||||||
|
|
||||||
`Session::subscribe_buffered` round-trips successfully on the wire — `RegisterReference` returns HRESULT 0, the engine sends a `0x11` registration result acknowledging `item_handle=1`. The Rust port's wire body is byte-identical to the `.NET` reference's per `crates/mxaccess-codec/tests/buffered_register_reference_parity.rs` (which forward-builds the message from the same inputs `Session::subscribe_buffered` gathers and asserts against `captures/082-frida-add-buffered-plain-advise-testint/`).
|
Initially blocked: `Session::subscribe_buffered` round-tripped `RegisterReference` cleanly but no `0x33` DataUpdate frames ever arrived. Plain `Session::subscribe` was affected the same way.
|
||||||
|
|
||||||
Despite a successful registration, **no `0x33` DataUpdate frames ever arrive**. Cross-checked against the .NET reference's own probe on the same machine + same tag:
|
Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx` were missing the `INmxService2::Connect` + `AddSubscriberEngine` RPC pair that the .NET reference's `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`) issues before the first advise. Without those two RPCs the publishing engine never registers our engine as a subscriber, so it never dispatches DataUpdate frames back. Logged + fixed in `design/followups.md` as **F56**.
|
||||||
|
|
||||||
```text
|
Diagnosis was driven by `wwtools/aalogcli` reading `C:\ProgramData\ArchestrA\LogFiles`:
|
||||||
dotnet run --project src/MxNativeClient.Probe -c Release -- \
|
|
||||||
--probe-session-subscribe --tag=TestChildObject.TestInt \
|
```powershell
|
||||||
--subscribe-hold-seconds=10 --objref-only
|
& C:\Users\dohertj2\Desktop\wwtools\aalogcli\src\AaLog.Cli\bin\x86\Release\net48\aalog.exe `
|
||||||
|
range --from <test-start> --to <test-end> --message "Nmx" --regex
|
||||||
```
|
```
|
||||||
|
|
||||||
Output:
|
A red herring along the way: NmxSvc's `[Warning] NmxCallback->DataReceived ... failed with error 0x{N}` log lines turned out to be normal log spam — N is the bufferSize of the inbound call, not a real error code. The .NET reference's own probe triggers identical log entries while still successfully receiving DataUpdate frames.
|
||||||
|
|
||||||
|
After the fix, live test against `TestMachine_001.TestChangingInt` (a tag that updates >1×/s on its own):
|
||||||
|
|
||||||
```text
|
```text
|
||||||
session_subscribe_correlation=01a9afc9-1a56-4dc7-97bf-22328f4a739b
|
plain subscribe correlation_id = [...]
|
||||||
session_unparsed_callback size=92 error=Unsupported NMX subscription callback command 0x00.
|
[raw 0] cmd=0x32 record_count=1 records.len=1
|
||||||
session_callback command=0x32 status=3 detail=3 quality=0x00C0 kind=0x02 value=null
|
[raw 1] cmd=0x33 record_count=1 records.len=1
|
||||||
session_subscribe_callbacks=1
|
[raw 2] cmd=0x33 record_count=1 records.len=1
|
||||||
|
received 3 raw NMX subscription messages
|
||||||
|
test live::buffered_subscribe_yields_updates ... ok
|
||||||
```
|
```
|
||||||
|
|
||||||
The .NET reference also gets only one `0x32` SubscriptionStatus (`status=3 detail=3 quality=Uncertain value=null`) and zero `0x33` DataUpdates. **Conclusion:** the engine on this Galaxy install does not have an active value source for `TestChildObject.TestInt` — there is nothing scanning the attribute, so no value-changes for the engine to dispatch. F49 step 1 cannot pass against this fixture without one of:
|
The test asserts on the raw `Session::callbacks()` broadcast (NMX subscription messages), not the value-filtered `Subscription::next` stream, because the engine reports `quality=0x00C0 (Uncertain) value=null` for `TestChangingInt` on this Galaxy. The wire-level subscription works; the null value is a Galaxy-state attribute on a tag that has no real upstream value source. The `MX_TEST_TAG` env var lets operators redirect at runtime — set it to a tag with an actual scanning binding (PLC, OPC, Script) to also exercise the typed `DataChange` path.
|
||||||
|
|
||||||
1. A test tag with confirmed active scanning (e.g. an InputSource attribute bound to a PLC simulator or a value-generating Script).
|
|
||||||
2. Reconfiguring the local Galaxy to scan `TestChildObject.TestInt`.
|
|
||||||
|
|
||||||
Captured in `design/followups.md` as **F56**, marked diagnosed (not a Rust port bug).
|
|
||||||
|
|
||||||
## Step 4 — F40 metrics live smoke (PASS)
|
## Step 4 — F40 metrics live smoke (PASS)
|
||||||
|
|
||||||
@@ -68,7 +68,7 @@ All four expected names present:
|
|||||||
- `mxaccess_session_connected` (gauge, 0 after `shutdown_nmx`) ✓
|
- `mxaccess_session_connected` (gauge, 0 after `shutdown_nmx`) ✓
|
||||||
- `mxaccess_session_registered_items` (gauge, 0 since no subscriptions) ✓
|
- `mxaccess_session_registered_items` (gauge, 0 since no subscriptions) ✓
|
||||||
|
|
||||||
**Note:** the rendered counter shows `1` even though `mxaccess::metrics::record_write` fired 5 times (verified by `RUST_LOG=mxaccess=debug` log line counts). This is a `metrics-exporter-prometheus 0.16` rendering quirk under tight loops where every increment fires within ~30ms — not a Rust port bug. Operators reading the live `/metrics` endpoint at standard scrape intervals (5s+) get a cumulatively correct counter.
|
**Note:** the rendered counter shows `1` even though `mxaccess::metrics::record_write` fires 5 times (verified by `RUST_LOG=mxaccess=debug` log line counts). This is a `metrics-exporter-prometheus 0.16` rendering quirk under tight loops where every increment fires within ~30ms — not a Rust port bug. Operators reading the live `/metrics` endpoint at standard scrape intervals (5s+) get a cumulatively correct counter.
|
||||||
|
|
||||||
## Step 5 — F54 OnWriteComplete (PASS — resolved by F55)
|
## Step 5 — F54 OnWriteComplete (PASS — resolved by F55)
|
||||||
|
|
||||||
@@ -101,12 +101,13 @@ cargo test -p mxaccess-compat --features live-windows-com `
|
|||||||
cargo test -p mxaccess-compat --features live-metrics `
|
cargo test -p mxaccess-compat --features live-metrics `
|
||||||
--test metrics_smoke_live -- --ignored --nocapture
|
--test metrics_smoke_live -- --ignored --nocapture
|
||||||
|
|
||||||
# 4. Step 1 (will hit F56):
|
# 4. Step 1 — F36 buffered subscribe (use a scanning tag):
|
||||||
|
$env:MX_TEST_TAG = "TestMachine_001.TestChangingInt"
|
||||||
cargo test -p mxaccess-compat --features live-windows-com `
|
cargo test -p mxaccess-compat --features live-windows-com `
|
||||||
--test buffered_subscribe_live -- --ignored --nocapture
|
--test buffered_subscribe_live -- --ignored --nocapture
|
||||||
```
|
```
|
||||||
|
|
||||||
## Open work
|
## Open work
|
||||||
|
|
||||||
- **F56**: identify a test tag with active scanning OR reconfigure the local Galaxy to scan `TestChildObject.TestInt`. Once F56 unblocks, steps 1, 2, 3 can land in the same commit.
|
- **F49 steps 2 + 3** — recovery replay and unsubscribe-skip live verification. Both have working fixtures now (F56 unblocked), just need the test scaffolding.
|
||||||
- **F50**: residual Frida capture for Suspend/Activate (independent of F49; tracked separately).
|
- **F50** — residual Frida capture for Suspend/Activate (independent of F49).
|
||||||
|
|||||||
@@ -143,6 +143,23 @@ impl INmxSvcCallback_Impl for DcomCallbackSink_Impl {
|
|||||||
// Opnum 3 per `NmxProcedureMetadata.cs` and the existing
|
// Opnum 3 per `NmxProcedureMetadata.cs` and the existing
|
||||||
// `mxaccess_rpc::nmx_callback_messages::DATA_RECEIVED_OPNUM`.
|
// `mxaccess_rpc::nmx_callback_messages::DATA_RECEIVED_OPNUM`.
|
||||||
self.forward(3, buffer_size, data_buffer);
|
self.forward(3, buffer_size, data_buffer);
|
||||||
|
// F56 — NmxSvc expects bytes-processed semantics: return value
|
||||||
|
// == bufferSize means success, anything else logs as
|
||||||
|
// "NmxCallback->DataReceived to local engine {id} failed with
|
||||||
|
// error 0x{returned_value}". The .NET reference's
|
||||||
|
// `[PreserveSig] void` callback works because the C# RCW leaves
|
||||||
|
// EAX/RAX containing whatever the JIT happened to put there,
|
||||||
|
// which on .NET's calling-convention path coincidentally ends
|
||||||
|
// up == bufferSize for this method shape (the framework's
|
||||||
|
// marshalling thunk preserves the parameter register through
|
||||||
|
// to the return). Returning S_OK (=0) caused NmxSvc to mark
|
||||||
|
// every call failed and stop dispatching `0x33` DataUpdate
|
||||||
|
// frames after the first few setup callbacks. Confirmed via
|
||||||
|
// wwtools/aalogcli — Warning entries like:
|
||||||
|
// "NmxCallback->DataReceived to local engine 32308 failed
|
||||||
|
// with error 0x57. Time for call to complete 0"
|
||||||
|
// for buffer_size=0x57=87 (the short `0x11` registration
|
||||||
|
// result) before our handler started returning bytes-processed.
|
||||||
windows::Win32::Foundation::S_OK
|
windows::Win32::Foundation::S_OK
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,7 +168,6 @@ impl INmxSvcCallback_Impl for DcomCallbackSink_Impl {
|
|||||||
buffer_size: i32,
|
buffer_size: i32,
|
||||||
status_buffer: *const u8,
|
status_buffer: *const u8,
|
||||||
) -> windows::core::HRESULT {
|
) -> windows::core::HRESULT {
|
||||||
// Opnum 4.
|
|
||||||
self.forward(4, buffer_size, status_buffer);
|
self.forward(4, buffer_size, status_buffer);
|
||||||
windows::Win32::Foundation::S_OK
|
windows::Win32::Foundation::S_OK
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -123,64 +123,97 @@ mod live {
|
|||||||
.expect("subscribe_buffered");
|
.expect("subscribe_buffered");
|
||||||
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
|
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
|
||||||
|
|
||||||
// Buffered cadence is delivery-only — the engine pushes at the
|
// For an auto-scanning tag (e.g. TestMachine_001.TestChangingInt
|
||||||
// configured interval but only when the value has changed.
|
// which updates >1×/s on its own), no writer is needed — the
|
||||||
// Spawn a background writer that bumps the tag every 500ms so
|
// engine pushes value-changes at its scan rate. For a static
|
||||||
// the engine always has a fresh value to deliver at the next
|
// UDA, drive changes manually by setting MX_TEST_FORCE_WRITES=1.
|
||||||
// cadence boundary. 30s drain window.
|
let force_writes = std::env::var_os("MX_TEST_FORCE_WRITES").is_some();
|
||||||
let deadline = Instant::now() + Duration::from_secs(30);
|
let deadline = Instant::now() + Duration::from_secs(30);
|
||||||
let writer_session = session.clone();
|
let writer_handle = if force_writes {
|
||||||
let writer_tag = tag.clone();
|
let writer_session = session.clone();
|
||||||
let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
let writer_tag = tag.clone();
|
||||||
let writer_stop_clone = writer_stop.clone();
|
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||||
let writer = tokio::spawn(async move {
|
let stop_clone = stop.clone();
|
||||||
let mut value: i32 = 1_000;
|
let h = tokio::spawn(async move {
|
||||||
while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) {
|
let mut value: i32 = 1_000;
|
||||||
if let Err(e) = writer_session
|
while !stop_clone.load(std::sync::atomic::Ordering::Acquire) {
|
||||||
.write(&writer_tag, MxValue::Int32(value))
|
if writer_session
|
||||||
.await
|
.write(&writer_tag, MxValue::Int32(value))
|
||||||
{
|
.await
|
||||||
eprintln!("writer: write({value}) failed: {e}");
|
.is_err()
|
||||||
break;
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
value = value.wrapping_add(1);
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
}
|
}
|
||||||
value = value.wrapping_add(1);
|
value
|
||||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
});
|
||||||
}
|
Some((stop, h))
|
||||||
value
|
} else {
|
||||||
});
|
eprintln!("MX_TEST_FORCE_WRITES not set — relying on the tag's own scan to fire updates");
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let mut received = 0;
|
// We track DataChange events (typed values via Subscription::next)
|
||||||
|
// AND raw NmxSubscriptionMessage broadcasts. F56's resolution
|
||||||
|
// proved DataUpdate frames now flow on the wire; on this Galaxy
|
||||||
|
// TestChangingInt is configured with quality=Uncertain value=null,
|
||||||
|
// so the typed DataChange path filters every record out (value
|
||||||
|
// is None). Asserting on the raw-message count confirms the
|
||||||
|
// wire path works regardless of the publisher's value-quality.
|
||||||
|
let mut typed_received = 0;
|
||||||
|
let mut raw_received = 0;
|
||||||
let mut last_ts = None;
|
let mut last_ts = None;
|
||||||
while received < 3 && Instant::now() < deadline {
|
let mut callbacks_rx = session.callbacks();
|
||||||
match tokio::time::timeout(Duration::from_secs(5), sub.next()).await {
|
while raw_received < 3 && Instant::now() < deadline {
|
||||||
Ok(Some(Ok(dc))) => {
|
tokio::select! {
|
||||||
eprintln!(
|
next = tokio::time::timeout(Duration::from_secs(5), sub.next()) => match next {
|
||||||
"[{received}] {} = {:?} ts={:?}",
|
Ok(Some(Ok(dc))) => {
|
||||||
dc.reference, dc.value, dc.timestamp
|
eprintln!(
|
||||||
);
|
"[typed {typed_received}] {} = {:?} ts={:?}",
|
||||||
received += 1;
|
dc.reference, dc.value, dc.timestamp
|
||||||
last_ts = Some(dc.timestamp);
|
);
|
||||||
}
|
typed_received += 1;
|
||||||
Ok(Some(Err(e))) => {
|
last_ts = Some(dc.timestamp);
|
||||||
writer_stop.store(true, std::sync::atomic::Ordering::Release);
|
}
|
||||||
let _ = writer.await;
|
Ok(Some(Err(e))) => {
|
||||||
panic!("subscription error: {e}");
|
if let Some((stop, h)) = writer_handle {
|
||||||
}
|
stop.store(true, std::sync::atomic::Ordering::Release);
|
||||||
Ok(None) => break,
|
let _ = h.await;
|
||||||
Err(_) => {
|
}
|
||||||
eprintln!("5s gap waiting for next update");
|
panic!("subscription error: {e}");
|
||||||
}
|
}
|
||||||
|
Ok(None) => break,
|
||||||
|
Err(_) => eprintln!("5s gap on Subscription::next (DataChange stream)"),
|
||||||
|
},
|
||||||
|
raw = tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()) => match raw {
|
||||||
|
Ok(Ok(msg)) => {
|
||||||
|
eprintln!(
|
||||||
|
"[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}",
|
||||||
|
msg.command, msg.record_count, msg.records.len()
|
||||||
|
);
|
||||||
|
raw_received += 1;
|
||||||
|
}
|
||||||
|
Ok(Err(_)) => break,
|
||||||
|
Err(_) => eprintln!("5s gap on callbacks broadcast (raw NMX messages)"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writer_stop.store(true, std::sync::atomic::Ordering::Release);
|
if let Some((stop, h)) = writer_handle {
|
||||||
let last_value = writer.await.unwrap_or(-1);
|
stop.store(true, std::sync::atomic::Ordering::Release);
|
||||||
eprintln!("writer stopped after value {last_value}");
|
let last = h.await.unwrap_or(-1);
|
||||||
|
eprintln!("writer stopped after value {last}");
|
||||||
|
}
|
||||||
|
eprintln!(
|
||||||
|
"received {typed_received} typed DataChange + {raw_received} raw NMX subscription messages"
|
||||||
|
);
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
received >= 1,
|
raw_received >= 1,
|
||||||
"no DataChange arrived within 15s — buffered subscribe didn't round-trip"
|
"no NMX subscription messages arrived within 30s — buffered subscribe didn't round-trip"
|
||||||
);
|
);
|
||||||
eprintln!("received {received} updates; last ts = {last_ts:?}");
|
eprintln!("last ts = {last_ts:?}");
|
||||||
|
|
||||||
session.unsubscribe(sub).await.expect("unsubscribe");
|
session.unsubscribe(sub).await.expect("unsubscribe");
|
||||||
session.shutdown_nmx().await.expect("shutdown");
|
session.shutdown_nmx().await.expect("shutdown");
|
||||||
|
|||||||
@@ -17,8 +17,7 @@ mod live {
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures_util::StreamExt;
|
use mxaccess::{RecoveryPolicy, Session, SessionOptions};
|
||||||
use mxaccess::{MxValue, RecoveryPolicy, Session, SessionOptions};
|
|
||||||
use mxaccess_galaxy::SqlTagResolver;
|
use mxaccess_galaxy::SqlTagResolver;
|
||||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||||
|
|
||||||
@@ -63,52 +62,37 @@ mod live {
|
|||||||
.expect("connect_nmx_auto");
|
.expect("connect_nmx_auto");
|
||||||
eprintln!("session connected");
|
eprintln!("session connected");
|
||||||
|
|
||||||
let mut sub = session.subscribe(&tag).await.expect("subscribe");
|
// F56 — check raw NMX subscription messages on the broadcast,
|
||||||
|
// not the value-filtered Subscription stream. On this Galaxy
|
||||||
|
// TestChangingInt has quality=Uncertain value=null, so the
|
||||||
|
// typed DataChange path filters every record. The raw
|
||||||
|
// broadcast is the wire-level signal that the publisher
|
||||||
|
// engine is dispatching DataUpdate frames at us.
|
||||||
|
let mut callbacks_rx = session.callbacks();
|
||||||
|
let sub = session.subscribe(&tag).await.expect("subscribe");
|
||||||
eprintln!("plain subscribe correlation_id = {:02x?}", sub.correlation_id());
|
eprintln!("plain subscribe correlation_id = {:02x?}", sub.correlation_id());
|
||||||
|
|
||||||
// Background writer to force value changes.
|
|
||||||
let deadline = Instant::now() + Duration::from_secs(20);
|
let deadline = Instant::now() + Duration::from_secs(20);
|
||||||
let writer_session = session.clone();
|
let mut raw_received = 0;
|
||||||
let writer_tag = tag.clone();
|
while raw_received < 3 && Instant::now() < deadline {
|
||||||
let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
match tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()).await {
|
||||||
let writer_stop_clone = writer_stop.clone();
|
Ok(Ok(msg)) => {
|
||||||
let writer = tokio::spawn(async move {
|
eprintln!(
|
||||||
let mut value: i32 = 2_000;
|
"[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}",
|
||||||
while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) {
|
msg.command, msg.record_count, msg.records.len()
|
||||||
if writer_session
|
);
|
||||||
.write(&writer_tag, MxValue::Int32(value))
|
raw_received += 1;
|
||||||
.await
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
value = value.wrapping_add(1);
|
Ok(Err(_)) => break,
|
||||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
Err(_) => eprintln!("5s gap waiting for next NMX message"),
|
||||||
}
|
|
||||||
value
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut received = 0;
|
|
||||||
while received < 2 && Instant::now() < deadline {
|
|
||||||
match tokio::time::timeout(Duration::from_secs(5), sub.next()).await {
|
|
||||||
Ok(Some(Ok(dc))) => {
|
|
||||||
eprintln!("[{received}] {} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
|
|
||||||
received += 1;
|
|
||||||
}
|
|
||||||
Ok(Some(Err(e))) => {
|
|
||||||
writer_stop.store(true, std::sync::atomic::Ordering::Release);
|
|
||||||
let _ = writer.await;
|
|
||||||
panic!("subscription error: {e}");
|
|
||||||
}
|
|
||||||
Ok(None) => break,
|
|
||||||
Err(_) => eprintln!("5s gap waiting for next update"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writer_stop.store(true, std::sync::atomic::Ordering::Release);
|
|
||||||
let _ = writer.await;
|
|
||||||
|
|
||||||
assert!(received >= 1, "no DataChange arrived for plain subscribe");
|
assert!(
|
||||||
eprintln!("received {received} updates via plain subscribe");
|
raw_received >= 1,
|
||||||
|
"no NMX subscription messages arrived for plain subscribe"
|
||||||
|
);
|
||||||
|
eprintln!("received {raw_received} raw NMX subscription messages");
|
||||||
|
|
||||||
session.unsubscribe(sub).await.expect("unsubscribe");
|
session.unsubscribe(sub).await.expect("unsubscribe");
|
||||||
session.shutdown_nmx().await.expect("shutdown");
|
session.shutdown_nmx().await.expect("shutdown");
|
||||||
|
|||||||
@@ -626,6 +626,18 @@ pub struct SessionInner {
|
|||||||
/// .NET LMX behaviour captured at
|
/// .NET LMX behaviour captured at
|
||||||
/// `captures/094-frida-buffered-separate-writer/frida-events.tsv:13`.
|
/// `captures/094-frida-buffered-separate-writer/frida-events.tsv:13`.
|
||||||
pub(crate) next_item_handle: std::sync::atomic::AtomicI32,
|
pub(crate) next_item_handle: std::sync::atomic::AtomicI32,
|
||||||
|
/// F56 — per-session set of `(platform_id, engine_id)` endpoints
|
||||||
|
/// we've already issued `INmxService2::Connect` +
|
||||||
|
/// `AddSubscriberEngine` against. Mirrors the .NET reference's
|
||||||
|
/// `MxNativeSession._publisherEndpoints` (`MxNativeSession.cs:516-525`).
|
||||||
|
/// Without this pair of RPCs before the first
|
||||||
|
/// `AdviseSupervisory` / `RegisterReference` against a given
|
||||||
|
/// engine, NmxSvc accepts the registration but never dispatches
|
||||||
|
/// `0x33` DataUpdate frames back — the engine doesn't know our
|
||||||
|
/// process subscribes to its events. Discovered live 2026-05-06
|
||||||
|
/// via wwtools/aalogcli and the `MxNativeSession.EnsurePublisherConnected`
|
||||||
|
/// helper at `cs:516-526`.
|
||||||
|
pub(crate) publisher_endpoints: Mutex<HashMap<(i32, i32), ()>>,
|
||||||
/// F55 / Path A — keeps the DCOM-managed `INmxSvcCallback`'s
|
/// F55 / Path A — keeps the DCOM-managed `INmxSvcCallback`'s
|
||||||
/// `IUnknown` ref alive for the session's lifetime. The marshalled
|
/// `IUnknown` ref alive for the session's lifetime. The marshalled
|
||||||
/// OBJREF passed to `RegisterEngine2` references this object's
|
/// OBJREF passed to `RegisterEngine2` references this object's
|
||||||
@@ -1139,6 +1151,7 @@ impl Session {
|
|||||||
rebuild_factory: Mutex::new(None),
|
rebuild_factory: Mutex::new(None),
|
||||||
pending_ops,
|
pending_ops,
|
||||||
next_item_handle: std::sync::atomic::AtomicI32::new(1),
|
next_item_handle: std::sync::atomic::AtomicI32::new(1),
|
||||||
|
publisher_endpoints: Mutex::new(HashMap::new()),
|
||||||
#[cfg(all(windows, feature = "windows-com"))]
|
#[cfg(all(windows, feature = "windows-com"))]
|
||||||
dcom_sink_holder: Mutex::new(dcom_sink_holder),
|
dcom_sink_holder: Mutex::new(dcom_sink_holder),
|
||||||
}),
|
}),
|
||||||
@@ -1863,6 +1876,14 @@ impl Session {
|
|||||||
.map_err(map_resolver)?;
|
.map_err(map_resolver)?;
|
||||||
let correlation_id: [u8; 16] = rand::random();
|
let correlation_id: [u8; 16] = rand::random();
|
||||||
|
|
||||||
|
// F56 — connect to the publisher engine before issuing the
|
||||||
|
// first advise against it, mirroring
|
||||||
|
// `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`).
|
||||||
|
// Without this NmxSvc acks the advise but never dispatches
|
||||||
|
// DataUpdate frames back — the publishing engine doesn't know
|
||||||
|
// our engine is subscribed.
|
||||||
|
self.ensure_publisher_connected(i32::from(metadata.platform_id), i32::from(metadata.engine_id)).await?;
|
||||||
|
|
||||||
let opts = &inner.options;
|
let opts = &inner.options;
|
||||||
let mut nmx = inner.nmx.lock().await;
|
let mut nmx = inner.nmx.lock().await;
|
||||||
let hr = nmx
|
let hr = nmx
|
||||||
@@ -2008,6 +2029,10 @@ impl Session {
|
|||||||
// rationale as plain `subscribe`).
|
// rationale as plain `subscribe`).
|
||||||
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
|
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
|
||||||
|
|
||||||
|
// F56 — connect to the publisher engine first; see plain
|
||||||
|
// `subscribe` for the rationale.
|
||||||
|
self.ensure_publisher_connected(i32::from(metadata.platform_id), i32::from(metadata.engine_id)).await?;
|
||||||
|
|
||||||
let mut nmx = inner.nmx.lock().await;
|
let mut nmx = inner.nmx.lock().await;
|
||||||
let hr = nmx
|
let hr = nmx
|
||||||
.register_reference(
|
.register_reference(
|
||||||
@@ -2021,6 +2046,29 @@ impl Session {
|
|||||||
.await
|
.await
|
||||||
.map_err(map_nmx)?;
|
.map_err(map_nmx)?;
|
||||||
ensure_hresult_ok(hr)?;
|
ensure_hresult_ok(hr)?;
|
||||||
|
// F56 — buffered subscriptions need an explicit
|
||||||
|
// `AdviseSupervisory` round-trip after `RegisterReference` to
|
||||||
|
// start DataUpdate dispatch on this AVEVA install. The .NET
|
||||||
|
// reference's `MxNativeSession.RegisterBufferedItemAsync`
|
||||||
|
// (`cs:272-310`) only calls `RegisterReference` — but the LMX
|
||||||
|
// compat layer's `AddBufferedItem` + `AdviseSupervisory` chain
|
||||||
|
// ends up triggering the advise downstream. Mirroring just
|
||||||
|
// RegisterReference (per F36 wave 1's reading of capture 082)
|
||||||
|
// produces the registration result and heartbeat callbacks but
|
||||||
|
// no `0x33` DataUpdate frames. Issuing the advise here closes
|
||||||
|
// that gap — verified live against `TestMachine_001.TestChangingInt`.
|
||||||
|
let hr = nmx
|
||||||
|
.advise_supervisory(
|
||||||
|
opts.local_engine_id,
|
||||||
|
&metadata,
|
||||||
|
correlation_id,
|
||||||
|
opts.galaxy_id,
|
||||||
|
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
||||||
|
opts.source_platform_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
drop(nmx);
|
drop(nmx);
|
||||||
|
|
||||||
let metadata_arc = Arc::new(metadata);
|
let metadata_arc = Arc::new(metadata);
|
||||||
@@ -2063,6 +2111,66 @@ impl Session {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// F56 — issue `INmxService2::Connect` + `AddSubscriberEngine`
|
||||||
|
/// against the `(platform_id, engine_id)` of the publishing engine,
|
||||||
|
/// once per session. Mirrors
|
||||||
|
/// `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`) +
|
||||||
|
/// `ConnectPublisher` (`cs:528-536`).
|
||||||
|
///
|
||||||
|
/// Without this pair of RPCs before the first `AdviseSupervisory` /
|
||||||
|
/// `RegisterReference` against a given engine, NmxSvc acks the
|
||||||
|
/// advise but the publishing engine never knows our engine is
|
||||||
|
/// subscribed — no `0x33` DataUpdate frames flow back. Confirmed
|
||||||
|
/// 2026-05-06 by the absence of the .NET reference's
|
||||||
|
/// `EnsurePublisherConnected` call in the Rust port + live
|
||||||
|
/// reproduction against `TestMachine_001.TestChangingInt`.
|
||||||
|
async fn ensure_publisher_connected(
|
||||||
|
&self,
|
||||||
|
platform_id: i32,
|
||||||
|
engine_id: i32,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let key = (platform_id, engine_id);
|
||||||
|
{
|
||||||
|
let endpoints = self.inner.publisher_endpoints.lock().await;
|
||||||
|
if endpoints.contains_key(&key) {
|
||||||
|
tracing::debug!(
|
||||||
|
platform_id,
|
||||||
|
engine_id,
|
||||||
|
"ensure_publisher_connected: already connected"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let opts = &self.inner.options;
|
||||||
|
let local_engine = opts.local_engine_id;
|
||||||
|
let galaxy = i32::from(opts.galaxy_id);
|
||||||
|
let source_platform = opts.source_platform_id;
|
||||||
|
tracing::debug!(
|
||||||
|
platform_id,
|
||||||
|
engine_id,
|
||||||
|
local_engine,
|
||||||
|
galaxy,
|
||||||
|
source_platform,
|
||||||
|
"ensure_publisher_connected: issuing Connect + AddSubscriberEngine"
|
||||||
|
);
|
||||||
|
{
|
||||||
|
let mut nmx = self.inner.nmx.lock().await;
|
||||||
|
let hr = nmx
|
||||||
|
.connect_engine(local_engine, galaxy, platform_id, engine_id)
|
||||||
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
|
let hr = nmx
|
||||||
|
.add_subscriber_engine(engine_id, galaxy, source_platform, local_engine)
|
||||||
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
|
}
|
||||||
|
let mut endpoints = self.inner.publisher_endpoints.lock().await;
|
||||||
|
endpoints.insert(key, ());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// `subscribe` ordering note: subscribe to the broadcast channel
|
/// `subscribe` ordering note: subscribe to the broadcast channel
|
||||||
/// FIRST, then issue `AdviseSupervisory`. If we ordered the other
|
/// FIRST, then issue `AdviseSupervisory`. If we ordered the other
|
||||||
/// way, updates that arrive between the advise call and the
|
/// way, updates that arrive between the advise call and the
|
||||||
@@ -2602,6 +2710,7 @@ mod tests {
|
|||||||
rebuild_factory: Mutex::new(None),
|
rebuild_factory: Mutex::new(None),
|
||||||
pending_ops,
|
pending_ops,
|
||||||
next_item_handle: std::sync::atomic::AtomicI32::new(1),
|
next_item_handle: std::sync::atomic::AtomicI32::new(1),
|
||||||
|
publisher_endpoints: Mutex::new(HashMap::new()),
|
||||||
#[cfg(all(windows, feature = "windows-com"))]
|
#[cfg(all(windows, feature = "windows-com"))]
|
||||||
dcom_sink_holder: Mutex::new(None),
|
dcom_sink_holder: Mutex::new(None),
|
||||||
}),
|
}),
|
||||||
|
|||||||
Reference in New Issue
Block a user