feat: add unsupported asset handling for mixed-version clusters (Gap 2.11)
Add Version property to StreamAssignment and ConsumerAssignment so future-version entries from newer cluster peers are gracefully skipped rather than applied incorrectly. ProcessStreamAssignment and ProcessConsumerAssignment now reject entries with Version > CurrentVersion (1), incrementing SkippedUnsupportedEntries for operator visibility. Add MetaEntryType.Unknown with a no-op ApplyEntry handler so unrecognised RAFT entry types never crash the cluster. Version 0 is treated as current for backward compatibility with pre-versioned assignments.
This commit is contained in:
@@ -119,6 +119,16 @@ public sealed class StreamAssignment
|
||||
public bool Recovering { get; set; }
|
||||
public bool Reassigning { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Schema version for forward-compatibility. Assignments with a version greater than
|
||||
/// <see cref="JetStreamMetaGroup.CurrentVersion"/> are skipped so that an older node
|
||||
/// does not crash when replaying a RAFT log produced by a newer cluster peer.
|
||||
/// Version 0 is treated as equivalent to version 1 (backward compatibility with
|
||||
/// pre-versioned assignments).
|
||||
/// Go reference: jetstream_cluster.go — versioned assignment processing.
|
||||
/// </summary>
|
||||
public int Version { get; set; } = 1;
|
||||
|
||||
/// <summary>
|
||||
/// Consumer assignments keyed by consumer name.
|
||||
/// Uses <see cref="JsonObjectCreationHandling.Populate"/> so the deserializer populates
|
||||
@@ -141,4 +151,14 @@ public sealed class ConsumerAssignment
|
||||
public string ConfigJson { get; set; } = "{}";
|
||||
public bool Responded { get; set; }
|
||||
public bool Recovering { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Schema version for forward-compatibility. Assignments with a version greater than
|
||||
/// <see cref="JetStreamMetaGroup.CurrentVersion"/> are skipped so that an older node
|
||||
/// does not crash when replaying a RAFT log produced by a newer cluster peer.
|
||||
/// Version 0 is treated as equivalent to version 1 (backward compatibility with
|
||||
/// pre-versioned assignments).
|
||||
/// Go reference: jetstream_cluster.go — versioned assignment processing.
|
||||
/// </summary>
|
||||
public int Version { get; set; } = 1;
|
||||
}
|
||||
|
||||
@@ -39,6 +39,10 @@ public sealed class JetStreamMetaGroup
|
||||
private int _leaderIndex = 1;
|
||||
private long _leadershipVersion = 1;
|
||||
|
||||
// Count of entries skipped due to unsupported version or unknown type.
|
||||
// Go reference: jetstream_cluster.go — forward-compatibility skip counter.
|
||||
private int _skippedUnsupportedEntries;
|
||||
|
||||
public JetStreamMetaGroup(int nodes)
|
||||
: this(nodes, selfIndex: 1)
|
||||
{
|
||||
@@ -50,6 +54,13 @@ public sealed class JetStreamMetaGroup
|
||||
_selfIndex = selfIndex;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The highest assignment schema version this node supports.
|
||||
/// Assignments with a higher version are skipped rather than crashing the cluster.
|
||||
/// Go reference: jetstream_cluster.go — versioned assignment handling for forward compatibility.
|
||||
/// </summary>
|
||||
public const int CurrentVersion = 1;
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when this node is the current meta-group leader.
|
||||
/// Go reference: jetstream_api.go:200-300 -- leader check before mutating operations.
|
||||
@@ -79,6 +90,13 @@ public sealed class JetStreamMetaGroup
|
||||
/// </summary>
|
||||
public int ConsumerCount => _totalConsumerCount;
|
||||
|
||||
/// <summary>
|
||||
/// Number of entries skipped because their version exceeds <see cref="CurrentVersion"/>
|
||||
/// or their entry type is unknown. Allows operators to detect version skew in a mixed cluster.
|
||||
/// Go reference: jetstream_cluster.go — forward-compatibility skip tracking.
|
||||
/// </summary>
|
||||
public int SkippedUnsupportedEntries => _skippedUnsupportedEntries;
|
||||
|
||||
/// <summary>
|
||||
/// Total number of inflight stream proposals across all accounts.
|
||||
/// </summary>
|
||||
@@ -417,7 +435,9 @@ public sealed class JetStreamMetaGroup
|
||||
|
||||
/// <summary>
|
||||
/// Validates and processes a stream assignment.
|
||||
/// Returns false if the assignment is invalid (empty name, null group).
|
||||
/// Returns false if the assignment is invalid (empty name, null group) or if
|
||||
/// the assignment's version exceeds <see cref="CurrentVersion"/> (forward-compatibility skip).
|
||||
/// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries.
|
||||
/// Idempotent: duplicate assignments for the same stream name are accepted.
|
||||
/// Go reference: jetstream_cluster.go:4541 processStreamAssignment.
|
||||
/// </summary>
|
||||
@@ -426,6 +446,15 @@ public sealed class JetStreamMetaGroup
|
||||
if (string.IsNullOrEmpty(sa.StreamName) || sa.Group == null)
|
||||
return false;
|
||||
|
||||
// Version 0 means pre-versioned (treat as current). Versions above CurrentVersion
|
||||
// are from a newer node and must be skipped to avoid corrupting cluster state.
|
||||
var effectiveVersion = sa.Version == 0 ? CurrentVersion : sa.Version;
|
||||
if (effectiveVersion > CurrentVersion)
|
||||
{
|
||||
Interlocked.Increment(ref _skippedUnsupportedEntries);
|
||||
return false;
|
||||
}
|
||||
|
||||
AddStreamAssignment(sa);
|
||||
return true;
|
||||
}
|
||||
@@ -473,7 +502,9 @@ public sealed class JetStreamMetaGroup
|
||||
|
||||
/// <summary>
|
||||
/// Validates and processes a consumer assignment.
|
||||
/// Returns false if the parent stream does not exist or consumer name is empty.
|
||||
/// Returns false if the parent stream does not exist, consumer name is empty, or if
|
||||
/// the assignment's version exceeds <see cref="CurrentVersion"/> (forward-compatibility skip).
|
||||
/// Version 0 is treated as version 1 for backward compatibility with pre-versioned entries.
|
||||
/// Go reference: jetstream_cluster.go:5300 processConsumerAssignment.
|
||||
/// </summary>
|
||||
public bool ProcessConsumerAssignment(ConsumerAssignment ca)
|
||||
@@ -481,6 +512,15 @@ public sealed class JetStreamMetaGroup
|
||||
if (string.IsNullOrEmpty(ca.ConsumerName) || string.IsNullOrEmpty(ca.StreamName))
|
||||
return false;
|
||||
|
||||
// Version 0 means pre-versioned (treat as current). Versions above CurrentVersion
|
||||
// are from a newer node and must be skipped to avoid corrupting cluster state.
|
||||
var effectiveVersion = ca.Version == 0 ? CurrentVersion : ca.Version;
|
||||
if (effectiveVersion > CurrentVersion)
|
||||
{
|
||||
Interlocked.Increment(ref _skippedUnsupportedEntries);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_assignments.ContainsKey(ca.StreamName))
|
||||
return false;
|
||||
|
||||
@@ -606,6 +646,14 @@ public sealed class JetStreamMetaGroup
|
||||
case MetaEntryType.PeerRemove:
|
||||
ProcessRemovePeer(name);
|
||||
break;
|
||||
case MetaEntryType.Unknown:
|
||||
default:
|
||||
// Unknown or future entry types are counted and skipped.
|
||||
// This prevents an older node from crashing when it encounters a RAFT entry
|
||||
// produced by a newer cluster peer.
|
||||
// Go reference: jetstream_cluster.go — forward-compatibility fallback.
|
||||
Interlocked.Increment(ref _skippedUnsupportedEntries);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -932,6 +980,13 @@ public enum MetaEntryType
|
||||
/// Go reference: jetstream_cluster.go processRemovePeer.
|
||||
/// </summary>
|
||||
PeerRemove,
|
||||
/// <summary>
|
||||
/// Sentinel value for an unrecognised entry type received from a newer cluster peer.
|
||||
/// Applying an Unknown entry increments <see cref="JetStreamMetaGroup.SkippedUnsupportedEntries"/>
|
||||
/// and is otherwise a no-op, ensuring forward compatibility without crashing.
|
||||
/// Go reference: jetstream_cluster.go — unknown entry type graceful handling.
|
||||
/// </summary>
|
||||
Unknown,
|
||||
}
|
||||
|
||||
public sealed class MetaGroupState
|
||||
|
||||
@@ -0,0 +1,222 @@
|
||||
// Go parity: jetstream_cluster.go — version-incompatible stream/consumer assignment handling.
|
||||
// Covers: future-version SA/CA rejection, unknown MetaEntryType graceful handling,
|
||||
// SkippedUnsupportedEntries counter, mixed-version batch partial apply.
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Cluster;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for graceful handling of version-incompatible stream/consumer assignments
|
||||
/// in JetStreamMetaGroup (Gap 2.11).
|
||||
/// Go reference: jetstream_cluster.go — versioned assignment processing, unknown entry fallback.
|
||||
/// </summary>
|
||||
public class UnsupportedAssetTests
|
||||
{
|
||||
// ---------------------------------------------------------------
|
||||
// ProcessStreamAssignment — version checks
|
||||
// Go reference: jetstream_cluster.go:4541 processStreamAssignment
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void ProcessStreamAssignment_skips_future_version()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — future-version entries are skipped to avoid cluster crash.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var sa = new StreamAssignment
|
||||
{
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "orders-group" },
|
||||
Version = 2, // future version — beyond CurrentVersion
|
||||
};
|
||||
|
||||
var result = meta.ProcessStreamAssignment(sa);
|
||||
|
||||
result.ShouldBeFalse();
|
||||
meta.StreamCount.ShouldBe(0);
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessStreamAssignment_accepts_current_version()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — current-version entries are processed normally.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var sa = new StreamAssignment
|
||||
{
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "orders-group" },
|
||||
Version = JetStreamMetaGroup.CurrentVersion,
|
||||
};
|
||||
|
||||
var result = meta.ProcessStreamAssignment(sa);
|
||||
|
||||
result.ShouldBeTrue();
|
||||
meta.StreamCount.ShouldBe(1);
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessStreamAssignment_accepts_default_version()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — version 0 (default/unset) is treated as current version,
|
||||
// maintaining backward compatibility with pre-versioned assignments.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var sa = new StreamAssignment
|
||||
{
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "orders-group" },
|
||||
// Version = 0 (default int value — pre-versioned assignment)
|
||||
};
|
||||
|
||||
var result = meta.ProcessStreamAssignment(sa);
|
||||
|
||||
result.ShouldBeTrue();
|
||||
meta.StreamCount.ShouldBe(1);
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// ProcessConsumerAssignment — version checks
|
||||
// Go reference: jetstream_cluster.go:5300 processConsumerAssignment
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void ProcessConsumerAssignment_skips_future_version()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — future-version consumer entries are skipped.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
// First add the parent stream (current version)
|
||||
var sa = new StreamAssignment
|
||||
{
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "orders-group" },
|
||||
};
|
||||
meta.ProcessStreamAssignment(sa);
|
||||
|
||||
var ca = new ConsumerAssignment
|
||||
{
|
||||
ConsumerName = "my-consumer",
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "consumer-group" },
|
||||
Version = 2, // future version
|
||||
};
|
||||
|
||||
var result = meta.ProcessConsumerAssignment(ca);
|
||||
|
||||
result.ShouldBeFalse();
|
||||
meta.ConsumerCount.ShouldBe(0);
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessConsumerAssignment_accepts_current_version()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — current-version consumer entries are processed normally.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
var sa = new StreamAssignment
|
||||
{
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "orders-group" },
|
||||
};
|
||||
meta.ProcessStreamAssignment(sa);
|
||||
|
||||
var ca = new ConsumerAssignment
|
||||
{
|
||||
ConsumerName = "my-consumer",
|
||||
StreamName = "ORDERS",
|
||||
Group = new RaftGroup { Name = "consumer-group" },
|
||||
Version = JetStreamMetaGroup.CurrentVersion,
|
||||
};
|
||||
|
||||
var result = meta.ProcessConsumerAssignment(ca);
|
||||
|
||||
result.ShouldBeTrue();
|
||||
meta.ConsumerCount.ShouldBe(1);
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// ApplyEntry — unknown entry type
|
||||
// Go reference: jetstream_cluster.go — unknown entry type fallback (no crash)
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void ApplyEntry_unknown_type_does_not_crash()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — unknown entry types must not crash the cluster;
|
||||
// they are counted and skipped to allow forward compatibility.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
// Should not throw
|
||||
meta.ApplyEntry(MetaEntryType.Unknown, "something");
|
||||
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// SkippedUnsupportedEntries counter accumulation
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void SkippedUnsupportedEntries_count_increments_for_each_skip()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — cluster must track how many unsupported entries
|
||||
// were encountered so operators can detect version skew.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
var futureStream = new StreamAssignment
|
||||
{
|
||||
StreamName = "STREAM-A",
|
||||
Group = new RaftGroup { Name = "g1" },
|
||||
Version = 99,
|
||||
};
|
||||
var futureStream2 = new StreamAssignment
|
||||
{
|
||||
StreamName = "STREAM-B",
|
||||
Group = new RaftGroup { Name = "g2" },
|
||||
Version = 99,
|
||||
};
|
||||
|
||||
meta.ProcessStreamAssignment(futureStream);
|
||||
meta.ProcessStreamAssignment(futureStream2);
|
||||
meta.ApplyEntry(MetaEntryType.Unknown, "x");
|
||||
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Mixed-version batch: only v1 assignments applied
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Mixed_versions_partial_apply()
|
||||
{
|
||||
// Go ref: jetstream_cluster.go — when replaying a RAFT log with mixed-version entries,
|
||||
// supported entries are applied and future-version entries are skipped without affecting
|
||||
// correctly versioned entries.
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
var streams = new[]
|
||||
{
|
||||
new StreamAssignment { StreamName = "S1", Group = new RaftGroup { Name = "g1" }, Version = 1 },
|
||||
new StreamAssignment { StreamName = "S2", Group = new RaftGroup { Name = "g2" }, Version = 2 }, // future
|
||||
new StreamAssignment { StreamName = "S3", Group = new RaftGroup { Name = "g3" }, Version = 1 },
|
||||
new StreamAssignment { StreamName = "S4", Group = new RaftGroup { Name = "g4" }, Version = 3 }, // future
|
||||
new StreamAssignment { StreamName = "S5", Group = new RaftGroup { Name = "g5" }, Version = 0 }, // default = current
|
||||
};
|
||||
|
||||
foreach (var sa in streams)
|
||||
meta.ProcessStreamAssignment(sa);
|
||||
|
||||
// S1, S3, S5 should be applied; S2, S4 skipped
|
||||
meta.StreamCount.ShouldBe(3);
|
||||
meta.GetStreamAssignment("S1").ShouldNotBeNull();
|
||||
meta.GetStreamAssignment("S2").ShouldBeNull();
|
||||
meta.GetStreamAssignment("S3").ShouldNotBeNull();
|
||||
meta.GetStreamAssignment("S4").ShouldBeNull();
|
||||
meta.GetStreamAssignment("S5").ShouldNotBeNull();
|
||||
meta.SkippedUnsupportedEntries.ShouldBe(2);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user