feat: define jetstream storage interfaces

This commit is contained in:
Joseph Doherty
2026-02-23 05:59:39 -05:00
parent d1935bc9ec
commit cae09f9091
4 changed files with 57 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.JetStream.Models;
public sealed class StreamState
{
public ulong Messages { get; set; }
public ulong FirstSeq { get; set; }
public ulong LastSeq { get; set; }
}

View File

@@ -0,0 +1,9 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public interface IStreamStore
{
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}

View File

@@ -0,0 +1,9 @@
namespace NATS.Server.JetStream.Storage;
public sealed class StoredMessage
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public ReadOnlyMemory<byte> Payload { get; init; }
public bool Redelivered { get; init; }
}

View File

@@ -0,0 +1,31 @@
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests;
public class StreamStoreContractTests
{
[Fact]
public async Task Append_increments_sequence_and_updates_state()
{
var store = new FakeStreamStore();
var seq = await store.AppendAsync("foo", "bar"u8.ToArray(), default);
seq.ShouldBe((ulong)1);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
}
private sealed class FakeStreamStore : IStreamStore
{
private ulong _last;
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
_last++;
return ValueTask.FromResult(_last);
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
=> ValueTask.FromResult(new StreamState { Messages = _last });
}
}