10 tasks from scaffolding through integration tests, each with TDD steps, exact file paths, and complete code. Bottom-up order: SubList → Parser → Client → Server → Integration.
2722 lines
78 KiB
Markdown
2722 lines
78 KiB
Markdown
# Base NATS Server Port — Implementation Plan
|
|
|
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
|
|
|
|
**Goal:** Port the core NATS pub/sub server from Go to .NET 10 C# — supporting CONNECT/INFO handshake, PUB/SUB/UNSUB, wildcards (`*`, `>`), queue groups, PING/PONG, and HPUB/HMSG headers.
|
|
|
|
**Architecture:** Bottom-up build: subject matching trie (SubList) → protocol parser (System.IO.Pipelines) → client connection handler → server orchestrator. Each layer is independently testable. The library (NATS.Server) is separate from the host console app (NATS.Server.Host).
|
|
|
|
**Tech Stack:** .NET 10, C# 14, xUnit, System.IO.Pipelines, System.Text.Json
|
|
|
|
---
|
|
|
|
### Task 0: Project Scaffolding
|
|
|
|
**Files:**
|
|
- Create: `NatsDotNet.sln`
|
|
- Create: `src/NATS.Server/NATS.Server.csproj`
|
|
- Create: `src/NATS.Server.Host/NATS.Server.Host.csproj`
|
|
- Create: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj`
|
|
- Create: `Directory.Build.props`
|
|
|
|
**Step 1: Create the solution and projects**
|
|
|
|
```bash
|
|
cd /Users/dohertj2/Desktop/natsdotnet
|
|
|
|
# Create solution
|
|
dotnet new sln -n NatsDotNet
|
|
|
|
# Create class library
|
|
dotnet new classlib -n NATS.Server -o src/NATS.Server -f net10.0
|
|
rm src/NATS.Server/Class1.cs
|
|
|
|
# Create console app
|
|
dotnet new console -n NATS.Server.Host -o src/NATS.Server.Host -f net10.0
|
|
|
|
# Create test project
|
|
dotnet new xunit -n NATS.Server.Tests -o tests/NATS.Server.Tests -f net10.0
|
|
|
|
# Add projects to solution
|
|
dotnet sln add src/NATS.Server/NATS.Server.csproj
|
|
dotnet sln add src/NATS.Server.Host/NATS.Server.Host.csproj
|
|
dotnet sln add tests/NATS.Server.Tests/NATS.Server.Tests.csproj
|
|
|
|
# Add project references
|
|
dotnet add src/NATS.Server.Host/NATS.Server.Host.csproj reference src/NATS.Server/NATS.Server.csproj
|
|
dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj reference src/NATS.Server/NATS.Server.csproj
|
|
```
|
|
|
|
**Step 2: Create Directory.Build.props**
|
|
|
|
```xml
|
|
<Project>
|
|
<PropertyGroup>
|
|
<TargetFramework>net10.0</TargetFramework>
|
|
<LangVersion>preview</LangVersion>
|
|
<Nullable>enable</Nullable>
|
|
<ImplicitUsings>enable</ImplicitUsings>
|
|
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
|
</PropertyGroup>
|
|
</Project>
|
|
```
|
|
|
|
Remove the `<TargetFramework>` and `<Nullable>` and `<ImplicitUsings>` from each individual `.csproj` since they're now in `Directory.Build.props`.
|
|
|
|
**Step 3: Create directory structure**
|
|
|
|
```bash
|
|
mkdir -p src/NATS.Server/Protocol
|
|
mkdir -p src/NATS.Server/Subscriptions
|
|
```
|
|
|
|
**Step 4: Verify build**
|
|
|
|
Run: `dotnet build NatsDotNet.sln`
|
|
Expected: Build succeeded with 0 errors.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add -A
|
|
git commit -m "feat: scaffold solution with NATS.Server library, host, and test projects"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 1: Subscription Types and Subject Validation
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/Subscriptions/Subscription.cs`
|
|
- Create: `src/NATS.Server/Subscriptions/SubListResult.cs`
|
|
- Create: `src/NATS.Server/Subscriptions/SubjectMatch.cs`
|
|
- Test: `tests/NATS.Server.Tests/SubjectMatchTests.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/sublist.go` lines 1159-1233 for subject validation
|
|
|
|
**Step 1: Write failing tests for subject validation**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/SubjectMatchTests.cs
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class SubjectMatchTests
|
|
{
|
|
[Theory]
|
|
[InlineData("foo", true)]
|
|
[InlineData("foo.bar", true)]
|
|
[InlineData("foo.bar.baz", true)]
|
|
[InlineData("foo.*", true)]
|
|
[InlineData("foo.>", true)]
|
|
[InlineData(">", true)]
|
|
[InlineData("*", true)]
|
|
[InlineData("*.bar", true)]
|
|
[InlineData("foo.*.baz", true)]
|
|
[InlineData("", false)]
|
|
[InlineData("foo.", false)]
|
|
[InlineData(".foo", false)]
|
|
[InlineData("foo..bar", false)]
|
|
[InlineData("foo.>.bar", false)] // > must be last token
|
|
[InlineData("foo bar", false)] // no spaces
|
|
[InlineData("foo\tbar", false)] // no tabs
|
|
public void IsValidSubject(string subject, bool expected)
|
|
{
|
|
Assert.Equal(expected, SubjectMatch.IsValidSubject(subject));
|
|
}
|
|
|
|
[Theory]
|
|
[InlineData("foo", true)]
|
|
[InlineData("foo.bar.baz", true)]
|
|
[InlineData("foo.*", false)]
|
|
[InlineData("foo.>", false)]
|
|
[InlineData(">", false)]
|
|
[InlineData("*", false)]
|
|
public void IsValidPublishSubject(string subject, bool expected)
|
|
{
|
|
Assert.Equal(expected, SubjectMatch.IsValidPublishSubject(subject));
|
|
}
|
|
|
|
[Theory]
|
|
[InlineData("foo", "foo", true)]
|
|
[InlineData("foo", "bar", false)]
|
|
[InlineData("foo.bar", "foo.*", true)]
|
|
[InlineData("foo.bar", "*.bar", true)]
|
|
[InlineData("foo.bar", "*.*", true)]
|
|
[InlineData("foo.bar.baz", "foo.>", true)]
|
|
[InlineData("foo.bar.baz", ">", true)]
|
|
[InlineData("foo.bar", "foo.>", true)]
|
|
[InlineData("foo", "foo.>", false)]
|
|
[InlineData("foo.bar.baz", "foo.*", false)]
|
|
[InlineData("foo.bar", "foo.bar.>", false)]
|
|
public void MatchLiteral(string literal, string pattern, bool expected)
|
|
{
|
|
Assert.Equal(expected, SubjectMatch.MatchLiteral(literal, pattern));
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 2: Run tests to verify they fail**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubjectMatchTests" -v quiet`
|
|
Expected: FAIL — types don't exist yet.
|
|
|
|
**Step 3: Implement Subscription, SubListResult, and SubjectMatch**
|
|
|
|
```csharp
|
|
// src/NATS.Server/Subscriptions/Subscription.cs
|
|
namespace NATS.Server.Subscriptions;
|
|
|
|
public sealed class Subscription
|
|
{
|
|
public required string Subject { get; init; }
|
|
public string? Queue { get; init; }
|
|
public required string Sid { get; init; }
|
|
public long MessageCount; // Interlocked
|
|
public long MaxMessages; // 0 = unlimited
|
|
}
|
|
```
|
|
|
|
```csharp
|
|
// src/NATS.Server/Subscriptions/SubListResult.cs
|
|
namespace NATS.Server.Subscriptions;
|
|
|
|
public sealed class SubListResult
|
|
{
|
|
public static readonly SubListResult Empty = new([], []);
|
|
|
|
public Subscription[] PlainSubs { get; }
|
|
public Subscription[][] QueueSubs { get; } // outer = groups, inner = members
|
|
|
|
public SubListResult(Subscription[] plainSubs, Subscription[][] queueSubs)
|
|
{
|
|
PlainSubs = plainSubs;
|
|
QueueSubs = queueSubs;
|
|
}
|
|
}
|
|
```
|
|
|
|
```csharp
|
|
// src/NATS.Server/Subscriptions/SubjectMatch.cs
|
|
namespace NATS.Server.Subscriptions;
|
|
|
|
public static class SubjectMatch
|
|
{
|
|
public const char Pwc = '*'; // partial wildcard
|
|
public const char Fwc = '>'; // full wildcard
|
|
public const char Sep = '.'; // token separator
|
|
|
|
public static bool IsValidSubject(string subject)
|
|
{
|
|
if (string.IsNullOrEmpty(subject))
|
|
return false;
|
|
|
|
bool sawFwc = false;
|
|
int start = 0;
|
|
|
|
for (int i = 0; i <= subject.Length; i++)
|
|
{
|
|
if (i == subject.Length || subject[i] == Sep)
|
|
{
|
|
int tokenLen = i - start;
|
|
if (tokenLen == 0 || sawFwc)
|
|
return false;
|
|
|
|
if (tokenLen == 1)
|
|
{
|
|
char c = subject[start];
|
|
if (c == Fwc)
|
|
sawFwc = true;
|
|
else if (c is ' ' or '\t' or '\n' or '\r' or '\f')
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
for (int j = start; j < i; j++)
|
|
{
|
|
char c = subject[j];
|
|
if (c is ' ' or '\t' or '\n' or '\r' or '\f')
|
|
return false;
|
|
}
|
|
}
|
|
|
|
start = i + 1;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
public static bool IsLiteral(string subject)
|
|
{
|
|
for (int i = 0; i < subject.Length; i++)
|
|
{
|
|
char c = subject[i];
|
|
if (c is Pwc or Fwc)
|
|
{
|
|
bool atStart = i == 0 || subject[i - 1] == Sep;
|
|
bool atEnd = i + 1 == subject.Length || subject[i + 1] == Sep;
|
|
if (atStart && atEnd)
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
public static bool IsValidPublishSubject(string subject)
|
|
{
|
|
return IsValidSubject(subject) && IsLiteral(subject);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Match a literal subject against a pattern that may contain wildcards.
|
|
/// </summary>
|
|
public static bool MatchLiteral(string literal, string pattern)
|
|
{
|
|
int li = 0, pi = 0;
|
|
|
|
while (pi < pattern.Length)
|
|
{
|
|
// Get next pattern token
|
|
int pTokenStart = pi;
|
|
while (pi < pattern.Length && pattern[pi] != Sep) pi++;
|
|
int pTokenLen = pi - pTokenStart;
|
|
if (pi < pattern.Length) pi++; // skip separator
|
|
|
|
// Full wildcard — matches everything remaining
|
|
if (pTokenLen == 1 && pattern[pTokenStart] == Fwc)
|
|
return li < literal.Length; // must have at least one token left
|
|
|
|
// Get next literal token
|
|
if (li >= literal.Length) return false;
|
|
int lTokenStart = li;
|
|
while (li < literal.Length && literal[li] != Sep) li++;
|
|
int lTokenLen = li - lTokenStart;
|
|
if (li < literal.Length) li++; // skip separator
|
|
|
|
// Partial wildcard — matches any single token
|
|
if (pTokenLen == 1 && pattern[pTokenStart] == Pwc)
|
|
continue;
|
|
|
|
// Literal comparison
|
|
if (pTokenLen != lTokenLen)
|
|
return false;
|
|
if (string.Compare(literal, lTokenStart, pattern, pTokenStart, pTokenLen, StringComparison.Ordinal) != 0)
|
|
return false;
|
|
}
|
|
|
|
return li >= literal.Length; // both exhausted
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 4: Run tests to verify they pass**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubjectMatchTests" -v quiet`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/Subscriptions/ tests/NATS.Server.Tests/SubjectMatchTests.cs
|
|
git commit -m "feat: add Subscription types and subject validation with wildcard matching"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 2: SubList Trie — Insert, Remove, Match
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/Subscriptions/SubList.cs`
|
|
- Test: `tests/NATS.Server.Tests/SubListTests.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/sublist.go` — the core trie, matchLevel(), addNodeToResults(), cache logic
|
|
|
|
**Step 1: Write failing tests for SubList**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/SubListTests.cs
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class SubListTests
|
|
{
|
|
private static Subscription MakeSub(string subject, string? queue = null, string sid = "1")
|
|
=> new() { Subject = subject, Queue = queue, Sid = sid };
|
|
|
|
[Fact]
|
|
public void Insert_and_match_literal_subject()
|
|
{
|
|
var sl = new SubList();
|
|
var sub = MakeSub("foo.bar");
|
|
sl.Insert(sub);
|
|
|
|
var r = sl.Match("foo.bar");
|
|
Assert.Single(r.PlainSubs);
|
|
Assert.Same(sub, r.PlainSubs[0]);
|
|
Assert.Empty(r.QueueSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_returns_empty_for_no_match()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub("foo.bar"));
|
|
|
|
var r = sl.Match("foo.baz");
|
|
Assert.Empty(r.PlainSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_partial_wildcard()
|
|
{
|
|
var sl = new SubList();
|
|
var sub = MakeSub("foo.*");
|
|
sl.Insert(sub);
|
|
|
|
Assert.Single(sl.Match("foo.bar").PlainSubs);
|
|
Assert.Single(sl.Match("foo.baz").PlainSubs);
|
|
Assert.Empty(sl.Match("foo.bar.baz").PlainSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_full_wildcard()
|
|
{
|
|
var sl = new SubList();
|
|
var sub = MakeSub("foo.>");
|
|
sl.Insert(sub);
|
|
|
|
Assert.Single(sl.Match("foo.bar").PlainSubs);
|
|
Assert.Single(sl.Match("foo.bar.baz").PlainSubs);
|
|
Assert.Empty(sl.Match("foo").PlainSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_root_full_wildcard()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub(">"));
|
|
|
|
Assert.Single(sl.Match("foo").PlainSubs);
|
|
Assert.Single(sl.Match("foo.bar").PlainSubs);
|
|
Assert.Single(sl.Match("foo.bar.baz").PlainSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_collects_multiple_subs()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub("foo.bar", sid: "1"));
|
|
sl.Insert(MakeSub("foo.*", sid: "2"));
|
|
sl.Insert(MakeSub("foo.>", sid: "3"));
|
|
sl.Insert(MakeSub(">", sid: "4"));
|
|
|
|
var r = sl.Match("foo.bar");
|
|
Assert.Equal(4, r.PlainSubs.Length);
|
|
}
|
|
|
|
[Fact]
|
|
public void Remove_subscription()
|
|
{
|
|
var sl = new SubList();
|
|
var sub = MakeSub("foo.bar");
|
|
sl.Insert(sub);
|
|
Assert.Single(sl.Match("foo.bar").PlainSubs);
|
|
|
|
sl.Remove(sub);
|
|
Assert.Empty(sl.Match("foo.bar").PlainSubs);
|
|
}
|
|
|
|
[Fact]
|
|
public void Queue_group_subscriptions()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "1"));
|
|
sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "2"));
|
|
sl.Insert(MakeSub("foo.bar", queue: "loggers", sid: "3"));
|
|
|
|
var r = sl.Match("foo.bar");
|
|
Assert.Empty(r.PlainSubs);
|
|
Assert.Equal(2, r.QueueSubs.Length); // 2 queue groups
|
|
}
|
|
|
|
[Fact]
|
|
public void Count_tracks_subscriptions()
|
|
{
|
|
var sl = new SubList();
|
|
Assert.Equal(0u, sl.Count);
|
|
|
|
sl.Insert(MakeSub("foo", sid: "1"));
|
|
sl.Insert(MakeSub("bar", sid: "2"));
|
|
Assert.Equal(2u, sl.Count);
|
|
|
|
sl.Remove(MakeSub("foo", sid: "1"));
|
|
// Remove by reference won't work — we need the same instance
|
|
}
|
|
|
|
[Fact]
|
|
public void Count_tracks_with_same_instance()
|
|
{
|
|
var sl = new SubList();
|
|
var sub = MakeSub("foo");
|
|
sl.Insert(sub);
|
|
Assert.Equal(1u, sl.Count);
|
|
sl.Remove(sub);
|
|
Assert.Equal(0u, sl.Count);
|
|
}
|
|
|
|
[Fact]
|
|
public void Cache_invalidation_on_insert()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub("foo.bar", sid: "1"));
|
|
|
|
// Prime the cache
|
|
var r1 = sl.Match("foo.bar");
|
|
Assert.Single(r1.PlainSubs);
|
|
|
|
// Insert a wildcard that matches — cache should be invalidated
|
|
sl.Insert(MakeSub("foo.*", sid: "2"));
|
|
|
|
var r2 = sl.Match("foo.bar");
|
|
Assert.Equal(2, r2.PlainSubs.Length);
|
|
}
|
|
|
|
[Fact]
|
|
public void Match_partial_wildcard_at_different_levels()
|
|
{
|
|
var sl = new SubList();
|
|
sl.Insert(MakeSub("*.bar.baz", sid: "1"));
|
|
sl.Insert(MakeSub("foo.*.baz", sid: "2"));
|
|
sl.Insert(MakeSub("foo.bar.*", sid: "3"));
|
|
|
|
var r = sl.Match("foo.bar.baz");
|
|
Assert.Equal(3, r.PlainSubs.Length);
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 2: Run tests to verify they fail**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubListTests" -v quiet`
|
|
Expected: FAIL — `SubList` class doesn't exist yet.
|
|
|
|
**Step 3: Implement SubList**
|
|
|
|
```csharp
|
|
// src/NATS.Server/Subscriptions/SubList.cs
|
|
namespace NATS.Server.Subscriptions;
|
|
|
|
public sealed class SubList
|
|
{
|
|
private const int CacheMax = 1024;
|
|
private const int CacheSweep = 256;
|
|
|
|
private readonly ReaderWriterLockSlim _lock = new();
|
|
private readonly TrieLevel _root = new();
|
|
private Dictionary<string, SubListResult>? _cache = new();
|
|
private uint _count;
|
|
private ulong _genId;
|
|
|
|
public uint Count
|
|
{
|
|
get
|
|
{
|
|
_lock.EnterReadLock();
|
|
try { return _count; }
|
|
finally { _lock.ExitReadLock(); }
|
|
}
|
|
}
|
|
|
|
public void Insert(Subscription sub)
|
|
{
|
|
var subject = sub.Subject;
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
var level = _root;
|
|
TrieNode? node = null;
|
|
bool sawFwc = false;
|
|
|
|
foreach (var token in new TokenEnumerator(subject))
|
|
{
|
|
if (token.Length == 0 || sawFwc)
|
|
throw new ArgumentException("Invalid subject", nameof(sub));
|
|
|
|
if (token.Length == 1 && token[0] == SubjectMatch.Pwc)
|
|
{
|
|
node = level.Pwc ??= new TrieNode();
|
|
}
|
|
else if (token.Length == 1 && token[0] == SubjectMatch.Fwc)
|
|
{
|
|
node = level.Fwc ??= new TrieNode();
|
|
sawFwc = true;
|
|
}
|
|
else
|
|
{
|
|
var key = token.ToString();
|
|
if (!level.Nodes.TryGetValue(key, out node))
|
|
{
|
|
node = new TrieNode();
|
|
level.Nodes[key] = node;
|
|
}
|
|
}
|
|
|
|
node.Next ??= new TrieLevel();
|
|
level = node.Next;
|
|
}
|
|
|
|
if (node == null)
|
|
throw new ArgumentException("Invalid subject", nameof(sub));
|
|
|
|
if (sub.Queue == null)
|
|
{
|
|
node.PlainSubs.Add(sub);
|
|
}
|
|
else
|
|
{
|
|
if (!node.QueueSubs.TryGetValue(sub.Queue, out var qset))
|
|
{
|
|
qset = [];
|
|
node.QueueSubs[sub.Queue] = qset;
|
|
}
|
|
qset.Add(sub);
|
|
}
|
|
|
|
_count++;
|
|
_genId++;
|
|
_cache?.Clear();
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
}
|
|
|
|
public void Remove(Subscription sub)
|
|
{
|
|
_lock.EnterWriteLock();
|
|
try
|
|
{
|
|
var level = _root;
|
|
TrieNode? node = null;
|
|
bool sawFwc = false;
|
|
|
|
// Stack for pruning empty nodes
|
|
Span<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)> path =
|
|
stackalloc (TrieLevel, TrieNode, string, bool, bool)[32];
|
|
// Can't stackalloc tuples with references — use a list instead
|
|
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
|
|
|
|
foreach (var token in new TokenEnumerator(sub.Subject))
|
|
{
|
|
if (token.Length == 0 || sawFwc)
|
|
return;
|
|
|
|
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
|
|
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
|
|
|
|
if (isPwc)
|
|
{
|
|
node = level.Pwc;
|
|
}
|
|
else if (isFwc)
|
|
{
|
|
node = level.Fwc;
|
|
sawFwc = true;
|
|
}
|
|
else
|
|
{
|
|
level.Nodes.TryGetValue(token.ToString(), out node);
|
|
}
|
|
|
|
if (node == null)
|
|
return; // not found
|
|
|
|
var tokenStr = token.ToString();
|
|
pathList.Add((level, node, tokenStr, isPwc, isFwc));
|
|
level = node.Next ?? new TrieLevel(); // shouldn't be null on valid path
|
|
}
|
|
|
|
if (node == null) return;
|
|
|
|
// Remove from node
|
|
bool removed;
|
|
if (sub.Queue == null)
|
|
{
|
|
removed = node.PlainSubs.Remove(sub);
|
|
}
|
|
else
|
|
{
|
|
removed = false;
|
|
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
|
|
{
|
|
removed = qset.Remove(sub);
|
|
if (qset.Count == 0)
|
|
node.QueueSubs.Remove(sub.Queue);
|
|
}
|
|
}
|
|
|
|
if (!removed) return;
|
|
|
|
_count--;
|
|
_genId++;
|
|
_cache?.Clear();
|
|
|
|
// Prune empty nodes (walk backwards)
|
|
for (int i = pathList.Count - 1; i >= 0; i--)
|
|
{
|
|
var (l, n, t, isPwc, isFwc) = pathList[i];
|
|
if (n.IsEmpty)
|
|
{
|
|
if (isPwc) l.Pwc = null;
|
|
else if (isFwc) l.Fwc = null;
|
|
else l.Nodes.Remove(t);
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_lock.ExitWriteLock();
|
|
}
|
|
}
|
|
|
|
public SubListResult Match(string subject)
|
|
{
|
|
_lock.EnterReadLock();
|
|
bool upgraded = false;
|
|
try
|
|
{
|
|
// Check cache
|
|
if (_cache != null && _cache.TryGetValue(subject, out var cached))
|
|
return cached;
|
|
|
|
// Cache miss — collect results
|
|
var plainSubs = new List<Subscription>();
|
|
var queueSubs = new List<List<Subscription>>();
|
|
|
|
MatchLevel(_root, subject, 0, plainSubs, queueSubs);
|
|
|
|
SubListResult result;
|
|
if (plainSubs.Count == 0 && queueSubs.Count == 0)
|
|
{
|
|
result = SubListResult.Empty;
|
|
}
|
|
else
|
|
{
|
|
result = new SubListResult(
|
|
plainSubs.ToArray(),
|
|
queueSubs.Select(q => q.ToArray()).ToArray()
|
|
);
|
|
}
|
|
|
|
// Upgrade to write lock for cache update
|
|
if (_cache != null)
|
|
{
|
|
_lock.ExitReadLock();
|
|
_lock.EnterWriteLock();
|
|
upgraded = true;
|
|
|
|
// Re-check cache after lock upgrade
|
|
if (_cache.TryGetValue(subject, out cached))
|
|
return cached;
|
|
|
|
_cache[subject] = result;
|
|
|
|
if (_cache.Count > CacheMax)
|
|
{
|
|
// Sweep: remove entries until at CacheSweep
|
|
var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
|
|
foreach (var key in keys)
|
|
_cache.Remove(key);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
finally
|
|
{
|
|
if (upgraded)
|
|
_lock.ExitWriteLock();
|
|
else
|
|
_lock.ExitReadLock();
|
|
}
|
|
}
|
|
|
|
private static void MatchLevel(TrieLevel level, string subject, int startIndex,
|
|
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
|
|
{
|
|
int i = startIndex;
|
|
|
|
while (i <= subject.Length)
|
|
{
|
|
// Find next token boundary
|
|
int tokenStart = i;
|
|
while (i < subject.Length && subject[i] != SubjectMatch.Sep)
|
|
i++;
|
|
var token = subject.AsSpan(tokenStart, i - tokenStart);
|
|
bool isLast = i >= subject.Length;
|
|
if (!isLast) i++; // skip separator
|
|
|
|
// Full wildcard (>) at this level matches all remaining tokens
|
|
if (level.Fwc != null)
|
|
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
|
|
|
|
// Partial wildcard (*) — recurse with remaining tokens
|
|
if (level.Pwc != null)
|
|
{
|
|
if (isLast)
|
|
{
|
|
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
|
|
}
|
|
else if (level.Pwc.Next != null)
|
|
{
|
|
MatchLevel(level.Pwc.Next, subject, i, plainSubs, queueSubs);
|
|
}
|
|
}
|
|
|
|
// Literal match
|
|
if (level.Nodes.TryGetValue(token.ToString(), out var node))
|
|
{
|
|
if (isLast)
|
|
{
|
|
AddNodeToResults(node, plainSubs, queueSubs);
|
|
}
|
|
else if (node.Next != null)
|
|
{
|
|
level = node.Next;
|
|
}
|
|
else
|
|
{
|
|
return; // no further levels
|
|
}
|
|
}
|
|
else
|
|
{
|
|
return; // no literal match
|
|
}
|
|
|
|
if (isLast)
|
|
return;
|
|
}
|
|
}
|
|
|
|
private static void AddNodeToResults(TrieNode node,
|
|
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
|
|
{
|
|
// Add plain subscriptions
|
|
foreach (var sub in node.PlainSubs)
|
|
plainSubs.Add(sub);
|
|
|
|
// Add queue subscriptions grouped by queue name
|
|
foreach (var (queueName, subs) in node.QueueSubs)
|
|
{
|
|
if (subs.Count == 0) continue;
|
|
|
|
// Find existing queue group or create new one
|
|
List<Subscription>? existing = null;
|
|
foreach (var qs in queueSubs)
|
|
{
|
|
if (qs.Count > 0 && qs[0].Queue == queueName)
|
|
{
|
|
existing = qs;
|
|
break;
|
|
}
|
|
}
|
|
if (existing == null)
|
|
{
|
|
existing = new List<Subscription>();
|
|
queueSubs.Add(existing);
|
|
}
|
|
existing.AddRange(subs);
|
|
}
|
|
}
|
|
|
|
/// <summary>Enumerates '.' separated tokens in a subject without allocating.</summary>
|
|
private ref struct TokenEnumerator
|
|
{
|
|
private ReadOnlySpan<char> _remaining;
|
|
|
|
public TokenEnumerator(string subject)
|
|
{
|
|
_remaining = subject.AsSpan();
|
|
Current = default;
|
|
}
|
|
|
|
public ReadOnlySpan<char> Current { get; private set; }
|
|
|
|
public TokenEnumerator GetEnumerator() => this;
|
|
|
|
public bool MoveNext()
|
|
{
|
|
if (_remaining.IsEmpty)
|
|
return false;
|
|
|
|
int sep = _remaining.IndexOf(SubjectMatch.Sep);
|
|
if (sep < 0)
|
|
{
|
|
Current = _remaining;
|
|
_remaining = default;
|
|
}
|
|
else
|
|
{
|
|
Current = _remaining[..sep];
|
|
_remaining = _remaining[(sep + 1)..];
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
|
|
private sealed class TrieLevel
|
|
{
|
|
public readonly Dictionary<string, TrieNode> Nodes = new();
|
|
public TrieNode? Pwc; // partial wildcard (*)
|
|
public TrieNode? Fwc; // full wildcard (>)
|
|
}
|
|
|
|
private sealed class TrieNode
|
|
{
|
|
public TrieLevel? Next;
|
|
public readonly HashSet<Subscription> PlainSubs = [];
|
|
public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new();
|
|
|
|
public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
|
|
(Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 4: Run tests to verify they pass**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubListTests" -v quiet`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/Subscriptions/SubList.cs tests/NATS.Server.Tests/SubListTests.cs
|
|
git commit -m "feat: implement SubList trie with wildcard matching and cache"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 3: Protocol Constants and Types
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/Protocol/NatsProtocol.cs`
|
|
- Create: `src/NATS.Server/NatsOptions.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/const.go`, `golang/nats-server/server/server.go` lines 109-166 (Info struct), `golang/nats-server/server/client.go` lines 661-690 (ClientOpts)
|
|
|
|
**Step 1: Create protocol constants and DTO types**
|
|
|
|
```csharp
|
|
// src/NATS.Server/Protocol/NatsProtocol.cs
|
|
using System.Text;
|
|
using System.Text.Json.Serialization;
|
|
|
|
namespace NATS.Server.Protocol;
|
|
|
|
public static class NatsProtocol
|
|
{
|
|
public const int MaxControlLineSize = 4096;
|
|
public const int MaxPayloadSize = 1024 * 1024; // 1MB
|
|
public const int DefaultPort = 4222;
|
|
public const string Version = "0.1.0";
|
|
public const int ProtoVersion = 1;
|
|
|
|
// Pre-encoded protocol fragments
|
|
public static readonly byte[] CrLf = "\r\n"u8.ToArray();
|
|
public static readonly byte[] PingBytes = "PING\r\n"u8.ToArray();
|
|
public static readonly byte[] PongBytes = "PONG\r\n"u8.ToArray();
|
|
public static readonly byte[] OkBytes = "+OK\r\n"u8.ToArray();
|
|
public static readonly byte[] InfoPrefix = "INFO "u8.ToArray();
|
|
public static readonly byte[] MsgPrefix = "MSG "u8.ToArray();
|
|
public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray();
|
|
public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray();
|
|
}
|
|
|
|
public sealed class ServerInfo
|
|
{
|
|
[JsonPropertyName("server_id")]
|
|
public required string ServerId { get; set; }
|
|
|
|
[JsonPropertyName("server_name")]
|
|
public required string ServerName { get; set; }
|
|
|
|
[JsonPropertyName("version")]
|
|
public required string Version { get; set; }
|
|
|
|
[JsonPropertyName("proto")]
|
|
public int Proto { get; set; } = NatsProtocol.ProtoVersion;
|
|
|
|
[JsonPropertyName("host")]
|
|
public required string Host { get; set; }
|
|
|
|
[JsonPropertyName("port")]
|
|
public int Port { get; set; }
|
|
|
|
[JsonPropertyName("headers")]
|
|
public bool Headers { get; set; } = true;
|
|
|
|
[JsonPropertyName("max_payload")]
|
|
public int MaxPayload { get; set; } = NatsProtocol.MaxPayloadSize;
|
|
|
|
[JsonPropertyName("client_id")]
|
|
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
|
public ulong ClientId { get; set; }
|
|
|
|
[JsonPropertyName("client_ip")]
|
|
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
|
public string? ClientIp { get; set; }
|
|
}
|
|
|
|
public sealed class ClientOptions
|
|
{
|
|
[JsonPropertyName("verbose")]
|
|
public bool Verbose { get; set; }
|
|
|
|
[JsonPropertyName("pedantic")]
|
|
public bool Pedantic { get; set; }
|
|
|
|
[JsonPropertyName("echo")]
|
|
public bool Echo { get; set; } = true;
|
|
|
|
[JsonPropertyName("name")]
|
|
public string? Name { get; set; }
|
|
|
|
[JsonPropertyName("lang")]
|
|
public string? Lang { get; set; }
|
|
|
|
[JsonPropertyName("version")]
|
|
public string? Version { get; set; }
|
|
|
|
[JsonPropertyName("protocol")]
|
|
public int Protocol { get; set; }
|
|
|
|
[JsonPropertyName("headers")]
|
|
public bool Headers { get; set; }
|
|
|
|
[JsonPropertyName("no_responders")]
|
|
public bool NoResponders { get; set; }
|
|
}
|
|
```
|
|
|
|
```csharp
|
|
// src/NATS.Server/NatsOptions.cs
|
|
namespace NATS.Server;
|
|
|
|
public sealed class NatsOptions
|
|
{
|
|
public string Host { get; set; } = "0.0.0.0";
|
|
public int Port { get; set; } = 4222;
|
|
public string? ServerName { get; set; }
|
|
public int MaxPayload { get; set; } = 1024 * 1024; // 1MB
|
|
public int MaxControlLine { get; set; } = 4096;
|
|
public int MaxConnections { get; set; } = 65536;
|
|
public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
|
|
public int MaxPingsOut { get; set; } = 2;
|
|
}
|
|
```
|
|
|
|
**Step 2: Verify build**
|
|
|
|
Run: `dotnet build NatsDotNet.sln`
|
|
Expected: Build succeeded.
|
|
|
|
**Step 3: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsOptions.cs
|
|
git commit -m "feat: add protocol constants, ServerInfo, ClientOptions, and NatsOptions"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 4: Protocol Parser
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/Protocol/NatsParser.cs`
|
|
- Test: `tests/NATS.Server.Tests/ParserTests.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/parser.go` — state machine, argument parsing, payload reading
|
|
|
|
**Step 1: Write failing parser tests**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/ParserTests.cs
|
|
using System.Buffers;
|
|
using System.IO.Pipelines;
|
|
using System.Text;
|
|
using NATS.Server.Protocol;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class ParserTests
|
|
{
|
|
private static async Task<List<ParsedCommand>> ParseAsync(string input)
|
|
{
|
|
var pipe = new Pipe();
|
|
var commands = new List<ParsedCommand>();
|
|
|
|
// Write input to pipe
|
|
var bytes = Encoding.ASCII.GetBytes(input);
|
|
await pipe.Writer.WriteAsync(bytes);
|
|
pipe.Writer.Complete();
|
|
|
|
// Parse from pipe
|
|
var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize);
|
|
while (true)
|
|
{
|
|
var result = await pipe.Reader.ReadAsync();
|
|
var buffer = result.Buffer;
|
|
|
|
while (parser.TryParse(ref buffer, out var cmd))
|
|
commands.Add(cmd);
|
|
|
|
pipe.Reader.AdvanceTo(buffer.Start, buffer.End);
|
|
|
|
if (result.IsCompleted)
|
|
break;
|
|
}
|
|
|
|
return commands;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_PING()
|
|
{
|
|
var cmds = await ParseAsync("PING\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_PONG()
|
|
{
|
|
var cmds = await ParseAsync("PONG\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Pong, cmds[0].Type);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_CONNECT()
|
|
{
|
|
var cmds = await ParseAsync("CONNECT {\"verbose\":false,\"echo\":true}\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Connect, cmds[0].Type);
|
|
Assert.Contains("verbose", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_SUB_without_queue()
|
|
{
|
|
var cmds = await ParseAsync("SUB foo 1\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Sub, cmds[0].Type);
|
|
Assert.Equal("foo", cmds[0].Subject);
|
|
Assert.Null(cmds[0].Queue);
|
|
Assert.Equal("1", cmds[0].Sid);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_SUB_with_queue()
|
|
{
|
|
var cmds = await ParseAsync("SUB foo workers 1\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Sub, cmds[0].Type);
|
|
Assert.Equal("foo", cmds[0].Subject);
|
|
Assert.Equal("workers", cmds[0].Queue);
|
|
Assert.Equal("1", cmds[0].Sid);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_UNSUB()
|
|
{
|
|
var cmds = await ParseAsync("UNSUB 1\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Unsub, cmds[0].Type);
|
|
Assert.Equal("1", cmds[0].Sid);
|
|
Assert.Equal(-1, cmds[0].MaxMessages);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_UNSUB_with_max()
|
|
{
|
|
var cmds = await ParseAsync("UNSUB 1 5\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Unsub, cmds[0].Type);
|
|
Assert.Equal("1", cmds[0].Sid);
|
|
Assert.Equal(5, cmds[0].MaxMessages);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_PUB_with_payload()
|
|
{
|
|
var cmds = await ParseAsync("PUB foo 5\r\nHello\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
|
Assert.Equal("foo", cmds[0].Subject);
|
|
Assert.Null(cmds[0].ReplyTo);
|
|
Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_PUB_with_reply()
|
|
{
|
|
var cmds = await ParseAsync("PUB foo reply 5\r\nHello\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
|
Assert.Equal("foo", cmds[0].Subject);
|
|
Assert.Equal("reply", cmds[0].ReplyTo);
|
|
Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_multiple_commands()
|
|
{
|
|
var cmds = await ParseAsync("PING\r\nPONG\r\nSUB foo 1\r\n");
|
|
Assert.Equal(3, cmds.Count);
|
|
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
|
Assert.Equal(CommandType.Pong, cmds[1].Type);
|
|
Assert.Equal(CommandType.Sub, cmds[2].Type);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_PUB_zero_payload()
|
|
{
|
|
var cmds = await ParseAsync("PUB foo 0\r\n\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
|
Assert.Empty(cmds[0].Payload.ToArray());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_case_insensitive()
|
|
{
|
|
var cmds = await ParseAsync("ping\r\npub FOO 3\r\nabc\r\n");
|
|
Assert.Equal(2, cmds.Count);
|
|
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
|
Assert.Equal(CommandType.Pub, cmds[1].Type);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_HPUB()
|
|
{
|
|
// HPUB subject 12 17\r\nNATS/1.0\r\n\r\nHello\r\n
|
|
var header = "NATS/1.0\r\n\r\n";
|
|
var payload = "Hello";
|
|
var total = header.Length + payload.Length;
|
|
var cmds = await ParseAsync($"HPUB foo {header.Length} {total}\r\n{header}{payload}\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.HPub, cmds[0].Type);
|
|
Assert.Equal("foo", cmds[0].Subject);
|
|
Assert.Equal(header.Length, cmds[0].HeaderSize);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Parse_INFO()
|
|
{
|
|
var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n");
|
|
Assert.Single(cmds);
|
|
Assert.Equal(CommandType.Info, cmds[0].Type);
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 2: Run tests to verify they fail**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ParserTests" -v quiet`
|
|
Expected: FAIL — `NatsParser` doesn't exist.
|
|
|
|
**Step 3: Implement NatsParser**
|
|
|
|
```csharp
|
|
// src/NATS.Server/Protocol/NatsParser.cs
|
|
using System.Buffers;
|
|
using System.Text;
|
|
|
|
namespace NATS.Server.Protocol;
|
|
|
|
public enum CommandType
|
|
{
|
|
Ping,
|
|
Pong,
|
|
Connect,
|
|
Info,
|
|
Pub,
|
|
HPub,
|
|
Sub,
|
|
Unsub,
|
|
Ok,
|
|
Err,
|
|
}
|
|
|
|
public readonly struct ParsedCommand
|
|
{
|
|
public CommandType Type { get; init; }
|
|
public string? Subject { get; init; }
|
|
public string? ReplyTo { get; init; }
|
|
public string? Queue { get; init; }
|
|
public string? Sid { get; init; }
|
|
public int MaxMessages { get; init; }
|
|
public int HeaderSize { get; init; }
|
|
public ReadOnlyMemory<byte> Payload { get; init; }
|
|
|
|
public static ParsedCommand Simple(CommandType type) => new() { Type = type, MaxMessages = -1 };
|
|
}
|
|
|
|
public sealed class NatsParser
|
|
{
|
|
private static readonly byte[] CrLf = "\r\n"u8.ToArray();
|
|
private readonly int _maxPayload;
|
|
|
|
// State for split-packet payload reading
|
|
private bool _awaitingPayload;
|
|
private int _expectedPayloadSize;
|
|
private string? _pendingSubject;
|
|
private string? _pendingReplyTo;
|
|
private int _pendingHeaderSize;
|
|
private CommandType _pendingType;
|
|
|
|
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize)
|
|
{
|
|
_maxPayload = maxPayload;
|
|
}
|
|
|
|
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
|
|
{
|
|
command = default;
|
|
|
|
if (_awaitingPayload)
|
|
return TryReadPayload(ref buffer, out command);
|
|
|
|
// Look for \r\n to find control line
|
|
var reader = new SequenceReader<byte>(buffer);
|
|
if (!reader.TryReadTo(out ReadOnlySequence<byte> line, CrLf.AsSpan()))
|
|
return false;
|
|
|
|
// Control line size check
|
|
if (line.Length > NatsProtocol.MaxControlLineSize)
|
|
throw new ProtocolViolationException("Maximum control line exceeded");
|
|
|
|
// Get line as contiguous span
|
|
Span<byte> lineSpan = stackalloc byte[(int)line.Length];
|
|
line.CopyTo(lineSpan);
|
|
|
|
// Identify command by first bytes
|
|
if (lineSpan.Length < 2)
|
|
{
|
|
if (lineSpan.Length == 1 && lineSpan[0] is (byte)'+')
|
|
{
|
|
// partial — need more data
|
|
return false;
|
|
}
|
|
throw new ProtocolViolationException("Unknown protocol operation");
|
|
}
|
|
|
|
byte b0 = (byte)(lineSpan[0] | 0x20); // lowercase
|
|
byte b1 = (byte)(lineSpan[1] | 0x20);
|
|
|
|
switch (b0)
|
|
{
|
|
case (byte)'p':
|
|
if (b1 == (byte)'i') // PING
|
|
{
|
|
command = ParsedCommand.Simple(CommandType.Ping);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
if (b1 == (byte)'o') // PONG
|
|
{
|
|
command = ParsedCommand.Simple(CommandType.Pong);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
if (b1 == (byte)'u') // PUB
|
|
{
|
|
return ParsePub(lineSpan, ref buffer, reader.Position, out command);
|
|
}
|
|
break;
|
|
|
|
case (byte)'h':
|
|
if (b1 == (byte)'p') // HPUB
|
|
{
|
|
return ParseHPub(lineSpan, ref buffer, reader.Position, out command);
|
|
}
|
|
break;
|
|
|
|
case (byte)'s':
|
|
if (b1 == (byte)'u') // SUB
|
|
{
|
|
command = ParseSub(lineSpan);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
break;
|
|
|
|
case (byte)'u':
|
|
if (b1 == (byte)'n') // UNSUB
|
|
{
|
|
command = ParseUnsub(lineSpan);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
break;
|
|
|
|
case (byte)'c':
|
|
if (b1 == (byte)'o') // CONNECT
|
|
{
|
|
command = ParseConnect(lineSpan);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
break;
|
|
|
|
case (byte)'i':
|
|
if (b1 == (byte)'n') // INFO
|
|
{
|
|
command = ParseInfo(lineSpan);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
break;
|
|
|
|
case (byte)'+': // +OK
|
|
command = ParsedCommand.Simple(CommandType.Ok);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
|
|
case (byte)'-': // -ERR
|
|
command = ParsedCommand.Simple(CommandType.Err);
|
|
buffer = buffer.Slice(reader.Position);
|
|
return true;
|
|
}
|
|
|
|
throw new ProtocolViolationException($"Unknown protocol operation");
|
|
}
|
|
|
|
private bool ParsePub(Span<byte> line, ref ReadOnlySequence<byte> buffer,
|
|
SequencePosition afterLine, out ParsedCommand command)
|
|
{
|
|
command = default;
|
|
|
|
// PUB subject [reply] size
|
|
// Skip "PUB "
|
|
var args = SplitArgs(line[4..]);
|
|
|
|
string subject;
|
|
string? reply = null;
|
|
int size;
|
|
|
|
if (args.Length == 2)
|
|
{
|
|
subject = Encoding.ASCII.GetString(args[0]);
|
|
size = ParseSize(args[1]);
|
|
}
|
|
else if (args.Length == 3)
|
|
{
|
|
subject = Encoding.ASCII.GetString(args[0]);
|
|
reply = Encoding.ASCII.GetString(args[1]);
|
|
size = ParseSize(args[2]);
|
|
}
|
|
else
|
|
{
|
|
throw new ProtocolViolationException("Invalid PUB arguments");
|
|
}
|
|
|
|
if (size < 0 || size > _maxPayload)
|
|
throw new ProtocolViolationException("Invalid payload size");
|
|
|
|
// Now read payload + \r\n
|
|
buffer = buffer.Slice(afterLine);
|
|
_awaitingPayload = true;
|
|
_expectedPayloadSize = size;
|
|
_pendingSubject = subject;
|
|
_pendingReplyTo = reply;
|
|
_pendingHeaderSize = -1;
|
|
_pendingType = CommandType.Pub;
|
|
|
|
return TryReadPayload(ref buffer, out command);
|
|
}
|
|
|
|
private bool ParseHPub(Span<byte> line, ref ReadOnlySequence<byte> buffer,
|
|
SequencePosition afterLine, out ParsedCommand command)
|
|
{
|
|
command = default;
|
|
|
|
// HPUB subject [reply] hdr_size total_size
|
|
// Skip "HPUB "
|
|
var args = SplitArgs(line[5..]);
|
|
|
|
string subject;
|
|
string? reply = null;
|
|
int hdrSize, totalSize;
|
|
|
|
if (args.Length == 3)
|
|
{
|
|
subject = Encoding.ASCII.GetString(args[0]);
|
|
hdrSize = ParseSize(args[1]);
|
|
totalSize = ParseSize(args[2]);
|
|
}
|
|
else if (args.Length == 4)
|
|
{
|
|
subject = Encoding.ASCII.GetString(args[0]);
|
|
reply = Encoding.ASCII.GetString(args[1]);
|
|
hdrSize = ParseSize(args[2]);
|
|
totalSize = ParseSize(args[3]);
|
|
}
|
|
else
|
|
{
|
|
throw new ProtocolViolationException("Invalid HPUB arguments");
|
|
}
|
|
|
|
if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize || totalSize > _maxPayload)
|
|
throw new ProtocolViolationException("Invalid HPUB sizes");
|
|
|
|
buffer = buffer.Slice(afterLine);
|
|
_awaitingPayload = true;
|
|
_expectedPayloadSize = totalSize;
|
|
_pendingSubject = subject;
|
|
_pendingReplyTo = reply;
|
|
_pendingHeaderSize = hdrSize;
|
|
_pendingType = CommandType.HPub;
|
|
|
|
return TryReadPayload(ref buffer, out command);
|
|
}
|
|
|
|
private bool TryReadPayload(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
|
|
{
|
|
command = default;
|
|
|
|
// Need: _expectedPayloadSize bytes + \r\n
|
|
long needed = _expectedPayloadSize + 2; // payload + \r\n
|
|
if (buffer.Length < needed)
|
|
return false;
|
|
|
|
// Extract payload
|
|
var payloadSlice = buffer.Slice(0, _expectedPayloadSize);
|
|
var payload = new byte[_expectedPayloadSize];
|
|
payloadSlice.CopyTo(payload);
|
|
|
|
// Verify \r\n after payload
|
|
var trailer = buffer.Slice(_expectedPayloadSize, 2);
|
|
Span<byte> trailerBytes = stackalloc byte[2];
|
|
trailer.CopyTo(trailerBytes);
|
|
if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n')
|
|
throw new ProtocolViolationException("Expected \\r\\n after payload");
|
|
|
|
command = new ParsedCommand
|
|
{
|
|
Type = _pendingType,
|
|
Subject = _pendingSubject,
|
|
ReplyTo = _pendingReplyTo,
|
|
Payload = payload,
|
|
HeaderSize = _pendingHeaderSize,
|
|
MaxMessages = -1,
|
|
};
|
|
|
|
buffer = buffer.Slice(buffer.GetPosition(needed));
|
|
_awaitingPayload = false;
|
|
return true;
|
|
}
|
|
|
|
private static ParsedCommand ParseSub(Span<byte> line)
|
|
{
|
|
// SUB subject [queue] sid — skip "SUB "
|
|
var args = SplitArgs(line[4..]);
|
|
|
|
return args.Length switch
|
|
{
|
|
2 => new ParsedCommand
|
|
{
|
|
Type = CommandType.Sub,
|
|
Subject = Encoding.ASCII.GetString(args[0]),
|
|
Sid = Encoding.ASCII.GetString(args[1]),
|
|
MaxMessages = -1,
|
|
},
|
|
3 => new ParsedCommand
|
|
{
|
|
Type = CommandType.Sub,
|
|
Subject = Encoding.ASCII.GetString(args[0]),
|
|
Queue = Encoding.ASCII.GetString(args[1]),
|
|
Sid = Encoding.ASCII.GetString(args[2]),
|
|
MaxMessages = -1,
|
|
},
|
|
_ => throw new ProtocolViolationException("Invalid SUB arguments"),
|
|
};
|
|
}
|
|
|
|
private static ParsedCommand ParseUnsub(Span<byte> line)
|
|
{
|
|
// UNSUB sid [max_msgs] — skip "UNSUB "
|
|
var args = SplitArgs(line[6..]);
|
|
|
|
return args.Length switch
|
|
{
|
|
1 => new ParsedCommand
|
|
{
|
|
Type = CommandType.Unsub,
|
|
Sid = Encoding.ASCII.GetString(args[0]),
|
|
MaxMessages = -1,
|
|
},
|
|
2 => new ParsedCommand
|
|
{
|
|
Type = CommandType.Unsub,
|
|
Sid = Encoding.ASCII.GetString(args[0]),
|
|
MaxMessages = ParseSize(args[1]),
|
|
},
|
|
_ => throw new ProtocolViolationException("Invalid UNSUB arguments"),
|
|
};
|
|
}
|
|
|
|
private static ParsedCommand ParseConnect(Span<byte> line)
|
|
{
|
|
// CONNECT {json} — skip "CONNECT "
|
|
int spaceIdx = line.IndexOf((byte)' ');
|
|
if (spaceIdx < 0)
|
|
throw new ProtocolViolationException("Invalid CONNECT");
|
|
|
|
var json = line[(spaceIdx + 1)..];
|
|
return new ParsedCommand
|
|
{
|
|
Type = CommandType.Connect,
|
|
Payload = json.ToArray(),
|
|
MaxMessages = -1,
|
|
};
|
|
}
|
|
|
|
private static ParsedCommand ParseInfo(Span<byte> line)
|
|
{
|
|
// INFO {json} — skip "INFO "
|
|
int spaceIdx = line.IndexOf((byte)' ');
|
|
if (spaceIdx < 0)
|
|
throw new ProtocolViolationException("Invalid INFO");
|
|
|
|
var json = line[(spaceIdx + 1)..];
|
|
return new ParsedCommand
|
|
{
|
|
Type = CommandType.Info,
|
|
Payload = json.ToArray(),
|
|
MaxMessages = -1,
|
|
};
|
|
}
|
|
|
|
/// <summary>Parse a decimal integer from ASCII bytes. Returns -1 on error.</summary>
|
|
private static int ParseSize(Span<byte> data)
|
|
{
|
|
if (data.Length == 0 || data.Length > 9)
|
|
return -1;
|
|
int n = 0;
|
|
foreach (byte b in data)
|
|
{
|
|
if (b < (byte)'0' || b > (byte)'9')
|
|
return -1;
|
|
n = n * 10 + (b - '0');
|
|
}
|
|
return n;
|
|
}
|
|
|
|
/// <summary>Split by spaces/tabs into up to 6 arguments. Returns Span of Spans via ranges.</summary>
|
|
private static SpanArgs SplitArgs(Span<byte> data)
|
|
{
|
|
var result = new SpanArgs();
|
|
int start = -1;
|
|
|
|
for (int i = 0; i < data.Length; i++)
|
|
{
|
|
byte b = data[i];
|
|
if (b is (byte)' ' or (byte)'\t')
|
|
{
|
|
if (start >= 0)
|
|
{
|
|
result.Add(data[start..i]);
|
|
start = -1;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (start < 0) start = i;
|
|
}
|
|
}
|
|
if (start >= 0)
|
|
result.Add(data[start..]);
|
|
|
|
return result;
|
|
}
|
|
|
|
private ref struct SpanArgs
|
|
{
|
|
private Span<byte> _a0, _a1, _a2, _a3, _a4, _a5;
|
|
private int _count;
|
|
|
|
public readonly int Length => _count;
|
|
|
|
public void Add(Span<byte> arg)
|
|
{
|
|
switch (_count)
|
|
{
|
|
case 0: _a0 = arg; break;
|
|
case 1: _a1 = arg; break;
|
|
case 2: _a2 = arg; break;
|
|
case 3: _a3 = arg; break;
|
|
case 4: _a4 = arg; break;
|
|
case 5: _a5 = arg; break;
|
|
default: throw new ProtocolViolationException("Too many arguments");
|
|
}
|
|
_count++;
|
|
}
|
|
|
|
public readonly Span<byte> this[int index] => index switch
|
|
{
|
|
0 => _a0, 1 => _a1, 2 => _a2, 3 => _a3, 4 => _a4, 5 => _a5,
|
|
_ => throw new IndexOutOfRangeException(),
|
|
};
|
|
}
|
|
}
|
|
|
|
public class ProtocolViolationException : Exception
|
|
{
|
|
public ProtocolViolationException(string message) : base(message) { }
|
|
}
|
|
```
|
|
|
|
**Step 4: Run tests to verify they pass**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ParserTests" -v quiet`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/Protocol/NatsParser.cs tests/NATS.Server.Tests/ParserTests.cs
|
|
git commit -m "feat: implement NATS protocol parser with System.IO.Pipelines"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 5: NatsClient — Connection Handler
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/NatsClient.cs`
|
|
- Test: `tests/NATS.Server.Tests/ClientTests.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/client.go` — readLoop, processSub, processUnsub, processConnect, deliverMsg
|
|
|
|
**Step 1: Write failing client tests**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/ClientTests.cs
|
|
using System.IO.Pipelines;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using NATS.Server;
|
|
using NATS.Server.Protocol;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class ClientTests : IAsyncDisposable
|
|
{
|
|
private readonly Socket _serverSocket;
|
|
private readonly Socket _clientSocket;
|
|
private readonly NatsClient _natsClient;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
|
|
public ClientTests()
|
|
{
|
|
// Create connected socket pair via loopback
|
|
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
|
listener.Listen(1);
|
|
var port = ((IPEndPoint)listener.LocalEndPoint!).Port;
|
|
|
|
_clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
_clientSocket.Connect(IPAddress.Loopback, port);
|
|
_serverSocket = listener.Accept();
|
|
listener.Dispose();
|
|
|
|
var serverInfo = new ServerInfo
|
|
{
|
|
ServerId = "test",
|
|
ServerName = "test",
|
|
Version = "0.1.0",
|
|
Host = "127.0.0.1",
|
|
Port = 4222,
|
|
};
|
|
|
|
_natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo);
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
await _cts.CancelAsync();
|
|
_natsClient.Dispose();
|
|
_clientSocket.Dispose();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Client_sends_INFO_on_start()
|
|
{
|
|
var runTask = _natsClient.RunAsync(_cts.Token);
|
|
|
|
// Read from client socket — should get INFO
|
|
var buf = new byte[4096];
|
|
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
|
|
var response = Encoding.ASCII.GetString(buf, 0, n);
|
|
|
|
Assert.StartsWith("INFO ", response);
|
|
Assert.Contains("server_id", response);
|
|
Assert.Contains("\r\n", response);
|
|
|
|
await _cts.CancelAsync();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Client_responds_PONG_to_PING()
|
|
{
|
|
var runTask = _natsClient.RunAsync(_cts.Token);
|
|
|
|
// Read INFO
|
|
var buf = new byte[4096];
|
|
await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
|
|
|
|
// Send CONNECT then PING
|
|
await _clientSocket.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));
|
|
|
|
// Read response — should get PONG
|
|
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
|
|
var response = Encoding.ASCII.GetString(buf, 0, n);
|
|
|
|
Assert.Contains("PONG\r\n", response);
|
|
|
|
await _cts.CancelAsync();
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 2: Run tests to verify they fail**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ClientTests" -v quiet`
|
|
Expected: FAIL — `NatsClient` doesn't exist.
|
|
|
|
**Step 3: Implement NatsClient**
|
|
|
|
```csharp
|
|
// src/NATS.Server/NatsClient.cs
|
|
using System.Buffers;
|
|
using System.IO.Pipelines;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using NATS.Server.Protocol;
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server;
|
|
|
|
public interface IMessageRouter
|
|
{
|
|
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
|
ReadOnlyMemory<byte> payload, NatsClient sender);
|
|
void RemoveClient(NatsClient client);
|
|
}
|
|
|
|
public sealed class NatsClient : IDisposable
|
|
{
|
|
private readonly Socket _socket;
|
|
private readonly NetworkStream _stream;
|
|
private readonly NatsOptions _options;
|
|
private readonly ServerInfo _serverInfo;
|
|
private readonly NatsParser _parser;
|
|
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
|
private readonly Dictionary<string, Subscription> _subs = new();
|
|
|
|
public ulong Id { get; }
|
|
public ClientOptions? ClientOpts { get; private set; }
|
|
public IMessageRouter? Router { get; set; }
|
|
public bool ConnectReceived { get; private set; }
|
|
|
|
// Stats
|
|
public long InMsgs;
|
|
public long OutMsgs;
|
|
public long InBytes;
|
|
public long OutBytes;
|
|
|
|
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
|
|
|
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo)
|
|
{
|
|
Id = id;
|
|
_socket = socket;
|
|
_stream = new NetworkStream(socket, ownsSocket: false);
|
|
_options = options;
|
|
_serverInfo = serverInfo;
|
|
_parser = new NatsParser(options.MaxPayload);
|
|
}
|
|
|
|
public async Task RunAsync(CancellationToken ct)
|
|
{
|
|
var pipe = new Pipe();
|
|
try
|
|
{
|
|
// Send INFO
|
|
await SendInfoAsync(ct);
|
|
|
|
// Start read pump and command processing in parallel
|
|
var fillTask = FillPipeAsync(pipe.Writer, ct);
|
|
var processTask = ProcessCommandsAsync(pipe.Reader, ct);
|
|
|
|
await Task.WhenAny(fillTask, processTask);
|
|
}
|
|
catch (OperationCanceledException) { }
|
|
catch (Exception) { /* connection error — clean up */ }
|
|
finally
|
|
{
|
|
Router?.RemoveClient(this);
|
|
}
|
|
}
|
|
|
|
private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
while (!ct.IsCancellationRequested)
|
|
{
|
|
var memory = writer.GetMemory(4096);
|
|
int bytesRead = await _stream.ReadAsync(memory, ct);
|
|
if (bytesRead == 0)
|
|
break;
|
|
|
|
writer.Advance(bytesRead);
|
|
var result = await writer.FlushAsync(ct);
|
|
if (result.IsCompleted)
|
|
break;
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
await writer.CompleteAsync();
|
|
}
|
|
}
|
|
|
|
private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
while (!ct.IsCancellationRequested)
|
|
{
|
|
var result = await reader.ReadAsync(ct);
|
|
var buffer = result.Buffer;
|
|
|
|
while (_parser.TryParse(ref buffer, out var cmd))
|
|
{
|
|
await DispatchCommandAsync(cmd, ct);
|
|
}
|
|
|
|
reader.AdvanceTo(buffer.Start, buffer.End);
|
|
|
|
if (result.IsCompleted)
|
|
break;
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
await reader.CompleteAsync();
|
|
}
|
|
}
|
|
|
|
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
|
|
{
|
|
switch (cmd.Type)
|
|
{
|
|
case CommandType.Connect:
|
|
ProcessConnect(cmd);
|
|
break;
|
|
|
|
case CommandType.Ping:
|
|
await WriteAsync(NatsProtocol.PongBytes, ct);
|
|
break;
|
|
|
|
case CommandType.Pong:
|
|
// Update RTT (placeholder)
|
|
break;
|
|
|
|
case CommandType.Sub:
|
|
ProcessSub(cmd);
|
|
break;
|
|
|
|
case CommandType.Unsub:
|
|
ProcessUnsub(cmd);
|
|
break;
|
|
|
|
case CommandType.Pub:
|
|
case CommandType.HPub:
|
|
ProcessPub(cmd);
|
|
break;
|
|
}
|
|
}
|
|
|
|
private void ProcessConnect(ParsedCommand cmd)
|
|
{
|
|
ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span)
|
|
?? new ClientOptions();
|
|
ConnectReceived = true;
|
|
}
|
|
|
|
private void ProcessSub(ParsedCommand cmd)
|
|
{
|
|
var sub = new Subscription
|
|
{
|
|
Subject = cmd.Subject!,
|
|
Queue = cmd.Queue,
|
|
Sid = cmd.Sid!,
|
|
};
|
|
|
|
_subs[cmd.Sid!] = sub;
|
|
sub.Client = this;
|
|
|
|
if (Router is ISubListAccess sl)
|
|
sl.SubList.Insert(sub);
|
|
}
|
|
|
|
private void ProcessUnsub(ParsedCommand cmd)
|
|
{
|
|
if (!_subs.TryGetValue(cmd.Sid!, out var sub))
|
|
return;
|
|
|
|
if (cmd.MaxMessages > 0)
|
|
{
|
|
sub.MaxMessages = cmd.MaxMessages;
|
|
// Will be cleaned up when MessageCount reaches MaxMessages
|
|
return;
|
|
}
|
|
|
|
_subs.Remove(cmd.Sid!);
|
|
|
|
if (Router is ISubListAccess sl)
|
|
sl.SubList.Remove(sub);
|
|
}
|
|
|
|
private void ProcessPub(ParsedCommand cmd)
|
|
{
|
|
Interlocked.Increment(ref InMsgs);
|
|
Interlocked.Add(ref InBytes, cmd.Payload.Length);
|
|
|
|
ReadOnlyMemory<byte> headers = default;
|
|
ReadOnlyMemory<byte> payload = cmd.Payload;
|
|
|
|
if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
|
|
{
|
|
headers = cmd.Payload[..cmd.HeaderSize];
|
|
payload = cmd.Payload[cmd.HeaderSize..];
|
|
}
|
|
|
|
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
|
}
|
|
|
|
private async Task SendInfoAsync(CancellationToken ct)
|
|
{
|
|
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
|
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
|
await WriteAsync(infoLine, ct);
|
|
}
|
|
|
|
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
|
|
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
|
{
|
|
Interlocked.Increment(ref OutMsgs);
|
|
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
|
|
|
|
byte[] line;
|
|
if (headers.Length > 0)
|
|
{
|
|
int totalSize = headers.Length + payload.Length;
|
|
line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
|
|
}
|
|
else
|
|
{
|
|
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
|
|
}
|
|
|
|
await _writeLock.WaitAsync(ct);
|
|
try
|
|
{
|
|
await _stream.WriteAsync(line, ct);
|
|
if (headers.Length > 0)
|
|
await _stream.WriteAsync(headers, ct);
|
|
if (payload.Length > 0)
|
|
await _stream.WriteAsync(payload, ct);
|
|
await _stream.WriteAsync(NatsProtocol.CrLf, ct);
|
|
await _stream.FlushAsync(ct);
|
|
}
|
|
finally
|
|
{
|
|
_writeLock.Release();
|
|
}
|
|
}
|
|
|
|
private async Task WriteAsync(byte[] data, CancellationToken ct)
|
|
{
|
|
await _writeLock.WaitAsync(ct);
|
|
try
|
|
{
|
|
await _stream.WriteAsync(data, ct);
|
|
await _stream.FlushAsync(ct);
|
|
}
|
|
finally
|
|
{
|
|
_writeLock.Release();
|
|
}
|
|
}
|
|
|
|
public void RemoveAllSubscriptions(SubList subList)
|
|
{
|
|
foreach (var sub in _subs.Values)
|
|
subList.Remove(sub);
|
|
_subs.Clear();
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_stream.Dispose();
|
|
_socket.Dispose();
|
|
_writeLock.Dispose();
|
|
}
|
|
}
|
|
|
|
public interface ISubListAccess
|
|
{
|
|
SubList SubList { get; }
|
|
}
|
|
```
|
|
|
|
Update `Subscription.cs` to add the `Client` field:
|
|
|
|
```csharp
|
|
// Add to Subscription.cs
|
|
public NatsClient? Client { get; set; }
|
|
```
|
|
|
|
**Step 4: Run tests to verify they pass**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ClientTests" -v quiet`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs src/NATS.Server/Subscriptions/Subscription.cs
|
|
git commit -m "feat: implement NatsClient connection handler with read/write pipeline"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 6: NatsServer — Orchestrator
|
|
|
|
**Files:**
|
|
- Create: `src/NATS.Server/NatsServer.cs`
|
|
- Test: `tests/NATS.Server.Tests/ServerTests.cs`
|
|
|
|
**Reference:** `golang/nats-server/server/server.go` — NewServer, AcceptLoop, createClient, `golang/nats-server/server/client.go` — processMsgResults, deliverMsg
|
|
|
|
**Step 1: Write failing server tests**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/ServerTests.cs
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using NATS.Server;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class ServerTests : IAsyncDisposable
|
|
{
|
|
private readonly NatsServer _server;
|
|
private readonly int _port;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
|
|
public ServerTests()
|
|
{
|
|
// Use random port
|
|
_port = GetFreePort();
|
|
_server = new NatsServer(new NatsOptions { Port = _port });
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
await _cts.CancelAsync();
|
|
_server.Dispose();
|
|
}
|
|
|
|
private static int GetFreePort()
|
|
{
|
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
|
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
|
}
|
|
|
|
private async Task<Socket> ConnectClientAsync()
|
|
{
|
|
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
await sock.ConnectAsync(IPAddress.Loopback, _port);
|
|
return sock;
|
|
}
|
|
|
|
private static async Task<string> ReadLineAsync(Socket sock, int bufSize = 4096)
|
|
{
|
|
var buf = new byte[bufSize];
|
|
var n = await sock.ReceiveAsync(buf, SocketFlags.None);
|
|
return Encoding.ASCII.GetString(buf, 0, n);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Server_accepts_connection_and_sends_INFO()
|
|
{
|
|
var serverTask = _server.StartAsync(_cts.Token);
|
|
await Task.Delay(100); // let server start
|
|
|
|
using var client = await ConnectClientAsync();
|
|
var response = await ReadLineAsync(client);
|
|
|
|
Assert.StartsWith("INFO ", response);
|
|
await _cts.CancelAsync();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Server_basic_pubsub()
|
|
{
|
|
var serverTask = _server.StartAsync(_cts.Token);
|
|
await Task.Delay(100);
|
|
|
|
using var pub = await ConnectClientAsync();
|
|
using var sub = await ConnectClientAsync();
|
|
|
|
// Read INFO from both
|
|
await ReadLineAsync(pub);
|
|
await ReadLineAsync(sub);
|
|
|
|
// CONNECT + SUB on subscriber
|
|
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\n"));
|
|
await Task.Delay(50);
|
|
|
|
// CONNECT + PUB on publisher
|
|
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n"));
|
|
await Task.Delay(100);
|
|
|
|
// Read MSG from subscriber
|
|
var buf = new byte[4096];
|
|
var n = await sub.ReceiveAsync(buf, SocketFlags.None);
|
|
var msg = Encoding.ASCII.GetString(buf, 0, n);
|
|
|
|
Assert.Contains("MSG foo 1 5\r\nHello\r\n", msg);
|
|
await _cts.CancelAsync();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Server_wildcard_matching()
|
|
{
|
|
var serverTask = _server.StartAsync(_cts.Token);
|
|
await Task.Delay(100);
|
|
|
|
using var pub = await ConnectClientAsync();
|
|
using var sub = await ConnectClientAsync();
|
|
|
|
await ReadLineAsync(pub);
|
|
await ReadLineAsync(sub);
|
|
|
|
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\n"));
|
|
await Task.Delay(50);
|
|
|
|
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n"));
|
|
await Task.Delay(100);
|
|
|
|
var buf = new byte[4096];
|
|
var n = await sub.ReceiveAsync(buf, SocketFlags.None);
|
|
var msg = Encoding.ASCII.GetString(buf, 0, n);
|
|
|
|
Assert.Contains("MSG foo.bar 1 5\r\n", msg);
|
|
await _cts.CancelAsync();
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 2: Run tests to verify they fail**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ServerTests" -v quiet`
|
|
Expected: FAIL — `NatsServer` doesn't exist.
|
|
|
|
**Step 3: Implement NatsServer**
|
|
|
|
```csharp
|
|
// src/NATS.Server/NatsServer.cs
|
|
using System.Collections.Concurrent;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using NATS.Server.Protocol;
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server;
|
|
|
|
public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|
{
|
|
private readonly NatsOptions _options;
|
|
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
|
|
private readonly SubList _subList = new();
|
|
private readonly ServerInfo _serverInfo;
|
|
private Socket? _listener;
|
|
private ulong _nextClientId;
|
|
|
|
public SubList SubList => _subList;
|
|
|
|
public NatsServer(NatsOptions options)
|
|
{
|
|
_options = options;
|
|
_serverInfo = new ServerInfo
|
|
{
|
|
ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(),
|
|
ServerName = options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
|
|
Version = NatsProtocol.Version,
|
|
Host = options.Host,
|
|
Port = options.Port,
|
|
MaxPayload = options.MaxPayload,
|
|
};
|
|
}
|
|
|
|
public async Task StartAsync(CancellationToken ct)
|
|
{
|
|
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
|
_listener.Bind(new IPEndPoint(
|
|
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
|
|
_options.Port));
|
|
_listener.Listen(128);
|
|
|
|
try
|
|
{
|
|
while (!ct.IsCancellationRequested)
|
|
{
|
|
var socket = await _listener.AcceptAsync(ct);
|
|
var clientId = Interlocked.Increment(ref _nextClientId);
|
|
|
|
var client = new NatsClient(clientId, socket, _options, _serverInfo);
|
|
client.Router = this;
|
|
_clients[clientId] = client;
|
|
|
|
_ = RunClientAsync(client, ct);
|
|
}
|
|
}
|
|
catch (OperationCanceledException) { }
|
|
}
|
|
|
|
private async Task RunClientAsync(NatsClient client, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
await client.RunAsync(ct);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
// Client disconnected or errored
|
|
}
|
|
finally
|
|
{
|
|
RemoveClient(client);
|
|
}
|
|
}
|
|
|
|
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
|
ReadOnlyMemory<byte> payload, NatsClient sender)
|
|
{
|
|
var result = _subList.Match(subject);
|
|
|
|
// Deliver to plain subscribers
|
|
foreach (var sub in result.PlainSubs)
|
|
{
|
|
if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true))
|
|
continue;
|
|
|
|
DeliverMessage(sub, subject, replyTo, headers, payload);
|
|
}
|
|
|
|
// Deliver to one member of each queue group (round-robin)
|
|
foreach (var queueGroup in result.QueueSubs)
|
|
{
|
|
if (queueGroup.Length == 0) continue;
|
|
|
|
// Simple round-robin — pick based on total delivered across group
|
|
var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length;
|
|
// Undo the OutMsgs increment — it'll be incremented properly in SendMessageAsync
|
|
Interlocked.Decrement(ref sender.OutMsgs);
|
|
|
|
for (int attempt = 0; attempt < queueGroup.Length; attempt++)
|
|
{
|
|
var sub = queueGroup[(idx + attempt) % queueGroup.Length];
|
|
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
|
|
{
|
|
DeliverMessage(sub, subject, replyTo, headers, payload);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
|
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
|
{
|
|
var client = sub.Client;
|
|
if (client == null) return;
|
|
|
|
// Check auto-unsub
|
|
var count = Interlocked.Increment(ref sub.MessageCount);
|
|
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
|
|
return;
|
|
|
|
// Fire and forget — deliver asynchronously
|
|
_ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None);
|
|
}
|
|
|
|
public void RemoveClient(NatsClient client)
|
|
{
|
|
_clients.TryRemove(client.Id, out _);
|
|
client.RemoveAllSubscriptions(_subList);
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_listener?.Dispose();
|
|
foreach (var client in _clients.Values)
|
|
client.Dispose();
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 4: Run tests to verify they pass**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ServerTests" -v quiet`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs
|
|
git commit -m "feat: implement NatsServer orchestrator with accept loop and message routing"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 7: Host Console Application
|
|
|
|
**Files:**
|
|
- Modify: `src/NATS.Server.Host/Program.cs`
|
|
|
|
**Reference:** `golang/nats-server/main.go`
|
|
|
|
**Step 1: Implement Program.cs**
|
|
|
|
```csharp
|
|
// src/NATS.Server.Host/Program.cs
|
|
using NATS.Server;
|
|
|
|
var options = new NatsOptions();
|
|
|
|
// Simple CLI argument parsing
|
|
for (int i = 0; i < args.Length; i++)
|
|
{
|
|
switch (args[i])
|
|
{
|
|
case "-p" or "--port" when i + 1 < args.Length:
|
|
options.Port = int.Parse(args[++i]);
|
|
break;
|
|
case "-a" or "--addr" when i + 1 < args.Length:
|
|
options.Host = args[++i];
|
|
break;
|
|
case "-n" or "--name" when i + 1 < args.Length:
|
|
options.ServerName = args[++i];
|
|
break;
|
|
}
|
|
}
|
|
|
|
var server = new NatsServer(options);
|
|
|
|
var cts = new CancellationTokenSource();
|
|
Console.CancelKeyPress += (_, e) =>
|
|
{
|
|
e.Cancel = true;
|
|
cts.Cancel();
|
|
};
|
|
|
|
Console.WriteLine($"[NATS] Listening on {options.Host}:{options.Port}");
|
|
|
|
try
|
|
{
|
|
await server.StartAsync(cts.Token);
|
|
}
|
|
catch (OperationCanceledException) { }
|
|
|
|
Console.WriteLine("[NATS] Server stopped.");
|
|
```
|
|
|
|
**Step 2: Verify it builds and runs**
|
|
|
|
Run: `dotnet build src/NATS.Server.Host/NATS.Server.Host.csproj`
|
|
Expected: Build succeeded.
|
|
|
|
Run (quick smoke): `timeout 2 dotnet run --project src/NATS.Server.Host -- -p 14222 || true`
|
|
Expected: Prints "Listening on 0.0.0.0:14222" then exits after timeout.
|
|
|
|
**Step 3: Commit**
|
|
|
|
```bash
|
|
git add src/NATS.Server.Host/Program.cs
|
|
git commit -m "feat: add NATS.Server.Host console app with basic CLI arguments"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 8: Integration Tests with NATS.Client.Core
|
|
|
|
**Files:**
|
|
- Modify: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj` (add NuGet ref)
|
|
- Create: `tests/NATS.Server.Tests/IntegrationTests.cs`
|
|
|
|
**Step 1: Add NATS client NuGet package**
|
|
|
|
```bash
|
|
dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj package NATS.Client.Core
|
|
```
|
|
|
|
**Step 2: Write integration tests**
|
|
|
|
```csharp
|
|
// tests/NATS.Server.Tests/IntegrationTests.cs
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using NATS.Client.Core;
|
|
using NATS.Server;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
public class IntegrationTests : IAsyncDisposable
|
|
{
|
|
private readonly NatsServer _server;
|
|
private readonly int _port;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
private readonly Task _serverTask;
|
|
|
|
public IntegrationTests()
|
|
{
|
|
_port = GetFreePort();
|
|
_server = new NatsServer(new NatsOptions { Port = _port });
|
|
_serverTask = _server.StartAsync(_cts.Token);
|
|
Thread.Sleep(200); // Let server start
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
await _cts.CancelAsync();
|
|
_server.Dispose();
|
|
}
|
|
|
|
private static int GetFreePort()
|
|
{
|
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
|
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
|
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
|
}
|
|
|
|
private NatsConnection CreateClient()
|
|
{
|
|
var opts = new NatsOpts { Url = $"nats://127.0.0.1:{_port}" };
|
|
return new NatsConnection(opts);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PubSub_basic()
|
|
{
|
|
await using var pub = CreateClient();
|
|
await using var sub = CreateClient();
|
|
|
|
await pub.ConnectAsync();
|
|
await sub.ConnectAsync();
|
|
|
|
var received = new TaskCompletionSource<string>();
|
|
|
|
var subscription = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in sub.SubscribeAsync<string>("test.subject"))
|
|
{
|
|
received.TrySetResult(msg.Data!);
|
|
break;
|
|
}
|
|
});
|
|
|
|
await Task.Delay(100); // let subscription register
|
|
|
|
await pub.PublishAsync("test.subject", "Hello NATS!");
|
|
|
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
Assert.Equal("Hello NATS!", result);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PubSub_wildcard_star()
|
|
{
|
|
await using var pub = CreateClient();
|
|
await using var sub = CreateClient();
|
|
|
|
await pub.ConnectAsync();
|
|
await sub.ConnectAsync();
|
|
|
|
var received = new TaskCompletionSource<string>();
|
|
|
|
var subscription = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in sub.SubscribeAsync<string>("test.*"))
|
|
{
|
|
received.TrySetResult(msg.Subject);
|
|
break;
|
|
}
|
|
});
|
|
|
|
await Task.Delay(100);
|
|
await pub.PublishAsync("test.hello", "data");
|
|
|
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
Assert.Equal("test.hello", result);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PubSub_wildcard_gt()
|
|
{
|
|
await using var pub = CreateClient();
|
|
await using var sub = CreateClient();
|
|
|
|
await pub.ConnectAsync();
|
|
await sub.ConnectAsync();
|
|
|
|
var received = new TaskCompletionSource<string>();
|
|
|
|
var subscription = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in sub.SubscribeAsync<string>("test.>"))
|
|
{
|
|
received.TrySetResult(msg.Subject);
|
|
break;
|
|
}
|
|
});
|
|
|
|
await Task.Delay(100);
|
|
await pub.PublishAsync("test.foo.bar.baz", "data");
|
|
|
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
Assert.Equal("test.foo.bar.baz", result);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PubSub_fan_out()
|
|
{
|
|
await using var pub = CreateClient();
|
|
await using var sub1 = CreateClient();
|
|
await using var sub2 = CreateClient();
|
|
|
|
await pub.ConnectAsync();
|
|
await sub1.ConnectAsync();
|
|
await sub2.ConnectAsync();
|
|
|
|
var count1 = 0;
|
|
var count2 = 0;
|
|
var done = new TaskCompletionSource();
|
|
|
|
var s1 = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in sub1.SubscribeAsync<string>("fanout"))
|
|
{
|
|
Interlocked.Increment(ref count1);
|
|
if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2)
|
|
done.TrySetResult();
|
|
break;
|
|
}
|
|
});
|
|
|
|
var s2 = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in sub2.SubscribeAsync<string>("fanout"))
|
|
{
|
|
Interlocked.Increment(ref count2);
|
|
if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2)
|
|
done.TrySetResult();
|
|
break;
|
|
}
|
|
});
|
|
|
|
await Task.Delay(100);
|
|
await pub.PublishAsync("fanout", "hello");
|
|
|
|
await done.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
|
Assert.Equal(1, count1);
|
|
Assert.Equal(1, count2);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PingPong()
|
|
{
|
|
await using var client = CreateClient();
|
|
await client.ConnectAsync();
|
|
|
|
// If we got here, the connection is alive and PING/PONG works
|
|
await client.PingAsync();
|
|
}
|
|
}
|
|
```
|
|
|
|
**Step 3: Run tests**
|
|
|
|
Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~IntegrationTests" -v normal`
|
|
Expected: All tests PASS (may need debugging iterations).
|
|
|
|
**Step 4: Commit**
|
|
|
|
```bash
|
|
git add tests/NATS.Server.Tests/
|
|
git commit -m "feat: add integration tests using NATS.Client.Core NuGet package"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 9: Final Validation and Cleanup
|
|
|
|
**Files:**
|
|
- Modify: `CLAUDE.md` (update build commands now that project exists)
|
|
|
|
**Step 1: Run full test suite**
|
|
|
|
Run: `dotnet test NatsDotNet.sln -v normal`
|
|
Expected: All tests PASS.
|
|
|
|
**Step 2: Verify host app runs and accepts nats CLI connections**
|
|
|
|
```bash
|
|
# Terminal 1: Start server
|
|
dotnet run --project src/NATS.Server.Host -- -p 14222 &
|
|
|
|
# Terminal 2: Test with nats CLI (if available)
|
|
nats sub test -s nats://127.0.0.1:14222 &
|
|
nats pub test "Hello from nats CLI" -s nats://127.0.0.1:14222
|
|
```
|
|
|
|
**Step 3: Update CLAUDE.md with actual verified commands**
|
|
|
|
Update the Build & Test Commands section to match the real project structure.
|
|
|
|
**Step 4: Commit**
|
|
|
|
```bash
|
|
git add CLAUDE.md
|
|
git commit -m "docs: update CLAUDE.md with verified build and test commands"
|
|
```
|