From 59085ba9ea3e9a0455c999817aa1612281f9198b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 11:53:44 -0500 Subject: [PATCH] feat(batch8): implement store enum parity group B --- .../JetStream/StoreParity.cs | 242 ++++++++++++++++++ .../JetStream/StoreTypes.cs | 6 + .../JetStream/StoreTypesTests.cs | 99 +++++++ porting.db | Bin 6516736 -> 6524928 bytes 4 files changed, 347 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 69e6681..900d58a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -1,4 +1,6 @@ using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -274,3 +276,243 @@ public static class StoreParity => _set.Range(f); } } + +public static class StoreEnumParityExtensions +{ + public static string String(this RetentionPolicy value) + => value switch + { + RetentionPolicy.LimitsPolicy => "Limits", + RetentionPolicy.InterestPolicy => "Interest", + RetentionPolicy.WorkQueuePolicy => "WorkQueue", + _ => "Unknown Retention Policy", + }; + + public static string String(this DiscardPolicy value) + => value switch + { + DiscardPolicy.DiscardOld => "DiscardOld", + DiscardPolicy.DiscardNew => "DiscardNew", + _ => "Unknown Discard Policy", + }; + + public static string String(this StorageType value) + => value switch + { + StorageType.MemoryStorage => "Memory", + StorageType.FileStorage => "File", + _ => "Unknown Storage Type", + }; +} + +public sealed class RetentionPolicyJsonConverter : JsonConverter +{ + public override RetentionPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "limits" => RetentionPolicy.LimitsPolicy, + "interest" => RetentionPolicy.InterestPolicy, + "workqueue" => RetentionPolicy.WorkQueuePolicy, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, RetentionPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case RetentionPolicy.LimitsPolicy: + writer.WriteStringValue("limits"); + break; + case RetentionPolicy.InterestPolicy: + writer.WriteStringValue("interest"); + break; + case RetentionPolicy.WorkQueuePolicy: + writer.WriteStringValue("workqueue"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class DiscardPolicyJsonConverter : JsonConverter +{ + public override DiscardPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + var token = reader.GetString() ?? string.Empty; + return token.ToLowerInvariant() switch + { + "old" => DiscardPolicy.DiscardOld, + "new" => DiscardPolicy.DiscardNew, + _ => throw new JsonException($"can not unmarshal \"{token}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, DiscardPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case DiscardPolicy.DiscardOld: + writer.WriteStringValue("old"); + break; + case DiscardPolicy.DiscardNew: + writer.WriteStringValue("new"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class StorageTypeJsonConverter : JsonConverter +{ + public override StorageType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "memory" => StorageType.MemoryStorage, + "file" => StorageType.FileStorage, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, StorageType value, JsonSerializerOptions options) + { + switch (value) + { + case StorageType.MemoryStorage: + writer.WriteStringValue("memory"); + break; + case StorageType.FileStorage: + writer.WriteStringValue("file"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class AckPolicyJsonConverter : JsonConverter +{ + public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "none" => AckPolicy.AckNone, + "all" => AckPolicy.AckAll, + "explicit" => AckPolicy.AckExplicit, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, AckPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case AckPolicy.AckNone: + writer.WriteStringValue("none"); + break; + case AckPolicy.AckAll: + writer.WriteStringValue("all"); + break; + case AckPolicy.AckExplicit: + writer.WriteStringValue("explicit"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class ReplayPolicyJsonConverter : JsonConverter +{ + public override ReplayPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "instant" => ReplayPolicy.ReplayInstant, + "original" => ReplayPolicy.ReplayOriginal, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, ReplayPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case ReplayPolicy.ReplayInstant: + writer.WriteStringValue("instant"); + break; + case ReplayPolicy.ReplayOriginal: + writer.WriteStringValue("original"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class DeliverPolicyJsonConverter : JsonConverter +{ + public override DeliverPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "all" or "undefined" => DeliverPolicy.DeliverAll, + "last" => DeliverPolicy.DeliverLast, + "last_per_subject" => DeliverPolicy.DeliverLastPerSubject, + "new" => DeliverPolicy.DeliverNew, + "by_start_sequence" => DeliverPolicy.DeliverByStartSequence, + "by_start_time" => DeliverPolicy.DeliverByStartTime, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, DeliverPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case DeliverPolicy.DeliverAll: + writer.WriteStringValue("all"); + break; + case DeliverPolicy.DeliverLast: + writer.WriteStringValue("last"); + break; + case DeliverPolicy.DeliverLastPerSubject: + writer.WriteStringValue("last_per_subject"); + break; + case DeliverPolicy.DeliverNew: + writer.WriteStringValue("new"); + break; + case DeliverPolicy.DeliverByStartSequence: + writer.WriteStringValue("by_start_sequence"); + break; + case DeliverPolicy.DeliverByStartTime: + writer.WriteStringValue("by_start_time"); + break; + default: + writer.WriteStringValue("undefined"); + break; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 0783c52..c6fbb00 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -23,6 +23,7 @@ namespace ZB.MOM.NatsNet.Server; // --------------------------------------------------------------------------- /// Determines how messages are stored for retention. +[JsonConverter(typeof(StorageTypeJsonConverter))] public enum StorageType { /// On disk, designated by the JetStream config StoreDir. @@ -228,6 +229,7 @@ public interface IStreamStore // --------------------------------------------------------------------------- /// Determines how messages in a stream are retained. +[JsonConverter(typeof(RetentionPolicyJsonConverter))] public enum RetentionPolicy { /// Messages are retained until any given limit is reached. @@ -245,6 +247,7 @@ public enum RetentionPolicy // --------------------------------------------------------------------------- /// Determines how the store proceeds when message or byte limits are hit. +[JsonConverter(typeof(DiscardPolicyJsonConverter))] public enum DiscardPolicy { /// Remove older messages to return to the limits. @@ -542,6 +545,7 @@ public sealed class ConsumerState // --------------------------------------------------------------------------- /// Determines how the consumer should acknowledge delivered messages. +[JsonConverter(typeof(AckPolicyJsonConverter))] public enum AckPolicy { /// No acks required for delivered messages. @@ -559,6 +563,7 @@ public enum AckPolicy // --------------------------------------------------------------------------- /// Determines how the consumer replays messages already queued in the stream. +[JsonConverter(typeof(ReplayPolicyJsonConverter))] public enum ReplayPolicy { /// Replay messages as fast as possible. @@ -573,6 +578,7 @@ public enum ReplayPolicy // --------------------------------------------------------------------------- /// Determines how the consumer selects the first message to deliver. +[JsonConverter(typeof(DeliverPolicyJsonConverter))] public enum DeliverPolicy { /// Deliver all messages (default). diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 5dfbedd..9a369e0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -1,4 +1,5 @@ using System.Text; +using System.Text.Json; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -223,6 +224,104 @@ public class StoreTypesTests StoreParity.IsPermissionError(null).ShouldBeFalse(); } + [Fact] + public void RetentionPolicy_StringParity_ReturnsExpectedText() + { + RetentionPolicy.LimitsPolicy.String().ShouldBe("Limits"); + RetentionPolicy.InterestPolicy.String().ShouldBe("Interest"); + RetentionPolicy.WorkQueuePolicy.String().ShouldBe("WorkQueue"); + ((RetentionPolicy)99).String().ShouldBe("Unknown Retention Policy"); + } + + [Fact] + public void RetentionPolicy_JsonParity_RoundTripsExpectedTokens() + { + JsonSerializer.Serialize(RetentionPolicy.LimitsPolicy).ShouldBe("\"limits\""); + JsonSerializer.Serialize(RetentionPolicy.InterestPolicy).ShouldBe("\"interest\""); + JsonSerializer.Serialize(RetentionPolicy.WorkQueuePolicy).ShouldBe("\"workqueue\""); + + JsonSerializer.Deserialize("\"limits\"").ShouldBe(RetentionPolicy.LimitsPolicy); + JsonSerializer.Deserialize("\"interest\"").ShouldBe(RetentionPolicy.InterestPolicy); + JsonSerializer.Deserialize("\"workqueue\"").ShouldBe(RetentionPolicy.WorkQueuePolicy); + } + + [Fact] + public void RetentionPolicy_UnmarshalInvalid_Throws() + { + Should.Throw(() => JsonSerializer.Deserialize("\"bogus\"")); + } + + [Fact] + public void DiscardPolicy_StringAndJsonParity_MatchesGo() + { + DiscardPolicy.DiscardOld.String().ShouldBe("DiscardOld"); + DiscardPolicy.DiscardNew.String().ShouldBe("DiscardNew"); + ((DiscardPolicy)99).String().ShouldBe("Unknown Discard Policy"); + + JsonSerializer.Serialize(DiscardPolicy.DiscardOld).ShouldBe("\"old\""); + JsonSerializer.Serialize(DiscardPolicy.DiscardNew).ShouldBe("\"new\""); + JsonSerializer.Deserialize("\"OLD\"").ShouldBe(DiscardPolicy.DiscardOld); + JsonSerializer.Deserialize("\"new\"").ShouldBe(DiscardPolicy.DiscardNew); + } + + [Fact] + public void StorageType_StringAndJsonParity_MatchesGo() + { + StorageType.MemoryStorage.String().ShouldBe("Memory"); + StorageType.FileStorage.String().ShouldBe("File"); + ((StorageType)99).String().ShouldBe("Unknown Storage Type"); + + JsonSerializer.Serialize(StorageType.MemoryStorage).ShouldBe("\"memory\""); + JsonSerializer.Serialize(StorageType.FileStorage).ShouldBe("\"file\""); + JsonSerializer.Deserialize("\"memory\"").ShouldBe(StorageType.MemoryStorage); + JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); + } + + [Fact] + public void AckPolicy_JsonParity_MatchesGo() + { + JsonSerializer.Serialize(AckPolicy.AckNone).ShouldBe("\"none\""); + JsonSerializer.Serialize(AckPolicy.AckAll).ShouldBe("\"all\""); + JsonSerializer.Serialize(AckPolicy.AckExplicit).ShouldBe("\"explicit\""); + + JsonSerializer.Deserialize("\"none\"").ShouldBe(AckPolicy.AckNone); + JsonSerializer.Deserialize("\"all\"").ShouldBe(AckPolicy.AckAll); + JsonSerializer.Deserialize("\"explicit\"").ShouldBe(AckPolicy.AckExplicit); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + + [Fact] + public void ReplayPolicy_JsonParity_MatchesGo() + { + JsonSerializer.Serialize(ReplayPolicy.ReplayInstant).ShouldBe("\"instant\""); + JsonSerializer.Serialize(ReplayPolicy.ReplayOriginal).ShouldBe("\"original\""); + + JsonSerializer.Deserialize("\"instant\"").ShouldBe(ReplayPolicy.ReplayInstant); + JsonSerializer.Deserialize("\"original\"").ShouldBe(ReplayPolicy.ReplayOriginal); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + + [Fact] + public void DeliverPolicy_JsonParity_MapsUndefinedToAll() + { + JsonSerializer.Serialize(DeliverPolicy.DeliverAll).ShouldBe("\"all\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverLast).ShouldBe("\"last\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverLastPerSubject).ShouldBe("\"last_per_subject\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverNew).ShouldBe("\"new\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverByStartSequence).ShouldBe("\"by_start_sequence\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverByStartTime).ShouldBe("\"by_start_time\""); + JsonSerializer.Serialize((DeliverPolicy)99).ShouldBe("\"undefined\""); + + JsonSerializer.Deserialize("\"all\"").ShouldBe(DeliverPolicy.DeliverAll); + JsonSerializer.Deserialize("\"undefined\"").ShouldBe(DeliverPolicy.DeliverAll); + JsonSerializer.Deserialize("\"last\"").ShouldBe(DeliverPolicy.DeliverLast); + JsonSerializer.Deserialize("\"last_per_subject\"").ShouldBe(DeliverPolicy.DeliverLastPerSubject); + JsonSerializer.Deserialize("\"new\"").ShouldBe(DeliverPolicy.DeliverNew); + JsonSerializer.Deserialize("\"by_start_sequence\"").ShouldBe(DeliverPolicy.DeliverByStartSequence); + JsonSerializer.Deserialize("\"by_start_time\"").ShouldBe(DeliverPolicy.DeliverByStartTime); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + private static void AppendUVarInt(List buffer, ulong value) { while (value >= 0x80) diff --git a/porting.db b/porting.db index 5110e5779319a6da9a76a7e8a6f6bfd4a258e0c8..4484a1ed67b3903d7eece0d6e537a968e6068c91 100644 GIT binary patch delta 6133 zcmchbYiwKP8OP7Xcb{V?Aqi;{rzdF+O|NO=rg4(e(sV7fv?YmiX-LvMiG6%Y;!A87 z=q|KgrMDGpfCjBEbZ9C}+6EJ)sfG^X!=!zgwrPV-8%T_9Z8bDBHZiIRu?y1n#HNld zy_2>h;SYc7`Jd-KzyGn{_Z+|WshbPpsY_3aPuH8n9B1h|uk7Z-OR&35u(zCkp-q3G zjbC$>zHy6h)Bk#l{}q2y7Ck~B5lhZ#8BI!A-{dY9+&sE-1(!tkc)=ymB?~T&Zo1%N z=mraJ4&DBOn?>gqK}*tIu1xhxTq`@ z;Uef(aA9;SxDdJ(oQBRtKQ>p?>W&5BMy7ixg$}m z+H+L|tJYi<#;P$_g|Mp5RT@@~Tot6XmxaMy(mZ~!c}5beJR^Zso)O0?&xm1_XUt)h zXUyu~zbuS5*&da9@R+5GO4o!BP4XVONB`$FVUHx?LEmCMzIHBJ^!tA&yql7Q1^(CO z+vY>2XT{&s{?+^|^xZ!3M!o3c>MJ?k+qF^l3VcoHS}*7I+O&8ek)?-*#0mQK7ex~ts&7#Zw`qK^h#5$+1#fu z7(veth)1aVx7JnkCBGX}H2~H@#?W z;b`9_;U22o=LY&DRJ71*m6f$rxy{@3t57j$p&!2uGJWYf_)f-`@BT-$R5TRDJ0Mo# z=1SabiJK{L(GnLaap4jdDsh^t#0Kf6>#*-kPox4>iK;@~g{nr?plVTfqv}wrP^(e( zs0Ne=)reYyQcz8(X4G1}C(`oE4_eLtG9NZQC*BtK3XA-Ed=nRQPq=>Qs+PAq-*9|p zf7NcW@zx>B#c%GJ-bLRYQ;*WBjW{s}$JLSx$cF zhSkx0?By}M32Y;!SJg9u+^Vp-k-j=6I_T>orDy+XM18QR{n3DGp+6f{i|-@->`^r) z=>{w6z?5$B!2JXsAaH=dK>`mF zI7Hwufg=PSBG5;mpTGbCKY>94Lj;Bij1V|VV3fea1jY!A6PO@yjKCxTATUK>`T_(@ z-a0PGxw~8``3KVblE*phc*_1$J8z3y-?KhodCk&ne#-QjDJs5$YvzjJ;h*3><>;;3 zrp--(ym6d~flC_4i40uAI8I>T;>K}20~a%n z;~2O(!?^P&8Ms;FxW^f|8RNLGF>q1ixUVvB5#zY8FmPexxGytsA>+6c44h^h_ZR~g zEXHm4-=8Qt9}WE5(k|ontA@M5x%auAkl&K4rBUa%9e;4N+GDn>)=#ZFEm`wV%uds& z_&wnhVK0B4yOG;^)n&IV*5|Lrlij`Ye2dclM4*T7rSo%vM$c3{sDx9A<4Gl>r87!+ zDx={eE}d2$jbWm=az7c^K2;87-KbT|6a@yT*)`N-N9DKzl5s#naJ5 zoG!gHUA<;U_wvMhvz?38jNJJ$$?R1+mwV-dkVyUw)Z+%+sTwKxCCuQuLcgx~h z25jCateF8z8ih46UTTB#Z6bJB``F3V^cE7z)gjR47(2S5}t*NA1dr4FKXQyuA;g_8McfL70 zJ7;)moDI*NViW0dR1lQXGnv(5^h3S6K(hTla-qPuP#~_Kp)2!Z9^IK2731E#__cW4 zGA*p@=Txo!^f<8xD-%Cvy)N`W?(w5Y?TK^*DU?Wuk#;B25K>tp z4O;WA3;BsSfM()lcn4DAWw>s1Er=gmy{&?<;!X?F|96^=io4>qbK(~y4@<{0_!9q~ z-{4pHC;TF7Wj_J9YPZYA{dc4v)T%*Ov#@>ngb$!9idP+|a^sbbDkEOCqFNKLT2RGj@McP# zmO?umd+z2WO8zwQ65aB#J8tz$;j<(`EH2;dESAI+`mK?L$^DIW zEnRNY929F}C6=O!qH_8!$XZDH$l|5xASGLqlHXFZ&nNDZDvX}ZItc(hKIH#L_c971NDe*HmH3wLZd3ij* z0xKFk_}`S#nFs?5$e@4~Y@mW29KgW|E^r$&5l=w1oU=X@$TuZzqRbk%PUE^~qcN*{ zLJHN)B`?v^P2EZ5xAn5o$}SHi!4r@SYaj(u;YoN3)S?U|CX!m@4$B_tc%Xlgmr0GL%NGz+$Ad zYCntgR!D0!R}kM#T3E}Y%&ZJf^Hkr*aoFg2IWuzm9QPEULV$S#Z Z0{1#1_S5QBRW!2nmPMN(zS{9f`4=DC`u6|;