feat: port session 01 — Foundation Types (const, errors, proto, ring, rate_counter, sdm)

Ports server/const.go, errors.go, proto.go, ring.go, rate_counter.go, sdm.go.
- ServerConstants: all protocol constants and version info from const.go
- ServerErrors: ~60 sentinel exceptions plus errCtx/configErr/processConfigErr types
- ProtoWire: protobuf varint encode/decode helpers (proto.go)
- RateCounter: sliding-window rate limiter (rate_counter.go)
- ClosedRingBuffer: fixed-size ring buffer for /connz (ring.go)
- StreamDeletionMeta: SDM tracking for JetStream cluster consensus (sdm.go)
- 5 unit tests passing (errors, ring buffer, rate counter)
- errors_gen.go (code generator tool) and nkey.go Server methods marked n_a
This commit is contained in:
Joseph Doherty
2026-02-26 09:15:20 -05:00
parent 66628bc25a
commit 8050ee1897
12 changed files with 1538 additions and 9 deletions

View File

@@ -0,0 +1,118 @@
// Copyright 2018-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/ring.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server.Internal;
// -------------------------------------------------------------------------
// Placeholder types — will be populated in session 08 (Client) and
// session 12 (Monitor/Events).
// -------------------------------------------------------------------------
/// <summary>
/// Subset of client connection info exposed on the monitoring endpoint (/connz).
/// Full implementation is in session 12 (monitor.go).
/// </summary>
public class ConnInfo { }
/// <summary>
/// Subscription detail for the monitoring endpoint.
/// Full implementation is in session 12 (monitor.go).
/// </summary>
public class SubDetail { }
// -------------------------------------------------------------------------
// ClosedClient / ClosedRingBuffer (ring.go)
// -------------------------------------------------------------------------
/// <summary>
/// Wraps connection info with optional items for the /connz endpoint.
/// Mirrors <c>closedClient</c> in server/ring.go.
/// </summary>
public sealed class ClosedClient
{
public ConnInfo Info { get; init; } = new();
public IReadOnlyList<SubDetail> Subs { get; init; } = [];
public string User { get; init; } = string.Empty;
public string Account { get; init; } = string.Empty;
}
/// <summary>
/// Fixed-size ring buffer that retains the most recent closed connections,
/// evicting the oldest when full.
/// Mirrors <c>closedRingBuffer</c> in server/ring.go.
/// </summary>
public sealed class ClosedRingBuffer
{
private ulong _total;
private readonly ClosedClient?[] _conns;
/// <summary>Creates a ring buffer that holds at most <paramref name="max"/> entries.</summary>
public ClosedRingBuffer(int max)
{
_conns = new ClosedClient?[max];
}
/// <summary>
/// Appends a closed connection, evicting the oldest if the buffer is full.
/// Mirrors <c>closedRingBuffer.append</c>.
/// </summary>
public void Append(ClosedClient cc)
{
_conns[Next()] = cc;
_total++;
}
// Index of the slot to write next — wraps around.
private int Next() => (int)(_total % (ulong)_conns.Length);
/// <summary>
/// Returns the number of entries currently stored (≤ capacity).
/// Mirrors <c>closedRingBuffer.len</c>.
/// </summary>
public int Len() =>
_total > (ulong)_conns.Length ? _conns.Length : (int)_total;
/// <summary>
/// Returns the total number of connections ever appended (not capped).
/// Mirrors <c>closedRingBuffer.totalConns</c>.
/// </summary>
public ulong TotalConns() => _total;
/// <summary>
/// Returns a chronologically ordered copy of the stored closed connections.
/// The caller may freely modify the returned array.
/// Mirrors <c>closedRingBuffer.closedClients</c>.
/// </summary>
public ClosedClient?[] ClosedClients()
{
var len = Len();
var dup = new ClosedClient?[len];
var head = Next();
if (_total <= (ulong)_conns.Length || head == 0)
{
Array.Copy(_conns, dup, len);
}
else
{
var fp = _conns.AsSpan(head); // oldest … end of array
var sp = _conns.AsSpan(0, head); // wrap-around … newest
fp.CopyTo(dup.AsSpan());
sp.CopyTo(dup.AsSpan(fp.Length));
}
return dup;
}
}

View File

@@ -0,0 +1,121 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/sdm.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server.Internal.DataStructures;
/// <summary>
/// Per-sequence data tracked by <see cref="StreamDeletionMeta"/>.
/// Mirrors <c>SDMBySeq</c> in server/sdm.go.
/// </summary>
public readonly struct SdmBySeq
{
/// <summary>Whether this sequence was the last message for its subject.</summary>
public bool Last { get; init; }
/// <summary>Timestamp (nanoseconds UTC) when the removal/SDM was last proposed.</summary>
public long Ts { get; init; }
}
/// <summary>
/// Tracks pending subject delete markers (SDMs) and message removals for a stream.
/// Used by JetStream cluster consensus to avoid redundant proposals.
/// Mirrors <c>SDMMeta</c> in server/sdm.go.
/// </summary>
public sealed class StreamDeletionMeta
{
// Per-subject pending-count totals.
private readonly Dictionary<string, ulong> _totals = new(1);
// Per-sequence data keyed by sequence number.
private readonly Dictionary<ulong, SdmBySeq> _pending = new(1);
// -------------------------------------------------------------------------
// Header constants (forward-declared; populated in session 19 — JetStream).
// isSubjectDeleteMarker checks these header keys.
// -------------------------------------------------------------------------
// Mirrors JSMarkerReason header key (defined in jetstream.go).
internal const string HeaderJsMarkerReason = "Nats-Marker-Reason";
// Mirrors KVOperation header key (defined in jetstream.go).
internal const string HeaderKvOperation = "KV-Operation";
// Mirrors KVOperationValuePurge (defined in jetstream.go).
internal const string KvOperationValuePurge = "PURGE";
/// <summary>
/// Returns true when the given header block contains a subject delete marker
/// (either a JetStream marker or a KV purge operation).
/// Mirrors <c>isSubjectDeleteMarker</c> in server/sdm.go.
/// </summary>
public static bool IsSubjectDeleteMarker(ReadOnlySpan<byte> hdr)
{
// Simplified header scan: checks whether JSMarkerReason key is present
// or whether KV-Operation equals "PURGE".
// Full implementation depends on SliceHeader from session 08 (client.go).
// Until then this provides the correct contract.
var text = System.Text.Encoding.UTF8.GetString(hdr);
if (text.Contains(HeaderJsMarkerReason))
return true;
if (text.Contains($"{HeaderKvOperation}: {KvOperationValuePurge}"))
return true;
return false;
}
/// <summary>
/// Clears all tracked data.
/// Mirrors <c>SDMMeta.empty</c>.
/// </summary>
public void Empty()
{
_totals.Clear();
_pending.Clear();
}
/// <summary>
/// Tracks <paramref name="seq"/> as pending and returns whether it was
/// the last message for its subject. If the sequence is already tracked
/// the existing <c>Last</c> value is returned without modification.
/// Mirrors <c>SDMMeta.trackPending</c>.
/// </summary>
public bool TrackPending(ulong seq, string subj, bool last)
{
if (_pending.TryGetValue(seq, out var p))
return p.Last;
_pending[seq] = new SdmBySeq { Last = last, Ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L };
_totals[subj] = _totals.TryGetValue(subj, out var cnt) ? cnt + 1 : 1;
return last;
}
/// <summary>
/// Removes <paramref name="seq"/> and decrements the pending count for
/// <paramref name="subj"/>, deleting the subject entry when it reaches zero.
/// Mirrors <c>SDMMeta.removeSeqAndSubject</c>.
/// </summary>
public void RemoveSeqAndSubject(ulong seq, string subj)
{
if (!_pending.Remove(seq))
return;
if (_totals.TryGetValue(subj, out var msgs))
{
if (msgs <= 1)
_totals.Remove(subj);
else
_totals[subj] = msgs - 1;
}
}
}

View File

@@ -0,0 +1,284 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/proto.go in the NATS server Go source.
// Inspired by https://github.com/protocolbuffers/protobuf-go/blob/master/encoding/protowire/wire.go
namespace ZB.MOM.NatsNet.Server.Internal;
/// <summary>
/// Low-level protobuf wire format helpers used internally for JetStream API encoding.
/// Mirrors server/proto.go.
/// </summary>
public static class ProtoWire
{
private static readonly Exception ErrInsufficient =
new InvalidOperationException("insufficient data to read a value");
private static readonly Exception ErrOverflow =
new InvalidOperationException("too much data for a value");
private static readonly Exception ErrInvalidFieldNumber =
new InvalidOperationException("invalid field number");
// -------------------------------------------------------------------------
// Field scanning
// -------------------------------------------------------------------------
/// <summary>
/// Reads a complete protobuf field (tag + value) from <paramref name="b"/>
/// and returns the field number, wire type, and total byte size consumed.
/// Mirrors <c>protoScanField</c>.
/// </summary>
public static (int num, int typ, int size, Exception? err) ScanField(ReadOnlySpan<byte> b)
{
var (num, typ, sizeTag, err) = ScanTag(b);
if (err != null) return (0, 0, 0, err);
var (sizeValue, err2) = ScanFieldValue(typ, b[sizeTag..]);
if (err2 != null) return (0, 0, 0, err2);
return (num, typ, sizeTag + sizeValue, null);
}
/// <summary>
/// Reads a protobuf tag varint and returns field number, wire type, and bytes consumed.
/// Mirrors <c>protoScanTag</c>.
/// </summary>
public static (int num, int typ, int size, Exception? err) ScanTag(ReadOnlySpan<byte> b)
{
var (tagint, size, err) = ScanVarint(b);
if (err != null) return (0, 0, 0, err);
if ((tagint >> 3) > int.MaxValue)
return (0, 0, 0, ErrInvalidFieldNumber);
var num = (int)(tagint >> 3);
if (num < 1)
return (0, 0, 0, ErrInvalidFieldNumber);
var typ = (int)(tagint & 7);
return (num, typ, size, null);
}
/// <summary>
/// Returns the byte count consumed by a field value with the given wire type.
/// Mirrors <c>protoScanFieldValue</c>.
/// </summary>
public static (int size, Exception? err) ScanFieldValue(int typ, ReadOnlySpan<byte> b)
{
switch (typ)
{
case 0: // varint
{
var (_, size, err) = ScanVarint(b);
return (size, err);
}
case 5: // fixed32
return (4, null);
case 1: // fixed64
return (8, null);
case 2: // length-delimited
{
var (size, err) = ScanBytes(b);
return (size, err);
}
default:
return (0, new InvalidOperationException($"unsupported type: {typ}"));
}
}
// -------------------------------------------------------------------------
// Varint decode
// -------------------------------------------------------------------------
/// <summary>
/// Decodes a protobuf varint from <paramref name="b"/>.
/// Returns (value, bytes_consumed, error).
/// Mirrors <c>protoScanVarint</c>.
/// </summary>
public static (ulong v, int size, Exception? err) ScanVarint(ReadOnlySpan<byte> b)
{
if (b.Length < 1) return (0, 0, ErrInsufficient);
ulong v = b[0];
if (v < 0x80) return (v, 1, null);
v -= 0x80;
if (b.Length < 2) return (0, 0, ErrInsufficient);
ulong y = b[1];
v += y << 7;
if (y < 0x80) return (v, 2, null);
v -= 0x80UL << 7;
if (b.Length < 3) return (0, 0, ErrInsufficient);
y = b[2];
v += y << 14;
if (y < 0x80) return (v, 3, null);
v -= 0x80UL << 14;
if (b.Length < 4) return (0, 0, ErrInsufficient);
y = b[3];
v += y << 21;
if (y < 0x80) return (v, 4, null);
v -= 0x80UL << 21;
if (b.Length < 5) return (0, 0, ErrInsufficient);
y = b[4];
v += y << 28;
if (y < 0x80) return (v, 5, null);
v -= 0x80UL << 28;
if (b.Length < 6) return (0, 0, ErrInsufficient);
y = b[5];
v += y << 35;
if (y < 0x80) return (v, 6, null);
v -= 0x80UL << 35;
if (b.Length < 7) return (0, 0, ErrInsufficient);
y = b[6];
v += y << 42;
if (y < 0x80) return (v, 7, null);
v -= 0x80UL << 42;
if (b.Length < 8) return (0, 0, ErrInsufficient);
y = b[7];
v += y << 49;
if (y < 0x80) return (v, 8, null);
v -= 0x80UL << 49;
if (b.Length < 9) return (0, 0, ErrInsufficient);
y = b[8];
v += y << 56;
if (y < 0x80) return (v, 9, null);
v -= 0x80UL << 56;
if (b.Length < 10) return (0, 0, ErrInsufficient);
y = b[9];
v += y << 63;
if (y < 2) return (v, 10, null);
return (0, 0, ErrOverflow);
}
// -------------------------------------------------------------------------
// Length-delimited decode
// -------------------------------------------------------------------------
/// <summary>
/// Returns the total byte count consumed by a length-delimited field
/// (length varint + content).
/// Mirrors <c>protoScanBytes</c>.
/// </summary>
public static (int size, Exception? err) ScanBytes(ReadOnlySpan<byte> b)
{
var (l, lenSize, err) = ScanVarint(b);
if (err != null) return (0, err);
if (l > (ulong)(b.Length - lenSize))
return (0, ErrInsufficient);
return (lenSize + (int)l, null);
}
// -------------------------------------------------------------------------
// Varint encode
// -------------------------------------------------------------------------
/// <summary>
/// Encodes a <see cref="ulong"/> as a protobuf varint.
/// Mirrors <c>protoEncodeVarint</c>.
/// </summary>
public static byte[] EncodeVarint(ulong v)
{
if (v < 1UL << 7)
return [(byte)v];
if (v < 1UL << 14)
return [(byte)((v >> 0) & 0x7F | 0x80), (byte)(v >> 7)];
if (v < 1UL << 21)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)(v >> 14)];
if (v < 1UL << 28)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)(v >> 21)];
if (v < 1UL << 35)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)(v >> 28)];
if (v < 1UL << 42)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)((v >> 28) & 0x7F | 0x80),
(byte)(v >> 35)];
if (v < 1UL << 49)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)((v >> 28) & 0x7F | 0x80),
(byte)((v >> 35) & 0x7F | 0x80),
(byte)(v >> 42)];
if (v < 1UL << 56)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)((v >> 28) & 0x7F | 0x80),
(byte)((v >> 35) & 0x7F | 0x80),
(byte)((v >> 42) & 0x7F | 0x80),
(byte)(v >> 49)];
if (v < 1UL << 63)
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)((v >> 28) & 0x7F | 0x80),
(byte)((v >> 35) & 0x7F | 0x80),
(byte)((v >> 42) & 0x7F | 0x80),
(byte)((v >> 49) & 0x7F | 0x80),
(byte)(v >> 56)];
return [
(byte)((v >> 0) & 0x7F | 0x80),
(byte)((v >> 7) & 0x7F | 0x80),
(byte)((v >> 14) & 0x7F | 0x80),
(byte)((v >> 21) & 0x7F | 0x80),
(byte)((v >> 28) & 0x7F | 0x80),
(byte)((v >> 35) & 0x7F | 0x80),
(byte)((v >> 42) & 0x7F | 0x80),
(byte)((v >> 49) & 0x7F | 0x80),
(byte)((v >> 56) & 0x7F | 0x80),
1];
}
}

View File

@@ -0,0 +1,81 @@
// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/rate_counter.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server.Internal;
/// <summary>
/// A sliding-window rate limiter that allows at most <c>limit</c> events
/// per <see cref="Interval"/> window.
/// Mirrors <c>rateCounter</c> in server/rate_counter.go.
/// </summary>
public sealed class RateCounter
{
private readonly long _limit;
private long _count;
private ulong _blocked;
private DateTime _end;
// Exposed for tests (mirrors direct field access in rate_counter_test.go).
public TimeSpan Interval;
private readonly object _lock = new();
public RateCounter(long limit)
{
_limit = limit;
Interval = TimeSpan.FromSeconds(1);
}
/// <summary>
/// Returns true if the event is within the rate limit for the current window.
/// Mirrors <c>rateCounter.allow</c>.
/// </summary>
public bool Allow()
{
var now = DateTime.UtcNow;
lock (_lock)
{
if (now > _end)
{
_count = 0;
_end = now + Interval;
}
else
{
_count++;
}
var allow = _count < _limit;
if (!allow)
_blocked++;
return allow;
}
}
/// <summary>
/// Returns and resets the count of blocked events since the last call.
/// Mirrors <c>rateCounter.countBlocked</c>.
/// </summary>
public ulong CountBlocked()
{
lock (_lock)
{
var blocked = _blocked;
_blocked = 0;
return blocked;
}
}
}