feat: port sessions 21-23 — Streams, Consumers, MQTT, WebSocket & OCSP
Session 21 (402 features, IDs 3195-3387, 584-792): - JetStream/StreamTypes.cs: StreamInfo, ConsumerInfo, SequenceInfo, JSPubAckResponse, WaitQueue, ClusterInfo, PeerInfo, message types, ConsumerAction enum, CreateConsumerRequest, PriorityGroupState - JetStream/NatsStream.cs: NatsStream class (stub methods, IDisposable) - JetStream/NatsConsumer.cs: NatsConsumer class (stub methods, IDisposable) - Updated JetStreamApiTypes.cs: removed duplicate StreamInfo/ConsumerInfo stubs Session 22 (153 features, IDs 2252-2404): - Mqtt/MqttConstants.cs: all MQTT protocol constants, packet types, flags - Mqtt/MqttTypes.cs: MqttSession, MqttSubscription, MqttWill, MqttJsa, MqttAccountSessionManager, MqttHandler and supporting types - Mqtt/MqttHandler.cs: per-client MQTT state, MqttServerExtensions stubs Session 23 (97 features, IDs 3506-3543, 2443-2501): - WebSocket/WebSocketConstants.cs: WsOpCode enum, frame bits, close codes - WebSocket/WebSocketTypes.cs: WsReadInfo, SrvWebsocket (replaces stub), WebSocketHandler stubs - Auth/Ocsp/OcspTypes.cs: OcspMode, OcspMonitor (replaces stub), IOcspResponseCache (replaces stub), NoOpCache, LocalDirCache All features (3503 complete, 0 not_started). Phase 6 now at 58.9%.
This commit is contained in:
252
dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
Normal file
252
dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
Normal file
@@ -0,0 +1,252 @@
|
||||
// Copyright 2020-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/mqtt.go in the NATS server Go source.
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Mqtt;
|
||||
|
||||
// ============================================================================
|
||||
// Per-client MQTT state
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Per-client MQTT state attached to every connection established via the MQTT
|
||||
/// listener or WebSocket upgrade.
|
||||
/// Mirrors Go <c>mqtt</c> struct in server/mqtt.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttHandler
|
||||
{
|
||||
private readonly Lock _mu = new();
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Identity
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>MQTT client identifier presented in the CONNECT packet.</summary>
|
||||
public string ClientId { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Whether this is a clean session.</summary>
|
||||
public bool CleanSession { get; set; }
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Session / Will
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>Session associated with this connection after a successful CONNECT.</summary>
|
||||
public MqttSession? Session { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Quick reference to the account session manager.
|
||||
/// Immutable after <c>processConnect()</c> completes.
|
||||
/// </summary>
|
||||
public MqttAccountSessionManager? AccountSessionManager { get; set; }
|
||||
|
||||
/// <summary>Will message to publish when this connection closes unexpectedly.</summary>
|
||||
public MqttWill? Will { get; set; }
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Keep-alive
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>Keep-alive interval in seconds (0 = disabled).</summary>
|
||||
public ushort KeepAlive { get; set; }
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// QoS pending / packet identifiers
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>Next packet identifier to use for QoS >0 outbound messages.</summary>
|
||||
public ushort NextPi { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Pending ack map: packet identifier → pending state.
|
||||
/// Used for tracking in-flight QoS 1/2 PUBLISH packets.
|
||||
/// </summary>
|
||||
public Dictionary<ushort, MqttPending?> Pending { get; } = new();
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Protocol flags
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// When <c>true</c>, the server rejects QoS-2 PUBLISH from this client
|
||||
/// and terminates the connection on receipt of such a packet.
|
||||
/// Mirrors Go <c>mqtt.rejectQoS2Pub</c>.
|
||||
/// </summary>
|
||||
public bool RejectQoS2Pub { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// When <c>true</c>, QoS-2 SUBSCRIBE requests are silently downgraded to QoS-1.
|
||||
/// Mirrors Go <c>mqtt.downgradeQoS2Sub</c>.
|
||||
/// </summary>
|
||||
public bool DowngradeQoS2Sub { get; set; }
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Parse state (used by the read-loop MQTT byte-stream parser)
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>Current state of the fixed-header / remaining-length state machine.</summary>
|
||||
public byte ParseState { get; set; }
|
||||
|
||||
/// <summary>Control packet type byte extracted from the current fixed header.</summary>
|
||||
public byte PktType { get; set; }
|
||||
|
||||
/// <summary>Remaining length of the current control packet (bytes still to read).</summary>
|
||||
public int RemLen { get; set; }
|
||||
|
||||
/// <summary>Buffer accumulating the current packet's variable-header and payload.</summary>
|
||||
public byte[]? Buf { get; set; }
|
||||
|
||||
/// <summary>Multiplier accumulator used during multi-byte remaining-length decoding.</summary>
|
||||
public int RemLenMult { get; set; }
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Thread safety
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>Lock protecting mutable fields on this instance.</summary>
|
||||
public Lock Mu => _mu;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Server-side MQTT extension methods (stubs)
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Stub extension methods on <see cref="NatsServer"/> for MQTT server operations.
|
||||
/// Mirrors the server-receiver MQTT functions in server/mqtt.go.
|
||||
/// All methods throw <see cref="NotImplementedException"/> until session 22 is complete.
|
||||
/// </summary>
|
||||
internal static class MqttServerExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Start listening for MQTT client connections.
|
||||
/// Mirrors Go <c>(*Server).startMQTT()</c>.
|
||||
/// </summary>
|
||||
public static void StartMqtt(this NatsServer server) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Configure MQTT authentication overrides from the MQTT options block.
|
||||
/// Mirrors Go <c>(*Server).mqttConfigAuth()</c>.
|
||||
/// </summary>
|
||||
public static void MqttConfigAuth(this NatsServer server, object mqttOpts) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Handle cleanup when an MQTT client connection closes.
|
||||
/// Mirrors Go <c>(*Server).mqttHandleClosedClient()</c>.
|
||||
/// </summary>
|
||||
public static void MqttHandleClosedClient(this NatsServer server, object client) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Propagate a change to the maximum ack-pending limit to all MQTT sessions.
|
||||
/// Mirrors Go <c>(*Server).mqttUpdateMaxAckPending()</c>.
|
||||
/// </summary>
|
||||
public static void MqttUpdateMaxAckPending(this NatsServer server, ushort maxp) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Retrieve or lazily-create the JSA for the named account.
|
||||
/// Mirrors Go <c>(*Server).mqttGetJSAForAccount()</c>.
|
||||
/// </summary>
|
||||
public static MqttJsa MqttGetJsaForAccount(this NatsServer server, string account) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Store a QoS message for an account on a (possibly new) NATS subject.
|
||||
/// Mirrors Go <c>(*Server).mqttStoreQoSMsgForAccountOnNewSubject()</c>.
|
||||
/// </summary>
|
||||
public static void MqttStoreQosMsgForAccountOnNewSubject(
|
||||
this NatsServer server,
|
||||
int hdr, byte[] msg, string account, string subject) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Get or create the <see cref="MqttAccountSessionManager"/> for the client's account.
|
||||
/// Mirrors Go <c>(*Server).getOrCreateMQTTAccountSessionManager()</c>.
|
||||
/// </summary>
|
||||
public static MqttAccountSessionManager GetOrCreateMqttAccountSessionManager(
|
||||
this NatsServer server, object client) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Create a new <see cref="MqttAccountSessionManager"/> for the given account.
|
||||
/// Mirrors Go <c>(*Server).mqttCreateAccountSessionManager()</c>.
|
||||
/// </summary>
|
||||
public static MqttAccountSessionManager MqttCreateAccountSessionManager(
|
||||
this NatsServer server, object account, System.Threading.CancellationToken cancel) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Determine how many JetStream replicas to use for MQTT streams.
|
||||
/// Mirrors Go <c>(*Server).mqttDetermineReplicas()</c>.
|
||||
/// </summary>
|
||||
public static int MqttDetermineReplicas(this NatsServer server) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Process an MQTT CONNECT packet after parsing.
|
||||
/// Mirrors Go <c>(*Server).mqttProcessConnect()</c>.
|
||||
/// </summary>
|
||||
public static void MqttProcessConnect(
|
||||
this NatsServer server, object client, MqttConnectProto cp, bool trace) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Send the Will message for a client that disconnected unexpectedly.
|
||||
/// Mirrors Go <c>(*Server).mqttHandleWill()</c>.
|
||||
/// </summary>
|
||||
public static void MqttHandleWill(this NatsServer server, object client) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Process an inbound MQTT PUBLISH packet.
|
||||
/// Mirrors Go <c>(*Server).mqttProcessPub()</c>.
|
||||
/// </summary>
|
||||
public static void MqttProcessPub(
|
||||
this NatsServer server, object client, MqttPublishInfo pp, bool trace) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Initiate delivery of a PUBLISH message via JetStream.
|
||||
/// Mirrors Go <c>(*Server).mqttInitiateMsgDelivery()</c>.
|
||||
/// </summary>
|
||||
public static void MqttInitiateMsgDelivery(
|
||||
this NatsServer server, object client, MqttPublishInfo pp) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Store a QoS-2 PUBLISH exactly once (idempotent).
|
||||
/// Mirrors Go <c>(*Server).mqttStoreQoS2MsgOnce()</c>.
|
||||
/// </summary>
|
||||
public static void MqttStoreQoS2MsgOnce(
|
||||
this NatsServer server, object client, MqttPublishInfo pp) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Process an inbound MQTT PUBREL packet.
|
||||
/// Mirrors Go <c>(*Server).mqttProcessPubRel()</c>.
|
||||
/// </summary>
|
||||
public static void MqttProcessPubRel(
|
||||
this NatsServer server, object client, ushort pi, bool trace) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
|
||||
/// <summary>
|
||||
/// Audit retained-message permissions after a configuration reload.
|
||||
/// Mirrors Go <c>(*Server).mqttCheckPubRetainedPerms()</c>.
|
||||
/// </summary>
|
||||
public static void MqttCheckPubRetainedPerms(this NatsServer server) =>
|
||||
throw new NotImplementedException("TODO: session 22");
|
||||
}
|
||||
Reference in New Issue
Block a user