natsdotnet/Documentation/JetStream/Overview.md
2026-03-13 11:34:19 -04:00


JetStream Overview

JetStream is the persistence layer of NATS. Clients publish to subjects that match a configured stream; the server stores those messages and delivers them to consumers on demand (pull) or proactively (push). This document describes the current .NET implementation: what is built, how the pieces connect, and where it falls short of the Go reference.


Architecture

Component diagram

NATS PUB message
        │
        ▼
JetStreamPublisher.TryCapture()
        │  duplicate check (PublishPreconditions)
        │  subject → stream lookup (StreamManager.FindBySubject)
        ▼
StreamManager.Capture()
        ├── StreamReplicaGroup.ProposeAsync()   ← in-process RAFT only
        ├── IStreamStore.AppendAsync()          ← MemStore or FileStore
        ├── EnforceLimits()                     ← MaxMsgs trim
        └── ReplicateIfConfigured()
                ├── MirrorCoordinator.OnOriginAppendAsync()   ← in-process only
                └── SourceCoordinator.OnOriginAppendAsync()   ← in-process only

$JS.API.* request
        │
        ▼
JetStreamApiRouter.Route()
        ├── $JS.API.STREAM.CREATE.*   → StreamApiHandlers.HandleCreate()  → StreamManager.CreateOrUpdate()
        ├── $JS.API.STREAM.INFO.*     → StreamApiHandlers.HandleInfo()    → StreamManager.GetInfo()
        ├── $JS.API.CONSUMER.CREATE.* → ConsumerApiHandlers.HandleCreate() → ConsumerManager.CreateOrUpdate()
        ├── $JS.API.CONSUMER.INFO.*   → ConsumerApiHandlers.HandleInfo()   → ConsumerManager.GetInfo()
        └── anything else             → JetStreamApiResponse.NotFound()

Consumer delivery
        ├── Pull: ConsumerManager.FetchAsync() → PullConsumerEngine.FetchAsync() → IStreamStore.LoadAsync()
        └── Push: ConsumerManager.OnPublished() → PushConsumerEngine.Enqueue() → ConsumerHandle.PushFrames (queue)

API dispatch

JetStreamApiRouter.Route is the single entry point for all $JS.API.* requests. It dispatches by prefix matching on the subject string:

// JetStreamApiRouter.cs
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
    if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);

    if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleInfo(subject, _streamManager);

    if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);

    if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);

    return JetStreamApiResponse.NotFound(subject);
}

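The resource name is taken from the tokens after the fixed prefix. For stream subjects this is a single token: $JS.API.STREAM.CREATE.ORDERS creates a stream named ORDERS. For consumer subjects the prefix is followed by the stream name and then the consumer name, as in $JS.API.CONSUMER.CREATE.<stream>.<name>.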
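
To make the token layout concrete, here is a minimal sketch of splitting the trailing tokens off a subject. `ApiSubject` and `TrailingTokens` are hypothetical names used only for illustration; the real handlers may parse the subject differently.

```csharp
using System;

public static class ApiSubject
{
    // Hypothetical helper (not part of the codebase): returns the
    // dot-separated tokens that follow a fixed API prefix.
    public static string[] TrailingTokens(string subject, string prefix)
    {
        if (!subject.StartsWith(prefix, StringComparison.Ordinal))
            return Array.Empty<string>();

        var rest = subject.Substring(prefix.Length);
        return rest.Length == 0
            ? Array.Empty<string>()
            : rest.Split('.');
    }
}
```

Under these assumptions, `ApiSubject.TrailingTokens("$JS.API.CONSUMER.CREATE.ORDERS.worker", "$JS.API.CONSUMER.CREATE.")` yields the two tokens `ORDERS` and `worker`, while a stream subject yields a single token.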


API Surface

The following $JS.API.* subjects are handled. Every other subject returns a not-found error response.

| Subject prefix | Handler | Description |
| --- | --- | --- |
| `$JS.API.STREAM.CREATE.<name>` | `StreamApiHandlers.HandleCreate` | Create or update a stream |
| `$JS.API.STREAM.INFO.<name>` | `StreamApiHandlers.HandleInfo` | Get stream info and state |
| `$JS.API.CONSUMER.CREATE.<stream>.<name>` | `ConsumerApiHandlers.HandleCreate` | Create or update a durable consumer |
| `$JS.API.CONSUMER.INFO.<stream>.<name>` | `ConsumerApiHandlers.HandleInfo` | Get consumer info |

Subjects such as $JS.API.STREAM.LIST, $JS.API.STREAM.DELETE, $JS.API.CONSUMER.LIST, $JS.API.CONSUMER.DELETE, and $JS.API.CONSUMER.PAUSE are not handled and return not-found.


Streams

StreamConfig fields

StreamConfig (src/NATS.Server/JetStream/Models/StreamConfig.cs) defines what the server stores for a stream:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `Name` | `string` | `""` | Stream name. Required; rejected if empty or whitespace. |
| `Subjects` | `List<string>` | `[]` | Subject filter patterns. Messages published to matching subjects are captured. |
| `MaxMsgs` | `int` | `0` | Maximum number of messages to retain. 0 means unlimited. Enforced by trimming oldest messages after each append. |
| `Replicas` | `int` | `1` | Number of in-process RAFT nodes to create for this stream. Has no network effect. |
| `Mirror` | `string?` | `null` | Name of another stream in the same StreamManager to mirror from. In-process only. |
| `Source` | `string?` | `null` | Name of another stream in the same StreamManager to source from. In-process only. |

The Go reference supports many additional fields: RetentionPolicy, Storage, MaxBytes, MaxAge, MaxMsgSize, Discard, DuplicateWindow, Placement, SubjectTransform, and more. None of these are present in this implementation.

Subject matching and capture

StreamManager.FindBySubject scans all registered streams and uses SubjectMatch.MatchLiteral to find the first stream whose Subjects list matches the incoming publish subject. StreamManager.Capture then appends the message to that stream's store:

// StreamManager.cs
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
{
    var stream = FindBySubject(subject);
    if (stream == null)
        return null;

    if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
        _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();

    var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
    EnforceLimits(stream);
    var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
    if (stored != null)
        ReplicateIfConfigured(stream.Config.Name, stored);

    return new PubAck { Stream = stream.Config.Name, Seq = seq };
}

EnforceLimits trims the store to MaxMsgs after each append, calling TrimToMaxMessages on MemStore or FileStore. No other limit types (MaxBytes, MaxAge) are enforced.
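
The first-match lookup described in this section can be sketched as follows. This is an illustration only: it assumes the standard NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens), and the document does not show whether SubjectMatch.MatchLiteral applies them. `SubjectSketch`, `Matches`, and `FindFirst` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public static class SubjectSketch
{
    // Token-wise match under the assumed wildcard rules:
    // '*' matches exactly one token, '>' matches one or more trailing tokens.
    public static bool Matches(string pattern, string subject)
    {
        var p = pattern.Split('.');
        var s = subject.Split('.');

        for (var i = 0; i < p.Length; i++)
        {
            if (p[i] == ">")
                return i == p.Length - 1 && s.Length > i;
            if (i >= s.Length)
                return false;
            if (p[i] != "*" && !string.Equals(p[i], s[i], StringComparison.Ordinal))
                return false;
        }
        return p.Length == s.Length;
    }

    // First-match scan: returns the name of the first stream with a matching
    // pattern, or null when no stream captures the subject.
    public static string? FindFirst(
        IEnumerable<(string Name, string[] Subjects)> streams, string subject)
    {
        foreach (var (name, subjects) in streams)
            foreach (var pattern in subjects)
                if (Matches(pattern, subject))
                    return name;
        return null;
    }
}
```

Because the scan stops at the first match, registration order decides which stream captures a subject when two streams have overlapping patterns.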


Consumers

ConsumerConfig fields

ConsumerConfig (src/NATS.Server/JetStream/Models/ConsumerConfig.cs) defines consumer behavior:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `DurableName` | `string` | `""` | Consumer name. Required; rejected if empty or whitespace. |
| `FilterSubject` | `string?` | `null` | Subject filter. Stored but not applied during fetch: all messages in the stream are delivered regardless. |
| `AckPolicy` | `AckPolicy` | `None` | `None` (no ack tracking) or `Explicit` (pending ack tracking with redelivery). |
| `AckWaitMs` | `int` | `30000` | Milliseconds before an unacknowledged message is considered expired and eligible for redelivery. |
| `MaxDeliver` | `int` | `1` | Stored but not enforced: redelivery count is not capped. |
| `Push` | `bool` | `false` | If true, the consumer receives messages via PushConsumerEngine on publish. |
| `HeartbeatMs` | `int` | `0` | If positive, a heartbeat PushFrame is enqueued after each data frame. Not transmitted over the wire. |

The Go reference supports additional fields: DeliverSubject, DeliverGroup, DeliverPolicy, OptStartSeq, OptStartTime, ReplayPolicy, FlowControl, IdleHeartbeat, HeadersOnly, MaxWaiting, MaxAckPending, BackOff, priority groups, and pause. None are present here.

Pull delivery

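PullConsumerEngine.FetchAsync reads up to batch messages starting from consumer.NextSequence. With AckPolicy.Explicit, it first checks AckProcessor.NextExpired(): if a message has expired, the call returns that single message as a redelivery without advancing the cursor; if nothing has expired but acks are still pending, it returns an empty batch: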

// PullConsumerEngine.cs
public async ValueTask<PullFetchBatch> FetchAsync(
    StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
{
    var messages = new List<StoredMessage>(batch);

    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
    {
        var expired = consumer.AckProcessor.NextExpired();
        if (expired is { } expiredSequence)
        {
            var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
            if (redelivery != null)
                messages.Add(new StoredMessage { /* ... Redelivered = true */ });
            return new PullFetchBatch(messages);
        }

        if (consumer.AckProcessor.HasPending)
            return new PullFetchBatch(messages);
    }

    var sequence = consumer.NextSequence;
    for (var i = 0; i < batch; i++)
    {
        var message = await stream.Store.LoadAsync(sequence, ct);
        if (message == null) break;
        messages.Add(message);
        if (consumer.Config.AckPolicy == AckPolicy.Explicit)
            consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
        sequence++;
    }

    consumer.NextSequence = sequence;
    return new PullFetchBatch(messages);
}

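With AckPolicy.Explicit, pending acks gate the fetch rather than block it: if any messages are registered but not yet acknowledged, the call returns immediately with an empty batch, and no new messages are returned until the pending set is empty. Once a deadline expires, the next fetch returns exactly one expired message as a redelivery. Because AckProcessor has no way to remove an entry (see Ack processing), the pending set never empties in the code shown, so an explicit-ack consumer stops advancing after its first non-empty batch.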

Push delivery

PushConsumerEngine.Enqueue places messages onto ConsumerHandle.PushFrames, a plain Queue<PushFrame>. These frames are not transmitted to any NATS subject. ConsumerManager.ReadPushFrame allows callers to dequeue frames in-process:

// PushConsumerEngine.cs
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
    consumer.PushFrames.Enqueue(new PushFrame { IsData = true, Message = message });

    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
        consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);

    if (consumer.Config.HeartbeatMs > 0)
        consumer.PushFrames.Enqueue(new PushFrame { IsHeartbeat = true });
}

Push delivery is not wired to the NATS protocol layer. A connected NATS client subscribing to a DeliverSubject will not receive messages from a push consumer. The queue is only accessible through ConsumerManager.ReadPushFrame.
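
The frame ordering that an in-process reader would observe can be sketched as below. `FrameSketch` and `PushQueueSketch` are simplified, hypothetical stand-ins for PushFrame and the engine; the signature of ConsumerManager.ReadPushFrame is not shown in this document, so the sketch drains a plain queue directly.

```csharp
using System.Collections.Generic;

// Simplified stand-in for PushFrame (hypothetical shape).
public sealed record FrameSketch(bool IsData, bool IsHeartbeat, ulong Sequence);

public static class PushQueueSketch
{
    // Mirrors the ordering in the excerpt: one data frame, then an optional
    // heartbeat frame when heartbeatMs is positive.
    public static void Enqueue(Queue<FrameSketch> queue, ulong sequence, int heartbeatMs)
    {
        queue.Enqueue(new FrameSketch(true, false, sequence));
        if (heartbeatMs > 0)
            queue.Enqueue(new FrameSketch(false, true, 0));
    }

    // Drains the queue in-process and keeps only the data sequences.
    public static List<ulong> DrainData(Queue<FrameSketch> queue)
    {
        var sequences = new List<ulong>();
        while (queue.TryDequeue(out var frame))
            if (frame.IsData)
                sequences.Add(frame.Sequence);
        return sequences;
    }
}
```

With a positive heartbeat interval the queue holds two frames per published message, so a reader has to skip heartbeat frames to recover the data sequence.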

Ack processing

AckProcessor is a per-consumer dictionary of sequence numbers to deadline timestamps. It is used by PullConsumerEngine (to register fetched messages and to check for expired ones) and by PushConsumerEngine (to register newly enqueued messages):

// AckProcessor.cs
public sealed class AckProcessor
{
    private readonly Dictionary<ulong, DateTime> _pending = new();

    public void Register(ulong sequence, int ackWaitMs)
    {
        _pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
    }

    public ulong? NextExpired()
    {
        foreach (var (seq, deadline) in _pending)
        {
            if (DateTime.UtcNow >= deadline)
                return seq;
        }
        return null;
    }

    public bool HasPending => _pending.Count > 0;
}

Expiry detection is lazy — NextExpired() is only called from PullConsumerEngine.FetchAsync. There is no background timer or active expiry sweep. Acknowledged messages are never removed from _pending because there is no Ack(ulong sequence) method on AckProcessor. This means HasPending is always true once any message has been registered, and pending acks accumulate without bound.
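
A minimal sketch of what an acknowledgement path could look like is shown below. It is not the project's code: `AckTrackerSketch`, its `Ack` method, and the injectable clock are assumptions added for illustration and testability.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public sealed class AckTrackerSketch
{
    private readonly Dictionary<ulong, DateTime> _pending = new();
    private readonly Func<DateTime> _now;

    // The clock is injectable (an assumption of this sketch) so that expiry
    // can be exercised without sleeping.
    public AckTrackerSketch(Func<DateTime>? now = null)
        => _now = now ?? (() => DateTime.UtcNow);

    public void Register(ulong sequence, int ackWaitMs)
        => _pending[sequence] = _now().AddMilliseconds(Math.Max(ackWaitMs, 1));

    // Hypothetical addition: removing the entry lets HasPending become false again.
    public bool Ack(ulong sequence) => _pending.Remove(sequence);

    public ulong? NextExpired()
    {
        var now = _now();
        foreach (var (seq, deadline) in _pending)
        {
            if (now >= deadline)
                return seq;
        }
        return null;
    }

    public bool HasPending => _pending.Count > 0;
}
```

With a removal path like this, the pending set can drain and a pull consumer's cursor can move forward again. Redelivered messages would still need their deadline re-registered, which this sketch leaves out.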


Storage

IStreamStore interface

// IStreamStore.cs
public interface IStreamStore
{
    ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
    ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
    ValueTask PurgeAsync(CancellationToken ct);
    ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}

AppendAsync returns the assigned sequence number. LoadAsync returns null if the sequence does not exist (trimmed or never written). The interface does not expose delete-by-sequence, range scans, or subject filtering. TrimToMaxMessages is implemented on the concrete types but is not part of the interface.

MemStore

MemStore holds all messages in a Dictionary<ulong, StoredMessage> under a single object lock. Every operation acquires that lock synchronously:

// MemStore.cs
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    lock (_gate)
    {
        _last++;
        _messages[_last] = new StoredMessage
        {
            Sequence = _last,
            Subject = subject,
            Payload = payload,
        };
        return ValueTask.FromResult(_last);
    }
}

TrimToMaxMessages removes entries one at a time starting from the minimum key, calling _messages.Keys.Min() on each iteration, which costs O(n) per removal. MemStore is the default store allocated by StreamManager.CreateOrUpdate. Messages survive only for the lifetime of the process.
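
The trimming cost can be illustrated with a small stand-alone sketch. `TrimSketch` is a hypothetical simplification of MemStore with string payloads and no locking. `TrimWithMin` follows the approach described above, and `TrimWithCursor` shows one possible alternative that tracks the lowest live sequence instead of rescanning the keys.

```csharp
using System.Collections.Generic;
using System.Linq;

public sealed class TrimSketch
{
    private readonly Dictionary<ulong, string> _messages = new();
    private ulong _first = 1; // lowest sequence that may still be present
    private ulong _last;

    public int Count => _messages.Count;
    public bool Contains(ulong sequence) => _messages.ContainsKey(sequence);

    public ulong Append(string payload)
    {
        _messages[++_last] = payload;
        return _last;
    }

    // Approach described above: rescan every key for the minimum on each removal.
    public void TrimWithMin(int maxMsgs)
    {
        while (maxMsgs > 0 && _messages.Count > maxMsgs)
            _messages.Remove(_messages.Keys.Min());
    }

    // Alternative: advance a cursor from the oldest sequence. Removing a
    // missing key is a no-op, so gaps are skipped without a scan.
    public void TrimWithCursor(int maxMsgs)
    {
        while (maxMsgs > 0 && _messages.Count > maxMsgs)
        {
            _messages.Remove(_first);
            _first++;
        }
    }
}
```

Both methods keep the newest `maxMsgs` entries. They differ only in how the oldest key is located.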

FileStore

FileStore now persists messages into block files via MsgBlock, keeps a live in-memory message cache for load paths, and maintains a compact metadata index (Dictionary<ulong, StoredMessageIndex>) plus a per-subject last-sequence map for hot-path lookups such as LoadLastBySubjectAsync. Headers and payloads are stored separately and remain separate across snapshot, restore, block rewrite, and crash recovery. On startup, any legacy messages.jsonl file is migrated into block storage before recovery continues.

// FileStore.cs
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, StoredMessageIndex> _messageIndexes = new();
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);

public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
    if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
        && _messages.TryGetValue(sequence, out var match))
    {
        return ValueTask.FromResult<StoredMessage?>(match);
    }

    return ValueTask.FromResult<StoredMessage?>(null);
}

The current implementation is still materially simpler than Go filestore.go:

  • No synchronization: FileStore still exposes unsynchronized mutation and read paths. It is safe only under the current test and single-process usage assumptions.
  • Payloads still stay resident: the compact index removes duplicate payload ownership for metadata-heavy operations, but _messages still retains live payload bytes in memory for direct load paths.
  • No Go-equivalent block index stack: there is no per-block subject tree, mmap-backed read path, or Go-style cache/compaction parity. Deletes and trims rely on tombstones plus later block maintenance rather than Go's full production filestore behavior.

In-Process RAFT

The RAFT implementation has no network transport. All RaftNode instances live in the same process, and replication is a direct in-memory method call.

RaftNode.ProposeAsync

ProposeAsync requires the caller to be the leader (Role == RaftRole.Leader). It appends the command to the local RaftLog, calls RaftReplicator.Replicate to fan out synchronously to all peer nodes held in _cluster, and commits if a quorum of acknowledgements is reached:

// RaftNode.cs
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
    if (Role != RaftRole.Leader)
        throw new InvalidOperationException("Only leader can propose entries.");

    var entry = Log.Append(TermState.CurrentTerm, command);
    var followers = _cluster.Where(n => n.Id != Id).ToList();
    var acknowledgements = _replicator.Replicate(entry, followers);

    var quorum = (_cluster.Count / 2) + 1;
    if (acknowledgements + 1 >= quorum)
    {
        AppliedIndex = entry.Index;
        foreach (var node in _cluster)
            node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
    }

    await Task.CompletedTask;
    return entry.Index;
}

Task.CompletedTask is awaited unconditionally, so the method is synchronous in practice. It also returns entry.Index whether or not quorum was reached, so a caller cannot tell from the return value alone whether the entry was committed. The log is not persisted; snapshots are stored via RaftSnapshotStore but that type's persistence behavior is not visible from RaftNode alone. Leader election uses StartElection / GrantVote / ReceiveVote, all of which are direct method calls within the same process.
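
The quorum arithmetic in ProposeAsync can be checked in isolation. `QuorumSketch` is a hypothetical helper that reuses the same integer expressions as the excerpt.

```csharp
public static class QuorumSketch
{
    // Majority size for a cluster, using the same integer division as the excerpt.
    public static int Quorum(int clusterSize) => (clusterSize / 2) + 1;

    // The leader counts itself, so it needs Quorum - 1 follower acknowledgements.
    public static bool IsCommitted(int clusterSize, int followerAcks)
        => followerAcks + 1 >= Quorum(clusterSize);
}
```

For cluster sizes 1, 3, 4, and 5 the quorum is 1, 2, 3, and 3 respectively. A single-node group therefore commits with zero follower acknowledgements, and a three-node group commits once one follower has acknowledged.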

StreamReplicaGroup

StreamReplicaGroup creates Math.Max(replicas, 1) RaftNode instances when a stream is created and immediately elects a leader via StartElection:

// StreamReplicaGroup.cs
public StreamReplicaGroup(string streamName, int replicas)
{
    var nodeCount = Math.Max(replicas, 1);
    _nodes = Enumerable.Range(1, nodeCount)
        .Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
        .ToList();

    foreach (var node in _nodes)
        node.ConfigureCluster(_nodes);

    Leader = ElectLeader(_nodes[0]);
}

ProposeAsync on the group delegates to the leader node. StepDownAsync forces a leader change by calling RequestStepDown() on the current leader and electing the next node in the list. All of this is in-process; there is no coordination across server instances.

JetStreamMetaGroup

JetStreamMetaGroup is a thin registry that tracks stream names and the declared cluster size. It does not use RaftNode internally. ProposeCreateStreamAsync records a stream name in a ConcurrentDictionary and returns immediately:

// JetStreamMetaGroup.cs
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
    _streams[config.Name] = 0;
    return Task.CompletedTask;
}

Its purpose is to provide GetState() — a sorted list of known stream names and the configured cluster size — for monitoring or coordination callers. It does not replicate metadata across nodes.


Mirror and Source

MirrorCoordinator and SourceCoordinator are structurally identical: each holds a reference to a target IStreamStore and appends messages to it when notified of an origin append. Both operate entirely in-process within a single StreamManager:

// MirrorCoordinator.cs
public sealed class MirrorCoordinator
{
    private readonly IStreamStore _targetStore;

    public MirrorCoordinator(IStreamStore targetStore) { _targetStore = targetStore; }

    public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
        => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
}

StreamManager.RebuildReplicationCoordinators rebuilds the coordinator lists whenever a stream is created or updated. A stream configured with Mirror = "ORDERS" receives a copy of every message appended to ORDERS, but only if ORDERS exists in the same StreamManager instance. There is no subscription to an external NATS subject, no replay of historical messages on coordinator setup, and no cross-server replication.
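
Two consequences of this design can be sketched in isolation: a mirror attached late misses earlier messages, and the target store assigns its own sequence numbers. `ListStore` and `MirrorSketch` are hypothetical simplifications, not the project's types.

```csharp
using System.Collections.Generic;

// Minimal append-only store; sequences are 1-based and local to each store.
public sealed class ListStore
{
    private readonly List<(string Subject, string Payload)> _items = new();

    public int Count => _items.Count;

    public int Append(string subject, string payload)
    {
        _items.Add((subject, payload));
        return _items.Count;
    }
}

public sealed class MirrorSketch
{
    private readonly ListStore _origin = new();
    private readonly List<ListStore> _mirrors = new();

    public int OriginCount => _origin.Count;

    // Attaching a mirror does not replay what the origin already holds.
    public void AttachMirror(ListStore target) => _mirrors.Add(target);

    // Appends to the origin, then forwards the same message to every mirror.
    public int Publish(string subject, string payload)
    {
        var sequence = _origin.Append(subject, payload);
        foreach (var mirror in _mirrors)
            mirror.Append(subject, payload);
        return sequence;
    }
}
```

If two messages are published before the mirror is attached and one after, the origin holds three messages while the mirror holds one, stored at the mirror's own sequence 1 rather than the origin's sequence 3.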


Configuration

JetStreamOptions (src/NATS.Server/Configuration/JetStreamOptions.cs) holds the configuration model for JetStream:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `StoreDir` | `string` | `""` | Directory path for FileStore. Not currently used to switch the default store; StreamManager.CreateOrUpdate always allocates a MemStore. |
| `MaxMemoryStore` | `long` | `0` | Maximum bytes for in-memory storage. Not enforced. |
| `MaxFileStore` | `long` | `0` | Maximum bytes for file storage. Not enforced. |

None of the three fields currently affect runtime behavior. StoreDir would need to be wired into StreamManager to cause FileStore allocation. MaxMemoryStore and MaxFileStore have no enforcement path.


What Is Not Implemented

The following features are present in the Go reference (golang/nats-server/server/) but absent from this implementation:

  • Stream delete and update: $JS.API.STREAM.DELETE.* and $JS.API.STREAM.UPDATE.* are not handled. An existing stream can be updated only by re-sending $JS.API.STREAM.CREATE.*, which routes to CreateOrUpdate; there is no delete path.
  • Stream list: $JS.API.STREAM.LIST and $JS.API.STREAM.NAMES return not-found.
  • Consumer delete, list, and pause: $JS.API.CONSUMER.DELETE.*, $JS.API.CONSUMER.LIST.*, and $JS.API.CONSUMER.PAUSE.* are not handled.
  • Retention policies: Only MaxMsgs trimming is enforced. Limits, Interest, and WorkQueue retention semantics are not implemented. MaxBytes and MaxAge are not enforced.
  • Ephemeral consumers: ConsumerManager.CreateOrUpdate requires a non-empty DurableName. There is no support for unnamed ephemeral consumers.
  • Push delivery over the NATS wire: Push consumers enqueue PushFrame objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket.
  • Consumer filter subject enforcement: FilterSubject is stored on ConsumerConfig but is never applied in PullConsumerEngine.FetchAsync. All messages in the stream are returned regardless of filter.
  • FileStore production safety: FileStore now uses block files and compact metadata indexes, but it still lacks synchronization and Go-level block indexing, so it remains unsuitable for production use.
  • RAFT persistence and networking: RaftNode log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus.
  • Cross-server replication: Mirror and source coordinators work only within one StreamManager in one process. Messages published on a remote server are not replicated.
  • Duplicate message window: PublishPreconditions tracks message IDs for deduplication but there is no configurable DuplicateWindow TTL to expire old IDs.
  • Subject transforms, placement, and mirroring policies: None of the stream configuration fields beyond Name, Subjects, MaxMsgs, Replicas, Mirror, and Source are processed.