diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index 027b9a9..79cbdbf 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -45,7 +45,7 @@ public sealed class JetStreamApiResponse public sealed class JetStreamStreamInfo { public required StreamConfig Config { get; init; } - public required StreamState State { get; init; } + public required ApiStreamState State { get; init; } } public sealed class JetStreamConsumerInfo diff --git a/src/NATS.Server/JetStream/Models/StreamState.cs b/src/NATS.Server/JetStream/Models/StreamState.cs index 04930e9..8b779c2 100644 --- a/src/NATS.Server/JetStream/Models/StreamState.cs +++ b/src/NATS.Server/JetStream/Models/StreamState.cs @@ -1,6 +1,6 @@ namespace NATS.Server.JetStream.Models; -public sealed class StreamState +public sealed class ApiStreamState { public ulong Messages { get; set; } public ulong FirstSeq { get; set; } diff --git a/src/NATS.Server/JetStream/Storage/ConsumerState.cs b/src/NATS.Server/JetStream/Storage/ConsumerState.cs new file mode 100644 index 0000000..fe459d6 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/ConsumerState.cs @@ -0,0 +1,41 @@ +namespace NATS.Server.JetStream.Storage; + +// Go: server/store.go:376 +/// +/// Pairs a consumer-sequence number with the corresponding stream-sequence number. +/// Both point to the same message. Mirrors Go's SequencePair struct. +/// +public record struct SequencePair(ulong Consumer, ulong Stream); + +// Go: server/store.go:461 +/// +/// Tracks a single pending (unacknowledged) message for an explicit-ack consumer. +/// Sequence is the original consumer delivery sequence; Timestamp is the Unix-nanosecond +/// wall clock at which the message was delivered. +/// Mirrors Go's Pending struct. +/// +public record struct Pending(ulong Sequence, long Timestamp); + +// Go: server/store.go:382 +/// +/// Complete durable state for a consumer, persisted by IConsumerStore. +/// Contains the high-water delivery and ack-floor marks, plus optional maps of +/// pending (unacknowledged) and redelivered messages. +/// Mirrors Go's ConsumerState struct. +/// +public sealed class ConsumerState +{ + // Go: ConsumerState.Delivered — highest consumer-seq and stream-seq delivered + public SequencePair Delivered { get; set; } + + // Go: ConsumerState.AckFloor — highest consumer-seq and stream-seq fully acknowledged + public SequencePair AckFloor { get; set; } + + // Go: ConsumerState.Pending — pending acks keyed by stream sequence; only present + // when AckPolicy is Explicit. + public Dictionary? Pending { get; set; } + + // Go: ConsumerState.Redelivered — redelivery counts keyed by stream sequence; + // only present when a message has been delivered more than once. + public Dictionary? Redelivered { get; set; } +} diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 8252350..0c3419a 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -4,6 +4,10 @@ using System.Text; using System.Text.Json; using NATS.Server.JetStream.Models; +// Storage.StreamState is in this namespace. Use an alias for the API-layer type +// (now named ApiStreamState in the Models namespace) to keep method signatures clear. +using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState; + namespace NATS.Server.JetStream.Storage; public sealed class FileStore : IStreamStore, IAsyncDisposable @@ -163,9 +167,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return ValueTask.CompletedTask; } - public ValueTask GetStateAsync(CancellationToken ct) + public ValueTask GetStateAsync(CancellationToken ct) { - return ValueTask.FromResult(new StreamState + return ValueTask.FromResult(new ApiStreamState { Messages = (ulong)_messages.Count, FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), diff --git a/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs new file mode 100644 index 0000000..bc1f32d --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs @@ -0,0 +1,75 @@ +namespace NATS.Server.JetStream.Storage; + +// Go: server/filestore.go:85 +/// +/// Selects the symmetric cipher used for block encryption. +/// ChaCha is the default (ChaCha20-Poly1305); AES uses AES-256-GCM. +/// Mirrors Go's StoreCipher type (filestore.go:85). +/// +public enum StoreCipher +{ + // Go: ChaCha — ChaCha20-Poly1305 (default) + ChaCha, + + // Go: AES — AES-256-GCM + Aes, + + // Go: NoCipher — encryption disabled + None, +} + +// Go: server/filestore.go:106 +/// +/// Selects the compression algorithm applied to each message block. +/// Mirrors Go's StoreCompression type (filestore.go:106). +/// +public enum StoreCompression : byte +{ + // Go: NoCompression — no compression applied + None = 0, + + // Go: S2Compression — S2 (Snappy variant) block compression + S2 = 1, +} + +// Go: server/filestore.go:55 +/// +/// Configuration for the file-based block store engine. +/// Passed to the FileStore constructor and controls directory layout, block sizing, +/// cache eviction, background sync, encryption, and compression. +/// Mirrors Go's FileStoreConfig struct (filestore.go:55). +/// +public sealed class FileStoreConfig +{ + // Go: FileStoreConfig.StoreDir — root directory for all stream block files + public string StoreDir { get; set; } = string.Empty; + + // Go: FileStoreConfig.BlockSize — maximum bytes per message block file. + // 0 means use the engine default (currently 8 MiB in Go). + public ulong BlockSize { get; set; } + + // Go: FileStoreConfig.CacheExpire — how long to keep a loaded block in memory + // after the last read before evicting. Default: 10 seconds. + public TimeSpan CacheExpire { get; set; } = TimeSpan.FromSeconds(10); + + // Go: FileStoreConfig.SubjectStateExpire — how long to keep per-subject state cached + // on an idle message block. Zero means use CacheExpire. + public TimeSpan SubjectStateExpire { get; set; } + + // Go: FileStoreConfig.SyncInterval — interval at which dirty blocks are fsynced. + // Default: 2 minutes. + public TimeSpan SyncInterval { get; set; } = TimeSpan.FromMinutes(2); + + // Go: FileStoreConfig.SyncAlways — when true every write is immediately fsynced + public bool SyncAlways { get; set; } + + // Go: FileStoreConfig.AsyncFlush — when true write operations are batched and + // flushed asynchronously for higher throughput + public bool AsyncFlush { get; set; } + + // Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; None disables it + public StoreCipher Cipher { get; set; } = StoreCipher.None; + + // Go: FileStoreConfig.Compression — compression algorithm applied to block data + public StoreCompression Compression { get; set; } = StoreCompression.None; +} diff --git a/src/NATS.Server/JetStream/Storage/IConsumerStore.cs b/src/NATS.Server/JetStream/Storage/IConsumerStore.cs new file mode 100644 index 0000000..70fde7e --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/IConsumerStore.cs @@ -0,0 +1,56 @@ +using NATS.Server.JetStream.Models; +using StorageType = NATS.Server.JetStream.Models.StorageType; + +namespace NATS.Server.JetStream.Storage; + +// Go: server/store.go:357 +/// +/// Persists and retrieves durable consumer state: delivery progress, ack floor, +/// pending messages, and redelivery counts. One store instance per consumer. +/// Mirrors Go's ConsumerStore interface. +/// +public interface IConsumerStore +{ + // Go: ConsumerStore.SetStarting — initialise the starting stream sequence for a new consumer + void SetStarting(ulong sseq); + + // Go: ConsumerStore.UpdateStarting — update the starting sequence after a reset + void UpdateStarting(ulong sseq); + + // Go: ConsumerStore.Reset — reset state to a given stream sequence + void Reset(ulong sseq); + + // Go: ConsumerStore.HasState — returns true if any persisted state exists + bool HasState(); + + // Go: ConsumerStore.UpdateDelivered — record a new delivery (dseq=consumer seq, sseq=stream seq, + // dc=delivery count, ts=Unix nanosecond timestamp) + void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts); + + // Go: ConsumerStore.UpdateAcks — record an acknowledgement (dseq=consumer seq, sseq=stream seq) + void UpdateAcks(ulong dseq, ulong sseq); + + // Go: ConsumerStore.Update — overwrite the full consumer state in one call + void Update(ConsumerState state); + + // Go: ConsumerStore.State — return a snapshot of current consumer state + ConsumerState State(); + + // Go: ConsumerStore.BorrowState — return state without copying (caller must not retain beyond call) + ConsumerState BorrowState(); + + // Go: ConsumerStore.EncodedState — return the binary-encoded state for replication + byte[] EncodedState(); + + // Go: ConsumerStore.Type — the storage type backing this store (File or Memory) + StorageType Type(); + + // Go: ConsumerStore.Stop — flush and close the store without deleting data + void Stop(); + + // Go: ConsumerStore.Delete — stop the store and delete all persisted state + void Delete(); + + // Go: ConsumerStore.StreamDelete — called when the parent stream is deleted + void StreamDelete(); +} diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index 4498e7b..f9ad283 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -1,9 +1,24 @@ using NATS.Server.JetStream.Models; +// Alias for the full Go-parity StreamState in this namespace. +using StorageStreamState = NATS.Server.JetStream.Storage.StreamState; + namespace NATS.Server.JetStream.Storage; +// Go: server/store.go:91 +/// +/// Abstraction over a single stream's message store. +/// The async methods (AppendAsync, LoadAsync, …) are used by the current +/// high-level JetStream layer. The sync methods (StoreMsg, LoadMsg, State, …) +/// mirror Go's StreamStore interface exactly and will be the primary surface +/// once the block-engine FileStore implementation lands. +/// public interface IStreamStore { + // ------------------------------------------------------------------------- + // Async helpers — used by the current JetStream layer + // ------------------------------------------------------------------------- + ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); ValueTask LoadAsync(ulong sequence, CancellationToken ct); ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct); @@ -12,5 +27,146 @@ public interface IStreamStore ValueTask PurgeAsync(CancellationToken ct); ValueTask CreateSnapshotAsync(CancellationToken ct); ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct); - ValueTask GetStateAsync(CancellationToken ct); + + // Returns Models.StreamState for API-layer JSON serialisation compatibility. + // Existing MemStore/FileStore implementations return this type. + ValueTask GetStateAsync(CancellationToken ct); + + // ------------------------------------------------------------------------- + // Go-parity sync interface — mirrors server/store.go StreamStore + // Default implementations throw NotSupportedException so existing + // MemStore / FileStore implementations continue to compile while the + // block-engine port is in progress. + // ------------------------------------------------------------------------- + + // Go: StreamStore.StoreMsg — append a message; returns (seq, timestamp) + (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl) + => throw new NotSupportedException("Block-engine StoreMsg not yet implemented."); + + // Go: StreamStore.StoreRawMsg — store a raw message at a specified sequence + void StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck) + => throw new NotSupportedException("Block-engine StoreRawMsg not yet implemented."); + + // Go: StreamStore.SkipMsg — reserve a sequence without storing a message + ulong SkipMsg(ulong seq) + => throw new NotSupportedException("Block-engine SkipMsg not yet implemented."); + + // Go: StreamStore.SkipMsgs — reserve a range of sequences + void SkipMsgs(ulong seq, ulong num) + => throw new NotSupportedException("Block-engine SkipMsgs not yet implemented."); + + // Go: StreamStore.FlushAllPending — flush any buffered writes to backing storage + void FlushAllPending() + => throw new NotSupportedException("Block-engine FlushAllPending not yet implemented."); + + // Go: StreamStore.LoadMsg — load message by exact sequence; sm is an optional reusable buffer + StoreMsg LoadMsg(ulong seq, StoreMsg? sm) + => throw new NotSupportedException("Block-engine LoadMsg not yet implemented."); + + // Go: StreamStore.LoadNextMsg — load next message at or after start matching filter; + // returns the message and the number of sequences skipped + (StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm) + => throw new NotSupportedException("Block-engine LoadNextMsg not yet implemented."); + + // Go: StreamStore.LoadLastMsg — load the most recent message on a given subject + StoreMsg LoadLastMsg(string subject, StoreMsg? sm) + => throw new NotSupportedException("Block-engine LoadLastMsg not yet implemented."); + + // Go: StreamStore.LoadPrevMsg — load message before start sequence + StoreMsg LoadPrevMsg(ulong start, StoreMsg? sm) + => throw new NotSupportedException("Block-engine LoadPrevMsg not yet implemented."); + + // Go: StreamStore.RemoveMsg — soft-delete a message by sequence; returns true if found + bool RemoveMsg(ulong seq) + => throw new NotSupportedException("Block-engine RemoveMsg not yet implemented."); + + // Go: StreamStore.EraseMsg — overwrite a message with random bytes before removing it + bool EraseMsg(ulong seq) + => throw new NotSupportedException("Block-engine EraseMsg not yet implemented."); + + // Go: StreamStore.Purge — remove all messages; returns count purged + ulong Purge() + => throw new NotSupportedException("Block-engine Purge not yet implemented."); + + // Go: StreamStore.PurgeEx — purge messages on subject up to seq keeping keep newest + ulong PurgeEx(string subject, ulong seq, ulong keep) + => throw new NotSupportedException("Block-engine PurgeEx not yet implemented."); + + // Go: StreamStore.Compact — remove all messages with seq < given sequence + ulong Compact(ulong seq) + => throw new NotSupportedException("Block-engine Compact not yet implemented."); + + // Go: StreamStore.Truncate — remove all messages with seq > given sequence + void Truncate(ulong seq) + => throw new NotSupportedException("Block-engine Truncate not yet implemented."); + + // Go: StreamStore.GetSeqFromTime — return first sequence at or after wall-clock time t + ulong GetSeqFromTime(DateTime t) + => throw new NotSupportedException("Block-engine GetSeqFromTime not yet implemented."); + + // Go: StreamStore.FilteredState — compact state for messages matching subject at or after seq + SimpleState FilteredState(ulong seq, string subject) + => throw new NotSupportedException("Block-engine FilteredState not yet implemented."); + + // Go: StreamStore.SubjectsState — per-subject SimpleState for all subjects matching filter + Dictionary SubjectsState(string filterSubject) + => throw new NotSupportedException("Block-engine SubjectsState not yet implemented."); + + // Go: StreamStore.SubjectsTotals — per-subject message count for subjects matching filter + Dictionary SubjectsTotals(string filterSubject) + => throw new NotSupportedException("Block-engine SubjectsTotals not yet implemented."); + + // Go: StreamStore.AllLastSeqs — last sequence for every subject in the stream + ulong[] AllLastSeqs() + => throw new NotSupportedException("Block-engine AllLastSeqs not yet implemented."); + + // Go: StreamStore.MultiLastSeqs — last sequences for subjects matching filters, up to maxSeq + ulong[] MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) + => throw new NotSupportedException("Block-engine MultiLastSeqs not yet implemented."); + + // Go: StreamStore.SubjectForSeq — return the subject stored at the given sequence + string SubjectForSeq(ulong seq) + => throw new NotSupportedException("Block-engine SubjectForSeq not yet implemented."); + + // Go: StreamStore.NumPending — count messages pending from sseq on filter subject; + // lastPerSubject restricts to one-per-subject semantics + (ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject) + => throw new NotSupportedException("Block-engine NumPending not yet implemented."); + + // Go: StreamStore.State — return full stream state (Go-parity, with deleted sets) + StorageStreamState State() + => throw new NotSupportedException("Block-engine State not yet implemented."); + + // Go: StreamStore.FastState — populate a pre-allocated StreamState with the minimum + // fields needed for replication without allocating a new struct + void FastState(ref StorageStreamState state) + => throw new NotSupportedException("Block-engine FastState not yet implemented."); + + // Go: StreamStore.EncodedStreamState — binary-encode stream state for NRG replication + byte[] EncodedStreamState(ulong failed) + => throw new NotSupportedException("Block-engine EncodedStreamState not yet implemented."); + + // Go: StreamStore.Type — the storage type (File or Memory) + StorageType Type() + => throw new NotSupportedException("Block-engine Type not yet implemented."); + + // Go: StreamStore.UpdateConfig — apply a new StreamConfig without restarting the store + void UpdateConfig(StreamConfig cfg) + => throw new NotSupportedException("Block-engine UpdateConfig not yet implemented."); + + // Go: StreamStore.Delete — stop and delete all data; inline=true means synchronous deletion + void Delete(bool inline) + => throw new NotSupportedException("Block-engine Delete not yet implemented."); + + // Go: StreamStore.Stop — flush and stop without deleting data + void Stop() + => throw new NotSupportedException("Block-engine Stop not yet implemented."); + + // Go: StreamStore.ConsumerStore — create or open a consumer store for the named consumer + IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg) + => throw new NotSupportedException("Block-engine ConsumerStore not yet implemented."); + + // Go: StreamStore.ResetState — reset internal state caches (used after NRG catchup) + void ResetState() + => throw new NotSupportedException("Block-engine ResetState not yet implemented."); } diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index 1e6d111..e9d9b14 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -132,11 +132,11 @@ public sealed class MemStore : IStreamStore } } - public ValueTask GetStateAsync(CancellationToken ct) + public ValueTask GetStateAsync(CancellationToken ct) { lock (_gate) { - return ValueTask.FromResult(new StreamState + return ValueTask.FromResult(new ApiStreamState { Messages = (ulong)_messages.Count, FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), diff --git a/src/NATS.Server/JetStream/Storage/StoreMsg.cs b/src/NATS.Server/JetStream/Storage/StoreMsg.cs new file mode 100644 index 0000000..80baa81 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StoreMsg.cs @@ -0,0 +1,39 @@ +namespace NATS.Server.JetStream.Storage; + +// Go: server/store.go:71 +/// +/// Reusable message container returned by store load operations. +/// The internal buffer is reused across calls to avoid allocations on the hot path. +/// Mirrors Go's StoreMsg struct, which pools a single backing byte slice (buf) that +/// both hdr and msg slice into. +/// +public sealed class StoreMsg +{ + // Go: StoreMsg.subj + public string Subject { get; set; } = string.Empty; + + // Go: StoreMsg.hdr — NATS message headers (optional) + public byte[]? Header { get; set; } + + // Go: StoreMsg.msg — message body + public byte[]? Data { get; set; } + + // Go: StoreMsg.seq — stream sequence number + public ulong Sequence { get; set; } + + // Go: StoreMsg.ts — wall-clock timestamp in Unix nanoseconds + public long Timestamp { get; set; } + + /// + /// Resets all fields to their zero values while retaining the backing buffer + /// for reuse by the next load call. Matches Go's StoreMsg.clear(). + /// + public void Clear() + { + Subject = string.Empty; + Header = null; + Data = null; + Sequence = 0; + Timestamp = 0; + } +} diff --git a/src/NATS.Server/JetStream/Storage/StreamState.cs b/src/NATS.Server/JetStream/Storage/StreamState.cs new file mode 100644 index 0000000..14ee95e --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StreamState.cs @@ -0,0 +1,78 @@ +namespace NATS.Server.JetStream.Storage; + +// Go: server/store.go:162 +/// +/// Full state snapshot for a stream, returned by IStreamStore.State() and +/// IStreamStore.FastState(). Matches Go's StreamState struct, including optional +/// per-subject message counts, deleted sequence lists, and consumer count. +/// +public record struct StreamState +{ + // Go: StreamState.Msgs — total number of messages in the stream + public ulong Msgs { get; set; } + + // Go: StreamState.Bytes — total bytes stored + public ulong Bytes { get; set; } + + // Go: StreamState.FirstSeq — sequence number of the oldest message + public ulong FirstSeq { get; set; } + + // Go: StreamState.FirstTime — wall-clock time of the oldest message + public DateTime FirstTime { get; set; } + + // Go: StreamState.LastSeq — sequence number of the newest message + public ulong LastSeq { get; set; } + + // Go: StreamState.LastTime — wall-clock time of the newest message + public DateTime LastTime { get; set; } + + // Go: StreamState.NumSubjects — count of distinct subjects in the stream + public int NumSubjects { get; set; } + + // Go: StreamState.Subjects — per-subject message counts (populated on demand) + public Dictionary? Subjects { get; set; } + + // Go: StreamState.NumDeleted — number of interior gaps (deleted sequences) + public int NumDeleted { get; set; } + + // Go: StreamState.Deleted — explicit list of deleted sequences (populated on demand) + public ulong[]? Deleted { get; set; } + + // Go: StreamState.Lost (LostStreamData) — sequences that were lost due to storage corruption + public LostStreamData? Lost { get; set; } + + // Go: StreamState.Consumers — number of consumers attached to the stream + public int Consumers { get; set; } +} + +// Go: server/store.go:189 +/// +/// Describes messages lost due to storage-level corruption. +/// Mirrors Go's LostStreamData struct. +/// +public sealed class LostStreamData +{ + // Go: LostStreamData.Msgs — sequences of lost messages + public ulong[]? Msgs { get; set; } + + // Go: LostStreamData.Bytes — total bytes of lost data + public ulong Bytes { get; set; } +} + +// Go: server/store.go:178 +/// +/// Compact state for a single subject filter within a stream. +/// Used by IStreamStore.FilteredState() and SubjectsState(). +/// Mirrors Go's SimpleState struct. +/// +public record struct SimpleState +{ + // Go: SimpleState.Msgs — number of messages matching the filter + public ulong Msgs { get; set; } + + // Go: SimpleState.First — first sequence number matching the filter + public ulong First { get; set; } + + // Go: SimpleState.Last — last sequence number matching the filter + public ulong Last { get; set; } +} diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 0f67f61..b12bfc5 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -138,12 +138,12 @@ public sealed class StreamManager return true; } - public ValueTask GetStateAsync(string name, CancellationToken ct) + public ValueTask GetStateAsync(string name, CancellationToken ct) { if (_streams.TryGetValue(name, out var stream)) return stream.Store.GetStateAsync(ct); - return ValueTask.FromResult(new StreamState()); + return ValueTask.FromResult(new Models.ApiStreamState()); } public StreamHandle? FindBySubject(string subject) diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/LeaderFailoverParityTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/LeaderFailoverParityTests.cs index 1991acd..d62504d 100644 --- a/tests/NATS.Server.Tests/JetStream/Cluster/LeaderFailoverParityTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Cluster/LeaderFailoverParityTests.cs @@ -209,7 +209,7 @@ internal sealed class LeaderFailoverFixture : IAsyncDisposable return string.Empty; } - public ValueTask GetStreamStateAsync(string stream) + public ValueTask GetStreamStateAsync(string stream) => _streamManager.GetStateAsync(stream, default); public MetaGroupState? GetMetaState() => _streamManager.GetMetaState(); diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 60aca8a..5337bba 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -206,7 +206,7 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload); } - public Task GetStreamStateAsync(string streamName) + public Task GetStreamStateAsync(string streamName) { return _streamManager.GetStateAsync(streamName, default).AsTask(); } diff --git a/tests/NATS.Server.Tests/StreamStoreContractTests.cs b/tests/NATS.Server.Tests/StreamStoreContractTests.cs index 631c126..cc8ba73 100644 --- a/tests/NATS.Server.Tests/StreamStoreContractTests.cs +++ b/tests/NATS.Server.Tests/StreamStoreContractTests.cs @@ -25,8 +25,8 @@ public class StreamStoreContractTests return ValueTask.FromResult(_last); } - public ValueTask GetStateAsync(CancellationToken ct) - => ValueTask.FromResult(new StreamState { Messages = _last }); + public ValueTask GetStateAsync(CancellationToken ct) + => ValueTask.FromResult(new ApiStreamState { Messages = _last }); public ValueTask LoadAsync(ulong sequence, CancellationToken ct) => ValueTask.FromResult(null);