From a7c094d6c1b12ad6fd3fabfec8a251d03aa12739 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 09:22:18 -0500 Subject: [PATCH] feat: add unsupported asset handling for mixed-version clusters (Gap 2.11) Add Version property to StreamAssignment and ConsumerAssignment so future-version entries from newer cluster peers are gracefully skipped rather than applied incorrectly. ProcessStreamAssignment and ProcessConsumerAssignment now reject entries with Version > CurrentVersion (1), incrementing SkippedUnsupportedEntries for operator visibility. Add MetaEntryType.Unknown with a no-op ApplyEntry handler so unrecognised RAFT entry types never crash the cluster. Version 0 is treated as current for backward compatibility with pre-versioned assignments. --- .../Cluster/ClusterAssignmentTypes.cs | 20 ++ .../JetStream/Cluster/JetStreamMetaGroup.cs | 59 ++++- .../Cluster/UnsupportedAssetTests.cs | 222 ++++++++++++++++++ 3 files changed, 299 insertions(+), 2 deletions(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs diff --git a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs index a905392..604bd94 100644 --- a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs +++ b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs @@ -119,6 +119,16 @@ public sealed class StreamAssignment public bool Recovering { get; set; } public bool Reassigning { get; set; } + /// + /// Schema version for forward-compatibility. Assignments with a version greater than + /// are skipped so that an older node + /// does not crash when replaying a RAFT log produced by a newer cluster peer. + /// Version 0 is treated as equivalent to version 1 (backward compatibility with + /// pre-versioned assignments). + /// Go reference: jetstream_cluster.go — versioned assignment processing. + /// + public int Version { get; set; } = 1; + /// /// Consumer assignments keyed by consumer name. /// Uses so the deserializer populates @@ -141,4 +151,14 @@ public sealed class ConsumerAssignment public string ConfigJson { get; set; } = "{}"; public bool Responded { get; set; } public bool Recovering { get; set; } + + /// + /// Schema version for forward-compatibility. Assignments with a version greater than + /// are skipped so that an older node + /// does not crash when replaying a RAFT log produced by a newer cluster peer. + /// Version 0 is treated as equivalent to version 1 (backward compatibility with + /// pre-versioned assignments). + /// Go reference: jetstream_cluster.go — versioned assignment processing. + /// + public int Version { get; set; } = 1; } diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index f36cc9c..9f681e3 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -39,6 +39,10 @@ public sealed class JetStreamMetaGroup private int _leaderIndex = 1; private long _leadershipVersion = 1; + // Count of entries skipped due to unsupported version or unknown type. + // Go reference: jetstream_cluster.go — forward-compatibility skip counter. + private int _skippedUnsupportedEntries; + public JetStreamMetaGroup(int nodes) : this(nodes, selfIndex: 1) { @@ -50,6 +54,13 @@ public sealed class JetStreamMetaGroup _selfIndex = selfIndex; } + /// + /// The highest assignment schema version this node supports. + /// Assignments with a higher version are skipped rather than crashing the cluster. + /// Go reference: jetstream_cluster.go — versioned assignment handling for forward compatibility. + /// + public const int CurrentVersion = 1; + /// /// Returns true when this node is the current meta-group leader. /// Go reference: jetstream_api.go:200-300 -- leader check before mutating operations. @@ -79,6 +90,13 @@ public sealed class JetStreamMetaGroup /// public int ConsumerCount => _totalConsumerCount; + /// + /// Number of entries skipped because their version exceeds + /// or their entry type is unknown. Allows operators to detect version skew in a mixed cluster. + /// Go reference: jetstream_cluster.go — forward-compatibility skip tracking. + /// + public int SkippedUnsupportedEntries => _skippedUnsupportedEntries; + /// /// Total number of inflight stream proposals across all accounts. /// @@ -417,7 +435,9 @@ public sealed class JetStreamMetaGroup /// /// Validates and processes a stream assignment. - /// Returns false if the assignment is invalid (empty name, null group). + /// Returns false if the assignment is invalid (empty name, null group) or if + /// the assignment's version exceeds (forward-compatibility skip). + /// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries. /// Idempotent: duplicate assignments for the same stream name are accepted. /// Go reference: jetstream_cluster.go:4541 processStreamAssignment. /// @@ -426,6 +446,15 @@ public sealed class JetStreamMetaGroup if (string.IsNullOrEmpty(sa.StreamName) || sa.Group == null) return false; + // Version 0 means pre-versioned (treat as current). Versions above CurrentVersion + // are from a newer node and must be skipped to avoid corrupting cluster state. + var effectiveVersion = sa.Version == 0 ? CurrentVersion : sa.Version; + if (effectiveVersion > CurrentVersion) + { + Interlocked.Increment(ref _skippedUnsupportedEntries); + return false; + } + AddStreamAssignment(sa); return true; } @@ -473,7 +502,9 @@ public sealed class JetStreamMetaGroup /// /// Validates and processes a consumer assignment. - /// Returns false if the parent stream does not exist or consumer name is empty. + /// Returns false if the parent stream does not exist, consumer name is empty, or if + /// the assignment's version exceeds (forward-compatibility skip). + /// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries. /// Go reference: jetstream_cluster.go:5300 processConsumerAssignment. /// public bool ProcessConsumerAssignment(ConsumerAssignment ca) @@ -481,6 +512,15 @@ public sealed class JetStreamMetaGroup if (string.IsNullOrEmpty(ca.ConsumerName) || string.IsNullOrEmpty(ca.StreamName)) return false; + // Version 0 means pre-versioned (treat as current). Versions above CurrentVersion + // are from a newer node and must be skipped to avoid corrupting cluster state. + var effectiveVersion = ca.Version == 0 ? CurrentVersion : ca.Version; + if (effectiveVersion > CurrentVersion) + { + Interlocked.Increment(ref _skippedUnsupportedEntries); + return false; + } + if (!_assignments.ContainsKey(ca.StreamName)) return false; @@ -606,6 +646,14 @@ public sealed class JetStreamMetaGroup case MetaEntryType.PeerRemove: ProcessRemovePeer(name); break; + case MetaEntryType.Unknown: + default: + // Unknown or future entry types are counted and skipped. + // This prevents an older node from crashing when it encounters a RAFT entry + // produced by a newer cluster peer. + // Go reference: jetstream_cluster.go — forward-compatibility fallback. + Interlocked.Increment(ref _skippedUnsupportedEntries); + break; } } @@ -932,6 +980,13 @@ public enum MetaEntryType /// Go reference: jetstream_cluster.go processRemovePeer. /// PeerRemove, + /// + /// Sentinel value for an unrecognised entry type received from a newer cluster peer. + /// Applying an Unknown entry increments + /// and is otherwise a no-op, ensuring forward compatibility without crashing. + /// Go reference: jetstream_cluster.go — unknown entry type graceful handling. + /// + Unknown, } public sealed class MetaGroupState diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs new file mode 100644 index 0000000..9e9c9aa --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs @@ -0,0 +1,222 @@ +// Go parity: jetstream_cluster.go — version-incompatible stream/consumer assignment handling. +// Covers: future-version SA/CA rejection, unknown MetaEntryType graceful handling, +// SkippedUnsupportedEntries counter, mixed-version batch partial apply. +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for graceful handling of version-incompatible stream/consumer assignments +/// in JetStreamMetaGroup (Gap 2.11). +/// Go reference: jetstream_cluster.go — versioned assignment processing, unknown entry fallback. +/// +public class UnsupportedAssetTests +{ + // --------------------------------------------------------------- + // ProcessStreamAssignment — version checks + // Go reference: jetstream_cluster.go:4541 processStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public void ProcessStreamAssignment_skips_future_version() + { + // Go ref: jetstream_cluster.go — future-version entries are skipped to avoid cluster crash. + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "ORDERS", + Group = new RaftGroup { Name = "orders-group" }, + Version = 2, // future version — beyond CurrentVersion + }; + + var result = meta.ProcessStreamAssignment(sa); + + result.ShouldBeFalse(); + meta.StreamCount.ShouldBe(0); + meta.SkippedUnsupportedEntries.ShouldBe(1); + } + + [Fact] + public void ProcessStreamAssignment_accepts_current_version() + { + // Go ref: jetstream_cluster.go — current-version entries are processed normally. + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "ORDERS", + Group = new RaftGroup { Name = "orders-group" }, + Version = JetStreamMetaGroup.CurrentVersion, + }; + + var result = meta.ProcessStreamAssignment(sa); + + result.ShouldBeTrue(); + meta.StreamCount.ShouldBe(1); + meta.SkippedUnsupportedEntries.ShouldBe(0); + } + + [Fact] + public void ProcessStreamAssignment_accepts_default_version() + { + // Go ref: jetstream_cluster.go — version 0 (default/unset) is treated as current version, + // maintaining backward compatibility with pre-versioned assignments. + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "ORDERS", + Group = new RaftGroup { Name = "orders-group" }, + // Version = 0 (default int value — pre-versioned assignment) + }; + + var result = meta.ProcessStreamAssignment(sa); + + result.ShouldBeTrue(); + meta.StreamCount.ShouldBe(1); + meta.SkippedUnsupportedEntries.ShouldBe(0); + } + + // --------------------------------------------------------------- + // ProcessConsumerAssignment — version checks + // Go reference: jetstream_cluster.go:5300 processConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void ProcessConsumerAssignment_skips_future_version() + { + // Go ref: jetstream_cluster.go — future-version consumer entries are skipped. + var meta = new JetStreamMetaGroup(3); + + // First add the parent stream (current version) + var sa = new StreamAssignment + { + StreamName = "ORDERS", + Group = new RaftGroup { Name = "orders-group" }, + }; + meta.ProcessStreamAssignment(sa); + + var ca = new ConsumerAssignment + { + ConsumerName = "my-consumer", + StreamName = "ORDERS", + Group = new RaftGroup { Name = "consumer-group" }, + Version = 2, // future version + }; + + var result = meta.ProcessConsumerAssignment(ca); + + result.ShouldBeFalse(); + meta.ConsumerCount.ShouldBe(0); + meta.SkippedUnsupportedEntries.ShouldBe(1); + } + + [Fact] + public void ProcessConsumerAssignment_accepts_current_version() + { + // Go ref: jetstream_cluster.go — current-version consumer entries are processed normally. + var meta = new JetStreamMetaGroup(3); + + var sa = new StreamAssignment + { + StreamName = "ORDERS", + Group = new RaftGroup { Name = "orders-group" }, + }; + meta.ProcessStreamAssignment(sa); + + var ca = new ConsumerAssignment + { + ConsumerName = "my-consumer", + StreamName = "ORDERS", + Group = new RaftGroup { Name = "consumer-group" }, + Version = JetStreamMetaGroup.CurrentVersion, + }; + + var result = meta.ProcessConsumerAssignment(ca); + + result.ShouldBeTrue(); + meta.ConsumerCount.ShouldBe(1); + meta.SkippedUnsupportedEntries.ShouldBe(0); + } + + // --------------------------------------------------------------- + // ApplyEntry — unknown entry type + // Go reference: jetstream_cluster.go — unknown entry type fallback (no crash) + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_unknown_type_does_not_crash() + { + // Go ref: jetstream_cluster.go — unknown entry types must not crash the cluster; + // they are counted and skipped to allow forward compatibility. + var meta = new JetStreamMetaGroup(3); + + // Should not throw + meta.ApplyEntry(MetaEntryType.Unknown, "something"); + + meta.SkippedUnsupportedEntries.ShouldBe(1); + } + + // --------------------------------------------------------------- + // SkippedUnsupportedEntries counter accumulation + // --------------------------------------------------------------- + + [Fact] + public void SkippedUnsupportedEntries_count_increments_for_each_skip() + { + // Go ref: jetstream_cluster.go — cluster must track how many unsupported entries + // were encountered so operators can detect version skew. + var meta = new JetStreamMetaGroup(3); + + var futureStream = new StreamAssignment + { + StreamName = "STREAM-A", + Group = new RaftGroup { Name = "g1" }, + Version = 99, + }; + var futureStream2 = new StreamAssignment + { + StreamName = "STREAM-B", + Group = new RaftGroup { Name = "g2" }, + Version = 99, + }; + + meta.ProcessStreamAssignment(futureStream); + meta.ProcessStreamAssignment(futureStream2); + meta.ApplyEntry(MetaEntryType.Unknown, "x"); + + meta.SkippedUnsupportedEntries.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Mixed-version batch: only v1 assignments applied + // --------------------------------------------------------------- + + [Fact] + public void Mixed_versions_partial_apply() + { + // Go ref: jetstream_cluster.go — when replaying a RAFT log with mixed-version entries, + // supported entries are applied and future-version entries are skipped without affecting + // correctly versioned entries. + var meta = new JetStreamMetaGroup(3); + + var streams = new[] + { + new StreamAssignment { StreamName = "S1", Group = new RaftGroup { Name = "g1" }, Version = 1 }, + new StreamAssignment { StreamName = "S2", Group = new RaftGroup { Name = "g2" }, Version = 2 }, // future + new StreamAssignment { StreamName = "S3", Group = new RaftGroup { Name = "g3" }, Version = 1 }, + new StreamAssignment { StreamName = "S4", Group = new RaftGroup { Name = "g4" }, Version = 3 }, // future + new StreamAssignment { StreamName = "S5", Group = new RaftGroup { Name = "g5" }, Version = 0 }, // default = current + }; + + foreach (var sa in streams) + meta.ProcessStreamAssignment(sa); + + // S1, S3, S5 should be applied; S2, S4 skipped + meta.StreamCount.ShouldBe(3); + meta.GetStreamAssignment("S1").ShouldNotBeNull(); + meta.GetStreamAssignment("S2").ShouldBeNull(); + meta.GetStreamAssignment("S3").ShouldNotBeNull(); + meta.GetStreamAssignment("S4").ShouldBeNull(); + meta.GetStreamAssignment("S5").ShouldNotBeNull(); + meta.SkippedUnsupportedEntries.ShouldBe(2); + } +}