Add Version property to StreamAssignment and ConsumerAssignment so future-version entries from newer cluster peers are gracefully skipped rather than applied incorrectly. ProcessStreamAssignment and ProcessConsumerAssignment now reject entries with Version > CurrentVersion (1), incrementing SkippedUnsupportedEntries for operator visibility. Add MetaEntryType.Unknown with a no-op ApplyEntry handler so unrecognised RAFT entry types never crash the cluster. Version 0 is treated as current for backward compatibility with pre-versioned assignments.
223 lines
8.2 KiB
C#
223 lines
8.2 KiB
C#
// Go parity: jetstream_cluster.go — version-incompatible stream/consumer assignment handling.
|
|
// Covers: future-version SA/CA rejection, unknown MetaEntryType graceful handling,
|
|
// SkippedUnsupportedEntries counter, mixed-version batch partial apply.
|
|
using NATS.Server.JetStream.Cluster;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Cluster;
|
|
|
|
/// <summary>
|
|
/// Tests for graceful handling of version-incompatible stream/consumer assignments
|
|
/// in JetStreamMetaGroup (Gap 2.11).
|
|
/// Go reference: jetstream_cluster.go — versioned assignment processing, unknown entry fallback.
|
|
/// </summary>
|
|
public class UnsupportedAssetTests
|
|
{
|
|
// ---------------------------------------------------------------
|
|
// ProcessStreamAssignment — version checks
|
|
// Go reference: jetstream_cluster.go:4541 processStreamAssignment
|
|
// ---------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ProcessStreamAssignment_skips_future_version()
|
|
{
|
|
// Go ref: jetstream_cluster.go — future-version entries are skipped to avoid cluster crash.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var sa = new StreamAssignment
|
|
{
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "orders-group" },
|
|
Version = 2, // future version — beyond CurrentVersion
|
|
};
|
|
|
|
var result = meta.ProcessStreamAssignment(sa);
|
|
|
|
result.ShouldBeFalse();
|
|
meta.StreamCount.ShouldBe(0);
|
|
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public void ProcessStreamAssignment_accepts_current_version()
|
|
{
|
|
// Go ref: jetstream_cluster.go — current-version entries are processed normally.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var sa = new StreamAssignment
|
|
{
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "orders-group" },
|
|
Version = JetStreamMetaGroup.CurrentVersion,
|
|
};
|
|
|
|
var result = meta.ProcessStreamAssignment(sa);
|
|
|
|
result.ShouldBeTrue();
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
|
}
|
|
|
|
[Fact]
|
|
public void ProcessStreamAssignment_accepts_default_version()
|
|
{
|
|
// Go ref: jetstream_cluster.go — version 0 (default/unset) is treated as current version,
|
|
// maintaining backward compatibility with pre-versioned assignments.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var sa = new StreamAssignment
|
|
{
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "orders-group" },
|
|
// Version = 0 (default int value — pre-versioned assignment)
|
|
};
|
|
|
|
var result = meta.ProcessStreamAssignment(sa);
|
|
|
|
result.ShouldBeTrue();
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
|
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// ProcessConsumerAssignment — version checks
|
|
// Go reference: jetstream_cluster.go:5300 processConsumerAssignment
|
|
// ---------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ProcessConsumerAssignment_skips_future_version()
|
|
{
|
|
// Go ref: jetstream_cluster.go — future-version consumer entries are skipped.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
|
|
// First add the parent stream (current version)
|
|
var sa = new StreamAssignment
|
|
{
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "orders-group" },
|
|
};
|
|
meta.ProcessStreamAssignment(sa);
|
|
|
|
var ca = new ConsumerAssignment
|
|
{
|
|
ConsumerName = "my-consumer",
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "consumer-group" },
|
|
Version = 2, // future version
|
|
};
|
|
|
|
var result = meta.ProcessConsumerAssignment(ca);
|
|
|
|
result.ShouldBeFalse();
|
|
meta.ConsumerCount.ShouldBe(0);
|
|
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public void ProcessConsumerAssignment_accepts_current_version()
|
|
{
|
|
// Go ref: jetstream_cluster.go — current-version consumer entries are processed normally.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
|
|
var sa = new StreamAssignment
|
|
{
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "orders-group" },
|
|
};
|
|
meta.ProcessStreamAssignment(sa);
|
|
|
|
var ca = new ConsumerAssignment
|
|
{
|
|
ConsumerName = "my-consumer",
|
|
StreamName = "ORDERS",
|
|
Group = new RaftGroup { Name = "consumer-group" },
|
|
Version = JetStreamMetaGroup.CurrentVersion,
|
|
};
|
|
|
|
var result = meta.ProcessConsumerAssignment(ca);
|
|
|
|
result.ShouldBeTrue();
|
|
meta.ConsumerCount.ShouldBe(1);
|
|
meta.SkippedUnsupportedEntries.ShouldBe(0);
|
|
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// ApplyEntry — unknown entry type
|
|
// Go reference: jetstream_cluster.go — unknown entry type fallback (no crash)
|
|
// ---------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void ApplyEntry_unknown_type_does_not_crash()
|
|
{
|
|
// Go ref: jetstream_cluster.go — unknown entry types must not crash the cluster;
|
|
// they are counted and skipped to allow forward compatibility.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
|
|
// Should not throw
|
|
meta.ApplyEntry(MetaEntryType.Unknown, "something");
|
|
|
|
meta.SkippedUnsupportedEntries.ShouldBe(1);
|
|
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// SkippedUnsupportedEntries counter accumulation
|
|
// ---------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void SkippedUnsupportedEntries_count_increments_for_each_skip()
|
|
{
|
|
// Go ref: jetstream_cluster.go — cluster must track how many unsupported entries
|
|
// were encountered so operators can detect version skew.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
|
|
var futureStream = new StreamAssignment
|
|
{
|
|
StreamName = "STREAM-A",
|
|
Group = new RaftGroup { Name = "g1" },
|
|
Version = 99,
|
|
};
|
|
var futureStream2 = new StreamAssignment
|
|
{
|
|
StreamName = "STREAM-B",
|
|
Group = new RaftGroup { Name = "g2" },
|
|
Version = 99,
|
|
};
|
|
|
|
meta.ProcessStreamAssignment(futureStream);
|
|
meta.ProcessStreamAssignment(futureStream2);
|
|
meta.ApplyEntry(MetaEntryType.Unknown, "x");
|
|
|
|
meta.SkippedUnsupportedEntries.ShouldBe(3);
|
|
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Mixed-version batch: only v1 assignments applied
|
|
// ---------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public void Mixed_versions_partial_apply()
|
|
{
|
|
// Go ref: jetstream_cluster.go — when replaying a RAFT log with mixed-version entries,
|
|
// supported entries are applied and future-version entries are skipped without affecting
|
|
// correctly versioned entries.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
|
|
var streams = new[]
|
|
{
|
|
new StreamAssignment { StreamName = "S1", Group = new RaftGroup { Name = "g1" }, Version = 1 },
|
|
new StreamAssignment { StreamName = "S2", Group = new RaftGroup { Name = "g2" }, Version = 2 }, // future
|
|
new StreamAssignment { StreamName = "S3", Group = new RaftGroup { Name = "g3" }, Version = 1 },
|
|
new StreamAssignment { StreamName = "S4", Group = new RaftGroup { Name = "g4" }, Version = 3 }, // future
|
|
new StreamAssignment { StreamName = "S5", Group = new RaftGroup { Name = "g5" }, Version = 0 }, // default = current
|
|
};
|
|
|
|
foreach (var sa in streams)
|
|
meta.ProcessStreamAssignment(sa);
|
|
|
|
// S1, S3, S5 should be applied; S2, S4 skipped
|
|
meta.StreamCount.ShouldBe(3);
|
|
meta.GetStreamAssignment("S1").ShouldNotBeNull();
|
|
meta.GetStreamAssignment("S2").ShouldBeNull();
|
|
meta.GetStreamAssignment("S3").ShouldNotBeNull();
|
|
meta.GetStreamAssignment("S4").ShouldBeNull();
|
|
meta.GetStreamAssignment("S5").ShouldNotBeNull();
|
|
meta.SkippedUnsupportedEntries.ShouldBe(2);
|
|
}
|
|
}
|