diff --git a/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs b/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs new file mode 100644 index 0000000..36d6c69 --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/AssignmentCodec.cs @@ -0,0 +1,158 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using IronSnappy; + +namespace NATS.Server.JetStream.Cluster; + +/// +/// Binary codec for individual stream and consumer assignments. +/// Uses JSON serialization with optional S2/Snappy compression for large payloads. +/// +/// Compression format: when compresses data, it prepends +/// a single sentinel byte 0x01 before the Snappy block output so that +/// can reliably detect compressed payloads regardless +/// of the Snappy block-format varint header (which varies by uncompressed size and is +/// not a fixed magic sequence). +/// +/// Go reference: jetstream_cluster.go:8703-9246 +/// - encodeAddStreamAssignment / decodeStreamAssignment +/// - encodeAddConsumerAssignment / decodeConsumerAssignment +/// - encodeAddConsumerAssignmentCompressed / decodeConsumerAssignmentCompressed +/// +public static class AssignmentCodec +{ + // Sentinel byte that marks a compressed payload produced by CompressIfLarge. + // Chosen as 0x01 (non-printable, never the first byte of valid JSON which starts + // with '{', '[', '"', digit, or whitespace). + // Go reference: jetstream_cluster.go:9226 uses opcode byte assignCompressedConsumerOp + // as a similar marker before the S2 stream. + private const byte CompressedMarker = 0x01; + + // Use snake_case property names matching Go's JSON field tags, and populate + // the getter-only Consumers dictionary in-place during deserialization. + // Go reference: jetstream_cluster.go json struct tags (stream_name, config_json, etc.) + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, + PreferredObjectCreationHandling = JsonObjectCreationHandling.Populate, + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + }; + + // --------------------------------------------------------------- + // StreamAssignment encode / decode + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment + // jetstream_cluster.go:8733 decodeStreamAssignment + // --------------------------------------------------------------- + + /// + /// Serializes a to JSON bytes. + /// Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment — + /// marshals the assignment struct (with ConfigJSON) to JSON. + /// + public static byte[] EncodeStreamAssignment(StreamAssignment sa) + => JsonSerializer.SerializeToUtf8Bytes(sa, SerializerOptions); + + /// + /// Deserializes a from JSON bytes. + /// Returns if is empty or invalid. + /// Go reference: jetstream_cluster.go:8733 decodeStreamAssignment — + /// json.Unmarshal(buf, &sa); returns nil, err on failure. + /// + public static StreamAssignment? DecodeStreamAssignment(ReadOnlySpan data) + { + if (data.IsEmpty) + return null; + + try + { + return JsonSerializer.Deserialize(data, SerializerOptions); + } + catch (JsonException) + { + return null; + } + } + + // --------------------------------------------------------------- + // ConsumerAssignment encode / decode + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment + // jetstream_cluster.go:9195 decodeConsumerAssignment + // --------------------------------------------------------------- + + /// + /// Serializes a to JSON bytes. + /// Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment — + /// marshals the assignment struct to JSON. + /// + public static byte[] EncodeConsumerAssignment(ConsumerAssignment ca) + => JsonSerializer.SerializeToUtf8Bytes(ca, SerializerOptions); + + /// + /// Deserializes a from JSON bytes. + /// Returns if is empty or invalid. + /// Go reference: jetstream_cluster.go:9195 decodeConsumerAssignment — + /// json.Unmarshal(buf, &ca); returns nil, err on failure. + /// + public static ConsumerAssignment? DecodeConsumerAssignment(ReadOnlySpan data) + { + if (data.IsEmpty) + return null; + + try + { + return JsonSerializer.Deserialize(data, SerializerOptions); + } + catch (JsonException) + { + return null; + } + } + + // --------------------------------------------------------------- + // Compression helpers + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + // --------------------------------------------------------------- + + /// + /// Compresses using Snappy when its length exceeds + /// . Returns unchanged when + /// it is at or below the threshold. + /// + /// The returned compressed buffer is prefixed with + /// (0x01) so that can reliably identify compressed + /// payloads. The Snappy block format itself has no fixed magic header — only a + /// varint-encoded uncompressed length — so a sentinel prefix is required. + /// + /// Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed — + /// s2.NewWriter used to compress large consumer assignment payloads; the caller + /// prepends the assignCompressedConsumerOp opcode byte as a similar kind of marker. + /// + public static byte[] CompressIfLarge(byte[] data, int threshold = 1024) + { + if (data.Length <= threshold) + return data; + + var compressed = Snappy.Encode(data); + var result = new byte[1 + compressed.Length]; + result[0] = CompressedMarker; + compressed.CopyTo(result.AsSpan(1)); + return result; + } + + /// + /// Decompresses if it was produced by + /// (i.e., starts with ); otherwise returns + /// unchanged. + /// Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed — + /// s2.NewReader used to decompress consumer assignment payloads that were compressed + /// before being proposed to the meta RAFT group. + /// + public static byte[] DecompressIfNeeded(byte[] data) + { + if (data.Length > 0 && data[0] == CompressedMarker) + return Snappy.Decode(data.AsSpan(1)); + + return data; + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs new file mode 100644 index 0000000..ac0e8d7 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentCodecTests.cs @@ -0,0 +1,367 @@ +using System.Text; +using System.Text.Json; +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for AssignmentCodec: binary serialization for stream and consumer assignments +/// with optional S2/Snappy compression for large payloads. +/// Go reference: jetstream_cluster.go:8703-9246 (encodeAddStreamAssignment, +/// encodeAddConsumerAssignment, decodeStreamAssignment, decodeConsumerAssignment, +/// encodeAddConsumerAssignmentCompressed, decodeConsumerAssignmentCompressed). +/// +public class AssignmentCodecTests +{ + // --------------------------------------------------------------- + // StreamAssignment round-trip + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment / + // 8733 decodeStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public void Encode_decode_stream_assignment_round_trip() + { + // Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment + 8733 decodeStreamAssignment + var created = new DateTime(2025, 3, 15, 9, 0, 0, DateTimeKind.Utc); + var sa = new StreamAssignment + { + StreamName = "orders", + Group = new RaftGroup + { + Name = "rg-orders", + Peers = ["peer-1", "peer-2", "peer-3"], + StorageType = "file", + Cluster = "cluster-east", + Preferred = "peer-1", + DesiredReplicas = 3, + }, + Created = created, + ConfigJson = """{"subjects":["orders.>"],"storage":"file","replicas":3}""", + SyncSubject = "$JS.SYNC.orders", + Responded = true, + Recovering = false, + Reassigning = true, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + encoded.ShouldNotBeEmpty(); + + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + decoded.ShouldNotBeNull(); + decoded!.StreamName.ShouldBe("orders"); + decoded.Group.Name.ShouldBe("rg-orders"); + decoded.Group.Peers.ShouldBe(["peer-1", "peer-2", "peer-3"]); + decoded.Group.StorageType.ShouldBe("file"); + decoded.Group.Cluster.ShouldBe("cluster-east"); + decoded.Group.Preferred.ShouldBe("peer-1"); + decoded.Group.DesiredReplicas.ShouldBe(3); + decoded.Created.ShouldBe(created); + decoded.ConfigJson.ShouldBe("""{"subjects":["orders.>"],"storage":"file","replicas":3}"""); + decoded.SyncSubject.ShouldBe("$JS.SYNC.orders"); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeFalse(); + decoded.Reassigning.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // ConsumerAssignment round-trip + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment / + // 9195 decodeConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void Encode_decode_consumer_assignment_round_trip() + { + // Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment + 9195 decodeConsumerAssignment + var created = new DateTime(2025, 6, 1, 12, 0, 0, DateTimeKind.Utc); + var ca = new ConsumerAssignment + { + ConsumerName = "push-consumer", + StreamName = "events", + Group = new RaftGroup + { + Name = "rg-push", + Peers = ["node-a", "node-b"], + StorageType = "memory", + DesiredReplicas = 2, + }, + Created = created, + ConfigJson = """{"deliver_subject":"push.out","filter_subject":"events.>"}""", + Responded = true, + Recovering = true, + }; + + var encoded = AssignmentCodec.EncodeConsumerAssignment(ca); + encoded.ShouldNotBeEmpty(); + + var decoded = AssignmentCodec.DecodeConsumerAssignment(encoded); + decoded.ShouldNotBeNull(); + decoded!.ConsumerName.ShouldBe("push-consumer"); + decoded.StreamName.ShouldBe("events"); + decoded.Group.Name.ShouldBe("rg-push"); + decoded.Group.Peers.ShouldBe(["node-a", "node-b"]); + decoded.Group.StorageType.ShouldBe("memory"); + decoded.Group.DesiredReplicas.ShouldBe(2); + decoded.Created.ShouldBe(created); + decoded.ConfigJson.ShouldBe("""{"deliver_subject":"push.out","filter_subject":"events.>"}"""); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Error handling + // Go reference: jetstream_cluster.go:8733 error return on bad unmarshal + // --------------------------------------------------------------- + + [Fact] + public void Decode_returns_null_for_invalid_data() + { + // Go reference: jetstream_cluster.go:8736 json.Unmarshal error → nil, error + var garbage = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x01, 0x02, 0x03 }; + var result = AssignmentCodec.DecodeStreamAssignment(garbage); + result.ShouldBeNull(); + } + + [Fact] + public void Decode_returns_null_for_empty_data() + { + // Go reference: jetstream_cluster.go:8733 empty buf → json.Unmarshal fails → nil + var result = AssignmentCodec.DecodeStreamAssignment(ReadOnlySpan.Empty); + result.ShouldBeNull(); + + var caResult = AssignmentCodec.DecodeConsumerAssignment(ReadOnlySpan.Empty); + caResult.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Compression: CompressIfLarge + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // uses s2.NewWriter for large consumer configs + // --------------------------------------------------------------- + + [Fact] + public void CompressIfLarge_compresses_when_above_threshold() + { + // Go reference: jetstream_cluster.go:9226 — S2 compression applied to large consumer assignments + var largeData = Encoding.UTF8.GetBytes(new string('X', 2048)); + var compressed = AssignmentCodec.CompressIfLarge(largeData, threshold: 1024); + + // Snappy compressed data with the stream magic is larger for uniform input but will differ + // The important thing is that the result is NOT the same bytes as the input + compressed.ShouldNotBeSameAs(largeData); + // Compressed form of repeated bytes should typically be shorter + compressed.Length.ShouldBeLessThan(largeData.Length); + } + + [Fact] + public void CompressIfLarge_no_compress_below_threshold() + { + // Go reference: jetstream_cluster.go — small consumer assignments sent uncompressed + var smallData = Encoding.UTF8.GetBytes("""{"stream_name":"foo"}"""); + var result = AssignmentCodec.CompressIfLarge(smallData, threshold: 1024); + + result.ShouldBe(smallData); + } + + // --------------------------------------------------------------- + // Compression: DecompressIfNeeded + // Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + // --------------------------------------------------------------- + + [Fact] + public void DecompressIfNeeded_decompresses_snappy_data() + { + // Go reference: jetstream_cluster.go:9238 decodeConsumerAssignmentCompressed + var original = Encoding.UTF8.GetBytes("""{"stream_name":"test","group":{"name":"rg"}}"""); + var compressed = AssignmentCodec.CompressIfLarge(original, threshold: 0); // force compress + + var decompressed = AssignmentCodec.DecompressIfNeeded(compressed); + decompressed.ShouldBe(original); + } + + [Fact] + public void DecompressIfNeeded_returns_raw_for_non_compressed() + { + // Go reference: jetstream_cluster.go:9195 decodeConsumerAssignment (non-compressed path) + var plainJson = Encoding.UTF8.GetBytes("""{"stream_name":"test"}"""); + var result = AssignmentCodec.DecompressIfNeeded(plainJson); + result.ShouldBe(plainJson); + } + + // --------------------------------------------------------------- + // Consumer preservation in StreamAssignment round-trip + // Go reference: jetstream_cluster.go streamAssignment.Consumers map serialization + // --------------------------------------------------------------- + + [Fact] + public void Stream_assignment_preserves_consumer_assignments() + { + // Go reference: jetstream_cluster.go streamAssignment consumers map preserved in encoding + var sa = new StreamAssignment + { + StreamName = "events", + Group = new RaftGroup { Name = "rg-events", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["events.>"]}""", + }; + + sa.Consumers["consumer-alpha"] = new ConsumerAssignment + { + ConsumerName = "consumer-alpha", + StreamName = "events", + Group = new RaftGroup { Name = "rg-alpha", Peers = ["n1"] }, + ConfigJson = """{"deliver_subject":"out.alpha"}""", + Responded = true, + }; + sa.Consumers["consumer-beta"] = new ConsumerAssignment + { + ConsumerName = "consumer-beta", + StreamName = "events", + Group = new RaftGroup { Name = "rg-beta", Peers = ["n2"] }, + Recovering = true, + }; + sa.Consumers["consumer-gamma"] = new ConsumerAssignment + { + ConsumerName = "consumer-gamma", + StreamName = "events", + Group = new RaftGroup { Name = "rg-gamma", Peers = ["n3"] }, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + + decoded.ShouldNotBeNull(); + decoded!.Consumers.Count.ShouldBe(3); + decoded.Consumers["consumer-alpha"].ConsumerName.ShouldBe("consumer-alpha"); + decoded.Consumers["consumer-alpha"].Responded.ShouldBeTrue(); + decoded.Consumers["consumer-beta"].Recovering.ShouldBeTrue(); + decoded.Consumers["consumer-gamma"].Group.Name.ShouldBe("rg-gamma"); + } + + // --------------------------------------------------------------- + // RaftGroup peer list preservation + // Go reference: jetstream_cluster.go raftGroup.Peers serialization + // --------------------------------------------------------------- + + [Fact] + public void Stream_assignment_preserves_raft_group_peers() + { + // Go reference: jetstream_cluster.go:154 raftGroup.Peers in assignment encoding + var sa = new StreamAssignment + { + StreamName = "telemetry", + Group = new RaftGroup + { + Name = "rg-telemetry", + Peers = ["peer-alpha", "peer-beta", "peer-gamma"], + DesiredReplicas = 3, + }, + }; + + var encoded = AssignmentCodec.EncodeStreamAssignment(sa); + var decoded = AssignmentCodec.DecodeStreamAssignment(encoded); + + decoded.ShouldNotBeNull(); + decoded!.Group.Peers.Count.ShouldBe(3); + decoded.Group.Peers.ShouldContain("peer-alpha"); + decoded.Group.Peers.ShouldContain("peer-beta"); + decoded.Group.Peers.ShouldContain("peer-gamma"); + decoded.Group.DesiredReplicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Large ConfigJson round-trip through compression + // Go reference: jetstream_cluster.go:9226 encodeAddConsumerAssignmentCompressed + // for large consumer configs + // --------------------------------------------------------------- + + [Fact] + public void Compress_decompress_round_trip_with_large_config() + { + // Go reference: jetstream_cluster.go:9226 — compressed consumer assignment with large config + var largeConfig = """{"subjects":[""" + + string.Join(",", Enumerable.Range(1, 50).Select(i => $"\"events.topic.{i}.>\"")) + + """],"storage":"file","replicas":3,"max_msgs":1000000,"max_bytes":1073741824}"""; + + var ca = new ConsumerAssignment + { + ConsumerName = "large-config-consumer", + StreamName = "big-stream", + Group = new RaftGroup + { + Name = "rg-large", + Peers = ["n1", "n2", "n3"], + }, + ConfigJson = largeConfig, + }; + + var encoded = AssignmentCodec.EncodeConsumerAssignment(ca); + var compressed = AssignmentCodec.CompressIfLarge(encoded, threshold: 512); + + // Compressed should be present (input is large) + compressed.Length.ShouldBeGreaterThan(0); + + var decompressed = AssignmentCodec.DecompressIfNeeded(compressed); + var decoded = AssignmentCodec.DecodeConsumerAssignment(decompressed); + + decoded.ShouldNotBeNull(); + decoded!.ConsumerName.ShouldBe("large-config-consumer"); + decoded.ConfigJson.ShouldBe(largeConfig); + decoded.Group.Peers.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Golden fixture test: known-good JSON bytes decode correctly + // Go reference: jetstream_cluster.go decodeStreamAssignment / decodeConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void Golden_fixture_known_bytes() + { + // Go reference: jetstream_cluster.go:8733 decodeStreamAssignment — format stability test. + // This fixture encodes a specific known StreamAssignment JSON and verifies that + // the codec can decode it correctly, ensuring the serialization format remains stable. + // + // The JSON uses snake_case property names (JsonNamingPolicy.SnakeCaseLower). + // Created timestamp: 2025-01-15T00:00:00Z = 638717280000000000 ticks. + const string goldenJson = """ + { + "stream_name": "golden-stream", + "group": { + "name": "rg-golden", + "peers": ["node-1", "node-2", "node-3"], + "storage_type": "file", + "cluster": "us-east", + "preferred": "node-1", + "desired_replicas": 3 + }, + "created": "2025-01-15T00:00:00Z", + "config_json": "{\"subjects\":[\"golden.>\"]}", + "sync_subject": "$JS.SYNC.golden-stream", + "responded": true, + "recovering": false, + "reassigning": false, + "consumers": {} + } + """; + + var bytes = Encoding.UTF8.GetBytes(goldenJson); + var decoded = AssignmentCodec.DecodeStreamAssignment(bytes); + + decoded.ShouldNotBeNull(); + decoded!.StreamName.ShouldBe("golden-stream"); + decoded.Group.Name.ShouldBe("rg-golden"); + decoded.Group.Peers.ShouldBe(["node-1", "node-2", "node-3"]); + decoded.Group.StorageType.ShouldBe("file"); + decoded.Group.Cluster.ShouldBe("us-east"); + decoded.Group.Preferred.ShouldBe("node-1"); + decoded.Group.DesiredReplicas.ShouldBe(3); + decoded.Created.ShouldBe(new DateTime(2025, 1, 15, 0, 0, 0, DateTimeKind.Utc)); + decoded.ConfigJson.ShouldBe("""{"subjects":["golden.>"]}"""); + decoded.SyncSubject.ShouldBe("$JS.SYNC.golden-stream"); + decoded.Responded.ShouldBeTrue(); + decoded.Recovering.ShouldBeFalse(); + decoded.Reassigning.ShouldBeFalse(); + decoded.Consumers.ShouldBeEmpty(); + } +}