diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index 8073199..ec2dd15 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -5,5 +5,7 @@ namespace NATS.Server.JetStream.Storage; public interface IStreamStore { ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); + ValueTask LoadAsync(ulong sequence, CancellationToken ct); + ValueTask PurgeAsync(CancellationToken ct); ValueTask GetStateAsync(CancellationToken ct); } diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs new file mode 100644 index 0000000..28e82f0 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -0,0 +1,57 @@ +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Storage; + +public sealed class MemStore : IStreamStore +{ + private readonly object _gate = new(); + private ulong _last; + private readonly Dictionary _messages = new(); + + public ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) + { + lock (_gate) + { + _last++; + _messages[_last] = new StoredMessage + { + Sequence = _last, + Subject = subject, + Payload = payload, + }; + return ValueTask.FromResult(_last); + } + } + + public ValueTask LoadAsync(ulong sequence, CancellationToken ct) + { + lock (_gate) + { + _messages.TryGetValue(sequence, out var msg); + return ValueTask.FromResult(msg); + } + } + + public ValueTask PurgeAsync(CancellationToken ct) + { + lock (_gate) + { + _messages.Clear(); + _last = 0; + return ValueTask.CompletedTask; + } + } + + public ValueTask GetStateAsync(CancellationToken ct) + { + lock (_gate) + { + return ValueTask.FromResult(new StreamState + { + Messages = (ulong)_messages.Count, + FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), + LastSeq = _last, + }); + } + } +} diff --git a/tests/NATS.Server.Tests/MemStoreTests.cs b/tests/NATS.Server.Tests/MemStoreTests.cs new file mode 100644 index 0000000..78cabc4 --- /dev/null +++ b/tests/NATS.Server.Tests/MemStoreTests.cs @@ -0,0 +1,20 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class MemStoreTests +{ + [Fact] + public async Task MemStore_supports_append_load_and_purge() + { + var store = new MemStore(); + var seq1 = await store.AppendAsync("a", "one"u8.ToArray(), default); + var seq2 = await store.AppendAsync("a", "two"u8.ToArray(), default); + + seq2.ShouldBe(seq1 + 1); + (await store.LoadAsync(seq2, default))!.Payload.Span.SequenceEqual("two"u8).ShouldBeTrue(); + + await store.PurgeAsync(default); + (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)0); + } +} diff --git a/tests/NATS.Server.Tests/StreamStoreContractTests.cs b/tests/NATS.Server.Tests/StreamStoreContractTests.cs index f5243f8..c6ffa18 100644 --- a/tests/NATS.Server.Tests/StreamStoreContractTests.cs +++ b/tests/NATS.Server.Tests/StreamStoreContractTests.cs @@ -27,5 +27,10 @@ public class StreamStoreContractTests public ValueTask GetStateAsync(CancellationToken ct) => ValueTask.FromResult(new StreamState { Messages = _last }); + + public ValueTask LoadAsync(ulong sequence, CancellationToken ct) + => ValueTask.FromResult(null); + + public ValueTask PurgeAsync(CancellationToken ct) => ValueTask.CompletedTask; } }