diff --git a/src/NATS.Server/JetStream/Models/StreamState.cs b/src/NATS.Server/JetStream/Models/StreamState.cs new file mode 100644 index 0000000..9fe197f --- /dev/null +++ b/src/NATS.Server/JetStream/Models/StreamState.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.JetStream.Models; + +public sealed class StreamState +{ + public ulong Messages { get; set; } + public ulong FirstSeq { get; set; } + public ulong LastSeq { get; set; } +} diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs new file mode 100644 index 0000000..8073199 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -0,0 +1,9 @@ +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Storage; + +public interface IStreamStore +{ + ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); + ValueTask GetStateAsync(CancellationToken ct); +} diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs new file mode 100644 index 0000000..b702613 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -0,0 +1,9 @@ +namespace NATS.Server.JetStream.Storage; + +public sealed class StoredMessage +{ + public ulong Sequence { get; init; } + public string Subject { get; init; } = string.Empty; + public ReadOnlyMemory Payload { get; init; } + public bool Redelivered { get; init; } +} diff --git a/tests/NATS.Server.Tests/StreamStoreContractTests.cs b/tests/NATS.Server.Tests/StreamStoreContractTests.cs new file mode 100644 index 0000000..f5243f8 --- /dev/null +++ b/tests/NATS.Server.Tests/StreamStoreContractTests.cs @@ -0,0 +1,31 @@ +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class StreamStoreContractTests +{ + [Fact] + public async Task Append_increments_sequence_and_updates_state() + { + var store = new FakeStreamStore(); + var seq = await store.AppendAsync("foo", "bar"u8.ToArray(), default); + + seq.ShouldBe((ulong)1); + (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)1); + } + + private sealed class FakeStreamStore : IStreamStore + { + private ulong _last; + + public ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) + { + _last++; + return ValueTask.FromResult(_last); + } + + public ValueTask GetStateAsync(CancellationToken ct) + => ValueTask.FromResult(new StreamState { Messages = _last }); + } +}