diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 69e6681..900d58a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -1,4 +1,6 @@ using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -274,3 +276,243 @@ public static class StoreParity => _set.Range(f); } } + +public static class StoreEnumParityExtensions +{ + public static string String(this RetentionPolicy value) + => value switch + { + RetentionPolicy.LimitsPolicy => "Limits", + RetentionPolicy.InterestPolicy => "Interest", + RetentionPolicy.WorkQueuePolicy => "WorkQueue", + _ => "Unknown Retention Policy", + }; + + public static string String(this DiscardPolicy value) + => value switch + { + DiscardPolicy.DiscardOld => "DiscardOld", + DiscardPolicy.DiscardNew => "DiscardNew", + _ => "Unknown Discard Policy", + }; + + public static string String(this StorageType value) + => value switch + { + StorageType.MemoryStorage => "Memory", + StorageType.FileStorage => "File", + _ => "Unknown Storage Type", + }; +} + +public sealed class RetentionPolicyJsonConverter : JsonConverter +{ + public override RetentionPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "limits" => RetentionPolicy.LimitsPolicy, + "interest" => RetentionPolicy.InterestPolicy, + "workqueue" => RetentionPolicy.WorkQueuePolicy, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, RetentionPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case RetentionPolicy.LimitsPolicy: + writer.WriteStringValue("limits"); + break; + case RetentionPolicy.InterestPolicy: + writer.WriteStringValue("interest"); + break; + case RetentionPolicy.WorkQueuePolicy: + writer.WriteStringValue("workqueue"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class DiscardPolicyJsonConverter : JsonConverter +{ + public override DiscardPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + var token = reader.GetString() ?? string.Empty; + return token.ToLowerInvariant() switch + { + "old" => DiscardPolicy.DiscardOld, + "new" => DiscardPolicy.DiscardNew, + _ => throw new JsonException($"can not unmarshal \"{token}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, DiscardPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case DiscardPolicy.DiscardOld: + writer.WriteStringValue("old"); + break; + case DiscardPolicy.DiscardNew: + writer.WriteStringValue("new"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class StorageTypeJsonConverter : JsonConverter +{ + public override StorageType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "memory" => StorageType.MemoryStorage, + "file" => StorageType.FileStorage, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, StorageType value, JsonSerializerOptions options) + { + switch (value) + { + case StorageType.MemoryStorage: + writer.WriteStringValue("memory"); + break; + case StorageType.FileStorage: + writer.WriteStringValue("file"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class AckPolicyJsonConverter : JsonConverter +{ + public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "none" => AckPolicy.AckNone, + "all" => AckPolicy.AckAll, + "explicit" => AckPolicy.AckExplicit, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, AckPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case AckPolicy.AckNone: + writer.WriteStringValue("none"); + break; + case AckPolicy.AckAll: + writer.WriteStringValue("all"); + break; + case AckPolicy.AckExplicit: + writer.WriteStringValue("explicit"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class ReplayPolicyJsonConverter : JsonConverter +{ + public override ReplayPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "instant" => ReplayPolicy.ReplayInstant, + "original" => ReplayPolicy.ReplayOriginal, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, ReplayPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case ReplayPolicy.ReplayInstant: + writer.WriteStringValue("instant"); + break; + case ReplayPolicy.ReplayOriginal: + writer.WriteStringValue("original"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + +public sealed class DeliverPolicyJsonConverter : JsonConverter +{ + public override DeliverPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "all" or "undefined" => DeliverPolicy.DeliverAll, + "last" => DeliverPolicy.DeliverLast, + "last_per_subject" => DeliverPolicy.DeliverLastPerSubject, + "new" => DeliverPolicy.DeliverNew, + "by_start_sequence" => DeliverPolicy.DeliverByStartSequence, + "by_start_time" => DeliverPolicy.DeliverByStartTime, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, DeliverPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case DeliverPolicy.DeliverAll: + writer.WriteStringValue("all"); + break; + case DeliverPolicy.DeliverLast: + writer.WriteStringValue("last"); + break; + case DeliverPolicy.DeliverLastPerSubject: + writer.WriteStringValue("last_per_subject"); + break; + case DeliverPolicy.DeliverNew: + writer.WriteStringValue("new"); + break; + case DeliverPolicy.DeliverByStartSequence: + writer.WriteStringValue("by_start_sequence"); + break; + case DeliverPolicy.DeliverByStartTime: + writer.WriteStringValue("by_start_time"); + break; + default: + writer.WriteStringValue("undefined"); + break; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 0783c52..c6fbb00 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -23,6 +23,7 @@ namespace ZB.MOM.NatsNet.Server; // --------------------------------------------------------------------------- /// Determines how messages are stored for retention. +[JsonConverter(typeof(StorageTypeJsonConverter))] public enum StorageType { /// On disk, designated by the JetStream config StoreDir. @@ -228,6 +229,7 @@ public interface IStreamStore // --------------------------------------------------------------------------- /// Determines how messages in a stream are retained. +[JsonConverter(typeof(RetentionPolicyJsonConverter))] public enum RetentionPolicy { /// Messages are retained until any given limit is reached. @@ -245,6 +247,7 @@ public enum RetentionPolicy // --------------------------------------------------------------------------- /// Determines how the store proceeds when message or byte limits are hit. +[JsonConverter(typeof(DiscardPolicyJsonConverter))] public enum DiscardPolicy { /// Remove older messages to return to the limits. @@ -542,6 +545,7 @@ public sealed class ConsumerState // --------------------------------------------------------------------------- /// Determines how the consumer should acknowledge delivered messages. +[JsonConverter(typeof(AckPolicyJsonConverter))] public enum AckPolicy { /// No acks required for delivered messages. @@ -559,6 +563,7 @@ public enum AckPolicy // --------------------------------------------------------------------------- /// Determines how the consumer replays messages already queued in the stream. +[JsonConverter(typeof(ReplayPolicyJsonConverter))] public enum ReplayPolicy { /// Replay messages as fast as possible. @@ -573,6 +578,7 @@ public enum ReplayPolicy // --------------------------------------------------------------------------- /// Determines how the consumer selects the first message to deliver. +[JsonConverter(typeof(DeliverPolicyJsonConverter))] public enum DeliverPolicy { /// Deliver all messages (default). diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 5dfbedd..9a369e0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -1,4 +1,5 @@ using System.Text; +using System.Text.Json; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -223,6 +224,104 @@ public class StoreTypesTests StoreParity.IsPermissionError(null).ShouldBeFalse(); } + [Fact] + public void RetentionPolicy_StringParity_ReturnsExpectedText() + { + RetentionPolicy.LimitsPolicy.String().ShouldBe("Limits"); + RetentionPolicy.InterestPolicy.String().ShouldBe("Interest"); + RetentionPolicy.WorkQueuePolicy.String().ShouldBe("WorkQueue"); + ((RetentionPolicy)99).String().ShouldBe("Unknown Retention Policy"); + } + + [Fact] + public void RetentionPolicy_JsonParity_RoundTripsExpectedTokens() + { + JsonSerializer.Serialize(RetentionPolicy.LimitsPolicy).ShouldBe("\"limits\""); + JsonSerializer.Serialize(RetentionPolicy.InterestPolicy).ShouldBe("\"interest\""); + JsonSerializer.Serialize(RetentionPolicy.WorkQueuePolicy).ShouldBe("\"workqueue\""); + + JsonSerializer.Deserialize("\"limits\"").ShouldBe(RetentionPolicy.LimitsPolicy); + JsonSerializer.Deserialize("\"interest\"").ShouldBe(RetentionPolicy.InterestPolicy); + JsonSerializer.Deserialize("\"workqueue\"").ShouldBe(RetentionPolicy.WorkQueuePolicy); + } + + [Fact] + public void RetentionPolicy_UnmarshalInvalid_Throws() + { + Should.Throw(() => JsonSerializer.Deserialize("\"bogus\"")); + } + + [Fact] + public void DiscardPolicy_StringAndJsonParity_MatchesGo() + { + DiscardPolicy.DiscardOld.String().ShouldBe("DiscardOld"); + DiscardPolicy.DiscardNew.String().ShouldBe("DiscardNew"); + ((DiscardPolicy)99).String().ShouldBe("Unknown Discard Policy"); + + JsonSerializer.Serialize(DiscardPolicy.DiscardOld).ShouldBe("\"old\""); + JsonSerializer.Serialize(DiscardPolicy.DiscardNew).ShouldBe("\"new\""); + JsonSerializer.Deserialize("\"OLD\"").ShouldBe(DiscardPolicy.DiscardOld); + JsonSerializer.Deserialize("\"new\"").ShouldBe(DiscardPolicy.DiscardNew); + } + + [Fact] + public void StorageType_StringAndJsonParity_MatchesGo() + { + StorageType.MemoryStorage.String().ShouldBe("Memory"); + StorageType.FileStorage.String().ShouldBe("File"); + ((StorageType)99).String().ShouldBe("Unknown Storage Type"); + + JsonSerializer.Serialize(StorageType.MemoryStorage).ShouldBe("\"memory\""); + JsonSerializer.Serialize(StorageType.FileStorage).ShouldBe("\"file\""); + JsonSerializer.Deserialize("\"memory\"").ShouldBe(StorageType.MemoryStorage); + JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); + } + + [Fact] + public void AckPolicy_JsonParity_MatchesGo() + { + JsonSerializer.Serialize(AckPolicy.AckNone).ShouldBe("\"none\""); + JsonSerializer.Serialize(AckPolicy.AckAll).ShouldBe("\"all\""); + JsonSerializer.Serialize(AckPolicy.AckExplicit).ShouldBe("\"explicit\""); + + JsonSerializer.Deserialize("\"none\"").ShouldBe(AckPolicy.AckNone); + JsonSerializer.Deserialize("\"all\"").ShouldBe(AckPolicy.AckAll); + JsonSerializer.Deserialize("\"explicit\"").ShouldBe(AckPolicy.AckExplicit); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + + [Fact] + public void ReplayPolicy_JsonParity_MatchesGo() + { + JsonSerializer.Serialize(ReplayPolicy.ReplayInstant).ShouldBe("\"instant\""); + JsonSerializer.Serialize(ReplayPolicy.ReplayOriginal).ShouldBe("\"original\""); + + JsonSerializer.Deserialize("\"instant\"").ShouldBe(ReplayPolicy.ReplayInstant); + JsonSerializer.Deserialize("\"original\"").ShouldBe(ReplayPolicy.ReplayOriginal); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + + [Fact] + public void DeliverPolicy_JsonParity_MapsUndefinedToAll() + { + JsonSerializer.Serialize(DeliverPolicy.DeliverAll).ShouldBe("\"all\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverLast).ShouldBe("\"last\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverLastPerSubject).ShouldBe("\"last_per_subject\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverNew).ShouldBe("\"new\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverByStartSequence).ShouldBe("\"by_start_sequence\""); + JsonSerializer.Serialize(DeliverPolicy.DeliverByStartTime).ShouldBe("\"by_start_time\""); + JsonSerializer.Serialize((DeliverPolicy)99).ShouldBe("\"undefined\""); + + JsonSerializer.Deserialize("\"all\"").ShouldBe(DeliverPolicy.DeliverAll); + JsonSerializer.Deserialize("\"undefined\"").ShouldBe(DeliverPolicy.DeliverAll); + JsonSerializer.Deserialize("\"last\"").ShouldBe(DeliverPolicy.DeliverLast); + JsonSerializer.Deserialize("\"last_per_subject\"").ShouldBe(DeliverPolicy.DeliverLastPerSubject); + JsonSerializer.Deserialize("\"new\"").ShouldBe(DeliverPolicy.DeliverNew); + JsonSerializer.Deserialize("\"by_start_sequence\"").ShouldBe(DeliverPolicy.DeliverByStartSequence); + JsonSerializer.Deserialize("\"by_start_time\"").ShouldBe(DeliverPolicy.DeliverByStartTime); + Should.Throw(() => JsonSerializer.Deserialize("\"bad\"")); + } + private static void AppendUVarInt(List buffer, ulong value) { while (value >= 0x80) diff --git a/porting.db b/porting.db index 5110e57..4484a1e 100644 Binary files a/porting.db and b/porting.db differ