feat: define StreamStore/ConsumerStore interfaces from Go store.go

Port IStreamStore, IConsumerStore, StoreMsg, StreamState, SimpleState,
ConsumerState, FileStoreConfig, StoreCipher, StoreCompression types.
Rename Models.StreamState → ApiStreamState to avoid namespace conflict.
This commit is contained in:
Joseph Doherty
2026-02-23 21:06:16 -05:00
parent 256daad8e5
commit 921554f410
14 changed files with 462 additions and 13 deletions

View File

@@ -45,7 +45,7 @@ public sealed class JetStreamApiResponse
public sealed class JetStreamStreamInfo
{
public required StreamConfig Config { get; init; }
public required StreamState State { get; init; }
public required ApiStreamState State { get; init; }
}
public sealed class JetStreamConsumerInfo

View File

@@ -1,6 +1,6 @@
namespace NATS.Server.JetStream.Models;
public sealed class StreamState
public sealed class ApiStreamState
{
public ulong Messages { get; set; }
public ulong FirstSeq { get; set; }

View File

@@ -0,0 +1,41 @@
namespace NATS.Server.JetStream.Storage;
// Go: server/store.go:376
/// <summary>
/// Pairs a consumer-sequence number with the corresponding stream-sequence number.
/// Both point to the same message. Mirrors Go's SequencePair struct.
/// </summary>
public record struct SequencePair(ulong Consumer, ulong Stream);
// Go: server/store.go:461
/// <summary>
/// Tracks a single pending (unacknowledged) message for an explicit-ack consumer.
/// Sequence is the original consumer delivery sequence; Timestamp is the Unix-nanosecond
/// wall clock at which the message was delivered.
/// Mirrors Go's Pending struct.
/// </summary>
public record struct Pending(ulong Sequence, long Timestamp);
// Go: server/store.go:382
/// <summary>
/// Complete durable state for a consumer, persisted by IConsumerStore.
/// Contains the high-water delivery and ack-floor marks, plus optional maps of
/// pending (unacknowledged) and redelivered messages.
/// Mirrors Go's ConsumerState struct.
/// </summary>
public sealed class ConsumerState
{
// Go: ConsumerState.Delivered — highest consumer-seq and stream-seq delivered
public SequencePair Delivered { get; set; }
// Go: ConsumerState.AckFloor — highest consumer-seq and stream-seq fully acknowledged
public SequencePair AckFloor { get; set; }
// Go: ConsumerState.Pending — pending acks keyed by stream sequence; only present
// when AckPolicy is Explicit.
public Dictionary<ulong, Pending>? Pending { get; set; }
// Go: ConsumerState.Redelivered — redelivery counts keyed by stream sequence;
// only present when a message has been delivered more than once.
public Dictionary<ulong, ulong>? Redelivered { get; set; }
}

View File

@@ -4,6 +4,10 @@ using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Models;
// Storage.StreamState is in this namespace. Use an alias for the API-layer type
// (now named ApiStreamState in the Models namespace) to keep method signatures clear.
using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState;
namespace NATS.Server.JetStream.Storage;
public sealed class FileStore : IStreamStore, IAsyncDisposable
@@ -163,9 +167,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return ValueTask.CompletedTask;
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
public ValueTask<ApiStreamState> GetStateAsync(CancellationToken ct)
{
return ValueTask.FromResult(new StreamState
return ValueTask.FromResult(new ApiStreamState
{
Messages = (ulong)_messages.Count,
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),

View File

@@ -0,0 +1,75 @@
namespace NATS.Server.JetStream.Storage;
// Go: server/filestore.go:85
/// <summary>
/// Selects the symmetric cipher used for block encryption.
/// ChaCha is the default (ChaCha20-Poly1305); AES uses AES-256-GCM.
/// Mirrors Go's StoreCipher type (filestore.go:85).
/// </summary>
public enum StoreCipher
{
// Go: ChaCha — ChaCha20-Poly1305 (default)
ChaCha,
// Go: AES — AES-256-GCM
Aes,
// Go: NoCipher — encryption disabled
None,
}
// Go: server/filestore.go:106
/// <summary>
/// Selects the compression algorithm applied to each message block.
/// Mirrors Go's StoreCompression type (filestore.go:106).
/// </summary>
public enum StoreCompression : byte
{
// Go: NoCompression — no compression applied
None = 0,
// Go: S2Compression — S2 (Snappy variant) block compression
S2 = 1,
}
// Go: server/filestore.go:55
/// <summary>
/// Configuration for the file-based block store engine.
/// Passed to the FileStore constructor and controls directory layout, block sizing,
/// cache eviction, background sync, encryption, and compression.
/// Mirrors Go's FileStoreConfig struct (filestore.go:55).
/// </summary>
public sealed class FileStoreConfig
{
// Go: FileStoreConfig.StoreDir — root directory for all stream block files
public string StoreDir { get; set; } = string.Empty;
// Go: FileStoreConfig.BlockSize — maximum bytes per message block file.
// 0 means use the engine default (currently 8 MiB in Go).
public ulong BlockSize { get; set; }
// Go: FileStoreConfig.CacheExpire — how long to keep a loaded block in memory
// after the last read before evicting. Default: 10 seconds.
public TimeSpan CacheExpire { get; set; } = TimeSpan.FromSeconds(10);
// Go: FileStoreConfig.SubjectStateExpire — how long to keep per-subject state cached
// on an idle message block. Zero means use CacheExpire.
public TimeSpan SubjectStateExpire { get; set; }
// Go: FileStoreConfig.SyncInterval — interval at which dirty blocks are fsynced.
// Default: 2 minutes.
public TimeSpan SyncInterval { get; set; } = TimeSpan.FromMinutes(2);
// Go: FileStoreConfig.SyncAlways — when true every write is immediately fsynced
public bool SyncAlways { get; set; }
// Go: FileStoreConfig.AsyncFlush — when true write operations are batched and
// flushed asynchronously for higher throughput
public bool AsyncFlush { get; set; }
// Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; None disables it
public StoreCipher Cipher { get; set; } = StoreCipher.None;
// Go: FileStoreConfig.Compression — compression algorithm applied to block data
public StoreCompression Compression { get; set; } = StoreCompression.None;
}

View File

@@ -0,0 +1,56 @@
using NATS.Server.JetStream.Models;
using StorageType = NATS.Server.JetStream.Models.StorageType;
namespace NATS.Server.JetStream.Storage;
// Go: server/store.go:357
/// <summary>
/// Persists and retrieves durable consumer state: delivery progress, ack floor,
/// pending messages, and redelivery counts. One store instance per consumer.
/// Mirrors Go's ConsumerStore interface.
/// </summary>
public interface IConsumerStore
{
// Go: ConsumerStore.SetStarting — initialise the starting stream sequence for a new consumer
void SetStarting(ulong sseq);
// Go: ConsumerStore.UpdateStarting — update the starting sequence after a reset
void UpdateStarting(ulong sseq);
// Go: ConsumerStore.Reset — reset state to a given stream sequence
void Reset(ulong sseq);
// Go: ConsumerStore.HasState — returns true if any persisted state exists
bool HasState();
// Go: ConsumerStore.UpdateDelivered — record a new delivery (dseq=consumer seq, sseq=stream seq,
// dc=delivery count, ts=Unix nanosecond timestamp)
void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts);
// Go: ConsumerStore.UpdateAcks — record an acknowledgement (dseq=consumer seq, sseq=stream seq)
void UpdateAcks(ulong dseq, ulong sseq);
// Go: ConsumerStore.Update — overwrite the full consumer state in one call
void Update(ConsumerState state);
// Go: ConsumerStore.State — return a snapshot of current consumer state
ConsumerState State();
// Go: ConsumerStore.BorrowState — return state without copying (caller must not retain beyond call)
ConsumerState BorrowState();
// Go: ConsumerStore.EncodedState — return the binary-encoded state for replication
byte[] EncodedState();
// Go: ConsumerStore.Type — the storage type backing this store (File or Memory)
StorageType Type();
// Go: ConsumerStore.Stop — flush and close the store without deleting data
void Stop();
// Go: ConsumerStore.Delete — stop the store and delete all persisted state
void Delete();
// Go: ConsumerStore.StreamDelete — called when the parent stream is deleted
void StreamDelete();
}

View File

@@ -1,9 +1,24 @@
using NATS.Server.JetStream.Models;
// Alias for the full Go-parity StreamState in this namespace.
using StorageStreamState = NATS.Server.JetStream.Storage.StreamState;
namespace NATS.Server.JetStream.Storage;
// Go: server/store.go:91
/// <summary>
/// Abstraction over a single stream's message store.
/// The async methods (AppendAsync, LoadAsync, …) are used by the current
/// high-level JetStream layer. The sync methods (StoreMsg, LoadMsg, State, …)
/// mirror Go's StreamStore interface exactly and will be the primary surface
/// once the block-engine FileStore implementation lands.
/// </summary>
public interface IStreamStore
{
// -------------------------------------------------------------------------
// Async helpers — used by the current JetStream layer
// -------------------------------------------------------------------------
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct);
@@ -12,5 +27,146 @@ public interface IStreamStore
ValueTask PurgeAsync(CancellationToken ct);
ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct);
ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct);
ValueTask<StreamState> GetStateAsync(CancellationToken ct);
// Returns Models.StreamState for API-layer JSON serialisation compatibility.
// Existing MemStore/FileStore implementations return this type.
ValueTask<ApiStreamState> GetStateAsync(CancellationToken ct);
// -------------------------------------------------------------------------
// Go-parity sync interface — mirrors server/store.go StreamStore
// Default implementations throw NotSupportedException so existing
// MemStore / FileStore implementations continue to compile while the
// block-engine port is in progress.
// -------------------------------------------------------------------------
// Go: StreamStore.StoreMsg — append a message; returns (seq, timestamp)
(ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl)
=> throw new NotSupportedException("Block-engine StoreMsg not yet implemented.");
// Go: StreamStore.StoreRawMsg — store a raw message at a specified sequence
void StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck)
=> throw new NotSupportedException("Block-engine StoreRawMsg not yet implemented.");
// Go: StreamStore.SkipMsg — reserve a sequence without storing a message
ulong SkipMsg(ulong seq)
=> throw new NotSupportedException("Block-engine SkipMsg not yet implemented.");
// Go: StreamStore.SkipMsgs — reserve a range of sequences
void SkipMsgs(ulong seq, ulong num)
=> throw new NotSupportedException("Block-engine SkipMsgs not yet implemented.");
// Go: StreamStore.FlushAllPending — flush any buffered writes to backing storage
void FlushAllPending()
=> throw new NotSupportedException("Block-engine FlushAllPending not yet implemented.");
// Go: StreamStore.LoadMsg — load message by exact sequence; sm is an optional reusable buffer
StoreMsg LoadMsg(ulong seq, StoreMsg? sm)
=> throw new NotSupportedException("Block-engine LoadMsg not yet implemented.");
// Go: StreamStore.LoadNextMsg — load next message at or after start matching filter;
// returns the message and the number of sequences skipped
(StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm)
=> throw new NotSupportedException("Block-engine LoadNextMsg not yet implemented.");
// Go: StreamStore.LoadLastMsg — load the most recent message on a given subject
StoreMsg LoadLastMsg(string subject, StoreMsg? sm)
=> throw new NotSupportedException("Block-engine LoadLastMsg not yet implemented.");
// Go: StreamStore.LoadPrevMsg — load message before start sequence
StoreMsg LoadPrevMsg(ulong start, StoreMsg? sm)
=> throw new NotSupportedException("Block-engine LoadPrevMsg not yet implemented.");
// Go: StreamStore.RemoveMsg — soft-delete a message by sequence; returns true if found
bool RemoveMsg(ulong seq)
=> throw new NotSupportedException("Block-engine RemoveMsg not yet implemented.");
// Go: StreamStore.EraseMsg — overwrite a message with random bytes before removing it
bool EraseMsg(ulong seq)
=> throw new NotSupportedException("Block-engine EraseMsg not yet implemented.");
// Go: StreamStore.Purge — remove all messages; returns count purged
ulong Purge()
=> throw new NotSupportedException("Block-engine Purge not yet implemented.");
// Go: StreamStore.PurgeEx — purge messages on subject up to seq keeping keep newest
ulong PurgeEx(string subject, ulong seq, ulong keep)
=> throw new NotSupportedException("Block-engine PurgeEx not yet implemented.");
// Go: StreamStore.Compact — remove all messages with seq < given sequence
ulong Compact(ulong seq)
=> throw new NotSupportedException("Block-engine Compact not yet implemented.");
// Go: StreamStore.Truncate — remove all messages with seq > given sequence
void Truncate(ulong seq)
=> throw new NotSupportedException("Block-engine Truncate not yet implemented.");
// Go: StreamStore.GetSeqFromTime — return first sequence at or after wall-clock time t
ulong GetSeqFromTime(DateTime t)
=> throw new NotSupportedException("Block-engine GetSeqFromTime not yet implemented.");
// Go: StreamStore.FilteredState — compact state for messages matching subject at or after seq
SimpleState FilteredState(ulong seq, string subject)
=> throw new NotSupportedException("Block-engine FilteredState not yet implemented.");
// Go: StreamStore.SubjectsState — per-subject SimpleState for all subjects matching filter
Dictionary<string, SimpleState> SubjectsState(string filterSubject)
=> throw new NotSupportedException("Block-engine SubjectsState not yet implemented.");
// Go: StreamStore.SubjectsTotals — per-subject message count for subjects matching filter
Dictionary<string, ulong> SubjectsTotals(string filterSubject)
=> throw new NotSupportedException("Block-engine SubjectsTotals not yet implemented.");
// Go: StreamStore.AllLastSeqs — last sequence for every subject in the stream
ulong[] AllLastSeqs()
=> throw new NotSupportedException("Block-engine AllLastSeqs not yet implemented.");
// Go: StreamStore.MultiLastSeqs — last sequences for subjects matching filters, up to maxSeq
ulong[] MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
=> throw new NotSupportedException("Block-engine MultiLastSeqs not yet implemented.");
// Go: StreamStore.SubjectForSeq — return the subject stored at the given sequence
string SubjectForSeq(ulong seq)
=> throw new NotSupportedException("Block-engine SubjectForSeq not yet implemented.");
// Go: StreamStore.NumPending — count messages pending from sseq on filter subject;
// lastPerSubject restricts to one-per-subject semantics
(ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject)
=> throw new NotSupportedException("Block-engine NumPending not yet implemented.");
// Go: StreamStore.State — return full stream state (Go-parity, with deleted sets)
StorageStreamState State()
=> throw new NotSupportedException("Block-engine State not yet implemented.");
// Go: StreamStore.FastState — populate a pre-allocated StreamState with the minimum
// fields needed for replication without allocating a new struct
void FastState(ref StorageStreamState state)
=> throw new NotSupportedException("Block-engine FastState not yet implemented.");
// Go: StreamStore.EncodedStreamState — binary-encode stream state for NRG replication
byte[] EncodedStreamState(ulong failed)
=> throw new NotSupportedException("Block-engine EncodedStreamState not yet implemented.");
// Go: StreamStore.Type — the storage type (File or Memory)
StorageType Type()
=> throw new NotSupportedException("Block-engine Type not yet implemented.");
// Go: StreamStore.UpdateConfig — apply a new StreamConfig without restarting the store
void UpdateConfig(StreamConfig cfg)
=> throw new NotSupportedException("Block-engine UpdateConfig not yet implemented.");
// Go: StreamStore.Delete — stop and delete all data; inline=true means synchronous deletion
void Delete(bool inline)
=> throw new NotSupportedException("Block-engine Delete not yet implemented.");
// Go: StreamStore.Stop — flush and stop without deleting data
void Stop()
=> throw new NotSupportedException("Block-engine Stop not yet implemented.");
// Go: StreamStore.ConsumerStore — create or open a consumer store for the named consumer
IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
=> throw new NotSupportedException("Block-engine ConsumerStore not yet implemented.");
// Go: StreamStore.ResetState — reset internal state caches (used after NRG catchup)
void ResetState()
=> throw new NotSupportedException("Block-engine ResetState not yet implemented.");
}

View File

@@ -132,11 +132,11 @@ public sealed class MemStore : IStreamStore
}
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
public ValueTask<ApiStreamState> GetStateAsync(CancellationToken ct)
{
lock (_gate)
{
return ValueTask.FromResult(new StreamState
return ValueTask.FromResult(new ApiStreamState
{
Messages = (ulong)_messages.Count,
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),

View File

@@ -0,0 +1,39 @@
namespace NATS.Server.JetStream.Storage;
// Go: server/store.go:71
/// <summary>
/// Reusable message container returned by store load operations.
/// The internal buffer is reused across calls to avoid allocations on the hot path.
/// Mirrors Go's StoreMsg struct, which pools a single backing byte slice (buf) that
/// both hdr and msg slice into.
/// </summary>
public sealed class StoreMsg
{
// Go: StoreMsg.subj
public string Subject { get; set; } = string.Empty;
// Go: StoreMsg.hdr — NATS message headers (optional)
public byte[]? Header { get; set; }
// Go: StoreMsg.msg — message body
public byte[]? Data { get; set; }
// Go: StoreMsg.seq — stream sequence number
public ulong Sequence { get; set; }
// Go: StoreMsg.ts — wall-clock timestamp in Unix nanoseconds
public long Timestamp { get; set; }
/// <summary>
/// Resets all fields to their zero values while retaining the backing buffer
/// for reuse by the next load call. Matches Go's StoreMsg.clear().
/// </summary>
public void Clear()
{
Subject = string.Empty;
Header = null;
Data = null;
Sequence = 0;
Timestamp = 0;
}
}

View File

@@ -0,0 +1,78 @@
namespace NATS.Server.JetStream.Storage;
// Go: server/store.go:162
/// <summary>
/// Full state snapshot for a stream, returned by IStreamStore.State() and
/// IStreamStore.FastState(). Matches Go's StreamState struct, including optional
/// per-subject message counts, deleted sequence lists, and consumer count.
/// </summary>
public record struct StreamState
{
// Go: StreamState.Msgs — total number of messages in the stream
public ulong Msgs { get; set; }
// Go: StreamState.Bytes — total bytes stored
public ulong Bytes { get; set; }
// Go: StreamState.FirstSeq — sequence number of the oldest message
public ulong FirstSeq { get; set; }
// Go: StreamState.FirstTime — wall-clock time of the oldest message
public DateTime FirstTime { get; set; }
// Go: StreamState.LastSeq — sequence number of the newest message
public ulong LastSeq { get; set; }
// Go: StreamState.LastTime — wall-clock time of the newest message
public DateTime LastTime { get; set; }
// Go: StreamState.NumSubjects — count of distinct subjects in the stream
public int NumSubjects { get; set; }
// Go: StreamState.Subjects — per-subject message counts (populated on demand)
public Dictionary<string, ulong>? Subjects { get; set; }
// Go: StreamState.NumDeleted — number of interior gaps (deleted sequences)
public int NumDeleted { get; set; }
// Go: StreamState.Deleted — explicit list of deleted sequences (populated on demand)
public ulong[]? Deleted { get; set; }
// Go: StreamState.Lost (LostStreamData) — sequences that were lost due to storage corruption
public LostStreamData? Lost { get; set; }
// Go: StreamState.Consumers — number of consumers attached to the stream
public int Consumers { get; set; }
}
// Go: server/store.go:189
/// <summary>
/// Describes messages lost due to storage-level corruption.
/// Mirrors Go's LostStreamData struct.
/// </summary>
public sealed class LostStreamData
{
// Go: LostStreamData.Msgs — sequences of lost messages
public ulong[]? Msgs { get; set; }
// Go: LostStreamData.Bytes — total bytes of lost data
public ulong Bytes { get; set; }
}
// Go: server/store.go:178
/// <summary>
/// Compact state for a single subject filter within a stream.
/// Used by IStreamStore.FilteredState() and SubjectsState().
/// Mirrors Go's SimpleState struct.
/// </summary>
public record struct SimpleState
{
// Go: SimpleState.Msgs — number of messages matching the filter
public ulong Msgs { get; set; }
// Go: SimpleState.First — first sequence number matching the filter
public ulong First { get; set; }
// Go: SimpleState.Last — last sequence number matching the filter
public ulong Last { get; set; }
}

View File

@@ -138,12 +138,12 @@ public sealed class StreamManager
return true;
}
public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
public ValueTask<Models.ApiStreamState> GetStateAsync(string name, CancellationToken ct)
{
if (_streams.TryGetValue(name, out var stream))
return stream.Store.GetStateAsync(ct);
return ValueTask.FromResult(new StreamState());
return ValueTask.FromResult(new Models.ApiStreamState());
}
public StreamHandle? FindBySubject(string subject)

View File

@@ -209,7 +209,7 @@ internal sealed class LeaderFailoverFixture : IAsyncDisposable
return string.Empty;
}
public ValueTask<StreamState> GetStreamStateAsync(string stream)
public ValueTask<ApiStreamState> GetStreamStateAsync(string stream)
=> _streamManager.GetStateAsync(stream, default);
public MetaGroupState? GetMetaState() => _streamManager.GetMetaState();

View File

@@ -206,7 +206,7 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
}
public Task<StreamState> GetStreamStateAsync(string streamName)
public Task<ApiStreamState> GetStreamStateAsync(string streamName)
{
return _streamManager.GetStateAsync(streamName, default).AsTask();
}

View File

@@ -25,8 +25,8 @@ public class StreamStoreContractTests
return ValueTask.FromResult(_last);
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
=> ValueTask.FromResult(new StreamState { Messages = _last });
public ValueTask<ApiStreamState> GetStateAsync(CancellationToken ct)
=> ValueTask.FromResult(new ApiStreamState { Messages = _last });
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
=> ValueTask.FromResult<StoredMessage?>(null);