diff --git a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
index a905392..604bd94 100644
--- a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
+++ b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
@@ -119,6 +119,16 @@ public sealed class StreamAssignment
public bool Recovering { get; set; }
public bool Reassigning { get; set; }
+ ///
+ /// Schema version for forward-compatibility. Assignments with a version greater than
+ /// are skipped so that an older node
+ /// does not crash when replaying a RAFT log produced by a newer cluster peer.
+ /// Version 0 is treated as equivalent to version 1 (backward compatibility with
+ /// pre-versioned assignments).
+ /// Go reference: jetstream_cluster.go — versioned assignment processing.
+ ///
+ public int Version { get; set; } = 1;
+
///
/// Consumer assignments keyed by consumer name.
/// Uses so the deserializer populates
@@ -141,4 +151,14 @@ public sealed class ConsumerAssignment
public string ConfigJson { get; set; } = "{}";
public bool Responded { get; set; }
public bool Recovering { get; set; }
+
+ ///
+ /// Schema version for forward-compatibility. Assignments with a version greater than
+ /// are skipped so that an older node
+ /// does not crash when replaying a RAFT log produced by a newer cluster peer.
+ /// Version 0 is treated as equivalent to version 1 (backward compatibility with
+ /// pre-versioned assignments).
+ /// Go reference: jetstream_cluster.go — versioned assignment processing.
+ ///
+ public int Version { get; set; } = 1;
}
diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
index f36cc9c..9f681e3 100644
--- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
+++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
@@ -39,6 +39,10 @@ public sealed class JetStreamMetaGroup
private int _leaderIndex = 1;
private long _leadershipVersion = 1;
+ // Count of entries skipped due to unsupported version or unknown type.
+ // Go reference: jetstream_cluster.go — forward-compatibility skip counter.
+ private int _skippedUnsupportedEntries;
+
public JetStreamMetaGroup(int nodes)
: this(nodes, selfIndex: 1)
{
@@ -50,6 +54,13 @@ public sealed class JetStreamMetaGroup
_selfIndex = selfIndex;
}
+ ///
+ /// The highest assignment schema version this node supports.
+ /// Assignments with a higher version are skipped rather than crashing the cluster.
+ /// Go reference: jetstream_cluster.go — versioned assignment handling for forward compatibility.
+ ///
+ public const int CurrentVersion = 1;
+
///
/// Returns true when this node is the current meta-group leader.
/// Go reference: jetstream_api.go:200-300 -- leader check before mutating operations.
@@ -79,6 +90,13 @@ public sealed class JetStreamMetaGroup
///
public int ConsumerCount => _totalConsumerCount;
+ ///
+ /// Number of entries skipped because their version exceeds
+ /// or their entry type is unknown. Allows operators to detect version skew in a mixed cluster.
+ /// Go reference: jetstream_cluster.go — forward-compatibility skip tracking.
+ ///
+ public int SkippedUnsupportedEntries => _skippedUnsupportedEntries;
+
///
/// Total number of inflight stream proposals across all accounts.
///
@@ -417,7 +435,9 @@ public sealed class JetStreamMetaGroup
///
/// Validates and processes a stream assignment.
- /// Returns false if the assignment is invalid (empty name, null group).
+ /// Returns false if the assignment is invalid (empty name, null group) or if
+ /// the assignment's version exceeds (forward-compatibility skip).
+ /// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries.
/// Idempotent: duplicate assignments for the same stream name are accepted.
/// Go reference: jetstream_cluster.go:4541 processStreamAssignment.
///
@@ -426,6 +446,15 @@ public sealed class JetStreamMetaGroup
if (string.IsNullOrEmpty(sa.StreamName) || sa.Group == null)
return false;
+ // Version 0 means pre-versioned (treat as current). Versions above CurrentVersion
+ // are from a newer node and must be skipped to avoid corrupting cluster state.
+ var effectiveVersion = sa.Version == 0 ? CurrentVersion : sa.Version;
+ if (effectiveVersion > CurrentVersion)
+ {
+ Interlocked.Increment(ref _skippedUnsupportedEntries);
+ return false;
+ }
+
AddStreamAssignment(sa);
return true;
}
@@ -473,7 +502,9 @@ public sealed class JetStreamMetaGroup
///
/// Validates and processes a consumer assignment.
- /// Returns false if the parent stream does not exist or consumer name is empty.
+ /// Returns false if the parent stream does not exist, consumer name is empty, or if
+ /// the assignment's version exceeds (forward-compatibility skip).
+ /// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries.
/// Go reference: jetstream_cluster.go:5300 processConsumerAssignment.
///
public bool ProcessConsumerAssignment(ConsumerAssignment ca)
@@ -481,6 +512,15 @@ public sealed class JetStreamMetaGroup
if (string.IsNullOrEmpty(ca.ConsumerName) || string.IsNullOrEmpty(ca.StreamName))
return false;
+ // Version 0 means pre-versioned (treat as current). Versions above CurrentVersion
+ // are from a newer node and must be skipped to avoid corrupting cluster state.
+ var effectiveVersion = ca.Version == 0 ? CurrentVersion : ca.Version;
+ if (effectiveVersion > CurrentVersion)
+ {
+ Interlocked.Increment(ref _skippedUnsupportedEntries);
+ return false;
+ }
+
if (!_assignments.ContainsKey(ca.StreamName))
return false;
@@ -606,6 +646,14 @@ public sealed class JetStreamMetaGroup
case MetaEntryType.PeerRemove:
ProcessRemovePeer(name);
break;
+ case MetaEntryType.Unknown:
+ default:
+ // Unknown or future entry types are counted and skipped.
+ // This prevents an older node from crashing when it encounters a RAFT entry
+ // produced by a newer cluster peer.
+ // Go reference: jetstream_cluster.go — forward-compatibility fallback.
+ Interlocked.Increment(ref _skippedUnsupportedEntries);
+ break;
}
}
@@ -932,6 +980,13 @@ public enum MetaEntryType
/// Go reference: jetstream_cluster.go processRemovePeer.
///
PeerRemove,
+ ///
+ /// Sentinel value for an unrecognised entry type received from a newer cluster peer.
+ /// Applying an Unknown entry increments
+ /// and is otherwise a no-op, ensuring forward compatibility without crashing.
+ /// Go reference: jetstream_cluster.go — unknown entry type graceful handling.
+ ///
+ Unknown,
}
public sealed class MetaGroupState
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs
new file mode 100644
index 0000000..9e9c9aa
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/UnsupportedAssetTests.cs
@@ -0,0 +1,222 @@
+// Go parity: jetstream_cluster.go — version-incompatible stream/consumer assignment handling.
+// Covers: future-version SA/CA rejection, unknown MetaEntryType graceful handling,
+// SkippedUnsupportedEntries counter, mixed-version batch partial apply.
+using NATS.Server.JetStream.Cluster;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests for graceful handling of version-incompatible stream/consumer assignments
+/// in JetStreamMetaGroup (Gap 2.11).
+/// Go reference: jetstream_cluster.go — versioned assignment processing, unknown entry fallback.
+///
+public class UnsupportedAssetTests
+{
+ // ---------------------------------------------------------------
+ // ProcessStreamAssignment — version checks
+ // Go reference: jetstream_cluster.go:4541 processStreamAssignment
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ProcessStreamAssignment_skips_future_version()
+ {
+ // Go ref: jetstream_cluster.go — future-version entries are skipped to avoid cluster crash.
+ var meta = new JetStreamMetaGroup(3);
+ var sa = new StreamAssignment
+ {
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "orders-group" },
+ Version = 2, // future version — beyond CurrentVersion
+ };
+
+ var result = meta.ProcessStreamAssignment(sa);
+
+ result.ShouldBeFalse();
+ meta.StreamCount.ShouldBe(0);
+ meta.SkippedUnsupportedEntries.ShouldBe(1);
+ }
+
+ [Fact]
+ public void ProcessStreamAssignment_accepts_current_version()
+ {
+ // Go ref: jetstream_cluster.go — current-version entries are processed normally.
+ var meta = new JetStreamMetaGroup(3);
+ var sa = new StreamAssignment
+ {
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "orders-group" },
+ Version = JetStreamMetaGroup.CurrentVersion,
+ };
+
+ var result = meta.ProcessStreamAssignment(sa);
+
+ result.ShouldBeTrue();
+ meta.StreamCount.ShouldBe(1);
+ meta.SkippedUnsupportedEntries.ShouldBe(0);
+ }
+
+ [Fact]
+ public void ProcessStreamAssignment_accepts_default_version()
+ {
+ // Go ref: jetstream_cluster.go — version 0 (default/unset) is treated as current version,
+ // maintaining backward compatibility with pre-versioned assignments.
+ var meta = new JetStreamMetaGroup(3);
+ var sa = new StreamAssignment
+ {
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "orders-group" },
+ // Version = 0 (default int value — pre-versioned assignment)
+ };
+
+ var result = meta.ProcessStreamAssignment(sa);
+
+ result.ShouldBeTrue();
+ meta.StreamCount.ShouldBe(1);
+ meta.SkippedUnsupportedEntries.ShouldBe(0);
+ }
+
+ // ---------------------------------------------------------------
+ // ProcessConsumerAssignment — version checks
+ // Go reference: jetstream_cluster.go:5300 processConsumerAssignment
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ProcessConsumerAssignment_skips_future_version()
+ {
+ // Go ref: jetstream_cluster.go — future-version consumer entries are skipped.
+ var meta = new JetStreamMetaGroup(3);
+
+ // First add the parent stream (current version)
+ var sa = new StreamAssignment
+ {
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "orders-group" },
+ };
+ meta.ProcessStreamAssignment(sa);
+
+ var ca = new ConsumerAssignment
+ {
+ ConsumerName = "my-consumer",
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "consumer-group" },
+ Version = 2, // future version
+ };
+
+ var result = meta.ProcessConsumerAssignment(ca);
+
+ result.ShouldBeFalse();
+ meta.ConsumerCount.ShouldBe(0);
+ meta.SkippedUnsupportedEntries.ShouldBe(1);
+ }
+
+ [Fact]
+ public void ProcessConsumerAssignment_accepts_current_version()
+ {
+ // Go ref: jetstream_cluster.go — current-version consumer entries are processed normally.
+ var meta = new JetStreamMetaGroup(3);
+
+ var sa = new StreamAssignment
+ {
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "orders-group" },
+ };
+ meta.ProcessStreamAssignment(sa);
+
+ var ca = new ConsumerAssignment
+ {
+ ConsumerName = "my-consumer",
+ StreamName = "ORDERS",
+ Group = new RaftGroup { Name = "consumer-group" },
+ Version = JetStreamMetaGroup.CurrentVersion,
+ };
+
+ var result = meta.ProcessConsumerAssignment(ca);
+
+ result.ShouldBeTrue();
+ meta.ConsumerCount.ShouldBe(1);
+ meta.SkippedUnsupportedEntries.ShouldBe(0);
+ }
+
+ // ---------------------------------------------------------------
+ // ApplyEntry — unknown entry type
+ // Go reference: jetstream_cluster.go — unknown entry type fallback (no crash)
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ApplyEntry_unknown_type_does_not_crash()
+ {
+ // Go ref: jetstream_cluster.go — unknown entry types must not crash the cluster;
+ // they are counted and skipped to allow forward compatibility.
+ var meta = new JetStreamMetaGroup(3);
+
+ // Should not throw
+ meta.ApplyEntry(MetaEntryType.Unknown, "something");
+
+ meta.SkippedUnsupportedEntries.ShouldBe(1);
+ }
+
+ // ---------------------------------------------------------------
+ // SkippedUnsupportedEntries counter accumulation
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void SkippedUnsupportedEntries_count_increments_for_each_skip()
+ {
+ // Go ref: jetstream_cluster.go — cluster must track how many unsupported entries
+ // were encountered so operators can detect version skew.
+ var meta = new JetStreamMetaGroup(3);
+
+ var futureStream = new StreamAssignment
+ {
+ StreamName = "STREAM-A",
+ Group = new RaftGroup { Name = "g1" },
+ Version = 99,
+ };
+ var futureStream2 = new StreamAssignment
+ {
+ StreamName = "STREAM-B",
+ Group = new RaftGroup { Name = "g2" },
+ Version = 99,
+ };
+
+ meta.ProcessStreamAssignment(futureStream);
+ meta.ProcessStreamAssignment(futureStream2);
+ meta.ApplyEntry(MetaEntryType.Unknown, "x");
+
+ meta.SkippedUnsupportedEntries.ShouldBe(3);
+ }
+
+ // ---------------------------------------------------------------
+ // Mixed-version batch: only v1 assignments applied
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void Mixed_versions_partial_apply()
+ {
+ // Go ref: jetstream_cluster.go — when replaying a RAFT log with mixed-version entries,
+ // supported entries are applied and future-version entries are skipped without affecting
+ // correctly versioned entries.
+ var meta = new JetStreamMetaGroup(3);
+
+ var streams = new[]
+ {
+ new StreamAssignment { StreamName = "S1", Group = new RaftGroup { Name = "g1" }, Version = 1 },
+ new StreamAssignment { StreamName = "S2", Group = new RaftGroup { Name = "g2" }, Version = 2 }, // future
+ new StreamAssignment { StreamName = "S3", Group = new RaftGroup { Name = "g3" }, Version = 1 },
+ new StreamAssignment { StreamName = "S4", Group = new RaftGroup { Name = "g4" }, Version = 3 }, // future
+ new StreamAssignment { StreamName = "S5", Group = new RaftGroup { Name = "g5" }, Version = 0 }, // default = current
+ };
+
+ foreach (var sa in streams)
+ meta.ProcessStreamAssignment(sa);
+
+ // S1, S3, S5 should be applied; S2, S4 skipped
+ meta.StreamCount.ShouldBe(3);
+ meta.GetStreamAssignment("S1").ShouldNotBeNull();
+ meta.GetStreamAssignment("S2").ShouldBeNull();
+ meta.GetStreamAssignment("S3").ShouldNotBeNull();
+ meta.GetStreamAssignment("S4").ShouldBeNull();
+ meta.GetStreamAssignment("S5").ShouldNotBeNull();
+ meta.SkippedUnsupportedEntries.ShouldBe(2);
+ }
+}