batch37 task3 implement group B direct-get and pipeline

This commit is contained in:
Joseph Doherty
2026-02-28 23:42:47 -05:00
parent f0ea92b8dd
commit 07b494544d
7 changed files with 537 additions and 5 deletions

View File

@@ -21,8 +21,24 @@ namespace ZB.MOM.NatsNet.Server;
// Forward stubs for types defined in later sessions
// ---------------------------------------------------------------------------
/// <summary>Stub: stored message type — full definition in session 20.</summary>
public sealed class StoredMsg { }
/// <summary>Stored message returned by direct-get and message-get APIs.</summary>
public sealed class StoredMsg
{
[JsonPropertyName("subject")]
public string Subject { get; set; } = string.Empty;
[JsonPropertyName("seq")]
public ulong Sequence { get; set; }
[JsonPropertyName("hdrs")]
public byte[]? Header { get; set; }
[JsonPropertyName("data")]
public byte[]? Data { get; set; }
[JsonPropertyName("time")]
public DateTime Time { get; set; }
}
/// <summary>
/// Priority group for pull consumers.

View File

@@ -0,0 +1,246 @@
using System.Text;
using System.Text.Json;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal static (string Ttl, bool Ok) GetMessageScheduleTTL(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (string.Empty, true);
var ttl = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTtl, hdr);
if (ttl == null || ttl.Length == 0)
return (string.Empty, true);
var ttlValue = Encoding.ASCII.GetString(ttl);
var (_, err) = ParseMessageTTL(ttlValue);
return err == null ? (ttlValue, true) : (string.Empty, false);
}
internal static string GetMessageScheduleTarget(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTarget, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetMessageScheduleSource(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleSource, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetBatchId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsBatchId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (ulong Seq, bool Exists) GetBatchSequence(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsBatchSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var parsed = ServerUtilities.ParseInt64(value.Value.Span);
return parsed < 0 ? (0, false) : ((ulong)parsed, true);
}
public bool IsClustered()
{
_mu.EnterReadLock();
try
{
return IsClusteredInternal();
}
finally
{
_mu.ExitReadLock();
}
}
internal bool IsClusteredInternal() => _node is IRaftNode;
internal void QueueInbound(IpQueue<InMsg> inbound, string subject, string? reply, byte[]? hdr, byte[]? msg, object? sourceInfo, object? trace)
{
_ = sourceInfo;
_ = trace;
var inboundMsg = InMsg.Rent();
inboundMsg.Subject = subject;
inboundMsg.Reply = reply;
inboundMsg.Hdr = hdr;
inboundMsg.Msg = msg;
var (_, error) = inbound.Push(inboundMsg);
if (error != null)
inboundMsg.ReturnToPool();
}
internal JsApiMsgGetResponse ProcessDirectGetRequest(string reply, byte[]? hdr, byte[]? msg)
{
var response = new JsApiMsgGetResponse();
if (string.IsNullOrWhiteSpace(reply))
return response;
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
return response;
}
if (msg == null || msg.Length == 0)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
JsApiMsgGetRequest? request;
try
{
request = JsonSerializer.Deserialize<JsApiMsgGetRequest>(msg);
}
catch (JsonException)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
if (request == null)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
return GetDirectRequest(request, reply);
}
internal JsApiMsgGetResponse ProcessDirectGetLastBySubjectRequest(string subject, string reply, byte[]? hdr, byte[]? msg)
{
var response = new JsApiMsgGetResponse();
if (string.IsNullOrWhiteSpace(reply))
return response;
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
return response;
}
var request = msg == null || msg.Length == 0
? new JsApiMsgGetRequest()
: JsonSerializer.Deserialize<JsApiMsgGetRequest>(msg) ?? new JsApiMsgGetRequest();
var key = ExtractDirectGetLastBySubjectKey(subject);
if (string.IsNullOrEmpty(key))
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
request.LastFor = key;
return GetDirectRequest(request, reply);
}
internal JsApiMsgGetResponse GetDirectMulti(JsApiMsgGetRequest request, string reply)
{
_ = reply;
if (Store == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var filters = request.MultiLastFor ?? [];
if (filters.Length == 0)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var (seqs, error) = Store.MultiLastSeqs(filters, request.UpToSeq, 1024);
if (error != null || seqs.Length == 0)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var firstSeq = seqs[0];
var loaded = Store.LoadMsg(firstSeq, new StoreMsg());
if (loaded == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) };
}
internal JsApiMsgGetResponse GetDirectRequest(JsApiMsgGetRequest request, string reply)
{
_ = reply;
if (request.MultiLastFor is { Length: > 0 })
return GetDirectMulti(request, reply);
if (Store == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
StoreMsg? loaded = null;
if (request.Seq > 0)
{
loaded = Store.LoadMsg(request.Seq, new StoreMsg());
}
else if (!string.IsNullOrWhiteSpace(request.LastFor))
{
loaded = Store.LoadLastMsg(request.LastFor!, new StoreMsg());
}
else if (!string.IsNullOrWhiteSpace(request.NextFor))
{
var (sm, _) = Store.LoadNextMsg(request.NextFor!, SubscriptionIndex.SubjectHasWildcard(request.NextFor!), request.Seq, new StoreMsg());
loaded = sm;
}
else if (request.StartTime.HasValue)
{
var seq = Store.GetSeqFromTime(request.StartTime.Value);
if (seq > 0)
loaded = Store.LoadMsg(seq, new StoreMsg());
}
if (loaded == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) };
}
private static StoredMsg ToStoredMsg(StoreMsg loaded) => new()
{
Subject = loaded.Subject,
Sequence = loaded.Seq,
Header = loaded.Hdr,
Data = loaded.Msg,
Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime,
};
private static string? GetRequiredApiLevelHeader(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return null;
var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr);
return value == null || value.Length == 0 ? null : Encoding.ASCII.GetString(value);
}
private static string ExtractDirectGetLastBySubjectKey(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return string.Empty;
var parts = subject.Split('.');
return parts.Length <= 5 ? string.Empty : string.Join('.', parts.Skip(5));
}
}

View File

@@ -0,0 +1,98 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal Exception? ProcessInboundJetStreamMsg(InMsg? msg)
{
if (msg == null)
return new ArgumentNullException(nameof(msg));
try
{
return ProcessJetStreamMsg(
msg.Subject,
msg.Reply ?? string.Empty,
msg.Hdr,
msg.Msg,
lseq: 0,
ts: 0,
msgTrace: null,
sourced: false,
canRespond: true);
}
finally
{
msg.ReturnToPool();
}
}
internal Exception? ProcessJetStreamMsg(
string subject,
string reply,
byte[]? hdr,
byte[]? msg,
ulong lseq,
long ts,
object? msgTrace,
bool sourced,
bool canRespond)
{
_ = reply;
_ = msgTrace;
_ = sourced;
_ = canRespond;
if (string.IsNullOrWhiteSpace(subject))
return new ArgumentException("subject is required", nameof(subject));
if (Store == null)
return new InvalidOperationException("store not initialized");
var batchId = GetBatchId(hdr);
if (!string.IsNullOrEmpty(batchId))
return ProcessJetStreamBatchMsg(batchId, subject, reply, hdr, msg, msgTrace);
try
{
var (seq, _) = Store.StoreMsg(subject, hdr, msg, ttl: 0);
if (lseq > 0)
seq = lseq;
if (ts == 0)
ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
Interlocked.Exchange(ref LastSeq, (long)seq);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal Exception? ProcessJetStreamBatchMsg(string batchId, string subject, string reply, byte[]? hdr, byte[]? msg, object? msgTrace)
{
_ = reply;
_ = msgTrace;
if (string.IsNullOrWhiteSpace(batchId))
return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishInvalidBatchIDError().ToString());
var (_, exists) = GetBatchSequence(hdr);
if (!exists)
return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishMissingSeqError().ToString());
if (Store == null)
return new InvalidOperationException("store not initialized");
try
{
Store.StoreMsg(subject, hdr, msg, ttl: 0);
return null;
}
catch (Exception ex)
{
return ex;
}
}
}

View File

@@ -0,0 +1,20 @@
using System.Collections.Concurrent;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class InMsg
{
private static readonly ConcurrentBag<InMsg> Pool = new();
internal static InMsg Rent() => Pool.TryTake(out var msg) ? msg : new InMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Reply = null;
Hdr = null;
Msg = null;
Client = null;
Pool.Add(this);
}
}

View File

@@ -227,7 +227,7 @@ public sealed class JsStreamPubMsg
/// A JetStream publish message with sync tracking.
/// Mirrors <c>jsPubMsg</c> in server/stream.go.
/// </summary>
public sealed class JsPubMsg
public sealed partial class JsPubMsg
{
public string Subject { get; set; } = string.Empty;
public string? Reply { get; set; }
@@ -245,7 +245,7 @@ public sealed class JsPubMsg
/// An inbound message to be processed by the JetStream layer.
/// Mirrors <c>inMsg</c> in server/stream.go.
/// </summary>
public sealed class InMsg
public sealed partial class InMsg
{
public string Subject { get; set; } = string.Empty;
public string? Reply { get; set; }
@@ -277,7 +277,7 @@ public sealed class InMsg
/// A cached/clustered message for replication.
/// Mirrors <c>cMsg</c> in server/stream.go.
/// </summary>
public sealed class CMsg
public sealed partial class CMsg
{
public string Subject { get; set; } = string.Empty;
public byte[]? Msg { get; set; }