Go client: port bulk read/write SDK methods + CLI subcommands
Mirrors the .NET addition: HEAD's session.go had only the subscribe-style
bulks (AddItemBulk / AdviseItemBulk / RemoveItemBulk / UnAdviseItemBulk /
SubscribeBulk / UnsubscribeBulk). This commit ports the value-bulk SDK
surface and CLI subcommands from divergent branch commit f220908.
SDK (clients/go/mxgateway/session.go):
- WriteBulk(ctx, serverHandle int32, entries []*WriteBulkEntry)
- Write2Bulk(ctx, ..., entries []*Write2BulkEntry)
- WriteSecuredBulk(ctx, ..., entries []*WriteSecuredBulkEntry)
- WriteSecured2Bulk(ctx, ..., entries []*WriteSecured2BulkEntry)
- ReadBulk(ctx, serverHandle int32, tagAddresses []string, timeout time.Duration)
→ []*BulkReadResult
types.go gains public re-exports of the generated proto types
(WriteBulkCommand, WriteBulkEntry, Write2BulkCommand, Write2BulkEntry,
WriteSecuredBulkCommand, WriteSecuredBulkEntry, WriteSecured2BulkCommand,
WriteSecured2BulkEntry, ReadBulkCommand, BulkWriteReply, BulkWriteResult,
BulkReadReply, BulkReadResult) so external callers can construct entries
through the public `mxgateway` package without dipping into the internal
generated path.
CLI (clients/go/cmd/mxgw-go/main.go):
- read-bulk, write-bulk, write2-bulk, write-secured-bulk,
write-secured2-bulk routed through runWithIO. write families share a
runWriteBulkVariant helper that gates per-variant flags
(--current-user-id, --verifier-user-id, --timestamp) so the
Client.Go-015 flag-gating contract is preserved.
- bench-read-bulk: percentile + timing helpers; JSON output schema
identical to the .NET / Rust / Python / Java benches.
parseInt32List was changed from panic-on-error to ([]int32, error) so
the new write-bulk commands surface parse errors gracefully; the
existing runUnsubscribeBulk caller is updated accordingly.
Verification: go build ./... + go vet ./... + go test ./... all clean.
Manual smoke against live gateway on localhost:5120: open-session →
register → subscribe-bulk on 3 TestMachine_NNN.TestChangingInt tags
(all wasSuccessful=true) → read-bulk (all wasSuccessful=true /
wasCached=true) → write-bulk int32 100/200/300 (all wasSuccessful=true)
→ close-session SESSION_STATE_CLOSED.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -90,6 +91,18 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
|
|||||||
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
|
return runSubscribeBulk(ctx, args[1:], stdout, stderr)
|
||||||
case "unsubscribe-bulk":
|
case "unsubscribe-bulk":
|
||||||
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
|
return runUnsubscribeBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "read-bulk":
|
||||||
|
return runReadBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-bulk":
|
||||||
|
return runWriteBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write2-bulk":
|
||||||
|
return runWrite2Bulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-secured-bulk":
|
||||||
|
return runWriteSecuredBulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "write-secured2-bulk":
|
||||||
|
return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr)
|
||||||
|
case "bench-read-bulk":
|
||||||
|
return runBenchReadBulk(ctx, args[1:], stdout, stderr)
|
||||||
case "write":
|
case "write":
|
||||||
return runWrite(ctx, args[1:], stdout, stderr)
|
return runWrite(ctx, args[1:], stdout, stderr)
|
||||||
case "stream-events":
|
case "stream-events":
|
||||||
@@ -340,11 +353,363 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr
|
|||||||
}
|
}
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
|
handles, err := parseInt32List(*itemHandles)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
session := mxgateway.NewSessionForID(client, *sessionID)
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
||||||
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), parseInt32List(*itemHandles))
|
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles)
|
||||||
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
|
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
sessionID := flags.String("session-id", "", "gateway session id")
|
||||||
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
||||||
|
items := flags.String("items", "", "comma-separated tag addresses")
|
||||||
|
timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *sessionID == "" || *items == "" {
|
||||||
|
return errors.New("session-id and items are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
||||||
|
results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond)
|
||||||
|
return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
|
||||||
|
// the four bulk-write families. withTimestamp adds a --timestamp-value flag;
|
||||||
|
// secured switches from --user-id to --current-user-id / --verifier-user-id.
|
||||||
|
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error {
|
||||||
|
flags := flag.NewFlagSet(command, flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
sessionID := flags.String("session-id", "", "gateway session id")
|
||||||
|
serverHandle := flags.Int("server-handle", 0, "MXAccess server handle")
|
||||||
|
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
|
||||||
|
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
|
||||||
|
values := flags.String("values", "", "comma-separated values (one per item handle)")
|
||||||
|
userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
|
||||||
|
currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
|
||||||
|
verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
|
||||||
|
timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *sessionID == "" || *itemHandles == "" || *values == "" {
|
||||||
|
return errors.New("session-id, item-handles, and values are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
handles, err := parseInt32List(*itemHandles)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
valueTexts := parseStringList(*values)
|
||||||
|
if len(handles) != len(valueTexts) {
|
||||||
|
return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts))
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedValues := make([]*mxgateway.MxValue, len(handles))
|
||||||
|
for i, text := range valueTexts {
|
||||||
|
v, err := parseValue(*valueType, text)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("entry %d: %w", i, err)
|
||||||
|
}
|
||||||
|
parsedValues[i] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
var tsValue *mxgateway.MxValue
|
||||||
|
if withTimestamp {
|
||||||
|
if *timestampValue == "" {
|
||||||
|
return errors.New("timestamp-value is required for write2/write-secured2 bulk variants")
|
||||||
|
}
|
||||||
|
parsed, err := parseRfc3339Timestamp(*timestampValue)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tsValue = parsed
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
session := mxgateway.NewSessionForID(client, *sessionID)
|
||||||
|
|
||||||
|
var results []*mxgateway.BulkWriteResult
|
||||||
|
switch command {
|
||||||
|
case "write-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteBulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)}
|
||||||
|
}
|
||||||
|
results, err = session.WriteBulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write2-bulk":
|
||||||
|
entries := make([]*mxgateway.Write2BulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)}
|
||||||
|
}
|
||||||
|
results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write-secured-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteSecuredBulkEntry{
|
||||||
|
ItemHandle: handles[i],
|
||||||
|
Value: parsedValues[i],
|
||||||
|
CurrentUserId: int32(*currentUserID),
|
||||||
|
VerifierUserId: int32(*verifierUserID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries)
|
||||||
|
case "write-secured2-bulk":
|
||||||
|
entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles))
|
||||||
|
for i := range handles {
|
||||||
|
entries[i] = &mxgateway.WriteSecured2BulkEntry{
|
||||||
|
ItemHandle: handles[i],
|
||||||
|
Value: parsedValues[i],
|
||||||
|
TimestampValue: tsValue,
|
||||||
|
CurrentUserId: int32(*currentUserID),
|
||||||
|
VerifierUserId: int32(*verifierUserID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported bulk write command %q", command)
|
||||||
|
}
|
||||||
|
_ = secured // currently only used for routing above; reserved for future per-variant validation
|
||||||
|
return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the
|
||||||
|
// MxValue protobuf representation used for the timestamped write families.
|
||||||
|
func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
|
||||||
|
t, err := time.Parse(time.RFC3339Nano, text)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err)
|
||||||
|
}
|
||||||
|
return mxgateway.TimestampValue(t), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go:
|
||||||
|
// opens its own session, subscribes to bulk-size tags so the worker value cache
|
||||||
|
// populates from real OnDataChange events, runs ReadBulk in a tight loop for
|
||||||
|
// duration-seconds with per-call timing, and emits the shared JSON schema the
|
||||||
|
// scripts/bench-read-bulk.ps1 driver collates across all five clients.
|
||||||
|
func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
clientName := flags.String("client-name", "mxgw-go-bench", "session client name")
|
||||||
|
durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds")
|
||||||
|
warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds")
|
||||||
|
bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call")
|
||||||
|
tagStart := flags.Int("tag-start", 1, "first machine number")
|
||||||
|
tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)")
|
||||||
|
tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix")
|
||||||
|
timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *bulkSize < 1 {
|
||||||
|
return errors.New("bulk-size must be positive")
|
||||||
|
}
|
||||||
|
if *durationSeconds < 1 {
|
||||||
|
return errors.New("duration-seconds must be positive")
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make([]string, *bulkSize)
|
||||||
|
for i := 0; i < *bulkSize; i++ {
|
||||||
|
tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute)
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_, _ = session.Close(context.Background())
|
||||||
|
}()
|
||||||
|
|
||||||
|
serverHandle, err := session.Register(ctx, *clientName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
itemHandles := make([]int32, 0, len(subscribeResults))
|
||||||
|
for _, result := range subscribeResults {
|
||||||
|
if result.GetWasSuccessful() {
|
||||||
|
itemHandles = append(itemHandles, result.GetItemHandle())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if len(itemHandles) > 0 {
|
||||||
|
_, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Warm-up: drive identical calls so any first-call JIT / connection-pool
|
||||||
|
// setup is amortised before the measurement window opens.
|
||||||
|
warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second)
|
||||||
|
timeout := time.Duration(*timeoutMs) * time.Millisecond
|
||||||
|
for time.Now().Before(warmupDeadline) {
|
||||||
|
_, _ = session.ReadBulk(ctx, serverHandle, tags, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Steady state: per-call latency captured via time.Now() deltas.
|
||||||
|
latenciesMs := make([]float64, 0, 65536)
|
||||||
|
var totalReadResults int64
|
||||||
|
var cachedReadResults int64
|
||||||
|
var successfulCalls, failedCalls int
|
||||||
|
steadyStart := time.Now()
|
||||||
|
steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second)
|
||||||
|
|
||||||
|
for time.Now().Before(steadyDeadline) {
|
||||||
|
callStart := time.Now()
|
||||||
|
results, err := session.ReadBulk(ctx, serverHandle, tags, timeout)
|
||||||
|
elapsed := time.Since(callStart)
|
||||||
|
latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6)
|
||||||
|
if err != nil {
|
||||||
|
failedCalls++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
successfulCalls++
|
||||||
|
for _, r := range results {
|
||||||
|
totalReadResults++
|
||||||
|
if r.GetWasCached() {
|
||||||
|
cachedReadResults++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
steadyElapsed := time.Since(steadyStart)
|
||||||
|
totalCalls := successfulCalls + failedCalls
|
||||||
|
|
||||||
|
callsPerSecond := 0.0
|
||||||
|
if steadyElapsed.Seconds() > 0 {
|
||||||
|
callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
stats := map[string]any{
|
||||||
|
"language": "go",
|
||||||
|
"command": "bench-read-bulk",
|
||||||
|
"endpoint": options.Endpoint,
|
||||||
|
"clientName": *clientName,
|
||||||
|
"bulkSize": *bulkSize,
|
||||||
|
"durationSeconds": *durationSeconds,
|
||||||
|
"warmupSeconds": *warmupSeconds,
|
||||||
|
"durationMs": steadyElapsed.Milliseconds(),
|
||||||
|
"tags": tags,
|
||||||
|
"totalCalls": totalCalls,
|
||||||
|
"successfulCalls": successfulCalls,
|
||||||
|
"failedCalls": failedCalls,
|
||||||
|
"totalReadResults": totalReadResults,
|
||||||
|
"cachedReadResults": cachedReadResults,
|
||||||
|
"callsPerSecond": roundTo(callsPerSecond, 2),
|
||||||
|
"latencyMs": percentileSummary(latenciesMs),
|
||||||
|
}
|
||||||
|
if *jsonOutput {
|
||||||
|
return writeJSON(stdout, stats)
|
||||||
|
}
|
||||||
|
fmt.Fprintln(stdout, callsPerSecond)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// percentileSummary returns the same { p50, p95, p99, max, mean } shape every
|
||||||
|
// language bench emits, rounded to 3 decimal places so the PowerShell driver
|
||||||
|
// sees one schema across all five clients.
|
||||||
|
func percentileSummary(sample []float64) map[string]float64 {
|
||||||
|
if len(sample) == 0 {
|
||||||
|
return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0}
|
||||||
|
}
|
||||||
|
sorted := append([]float64(nil), sample...)
|
||||||
|
sort.Float64s(sorted)
|
||||||
|
mean := 0.0
|
||||||
|
maxValue := sorted[len(sorted)-1]
|
||||||
|
for _, v := range sample {
|
||||||
|
mean += v
|
||||||
|
}
|
||||||
|
mean /= float64(len(sample))
|
||||||
|
return map[string]float64{
|
||||||
|
"p50": roundTo(percentile(sorted, 0.50), 3),
|
||||||
|
"p95": roundTo(percentile(sorted, 0.95), 3),
|
||||||
|
"p99": roundTo(percentile(sorted, 0.99), 3),
|
||||||
|
"max": roundTo(maxValue, 3),
|
||||||
|
"mean": roundTo(mean, 3),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// percentile uses nearest-rank with linear interpolation; matches the .NET
|
||||||
|
// implementation so cross-language comparisons are apples-to-apples.
|
||||||
|
func percentile(sorted []float64, quantile float64) float64 {
|
||||||
|
if len(sorted) == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if len(sorted) == 1 {
|
||||||
|
return sorted[0]
|
||||||
|
}
|
||||||
|
rank := quantile * float64(len(sorted)-1)
|
||||||
|
lower := int(rank)
|
||||||
|
upper := lower + 1
|
||||||
|
if upper >= len(sorted) {
|
||||||
|
return sorted[lower]
|
||||||
|
}
|
||||||
|
fraction := rank - float64(lower)
|
||||||
|
return sorted[lower] + (sorted[upper]-sorted[lower])*fraction
|
||||||
|
}
|
||||||
|
|
||||||
|
func roundTo(value float64, digits int) float64 {
|
||||||
|
shift := 1.0
|
||||||
|
for i := 0; i < digits; i++ {
|
||||||
|
shift *= 10
|
||||||
|
}
|
||||||
|
return float64(int64(value*shift+0.5)) / shift
|
||||||
|
}
|
||||||
|
|
||||||
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
flags := flag.NewFlagSet("write", flag.ContinueOnError)
|
flags := flag.NewFlagSet("write", flag.ContinueOnError)
|
||||||
flags.SetOutput(stderr)
|
flags.SetOutput(stderr)
|
||||||
@@ -517,7 +882,7 @@ func parseStringList(value string) []string {
|
|||||||
return items
|
return items
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseInt32List(value string) []int32 {
|
func parseInt32List(value string) ([]int32, error) {
|
||||||
parts := strings.Split(value, ",")
|
parts := strings.Split(value, ",")
|
||||||
items := make([]int32, 0, len(parts))
|
items := make([]int32, 0, len(parts))
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
@@ -527,11 +892,11 @@ func parseInt32List(value string) []int32 {
|
|||||||
}
|
}
|
||||||
parsed, err := strconv.ParseInt(item, 10, 32)
|
parsed, err := strconv.ParseInt(item, 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return nil, fmt.Errorf("invalid item handle %q: %w", item, err)
|
||||||
}
|
}
|
||||||
items = append(items, int32(parsed))
|
items = append(items, int32(parsed))
|
||||||
}
|
}
|
||||||
return items
|
return items, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
|
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
|
||||||
@@ -650,6 +1015,36 @@ func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if jsonOutput {
|
||||||
|
return writeJSON(stdout, map[string]any{
|
||||||
|
"command": command,
|
||||||
|
"options": options,
|
||||||
|
"results": results,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fmt.Fprintln(stdout, len(results))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if jsonOutput {
|
||||||
|
return writeJSON(stdout, map[string]any{
|
||||||
|
"command": command,
|
||||||
|
"options": options,
|
||||||
|
"results": results,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fmt.Fprintln(stdout, len(results))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func writeJSON(writer io.Writer, value any) error {
|
func writeJSON(writer io.Writer, value any) error {
|
||||||
encoder := json.NewEncoder(writer)
|
encoder := json.NewEncoder(writer)
|
||||||
encoder.SetIndent("", " ")
|
encoder.SetIndent("", " ")
|
||||||
@@ -669,7 +1064,7 @@ type protojsonMessage interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func writeUsage(writer io.Writer) {
|
func writeUsage(writer io.Writer) {
|
||||||
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
||||||
}
|
}
|
||||||
|
|
||||||
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
@@ -387,6 +388,142 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH
|
|||||||
return reply.GetUnsubscribeBulk().GetResults(), nil
|
return reply.GetUnsubscribeBulk().GetResults(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteBulk invokes MXAccess Write sequentially for each entry inside one gateway command.
|
||||||
|
// Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call
|
||||||
|
// never returns an error for per-entry MXAccess failures (it returns an error only for
|
||||||
|
// protocol-level failures or transport errors).
|
||||||
|
func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) {
|
||||||
|
if entries == nil {
|
||||||
|
return nil, errors.New("mxgateway: write bulk entries are required")
|
||||||
|
}
|
||||||
|
if err := ensureBulkSize("write bulk entries", len(entries)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||||
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK,
|
||||||
|
Payload: &pb.MxCommand_WriteBulk{
|
||||||
|
WriteBulk: &pb.WriteBulkCommand{
|
||||||
|
ServerHandle: serverHandle,
|
||||||
|
Entries: entries,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reply.GetWriteBulk().GetResults(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command.
|
||||||
|
func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) {
|
||||||
|
if entries == nil {
|
||||||
|
return nil, errors.New("mxgateway: write2 bulk entries are required")
|
||||||
|
}
|
||||||
|
if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||||
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK,
|
||||||
|
Payload: &pb.MxCommand_Write2Bulk{
|
||||||
|
Write2Bulk: &pb.Write2BulkCommand{
|
||||||
|
ServerHandle: serverHandle,
|
||||||
|
Entries: entries,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reply.GetWrite2Bulk().GetResults(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive
|
||||||
|
// values must not be logged by callers; mirrors the single-item WriteSecured contract.
|
||||||
|
func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) {
|
||||||
|
if entries == nil {
|
||||||
|
return nil, errors.New("mxgateway: write-secured bulk entries are required")
|
||||||
|
}
|
||||||
|
if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||||
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK,
|
||||||
|
Payload: &pb.MxCommand_WriteSecuredBulk{
|
||||||
|
WriteSecuredBulk: &pb.WriteSecuredBulkCommand{
|
||||||
|
ServerHandle: serverHandle,
|
||||||
|
Entries: entries,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reply.GetWriteSecuredBulk().GetResults(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry.
|
||||||
|
func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) {
|
||||||
|
if entries == nil {
|
||||||
|
return nil, errors.New("mxgateway: write-secured2 bulk entries are required")
|
||||||
|
}
|
||||||
|
if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||||
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK,
|
||||||
|
Payload: &pb.MxCommand_WriteSecured2Bulk{
|
||||||
|
WriteSecured2Bulk: &pb.WriteSecured2BulkCommand{
|
||||||
|
ServerHandle: serverHandle,
|
||||||
|
Entries: entries,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reply.GetWriteSecured2Bulk().GetResults(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadBulk snapshots the current value of each requested tag.
|
||||||
|
//
|
||||||
|
// MXAccess COM has no synchronous Read; the worker satisfies this by returning the
|
||||||
|
// most recent cached OnDataChange value when the tag is already advised (WasCached=true),
|
||||||
|
// or by taking a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle
|
||||||
|
// otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the
|
||||||
|
// worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries
|
||||||
|
// with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures.
|
||||||
|
func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) {
|
||||||
|
if tagAddresses == nil {
|
||||||
|
return nil, errors.New("mxgateway: tag addresses are required")
|
||||||
|
}
|
||||||
|
if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var timeoutMs uint32
|
||||||
|
if timeout > 0 {
|
||||||
|
ms := timeout.Milliseconds()
|
||||||
|
if ms > int64(^uint32(0)) {
|
||||||
|
timeoutMs = ^uint32(0)
|
||||||
|
} else {
|
||||||
|
timeoutMs = uint32(ms)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||||
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK,
|
||||||
|
Payload: &pb.MxCommand_ReadBulk{
|
||||||
|
ReadBulk: &pb.ReadBulkCommand{
|
||||||
|
ServerHandle: serverHandle,
|
||||||
|
TagAddresses: tagAddresses,
|
||||||
|
TimeoutMs: timeoutMs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return reply.GetReadBulk().GetResults(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Write invokes MXAccess Write.
|
// Write invokes MXAccess Write.
|
||||||
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
|
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
|
||||||
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
|
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
|
||||||
|
|||||||
@@ -70,6 +70,32 @@ type (
|
|||||||
WriteCommand = pb.WriteCommand
|
WriteCommand = pb.WriteCommand
|
||||||
// Write2Command is the payload of an MXAccess Write2 command.
|
// Write2Command is the payload of an MXAccess Write2 command.
|
||||||
Write2Command = pb.Write2Command
|
Write2Command = pb.Write2Command
|
||||||
|
// WriteBulkCommand is the payload of a bulk Write command.
|
||||||
|
WriteBulkCommand = pb.WriteBulkCommand
|
||||||
|
// WriteBulkEntry is one entry inside a WriteBulkCommand.
|
||||||
|
WriteBulkEntry = pb.WriteBulkEntry
|
||||||
|
// Write2BulkCommand is the payload of a bulk Write2 (timestamped) command.
|
||||||
|
Write2BulkCommand = pb.Write2BulkCommand
|
||||||
|
// Write2BulkEntry is one entry inside a Write2BulkCommand.
|
||||||
|
Write2BulkEntry = pb.Write2BulkEntry
|
||||||
|
// WriteSecuredBulkCommand is the payload of a bulk WriteSecured command.
|
||||||
|
WriteSecuredBulkCommand = pb.WriteSecuredBulkCommand
|
||||||
|
// WriteSecuredBulkEntry is one entry inside a WriteSecuredBulkCommand.
|
||||||
|
WriteSecuredBulkEntry = pb.WriteSecuredBulkEntry
|
||||||
|
// WriteSecured2BulkCommand is the payload of a bulk WriteSecured2 (timestamped) command.
|
||||||
|
WriteSecured2BulkCommand = pb.WriteSecured2BulkCommand
|
||||||
|
// WriteSecured2BulkEntry is one entry inside a WriteSecured2BulkCommand.
|
||||||
|
WriteSecured2BulkEntry = pb.WriteSecured2BulkEntry
|
||||||
|
// ReadBulkCommand is the payload of a bulk Read snapshot command.
|
||||||
|
ReadBulkCommand = pb.ReadBulkCommand
|
||||||
|
// BulkWriteReply aggregates BulkWriteResult entries for a bulk write command.
|
||||||
|
BulkWriteReply = pb.BulkWriteReply
|
||||||
|
// BulkWriteResult is one entry in a bulk write reply list.
|
||||||
|
BulkWriteResult = pb.BulkWriteResult
|
||||||
|
// BulkReadReply aggregates BulkReadResult entries for a bulk read command.
|
||||||
|
BulkReadReply = pb.BulkReadReply
|
||||||
|
// BulkReadResult is one entry in a bulk read reply list.
|
||||||
|
BulkReadResult = pb.BulkReadResult
|
||||||
// RegisterReply carries the ServerHandle returned by Register.
|
// RegisterReply carries the ServerHandle returned by Register.
|
||||||
RegisterReply = pb.RegisterReply
|
RegisterReply = pb.RegisterReply
|
||||||
// AddItemReply carries the ItemHandle returned by AddItem.
|
// AddItemReply carries the ItemHandle returned by AddItem.
|
||||||
|
|||||||
Reference in New Issue
Block a user