feat: port session 20 — JetStream Cluster & Raft types

Port IRaftNode (37-method interface), Raft, RaftState, EntryType, Entry,
AppendEntry, CommittedEntry, VoteRequest/VoteResponse, PeerState from
jetstream_cluster.go; JetStreamCluster, StreamAssignment, ConsumerAssignment,
EntryOp (19 values) and supporting types from jetstream_cluster.go.

Removes IRaftNode stub from NatsServerTypes.cs.
429 features marked complete (IDs 2599-2796, 1520-1750).
This commit is contained in:
Joseph Doherty
2026-02-26 16:23:39 -05:00
parent 84d450b4a0
commit e6bc76b315
6 changed files with 1190 additions and 6 deletions

View File

@@ -0,0 +1,524 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/jetstream_cluster.go in the NATS server Go source.
using System.Text.Json;
using System.Text.Json.Serialization;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// JetStreamCluster
// ============================================================================
/// <summary>
/// Holds cluster-level JetStream state on each server: the meta-controller Raft node,
/// stream/consumer assignment maps, and inflight proposal tracking.
/// Mirrors Go <c>jetStreamCluster</c> struct in server/jetstream_cluster.go lines 44-84.
/// </summary>
internal sealed class JetStreamCluster
{
/// <summary>The meta-controller Raft node.</summary>
public IRaftNode? Meta { get; set; }
/// <summary>
/// All known stream assignments. Key: account name → stream name → assignment.
/// </summary>
public Dictionary<string, Dictionary<string, StreamAssignment>> Streams { get; set; } = new();
/// <summary>
/// Inflight stream create/update/delete proposals. Key: account → stream name.
/// </summary>
public Dictionary<string, Dictionary<string, InflightStreamInfo>> InflightStreams { get; set; } = new();
/// <summary>
/// Inflight consumer create/update/delete proposals. Key: account → stream → consumer.
/// </summary>
public Dictionary<string, Dictionary<string, Dictionary<string, InflightConsumerInfo>>> InflightConsumers { get; set; } = new();
/// <summary>
/// Peer-remove reply subjects pending quorum. Key: peer ID.
/// </summary>
public Dictionary<string, PeerRemoveInfo> PeerRemoveReply { get; set; } = new();
/// <summary>Signals meta-leader should re-check stream assignments.</summary>
public bool StreamsCheck { get; set; }
/// <summary>Reference to the top-level server (object to avoid circular dep).</summary>
public object? Server { get; set; }
/// <summary>Internal client for cluster messaging (object to avoid circular dep).</summary>
public object? Client { get; set; }
/// <summary>Subscription that receives stream assignment results (object to avoid session dep).</summary>
public object? StreamResults { get; set; }
/// <summary>Subscription that receives consumer assignment results (object to avoid session dep).</summary>
public object? ConsumerResults { get; set; }
/// <summary>Subscription for meta-leader step-down requests.</summary>
public object? Stepdown { get; set; }
/// <summary>Subscription for peer-remove requests.</summary>
public object? PeerRemove { get; set; }
/// <summary>Subscription for stream-move requests.</summary>
public object? PeerStreamMove { get; set; }
/// <summary>Subscription for stream-move cancellation.</summary>
public object? PeerStreamCancelMove { get; set; }
/// <summary>Channel used to pop out monitorCluster before the raft layer.</summary>
public System.Threading.Channels.Channel<bool>? Qch { get; set; }
/// <summary>Notifies that monitorCluster has actually stopped.</summary>
public System.Threading.Channels.Channel<bool>? Stopped { get; set; }
/// <summary>Last meta-snapshot time (Unix nanoseconds).</summary>
public long LastMetaSnapTime { get; set; }
/// <summary>Duration of last meta-snapshot (nanoseconds).</summary>
public long LastMetaSnapDuration { get; set; }
}
// ============================================================================
// InflightStreamInfo
// ============================================================================
/// <summary>
/// Tracks inflight stream create/update/delete proposals.
/// Mirrors Go <c>inflightStreamInfo</c> in server/jetstream_cluster.go lines 87-91.
/// </summary>
internal sealed class InflightStreamInfo
{
/// <summary>Number of inflight operations.</summary>
public ulong Ops { get; set; }
/// <summary>Whether the stream has been deleted.</summary>
public bool Deleted { get; set; }
public StreamAssignment? Assignment { get; set; }
}
// ============================================================================
// InflightConsumerInfo
// ============================================================================
/// <summary>
/// Tracks inflight consumer create/update/delete proposals.
/// Mirrors Go <c>inflightConsumerInfo</c> in server/jetstream_cluster.go lines 94-98.
/// </summary>
internal sealed class InflightConsumerInfo
{
/// <summary>Number of inflight operations.</summary>
public ulong Ops { get; set; }
/// <summary>Whether the consumer has been deleted.</summary>
public bool Deleted { get; set; }
public ConsumerAssignment? Assignment { get; set; }
}
// ============================================================================
// PeerRemoveInfo
// ============================================================================
/// <summary>
/// Holds the reply information for a peer-remove request pending quorum.
/// Mirrors Go <c>peerRemoveInfo</c> in server/jetstream_cluster.go lines 101-106.
/// </summary>
internal sealed class PeerRemoveInfo
{
/// <summary>Client info from the request (object to avoid session dep).</summary>
public ClientInfo? ClientInfo { get; set; }
public string Subject { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public string Request { get; set; } = string.Empty;
}
// ============================================================================
// EntryOp enum
// ============================================================================
/// <summary>
/// Operation type encoded in a JetStream cluster Raft entry.
/// Mirrors Go <c>entryOp</c> iota in server/jetstream_cluster.go lines 116-150.
/// ONLY ADD TO THE END — inserting values breaks server interoperability.
/// </summary>
internal enum EntryOp : byte
{
// Meta ops
AssignStreamOp = 0,
AssignConsumerOp = 1,
RemoveStreamOp = 2,
RemoveConsumerOp = 3,
// Stream ops
StreamMsgOp = 4,
PurgeStreamOp = 5,
DeleteMsgOp = 6,
// Consumer ops
UpdateDeliveredOp = 7,
UpdateAcksOp = 8,
// Compressed consumer assignments
AssignCompressedConsumerOp = 9,
// Filtered consumer skip
UpdateSkipOp = 10,
// Update stream
UpdateStreamOp = 11,
// Pending pull requests
AddPendingRequest = 12,
RemovePendingRequest = 13,
// Compressed streams (RAFT or catchup)
CompressedStreamMsgOp = 14,
// Deleted gaps on catchups for replicas
DeleteRangeOp = 15,
// Batch stream ops
BatchMsgOp = 16,
BatchCommitMsgOp = 17,
// Consumer reset to specific starting sequence
ResetSeqOp = 18,
}
// ============================================================================
// RaftGroup
// ============================================================================
/// <summary>
/// Describes a Raft consensus group that houses streams and consumers.
/// Controlled by the meta-group controller.
/// Mirrors Go <c>raftGroup</c> struct in server/jetstream_cluster.go lines 154-163.
/// </summary>
internal sealed class RaftGroup
{
[JsonPropertyName("name")] public string Name { get; set; } = string.Empty;
[JsonPropertyName("peers")] public string[] Peers { get; set; } = [];
[JsonPropertyName("store")] public StorageType Storage { get; set; }
[JsonPropertyName("cluster")] public string? Cluster { get; set; }
[JsonPropertyName("preferred")] public string? Preferred { get; set; }
[JsonPropertyName("scale_up")] public bool ScaleUp { get; set; }
/// <summary>Internal Raft node — not serialized.</summary>
[JsonIgnore]
public IRaftNode? Node { get; set; }
}
// ============================================================================
// StreamAssignment
// ============================================================================
/// <summary>
/// What the meta controller uses to assign streams to peers.
/// Mirrors Go <c>streamAssignment</c> struct in server/jetstream_cluster.go lines 166-184.
/// </summary>
internal sealed class StreamAssignment
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("created")] public DateTime Created { get; set; }
[JsonPropertyName("stream")] public JsonElement ConfigJson { get; set; }
[JsonIgnore] public StreamConfig? Config { get; set; }
[JsonPropertyName("group")] public RaftGroup? Group { get; set; }
[JsonPropertyName("sync")] public string Sync { get; set; } = string.Empty;
[JsonPropertyName("subject")] public string? Subject { get; set; }
[JsonPropertyName("reply")] public string? Reply { get; set; }
[JsonPropertyName("restore_state")] public StreamState? Restore { get; set; }
// Internal (not serialized)
[JsonIgnore] public Dictionary<string, ConsumerAssignment>? Consumers { get; set; }
[JsonIgnore] public bool Responded { get; set; }
[JsonIgnore] public bool Recovering { get; set; }
[JsonIgnore] public bool Reassigning { get; set; }
[JsonIgnore] public bool Resetting { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; }
}
// ============================================================================
// UnsupportedStreamAssignment
// ============================================================================
/// <summary>
/// Holds state for a stream assignment that this server cannot run,
/// so that it can still respond to cluster info requests.
/// Mirrors Go <c>unsupportedStreamAssignment</c> in server/jetstream_cluster.go lines 186-191.
/// </summary>
internal sealed class UnsupportedStreamAssignment
{
public string Reason { get; set; } = string.Empty;
public StreamInfo Info { get; set; } = new();
/// <summary>Internal system client (object to avoid session dep).</summary>
public object? SysClient { get; set; }
/// <summary>Info subscription (object to avoid session dep).</summary>
public object? InfoSub { get; set; }
}
// ============================================================================
// ConsumerAssignment
// ============================================================================
/// <summary>
/// What the meta controller uses to assign consumers to streams.
/// Mirrors Go <c>consumerAssignment</c> struct in server/jetstream_cluster.go lines 250-266.
/// </summary>
internal sealed class ConsumerAssignment
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("created")] public DateTime Created { get; set; }
[JsonPropertyName("name")] public string Name { get; set; } = string.Empty;
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
[JsonPropertyName("consumer")] public JsonElement ConfigJson { get; set; }
[JsonIgnore] public ConsumerConfig? Config { get; set; }
[JsonPropertyName("group")] public RaftGroup? Group { get; set; }
[JsonPropertyName("subject")] public string? Subject { get; set; }
[JsonPropertyName("reply")] public string? Reply { get; set; }
[JsonPropertyName("state")] public ConsumerState? State { get; set; }
// Internal (not serialized)
[JsonIgnore] public bool Responded { get; set; }
[JsonIgnore] public bool Recovering { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; }
}
// ============================================================================
// UnsupportedConsumerAssignment
// ============================================================================
/// <summary>
/// Holds state for a consumer assignment that this server cannot run.
/// Mirrors Go <c>unsupportedConsumerAssignment</c> in server/jetstream_cluster.go lines 268-273.
/// </summary>
internal sealed class UnsupportedConsumerAssignment
{
public string Reason { get; set; } = string.Empty;
public ConsumerInfo Info { get; set; } = new();
/// <summary>Internal system client (object to avoid session dep).</summary>
public object? SysClient { get; set; }
/// <summary>Info subscription (object to avoid session dep).</summary>
public object? InfoSub { get; set; }
}
// ============================================================================
// WriteableConsumerAssignment
// ============================================================================
/// <summary>
/// Serialisable form of a consumer assignment used in meta-snapshots.
/// Mirrors Go <c>writeableConsumerAssignment</c> in server/jetstream_cluster.go lines 332-340.
/// </summary>
internal sealed class WriteableConsumerAssignment
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("created")] public DateTime Created { get; set; }
[JsonPropertyName("name")] public string Name { get; set; } = string.Empty;
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
[JsonPropertyName("consumer")] public JsonElement ConfigJson { get; set; }
[JsonPropertyName("group")] public RaftGroup? Group { get; set; }
[JsonPropertyName("state")] public ConsumerState? State { get; set; }
}
// ============================================================================
// StreamPurge
// ============================================================================
/// <summary>
/// What the stream leader replicates when purging a stream.
/// Mirrors Go <c>streamPurge</c> struct in server/jetstream_cluster.go lines 343-350.
/// </summary>
internal sealed class StreamPurge
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
[JsonPropertyName("last_seq")] public ulong LastSeq { get; set; }
[JsonPropertyName("subject")] public string Subject { get; set; } = string.Empty;
[JsonPropertyName("reply")] public string Reply { get; set; } = string.Empty;
[JsonPropertyName("request")] public JSApiStreamPurgeRequest? Request { get; set; }
}
// ============================================================================
// StreamMsgDelete
// ============================================================================
/// <summary>
/// What the stream leader replicates when deleting a message.
/// Mirrors Go <c>streamMsgDelete</c> struct in server/jetstream_cluster.go lines 353-360.
/// </summary>
internal sealed class StreamMsgDelete
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
[JsonPropertyName("seq")] public ulong Seq { get; set; }
[JsonPropertyName("no_erase")] public bool NoErase { get; set; }
[JsonPropertyName("subject")] public string Subject { get; set; } = string.Empty;
[JsonPropertyName("reply")] public string Reply { get; set; } = string.Empty;
}
// ============================================================================
// RecoveryUpdates
// ============================================================================
/// <summary>
/// Accumulates stream/consumer changes discovered during meta-recovery so they
/// can be applied after the full snapshot has been processed.
/// Mirrors Go <c>recoveryUpdates</c> struct in server/jetstream_cluster.go lines 1327-1333.
/// </summary>
internal sealed class RecoveryUpdates
{
public Dictionary<string, StreamAssignment> RemoveStreams { get; set; } = new();
public Dictionary<string, Dictionary<string, ConsumerAssignment>> RemoveConsumers { get; set; } = new();
public Dictionary<string, StreamAssignment> AddStreams { get; set; } = new();
public Dictionary<string, StreamAssignment> UpdateStreams { get; set; } = new();
public Dictionary<string, Dictionary<string, ConsumerAssignment>> UpdateConsumers { get; set; } = new();
}
// ============================================================================
// WriteableStreamAssignment
// ============================================================================
/// <summary>
/// Serialisable form of a stream assignment (with its consumers) used in meta-snapshots.
/// Mirrors Go <c>writeableStreamAssignment</c> in server/jetstream_cluster.go lines 1872-1879.
/// </summary>
internal sealed class WriteableStreamAssignment
{
[JsonPropertyName("client")] public ClientInfo? Client { get; set; }
[JsonPropertyName("created")] public DateTime Created { get; set; }
[JsonPropertyName("stream")] public JsonElement ConfigJson { get; set; }
[JsonPropertyName("group")] public RaftGroup? Group { get; set; }
[JsonPropertyName("sync")] public string Sync { get; set; } = string.Empty;
public List<WriteableConsumerAssignment> Consumers { get; set; } = new();
}
// ============================================================================
// ConsumerAssignmentResult
// ============================================================================
/// <summary>
/// Result sent by a member after processing a consumer assignment.
/// Mirrors Go <c>consumerAssignmentResult</c> in server/jetstream_cluster.go lines 5592-5597.
/// </summary>
internal sealed class ConsumerAssignmentResult
{
[JsonPropertyName("account")] public string Account { get; set; } = string.Empty;
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
[JsonPropertyName("consumer")] public string Consumer { get; set; } = string.Empty;
/// <summary>Stub: JSApiConsumerCreateResponse — full type in session 20+.</summary>
[JsonPropertyName("response")] public object? Response { get; set; }
}
// ============================================================================
// StreamAssignmentResult
// ============================================================================
/// <summary>
/// Result sent by a member after processing a stream assignment.
/// Mirrors Go <c>streamAssignmentResult</c> in server/jetstream_cluster.go lines 6779-6785.
/// </summary>
internal sealed class StreamAssignmentResult
{
[JsonPropertyName("account")] public string Account { get; set; } = string.Empty;
[JsonPropertyName("stream")] public string Stream { get; set; } = string.Empty;
/// <summary>Stub: JSApiStreamCreateResponse — full type in session 20+.</summary>
[JsonPropertyName("create_response")] public object? Response { get; set; }
/// <summary>Stub: JSApiStreamRestoreResponse — full type in session 20+.</summary>
[JsonPropertyName("restore_response")] public object? Restore { get; set; }
[JsonPropertyName("is_update")] public bool Update { get; set; }
}
// ============================================================================
// SelectPeerError
// ============================================================================
/// <summary>
/// Collects the reasons why no suitable peer could be found for a placement.
/// Mirrors Go <c>selectPeerError</c> struct in server/jetstream_cluster.go lines 7113-7121.
/// </summary>
internal sealed class SelectPeerError : Exception
{
public bool ExcludeTag { get; set; }
public bool Offline { get; set; }
public bool NoStorage { get; set; }
public bool UniqueTag { get; set; }
public bool Misc { get; set; }
public bool NoJsClust { get; set; }
public HashSet<string>? NoMatchTags { get; set; }
public HashSet<string>? ExcludeTags { get; set; }
public override string Message => BuildMessage();
private string BuildMessage()
{
var b = new System.Text.StringBuilder("no suitable peers for placement");
if (Offline) b.Append(", peer offline");
if (ExcludeTag) b.Append(", exclude tag set");
if (NoStorage) b.Append(", insufficient storage");
if (UniqueTag) b.Append(", server tag not unique");
if (Misc) b.Append(", miscellaneous issue");
if (NoJsClust) b.Append(", jetstream not enabled in cluster");
if (NoMatchTags is { Count: > 0 })
{
b.Append(", tags not matched [");
b.Append(string.Join(", ", NoMatchTags));
b.Append(']');
}
if (ExcludeTags is { Count: > 0 })
{
b.Append(", tags excluded [");
b.Append(string.Join(", ", ExcludeTags));
b.Append(']');
}
return b.ToString();
}
}
// ============================================================================
// StreamSnapshot
// ============================================================================
/// <summary>
/// JSON-serialisable snapshot of a stream's state for cluster catchup.
/// Mirrors Go <c>streamSnapshot</c> struct in server/jetstream_cluster.go lines 9454-9461.
/// </summary>
internal sealed class StreamSnapshot
{
[JsonPropertyName("messages")] public ulong Msgs { get; set; }
[JsonPropertyName("bytes")] public ulong Bytes { get; set; }
[JsonPropertyName("first_seq")] public ulong FirstSeq { get; set; }
[JsonPropertyName("last_seq")] public ulong LastSeq { get; set; }
[JsonPropertyName("clfs")] public ulong Failed { get; set; }
[JsonPropertyName("deleted")] public ulong[]? Deleted { get; set; }
}
// ============================================================================
// StreamSyncRequest
// ============================================================================
/// <summary>
/// Request sent by a lagging follower to trigger stream catch-up from the leader.
/// Mirrors Go <c>streamSyncRequest</c> struct in server/jetstream_cluster.go lines 9707-9713.
/// </summary>
internal sealed class StreamSyncRequest
{
[JsonPropertyName("peer")] public string? Peer { get; set; }
[JsonPropertyName("first_seq")] public ulong FirstSeq { get; set; }
[JsonPropertyName("last_seq")] public ulong LastSeq { get; set; }
[JsonPropertyName("delete_ranges")] public bool DeleteRangesOk { get; set; }
[JsonPropertyName("min_applied")] public ulong MinApplied { get; set; }
}
// ============================================================================
// Stub API request/response types (referenced by cluster types; full impl later)
// ============================================================================
/// <summary>Stub: full definition in session 21 (jetstream_api.go).</summary>
internal sealed class JSApiStreamPurgeRequest
{
[JsonPropertyName("filter")] public string? Filter { get; set; }
[JsonPropertyName("seq")] public ulong? Seq { get; set; }
[JsonPropertyName("keep")] public ulong? Keep { get; set; }
}

View File

@@ -0,0 +1,622 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/raft.go in the NATS server Go source.
using System.Collections.Concurrent;
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// RaftNode interface
// ============================================================================
/// <summary>
/// Primary interface for a NATS Consensus Group (NRG) Raft node.
/// Mirrors Go <c>RaftNode</c> interface in server/raft.go lines 40-92.
/// Replaces the stub declared in NatsServerTypes.cs.
/// </summary>
public interface IRaftNode
{
// --- Proposal ---
void Propose(byte[] entry);
void ProposeMulti(IReadOnlyList<Entry> entries);
void ForwardProposal(byte[] entry);
// --- Snapshot ---
void InstallSnapshot(byte[] snap, bool force);
/// <remarks>Returns <see cref="IRaftNodeCheckpoint"/>; typed as object to keep the public interface free of internal types.</remarks>
object CreateSnapshotCheckpoint(bool force);
void SendSnapshot(byte[] snap);
bool NeedSnapshot();
// --- State queries ---
(ulong Entries, ulong Bytes) Applied(ulong index);
(ulong Entries, ulong Bytes) Processed(ulong index, ulong applied);
RaftState State();
(ulong Entries, ulong Bytes) Size();
(ulong Index, ulong Commit, ulong Applied) Progress();
bool Leader();
DateTime? LeaderSince();
bool Quorum();
bool Current();
bool Healthy();
ulong Term();
bool Leaderless();
string GroupLeader();
bool HadPreviousLeader();
// --- Leadership / observer ---
void StepDown(params string[] preferred);
void SetObserver(bool isObserver);
bool IsObserver();
void Campaign();
void CampaignImmediately();
// --- Identity ---
string ID();
string Group();
// --- Peer management ---
IReadOnlyList<Peer> Peers();
void ProposeKnownPeers(IReadOnlyList<string> knownPeers);
void UpdateKnownPeers(IReadOnlyList<string> knownPeers);
void ProposeAddPeer(string peer);
void ProposeRemovePeer(string peer);
bool MembershipChangeInProgress();
void AdjustClusterSize(int csz);
void AdjustBootClusterSize(int csz);
int ClusterSize();
// --- Apply queue ---
IpQueue<CommittedEntry> ApplyQ();
void PauseApply();
void ResumeApply();
bool DrainAndReplaySnapshot();
// --- Channels / lifecycle ---
ChannelReader<bool> LeadChangeC();
ChannelReader<bool> QuitC();
DateTime Created();
void Stop();
void WaitForStop();
void Delete();
bool IsDeleted();
void RecreateInternalSubs();
bool IsSystemAccount();
string GetTrafficAccountName();
}
// ============================================================================
// RaftNodeCheckpoint interface
// ============================================================================
/// <summary>
/// Allows asynchronous snapshot installation from a checkpoint.
/// Mirrors Go <c>RaftNodeCheckpoint</c> interface in server/raft.go lines 98-103.
/// Internal because it references internal types (AppendEntry).
/// </summary>
internal interface IRaftNodeCheckpoint
{
byte[] LoadLastSnapshot();
IEnumerable<(AppendEntry Entry, Exception? Error)> AppendEntriesSeq();
void Abort();
ulong InstallSnapshot(byte[] data);
}
// ============================================================================
// IWal interface
// ============================================================================
/// <summary>
/// Write-ahead log abstraction used by the Raft implementation.
/// Mirrors Go <c>WAL</c> interface in server/raft.go lines 105-118.
/// </summary>
internal interface IWal
{
StorageType Type();
(ulong Seq, long TimeStamp) StoreMsg(string subj, byte[]? hdr, byte[] msg, long ttl);
StoreMsg? LoadMsg(ulong index, StoreMsg? sm);
bool RemoveMsg(ulong index);
ulong Compact(ulong index);
ulong Purge();
ulong PurgeEx(string subject, ulong seq, ulong keep);
void Truncate(ulong seq);
StreamState State();
void FastState(ref StreamState state);
void Stop();
void Delete(bool inline);
}
// ============================================================================
// Peer
// ============================================================================
/// <summary>
/// Represents a peer in a Raft group.
/// Mirrors Go <c>Peer</c> struct in server/raft.go lines 120-125.
/// </summary>
public sealed class Peer
{
public string Id { get; set; } = string.Empty;
public bool Current { get; set; }
public DateTime Last { get; set; }
public ulong Lag { get; set; }
}
// ============================================================================
// RaftState enum
// ============================================================================
/// <summary>
/// Allowable states for a NATS Consensus Group node.
/// Mirrors Go <c>RaftState</c> iota in server/raft.go lines 128-135.
/// </summary>
public enum RaftState : byte
{
Follower = 0,
Leader = 1,
Candidate = 2,
Closed = 3,
}
// ============================================================================
// RaftConfig
// ============================================================================
/// <summary>
/// Configuration for creating a Raft group.
/// Mirrors Go <c>RaftConfig</c> struct in server/raft.go lines 301-317.
/// </summary>
public sealed class RaftConfig
{
public string Name { get; set; } = string.Empty;
public string Store { get; set; } = string.Empty;
/// <summary>WAL store — typed as object to keep public API free of internal IWal.</summary>
public object? Log { get; set; }
public bool Track { get; set; }
public bool Observer { get; set; }
/// <summary>
/// Must be set for a Raft group that's recovering after a restart, or if first
/// seen after a catchup from another server.
/// </summary>
public bool Recovering { get; set; }
/// <summary>
/// Identifies the Raft peer set is being scaled up; prevents an empty-log node
/// from becoming leader prematurely.
/// </summary>
public bool ScaleUp { get; set; }
}
// ============================================================================
// Internal Raft state types
// ============================================================================
/// <summary>
/// Main Raft node implementation.
/// Mirrors Go <c>raft</c> struct in server/raft.go lines 151-251.
/// All algorithm methods are stubbed — full implementation is session 20+.
/// </summary>
internal sealed class Raft : IRaftNode
{
// Identity / location
internal DateTime Created_ { get; set; }
internal string AccName { get; set; } = string.Empty;
internal string GroupName { get; set; } = string.Empty;
internal string StoreDir { get; set; } = string.Empty;
internal string Id { get; set; } = string.Empty;
// WAL (IWal is internal; store as object here to keep the field in the class without accessibility issues)
internal object? Wal { get; set; }
internal StorageType WalType { get; set; }
internal ulong WalBytes { get; set; }
internal Exception? WriteErr { get; set; }
// Atomic state
internal int StateValue { get; set; } // RaftState (use Interlocked)
internal long LeaderStateV { get; set; } // 1 = in complete leader state
internal string SnapFile { get; set; } = string.Empty;
// Cluster
internal int Csz { get; set; } // cluster size
internal int Qn { get; set; } // quorum node count
internal Dictionary<string, Lps> Peers_ { get; set; } = new();
// Tracking removed peers
internal Dictionary<string, DateTime> Removed { get; set; } = new();
internal Dictionary<ulong, Dictionary<string, bool>> Acks { get; set; } = new();
internal Dictionary<ulong, AppendEntry> Pae { get; set; } = new();
// Timers / activity
internal System.Threading.Timer? Elect { get; set; }
internal DateTime Active { get; set; }
internal DateTime Llqrt { get; set; }
internal DateTime Lsut { get; set; }
// Term / index tracking
internal ulong Term_ { get; set; }
internal ulong PTerm { get; set; }
internal ulong PIndex { get; set; }
internal ulong Commit { get; set; }
internal ulong Processed_ { get; set; }
internal ulong Applied_ { get; set; }
internal ulong PApplied { get; set; }
internal ulong MembChangeIndex { get; set; }
internal ulong Aflr { get; set; }
internal string LeaderId { get; set; } = string.Empty;
internal string Vote { get; set; } = string.Empty;
// Server references (object to avoid circular deps)
internal object? Server_ { get; set; }
internal object? Client_ { get; set; }
internal object? JetStream_ { get; set; }
// Atomic booleans — must be fields (not auto-properties) for Interlocked
internal long HasLeaderV;
internal long PLeaderV;
private long _isSysAccV;
// NATS subjects
internal string PSubj { get; set; } = string.Empty;
internal string RpSubj { get; set; } = string.Empty;
internal string VSubj { get; set; } = string.Empty;
internal string VReply { get; set; } = string.Empty;
internal string ASubj { get; set; } = string.Empty;
internal string AReply { get; set; } = string.Empty;
// Queues (object placeholder — IpQueue<T> from session 02)
internal object? SendQ { get; set; }
internal object? AeSub { get; set; }
// Write buffers
internal byte[] Wtv { get; set; } = [];
internal byte[] Wps { get; set; } = [];
// Catchup / progress
internal CatchupState? Catchup { get; set; }
internal Dictionary<string, IpQueue<ulong>>? Progress_ { get; set; }
internal ulong HCommit { get; set; }
// Queues (typed as object to avoid pulling IpQueue<T> directly into the struct boundary)
internal IpQueue<ProposedEntry>? PropQ { get; set; }
internal IpQueue<AppendEntry>? EntryQ { get; set; }
internal IpQueue<AppendEntryResponse>? RespQ { get; set; }
internal IpQueue<CommittedEntry>? ApplyQ_ { get; set; }
internal IpQueue<VoteRequest>? Reqs { get; set; }
internal IpQueue<VoteResponse>? Votes_ { get; set; }
internal Channel<bool>? LeadC { get; set; }
internal Channel<bool>? Quit { get; set; }
// Flags
internal bool Lxfer { get; set; }
internal bool HcBehind { get; set; }
internal bool MaybeLeader { get; set; }
internal bool Paused { get; set; }
internal bool Observer_ { get; set; }
internal bool Initializing { get; set; }
internal bool ScaleUp_ { get; set; }
internal bool Deleted_ { get; set; }
internal bool Snapshotting { get; set; }
// Lock
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion);
// -----------------------------------------------------------------------
// IRaftNode — stub implementations
// -----------------------------------------------------------------------
public void Propose(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft");
public void ProposeMulti(IReadOnlyList<Entry> entries) => throw new NotImplementedException("TODO: session 20 — raft");
public void ForwardProposal(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft");
public void InstallSnapshot(byte[] snap, bool force) => throw new NotImplementedException("TODO: session 20 — raft");
public object CreateSnapshotCheckpoint(bool force) => throw new NotImplementedException("TODO: session 20 — raft");
public void SendSnapshot(byte[] snap) => throw new NotImplementedException("TODO: session 20 — raft");
public bool NeedSnapshot() => throw new NotImplementedException("TODO: session 20 — raft");
public (ulong, ulong) Applied(ulong index) => throw new NotImplementedException("TODO: session 20 — raft");
public (ulong, ulong) Processed(ulong index, ulong applied) => throw new NotImplementedException("TODO: session 20 — raft");
public RaftState State() => (RaftState)StateValue;
public (ulong, ulong) Size() => throw new NotImplementedException("TODO: session 20 — raft");
public (ulong, ulong, ulong) Progress() => throw new NotImplementedException("TODO: session 20 — raft");
public bool Leader() => throw new NotImplementedException("TODO: session 20 — raft");
public DateTime? LeaderSince() => throw new NotImplementedException("TODO: session 20 — raft");
public bool Quorum() => throw new NotImplementedException("TODO: session 20 — raft");
public bool Current() => throw new NotImplementedException("TODO: session 20 — raft");
public bool Healthy() => throw new NotImplementedException("TODO: session 20 — raft");
public ulong Term() => Term_;
public bool Leaderless() => throw new NotImplementedException("TODO: session 20 — raft");
public string GroupLeader() => throw new NotImplementedException("TODO: session 20 — raft");
public bool HadPreviousLeader() => throw new NotImplementedException("TODO: session 20 — raft");
public void StepDown(params string[] preferred) => throw new NotImplementedException("TODO: session 20 — raft");
public void SetObserver(bool isObserver) => throw new NotImplementedException("TODO: session 20 — raft");
public bool IsObserver() => throw new NotImplementedException("TODO: session 20 — raft");
public void Campaign() => throw new NotImplementedException("TODO: session 20 — raft");
public void CampaignImmediately() => throw new NotImplementedException("TODO: session 20 — raft");
public string ID() => Id;
public string Group() => GroupName;
public IReadOnlyList<Peer> Peers() => throw new NotImplementedException("TODO: session 20 — raft");
public void ProposeKnownPeers(IReadOnlyList<string> knownPeers) => throw new NotImplementedException("TODO: session 20 — raft");
public void UpdateKnownPeers(IReadOnlyList<string> knownPeers) => throw new NotImplementedException("TODO: session 20 — raft");
public void ProposeAddPeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft");
public void ProposeRemovePeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft");
public bool MembershipChangeInProgress() => throw new NotImplementedException("TODO: session 20 — raft");
public void AdjustClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft");
public void AdjustBootClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft");
public int ClusterSize() => throw new NotImplementedException("TODO: session 20 — raft");
public IpQueue<CommittedEntry> ApplyQ() => ApplyQ_ ?? throw new InvalidOperationException("Apply queue not initialized");
public void PauseApply() => throw new NotImplementedException("TODO: session 20 — raft");
public void ResumeApply() => throw new NotImplementedException("TODO: session 20 — raft");
public bool DrainAndReplaySnapshot() => throw new NotImplementedException("TODO: session 20 — raft");
public ChannelReader<bool> LeadChangeC() => LeadC?.Reader ?? throw new InvalidOperationException("Lead channel not initialized");
public ChannelReader<bool> QuitC() => Quit?.Reader ?? throw new InvalidOperationException("Quit channel not initialized");
public DateTime Created() => Created_;
public void Stop() => throw new NotImplementedException("TODO: session 20 — raft");
public void WaitForStop() => throw new NotImplementedException("TODO: session 20 — raft");
public void Delete() => throw new NotImplementedException("TODO: session 20 — raft");
public bool IsDeleted() => Deleted_;
public void RecreateInternalSubs() => throw new NotImplementedException("TODO: session 20 — raft");
public bool IsSystemAccount() => Interlocked.Read(ref _isSysAccV) != 0;
public string GetTrafficAccountName() => throw new NotImplementedException("TODO: session 20 — raft");
}
// ============================================================================
// ProposedEntry
// ============================================================================
/// <summary>
/// An entry that has been proposed to the leader, with an optional reply subject.
/// Mirrors Go <c>proposedEntry</c> struct in server/raft.go lines 253-256.
/// </summary>
internal sealed class ProposedEntry
{
public Entry? Entry { get; set; }
public string Reply { get; set; } = string.Empty;
}
// ============================================================================
// CatchupState
// ============================================================================
/// <summary>
/// Tracks the state of a follower catch-up operation.
/// Mirrors Go <c>catchupState</c> struct in server/raft.go lines 259-268.
/// </summary>
internal sealed class CatchupState
{
/// <summary>Subscription that catchup messages arrive on (object to avoid session dep).</summary>
public object? Sub { get; set; }
public ulong CTerm { get; set; }
public ulong CIndex { get; set; }
public ulong PTerm { get; set; }
public ulong PIndex { get; set; }
public DateTime Active { get; set; }
public bool Signal { get; set; }
}
// ============================================================================
// Lps — leader peer state
// ============================================================================
/// <summary>
/// Per-peer state tracked by the leader: last timestamp and last index replicated.
/// Mirrors Go <c>lps</c> struct in server/raft.go lines 271-275.
/// </summary>
internal sealed class Lps
{
/// <summary>Last timestamp.</summary>
public DateTime Ts { get; set; }
/// <summary>Last index replicated.</summary>
public ulong Li { get; set; }
/// <summary>Whether this is a known peer.</summary>
public bool Kp { get; set; }
}
// ============================================================================
// Snapshot (internal)
// ============================================================================
/// <summary>
/// An encoded Raft snapshot (on-disk format).
/// Mirrors Go <c>snapshot</c> struct in server/raft.go lines 1243-1248.
/// </summary>
internal sealed class Snapshot
{
public ulong LastTerm { get; set; }
public ulong LastIndex { get; set; }
public byte[] PeerState { get; set; } = [];
public byte[] Data { get; set; } = [];
}
// ============================================================================
// Checkpoint (internal)
// ============================================================================
/// <summary>
/// Checkpoint for asynchronous snapshot installation.
/// Mirrors Go <c>checkpoint</c> struct in server/raft.go lines 1414-1421.
/// Implements <see cref="IRaftNodeCheckpoint"/>.
/// </summary>
internal sealed class Checkpoint : IRaftNodeCheckpoint
{
public Raft? Node { get; set; }
public ulong Term { get; set; }
public ulong Applied { get; set; }
public ulong PApplied { get; set; }
public string SnapFile { get; set; } = string.Empty;
public byte[] PeerState { get; set; } = [];
public byte[] LoadLastSnapshot()
=> throw new NotImplementedException("TODO: session 20 — raft");
public IEnumerable<(AppendEntry Entry, Exception? Error)> AppendEntriesSeq()
=> throw new NotImplementedException("TODO: session 20 — raft");
public void Abort()
=> throw new NotImplementedException("TODO: session 20 — raft");
public ulong InstallSnapshot(byte[] data)
=> throw new NotImplementedException("TODO: session 20 — raft");
}
// ============================================================================
// CommittedEntry
// ============================================================================
/// <summary>
/// A committed Raft entry passed up to the JetStream layer for application.
/// Mirrors Go <c>CommittedEntry</c> struct in server/raft.go lines 2506-2509.
/// </summary>
public sealed class CommittedEntry
{
public ulong Index { get; set; }
public List<Entry> Entries { get; set; } = new();
}
// ============================================================================
// AppendEntry (internal)
// ============================================================================
/// <summary>
/// The main struct used to sync Raft peers.
/// Mirrors Go <c>appendEntry</c> struct in server/raft.go lines 2557-2568.
/// </summary>
internal sealed class AppendEntry
{
public string Leader { get; set; } = string.Empty;
public ulong TermV { get; set; }
public ulong Commit { get; set; }
public ulong PTerm { get; set; }
public ulong PIndex { get; set; }
public List<Entry> Entries { get; set; } = new();
/// <summary>Highest term for catchups (if 0, use Term).</summary>
public ulong LTerm { get; set; }
public string Reply { get; set; } = string.Empty;
/// <summary>Subscription the append entry arrived on (object to avoid session dep).</summary>
public object? Sub { get; set; }
public byte[]? Buf { get; set; }
}
// ============================================================================
// EntryType enum
// ============================================================================
/// <summary>
/// Type of a Raft log entry.
/// Mirrors Go <c>EntryType</c> iota in server/raft.go lines 2605-2619.
/// </summary>
public enum EntryType : byte
{
EntryNormal = 0,
EntryOldSnapshot = 1,
EntryPeerState = 2,
EntryAddPeer = 3,
EntryRemovePeer = 4,
EntryLeaderTransfer = 5,
EntrySnapshot = 6,
/// <summary>Internal signal type — not transmitted between peers or stored in the log.</summary>
EntryCatchup = 7,
}
// ============================================================================
// Entry
// ============================================================================
/// <summary>
/// A single Raft log entry (type + opaque payload).
/// Mirrors Go <c>Entry</c> struct in server/raft.go lines 2641-2643.
/// </summary>
public sealed class Entry
{
public EntryType Type { get; set; }
public byte[] Data { get; set; } = [];
/// <summary>
/// Returns true if this entry changes group membership.
/// Mirrors Go <c>Entry.ChangesMembership</c>.
/// </summary>
public bool ChangesMembership()
=> Type == EntryType.EntryAddPeer || Type == EntryType.EntryRemovePeer;
}
// ============================================================================
// AppendEntryResponse (internal)
// ============================================================================
/// <summary>
/// Response sent by a follower after receiving an append-entry RPC.
/// Mirrors Go <c>appendEntryResponse</c> struct in server/raft.go lines 2760-2766.
/// </summary>
internal sealed class AppendEntryResponse
{
public ulong TermV { get; set; }
public ulong Index { get; set; }
public string Peer { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public bool Success { get; set; }
}
// ============================================================================
// PeerState (internal)
// ============================================================================
/// <summary>
/// Encoded peer state attached to snapshots and peer-state entries.
/// Mirrors Go <c>peerState</c> struct in server/raft.go lines 4470-4474.
/// </summary>
internal sealed class PeerState
{
public List<string> KnownPeers { get; set; } = new();
public int ClusterSize { get; set; }
/// <summary>Extension / domain state (opaque ushort in Go).</summary>
public ushort DomainExt { get; set; }
}
// ============================================================================
// VoteRequest (internal)
// ============================================================================
/// <summary>
/// A Raft vote request sent during leader election.
/// Mirrors Go <c>voteRequest</c> struct in server/raft.go lines 4549-4556.
/// </summary>
internal sealed class VoteRequest
{
public ulong TermV { get; set; }
public ulong LastTerm { get; set; }
public ulong LastIndex { get; set; }
public string Candidate { get; set; } = string.Empty;
/// <summary>Internal use — reply subject.</summary>
public string Reply { get; set; } = string.Empty;
}
// ============================================================================
// VoteResponse (internal)
// ============================================================================
/// <summary>
/// A response to a <see cref="VoteRequest"/>.
/// Mirrors Go <c>voteResponse</c> struct in server/raft.go lines 4730-4735.
/// </summary>
internal sealed class VoteResponse
{
public ulong TermV { get; set; }
public string Peer { get; set; } = string.Empty;
public bool Granted { get; set; }
/// <summary>Whether this peer's log is empty.</summary>
public bool Empty { get; set; }
}

View File

@@ -249,8 +249,7 @@ internal static class RateCounterFactory
=> new(rateLimit);
}
/// <summary>Stub for RaftNode (session 20).</summary>
public interface IRaftNode { }
// IRaftNode is now fully defined in JetStream/RaftTypes.cs (session 20).
// ============================================================================
// Session 10: Ports, TlsMixConn, CaptureHttpServerLog