From aeeb2e692919ae7002812b4e665a331196a6c0f3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 09:35:19 -0500 Subject: [PATCH] feat: add binary assignment codec with golden fixture tests (Gap 2.10) Implements AssignmentCodec with JSON serialization for StreamAssignment and ConsumerAssignment, plus Snappy compression helpers for large payloads. Adds 12 tests covering round-trips, error handling, compression, and a golden fixture for format stability. --- .../JetStream/Cluster/AssignmentCodec.cs | 158 ++++++++ .../JetStream/Cluster/AssignmentCodecTests.cs | 367 ++++++++++++++++++ 2 files changed, 525 insertions(+) create mode 100644 src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs diff --git a/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs b/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs new file mode 100644 index 0000000..36d6c69 --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs @@ -0,0 +1,158 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using IronSnappy; + +namespace NATS.Server.JetStream.Cluster; + +/// +/// Binary codec for individual stream and consumer assignments. +/// Uses JSON serialization with optional S2/Snappy compression for large payloads. +/// +/// Compression format: when compresses data, it prepends +/// a single sentinel byte 0x01 before the Snappy block output so that +/// can reliably detect compressed payloads regardless +/// of the Snappy block-format varint header (which varies by uncompressed size and is +/// not a fixed magic sequence). +/// +/// Go reference: jetstream_cluster.go:8703-9246 +/// - encodeAddStreamAssignment / decodeStreamAssignment +/// - encodeAddConsumerAssignment / decodeConsumerAssignment +/// - encodeAddConsumerAssignmentCompressed / decodeConsumerAssignmentCompressed +/// +public static class AssignmentCodec +{ + // Sentinel byte that marks a compressed payload produced by CompressIfLarge. + // Chosen as 0x01 (non-printable, never the first byte of valid JSON which starts + // with '{', '[', '"', digit, or whitespace). + // Go reference: jetstream_cluster.go:9226 uses opcode byte assignCompressedConsumerOp + // as a similar marker before the S2 stream. + private const byte CompressedMarker = 0x01; + + // Use snake_case property names matching Go's JSON field tags, and populate + // the getter-only Consumers dictionary in-place during deserialization. + // Go reference: jetstream_cluster.go json struct tags (stream_name, config_json, etc.) + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, + PreferredObjectCreationHandling = JsonObjectCreationHandling.Populate, + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + }; + + // --------------------------------------------------------------- + // StreamAssignment encode / decode + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment + // jetstream_cluster.go:8733 decodeStreamAssignment + // --------------------------------------------------------------- + + /// + /// Serializes a to JSON bytes. + /// Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment — + /// marshals the assignment struct (with ConfigJSON) to JSON. + /// + public static byte[] EncodeStreamAssignment(StreamAssignment sa) + => JsonSerializer.SerializeToUtf8Bytes(sa, SerializerOptions); + + /// + /// Deserializes a from JSON bytes. + /// Returns if is empty or invalid. + /// Go reference: jetstream_cluster.go:8733 decodeStreamAssignment — + /// json.Unmarshal(buf, &sa); returns nil, err on failure. + /// + public static StreamAssignment? DecodeStreamAssignment(ReadOnlySpan data) + { + if (data.IsEmpty) + return null; + + try + { + return JsonSerializer.Deserialize(data, SerializerOptions); + } + catch (JsonException) + { + return null; + } + } + + // --------------------------------------------------------------- + // ConsumerAssignment encode / decode + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment + // jetstream_cluster.go:9195 decodeConsumerAssignment + // --------------------------------------------------------------- + + /// + /// Serializes a to JSON bytes. + /// Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment — + /// marshals the assignment struct to JSON. + /// + public static byte[] EncodeConsumerAssignment(ConsumerAssignment ca) + => JsonSerializer.SerializeToUtf8Bytes(ca, SerializerOptions); + + /// + /// Deserializes a from JSON bytes. + /// Returns if is empty or invalid. + /// Go reference: jetstream_cluster.go:9195 decodeConsumerAssignment — + /// json.Unmarshal(buf, &ca); returns nil, err on failure. + /// + public static ConsumerAssignment? DecodeConsumerAssignment(ReadOnlySpan data) + { + if (data.IsEmpty) + return null; + + try + { + return JsonSerializer.Deserialize(data, SerializerOptions); + } + catch (JsonException) + { + return null; + } + } + + // --------------------------------------------------------------- + // Compression helpers + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + // --------------------------------------------------------------- + + /// + /// Compresses using Snappy when its length exceeds + /// . Returns unchanged when + /// it is at or below the threshold. + /// + /// The returned compressed buffer is prefixed with + /// (0x01) so that can reliably identify compressed + /// payloads. The Snappy block format itself has no fixed magic header — only a + /// varint-encoded uncompressed length — so a sentinel prefix is required. + /// + /// Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed — + /// s2.NewWriter used to compress large consumer assignment payloads; the caller + /// prepends the assignCompressedConsumerOp opcode byte as a similar kind of marker. + /// + public static byte[] CompressIfLarge(byte[] data, int threshold = 1024) + { + if (data.Length <= threshold) + return data; + + var compressed = Snappy.Encode(data); + var result = new byte[1 + compressed.Length]; + result[0] = CompressedMarker; + compressed.CopyTo(result.AsSpan(1)); + return result; + } + + /// + /// Decompresses if it was produced by + /// (i.e., starts with ); otherwise returns + /// unchanged. + /// Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed — + /// s2.NewReader used to decompress consumer assignment payloads that were compressed + /// before being proposed to the meta RAFT group. + /// + public static byte[] DecompressIfNeeded(byte[] data) + { + if (data.Length > 0 && data[0] == CompressedMarker) + return Snappy.Decode(data.AsSpan(1)); + + return data; + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs new file mode 100644 index 0000000..ac0e8d7 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs @@ -0,0 +1,367 @@ +using System.Text; +using System.Text.Json; +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for AssignmentCodec: binary serialization for stream and consumer assignments +/// with optional S2/Snappy compression for large payloads. +/// Go reference: jetstream_cluster.go:8703-9246 (encodeAddStreamAssignment, +/// encodeAddConsumerAssignment, decodeStreamAssignment, decodeConsumerAssignment, +/// encodeAddConsumerAssignmentCompressed, decodeConsumerAssignmentCompressed). +/// +public class AssignmentCodecTests +{ + // --------------------------------------------------------------- + // StreamAssignment round-trip + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment / + // 8733 decodeStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public void Encode_decode_stream_assignment_round_trip() + { + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment + 8733 decodeStreamAssignment + var created = new DateTime(2025, 3, 15, 9, 0, 0, DateTimeKind.Utc); + var sa = new StreamAssignment + { + StreamName = "orders", + Group = new RaftGroup + { + Name = "rg-orders", + Peers = ["peer-1", "peer-2", "peer-3"], + StorageType = "file", + Cluster = "cluster-east", + Preferred = "peer-1", + DesiredReplicas = 3, + }, + Created = created, + ConfigJson = """{"subjects":["orders.>"],"storage":"file","replicas":3}""", + SyncSubject = "$JS.SYNC.orders", + Responded = true, + Recovering = false, + Reassigning = true, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + encoded.ShouldNotBeEmpty(); + + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + decoded.ShouldNotBeNull(); + decoded!.StreamName.ShouldBe("orders"); + decoded.Group.Name.ShouldBe("rg-orders"); + decoded.Group.Peers.ShouldBe(["peer-1", "peer-2", "peer-3"]); + decoded.Group.StorageType.ShouldBe("file"); + decoded.Group.Cluster.ShouldBe("cluster-east"); + decoded.Group.Preferred.ShouldBe("peer-1"); + decoded.Group.DesiredReplicas.ShouldBe(3); + decoded.Created.ShouldBe(created); + decoded.ConfigJson.ShouldBe("""{"subjects":["orders.>"],"storage":"file","replicas":3}"""); + decoded.SyncSubject.ShouldBe("$JS.SYNC.orders"); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeFalse(); + decoded.Reassigning.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // ConsumerAssignment round-trip + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment / + // 9195 decodeConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void Encode_decode_consumer_assignment_round_trip() + { + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment + 9195 decodeConsumerAssignment + var created = new DateTime(2025, 6, 1, 12, 0, 0, DateTimeKind.Utc); + var ca = new ConsumerAssignment + { + ConsumerName = "push-consumer", + StreamName = "events", + Group = new RaftGroup + { + Name = "rg-push", + Peers = ["node-a", "node-b"], + StorageType = "memory", + DesiredReplicas = 2, + }, + Created = created, + ConfigJson = """{"deliver_subject":"push.out","filter_subject":"events.>"}""", + Responded = true, + Recovering = true, + }; + + var encoded = AssignmentCodec.EncodeConsumerAssignment(ca); + encoded.ShouldNotBeEmpty(); + + var decoded = AssignmentCodec.DecodeConsumerAssignment(encoded); + decoded.ShouldNotBeNull(); + decoded!.ConsumerName.ShouldBe("push-consumer"); + decoded.StreamName.ShouldBe("events"); + decoded.Group.Name.ShouldBe("rg-push"); + decoded.Group.Peers.ShouldBe(["node-a", "node-b"]); + decoded.Group.StorageType.ShouldBe("memory"); + decoded.Group.DesiredReplicas.ShouldBe(2); + decoded.Created.ShouldBe(created); + decoded.ConfigJson.ShouldBe("""{"deliver_subject":"push.out","filter_subject":"events.>"}"""); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Error handling + // Go reference: jetstream_cluster.go:8733 error return on bad unmarshal + // --------------------------------------------------------------- + + [Fact] + public void Decode_returns_null_for_invalid_data() + { + // Go reference: jetstream_cluster.go:8736 json.Unmarshal error → nil, error + var garbage = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x01, 0x02, 0x03 }; + var result = AssignmentCodec.DecodeStreamAssignment(garbage); + result.ShouldBeNull(); + } + + [Fact] + public void Decode_returns_null_for_empty_data() + { + // Go reference: jetstream_cluster.go:8733 empty buf → json.Unmarshal fails → nil + var result = AssignmentCodec.DecodeStreamAssignment(ReadOnlySpan.Empty); + result.ShouldBeNull(); + + var caResult = AssignmentCodec.DecodeConsumerAssignment(ReadOnlySpan.Empty); + caResult.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Compression: CompressIfLarge + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // uses s2.NewWriter for large consumer configs + // --------------------------------------------------------------- + + [Fact] + public void CompressIfLarge_compresses_when_above_threshold() + { + // Go reference: jetstream_cluster.go:9226 — S2 compression applied to large consumer assignments + var largeData = Encoding.UTF8.GetBytes(new string('X', 2048)); + var compressed = AssignmentCodec.CompressIfLarge(largeData, threshold: 1024); + + // Snappy compressed data with the stream magic is larger for uniform input but will differ + // The important thing is that the result is NOT the same bytes as the input + compressed.ShouldNotBeSameAs(largeData); + // Compressed form of repeated bytes should typically be shorter + compressed.Length.ShouldBeLessThan(largeData.Length); + } + + [Fact] + public void CompressIfLarge_no_compress_below_threshold() + { + // Go reference: jetstream_cluster.go — small consumer assignments sent uncompressed + var smallData = Encoding.UTF8.GetBytes("""{"stream_name":"foo"}"""); + var result = AssignmentCodec.CompressIfLarge(smallData, threshold: 1024); + + result.ShouldBe(smallData); + } + + // --------------------------------------------------------------- + // Compression: DecompressIfNeeded + // Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + // --------------------------------------------------------------- + + [Fact] + public void DecompressIfNeeded_decompresses_snappy_data() + { + // Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + var original = Encoding.UTF8.GetBytes("""{"stream_name":"test","group":{"name":"rg"}}"""); + var compressed = AssignmentCodec.CompressIfLarge(original, threshold: 0); // force compress + + var decompressed = AssignmentCodec.DecompressIfNeeded(compressed); + decompressed.ShouldBe(original); + } + + [Fact] + public void DecompressIfNeeded_returns_raw_for_non_compressed() + { + // Go reference: jetstream_cluster.go:9195 decodeConsumerAssignment (non-compressed path) + var plainJson = Encoding.UTF8.GetBytes("""{"stream_name":"test"}"""); + var result = AssignmentCodec.DecompressIfNeeded(plainJson); + result.ShouldBe(plainJson); + } + + // --------------------------------------------------------------- + // Consumer preservation in StreamAssignment round-trip + // Go reference: jetstream_cluster.go streamAssignment.Consumers map serialization + // --------------------------------------------------------------- + + [Fact] + public void Stream_assignment_preserves_consumer_assignments() + { + // Go reference: jetstream_cluster.go streamAssignment consumers map preserved in encoding + var sa = new StreamAssignment + { + StreamName = "events", + Group = new RaftGroup { Name = "rg-events", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["events.>"]}""", + }; + + sa.Consumers["consumer-alpha"] = new ConsumerAssignment + { + ConsumerName = "consumer-alpha", + StreamName = "events", + Group = new RaftGroup { Name = "rg-alpha", Peers = ["n1"] }, + ConfigJson = """{"deliver_subject":"out.alpha"}""", + Responded = true, + }; + sa.Consumers["consumer-beta"] = new ConsumerAssignment + { + ConsumerName = "consumer-beta", + StreamName = "events", + Group = new RaftGroup { Name = "rg-beta", Peers = ["n2"] }, + Recovering = true, + }; + sa.Consumers["consumer-gamma"] = new ConsumerAssignment + { + ConsumerName = "consumer-gamma", + StreamName = "events", + Group = new RaftGroup { Name = "rg-gamma", Peers = ["n3"] }, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + + decoded.ShouldNotBeNull(); + decoded!.Consumers.Count.ShouldBe(3); + decoded.Consumers["consumer-alpha"].ConsumerName.ShouldBe("consumer-alpha"); + decoded.Consumers["consumer-alpha"].Responded.ShouldBeTrue(); + decoded.Consumers["consumer-beta"].Recovering.ShouldBeTrue(); + decoded.Consumers["consumer-gamma"].Group.Name.ShouldBe("rg-gamma"); + } + + // --------------------------------------------------------------- + // RaftGroup peer list preservation + // Go reference: jetstream_cluster.go raftGroup.Peers serialization + // --------------------------------------------------------------- + + [Fact] + public void Stream_assignment_preserves_raft_group_peers() + { + // Go reference: jetstream_cluster.go:154 raftGroup.Peers in assignment encoding + var sa = new StreamAssignment + { + StreamName = "telemetry", + Group = new RaftGroup + { + Name = "rg-telemetry", + Peers = ["peer-alpha", "peer-beta", "peer-gamma"], + DesiredReplicas = 3, + }, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + + decoded.ShouldNotBeNull(); + decoded!.Group.Peers.Count.ShouldBe(3); + decoded.Group.Peers.ShouldContain("peer-alpha"); + decoded.Group.Peers.ShouldContain("peer-beta"); + decoded.Group.Peers.ShouldContain("peer-gamma"); + decoded.Group.DesiredReplicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Large ConfigJson round-trip through compression + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // for large consumer configs + // --------------------------------------------------------------- + + [Fact] + public void Compress_decompress_round_trip_with_large_config() + { + // Go reference: jetstream_cluster.go:9226 — compressed consumer assignment with large config + var largeConfig = """{"subjects":[""" + + string.Join(",", Enumerable.Range(1, 50).Select(i => $"\"events.topic.{i}.>\"")) + + """],"storage":"file","replicas":3,"max_msgs":1000000,"max_bytes":1073741824}"""; + + var ca = new ConsumerAssignment + { + ConsumerName = "large-config-consumer", + StreamName = "big-stream", + Group = new RaftGroup + { + Name = "rg-large", + Peers = ["n1", "n2", "n3"], + }, + ConfigJson = largeConfig, + }; + + var encoded = AssignmentCodec.EncodeConsumerAssignment(ca); + var compressed = AssignmentCodec.CompressIfLarge(encoded, threshold: 512); + + // Compressed should be present (input is large) + compressed.Length.ShouldBeGreaterThan(0); + + var decompressed = AssignmentCodec.DecompressIfNeeded(compressed); + var decoded = AssignmentCodec.DecodeConsumerAssignment(decompressed); + + decoded.ShouldNotBeNull(); + decoded!.ConsumerName.ShouldBe("large-config-consumer"); + decoded.ConfigJson.ShouldBe(largeConfig); + decoded.Group.Peers.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Golden fixture test: known-good JSON bytes decode correctly + // Go reference: jetstream_cluster.go decodeStreamAssignment / decodeConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void Golden_fixture_known_bytes() + { + // Go reference: jetstream_cluster.go:8733 decodeStreamAssignment — format stability test. + // This fixture encodes a specific known StreamAssignment JSON and verifies that + // the codec can decode it correctly, ensuring the serialization format remains stable. + // + // The JSON uses snake_case property names (JsonNamingPolicy.SnakeCaseLower). + // Created timestamp: 2025-01-15T00:00:00Z = 638717280000000000 ticks. + const string goldenJson = """ + { + "stream_name": "golden-stream", + "group": { + "name": "rg-golden", + "peers": ["node-1", "node-2", "node-3"], + "storage_type": "file", + "cluster": "us-east", + "preferred": "node-1", + "desired_replicas": 3 + }, + "created": "2025-01-15T00:00:00Z", + "config_json": "{\"subjects\":[\"golden.>\"]}", + "sync_subject": "$JS.SYNC.golden-stream", + "responded": true, + "recovering": false, + "reassigning": false, + "consumers": {} + } + """; + + var bytes = Encoding.UTF8.GetBytes(goldenJson); + var decoded = AssignmentCodec.DecodeStreamAssignment(bytes); + + decoded.ShouldNotBeNull(); + decoded!.StreamName.ShouldBe("golden-stream"); + decoded.Group.Name.ShouldBe("rg-golden"); + decoded.Group.Peers.ShouldBe(["node-1", "node-2", "node-3"]); + decoded.Group.StorageType.ShouldBe("file"); + decoded.Group.Cluster.ShouldBe("us-east"); + decoded.Group.Preferred.ShouldBe("node-1"); + decoded.Group.DesiredReplicas.ShouldBe(3); + decoded.Created.ShouldBe(new DateTime(2025, 1, 15, 0, 0, 0, DateTimeKind.Utc)); + decoded.ConfigJson.ShouldBe("""{"subjects":["golden.>"]}"""); + decoded.SyncSubject.ShouldBe("$JS.SYNC.golden-stream"); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeFalse(); + decoded.Reassigning.ShouldBeFalse(); + decoded.Consumers.ShouldBeEmpty(); + } +}