diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs new file mode 100644 index 0000000..066600c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs @@ -0,0 +1,172 @@ +// Copyright 2021-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/ocsp.go, server/ocsp_peer.go, server/ocsp_responsecache.go +// in the NATS server Go source. + +using System.Security.Cryptography.X509Certificates; + +namespace ZB.MOM.NatsNet.Server.Auth.Ocsp; + +/// +/// Controls how OCSP stapling behaves for a TLS certificate. +/// Mirrors Go OCSPMode uint8 in server/ocsp.go. +/// +public enum OcspMode : byte +{ + /// + /// Staple only if the "status_request" OID is present in the certificate. + /// Mirrors Go OCSPModeAuto. + /// + Auto = 0, + + /// + /// Must staple — honors the Must-Staple flag and shuts down on revocation. + /// Mirrors Go OCSPModeMust. + /// + MustStaple = 1, + + /// + /// Always obtain OCSP status, regardless of certificate flags. + /// Mirrors Go OCSPModeAlways. + /// + Always = 2, + + /// + /// Never check OCSP, even if the certificate has the Must-Staple flag. + /// Mirrors Go OCSPModeNever. + /// + Never = 3, +} + +/// +/// Holds a cached OCSP staple response and its expiry information. +/// +internal sealed class OcspStaple +{ + /// The raw DER-encoded OCSP response bytes. + public byte[]? Response { get; set; } + + /// When the OCSP response next needs to be refreshed. + public DateTime NextUpdate { get; set; } +} + +/// +/// Orchestrates OCSP stapling for a single TLS certificate. +/// Monitors certificate validity and refreshes the staple on a background timer. +/// Mirrors Go OCSPMonitor struct in server/ocsp.go. +/// Replaces the stub in NatsServerTypes.cs. +/// +internal sealed class OcspMonitor +{ + private readonly Lock _mu = new(); + + /// Path to the TLS certificate file being monitored. + public string? CertFile { get; set; } + + /// Path to the CA certificate file used to verify OCSP responses. + public string? CaFile { get; set; } + + /// Path to a persisted OCSP staple file (optional). + public string? OcspStapleFile { get; set; } + + /// The OCSP stapling mode for this monitor. + public OcspMode Mode { get; set; } + + /// How often to check for a fresh OCSP response. + public TimeSpan CheckInterval { get; set; } = TimeSpan.FromHours(24); + + /// The owning server instance. + public NatsServer? Server { get; set; } + + /// The synchronisation lock for this monitor's mutable state. + public Lock Mu => _mu; + + /// Starts the background OCSP refresh timer. + public void Start() + => throw new NotImplementedException("TODO: session 23 — ocsp"); + + /// Stops the background OCSP refresh timer. + public void Stop() + => throw new NotImplementedException("TODO: session 23 — ocsp"); + + /// Returns the current cached OCSP staple bytes, or null if none. + public byte[]? GetStaple() + => throw new NotImplementedException("TODO: session 23 — ocsp"); +} + +/// +/// Interface for caching raw OCSP response bytes keyed by certificate fingerprint. +/// Mirrors Go OCSPResponseCache interface in server/ocsp_responsecache.go. +/// Replaces the stub in NatsServerTypes.cs. +/// +public interface IOcspResponseCache +{ + /// Returns the cached OCSP response for , or null. + byte[]? Get(string key); + + /// Stores an OCSP response under . + void Put(string key, byte[] response); + + /// Removes the cached entry for . + void Remove(string key); +} + +/// +/// A no-op OCSP cache that never stores anything. +/// Mirrors Go NoOpCache in server/ocsp_responsecache.go. +/// +internal sealed class NoOpCache : IOcspResponseCache +{ + public byte[]? Get(string key) => null; + public void Put(string key, byte[] response) { } + public void Remove(string key) { } +} + +/// +/// An OCSP cache backed by a local directory on disk. +/// Mirrors Go LocalCache in server/ocsp_responsecache.go. +/// Full implementation is deferred to session 23. +/// +internal sealed class LocalDirCache : IOcspResponseCache +{ + private readonly string _dir; + + public LocalDirCache(string dir) + { + _dir = dir; + } + + public byte[]? Get(string key) + => throw new NotImplementedException("TODO: session 23 — ocsp"); + + public void Put(string key, byte[] response) + => throw new NotImplementedException("TODO: session 23 — ocsp"); + + public void Remove(string key) + => throw new NotImplementedException("TODO: session 23 — ocsp"); +} + +/// +/// Payload for the OCSP peer certificate rejection advisory event. +/// Mirrors Go OCSPPeerRejectEventMsg fields in server/events.go +/// and the OCSP peer reject logic in server/ocsp_peer.go. +/// +public sealed class OcspPeerRejectInfo +{ + [System.Text.Json.Serialization.JsonPropertyName("peer")] + public string Peer { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index e7bf85a..53ea072 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -21,12 +21,6 @@ namespace ZB.MOM.NatsNet.Server; // Forward stubs for types defined in later sessions // --------------------------------------------------------------------------- -/// Stub: full definition in session 20 (stream.go). -public sealed class StreamInfo { } - -/// Stub: full definition in session 20 (consumer.go). -public sealed class ConsumerInfo { } - /// Stub: stored message type — full definition in session 20. public sealed class StoredMsg { } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs new file mode 100644 index 0000000..e06c74f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -0,0 +1,151 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/consumer.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle. +/// Mirrors the consumer struct in server/consumer.go. +/// +internal sealed class NatsConsumer : IDisposable +{ + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); + + public string Name { get; private set; } = string.Empty; + public string Stream { get; private set; } = string.Empty; + public ConsumerConfig Config { get; private set; } = new(); + public DateTime Created { get; private set; } + + // Atomic counters — use Interlocked for thread-safe access + internal long Delivered; + internal long AckFloor; + internal long NumAckPending; + internal long NumRedelivered; + + private bool _closed; + + /// IRaftNode — stored as object to avoid cross-dependency on Raft session. + private object? _node; + + private CancellationTokenSource? _quitCts; + + public NatsConsumer(string stream, ConsumerConfig config, DateTime created) + { + Stream = stream; + Name = (config.Name is { Length: > 0 } name) ? name + : (config.Durable ?? string.Empty); + Config = config; + Created = created; + _quitCts = new CancellationTokenSource(); + } + + // ------------------------------------------------------------------------- + // Factory + // ------------------------------------------------------------------------- + + /// + /// Creates a new for the given stream. + /// Returns null if the consumer cannot be created (stub: always throws). + /// Mirrors newConsumer / consumer.create in server/consumer.go. + /// + public static NatsConsumer? Create( + NatsStream stream, + ConsumerConfig cfg, + ConsumerAction action, + ConsumerAssignment? sa) + { + throw new NotImplementedException("TODO: session 21 — consumer"); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + /// + /// Stops processing and tears down goroutines / timers. + /// Mirrors consumer.stop in server/consumer.go. + /// + public void Stop() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + /// + /// Deletes the consumer and all associated state permanently. + /// Mirrors consumer.delete in server/consumer.go. + /// + public void Delete() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + // ------------------------------------------------------------------------- + // Info / State + // ------------------------------------------------------------------------- + + /// + /// Returns a snapshot of consumer info including config and delivery state. + /// Mirrors consumer.info in server/consumer.go. + /// + public ConsumerInfo GetInfo() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + /// + /// Returns the current consumer configuration. + /// Mirrors consumer.config in server/consumer.go. + /// + public ConsumerConfig GetConfig() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + /// + /// Applies an updated configuration to the consumer. + /// Mirrors consumer.update in server/consumer.go. + /// + public void UpdateConfig(ConsumerConfig config) => + throw new NotImplementedException("TODO: session 21 — consumer"); + + /// + /// Returns the current durable consumer state (delivered, ack_floor, pending, redelivered). + /// Mirrors consumer.state in server/consumer.go. + /// + public ConsumerState GetConsumerState() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + // ------------------------------------------------------------------------- + // Leadership + // ------------------------------------------------------------------------- + + /// + /// Returns true if this server is the current consumer leader. + /// Mirrors consumer.isLeader in server/consumer.go. + /// + public bool IsLeader() => + throw new NotImplementedException("TODO: session 21 — consumer"); + + /// + /// Transitions this consumer into or out of the leader role. + /// Mirrors consumer.setLeader in server/consumer.go. + /// + public void SetLeader(bool isLeader, ulong term) => + throw new NotImplementedException("TODO: session 21 — consumer"); + + // ------------------------------------------------------------------------- + // IDisposable + // ------------------------------------------------------------------------- + + public void Dispose() + { + _quitCts?.Cancel(); + _quitCts?.Dispose(); + _quitCts = null; + _mu.Dispose(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs new file mode 100644 index 0000000..fa9ab3d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -0,0 +1,197 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/stream.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Represents a JetStream stream, managing message storage, replication, and lifecycle. +/// Mirrors the stream struct in server/stream.go. +/// +internal sealed class NatsStream : IDisposable +{ + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); + + public Account Account { get; private set; } + public string Name { get; private set; } = string.Empty; + public StreamConfig Config { get; private set; } = new(); + public DateTime Created { get; private set; } + internal IStreamStore? Store { get; private set; } + + // Atomic counters — use Interlocked for thread-safe access + internal long Msgs; + internal long Bytes; + internal long FirstSeq; + internal long LastSeq; + + internal bool IsMirror; + + private bool _closed; + private CancellationTokenSource? _quitCts; + + /// IRaftNode — stored as object to avoid cross-dependency on Raft session. + private object? _node; + + public NatsStream(Account account, StreamConfig config, DateTime created) + { + Account = account; + Name = config.Name ?? string.Empty; + Config = config; + Created = created; + _quitCts = new CancellationTokenSource(); + } + + // ------------------------------------------------------------------------- + // Factory + // ------------------------------------------------------------------------- + + /// + /// Creates a new after validating the configuration. + /// Returns null if the stream cannot be created (stub: always throws). + /// Mirrors newStream / stream.create in server/stream.go. + /// + public static NatsStream? Create( + Account acc, + StreamConfig cfg, + object? jsacc, + IStreamStore? store, + StreamAssignment? sa, + object? server) + { + throw new NotImplementedException("TODO: session 21 — stream"); + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + /// + /// Stops processing and tears down goroutines / timers. + /// Mirrors stream.stop in server/stream.go. + /// + public void Stop() => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Deletes the stream and all stored messages permanently. + /// Mirrors stream.delete in server/stream.go. + /// + public void Delete() => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Purges messages from the stream according to the optional request filter. + /// Mirrors stream.purge in server/stream.go. + /// + public void Purge(StreamPurgeRequest? req = null) => + throw new NotImplementedException("TODO: session 21 — stream"); + + // ------------------------------------------------------------------------- + // Info / State + // ------------------------------------------------------------------------- + + /// + /// Returns a snapshot of stream info including config, state, and cluster information. + /// Mirrors stream.info in server/stream.go. + /// + public StreamInfo GetInfo(bool includeDeleted = false) => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Asynchronously returns a snapshot of stream info. + /// Mirrors stream.info (async path) in server/stream.go. + /// + public Task GetInfoAsync(bool includeDeleted = false, CancellationToken ct = default) => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Returns the current stream state (message counts, byte totals, sequences). + /// Mirrors stream.state in server/stream.go. + /// + public StreamState State() => + throw new NotImplementedException("TODO: session 21 — stream"); + + // ------------------------------------------------------------------------- + // Leadership + // ------------------------------------------------------------------------- + + /// + /// Transitions this stream into or out of the leader role. + /// Mirrors stream.setLeader in server/stream.go. + /// + public void SetLeader(bool isLeader, ulong term) => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Returns true if this server is the current stream leader. + /// Mirrors stream.isLeader in server/stream.go. + /// + public bool IsLeader() => + throw new NotImplementedException("TODO: session 21 — stream"); + + // ------------------------------------------------------------------------- + // Configuration + // ------------------------------------------------------------------------- + + /// + /// Returns the owning account. + /// Mirrors stream.account in server/stream.go. + /// + public Account GetAccount() => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Returns the current stream configuration. + /// Mirrors stream.config in server/stream.go. + /// + public StreamConfig GetConfig() => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Applies an updated configuration to the stream. + /// Mirrors stream.update in server/stream.go. + /// + public void UpdateConfig(StreamConfig config) => + throw new NotImplementedException("TODO: session 21 — stream"); + + // ------------------------------------------------------------------------- + // Sealed state + // ------------------------------------------------------------------------- + + /// + /// Returns true if the stream is sealed (no new messages accepted). + /// Mirrors stream.isSealed in server/stream.go. + /// + public bool IsSealed() => + throw new NotImplementedException("TODO: session 21 — stream"); + + /// + /// Seals the stream so that no new messages can be stored. + /// Mirrors stream.seal in server/stream.go. + /// + public void Seal() => + throw new NotImplementedException("TODO: session 21 — stream"); + + // ------------------------------------------------------------------------- + // IDisposable + // ------------------------------------------------------------------------- + + public void Dispose() + { + _quitCts?.Cancel(); + _quitCts?.Dispose(); + _quitCts = null; + _mu.Dispose(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs new file mode 100644 index 0000000..90e8037 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -0,0 +1,484 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/stream.go and server/consumer.go in the NATS server Go source. + +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// Stream API types (from stream.go) +// ============================================================================ + +/// +/// A stream create request that extends with a pedantic flag. +/// Mirrors streamConfigRequest in server/stream.go. +/// +public sealed class StreamConfigRequest +{ + [JsonPropertyName("config")] + public StreamConfig Config { get; set; } = new(); + + /// If true, strict validation is applied during stream creation/update. + [JsonPropertyName("pedantic")] + public bool Pedantic { get; set; } +} + +/// +/// Information about a stream, returned from info requests. +/// Mirrors StreamInfo in server/stream.go. +/// +public sealed class StreamInfo +{ + [JsonPropertyName("config")] + public StreamConfig Config { get; set; } = new(); + + [JsonPropertyName("created")] + public DateTime Created { get; set; } + + [JsonPropertyName("state")] + public StreamState State { get; set; } = new(); + + [JsonPropertyName("mirror")] + public StreamSourceInfo? Mirror { get; set; } + + [JsonPropertyName("sources")] + public StreamSourceInfo[]? Sources { get; set; } + + [JsonPropertyName("cluster")] + public ClusterInfo? Cluster { get; set; } + + [JsonPropertyName("mirror_direct")] + public bool Mirror_Direct { get; set; } + + [JsonPropertyName("allow_direct")] + public bool Allow_Direct { get; set; } + + /// Alternate cluster name. + [JsonPropertyName("alternates")] + public string? Alt { get; set; } +} + +/// +/// Information about a stream mirror or source. +/// Mirrors StreamSourceInfo in server/stream.go. +/// +public sealed class StreamSourceInfo +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("filter_subject")] + public string? FilterSubject { get; set; } + + [JsonPropertyName("lag")] + public ulong Lag { get; set; } + + [JsonPropertyName("active")] + public DateTime? Active { get; set; } + + [JsonPropertyName("external")] + public StreamSource? External { get; set; } + + [JsonPropertyName("error")] + public string? Error { get; set; } +} + +/// +/// Request parameters for stream info, allowing filtering. +/// Mirrors streamInfoRequest in server/stream.go. +/// +public sealed class StreamInfoRequest +{ + [JsonPropertyName("subjects_filter")] + public string? SubjectsFilter { get; set; } + + [JsonPropertyName("mirror_check_until")] + public string? MirrorCheckUntil { get; set; } + + [JsonPropertyName("deleted_details")] + public bool DeletedDetails { get; set; } + + [JsonPropertyName("subjects_detail")] + public bool SubjectsDetail { get; set; } +} + +/// +/// Request parameters for purging a stream. +/// Mirrors StreamPurgeRequest in server/stream.go. +/// +public sealed class StreamPurgeRequest +{ + [JsonPropertyName("filter")] + public string? Filter { get; set; } + + [JsonPropertyName("seq")] + public ulong Sequence { get; set; } + + [JsonPropertyName("keep")] + public ulong Keep { get; set; } +} + +/// +/// Request for deleting a specific stream message. +/// Mirrors StreamMsgDeleteRequest in server/stream.go. +/// +public sealed class StreamMsgDeleteRequest +{ + [JsonPropertyName("seq")] + public ulong Seq { get; set; } + + [JsonPropertyName("no_erase")] + public bool NoErase { get; set; } +} + +/// +/// Request for retrieving a specific stream message. +/// Mirrors StreamGetMsgRequest in server/stream.go. +/// +public sealed class StreamGetMsgRequest +{ + [JsonPropertyName("seq")] + public ulong Seq { get; set; } + + [JsonPropertyName("last_by_subj")] + public string? LastBySubject { get; set; } + + [JsonPropertyName("next_by_subj")] + public string? NextBySubject { get; set; } +} + +/// +/// Publish acknowledgement response from JetStream. +/// Mirrors JSPubAckResponse in server/stream.go. +/// +public sealed class JSPubAckResponse +{ + [JsonPropertyName("stream")] + public string Stream { get; set; } = string.Empty; + + [JsonPropertyName("seq")] + public ulong Seq { get; set; } + + [JsonPropertyName("duplicate")] + public bool Duplicate { get; set; } + + [JsonPropertyName("domain")] + public string? Domain { get; set; } + + [JsonPropertyName("error")] + public JsApiError? PubAckError { get; set; } + + /// + /// Returns an exception if the response contains an error, otherwise null. + /// Mirrors ToError() helper pattern in NATS Go server. + /// + public Exception? ToError() + { + if (PubAckError is { ErrCode: > 0 }) + return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})"); + return null; + } +} + +/// +/// A raw published message before JetStream processing. +/// Mirrors pubMsg (JetStream variant) in server/stream.go. +/// Note: renamed JsStreamPubMsg to avoid collision with the server-level +/// PubMsg (events.go) which lives in the same namespace. +/// +public sealed class JsStreamPubMsg +{ + public string Subject { get; set; } = string.Empty; + public string? Reply { get; set; } + public byte[]? Hdr { get; set; } + public byte[]? Msg { get; set; } + public Dictionary? Meta { get; set; } +} + +/// +/// A JetStream publish message with sync tracking. +/// Mirrors jsPubMsg in server/stream.go. +/// +public sealed class JsPubMsg +{ + public string Subject { get; set; } = string.Empty; + public string? Reply { get; set; } + public byte[]? Hdr { get; set; } + public byte[]? Msg { get; set; } + + /// Publish argument (opaque, set at runtime). + public object? Pa { get; set; } + + /// Sync/ack channel (opaque, set at runtime). + public object? Sync { get; set; } +} + +/// +/// An inbound message to be processed by the JetStream layer. +/// Mirrors inMsg in server/stream.go. +/// +public sealed class InMsg +{ + public string Subject { get; set; } = string.Empty; + public string? Reply { get; set; } + public byte[]? Hdr { get; set; } + public byte[]? Msg { get; set; } + + /// The originating client (opaque, set at runtime). + public object? Client { get; set; } +} + +/// +/// A cached/clustered message for replication. +/// Mirrors cMsg in server/stream.go. +/// +public sealed class CMsg +{ + public string Subject { get; set; } = string.Empty; + public byte[]? Msg { get; set; } + public ulong Seq { get; set; } +} + +// ============================================================================ +// Consumer API types (from consumer.go) +// ============================================================================ + +/// +/// Information about a consumer, returned from info requests. +/// Mirrors ConsumerInfo in server/consumer.go. +/// +public sealed class ConsumerInfo +{ + [JsonPropertyName("stream_name")] + public string Stream { get; set; } = string.Empty; + + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("created")] + public DateTime Created { get; set; } + + [JsonPropertyName("config")] + public ConsumerConfig? Config { get; set; } + + [JsonPropertyName("delivered")] + public SequenceInfo Delivered { get; set; } = new(); + + [JsonPropertyName("ack_floor")] + public SequenceInfo AckFloor { get; set; } = new(); + + [JsonPropertyName("num_ack_pending")] + public int NumAckPending { get; set; } + + [JsonPropertyName("num_redelivered")] + public int NumRedelivered { get; set; } + + [JsonPropertyName("num_waiting")] + public int NumWaiting { get; set; } + + [JsonPropertyName("num_pending")] + public ulong NumPending { get; set; } + + [JsonPropertyName("cluster")] + public ClusterInfo? Cluster { get; set; } + + [JsonPropertyName("push_bound")] + public bool PushBound { get; set; } + + [JsonPropertyName("paused")] + public bool Paused { get; set; } + + [JsonPropertyName("pause_remaining")] + public TimeSpan PauseRemaining { get; set; } + + [JsonPropertyName("ts")] + public DateTime TimeStamp { get; set; } + + [JsonPropertyName("priority_groups")] + public PriorityGroupState[]? PriorityGroups { get; set; } +} + +/// +/// State information for a priority group on a pull consumer. +/// Mirrors PriorityGroupState in server/consumer.go. +/// +public sealed class PriorityGroupState +{ + [JsonPropertyName("group")] + public string Group { get; set; } = string.Empty; + + [JsonPropertyName("pinned_client_id")] + public string? PinnedClientId { get; set; } + + [JsonPropertyName("pinned_ts")] + public DateTime PinnedTs { get; set; } +} + +/// +/// Sequence information for consumer delivered/ack_floor positions. +/// Mirrors SequenceInfo in server/consumer.go. +/// +public sealed class SequenceInfo +{ + [JsonPropertyName("consumer_seq")] + public ulong Consumer { get; set; } + + [JsonPropertyName("stream_seq")] + public ulong Stream { get; set; } + + [JsonPropertyName("last_active")] + public DateTime? Last { get; set; } +} + +/// +/// Request to create or update a consumer. +/// Mirrors CreateConsumerRequest in server/consumer.go. +/// +public sealed class CreateConsumerRequest +{ + [JsonPropertyName("stream_name")] + public string Stream { get; set; } = string.Empty; + + [JsonPropertyName("config")] + public ConsumerConfig Config { get; set; } = new(); + + [JsonPropertyName("action")] + public ConsumerAction Action { get; set; } +} + +/// +/// Specifies the intended action when creating a consumer. +/// Mirrors ConsumerAction in server/consumer.go. +/// +public enum ConsumerAction +{ + /// Create a new consumer or update if it already exists. + CreateOrUpdate = 0, + + /// Create a new consumer; fail if it already exists. + Create = 1, + + /// Update an existing consumer; fail if it does not exist. + Update = 2, +} + +/// +/// Response for a consumer deletion request. +/// Mirrors ConsumerDeleteResponse in server/consumer.go. +/// +public sealed class ConsumerDeleteResponse +{ + [JsonPropertyName("success")] + public bool Success { get; set; } +} + +/// +/// A pending pull request waiting in the wait queue. +/// Mirrors waitingRequest in server/consumer.go. +/// +public sealed class WaitingRequest +{ + public string Subject { get; set; } = string.Empty; + public string? Reply { get; set; } + + /// Number of messages requested. + public int N { get; set; } + + /// Number of messages delivered so far. + public int D { get; set; } + + /// No-wait flag (1 = no wait). + public int NoWait { get; set; } + + public DateTime? Expires { get; set; } + + /// Max byte limit for this batch. + public int MaxBytes { get; set; } + + /// Bytes accumulated so far. + public int B { get; set; } +} + +/// +/// A circular wait queue for pending pull requests. +/// Mirrors waitQueue in server/consumer.go. +/// +public sealed class WaitQueue +{ + private readonly List _reqs = new(); + private int _head; + private int _tail; + + /// Number of pending requests in the queue. + public int Len => _reqs.Count; + + /// Add a waiting request to the tail of the queue. + public void Add(WaitingRequest req) => + throw new NotImplementedException("TODO: session 21"); + + /// Peek at the head request without removing it. + public WaitingRequest? Peek() => + throw new NotImplementedException("TODO: session 21"); + + /// Remove and return the head request. + public WaitingRequest? Pop() => + throw new NotImplementedException("TODO: session 21"); + + /// Compact the internal backing list to reclaim removed slots. + public void Compress() => + throw new NotImplementedException("TODO: session 21"); + + /// Returns true if the queue is at capacity (head == tail when full). + public bool IsFull(int max) => + throw new NotImplementedException("TODO: session 21"); +} + +/// +/// Cluster membership and leadership information for a stream or consumer. +/// Mirrors ClusterInfo in server/consumer.go and server/stream.go. +/// +public sealed class ClusterInfo +{ + [JsonPropertyName("name")] + public string? Name { get; set; } + + [JsonPropertyName("leader")] + public string? Leader { get; set; } + + [JsonPropertyName("replicas")] + public PeerInfo[]? Replicas { get; set; } +} + +/// +/// Information about a peer in a JetStream Raft group. +/// Mirrors PeerInfo in server/consumer.go and server/stream.go. +/// +public sealed class PeerInfo +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("current")] + public bool Current { get; set; } + + [JsonPropertyName("offline")] + public bool Offline { get; set; } + + [JsonPropertyName("active")] + public TimeSpan Active { get; set; } + + [JsonPropertyName("lag")] + public ulong Lag { get; set; } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttConstants.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttConstants.cs new file mode 100644 index 0000000..ca6aa64 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttConstants.cs @@ -0,0 +1,271 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/mqtt.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +// References to "spec" here are from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf + +/// +/// MQTT control packet type byte values. +/// Mirrors the mqttPacket* constants in server/mqtt.go. +/// +internal static class MqttPacket +{ + public const byte Connect = 0x10; + public const byte ConnectAck = 0x20; + public const byte Pub = 0x30; + public const byte PubAck = 0x40; + public const byte PubRec = 0x50; + public const byte PubRel = 0x60; + public const byte PubComp = 0x70; + public const byte Sub = 0x80; + public const byte SubAck = 0x90; + public const byte Unsub = 0xA0; + public const byte UnsubAck = 0xB0; + public const byte Ping = 0xC0; + public const byte PingResp = 0xD0; + public const byte Disconnect = 0xE0; + public const byte Mask = 0xF0; + public const byte FlagMask = 0x0F; +} + +/// +/// MQTT CONNECT packet flag byte values. +/// Mirrors the mqttConnFlag* constants in server/mqtt.go. +/// +internal static class MqttConnectFlag +{ + public const byte Reserved = 0x01; + public const byte CleanSession = 0x02; + public const byte WillFlag = 0x04; + public const byte WillQoS = 0x18; + public const byte WillRetain = 0x20; + public const byte PasswordFlag = 0x40; + public const byte UsernameFlag = 0x80; +} + +/// +/// MQTT PUBLISH packet flag byte values. +/// Mirrors the mqttPubFlag* and mqttPubQoS* constants in server/mqtt.go. +/// +internal static class MqttPubFlag +{ + public const byte Retain = 0x01; + public const byte QoS = 0x06; + public const byte Dup = 0x08; + public const byte QoS1 = 0x1 << 1; + public const byte QoS2 = 0x2 << 1; +} + +/// +/// MQTT CONNACK return codes. +/// Mirrors the mqttConnAckRC* constants in server/mqtt.go. +/// +internal static class MqttConnAckRc +{ + public const byte Accepted = 0x00; + public const byte UnacceptableProtocol = 0x01; + public const byte IdentifierRejected = 0x02; + public const byte ServerUnavailable = 0x03; + public const byte BadUserOrPassword = 0x04; + public const byte NotAuthorized = 0x05; + public const byte QoS2WillRejected = 0x10; +} + +/// +/// Miscellaneous MQTT protocol constants. +/// Mirrors the remaining scalar constants in server/mqtt.go. +/// +internal static class MqttConst +{ + /// Maximum control packet payload size (0xFFFFFFF). + public const int MaxPayloadSize = 0xFFFFFFF; + + /// MQTT topic level separator character ('/'). + public const char TopicLevelSep = '/'; + + /// Single-level wildcard character ('+'). + public const char SingleLevelWildcard = '+'; + + /// Multi-level wildcard character ('#'). + public const char MultiLevelWildcard = '#'; + + /// Reserved topic prefix character ('$'). + public const char ReservedPrefix = '$'; + + /// MQTT protocol level byte (v3.1.1 = 0x04). + public const byte ProtoLevel = 0x04; + + /// SUBACK failure return code (0x80). + public const byte SubAckFailure = 0x80; + + /// Fixed flags byte in SUBSCRIBE packets (0x02). + public const byte SubscribeFlags = 0x02; + + /// Fixed flags byte in UNSUBSCRIBE packets (0x02). + public const byte UnsubscribeFlags = 0x02; + + /// + /// Suffix appended to the SID of subscriptions created for MQTT '#' wildcard + /// at the upper level. Mirrors mqttMultiLevelSidSuffix. + /// + public const string MultiLevelSidSuffix = " fwc"; + + /// Initial byte allocation for publish headers (overestimate). + public const int InitialPubHeader = 16; + + /// Default maximum number of pending QoS-1 acks per session. + public const int DefaultMaxAckPending = 1024; + + /// Absolute upper limit on cumulative MaxAckPending across all session subscriptions. + public const int MaxAckTotalLimit = 0xFFFF; + + /// WebSocket path for MQTT connections. + public const string WsPath = "/mqtt"; + + /// Marker character for deleted retained messages (used in flag field). + public const char RetainedFlagDelMarker = '-'; +} + +/// +/// MQTT-internal NATS subject / stream / consumer name constants. +/// Mirrors the string constants in server/mqtt.go that define JetStream stream names, +/// subject prefixes, and JSA reply tokens. +/// +internal static class MqttTopics +{ + // ------------------------------------------------------------------------- + // Top-level MQTT subject prefix + // ------------------------------------------------------------------------- + + /// Prefix used for all internal MQTT subjects. + public const string Prefix = "$MQTT."; + + /// + /// Prefix for NATS subscriptions used as JS consumer delivery subjects. + /// MQTT clients must not subscribe to subjects starting with this prefix. + /// + public const string SubPrefix = Prefix + "sub."; + + // ------------------------------------------------------------------------- + // JetStream stream names + // ------------------------------------------------------------------------- + + /// Stream name for MQTT QoS >0 messages on a given account. + public const string MsgsStreamName = "$MQTT_msgs"; + + /// Subject prefix for messages in the MQTT messages stream. + public const string MsgsStreamSubjectPrefix = Prefix + "msgs."; + + /// Stream name for MQTT retained messages. + public const string RetainedMsgsStreamName = "$MQTT_rmsgs"; + + /// Subject prefix for messages in the retained messages stream. + public const string RetainedMsgsStreamSubject = Prefix + "rmsgs."; + + /// Stream name for MQTT session state. + public const string SessStreamName = "$MQTT_sess"; + + /// Subject prefix for session state messages. + public const string SessStreamSubjectPrefix = Prefix + "sess."; + + /// Name prefix used when creating per-account session streams. + public const string SessionsStreamNamePrefix = "$MQTT_sess_"; + + /// Stream name for incoming QoS-2 messages. + public const string QoS2IncomingMsgsStreamName = "$MQTT_qos2in"; + + /// Subject prefix for incoming QoS-2 messages. + public const string QoS2IncomingMsgsStreamSubjectPrefix = Prefix + "qos2.in."; + + /// Stream name for outgoing MQTT QoS messages (PUBREL). + public const string OutStreamName = "$MQTT_out"; + + /// Subject prefix for outgoing MQTT messages. + public const string OutSubjectPrefix = Prefix + "out."; + + /// Subject prefix for PUBREL messages. + public const string PubRelSubjectPrefix = Prefix + "out.pubrel."; + + /// Subject prefix for PUBREL delivery subjects. + public const string PubRelDeliverySubjectPrefix = Prefix + "deliver.pubrel."; + + /// Durable consumer name prefix for PUBREL. + public const string PubRelConsumerDurablePrefix = "$MQTT_PUBREL_"; + + // ------------------------------------------------------------------------- + // JSA reply subject prefix and token constants + // ------------------------------------------------------------------------- + + /// Prefix of the reply subject for JS API requests. + public const string JsaRepliesPrefix = Prefix + "JSA."; + + // Token position indices within a JSA reply subject. + public const int JsaIdTokenPos = 3; + public const int JsaTokenPos = 4; + public const int JsaClientIdPos = 5; + + // JSA operation token values. + public const string JsaStreamCreate = "SC"; + public const string JsaStreamUpdate = "SU"; + public const string JsaStreamLookup = "SL"; + public const string JsaStreamDel = "SD"; + public const string JsaConsumerCreate = "CC"; + public const string JsaConsumerLookup = "CL"; + public const string JsaConsumerDel = "CD"; + public const string JsaMsgStore = "MS"; + public const string JsaMsgLoad = "ML"; + public const string JsaMsgDelete = "MD"; + public const string JsaSessPersist = "SP"; + public const string JsaRetainedMsgDel = "RD"; + public const string JsaStreamNames = "SN"; + + // ------------------------------------------------------------------------- + // NATS header names injected into re-encoded PUBLISH messages + // ------------------------------------------------------------------------- + + /// Header that indicates the message originated from MQTT and stores published QoS. + public const string NatsHeader = "Nmqtt-Pub"; + + /// Header storing the original MQTT topic for retained messages. + public const string NatsRetainedMessageTopic = "Nmqtt-RTopic"; + + /// Header storing the origin of a retained message. + public const string NatsRetainedMessageOrigin = "Nmqtt-ROrigin"; + + /// Header storing the flags of a retained message. + public const string NatsRetainedMessageFlags = "Nmqtt-RFlags"; + + /// Header storing the source of a retained message. + public const string NatsRetainedMessageSource = "Nmqtt-RSource"; + + /// Header indicating a PUBREL message and storing the packet identifier. + public const string NatsPubRelHeader = "Nmqtt-PubRel"; + + /// Header storing the original MQTT subject in re-encoded PUBLISH messages. + public const string NatsHeaderSubject = "Nmqtt-Subject"; + + /// Header storing the subject mapping in re-encoded PUBLISH messages. + public const string NatsHeaderMapped = "Nmqtt-Mapped"; + + // ------------------------------------------------------------------------- + // Sparkplug B constants + // ------------------------------------------------------------------------- + + public const string SparkbNBirth = "NBIRTH"; + public const string SparkbDBirth = "DBIRTH"; + public const string SparkbNDeath = "NDEATH"; + public const string SparkbDDeath = "DDEATH"; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs new file mode 100644 index 0000000..9199956 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs @@ -0,0 +1,252 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/mqtt.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +// ============================================================================ +// Per-client MQTT state +// ============================================================================ + +/// +/// Per-client MQTT state attached to every connection established via the MQTT +/// listener or WebSocket upgrade. +/// Mirrors Go mqtt struct in server/mqtt.go. +/// +internal sealed class MqttHandler +{ + private readonly Lock _mu = new(); + + // ------------------------------------------------------------------ + // Identity + // ------------------------------------------------------------------ + + /// MQTT client identifier presented in the CONNECT packet. + public string ClientId { get; set; } = string.Empty; + + /// Whether this is a clean session. + public bool CleanSession { get; set; } + + // ------------------------------------------------------------------ + // Session / Will + // ------------------------------------------------------------------ + + /// Session associated with this connection after a successful CONNECT. + public MqttSession? Session { get; set; } + + /// + /// Quick reference to the account session manager. + /// Immutable after processConnect() completes. + /// + public MqttAccountSessionManager? AccountSessionManager { get; set; } + + /// Will message to publish when this connection closes unexpectedly. + public MqttWill? Will { get; set; } + + // ------------------------------------------------------------------ + // Keep-alive + // ------------------------------------------------------------------ + + /// Keep-alive interval in seconds (0 = disabled). + public ushort KeepAlive { get; set; } + + // ------------------------------------------------------------------ + // QoS pending / packet identifiers + // ------------------------------------------------------------------ + + /// Next packet identifier to use for QoS >0 outbound messages. + public ushort NextPi { get; set; } + + /// + /// Pending ack map: packet identifier → pending state. + /// Used for tracking in-flight QoS 1/2 PUBLISH packets. + /// + public Dictionary Pending { get; } = new(); + + // ------------------------------------------------------------------ + // Protocol flags + // ------------------------------------------------------------------ + + /// + /// When true, the server rejects QoS-2 PUBLISH from this client + /// and terminates the connection on receipt of such a packet. + /// Mirrors Go mqtt.rejectQoS2Pub. + /// + public bool RejectQoS2Pub { get; set; } + + /// + /// When true, QoS-2 SUBSCRIBE requests are silently downgraded to QoS-1. + /// Mirrors Go mqtt.downgradeQoS2Sub. + /// + public bool DowngradeQoS2Sub { get; set; } + + // ------------------------------------------------------------------ + // Parse state (used by the read-loop MQTT byte-stream parser) + // ------------------------------------------------------------------ + + /// Current state of the fixed-header / remaining-length state machine. + public byte ParseState { get; set; } + + /// Control packet type byte extracted from the current fixed header. + public byte PktType { get; set; } + + /// Remaining length of the current control packet (bytes still to read). + public int RemLen { get; set; } + + /// Buffer accumulating the current packet's variable-header and payload. + public byte[]? Buf { get; set; } + + /// Multiplier accumulator used during multi-byte remaining-length decoding. + public int RemLenMult { get; set; } + + // ------------------------------------------------------------------ + // Thread safety + // ------------------------------------------------------------------ + + /// Lock protecting mutable fields on this instance. + public Lock Mu => _mu; +} + +// ============================================================================ +// Server-side MQTT extension methods (stubs) +// ============================================================================ + +/// +/// Stub extension methods on for MQTT server operations. +/// Mirrors the server-receiver MQTT functions in server/mqtt.go. +/// All methods throw until session 22 is complete. +/// +internal static class MqttServerExtensions +{ + /// + /// Start listening for MQTT client connections. + /// Mirrors Go (*Server).startMQTT(). + /// + public static void StartMqtt(this NatsServer server) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Configure MQTT authentication overrides from the MQTT options block. + /// Mirrors Go (*Server).mqttConfigAuth(). + /// + public static void MqttConfigAuth(this NatsServer server, object mqttOpts) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Handle cleanup when an MQTT client connection closes. + /// Mirrors Go (*Server).mqttHandleClosedClient(). + /// + public static void MqttHandleClosedClient(this NatsServer server, object client) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Propagate a change to the maximum ack-pending limit to all MQTT sessions. + /// Mirrors Go (*Server).mqttUpdateMaxAckPending(). + /// + public static void MqttUpdateMaxAckPending(this NatsServer server, ushort maxp) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Retrieve or lazily-create the JSA for the named account. + /// Mirrors Go (*Server).mqttGetJSAForAccount(). + /// + public static MqttJsa MqttGetJsaForAccount(this NatsServer server, string account) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Store a QoS message for an account on a (possibly new) NATS subject. + /// Mirrors Go (*Server).mqttStoreQoSMsgForAccountOnNewSubject(). + /// + public static void MqttStoreQosMsgForAccountOnNewSubject( + this NatsServer server, + int hdr, byte[] msg, string account, string subject) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Get or create the for the client's account. + /// Mirrors Go (*Server).getOrCreateMQTTAccountSessionManager(). + /// + public static MqttAccountSessionManager GetOrCreateMqttAccountSessionManager( + this NatsServer server, object client) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Create a new for the given account. + /// Mirrors Go (*Server).mqttCreateAccountSessionManager(). + /// + public static MqttAccountSessionManager MqttCreateAccountSessionManager( + this NatsServer server, object account, System.Threading.CancellationToken cancel) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Determine how many JetStream replicas to use for MQTT streams. + /// Mirrors Go (*Server).mqttDetermineReplicas(). + /// + public static int MqttDetermineReplicas(this NatsServer server) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Process an MQTT CONNECT packet after parsing. + /// Mirrors Go (*Server).mqttProcessConnect(). + /// + public static void MqttProcessConnect( + this NatsServer server, object client, MqttConnectProto cp, bool trace) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Send the Will message for a client that disconnected unexpectedly. + /// Mirrors Go (*Server).mqttHandleWill(). + /// + public static void MqttHandleWill(this NatsServer server, object client) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Process an inbound MQTT PUBLISH packet. + /// Mirrors Go (*Server).mqttProcessPub(). + /// + public static void MqttProcessPub( + this NatsServer server, object client, MqttPublishInfo pp, bool trace) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Initiate delivery of a PUBLISH message via JetStream. + /// Mirrors Go (*Server).mqttInitiateMsgDelivery(). + /// + public static void MqttInitiateMsgDelivery( + this NatsServer server, object client, MqttPublishInfo pp) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Store a QoS-2 PUBLISH exactly once (idempotent). + /// Mirrors Go (*Server).mqttStoreQoS2MsgOnce(). + /// + public static void MqttStoreQoS2MsgOnce( + this NatsServer server, object client, MqttPublishInfo pp) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Process an inbound MQTT PUBREL packet. + /// Mirrors Go (*Server).mqttProcessPubRel(). + /// + public static void MqttProcessPubRel( + this NatsServer server, object client, ushort pi, bool trace) => + throw new NotImplementedException("TODO: session 22"); + + /// + /// Audit retained-message permissions after a configuration reload. + /// Mirrors Go (*Server).mqttCheckPubRetainedPerms(). + /// + public static void MqttCheckPubRetainedPerms(this NatsServer server) => + throw new NotImplementedException("TODO: session 22"); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttTypes.cs new file mode 100644 index 0000000..bc37fe5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttTypes.cs @@ -0,0 +1,391 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/mqtt.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +// ============================================================================ +// Enumerations +// ============================================================================ + +/// +/// State machine states for parsing incoming MQTT byte streams. +/// Mirrors the mqttParseState* iota in server/mqtt.go (implicit from +/// the read-loop logic). +/// +internal enum MqttParseState : byte +{ + /// Waiting for the first fixed-header byte. + MqttStateHeader = 0, + + /// Reading the remaining-length variable-integer bytes. + MqttStateFixedHeader, + + /// Reading the variable-header + payload bytes of the current packet. + MqttStateControlPacket, +} + +// ============================================================================ +// Will +// ============================================================================ + +/// +/// MQTT Will message parameters extracted from a CONNECT packet. +/// Mirrors Go mqttWill struct in server/mqtt.go. +/// +internal sealed class MqttWill +{ + /// NATS subject derived from the MQTT will topic. + public string Subject { get; set; } = string.Empty; + + /// Original MQTT will topic string. + public string Topic { get; set; } = string.Empty; + + /// Will message payload bytes, or null if empty. + public byte[]? Msg { get; set; } + + /// QoS level for the will message (0, 1, or 2). + public byte Qos { get; set; } + + /// Whether the will message should be retained. + public bool Retain { get; set; } +} + +// ============================================================================ +// Connect protocol +// ============================================================================ + +/// +/// MQTT CONNECT packet parsed payload. +/// Mirrors Go mqttConnectProto struct in server/mqtt.go (extended with +/// the fields surfaced by the parse helpers). +/// +internal sealed class MqttConnectProto +{ + /// MQTT client identifier. + public string ClientId { get; set; } = string.Empty; + + /// Raw CONNECT packet bytes (for forwarding / replay). + public byte[] Connect { get; set; } = []; + + /// Parsed Will parameters, or null if the Will flag is not set. + public MqttWill? Will { get; set; } + + /// Username presented in the CONNECT packet. + public string Username { get; set; } = string.Empty; + + /// Password bytes presented in the CONNECT packet, or null if absent. + public byte[]? Password { get; set; } + + /// Whether the Clean Session flag was set. + public bool CleanSession { get; set; } + + /// Keep-alive interval in seconds (0 = disabled). + public ushort KeepAlive { get; set; } +} + +// ============================================================================ +// Subscription +// ============================================================================ + +/// +/// A single MQTT topic filter subscription entry stored in a session. +/// Mirrors the per-entry semantics of mqttSession.subs map in server/mqtt.go. +/// +internal sealed class MqttSubscription +{ + /// NATS subject derived from the MQTT topic filter. + public string Subject { get; set; } = string.Empty; + + /// Maximum QoS level granted for this subscription. + public byte Qos { get; set; } +} + +// ============================================================================ +// Publish info +// ============================================================================ + +/// +/// Parsed metadata for an inbound MQTT PUBLISH packet. +/// Mirrors Go mqttPublish struct in server/mqtt.go. +/// +internal sealed class MqttPublishInfo +{ + /// NATS subject derived from the MQTT topic. + public string Subject { get; set; } = string.Empty; + + /// Original MQTT topic string. + public string Topic { get; set; } = string.Empty; + + /// Message payload bytes, or null if empty. + public byte[]? Msg { get; set; } + + /// QoS level of the PUBLISH packet. + public byte Qos { get; set; } + + /// Whether the Retain flag is set. + public bool Retain { get; set; } + + /// Whether the DUP flag is set (re-delivery of a QoS >0 packet). + public bool Dup { get; set; } + + /// Packet identifier (only meaningful for QoS 1 and 2). + public ushort Pi { get; set; } +} + +// ============================================================================ +// Pending ack +// ============================================================================ + +/// +/// Tracks a single in-flight QoS 1 or QoS 2 message pending acknowledgement. +/// Mirrors Go mqttPending struct in server/mqtt.go. +/// +internal sealed class MqttPending +{ + /// JetStream stream sequence number for this message. + public ulong SSeq { get; set; } + + /// JetStream ACK subject to send the acknowledgement to. + public string JsAckSubject { get; set; } = string.Empty; + + /// JetStream durable consumer name. + public string JsDur { get; set; } = string.Empty; +} + +// ============================================================================ +// Retained message +// ============================================================================ + +/// +/// A retained MQTT message stored in JetStream. +/// Mirrors Go mqttRetainedMsg struct in server/mqtt.go. +/// +internal sealed class MqttRetainedMsg +{ + /// Origin server name. + public string Origin { get; set; } = string.Empty; + + /// NATS subject for this retained message. + public string Subject { get; set; } = string.Empty; + + /// Original MQTT topic. + public string Topic { get; set; } = string.Empty; + + /// Message payload bytes. + public byte[]? Msg { get; set; } + + /// Message flags byte. + public byte Flags { get; set; } + + /// Source identifier. + public string Source { get; set; } = string.Empty; +} + +// ============================================================================ +// Persisted session +// ============================================================================ + +/// +/// The JSON-serialisable representation of an MQTT session stored in JetStream. +/// Mirrors Go mqttPersistedSession struct in server/mqtt.go. +/// +internal sealed class MqttPersistedSession +{ + /// Server that originally created this session. + public string Origin { get; set; } = string.Empty; + + /// MQTT client identifier. + public string Id { get; set; } = string.Empty; + + /// Whether this was a clean session. + public bool Clean { get; set; } + + /// Map of MQTT topic filters to granted QoS levels. + public Dictionary Subs { get; set; } = new(); +} + +// ============================================================================ +// Session +// ============================================================================ + +/// +/// In-memory MQTT session state. +/// Mirrors Go mqttSession struct in server/mqtt.go. +/// +internal sealed class MqttSession +{ + private readonly Lock _mu = new(); + + /// Lock for this session (matches Go sess.mu). + public Lock Mu => _mu; + + // ------------------------------------------------------------------ + // Identity + // ------------------------------------------------------------------ + + /// MQTT client identifier. + public string Id { get; set; } = string.Empty; + + /// Hash of the client identifier (used as JetStream key). + public string IdHash { get; set; } = string.Empty; + + /// Whether this is a clean session. + public bool Clean { get; set; } + + /// Domain token (domain with trailing '.', or empty). + public string DomainTk { get; set; } = string.Empty; + + // ------------------------------------------------------------------ + // Subscriptions + // ------------------------------------------------------------------ + + /// + /// Map from MQTT SUBSCRIBE filter to granted QoS level. + /// Mirrors Go mqttSession.subs map[string]byte. + /// + public Dictionary Subs { get; } = new(); + + // ------------------------------------------------------------------ + // Pending acks + // ------------------------------------------------------------------ + + /// Maximum number of in-flight QoS-1/2 PUBLISH acks. + public ushort MaxPending { get; set; } + + /// + /// In-flight QoS-1 PUBLISH packets pending PUBACK from the client. + /// Key is the packet identifier. + /// + public Dictionary PendingPublish { get; } = new(); + + /// + /// In-flight QoS-2 PUBREL packets pending PUBCOMP from the client. + /// Key is the packet identifier. + /// + public Dictionary PendingPubRel { get; } = new(); + + /// "Last used" packet identifier; used as the starting point when allocating the next one. + public ushort LastPi { get; set; } + + // ------------------------------------------------------------------ + // Constructor + // ------------------------------------------------------------------ + + /// Initialises a new session with the given identity. + public MqttSession(string id, string idHash, bool clean) + { + Id = id; + IdHash = idHash; + Clean = clean; + } +} + +// ============================================================================ +// JSA stub +// ============================================================================ + +/// +/// Stub for the MQTT JetStream API helper. +/// Mirrors Go mqttJSA struct in server/mqtt.go. +/// All methods throw until session 22 is complete. +/// +internal sealed class MqttJsa +{ + /// Domain (with trailing '.'), or empty. + public string Domain { get; set; } = string.Empty; + + /// Whether the domain field was explicitly set (even to empty). + public bool DomainSet { get; set; } + + // All methods are stubs — full implementation is deferred to session 22. + public void SendAck(string ackSubject) => + throw new NotImplementedException("TODO: session 22"); + + public void SendMsg(string subject, byte[] msg) => + throw new NotImplementedException("TODO: session 22"); + + public void StoreMsgNoWait(string subject, int hdrLen, byte[] msg) => + throw new NotImplementedException("TODO: session 22"); + + public string PrefixDomain(string subject) => + throw new NotImplementedException("TODO: session 22"); +} + +// ============================================================================ +// Account session manager stub +// ============================================================================ + +/// +/// Per-account MQTT session manager. +/// Mirrors Go mqttAccountSessionManager struct in server/mqtt.go. +/// All mutating methods are stubs. +/// +internal sealed class MqttAccountSessionManager +{ + private readonly Lock _mu = new(); + + /// Domain token (domain with trailing '.'), or empty. + public string DomainTk { get; set; } = string.Empty; + + /// Active sessions keyed by MQTT client ID. + public Dictionary Sessions { get; } = new(); + + /// Sessions keyed by their client ID hash. + public Dictionary SessionsByHash { get; } = new(); + + /// Client IDs that are currently locked (being taken over). + public HashSet SessionsLocked { get; } = new(); + + /// Client IDs that have recently flapped (connected with duplicate ID). + public Dictionary Flappers { get; } = new(); + + /// JSA helper for this account. + public MqttJsa Jsa { get; } = new(); + + /// Lock for this manager. + public Lock Mu => _mu; + + // All methods are stubs. + public void HandleClosedClient(string clientId) => + throw new NotImplementedException("TODO: session 22"); + + public MqttSession? LookupSession(string clientId) => + throw new NotImplementedException("TODO: session 22"); + + public void PersistSession(MqttSession session) => + throw new NotImplementedException("TODO: session 22"); + + public void DeleteSession(MqttSession session) => + throw new NotImplementedException("TODO: session 22"); +} + +// ============================================================================ +// Global session manager stub +// ============================================================================ + +/// +/// Server-wide MQTT session manager. +/// Mirrors Go mqttSessionManager struct in server/mqtt.go. +/// +internal sealed class MqttSessionManager +{ + private readonly Lock _mu = new(); + + /// Per-account session managers keyed by account name. + public Dictionary Sessions { get; } = new(); + + /// Lock for this manager. + public Lock Mu => _mu; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index 73b5dfe..975b2e7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -18,8 +18,10 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Auth.Ocsp; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index aee77b3..9bc5d10 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -15,8 +15,10 @@ using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Auth.Ocsp; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server; @@ -213,21 +215,13 @@ public static class CompressionMode // SrvGateway — full class is in Gateway/GatewayTypes.cs (session 16). -/// Stub for server websocket state (session 23). -internal sealed class SrvWebsocket -{ - public RefCountedUrlSet ConnectUrlsMap { get; set; } = new(); -} +// SrvWebsocket — now fully defined in WebSocket/WebSocketTypes.cs (session 23). +// OcspMonitor — now fully defined in Auth/Ocsp/OcspTypes.cs (session 23). +// IOcspResponseCache — now fully defined in Auth/Ocsp/OcspTypes.cs (session 23). /// Stub for server MQTT state (session 22). internal sealed class SrvMqtt { } -/// Stub for OCSP monitor (session 23). -internal sealed class OcspMonitor { } - -/// Stub for OCSP response cache (session 23). -internal interface IOcspResponseCache { } - /// Stub for IP queue (session 02 — already ported as IpQueue). // IpQueue is already in session 02 internals — used here via object. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs new file mode 100644 index 0000000..a2e21a4 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs @@ -0,0 +1,75 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/websocket.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +/// +/// WebSocket opcode values as defined in RFC 6455 §5.2. +/// Mirrors Go wsOpCode type in server/websocket.go. +/// +internal enum WsOpCode : int +{ + Continuation = 0, + Text = 1, + Binary = 2, + Close = 8, + Ping = 9, + Pong = 10, +} + +/// +/// WebSocket protocol constants. +/// Mirrors the constant block at the top of server/websocket.go. +/// +internal static class WsConstants +{ + // Frame header bits + public const int FinalBit = 1 << 7; + public const int Rsv1Bit = 1 << 6; // Used for per-message compression (RFC 7692) + public const int Rsv2Bit = 1 << 5; + public const int Rsv3Bit = 1 << 4; + public const int MaskBit = 1 << 7; + + // Frame size limits + public const int MaxFrameHeaderSize = 14; // LeafNode may behave as a client + public const int MaxControlPayloadSize = 125; + public const int FrameSizeForBrowsers = 4096; // From experiment, browsers behave better with limited frame size + public const int CompressThreshold = 64; // Don't compress for small buffer(s) + public const int CloseStatusSize = 2; + + // Close status codes (RFC 6455 §11.7) + public const int CloseNormalClosure = 1000; + public const int CloseGoingAway = 1001; + public const int CloseProtocolError = 1002; + public const int CloseUnsupportedData = 1003; + public const int CloseNoStatusReceived = 1005; + public const int CloseInvalidPayloadData = 1007; + public const int ClosePolicyViolation = 1008; + public const int CloseMessageTooBig = 1009; + public const int CloseInternalError = 1011; + public const int CloseTlsHandshake = 1015; + + // Header strings + public const string NoMaskingHeader = "Nats-No-Masking"; + public const string NoMaskingValue = "true"; + public const string XForwardedForHeader = "X-Forwarded-For"; + public const string PMCExtension = "permessage-deflate"; // per-message compression + public const string PMCSrvNoCtx = "server_no_context_takeover"; + public const string PMCCliNoCtx = "client_no_context_takeover"; + public const string SecProtoHeader = "Sec-Websocket-Protocol"; + public const string MQTTSecProtoVal = "mqtt"; + public const string SchemePrefix = "ws"; + public const string SchemePrefixTls = "wss"; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs new file mode 100644 index 0000000..acf8fa5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs @@ -0,0 +1,110 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/websocket.go in the NATS server Go source. + +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +/// +/// Per-connection WebSocket read state. +/// Mirrors Go wsReadInfo struct in server/websocket.go. +/// +internal sealed class WsReadInfo +{ + /// Whether masking is disabled for this connection (e.g. leaf node). + public bool NoMasking { get; set; } + + /// Whether per-message deflate compression is active. + public bool Compressed { get; set; } + + /// The current frame opcode. + public WsOpCode FrameType { get; set; } + + /// Number of payload bytes remaining in the current frame. + public int PayloadLeft { get; set; } + + /// The 4-byte masking key (only valid when masking is active). + public int[] Mask { get; set; } = new int[4]; + + /// Current offset into . + public int MaskOffset { get; set; } + + /// Accumulated compressed payload buffers awaiting decompression. + public byte[]? Compress { get; set; } + + public WsReadInfo() { } +} + +/// +/// Server-level WebSocket state, shared across all WebSocket connections. +/// Mirrors Go srvWebsocket struct in server/websocket.go. +/// Replaces the stub in NatsServerTypes.cs. +/// +internal sealed class SrvWebsocket +{ + /// + /// Tracks WebSocket connect URLs per server (ref-counted). + /// Mirrors Go connectURLsMap refCountedUrlSet. + /// + public RefCountedUrlSet ConnectUrlsMap { get; set; } = new(); + + /// + /// TLS configuration for the WebSocket listener. + /// Mirrors Go tls bool field (true if TLS is required). + /// + public System.Net.Security.SslServerAuthenticationOptions? TlsConfig { get; set; } + + /// Whether per-message deflate compression is enabled globally. + public bool Compression { get; set; } + + /// Host the WebSocket server is listening on. + public string Host { get; set; } = string.Empty; + + /// Port the WebSocket server is listening on (may be ephemeral). + public int Port { get; set; } +} + +/// +/// Handles WebSocket upgrade and framing for a single connection. +/// Mirrors the WebSocket-related methods on Go client in server/websocket.go. +/// Full implementation is deferred to session 23. +/// +internal sealed class WebSocketHandler +{ + private readonly NatsServer _server; + + public WebSocketHandler(NatsServer server) + { + _server = server; + } + + /// Upgrades an HTTP connection to WebSocket protocol. + public void UpgradeToWebSocket( + System.IO.Stream stream, + System.Net.Http.Headers.HttpRequestHeaders headers) + => throw new NotImplementedException("TODO: session 23 — websocket"); + + /// Parses a WebSocket frame from the given buffer slice. + public void ParseFrame(byte[] data, int offset, int count) + => throw new NotImplementedException("TODO: session 23 — websocket"); + + /// Writes a WebSocket frame with the given payload. + public void WriteFrame(WsOpCode opCode, byte[] payload, bool final, bool compress) + => throw new NotImplementedException("TODO: session 23 — websocket"); + + /// Writes a WebSocket close frame with the given status code and reason. + public void WriteCloseFrame(int statusCode, string reason) + => throw new NotImplementedException("TODO: session 23 — websocket"); +} diff --git a/porting.db b/porting.db index 075a652..78f31b3 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 071886e..27003c3 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 21:23:40 UTC +Generated: 2026-02-26 21:31:42 UTC ## Modules (12 total) @@ -13,9 +13,8 @@ Generated: 2026-02-26 21:23:40 UTC | Status | Count | |--------|-------| -| complete | 2851 | +| complete | 3503 | | n_a | 77 | -| not_started | 652 | | stub | 93 | ## Unit Tests (3257 total) @@ -36,4 +35,4 @@ Generated: 2026-02-26 21:23:40 UTC ## Overall Progress -**3439/6942 items complete (49.5%)** +**4091/6942 items complete (58.9%)** diff --git a/reports/report_e6bc76b.md b/reports/report_e6bc76b.md new file mode 100644 index 0000000..27003c3 --- /dev/null +++ b/reports/report_e6bc76b.md @@ -0,0 +1,38 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 21:31:42 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 3503 | +| n_a | 77 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**4091/6942 items complete (58.9%)**