diff --git a/mxaccesscli/README.md b/mxaccesscli/README.md index 77fc09b..26164ef 100644 --- a/mxaccesscli/README.md +++ b/mxaccesscli/README.md @@ -64,6 +64,15 @@ dotnet run --project src/MxAccess.Cli/MxAccess.Cli.csproj -- write TestMachine_0 # Subscribe to a tag for 30 seconds, JSON Lines for streaming: dotnet run --project src/MxAccess.Cli/MxAccess.Cli.csproj -- subscribe TestMachine_001.Speed -s 30 --llm-json + +# Batch read from a JSONL file (one tag string or {"tag":"..."} per line): +mxa read-batch tags.jsonl --llm-json + +# Batch write {tag,value,type?} pairs piped from stdin: +echo '{"tag":"TM.Setpoint","value":42.5,"type":"double"}' | mxa write-batch - --llm-json + +# Batch subscribe from a JSONL file, streaming for 30s: +mxa subscribe-batch tags.jsonl -s 30 --llm-json ``` The built executable is `bin\x86\Release\net48\mxa.exe`. Drop on `PATH` and use `mxa read ...`. diff --git a/mxaccesscli/docs/usage.md b/mxaccesscli/docs/usage.md index 8d603c4..237af63 100644 --- a/mxaccesscli/docs/usage.md +++ b/mxaccesscli/docs/usage.md @@ -11,6 +11,42 @@ Read, write, and subscribe to AVEVA System Platform tags via MxAccess. The CLI r - **Subsequent events are fast.** Once a tag is bound, value-change updates propagate within ~100 ms. - **Exit codes:** `0` on success, `1` if any operation timed out or returned a non-Ok / non-Pending `MxStatusCategory`, `2` on argument-validation errors. +## Batch input format + +The three `-batch` commands (`read-batch`, `write-batch`, `subscribe-batch`) all +take a single positional `` argument which is either a path to a JSONL +file or `-` to read JSONL from stdin. The line conventions are shared: + +- **Blank lines and lines starting with `#`** are skipped. Line numbers still + count them, so error messages stay accurate against the source file. +- **Per-line auth override is not supported in v1.** All entries in a + `write-batch` use the same `--username` / `--password` / `--user-id` / + `--secured` / `--verifier-*` flags supplied on the command line. +- **Duplicate tags are allowed.** Each input line is an independent entry + with its own item handle and its own row in `results[]`. + +Read / subscribe lines accept **either** a bare JSON string or an object form: + +```jsonl +"TestMachine_001.Speed" +{"tag": "Reactor1.Level"} +``` + +Write lines are object form only: + +```jsonl +{"tag": "TM.Setpoint", "value": 42.5, "type": "double"} +{"tag": "TM.RunFlag", "value": true} +{"tag": "TM.Parts[]", "value": ["", "11111", ""], "type": "string"} +``` + +`value` may be a JSON `bool`, `number`, `string`, or `array`. A JSON array +requires the tag to end in `[]` (whole-array reference, matches the `mxa write` +rule). `type` is optional; when omitted the JSON token's native type drives the +default. When set, `type` accepts the same vocabulary as `mxa write --type` +(`bool`, `byte`, `short`, `int`, `long`, `float`, `double`, `string`, +`datetime`). + ## `mxa info` Print the loaded `ArchestrA.MxAccess` assembly identity, supported `--type` values, and the full `MxStatusCategory` enum. No tag access. @@ -193,6 +229,188 @@ LLM-JSON output (one event per line, no surrounding `[ ... ]`): JSON Lines lets a downstream consumer parse events incrementally rather than buffering the whole stream — the right shape for indefinite or long-running subscriptions. +## `mxa read-batch ` + +Reads many tags from a JSONL file (or `-` for stdin) in **one** MxSession, +amortizing the 3–8 s LMX bind cost across the whole batch. Line shape is +defined in [Batch input format](#batch-input-format). Result rows arrive in +input-line order regardless of which `OnDataChange` fires first. + +| Option | Default | Notes | +| --- | --- | --- | +| `-t`, `--timeout ` | `5` | Wall budget for the whole batch (not per-tag). Tags that haven't delivered a `DataChange` by the deadline are reported with `error: "timeout"`. | +| `--client ` | `mxa` | Passed to `Register()`. | +| `--llm-json` | off | Emit the JSON envelope. | + +Example: + +```powershell +mxa read-batch tags.jsonl --llm-json +echo '"TM.Speed"' | mxa read-batch - --llm-json +``` + +LLM-JSON envelope (one results row per non-blank input line, carrying the +1-based `line` for traceability): + +```json +{ + "query": { "command": "read-batch", "input": "tags.jsonl", "timeout_s": 5.0, "client": "mxa" }, + "ok": true, + "results": [ + { "tag": "TM.Speed", "line": 1, "ok": true, "value": 1234.5, "quality": 192, "timestamp": "...", "statuses": [...] }, + { "tag": "Reactor1.Level","line": 2, "ok": true, "value": 78.1, "quality": 192, "timestamp": "...", "statuses": [...] } + ] +} +``` + +Exit code: `0` if every entry resolved Ok; `1` if any timed out or returned a +non-Ok status; `2` for parse errors, missing input, or empty input. + +## `mxa write-batch ` + +Writes many tag/value pairs from a JSONL file (or `-`) in **one** MxSession, +**pipelining** per-item resolve and write so wall time is roughly +`max(resolve_latency) + max(write_ack)` instead of `N × (resolve_latency + +write_ack)`. Auth is resolved once before any item is touched. + +Line shape: see [Batch input format](#batch-input-format). Auth flags follow +the same semantics as `mxa write` — see [Authentication](#authentication) for +how `--username` / `--domain` / `--secured` / `--verifier-*` interact. Per-line +auth override is **out of scope for v1** (planned follow-up). + +| Option | Default | Notes | +| --- | --- | --- | +| `-t`, `--timeout ` | `5` | Per-phase wall budget. Phase A (resolve types) and Phase B (await `OnWriteComplete`) each get this many seconds. | +| `--user-id ` | `0` | Pre-resolved authenticated user id. See `mxa write` for caveats. | +| `-u`, `--username ` | (none) | Galaxy / OS username. Resolved to a userId via `AuthenticateUser` once before Phase A. | +| `--domain ` | (none) | Combined with `--username` as `\`. | +| `-p`, `--password ` | (none) | Password for `--username`. Redacted in the LLM-JSON `query` echo. | +| `--secured` | off | Route writes through `WriteSecured(currentUserId, verifierUserId, value)`. Required for Secured Write / Verified Write classifications. | +| `--verifier-username` / `--verifier-domain` / `--verifier-password` | (none) | Two-person verified write; implies `--secured`. | +| `--client ` | `mxa` | Passed to `Register()`. | +| `--llm-json` | off | Emit the JSON envelope. | + +Example: + +```powershell +# Set up Hi/HiHi/Lo/LoLo limits and enable alarms for a tag in one shot: +mxa write-batch limits.jsonl --llm-json +``` + +```jsonl +{"tag":"BCD_T1.HighLimit", "value": 80, "type": "float"} +{"tag":"BCD_T1.HighHighLimit", "value": 95, "type": "float"} +{"tag":"BCD_T1.LowLimit", "value": 20, "type": "float"} +{"tag":"BCD_T1.LowLowLimit", "value": 5, "type": "float"} +{"tag":"BCD_T1.HighAlarm.Alarmed", "value": true} +{"tag":"BCD_T1.HighHighAlarm.Alarmed", "value": true} +{"tag":"BCD_T1.LowAlarm.Alarmed", "value": true} +{"tag":"BCD_T1.LowLowAlarm.Alarmed", "value": true} +``` + +LLM-JSON envelope (per-entry row with auth attribution and `line` echo): + +```json +{ + "query": { "command": "write-batch", "input": "limits.jsonl", "entries": 8, + "timeout_s": 5.0, "user_id": 0, "verify_user": null, "secured": false, "client": "mxa" }, + "ok": true, + "results": [ + { "tag": "BCD_T1.HighLimit", "line": 1, "ok": true, "error": null, + "authenticated": false, "auth_user_id": null, "secured": false, + "verifier_user_id": null, "statuses": [...] } + ] +} +``` + +### Failure modes — `write-batch` + +| `error` string | Cause | Exit | +| --- | --- | --- | +| (top-level) `parse-error` | One or more lines failed JSONL / schema validation; `results[]` lists each. | 2 | +| (top-level) `empty-input` | File / stdin contained no non-blank entries. | 2 | +| (top-level) `authentication-failed` | `AuthenticateUser` returned 0 for the operator credentials. **No items attempted.** | 1 | +| (top-level) `verifier-authentication-failed` | Same for the verifier in two-person Verified Write. | 1 | +| per-entry `add-item-failed: …` | `LMXProxyServer.AddItem` threw (typically a malformed reference). | 1 | +| per-entry `timeout-resolving-type` | No `OnDataChange` arrived for this item before `--timeout` elapsed. | 1 | +| per-entry `type-resolution-failed` | First `OnDataChange` carried a non-Ok status — `statuses` filled. | 1 | +| per-entry `value-coerce-failed: …` | JSON value couldn't be converted to the configured `--type` (or inferred type). The Write was **not** queued. | 1 | +| per-entry `write-call-failed: …` | The `Write` / `WriteSecured` COM call itself threw before `OnWriteComplete`. | 1 | +| per-entry `timeout` | No `OnWriteComplete` arrived before `--timeout` elapsed. | 1 | +| per-entry `write-failed` | `OnWriteComplete` arrived with non-Ok statuses (e.g. read-only attr, security denied). | 1 | + +The process exits `1` if any per-entry row failed, `0` if every row was Ok. +The envelope's top-level `ok` matches. + +## `mxa subscribe-batch ` + +Subscribes to many tags from a JSONL file (or `-`) and streams `OnDataChange` +events for a duration. The streaming output is identical to `mxa subscribe` — +no envelope wrap; one event per line when `--llm-json` is set. + +Line shape: see [Batch input format](#batch-input-format). + +| Option | Default | Notes | +| --- | --- | --- | +| `-s`, `--seconds ` | `10` | Wall-clock duration of the subscription. | +| `--max ` | `1000` | Hard cap on emitted events. | +| `--client ` | `mxa` | Passed to `Register()`. | +| `--llm-json` | off | JSON Lines mode — one event per line, no outer envelope. | + +First-event latency is still 3–8 s per tag, but `Advise()` is issued for every +tag up front so the binds run in parallel — wall time matches a single-tag +subscribe. For short `--seconds` windows (< 5 s) you may miss the initial +values; budget accordingly. + +Example: + +```powershell +mxa subscribe-batch tags.jsonl -s 30 --llm-json +``` + +Output stream (each event as one JSON line): + +```jsonl +{"tag":"TM.Speed","ok":true,"value":1234.5,"quality":192,"timestamp":"...","statuses":[...]} +{"tag":"Reactor1.Level","ok":true,"value":78.1,"quality":192,"timestamp":"...","statuses":[...]} +``` + +Exit code: `0` on a clean run, `1` if every tag failed to subscribe (no item +ever bound), `2` for parse errors / missing input / empty input. + +## Batch commands — verified live + +Captured on 2026-05-10 against the dev `ZB` galaxy (System Platform 2017 +Express, MxAccess `3.2.0.0`). The galaxy is configured permissively +(`eNone` / Free Access) so writes were issued anonymously. + +| Scenario | Result | +| --- | --- | +| `read-batch` 3 entries (bare-string + object + duplicate, with blank/`#` lines) | All three returned in input-line order; `line` field correctly reflected `1, 2, 5`. | +| `read-batch` continue-on-error (`NoSuchObject.NoSuchAttribute` mixed with `DevPlatform.CPULoad`) | Good entry returned value, bad entry returned `Category=MxCategoryConfigurationError, Detail=6`. Exit 1. | +| `write-batch` 5 entries (1 attribute with `security_classification=5` failed `SecurityError, Detail=1007`; rest wrote anonymously) | 4/5 wrote; per-item statuses preserved; exit 1. Round-trip read confirmed `TuneValue=7.25`, `ProtectedValue=true`, `ProtectedValue1=false`, `MoveInPartNumbers[1]="PN-42"`. | +| `write-batch` failure-mode coverage (`value-coerce-failed` + `type-resolution-failed` + ok, same `TuneValue` tag duplicated) | Each failure surfaced its distinct `error` string; duplicate tag entries are independent handles. | +| `subscribe-batch` two tags, 18-second window | Both tags bound in parallel; CPULoad streamed ~26 `OnDataChange` events; `SystemStartupTime` delivered its initial value then stayed quiet (constant). Exit 0. | +| Parse error on stdin (`{ malformed json }`) | `results[]` row with `error: "parse-error"`, line + reason, no LMX session opened. Exit 2. | +| Missing input file | Top-level `error` envelope, exit 2. | + +### Pipelining timing + +Same four writes (`OtOpcUaParityTest_001.TuneValue`, +`TestMachine_001.ProtectedValue`, `TestMachine_001.ProtectedValue1`, +`MESReceiver_001.MoveInPartNumbers[1]`) executed two ways, wall-clock: + +| Method | Wall time | Notes | +| --- | --- | --- | +| 4 × `mxa write` (sequential subprocess invocations) | **38.2 s** | Each invocation pays the 3–8 s LMX bind once. | +| 1 × `mxa write-batch` (4 writes in one MxSession, two-phase pipeline) | **10.3 s** | Includes a 5th entry that failed `SecurityError` — still resolved in Phase A. | + +≈ **3.7× faster** at N=4. The cost model is roughly `~max(resolve_latency) ++ ~max(write_ack)` for the batched form versus `N × (resolve_latency + +write_ack)` for the sequential form, so the speedup grows with N: at ~10 +writes the savings are typically 60–80 seconds, which is the scale of the +test-plan setup phases this command was added for. + ## Type support matrix Verified end-to-end against the live `ZB` galaxy (System Platform 2017 Express, MxAccess `3.2.0.0`). Each row records what the wire shape looks like in the JSON envelope. diff --git a/mxaccesscli/src/MxAccess.Cli/Commands/ReadBatchCommand.cs b/mxaccesscli/src/MxAccess.Cli/Commands/ReadBatchCommand.cs new file mode 100644 index 0000000..9ee774d --- /dev/null +++ b/mxaccesscli/src/MxAccess.Cli/Commands/ReadBatchCommand.cs @@ -0,0 +1,227 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using CliFx; +using CliFx.Attributes; +using CliFx.Exceptions; +using CliFx.Infrastructure; +using MxAccess.Cli.Mx; +using MxAccess.Cli.Output; + +namespace MxAccess.Cli.Commands +{ + [Command("read-batch", Description = + "Read many tags from a JSONL input file (or stdin via '-'). One MxSession " + + "amortizes the LMX bind cost across the whole batch.")] + public sealed class ReadBatchCommand : ICommand + { + [CommandParameter(0, Name = "input", + Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " + + "Each line is either a bare tag string (e.g. \"TM.Speed\") or " + + "an object (e.g. {\"tag\":\"TM.Speed\"}). Blank and '#'-prefixed lines are skipped.")] + public string Input { get; init; } + + [CommandOption("timeout", 't', Description = "Per-tag timeout in seconds while waiting for the first value. Default 5.")] + public double TimeoutSeconds { get; init; } = 5.0; + + [CommandOption("client", Description = "MxAccess client name passed to Register(). Default 'mxa'.")] + public string ClientName { get; init; } = "mxa"; + + [CommandOption("llm-json", Description = "Emit the JSON envelope { query, ok, results } instead of human-readable lines.")] + public bool LlmJson { get; init; } + + public ValueTask ExecuteAsync(IConsole console) + { + if (string.IsNullOrWhiteSpace(Input)) + throw new CommandException("Input path is required (use '-' for stdin).", 2); + if (TimeoutSeconds <= 0) + throw new CommandException("--timeout must be positive.", 2); + + var query = new + { + command = "read-batch", + input = Input, + timeout_s = TimeoutSeconds, + client = ClientName, + }; + + // Phase 1: load + parse. Collect all parse errors so the user gets + // every malformed line in one report instead of fixing them one at + // a time. + var entries = new List(); + var parseErrors = new List(); + try + { + foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input)) + { + try + { + entries.Add(JsonlInputReader.ParseReadEntry(lineNo, json)); + } + catch (InvalidBatchEntryException ex) + { + parseErrors.Add(new + { + tag = (string)null, + line = ex.LineNumber, + ok = false, + error = "parse-error", + reason = ex.Message, + }); + } + } + } + catch (InvalidBatchInputException ex) + { + if (LlmJson) Envelope.WriteError(console, query, ex.Message); + else console.Error.WriteLine($"[ERR] {ex.Message}"); + Environment.ExitCode = 2; + return default; + } + + if (parseErrors.Count > 0) + { + if (LlmJson) Envelope.Write(console, query, ok: false, parseErrors); + else + { + foreach (dynamic e in parseErrors) + console.Error.WriteLine($"[ERR] line {e.line}: {e.reason}"); + } + Environment.ExitCode = 2; + return default; + } + + if (entries.Count == 0) + { + if (LlmJson) Envelope.WriteError(console, query, "empty-input"); + else console.Error.WriteLine("[ERR] no entries in input"); + Environment.ExitCode = 2; + return default; + } + + // Phase 2: AddItem + Advise for every entry inside one MxSession, + // then drain DataChange events keyed by handle. + using var session = new MxSession(ClientName); + var items = new List(); + // Track each entry's handle (or a per-entry add-item error) so the + // result row keeps its input ordering and line-number echo even + // when AddItem fails mid-batch. + var addError = new Dictionary(); // entryIndex -> error + var handleByEntry = new int[entries.Count]; // 0 = no item created + try + { + for (int i = 0; i < entries.Count; i++) + { + var entry = entries[i]; + try + { + var item = session.AddItem(entry.Tag); + item.Advise(); + items.Add(item); + handleByEntry[i] = item.Handle; + } + catch (Exception ex) + { + addError[i] = ex.Message; + } + } + + var pending = new HashSet(items.Select(i => i.Handle)); + var captured = new Dictionary(); + var deadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds); + + while (pending.Count > 0) + { + var remaining = deadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) break; + + if (!session.WaitForUpdate( + u => u.Kind == MxUpdateKind.DataChange && pending.Contains(u.ItemHandle), + remaining, + out var update)) + break; + + captured[update.ItemHandle] = update; + pending.Remove(update.ItemHandle); + } + + var results = new List(entries.Count); + for (int i = 0; i < entries.Count; i++) + { + var entry = entries[i]; + if (addError.TryGetValue(i, out var msg)) + { + results.Add(new + { + tag = entry.Tag, + line = entry.LineNumber, + ok = false, + error = "add-item-failed", + reason = msg, + value = (object)null, + quality = (int?)null, + timestamp = (DateTime?)null, + statuses = Array.Empty(), + }); + continue; + } + if (captured.TryGetValue(handleByEntry[i], out var u)) + { + results.Add(new + { + tag = entry.Tag, + line = entry.LineNumber, + ok = u.IsOk, + value = u.Value, + quality = u.Quality, + timestamp = u.Timestamp, + statuses = u.Statuses, + }); + } + else + { + results.Add(new + { + tag = entry.Tag, + line = entry.LineNumber, + ok = false, + error = "timeout", + value = (object)null, + quality = (int?)null, + timestamp = (DateTime?)null, + statuses = Array.Empty(), + }); + } + } + + bool overallOk = results.Cast().All(r => (bool)r.ok); + if (LlmJson) Envelope.Write(console, query, overallOk, results); + else WriteHuman(console, results); + if (!overallOk) Environment.ExitCode = 1; + } + finally + { + foreach (var item in items) item.Dispose(); + } + return default; + } + + private static void WriteHuman(IConsole console, List results) + { + foreach (dynamic r in results) + { + if (!(bool)r.ok) + { + var err = (string)(r.error ?? "bad-status"); + console.Output.WriteLine($"[ERR] line {r.line} {r.tag}: {err}"); + continue; + } + var ts = r.timestamp == null + ? "" + : ((DateTime)r.timestamp).ToString("yyyy-MM-dd HH:mm:ss.fff"); + console.Output.WriteLine($"[OK ] line {r.line} {r.tag} = {r.value} (q={r.quality}, t={ts})"); + } + } + } +} diff --git a/mxaccesscli/src/MxAccess.Cli/Commands/SubscribeBatchCommand.cs b/mxaccesscli/src/MxAccess.Cli/Commands/SubscribeBatchCommand.cs new file mode 100644 index 0000000..14745d4 --- /dev/null +++ b/mxaccesscli/src/MxAccess.Cli/Commands/SubscribeBatchCommand.cs @@ -0,0 +1,158 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using CliFx; +using CliFx.Attributes; +using CliFx.Exceptions; +using CliFx.Infrastructure; +using MxAccess.Cli.Mx; +using MxAccess.Cli.Output; +using Newtonsoft.Json; + +namespace MxAccess.Cli.Commands +{ + [Command("subscribe-batch", Description = + "Subscribe to many tags from a JSONL input file (or stdin via '-') and stream " + + "OnDataChange events for a duration. Same streaming output as `subscribe`.")] + public sealed class SubscribeBatchCommand : ICommand + { + [CommandParameter(0, Name = "input", + Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " + + "Each line is either a bare tag string or {\"tag\":\"...\"}.")] + public string Input { get; init; } + + [CommandOption("seconds", 's', Description = "How long to keep the subscription open, in seconds. Default 10.")] + public double Seconds { get; init; } = 10.0; + + [CommandOption("max", Description = "Hard cap on emitted events. Default 1000.")] + public int Max { get; init; } = 1000; + + [CommandOption("client", Description = "MxAccess client name. Default 'mxa'.")] + public string ClientName { get; init; } = "mxa"; + + [CommandOption("llm-json", Description = "Emit a JSON Lines stream of events (one JSON object per line) instead of human-readable lines.")] + public bool LlmJson { get; init; } + + public ValueTask ExecuteAsync(IConsole console) + { + if (string.IsNullOrWhiteSpace(Input)) + throw new CommandException("Input path is required (use '-' for stdin).", 2); + if (Seconds <= 0) + throw new CommandException("--seconds must be positive.", 2); + + // Parse + validate the whole input before opening an LMX session. + var entries = new List(); + var parseErrors = new List(); + try + { + foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input)) + { + try { entries.Add(JsonlInputReader.ParseReadEntry(lineNo, json)); } + catch (InvalidBatchEntryException ex) + { + parseErrors.Add($"line {ex.LineNumber}: {ex.Message}"); + } + } + } + catch (InvalidBatchInputException ex) + { + console.Error.WriteLine($"[ERR] {ex.Message}"); + Environment.ExitCode = 2; + return default; + } + + if (parseErrors.Count > 0) + { + foreach (var msg in parseErrors) console.Error.WriteLine("[ERR] " + msg); + Environment.ExitCode = 2; + return default; + } + + if (entries.Count == 0) + { + console.Error.WriteLine("[ERR] empty-input"); + Environment.ExitCode = 2; + return default; + } + + var deadline = DateTime.UtcNow.AddSeconds(Seconds); + var emitted = 0; + + using var session = new MxSession(ClientName); + var items = new List(); + try + { + foreach (var e in entries) + { + try + { + var item = session.AddItem(e.Tag); + item.Advise(); + items.Add(item); + } + catch (Exception ex) + { + // Match SubscribeCommand's tolerance: a bad reference + // shouldn't kill the whole stream. Report it inline. + console.Error.WriteLine($"[ERR] line {e.LineNumber} add-item {e.Tag}: {ex.Message}"); + } + } + + if (items.Count == 0) + { + console.Error.WriteLine("[ERR] no tags successfully subscribed"); + Environment.ExitCode = 1; + return default; + } + + if (!LlmJson) + console.Output.WriteLine($"[INFO] Subscribed to {items.Count} tag(s). Streaming for {Seconds:F1}s. Ctrl-C to stop early."); + + while (DateTime.UtcNow < deadline && emitted < Max) + { + var remaining = deadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) break; + + if (!session.WaitForUpdate( + u => u.Kind == MxUpdateKind.DataChange, + remaining, out var u)) + break; + + EmitOne(console, u); + emitted++; + } + + if (!LlmJson) + console.Output.WriteLine($"[INFO] {emitted} event(s) emitted; subscription closed."); + } + finally + { + foreach (var item in items) item.Dispose(); + } + return default; + } + + private void EmitOne(IConsole console, MxUpdate u) + { + if (LlmJson) + { + var obj = new + { + tag = u.ItemReference, + ok = u.IsOk, + value = u.Value, + quality = u.Quality, + timestamp = u.Timestamp, + statuses = u.Statuses, + }; + console.Output.WriteLine(JsonConvert.SerializeObject(obj, Formatting.None)); + } + else + { + var ts = u.Timestamp.HasValue ? u.Timestamp.Value.ToString("HH:mm:ss.fff") : "??:??:??.???"; + var flag = u.IsOk ? "OK " : "ERR"; + console.Output.WriteLine($"[{ts}] [{flag}] {u.ItemReference} = {u.Value} (q={u.Quality})"); + } + } + } +} diff --git a/mxaccesscli/src/MxAccess.Cli/Commands/WriteBatchCommand.cs b/mxaccesscli/src/MxAccess.Cli/Commands/WriteBatchCommand.cs new file mode 100644 index 0000000..e83959a --- /dev/null +++ b/mxaccesscli/src/MxAccess.Cli/Commands/WriteBatchCommand.cs @@ -0,0 +1,390 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using CliFx; +using CliFx.Attributes; +using CliFx.Exceptions; +using CliFx.Infrastructure; +using MxAccess.Cli.Mx; +using MxAccess.Cli.Output; + +namespace MxAccess.Cli.Commands +{ + [Command("write-batch", Description = + "Write many tag/value pairs from a JSONL input file (or stdin via '-'). " + + "Pipelines per-item resolve and write across one MxSession so wall time is " + + "~max(resolve) + ~max(write_ack) instead of N × (resolve + write_ack).")] + public sealed class WriteBatchCommand : ICommand + { + [CommandParameter(0, Name = "input", + Description = "Path to a JSONL file. Use '-' to read JSONL from stdin. " + + "Each line: {\"tag\":\"...\", \"value\":..., \"type\":\"int\"}.")] + public string Input { get; init; } + + [CommandOption("timeout", 't', Description = "Per-phase timeout in seconds (one wall budget for resolve, one for write_ack). Default 5.")] + public double TimeoutSeconds { get; init; } = 5.0; + + [CommandOption("user-id", Description = "Pre-resolved authenticated user id (0 = unauthenticated). See `mxa write` for how to obtain.")] + public int UserId { get; init; } + + [CommandOption("username", 'u', Description = "Galaxy / OS username. Combined with --domain as '\\' before AuthenticateUser.")] + public string Username { get; init; } + + [CommandOption("domain", Description = "Domain or hostname for OS-authenticated galaxies.")] + public string Domain { get; init; } + + [CommandOption("password", 'p', Description = "Password for --username. Redacted in the JSON query echo.")] + public string Password { get; init; } + + [CommandOption("client", Description = "MxAccess client name. Default 'mxa'.")] + public string ClientName { get; init; } = "mxa"; + + [CommandOption("secured", Description = "Route writes through WriteSecured(currentUserId, verifierUserId, value) instead of Write(value, userId). Required for Secured Write / Verified Write classifications.")] + public bool Secured { get; init; } + + [CommandOption("verifier-username", Description = "Galaxy / OS username of the verifier for a two-person Verified Write. Implies --secured.")] + public string VerifierUsername { get; init; } + + [CommandOption("verifier-domain", Description = "Domain or hostname for the verifier OS user.")] + public string VerifierDomain { get; init; } + + [CommandOption("verifier-password", Description = "Password for the verifier user. Redacted in the JSON query echo.")] + public string VerifierPassword { get; init; } + + [CommandOption("llm-json", Description = "Emit the JSON envelope instead of human-readable status.")] + public bool LlmJson { get; init; } + + // Per-entry state carried across Phase A (resolve) and Phase B (write). + private sealed class EntryState + { + public BatchWriteEntry Entry; + public MxItem Item; // null if AddItem failed + public string AddItemError; + public MxUpdate ResolveUpdate; // null if no DataChange arrived + public string ResolveError; // "timeout-resolving-type" | "type-resolution-failed" | null + public object CoercedValue; + public string CoerceError; // raised by ValueCoercion.CoerceJToken + public MxUpdate AckUpdate; // null if no WriteComplete arrived + public string AckError; // "timeout" | "write-failed" | null + } + + public ValueTask ExecuteAsync(IConsole console) + { + if (string.IsNullOrWhiteSpace(Input)) + throw new CommandException("Input path is required (use '-' for stdin).", 2); + if (TimeoutSeconds <= 0) + throw new CommandException("--timeout must be positive.", 2); + + // Compose the verify-user strings up front so the query echo + // reflects exactly what we'll send to AuthenticateUser. + string verifyUser = null; + if (!string.IsNullOrEmpty(Username)) + verifyUser = string.IsNullOrEmpty(Domain) ? Username : $@"{Domain}\{Username}"; + string verifierVerifyUser = null; + if (!string.IsNullOrEmpty(VerifierUsername)) + verifierVerifyUser = string.IsNullOrEmpty(VerifierDomain) + ? VerifierUsername + : $@"{VerifierDomain}\{VerifierUsername}"; + bool useSecured = Secured || verifierVerifyUser != null; + + // Parse + validate the whole input before opening an LMX session. + // Aborting on parse errors avoids registering a session we'll + // immediately tear down, and gives the user every malformed line + // in one report. + var entries = new List(); + var parseErrors = new List(); + try + { + foreach (var (lineNo, json) in JsonlInputReader.ReadLines(Input)) + { + try + { + entries.Add(JsonlInputReader.ParseWriteEntry(lineNo, json)); + } + catch (InvalidBatchEntryException ex) + { + parseErrors.Add(new + { + tag = (string)null, + line = ex.LineNumber, + ok = false, + error = "parse-error", + reason = ex.Message, + }); + } + } + } + catch (InvalidBatchInputException ex) + { + EmitEnvelopeOrError(console, BuildQuery(verifyUser, verifierVerifyUser, useSecured, entryCount: 0), + ok: false, results: null, topError: ex.Message); + Environment.ExitCode = 2; + return default; + } + + var query = BuildQuery(verifyUser, verifierVerifyUser, useSecured, entries.Count); + + if (parseErrors.Count > 0) + { + EmitEnvelopeOrError(console, query, ok: false, parseErrors, topError: null); + Environment.ExitCode = 2; + return default; + } + + if (entries.Count == 0) + { + EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "empty-input"); + Environment.ExitCode = 2; + return default; + } + + using var session = new MxSession(ClientName); + + // Auth resolution. Anything other than userId>0 here is a hard + // abort: writes downstream depend on a valid identity, and an + // empty results[] is the right shape for "no items attempted". + int effectiveUserId = UserId; + int verifierUserId = 0; + if (verifyUser != null) + { + effectiveUserId = session.Authenticate(verifyUser, Password ?? string.Empty); + if (effectiveUserId == 0) + { + EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "authentication-failed"); + Environment.ExitCode = 1; + return default; + } + } + if (verifierVerifyUser != null) + { + verifierUserId = session.Authenticate(verifierVerifyUser, VerifierPassword ?? string.Empty); + if (verifierUserId == 0) + { + EmitEnvelopeOrError(console, query, ok: false, results: null, topError: "verifier-authentication-failed"); + Environment.ExitCode = 1; + return default; + } + } + else if (useSecured) + { + verifierUserId = effectiveUserId; + } + + var states = new EntryState[entries.Count]; + for (int i = 0; i < entries.Count; i++) states[i] = new EntryState { Entry = entries[i] }; + + try + { + // ─── Phase A: AddItem + Advise for everything, then drain ─────────── + // + // Per WriteCommand.cs:178-179: --username uses Advise (operator + // action); anonymous uses AdviseSupervisory (supervisory action). + // Same heuristic here so the audit trail attribution matches + // the single-tag write. + bool useOperatorAdvise = verifyUser != null; + var pendingResolve = new HashSet(); + foreach (var s in states) + { + try + { + s.Item = session.AddItem(s.Entry.Tag); + if (useOperatorAdvise) s.Item.Advise(); + else s.Item.AdviseSupervisory(); + pendingResolve.Add(s.Item.Handle); + } + catch (Exception ex) + { + s.AddItemError = ex.Message; + } + } + + var phaseADeadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds); + var resolveByHandle = new Dictionary(); + while (pendingResolve.Count > 0) + { + var remaining = phaseADeadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) break; + if (!session.WaitForUpdate( + u => u.Kind == MxUpdateKind.DataChange && pendingResolve.Contains(u.ItemHandle), + remaining, out var u)) + break; + resolveByHandle[u.ItemHandle] = u; + pendingResolve.Remove(u.ItemHandle); + } + + // Annotate each state with its resolve outcome. + foreach (var s in states) + { + if (s.Item == null) continue; + if (resolveByHandle.TryGetValue(s.Item.Handle, out var u)) + { + s.ResolveUpdate = u; + if (!u.IsOk) s.ResolveError = "type-resolution-failed"; + } + else + { + s.ResolveError = "timeout-resolving-type"; + } + } + + // ─── Phase B: coerce values, fire writes, drain WriteComplete ─────── + // + // Coerce up front so a bad value doesn't queue a Write the proxy + // would reject anyway. Track which handles we actually issued + // writes for — only those join the pendingAck set. + var pendingAck = new HashSet(); + foreach (var s in states) + { + if (s.Item == null || s.ResolveError != null) continue; + try + { + s.CoercedValue = ValueCoercion.CoerceJToken(s.Entry.RawValue, s.Entry.TypeHint, s.Entry.IsArray); + } + catch (Exception ex) + { + s.CoerceError = ex.Message; + continue; + } + try + { + if (useSecured) s.Item.WriteSecured(s.CoercedValue, effectiveUserId, verifierUserId); + else s.Item.Write(s.CoercedValue, effectiveUserId); + pendingAck.Add(s.Item.Handle); + } + catch (Exception ex) + { + s.AckError = "write-call-failed: " + ex.Message; + } + } + + var phaseBDeadline = DateTime.UtcNow.AddSeconds(TimeoutSeconds); + var ackByHandle = new Dictionary(); + while (pendingAck.Count > 0) + { + var remaining = phaseBDeadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) break; + if (!session.WaitForUpdate( + u => u.Kind == MxUpdateKind.WriteComplete && pendingAck.Contains(u.ItemHandle), + remaining, out var u)) + break; + ackByHandle[u.ItemHandle] = u; + pendingAck.Remove(u.ItemHandle); + } + + foreach (var s in states) + { + if (s.Item == null || s.ResolveError != null || s.CoerceError != null || s.AckError != null) continue; + if (ackByHandle.TryGetValue(s.Item.Handle, out var u)) + { + s.AckUpdate = u; + if (!u.IsOk) s.AckError = "write-failed"; + } + else + { + s.AckError = "timeout"; + } + } + + // ─── Build results in input order ─────────────────────────────────── + var results = new List(states.Length); + bool anyFailed = false; + foreach (var s in states) + { + var (ok, error, statuses) = Classify(s); + if (!ok) anyFailed = true; + results.Add(new + { + tag = s.Entry.Tag, + line = s.Entry.LineNumber, + ok, + error, + authenticated = verifyUser != null, + auth_user_id = verifyUser != null ? (int?)effectiveUserId : null, + secured = useSecured, + verifier_user_id = useSecured ? (int?)verifierUserId : null, + statuses, + }); + } + + if (LlmJson) Envelope.Write(console, query, ok: !anyFailed, results); + else WriteHuman(console, results); + if (anyFailed) Environment.ExitCode = 1; + } + finally + { + foreach (var s in states) + if (s.Item != null) try { s.Item.Dispose(); } catch { } + } + return default; + } + + private static (bool ok, string error, MxStatusInfo[] statuses) Classify(EntryState s) + { + if (s.AddItemError != null) + return (false, "add-item-failed: " + s.AddItemError, Array.Empty()); + if (s.ResolveError != null) + return (false, s.ResolveError, s.ResolveUpdate?.Statuses ?? Array.Empty()); + if (s.CoerceError != null) + return (false, "value-coerce-failed: " + s.CoerceError, Array.Empty()); + if (s.AckError != null) + return (false, s.AckError, s.AckUpdate?.Statuses ?? Array.Empty()); + return (true, null, s.AckUpdate?.Statuses ?? Array.Empty()); + } + + private object BuildQuery(string verifyUser, string verifierVerifyUser, bool useSecured, int entryCount) => + new + { + command = "write-batch", + input = Input, + entries = entryCount, + timeout_s = TimeoutSeconds, + user_id = UserId, + verify_user = verifyUser, + verifier_verify_user = verifierVerifyUser, + secured = useSecured, + password = string.IsNullOrEmpty(Password) ? null : "***", + verifier_password = string.IsNullOrEmpty(VerifierPassword) ? null : "***", + client = ClientName, + }; + + // Single emit-point used for parse / auth / empty-input failures so + // the envelope shape stays consistent whether the failure is a top- + // level error or a results-array of per-line errors. + private void EmitEnvelopeOrError(IConsole console, object query, bool ok, + IEnumerable results, string topError) + { + if (LlmJson) + { + if (results != null) + Envelope.Write(console, query, ok, results); + else + Envelope.WriteError(console, query, topError); + } + else if (topError != null) + { + console.Error.WriteLine($"[ERR] {topError}"); + } + else if (results != null) + { + foreach (dynamic r in results) + { + var line = r.line; + var reason = (string)(r.reason ?? r.error); + console.Error.WriteLine($"[ERR] line {line}: {reason}"); + } + } + } + + private static void WriteHuman(IConsole console, List results) + { + foreach (dynamic r in results) + { + if ((bool)r.ok) + console.Output.WriteLine($"[OK ] line {r.line} write {r.tag}"); + else + console.Error.WriteLine($"[ERR] line {r.line} write {r.tag}: {r.error}"); + } + } + } +} diff --git a/mxaccesscli/src/MxAccess.Cli/Mx/JsonlInputReader.cs b/mxaccesscli/src/MxAccess.Cli/Mx/JsonlInputReader.cs new file mode 100644 index 0000000..53b67ce --- /dev/null +++ b/mxaccesscli/src/MxAccess.Cli/Mx/JsonlInputReader.cs @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; +using System.IO; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace MxAccess.Cli.Mx +{ + /// JSONL parser shared by the batch commands. Lines are read lazily so + /// huge inputs don't allocate up front; per-line parsing is delegated to + /// `ParseReadEntry` / `ParseWriteEntry` so the read loop can defer parse + /// errors to a context where it can attribute them to a line number. + /// + /// Line conventions (same across read / write / subscribe): + /// - Blank lines and lines starting with `#` are skipped (line numbers + /// still count them so error messages stay accurate against the file). + /// - Path `-` reads from stdin until EOF. + public static class JsonlInputReader + { + public static IEnumerable<(int LineNumber, string Json)> ReadLines(string pathOrDash) + { + if (string.IsNullOrEmpty(pathOrDash)) + throw new InvalidBatchInputException("Input path must be non-empty (use '-' for stdin)."); + + TextReader reader = pathOrDash == "-" + ? Console.In + : OpenFile(pathOrDash); + + try + { + int lineNo = 0; + string raw; + while ((raw = reader.ReadLine()) != null) + { + lineNo++; + var trimmed = raw.Trim(); + if (trimmed.Length == 0) continue; + if (trimmed[0] == '#') continue; + yield return (lineNo, trimmed); + } + } + finally + { + // Don't close Console.In — the host CLR owns it and closing it + // breaks anything that reads stdin after this command returns. + if (pathOrDash != "-") reader.Dispose(); + } + } + + private static TextReader OpenFile(string path) + { + try + { + return new StreamReader(path); + } + catch (FileNotFoundException) + { + throw new InvalidBatchInputException($"Input file not found: {path}"); + } + catch (DirectoryNotFoundException) + { + throw new InvalidBatchInputException($"Input file not found: {path}"); + } + catch (IOException ex) + { + throw new InvalidBatchInputException($"Could not open input file '{path}': {ex.Message}"); + } + catch (UnauthorizedAccessException ex) + { + throw new InvalidBatchInputException($"Could not open input file '{path}': {ex.Message}"); + } + } + + /// Parse a read / subscribe entry. Accepts either a bare JSON string + /// (the tag reference) or a `{"tag":"..."}` object. Throws + /// `InvalidBatchEntryException` on malformed input — the caller is + /// expected to surface line number + reason in the result envelope. + public static BatchReadEntry ParseReadEntry(int lineNumber, string json) + { + JToken token; + try + { + token = JToken.Parse(json); + } + catch (JsonReaderException ex) + { + throw new InvalidBatchEntryException(lineNumber, $"invalid JSON: {ex.Message}"); + } + + string tag; + switch (token.Type) + { + case JTokenType.String: + tag = (string)token; + break; + case JTokenType.Object: + var tagTok = token["tag"]; + if (tagTok == null || tagTok.Type != JTokenType.String) + throw new InvalidBatchEntryException(lineNumber, + "object entry must have a 'tag' string field"); + tag = (string)tagTok; + break; + default: + throw new InvalidBatchEntryException(lineNumber, + "expected a tag string or an object with a 'tag' field"); + } + + if (string.IsNullOrWhiteSpace(tag)) + throw new InvalidBatchEntryException(lineNumber, "tag must be non-empty"); + + return new BatchReadEntry { Tag = tag, LineNumber = lineNumber }; + } + + /// Parse a write entry. Required: `tag` (string), `value` (any JSON + /// type). Optional: `type` (string, mirrors `mxa write --type`). + /// If `value` is a JSON array the tag must end in `[]` (matches the + /// array-write rule in `WriteCommand`). + public static BatchWriteEntry ParseWriteEntry(int lineNumber, string json) + { + JToken token; + try + { + token = JToken.Parse(json); + } + catch (JsonReaderException ex) + { + throw new InvalidBatchEntryException(lineNumber, $"invalid JSON: {ex.Message}"); + } + + if (token.Type != JTokenType.Object) + throw new InvalidBatchEntryException(lineNumber, + "write entry must be a JSON object with at least 'tag' and 'value'"); + + var tagTok = token["tag"]; + var valueTok = token["value"]; + var typeTok = token["type"]; + + if (tagTok == null || tagTok.Type != JTokenType.String) + throw new InvalidBatchEntryException(lineNumber, "'tag' must be a non-empty string"); + var tag = (string)tagTok; + if (string.IsNullOrWhiteSpace(tag)) + throw new InvalidBatchEntryException(lineNumber, "'tag' must be a non-empty string"); + + if (valueTok == null) + throw new InvalidBatchEntryException(lineNumber, "'value' is required"); + + string typeHint = null; + if (typeTok != null) + { + if (typeTok.Type != JTokenType.String) + throw new InvalidBatchEntryException(lineNumber, "'type' must be a string when present"); + typeHint = (string)typeTok; + } + + bool tagIsArray = tag.EndsWith("[]", StringComparison.Ordinal); + bool valueIsArray = valueTok.Type == JTokenType.Array; + + if (valueIsArray && !tagIsArray) + throw new InvalidBatchEntryException(lineNumber, + "array 'value' requires a tag ending in '[]' (whole-array write reference)"); + if (!valueIsArray && tagIsArray) + throw new InvalidBatchEntryException(lineNumber, + "tag ends in '[]' (array write) but 'value' is not a JSON array"); + + return new BatchWriteEntry + { + Tag = tag, + RawValue = valueTok, + TypeHint = typeHint, + IsArray = tagIsArray, + LineNumber = lineNumber, + }; + } + } + + public sealed class BatchReadEntry + { + public string Tag { get; init; } + public int LineNumber { get; init; } + } + + public sealed class BatchWriteEntry + { + public string Tag { get; init; } + public JToken RawValue { get; init; } + public string TypeHint { get; init; } + public bool IsArray { get; init; } + public int LineNumber { get; init; } + } + + /// Thrown for problems with the file itself (missing, unreadable, etc). + /// Distinct from `InvalidBatchEntryException` so the command can choose + /// to abort the whole batch on this category without trying to parse. + public sealed class InvalidBatchInputException : Exception + { + public InvalidBatchInputException(string message) : base(message) { } + } + + /// Thrown for problems with a single line. Always carries the 1-based line + /// number so the command can echo it back in the envelope. + public sealed class InvalidBatchEntryException : Exception + { + public int LineNumber { get; } + public InvalidBatchEntryException(int lineNumber, string reason) : base(reason) + { + LineNumber = lineNumber; + } + } +} diff --git a/mxaccesscli/src/MxAccess.Cli/Mx/ValueCoercion.cs b/mxaccesscli/src/MxAccess.Cli/Mx/ValueCoercion.cs index 4ddd3c0..6f1fc33 100644 --- a/mxaccesscli/src/MxAccess.Cli/Mx/ValueCoercion.cs +++ b/mxaccesscli/src/MxAccess.Cli/Mx/ValueCoercion.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using Newtonsoft.Json.Linq; namespace MxAccess.Cli.Mx { @@ -90,6 +91,41 @@ namespace MxAccess.Cli.Mx } } + /// JSON-token entry point used by `read-batch` / `write-batch`. Renders + /// each token to the string form the existing `Coerce` / `CoerceArray` + /// dispatch table accepts, then forwards — so the type vocabulary + /// stays defined in one place. + public static object CoerceJToken(JToken raw, string typeHint, bool isArray) + { + if (raw == null) throw new ArgumentNullException(nameof(raw)); + + if (isArray) + { + if (raw.Type != JTokenType.Array) + throw new ArgumentException("isArray=true but JSON token is not an array."); + var rendered = new List(); + foreach (var el in (JArray)raw) rendered.Add(RenderToken(el)); + return CoerceArray(rendered, typeHint); + } + return Coerce(RenderToken(raw), typeHint); + } + + private static string RenderToken(JToken t) + { + switch (t.Type) + { + case JTokenType.String: return (string)t; + case JTokenType.Boolean: return ((bool)t) ? "true" : "false"; + case JTokenType.Integer: return ((long)t).ToString(CultureInfo.InvariantCulture); + case JTokenType.Float: return ((double)t).ToString("R", CultureInfo.InvariantCulture); + case JTokenType.Date: return ((DateTime)t).ToString("o", CultureInfo.InvariantCulture); + case JTokenType.Null: + throw new ArgumentException("JSON null is not a writable value."); + default: + throw new ArgumentException($"Cannot coerce JSON token of type {t.Type} to a write value."); + } + } + private static T[] Convert(IReadOnlyList raw, Func parse) { var arr = new T[raw.Count]; diff --git a/mxaccesscli/test-fixtures/read-batch.jsonl b/mxaccesscli/test-fixtures/read-batch.jsonl new file mode 100644 index 0000000..d99c560 --- /dev/null +++ b/mxaccesscli/test-fixtures/read-batch.jsonl @@ -0,0 +1,5 @@ +"DevPlatform.CPULoad" +{"tag": "DevPlatform.SystemStartupTime"} +# blank line below is intentional — should be skipped along with this comment + +"DevPlatform.CPULoad" diff --git a/mxaccesscli/test-fixtures/subscribe-batch.jsonl b/mxaccesscli/test-fixtures/subscribe-batch.jsonl new file mode 100644 index 0000000..091b945 --- /dev/null +++ b/mxaccesscli/test-fixtures/subscribe-batch.jsonl @@ -0,0 +1,2 @@ +"DevPlatform.CPULoad" +"DevPlatform.SystemStartupTime" diff --git a/mxaccesscli/test-fixtures/write-batch.jsonl b/mxaccesscli/test-fixtures/write-batch.jsonl new file mode 100644 index 0000000..4974ba9 --- /dev/null +++ b/mxaccesscli/test-fixtures/write-batch.jsonl @@ -0,0 +1,6 @@ +# Mixed scalar writes against the dev galaxy. Permissive auth. +{"tag": "OtOpcUaParityTest_001.ConfigValue", "value": 12.5, "type": "float"} +{"tag": "OtOpcUaParityTest_001.TuneValue", "value": 7.25, "type": "float"} +{"tag": "TestMachine_001.ProtectedValue", "value": true} +{"tag": "TestMachine_001.ProtectedValue1", "value": false} +{"tag": "MESReceiver_001.MoveInPartNumbers[1]", "value": "PN-42"}