mxaccesscli: add read-batch / write-batch / subscribe-batch (JSONL input)

Three new subcommands that take a JSONL file (or '-' for stdin) and reuse a
single MxSession across all entries. The big win is in write-batch: a
two-phase pipeline (Advise all -> drain DataChange to resolve types; Write
all -> drain WriteComplete) reduces wall time from N x (resolve + write_ack)
to ~max(resolve) + ~max(write_ack). Measured 38.2s -> 10.3s (~3.7x) for
four writes against the ZB dev galaxy; the saving grows with N.

Per-item continue-on-error: parse errors are collected line-by-line and
abort with exit 2 before any LMX session opens; runtime failures (resolve
timeout, bad references, coerce errors, write timeouts) get their own
results[] row with a typed `error` string and exit 1. Auth flags mirror
`mxa write` and are resolved once before Phase A.

Shared infra:
  - Mx/JsonlInputReader.cs: lazy line reader (skips blank / '#' lines),
    bare-string or {"tag":"..."} for read/sub, {"tag","value","type"?} for
    write, with array-suffix consistency check at parse time.
  - Mx/ValueCoercion.cs: new CoerceJToken(...) wrapper preserves the
    single source of truth for type vocabulary.

Docs:
  - README run examples extended for each new command.
  - docs/usage.md: new "Batch input format" subsection (shared contract),
    one section per command with envelope examples and a full
    failure-mode table for write-batch, plus a "Batch commands -
    verified live" section capturing the 2026-05-10 ZB-galaxy run and
    pipelining-timing numbers.
  - test-fixtures/ holds the exact JSONL files used in the verified-live
    run so the doc numbers are reproducible.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-11 08:34:46 -04:00
parent 6cde4d9fe4
commit 2e937228a0
10 changed files with 1260 additions and 0 deletions
+9
View File
@@ -64,6 +64,15 @@ dotnet run --project src/MxAccess.Cli/MxAccess.Cli.csproj -- write TestMachine_0
# Subscribe to a tag for 30 seconds, JSON Lines for streaming:
dotnet run --project src/MxAccess.Cli/MxAccess.Cli.csproj -- subscribe TestMachine_001.Speed -s 30 --llm-json
# Batch read from a JSONL file (one tag string or {"tag":"..."} per line):
mxa read-batch tags.jsonl --llm-json
# Batch write {tag,value,type?} pairs piped from stdin:
echo '{"tag":"TM.Setpoint","value":42.5,"type":"double"}' | mxa write-batch - --llm-json
# Batch subscribe from a JSONL file, streaming for 30s:
mxa subscribe-batch tags.jsonl -s 30 --llm-json
```
The built executable is `bin\x86\Release\net48\mxa.exe`. Drop on `PATH` and use `mxa read ...`.
+218
View File
@@ -11,6 +11,42 @@ Read, write, and subscribe to AVEVA System Platform tags via MxAccess. The CLI r
- **Subsequent events are fast.** Once a tag is bound, value-change updates propagate within ~100 ms.
- **Exit codes:** `0` on success, `1` if any operation timed out or returned a non-Ok / non-Pending `MxStatusCategory`, `2` on argument-validation errors.
## Batch input format
The three `-batch` commands (`read-batch`, `write-batch`, `subscribe-batch`) all
take a single positional `<input>` argument which is either a path to a JSONL
file or `-` to read JSONL from stdin. The line conventions are shared:
- **Blank lines and lines starting with `#`** are skipped. Line numbers still
count them, so error messages stay accurate against the source file.
- **Per-line auth override is not supported in v1.** All entries in a
`write-batch` use the same `--username` / `--password` / `--user-id` /
`--secured` / `--verifier-*` flags supplied on the command line.
- **Duplicate tags are allowed.** Each input line is an independent entry
with its own item handle and its own row in `results[]`.
Read / subscribe lines accept **either** a bare JSON string or an object form:
```jsonl
"TestMachine_001.Speed"
{"tag": "Reactor1.Level"}
```
Write lines are object form only:
```jsonl
{"tag": "TM.Setpoint", "value": 42.5, "type": "double"}
{"tag": "TM.RunFlag", "value": true}
{"tag": "TM.Parts[]", "value": ["", "11111", ""], "type": "string"}
```
`value` may be a JSON `bool`, `number`, `string`, or `array`. A JSON array
requires the tag to end in `[]` (whole-array reference, matches the `mxa write`
rule). `type` is optional; when omitted the JSON token's native type drives the
default. When set, `type` accepts the same vocabulary as `mxa write --type`
(`bool`, `byte`, `short`, `int`, `long`, `float`, `double`, `string`,
`datetime`).
## `mxa info`
Print the loaded `ArchestrA.MxAccess` assembly identity, supported `--type` values, and the full `MxStatusCategory` enum. No tag access.
@@ -193,6 +229,188 @@ LLM-JSON output (one event per line, no surrounding `[ ... ]`):
JSON Lines lets a downstream consumer parse events incrementally rather than buffering the whole stream — the right shape for indefinite or long-running subscriptions.
## `mxa read-batch <input>`
Reads many tags from a JSONL file (or `-` for stdin) in **one** MxSession,
amortizing the 38 s LMX bind cost across the whole batch. Line shape is
defined in [Batch input format](#batch-input-format). Result rows arrive in
input-line order regardless of which `OnDataChange` fires first.
| Option | Default | Notes |
| --- | --- | --- |
| `-t`, `--timeout <seconds>` | `5` | Wall budget for the whole batch (not per-tag). Tags that haven't delivered a `DataChange` by the deadline are reported with `error: "timeout"`. |
| `--client <name>` | `mxa` | Passed to `Register()`. |
| `--llm-json` | off | Emit the JSON envelope. |
Example:
```powershell
mxa read-batch tags.jsonl --llm-json
echo '"TM.Speed"' | mxa read-batch - --llm-json
```
LLM-JSON envelope (one results row per non-blank input line, carrying the
1-based `line` for traceability):
```json
{
"query": { "command": "read-batch", "input": "tags.jsonl", "timeout_s": 5.0, "client": "mxa" },
"ok": true,
"results": [
{ "tag": "TM.Speed", "line": 1, "ok": true, "value": 1234.5, "quality": 192, "timestamp": "...", "statuses": [...] },
{ "tag": "Reactor1.Level","line": 2, "ok": true, "value": 78.1, "quality": 192, "timestamp": "...", "statuses": [...] }
]
}
```
Exit code: `0` if every entry resolved Ok; `1` if any timed out or returned a
non-Ok status; `2` for parse errors, missing input, or empty input.
## `mxa write-batch <input>`
Writes many tag/value pairs from a JSONL file (or `-`) in **one** MxSession,
**pipelining** per-item resolve and write so wall time is roughly
`max(resolve_latency) + max(write_ack)` instead of `N × (resolve_latency +
write_ack)`. Auth is resolved once before any item is touched.
Line shape: see [Batch input format](#batch-input-format). Auth flags follow
the same semantics as `mxa write` — see [Authentication](#authentication) for
how `--username` / `--domain` / `--secured` / `--verifier-*` interact. Per-line
auth override is **out of scope for v1** (planned follow-up).
| Option | Default | Notes |
| --- | --- | --- |
| `-t`, `--timeout <seconds>` | `5` | Per-phase wall budget. Phase A (resolve types) and Phase B (await `OnWriteComplete`) each get this many seconds. |
| `--user-id <int>` | `0` | Pre-resolved authenticated user id. See `mxa write` for caveats. |
| `-u`, `--username <name>` | (none) | Galaxy / OS username. Resolved to a userId via `AuthenticateUser` once before Phase A. |
| `--domain <name>` | (none) | Combined with `--username` as `<domain>\<username>`. |
| `-p`, `--password <pwd>` | (none) | Password for `--username`. Redacted in the LLM-JSON `query` echo. |
| `--secured` | off | Route writes through `WriteSecured(currentUserId, verifierUserId, value)`. Required for Secured Write / Verified Write classifications. |
| `--verifier-username` / `--verifier-domain` / `--verifier-password` | (none) | Two-person verified write; implies `--secured`. |
| `--client <name>` | `mxa` | Passed to `Register()`. |
| `--llm-json` | off | Emit the JSON envelope. |
Example:
```powershell
# Set up Hi/HiHi/Lo/LoLo limits and enable alarms for a tag in one shot:
mxa write-batch limits.jsonl --llm-json
```
```jsonl
{"tag":"BCD_T1.HighLimit", "value": 80, "type": "float"}
{"tag":"BCD_T1.HighHighLimit", "value": 95, "type": "float"}
{"tag":"BCD_T1.LowLimit", "value": 20, "type": "float"}
{"tag":"BCD_T1.LowLowLimit", "value": 5, "type": "float"}
{"tag":"BCD_T1.HighAlarm.Alarmed", "value": true}
{"tag":"BCD_T1.HighHighAlarm.Alarmed", "value": true}
{"tag":"BCD_T1.LowAlarm.Alarmed", "value": true}
{"tag":"BCD_T1.LowLowAlarm.Alarmed", "value": true}
```
LLM-JSON envelope (per-entry row with auth attribution and `line` echo):
```json
{
"query": { "command": "write-batch", "input": "limits.jsonl", "entries": 8,
"timeout_s": 5.0, "user_id": 0, "verify_user": null, "secured": false, "client": "mxa" },
"ok": true,
"results": [
{ "tag": "BCD_T1.HighLimit", "line": 1, "ok": true, "error": null,
"authenticated": false, "auth_user_id": null, "secured": false,
"verifier_user_id": null, "statuses": [...] }
]
}
```
### Failure modes — `write-batch`
| `error` string | Cause | Exit |
| --- | --- | --- |
| (top-level) `parse-error` | One or more lines failed JSONL / schema validation; `results[]` lists each. | 2 |
| (top-level) `empty-input` | File / stdin contained no non-blank entries. | 2 |
| (top-level) `authentication-failed` | `AuthenticateUser` returned 0 for the operator credentials. **No items attempted.** | 1 |
| (top-level) `verifier-authentication-failed` | Same for the verifier in two-person Verified Write. | 1 |
| per-entry `add-item-failed: …` | `LMXProxyServer.AddItem` threw (typically a malformed reference). | 1 |
| per-entry `timeout-resolving-type` | No `OnDataChange` arrived for this item before `--timeout` elapsed. | 1 |
| per-entry `type-resolution-failed` | First `OnDataChange` carried a non-Ok status — `statuses` filled. | 1 |
| per-entry `value-coerce-failed: …` | JSON value couldn't be converted to the configured `--type` (or inferred type). The Write was **not** queued. | 1 |
| per-entry `write-call-failed: …` | The `Write` / `WriteSecured` COM call itself threw before `OnWriteComplete`. | 1 |
| per-entry `timeout` | No `OnWriteComplete` arrived before `--timeout` elapsed. | 1 |
| per-entry `write-failed` | `OnWriteComplete` arrived with non-Ok statuses (e.g. read-only attr, security denied). | 1 |
The process exits `1` if any per-entry row failed, `0` if every row was Ok.
The envelope's top-level `ok` matches.
## `mxa subscribe-batch <input>`
Subscribes to many tags from a JSONL file (or `-`) and streams `OnDataChange`
events for a duration. The streaming output is identical to `mxa subscribe`
no envelope wrap; one event per line when `--llm-json` is set.
Line shape: see [Batch input format](#batch-input-format).
| Option | Default | Notes |
| --- | --- | --- |
| `-s`, `--seconds <seconds>` | `10` | Wall-clock duration of the subscription. |
| `--max <int>` | `1000` | Hard cap on emitted events. |
| `--client <name>` | `mxa` | Passed to `Register()`. |
| `--llm-json` | off | JSON Lines mode — one event per line, no outer envelope. |
First-event latency is still 38 s per tag, but `Advise()` is issued for every
tag up front so the binds run in parallel — wall time matches a single-tag
subscribe. For short `--seconds` windows (< 5 s) you may miss the initial
values; budget accordingly.
Example:
```powershell
mxa subscribe-batch tags.jsonl -s 30 --llm-json
```
Output stream (each event as one JSON line):
```jsonl
{"tag":"TM.Speed","ok":true,"value":1234.5,"quality":192,"timestamp":"...","statuses":[...]}
{"tag":"Reactor1.Level","ok":true,"value":78.1,"quality":192,"timestamp":"...","statuses":[...]}
```
Exit code: `0` on a clean run, `1` if every tag failed to subscribe (no item
ever bound), `2` for parse errors / missing input / empty input.
## Batch commands — verified live
Captured on 2026-05-10 against the dev `ZB` galaxy (System Platform 2017
Express, MxAccess `3.2.0.0`). The galaxy is configured permissively
(`eNone` / Free Access) so writes were issued anonymously.
| Scenario | Result |
| --- | --- |
| `read-batch` 3 entries (bare-string + object + duplicate, with blank/`#` lines) | All three returned in input-line order; `line` field correctly reflected `1, 2, 5`. |
| `read-batch` continue-on-error (`NoSuchObject.NoSuchAttribute` mixed with `DevPlatform.CPULoad`) | Good entry returned value, bad entry returned `Category=MxCategoryConfigurationError, Detail=6`. Exit 1. |
| `write-batch` 5 entries (1 attribute with `security_classification=5` failed `SecurityError, Detail=1007`; rest wrote anonymously) | 4/5 wrote; per-item statuses preserved; exit 1. Round-trip read confirmed `TuneValue=7.25`, `ProtectedValue=true`, `ProtectedValue1=false`, `MoveInPartNumbers[1]="PN-42"`. |
| `write-batch` failure-mode coverage (`value-coerce-failed` + `type-resolution-failed` + ok, same `TuneValue` tag duplicated) | Each failure surfaced its distinct `error` string; duplicate tag entries are independent handles. |
| `subscribe-batch` two tags, 18-second window | Both tags bound in parallel; CPULoad streamed ~26 `OnDataChange` events; `SystemStartupTime` delivered its initial value then stayed quiet (constant). Exit 0. |
| Parse error on stdin (`{ malformed json }`) | `results[]` row with `error: "parse-error"`, line + reason, no LMX session opened. Exit 2. |
| Missing input file | Top-level `error` envelope, exit 2. |
### Pipelining timing
Same four writes (`OtOpcUaParityTest_001.TuneValue`,
`TestMachine_001.ProtectedValue`, `TestMachine_001.ProtectedValue1`,
`MESReceiver_001.MoveInPartNumbers[1]`) executed two ways, wall-clock:
| Method | Wall time | Notes |
| --- | --- | --- |
| 4 × `mxa write` (sequential subprocess invocations) | **38.2 s** | Each invocation pays the 38 s LMX bind once. |
| 1 × `mxa write-batch` (4 writes in one MxSession, two-phase pipeline) | **10.3 s** | Includes a 5th entry that failed `SecurityError` — still resolved in Phase A. |
**3.7× faster** at N=4. The cost model is roughly `~max(resolve_latency)
+ ~max(write_ack)` for the batched form versus `N × (resolve_latency +
write_ack)` for the sequential form, so the speedup grows with N: at ~10
writes the savings are typically 6080 seconds, which is the scale of the
test-plan setup phases this command was added for.
## Type support matrix
Verified end-to-end against the live `ZB` galaxy (System Platform 2017 Express, MxAccess `3.2.0.0`). Each row records what the wire shape looks like in the JSON envelope.
@@ -0,0 +1,227 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CliFx;
using CliFx.Attributes;
using CliFx.Exceptions;
using CliFx.Infrastructure;
using MxAccess.Cli.Mx;
using MxAccess.Cli.Output;
namespace MxAccess.Cli.Commands
{
[Command("read-batch", Description =
"Read many tags from a JSONL input file (or stdin via '-'). One MxSession " +
"amortizes the LMX bind cost across the whole batch.")]
public sealed class ReadBatchCommand : ICommand
{
[CommandParameter(0, Name = "input",
Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " +
"Each line is either a bare tag string (e.g. \"TM.Speed\") or " +
"an object (e.g. {\"tag\":\"TM.Speed\"}). Blank and '#'-prefixed lines are skipped.")]
public string Input { get; init; }
[CommandOption("timeout", 't', Description = "Per-tag timeout in seconds while waiting for the first value. Default 5.")]
public double TimeoutSeconds { get; init; } = 5.0;
[CommandOption("client", Description = "MxAccess client name passed to Register(). Default 'mxa'.")]
public string ClientName { get; init; } = "mxa";
[CommandOption("llm-json", Description = "Emit the JSON envelope { query, ok, results } instead of human-readable lines.")]
public bool LlmJson { get; init; }
public ValueTask ExecuteAsync(IConsole console)
{
if (string.IsNullOrWhiteSpace(Input))
throw new CommandException("Input path is required (use '-' for stdin).", 2);
if (TimeoutSeconds <= 0)
throw new CommandException("--timeout must be positive.", 2);
var query = new
{
command = "read-batch",
input = Input,
timeout_s = TimeoutSeconds,
client = ClientName,
};
// Phase 1: load + parse. Collect all parse errors so the user gets
// every malformed line in one report instead of fixing them one at
// a time.
var entries = new List<BatchReadEntry>();
var parseErrors = new List<object>();
try
{
foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input))
{
try
{
entries.Add(JsonlInputReader.ParseReadEntry(lineNo, json));
}
catch (InvalidBatchEntryException ex)
{
parseErrors.Add(new
{
tag = (string)null,
line = ex.LineNumber,
ok = false,
error = "parse-error",
reason = ex.Message,
});
}
}
}
catch (InvalidBatchInputException ex)
{
if (LlmJson) Envelope.WriteError(console, query, ex.Message);
else console.Error.WriteLine($"[ERR] {ex.Message}");
Environment.ExitCode = 2;
return default;
}
if (parseErrors.Count > 0)
{
if (LlmJson) Envelope.Write(console, query, ok: false, parseErrors);
else
{
foreach (dynamic e in parseErrors)
console.Error.WriteLine($"[ERR] line {e.line}: {e.reason}");
}
Environment.ExitCode = 2;
return default;
}
if (entries.Count == 0)
{
if (LlmJson) Envelope.WriteError(console, query, "empty-input");
else console.Error.WriteLine("[ERR] no entries in input");
Environment.ExitCode = 2;
return default;
}
// Phase 2: AddItem + Advise for every entry inside one MxSession,
// then drain DataChange events keyed by handle.
using var session = new MxSession(ClientName);
var items = new List<MxItem>();
// Track each entry's handle (or a per-entry add-item error) so the
// result row keeps its input ordering and line-number echo even
// when AddItem fails mid-batch.
var addError = new Dictionary<int, string>(); // entryIndex -> error
var handleByEntry = new int[entries.Count]; // 0 = no item created
try
{
for (int i = 0; i < entries.Count; i++)
{
var entry = entries[i];
try
{
var item = session.AddItem(entry.Tag);
item.Advise();
items.Add(item);
handleByEntry[i] = item.Handle;
}
catch (Exception ex)
{
addError[i] = ex.Message;
}
}
var pending = new HashSet<int>(items.Select(i => i.Handle));
var captured = new Dictionary<int, MxUpdate>();
var deadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds);
while (pending.Count > 0)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
if (!session.WaitForUpdate(
u => u.Kind == MxUpdateKind.DataChange && pending.Contains(u.ItemHandle),
remaining,
out var update))
break;
captured[update.ItemHandle] = update;
pending.Remove(update.ItemHandle);
}
var results = new List<object>(entries.Count);
for (int i = 0; i < entries.Count; i++)
{
var entry = entries[i];
if (addError.TryGetValue(i, out var msg))
{
results.Add(new
{
tag = entry.Tag,
line = entry.LineNumber,
ok = false,
error = "add-item-failed",
reason = msg,
value = (object)null,
quality = (int?)null,
timestamp = (DateTime?)null,
statuses = Array.Empty<MxStatusInfo>(),
});
continue;
}
if (captured.TryGetValue(handleByEntry[i], out var u))
{
results.Add(new
{
tag = entry.Tag,
line = entry.LineNumber,
ok = u.IsOk,
value = u.Value,
quality = u.Quality,
timestamp = u.Timestamp,
statuses = u.Statuses,
});
}
else
{
results.Add(new
{
tag = entry.Tag,
line = entry.LineNumber,
ok = false,
error = "timeout",
value = (object)null,
quality = (int?)null,
timestamp = (DateTime?)null,
statuses = Array.Empty<MxStatusInfo>(),
});
}
}
bool overallOk = results.Cast<dynamic>().All(r => (bool)r.ok);
if (LlmJson) Envelope.Write(console, query, overallOk, results);
else WriteHuman(console, results);
if (!overallOk) Environment.ExitCode = 1;
}
finally
{
foreach (var item in items) item.Dispose();
}
return default;
}
private static void WriteHuman(IConsole console, List<object> results)
{
foreach (dynamic r in results)
{
if (!(bool)r.ok)
{
var err = (string)(r.error ?? "bad-status");
console.Output.WriteLine($"[ERR] line {r.line} {r.tag}: {err}");
continue;
}
var ts = r.timestamp == null
? ""
: ((DateTime)r.timestamp).ToString("yyyy-MM-dd HH:mm:ss.fff");
console.Output.WriteLine($"[OK ] line {r.line} {r.tag} = {r.value} (q={r.quality}, t={ts})");
}
}
}
}
@@ -0,0 +1,158 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CliFx;
using CliFx.Attributes;
using CliFx.Exceptions;
using CliFx.Infrastructure;
using MxAccess.Cli.Mx;
using MxAccess.Cli.Output;
using Newtonsoft.Json;
namespace MxAccess.Cli.Commands
{
[Command("subscribe-batch", Description =
"Subscribe to many tags from a JSONL input file (or stdin via '-') and stream " +
"OnDataChange events for a duration. Same streaming output as `subscribe`.")]
public sealed class SubscribeBatchCommand : ICommand
{
[CommandParameter(0, Name = "input",
Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " +
"Each line is either a bare tag string or {\"tag\":\"...\"}.")]
public string Input { get; init; }
[CommandOption("seconds", 's', Description = "How long to keep the subscription open, in seconds. Default 10.")]
public double Seconds { get; init; } = 10.0;
[CommandOption("max", Description = "Hard cap on emitted events. Default 1000.")]
public int Max { get; init; } = 1000;
[CommandOption("client", Description = "MxAccess client name. Default 'mxa'.")]
public string ClientName { get; init; } = "mxa";
[CommandOption("llm-json", Description = "Emit a JSON Lines stream of events (one JSON object per line) instead of human-readable lines.")]
public bool LlmJson { get; init; }
public ValueTask ExecuteAsync(IConsole console)
{
if (string.IsNullOrWhiteSpace(Input))
throw new CommandException("Input path is required (use '-' for stdin).", 2);
if (Seconds <= 0)
throw new CommandException("--seconds must be positive.", 2);
// Parse + validate the whole input before opening an LMX session.
var entries = new List<BatchReadEntry>();
var parseErrors = new List<string>();
try
{
foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input))
{
try { entries.Add(JsonlInputReader.ParseReadEntry(lineNo, json)); }
catch (InvalidBatchEntryException ex)
{
parseErrors.Add($"line {ex.LineNumber}: {ex.Message}");
}
}
}
catch (InvalidBatchInputException ex)
{
console.Error.WriteLine($"[ERR] {ex.Message}");
Environment.ExitCode = 2;
return default;
}
if (parseErrors.Count > 0)
{
foreach (var msg in parseErrors) console.Error.WriteLine("[ERR] " + msg);
Environment.ExitCode = 2;
return default;
}
if (entries.Count == 0)
{
console.Error.WriteLine("[ERR] empty-input");
Environment.ExitCode = 2;
return default;
}
var deadline = DateTime.UtcNow.AddSeconds(Seconds);
var emitted = 0;
using var session = new MxSession(ClientName);
var items = new List<MxItem>();
try
{
foreach (var e in entries)
{
try
{
var item = session.AddItem(e.Tag);
item.Advise();
items.Add(item);
}
catch (Exception ex)
{
// Match SubscribeCommand's tolerance: a bad reference
// shouldn't kill the whole stream. Report it inline.
console.Error.WriteLine($"[ERR] line {e.LineNumber} add-item {e.Tag}: {ex.Message}");
}
}
if (items.Count == 0)
{
console.Error.WriteLine("[ERR] no tags successfully subscribed");
Environment.ExitCode = 1;
return default;
}
if (!LlmJson)
console.Output.WriteLine($"[INFO] Subscribed to {items.Count} tag(s). Streaming for {Seconds:F1}s. Ctrl-C to stop early.");
while (DateTime.UtcNow < deadline && emitted < Max)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
if (!session.WaitForUpdate(
u => u.Kind == MxUpdateKind.DataChange,
remaining, out var u))
break;
EmitOne(console, u);
emitted++;
}
if (!LlmJson)
console.Output.WriteLine($"[INFO] {emitted} event(s) emitted; subscription closed.");
}
finally
{
foreach (var item in items) item.Dispose();
}
return default;
}
private void EmitOne(IConsole console, MxUpdate u)
{
if (LlmJson)
{
var obj = new
{
tag = u.ItemReference,
ok = u.IsOk,
value = u.Value,
quality = u.Quality,
timestamp = u.Timestamp,
statuses = u.Statuses,
};
console.Output.WriteLine(JsonConvert.SerializeObject(obj, Formatting.None));
}
else
{
var ts = u.Timestamp.HasValue ? u.Timestamp.Value.ToString("HH:mm:ss.fff") : "??:??:??.???";
var flag = u.IsOk ? "OK " : "ERR";
console.Output.WriteLine($"[{ts}] [{flag}] {u.ItemReference} = {u.Value} (q={u.Quality})");
}
}
}
}
@@ -0,0 +1,390 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CliFx;
using CliFx.Attributes;
using CliFx.Exceptions;
using CliFx.Infrastructure;
using MxAccess.Cli.Mx;
using MxAccess.Cli.Output;
namespace MxAccess.Cli.Commands
{
[Command("write-batch", Description =
"Write many tag/value pairs from a JSONL input file (or stdin via '-'). " +
"Pipelines per-item resolve and write across one MxSession so wall time is " +
"~max(resolve) + ~max(write_ack) instead of N × (resolve + write_ack).")]
public sealed class WriteBatchCommand : ICommand
{
[CommandParameter(0, Name = "input",
Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " +
"Each line: {\"tag\":\"...\", \"value\":..., \"type\":\"int\"}.")]
public string Input { get; init; }
[CommandOption("timeout", 't', Description = "Per-phase timeout in seconds (one wall budget for resolve, one for write_ack). Default 5.")]
public double TimeoutSeconds { get; init; } = 5.0;
[CommandOption("user-id", Description = "Pre-resolved authenticated user id (0 = unauthenticated). See `mxa write` for how to obtain.")]
public int UserId { get; init; }
[CommandOption("username", 'u', Description = "Galaxy / OS username. Combined with --domain as '<domain>\\<username>' before AuthenticateUser.")]
public string Username { get; init; }
[CommandOption("domain", Description = "Domain or hostname for OS-authenticated galaxies.")]
public string Domain { get; init; }
[CommandOption("password", 'p', Description = "Password for --username. Redacted in the JSON query echo.")]
public string Password { get; init; }
[CommandOption("client", Description = "MxAccess client name. Default 'mxa'.")]
public string ClientName { get; init; } = "mxa";
[CommandOption("secured", Description = "Route writes through WriteSecured(currentUserId, verifierUserId, value) instead of Write(value, userId). Required for Secured Write / Verified Write classifications.")]
public bool Secured { get; init; }
[CommandOption("verifier-username", Description = "Galaxy / OS username of the verifier for a two-person Verified Write. Implies --secured.")]
public string VerifierUsername { get; init; }
[CommandOption("verifier-domain", Description = "Domain or hostname for the verifier OS user.")]
public string VerifierDomain { get; init; }
[CommandOption("verifier-password", Description = "Password for the verifier user. Redacted in the JSON query echo.")]
public string VerifierPassword { get; init; }
[CommandOption("llm-json", Description = "Emit the JSON envelope instead of human-readable status.")]
public bool LlmJson { get; init; }
// Per-entry state carried across Phase A (resolve) and Phase B (write).
private sealed class EntryState
{
public BatchWriteEntry Entry;
public MxItem Item; // null if AddItem failed
public string AddItemError;
public MxUpdate ResolveUpdate; // null if no DataChange arrived
public string ResolveError; // "timeout-resolving-type" | "type-resolution-failed" | null
public object CoercedValue;
public string CoerceError; // raised by ValueCoercion.CoerceJToken
public MxUpdate AckUpdate; // null if no WriteComplete arrived
public string AckError; // "timeout" | "write-failed" | null
}
public ValueTask ExecuteAsync(IConsole console)
{
if (string.IsNullOrWhiteSpace(Input))
throw new CommandException("Input path is required (use '-' for stdin).", 2);
if (TimeoutSeconds <= 0)
throw new CommandException("--timeout must be positive.", 2);
// Compose the verify-user strings up front so the query echo
// reflects exactly what we'll send to AuthenticateUser.
string verifyUser = null;
if (!string.IsNullOrEmpty(Username))
verifyUser = string.IsNullOrEmpty(Domain) ? Username : $@"{Domain}\{Username}";
string verifierVerifyUser = null;
if (!string.IsNullOrEmpty(VerifierUsername))
verifierVerifyUser = string.IsNullOrEmpty(VerifierDomain)
? VerifierUsername
: $@"{VerifierDomain}\{VerifierUsername}";
bool useSecured = Secured || verifierVerifyUser != null;
// Parse + validate the whole input before opening an LMX session.
// Aborting on parse errors avoids registering a session we'll
// immediately tear down, and gives the user every malformed line
// in one report.
var entries = new List<BatchWriteEntry>();
var parseErrors = new List<object>();
try
{
foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input))
{
try
{
entries.Add(JsonlInputReader.ParseWriteEntry(lineNo, json));
}
catch (InvalidBatchEntryException ex)
{
parseErrors.Add(new
{
tag = (string)null,
line = ex.LineNumber,
ok = false,
error = "parse-error",
reason = ex.Message,
});
}
}
}
catch (InvalidBatchInputException ex)
{
EmitEnvelopeOrError(console, BuildQuery(verifyUser, verifierVerifyUser, useSecured, entryCount: 0),
ok: false, results: null, topError: ex.Message);
Environment.ExitCode = 2;
return default;
}
var query = BuildQuery(verifyUser, verifierVerifyUser, useSecured, entries.Count);
if (parseErrors.Count > 0)
{
EmitEnvelopeOrError(console, query, ok: false, parseErrors, topError: null);
Environment.ExitCode = 2;
return default;
}
if (entries.Count == 0)
{
EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "empty-input");
Environment.ExitCode = 2;
return default;
}
using var session = new MxSession(ClientName);
// Auth resolution. Anything other than userId>0 here is a hard
// abort: writes downstream depend on a valid identity, and an
// empty results[] is the right shape for "no items attempted".
int effectiveUserId = UserId;
int verifierUserId = 0;
if (verifyUser != null)
{
effectiveUserId = session.Authenticate(verifyUser, Password ?? string.Empty);
if (effectiveUserId == 0)
{
EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "authentication-failed");
Environment.ExitCode = 1;
return default;
}
}
if (verifierVerifyUser != null)
{
verifierUserId = session.Authenticate(verifierVerifyUser, VerifierPassword ?? string.Empty);
if (verifierUserId == 0)
{
EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "verifier-authentication-failed");
Environment.ExitCode = 1;
return default;
}
}
else if (useSecured)
{
verifierUserId = effectiveUserId;
}
var states = new EntryState[entries.Count];
for (int i = 0; i < entries.Count; i++) states[i] = new EntryState { Entry = entries[i] };
try
{
// ─── Phase A: AddItem + Advise for everything, then drain ───────────
//
// Per WriteCommand.cs:178-179: --username uses Advise (operator
// action); anonymous uses AdviseSupervisory (supervisory action).
// Same heuristic here so the audit trail attribution matches
// the single-tag write.
bool useOperatorAdvise = verifyUser != null;
var pendingResolve = new HashSet<int>();
foreach (var s in states)
{
try
{
s.Item = session.AddItem(s.Entry.Tag);
if (useOperatorAdvise) s.Item.Advise();
else s.Item.AdviseSupervisory();
pendingResolve.Add(s.Item.Handle);
}
catch (Exception ex)
{
s.AddItemError = ex.Message;
}
}
var phaseADeadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds);
var resolveByHandle = new Dictionary<int, MxUpdate>();
while (pendingResolve.Count > 0)
{
var remaining = phaseADeadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
if (!session.WaitForUpdate(
u => u.Kind == MxUpdateKind.DataChange && pendingResolve.Contains(u.ItemHandle),
remaining, out var u))
break;
resolveByHandle[u.ItemHandle] = u;
pendingResolve.Remove(u.ItemHandle);
}
// Annotate each state with its resolve outcome.
foreach (var s in states)
{
if (s.Item == null) continue;
if (resolveByHandle.TryGetValue(s.Item.Handle, out var u))
{
s.ResolveUpdate = u;
if (!u.IsOk) s.ResolveError = "type-resolution-failed";
}
else
{
s.ResolveError = "timeout-resolving-type";
}
}
// ─── Phase B: coerce values, fire writes, drain WriteComplete ───────
//
// Coerce up front so a bad value doesn't queue a Write the proxy
// would reject anyway. Track which handles we actually issued
// writes for — only those join the pendingAck set.
var pendingAck = new HashSet<int>();
foreach (var s in states)
{
if (s.Item == null || s.ResolveError != null) continue;
try
{
s.CoercedValue = ValueCoercion.CoerceJToken(s.Entry.RawValue, s.Entry.TypeHint, s.Entry.IsArray);
}
catch (Exception ex)
{
s.CoerceError = ex.Message;
continue;
}
try
{
if (useSecured) s.Item.WriteSecured(s.CoercedValue, effectiveUserId, verifierUserId);
else s.Item.Write(s.CoercedValue, effectiveUserId);
pendingAck.Add(s.Item.Handle);
}
catch (Exception ex)
{
s.AckError = "write-call-failed: " + ex.Message;
}
}
var phaseBDeadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds);
var ackByHandle = new Dictionary<int, MxUpdate>();
while (pendingAck.Count > 0)
{
var remaining = phaseBDeadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
if (!session.WaitForUpdate(
u => u.Kind == MxUpdateKind.WriteComplete && pendingAck.Contains(u.ItemHandle),
remaining, out var u))
break;
ackByHandle[u.ItemHandle] = u;
pendingAck.Remove(u.ItemHandle);
}
foreach (var s in states)
{
if (s.Item == null || s.ResolveError != null || s.CoerceError != null || s.AckError != null) continue;
if (ackByHandle.TryGetValue(s.Item.Handle, out var u))
{
s.AckUpdate = u;
if (!u.IsOk) s.AckError = "write-failed";
}
else
{
s.AckError = "timeout";
}
}
// ─── Build results in input order ───────────────────────────────────
var results = new List<object>(states.Length);
bool anyFailed = false;
foreach (var s in states)
{
var (ok, error, statuses) = Classify(s);
if (!ok) anyFailed = true;
results.Add(new
{
tag = s.Entry.Tag,
line = s.Entry.LineNumber,
ok,
error,
authenticated = verifyUser != null,
auth_user_id = verifyUser != null ? (int?)effectiveUserId : null,
secured = useSecured,
verifier_user_id = useSecured ? (int?)verifierUserId : null,
statuses,
});
}
if (LlmJson) Envelope.Write(console, query, ok: !anyFailed, results);
else WriteHuman(console, results);
if (anyFailed) Environment.ExitCode = 1;
}
finally
{
foreach (var s in states)
if (s.Item != null) try { s.Item.Dispose(); } catch { }
}
return default;
}
private static (bool ok, string error, MxStatusInfo[] statuses) Classify(EntryState s)
{
if (s.AddItemError != null)
return (false, "add-item-failed: " + s.AddItemError, Array.Empty<MxStatusInfo>());
if (s.ResolveError != null)
return (false, s.ResolveError, s.ResolveUpdate?.Statuses ?? Array.Empty<MxStatusInfo>());
if (s.CoerceError != null)
return (false, "value-coerce-failed: " + s.CoerceError, Array.Empty<MxStatusInfo>());
if (s.AckError != null)
return (false, s.AckError, s.AckUpdate?.Statuses ?? Array.Empty<MxStatusInfo>());
return (true, null, s.AckUpdate?.Statuses ?? Array.Empty<MxStatusInfo>());
}
private object BuildQuery(string verifyUser, string verifierVerifyUser, bool useSecured, int entryCount) =>
new
{
command = "write-batch",
input = Input,
entries = entryCount,
timeout_s = TimeoutSeconds,
user_id = UserId,
verify_user = verifyUser,
verifier_verify_user = verifierVerifyUser,
secured = useSecured,
password = string.IsNullOrEmpty(Password) ? null : "***",
verifier_password = string.IsNullOrEmpty(VerifierPassword) ? null : "***",
client = ClientName,
};
// Single emit-point used for parse / auth / empty-input failures so
// the envelope shape stays consistent whether the failure is a top-
// level error or a results-array of per-line errors.
private void EmitEnvelopeOrError(IConsole console, object query, bool ok,
IEnumerable<object> results, string topError)
{
if (LlmJson)
{
if (results != null)
Envelope.Write(console, query, ok, results);
else
Envelope.WriteError(console, query, topError);
}
else if (topError != null)
{
console.Error.WriteLine($"[ERR] {topError}");
}
else if (results != null)
{
foreach (dynamic r in results)
{
var line = r.line;
var reason = (string)(r.reason ?? r.error);
console.Error.WriteLine($"[ERR] line {line}: {reason}");
}
}
}
private static void WriteHuman(IConsole console, List<object> results)
{
foreach (dynamic r in results)
{
if ((bool)r.ok)
console.Output.WriteLine($"[OK ] line {r.line} write {r.tag}");
else
console.Error.WriteLine($"[ERR] line {r.line} write {r.tag}: {r.error}");
}
}
}
}
@@ -0,0 +1,209 @@
using System;
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace MxAccess.Cli.Mx
{
/// JSONL parser shared by the batch commands. Lines are read lazily so
/// huge inputs don't allocate up front; per-line parsing is delegated to
/// `ParseReadEntry` / `ParseWriteEntry` so the read loop can defer parse
/// errors to a context where it can attribute them to a line number.
///
/// Line conventions (same across read / write / subscribe):
/// - Blank lines and lines starting with `#` are skipped (line numbers
/// still count them so error messages stay accurate against the file).
/// - Path `-` reads from stdin until EOF.
public static class JsonlInputReader
{
public static IEnumerable<(int LineNumber, string Json)> ReadLines(string pathOrDash)
{
if (string.IsNullOrEmpty(pathOrDash))
throw new InvalidBatchInputException("Input path must be non-empty (use '-' for stdin).");
TextReader reader = pathOrDash == "-"
? Console.In
: OpenFile(pathOrDash);
try
{
int lineNo = 0;
string raw;
while ((raw = reader.ReadLine()) != null)
{
lineNo++;
var trimmed = raw.Trim();
if (trimmed.Length == 0) continue;
if (trimmed[0] == '#') continue;
yield return (lineNo, trimmed);
}
}
finally
{
// Don't close Console.In — the host CLR owns it and closing it
// breaks anything that reads stdin after this command returns.
if (pathOrDash != "-") reader.Dispose();
}
}
private static TextReader OpenFile(string path)
{
try
{
return new StreamReader(path);
}
catch (FileNotFoundException)
{
throw new InvalidBatchInputException($"Input file not found: {path}");
}
catch (DirectoryNotFoundException)
{
throw new InvalidBatchInputException($"Input file not found: {path}");
}
catch (IOException ex)
{
throw new InvalidBatchInputException($"Could not open input file '{path}': {ex.Message}");
}
catch (UnauthorizedAccessException ex)
{
throw new InvalidBatchInputException($"Could not open input file '{path}': {ex.Message}");
}
}
/// Parse a read / subscribe entry. Accepts either a bare JSON string
/// (the tag reference) or a `{"tag":"..."}` object. Throws
/// `InvalidBatchEntryException` on malformed input — the caller is
/// expected to surface line number + reason in the result envelope.
public static BatchReadEntry ParseReadEntry(int lineNumber, string json)
{
JToken token;
try
{
token = JToken.Parse(json);
}
catch (JsonReaderException ex)
{
throw new InvalidBatchEntryException(lineNumber, $"invalid JSON: {ex.Message}");
}
string tag;
switch (token.Type)
{
case JTokenType.String:
tag = (string)token;
break;
case JTokenType.Object:
var tagTok = token["tag"];
if (tagTok == null || tagTok.Type != JTokenType.String)
throw new InvalidBatchEntryException(lineNumber,
"object entry must have a 'tag' string field");
tag = (string)tagTok;
break;
default:
throw new InvalidBatchEntryException(lineNumber,
"expected a tag string or an object with a 'tag' field");
}
if (string.IsNullOrWhiteSpace(tag))
throw new InvalidBatchEntryException(lineNumber, "tag must be non-empty");
return new BatchReadEntry { Tag = tag, LineNumber = lineNumber };
}
/// Parse a write entry. Required: `tag` (string), `value` (any JSON
/// type). Optional: `type` (string, mirrors `mxa write --type`).
/// If `value` is a JSON array the tag must end in `[]` (matches the
/// array-write rule in `WriteCommand`).
public static BatchWriteEntry ParseWriteEntry(int lineNumber, string json)
{
JToken token;
try
{
token = JToken.Parse(json);
}
catch (JsonReaderException ex)
{
throw new InvalidBatchEntryException(lineNumber, $"invalid JSON: {ex.Message}");
}
if (token.Type != JTokenType.Object)
throw new InvalidBatchEntryException(lineNumber,
"write entry must be a JSON object with at least 'tag' and 'value'");
var tagTok = token["tag"];
var valueTok = token["value"];
var typeTok = token["type"];
if (tagTok == null || tagTok.Type != JTokenType.String)
throw new InvalidBatchEntryException(lineNumber, "'tag' must be a non-empty string");
var tag = (string)tagTok;
if (string.IsNullOrWhiteSpace(tag))
throw new InvalidBatchEntryException(lineNumber, "'tag' must be a non-empty string");
if (valueTok == null)
throw new InvalidBatchEntryException(lineNumber, "'value' is required");
string typeHint = null;
if (typeTok != null)
{
if (typeTok.Type != JTokenType.String)
throw new InvalidBatchEntryException(lineNumber, "'type' must be a string when present");
typeHint = (string)typeTok;
}
bool tagIsArray = tag.EndsWith("[]", StringComparison.Ordinal);
bool valueIsArray = valueTok.Type == JTokenType.Array;
if (valueIsArray && !tagIsArray)
throw new InvalidBatchEntryException(lineNumber,
"array 'value' requires a tag ending in '[]' (whole-array write reference)");
if (!valueIsArray && tagIsArray)
throw new InvalidBatchEntryException(lineNumber,
"tag ends in '[]' (array write) but 'value' is not a JSON array");
return new BatchWriteEntry
{
Tag = tag,
RawValue = valueTok,
TypeHint = typeHint,
IsArray = tagIsArray,
LineNumber = lineNumber,
};
}
}
public sealed class BatchReadEntry
{
public string Tag { get; init; }
public int LineNumber { get; init; }
}
public sealed class BatchWriteEntry
{
public string Tag { get; init; }
public JToken RawValue { get; init; }
public string TypeHint { get; init; }
public bool IsArray { get; init; }
public int LineNumber { get; init; }
}
/// Thrown for problems with the file itself (missing, unreadable, etc).
/// Distinct from `InvalidBatchEntryException` so the command can choose
/// to abort the whole batch on this category without trying to parse.
public sealed class InvalidBatchInputException : Exception
{
public InvalidBatchInputException(string message) : base(message) { }
}
/// Thrown for problems with a single line. Always carries the 1-based line
/// number so the command can echo it back in the envelope.
public sealed class InvalidBatchEntryException : Exception
{
public int LineNumber { get; }
public InvalidBatchEntryException(int lineNumber, string reason) : base(reason)
{
LineNumber = lineNumber;
}
}
}
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using Newtonsoft.Json.Linq;
namespace MxAccess.Cli.Mx
{
@@ -90,6 +91,41 @@ namespace MxAccess.Cli.Mx
}
}
/// JSON-token entry point used by `read-batch` / `write-batch`. Renders
/// each token to the string form the existing `Coerce` / `CoerceArray`
/// dispatch table accepts, then forwards — so the type vocabulary
/// stays defined in one place.
public static object CoerceJToken(JToken raw, string typeHint, bool isArray)
{
if (raw == null) throw new ArgumentNullException(nameof(raw));
if (isArray)
{
if (raw.Type != JTokenType.Array)
throw new ArgumentException("isArray=true but JSON token is not an array.");
var rendered = new List<string>();
foreach (var el in (JArray)raw) rendered.Add(RenderToken(el));
return CoerceArray(rendered, typeHint);
}
return Coerce(RenderToken(raw), typeHint);
}
private static string RenderToken(JToken t)
{
switch (t.Type)
{
case JTokenType.String: return (string)t;
case JTokenType.Boolean: return ((bool)t) ? "true" : "false";
case JTokenType.Integer: return ((long)t).ToString(CultureInfo.InvariantCulture);
case JTokenType.Float: return ((double)t).ToString("R", CultureInfo.InvariantCulture);
case JTokenType.Date: return ((DateTime)t).ToString("o", CultureInfo.InvariantCulture);
case JTokenType.Null:
throw new ArgumentException("JSON null is not a writable value.");
default:
throw new ArgumentException($"Cannot coerce JSON token of type {t.Type} to a write value.");
}
}
private static T[] Convert<T>(IReadOnlyList<string> raw, Func<string, T> parse)
{
var arr = new T[raw.Count];
@@ -0,0 +1,5 @@
"DevPlatform.CPULoad"
{"tag": "DevPlatform.SystemStartupTime"}
# blank line below is intentional — should be skipped along with this comment
"DevPlatform.CPULoad"
@@ -0,0 +1,2 @@
"DevPlatform.CPULoad"
"DevPlatform.SystemStartupTime"
@@ -0,0 +1,6 @@
# Mixed scalar writes against the dev galaxy. Permissive auth.
{"tag": "OtOpcUaParityTest_001.ConfigValue", "value": 12.5, "type": "float"}
{"tag": "OtOpcUaParityTest_001.TuneValue", "value": 7.25, "type": "float"}
{"tag": "TestMachine_001.ProtectedValue", "value": true}
{"tag": "TestMachine_001.ProtectedValue1", "value": false}
{"tag": "MESReceiver_001.MoveInPartNumbers[1]", "value": "PN-42"}