diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs new file mode 100644 index 0000000..e7bf85a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -0,0 +1,620 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream_api.go in the NATS server Go source. + +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// Forward stubs for types defined in later sessions +// --------------------------------------------------------------------------- + +/// Stub: full definition in session 20 (stream.go). +public sealed class StreamInfo { } + +/// Stub: full definition in session 20 (consumer.go). +public sealed class ConsumerInfo { } + +/// Stub: stored message type — full definition in session 20. +public sealed class StoredMsg { } + +/// Priority group for pull consumers — full definition in session 20. +public sealed class PriorityGroup { } + +// --------------------------------------------------------------------------- +// API subject constants +// --------------------------------------------------------------------------- + +/// +/// JetStream API subject constants. +/// Mirrors the const block at the top of server/jetstream_api.go. +/// +public static class JsApiSubjects +{ + public const string JsAllApi = "$JS.API.>"; + public const string JsApiPrefix = "$JS.API"; + public const string JsApiAccountInfo = "$JS.API.INFO"; + + public const string JsApiStreamCreate = "$JS.API.STREAM.CREATE.*"; + public const string JsApiStreamCreateT = "$JS.API.STREAM.CREATE.{0}"; + public const string JsApiStreamUpdate = "$JS.API.STREAM.UPDATE.*"; + public const string JsApiStreamUpdateT = "$JS.API.STREAM.UPDATE.{0}"; + public const string JsApiStreams = "$JS.API.STREAM.NAMES"; + public const string JsApiStreamList = "$JS.API.STREAM.LIST"; + public const string JsApiStreamInfo = "$JS.API.STREAM.INFO.*"; + public const string JsApiStreamInfoT = "$JS.API.STREAM.INFO.{0}"; + public const string JsApiStreamDelete = "$JS.API.STREAM.DELETE.*"; + public const string JsApiStreamDeleteT = "$JS.API.STREAM.DELETE.{0}"; + public const string JsApiStreamPurge = "$JS.API.STREAM.PURGE.*"; + public const string JsApiStreamPurgeT = "$JS.API.STREAM.PURGE.{0}"; + public const string JsApiStreamSnapshot = "$JS.API.STREAM.SNAPSHOT.*"; + public const string JsApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.{0}"; + public const string JsApiStreamRestore = "$JS.API.STREAM.RESTORE.*"; + public const string JsApiStreamRestoreT = "$JS.API.STREAM.RESTORE.{0}"; + public const string JsApiMsgDelete = "$JS.API.STREAM.MSG.DELETE.*"; + public const string JsApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.{0}"; + public const string JsApiMsgGet = "$JS.API.STREAM.MSG.GET.*"; + public const string JsApiMsgGetT = "$JS.API.STREAM.MSG.GET.{0}"; + public const string JsDirectMsgGet = "$JS.API.DIRECT.GET.*"; + public const string JsDirectMsgGetT = "$JS.API.DIRECT.GET.{0}"; + public const string JsDirectGetLastBySubject = "$JS.API.DIRECT.GET.*.>"; + public const string JsDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.{0}.{1}"; + + public const string JsApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"; + public const string JsApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.{0}"; + public const string JsApiConsumerCreateEx = "$JS.API.CONSUMER.CREATE.*.>"; + public const string JsApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.{0}.{1}.{2}"; + public const string JsApiDurableCreate = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"; + public const string JsApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.{0}.{1}"; + public const string JsApiConsumers = "$JS.API.CONSUMER.NAMES.*"; + public const string JsApiConsumersT = "$JS.API.CONSUMER.NAMES.{0}"; + public const string JsApiConsumerList = "$JS.API.CONSUMER.LIST.*"; + public const string JsApiConsumerListT = "$JS.API.CONSUMER.LIST.{0}"; + public const string JsApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*"; + public const string JsApiConsumerInfoT = "$JS.API.CONSUMER.INFO.{0}.{1}"; + public const string JsApiConsumerDelete = "$JS.API.CONSUMER.DELETE.*.*"; + public const string JsApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.{0}.{1}"; + public const string JsApiConsumerPause = "$JS.API.CONSUMER.PAUSE.*.*"; + public const string JsApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.{0}.{1}"; + public const string JsApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.{0}.{1}"; + public const string JsApiConsumerResetT = "$JS.API.CONSUMER.RESET.{0}.{1}"; + public const string JsApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"; + public const string JsApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.{0}.{1}"; + + public const string JsApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*"; + public const string JsApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.{0}"; + public const string JsApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*"; + public const string JsApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.{0}"; + public const string JsApiConsumerLeaderStepDown = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"; + public const string JsApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.{0}.{1}"; + public const string JsApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"; + public const string JsApiRemoveServer = "$JS.API.SERVER.REMOVE"; + public const string JsApiAccountPurge = "$JS.API.ACCOUNT.PURGE.*"; + public const string JsApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.{0}"; + public const string JsApiServerStreamMove = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"; + public const string JsApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.{0}.{1}"; + public const string JsApiServerStreamCancelMove = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"; + public const string JsApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.{0}.{1}"; + + // Advisory/metric subjects + public const string JsAdvisoryPrefix = "$JS.EVENT.ADVISORY"; + public const string JsMetricPrefix = "$JS.EVENT.METRIC"; + public const string JsMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"; + public const string JsAdvisoryConsumerMaxDelivery = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"; + public const string JsAdvisoryConsumerMsgNak = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"; + public const string JsAdvisoryConsumerMsgTerminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"; + public const string JsAdvisoryStreamCreated = "$JS.EVENT.ADVISORY.STREAM.CREATED"; + public const string JsAdvisoryStreamDeleted = "$JS.EVENT.ADVISORY.STREAM.DELETED"; + public const string JsAdvisoryStreamUpdated = "$JS.EVENT.ADVISORY.STREAM.UPDATED"; + public const string JsAdvisoryConsumerCreated = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"; + public const string JsAdvisoryConsumerDeleted = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"; + public const string JsAdvisoryConsumerPause = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"; + public const string JsAdvisoryConsumerPinned = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"; + public const string JsAdvisoryConsumerUnpinned = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"; + public const string JsAdvisoryStreamSnapshotCreate = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"; + public const string JsAdvisoryStreamSnapshotComplete = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"; + public const string JsAdvisoryStreamRestoreCreate = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"; + public const string JsAdvisoryStreamRestoreComplete = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"; + public const string JsAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"; + public const string JsAdvisoryStreamLeaderElected = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"; + public const string JsAdvisoryStreamQuorumLost = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"; + public const string JsAdvisoryStreamBatchAbandoned = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"; + public const string JsAdvisoryConsumerLeaderElected = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"; + public const string JsAdvisoryConsumerQuorumLost = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"; + public const string JsAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"; + public const string JsAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"; + public const string JsAdvisoryApiLimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"; + public const string JsAuditAdvisory = "$JS.EVENT.ADVISORY.API"; + + // Response type strings + public const string JsApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"; + public const string JsApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"; + public const string JsApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"; + public const string JsApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"; + public const string JsApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"; + public const string JsApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"; + public const string JsApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"; + public const string JsApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"; + public const string JsApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"; + public const string JsApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"; + public const string JsApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"; + public const string JsApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"; + public const string JsApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"; + public const string JsApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"; + public const string JsApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"; + public const string JsApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"; + public const string JsApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"; + public const string JsApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"; + public const string JsApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"; + public const string JsApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"; + public const string JsApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"; + public const string JsApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"; + public const string JsApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"; + public const string JsApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"; + public const string JsApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"; + public const string JsApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"; + public const string JsApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"; + public const string JsApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"; + + // Limits + public const int JsApiNamesLimit = 1024; + public const int JsApiListLimit = 256; + public const int JsMaxSubjectDetails = 100_000; + public const int JsWaitQueueDefaultMax = 512; + public const int JsMaxDescriptionLen = 4 * 1024; + public const int JsMaxMetadataLen = 128 * 1024; + public const int JsMaxNameLen = 255; + public const int JsDefaultRequestQueueLimit = 10_000; + + // Request headers + public const string JsRequiredApiLevel = "Nats-Required-Api-Level"; +} + +// --------------------------------------------------------------------------- +// Base API types +// --------------------------------------------------------------------------- + +/// +/// Standard base response from the JetStream JSON API. +/// Mirrors ApiResponse in server/jetstream_api.go. +/// +public sealed class ApiResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + + /// Returns the as an exception, or null if none. + public Exception? ToError() => + Error is null ? null : new InvalidOperationException($"{Error.Description} ({Error.ErrCode})"); +} + +/// +/// Paged response metadata included in list responses. +/// Mirrors ApiPaged in server/jetstream_api.go. +/// +public sealed class ApiPaged +{ + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } +} + +/// +/// Request parameters for paged API responses. +/// Mirrors ApiPagedRequest in server/jetstream_api.go. +/// +public sealed class ApiPagedRequest +{ + [JsonPropertyName("offset")] public int Offset { get; set; } +} + +// --------------------------------------------------------------------------- +// Account +// --------------------------------------------------------------------------- + +/// Account info response. +public sealed class JsApiAccountInfoResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + + // JetStreamAccountStats fields (embedded in Go) + [JsonPropertyName("memory")] public ulong Memory { get; set; } + [JsonPropertyName("storage")] public ulong Store { get; set; } + [JsonPropertyName("reserved_memory")] public ulong ReservedMemory { get; set; } + [JsonPropertyName("reserved_storage")]public ulong ReservedStore { get; set; } + [JsonPropertyName("streams")] public int Streams { get; set; } + [JsonPropertyName("consumers")] public int Consumers { get; set; } + [JsonPropertyName("limits")] public JetStreamAccountLimits Limits { get; set; } = new(); + [JsonPropertyName("domain")] public string? Domain { get; set; } + [JsonPropertyName("api")] public JetStreamApiStats Api { get; set; } = new(); + [JsonPropertyName("tiers")] public Dictionary? Tiers { get; set; } +} + +// --------------------------------------------------------------------------- +// Stream API types +// --------------------------------------------------------------------------- + +/// Stream creation response. +public sealed class JsApiStreamCreateResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("config")] public StreamConfig? Config { get; set; } + [JsonPropertyName("state")] public StreamState? State { get; set; } + [JsonPropertyName("did_create")] public bool DidCreate { get; set; } +} + +/// Stream deletion response. +public sealed class JsApiStreamDeleteResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Stream info request with optional filtering. +public sealed class JsApiStreamInfoRequest +{ + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("deleted_details")] public bool DeletedDetails { get; set; } + [JsonPropertyName("subjects_filter")] public string? SubjectsFilter { get; set; } +} + +/// Stream info response. +public sealed class JsApiStreamInfoResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } + // StreamInfo fields embedded — delegated to StreamInfo stub for now + public StreamInfo? Info { get; set; } +} + +/// Stream names list request. +public sealed class JsApiStreamNamesRequest +{ + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("subject")] public string? Subject { get; set; } +} + +/// Stream names list response. +public sealed class JsApiStreamNamesResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } + [JsonPropertyName("streams")] public List? Streams { get; set; } +} + +/// Detailed stream list request. +public sealed class JsApiStreamListRequest +{ + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("subject")] public string? Subject { get; set; } +} + +/// Detailed stream list response. +public sealed class JsApiStreamListResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } + [JsonPropertyName("streams")] public List? Streams { get; set; } + [JsonPropertyName("missing")] public List? Missing { get; set; } + [JsonPropertyName("offline")] public Dictionary? Offline { get; set; } +} + +/// Stream purge request. +public sealed class JsApiStreamPurgeRequest +{ + [JsonPropertyName("seq")] public ulong Sequence { get; set; } + [JsonPropertyName("filter")] public string? Subject { get; set; } + [JsonPropertyName("keep")] public ulong Keep { get; set; } +} + +/// Stream purge response. +public sealed class JsApiStreamPurgeResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } + [JsonPropertyName("purged")] public ulong Purged { get; set; } +} + +/// Stream update response. +public sealed class JsApiStreamUpdateResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + public StreamInfo? Info { get; set; } +} + +/// Message deletion request. +public sealed class JsApiMsgDeleteRequest +{ + [JsonPropertyName("seq")] public ulong Seq { get; set; } + [JsonPropertyName("no_erase")] public bool NoErase { get; set; } +} + +/// Message deletion response. +public sealed class JsApiMsgDeleteResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Stream snapshot request. +public sealed class JsApiStreamSnapshotRequest +{ + [JsonPropertyName("deliver_subject")] public string DeliverSubject { get; set; } = string.Empty; + [JsonPropertyName("no_consumers")] public bool NoConsumers { get; set; } + [JsonPropertyName("chunk_size")] public int ChunkSize { get; set; } + [JsonPropertyName("window_size")] public int WindowSize { get; set; } + [JsonPropertyName("jsck")] public bool CheckMsgs { get; set; } +} + +/// Direct stream snapshot response. +public sealed class JsApiStreamSnapshotResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("config")] public StreamConfig? Config { get; set; } + [JsonPropertyName("state")] public StreamState? State { get; set; } +} + +/// Stream restore request. +public sealed class JsApiStreamRestoreRequest +{ + [JsonPropertyName("config")] public StreamConfig Config { get; set; } = new(); + [JsonPropertyName("state")] public StreamState State { get; set; } = new(); +} + +/// Stream restore response. +public sealed class JsApiStreamRestoreResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("deliver_subject")] public string DeliverSubject { get; set; } = string.Empty; +} + +/// Remove a peer from a stream. +public sealed class JsApiStreamRemovePeerRequest +{ + [JsonPropertyName("peer")] public string Peer { get; set; } = string.Empty; +} + +/// Response to remove peer request. +public sealed class JsApiStreamRemovePeerResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Response to stream leader step-down. +public sealed class JsApiStreamLeaderStepDownResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Response to consumer leader step-down. +public sealed class JsApiConsumerLeaderStepDownResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Meta-leader step-down request with optional placement. +public sealed class JsApiLeaderStepdownRequest +{ + [JsonPropertyName("placement")] public Placement? Placement { get; set; } +} + +/// Response to meta-leader step-down. +public sealed class JsApiLeaderStepDownResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Remove a peer server from the meta group. +public sealed class JsApiMetaServerRemoveRequest +{ + [JsonPropertyName("peer")] public string Server { get; set; } = string.Empty; + [JsonPropertyName("peer_id")] public string? PeerId { get; set; } +} + +/// Response to peer removal. +public sealed class JsApiMetaServerRemoveResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Request to move a stream off a server. +public sealed class JsApiMetaServerStreamMoveRequest +{ + [JsonPropertyName("server")] public string? Server { get; set; } + [JsonPropertyName("cluster")] public string? Cluster { get; set; } + [JsonPropertyName("domain")] public string? Domain { get; set; } + [JsonPropertyName("tags")] public string[]? Tags { get; set; } +} + +/// Account purge response. +public sealed class JsApiAccountPurgeResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("initiated")] public bool Initiated { get; set; } +} + +// --------------------------------------------------------------------------- +// Message get +// --------------------------------------------------------------------------- + +/// +/// Direct message get request (by sequence, last-for-subject, next-for-subject, or batch). +/// Mirrors JSApiMsgGetRequest in server/jetstream_api.go. +/// +public sealed class JsApiMsgGetRequest +{ + [JsonPropertyName("seq")] public ulong Seq { get; set; } + [JsonPropertyName("last_by_subj")] public string? LastFor { get; set; } + [JsonPropertyName("next_by_subj")] public string? NextFor { get; set; } + [JsonPropertyName("batch")] public int Batch { get; set; } + [JsonPropertyName("max_bytes")] public int MaxBytes { get; set; } + [JsonPropertyName("start_time")] public DateTime? StartTime { get; set; } + [JsonPropertyName("multi_last")] public string[]? MultiLastFor { get; set; } + [JsonPropertyName("up_to_seq")] public ulong UpToSeq { get; set; } + [JsonPropertyName("up_to_time")] public DateTime? UpToTime { get; set; } + [JsonPropertyName("no_hdr")] public bool NoHeaders { get; set; } +} + +/// Message get response. +public sealed class JsApiMsgGetResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("message")] public StoredMsg? Message { get; set; } +} + +// --------------------------------------------------------------------------- +// Consumer API types +// --------------------------------------------------------------------------- + +/// Consumer create/update response. +public sealed class JsApiConsumerCreateResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + public ConsumerInfo? Info { get; set; } +} + +/// Consumer delete response. +public sealed class JsApiConsumerDeleteResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } +} + +/// Consumer pause request. +public sealed class JsApiConsumerPauseRequest +{ + [JsonPropertyName("pause_until")] public DateTime PauseUntil { get; set; } +} + +/// Consumer pause response. +public sealed class JsApiConsumerPauseResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("paused")] public bool Paused { get; set; } + [JsonPropertyName("pause_until")] public DateTime PauseUntil { get; set; } + [JsonPropertyName("pause_remaining")] public TimeSpan PauseRemaining { get; set; } +} + +/// Consumer info response. +public sealed class JsApiConsumerInfoResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + public ConsumerInfo? Info { get; set; } +} + +/// Consumer names request (paged). +public sealed class JsApiConsumersRequest +{ + [JsonPropertyName("offset")] public int Offset { get; set; } +} + +/// Consumer names list response. +public sealed class JsApiConsumerNamesResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } + [JsonPropertyName("consumers")] public List? Consumers { get; set; } +} + +/// Consumer list response. +public sealed class JsApiConsumerListResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("total")] public int Total { get; set; } + [JsonPropertyName("offset")] public int Offset { get; set; } + [JsonPropertyName("limit")] public int Limit { get; set; } + [JsonPropertyName("consumers")] public List? Consumers { get; set; } + [JsonPropertyName("missing")] public List? Missing { get; set; } + [JsonPropertyName("offline")] public Dictionary? Offline { get; set; } +} + +/// +/// Pull consumer next-message request. +/// Mirrors JSApiConsumerGetNextRequest in server/jetstream_api.go. +/// +public sealed class JsApiConsumerGetNextRequest +{ + [JsonPropertyName("expires")] public TimeSpan Expires { get; set; } + [JsonPropertyName("batch")] public int Batch { get; set; } + [JsonPropertyName("max_bytes")] public int MaxBytes { get; set; } + [JsonPropertyName("no_wait")] public bool NoWait { get; set; } + [JsonPropertyName("idle_heartbeat")] public TimeSpan Heartbeat { get; set; } + public PriorityGroup? Priority { get; set; } +} + +/// Consumer reset (seek to sequence) request. +public sealed class JsApiConsumerResetRequest +{ + [JsonPropertyName("seq")] public ulong Seq { get; set; } +} + +/// Consumer reset response. +public sealed class JsApiConsumerResetResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } + [JsonPropertyName("reset_seq")] public ulong ResetSeq { get; set; } + public ConsumerInfo? Info { get; set; } +} + +/// Consumer unpin (priority group) request. +public sealed class JsApiConsumerUnpinRequest +{ + [JsonPropertyName("group")] public string Group { get; set; } = string.Empty; +} + +/// Consumer unpin response. +public sealed class JsApiConsumerUnpinResponse +{ + [JsonPropertyName("type")] public string? Type { get; set; } + [JsonPropertyName("error")] public JsApiError? Error { get; set; } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs new file mode 100644 index 0000000..53ca9e7 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs @@ -0,0 +1,132 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream_batching.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// Batching types +// --------------------------------------------------------------------------- + +/// +/// Tracks in-progress atomic publish batch groups for a stream. +/// Mirrors the batching struct in server/jetstream_batching.go. +/// +internal sealed class Batching +{ + private readonly Lock _mu = new(); + private readonly Dictionary _group = new(StringComparer.Ordinal); + + public Lock Mu => _mu; + public Dictionary Group => _group; +} + +/// +/// A single in-progress atomic batch: its temporary store and cleanup timer. +/// Mirrors batchGroup in server/jetstream_batching.go. +/// +internal sealed class BatchGroup +{ + /// Last proposed stream sequence for this batch. + public ulong LastSeq { get; set; } + + /// Temporary backing store for the batch's messages. + public object? Store { get; set; } // IStreamStore — session 20 + + /// Timer that abandons the batch after the configured timeout. + public Timer? BatchTimer { get; set; } + + /// + /// Stops the cleanup timer and flushes pending writes so the batch is + /// ready to be committed. + /// Mirrors batchGroup.readyForCommit. + /// + public bool ReadyForCommit() + { + // Stub — full implementation requires IStreamStore.FlushAllPending (session 20). + return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) != null; + } +} + +/// +/// Stages consistency-check data for a single atomic batch before it is committed. +/// Mirrors batchStagedDiff in server/jetstream_batching.go. +/// +internal sealed class BatchStagedDiff +{ + /// Message IDs seen in this batch, for duplicate detection. + public Dictionary? MsgIds { get; set; } + + /// Running counter totals, keyed by subject. + public Dictionary? Counter { get; set; } // map[string]*msgCounterRunningTotal + + /// Inflight subject byte/op totals for DiscardNew checks. + public Dictionary? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal + + /// Expected-last-seq-per-subject checks staged in this batch. + public Dictionary? ExpectedPerSubject { get; set; } +} + +/// +/// Cached expected-last-sequence-per-subject result for a single subject within a batch. +/// Mirrors batchExpectedPerSubject in server/jetstream_batching.go. +/// +internal sealed class BatchExpectedPerSubject +{ + /// Stream sequence of the last message on this subject at proposal time. + public ulong SSeq { get; set; } + + /// Clustered proposal sequence at which this check was computed. + public ulong ClSeq { get; set; } +} + +/// +/// Tracks the in-progress application of a committed batch on the Raft apply path. +/// Mirrors batchApply in server/jetstream_batching.go. +/// +internal sealed class BatchApply +{ + private readonly Lock _mu = new(); + + /// ID of the current batch. + public string Id { get; set; } = string.Empty; + + /// Number of entries expected in the batch (for consistency checks). + public ulong Count { get; set; } + + /// Raft committed entries that make up this batch. + public List? Entries { get; set; } // []*CommittedEntry — session 20+ + + /// Index within an entry indicating the first message of the batch. + public int EntryStart { get; set; } + + /// Applied value before the entry containing the first batch message. + public ulong MaxApplied { get; set; } + + public Lock Mu => _mu; + + /// + /// Clears in-memory apply-batch state. + /// Mirrors batchApply.clearBatchStateLocked. + /// Lock should be held. + /// + public void ClearBatchStateLocked() + { + Id = string.Empty; + Count = 0; + Entries = null; + EntryStart = 0; + MaxApplied = 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs new file mode 100644 index 0000000..5ea497e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs @@ -0,0 +1,323 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream_errors.go and server/jetstream_errors_generated.go +// in the NATS server Go source. + +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// JsApiError +// --------------------------------------------------------------------------- + +/// +/// Included in all JetStream API responses when there was an error. +/// Mirrors ApiError in server/jetstream_errors.go. +/// +public sealed class JsApiError +{ + [JsonPropertyName("code")] public int Code { get; set; } + [JsonPropertyName("err_code")] public ushort ErrCode { get; set; } + [JsonPropertyName("description")] public string? Description { get; set; } + + public override string ToString() => $"{Description} ({ErrCode})"; +} + +// --------------------------------------------------------------------------- +// JsApiErrors — all 203 error code constants +// --------------------------------------------------------------------------- + +/// +/// Pre-built instances for all JetStream error codes. +/// Mirrors the ApiErrors map in server/jetstream_errors_generated.go. +/// +public static class JsApiErrors +{ + // ---- Account ---- + public static readonly JsApiError AccountResourcesExceeded = new() { Code = 400, ErrCode = 10002, Description = "resource limits exceeded for account" }; + + // ---- Atomic Publish ---- + public static readonly JsApiError AtomicPublishContainsDuplicateMessage = new() { Code = 400, ErrCode = 10201, Description = "atomic publish batch contains duplicate message id" }; + public static readonly JsApiError AtomicPublishDisabled = new() { Code = 400, ErrCode = 10174, Description = "atomic publish is disabled" }; + public static readonly JsApiError AtomicPublishIncompleteBatch = new() { Code = 400, ErrCode = 10176, Description = "atomic publish batch is incomplete" }; + public static readonly JsApiError AtomicPublishInvalidBatchCommit = new() { Code = 400, ErrCode = 10200, Description = "atomic publish batch commit is invalid" }; + public static readonly JsApiError AtomicPublishInvalidBatchID = new() { Code = 400, ErrCode = 10179, Description = "atomic publish batch ID is invalid" }; + public static readonly JsApiError AtomicPublishMissingSeq = new() { Code = 400, ErrCode = 10175, Description = "atomic publish sequence is missing" }; + public static readonly JsApiError AtomicPublishTooLargeBatch = new() { Code = 400, ErrCode = 10199, Description = "atomic publish batch is too large: {size}" }; + public static readonly JsApiError AtomicPublishUnsupportedHeaderBatch = new() { Code = 400, ErrCode = 10177, Description = "atomic publish unsupported header used: {header}" }; + + // ---- General ---- + public static readonly JsApiError BadRequest = new() { Code = 400, ErrCode = 10003, Description = "bad request" }; + + // ---- Cluster ---- + public static readonly JsApiError ClusterIncomplete = new() { Code = 503, ErrCode = 10004, Description = "incomplete results" }; + public static readonly JsApiError ClusterNoPeers = new() { Code = 400, ErrCode = 10005, Description = "{err}" }; + public static readonly JsApiError ClusterNotActive = new() { Code = 500, ErrCode = 10006, Description = "JetStream not in clustered mode" }; + public static readonly JsApiError ClusterNotAssigned = new() { Code = 500, ErrCode = 10007, Description = "JetStream cluster not assigned to this server" }; + public static readonly JsApiError ClusterNotAvail = new() { Code = 503, ErrCode = 10008, Description = "JetStream system temporarily unavailable" }; + public static readonly JsApiError ClusterNotLeader = new() { Code = 500, ErrCode = 10009, Description = "JetStream cluster can not handle request" }; + public static readonly JsApiError ClusterPeerNotMember = new() { Code = 400, ErrCode = 10040, Description = "peer not a member" }; + public static readonly JsApiError ClusterRequired = new() { Code = 503, ErrCode = 10010, Description = "JetStream clustering support required" }; + public static readonly JsApiError ClusterServerMemberChangeInflight = new() { Code = 400, ErrCode = 10202, Description = "cluster member change is in progress" }; + public static readonly JsApiError ClusterServerNotMember = new() { Code = 400, ErrCode = 10044, Description = "server is not a member of the cluster" }; + public static readonly JsApiError ClusterTags = new() { Code = 400, ErrCode = 10011, Description = "tags placement not supported for operation" }; + public static readonly JsApiError ClusterUnSupportFeature = new() { Code = 503, ErrCode = 10036, Description = "not currently supported in clustered mode" }; + + // ---- Consumer ---- + public static readonly JsApiError ConsumerAckPolicyInvalid = new() { Code = 400, ErrCode = 10181, Description = "consumer ack policy invalid" }; + public static readonly JsApiError ConsumerAckWaitNegative = new() { Code = 400, ErrCode = 10183, Description = "consumer ack wait needs to be positive" }; + public static readonly JsApiError ConsumerAlreadyExists = new() { Code = 400, ErrCode = 10148, Description = "consumer already exists" }; + public static readonly JsApiError ConsumerBackOffNegative = new() { Code = 400, ErrCode = 10184, Description = "consumer backoff needs to be positive" }; + public static readonly JsApiError ConsumerBadDurableName = new() { Code = 400, ErrCode = 10103, Description = "durable name can not contain '.', '*', '>'" }; + public static readonly JsApiError ConsumerConfigRequired = new() { Code = 400, ErrCode = 10078, Description = "consumer config required" }; + public static readonly JsApiError ConsumerCreateDurableAndNameMismatch = new() { Code = 400, ErrCode = 10132, Description = "Consumer Durable and Name have to be equal if both are provided" }; + public static readonly JsApiError ConsumerCreateErr = new() { Code = 500, ErrCode = 10012, Description = "{err}" }; + public static readonly JsApiError ConsumerCreateFilterSubjectMismatch = new() { Code = 400, ErrCode = 10131, Description = "Consumer create request did not match filtered subject from create subject" }; + public static readonly JsApiError ConsumerDeliverCycle = new() { Code = 400, ErrCode = 10081, Description = "consumer deliver subject forms a cycle" }; + public static readonly JsApiError ConsumerDeliverToWildcards = new() { Code = 400, ErrCode = 10079, Description = "consumer deliver subject has wildcards" }; + public static readonly JsApiError ConsumerDescriptionTooLong = new() { Code = 400, ErrCode = 10107, Description = "consumer description is too long, maximum allowed is {max}" }; + public static readonly JsApiError ConsumerDirectRequiresEphemeral = new() { Code = 400, ErrCode = 10091, Description = "consumer direct requires an ephemeral consumer" }; + public static readonly JsApiError ConsumerDirectRequiresPush = new() { Code = 400, ErrCode = 10090, Description = "consumer direct requires a push based consumer" }; + public static readonly JsApiError ConsumerDoesNotExist = new() { Code = 400, ErrCode = 10149, Description = "consumer does not exist" }; + public static readonly JsApiError ConsumerDuplicateFilterSubjects = new() { Code = 400, ErrCode = 10136, Description = "consumer cannot have both FilterSubject and FilterSubjects specified" }; + public static readonly JsApiError ConsumerDurableNameNotInSubject = new() { Code = 400, ErrCode = 10016, Description = "consumer expected to be durable but no durable name set in subject" }; + public static readonly JsApiError ConsumerDurableNameNotMatchSubject = new() { Code = 400, ErrCode = 10017, Description = "consumer name in subject does not match durable name in request" }; + public static readonly JsApiError ConsumerDurableNameNotSet = new() { Code = 400, ErrCode = 10018, Description = "consumer expected to be durable but a durable name was not set" }; + public static readonly JsApiError ConsumerEmptyFilter = new() { Code = 400, ErrCode = 10139, Description = "consumer filter in FilterSubjects cannot be empty" }; + public static readonly JsApiError ConsumerEmptyGroupName = new() { Code = 400, ErrCode = 10161, Description = "Group name cannot be an empty string" }; + public static readonly JsApiError ConsumerEphemeralWithDurableInSubject = new() { Code = 400, ErrCode = 10019, Description = "consumer expected to be ephemeral but detected a durable name set in subject" }; + public static readonly JsApiError ConsumerEphemeralWithDurableName = new() { Code = 400, ErrCode = 10020, Description = "consumer expected to be ephemeral but a durable name was set in request" }; + public static readonly JsApiError ConsumerExistingActive = new() { Code = 400, ErrCode = 10105, Description = "consumer already exists and is still active" }; + public static readonly JsApiError ConsumerFCRequiresPush = new() { Code = 400, ErrCode = 10089, Description = "consumer flow control requires a push based consumer" }; + public static readonly JsApiError ConsumerFilterNotSubset = new() { Code = 400, ErrCode = 10093, Description = "consumer filter subject is not a valid subset of the interest subjects" }; + public static readonly JsApiError ConsumerHBRequiresPush = new() { Code = 400, ErrCode = 10088, Description = "consumer idle heartbeat requires a push based consumer" }; + public static readonly JsApiError ConsumerInactiveThresholdExcess = new() { Code = 400, ErrCode = 10153, Description = "consumer inactive threshold exceeds system limit of {limit}" }; + public static readonly JsApiError ConsumerInvalidDeliverSubject = new() { Code = 400, ErrCode = 10112, Description = "invalid push consumer deliver subject" }; + public static readonly JsApiError ConsumerInvalidGroupName = new() { Code = 400, ErrCode = 10162, Description = "Valid priority group name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters" }; + public static readonly JsApiError ConsumerInvalidPolicy = new() { Code = 400, ErrCode = 10094, Description = "{err}" }; + public static readonly JsApiError ConsumerInvalidPriorityGroup = new() { Code = 400, ErrCode = 10160, Description = "Provided priority group does not exist for this consumer" }; + public static readonly JsApiError ConsumerInvalidReset = new() { Code = 400, ErrCode = 10204, Description = "invalid reset: {err}" }; + public static readonly JsApiError ConsumerInvalidSampling = new() { Code = 400, ErrCode = 10095, Description = "failed to parse consumer sampling configuration: {err}" }; + public static readonly JsApiError ConsumerMaxDeliverBackoff = new() { Code = 400, ErrCode = 10116, Description = "max deliver is required to be > length of backoff values" }; + public static readonly JsApiError ConsumerMaxPendingAckExcess = new() { Code = 400, ErrCode = 10121, Description = "consumer max ack pending exceeds system limit of {limit}" }; + public static readonly JsApiError ConsumerMaxPendingAckPolicyRequired = new() { Code = 400, ErrCode = 10082, Description = "consumer requires ack policy for max ack pending" }; + public static readonly JsApiError ConsumerMaxRequestBatchExceeded = new() { Code = 400, ErrCode = 10125, Description = "consumer max request batch exceeds server limit of {limit}" }; + public static readonly JsApiError ConsumerMaxRequestBatchNegative = new() { Code = 400, ErrCode = 10114, Description = "consumer max request batch needs to be > 0" }; + public static readonly JsApiError ConsumerMaxRequestExpiresTooSmall = new() { Code = 400, ErrCode = 10115, Description = "consumer max request expires needs to be >= 1ms" }; + public static readonly JsApiError ConsumerMaxWaitingNegative = new() { Code = 400, ErrCode = 10087, Description = "consumer max waiting needs to be positive" }; + public static readonly JsApiError ConsumerMetadataLength = new() { Code = 400, ErrCode = 10135, Description = "consumer metadata exceeds maximum size of {limit}" }; + public static readonly JsApiError ConsumerMultipleFiltersNotAllowed = new() { Code = 400, ErrCode = 10137, Description = "consumer with multiple subject filters cannot use subject based API" }; + public static readonly JsApiError ConsumerNameContainsPathSeparators = new() { Code = 400, ErrCode = 10127, Description = "Consumer name can not contain path separators" }; + public static readonly JsApiError ConsumerNameExist = new() { Code = 400, ErrCode = 10013, Description = "consumer name already in use" }; + public static readonly JsApiError ConsumerNameTooLong = new() { Code = 400, ErrCode = 10102, Description = "consumer name is too long, maximum allowed is {max}" }; + public static readonly JsApiError ConsumerNotFound = new() { Code = 404, ErrCode = 10014, Description = "consumer not found" }; + public static readonly JsApiError ConsumerOffline = new() { Code = 500, ErrCode = 10119, Description = "consumer is offline" }; + public static readonly JsApiError ConsumerOfflineReason = new() { Code = 500, ErrCode = 10195, Description = "consumer is offline: {err}" }; + public static readonly JsApiError ConsumerOnMapped = new() { Code = 400, ErrCode = 10092, Description = "consumer direct on a mapped consumer" }; + public static readonly JsApiError ConsumerOverlappingSubjectFilters = new() { Code = 400, ErrCode = 10138, Description = "consumer subject filters cannot overlap" }; + public static readonly JsApiError ConsumerPinnedTTLWithoutPriorityPolicyNone = new() { Code = 400, ErrCode = 10197, Description = "PinnedTTL cannot be set when PriorityPolicy is none" }; + public static readonly JsApiError ConsumerPriorityGroupWithPolicyNone = new() { Code = 400, ErrCode = 10196, Description = "consumer can not have priority groups when policy is none" }; + public static readonly JsApiError ConsumerPriorityPolicyWithoutGroup = new() { Code = 400, ErrCode = 10159, Description = "Setting PriorityPolicy requires at least one PriorityGroup to be set" }; + public static readonly JsApiError ConsumerPullNotDurable = new() { Code = 400, ErrCode = 10085, Description = "consumer in pull mode requires a durable name" }; + public static readonly JsApiError ConsumerPullRequiresAck = new() { Code = 400, ErrCode = 10084, Description = "consumer in pull mode requires explicit ack policy on workqueue stream" }; + public static readonly JsApiError ConsumerPullWithRateLimit = new() { Code = 400, ErrCode = 10086, Description = "consumer in pull mode can not have rate limit set" }; + public static readonly JsApiError ConsumerPushMaxWaiting = new() { Code = 400, ErrCode = 10080, Description = "consumer in push mode can not set max waiting" }; + public static readonly JsApiError ConsumerPushWithPriorityGroup = new() { Code = 400, ErrCode = 10178, Description = "priority groups can not be used with push consumers" }; + public static readonly JsApiError ConsumerReplacementWithDifferentName = new() { Code = 400, ErrCode = 10106, Description = "consumer replacement durable config not the same" }; + public static readonly JsApiError ConsumerReplayPolicyInvalid = new() { Code = 400, ErrCode = 10182, Description = "consumer replay policy invalid" }; + public static readonly JsApiError ConsumerReplicasExceedsStream = new() { Code = 400, ErrCode = 10126, Description = "consumer config replica count exceeds parent stream" }; + public static readonly JsApiError ConsumerReplicasShouldMatchStream = new() { Code = 400, ErrCode = 10134, Description = "consumer config replicas must match interest retention stream's replicas" }; + public static readonly JsApiError ConsumerSmallHeartbeat = new() { Code = 400, ErrCode = 10083, Description = "consumer idle heartbeat needs to be >= 100ms" }; + public static readonly JsApiError ConsumerStoreFailed = new() { Code = 500, ErrCode = 10104, Description = "error creating store for consumer: {err}" }; + public static readonly JsApiError ConsumerWQConsumerNotDeliverAll = new() { Code = 400, ErrCode = 10101, Description = "consumer must be deliver all on workqueue stream" }; + public static readonly JsApiError ConsumerWQConsumerNotUnique = new() { Code = 400, ErrCode = 10100, Description = "filtered consumer not unique on workqueue stream" }; + public static readonly JsApiError ConsumerWQMultipleUnfiltered = new() { Code = 400, ErrCode = 10099, Description = "multiple non-filtered consumers not allowed on workqueue stream" }; + public static readonly JsApiError ConsumerWQRequiresExplicitAck = new() { Code = 400, ErrCode = 10098, Description = "workqueue stream requires explicit ack" }; + public static readonly JsApiError ConsumerWithFlowControlNeedsHeartbeats = new() { Code = 400, ErrCode = 10108, Description = "consumer with flow control also needs heartbeats" }; + + // ---- Resources ---- + public static readonly JsApiError InsufficientResources = new() { Code = 503, ErrCode = 10023, Description = "insufficient resources" }; + public static readonly JsApiError InvalidJSON = new() { Code = 400, ErrCode = 10025, Description = "invalid JSON: {err}" }; + public static readonly JsApiError MaximumConsumersLimit = new() { Code = 400, ErrCode = 10026, Description = "maximum consumers limit reached" }; + public static readonly JsApiError MaximumStreamsLimit = new() { Code = 400, ErrCode = 10027, Description = "maximum number of streams reached" }; + public static readonly JsApiError MemoryResourcesExceeded = new() { Code = 500, ErrCode = 10028, Description = "insufficient memory resources available" }; + + // ---- Message Counter ---- + public static readonly JsApiError MessageCounterBroken = new() { Code = 400, ErrCode = 10172, Description = "message counter is broken" }; + public static readonly JsApiError MessageIncrDisabled = new() { Code = 400, ErrCode = 10168, Description = "message counters is disabled" }; + public static readonly JsApiError MessageIncrInvalid = new() { Code = 400, ErrCode = 10171, Description = "message counter increment is invalid" }; + public static readonly JsApiError MessageIncrMissing = new() { Code = 400, ErrCode = 10169, Description = "message counter increment is missing" }; + public static readonly JsApiError MessageIncrPayload = new() { Code = 400, ErrCode = 10170, Description = "message counter has payload" }; + + // ---- Message Schedules ---- + public static readonly JsApiError MessageSchedulesDisabled = new() { Code = 400, ErrCode = 10188, Description = "message schedules is disabled" }; + public static readonly JsApiError MessageSchedulesPatternInvalid = new() { Code = 400, ErrCode = 10189, Description = "message schedules pattern is invalid" }; + public static readonly JsApiError MessageSchedulesRollupInvalid = new() { Code = 400, ErrCode = 10192, Description = "message schedules invalid rollup" }; + public static readonly JsApiError MessageSchedulesSourceInvalid = new() { Code = 400, ErrCode = 10203, Description = "message schedules source is invalid" }; + public static readonly JsApiError MessageSchedulesTTLInvalid = new() { Code = 400, ErrCode = 10191, Description = "message schedules invalid per-message TTL" }; + public static readonly JsApiError MessageSchedulesTargetInvalid = new() { Code = 400, ErrCode = 10190, Description = "message schedules target is invalid" }; + + // ---- Message TTL ---- + public static readonly JsApiError MessageTTLDisabled = new() { Code = 400, ErrCode = 10166, Description = "per-message TTL is disabled" }; + public static readonly JsApiError MessageTTLInvalid = new() { Code = 400, ErrCode = 10165, Description = "invalid per-message TTL" }; + + // ---- Mirror ---- + public static readonly JsApiError MirrorConsumerSetupFailed = new() { Code = 500, ErrCode = 10029, Description = "{err}" }; + public static readonly JsApiError MirrorInvalidStreamName = new() { Code = 400, ErrCode = 10142, Description = "mirrored stream name is invalid" }; + public static readonly JsApiError MirrorInvalidSubjectFilter = new() { Code = 400, ErrCode = 10151, Description = "mirror transform source: {err}" }; + public static readonly JsApiError MirrorInvalidTransformDestination = new() { Code = 400, ErrCode = 10154, Description = "mirror transform: {err}" }; + public static readonly JsApiError MirrorMaxMessageSizeTooBig = new() { Code = 400, ErrCode = 10030, Description = "stream mirror must have max message size >= source" }; + public static readonly JsApiError MirrorMultipleFiltersNotAllowed = new() { Code = 400, ErrCode = 10150, Description = "mirror with multiple subject transforms cannot also have a single subject filter" }; + public static readonly JsApiError MirrorOverlappingSubjectFilters = new() { Code = 400, ErrCode = 10152, Description = "mirror subject filters can not overlap" }; + public static readonly JsApiError MirrorWithAtomicPublish = new() { Code = 400, ErrCode = 10198, Description = "stream mirrors can not also use atomic publishing" }; + public static readonly JsApiError MirrorWithCounters = new() { Code = 400, ErrCode = 10173, Description = "stream mirrors can not also calculate counters" }; + public static readonly JsApiError MirrorWithFirstSeq = new() { Code = 400, ErrCode = 10143, Description = "stream mirrors can not have first sequence configured" }; + public static readonly JsApiError MirrorWithMsgSchedules = new() { Code = 400, ErrCode = 10186, Description = "stream mirrors can not also schedule messages" }; + public static readonly JsApiError MirrorWithSources = new() { Code = 400, ErrCode = 10031, Description = "stream mirrors can not also contain other sources" }; + public static readonly JsApiError MirrorWithStartSeqAndTime = new() { Code = 400, ErrCode = 10032, Description = "stream mirrors can not have both start seq and start time configured" }; + public static readonly JsApiError MirrorWithSubjectFilters = new() { Code = 400, ErrCode = 10033, Description = "stream mirrors can not contain filtered subjects" }; + public static readonly JsApiError MirrorWithSubjects = new() { Code = 400, ErrCode = 10034, Description = "stream mirrors can not contain subjects" }; + + // ---- Misc ---- + public static readonly JsApiError NoAccount = new() { Code = 503, ErrCode = 10035, Description = "account not found" }; + public static readonly JsApiError NoLimits = new() { Code = 400, ErrCode = 10120, Description = "no JetStream default or applicable tiered limit present" }; + public static readonly JsApiError NoMessageFound = new() { Code = 404, ErrCode = 10037, Description = "no message found" }; + public static readonly JsApiError NotEmptyRequest = new() { Code = 400, ErrCode = 10038, Description = "expected an empty request payload" }; + public static readonly JsApiError NotEnabled = new() { Code = 503, ErrCode = 10076, Description = "JetStream not enabled" }; + public static readonly JsApiError NotEnabledForAccount = new() { Code = 503, ErrCode = 10039, Description = "JetStream not enabled for account" }; + public static readonly JsApiError Pedantic = new() { Code = 400, ErrCode = 10157, Description = "pedantic mode: {err}" }; + public static readonly JsApiError PeerRemap = new() { Code = 503, ErrCode = 10075, Description = "peer remap failed" }; + + // ---- RAFT ---- + public static readonly JsApiError RaftGeneralErr = new() { Code = 500, ErrCode = 10041, Description = "{err}" }; + + // ---- Replicas ---- + public static readonly JsApiError ReplicasCountCannotBeNegative = new() { Code = 400, ErrCode = 10133, Description = "replicas count cannot be negative" }; + public static readonly JsApiError RequiredApiLevel = new() { Code = 412, ErrCode = 10185, Description = "JetStream minimum api level required" }; + + // ---- Restore ---- + public static readonly JsApiError RestoreSubscribeFailed = new() { Code = 500, ErrCode = 10042, Description = "JetStream unable to subscribe to restore snapshot {subject}: {err}" }; + + // ---- Sequence ---- + public static readonly JsApiError SequenceNotFound = new() { Code = 400, ErrCode = 10043, Description = "sequence {seq} not found" }; + public static readonly JsApiError SnapshotDeliverSubjectInvalid = new() { Code = 400, ErrCode = 10015, Description = "deliver subject not valid" }; + + // ---- Source ---- + public static readonly JsApiError SourceConsumerSetupFailed = new() { Code = 500, ErrCode = 10045, Description = "{err}" }; + public static readonly JsApiError SourceDuplicateDetected = new() { Code = 400, ErrCode = 10140, Description = "duplicate source configuration detected" }; + public static readonly JsApiError SourceInvalidStreamName = new() { Code = 400, ErrCode = 10141, Description = "sourced stream name is invalid" }; + public static readonly JsApiError SourceInvalidSubjectFilter = new() { Code = 400, ErrCode = 10145, Description = "source transform source: {err}" }; + public static readonly JsApiError SourceInvalidTransformDestination = new() { Code = 400, ErrCode = 10146, Description = "source transform: {err}" }; + public static readonly JsApiError SourceMaxMessageSizeTooBig = new() { Code = 400, ErrCode = 10046, Description = "stream source must have max message size >= target" }; + public static readonly JsApiError SourceMultipleFiltersNotAllowed = new() { Code = 400, ErrCode = 10144, Description = "source with multiple subject transforms cannot also have a single subject filter" }; + public static readonly JsApiError SourceOverlappingSubjectFilters = new() { Code = 400, ErrCode = 10147, Description = "source filters can not overlap" }; + public static readonly JsApiError SourceWithMsgSchedules = new() { Code = 400, ErrCode = 10187, Description = "stream source can not also schedule messages" }; + + // ---- Storage ---- + public static readonly JsApiError StorageResourcesExceeded = new() { Code = 500, ErrCode = 10047, Description = "insufficient storage resources available" }; + + // ---- Stream ---- + public static readonly JsApiError StreamAssignment = new() { Code = 500, ErrCode = 10048, Description = "{err}" }; + public static readonly JsApiError StreamCreate = new() { Code = 500, ErrCode = 10049, Description = "{err}" }; + public static readonly JsApiError StreamDelete = new() { Code = 500, ErrCode = 10050, Description = "{err}" }; + public static readonly JsApiError StreamDuplicateMessageConflict = new() { Code = 409, ErrCode = 10158, Description = "duplicate message id is in process" }; + public static readonly JsApiError StreamExpectedLastSeqPerSubjectInvalid = new() { Code = 400, ErrCode = 10193, Description = "missing sequence for expected last sequence per subject" }; + public static readonly JsApiError StreamExpectedLastSeqPerSubjectNotReady = new() { Code = 503, ErrCode = 10163, Description = "expected last sequence per subject temporarily unavailable" }; + public static readonly JsApiError StreamExternalApiOverlap = new() { Code = 400, ErrCode = 10021, Description = "stream external api prefix {prefix} must not overlap with {subject}" }; + public static readonly JsApiError StreamExternalDelPrefixOverlaps = new() { Code = 400, ErrCode = 10022, Description = "stream external delivery prefix {prefix} overlaps with stream subject {subject}" }; + public static readonly JsApiError StreamGeneralError = new() { Code = 500, ErrCode = 10051, Description = "{err}" }; + public static readonly JsApiError StreamHeaderExceedsMaximum = new() { Code = 400, ErrCode = 10097, Description = "header size exceeds maximum allowed of 64k" }; + public static readonly JsApiError StreamInfoMaxSubjects = new() { Code = 500, ErrCode = 10117, Description = "subject details would exceed maximum allowed" }; + public static readonly JsApiError StreamInvalidConfig = new() { Code = 500, ErrCode = 10052, Description = "{err}" }; + public static readonly JsApiError StreamInvalid = new() { Code = 500, ErrCode = 10096, Description = "stream not valid" }; + public static readonly JsApiError StreamInvalidExternalDeliverySubj = new() { Code = 400, ErrCode = 10024, Description = "stream external delivery prefix {prefix} must not contain wildcards" }; + public static readonly JsApiError StreamLimits = new() { Code = 500, ErrCode = 10053, Description = "{err}" }; + public static readonly JsApiError StreamMaxBytesRequired = new() { Code = 400, ErrCode = 10113, Description = "account requires a stream config to have max bytes set" }; + public static readonly JsApiError StreamMaxStreamBytesExceeded = new() { Code = 400, ErrCode = 10122, Description = "stream max bytes exceeds account limit max stream bytes" }; + public static readonly JsApiError StreamMessageExceedsMaximum = new() { Code = 400, ErrCode = 10054, Description = "message size exceeds maximum allowed" }; + public static readonly JsApiError StreamMinLastSeq = new() { Code = 412, ErrCode = 10180, Description = "min last sequence" }; + public static readonly JsApiError StreamMirrorNotUpdatable = new() { Code = 400, ErrCode = 10055, Description = "stream mirror configuration can not be updated" }; + public static readonly JsApiError StreamMismatch = new() { Code = 400, ErrCode = 10056, Description = "stream name in subject does not match request" }; + public static readonly JsApiError StreamMoveAndScale = new() { Code = 400, ErrCode = 10123, Description = "can not move and scale a stream in a single update" }; + public static readonly JsApiError StreamMoveInProgress = new() { Code = 400, ErrCode = 10124, Description = "stream move already in progress: {msg}" }; + public static readonly JsApiError StreamMoveNotInProgress = new() { Code = 400, ErrCode = 10129, Description = "stream move not in progress" }; + public static readonly JsApiError StreamMsgDeleteFailed = new() { Code = 500, ErrCode = 10057, Description = "{err}" }; + public static readonly JsApiError StreamNameContainsPathSeparators = new() { Code = 400, ErrCode = 10128, Description = "Stream name can not contain path separators" }; + public static readonly JsApiError StreamNameExist = new() { Code = 400, ErrCode = 10058, Description = "stream name already in use with a different configuration" }; + public static readonly JsApiError StreamNameExistRestoreFailed = new() { Code = 400, ErrCode = 10130, Description = "stream name already in use, cannot restore" }; + public static readonly JsApiError StreamNotFound = new() { Code = 404, ErrCode = 10059, Description = "stream not found" }; + public static readonly JsApiError StreamNotMatch = new() { Code = 400, ErrCode = 10060, Description = "expected stream does not match" }; + public static readonly JsApiError StreamOffline = new() { Code = 500, ErrCode = 10118, Description = "stream is offline" }; + public static readonly JsApiError StreamOfflineReason = new() { Code = 500, ErrCode = 10194, Description = "stream is offline: {err}" }; + public static readonly JsApiError StreamPurgeFailed = new() { Code = 500, ErrCode = 10110, Description = "{err}" }; + public static readonly JsApiError StreamReplicasNotSupported = new() { Code = 500, ErrCode = 10074, Description = "replicas > 1 not supported in non-clustered mode" }; + public static readonly JsApiError StreamReplicasNotUpdatable = new() { Code = 400, ErrCode = 10061, Description = "Replicas configuration can not be updated" }; + public static readonly JsApiError StreamRestore = new() { Code = 500, ErrCode = 10062, Description = "restore failed: {err}" }; + public static readonly JsApiError StreamRollupFailed = new() { Code = 500, ErrCode = 10111, Description = "{err}" }; + public static readonly JsApiError StreamSealed = new() { Code = 400, ErrCode = 10109, Description = "invalid operation on sealed stream" }; + public static readonly JsApiError StreamSequenceNotMatch = new() { Code = 503, ErrCode = 10063, Description = "expected stream sequence does not match" }; + public static readonly JsApiError StreamSnapshot = new() { Code = 500, ErrCode = 10064, Description = "snapshot failed: {err}" }; + public static readonly JsApiError StreamStoreFailed = new() { Code = 503, ErrCode = 10077, Description = "{err}" }; + public static readonly JsApiError StreamSubjectOverlap = new() { Code = 400, ErrCode = 10065, Description = "subjects overlap with an existing stream" }; + public static readonly JsApiError StreamTemplateCreate = new() { Code = 500, ErrCode = 10066, Description = "{err}" }; + public static readonly JsApiError StreamTemplateDelete = new() { Code = 500, ErrCode = 10067, Description = "{err}" }; + public static readonly JsApiError StreamTemplateNotFound = new() { Code = 404, ErrCode = 10068, Description = "template not found" }; + public static readonly JsApiError StreamTooManyRequests = new() { Code = 429, ErrCode = 10167, Description = "too many requests" }; + public static readonly JsApiError StreamTransformInvalidDestination = new() { Code = 400, ErrCode = 10156, Description = "stream transform: {err}" }; + public static readonly JsApiError StreamTransformInvalidSource = new() { Code = 400, ErrCode = 10155, Description = "stream transform source: {err}" }; + public static readonly JsApiError StreamUpdate = new() { Code = 500, ErrCode = 10069, Description = "{err}" }; + public static readonly JsApiError StreamWrongLastMsgID = new() { Code = 400, ErrCode = 10070, Description = "wrong last msg ID: {id}" }; + public static readonly JsApiError StreamWrongLastSequenceConstant = new() { Code = 400, ErrCode = 10164, Description = "wrong last sequence" }; + public static readonly JsApiError StreamWrongLastSequence = new() { Code = 400, ErrCode = 10071, Description = "wrong last sequence: {seq}" }; + + // ---- Temp storage ---- + public static readonly JsApiError TempStorageFailed = new() { Code = 500, ErrCode = 10072, Description = "JetStream unable to open temp storage for restore" }; + public static readonly JsApiError TemplateNameNotMatchSubject = new() { Code = 400, ErrCode = 10073, Description = "template name in subject does not match request" }; + + // --------------------------------------------------------------------------- + // Lookup by ErrCode + // --------------------------------------------------------------------------- + + private static readonly Dictionary _byErrCode; + + static JsApiErrors() + { + _byErrCode = new Dictionary(); + foreach (var field in typeof(JsApiErrors).GetFields( + System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static)) + { + if (field.GetValue(null) is JsApiError e) + _byErrCode.TryAdd(e.ErrCode, e); + } + } + + /// + /// Returns the pre-built for the given err_code, or null if not found. + /// + public static JsApiError? ForErrCode(ushort errCode) => + _byErrCode.TryGetValue(errCode, out var e) ? e : null; + + /// + /// Returns true if the given matches one or more of the supplied err_codes. + /// Mirrors IsNatsErr in server/jetstream_errors.go. + /// + public static bool IsNatsError(JsApiError? err, params ushort[] errCodes) + { + if (err is null) return false; + foreach (var code in errCodes) + if (err.ErrCode == code) return true; + return false; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs new file mode 100644 index 0000000..5eb4aa5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs @@ -0,0 +1,293 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream.go in the NATS server Go source. + +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// JetStreamConfig +// --------------------------------------------------------------------------- + +/// +/// Configuration for the JetStream subsystem. +/// Mirrors JetStreamConfig in server/jetstream.go. +/// +public sealed class JetStreamConfig +{ + /// Maximum size of memory-backed streams. + [JsonPropertyName("max_memory")] + public long MaxMemory { get; set; } + + /// Maximum size of file-backed streams. + [JsonPropertyName("max_storage")] + public long MaxStore { get; set; } + + /// Directory where storage files are stored. + [JsonPropertyName("store_dir")] + public string StoreDir { get; set; } = string.Empty; + + /// How frequently we sync to disk in the background via fsync. + [JsonPropertyName("sync_interval")] + public TimeSpan SyncInterval { get; set; } + + /// If true, flushes are done after every write. + [JsonPropertyName("sync_always")] + public bool SyncAlways { get; set; } + + /// The JetStream domain for this server. + [JsonPropertyName("domain")] + public string Domain { get; set; } = string.Empty; + + /// Whether compression is supported. + [JsonPropertyName("compress_ok")] + public bool CompressOK { get; set; } + + /// Unique tag assigned to this server instance. + [JsonPropertyName("unique_tag")] + public string UniqueTag { get; set; } = string.Empty; + + /// Whether strict JSON parsing is performed. + [JsonPropertyName("strict")] + public bool Strict { get; set; } +} + +// --------------------------------------------------------------------------- +// JetStreamApiStats +// --------------------------------------------------------------------------- + +/// +/// Statistics about the JetStream API usage for this server. +/// Mirrors JetStreamAPIStats in server/jetstream.go. +/// +public sealed class JetStreamApiStats +{ + /// Active API level implemented by this server. + [JsonPropertyName("level")] + public int Level { get; set; } + + /// Total API requests received since server start. + [JsonPropertyName("total")] + public ulong Total { get; set; } + + /// Total API requests that resulted in error responses. + [JsonPropertyName("errors")] + public ulong Errors { get; set; } + + /// Number of API requests currently being served. + [JsonPropertyName("inflight")] + public ulong Inflight { get; set; } +} + +// --------------------------------------------------------------------------- +// JetStreamStats +// --------------------------------------------------------------------------- + +/// +/// Statistics about JetStream for this server. +/// Mirrors JetStreamStats in server/jetstream.go. +/// +public sealed class JetStreamStats +{ + [JsonPropertyName("memory")] public ulong Memory { get; set; } + [JsonPropertyName("storage")] public ulong Store { get; set; } + [JsonPropertyName("reserved_memory")] public ulong ReservedMemory { get; set; } + [JsonPropertyName("reserved_storage")] public ulong ReservedStore { get; set; } + [JsonPropertyName("accounts")] public int Accounts { get; set; } + [JsonPropertyName("ha_assets")] public int HAAssets { get; set; } + [JsonPropertyName("api")] public JetStreamApiStats Api { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// JetStreamAccountLimits +// --------------------------------------------------------------------------- + +/// +/// Per-account JetStream limits. +/// Mirrors JetStreamAccountLimits in server/jetstream.go. +/// +public sealed class JetStreamAccountLimits +{ + [JsonPropertyName("max_memory")] public long MaxMemory { get; set; } + [JsonPropertyName("max_storage")] public long MaxStore { get; set; } + [JsonPropertyName("max_streams")] public int MaxStreams { get; set; } + [JsonPropertyName("max_consumers")] public int MaxConsumers { get; set; } + [JsonPropertyName("max_ack_pending")] public int MaxAckPending { get; set; } + [JsonPropertyName("memory_max_stream_bytes")] public long MemoryMaxStreamBytes { get; set; } + [JsonPropertyName("storage_max_stream_bytes")] public long StoreMaxStreamBytes { get; set; } + [JsonPropertyName("max_bytes_required")] public bool MaxBytesRequired { get; set; } +} + +// --------------------------------------------------------------------------- +// JetStreamTier +// --------------------------------------------------------------------------- + +/// +/// Per-tier JetStream usage and limits. +/// Mirrors JetStreamTier in server/jetstream.go. +/// +public sealed class JetStreamTier +{ + [JsonPropertyName("memory")] public ulong Memory { get; set; } + [JsonPropertyName("storage")] public ulong Store { get; set; } + [JsonPropertyName("reserved_memory")] public ulong ReservedMemory { get; set; } + [JsonPropertyName("reserved_storage")]public ulong ReservedStore { get; set; } + [JsonPropertyName("streams")] public int Streams { get; set; } + [JsonPropertyName("consumers")] public int Consumers { get; set; } + [JsonPropertyName("limits")] public JetStreamAccountLimits Limits { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// JetStreamAccountStats +// --------------------------------------------------------------------------- + +/// +/// Current statistics about an account's JetStream usage. +/// Mirrors JetStreamAccountStats in server/jetstream.go. +/// Embeds for totals. +/// +public sealed class JetStreamAccountStats +{ + // Embedded JetStreamTier fields (Go struct embedding) + [JsonPropertyName("memory")] public ulong Memory { get; set; } + [JsonPropertyName("storage")] public ulong Store { get; set; } + [JsonPropertyName("reserved_memory")] public ulong ReservedMemory { get; set; } + [JsonPropertyName("reserved_storage")]public ulong ReservedStore { get; set; } + [JsonPropertyName("streams")] public int Streams { get; set; } + [JsonPropertyName("consumers")] public int Consumers { get; set; } + [JsonPropertyName("limits")] public JetStreamAccountLimits Limits { get; set; } = new(); + + [JsonPropertyName("domain")] public string? Domain { get; set; } + [JsonPropertyName("api")] public JetStreamApiStats Api { get; set; } = new(); + [JsonPropertyName("tiers")] public Dictionary? Tiers { get; set; } +} + +// --------------------------------------------------------------------------- +// Internal JetStream engine types +// --------------------------------------------------------------------------- + +/// +/// The main JetStream engine, one per server. +/// Mirrors jetStream struct in server/jetstream.go. +/// +internal sealed class JetStream +{ + // Atomic counters (use Interlocked for thread-safety) + public long ApiInflight; + public long ApiTotal; + public long ApiErrors; + public long MemReserved; + public long StoreReserved; + public long MemUsed; + public long StoreUsed; + public long QueueLimit; + public int Clustered; // atomic int32 + + private readonly ReaderWriterLockSlim _mu = new(); + + public object? Server { get; set; } // *Server — set at runtime + public JetStreamConfig Config { get; set; } = new(); + public object? Cluster { get; set; } // *jetStreamCluster — session 20+ + public Dictionary Accounts { get; } = new(StringComparer.Ordinal); + public DateTime Started { get; set; } + + // State booleans + public bool MetaRecovering { get; set; } + public bool StandAlone { get; set; } + public bool Oos { get; set; } + public bool ShuttingDown { get; set; } + public int Disabled; // atomic bool (0=false, 1=true) + + public ReaderWriterLockSlim Lock => _mu; +} + +/// +/// Tracks remote per-tier usage for a JetStream account. +/// Mirrors remoteUsage in server/jetstream.go. +/// +internal sealed class RemoteUsage +{ + public Dictionary Tiers { get; } = new(StringComparer.Ordinal); + public ulong Api; + public ulong Err; +} + +/// +/// Per-tier storage accounting (total + local split). +/// Mirrors jsaStorage in server/jetstream.go. +/// +internal sealed class JsaStorage +{ + public JsaUsage Total { get; set; } = new(); + public JsaUsage Local { get; set; } = new(); +} + +/// +/// A JetStream-enabled account, holding streams, limits and usage tracking. +/// Mirrors jsAccount in server/jetstream.go. +/// +internal sealed class JsAccount +{ + private readonly ReaderWriterLockSlim _mu = new(); + + public object? Js { get; set; } // *jetStream + public object? Account { get; set; } // *Account + public string StoreDir { get; set; } = string.Empty; + + // Concurrent inflight map (mirrors sync.Map) + public readonly System.Collections.Concurrent.ConcurrentDictionary Inflight = new(); + + // Streams keyed by stream name + public Dictionary Streams { get; } = new(StringComparer.Ordinal); // *stream + + // Send queue (mirrors *ipQueue[*pubMsg]) + public object? SendQ { get; set; } + + // Atomic sync flag (0=false, 1=true) + public int Sync; + + // Usage/limits (protected by UsageMu) + private readonly ReaderWriterLockSlim _usageMu = new(); + public Dictionary Limits { get; } = new(StringComparer.Ordinal); + public Dictionary Usage { get; } = new(StringComparer.Ordinal); + public Dictionary RUsage { get; } = new(StringComparer.Ordinal); + + public ulong ApiTotal { get; set; } + public ulong ApiErrors { get; set; } + public ulong UsageApi { get; set; } + public ulong UsageErr { get; set; } + public string UpdatesPub { get; set; } = string.Empty; + public object? UpdatesSub { get; set; } // *subscription + public DateTime LUpdate { get; set; } + + public ReaderWriterLockSlim Lock => _mu; + public ReaderWriterLockSlim UsageLock => _usageMu; +} + +/// +/// Memory and store byte usage. +/// Mirrors jsaUsage in server/jetstream.go. +/// +internal sealed class JsaUsage +{ + public long Mem { get; set; } + public long Store { get; set; } +} + +/// +/// Delegate for a function that generates a key-encryption key from a context byte array. +/// Mirrors keyGen in server/jetstream.go. +/// +public delegate byte[] KeyGen(byte[] context); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamVersioning.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamVersioning.cs new file mode 100644 index 0000000..6a6af5e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamVersioning.cs @@ -0,0 +1,106 @@ +// Copyright 2024-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream_versioning.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +/// +/// JetStream API level versioning constants and helpers. +/// Mirrors server/jetstream_versioning.go. +/// +public static class JetStreamVersioning +{ + /// Maximum supported JetStream API level for this server. + public const int JsApiLevel = 3; + + /// Metadata key that carries the required API level for a stream or consumer asset. + public const string JsRequiredLevelMetadataKey = "_nats.req.level"; + + /// Metadata key that carries the server version that created/updated the asset. + public const string JsServerVersionMetadataKey = "_nats.ver"; + + /// Metadata key that carries the server API level that created/updated the asset. + public const string JsServerLevelMetadataKey = "_nats.level"; + + // ---- API level feature gates ---- + // These document which API level each feature requires. + // They correspond to the requires() calls in setStaticStreamMetadata / setStaticConsumerMetadata. + + /// API level required for per-message TTL and SubjectDeleteMarkerTTL (v2.11). + public const int ApiLevelForTTL = 1; + + /// API level required for consumer PauseUntil (v2.11). + public const int ApiLevelForConsumerPause = 1; + + /// API level required for priority groups (v2.11). + public const int ApiLevelForPriorityGroups = 1; + + /// API level required for counter CRDTs (v2.12). + public const int ApiLevelForCounters = 2; + + /// API level required for atomic batch publishing (v2.12). + public const int ApiLevelForAtomicPublish = 2; + + /// API level required for message scheduling (v2.12). + public const int ApiLevelForMsgSchedules = 2; + + /// API level required for async persist mode (v2.12). + public const int ApiLevelForAsyncPersist = 2; + + // ---- Helper methods ---- + + /// + /// Returns the required API level string from stream or consumer metadata, + /// or an empty string if not set. + /// Mirrors getRequiredApiLevel. + /// + public static string GetRequiredApiLevel(IDictionary? metadata) + { + if (metadata is not null && metadata.TryGetValue(JsRequiredLevelMetadataKey, out var l) && l.Length > 0) + return l; + return string.Empty; + } + + /// + /// Returns whether this server supports the required API level encoded in the asset's metadata. + /// Mirrors supportsRequiredApiLevel. + /// + public static bool SupportsRequiredApiLevel(IDictionary? metadata) + { + var l = GetRequiredApiLevel(metadata); + if (l.Length == 0) return true; + return int.TryParse(l, out var level) && level <= JsApiLevel; + } + + /// + /// Removes dynamic (per-response) versioning fields from metadata. + /// These should never be stored; only added in API responses. + /// Mirrors deleteDynamicMetadata. + /// + public static void DeleteDynamicMetadata(IDictionary metadata) + { + metadata.Remove(JsServerVersionMetadataKey); + metadata.Remove(JsServerLevelMetadataKey); + } + + /// + /// Returns whether a request should be rejected based on the Nats-Required-Api-Level header value. + /// Mirrors errorOnRequiredApiLevel. + /// + public static bool ErrorOnRequiredApiLevel(string? reqApiLevelHeader) + { + if (string.IsNullOrEmpty(reqApiLevelHeader)) return false; + return !int.TryParse(reqApiLevelHeader, out var minLevel) || JsApiLevel < minLevel; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index 68506b3..7623d00 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -208,22 +208,8 @@ public static class CompressionMode // InternalState is now fully defined in Events/EventTypes.cs (session 12). -/// Stub for JetStream state pointer (session 19). -internal sealed class JetStreamState { } - -/// Stub for JetStream config (session 19). -public sealed class JetStreamConfig -{ - public string StoreDir { get; set; } = string.Empty; - public TimeSpan SyncInterval { get; set; } - public bool SyncAlways { get; set; } - public bool Strict { get; set; } - public long MaxMemory { get; set; } - public long MaxStore { get; set; } - public string Domain { get; set; } = string.Empty; - public bool CompressOK { get; set; } - public string UniqueTag { get; set; } = string.Empty; -} +// JetStreamState — replaced by JsAccount in JetStream/JetStreamTypes.cs (session 19). +// JetStreamConfig — replaced by full implementation in JetStream/JetStreamTypes.cs (session 19). // SrvGateway — full class is in Gateway/GatewayTypes.cs (session 16). diff --git a/porting.db b/porting.db index dd7db8a..ff67152 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index ad44671..aff4275 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 21:06:51 UTC +Generated: 2026-02-26 21:14:41 UTC ## Modules (12 total) @@ -13,9 +13,9 @@ Generated: 2026-02-26 21:06:51 UTC | Status | Count | |--------|-------| -| complete | 2048 | +| complete | 2422 | | n_a | 77 | -| not_started | 1455 | +| not_started | 1081 | | stub | 93 | ## Unit Tests (3257 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 21:06:51 UTC ## Overall Progress -**2636/6942 items complete (38.0%)** +**3010/6942 items complete (43.4%)** diff --git a/reports/report_3cffa5b.md b/reports/report_3cffa5b.md new file mode 100644 index 0000000..aff4275 --- /dev/null +++ b/reports/report_3cffa5b.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 21:14:41 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 2422 | +| n_a | 77 | +| not_started | 1081 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**3010/6942 items complete (43.4%)**