Files
natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamVersioningTests.cs

437 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2024-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Mirrors server/jetstream_versioning_test.go in the NATS server Go source.
using Shouldly;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
/// <summary>
/// Unit tests for JetStream API level versioning helpers.
/// Mirrors server/jetstream_versioning_test.go.
/// Tests 18031808 (TestJetStreamMetadataMutations, TestJetStreamMetadataStreamRestoreAndRestart,
/// TestJetStreamMetadataStreamRestoreAndRestartCluster, TestJetStreamApiErrorOnRequiredApiLevel,
/// TestJetStreamApiErrorOnRequiredApiLevelDirectGet, TestJetStreamApiErrorOnRequiredApiLevelPullConsumerNextMsg)
/// all require a running JetStream server and are deferred.
/// </summary>
public sealed class JetStreamVersioningTests
{
// -------------------------------------------------------------------------
// Helpers (mirrors module-level helpers in Go test file)
// -------------------------------------------------------------------------
private static Dictionary<string, string> MetadataAtLevel(string featureLevel) =>
new() { [JetStreamVersioning.JsRequiredLevelMetadataKey] = featureLevel };
private static Dictionary<string, string> MetadataPrevious() =>
new() { [JetStreamVersioning.JsRequiredLevelMetadataKey] = "previous-level" };
// -------------------------------------------------------------------------
// T:1791 — TestGetAndSupportsRequiredApiLevel
// -------------------------------------------------------------------------
[Fact] // T:1791
public void GetAndSupportsRequiredApiLevel_VariousInputs_ReturnsExpected()
{
// getRequiredApiLevel
JetStreamVersioning.GetRequiredApiLevel(null).ShouldBe(string.Empty);
JetStreamVersioning.GetRequiredApiLevel(new Dictionary<string, string>()).ShouldBe(string.Empty);
JetStreamVersioning.GetRequiredApiLevel(MetadataAtLevel("1")).ShouldBe("1");
JetStreamVersioning.GetRequiredApiLevel(MetadataAtLevel("text")).ShouldBe("text");
// supportsRequiredApiLevel
JetStreamVersioning.SupportsRequiredApiLevel(null).ShouldBeTrue();
JetStreamVersioning.SupportsRequiredApiLevel(new Dictionary<string, string>()).ShouldBeTrue();
JetStreamVersioning.SupportsRequiredApiLevel(MetadataAtLevel("1")).ShouldBeTrue();
JetStreamVersioning.SupportsRequiredApiLevel(
MetadataAtLevel(JetStreamVersioning.JsApiLevel.ToString())).ShouldBeTrue();
JetStreamVersioning.SupportsRequiredApiLevel(MetadataAtLevel("text")).ShouldBeFalse();
}
// -------------------------------------------------------------------------
// T:1792 — TestJetStreamSetStaticStreamMetadata
// -------------------------------------------------------------------------
[Fact] // T:1792
public void SetStaticStreamMetadata_VariousConfigs_SetsCorrectApiLevel()
{
var cases = new[]
{
("empty", new StreamConfig(), "0"),
("overwrite-user-provided", new StreamConfig { Metadata = MetadataPrevious() }, "0"),
("AllowMsgTTL", new StreamConfig { AllowMsgTTL = true }, "1"),
("SubjectDeleteMarkerTTL", new StreamConfig { SubjectDeleteMarkerTTL = TimeSpan.FromSeconds(1) }, "1"),
("AllowMsgCounter", new StreamConfig { AllowMsgCounter = true }, "2"),
("AllowAtomicPublish", new StreamConfig { AllowAtomicPublish = true }, "2"),
("AllowMsgSchedules", new StreamConfig { AllowMsgSchedules = true }, "2"),
("AsyncPersistMode", new StreamConfig { PersistMode = PersistModeType.AsyncPersistMode }, "2"),
};
foreach (var (desc, cfg, expectedLevel) in cases)
{
JetStreamVersioning.SetStaticStreamMetadata(cfg);
var level = cfg.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey];
level.ShouldBe(expectedLevel, $"case: {desc}");
// Ensure we do not exceed the server API level.
int.Parse(level).ShouldBeLessThanOrEqualTo(JetStreamVersioning.JsApiLevel,
customMessage: $"case: {desc}");
}
}
// -------------------------------------------------------------------------
// T:1793 — TestJetStreamSetStaticStreamMetadataRemoveDynamicFields
// -------------------------------------------------------------------------
[Fact] // T:1793
public void SetStaticStreamMetadata_RemovesDynamicFields()
{
var cfg = new StreamConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
JetStreamVersioning.SetStaticStreamMetadata(cfg);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
cfg.Metadata[JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
}
// -------------------------------------------------------------------------
// T:1794 — TestJetStreamSetDynamicStreamMetadata
// -------------------------------------------------------------------------
[Fact] // T:1794
public void SetDynamicStreamMetadata_DoesNotMutateOriginal_AddsVersionFields()
{
var cfg = new StreamConfig { Metadata = MetadataAtLevel("0") };
var newCfg = JetStreamVersioning.SetDynamicStreamMetadata(cfg);
// Original must NOT have dynamic fields.
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
// New copy must have dynamic fields.
newCfg.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
newCfg.Metadata[JetStreamVersioning.JsServerVersionMetadataKey].ShouldBe(ServerConstants.Version);
newCfg.Metadata[JetStreamVersioning.JsServerLevelMetadataKey]
.ShouldBe(JetStreamVersioning.JsApiLevel.ToString());
}
// -------------------------------------------------------------------------
// T:1795 — TestJetStreamCopyStreamMetadata
// -------------------------------------------------------------------------
[Fact] // T:1795
public void CopyStreamMetadata_VariousScenarios_CopiesRequiredLevelKey()
{
// no-previous-ignore: when prevCfg is null, key must be absent
var cfg1 = new StreamConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyStreamMetadata(cfg1, null);
(cfg1.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
// nil-previous-metadata-ignore: prevCfg has null Metadata
var cfg2 = new StreamConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyStreamMetadata(cfg2, new StreamConfig { Metadata = null });
(cfg2.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
// nil-current-metadata-ignore: cfg has null Metadata — should not throw
var cfg3 = new StreamConfig { Metadata = null };
JetStreamVersioning.CopyStreamMetadata(cfg3, new StreamConfig { Metadata = MetadataPrevious() });
cfg3.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("previous-level");
// copy-previous: key from prevCfg is copied into cfg
var cfg4 = new StreamConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyStreamMetadata(cfg4, new StreamConfig { Metadata = MetadataPrevious() });
cfg4.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("previous-level");
// delete-missing-fields: prevCfg has empty metadata dict → key absent in cfg
var cfg5 = new StreamConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyStreamMetadata(cfg5, new StreamConfig { Metadata = new Dictionary<string, string>() });
(cfg5.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
}
// -------------------------------------------------------------------------
// T:1796 — TestJetStreamCopyStreamMetadataRemoveDynamicFields
// -------------------------------------------------------------------------
[Fact] // T:1796
public void CopyStreamMetadata_RemovesDynamicFields()
{
// Copy from null prevCfg — dynamic fields should be removed and key absent.
var cfg = new StreamConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
JetStreamVersioning.CopyStreamMetadata(cfg, null);
cfg.Metadata.ShouldBeNull(); // all entries removed → null'd
// Copy from prevCfg with req-level → dynamic fields removed, req-level preserved.
var cfg2 = new StreamConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
var prev = new StreamConfig { Metadata = MetadataAtLevel("0") };
JetStreamVersioning.CopyStreamMetadata(cfg2, prev);
cfg2.Metadata.ShouldNotBeNull();
cfg2.Metadata!.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg2.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
cfg2.Metadata[JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
}
// -------------------------------------------------------------------------
// T:1797 — TestJetStreamSetStaticConsumerMetadata
// -------------------------------------------------------------------------
[Fact] // T:1797
public void SetStaticConsumerMetadata_VariousConfigs_SetsCorrectApiLevel()
{
var pauseUntil = new DateTime(1970, 1, 1, 0, 0, 1, DateTimeKind.Utc); // Unix(0, 0) = epoch+1s
var pauseUntilZero = default(DateTime);
var cases = new[]
{
("empty", new ConsumerConfig(), "0"),
("overwrite-user-provided", new ConsumerConfig { Metadata = MetadataPrevious() }, "0"),
("PauseUntil/zero", new ConsumerConfig { PauseUntil = pauseUntilZero }, "0"),
("PauseUntil", new ConsumerConfig { PauseUntil = pauseUntil }, "1"),
("Pinned", new ConsumerConfig { PriorityPolicy = PriorityPolicy.PriorityPinnedClient,
PriorityGroups = new[] { "a" } }, "1"),
};
foreach (var (desc, cfg, expectedLevel) in cases)
{
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
var level = cfg.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey];
level.ShouldBe(expectedLevel, $"case: {desc}");
int.Parse(level).ShouldBeLessThanOrEqualTo(JetStreamVersioning.JsApiLevel,
customMessage: $"case: {desc}");
}
}
// -------------------------------------------------------------------------
// T:1797-extra — SetStaticConsumerMetadata_RequiresLevel1_ForPriorityFeatures
// Missing case: PriorityPolicy != PriorityNone with empty PriorityGroups should
// still require API level 1.
// -------------------------------------------------------------------------
[Theory]
[InlineData("PolicyOnly")]
public void SetStaticConsumerMetadata_RequiresLevel1_ForPriorityFeatures(string desc)
{
// Case: PriorityPolicy != PriorityNone with no groups should still require level 1
var cfg = new ConsumerConfig { PriorityPolicy = PriorityPolicy.PriorityPinnedClient };
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
var level = cfg.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey];
level.ShouldBe("1", $"case: {desc}");
int.Parse(level).ShouldBeLessThanOrEqualTo(JetStreamVersioning.JsApiLevel,
customMessage: $"case: {desc}");
}
// -------------------------------------------------------------------------
// T:1798 — TestJetStreamSetStaticConsumerMetadataRemoveDynamicFields
// -------------------------------------------------------------------------
[Fact] // T:1798
public void SetStaticConsumerMetadata_RemovesDynamicFields()
{
var cfg = new ConsumerConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
cfg.Metadata[JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
}
// -------------------------------------------------------------------------
// T:1799 — TestJetStreamSetDynamicConsumerMetadata
// -------------------------------------------------------------------------
[Fact] // T:1799
public void SetDynamicConsumerMetadata_DoesNotMutateOriginal_AddsVersionFields()
{
var cfg = new ConsumerConfig { Metadata = MetadataAtLevel("0") };
var newCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);
// Original must NOT have dynamic fields.
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
// New copy must have dynamic fields.
newCfg.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
newCfg.Metadata[JetStreamVersioning.JsServerVersionMetadataKey].ShouldBe(ServerConstants.Version);
newCfg.Metadata[JetStreamVersioning.JsServerLevelMetadataKey]
.ShouldBe(JetStreamVersioning.JsApiLevel.ToString());
}
// -------------------------------------------------------------------------
// T:1800 — TestJetStreamSetDynamicConsumerInfoMetadata
// -------------------------------------------------------------------------
[Fact] // T:1800
public void SetDynamicConsumerInfoMetadata_DoesNotMutateOriginal_AddsVersionFields()
{
var ci = new ConsumerInfo { Config = new ConsumerConfig { Metadata = MetadataAtLevel("0") } };
var newCi = JetStreamVersioning.SetDynamicConsumerInfoMetadata(ci);
// Configs must not be reference-equal (we got a new object).
ReferenceEquals(ci, newCi).ShouldBeFalse();
// Original config must NOT have dynamic fields.
ci.Config!.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
ci.Config.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
// New config must have dynamic fields.
newCi.Config!.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
newCi.Config.Metadata[JetStreamVersioning.JsServerVersionMetadataKey].ShouldBe(ServerConstants.Version);
newCi.Config.Metadata[JetStreamVersioning.JsServerLevelMetadataKey]
.ShouldBe(JetStreamVersioning.JsApiLevel.ToString());
}
// -------------------------------------------------------------------------
// T:1801 — TestJetStreamCopyConsumerMetadata
// -------------------------------------------------------------------------
[Fact] // T:1801
public void CopyConsumerMetadata_VariousScenarios_CopiesRequiredLevelKey()
{
// no-previous-ignore
var cfg1 = new ConsumerConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyConsumerMetadata(cfg1, null);
(cfg1.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
// nil-previous-metadata-ignore
var cfg2 = new ConsumerConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyConsumerMetadata(cfg2, new ConsumerConfig { Metadata = null });
(cfg2.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
// nil-current-metadata-ignore
var cfg3 = new ConsumerConfig { Metadata = null };
JetStreamVersioning.CopyConsumerMetadata(cfg3, new ConsumerConfig { Metadata = MetadataPrevious() });
cfg3.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("previous-level");
// copy-previous
var cfg4 = new ConsumerConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyConsumerMetadata(cfg4, new ConsumerConfig { Metadata = MetadataPrevious() });
cfg4.Metadata![JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("previous-level");
// delete-missing-fields
var cfg5 = new ConsumerConfig { Metadata = MetadataAtLevel("-1") };
JetStreamVersioning.CopyConsumerMetadata(cfg5,
new ConsumerConfig { Metadata = new Dictionary<string, string>() });
(cfg5.Metadata?.ContainsKey(JetStreamVersioning.JsRequiredLevelMetadataKey) ?? false).ShouldBeFalse();
}
// -------------------------------------------------------------------------
// T:1802 — TestJetStreamCopyConsumerMetadataRemoveDynamicFields
// -------------------------------------------------------------------------
[Fact] // T:1802
public void CopyConsumerMetadata_RemovesDynamicFields()
{
// Copy from null prevCfg → dynamic removed, key absent.
var cfg = new ConsumerConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
JetStreamVersioning.CopyConsumerMetadata(cfg, null);
cfg.Metadata.ShouldBeNull();
// Copy from prevCfg with req-level → dynamic removed, req-level preserved.
var cfg2 = new ConsumerConfig
{
Metadata = new Dictionary<string, string>
{
[JetStreamVersioning.JsServerVersionMetadataKey] = "dynamic-version",
[JetStreamVersioning.JsServerLevelMetadataKey] = "dynamic-level",
}
};
var prev = new ConsumerConfig { Metadata = MetadataAtLevel("0") };
JetStreamVersioning.CopyConsumerMetadata(cfg2, prev);
cfg2.Metadata.ShouldNotBeNull();
cfg2.Metadata!.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cfg2.Metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
cfg2.Metadata[JetStreamVersioning.JsRequiredLevelMetadataKey].ShouldBe("0");
}
// -------------------------------------------------------------------------
// T:1803 — TestJetStreamMetadataMutations — deferred: requires RunBasicJetStreamServer
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream server")] // T:1803
public void JetStreamMetadataMutations_RequiresRunningServer() { }
// -------------------------------------------------------------------------
// T:1804 — TestJetStreamMetadataStreamRestoreAndRestart — deferred
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream server")] // T:1804
public void JetStreamMetadataStreamRestoreAndRestart_RequiresRunningServer() { }
// -------------------------------------------------------------------------
// T:1805 — TestJetStreamMetadataStreamRestoreAndRestartCluster — deferred
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream cluster")] // T:1805
public void JetStreamMetadataStreamRestoreAndRestartCluster_RequiresRunningServer() { }
// -------------------------------------------------------------------------
// T:1806 — TestJetStreamApiErrorOnRequiredApiLevel — deferred
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream server")] // T:1806
public void JetStreamApiErrorOnRequiredApiLevel_RequiresRunningServer() { }
// -------------------------------------------------------------------------
// T:1807 — TestJetStreamApiErrorOnRequiredApiLevelDirectGet — deferred
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream server")] // T:1807
public void JetStreamApiErrorOnRequiredApiLevelDirectGet_RequiresRunningServer() { }
// -------------------------------------------------------------------------
// T:1808 — TestJetStreamApiErrorOnRequiredApiLevelPullConsumerNextMsg — deferred
// -------------------------------------------------------------------------
[Fact(Skip = "deferred: requires running JetStream server")] // T:1808
public void JetStreamApiErrorOnRequiredApiLevelPullConsumerNextMsg_RequiresRunningServer() { }
}