diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs new file mode 100644 index 0000000..d43900e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -0,0 +1,524 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/jetstream_cluster.go in the NATS server Go source. + +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// JetStreamCluster +// ============================================================================ + +/// +/// Holds cluster-level JetStream state on each server: the meta-controller Raft node, +/// stream/consumer assignment maps, and inflight proposal tracking. +/// Mirrors Go jetStreamCluster struct in server/jetstream_cluster.go lines 44-84. +/// +internal sealed class JetStreamCluster +{ + /// The meta-controller Raft node. + public IRaftNode? Meta { get; set; } + + /// + /// All known stream assignments. Key: account name → stream name → assignment. + /// + public Dictionary> Streams { get; set; } = new(); + + /// + /// Inflight stream create/update/delete proposals. Key: account → stream name. + /// + public Dictionary> InflightStreams { get; set; } = new(); + + /// + /// Inflight consumer create/update/delete proposals. Key: account → stream → consumer. + /// + public Dictionary>> InflightConsumers { get; set; } = new(); + + /// + /// Peer-remove reply subjects pending quorum. Key: peer ID. + /// + public Dictionary PeerRemoveReply { get; set; } = new(); + + /// Signals meta-leader should re-check stream assignments. + public bool StreamsCheck { get; set; } + + /// Reference to the top-level server (object to avoid circular dep). + public object? Server { get; set; } + + /// Internal client for cluster messaging (object to avoid circular dep). + public object? Client { get; set; } + + /// Subscription that receives stream assignment results (object to avoid session dep). + public object? StreamResults { get; set; } + + /// Subscription that receives consumer assignment results (object to avoid session dep). + public object? ConsumerResults { get; set; } + + /// Subscription for meta-leader step-down requests. + public object? Stepdown { get; set; } + + /// Subscription for peer-remove requests. + public object? PeerRemove { get; set; } + + /// Subscription for stream-move requests. + public object? PeerStreamMove { get; set; } + + /// Subscription for stream-move cancellation. + public object? PeerStreamCancelMove { get; set; } + + /// Channel used to pop out monitorCluster before the raft layer. + public System.Threading.Channels.Channel? Qch { get; set; } + + /// Notifies that monitorCluster has actually stopped. + public System.Threading.Channels.Channel? Stopped { get; set; } + + /// Last meta-snapshot time (Unix nanoseconds). + public long LastMetaSnapTime { get; set; } + + /// Duration of last meta-snapshot (nanoseconds). + public long LastMetaSnapDuration { get; set; } +} + +// ============================================================================ +// InflightStreamInfo +// ============================================================================ + +/// +/// Tracks inflight stream create/update/delete proposals. +/// Mirrors Go inflightStreamInfo in server/jetstream_cluster.go lines 87-91. +/// +internal sealed class InflightStreamInfo +{ + /// Number of inflight operations. + public ulong Ops { get; set; } + /// Whether the stream has been deleted. + public bool Deleted { get; set; } + public StreamAssignment? Assignment { get; set; } +} + +// ============================================================================ +// InflightConsumerInfo +// ============================================================================ + +/// +/// Tracks inflight consumer create/update/delete proposals. +/// Mirrors Go inflightConsumerInfo in server/jetstream_cluster.go lines 94-98. +/// +internal sealed class InflightConsumerInfo +{ + /// Number of inflight operations. + public ulong Ops { get; set; } + /// Whether the consumer has been deleted. + public bool Deleted { get; set; } + public ConsumerAssignment? Assignment { get; set; } +} + +// ============================================================================ +// PeerRemoveInfo +// ============================================================================ + +/// +/// Holds the reply information for a peer-remove request pending quorum. +/// Mirrors Go peerRemoveInfo in server/jetstream_cluster.go lines 101-106. +/// +internal sealed class PeerRemoveInfo +{ + /// Client info from the request (object to avoid session dep). + public ClientInfo? ClientInfo { get; set; } + public string Subject { get; set; } = string.Empty; + public string Reply { get; set; } = string.Empty; + public string Request { get; set; } = string.Empty; +} + +// ============================================================================ +// EntryOp enum +// ============================================================================ + +/// +/// Operation type encoded in a JetStream cluster Raft entry. +/// Mirrors Go entryOp iota in server/jetstream_cluster.go lines 116-150. +/// ONLY ADD TO THE END — inserting values breaks server interoperability. +/// +internal enum EntryOp : byte +{ + // Meta ops + AssignStreamOp = 0, + AssignConsumerOp = 1, + RemoveStreamOp = 2, + RemoveConsumerOp = 3, + // Stream ops + StreamMsgOp = 4, + PurgeStreamOp = 5, + DeleteMsgOp = 6, + // Consumer ops + UpdateDeliveredOp = 7, + UpdateAcksOp = 8, + // Compressed consumer assignments + AssignCompressedConsumerOp = 9, + // Filtered consumer skip + UpdateSkipOp = 10, + // Update stream + UpdateStreamOp = 11, + // Pending pull requests + AddPendingRequest = 12, + RemovePendingRequest = 13, + // Compressed streams (RAFT or catchup) + CompressedStreamMsgOp = 14, + // Deleted gaps on catchups for replicas + DeleteRangeOp = 15, + // Batch stream ops + BatchMsgOp = 16, + BatchCommitMsgOp = 17, + // Consumer reset to specific starting sequence + ResetSeqOp = 18, +} + +// ============================================================================ +// RaftGroup +// ============================================================================ + +/// +/// Describes a Raft consensus group that houses streams and consumers. +/// Controlled by the meta-group controller. +/// Mirrors Go raftGroup struct in server/jetstream_cluster.go lines 154-163. +/// +internal sealed class RaftGroup +{ + [JsonPropertyName("name")] public string Name { get; set; } = string.Empty; + [JsonPropertyName("peers")] public string[] Peers { get; set; } = []; + [JsonPropertyName("store")] public StorageType Storage { get; set; } + [JsonPropertyName("cluster")] public string? Cluster { get; set; } + [JsonPropertyName("preferred")] public string? Preferred { get; set; } + [JsonPropertyName("scale_up")] public bool ScaleUp { get; set; } + /// Internal Raft node — not serialized. + [JsonIgnore] + public IRaftNode? Node { get; set; } +} + +// ============================================================================ +// StreamAssignment +// ============================================================================ + +/// +/// What the meta controller uses to assign streams to peers. +/// Mirrors Go streamAssignment struct in server/jetstream_cluster.go lines 166-184. +/// +internal sealed class StreamAssignment +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("created")] public DateTime Created { get; set; } + [JsonPropertyName("stream")] public JsonElement ConfigJson { get; set; } + [JsonIgnore] public StreamConfig? Config { get; set; } + [JsonPropertyName("group")] public RaftGroup? Group { get; set; } + [JsonPropertyName("sync")] public string Sync { get; set; } = string.Empty; + [JsonPropertyName("subject")] public string? Subject { get; set; } + [JsonPropertyName("reply")] public string? Reply { get; set; } + [JsonPropertyName("restore_state")] public StreamState? Restore { get; set; } + + // Internal (not serialized) + [JsonIgnore] public Dictionary? Consumers { get; set; } + [JsonIgnore] public bool Responded { get; set; } + [JsonIgnore] public bool Recovering { get; set; } + [JsonIgnore] public bool Reassigning { get; set; } + [JsonIgnore] public bool Resetting { get; set; } + [JsonIgnore] public Exception? Error { get; set; } + [JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; } +} + +// ============================================================================ +// UnsupportedStreamAssignment +// ============================================================================ + +/// +/// Holds state for a stream assignment that this server cannot run, +/// so that it can still respond to cluster info requests. +/// Mirrors Go unsupportedStreamAssignment in server/jetstream_cluster.go lines 186-191. +/// +internal sealed class UnsupportedStreamAssignment +{ + public string Reason { get; set; } = string.Empty; + public StreamInfo Info { get; set; } = new(); + /// Internal system client (object to avoid session dep). + public object? SysClient { get; set; } + /// Info subscription (object to avoid session dep). + public object? InfoSub { get; set; } +} + +// ============================================================================ +// ConsumerAssignment +// ============================================================================ + +/// +/// What the meta controller uses to assign consumers to streams. +/// Mirrors Go consumerAssignment struct in server/jetstream_cluster.go lines 250-266. +/// +internal sealed class ConsumerAssignment +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("created")] public DateTime Created { get; set; } + [JsonPropertyName("name")] public string Name { get; set; } = string.Empty; + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + [JsonPropertyName("consumer")] public JsonElement ConfigJson { get; set; } + [JsonIgnore] public ConsumerConfig? Config { get; set; } + [JsonPropertyName("group")] public RaftGroup? Group { get; set; } + [JsonPropertyName("subject")] public string? Subject { get; set; } + [JsonPropertyName("reply")] public string? Reply { get; set; } + [JsonPropertyName("state")] public ConsumerState? State { get; set; } + + // Internal (not serialized) + [JsonIgnore] public bool Responded { get; set; } + [JsonIgnore] public bool Recovering { get; set; } + [JsonIgnore] public Exception? Error { get; set; } + [JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; } +} + +// ============================================================================ +// UnsupportedConsumerAssignment +// ============================================================================ + +/// +/// Holds state for a consumer assignment that this server cannot run. +/// Mirrors Go unsupportedConsumerAssignment in server/jetstream_cluster.go lines 268-273. +/// +internal sealed class UnsupportedConsumerAssignment +{ + public string Reason { get; set; } = string.Empty; + public ConsumerInfo Info { get; set; } = new(); + /// Internal system client (object to avoid session dep). + public object? SysClient { get; set; } + /// Info subscription (object to avoid session dep). + public object? InfoSub { get; set; } +} + +// ============================================================================ +// WriteableConsumerAssignment +// ============================================================================ + +/// +/// Serialisable form of a consumer assignment used in meta-snapshots. +/// Mirrors Go writeableConsumerAssignment in server/jetstream_cluster.go lines 332-340. +/// +internal sealed class WriteableConsumerAssignment +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("created")] public DateTime Created { get; set; } + [JsonPropertyName("name")] public string Name { get; set; } = string.Empty; + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + [JsonPropertyName("consumer")] public JsonElement ConfigJson { get; set; } + [JsonPropertyName("group")] public RaftGroup? Group { get; set; } + [JsonPropertyName("state")] public ConsumerState? State { get; set; } +} + +// ============================================================================ +// StreamPurge +// ============================================================================ + +/// +/// What the stream leader replicates when purging a stream. +/// Mirrors Go streamPurge struct in server/jetstream_cluster.go lines 343-350. +/// +internal sealed class StreamPurge +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + [JsonPropertyName("last_seq")] public ulong LastSeq { get; set; } + [JsonPropertyName("subject")] public string Subject { get; set; } = string.Empty; + [JsonPropertyName("reply")] public string Reply { get; set; } = string.Empty; + [JsonPropertyName("request")] public JSApiStreamPurgeRequest? Request { get; set; } +} + +// ============================================================================ +// StreamMsgDelete +// ============================================================================ + +/// +/// What the stream leader replicates when deleting a message. +/// Mirrors Go streamMsgDelete struct in server/jetstream_cluster.go lines 353-360. +/// +internal sealed class StreamMsgDelete +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + [JsonPropertyName("seq")] public ulong Seq { get; set; } + [JsonPropertyName("no_erase")] public bool NoErase { get; set; } + [JsonPropertyName("subject")] public string Subject { get; set; } = string.Empty; + [JsonPropertyName("reply")] public string Reply { get; set; } = string.Empty; +} + +// ============================================================================ +// RecoveryUpdates +// ============================================================================ + +/// +/// Accumulates stream/consumer changes discovered during meta-recovery so they +/// can be applied after the full snapshot has been processed. +/// Mirrors Go recoveryUpdates struct in server/jetstream_cluster.go lines 1327-1333. +/// +internal sealed class RecoveryUpdates +{ + public Dictionary RemoveStreams { get; set; } = new(); + public Dictionary> RemoveConsumers { get; set; } = new(); + public Dictionary AddStreams { get; set; } = new(); + public Dictionary UpdateStreams { get; set; } = new(); + public Dictionary> UpdateConsumers { get; set; } = new(); +} + +// ============================================================================ +// WriteableStreamAssignment +// ============================================================================ + +/// +/// Serialisable form of a stream assignment (with its consumers) used in meta-snapshots. +/// Mirrors Go writeableStreamAssignment in server/jetstream_cluster.go lines 1872-1879. +/// +internal sealed class WriteableStreamAssignment +{ + [JsonPropertyName("client")] public ClientInfo? Client { get; set; } + [JsonPropertyName("created")] public DateTime Created { get; set; } + [JsonPropertyName("stream")] public JsonElement ConfigJson { get; set; } + [JsonPropertyName("group")] public RaftGroup? Group { get; set; } + [JsonPropertyName("sync")] public string Sync { get; set; } = string.Empty; + public List Consumers { get; set; } = new(); +} + +// ============================================================================ +// ConsumerAssignmentResult +// ============================================================================ + +/// +/// Result sent by a member after processing a consumer assignment. +/// Mirrors Go consumerAssignmentResult in server/jetstream_cluster.go lines 5592-5597. +/// +internal sealed class ConsumerAssignmentResult +{ + [JsonPropertyName("account")] public string Account { get; set; } = string.Empty; + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + [JsonPropertyName("consumer")] public string Consumer { get; set; } = string.Empty; + /// Stub: JSApiConsumerCreateResponse — full type in session 20+. + [JsonPropertyName("response")] public object? Response { get; set; } +} + +// ============================================================================ +// StreamAssignmentResult +// ============================================================================ + +/// +/// Result sent by a member after processing a stream assignment. +/// Mirrors Go streamAssignmentResult in server/jetstream_cluster.go lines 6779-6785. +/// +internal sealed class StreamAssignmentResult +{ + [JsonPropertyName("account")] public string Account { get; set; } = string.Empty; + [JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty; + /// Stub: JSApiStreamCreateResponse — full type in session 20+. + [JsonPropertyName("create_response")] public object? Response { get; set; } + /// Stub: JSApiStreamRestoreResponse — full type in session 20+. + [JsonPropertyName("restore_response")] public object? Restore { get; set; } + [JsonPropertyName("is_update")] public bool Update { get; set; } +} + +// ============================================================================ +// SelectPeerError +// ============================================================================ + +/// +/// Collects the reasons why no suitable peer could be found for a placement. +/// Mirrors Go selectPeerError struct in server/jetstream_cluster.go lines 7113-7121. +/// +internal sealed class SelectPeerError : Exception +{ + public bool ExcludeTag { get; set; } + public bool Offline { get; set; } + public bool NoStorage { get; set; } + public bool UniqueTag { get; set; } + public bool Misc { get; set; } + public bool NoJsClust { get; set; } + public HashSet? NoMatchTags { get; set; } + public HashSet? ExcludeTags { get; set; } + + public override string Message => BuildMessage(); + + private string BuildMessage() + { + var b = new System.Text.StringBuilder("no suitable peers for placement"); + if (Offline) b.Append(", peer offline"); + if (ExcludeTag) b.Append(", exclude tag set"); + if (NoStorage) b.Append(", insufficient storage"); + if (UniqueTag) b.Append(", server tag not unique"); + if (Misc) b.Append(", miscellaneous issue"); + if (NoJsClust) b.Append(", jetstream not enabled in cluster"); + if (NoMatchTags is { Count: > 0 }) + { + b.Append(", tags not matched ["); + b.Append(string.Join(", ", NoMatchTags)); + b.Append(']'); + } + if (ExcludeTags is { Count: > 0 }) + { + b.Append(", tags excluded ["); + b.Append(string.Join(", ", ExcludeTags)); + b.Append(']'); + } + return b.ToString(); + } +} + +// ============================================================================ +// StreamSnapshot +// ============================================================================ + +/// +/// JSON-serialisable snapshot of a stream's state for cluster catchup. +/// Mirrors Go streamSnapshot struct in server/jetstream_cluster.go lines 9454-9461. +/// +internal sealed class StreamSnapshot +{ + [JsonPropertyName("messages")] public ulong Msgs { get; set; } + [JsonPropertyName("bytes")] public ulong Bytes { get; set; } + [JsonPropertyName("first_seq")] public ulong FirstSeq { get; set; } + [JsonPropertyName("last_seq")] public ulong LastSeq { get; set; } + [JsonPropertyName("clfs")] public ulong Failed { get; set; } + [JsonPropertyName("deleted")] public ulong[]? Deleted { get; set; } +} + +// ============================================================================ +// StreamSyncRequest +// ============================================================================ + +/// +/// Request sent by a lagging follower to trigger stream catch-up from the leader. +/// Mirrors Go streamSyncRequest struct in server/jetstream_cluster.go lines 9707-9713. +/// +internal sealed class StreamSyncRequest +{ + [JsonPropertyName("peer")] public string? Peer { get; set; } + [JsonPropertyName("first_seq")] public ulong FirstSeq { get; set; } + [JsonPropertyName("last_seq")] public ulong LastSeq { get; set; } + [JsonPropertyName("delete_ranges")] public bool DeleteRangesOk { get; set; } + [JsonPropertyName("min_applied")] public ulong MinApplied { get; set; } +} + +// ============================================================================ +// Stub API request/response types (referenced by cluster types; full impl later) +// ============================================================================ + +/// Stub: full definition in session 21 (jetstream_api.go). +internal sealed class JSApiStreamPurgeRequest +{ + [JsonPropertyName("filter")] public string? Filter { get; set; } + [JsonPropertyName("seq")] public ulong? Seq { get; set; } + [JsonPropertyName("keep")] public ulong? Keep { get; set; } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs new file mode 100644 index 0000000..c005cad --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs @@ -0,0 +1,622 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/raft.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// RaftNode interface +// ============================================================================ + +/// +/// Primary interface for a NATS Consensus Group (NRG) Raft node. +/// Mirrors Go RaftNode interface in server/raft.go lines 40-92. +/// Replaces the stub declared in NatsServerTypes.cs. +/// +public interface IRaftNode +{ + // --- Proposal --- + void Propose(byte[] entry); + void ProposeMulti(IReadOnlyList entries); + void ForwardProposal(byte[] entry); + + // --- Snapshot --- + void InstallSnapshot(byte[] snap, bool force); + /// Returns ; typed as object to keep the public interface free of internal types. + object CreateSnapshotCheckpoint(bool force); + void SendSnapshot(byte[] snap); + bool NeedSnapshot(); + + // --- State queries --- + (ulong Entries, ulong Bytes) Applied(ulong index); + (ulong Entries, ulong Bytes) Processed(ulong index, ulong applied); + RaftState State(); + (ulong Entries, ulong Bytes) Size(); + (ulong Index, ulong Commit, ulong Applied) Progress(); + bool Leader(); + DateTime? LeaderSince(); + bool Quorum(); + bool Current(); + bool Healthy(); + ulong Term(); + bool Leaderless(); + string GroupLeader(); + bool HadPreviousLeader(); + + // --- Leadership / observer --- + void StepDown(params string[] preferred); + void SetObserver(bool isObserver); + bool IsObserver(); + void Campaign(); + void CampaignImmediately(); + + // --- Identity --- + string ID(); + string Group(); + + // --- Peer management --- + IReadOnlyList Peers(); + void ProposeKnownPeers(IReadOnlyList knownPeers); + void UpdateKnownPeers(IReadOnlyList knownPeers); + void ProposeAddPeer(string peer); + void ProposeRemovePeer(string peer); + bool MembershipChangeInProgress(); + void AdjustClusterSize(int csz); + void AdjustBootClusterSize(int csz); + int ClusterSize(); + + // --- Apply queue --- + IpQueue ApplyQ(); + void PauseApply(); + void ResumeApply(); + bool DrainAndReplaySnapshot(); + + // --- Channels / lifecycle --- + ChannelReader LeadChangeC(); + ChannelReader QuitC(); + DateTime Created(); + void Stop(); + void WaitForStop(); + void Delete(); + bool IsDeleted(); + void RecreateInternalSubs(); + bool IsSystemAccount(); + string GetTrafficAccountName(); +} + +// ============================================================================ +// RaftNodeCheckpoint interface +// ============================================================================ + +/// +/// Allows asynchronous snapshot installation from a checkpoint. +/// Mirrors Go RaftNodeCheckpoint interface in server/raft.go lines 98-103. +/// Internal because it references internal types (AppendEntry). +/// +internal interface IRaftNodeCheckpoint +{ + byte[] LoadLastSnapshot(); + IEnumerable<(AppendEntry Entry, Exception? Error)> AppendEntriesSeq(); + void Abort(); + ulong InstallSnapshot(byte[] data); +} + +// ============================================================================ +// IWal interface +// ============================================================================ + +/// +/// Write-ahead log abstraction used by the Raft implementation. +/// Mirrors Go WAL interface in server/raft.go lines 105-118. +/// +internal interface IWal +{ + StorageType Type(); + (ulong Seq, long TimeStamp) StoreMsg(string subj, byte[]? hdr, byte[] msg, long ttl); + StoreMsg? LoadMsg(ulong index, StoreMsg? sm); + bool RemoveMsg(ulong index); + ulong Compact(ulong index); + ulong Purge(); + ulong PurgeEx(string subject, ulong seq, ulong keep); + void Truncate(ulong seq); + StreamState State(); + void FastState(ref StreamState state); + void Stop(); + void Delete(bool inline); +} + +// ============================================================================ +// Peer +// ============================================================================ + +/// +/// Represents a peer in a Raft group. +/// Mirrors Go Peer struct in server/raft.go lines 120-125. +/// +public sealed class Peer +{ + public string Id { get; set; } = string.Empty; + public bool Current { get; set; } + public DateTime Last { get; set; } + public ulong Lag { get; set; } +} + +// ============================================================================ +// RaftState enum +// ============================================================================ + +/// +/// Allowable states for a NATS Consensus Group node. +/// Mirrors Go RaftState iota in server/raft.go lines 128-135. +/// +public enum RaftState : byte +{ + Follower = 0, + Leader = 1, + Candidate = 2, + Closed = 3, +} + +// ============================================================================ +// RaftConfig +// ============================================================================ + +/// +/// Configuration for creating a Raft group. +/// Mirrors Go RaftConfig struct in server/raft.go lines 301-317. +/// +public sealed class RaftConfig +{ + public string Name { get; set; } = string.Empty; + public string Store { get; set; } = string.Empty; + /// WAL store — typed as object to keep public API free of internal IWal. + public object? Log { get; set; } + public bool Track { get; set; } + public bool Observer { get; set; } + /// + /// Must be set for a Raft group that's recovering after a restart, or if first + /// seen after a catchup from another server. + /// + public bool Recovering { get; set; } + /// + /// Identifies the Raft peer set is being scaled up; prevents an empty-log node + /// from becoming leader prematurely. + /// + public bool ScaleUp { get; set; } +} + +// ============================================================================ +// Internal Raft state types +// ============================================================================ + +/// +/// Main Raft node implementation. +/// Mirrors Go raft struct in server/raft.go lines 151-251. +/// All algorithm methods are stubbed — full implementation is session 20+. +/// +internal sealed class Raft : IRaftNode +{ + // Identity / location + internal DateTime Created_ { get; set; } + internal string AccName { get; set; } = string.Empty; + internal string GroupName { get; set; } = string.Empty; + internal string StoreDir { get; set; } = string.Empty; + internal string Id { get; set; } = string.Empty; + + // WAL (IWal is internal; store as object here to keep the field in the class without accessibility issues) + internal object? Wal { get; set; } + internal StorageType WalType { get; set; } + internal ulong WalBytes { get; set; } + internal Exception? WriteErr { get; set; } + + // Atomic state + internal int StateValue { get; set; } // RaftState (use Interlocked) + internal long LeaderStateV { get; set; } // 1 = in complete leader state + internal string SnapFile { get; set; } = string.Empty; + + // Cluster + internal int Csz { get; set; } // cluster size + internal int Qn { get; set; } // quorum node count + internal Dictionary Peers_ { get; set; } = new(); + + // Tracking removed peers + internal Dictionary Removed { get; set; } = new(); + internal Dictionary> Acks { get; set; } = new(); + internal Dictionary Pae { get; set; } = new(); + + // Timers / activity + internal System.Threading.Timer? Elect { get; set; } + internal DateTime Active { get; set; } + internal DateTime Llqrt { get; set; } + internal DateTime Lsut { get; set; } + + // Term / index tracking + internal ulong Term_ { get; set; } + internal ulong PTerm { get; set; } + internal ulong PIndex { get; set; } + internal ulong Commit { get; set; } + internal ulong Processed_ { get; set; } + internal ulong Applied_ { get; set; } + internal ulong PApplied { get; set; } + + internal ulong MembChangeIndex { get; set; } + internal ulong Aflr { get; set; } + + internal string LeaderId { get; set; } = string.Empty; + internal string Vote { get; set; } = string.Empty; + + // Server references (object to avoid circular deps) + internal object? Server_ { get; set; } + internal object? Client_ { get; set; } + internal object? JetStream_ { get; set; } + + // Atomic booleans — must be fields (not auto-properties) for Interlocked + internal long HasLeaderV; + internal long PLeaderV; + private long _isSysAccV; + + // NATS subjects + internal string PSubj { get; set; } = string.Empty; + internal string RpSubj { get; set; } = string.Empty; + internal string VSubj { get; set; } = string.Empty; + internal string VReply { get; set; } = string.Empty; + internal string ASubj { get; set; } = string.Empty; + internal string AReply { get; set; } = string.Empty; + + // Queues (object placeholder — IpQueue from session 02) + internal object? SendQ { get; set; } + internal object? AeSub { get; set; } + + // Write buffers + internal byte[] Wtv { get; set; } = []; + internal byte[] Wps { get; set; } = []; + + // Catchup / progress + internal CatchupState? Catchup { get; set; } + internal Dictionary>? Progress_ { get; set; } + + internal ulong HCommit { get; set; } + + // Queues (typed as object to avoid pulling IpQueue directly into the struct boundary) + internal IpQueue? PropQ { get; set; } + internal IpQueue? EntryQ { get; set; } + internal IpQueue? RespQ { get; set; } + internal IpQueue? ApplyQ_ { get; set; } + internal IpQueue? Reqs { get; set; } + internal IpQueue? Votes_ { get; set; } + + internal Channel? LeadC { get; set; } + internal Channel? Quit { get; set; } + + // Flags + internal bool Lxfer { get; set; } + internal bool HcBehind { get; set; } + internal bool MaybeLeader { get; set; } + internal bool Paused { get; set; } + internal bool Observer_ { get; set; } + internal bool Initializing { get; set; } + internal bool ScaleUp_ { get; set; } + internal bool Deleted_ { get; set; } + internal bool Snapshotting { get; set; } + + // Lock + private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion); + + // ----------------------------------------------------------------------- + // IRaftNode — stub implementations + // ----------------------------------------------------------------------- + public void Propose(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft"); + public void ProposeMulti(IReadOnlyList entries) => throw new NotImplementedException("TODO: session 20 — raft"); + public void ForwardProposal(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft"); + public void InstallSnapshot(byte[] snap, bool force) => throw new NotImplementedException("TODO: session 20 — raft"); + public object CreateSnapshotCheckpoint(bool force) => throw new NotImplementedException("TODO: session 20 — raft"); + public void SendSnapshot(byte[] snap) => throw new NotImplementedException("TODO: session 20 — raft"); + public bool NeedSnapshot() => throw new NotImplementedException("TODO: session 20 — raft"); + public (ulong, ulong) Applied(ulong index) => throw new NotImplementedException("TODO: session 20 — raft"); + public (ulong, ulong) Processed(ulong index, ulong applied) => throw new NotImplementedException("TODO: session 20 — raft"); + public RaftState State() => (RaftState)StateValue; + public (ulong, ulong) Size() => throw new NotImplementedException("TODO: session 20 — raft"); + public (ulong, ulong, ulong) Progress() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool Leader() => throw new NotImplementedException("TODO: session 20 — raft"); + public DateTime? LeaderSince() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool Quorum() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool Current() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool Healthy() => throw new NotImplementedException("TODO: session 20 — raft"); + public ulong Term() => Term_; + public bool Leaderless() => throw new NotImplementedException("TODO: session 20 — raft"); + public string GroupLeader() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool HadPreviousLeader() => throw new NotImplementedException("TODO: session 20 — raft"); + public void StepDown(params string[] preferred) => throw new NotImplementedException("TODO: session 20 — raft"); + public void SetObserver(bool isObserver) => throw new NotImplementedException("TODO: session 20 — raft"); + public bool IsObserver() => throw new NotImplementedException("TODO: session 20 — raft"); + public void Campaign() => throw new NotImplementedException("TODO: session 20 — raft"); + public void CampaignImmediately() => throw new NotImplementedException("TODO: session 20 — raft"); + public string ID() => Id; + public string Group() => GroupName; + public IReadOnlyList Peers() => throw new NotImplementedException("TODO: session 20 — raft"); + public void ProposeKnownPeers(IReadOnlyList knownPeers) => throw new NotImplementedException("TODO: session 20 — raft"); + public void UpdateKnownPeers(IReadOnlyList knownPeers) => throw new NotImplementedException("TODO: session 20 — raft"); + public void ProposeAddPeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft"); + public void ProposeRemovePeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft"); + public bool MembershipChangeInProgress() => throw new NotImplementedException("TODO: session 20 — raft"); + public void AdjustClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft"); + public void AdjustBootClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft"); + public int ClusterSize() => throw new NotImplementedException("TODO: session 20 — raft"); + public IpQueue ApplyQ() => ApplyQ_ ?? throw new InvalidOperationException("Apply queue not initialized"); + public void PauseApply() => throw new NotImplementedException("TODO: session 20 — raft"); + public void ResumeApply() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool DrainAndReplaySnapshot() => throw new NotImplementedException("TODO: session 20 — raft"); + public ChannelReader LeadChangeC() => LeadC?.Reader ?? throw new InvalidOperationException("Lead channel not initialized"); + public ChannelReader QuitC() => Quit?.Reader ?? throw new InvalidOperationException("Quit channel not initialized"); + public DateTime Created() => Created_; + public void Stop() => throw new NotImplementedException("TODO: session 20 — raft"); + public void WaitForStop() => throw new NotImplementedException("TODO: session 20 — raft"); + public void Delete() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool IsDeleted() => Deleted_; + public void RecreateInternalSubs() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool IsSystemAccount() => Interlocked.Read(ref _isSysAccV) != 0; + public string GetTrafficAccountName() => throw new NotImplementedException("TODO: session 20 — raft"); +} + +// ============================================================================ +// ProposedEntry +// ============================================================================ + +/// +/// An entry that has been proposed to the leader, with an optional reply subject. +/// Mirrors Go proposedEntry struct in server/raft.go lines 253-256. +/// +internal sealed class ProposedEntry +{ + public Entry? Entry { get; set; } + public string Reply { get; set; } = string.Empty; +} + +// ============================================================================ +// CatchupState +// ============================================================================ + +/// +/// Tracks the state of a follower catch-up operation. +/// Mirrors Go catchupState struct in server/raft.go lines 259-268. +/// +internal sealed class CatchupState +{ + /// Subscription that catchup messages arrive on (object to avoid session dep). + public object? Sub { get; set; } + public ulong CTerm { get; set; } + public ulong CIndex { get; set; } + public ulong PTerm { get; set; } + public ulong PIndex { get; set; } + public DateTime Active { get; set; } + public bool Signal { get; set; } +} + +// ============================================================================ +// Lps — leader peer state +// ============================================================================ + +/// +/// Per-peer state tracked by the leader: last timestamp and last index replicated. +/// Mirrors Go lps struct in server/raft.go lines 271-275. +/// +internal sealed class Lps +{ + /// Last timestamp. + public DateTime Ts { get; set; } + /// Last index replicated. + public ulong Li { get; set; } + /// Whether this is a known peer. + public bool Kp { get; set; } +} + +// ============================================================================ +// Snapshot (internal) +// ============================================================================ + +/// +/// An encoded Raft snapshot (on-disk format). +/// Mirrors Go snapshot struct in server/raft.go lines 1243-1248. +/// +internal sealed class Snapshot +{ + public ulong LastTerm { get; set; } + public ulong LastIndex { get; set; } + public byte[] PeerState { get; set; } = []; + public byte[] Data { get; set; } = []; +} + +// ============================================================================ +// Checkpoint (internal) +// ============================================================================ + +/// +/// Checkpoint for asynchronous snapshot installation. +/// Mirrors Go checkpoint struct in server/raft.go lines 1414-1421. +/// Implements . +/// +internal sealed class Checkpoint : IRaftNodeCheckpoint +{ + public Raft? Node { get; set; } + public ulong Term { get; set; } + public ulong Applied { get; set; } + public ulong PApplied { get; set; } + public string SnapFile { get; set; } = string.Empty; + public byte[] PeerState { get; set; } = []; + + public byte[] LoadLastSnapshot() + => throw new NotImplementedException("TODO: session 20 — raft"); + + public IEnumerable<(AppendEntry Entry, Exception? Error)> AppendEntriesSeq() + => throw new NotImplementedException("TODO: session 20 — raft"); + + public void Abort() + => throw new NotImplementedException("TODO: session 20 — raft"); + + public ulong InstallSnapshot(byte[] data) + => throw new NotImplementedException("TODO: session 20 — raft"); +} + +// ============================================================================ +// CommittedEntry +// ============================================================================ + +/// +/// A committed Raft entry passed up to the JetStream layer for application. +/// Mirrors Go CommittedEntry struct in server/raft.go lines 2506-2509. +/// +public sealed class CommittedEntry +{ + public ulong Index { get; set; } + public List Entries { get; set; } = new(); +} + +// ============================================================================ +// AppendEntry (internal) +// ============================================================================ + +/// +/// The main struct used to sync Raft peers. +/// Mirrors Go appendEntry struct in server/raft.go lines 2557-2568. +/// +internal sealed class AppendEntry +{ + public string Leader { get; set; } = string.Empty; + public ulong TermV { get; set; } + public ulong Commit { get; set; } + public ulong PTerm { get; set; } + public ulong PIndex { get; set; } + public List Entries { get; set; } = new(); + /// Highest term for catchups (if 0, use Term). + public ulong LTerm { get; set; } + public string Reply { get; set; } = string.Empty; + /// Subscription the append entry arrived on (object to avoid session dep). + public object? Sub { get; set; } + public byte[]? Buf { get; set; } +} + +// ============================================================================ +// EntryType enum +// ============================================================================ + +/// +/// Type of a Raft log entry. +/// Mirrors Go EntryType iota in server/raft.go lines 2605-2619. +/// +public enum EntryType : byte +{ + EntryNormal = 0, + EntryOldSnapshot = 1, + EntryPeerState = 2, + EntryAddPeer = 3, + EntryRemovePeer = 4, + EntryLeaderTransfer = 5, + EntrySnapshot = 6, + /// Internal signal type — not transmitted between peers or stored in the log. + EntryCatchup = 7, +} + +// ============================================================================ +// Entry +// ============================================================================ + +/// +/// A single Raft log entry (type + opaque payload). +/// Mirrors Go Entry struct in server/raft.go lines 2641-2643. +/// +public sealed class Entry +{ + public EntryType Type { get; set; } + public byte[] Data { get; set; } = []; + + /// + /// Returns true if this entry changes group membership. + /// Mirrors Go Entry.ChangesMembership. + /// + public bool ChangesMembership() + => Type == EntryType.EntryAddPeer || Type == EntryType.EntryRemovePeer; +} + +// ============================================================================ +// AppendEntryResponse (internal) +// ============================================================================ + +/// +/// Response sent by a follower after receiving an append-entry RPC. +/// Mirrors Go appendEntryResponse struct in server/raft.go lines 2760-2766. +/// +internal sealed class AppendEntryResponse +{ + public ulong TermV { get; set; } + public ulong Index { get; set; } + public string Peer { get; set; } = string.Empty; + public string Reply { get; set; } = string.Empty; + public bool Success { get; set; } +} + +// ============================================================================ +// PeerState (internal) +// ============================================================================ + +/// +/// Encoded peer state attached to snapshots and peer-state entries. +/// Mirrors Go peerState struct in server/raft.go lines 4470-4474. +/// +internal sealed class PeerState +{ + public List KnownPeers { get; set; } = new(); + public int ClusterSize { get; set; } + /// Extension / domain state (opaque ushort in Go). + public ushort DomainExt { get; set; } +} + +// ============================================================================ +// VoteRequest (internal) +// ============================================================================ + +/// +/// A Raft vote request sent during leader election. +/// Mirrors Go voteRequest struct in server/raft.go lines 4549-4556. +/// +internal sealed class VoteRequest +{ + public ulong TermV { get; set; } + public ulong LastTerm { get; set; } + public ulong LastIndex { get; set; } + public string Candidate { get; set; } = string.Empty; + /// Internal use — reply subject. + public string Reply { get; set; } = string.Empty; +} + +// ============================================================================ +// VoteResponse (internal) +// ============================================================================ + +/// +/// A response to a . +/// Mirrors Go voteResponse struct in server/raft.go lines 4730-4735. +/// +internal sealed class VoteResponse +{ + public ulong TermV { get; set; } + public string Peer { get; set; } = string.Empty; + public bool Granted { get; set; } + /// Whether this peer's log is empty. + public bool Empty { get; set; } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index 7623d00..aee77b3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -249,8 +249,7 @@ internal static class RateCounterFactory => new(rateLimit); } -/// Stub for RaftNode (session 20). -public interface IRaftNode { } +// IRaftNode is now fully defined in JetStream/RaftTypes.cs (session 20). // ============================================================================ // Session 10: Ports, TlsMixConn, CaptureHttpServerLog diff --git a/porting.db b/porting.db index ff67152..075a652 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index aff4275..071886e 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 21:14:41 UTC +Generated: 2026-02-26 21:23:40 UTC ## Modules (12 total) @@ -13,9 +13,9 @@ Generated: 2026-02-26 21:14:41 UTC | Status | Count | |--------|-------| -| complete | 2422 | +| complete | 2851 | | n_a | 77 | -| not_started | 1081 | +| not_started | 652 | | stub | 93 | ## Unit Tests (3257 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 21:14:41 UTC ## Overall Progress -**3010/6942 items complete (43.4%)** +**3439/6942 items complete (49.5%)** diff --git a/reports/report_84d450b.md b/reports/report_84d450b.md new file mode 100644 index 0000000..071886e --- /dev/null +++ b/reports/report_84d450b.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 21:23:40 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 2851 | +| n_a | 77 | +| not_started | 652 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**3439/6942 items complete (49.5%)**