# Batch 40 MQTT Server/JSA Implementation Plan

> **For Codex:** REQUIRED SUB-SKILL: Use `executeplan` to implement this plan task-by-task.

**Goal:** Port and verify Batch 40 (`MQTT Server/JSA`) from `server/mqtt.go` so all 78 mapped features and 323 mapped tests are either genuinely implemented/verified or explicitly deferred with concrete blocker reasons.

**Architecture:** Implement Batch 40 in four feature groups (`20/20/20/18`) aligned to MQTT bootstrap/parser, JSA adapter, account session manager, and session persistence/retained-message codec behavior. After each feature group, execute mapped test waves with strict evidence gates before any status promotion.

**Tech Stack:** .NET 10, C# latest, xUnit 3, Shouldly, NSubstitute, PortTracker CLI, SQLite (`porting.db`)

**Design doc:** `docs/plans/2026-02-27-batch-40-mqtt-server-jsa-design.md`

---

## Batch 40 Scope

- Batch ID: `40`
- Name: `MQTT Server/JSA`
- Dependencies: `19`, `27`
- Go source: `golang/nats-server/server/mqtt.go`
- Features: `78`
- Tests: `323`

If `dotnet` is not on `PATH`, use:

```bash
DOTNET=/usr/local/share/dotnet/dotnet
```
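Every command in this plan is written as `$DOTNET ...`, so the variable has to be set even when `dotnet` is already on `PATH`. A minimal sketch that covers both cases is shown here; the helper name `resolve_dotnet` is hypothetical and not part of the repository, and the fallback path is the one given above.

```bash
# Hypothetical helper: print the dotnet executable to use.
# Prefers whatever is on PATH; otherwise falls back to the given path
# (default: the location named in this plan).
resolve_dotnet() {
  if command -v dotnet >/dev/null 2>&1; then
    command -v dotnet
  else
    printf '%s\n' "${1:-/usr/local/share/dotnet/dotnet}"
  fi
}

DOTNET=$(resolve_dotnet)
export DOTNET
echo "Using dotnet at: $DOTNET"
```

Running this once per shell session should be enough for the later `$DOTNET build`, `$DOTNET test`, and `$DOTNET run` commands to resolve.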

Expected production files (create/modify as needed):

- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs`
- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttTypes.cs`
- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttConstants.cs`
- `dotnet/src/ZB.MOM.NatsNet.Server/NatsServer*.cs` (MQTT partials)
- `dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs` (or MQTT partial)
- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttJetStreamAdapter*.cs` (new if absent)
- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttAccountSessionManager*.cs` (new/partials)
- `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSession*.cs` (new/partials)

Expected test files (create/modify as needed):

- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests2.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs`
- `dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamLeafNodeTests.Impltests.cs`
- Create missing mapped backlog files as needed:
  - `JetStreamClusterTests1.Impltests.cs`
  - `JetStreamClusterTests3.Impltests.cs`
  - `JetStreamClusterTests4.Impltests.cs`
  - `JetStreamSuperClusterTests.Impltests.cs`
  - `JetStreamClusterLongTests.Impltests.cs`
  - `JetStreamSourcingScalingTests.Impltests.cs`
  - `JetStreamBenchmarks.Impltests.cs`

---

## MANDATORY VERIFICATION PROTOCOL

> **NON-NEGOTIABLE:** Every Batch 40 feature and test must pass this protocol. No shortcuts.

### 1. Dependency Preflight and Baseline

Run before any status changes:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- batch show 19 --db porting.db
$DOTNET run --project tools/NatsNet.PortTracker -- batch show 27 --db porting.db
$DOTNET run --project tools/NatsNet.PortTracker -- batch show 40 --db porting.db
$DOTNET run --project tools/NatsNet.PortTracker -- batch ready --db porting.db
```

Start batch only when ready:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- batch start 40 --db porting.db
```

Capture baseline:

```bash
$DOTNET build dotnet/
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/
```

### 2. Per-Feature Verification Loop (REQUIRED for each feature ID)

1. Inspect mapping and Go source intent:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- feature show <feature_id> --db porting.db
```

2. Claim the feature:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- feature update <feature_id> --status stub --db porting.db
```

3. Add/adjust at least one behavioral test path for this feature.
4. Implement minimal correct logic (no placeholders).
5. Run **Stub Detection Check**.
6. Run **Build Gate**.
7. Run relevant **Test Gate** filters.
8. Promote feature to `complete` only after evidence is captured.
9. Promote to `verified` only during task checkpoint after full gates pass.

### 3. Per-Test Verification Loop (REQUIRED for each test ID)

1. Inspect mapped test:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- test show <test_id> --db porting.db
```

2. Claim the test:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- test update <test_id> --status stub --db porting.db
```

3. Port full Arrange/Act/Assert behavior.
4. Run single-test filter and verify discovery:

```bash
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ \
  --filter "FullyQualifiedName~<ClassName>.<MethodName>" --verbosity normal
```

5. Confirm `Passed: 1, Failed: 0` (never accept `Passed: 0`).
6. Run class-level filter.
7. Run **Stub Detection Check** and **Build Gate**.
8. Promote test to `complete`; defer `verified` to checkpoint.
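The `Passed: 1, Failed: 0` check in step 5 can be made mechanical so that an empty filter match is never mistaken for success. The sketch below assumes the test runner prints `Passed: <n>` and `Failed: <n>` counters somewhere in its output, as the step describes; the exact summary layout depends on the runner and verbosity, and `require_passed` is a hypothetical helper name.

```bash
# Hypothetical helper: read test-runner output on stdin and succeed only if
# the last "Passed: N" counter is >= 1 and the last "Failed: M" counter is 0.
# A missing "Passed:" counter is treated as zero passed tests.
require_passed() {
  local output passed failed
  output=$(cat)
  passed=$(printf '%s\n' "$output" | grep -oE 'Passed: +[0-9]+' | grep -oE '[0-9]+' | tail -n 1 || true)
  failed=$(printf '%s\n' "$output" | grep -oE 'Failed: +[0-9]+' | grep -oE '[0-9]+' | tail -n 1 || true)
  [ "${passed:-0}" -ge 1 ] && [ "${failed:-0}" -eq 0 ]
}

# Example with captured output instead of a live run:
if printf 'Passed: 0, Failed: 0\n' | require_passed; then
  echo "accepted"
else
  echo "rejected: no test actually ran and passed"
fi
```

In practice the single-test filter command would be piped into `require_passed` (for example through `tee` so the raw output is kept as evidence).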

### 4. Stub Detection Check (REQUIRED)

Run after each feature loop, each test loop, and before any status promotion:

```bash
changed_files=$(git diff --name-only -- dotnet/src/ZB.MOM.NatsNet.Server dotnet/tests/ZB.MOM.NatsNet.Server.Tests | rg "\.cs$" || true)
if [ -n "$changed_files" ]; then
  # The Dictionary pattern includes the constructor's "()" so it matches the forbidden assertion as written in code.
  echo "$changed_files" | xargs rg -n "(NotImplementedException|// TODO|// PLACEHOLDER|Assert\.True\(true\)|Assert\.Pass\(\)|var goFile = \"server/|\.ShouldContain\(\"Should\"\)|GetRequiredApiLevel\(new Dictionary<string, string>\(\)\)\.ShouldBe\(string\.Empty\)|=>\s*default;|return\s+default;|return\s+null;\s*$)"
fi
```

Any match blocks status promotion until fixed or explicitly deferred.
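Because a search tool exits with status 0 when it finds a match, "match found" has to be inverted into "gate failed" before it can block a promotion step in a script. The sketch below illustrates that inversion only: it substitutes `grep -E` for `rg` and uses a reduced subset of the patterns above, and `stub_gate` and `STUB_PATTERN` are hypothetical names.

```bash
# Reduced subset of the forbidden patterns, written as a POSIX extended regex.
STUB_PATTERN='NotImplementedException|// TODO|// PLACEHOLDER|Assert\.True\(true\)|return[[:space:]]+default;'

# Hypothetical gate: returns 0 when the given files are clean, 1 when any
# forbidden pattern matches (matches are printed), 2 if grep itself failed.
stub_gate() {
  local status=0
  [ "$#" -gt 0 ] || return 0
  grep -nHE "$STUB_PATTERN" "$@" || status=$?
  case "$status" in
    0) echo "stub gate: FAILED, matches above block promotion" >&2; return 1 ;;
    1) return 0 ;;
    *) echo "stub gate: grep error (status $status)" >&2; return 2 ;;
  esac
}
```

A promotion script could then call `stub_gate $changed_files || exit 1` before any `feature update` or `test update` command.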

### 5. Build Gate (REQUIRED)

```bash
$DOTNET build dotnet/
```

Run after each feature loop, each test loop, before any batch-update, and at every task checkpoint.

### 6. Test Gate (REQUIRED)

Run smallest relevant filters continuously; run all sections at checkpoints.

MQTT-first filter:

```bash
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ \
  --filter "FullyQualifiedName~ImplBacklog.MqttHandlerTests|FullyQualifiedName~ImplBacklog.MessageTracerTests|FullyQualifiedName~ImplBacklog.ConfigReloaderTests"
```

Deterministic JS/consumer filter:

```bash
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ \
  --filter "FullyQualifiedName~ImplBacklog.NatsConsumerTests|FullyQualifiedName~ImplBacklog.JetStreamBatchingTests|FullyQualifiedName~ImplBacklog.JetStreamEngineTests|FullyQualifiedName~ImplBacklog.JetStreamJwtTests|FullyQualifiedName~ImplBacklog.JetStreamFileStoreTests|FullyQualifiedName~ImplBacklog.JetStreamLeafNodeTests"
```

Cluster/concurrency filter:

```bash
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ \
  --filter "FullyQualifiedName~ImplBacklog.JetStreamClusterTests1|FullyQualifiedName~ImplBacklog.JetStreamClusterTests2|FullyQualifiedName~ImplBacklog.JetStreamClusterTests3|FullyQualifiedName~ImplBacklog.JetStreamClusterTests4|FullyQualifiedName~ImplBacklog.JetStreamSuperClusterTests|FullyQualifiedName~ImplBacklog.JetStreamClusterLongTests|FullyQualifiedName~ImplBacklog.JetStreamSourcingScalingTests|FullyQualifiedName~ImplBacklog.ConcurrencyTests1|FullyQualifiedName~ImplBacklog.ConcurrencyTests2|FullyQualifiedName~ImplBacklog.JetStreamBenchmarks"
```

Checkpoint/full gate:

```bash
$DOTNET test dotnet/tests/ZB.MOM.NatsNet.Server.Tests/
```

### 7. Status Update Protocol (HARD LIMIT: 15 IDs per batch-update)

- Never send more than `15` IDs per `feature batch-update` command.
- Never send more than `15` IDs per `test batch-update` command.
- Never promote `verified` without checkpoint evidence.
- Never update IDs outside active task scope.
- Deferred items must include explicit blocker reason.

Use:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- \
  feature batch-update --ids "<max 15 ids>" --set-status <stub|complete|verified> --db porting.db --execute

$DOTNET run --project tools/NatsNet.PortTracker -- \
  test batch-update --ids "<max 15 ids>" --set-status <stub|complete|verified> --db porting.db --execute
```
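Since each feature group holds up to 20 IDs, every group needs at least two `batch-update` calls to stay under the limit. The following sketch splits a comma-separated ID list into chunks of at most 15 and prints the resulting commands as a dry run. `chunk_ids` is a hypothetical helper, and passing the IDs to `--ids` as a comma-separated string is an assumption based on how the ID lists are written in this plan.

```bash
# Hypothetical helper: split "id1,id2,..." into lines of at most $2 IDs
# (default 15), each line again comma-separated.
chunk_ids() {
  printf '%s\n' "$1" | tr ',' '\n' | xargs -n "${2:-15}" | tr ' ' ','
}

group_a="2252,2253,2254,2255,2257,2258,2259,2260,2261,2262,2263,2264,2265,2266,2267,2268,2269,2270,2271,2272"

# Dry run: print one batch-update command per chunk instead of executing it.
chunk_ids "$group_a" | while IFS= read -r chunk; do
  echo "${DOTNET:-dotnet} run --project tools/NatsNet.PortTracker -- feature batch-update --ids \"$chunk\" --set-status complete --db porting.db --execute"
done
```

For Group A this yields one chunk of 15 IDs and one of 5. Printing the commands first keeps the operator in control of which status is applied and when.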

### 8. Checkpoint Protocol Between Tasks (REQUIRED)

Before moving to the next task:

1. Run **Stub Detection Check**.
2. Run **Build Gate**.
3. Run **Test Gate** (targeted filters + full unit suite).
4. Apply status updates in <=15-ID chunks only.
5. Run:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- batch show 40 --db porting.db
```

6. Commit checkpoint before continuing.

---

## ANTI-STUB GUARDRAILS (NON-NEGOTIABLE)

### Forbidden Patterns

The following patterns are forbidden for Batch 40 feature/test work:

- `throw new NotImplementedException(...)` in mapped feature methods
- Empty method bodies for mapped features
- `TODO` / `PLACEHOLDER` markers left in mapped logic
- `Assert.True(true)` / `Assert.Pass()`
- Placeholder template assertions disconnected from behavior:
  - `var goFile = "server/..."`
  - `"<name>".ShouldContain("Should")`
  - `GetRequiredApiLevel(new Dictionary<string, string>()).ShouldBe(string.Empty)` as primary assertion
- Placeholder return shortcuts in mapped code:
  - `=> default;`
  - `return default;`
  - `return null;`

### Hard Limits

- Max feature group size: `~20` features.
- Max IDs per `feature batch-update`: `15`.
- Max IDs per `test batch-update`: `15`.
- One active feature loop at a time.
- One active test loop at a time.
- Mandatory checkpoint between tasks.
- No `verified` promotion without checkpoint evidence.

### If You Get Stuck (REQUIRED)

1. Stop current item immediately.
2. Do not stub or fake-pass.
3. Mark item `deferred` with explicit blocker reason.
4. Continue with next unblocked feature/test.

Feature deferral:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- \
  feature update <feature_id> --status deferred --override "blocked: <specific reason>" --db porting.db
```

Test deferral:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- \
  test update <test_id> --status deferred --override "blocked: <specific reason>" --db porting.db
```

`deferred` with concrete reason is correct. Stubs are not.

---

## Feature Groups (max ~20 each)

### Group A (20): bootstrap/parser/auth/JSA request foundations

IDs:
`2252,2253,2254,2255,2257,2258,2259,2260,2261,2262,2263,2264,2265,2266,2267,2268,2269,2270,2271,2272`

### Group B (20): JSA operations + reply processing bridge

IDs:
`2273,2274,2275,2276,2277,2278,2279,2280,2281,2282,2283,2284,2285,2286,2287,2288,2289,2290,2291,2292`

### Group C (20): account session manager retained/subscription core

IDs:
`2293,2294,2295,2296,2297,2298,2299,2300,2301,2302,2303,2304,2305,2306,2307,2308,2309,2310,2311,2312`

### Group D (18): retained encode/decode + session lifecycle/publish tracking

IDs:
`2313,2314,2315,2316,2317,2318,2319,2320,2321,2322,2323,2324,2325,2326,2327,2328,2329,2330`

## Test Waves (323 total)

- Wave T1 (51): `MqttHandlerTests`, `MessageTracerTests`, `ConfigReloaderTests`
- Wave T2 (89): `NatsConsumerTests`, `JetStreamBatchingTests`, `JetStreamEngineTests`, `JetStreamJwtTests`, `JetStreamFileStoreTests`, `JetStreamLeafNodeTests`
- Wave T3 (70): `JetStreamClusterTests1`, `JetStreamClusterTests2`
- Wave T4 (77): `JetStreamClusterTests3`, `JetStreamClusterTests4`, `JetStreamSuperClusterTests`, `JetStreamClusterLongTests`, `JetStreamSourcingScalingTests`
- Wave T5 (36): `ConcurrencyTests1`, `ConcurrencyTests2`, `JetStreamBenchmarks`
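The group and wave sizes above can be cross-checked against the stated totals (78 features, 323 tests) before any work starts. The sketch below counts the IDs copied from the four groups and sums the five wave sizes; `count_ids` is a hypothetical helper used only for this check.

```bash
# Hypothetical helper: count the entries in a comma-separated ID list.
count_ids() {
  printf '%s\n' "$1" | tr ',' '\n' | grep -c . || true
}

group_a="2252,2253,2254,2255,2257,2258,2259,2260,2261,2262,2263,2264,2265,2266,2267,2268,2269,2270,2271,2272"
group_b="2273,2274,2275,2276,2277,2278,2279,2280,2281,2282,2283,2284,2285,2286,2287,2288,2289,2290,2291,2292"
group_c="2293,2294,2295,2296,2297,2298,2299,2300,2301,2302,2303,2304,2305,2306,2307,2308,2309,2310,2311,2312"
group_d="2313,2314,2315,2316,2317,2318,2319,2320,2321,2322,2323,2324,2325,2326,2327,2328,2329,2330"

feature_total=$(( $(count_ids "$group_a") + $(count_ids "$group_b") + $(count_ids "$group_c") + $(count_ids "$group_d") ))
wave_total=$(( 51 + 89 + 70 + 77 + 36 ))

echo "features: $feature_total (expected 78)"
echo "tests:    $wave_total (expected 323)"
```

Note that Group A skips ID `2256`, which is why the range `2252` to `2272` holds 20 IDs rather than 21.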

ID extraction helper:

```bash
sqlite3 -header -column porting.db "
SELECT bt.test_id AS id, t.dotnet_class, t.name
FROM batch_tests bt
JOIN unit_tests t ON t.id=bt.test_id
WHERE bt.batch_id=40
AND t.dotnet_class IN (<wave classes here>)
ORDER BY bt.test_id;"
```
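The `<wave classes here>` placeholder has to be filled with a quoted SQL list. A small sketch of how that list could be built from the class names of a wave follows. It assumes `dotnet_class` stores bare class names such as `MqttHandlerTests`, which the plan does not confirm, and `sql_in_list` and `wave_query` are hypothetical helper names.

```bash
# Hypothetical helper: turn "A,B,C" into "'A','B','C'" for a SQL IN (...) clause.
# Intended only for plain class names (no quotes or commas inside a name).
sql_in_list() {
  printf '%s\n' "$1" | sed "s/,/','/g; s/^/'/; s/\$/'/"
}

# Hypothetical helper: print the extraction query for one wave's class list.
wave_query() {
  cat <<SQL
SELECT bt.test_id AS id, t.dotnet_class, t.name
FROM batch_tests bt
JOIN unit_tests t ON t.id=bt.test_id
WHERE bt.batch_id=40
AND t.dotnet_class IN ($(sql_in_list "$1"))
ORDER BY bt.test_id;
SQL
}

# Print the Wave T1 query for inspection before running it.
wave_query "MqttHandlerTests,MessageTracerTests,ConfigReloaderTests"
```

Once the printed query looks right, it can be passed to the same `sqlite3 -header -column porting.db "..."` invocation shown above.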

---

### Task 1: Preflight, Mapping Alignment, and Baseline

**Files:**
- Read: `docs/plans/2026-02-27-batch-40-mqtt-server-jsa-design.md`
- Read: `golang/nats-server/server/mqtt.go`
- Modify (if needed): `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/*` for class/method mapping alignment

**Steps:**
1. Run dependency preflight and start Batch 40.
2. Confirm mapped class/method targets compile (notably `MqttJetStreamAdapter`).
3. Capture baseline build + full unit test counts.
4. Run checkpoint protocol.

### Task 2: Implement Feature Group A + Test Wave T1

**Files:**
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/NatsServer*.cs` (MQTT methods)
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection*.cs` (MQTT parse/trace helpers)
- Modify: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs`
- Modify tests: `ImplBacklog/MqttHandlerTests.Impltests.cs`, `ImplBacklog/MessageTracerTests.Impltests.cs`, `ImplBacklog/ConfigReloaderTests.Impltests.cs`

**Steps:**
1. Process each Group A feature through per-feature loop.
2. Port/verify Wave T1 tests through per-test loop.
3. Apply status updates in <=15-ID chunks.
4. Run checkpoint protocol and commit.

### Task 3: Implement Feature Group B + Test Wave T2 (Part 1)

**Files:**
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttJetStreamAdapter*.cs`
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttAccountSessionManager*.cs`
- Modify tests: `ImplBacklog/JetStreamBatchingTests.Impltests.cs`, `ImplBacklog/JetStreamEngineTests.Impltests.cs`, `ImplBacklog/NatsConsumerTests.Impltests.cs`, `ImplBacklog/JetStreamJwtTests.Impltests.cs`, `ImplBacklog/JetStreamFileStoreTests.Impltests.cs`, `ImplBacklog/JetStreamLeafNodeTests.Impltests.cs`

**Steps:**
1. Process each Group B feature through per-feature loop.
2. Port deterministic tests from Wave T2 first; defer runtime-blocked tests with reason.
3. Apply status updates in <=15-ID chunks.
4. Run checkpoint protocol and commit.

### Task 4: Implement Feature Group C + Test Wave T3

**Files:**
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttAccountSessionManager*.cs`
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSession*.cs`
- Modify/Create tests: `ImplBacklog/JetStreamClusterTests1.Impltests.cs`, `ImplBacklog/JetStreamClusterTests2.Impltests.cs`

**Steps:**
1. Process each Group C feature through per-feature loop.
2. Execute Wave T3 with strict infra eligibility checks.
3. Defer cluster-runtime blocked tests explicitly; do not stub.
4. Apply status updates in <=15-ID chunks.
5. Run checkpoint protocol and commit.

### Task 5: Implement Feature Group D + Test Waves T4/T5

**Files:**
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs`
- Modify/Create: `dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSession*.cs`
- Modify/Create tests:
  - `ImplBacklog/JetStreamClusterTests3.Impltests.cs`
  - `ImplBacklog/JetStreamClusterTests4.Impltests.cs`
  - `ImplBacklog/JetStreamSuperClusterTests.Impltests.cs`
  - `ImplBacklog/JetStreamClusterLongTests.Impltests.cs`
  - `ImplBacklog/JetStreamSourcingScalingTests.Impltests.cs`
  - `ImplBacklog/ConcurrencyTests1.Impltests.cs`
  - `ImplBacklog/ConcurrencyTests2.Impltests.cs`
  - `ImplBacklog/JetStreamBenchmarks.Impltests.cs`

**Steps:**
1. Process each Group D feature through per-feature loop.
2. Execute Waves T4/T5 with strict defer-with-reason handling.
3. Apply status updates in <=15-ID chunks.
4. Run checkpoint protocol and commit.

### Task 6: Batch 40 Closure Verification

**Files:**
- Modify: `porting.db`
- Generate: `reports/current.md` (via report script)

**Steps:**
1. Run the final **Stub Detection Check** across changed MQTT source/test files.
2. Run the **Build Gate** and the full unit test suite.
3. Verify Batch 40 state:

```bash
$DOTNET run --project tools/NatsNet.PortTracker -- batch show 40 --db porting.db
$DOTNET run --project tools/NatsNet.PortTracker -- report summary --db porting.db
```

4. Generate updated report:

```bash
./reports/generate-report.sh
```

5. Final checkpoint commit.