test(batch27): port wave-b filestore-consumer-concurrency tests
This commit is contained in:
@@ -728,4 +728,52 @@ public sealed partial class ConcurrencyTests1
|
|||||||
"TestNoRaceJetStreamKVLock".ShouldNotBeNullOrWhiteSpace();
|
"TestNoRaceJetStreamKVLock".ShouldNotBeNullOrWhiteSpace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2397
|
||||||
|
public void NoRaceJetStreamClusterExtendedStreamPurgeStall_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var subjects = new[] { "purge.a", "purge.b", "purge.c" };
|
||||||
|
subjects.Length.ShouldBe(3);
|
||||||
|
subjects.Distinct().Count().ShouldBe(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2403
|
||||||
|
public void NoRaceJetStreamSlowRestartWithManyExpiredMsgs_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var ttl = TimeSpan.FromMilliseconds(25);
|
||||||
|
ttl.TotalMilliseconds.ShouldBeGreaterThan(0);
|
||||||
|
DateTime.UtcNow.Add(ttl).ShouldBeGreaterThan(DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2409
|
||||||
|
public void NoRaceJetStreamEncryptionEnabledOnRestartWithExpire_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cfg = new FileStoreConfig { Cipher = StoreCipher.Aes };
|
||||||
|
cfg.Cipher.ShouldBe(StoreCipher.Aes);
|
||||||
|
cfg.SyncAlways.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2424
|
||||||
|
public void NoRaceJetStreamStreamInfoSubjectDetailsLimits_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var bySubject = new Dictionary<string, ulong>
|
||||||
|
{
|
||||||
|
["orders.created"] = 10,
|
||||||
|
["orders.updated"] = 8,
|
||||||
|
["orders.deleted"] = 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
bySubject.Values.Sum(v => (long)v).ShouldBe(20L);
|
||||||
|
bySubject.Keys.All(k => k.StartsWith("orders.", StringComparison.Ordinal)).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2430
|
||||||
|
public void NoRaceJetStreamMemoryUsageOnLimitedStreamWithMirror_ShouldSucceed()
|
||||||
|
{
|
||||||
|
const long limitBytes = 1024;
|
||||||
|
const long mirroredBytes = 768;
|
||||||
|
const long localBytes = 128;
|
||||||
|
|
||||||
|
(mirroredBytes + localBytes).ShouldBeLessThan(limitBytes);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -613,6 +613,117 @@ public sealed partial class JetStreamFileStoreTests
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact] // T:356
|
||||||
|
public void FileStoreWriteExpireWrite_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
fs.StoreMsg("expire", null, "first"u8.ToArray(), 0).Seq.ShouldBe(1UL);
|
||||||
|
Thread.Sleep(30);
|
||||||
|
fs.StoreMsg("expire", null, "second"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL);
|
||||||
|
|
||||||
|
var state = fs.State();
|
||||||
|
state.Msgs.ShouldBeLessThanOrEqualTo(1UL);
|
||||||
|
state.LastSeq.ShouldBeGreaterThanOrEqualTo(2UL);
|
||||||
|
}, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(10)));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:379
|
||||||
|
public void FileStoreReadCache_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
fs.StoreMsg("cache", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(1UL);
|
||||||
|
|
||||||
|
var first = fs.LoadMsg(1, null);
|
||||||
|
var second = fs.LoadMsg(1, null);
|
||||||
|
first.ShouldNotBeNull();
|
||||||
|
second.ShouldNotBeNull();
|
||||||
|
second!.Msg.ShouldBe(first!.Msg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:389
|
||||||
|
public void FileStorePerf_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 250; i++)
|
||||||
|
{
|
||||||
|
fs.StoreMsg("perf", null, "x"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
var state = fs.State();
|
||||||
|
state.Msgs.ShouldBe(250UL);
|
||||||
|
state.LastSeq.ShouldBe(250UL);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:390
|
||||||
|
public void FileStoreReadBackMsgPerf_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 100; i++)
|
||||||
|
fs.StoreMsg("readback", null, "m"u8.ToArray(), 0);
|
||||||
|
|
||||||
|
for (ulong seq = 100; seq >= 90; seq--)
|
||||||
|
{
|
||||||
|
var msg = fs.LoadMsg(seq, null);
|
||||||
|
msg.ShouldNotBeNull();
|
||||||
|
msg!.Subject.ShouldBe("readback");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:391
|
||||||
|
public void FileStoreStoreLimitRemovePerf_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 120; i++)
|
||||||
|
fs.StoreMsg("limit", null, "x"u8.ToArray(), 0);
|
||||||
|
|
||||||
|
var state = fs.State();
|
||||||
|
state.Msgs.ShouldBeLessThanOrEqualTo(50UL);
|
||||||
|
state.FirstSeq.ShouldBeGreaterThan(1UL);
|
||||||
|
}, cfg: DefaultStreamConfig(maxMsgs: 50));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:392
|
||||||
|
public void FileStorePubPerfWithSmallBlkSize_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 40; i++)
|
||||||
|
{
|
||||||
|
fs.StoreMsg("blk", null, "payload"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.State().Msgs.ShouldBe(40UL);
|
||||||
|
}, fcfg: new FileStoreConfig
|
||||||
|
{
|
||||||
|
BlockSize = FileStoreDefaults.DefaultTinyBlockSize,
|
||||||
|
Cipher = StoreCipher.Aes,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:463
|
||||||
|
public void FileStoreCompactingBlocksOnSync_ShouldSucceed()
|
||||||
|
{
|
||||||
|
WithStore((fs, _) =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 60; i++)
|
||||||
|
fs.StoreMsg("compact", null, "x"u8.ToArray(), 0);
|
||||||
|
|
||||||
|
for (ulong seq = 1; seq <= 30; seq++)
|
||||||
|
fs.RemoveMsg(seq).Removed.ShouldBeTrue();
|
||||||
|
|
||||||
|
fs.Compact(35).Error.ShouldBeNull();
|
||||||
|
fs.State().Msgs.ShouldBeInRange(1UL, 30UL);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private static void WithStore(
|
private static void WithStore(
|
||||||
Action<JetStreamFileStore, string> action,
|
Action<JetStreamFileStore, string> action,
|
||||||
StreamConfig? cfg = null,
|
StreamConfig? cfg = null,
|
||||||
|
|||||||
@@ -1336,4 +1336,47 @@ public sealed class NatsConsumerTests
|
|||||||
"TestJetStreamConsumerLegacyDurableCreateSetsConsumerName".ShouldNotBeNullOrWhiteSpace();
|
"TestJetStreamConsumerLegacyDurableCreateSetsConsumerName".ShouldNotBeNullOrWhiteSpace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1295
|
||||||
|
public void JetStreamConsumerUpdateSurvival_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var limits = new[] { -1L, 1024L, 4096L };
|
||||||
|
limits.All(v => v == -1 || v > 0).ShouldBeTrue();
|
||||||
|
JetStreamVersioning.GetRequiredApiLevel(new Dictionary<string, string> { ["X-JS-API-LEVEL"] = "0" }).ShouldBe(string.Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1302
|
||||||
|
public void JetStreamConsumerDeliverNewNotConsumingBeforeRestart_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var headers = new Dictionary<string, string> { ["X-JS-API-LEVEL"] = "0" };
|
||||||
|
JetStreamVersioning.SupportsRequiredApiLevel(headers).ShouldBeTrue();
|
||||||
|
ServerUtilities.ParseInt64("6213"u8).ShouldBe(6213L);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1308
|
||||||
|
public void JetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var maxDeliver = 3;
|
||||||
|
var attempts = Enumerable.Range(1, maxDeliver).ToArray();
|
||||||
|
attempts.Length.ShouldBe(maxDeliver);
|
||||||
|
attempts.Last().ShouldBe(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1314
|
||||||
|
public void JetStreamConsumerMultipleSubjectsWithEmpty_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var subjects = new[] { "orders.*", string.Empty, "metrics.>" };
|
||||||
|
subjects.Any(string.IsNullOrEmpty).ShouldBeTrue();
|
||||||
|
subjects.Count(s => !string.IsNullOrEmpty(s)).ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1336
|
||||||
|
public void JetStreamConsumerInfoNumPending_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var delivered = 12;
|
||||||
|
var available = 40;
|
||||||
|
var pending = available - delivered;
|
||||||
|
pending.ShouldBe(28);
|
||||||
|
pending.ShouldBeGreaterThan(0);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user