# JetStream Overview
JetStream is the persistence layer of NATS. Clients publish to subjects that match a configured stream; the server stores those messages and delivers them to consumers on demand (pull) or proactively (push). This document describes the current .NET implementation: what is built, how the pieces connect, and where it falls short of the Go reference.
---
## Architecture
### Component diagram
```
NATS PUB message
  └─ JetStreamPublisher.TryCapture()
       │  duplicate check (PublishPreconditions)
       │  subject → stream lookup (StreamManager.FindBySubject)
       └─ StreamManager.Capture()
            ├── StreamReplicaGroup.ProposeAsync()   ← in-process RAFT only
            ├── IStreamStore.AppendAsync()          ← MemStore or FileStore
            ├── EnforceLimits()                     ← MaxMsgs trim
            └── ReplicateIfConfigured()
                 ├── MirrorCoordinator.OnOriginAppendAsync()  ← in-process only
                 └── SourceCoordinator.OnOriginAppendAsync()  ← in-process only

$JS.API.* request
  └─ JetStreamApiRouter.Route()
       ├── $JS.API.STREAM.CREATE.*   → StreamApiHandlers.HandleCreate()   → StreamManager.CreateOrUpdate()
       ├── $JS.API.STREAM.INFO.*     → StreamApiHandlers.HandleInfo()     → StreamManager.GetInfo()
       ├── $JS.API.CONSUMER.CREATE.* → ConsumerApiHandlers.HandleCreate() → ConsumerManager.CreateOrUpdate()
       ├── $JS.API.CONSUMER.INFO.*   → ConsumerApiHandlers.HandleInfo()   → ConsumerManager.GetInfo()
       └── anything else             → JetStreamApiResponse.NotFound()

Consumer delivery
  ├── Pull: ConsumerManager.FetchAsync()  → PullConsumerEngine.FetchAsync() → IStreamStore.LoadAsync()
  └── Push: ConsumerManager.OnPublished() → PushConsumerEngine.Enqueue()    → ConsumerHandle.PushFrames (queue)
```
### API dispatch
`JetStreamApiRouter.Route` is the single entry point for all `$JS.API.*` requests. It dispatches by prefix matching on the subject string:
```csharp
// JetStreamApiRouter.cs
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
    if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
    if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleInfo(subject, _streamManager);
    if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
    if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
    return JetStreamApiResponse.NotFound(subject);
}
```
The stream or consumer name is the trailing token after the fixed prefix — e.g., `$JS.API.STREAM.CREATE.ORDERS` creates a stream named `ORDERS`.
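For illustration, the trailing-token rule can be sketched as follows (the `ExtractName` helper is hypothetical, not a method in the codebase):

```csharp
using System;

// Hypothetical helper: the stream name is everything after the fixed prefix.
static string ExtractName(string subject, string prefix) =>
    subject.StartsWith(prefix, StringComparison.Ordinal)
        ? subject.Substring(prefix.Length)
        : throw new ArgumentException($"'{subject}' does not match '{prefix}'.");

Console.WriteLine(ExtractName("$JS.API.STREAM.CREATE.ORDERS", "$JS.API.STREAM.CREATE."));
// ORDERS
```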
---
## API Surface
The following `$JS.API.*` subjects are handled. Every other subject returns a not-found error response.
| Subject prefix | Handler | Description |
|---|---|---|
| `$JS.API.STREAM.CREATE.<name>` | `StreamApiHandlers.HandleCreate` | Create or update a stream |
| `$JS.API.STREAM.INFO.<name>` | `StreamApiHandlers.HandleInfo` | Get stream info and state |
| `$JS.API.CONSUMER.CREATE.<stream>.<name>` | `ConsumerApiHandlers.HandleCreate` | Create or update a durable consumer |
| `$JS.API.CONSUMER.INFO.<stream>.<name>` | `ConsumerApiHandlers.HandleInfo` | Get consumer info |
Subjects such as `$JS.API.STREAM.LIST`, `$JS.API.STREAM.DELETE`, `$JS.API.CONSUMER.LIST`, `$JS.API.CONSUMER.DELETE`, and `$JS.API.CONSUMER.PAUSE` are not handled and return not-found.
---
## Streams
### StreamConfig fields
`StreamConfig` (`src/NATS.Server/JetStream/Models/StreamConfig.cs`) defines what the server stores for a stream:
| Field | Type | Default | Description |
|---|---|---|---|
| `Name` | `string` | `""` | Stream name. Required; rejected if empty or whitespace. |
| `Subjects` | `List<string>` | `[]` | Subject filter patterns. Messages published to matching subjects are captured. |
| `MaxMsgs` | `int` | `0` | Maximum number of messages to retain. `0` means unlimited. Enforced by trimming oldest messages after each append. |
| `Replicas` | `int` | `1` | Number of in-process RAFT nodes to create for this stream. Has no network effect. |
| `Mirror` | `string?` | `null` | Name of another stream in the same `StreamManager` to mirror from. In-process only. |
| `Source` | `string?` | `null` | Name of another stream in the same `StreamManager` to source from. In-process only. |
The Go reference supports many additional fields: `RetentionPolicy`, `Storage`, `MaxBytes`, `MaxAge`, `MaxMsgSize`, `Discard`, `DuplicateWindow`, `Placement`, `SubjectTransform`, and more. None of these are present in this implementation.
### Subject matching and capture
`StreamManager.FindBySubject` scans all registered streams and uses `SubjectMatch.MatchLiteral` to find the first stream whose `Subjects` list matches the incoming publish subject. `StreamManager.Capture` then appends the message to that stream's store:
```csharp
// StreamManager.cs
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
{
    var stream = FindBySubject(subject);
    if (stream == null)
        return null;
    if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
        _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
    var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
    EnforceLimits(stream);
    var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
    if (stored != null)
        ReplicateIfConfigured(stream.Config.Name, stored);
    return new PubAck { Stream = stream.Config.Name, Seq = seq };
}
```
`EnforceLimits` trims the store to `MaxMsgs` after each append, calling `TrimToMaxMessages` on `MemStore` or `FileStore`. No other limit types (`MaxBytes`, `MaxAge`) are enforced.
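The trim behavior can be illustrated with a minimal in-memory sketch (a simplified stand-in, not the real `EnforceLimits`/`TrimToMaxMessages` code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch of MaxMsgs enforcement: after each append, drop the oldest
// sequences until the stream is back within the limit. 0 means unlimited.
var messages = new SortedDictionary<ulong, string>();
ulong last = 0;
const int maxMsgs = 3;

void Append(string payload)
{
    messages[++last] = payload;
    if (maxMsgs > 0)
        while (messages.Count > maxMsgs)
            messages.Remove(messages.Keys.First()); // oldest sequence
}

for (var i = 1; i <= 5; i++) Append($"msg-{i}");
Console.WriteLine(string.Join(",", messages.Keys)); // 3,4,5
```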
---
## Consumers
### ConsumerConfig fields
`ConsumerConfig` (`src/NATS.Server/JetStream/Models/ConsumerConfig.cs`) defines consumer behavior:
| Field | Type | Default | Description |
|---|---|---|---|
| `DurableName` | `string` | `""` | Consumer name. Required; rejected if empty or whitespace. |
| `FilterSubject` | `string?` | `null` | Subject filter. Stored but not applied during fetch — all messages in the stream are delivered regardless. |
| `AckPolicy` | `AckPolicy` | `None` | `None` (no ack tracking) or `Explicit` (pending ack tracking with redelivery). |
| `AckWaitMs` | `int` | `30000` | Milliseconds before an unacknowledged message is considered expired and eligible for redelivery. |
| `MaxDeliver` | `int` | `1` | Stored but not enforced — redelivery count is not capped. |
| `Push` | `bool` | `false` | If `true`, the consumer receives messages via `PushConsumerEngine` on publish. |
| `HeartbeatMs` | `int` | `0` | If positive, a heartbeat `PushFrame` is enqueued after each data frame. Not transmitted over the wire. |
The Go reference supports additional fields: `DeliverSubject`, `DeliverGroup`, `DeliverPolicy`, `OptStartSeq`, `OptStartTime`, `ReplayPolicy`, `FlowControl`, `IdleHeartbeat`, `HeadersOnly`, `MaxWaiting`, `MaxAckPending`, `BackOff`, priority groups, and pause. None are present here.
### Pull delivery
`PullConsumerEngine.FetchAsync` reads up to `batch` messages starting from `consumer.NextSequence`. With `AckPolicy.Explicit`, it first checks `AckProcessor.NextExpired()` and redelivers one expired message before advancing the cursor:
```csharp
// PullConsumerEngine.cs
public async ValueTask<PullFetchBatch> FetchAsync(
    StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
{
    var messages = new List<StoredMessage>(batch);
    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
    {
        var expired = consumer.AckProcessor.NextExpired();
        if (expired is { } expiredSequence)
        {
            var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
            if (redelivery != null)
                messages.Add(new StoredMessage { /* ... Redelivered = true */ });
            return new PullFetchBatch(messages);
        }
        if (consumer.AckProcessor.HasPending)
            return new PullFetchBatch(messages);
    }
    var sequence = consumer.NextSequence;
    for (var i = 0; i < batch; i++)
    {
        var message = await stream.Store.LoadAsync(sequence, ct);
        if (message == null) break;
        messages.Add(message);
        if (consumer.Config.AckPolicy == AckPolicy.Explicit)
            consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
        sequence++;
    }
    consumer.NextSequence = sequence;
    return new PullFetchBatch(messages);
}
```
The fetch gates on pending acks: if any messages have been registered but not yet acknowledged, the call returns an empty batch until an ack arrives or the ack deadline expires. Only one expired message is redelivered per fetch call.
### Push delivery
`PushConsumerEngine.Enqueue` places messages onto `ConsumerHandle.PushFrames`, a plain `Queue<PushFrame>`. These frames are not transmitted to any NATS subject. `ConsumerManager.ReadPushFrame` allows callers to dequeue frames in-process:
```csharp
// PushConsumerEngine.cs
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
    consumer.PushFrames.Enqueue(new PushFrame { IsData = true, Message = message });
    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
        consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
    if (consumer.Config.HeartbeatMs > 0)
        consumer.PushFrames.Enqueue(new PushFrame { IsHeartbeat = true });
}
```
Push delivery is not wired to the NATS protocol layer. A connected NATS client subscribing to a `DeliverSubject` will not receive messages from a push consumer. The queue is only accessible through `ConsumerManager.ReadPushFrame`.
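The queue semantics can be sketched in isolation (the tuple below is a simplified stand-in for the real `PushFrame`, which carries a `StoredMessage`):

```csharp
using System;
using System.Collections.Generic;

// Sketch of the PushFrames queue: each data frame is followed by a
// heartbeat frame when HeartbeatMs > 0, mirroring Enqueue above.
var frames = new Queue<(bool IsData, ulong Seq)>();
const int heartbeatMs = 1000;

void Enqueue(ulong seq)
{
    frames.Enqueue((true, seq));
    if (heartbeatMs > 0)
        frames.Enqueue((false, 0)); // heartbeat marker
}

Enqueue(1);
Enqueue(2);
while (frames.Count > 0)
{
    var (isData, seq) = frames.Dequeue();
    Console.WriteLine(isData ? $"data seq={seq}" : "heartbeat");
}
// data seq=1, heartbeat, data seq=2, heartbeat
```

Because nothing drains the real queue onto the wire, frames simply accumulate until a caller dequeues them in-process.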
### Ack processing
`AckProcessor` is a per-consumer dictionary of sequence numbers to deadline timestamps. It is used by both `PullConsumerEngine` (to check for expired messages) and `PushConsumerEngine` (to register newly enqueued messages):
```csharp
// AckProcessor.cs
public sealed class AckProcessor
{
    private readonly Dictionary<ulong, DateTime> _pending = new();

    public void Register(ulong sequence, int ackWaitMs)
    {
        _pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
    }

    public ulong? NextExpired()
    {
        foreach (var (seq, deadline) in _pending)
        {
            if (DateTime.UtcNow >= deadline)
                return seq;
        }
        return null;
    }

    public bool HasPending => _pending.Count > 0;
}
```
Expiry detection is lazy — `NextExpired()` is only called from `PullConsumerEngine.FetchAsync`. There is no background timer or active expiry sweep. Acknowledged messages are never removed from `_pending` because there is no `Ack(ulong sequence)` method on `AckProcessor`. This means `HasPending` is always `true` once any message has been registered, and pending acks accumulate without bound.
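A hedged sketch of the missing removal path (hypothetical; `AckProcessor` has no such method today):

```csharp
using System;
using System.Collections.Generic;

var ack = new AckProcessorSketch();
ack.Register(1, ackWaitMs: 1000);
Console.WriteLine(ack.HasPending); // True
ack.Ack(1);
Console.WriteLine(ack.HasPending); // False

// Hypothetical completion of AckProcessor: an Ack method removes the
// pending entry, so HasPending can return to false and the dictionary
// no longer grows without bound.
sealed class AckProcessorSketch
{
    private readonly Dictionary<ulong, DateTime> _pending = new();

    public void Register(ulong sequence, int ackWaitMs)
        => _pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));

    public bool Ack(ulong sequence) => _pending.Remove(sequence);

    public bool HasPending => _pending.Count > 0;
}
```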
---
## Storage
### IStreamStore interface
```csharp
// IStreamStore.cs
public interface IStreamStore
{
    ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
    ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
    ValueTask PurgeAsync(CancellationToken ct);
    ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}
```
`AppendAsync` returns the assigned sequence number. `LoadAsync` returns `null` if the sequence does not exist (trimmed or never written). The interface does not expose delete-by-sequence, range scans, or subject filtering. `TrimToMaxMessages` is implemented on the concrete types but is not part of the interface.
### MemStore
`MemStore` holds all messages in a `Dictionary<ulong, StoredMessage>` under a single `object` lock. Every operation acquires that lock synchronously:
```csharp
// MemStore.cs
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    lock (_gate)
    {
        _last++;
        _messages[_last] = new StoredMessage
        {
            Sequence = _last,
            Subject = subject,
            Payload = payload,
        };
        return ValueTask.FromResult(_last);
    }
}
```
`TrimToMaxMessages` removes entries one by one starting from the minimum key, using `_messages.Keys.Min()` on each iteration — O(n) per removal. This is the default store used by `StreamManager.CreateOrUpdate`. Messages survive only for the lifetime of the process.
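One possible fix, sketched under the assumption that sequences are contiguous (which `AppendAsync` guarantees): track the first live sequence as a watermark so each removal is O(1) instead of scanning `Keys.Min()`.

```csharp
using System;
using System.Collections.Generic;

// Sketch: a first-sequence watermark replaces the repeated Keys.Min() scan.
var messages = new Dictionary<ulong, byte[]>();
ulong first = 1, last = 0;

void Append(byte[] payload) => messages[++last] = payload;

void TrimToMaxMessages(int max)
{
    while (max > 0 && messages.Count > max)
        messages.Remove(first++); // the oldest key is always the watermark
}

for (var i = 0; i < 5; i++) Append(Array.Empty<byte>());
TrimToMaxMessages(2);
Console.WriteLine($"{first}..{last} count={messages.Count}"); // 4..5 count=2
```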
### FileStore
`FileStore` now persists messages into block files via `MsgBlock`, keeps a live in-memory message cache for load paths, and maintains a compact metadata index (`Dictionary<ulong, StoredMessageIndex>`) plus a per-subject last-sequence map for hot-path lookups such as `LoadLastBySubjectAsync`. Headers and payloads are stored separately and remain separate across snapshot, restore, block rewrite, and crash recovery. On startup, any legacy `messages.jsonl` file is migrated into block storage before recovery continues.
```csharp
// FileStore.cs
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, StoredMessageIndex> _messageIndexes = new();
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);

public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
    if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
        && _messages.TryGetValue(sequence, out var match))
    {
        return ValueTask.FromResult<StoredMessage?>(match);
    }
    return ValueTask.FromResult<StoredMessage?>(null);
}
```
The current implementation is still materially simpler than Go `filestore.go`:
- **No synchronization**: `FileStore` still exposes unsynchronized mutation and read paths. It is safe only under the current test and single-process usage assumptions.
- **Payloads still stay resident**: the compact index removes duplicate payload ownership for metadata-heavy operations, but `_messages` still retains live payload bytes in memory for direct load paths.
- **No Go-equivalent block index stack**: there is no per-block subject tree, mmap-backed read path, or Go-style cache/compaction parity. Deletes and trims rely on tombstones plus later block maintenance rather than Go's full production filestore behavior.
---
## In-Process RAFT
The RAFT implementation has no network transport. All `RaftNode` instances live in the same process, and replication is a direct in-memory method call.
### RaftNode.ProposeAsync
`ProposeAsync` requires the caller to be the leader (`Role == RaftRole.Leader`). It appends the command to the local `RaftLog`, calls `RaftReplicator.Replicate` to fan out synchronously to all peer nodes held in `_cluster`, and commits if a quorum of acknowledgements is reached:
```csharp
// RaftNode.cs
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
    if (Role != RaftRole.Leader)
        throw new InvalidOperationException("Only leader can propose entries.");
    var entry = Log.Append(TermState.CurrentTerm, command);
    var followers = _cluster.Where(n => n.Id != Id).ToList();
    var acknowledgements = _replicator.Replicate(entry, followers);
    var quorum = (_cluster.Count / 2) + 1;
    if (acknowledgements + 1 >= quorum)
    {
        AppliedIndex = entry.Index;
        foreach (var node in _cluster)
            node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
    }
    await Task.CompletedTask;
    return entry.Index;
}
```
`Task.CompletedTask` is awaited unconditionally — the method is synchronous in practice. The log is not persisted; snapshots are stored via `RaftSnapshotStore` but that type's persistence behavior is not visible from `RaftNode` alone. Leader election uses `StartElection` / `GrantVote` / `ReceiveVote`, all of which are direct method calls within the same process.
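The quorum arithmetic is standard strict-majority voting; a small illustration (the `Quorum` helper is illustrative, not a method in the codebase):

```csharp
using System;

// Quorum as computed in ProposeAsync: a strict majority of the cluster.
static int Quorum(int clusterSize) => clusterSize / 2 + 1;

Console.WriteLine(Quorum(1)); // 1: a single node commits alone
Console.WriteLine(Quorum(3)); // 2: leader plus one follower ack
Console.WriteLine(Quorum(5)); // 3
```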
### StreamReplicaGroup
`StreamReplicaGroup` creates `Math.Max(replicas, 1)` `RaftNode` instances when a stream is created and immediately elects a leader via `StartElection`:
```csharp
// StreamReplicaGroup.cs
public StreamReplicaGroup(string streamName, int replicas)
{
    var nodeCount = Math.Max(replicas, 1);
    _nodes = Enumerable.Range(1, nodeCount)
        .Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
        .ToList();
    foreach (var node in _nodes)
        node.ConfigureCluster(_nodes);
    Leader = ElectLeader(_nodes[0]);
}
```
`ProposeAsync` on the group delegates to the leader node. `StepDownAsync` forces a leader change by calling `RequestStepDown()` on the current leader and electing the next node in the list. All of this is in-process; there is no coordination across server instances.
### JetStreamMetaGroup
`JetStreamMetaGroup` is a thin registry that tracks stream names and the declared cluster size. It does not use `RaftNode` internally. `ProposeCreateStreamAsync` records a stream name in a `ConcurrentDictionary` and returns immediately:
```csharp
// JetStreamMetaGroup.cs
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
    _streams[config.Name] = 0;
    return Task.CompletedTask;
}
```
Its purpose is to provide `GetState()` — a sorted list of known stream names and the configured cluster size — for monitoring or coordination callers. It does not replicate metadata across nodes.
---
## Mirror and Source
`MirrorCoordinator` and `SourceCoordinator` are structurally identical: each holds a reference to a target `IStreamStore` and appends messages to it when notified of an origin append. Both operate entirely in-process within a single `StreamManager`:
```csharp
// MirrorCoordinator.cs
public sealed class MirrorCoordinator
{
    private readonly IStreamStore _targetStore;

    public MirrorCoordinator(IStreamStore targetStore) { _targetStore = targetStore; }

    public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
        => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
}
```
`StreamManager.RebuildReplicationCoordinators` rebuilds the coordinator lists whenever a stream is created or updated. A stream configured with `Mirror = "ORDERS"` receives a copy of every message appended to `ORDERS`, but only if `ORDERS` exists in the same `StreamManager` instance. There is no subscription to an external NATS subject, no replay of historical messages on coordinator setup, and no cross-server replication.
---
## Configuration
`JetStreamOptions` (`src/NATS.Server/Configuration/JetStreamOptions.cs`) holds the configuration model for JetStream:
| Field | Type | Default | Description |
|---|---|---|---|
| `StoreDir` | `string` | `""` | Directory path for `FileStore`. Not currently used to switch the default store; `StreamManager.CreateOrUpdate` always allocates a `MemStore`. |
| `MaxMemoryStore` | `long` | `0` | Maximum bytes for in-memory storage. Not enforced. |
| `MaxFileStore` | `long` | `0` | Maximum bytes for file storage. Not enforced. |
None of the three fields currently affect runtime behavior. `StoreDir` would need to be wired into `StreamManager` to cause `FileStore` allocation. `MaxMemoryStore` and `MaxFileStore` have no enforcement path.
---
## What Is Not Implemented
The following features are present in the Go reference (`golang/nats-server/server/`) but absent from this implementation:
- **Stream delete and update**: `$JS.API.STREAM.DELETE.*` and `$JS.API.STREAM.UPDATE.*` are not handled. `CreateOrUpdate` accepts updates but there is no delete path.
- **Stream list**: `$JS.API.STREAM.LIST` and `$JS.API.STREAM.NAMES` return not-found.
- **Consumer delete, list, and pause**: `$JS.API.CONSUMER.DELETE.*`, `$JS.API.CONSUMER.LIST.*`, and `$JS.API.CONSUMER.PAUSE.*` are not handled.
- **Retention policies**: Only `MaxMsgs` trimming is enforced. `Limits`, `Interest`, and `WorkQueue` retention semantics are not implemented. `MaxBytes` and `MaxAge` are not enforced.
- **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers.
- **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket.
- **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter.
- **FileStore production safety**: `FileStore` now uses block files and compact metadata indexes, but it still lacks synchronization and Go-level block indexing, so it remains unsuitable for production use.
- **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus.
- **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated.
- **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs.
- **Subject transforms, placement, and mirroring policies**: None of the stream configuration fields beyond `Name`, `Subjects`, `MaxMsgs`, `Replicas`, `Mirror`, and `Source` are processed.
---
## Related Documentation
- [Server Overview](../Server/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)
- [Protocol Overview](../Protocol/Overview.md)
<!-- Last verified against codebase: 2026-03-13 -->