1235 lines
40 KiB
C#
1235 lines
40 KiB
C#
// Copyright 2019-2026 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
// Mirrors server/memstore_test.go
|
|
|
|
using System.Text;
|
|
using Shouldly;
|
|
using ZB.MOM.NatsNet.Server;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
|
|
|
/// <summary>
|
|
/// Unit tests for the in-memory JetStream message store.
|
|
/// Mirrors server/memstore_test.go.
|
|
/// </summary>
|
|
public class JetStreamMemoryStoreTests
|
|
{
|
|
// -----------------------------------------------------------------------
|
|
// Helpers
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Creates the memory store under test. When <paramref name="cfg"/> is null a
/// memory-storage configuration named "test" is used instead.
/// </summary>
private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null)
    => JetStreamMemStore.NewMemStore(
        cfg ?? new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" });
|
|
|
|
/// <summary>Encodes <paramref name="s"/> as UTF-8 bytes.</summary>
private static byte[] Bytes(string s)
{
    return Encoding.UTF8.GetBytes(s);
}
|
|
|
|
/// <summary>
/// Expected accounting size of one stored message: subject length plus header
/// length plus payload length plus a fixed 16. A null header or payload counts as zero.
/// </summary>
private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg)
{
    var headerLength = hdr is null ? 0 : hdr.Length;
    var payloadLength = msg is null ? 0 : msg.Length;
    return (ulong)(subj.Length + headerLength + payloadLength + 16);
}
|
|
|
|
[Fact]
public void NewMemStore_FactoryMethod_InitializesFirstSequence()
{
    // Look the static factory up by name, regardless of its visibility.
    const System.Reflection.BindingFlags staticAnyVisibility =
        System.Reflection.BindingFlags.Static
        | System.Reflection.BindingFlags.Public
        | System.Reflection.BindingFlags.NonPublic;
    var factory = typeof(JetStreamMemStore).GetMethod("NewMemStore", staticAnyVisibility);
    factory.ShouldNotBeNull();

    var streamConfig = new StreamConfig
    {
        Name = "factory-init",
        Storage = StorageType.MemoryStorage,
        FirstSeq = 1000,
    };

    var instance = factory!.Invoke(null, new object?[] { streamConfig });
    instance.ShouldBeOfType<JetStreamMemStore>();

    var store = (JetStreamMemStore)instance!;
    var snapshot = store.State();

    // With FirstSeq = 1000 and nothing stored, the last sequence is expected to be first - 1.
    snapshot.FirstSeq.ShouldBe(1000UL);
    snapshot.LastSeq.ShouldBe(999UL);

    store.Stop();
}
|
|
|
|
[Fact]
public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering()
{
    var store = NewMemStore(new StreamConfig
    {
        Name = "locked-all-last-seqs",
        Subjects = new[] { "*.*" },
        MaxMsgsPer = 50,
        Storage = StorageType.MemoryStorage,
    });

    string[] subjects = { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" };
    var payload = Bytes("abc");
    var random = new Random(11); // fixed seed keeps the subject mix reproducible

    // Spread 1,000 messages over the six subjects.
    for (var stored = 0; stored < 1_000; stored++)
    {
        var pick = random.Next(subjects.Length);
        store.StoreMsg(subjects[pick], null, payload, 0);
    }

    // AllLastSeqsLocked is looked up as a non-public instance method, so it is invoked via reflection.
    const System.Reflection.BindingFlags nonPublicInstance =
        System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic;
    var lockedMethod = typeof(JetStreamMemStore).GetMethod("AllLastSeqsLocked", nonPublicInstance);
    lockedMethod.ShouldNotBeNull();

    var boxed = lockedMethod!.Invoke(store, Array.Empty<object>());
    boxed.ShouldNotBeNull();
    var (fromLocked, lockedError) = (ValueTuple<ulong[], Exception?>)boxed!;

    var (fromPublic, publicError) = store.AllLastSeqs();

    lockedError.ShouldBeNull();
    publicError.ShouldBeNull();

    // Both entry points must yield the same sequences in the same order.
    fromLocked.SequenceEqual(fromPublic).ShouldBeTrue();

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreBasics (T:2023)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2023
public void MemStoreBasics_ShouldSucceed()
{
    // Port of TestMemStoreBasics (golang/nats-server/server/memstore_test.go, line 32).

    // A store can be created and stopped without ever being used.
    NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }).Stop();

    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });
    const string subject = "foo";
    var payload = Bytes("Hello World");

    // Millisecond-truncated wall clock scaled to nanoseconds: a lower bound for the stored timestamp.
    var notBefore = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;

    var (firstSeq, storedAt) = store.StoreMsg(subject, null, payload, 0);
    firstSeq.ShouldBe(1UL);
    storedAt.ShouldBeGreaterThanOrEqualTo(notBefore);

    var snapshot = store.State();
    snapshot.Msgs.ShouldBe(1UL);
    snapshot.Bytes.ShouldBe(MsgSize(subject, null, payload));

    // The message must round-trip unchanged.
    var loaded = store.LoadMsg(1, null);
    loaded.ShouldNotBeNull();
    loaded!.Subject.ShouldBe(subject);
    loaded.Msg.ShouldBe(payload);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreMsgLimit (T:2024)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2024
public void MemStoreMsgLimit_ShouldSucceed()
{
    // Port of TestMemStoreMsgLimit (golang/nats-server/server/memstore_test.go, line 67).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxMsgs = 10 });

    const string subject = "foo";
    var payload = Bytes("Hello World");

    // Fill the store exactly up to its message limit.
    var stored = 0;
    while (stored < 10)
    {
        store.StoreMsg(subject, null, payload, 0);
        stored++;
    }

    store.State().Msgs.ShouldBe(10UL);

    // One message over the limit: the count stays at 10 and the window slides forward by one.
    store.StoreMsg(subject, null, payload, 0);
    var afterOverflow = store.State();
    afterOverflow.Msgs.ShouldBe(10UL);
    afterOverflow.LastSeq.ShouldBe(11UL);
    afterOverflow.FirstSeq.ShouldBe(2UL);

    // The evicted first sequence can no longer be loaded.
    Should.Throw<Exception>(() => store.LoadMsg(1, null));

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreBytesLimit (T:2025)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2025
public void MemStoreBytesLimit_ShouldSucceed()
{
    // Port of TestMemStoreBytesLimit (golang/nats-server/server/memstore_test.go, line 99).
    const string subject = "foo";
    const ulong capacity = 1024UL;
    var payload = new byte[512];
    var perMessage = MsgSize(subject, null, payload);
    var byteLimit = perMessage * capacity;

    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxBytes = (long)byteLimit });

    // Fill the store to exactly the byte limit.
    for (ulong stored = 0; stored < capacity; stored++)
    {
        store.StoreMsg(subject, null, payload, 0);
    }

    var snapshot = store.State();
    snapshot.Msgs.ShouldBe(capacity);
    snapshot.Bytes.ShouldBe(perMessage * capacity);

    // Ten more messages: totals stay pinned at the limit while the sequence window advances by ten.
    for (var extra = 0; extra < 10; extra++)
    {
        store.StoreMsg(subject, null, payload, 0);
    }

    snapshot = store.State();
    snapshot.Msgs.ShouldBe(capacity);
    snapshot.Bytes.ShouldBe(perMessage * capacity);
    snapshot.FirstSeq.ShouldBe(11UL);
    snapshot.LastSeq.ShouldBe(capacity + 10UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreBytesLimitWithDiscardNew (T:2026)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2026
public void MemStoreBytesLimitWithDiscardNew_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBytesLimitWithDiscardNew line 143
    var subj = "tiny";
    var msg = new byte[7];
    var storedMsgSize = MsgSize(subj, null, msg);
    // 4 + 7 + 16 = 27 bytes per message, so three fit under the 100-byte limit and a fourth does not.
    var toStore = 3UL;
    var maxBytes = 100;

    var ms = NewMemStore(new StreamConfig
    {
        Storage = StorageType.MemoryStorage,
        MaxBytes = maxBytes,
        Discard = DiscardPolicy.DiscardNew
    });

    // Send 10 messages; the first toStore succeed, the rest hit the byte limit and are rejected.
    for (var i = 0; i < 10; i++)
    {
        var (seq, _) = ms.StoreMsg(subj, null, msg, 0);
        if (i < (int)toStore)
        {
            seq.ShouldNotBe(0UL); // accepted: a real sequence was assigned
        }
        else
        {
            // Rejected stores return (0, 0); assert it so a silently accepted
            // over-limit message cannot slip through.
            seq.ShouldBe(0UL);
        }
    }

    // Only the accepted messages are accounted for.
    var state = ms.State();
    state.Msgs.ShouldBe(toStore);
    state.Bytes.ShouldBe(storedMsgSize * toStore);

    ms.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreAgeLimit (T:2027)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2027
public async Task MemStoreAgeLimit_ShouldSucceed()
{
    // Port of TestMemStoreAgeLimit (golang/nats-server/server/memstore_test.go, line 174).
    var store = NewMemStore(new StreamConfig
    {
        Storage = StorageType.MemoryStorage,
        MaxAge = TimeSpan.FromMilliseconds(100),
    });

    const string subject = "foo";
    var payload = Bytes("Hello World");

    // Stores 100 messages and confirms they are all present.
    void FillAndVerify()
    {
        for (var stored = 0; stored < 100; stored++)
        {
            store.StoreMsg(subject, null, payload, 0);
        }

        store.State().Msgs.ShouldBe(100UL);
    }

    // Waits past the max age, then polls for up to two seconds until the store reports empty.
    async Task AssertEverythingExpiresAsync()
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500));

        var giveUpAt = DateTime.UtcNow.AddSeconds(2);
        while (DateTime.UtcNow < giveUpAt)
        {
            var polled = store.State();
            if (polled.Msgs == 0 && polled.Bytes == 0)
            {
                break;
            }

            await Task.Delay(10);
        }

        var settled = store.State();
        settled.Msgs.ShouldBe(0UL);
        settled.Bytes.ShouldBe(0UL);
    }

    FillAndVerify();
    await AssertEverythingExpiresAsync();

    // Second round: expiry must trigger again for messages added after the first sweep.
    FillAndVerify();
    await AssertEverythingExpiresAsync();

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreTimeStamps (T:2028)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2028
public async Task MemStoreTimeStamps_ShouldSucceed()
{
    // Port of TestMemStoreTimeStamps (golang/nats-server/server/memstore_test.go, line 216).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    // Baseline taken before any store, in nanoseconds at millisecond resolution.
    var previous = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
    const string subject = "foo";
    var payload = Bytes("Hello World");

    // Pause briefly before each store so the timestamps are distinct.
    var stored = 0;
    while (stored < 10)
    {
        await Task.Delay(5);
        store.StoreMsg(subject, null, payload, 0);
        stored++;
    }

    // Every timestamp must be strictly greater than the one before it.
    var scratch = new StoreMsg();
    for (ulong seq = 1; seq <= 10; seq++)
    {
        var loaded = store.LoadMsg(seq, scratch);
        loaded.ShouldNotBeNull();
        loaded!.Ts.ShouldBeGreaterThan(previous);
        previous = loaded.Ts;
    }

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStorePurge (T:2029)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2029
public void MemStorePurge_ShouldSucceed()
{
    // Port of TestMemStorePurge (golang/nats-server/server/memstore_test.go, line 241).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    const string subject = "foo";
    var payload = Bytes("Hello World");
    for (var stored = 0; stored < 10; stored++)
    {
        store.StoreMsg(subject, null, payload, 0);
    }

    var before = store.State();
    before.Msgs.ShouldBe(10UL);

    // Purging must leave no messages behind.
    store.Purge();

    var after = store.State();
    after.Msgs.ShouldBe(0UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreCompact (T:2030)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2030
public void MemStoreCompact_ShouldSucceed()
{
    // Port of TestMemStoreCompact (golang/nats-server/server/memstore_test.go, line 259).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    const string subject = "foo";
    var payload = Bytes("Hello World");
    for (var stored = 0; stored < 10; stored++)
    {
        store.StoreMsg(subject, null, payload, 0);
    }

    store.State().Msgs.ShouldBe(10UL);

    // Compacting to sequence 6 removes sequences 1 through 5.
    var (removedBelowSix, firstError) = store.Compact(6);
    firstError.ShouldBeNull();
    removedBelowSix.ShouldBe(5UL);

    var snapshot = store.State();
    snapshot.Msgs.ShouldBe(5UL);
    snapshot.FirstSeq.ShouldBe(6UL);

    // Compacting beyond the last sequence removes the remaining five and moves first to the target.
    var (removedRest, secondError) = store.Compact(100);
    secondError.ShouldBeNull();
    removedRest.ShouldBe(5UL);

    store.State().FirstSeq.ShouldBe(100UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreEraseMsg (T:2031)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2031
public void MemStoreEraseMsg_ShouldSucceed()
{
    // Port of TestMemStoreEraseMsg (golang/nats-server/server/memstore_test.go, line 298).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    var payload = Bytes("Hello World");
    store.StoreMsg("foo", null, payload, 0);

    // The message is readable before it is erased.
    var loaded = store.LoadMsg(1, null);
    loaded.ShouldNotBeNull();
    loaded!.Msg.ShouldBe(payload);

    // Erasing an existing sequence reports success.
    var (erased, _) = store.EraseMsg(1);
    erased.ShouldBeTrue();

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreMsgHeaders (T:2032)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2032
public void MemStoreMsgHeaders_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMsgHeaders line 317
    var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    var subj = "foo";
    var hdr = Bytes("name:derek");
    var msg = Bytes("Hello World");

    // Sanity-check the size formula used by the test helper.
    var expectedSize = MsgSize(subj, hdr, msg);
    ((int)expectedSize).ShouldBe(subj.Length + hdr.Length + msg.Length + 16);

    ms.StoreMsg(subj, hdr, msg, 0);

    // The helper check above only compares the formula with itself; this one verifies
    // that the store's own byte accounting includes the header as well.
    ms.State().Bytes.ShouldBe(expectedSize);

    // Header and payload must both round-trip unchanged.
    var sm = ms.LoadMsg(1, null);
    sm.ShouldNotBeNull();
    sm!.Msg.ShouldBe(msg);
    sm.Hdr.ShouldBe(hdr);

    var (removed, _) = ms.EraseMsg(1);
    removed.ShouldBeTrue();

    ms.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreStreamStateDeleted (T:2033)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2033
public void MemStoreStreamStateDeleted_ShouldSucceed()
{
    // Port of TestMemStoreStreamStateDeleted (golang/nats-server/server/memstore_test.go, line 342).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    const string subject = "foo";
    const ulong total = 10UL;
    for (ulong n = 1; n <= total; n++)
    {
        store.StoreMsg(subject, null, Encoding.UTF8.GetBytes($"[{n:D8}] Hello World!"), 0);
    }

    // Nothing has been removed yet, so no deleted list is reported.
    store.State().Deleted.ShouldBeNull();

    // Remove the even interior sequences (2, 4, 6, 8) and track them as the expected gaps.
    var gaps = new List<ulong>();
    for (ulong seq = 2; seq < total; seq += 2)
    {
        store.RemoveMsg(seq);
        gaps.Add(seq);
    }

    var snapshot = store.State();
    snapshot.Deleted.ShouldNotBeNull();
    snapshot.Deleted!.ShouldBe(gaps.ToArray());

    // Removing 1 and 3 makes 1..4 a contiguous deleted prefix: first moves to 5
    // and the gaps 2 and 4 drop out of the reported list.
    store.RemoveMsg(1);
    store.RemoveMsg(3);
    gaps.RemoveRange(0, 2);

    snapshot = store.State();
    snapshot.Deleted!.ShouldBe(gaps.ToArray());
    snapshot.FirstSeq.ShouldBe(5UL);

    // After a purge no gaps remain (the list may be null or empty).
    store.Purge();
    snapshot = store.State();
    (snapshot.Deleted?.Length ?? 0).ShouldBe(0);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreStreamTruncate (T:2034)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2034
public void MemStoreStreamTruncate_ShouldSucceed()
{
    // Port of TestMemStoreStreamTruncate (golang/nats-server/server/memstore_test.go, line 385).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    const ulong total = 100UL;
    var cutAt = 50UL;

    // Sequences 1..49 go to "foo"; 50..100 go to "bar".
    for (ulong seq = 1; seq <= total; seq++)
    {
        var subject = seq < cutAt ? "foo" : "bar";
        store.StoreMsg(subject, null, Bytes("ok"), 0);
    }

    store.State().Msgs.ShouldBe(total);

    // Truncating at 50 keeps sequences 1..50.
    store.Truncate(cutAt);
    store.State().Msgs.ShouldBe(cutAt);

    // Punch interior holes, then truncate below two of them.
    foreach (var hole in new ulong[] { 10, 20, 30, 40 })
    {
        store.RemoveMsg(hole);
    }

    cutAt = 25UL;
    store.Truncate(cutAt);

    var snapshot = store.State();
    snapshot.Msgs.ShouldBe(cutAt - 2); // 10 and 20 sit below the cut and stay deleted
    snapshot.NumSubjects.ShouldBe(1);
    snapshot.Deleted.ShouldBe(new ulong[] { 10, 20 });

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStorePurgeExWithSubject (T:2035)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2035
public void MemStorePurgeExWithSubject_ShouldSucceed()
{
    // Port of TestMemStorePurgeExWithSubject (golang/nats-server/server/memstore_test.go, line 437).
    var store = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage });

    var stored = 0;
    while (stored < 100)
    {
        store.StoreMsg("foo", null, null, 0);
        stored++;
    }

    // Purging by the only subject in use must empty the store.
    store.PurgeEx("foo", 1, 0);

    var remaining = store.State().Msgs;
    remaining.ShouldBe(0UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreUpdateMaxMsgsPerSubject (T:2036)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2036
public void MemStoreUpdateMaxMsgsPerSubject_ShouldSucceed()
{
    // Port of TestMemStoreUpdateMaxMsgsPerSubject (golang/nats-server/server/memstore_test.go, line 452).
    var cfg = new StreamConfig
    {
        Name = "TEST",
        Storage = StorageType.MemoryStorage,
        Subjects = new[] { "foo" },
        MaxMsgsPer = 10,
    };
    var store = NewMemStore(cfg);

    // Raise the per-subject limit first.
    cfg.MaxMsgsPer = 50;
    store.UpdateConfig(cfg);

    const int storedCount = 22;
    for (var stored = 0; stored < storedCount; stored++)
    {
        store.StoreMsg("foo", null, null, 0);
    }

    // All 22 fit under the raised limit.
    var perSubject = store.SubjectsState("foo")["foo"];
    perSubject.Msgs.ShouldBe((ulong)storedCount);

    // Lowering the limit back to 10 must trim the subject down to 10 messages.
    cfg.MaxMsgsPer = 10;
    store.UpdateConfig(cfg);

    perSubject = store.SubjectsState("foo")["foo"];
    perSubject.Msgs.ShouldBe(10UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreStreamTruncateReset (T:2037)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2037
public void MemStoreStreamTruncateReset_ShouldSucceed()
{
    // Port of TestMemStoreStreamTruncateReset (golang/nats-server/server/memstore_test.go, line 490).
    var store = NewMemStore(new StreamConfig
    {
        Name = "TEST",
        Storage = StorageType.MemoryStorage,
        Subjects = new[] { "foo" },
    });

    const string subject = "foo";
    var payload = Bytes("Hello World");

    // Stores one batch of 1,000 messages.
    void StoreBatch()
    {
        for (var stored = 0; stored < 1000; stored++)
        {
            store.StoreMsg(subject, null, payload, 0);
        }
    }

    StoreBatch();

    // Truncating to sequence 0 must reset every counter.
    store.Truncate(0);

    var snapshot = store.State();
    snapshot.Msgs.ShouldBe(0UL);
    snapshot.Bytes.ShouldBe(0UL);
    snapshot.FirstSeq.ShouldBe(0UL);
    snapshot.LastSeq.ShouldBe(0UL);
    snapshot.NumSubjects.ShouldBe(0);
    snapshot.NumDeleted.ShouldBe(0);

    // After the reset, sequence numbering starts again from 1.
    StoreBatch();

    snapshot = store.State();
    snapshot.Msgs.ShouldBe(1000UL);
    snapshot.FirstSeq.ShouldBe(1UL);
    snapshot.LastSeq.ShouldBe(1000UL);
    snapshot.NumSubjects.ShouldBe(1);
    snapshot.NumDeleted.ShouldBe(0);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreStreamCompactMultiBlockSubjectInfo (T:2038)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2038
public void MemStoreStreamCompactMultiBlockSubjectInfo_ShouldSucceed()
{
    // Port of TestMemStoreStreamCompactMultiBlockSubjectInfo
    // (golang/nats-server/server/memstore_test.go, line 531).
    var store = NewMemStore(new StreamConfig
    {
        Name = "TEST",
        Storage = StorageType.MemoryStorage,
        Subjects = new[] { "foo.*" },
    });

    // One message per distinct subject: foo.0 through foo.999.
    var index = 0;
    while (index < 1000)
    {
        store.StoreMsg($"foo.{index}", null, Bytes("Hello World"), 0);
        index++;
    }

    // Compacting to 501 drops the first 500 messages and, with them, 500 subjects.
    var (removed, compactError) = store.Compact(501);
    compactError.ShouldBeNull();
    removed.ShouldBe(500UL);

    store.State().NumSubjects.ShouldBe(500);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreSubjectsTotals (T:2039)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2039
public void MemStoreSubjectsTotals_ShouldSucceed()
{
    // Port of TestMemStoreSubjectsTotals (golang/nats-server/server/memstore_test.go, line 557).
    var store = NewMemStore(new StreamConfig
    {
        Name = "TEST",
        Storage = StorageType.MemoryStorage,
        Subjects = new[] { "*.*" },
    });

    var random = new Random(42);
    var fooCounts = new Dictionary<int, int>();
    var barCounts = new Dictionary<int, int>();

    // Store 10,000 messages on "foo.N" / "bar.N" (N in 0..99), tallying per-subject counts locally.
    for (var stored = 0; stored < 10_000; stored++)
    {
        var useFoo = random.Next(2) == 0;
        var prefix = useFoo ? "foo" : "bar";
        var counts = useFoo ? fooCounts : barCounts;

        var suffix = random.Next(100);
        counts[suffix] = counts.GetValueOrDefault(suffix) + 1;
        store.StoreMsg($"{prefix}.{suffix}", null, Bytes("Hello World"), 0);
    }

    // Each literal "foo.N" subject must report exactly its tallied count.
    foreach (var entry in fooCounts)
    {
        var literal = $"foo.{entry.Key}";
        var single = store.SubjectsTotals(literal);
        single[literal].ShouldBe((ulong)entry.Value);
    }

    // A single-token wildcard must cover every tallied subject and sum to the same total.
    void AssertWildcardTotals(string filter, Dictionary<int, int> counts)
    {
        var totals = store.SubjectsTotals(filter);
        totals.Count.ShouldBe(counts.Count);

        var expectedSum = counts.Values.Sum();
        var receivedSum = totals.Values.Select(v => (long)v).Sum();
        receivedSum.ShouldBe(expectedSum);
    }

    AssertWildcardTotals("foo.*", fooCounts);
    AssertWildcardTotals("bar.*", barCounts);

    // The fully wildcarded filter sees both families.
    var everything = store.SubjectsTotals("*.*");
    everything.Count.ShouldBe(barCounts.Count + fooCounts.Count);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreNumPending (T:2040)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2040
public void MemStoreNumPending_ShouldSucceed()
{
    // Port of TestMemStoreNumPending (golang/nats-server/server/memstore_test.go, line 637).
    var store = NewMemStore(new StreamConfig
    {
        Name = "TEST",
        Storage = StorageType.MemoryStorage,
        Subjects = new[] { "*.*.*.*" },
    });

    string[] tokens = { "foo", "bar", "baz" };
    var random = new Random(99);

    // Builds a four-token subject, drawing the tokens left to right.
    string NextSubject()
    {
        var parts = new string[4];
        for (var p = 0; p < parts.Length; p++)
        {
            parts[p] = tokens[random.Next(tokens.Length)];
        }

        return string.Join(".", parts);
    }

    for (var stored = 0; stored < 5_000; stored++)
    {
        store.StoreMsg(NextSubject(), null, Bytes("Hello World"), 0);
    }

    var snapshot = store.State();

    // Brute-force reference: walk every sequence and tally the messages matching the filter.
    SimpleState ScanSequentially(ulong sseq, string filter)
    {
        var tally = new SimpleState();
        var scratch = new StoreMsg();
        var firstToScan = sseq == 0 ? 1UL : sseq;

        for (var seq = firstToScan; seq <= snapshot.LastSeq; seq++)
        {
            try
            {
                var loaded = store.LoadMsg(seq, scratch);
                if (loaded == null || !SubjectIsSubsetMatch(loaded.Subject, filter))
                {
                    continue;
                }

                tally.Msgs++;
                tally.Last = seq;
                if (tally.First == 0 || seq < tally.First)
                {
                    tally.First = seq;
                }
            }
            catch
            {
                // A sequence that fails to load is skipped rather than counted.
            }
        }

        return tally;
    }

    // NumPending, FilteredState and the brute-force scan must agree on the message count.
    void Verify(ulong sseq, string filter)
    {
        var (pending, validThrough, pendingError) = store.NumPending(sseq, filter, false);
        pendingError.ShouldBeNull();
        validThrough.ShouldBe(snapshot.LastSeq);

        var filtered = store.FilteredState(sseq, filter);
        var scanned = ScanSequentially(sseq, filter);
        filtered.Msgs.ShouldBe(pending);
        filtered.Msgs.ShouldBe(scanned.Msgs);
    }

    ulong[] startSeqs = { 0, 1, 2, 200, 444, 555, 2222 };
    string[] filters = { "foo.>", "*.bar.>", "foo.bar.*.baz" };

    foreach (var filter in filters)
    {
        foreach (var start in startSeqs)
        {
            Verify(start, filter);
        }
    }

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreInitialFirstSeq (T:2041)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2041
public void MemStoreInitialFirstSeq_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreInitialFirstSeq line 765
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Storage = StorageType.MemoryStorage,
        FirstSeq = 1000,
    };
    var ms = NewMemStore(cfg);

    // The first stored message takes the configured initial sequence.
    var (seq, _) = ms.StoreMsg("A", null, Bytes("OK"), 0);
    seq.ShouldBe(1000UL);

    // Subsequent messages continue from there.
    var (seq2, _) = ms.StoreMsg("B", null, Bytes("OK"), 0);
    seq2.ShouldBe(1001UL);

    // FastState fills the supplied instance in place.
    var state = new StreamState();
    ms.FastState(state);
    state.Msgs.ShouldBe(2UL);
    state.FirstSeq.ShouldBe(1000UL);
    state.LastSeq.ShouldBe(1001UL);

    ms.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreGetSeqFromTimeWithLastDeleted (T:2043)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2043
public async Task MemStoreGetSeqFromTimeWithLastDeleted_ShouldSucceed()
{
    // Port of TestMemStoreGetSeqFromTimeWithLastDeleted
    // (golang/nats-server/server/memstore_test.go, line 839).
    var store = NewMemStore(new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "*" },
        Storage = StorageType.MemoryStorage,
    });

    const int total = 100;
    const int half = total / 2;

    // First half of the messages.
    for (var stored = 0; stored < half; stored++)
    {
        store.StoreMsg("A", null, Bytes("OK"), 0);
    }

    // Leave a gap, then capture the reference time that separates the two halves.
    await Task.Delay(50);
    var midpoint = DateTime.UtcNow;

    // Second half of the messages.
    for (var stored = half; stored < total; stored++)
    {
        store.StoreMsg("A", null, Bytes("OK"), 0);
    }

    // Remove the tail of the stream: sequences (total - 10) through total inclusive.
    for (var seq = total - 10; seq <= total; seq++)
    {
        store.RemoveMsg((ulong)seq);
    }

    // The lookup must still work although the last sequence is no longer loadable,
    // and should land close to the halfway point (51).
    var found = store.GetSeqFromTime(midpoint);
    found.ShouldBeGreaterThanOrEqualTo(49UL);
    found.ShouldBeLessThanOrEqualTo(53UL);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreSkipMsgs (T:2044)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2044
public void MemStoreSkipMsgs_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSkipMsgs line 871
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "*" },
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Wrong starting sequence must fail
    Should.Throw<Exception>(() => ms.SkipMsgs(10, 100));

    // Skipping on an empty store leaves first one past last.
    ms.SkipMsgs(1, 100);

    var state = ms.State();
    state.FirstSeq.ShouldBe(101UL);
    state.LastSeq.ShouldBe(100UL);

    // Now skip a large amount
    ms.SkipMsgs(101, 100_000);
    state = ms.State();
    state.FirstSeq.ShouldBe(100_101UL);
    state.LastSeq.ShouldBe(100_100UL);

    // Stop the first store before replacing it; otherwise it is never stopped.
    ms.Stop();

    // New store: add a message then skip
    ms = NewMemStore(cfg);
    ms.StoreMsg("foo", null, null, 0);

    // With a message present, the skipped range is reported as interior deletes.
    ms.SkipMsgs(2, 10);
    state = ms.State();
    state.FirstSeq.ShouldBe(1UL);
    state.LastSeq.ShouldBe(11UL);
    state.Msgs.ShouldBe(1UL);
    state.NumDeleted.ShouldBe(10);
    state.Deleted!.Length.ShouldBe(10);

    // Check FastState: clear the deleted list first, then compare the counters only.
    state.Deleted = null;
    ms.FastState(state);
    state.FirstSeq.ShouldBe(1UL);
    state.LastSeq.ShouldBe(11UL);
    state.Msgs.ShouldBe(1UL);
    state.NumDeleted.ShouldBe(10);

    ms.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreMultiLastSeqs (T:2045)
|
|
// -----------------------------------------------------------------------
|
|
|
|
[Fact] // T:2045
public void MemStoreMultiLastSeqs_ShouldSucceed()
{
    // Port of TestMemStoreMultiLastSeqs (golang/nats-server/server/memstore_test.go, line 923).
    var store = NewMemStore(new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "foo.*", "bar.*" },
        Storage = StorageType.MemoryStorage,
    });

    var payload = Bytes("abc");

    // 33 rounds over the three foo.* subjects (seqs 1..99), then 33 rounds over bar.* (seqs 100..198).
    foreach (var family in new[] { "foo", "bar" })
    {
        for (var round = 0; round < 33; round++)
        {
            store.StoreMsg($"{family}.foo", null, payload, 0);
            store.StoreMsg($"{family}.bar", null, payload, 0);
            store.StoreMsg($"{family}.baz", null, payload, 0);
        }
    }

    // Element-wise comparison of the returned sequences with the wanted ones.
    void AssertSeqs(ulong[] actual, params ulong[] wanted)
    {
        actual.Length.ShouldBe(wanted.Length);
        for (var idx = 0; idx < actual.Length; idx++)
        {
            actual[idx].ShouldBe(wanted[idx]);
        }
    }

    // Wildcard filter, limited to sequence 3.
    var (seqs, err) = store.MultiLastSeqs(new[] { "foo.*" }, 3, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 1, 2, 3);

    // Wildcard filter, up to the end of the stream.
    (seqs, err) = store.MultiLastSeqs(new[] { "foo.*" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 97, 98, 99);

    // bar.* lives at the tail of the stream.
    (seqs, err) = store.MultiLastSeqs(new[] { "bar.*" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 196, 197, 198);

    // No bar.* message exists at or below sequence 99.
    (seqs, err) = store.MultiLastSeqs(new[] { "bar.*" }, 99, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!);

    // The same four queries with literal subjects instead of wildcards.
    (seqs, err) = store.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 3, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 1, 2, 3);

    (seqs, err) = store.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 97, 98, 99);

    (seqs, err) = store.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 196, 197, 198);

    (seqs, err) = store.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 99, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!);

    // A single literal subject.
    (seqs, err) = store.MultiLastSeqs(new[] { "foo.foo" }, 3, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 1);

    // Overlapping filters must not yield duplicate sequences.
    (seqs, err) = store.MultiLastSeqs(new[] { "foo.*", "foo.bar" }, 3, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 1, 2, 3);

    (seqs, err) = store.MultiLastSeqs(new[] { "bar.>", "bar.bar", "bar.baz" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 196, 197, 198);

    // Full wildcard: every subject, unbounded and then bounded at 99.
    (seqs, err) = store.MultiLastSeqs(new[] { ">" }, 0, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 97, 98, 99, 196, 197, 198);

    (seqs, err) = store.MultiLastSeqs(new[] { ">" }, 99, -1);
    err.ShouldBeNull();
    AssertSeqs(seqs!, 97, 98, 99);

    store.Stop();
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreMultiLastSeqsMaxAllowed (T:2046)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Stores one message on each of 100 distinct <c>foo.N</c> subjects, then calls
/// <c>MultiLastSeqs</c> for <c>foo.*</c> with a last argument of 10 and expects a
/// null result plus an error whose message contains "too many".
/// </summary>
[Fact] // T:2046
public void MemStoreMultiLastSeqsMaxAllowed_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMultiLastSeqsMaxAllowed line 1010
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "foo.*" },
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Stop the store even when an assertion throws; the Go test does this with `defer ms.Stop()`.
    try
    {
        var msg = Bytes("abc");
        for (var i = 1; i <= 100; i++)
        {
            ms.StoreMsg($"foo.{i}", null, msg, 0);
        }

        // 100 subjects match the filter, which exceeds the limit of 10 passed as the last argument
        // (the max-allowed result count, per the mirrored Go test).
        var (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 0, 10);
        seqs.ShouldBeNull();
        err.ShouldNotBeNull();
        err!.Message.ShouldContain("too many");
    }
    finally
    {
        ms.Stop();
    }
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStorePurgeExWithDeletedMsgs (T:2047)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Stores 10 messages on subject <c>foo</c>, removes sequences 2 and 9, then calls
/// <c>PurgeEx(string.Empty, 9, 0)</c>. Expects 7 messages to be reported as purged and
/// the fast state to show a single remaining message at sequence 10.
/// </summary>
[Fact] // T:2047
public void MemStorePurgeExWithDeletedMsgs_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurgeExWithDeletedMsgs line 1031
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "foo" },
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Stop the store even when an assertion throws; the Go test does this with `defer ms.Stop()`.
    try
    {
        var msg = Bytes("abc");
        for (var i = 1; i <= 10; i++)
        {
            ms.StoreMsg("foo", null, msg, 0);
        }

        ms.RemoveMsg(2);
        ms.RemoveMsg(9); // This was the bug

        // Eight messages remain (1, 3-8, 10); seven of them are expected to be purged.
        var (n, err) = ms.PurgeEx(string.Empty, 9, 0);
        err.ShouldBeNull();
        n.ShouldBe(7UL);

        // Only sequence 10 should be left in the store.
        var state = new StreamState();
        ms.FastState(state);
        state.FirstSeq.ShouldBe(10UL);
        state.LastSeq.ShouldBe(10UL);
        state.Msgs.ShouldBe(1UL);
    }
    finally
    {
        ms.Stop();
    }
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreDeleteAllFirstSequenceCheck (T:2048)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Stores 10 messages on subject <c>foo</c>, removes every one of them by sequence, and
/// expects the fast state to report first sequence 11, last sequence 10 and zero messages.
/// </summary>
[Fact] // T:2048
public void MemStoreDeleteAllFirstSequenceCheck_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreDeleteAllFirstSequenceCheck line 1060
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "foo" },
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Stop the store even when an assertion throws; the Go test does this with `defer ms.Stop()`.
    try
    {
        var msg = Bytes("abc");
        for (var i = 1; i <= 10; i++)
        {
            ms.StoreMsg("foo", null, msg, 0);
        }

        for (var seq = 1UL; seq <= 10UL; seq++)
        {
            ms.RemoveMsg(seq);
        }

        // With everything removed, the first sequence is expected to sit one past the last.
        var state = new StreamState();
        ms.FastState(state);
        state.FirstSeq.ShouldBe(11UL);
        state.LastSeq.ShouldBe(10UL);
        state.Msgs.ShouldBe(0UL);
    }
    finally
    {
        ms.Stop();
    }
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreAllLastSeqs (T:2054)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Stores 10,000 messages across six subjects chosen by a fixed-seed random generator,
/// collects the sequence of the last message per subject via <c>LoadLastMsg</c>, and
/// expects <c>AllLastSeqs</c> to return exactly those sequences in ascending order.
/// </summary>
[Fact] // T:2054
public void MemStoreAllLastSeqs_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreAllLastSeqs line 1266
    var cfg = new StreamConfig
    {
        Name = "zzz",
        Subjects = new[] { "*.*" },
        MaxMsgsPer = 50,
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Stop the store even when an assertion throws; the Go test does this with `defer ms.Stop()`.
    try
    {
        var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" };
        var msg = Bytes("abc");
        // Fixed seed keeps the subject distribution reproducible between runs.
        var rng = new Random(7);

        for (var i = 0; i < 10_000; i++)
        {
            var subj = subjs[rng.Next(subjs.Length)];
            ms.StoreMsg(subj, null, msg, 0);
        }

        // Build the expected result from the last stored message of each subject.
        var expected = new List<ulong>();
        var smv = new StoreMsg();
        foreach (var subj in subjs)
        {
            var sm = ms.LoadLastMsg(subj, smv);
            sm.ShouldNotBeNull();
            expected.Add(sm!.Seq);
        }
        expected.Sort();

        var (seqs, err) = ms.AllLastSeqs();
        err.ShouldBeNull();
        // Fail with an assertion message rather than a NullReferenceException if no result came back.
        seqs.ShouldNotBeNull();
        seqs!.SequenceEqual(expected).ShouldBeTrue();
    }
    finally
    {
        ms.Stop();
    }
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// TestMemStoreSubjectForSeq (T:2056)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Stores a single message on <c>foo.bar</c> and checks <c>SubjectForSeq</c>: sequence 1
/// yields <c>"foo.bar"</c> with no error, while sequences 0 and 2 each yield an error.
/// </summary>
[Fact] // T:2056
public void MemStoreSubjectForSeq_ShouldSucceed()
{
    // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSubjectForSeq line 1319
    var cfg = new StreamConfig
    {
        Name = "foo",
        Subjects = new[] { "foo.>" },
        Storage = StorageType.MemoryStorage,
    };
    var ms = NewMemStore(cfg);

    // Stop the store even when an assertion throws; the Go test does this with `defer ms.Stop()`.
    try
    {
        var (seq, _) = ms.StoreMsg("foo.bar", null, null, 0);
        seq.ShouldBe(1UL);

        // Sequence 0 is below the only stored sequence.
        var (_, errBelow) = ms.SubjectForSeq(0);
        errBelow.ShouldNotBeNull();

        var (subj, err) = ms.SubjectForSeq(1);
        err.ShouldBeNull();
        subj.ShouldBe("foo.bar");

        // Sequence 2 is past the only stored sequence.
        var (_, errAbove) = ms.SubjectForSeq(2);
        errAbove.ShouldNotBeNull();
    }
    finally
    {
        ms.Stop();
    }
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Private helper: subject subset matching (mirrors Go subjectIsSubsetMatch)
|
|
// -----------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Reports whether <paramref name="subject"/> is a subset match of <paramref name="filter"/>,
/// comparing the two dot-separated token lists position by position.
/// </summary>
/// <param name="subject">Subject to test; it may itself contain the <c>*</c> wildcard.</param>
/// <param name="filter">Filter to match against; <c>*</c> matches one token, <c>&gt;</c> matches all remaining tokens.</param>
/// <returns>
/// <c>true</c> when every subject token is covered by the corresponding filter token and either
/// the token counts agree or the filter reached a <c>&gt;</c> token; otherwise <c>false</c>.
/// </returns>
private static bool SubjectIsSubsetMatch(string subject, string filter)
{
    var subjectTokens = subject.Split('.');
    var filterTokens = filter.Split('.');

    for (var i = 0; i < filterTokens.Length; i++)
    {
        // The filter has more tokens than the subject can supply.
        if (i >= subjectTokens.Length)
        {
            return false;
        }

        var filterToken = filterTokens[i];

        // An empty filter token (leading, trailing or doubled '.') never matches.
        if (filterToken.Length == 0)
        {
            return false;
        }

        // A full wildcard in the filter covers this token and everything after it.
        if (filterToken == ">")
        {
            return true;
        }

        var subjectToken = subjectTokens[i];

        // An empty subject token is invalid, and a subject-side '>' is wider than any
        // filter token other than '>' (already handled above).
        if (subjectToken.Length == 0 || subjectToken == ">")
        {
            return false;
        }

        // A subject-side '*' is only a subset of a filter-side '*'.
        if (subjectToken == "*")
        {
            if (filterToken != "*")
            {
                return false;
            }

            continue;
        }

        // Literal subject token: the filter must be '*' or the identical literal.
        if (filterToken != "*" && filterToken != subjectToken)
        {
            return false;
        }
    }

    // No '>' was reached, so both sides must have the same number of tokens.
    return subjectTokens.Length == filterTokens.Length;
}
|
|
}
|