diff --git a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs index 2e49fe8..9650d83 100644 --- a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs +++ b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs @@ -1,3 +1,5 @@ +using System.Text.Json.Serialization; + namespace NATS.Server.JetStream.Cluster; /// @@ -30,6 +32,13 @@ public sealed class StreamAssignment public bool Responded { get; set; } public bool Recovering { get; set; } public bool Reassigning { get; set; } + + /// + /// Consumer assignments keyed by consumer name. + /// Uses so the deserializer populates + /// the existing dictionary instance rather than replacing it (the property has no setter). + /// + [JsonObjectCreationHandling(JsonObjectCreationHandling.Populate)] public Dictionary Consumers { get; } = new(StringComparer.Ordinal); } diff --git a/src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs b/src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs new file mode 100644 index 0000000..0b328ea --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs @@ -0,0 +1,60 @@ +using System.Buffers.Binary; +using System.Text.Json; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.Cluster; + +/// +/// Binary codec for meta-group snapshots. +/// Format: [2:version_le][N:S2-compressed JSON of assignment map] +/// Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot) +/// +internal static class MetaSnapshotCodec +{ + private const ushort CurrentVersion = 1; + + // Use Populate so the getter-only Consumers dictionary on StreamAssignment + // is populated in-place by the deserializer rather than requiring a setter. + // Go reference: jetstream_cluster.go streamAssignment consumers map restoration. + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PreferredObjectCreationHandling = System.Text.Json.Serialization.JsonObjectCreationHandling.Populate, + }; + + /// + /// Encodes into the versioned, S2-compressed binary format. + /// Go reference: jetstream_cluster.go:2075 encodeMetaSnapshot. + /// + public static byte[] Encode(Dictionary assignments) + { + var json = JsonSerializer.SerializeToUtf8Bytes(assignments, SerializerOptions); + var compressed = S2Codec.Compress(json); + + var result = new byte[2 + compressed.Length]; + BinaryPrimitives.WriteUInt16LittleEndian(result, CurrentVersion); + compressed.CopyTo(result, 2); + return result; + } + + /// + /// Decodes a versioned, S2-compressed binary snapshot into a stream assignment map. + /// Go reference: jetstream_cluster.go:2100 decodeMetaSnapshot. + /// + /// + /// Thrown when is too short or contains an unrecognised version. + /// + public static Dictionary Decode(byte[] data) + { + if (data.Length < 2) + throw new InvalidOperationException("Meta snapshot too short to contain version header."); + + var version = BinaryPrimitives.ReadUInt16LittleEndian(data); + if (version != CurrentVersion) + throw new InvalidOperationException($"Unknown meta snapshot version: {version}"); + + var compressed = data.AsSpan(2); + var json = S2Codec.Decompress(compressed); + return JsonSerializer.Deserialize>(json, SerializerOptions) + ?? new Dictionary(); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/MetaSnapshotCodecTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/MetaSnapshotCodecTests.cs new file mode 100644 index 0000000..f3bb304 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/MetaSnapshotCodecTests.cs @@ -0,0 +1,205 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for MetaSnapshotCodec: encode/decode round-trip, S2 compression, versioning. +/// Go reference: jetstream_cluster.go:2075-2145. +/// +public class MetaSnapshotCodecTests +{ + [Fact] + public void Encode_decode_round_trips() + { + // Go reference: jetstream_cluster.go encodeMetaSnapshot/decodeMetaSnapshot round-trip + var assignments = new Dictionary + { + ["stream-A"] = new StreamAssignment + { + StreamName = "stream-A", + Group = new RaftGroup { Name = "rg-a", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["foo.>"]}""", + }, + ["stream-B"] = new StreamAssignment + { + StreamName = "stream-B", + Group = new RaftGroup { Name = "rg-b", Peers = ["n1", "n2"] }, + ConfigJson = """{"subjects":["bar.>"]}""", + }, + }; + + // Add a consumer to stream-B + assignments["stream-B"].Consumers["con-1"] = new ConsumerAssignment + { + ConsumerName = "con-1", + StreamName = "stream-B", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] }, + }; + + var encoded = MetaSnapshotCodec.Encode(assignments); + encoded.ShouldNotBeEmpty(); + + var decoded = MetaSnapshotCodec.Decode(encoded); + decoded.Count.ShouldBe(2); + decoded["stream-A"].StreamName.ShouldBe("stream-A"); + decoded["stream-A"].Group.Peers.Count.ShouldBe(3); + decoded["stream-B"].Consumers.Count.ShouldBe(1); + decoded["stream-B"].Consumers["con-1"].ConsumerName.ShouldBe("con-1"); + } + + [Fact] + public void Encoded_snapshot_is_compressed() + { + // Go reference: jetstream_cluster.go S2 compression of meta snapshots + var assignments = new Dictionary(); + for (int i = 0; i < 100; i++) + { + assignments[$"stream-{i}"] = new StreamAssignment + { + StreamName = $"stream-{i}", + Group = new RaftGroup { Name = $"rg-{i}", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["test.>"]}""", + }; + } + + var encoded = MetaSnapshotCodec.Encode(assignments); + var json = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(assignments); + + // S2 compressed + 2-byte version header should be smaller than raw JSON + encoded.Length.ShouldBeLessThan(json.Length); + } + + [Fact] + public void Empty_snapshot_round_trips() + { + // Go reference: jetstream_cluster.go decodeMetaSnapshot handles empty map + var empty = new Dictionary(); + var encoded = MetaSnapshotCodec.Encode(empty); + var decoded = MetaSnapshotCodec.Decode(encoded); + decoded.ShouldBeEmpty(); + } + + [Fact] + public void Versioned_format_rejects_unknown_version() + { + // Go reference: jetstream_cluster.go version check in decodeMetaSnapshot + var bad = new byte[] { 0xFF, 0xFF, 0, 0 }; // version 65535 + Should.Throw(() => MetaSnapshotCodec.Decode(bad)); + } + + [Fact] + public void Decode_rejects_too_short_input() + { + // Go reference: jetstream_cluster.go guard against truncated snapshot + Should.Throw(() => MetaSnapshotCodec.Decode([0x01])); + } + + [Fact] + public void Encoded_snapshot_begins_with_version_one_header() + { + // Go reference: jetstream_cluster.go:2075 — versioned header allows future format evolution + var assignments = new Dictionary + { + ["s1"] = new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "g1", Peers = ["n1"] }, + }, + }; + + var encoded = MetaSnapshotCodec.Encode(assignments); + + // Little-endian version 1: bytes [0x01, 0x00] + encoded[0].ShouldBe((byte)0x01); + encoded[1].ShouldBe((byte)0x00); + } + + [Fact] + public void Round_trip_preserves_all_stream_assignment_fields() + { + // Go reference: jetstream_cluster.go streamAssignment struct fields preserved across snapshot + var created = new DateTime(2025, 6, 15, 12, 0, 0, DateTimeKind.Utc); + var assignments = new Dictionary + { + ["my-stream"] = new StreamAssignment + { + StreamName = "my-stream", + Group = new RaftGroup + { + Name = "rg-main", + Peers = ["peer-a", "peer-b", "peer-c"], + StorageType = "memory", + Cluster = "cluster-east", + Preferred = "peer-a", + }, + Created = created, + ConfigJson = """{"subjects":["events.>"],"storage":"memory"}""", + SyncSubject = "$JS.SYNC.my-stream", + Responded = true, + Recovering = false, + Reassigning = true, + }, + }; + + var decoded = MetaSnapshotCodec.Decode(MetaSnapshotCodec.Encode(assignments)); + + var sa = decoded["my-stream"]; + sa.StreamName.ShouldBe("my-stream"); + sa.Group.Name.ShouldBe("rg-main"); + sa.Group.Peers.ShouldBe(["peer-a", "peer-b", "peer-c"]); + sa.Group.StorageType.ShouldBe("memory"); + sa.Group.Cluster.ShouldBe("cluster-east"); + sa.Group.Preferred.ShouldBe("peer-a"); + sa.Created.ShouldBe(created); + sa.ConfigJson.ShouldBe("""{"subjects":["events.>"],"storage":"memory"}"""); + sa.SyncSubject.ShouldBe("$JS.SYNC.my-stream"); + sa.Responded.ShouldBeTrue(); + sa.Recovering.ShouldBeFalse(); + sa.Reassigning.ShouldBeTrue(); + } + + [Fact] + public void Round_trip_preserves_multiple_consumers_per_stream() + { + // Go reference: jetstream_cluster.go consumerAssignment map restored in snapshot + var sa = new StreamAssignment + { + StreamName = "multi-consumer-stream", + Group = new RaftGroup { Name = "rg-mc", Peers = ["n1", "n2", "n3"] }, + }; + + sa.Consumers["consumer-alpha"] = new ConsumerAssignment + { + ConsumerName = "consumer-alpha", + StreamName = "multi-consumer-stream", + Group = new RaftGroup { Name = "rg-alpha", Peers = ["n1"] }, + ConfigJson = """{"deliver_subject":"out.alpha"}""", + Responded = true, + }; + sa.Consumers["consumer-beta"] = new ConsumerAssignment + { + ConsumerName = "consumer-beta", + StreamName = "multi-consumer-stream", + Group = new RaftGroup { Name = "rg-beta", Peers = ["n2", "n3"] }, + Recovering = true, + }; + + var assignments = new Dictionary { ["multi-consumer-stream"] = sa }; + var decoded = MetaSnapshotCodec.Decode(MetaSnapshotCodec.Encode(assignments)); + + var dsa = decoded["multi-consumer-stream"]; + dsa.Consumers.Count.ShouldBe(2); + + var alpha = dsa.Consumers["consumer-alpha"]; + alpha.ConsumerName.ShouldBe("consumer-alpha"); + alpha.StreamName.ShouldBe("multi-consumer-stream"); + alpha.Group.Name.ShouldBe("rg-alpha"); + alpha.ConfigJson.ShouldBe("""{"deliver_subject":"out.alpha"}"""); + alpha.Responded.ShouldBeTrue(); + + var beta = dsa.Consumers["consumer-beta"]; + beta.ConsumerName.ShouldBe("consumer-beta"); + beta.Group.Peers.Count.ShouldBe(2); + beta.Recovering.ShouldBeTrue(); + } +}