feat(cluster): add MetaSnapshotCodec with S2 compression and versioned format

Implements binary codec for meta-group snapshots: 2-byte little-endian version
header followed by S2-compressed JSON of the stream assignment map. Adds
[JsonObjectCreationHandling(Populate)] to StreamAssignment.Consumers so the
getter-only dictionary is populated in-place during deserialization. 8 tests
covering round-trip, compression ratio, field fidelity, multi-consumer restore,
version rejection, and truncation guard.

Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot)
This commit is contained in:
Joseph Doherty
2026-02-25 01:46:32 -05:00
parent e0f5fe7150
commit 9fdc931ff5
3 changed files with 274 additions and 0 deletions

View File

@@ -1,3 +1,5 @@
using System.Text.Json.Serialization;
namespace NATS.Server.JetStream.Cluster;
/// <summary>
@@ -30,6 +32,13 @@ public sealed class StreamAssignment
public bool Responded { get; set; }
public bool Recovering { get; set; }
public bool Reassigning { get; set; }
/// <summary>
/// Consumer assignments keyed by consumer name.
/// Uses <see cref="JsonObjectCreationHandling.Populate"/> so the deserializer populates
/// the existing dictionary instance rather than replacing it (the property has no setter).
/// </summary>
[JsonObjectCreationHandling(JsonObjectCreationHandling.Populate)]
public Dictionary<string, ConsumerAssignment> Consumers { get; } = new(StringComparer.Ordinal);
}

View File

@@ -0,0 +1,60 @@
using System.Buffers.Binary;
using System.Text.Json;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Cluster;
/// <summary>
/// Binary codec for meta-group snapshots.
/// Format: [2:version_le][N:S2-compressed JSON of assignment map]
/// Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot)
/// </summary>
internal static class MetaSnapshotCodec
{
private const ushort CurrentVersion = 1;
// Use Populate so the getter-only Consumers dictionary on StreamAssignment
// is populated in-place by the deserializer rather than requiring a setter.
// Go reference: jetstream_cluster.go streamAssignment consumers map restoration.
private static readonly JsonSerializerOptions SerializerOptions = new()
{
PreferredObjectCreationHandling = System.Text.Json.Serialization.JsonObjectCreationHandling.Populate,
};
/// <summary>
/// Encodes <paramref name="assignments"/> into the versioned, S2-compressed binary format.
/// Go reference: jetstream_cluster.go:2075 encodeMetaSnapshot.
/// </summary>
public static byte[] Encode(Dictionary<string, StreamAssignment> assignments)
{
var json = JsonSerializer.SerializeToUtf8Bytes(assignments, SerializerOptions);
var compressed = S2Codec.Compress(json);
var result = new byte[2 + compressed.Length];
BinaryPrimitives.WriteUInt16LittleEndian(result, CurrentVersion);
compressed.CopyTo(result, 2);
return result;
}
/// <summary>
/// Decodes a versioned, S2-compressed binary snapshot into a stream assignment map.
/// Go reference: jetstream_cluster.go:2100 decodeMetaSnapshot.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when <paramref name="data"/> is too short or contains an unrecognised version.
/// </exception>
public static Dictionary<string, StreamAssignment> Decode(byte[] data)
{
if (data.Length < 2)
throw new InvalidOperationException("Meta snapshot too short to contain version header.");
var version = BinaryPrimitives.ReadUInt16LittleEndian(data);
if (version != CurrentVersion)
throw new InvalidOperationException($"Unknown meta snapshot version: {version}");
var compressed = data.AsSpan(2);
var json = S2Codec.Decompress(compressed);
return JsonSerializer.Deserialize<Dictionary<string, StreamAssignment>>(json, SerializerOptions)
?? new Dictionary<string, StreamAssignment>();
}
}

View File

@@ -0,0 +1,205 @@
using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for MetaSnapshotCodec: encode/decode round-trip, S2 compression, versioning.
/// Go reference: jetstream_cluster.go:2075-2145.
/// </summary>
public class MetaSnapshotCodecTests
{
[Fact]
public void Encode_decode_round_trips()
{
// Go reference: jetstream_cluster.go encodeMetaSnapshot/decodeMetaSnapshot round-trip
var assignments = new Dictionary<string, StreamAssignment>
{
["stream-A"] = new StreamAssignment
{
StreamName = "stream-A",
Group = new RaftGroup { Name = "rg-a", Peers = ["n1", "n2", "n3"] },
ConfigJson = """{"subjects":["foo.>"]}""",
},
["stream-B"] = new StreamAssignment
{
StreamName = "stream-B",
Group = new RaftGroup { Name = "rg-b", Peers = ["n1", "n2"] },
ConfigJson = """{"subjects":["bar.>"]}""",
},
};
// Add a consumer to stream-B
assignments["stream-B"].Consumers["con-1"] = new ConsumerAssignment
{
ConsumerName = "con-1",
StreamName = "stream-B",
Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] },
};
var encoded = MetaSnapshotCodec.Encode(assignments);
encoded.ShouldNotBeEmpty();
var decoded = MetaSnapshotCodec.Decode(encoded);
decoded.Count.ShouldBe(2);
decoded["stream-A"].StreamName.ShouldBe("stream-A");
decoded["stream-A"].Group.Peers.Count.ShouldBe(3);
decoded["stream-B"].Consumers.Count.ShouldBe(1);
decoded["stream-B"].Consumers["con-1"].ConsumerName.ShouldBe("con-1");
}
[Fact]
public void Encoded_snapshot_is_compressed()
{
// Go reference: jetstream_cluster.go S2 compression of meta snapshots
var assignments = new Dictionary<string, StreamAssignment>();
for (int i = 0; i < 100; i++)
{
assignments[$"stream-{i}"] = new StreamAssignment
{
StreamName = $"stream-{i}",
Group = new RaftGroup { Name = $"rg-{i}", Peers = ["n1", "n2", "n3"] },
ConfigJson = """{"subjects":["test.>"]}""",
};
}
var encoded = MetaSnapshotCodec.Encode(assignments);
var json = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(assignments);
// S2 compressed + 2-byte version header should be smaller than raw JSON
encoded.Length.ShouldBeLessThan(json.Length);
}
[Fact]
public void Empty_snapshot_round_trips()
{
// Go reference: jetstream_cluster.go decodeMetaSnapshot handles empty map
var empty = new Dictionary<string, StreamAssignment>();
var encoded = MetaSnapshotCodec.Encode(empty);
var decoded = MetaSnapshotCodec.Decode(encoded);
decoded.ShouldBeEmpty();
}
[Fact]
public void Versioned_format_rejects_unknown_version()
{
// Go reference: jetstream_cluster.go version check in decodeMetaSnapshot
var bad = new byte[] { 0xFF, 0xFF, 0, 0 }; // version 65535
Should.Throw<InvalidOperationException>(() => MetaSnapshotCodec.Decode(bad));
}
[Fact]
public void Decode_rejects_too_short_input()
{
// Go reference: jetstream_cluster.go guard against truncated snapshot
Should.Throw<InvalidOperationException>(() => MetaSnapshotCodec.Decode([0x01]));
}
[Fact]
public void Encoded_snapshot_begins_with_version_one_header()
{
// Go reference: jetstream_cluster.go:2075 — versioned header allows future format evolution
var assignments = new Dictionary<string, StreamAssignment>
{
["s1"] = new StreamAssignment
{
StreamName = "s1",
Group = new RaftGroup { Name = "g1", Peers = ["n1"] },
},
};
var encoded = MetaSnapshotCodec.Encode(assignments);
// Little-endian version 1: bytes [0x01, 0x00]
encoded[0].ShouldBe((byte)0x01);
encoded[1].ShouldBe((byte)0x00);
}
[Fact]
public void Round_trip_preserves_all_stream_assignment_fields()
{
// Go reference: jetstream_cluster.go streamAssignment struct fields preserved across snapshot
var created = new DateTime(2025, 6, 15, 12, 0, 0, DateTimeKind.Utc);
var assignments = new Dictionary<string, StreamAssignment>
{
["my-stream"] = new StreamAssignment
{
StreamName = "my-stream",
Group = new RaftGroup
{
Name = "rg-main",
Peers = ["peer-a", "peer-b", "peer-c"],
StorageType = "memory",
Cluster = "cluster-east",
Preferred = "peer-a",
},
Created = created,
ConfigJson = """{"subjects":["events.>"],"storage":"memory"}""",
SyncSubject = "$JS.SYNC.my-stream",
Responded = true,
Recovering = false,
Reassigning = true,
},
};
var decoded = MetaSnapshotCodec.Decode(MetaSnapshotCodec.Encode(assignments));
var sa = decoded["my-stream"];
sa.StreamName.ShouldBe("my-stream");
sa.Group.Name.ShouldBe("rg-main");
sa.Group.Peers.ShouldBe(["peer-a", "peer-b", "peer-c"]);
sa.Group.StorageType.ShouldBe("memory");
sa.Group.Cluster.ShouldBe("cluster-east");
sa.Group.Preferred.ShouldBe("peer-a");
sa.Created.ShouldBe(created);
sa.ConfigJson.ShouldBe("""{"subjects":["events.>"],"storage":"memory"}""");
sa.SyncSubject.ShouldBe("$JS.SYNC.my-stream");
sa.Responded.ShouldBeTrue();
sa.Recovering.ShouldBeFalse();
sa.Reassigning.ShouldBeTrue();
}
[Fact]
public void Round_trip_preserves_multiple_consumers_per_stream()
{
// Go reference: jetstream_cluster.go consumerAssignment map restored in snapshot
var sa = new StreamAssignment
{
StreamName = "multi-consumer-stream",
Group = new RaftGroup { Name = "rg-mc", Peers = ["n1", "n2", "n3"] },
};
sa.Consumers["consumer-alpha"] = new ConsumerAssignment
{
ConsumerName = "consumer-alpha",
StreamName = "multi-consumer-stream",
Group = new RaftGroup { Name = "rg-alpha", Peers = ["n1"] },
ConfigJson = """{"deliver_subject":"out.alpha"}""",
Responded = true,
};
sa.Consumers["consumer-beta"] = new ConsumerAssignment
{
ConsumerName = "consumer-beta",
StreamName = "multi-consumer-stream",
Group = new RaftGroup { Name = "rg-beta", Peers = ["n2", "n3"] },
Recovering = true,
};
var assignments = new Dictionary<string, StreamAssignment> { ["multi-consumer-stream"] = sa };
var decoded = MetaSnapshotCodec.Decode(MetaSnapshotCodec.Encode(assignments));
var dsa = decoded["multi-consumer-stream"];
dsa.Consumers.Count.ShouldBe(2);
var alpha = dsa.Consumers["consumer-alpha"];
alpha.ConsumerName.ShouldBe("consumer-alpha");
alpha.StreamName.ShouldBe("multi-consumer-stream");
alpha.Group.Name.ShouldBe("rg-alpha");
alpha.ConfigJson.ShouldBe("""{"deliver_subject":"out.alpha"}""");
alpha.Responded.ShouldBeTrue();
var beta = dsa.Consumers["consumer-beta"];
beta.ConsumerName.ShouldBe("consumer-beta");
beta.Group.Peers.Count.ShouldBe(2);
beta.Recovering.ShouldBeTrue();
}
}