diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index a07ac82..4f92a90 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -15,6 +15,7 @@ import ( "io" "os" "os/signal" + "sort" "strconv" "strings" "syscall" @@ -90,6 +91,18 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err return runSubscribeBulk(ctx, args[1:], stdout, stderr) case "unsubscribe-bulk": return runUnsubscribeBulk(ctx, args[1:], stdout, stderr) + case "read-bulk": + return runReadBulk(ctx, args[1:], stdout, stderr) + case "write-bulk": + return runWriteBulk(ctx, args[1:], stdout, stderr) + case "write2-bulk": + return runWrite2Bulk(ctx, args[1:], stdout, stderr) + case "write-secured-bulk": + return runWriteSecuredBulk(ctx, args[1:], stdout, stderr) + case "write-secured2-bulk": + return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr) + case "bench-read-bulk": + return runBenchReadBulk(ctx, args[1:], stdout, stderr) case "write": return runWrite(ctx, args[1:], stdout, stderr) case "stream-events": @@ -340,11 +353,363 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr } defer client.Close() + handles, err := parseInt32List(*itemHandles) + if err != nil { + return err + } + session := mxgateway.NewSessionForID(client, *sessionID) - results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), parseInt32List(*itemHandles)) + results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles) return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err) } +func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("read-bulk", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + sessionID := flags.String("session-id", "", "gateway session id") + serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") + items := flags.String("items", "", "comma-separated tag addresses") + timeoutMs := flags.Int("timeout-ms", 0, "per-tag snapshot timeout in milliseconds (0 = worker default)") + + if err := flags.Parse(args); err != nil { + return err + } + if *sessionID == "" || *items == "" { + return errors.New("session-id and items are required") + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + session := mxgateway.NewSessionForID(client, *sessionID) + results, err := session.ReadBulk(ctx, int32(*serverHandle), parseStringList(*items), time.Duration(*timeoutMs)*time.Millisecond) + return writeReadBulkOutput(stdout, *jsonOutput, "read-bulk", options, results, err) +} + +func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false) +} + +func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false) +} + +func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true) +} + +func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true) +} + +// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across +// the four bulk-write families. withTimestamp adds a --timestamp-value flag; +// secured switches from --user-id to --current-user-id / --verifier-user-id. +func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error { + flags := flag.NewFlagSet(command, flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + sessionID := flags.String("session-id", "", "gateway session id") + serverHandle := flags.Int("server-handle", 0, "MXAccess server handle") + itemHandles := flags.String("item-handles", "", "comma-separated item handles") + valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string") + values := flags.String("values", "", "comma-separated values (one per item handle)") + userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)") + currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)") + verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)") + timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)") + + if err := flags.Parse(args); err != nil { + return err + } + if *sessionID == "" || *itemHandles == "" || *values == "" { + return errors.New("session-id, item-handles, and values are required") + } + + handles, err := parseInt32List(*itemHandles) + if err != nil { + return err + } + valueTexts := parseStringList(*values) + if len(handles) != len(valueTexts) { + return fmt.Errorf("item-handles count (%d) does not match values count (%d)", len(handles), len(valueTexts)) + } + + parsedValues := make([]*mxgateway.MxValue, len(handles)) + for i, text := range valueTexts { + v, err := parseValue(*valueType, text) + if err != nil { + return fmt.Errorf("entry %d: %w", i, err) + } + parsedValues[i] = v + } + + var tsValue *mxgateway.MxValue + if withTimestamp { + if *timestampValue == "" { + return errors.New("timestamp-value is required for write2/write-secured2 bulk variants") + } + parsed, err := parseRfc3339Timestamp(*timestampValue) + if err != nil { + return err + } + tsValue = parsed + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + session := mxgateway.NewSessionForID(client, *sessionID) + + var results []*mxgateway.BulkWriteResult + switch command { + case "write-bulk": + entries := make([]*mxgateway.WriteBulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteBulkEntry{ItemHandle: handles[i], Value: parsedValues[i], UserId: int32(*userID)} + } + results, err = session.WriteBulk(ctx, int32(*serverHandle), entries) + case "write2-bulk": + entries := make([]*mxgateway.Write2BulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.Write2BulkEntry{ItemHandle: handles[i], Value: parsedValues[i], TimestampValue: tsValue, UserId: int32(*userID)} + } + results, err = session.Write2Bulk(ctx, int32(*serverHandle), entries) + case "write-secured-bulk": + entries := make([]*mxgateway.WriteSecuredBulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteSecuredBulkEntry{ + ItemHandle: handles[i], + Value: parsedValues[i], + CurrentUserId: int32(*currentUserID), + VerifierUserId: int32(*verifierUserID), + } + } + results, err = session.WriteSecuredBulk(ctx, int32(*serverHandle), entries) + case "write-secured2-bulk": + entries := make([]*mxgateway.WriteSecured2BulkEntry, len(handles)) + for i := range handles { + entries[i] = &mxgateway.WriteSecured2BulkEntry{ + ItemHandle: handles[i], + Value: parsedValues[i], + TimestampValue: tsValue, + CurrentUserId: int32(*currentUserID), + VerifierUserId: int32(*verifierUserID), + } + } + results, err = session.WriteSecured2Bulk(ctx, int32(*serverHandle), entries) + default: + return fmt.Errorf("unsupported bulk write command %q", command) + } + _ = secured // currently only used for routing above; reserved for future per-variant validation + return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err) +} + +// parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the +// MxValue protobuf representation used for the timestamped write families. +func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) { + t, err := time.Parse(time.RFC3339Nano, text) + if err != nil { + return nil, fmt.Errorf("invalid RFC 3339 timestamp %q: %w", text, err) + } + return mxgateway.TimestampValue(t), nil +} + +// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go: +// opens its own session, subscribes to bulk-size tags so the worker value cache +// populates from real OnDataChange events, runs ReadBulk in a tight loop for +// duration-seconds with per-call timing, and emits the shared JSON schema the +// scripts/bench-read-bulk.ps1 driver collates across all five clients. +func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError) + flags.SetOutput(stderr) + common := bindCommonFlags(flags) + jsonOutput := flags.Bool("json", false, "write JSON output") + clientName := flags.String("client-name", "mxgw-go-bench", "session client name") + durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds") + warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds") + bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call") + tagStart := flags.Int("tag-start", 1, "first machine number") + tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)") + tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix") + timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds") + + if err := flags.Parse(args); err != nil { + return err + } + if *bulkSize < 1 { + return errors.New("bulk-size must be positive") + } + if *durationSeconds < 1 { + return errors.New("duration-seconds must be positive") + } + + tags := make([]string, *bulkSize) + for i := 0; i < *bulkSize; i++ { + tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute) + } + + client, options, err := dialForCommand(ctx, common) + if err != nil { + return err + } + defer client.Close() + + session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName}) + if err != nil { + return err + } + defer func() { + _, _ = session.Close(context.Background()) + }() + + serverHandle, err := session.Register(ctx, *clientName) + if err != nil { + return err + } + + subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags) + if err != nil { + return err + } + itemHandles := make([]int32, 0, len(subscribeResults)) + for _, result := range subscribeResults { + if result.GetWasSuccessful() { + itemHandles = append(itemHandles, result.GetItemHandle()) + } + } + defer func() { + if len(itemHandles) > 0 { + _, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles) + } + }() + + // Warm-up: drive identical calls so any first-call JIT / connection-pool + // setup is amortised before the measurement window opens. + warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second) + timeout := time.Duration(*timeoutMs) * time.Millisecond + for time.Now().Before(warmupDeadline) { + _, _ = session.ReadBulk(ctx, serverHandle, tags, timeout) + } + + // Steady state: per-call latency captured via time.Now() deltas. + latenciesMs := make([]float64, 0, 65536) + var totalReadResults int64 + var cachedReadResults int64 + var successfulCalls, failedCalls int + steadyStart := time.Now() + steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second) + + for time.Now().Before(steadyDeadline) { + callStart := time.Now() + results, err := session.ReadBulk(ctx, serverHandle, tags, timeout) + elapsed := time.Since(callStart) + latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6) + if err != nil { + failedCalls++ + continue + } + successfulCalls++ + for _, r := range results { + totalReadResults++ + if r.GetWasCached() { + cachedReadResults++ + } + } + } + steadyElapsed := time.Since(steadyStart) + totalCalls := successfulCalls + failedCalls + + callsPerSecond := 0.0 + if steadyElapsed.Seconds() > 0 { + callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds() + } + + stats := map[string]any{ + "language": "go", + "command": "bench-read-bulk", + "endpoint": options.Endpoint, + "clientName": *clientName, + "bulkSize": *bulkSize, + "durationSeconds": *durationSeconds, + "warmupSeconds": *warmupSeconds, + "durationMs": steadyElapsed.Milliseconds(), + "tags": tags, + "totalCalls": totalCalls, + "successfulCalls": successfulCalls, + "failedCalls": failedCalls, + "totalReadResults": totalReadResults, + "cachedReadResults": cachedReadResults, + "callsPerSecond": roundTo(callsPerSecond, 2), + "latencyMs": percentileSummary(latenciesMs), + } + if *jsonOutput { + return writeJSON(stdout, stats) + } + fmt.Fprintln(stdout, callsPerSecond) + return nil +} + +// percentileSummary returns the same { p50, p95, p99, max, mean } shape every +// language bench emits, rounded to 3 decimal places so the PowerShell driver +// sees one schema across all five clients. +func percentileSummary(sample []float64) map[string]float64 { + if len(sample) == 0 { + return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0} + } + sorted := append([]float64(nil), sample...) + sort.Float64s(sorted) + mean := 0.0 + maxValue := sorted[len(sorted)-1] + for _, v := range sample { + mean += v + } + mean /= float64(len(sample)) + return map[string]float64{ + "p50": roundTo(percentile(sorted, 0.50), 3), + "p95": roundTo(percentile(sorted, 0.95), 3), + "p99": roundTo(percentile(sorted, 0.99), 3), + "max": roundTo(maxValue, 3), + "mean": roundTo(mean, 3), + } +} + +// percentile uses nearest-rank with linear interpolation; matches the .NET +// implementation so cross-language comparisons are apples-to-apples. +func percentile(sorted []float64, quantile float64) float64 { + if len(sorted) == 0 { + return 0 + } + if len(sorted) == 1 { + return sorted[0] + } + rank := quantile * float64(len(sorted)-1) + lower := int(rank) + upper := lower + 1 + if upper >= len(sorted) { + return sorted[lower] + } + fraction := rank - float64(lower) + return sorted[lower] + (sorted[upper]-sorted[lower])*fraction +} + +func roundTo(value float64, digits int) float64 { + shift := 1.0 + for i := 0; i < digits; i++ { + shift *= 10 + } + return float64(int64(value*shift+0.5)) / shift +} + func runWrite(ctx context.Context, args []string, stdout, stderr io.Writer) error { flags := flag.NewFlagSet("write", flag.ContinueOnError) flags.SetOutput(stderr) @@ -517,7 +882,7 @@ func parseStringList(value string) []string { return items } -func parseInt32List(value string) []int32 { +func parseInt32List(value string) ([]int32, error) { parts := strings.Split(value, ",") items := make([]int32, 0, len(parts)) for _, part := range parts { @@ -527,11 +892,11 @@ func parseInt32List(value string) []int32 { } parsed, err := strconv.ParseInt(item, 10, 32) if err != nil { - panic(err) + return nil, fmt.Errorf("invalid item handle %q: %w", item, err) } items = append(items, int32(parsed)) } - return items + return items, nil } func bindCommonFlags(flags *flag.FlagSet) *commonOptions { @@ -650,6 +1015,36 @@ func writeBulkOutput(stdout io.Writer, jsonOutput bool, command string, options return nil } +func writeWriteBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkWriteResult, err error) error { + if err != nil { + return err + } + if jsonOutput { + return writeJSON(stdout, map[string]any{ + "command": command, + "options": options, + "results": results, + }) + } + fmt.Fprintln(stdout, len(results)) + return nil +} + +func writeReadBulkOutput(stdout io.Writer, jsonOutput bool, command string, options commonOptions, results []*mxgateway.BulkReadResult, err error) error { + if err != nil { + return err + } + if jsonOutput { + return writeJSON(stdout, map[string]any{ + "command": command, + "options": options, + "results": results, + }) + } + fmt.Fprintln(stdout, len(results)) + return nil +} + func writeJSON(writer io.Writer, value any) error { encoder := json.NewEncoder(writer) encoder.SetIndent("", " ") @@ -669,7 +1064,7 @@ type protojsonMessage interface { } func writeUsage(writer io.Writer) { - fmt.Fprintln(writer, "usage: mxgw-go ") + fmt.Fprintln(writer, "usage: mxgw-go ") } // batchEOR is the end-of-result sentinel emitted to stdout after every command diff --git a/clients/go/mxgateway/session.go b/clients/go/mxgateway/session.go index 8e00fd1..7165c6f 100644 --- a/clients/go/mxgateway/session.go +++ b/clients/go/mxgateway/session.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "sync" + "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc/codes" @@ -387,6 +388,142 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH return reply.GetUnsubscribeBulk().GetResults(), nil } +// WriteBulk invokes MXAccess Write sequentially for each entry inside one gateway command. +// Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call +// never returns an error for per-entry MXAccess failures (it returns an error only for +// protocol-level failures or transport errors). +func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) { + if entries == nil { + return nil, errors.New("mxgateway: write bulk entries are required") + } + if err := ensureBulkSize("write bulk entries", len(entries)); err != nil { + return nil, err + } + reply, err := s.invokeCommand(ctx, &pb.MxCommand{ + Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK, + Payload: &pb.MxCommand_WriteBulk{ + WriteBulk: &pb.WriteBulkCommand{ + ServerHandle: serverHandle, + Entries: entries, + }, + }, + }) + if err != nil { + return nil, err + } + return reply.GetWriteBulk().GetResults(), nil +} + +// Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command. +func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) { + if entries == nil { + return nil, errors.New("mxgateway: write2 bulk entries are required") + } + if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil { + return nil, err + } + reply, err := s.invokeCommand(ctx, &pb.MxCommand{ + Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK, + Payload: &pb.MxCommand_Write2Bulk{ + Write2Bulk: &pb.Write2BulkCommand{ + ServerHandle: serverHandle, + Entries: entries, + }, + }, + }) + if err != nil { + return nil, err + } + return reply.GetWrite2Bulk().GetResults(), nil +} + +// WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive +// values must not be logged by callers; mirrors the single-item WriteSecured contract. +func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) { + if entries == nil { + return nil, errors.New("mxgateway: write-secured bulk entries are required") + } + if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil { + return nil, err + } + reply, err := s.invokeCommand(ctx, &pb.MxCommand{ + Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK, + Payload: &pb.MxCommand_WriteSecuredBulk{ + WriteSecuredBulk: &pb.WriteSecuredBulkCommand{ + ServerHandle: serverHandle, + Entries: entries, + }, + }, + }) + if err != nil { + return nil, err + } + return reply.GetWriteSecuredBulk().GetResults(), nil +} + +// WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry. +func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) { + if entries == nil { + return nil, errors.New("mxgateway: write-secured2 bulk entries are required") + } + if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil { + return nil, err + } + reply, err := s.invokeCommand(ctx, &pb.MxCommand{ + Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK, + Payload: &pb.MxCommand_WriteSecured2Bulk{ + WriteSecured2Bulk: &pb.WriteSecured2BulkCommand{ + ServerHandle: serverHandle, + Entries: entries, + }, + }, + }) + if err != nil { + return nil, err + } + return reply.GetWriteSecured2Bulk().GetResults(), nil +} + +// ReadBulk snapshots the current value of each requested tag. +// +// MXAccess COM has no synchronous Read; the worker satisfies this by returning the +// most recent cached OnDataChange value when the tag is already advised (WasCached=true), +// or by taking a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle +// otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the +// worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries +// with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures. +func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) { + if tagAddresses == nil { + return nil, errors.New("mxgateway: tag addresses are required") + } + if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { + return nil, err + } + var timeoutMs uint32 + if timeout > 0 { + ms := timeout.Milliseconds() + if ms > int64(^uint32(0)) { + timeoutMs = ^uint32(0) + } else { + timeoutMs = uint32(ms) + } + } + reply, err := s.invokeCommand(ctx, &pb.MxCommand{ + Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK, + Payload: &pb.MxCommand_ReadBulk{ + ReadBulk: &pb.ReadBulkCommand{ + ServerHandle: serverHandle, + TagAddresses: tagAddresses, + TimeoutMs: timeoutMs, + }, + }, + }) + if err != nil { + return nil, err + } + return reply.GetReadBulk().GetResults(), nil +} + // Write invokes MXAccess Write. func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error { _, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID) diff --git a/clients/go/mxgateway/types.go b/clients/go/mxgateway/types.go index b942b9e..1682907 100644 --- a/clients/go/mxgateway/types.go +++ b/clients/go/mxgateway/types.go @@ -70,6 +70,32 @@ type ( WriteCommand = pb.WriteCommand // Write2Command is the payload of an MXAccess Write2 command. Write2Command = pb.Write2Command + // WriteBulkCommand is the payload of a bulk Write command. + WriteBulkCommand = pb.WriteBulkCommand + // WriteBulkEntry is one entry inside a WriteBulkCommand. + WriteBulkEntry = pb.WriteBulkEntry + // Write2BulkCommand is the payload of a bulk Write2 (timestamped) command. + Write2BulkCommand = pb.Write2BulkCommand + // Write2BulkEntry is one entry inside a Write2BulkCommand. + Write2BulkEntry = pb.Write2BulkEntry + // WriteSecuredBulkCommand is the payload of a bulk WriteSecured command. + WriteSecuredBulkCommand = pb.WriteSecuredBulkCommand + // WriteSecuredBulkEntry is one entry inside a WriteSecuredBulkCommand. + WriteSecuredBulkEntry = pb.WriteSecuredBulkEntry + // WriteSecured2BulkCommand is the payload of a bulk WriteSecured2 (timestamped) command. + WriteSecured2BulkCommand = pb.WriteSecured2BulkCommand + // WriteSecured2BulkEntry is one entry inside a WriteSecured2BulkCommand. + WriteSecured2BulkEntry = pb.WriteSecured2BulkEntry + // ReadBulkCommand is the payload of a bulk Read snapshot command. + ReadBulkCommand = pb.ReadBulkCommand + // BulkWriteReply aggregates BulkWriteResult entries for a bulk write command. + BulkWriteReply = pb.BulkWriteReply + // BulkWriteResult is one entry in a bulk write reply list. + BulkWriteResult = pb.BulkWriteResult + // BulkReadReply aggregates BulkReadResult entries for a bulk read command. + BulkReadReply = pb.BulkReadReply + // BulkReadResult is one entry in a bulk read reply list. + BulkReadResult = pb.BulkReadResult // RegisterReply carries the ServerHandle returned by Register. RegisterReply = pb.RegisterReply // AddItemReply carries the ItemHandle returned by AddItem.