docs: fresh gap analysis and production gaps design

Rewrote structuregaps.md with function-by-function gap analysis
comparing current .NET implementation against Go reference.
Added design doc for Tier 1+2 gap closure across 4 phases:
FileStore+RAFT → JetStream Cluster → Consumer/Stream → Client/MQTT/Config.
Joseph Doherty
2026-02-25 00:09:54 -05:00
parent 9bea1956af
commit f03e72654f
2 changed files with 1309 additions and 337 deletions


@@ -0,0 +1,494 @@
# Design: Production Gap Closure (Tier 1 + Tier 2)
**Date**: 2026-02-25
**Scope**: 15 gaps across FileStore, RAFT, JetStream Cluster, Consumer/Stream, Client, MQTT, Config, Networking
**Approach**: Bottom-up by dependency (Phase 1→2→3→4)
**Codex review**: Verified 2026-02-25 — feedback incorporated
## Overview
This design covers all CRITICAL and HIGH severity gaps identified in the fresh scan of `docs/structuregaps.md` (2026-02-24). The implementation follows a bottom-up dependency approach where foundational layers (storage, consensus) are completed before higher-level features (cluster coordination, consumers, client protocol).
### Codex Feedback (Incorporated)
1. RAFT WAL is the hardest dependency — kept first in Phase 1
2. Meta snapshot encode/decode moved earlier in Phase 2 (before monitor loops)
3. Ack/NAK processing moved before/alongside delivery loop — foundational to correctness
4. Exit gates added per phase with measurable criteria
5. Binary formats treated as versioned contracts with golden tests
6. Feature flags for major subsystems recommended
---
## Phase 1: FileStore + RAFT Foundation
**Dependencies**: None (pure infrastructure)
**Exit gate**: All IStreamStore methods implemented, FileStore round-trips encrypted+compressed blocks, RAFT persists and recovers state across restarts, joint consensus passes membership change tests.
### 1A: FileStore Block Lifecycle & Encryption/Compression
**Goal**: Wire AeadEncryptor and S2Codec into block I/O, add automatic rotation, crash recovery.
**MsgBlock.cs changes:**
- New fields: `byte[]? _aek` (per-block AEAD key), `bool _compressed`, `byte[] _lastChecksum`
- `Seal()` — marks block read-only, releases write handle
- `EncryptOnDisk(AeadEncryptor, key)` — encrypts block data at rest
- `DecryptOnRead(AeadEncryptor, key)` — decrypts block data on load
- `CompressRecord(S2Codec, payload)` → `byte[]` — per-message compression
- `DecompressRecord(S2Codec, data)` → `byte[]` — per-message decompression
- `ComputeChecksum(data)` → `byte[8]` — highway hash
- `ValidateChecksum(data, expected)` → `bool`
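The checksum pair above can be sketched as follows. This Go sketch uses stdlib FNV-1a purely as a stand-in for the highway hash (both yield an 8-byte digest); the real implementation would swap in a HighwayHash library:

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
)

// computeChecksum returns an 8-byte digest of data. FNV-1a stands in
// here for the highway hash used by the real block format.
func computeChecksum(data []byte) []byte {
	h := fnv.New64a()
	h.Write(data)
	return h.Sum(nil) // always 8 bytes
}

// validateChecksum recomputes the digest and compares it in full.
func validateChecksum(data, expected []byte) bool {
	return bytes.Equal(computeChecksum(data), expected)
}

func main() {
	sum := computeChecksum([]byte("hello"))
	fmt.Println(len(sum), validateChecksum([]byte("hello"), sum), validateChecksum([]byte("tampered"), sum))
}
```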
**FileStore.cs changes:**
- `AppendAsync()` → triggers `RotateBlock()` when `_activeBlock.BytesWritten >= _options.BlockSize`
- `RotateBlock()` — seals active block, creates new with `_nextBlockId++`
- Enhanced `RecoverBlocks()` — rebuild TTL wheel, scan tombstones, validate checksums
- `RecoverTtlState()` — scan all blocks, re-register unexpired messages in `_ttlWheel`
- `WriteStateAtomically(path, data)` — temp file + `File.Move(temp, target, overwrite: true)`
- `GenEncryptionKeys()` → per-block key derivation from stream encryption key
- `RecoverAek(blockId)` → recover per-block key from metadata file
**New file: `HighwayHash.cs`** — .NET highway hash implementation or NuGet wrapper.
**Go reference**: `filestore.go` lines 816-1407 (encryption), 1754-2401 (recovery), 4443-4477 (cache), 5783-5842 (flusher).
### 1B: Missing IStreamStore Methods
Implement 15+ methods currently throwing `NotSupportedException`:
| Method | Strategy | Go ref |
|--------|----------|--------|
| `StoreRawMsg` | Append with caller seq/ts, bypass auto-increment | 4759 |
| `LoadNextMsgMulti` | Multi-filter scan across blocks | 8426 |
| `LoadPrevMsg` | Reverse scan from start sequence | 8580 |
| `LoadPrevMsgMulti` | Reverse multi-filter scan | 8634 |
| `NumPendingMulti` | Count with filter set | 4051 |
| `EncodedStreamState` | Binary serialize (versioned format) | 10599 |
| `Utilization` | bytes_used / bytes_allocated | 8758 |
| `FlushAllPending` | Force-flush active block to disk | — |
| `RegisterStorageUpdates` | Callback for change notifications | 4412 |
| `RegisterStorageRemoveMsg` | Callback for removal events | 4424 |
| `RegisterProcessJetStreamMsg` | Callback for JS processing | 4431 |
| `ResetState` | Clear caches post-NRG catchup | 8788 |
| `UpdateConfig` | Apply block size, retention, limits | 655 |
| `Delete(inline)` | Stop + delete all block files | — |
| `Stop()` | Flush + close all FDs | — |
### 1C: RAFT Persistent WAL
**RaftLog.cs changes:**
- Replace `List<RaftLogEntry>` backing with binary WAL file
- Record format: `[4-byte length][entry bytes]` (length-prefixed)
- Entry serialization: `[8-byte index][4-byte term][payload]`
- `Append()` → append to WAL file, increment in-memory index
- `SyncAsync()` → `FileStream.Flush(flushToDisk: true)` for durability (`FlushAsync()` alone flushes buffers to the OS but does not force an fsync)
- `LoadAsync()` → scan WAL, rebuild in-memory index, validate
- `Compact(upToIndex)` → rewrite WAL from upToIndex+1 onward
- Version header at WAL file start for forward compatibility
**RaftNode.cs changes:**
- Wire `_persistDirectory` into disk operations
- Persist term/vote in `meta.json`: `{ "term": N, "votedFor": "id" }`
- Startup: load WAL → load snapshot → apply entries after snapshot index
- `PersistTermAsync()` — called on term/vote changes
**Go reference**: `raft.go` lines 1000-1100 (snapshot streaming), WAL patterns throughout.
### 1D: RAFT Joint Consensus
**RaftNode.cs changes:**
- New field: `HashSet<string>? _jointNewMembers` — Cnew during transition
- `ProposeAddPeer(peerId)`:
1. Propose joint entry (Cold ∪ Cnew) to log
2. Wait for commit
3. Propose final Cnew entry
4. Wait for commit
- `ProposeRemovePeer(peerId)` — same two-phase pattern
- `CalculateQuorum()`:
- Normal: `majority(_members)`
- Joint: separate majorities required in **both** Cold and Cnew (per the Raft paper; a single `max(majority(Cold), majority(Cnew))` threshold over the union is not safe)
- `ApplyMembershipChange(entry)`:
- If joint entry: set `_jointNewMembers`
- If final entry: replace `_members` with Cnew, clear `_jointNewMembers`
**Go reference**: `raft.go` membership change handling; Raft paper §4 (cluster membership changes).
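Per the Raft paper, a joint-configuration entry commits only when it carries separate majorities in both Cold and Cnew; a Go sketch of that check:

```go
package main

import "fmt"

func majority(n int) int { return n/2 + 1 }

// jointQuorumMet reports whether a vote set satisfies joint quorum:
// it must carry a majority of Cold AND a majority of Cnew.
func jointQuorumMet(votes map[string]bool, cold, cnew []string) bool {
	count := func(members []string) int {
		n := 0
		for _, m := range members {
			if votes[m] {
				n++
			}
		}
		return n
	}
	return count(cold) >= majority(len(cold)) && count(cnew) >= majority(len(cnew))
}

func main() {
	cold := []string{"a", "b", "c"}
	cnew := []string{"b", "c", "d"}
	// a+b cover Cold's majority; b+d cover Cnew's majority.
	votes := map[string]bool{"a": true, "b": true, "d": true}
	fmt.Println(jointQuorumMet(votes, cold, cnew))
}
```

The disjoint case shows why a single threshold fails: with Cold = {a,b,c} and Cnew = {d,e,f}, votes from {a,b} reach any max-of-majorities count of 2 yet include nobody from Cnew.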
---
## Phase 2: JetStream Cluster Coordination
**Dependencies**: Phase 1 (RAFT persistence, FileStore)
**Exit gate**: Meta-group processes stream/consumer assignments via RAFT, snapshots encode/decode round-trip, leadership transitions handled correctly, orphan detection works.
### 2A: Meta Snapshot Encoding/Decoding (moved earlier per Codex)
**New file: `MetaSnapshotCodec.cs`**
- `Encode(Dictionary<string, Dictionary<string, StreamAssignment>> assignments)` → `byte[]`
- Uses S2 compression for wire efficiency
- Versioned format header for compatibility
- `Decode(byte[])` → assignment maps
- Golden tests with known-good snapshots
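A minimal sketch of the versioned envelope, using stdlib gzip as a stand-in for the S2 codec named above (the version byte is checked before the body is touched, which is what makes the format safely evolvable):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

const snapshotVersion = 1

// encodeSnapshot prefixes a version byte, then compresses the payload.
func encodeSnapshot(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteByte(snapshotVersion)
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeSnapshot rejects unknown versions before decompressing.
func decodeSnapshot(data []byte) ([]byte, error) {
	if len(data) == 0 || data[0] != snapshotVersion {
		return nil, errors.New("unsupported snapshot version")
	}
	zr, err := gzip.NewReader(bytes.NewReader(data[1:]))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	enc, _ := encodeSnapshot([]byte(`{"acct":{"ORDERS":{}}}`))
	dec, _ := decodeSnapshot(enc)
	fmt.Println(string(dec))
}
```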
### 2B: Cluster Monitoring Loop
**New class: `JetStreamClusterMonitor`** (implements `IHostedService`)
- Long-running `ExecuteAsync()` loop consuming `Channel<RaftLogEntry>` from meta RAFT node
- `ApplyMetaEntry(entry)` → dispatch by op type:
- `assignStreamOp` → `ProcessStreamAssignment()`
- `assignConsumerOp` → `ProcessConsumerAssignment()`
- `removeStreamOp` → `ProcessStreamRemoval()`
- `removeConsumerOp` → `ProcessConsumerRemoval()`
- `updateStreamOp` → `ProcessUpdateStreamAssignment()`
- `EntrySnapshot` → `ApplyMetaSnapshot()`
- `EntryAddPeer`/`EntryRemovePeer` → peer management
- Periodic snapshot: 1-minute interval, 15s on errors
- Orphan check: 30s post-recovery scan
- Recovery state machine: deferred add/remove during catchup
### 2C: Stream/Consumer Assignment Processing
**Enhance `JetStreamMetaGroup.cs`:**
- `ProcessStreamAssignment(sa)` — validate config, check account limits, create stream on this node if member of replica group, respond to inflight request
- `ProcessConsumerAssignment(ca)` — validate, create consumer, bind to stream
- `ProcessStreamRemoval(sa)` — cleanup local resources, delete RAFT group
- `ProcessConsumerRemoval(ca)` — cleanup consumer state
- `ProcessUpdateStreamAssignment(sa)` — apply config changes, handle replica scaling
### 2D: Inflight Tracking Enhancement
**Change inflight maps from:**
```csharp
ConcurrentDictionary<string, string> _inflightStreams
```
**To:**
```csharp
record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment);
ConcurrentDictionary<string, Dictionary<string, InflightInfo>> _inflightStreams; // account → stream → info
```
- `TrackInflightStreamProposal(account, stream, assignment)` — increment ops, store
- `RemoveInflightStreamProposal(account, stream)` — decrement, clear when zero
- Same pattern for consumers
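The track/remove pair behaves like this Go sketch (names illustrative; the .NET version would use the concurrent dictionary shown above with per-account locking):

```go
package main

import "fmt"

type inflightInfo struct {
	Ops     int
	Deleted bool
}

// inflight maps account → stream → info; Ops counts outstanding proposals.
type inflight map[string]map[string]*inflightInfo

// track increments the proposal count, creating entries on first use.
func (f inflight) track(account, stream string) {
	streams, ok := f[account]
	if !ok {
		streams = map[string]*inflightInfo{}
		f[account] = streams
	}
	info, ok := streams[stream]
	if !ok {
		info = &inflightInfo{}
		streams[stream] = info
	}
	info.Ops++
}

// remove decrements and clears the entry once no proposals remain,
// so a later create for the same name is not flagged as a duplicate.
func (f inflight) remove(account, stream string) {
	if info, ok := f[account][stream]; ok {
		info.Ops--
		if info.Ops <= 0 {
			delete(f[account], stream)
		}
	}
}

func main() {
	f := inflight{}
	f.track("$G", "ORDERS")
	f.track("$G", "ORDERS")
	f.remove("$G", "ORDERS")
	_, still := f["$G"]["ORDERS"]
	f.remove("$G", "ORDERS")
	_, gone := f["$G"]["ORDERS"]
	fmt.Println(still, gone)
}
```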
### 2E: Leadership Transition
**Add to `JetStreamClusterMonitor`:**
- `ProcessLeaderChange(isLeader)`:
- Clear all inflight proposal maps
- If becoming leader: start update subscriptions, publish domain leader advisory
- If losing leadership: stop update subscriptions
- `ProcessStreamLeaderChange(stream, isLeader)`:
- Leader: start monitor, set sync subjects, begin replication
- Follower: stop local delivery
- `ProcessConsumerLeaderChange(consumer, isLeader)`:
- Leader: start delivery engine
- Follower: stop delivery
### 2F: Per-Stream/Consumer Monitor Loops
**New class: `StreamMonitor`** — per-stream background task:
- Snapshot timing: 2-minute interval + random jitter
- State change detection via hash comparison
- Recovery timeout: escalate after N failures
- Compaction trigger after snapshot
**New class: `ConsumerMonitor`** — per-consumer background task:
- Hash-based state change detection
- Timeout tracking for stalled consumers
- Migration detection (R1 → R3 scale-up)
---
## Phase 3: Consumer + Stream Engines
**Dependencies**: Phase 1 (storage), Phase 2 (cluster coordination)
**Exit gate**: Consumer delivery loop correctly delivers messages with redelivery, pull requests expire and respect batch/maxBytes, purge with subject filtering works, mirror/source retry recovers from failures.
### 3A: Ack/NAK Processing (moved before delivery loop per Codex)
**Enhance `AckProcessor.cs`:**
- Background subscription consuming from consumer's ack subject
- Parse ack types:
- `+ACK` — acknowledge message, remove from pending
- `-NAK` — negative ack, optional `{delay}` for redelivery schedule
- `-NAK {"delay": "2s", "backoff": [1,2,4]}` — with backoff schedule
- `+TERM` — terminate, remove from pending, no redeliver
- `+WPI` — work in progress, extend ack wait deadline
- Track `deliveryCount` per message sequence
- Max deliveries enforcement: advisory on exceed
**Enhance `RedeliveryTracker.cs`:**
- Replace internals with `PriorityQueue<ulong, DateTimeOffset>` (min-heap by deadline)
- `Schedule(seq, delay)` — insert with deadline
- `TryGetNextReady(now)` → sequence past deadline, or null
- `BackoffSchedule` support: delay array indexed by delivery count
- Rate limiting: configurable max redeliveries per tick
### 3B: Consumer Delivery Loop
**New class: `ConsumerDeliveryLoop`** — shared between push and pull:
- Background `Task` running while consumer is active
- Main loop:
1. Check redelivery queue first (priority over new messages)
2. Poll `IStreamStore.LoadNextMsg()` for next matching message
3. Check `MaxAckPending` — block if pending >= limit
4. Deliver message (push: to deliver subject; pull: to request reply subject)
5. Register in pending map with ack deadline
6. Update stats (delivered count, bytes)
**Push mode integration:**
- Delivers to `DeliverSubject` directly
- Idle heartbeat: sends when no messages for `IdleHeartbeat` duration
- Flow control: `Nats-Consumer-Stalled` header when pending > threshold
**Pull mode integration:**
- Consumes from `WaitingRequestQueue`
- Delivers to request's `ReplyTo` subject
- Respects `Batch`, `MaxBytes`, `NoWait` per request
### 3C: Pull Request Pipeline
**New class: `WaitingRequestQueue`:**
```csharp
record PullRequest(string ReplyTo, int Batch, long MaxBytes, DateTimeOffset Expires, bool NoWait, string? PinId);
```
- Internal `LinkedList<PullRequest>` ordered by arrival
- `Enqueue(request)` — adds, starts expiry timer
- `TryDequeue()` → next request with remaining capacity
- `RemoveExpired(now)` — periodic cleanup
- `Count`, `IsEmpty` properties
### 3D: Priority Group Pinning
**Enhance `PriorityGroupManager.cs`:**
- `AssignPinId()` → generates NUID string, starts TTL timer
- `ValidatePinId(pinId)` → checks validity
- `UnassignPinId()` → clears, publishes advisory
- `PinnedTtl` configurable timeout
- Integrates with `WaitingRequestQueue` for pin-aware dispatch
### 3E: Consumer Pause/Resume
**Add to `ConsumerManager.cs`:**
- `PauseUntil` field on consumer state
- `PauseAsync(until)`:
- Set deadline, stop delivery loop
- Publish `$JS.EVENT.ADVISORY.CONSUMER.PAUSED`
- Start auto-resume timer
- `ResumeAsync()`:
- Clear deadline, restart delivery loop
- Publish resume advisory
### 3F: Mirror/Source Retry
**Enhance `MirrorCoordinator.cs`:**
- Retry loop: exponential backoff 1s → 30s with jitter
- `SetupMirrorConsumer()` → create ephemeral consumer on source via JetStream API
- Sequence tracking: `_sourceSeq` (last seen), `_destSeq` (last stored)
- Gap detection: if source sequence jumps, log advisory
- `SetMirrorErr(err)` → publishes error state advisory
**Enhance `SourceCoordinator.cs`:**
- Same retry pattern
- Per-source consumer setup/teardown
- Subject filter + transform before storing
- Dedup window: `Dictionary<string, DateTimeOffset>` keyed by Nats-Msg-Id
- Time-based pruning of dedup window
### 3G: Stream Purge with Filtering
**Enhance `StreamManager.cs`:**
- `PurgeEx(subject, seq, keep)`:
- `subject` filter: iterate messages, purge only matching subject pattern
- `seq` range: purge messages with seq < specified
- `keep`: retain N most recent (purge all but last N)
- After purge: cascade to consumers (clear pending for purged sequences)
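The `keep` variant reduces to computing the first sequence to retain; a sketch:

```go
package main

import "fmt"

// purgeFloor returns the first sequence to KEEP when purging all but
// the newest `keep` messages from [firstSeq, lastSeq]; every sequence
// below the returned value is removed.
func purgeFloor(firstSeq, lastSeq, keep uint64) uint64 {
	total := lastSeq - firstSeq + 1
	if keep >= total {
		return firstSeq // nothing to purge
	}
	return lastSeq - keep + 1
}

func main() {
	// Stream holds seqs 1..100; keep the 10 newest → retain from 91.
	fmt.Println(purgeFloor(1, 100, 10))
}
```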
### 3H: Interest Retention
**New class: `InterestRetentionPolicy`:**
- Track which consumers have interest in each message
- `RegisterInterest(consumerName, filterSubject)` — consumer subscribes
- `AcknowledgeDelivery(consumerName, seq)` — consumer acked
- `ShouldRetain(seq)` → `false` when all interested consumers have acked
- Periodic scan for removable messages
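A simplified Go sketch of the retention check, tracking only delivered-but-unacked interest (filter-subject matching and the periodic scan are omitted):

```go
package main

import "fmt"

// interestState tracks, per sequence, which consumers still owe an ack.
type interestState struct {
	pending map[uint64]map[string]struct{} // seq → consumers yet to ack
}

func newInterestState() *interestState {
	return &interestState{pending: map[uint64]map[string]struct{}{}}
}

// registerDelivery records that a consumer received seq and must ack it.
func (s *interestState) registerDelivery(consumer string, seq uint64) {
	if s.pending[seq] == nil {
		s.pending[seq] = map[string]struct{}{}
	}
	s.pending[seq][consumer] = struct{}{}
}

// acknowledge clears one consumer's interest in seq.
func (s *interestState) acknowledge(consumer string, seq uint64) {
	delete(s.pending[seq], consumer)
	if len(s.pending[seq]) == 0 {
		delete(s.pending, seq)
	}
}

// shouldRetain is false once every interested consumer has acked.
func (s *interestState) shouldRetain(seq uint64) bool {
	return len(s.pending[seq]) > 0
}

func main() {
	s := newInterestState()
	s.registerDelivery("C1", 5)
	s.registerDelivery("C2", 5)
	s.acknowledge("C1", 5)
	fmt.Println(s.shouldRetain(5))
	s.acknowledge("C2", 5)
	fmt.Println(s.shouldRetain(5))
}
```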
---
## Phase 4: Client Performance, MQTT, Config, Networking
**Dependencies**: Phase 1 (storage for MQTT), Phase 3 (consumer for MQTT)
**Exit gate**: Client flush coalescing measurably reduces syscalls under load, MQTT sessions survive server restart, SIGHUP triggers config reload, implicit routes auto-discover.
### 4A: Client Flush Coalescing
**Enhance `NatsClient.cs`:**
- New fields: `int _flushSignalsPending`, `const int MaxFlushPending = 10`
- New field: `AsyncAutoResetEvent _flushSignal` — write loop waits on this
- `QueueOutbound()`: after writing to channel, increment `_flushSignalsPending`, signal
- `RunWriteLoopAsync()`: if `_flushSignalsPending < MaxFlushPending && _pendingBytes < MaxBufSize`, wait for signal (coalesces)
- `NatsServer.FlushClients(HashSet<NatsClient>)` — signals all pending clients
### 4B: Client Stall Gate
**Enhance `NatsClient.cs`:**
- New field: `SemaphoreSlim? _stallGate`
- In `QueueOutbound()`:
- If `_pendingBytes > _maxPending * 3 / 4` and `_stallGate == null`: create `new SemaphoreSlim(0, 1)`
- If `_stallGate != null`: `await _stallGate.WaitAsync(timeout)`
- In `RunWriteLoopAsync()`:
- After successful flush below threshold: `_stallGate?.Release()`, set `_stallGate = null`
- Per-kind: CLIENT → close on slow consumer; ROUTER/GATEWAY/LEAF → mark flag, keep alive
### 4C: Write Timeout Recovery
**Enhance `RunWriteLoopAsync()`:**
- Track `bytesWritten` and `bytesAttempted` per flush cycle
- On write timeout:
- If `Kind == CLIENT` → close immediately
- If `bytesWritten > 0` (partial flush) → mark `isSlowConsumer`, continue
- If `bytesWritten == 0` → close
- New enum: `WriteTimeoutPolicy { Close, TcpFlush }`
- Route/gateway/leaf use `TcpFlush` by default
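The timeout handling above is essentially a pure decision function per connection kind; a Go sketch (enum and names illustrative):

```go
package main

import "fmt"

type connKind int

const (
	kindClient connKind = iota
	kindRouter
	kindGateway
	kindLeaf
)

type timeoutAction int

const (
	actionClose timeoutAction = iota
	actionMarkSlowConsumer
)

// onWriteTimeout encodes the policy above: plain clients are closed
// outright; internal connections survive a partial flush but are
// closed when no bytes at all were written.
func onWriteTimeout(kind connKind, bytesWritten int64) timeoutAction {
	if kind == kindClient {
		return actionClose
	}
	if bytesWritten > 0 {
		return actionMarkSlowConsumer // partial progress: keep alive
	}
	return actionClose
}

func main() {
	fmt.Println(onWriteTimeout(kindClient, 100) == actionClose)
	fmt.Println(onWriteTimeout(kindRouter, 100) == actionMarkSlowConsumer)
	fmt.Println(onWriteTimeout(kindRouter, 0) == actionClose)
}
```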
### 4D: MQTT JetStream Persistence
**Enhance MQTT subsystem:**
**Streams per account:**
- `$MQTT_msgs` — message persistence (subject: `$MQTT.msgs.>`)
- `$MQTT_rmsgs` — retained messages (subject: `$MQTT.rmsgs.>`)
- `$MQTT_sess` — session state (subject: `$MQTT.sess.>`)
**MqttSessionStore.cs changes:**
- `ConnectAsync(clientId, cleanSession)`:
- If `cleanSession=false`: query `$MQTT_sess` for existing session, restore
- If `cleanSession=true`: delete existing session from stream
- `SaveSessionAsync()` — persist current state to `$MQTT_sess`
- `DisconnectAsync(abnormal)`:
- If abnormal and a will message is configured: publish it
- If `cleanSession=false`: save session
**MqttRetainedStore.cs changes:**
- Backed by `$MQTT_rmsgs` stream
- `SetRetained(topic, payload)` — store/update retained message
- `GetRetained(topicFilter)` — query matching retained messages
- On SUBSCRIBE: deliver matching retained messages
**QoS tracking:**
- QoS 1 pending acks stored in session stream
- On reconnect: redeliver unacked QoS 1 messages
- QoS 2 state machine backed by session stream
### 4E: SIGHUP Config Reload
**New file: `SignalHandler.cs`** in Configuration/:
```csharp
public static class SignalHandler
{
    // Keep a reference so the registration is not disposed by GC.
    private static PosixSignalRegistration? _sighup;

    public static void Register(ConfigReloader reloader, NatsServer server)
    {
        _sighup = PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
        {
            _ = reloader.ReloadAsync(server);
        });
    }
}
```
**Enhance `ConfigReloader.cs`:**
- New `ReloadAsync(NatsServer server)`:
- Re-parse config file
- `Diff(old, new)` for change list
- Apply reloadable changes: logging, auth, TLS, limits
- Reject non-reloadable with warning
- Auth change propagation: iterate clients, re-evaluate, close unauthorized
- TLS reload: `server.ReloadTlsCertificate(newCert)`
### 4F: Implicit Route/Gateway Discovery
**Enhance `RouteManager.cs`:**
- `ProcessImplicitRoute(serverInfo)`:
- Extract `connect_urls` from INFO
- For each unknown URL: create solicited connection
- Track in `_discoveredRoutes` set to avoid duplicates
- `ForwardNewRouteInfoToKnownServers(newPeer)`:
- Send updated INFO to all existing routes
- Include new peer in `connect_urls`
**Enhance `GatewayManager.cs`:**
- `ProcessImplicitGateway(gatewayInfo)`:
- Extract gateway URLs from INFO
- Create outbound connection to discovered gateway
- `GossipGatewaysToInboundGateway(conn)`:
- Share known gateways with new inbound connections
---
## Test Strategy
### Per-Phase Test Requirements
**Phase 1 tests:**
- FileStore: encrypted block round-trip, compressed block round-trip, crash recovery with TTL, checksum validation on corrupt data, atomic write failure recovery
- IStreamStore: each missing method gets at least 2 test cases
- RAFT WAL: persist → restart → verify state, concurrent appends, compaction
- Joint consensus: add peer during normal operation, remove peer, add during leader change
**Phase 2 tests:**
- Snapshot encode → decode round-trip (golden test with known data)
- Monitor loop processes stream/consumer assignment entries
- Inflight tracking prevents duplicate proposals
- Leadership transition clears state correctly
**Phase 3 tests:**
- Consumer delivery loop delivers messages in order, respects MaxAckPending
- Pull request expiry, batch/maxBytes enforcement
- NAK with delay schedules redelivery at correct time
- Pause/resume stops and restarts delivery
- Mirror retry recovers after source failure
- Purge with subject filter only removes matching
**Phase 4 tests:**
- Flush coalescing: measure syscall count reduction under load
- Stall gate: producer blocks when client is slow
- MQTT session restore after restart
- SIGHUP triggers reload, auth changes disconnect unauthorized clients
- Implicit route discovery creates connections to unknown peers
### Parity DB Updates
When porting Go tests, update `docs/test_parity.db`:
```sql
UPDATE go_tests SET status='mapped', dotnet_test='TestName', dotnet_file='File.cs'
WHERE go_test='GoTestName';
```
---
## Risk Mitigation
1. **RAFT WAL correctness**: Use fsync on every commit, validate on load, golden tests with known-good WAL files
2. **Joint consensus edge cases**: Test split-brain scenarios with simulated partitions
3. **Snapshot format drift**: Version header, backward compatibility checks, golden tests
4. **Async loop races**: Use structured concurrency (CancellationToken chains), careful locking around inflight maps
5. **FileStore crash recovery**: Intentional corruption tests (truncated blocks, zero-filled regions)
6. **MQTT persistence migration**: Feature flag to toggle JetStream backing vs in-memory
---
## Phase Dependencies
```
Phase 1A (FileStore blocks) ─────────────────┐
Phase 1B (IStreamStore methods) ─────────────┤
Phase 1C (RAFT WAL) ─────────────────────────┼─→ Phase 2 (JetStream Cluster) ─→ Phase 3 (Consumer/Stream)
Phase 1D (RAFT joint consensus) ─────────────┘ │
Phase 4A-C (Client perf) ───────────────────────────────────────────── independent ──────┤
Phase 4D (MQTT persistence) ──────────────────── depends on Phase 1 + Phase 3 ──────────┤
Phase 4E (SIGHUP reload) ──────────────────────────────────────────── independent ──────┤
Phase 4F (Route/Gateway discovery) ─────────────────────────────────── independent ─────┘
```
Phase 4A-C, 4E, and 4F are independent and can be parallelized with Phase 3.
Phase 4D requires Phase 1 (storage) and Phase 3 (consumers) to be complete.

File diff suppressed because it is too large