Merge branch 'worktree-agent-adb1298d'

# Conflicts:
#	dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs
This commit is contained in:
Joseph Doherty
2026-03-01 10:16:19 -05:00
7 changed files with 1968 additions and 84 deletions

View File

@@ -0,0 +1,41 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/mqtt.go in the NATS server Go source.
using ZB.MOM.NatsNet.Server.Mqtt;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// ClientConnection — MQTT partial (Sub-batch E, features 2257-2384)
// ============================================================================
public sealed partial class ClientConnection
{
// ------------------------------------------------------------------
// MQTT state field
// ------------------------------------------------------------------
/// <summary>
/// Per-connection MQTT handler, non-null only for MQTT client connections.
/// Mirrors Go <c>client.mqtt *mqtt</c> field in server/mqtt.go.
/// </summary>
internal MqttHandler? Mqtt { get; private set; }
/// <summary>
/// Attaches an MQTT handler to this connection.
/// Called during CONNECT processing.
/// </summary>
internal void InitMqtt(MqttHandler handler) => Mqtt = handler;
}

View File

@@ -14,6 +14,7 @@
// Adapted from server/client.go (subscription struct) in the NATS server Go source.
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Mqtt;
namespace ZB.MOM.NatsNet.Server.Internal;
@@ -49,6 +50,21 @@ public sealed class Subscription
/// </summary>
public SysMsgHandler? SysMsgCb { get; set; }
/// <summary>
/// MQTT-specific metadata attached to this subscription.
/// Non-null only for subscriptions created by MQTT clients.
/// Mirrors Go <c>subscription.mqtt *mqttSub</c>.
/// </summary>
internal MqttSub? Mqtt { get; set; }
/// <summary>
/// Internal delivery callback for server-side subscriptions (e.g. JSA subscriptions).
/// When set, this delegate is called instead of routing via the normal client connection.
/// Mirrors Go <c>subscription.icb msgHandler</c>.
/// </summary>
internal Action<Subscription?, ClientConnection?, INatsAccount?, string, string, byte[]>?
InternalCallback { get; set; }
/// <summary>Marks this subscription as closed.</summary>
public void Close() => Interlocked.Exchange(ref _closed, 1);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,184 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/mqtt.go in the NATS server Go source.
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server.Mqtt;
// ============================================================================
// MqttHelpers — Standalone helpers (Sub-batch F, features 2264-2404)
// ============================================================================
/// <summary>
/// Static helper methods for MQTT protocol processing.
/// Mirrors various free functions and helper methods in server/mqtt.go.
/// </summary>
internal static class MqttHelpers
{
// ------------------------------------------------------------------
// Feature 2264: mqttIsReservedSubscription
// ------------------------------------------------------------------
/// <summary>
/// Returns <c>true</c> if the given topic filter is a reserved MQTT topic
/// (i.e. starts with <c>$</c> or begins with <see cref="MqttTopics.SubPrefix"/>).
/// Mirrors Go <c>mqttIsReservedSubscription()</c> in server/mqtt.go.
/// </summary>
public static bool IsMqttReservedSubscription(string filter)
{
if (string.IsNullOrEmpty(filter)) return false;
return filter[0] == '$' || filter.StartsWith(MqttTopics.SubPrefix, StringComparison.Ordinal);
}
// ------------------------------------------------------------------
// Feature 2265: mqttDecodeRetainedMessage
// ------------------------------------------------------------------
/// <summary>
/// Decodes a retained message from the JetStream stored message headers and body.
/// Returns <c>null</c> if the message cannot be decoded.
/// Mirrors Go <c>mqttDecodeRetainedMessage()</c> in server/mqtt.go.
/// </summary>
public static MqttRetainedMsg? DecodeRetainedMessage(string subject, byte[]? hdr, byte[]? body)
{
if (string.IsNullOrEmpty(subject)) return null;
// Attempt JSON deserialization first (modern format).
if (body != null && body.Length > 0)
{
try
{
var rm = JsonSerializer.Deserialize<MqttRetainedMsg>(body);
if (rm != null)
{
// Fill in the subject from the NATS subject if not in body.
if (string.IsNullOrEmpty(rm.Subject))
rm.Subject = subject;
if (string.IsNullOrEmpty(rm.Topic))
rm.Topic = Encoding.UTF8.GetString(MqttSubjectConverter.NatsSubjectStrToMqttTopic(subject));
return rm;
}
}
catch
{
// Fall through to header-based decode.
}
}
// Legacy: topic encoded in NATS subject via subject converter.
byte[] topicBytes = MqttSubjectConverter.NatsSubjectStrToMqttTopic(subject);
string topic = topicBytes.Length > 0 ? Encoding.UTF8.GetString(topicBytes) : string.Empty;
if (string.IsNullOrEmpty(topic)) return null;
// Parse QoS flag and origin from NATS message headers.
byte flags = 0;
string origin = string.Empty;
if (hdr != null && hdr.Length > 0)
{
string hdrStr = Encoding.UTF8.GetString(hdr);
foreach (var line in hdrStr.Split('\n', StringSplitOptions.RemoveEmptyEntries))
{
var trimmed = line.Trim();
if (trimmed.StartsWith(MqttTopics.NatsRetainedMessageOrigin + ":", StringComparison.OrdinalIgnoreCase))
origin = trimmed[(MqttTopics.NatsRetainedMessageOrigin.Length + 1)..].Trim();
else if (trimmed.StartsWith(MqttTopics.NatsRetainedMessageFlags + ":", StringComparison.OrdinalIgnoreCase))
_ = byte.TryParse(trimmed[(MqttTopics.NatsRetainedMessageFlags.Length + 1)..].Trim(), out flags);
}
}
return new MqttRetainedMsg
{
Origin = origin,
Subject = subject,
Topic = topic,
Msg = body,
Flags = flags,
};
}
// ------------------------------------------------------------------
// Feature 2266: mqttGeneratePubPerms
// ------------------------------------------------------------------
/// <summary>
/// Generates a publish-permissions filter for retained-message authorization.
/// Mirrors Go <c>mqttGeneratePubPerms()</c> in server/mqtt.go.
/// </summary>
public static MqttPubPerms GeneratePubPerms(
IEnumerable<string>? allow,
IEnumerable<string>? deny)
{
var perms = new MqttPubPerms();
if (allow != null)
{
foreach (var a in allow)
{
if (a == ">")
perms.AllowAll = true;
else
perms.Allow.Add(a);
}
}
if (deny != null)
{
foreach (var d in deny)
perms.Deny.Add(d);
}
return perms;
}
// ------------------------------------------------------------------
// Feature 2267: mqttCheckPubRetainedPerms
// ------------------------------------------------------------------
/// <summary>
/// Checks whether the subject is permitted for retained-message publishing.
/// Mirrors Go <c>mqttCheckPubRetainedPerms()</c> in server/mqtt.go.
/// </summary>
public static bool CheckPubRetainedPerms(MqttPubPerms perms, string subject)
{
if (perms.AllowAll)
{
// Check deny list.
return !perms.Deny.Contains(subject);
}
if (!perms.Allow.Contains(subject)) return false;
return !perms.Deny.Contains(subject);
}
// ------------------------------------------------------------------
// Feature 2268: mqttTopicFilter / helper utilities
// ------------------------------------------------------------------
/// <summary>
/// Returns <c>true</c> if the topic filter contains wildcard characters.
/// Mirrors Go <c>mqttTopicFilterContainsWildcard()</c>.
/// </summary>
public static bool TopicFilterContainsWildcard(string filter) =>
filter.Contains('+') || filter.Contains('#');
/// <summary>
/// Returns the MQTT topic as a NATS subject (for publish/subscribe).
/// Thin wrapper around <see cref="MqttSubjectConverter.MqttTopicToNatsPubSubject"/>.
/// </summary>
public static string TopicToNatsSubject(string topic)
{
byte[] subjBytes = MqttSubjectConverter.MqttTopicToNatsPubSubject(
Encoding.UTF8.GetBytes(topic));
return Encoding.UTF8.GetString(subjBytes);
}
}

View File

@@ -0,0 +1,593 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/mqtt.go in the NATS server Go source.
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
// All JetStream API types (StreamInfo, ConsumerConfig, JSPubAckResponse, etc.)
// live in the parent namespace ZB.MOM.NatsNet.Server and are accessible here.
namespace ZB.MOM.NatsNet.Server.Mqtt;
// ============================================================================
// Helper types
// ============================================================================
/// <summary>
/// A pending JS API request message to be sent via the JSA send queue.
/// Mirrors Go <c>mqttJSPubMsg</c> struct in server/mqtt.go.
/// </summary>
internal sealed class MqttJsPubMsg
{
public string Subject { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public int Hdr { get; set; }
public byte[]? Msg { get; set; }
}
/// <summary>
/// A response received from a JS API request.
/// Mirrors Go <c>mqttJSAResponse</c> struct in server/mqtt.go.
/// </summary>
internal sealed class MqttJsaResponse
{
public string Reply { get; set; } = string.Empty;
public object? Value { get; set; }
}
/// <summary>
/// Deletion request for a retained message (JSON payload).
/// Mirrors Go <c>mqttRetMsgDel</c> struct in server/mqtt.go.
/// </summary>
internal sealed class MqttRetMsgDel
{
public string Subject { get; set; } = string.Empty;
public ulong Seq { get; set; }
}
// ============================================================================
// MqttJsa — JetStream API bridge (Sub-batch A: features 2269-2290)
// ============================================================================
/// <summary>
/// Per-account MQTT JetStream API bridge.
/// Mirrors Go <c>mqttJSA</c> struct in server/mqtt.go.
/// Sends requests to JS API subjects and waits for replies via a reply subject.
/// </summary>
internal sealed class MqttJsa
{
private readonly Lock _mu = new();
// ------------------------------------------------------------------
// Identity / routing
// ------------------------------------------------------------------
/// <summary>Node identifier (server name hash used in reply subjects).</summary>
public string Id { get; set; } = string.Empty;
/// <summary>Internal client connection used to send JS API messages.</summary>
public ClientConnection? Client { get; set; }
/// <summary>Reply subject prefix: <c>$MQTT.JSA.{id}.</c></summary>
public string Rplyr { get; set; } = string.Empty;
/// <summary>Send queue for pending JS API messages.</summary>
public Channel<MqttJsPubMsg> SendQ { get; } = Channel.CreateUnbounded<MqttJsPubMsg>(
new UnboundedChannelOptions { SingleReader = true });
/// <summary>
/// Map of reply subjects to their response channels.
/// Mirrors Go <c>replies sync.Map</c> in mqttJSA.
/// </summary>
public ConcurrentDictionary<string, Channel<MqttJsaResponse>> Replies { get; } = new();
/// <summary>Monotonically increasing counter used for unique reply IDs.</summary>
private long _nuid;
/// <summary>Cancellation token (mirrors Go <c>quitCh chan struct{}</c>).</summary>
public CancellationToken QuitCt { get; set; }
/// <summary>JS domain (may be empty).</summary>
public string Domain { get; set; } = string.Empty;
/// <summary>Whether <see cref="Domain"/> was explicitly set (even to empty).</summary>
public bool DomainSet { get; set; }
/// <summary>Timeout for JS API requests (mirrors Go <c>timeout time.Duration</c>).</summary>
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(5);
// ------------------------------------------------------------------
// Feature 2270: prefixDomain
// ------------------------------------------------------------------
/// <summary>
/// Rewrites a JS API subject to include the configured domain.
/// Mirrors Go <c>(*mqttJSA).prefixDomain()</c>.
/// </summary>
public string PrefixDomain(string subject)
{
if (string.IsNullOrEmpty(Domain))
return subject;
const string jsApiPrefix = "$JS.API.";
if (subject.StartsWith(jsApiPrefix, StringComparison.Ordinal))
{
var sub = subject[jsApiPrefix.Length..];
return $"$JS.{Domain}.API.{sub}";
}
return subject;
}
// ------------------------------------------------------------------
// Feature 2269: newRequest
// ------------------------------------------------------------------
/// <summary>
/// Creates a new JS API request (single message, no client-ID hash).
/// Mirrors Go <c>(*mqttJSA).newRequest()</c>.
/// </summary>
public async Task<object?> NewRequestAsync(
string kind, string subject, int hdr, byte[]? msg,
CancellationToken ct = default)
{
var responses = await NewRequestExMultiAsync(
kind, subject, string.Empty, [hdr], [msg ?? []], ct);
if (responses.Length != 1)
throw new InvalidOperationException(
$"unreachable: invalid number of responses ({responses.Length})");
return responses[0]?.Value;
}
// ------------------------------------------------------------------
// Feature 2271: newRequestEx
// ------------------------------------------------------------------
/// <summary>
/// Extended request with an optional client-ID hash embedded in the reply subject.
/// Mirrors Go <c>(*mqttJSA).newRequestEx()</c>.
/// </summary>
public async Task<object?> NewRequestExAsync(
string kind, string subject, string cidHash, int hdr, byte[]? msg,
CancellationToken ct = default)
{
var responses = await NewRequestExMultiAsync(
kind, subject, cidHash, [hdr], [msg ?? []], ct);
if (responses.Length != 1)
throw new InvalidOperationException(
$"unreachable: invalid number of responses ({responses.Length})");
return responses[0]?.Value;
}
// ------------------------------------------------------------------
// Feature 2272: newRequestExMulti
// ------------------------------------------------------------------
/// <summary>
/// Sends multiple messages on the same subject and waits for all responses.
/// Returns a sparse array (same length as <paramref name="msgs"/>)
/// where timed-out entries remain <c>null</c>.
/// Mirrors Go <c>(*mqttJSA).newRequestExMulti()</c>.
/// </summary>
public async Task<MqttJsaResponse?[]> NewRequestExMultiAsync(
string kind, string subject, string cidHash,
int[] hdrs, byte[][] msgs,
CancellationToken ct = default)
{
if (hdrs.Length != msgs.Length)
throw new ArgumentException(
$"mismatched hdrs ({hdrs.Length}) and msgs ({msgs.Length}) counts");
var responseCh = Channel.CreateBounded<MqttJsaResponse>(
new BoundedChannelOptions(msgs.Length) { FullMode = BoundedChannelFullMode.Wait });
var replyToIndex = new Dictionary<string, int>(msgs.Length);
var responses = new MqttJsaResponse?[msgs.Length];
for (int i = 0; i < msgs.Length; i++)
{
string uid;
string replyBase;
lock (_mu)
{
uid = Interlocked.Increment(ref _nuid).ToString();
replyBase = Rplyr;
}
var sb = new StringBuilder(replyBase);
sb.Append(kind);
sb.Append('.');
if (!string.IsNullOrEmpty(cidHash))
{
sb.Append(cidHash);
sb.Append('.');
}
sb.Append(uid);
string reply = sb.ToString();
Replies[reply] = responseCh;
string finalSubject = PrefixDomain(subject);
await SendQ.Writer.WriteAsync(new MqttJsPubMsg
{
Subject = finalSubject,
Reply = reply,
Hdr = hdrs[i],
Msg = msgs[i],
}, ct);
replyToIndex[reply] = i;
}
int count = 0;
using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, QuitCt);
linked.CancelAfter(Timeout);
try
{
while (count < msgs.Length)
{
var r = await responseCh.Reader.ReadAsync(linked.Token);
if (replyToIndex.TryGetValue(r.Reply, out int idx))
{
responses[idx] = r;
count++;
}
}
}
catch (OperationCanceledException) when (QuitCt.IsCancellationRequested)
{
throw new InvalidOperationException("server not running");
}
catch (OperationCanceledException)
{
// Timeout — clean up pending reply registrations.
foreach (var key in replyToIndex.Keys)
Replies.TryRemove(key, out _);
}
return responses;
}
// ------------------------------------------------------------------
// Feature 2273: sendAck
// ------------------------------------------------------------------
/// <summary>
/// Sends a JS acknowledgement to the given ack subject (empty payload).
/// Mirrors Go <c>(*mqttJSA).sendAck()</c>.
/// </summary>
public void SendAck(string ackSubject) => SendMsg(ackSubject, null);
// ------------------------------------------------------------------
// Feature 2274: sendMsg
// ------------------------------------------------------------------
/// <summary>
/// Queues a message for fire-and-forget delivery (no JS reply).
/// Mirrors Go <c>(*mqttJSA).sendMsg()</c>.
/// </summary>
public void SendMsg(string subj, byte[]? msg)
{
if (string.IsNullOrEmpty(subj))
return;
// hdr = -1 signals the send loop to skip adding client-info header.
SendQ.Writer.TryWrite(new MqttJsPubMsg { Subject = subj, Msg = msg, Hdr = -1 });
}
// ------------------------------------------------------------------
// Feature 2275: createEphemeralConsumer
// ------------------------------------------------------------------
/// <summary>
/// Creates an ephemeral JetStream consumer for MQTT delivery.
/// Mirrors Go <c>(*mqttJSA).createEphemeralConsumer()</c>.
/// </summary>
public async Task<JsApiConsumerCreateResponse?> CreateEphemeralConsumerAsync(
CreateConsumerRequest cfg, CancellationToken ct = default)
{
var cfgBytes = JsonSerializer.SerializeToUtf8Bytes(cfg);
string subj = string.Format(JsApiSubjects.JsApiConsumerCreateT, cfg.Stream);
var resp = await NewRequestAsync(MqttTopics.JsaConsumerCreate, subj, 0, cfgBytes, ct);
return resp as JsApiConsumerCreateResponse;
}
// ------------------------------------------------------------------
// Feature 2276: createDurableConsumer
// ------------------------------------------------------------------
/// <summary>
/// Creates a durable JetStream consumer for MQTT QoS delivery.
/// Mirrors Go <c>(*mqttJSA).createDurableConsumer()</c>.
/// </summary>
public async Task<JsApiConsumerCreateResponse?> CreateDurableConsumerAsync(
CreateConsumerRequest cfg, CancellationToken ct = default)
{
var cfgBytes = JsonSerializer.SerializeToUtf8Bytes(cfg);
string durable = cfg.Config?.Durable ?? string.Empty;
string subj = string.Format(JsApiSubjects.JsApiDurableCreateT, cfg.Stream, durable);
var resp = await NewRequestAsync(MqttTopics.JsaConsumerCreate, subj, 0, cfgBytes, ct);
return resp as JsApiConsumerCreateResponse;
}
// ------------------------------------------------------------------
// Feature 2277: deleteConsumer
// ------------------------------------------------------------------
/// <summary>
/// Deletes a JetStream consumer. If <paramref name="noWait"/> is <c>true</c>,
/// the request is fire-and-forget.
/// Mirrors Go <c>(*mqttJSA).deleteConsumer()</c>.
/// </summary>
public async Task<JsApiConsumerDeleteResponse?> DeleteConsumerAsync(
string streamName, string consName, bool noWait,
CancellationToken ct = default)
{
string subj = string.Format(JsApiSubjects.JsApiConsumerDeleteT, streamName, consName);
if (noWait)
{
SendMsg(subj, null);
return null;
}
var resp = await NewRequestAsync(MqttTopics.JsaConsumerDel, subj, 0, null, ct);
return resp as JsApiConsumerDeleteResponse;
}
// ------------------------------------------------------------------
// Feature 2278: createStream
// ------------------------------------------------------------------
/// <summary>
/// Creates a JetStream stream.
/// Returns (<see cref="StreamInfo"/>, didCreate).
/// Mirrors Go <c>(*mqttJSA).createStream()</c>.
/// </summary>
public async Task<(StreamInfo? Info, bool DidCreate)> CreateStreamAsync(
StreamConfig cfg, CancellationToken ct = default)
{
var cfgBytes = JsonSerializer.SerializeToUtf8Bytes(cfg);
string subj = string.Format(JsApiSubjects.JsApiStreamCreateT, cfg.Name);
var resp = await NewRequestAsync(MqttTopics.JsaStreamCreate, subj, 0, cfgBytes, ct);
if (resp is JsApiStreamCreateResponse scr)
{
var si = scr.Config != null ? new StreamInfo { Config = scr.Config, State = scr.State } : null;
return (si, scr.DidCreate);
}
return (null, false);
}
// ------------------------------------------------------------------
// Feature 2279: updateStream
// ------------------------------------------------------------------
/// <summary>
/// Updates an existing JetStream stream.
/// Mirrors Go <c>(*mqttJSA).updateStream()</c>.
/// </summary>
public async Task<StreamInfo?> UpdateStreamAsync(
StreamConfig cfg, CancellationToken ct = default)
{
var cfgBytes = JsonSerializer.SerializeToUtf8Bytes(cfg);
string subj = string.Format(JsApiSubjects.JsApiStreamUpdateT, cfg.Name);
var resp = await NewRequestAsync(MqttTopics.JsaStreamUpdate, subj, 0, cfgBytes, ct);
return (resp as JsApiStreamUpdateResponse)?.Info;
}
// ------------------------------------------------------------------
// Feature 2280: lookupStream
// ------------------------------------------------------------------
/// <summary>
/// Looks up a JetStream stream by name.
/// Mirrors Go <c>(*mqttJSA).lookupStream()</c>.
/// </summary>
public async Task<StreamInfo?> LookupStreamAsync(
string name, CancellationToken ct = default)
{
string subj = string.Format(JsApiSubjects.JsApiStreamInfoT, name);
var resp = await NewRequestAsync(MqttTopics.JsaStreamLookup, subj, 0, null, ct);
return (resp as JsApiStreamInfoResponse)?.Info;
}
// ------------------------------------------------------------------
// Feature 2281: deleteStream
// ------------------------------------------------------------------
/// <summary>
/// Deletes a JetStream stream by name.
/// Mirrors Go <c>(*mqttJSA).deleteStream()</c>.
/// </summary>
public async Task<bool> DeleteStreamAsync(
string name, CancellationToken ct = default)
{
string subj = string.Format(JsApiSubjects.JsApiStreamDeleteT, name);
var resp = await NewRequestAsync(MqttTopics.JsaStreamDel, subj, 0, null, ct);
return (resp as JsApiStreamDeleteResponse)?.Success ?? false;
}
// ------------------------------------------------------------------
// Feature 2282: loadLastMsgFor
// ------------------------------------------------------------------
/// <summary>
/// Loads the last message stored for a subject in a stream.
/// Mirrors Go <c>(*mqttJSA).loadLastMsgFor()</c>.
/// </summary>
public async Task<StoredMsg?> LoadLastMsgForAsync(
string streamName, string subject, CancellationToken ct = default)
{
var req = new JsApiMsgGetRequest { LastFor = subject };
var reqBytes = JsonSerializer.SerializeToUtf8Bytes(req);
string subj = string.Format(JsApiSubjects.JsApiMsgGetT, streamName);
var resp = await NewRequestAsync(MqttTopics.JsaMsgLoad, subj, 0, reqBytes, ct);
return (resp as JsApiMsgGetResponse)?.Message;
}
// ------------------------------------------------------------------
// Feature 2283: loadLastMsgForMulti
// ------------------------------------------------------------------
/// <summary>
/// Loads the last message for each of the supplied subjects.
/// Returns a sparse array in the same order as <paramref name="subjects"/>.
/// Mirrors Go <c>(*mqttJSA).loadLastMsgForMulti()</c>.
/// </summary>
public async Task<JsApiMsgGetResponse?[]> LoadLastMsgForMultiAsync(
string streamName, string[] subjects, CancellationToken ct = default)
{
var marshaled = new byte[subjects.Length][];
var headerBytes = new int[subjects.Length];
for (int i = 0; i < subjects.Length; i++)
{
var req = new JsApiMsgGetRequest { LastFor = subjects[i] };
marshaled[i] = JsonSerializer.SerializeToUtf8Bytes(req);
}
string subj = string.Format(JsApiSubjects.JsApiMsgGetT, streamName);
var all = await NewRequestExMultiAsync(
MqttTopics.JsaMsgLoad, subj, string.Empty, headerBytes, marshaled, ct);
var responses = new JsApiMsgGetResponse?[all.Length];
for (int i = 0; i < all.Length; i++)
responses[i] = all[i]?.Value as JsApiMsgGetResponse;
return responses;
}
// ------------------------------------------------------------------
// Feature 2284: loadNextMsgFor
// ------------------------------------------------------------------
/// <summary>
/// Loads the next message after a given subject sequence in a stream.
/// Mirrors Go <c>(*mqttJSA).loadNextMsgFor()</c>.
/// </summary>
public async Task<StoredMsg?> LoadNextMsgForAsync(
string streamName, string subject, CancellationToken ct = default)
{
var req = new JsApiMsgGetRequest { NextFor = subject };
var reqBytes = JsonSerializer.SerializeToUtf8Bytes(req);
string subj = string.Format(JsApiSubjects.JsApiMsgGetT, streamName);
var resp = await NewRequestAsync(MqttTopics.JsaMsgLoad, subj, 0, reqBytes, ct);
return (resp as JsApiMsgGetResponse)?.Message;
}
// ------------------------------------------------------------------
// Feature 2285: loadMsg
// ------------------------------------------------------------------
/// <summary>
/// Loads a specific message by sequence from a stream.
/// Mirrors Go <c>(*mqttJSA).loadMsg()</c>.
/// </summary>
public async Task<StoredMsg?> LoadMsgAsync(
string streamName, ulong seq, CancellationToken ct = default)
{
var req = new JsApiMsgGetRequest { Seq = seq };
var reqBytes = JsonSerializer.SerializeToUtf8Bytes(req);
string subj = string.Format(JsApiSubjects.JsApiMsgGetT, streamName);
var resp = await NewRequestAsync(MqttTopics.JsaMsgLoad, subj, 0, reqBytes, ct);
return (resp as JsApiMsgGetResponse)?.Message;
}
// ------------------------------------------------------------------
// Feature 2286: storeMsgNoWait
// ------------------------------------------------------------------
/// <summary>
/// Queues a message for JetStream storage without waiting for the pub-ack.
/// Mirrors Go <c>(*mqttJSA).storeMsgNoWait()</c>.
/// </summary>
public void StoreMsgNoWait(string subject, int hdrLen, byte[]? msg)
{
SendQ.Writer.TryWrite(new MqttJsPubMsg
{
Subject = subject,
Msg = msg,
Hdr = hdrLen,
});
}
// ------------------------------------------------------------------
// Feature 2287: storeMsg
// ------------------------------------------------------------------
/// <summary>
/// Stores a message in JetStream and waits for the pub-ack.
/// Mirrors Go <c>(*mqttJSA).storeMsg()</c>.
/// </summary>
public async Task<JSPubAckResponse?> StoreMsgAsync(
string subject, int headers, byte[]? msg, CancellationToken ct = default)
{
var resp = await NewRequestAsync(MqttTopics.JsaMsgStore, subject, headers, msg, ct);
return resp as JSPubAckResponse;
}
// ------------------------------------------------------------------
// Feature 2288: storeSessionMsg
// ------------------------------------------------------------------
/// <summary>
/// Stores a session-state message in the MQTT sessions JetStream stream.
/// Mirrors Go <c>(*mqttJSA).storeSessionMsg()</c>.
/// </summary>
public async Task<JSPubAckResponse?> StoreSessionMsgAsync(
string domainTk, string cidHash, int hdr, byte[]? msg, CancellationToken ct = default)
{
string subject = $"{domainTk}{MqttTopics.SessStreamSubjectPrefix}{cidHash}";
var resp = await NewRequestExAsync(
MqttTopics.JsaSessPersist, subject, cidHash, hdr, msg, ct);
return resp as JSPubAckResponse;
}
// ------------------------------------------------------------------
// Feature 2289: loadSessionMsg
// ------------------------------------------------------------------
/// <summary>
/// Loads the most recent session-state message for a client ID hash.
/// Mirrors Go <c>(*mqttJSA).loadSessionMsg()</c>.
/// </summary>
public Task<StoredMsg?> LoadSessionMsgAsync(
string domainTk, string cidHash, CancellationToken ct = default)
{
string streamSubject = $"{domainTk}{MqttTopics.SessStreamSubjectPrefix}{cidHash}";
return LoadLastMsgForAsync(MqttTopics.SessStreamName, streamSubject, ct);
}
// ------------------------------------------------------------------
// Feature 2290: deleteMsg
// ------------------------------------------------------------------
/// <summary>
/// Deletes a message by sequence from a JetStream stream.
/// If <paramref name="wait"/> is <c>false</c>, the request is fire-and-forget.
/// Mirrors Go <c>(*mqttJSA).deleteMsg()</c>.
/// </summary>
public async Task DeleteMsgAsync(
string stream, ulong seq, bool wait, CancellationToken ct = default)
{
var dreq = new JsApiMsgDeleteRequest { Seq = seq, NoErase = true };
var reqBytes = JsonSerializer.SerializeToUtf8Bytes(dreq);
string subj = PrefixDomain(string.Format(JsApiSubjects.JsApiMsgDeleteT, stream));
if (!wait)
{
SendQ.Writer.TryWrite(new MqttJsPubMsg { Subject = subj, Msg = reqBytes });
return;
}
await NewRequestAsync(MqttTopics.JsaMsgDelete, subj, 0, reqBytes, ct);
}
}

View File

@@ -191,6 +191,12 @@ internal sealed class MqttRetainedMsg
/// <summary>Source identifier.</summary>
public string Source { get; set; } = string.Empty;
/// <summary>
/// Cache expiry time for this retained message in the in-memory RmsCache.
/// Not persisted to JetStream.
/// </summary>
public DateTime ExpiresFromCache { get; set; }
}
// ============================================================================
@@ -247,6 +253,28 @@ internal sealed class MqttSession
/// <summary>Domain token (domain with trailing '.', or empty).</summary>
public string DomainTk { get; set; } = string.Empty;
// ------------------------------------------------------------------
// Client link
// ------------------------------------------------------------------
/// <summary>
/// The <see cref="ClientConnection"/> that currently owns this session.
/// Protected by <see cref="Mu"/>. Set to <c>null</c> when the client disconnects.
/// Mirrors Go <c>mqttSession.c *client</c>.
/// </summary>
public ClientConnection? Client { get; set; }
// ------------------------------------------------------------------
// JetStream sequence
// ------------------------------------------------------------------
/// <summary>
/// JetStream stream sequence number of the last persisted session record.
/// Protected by <see cref="Mu"/>.
/// Mirrors Go <c>mqttSession.seq uint64</c>.
/// </summary>
public ulong Seq { get; set; }
// ------------------------------------------------------------------
// Subscriptions
// ------------------------------------------------------------------
@@ -293,86 +321,7 @@ internal sealed class MqttSession
}
// ============================================================================
// JSA stub
// ============================================================================
/// <summary>
/// Stub for the MQTT JetStream API helper.
/// Mirrors Go <c>mqttJSA</c> struct in server/mqtt.go.
/// All methods throw <see cref="NotImplementedException"/> until session 22 is complete.
/// </summary>
internal sealed class MqttJsa
{
/// <summary>Domain (with trailing '.'), or empty.</summary>
public string Domain { get; set; } = string.Empty;
/// <summary>Whether the domain field was explicitly set (even to empty).</summary>
public bool DomainSet { get; set; }
// All methods are stubs — full implementation is deferred to session 22.
public void SendAck(string ackSubject) =>
throw new NotImplementedException("TODO: session 22");
public void SendMsg(string subject, byte[] msg) =>
throw new NotImplementedException("TODO: session 22");
public void StoreMsgNoWait(string subject, int hdrLen, byte[] msg) =>
throw new NotImplementedException("TODO: session 22");
public string PrefixDomain(string subject) =>
throw new NotImplementedException("TODO: session 22");
}
// ============================================================================
// Account session manager stub
// ============================================================================
/// <summary>
/// Per-account MQTT session manager.
/// Mirrors Go <c>mqttAccountSessionManager</c> struct in server/mqtt.go.
/// All mutating methods are stubs.
/// </summary>
internal sealed class MqttAccountSessionManager
{
private readonly Lock _mu = new();
/// <summary>Domain token (domain with trailing '.'), or empty.</summary>
public string DomainTk { get; set; } = string.Empty;
/// <summary>Active sessions keyed by MQTT client ID.</summary>
public Dictionary<string, MqttSession> Sessions { get; } = new();
/// <summary>Sessions keyed by their client ID hash.</summary>
public Dictionary<string, MqttSession> SessionsByHash { get; } = new();
/// <summary>Client IDs that are currently locked (being taken over).</summary>
public HashSet<string> SessionsLocked { get; } = new();
/// <summary>Client IDs that have recently flapped (connected with duplicate ID).</summary>
public Dictionary<string, long> Flappers { get; } = new();
/// <summary>JSA helper for this account.</summary>
public MqttJsa Jsa { get; } = new();
/// <summary>Lock for this manager.</summary>
public Lock Mu => _mu;
// All methods are stubs.
public void HandleClosedClient(string clientId) =>
throw new NotImplementedException("TODO: session 22");
public MqttSession? LookupSession(string clientId) =>
throw new NotImplementedException("TODO: session 22");
public void PersistSession(MqttSession session) =>
throw new NotImplementedException("TODO: session 22");
public void DeleteSession(MqttSession session) =>
throw new NotImplementedException("TODO: session 22");
}
// ============================================================================
// Global session manager stub
// Global session manager
// ============================================================================
/// <summary>
@@ -381,11 +330,11 @@ internal sealed class MqttAccountSessionManager
/// </summary>
internal sealed class MqttSessionManager
{
private readonly Lock _mu = new();
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion);
/// <summary>Per-account session managers keyed by account name.</summary>
public Dictionary<string, MqttAccountSessionManager> Sessions { get; } = new();
/// <summary>Lock for this manager.</summary>
public Lock Mu => _mu;
/// <summary>Read lock for this manager.</summary>
public ReaderWriterLockSlim Mu => _mu;
}