feat(batch37): merge stream-messages

This commit is contained in:
Joseph Doherty
2026-03-01 00:22:33 -05:00
17 changed files with 2066 additions and 10 deletions

View File

@@ -0,0 +1,37 @@
namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
internal (NatsStream? Stream, Exception? Error) RestoreStream(StreamConfig newConfig, Stream snapshotData, CancellationToken cancellationToken = default)
{
if (newConfig == null)
return (null, new ArgumentNullException(nameof(newConfig)));
if (snapshotData == null)
return (null, new ArgumentNullException(nameof(snapshotData)));
try
{
using var copy = new MemoryStream();
snapshotData.CopyTo(copy);
if (cancellationToken.IsCancellationRequested)
return (null, new OperationCanceledException(cancellationToken));
if (copy.Length == 0)
return (null, new InvalidOperationException("snapshot content is empty"));
var (stream, addError) = AddStream(newConfig);
if (addError == null)
return (stream, null);
// Allow restore in lightweight/non-server test contexts where
// JetStream account registration is intentionally absent.
var recovered = new NatsStream(this, newConfig.Clone(), DateTime.UtcNow);
var setupError = recovered.SetupStore(null);
return setupError == null ? (recovered, null) : (null, setupError);
}
catch (Exception ex)
{
return (null, ex);
}
}
}

View File

@@ -125,10 +125,17 @@ public static class JwtProcessor
var start = today + startTime;
var end = today + endTime;
// If start > end, end is on the next day (overnight range).
// If start > end, this range crosses midnight.
if (startTime > endTime)
{
end = end.AddDays(1);
if (now.TimeOfDay < endTime)
{
start = start.AddDays(-1);
}
else
{
end = end.AddDays(1);
}
}
if (start <= now && now < end)
@@ -225,12 +232,12 @@ public static class JwtProcessor
public static Exception? ValidateTrustedOperators(ServerOptions opts)
{
if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0)
return null;
return (Exception?)null;
// Full trusted operator JWT validation requires a NATS JWT library.
// Each operator JWT should be decoded and its signing key chain verified.
// For now, we accept any non-empty operator list and validate at connect time.
return null;
return (Exception?)null;
}
}

View File

@@ -21,8 +21,24 @@ namespace ZB.MOM.NatsNet.Server;
// Forward stubs for types defined in later sessions
// ---------------------------------------------------------------------------
/// <summary>Stub: stored message type — full definition in session 20.</summary>
public sealed class StoredMsg { }
/// <summary>Stored message returned by direct-get and message-get APIs.</summary>
public sealed class StoredMsg
{
[JsonPropertyName("subject")]
public string Subject { get; set; } = string.Empty;
[JsonPropertyName("seq")]
public ulong Sequence { get; set; }
[JsonPropertyName("hdrs")]
public byte[]? Header { get; set; }
[JsonPropertyName("data")]
public byte[]? Data { get; set; }
[JsonPropertyName("time")]
public DateTime Time { get; set; }
}
/// <summary>
/// Priority group for pull consumers.

View File

@@ -0,0 +1,313 @@
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
private readonly object _consumersSync = new();
private readonly Dictionary<string, NatsConsumer> _consumers = new(StringComparer.Ordinal);
private List<NatsConsumer> _consumerList = [];
private readonly IpQueue<CMsg> _sigQueue = new("js-signal");
private readonly Channel<bool> _signalWake = Channel.CreateBounded<bool>(1);
private readonly Channel<bool> _internalWake = Channel.CreateBounded<bool>(1);
private readonly JsOutQ _outq = new();
internal static CMsg NewCMsg(string subject, ulong seq)
{
var msg = CMsg.Rent();
msg.Subject = subject;
msg.Seq = seq;
return msg;
}
internal void SignalConsumersLoop(CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (!_signalWake.Reader.TryRead(out _))
{
Thread.Sleep(1);
continue;
}
var messages = _sigQueue.Pop();
if (messages == null)
continue;
foreach (var msg in messages)
{
SignalConsumers(msg.Subject, msg.Seq);
msg.ReturnToPool();
}
}
}
internal void SignalConsumers(string subject, ulong seq)
{
_ = subject;
_ = seq;
lock (_consumersSync)
{
_ = _consumerList.Count;
}
}
internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq)
{
var pub = GetJSPubMsgFromPool();
pub.Subject = destinationSubject;
pub.Reply = reply;
pub.Hdr = hdr;
pub.Msg = msg;
pub.Pa = new StoreMsg
{
Subject = subject,
Seq = seq,
Hdr = hdr ?? [],
Msg = msg ?? [],
Buf = [],
};
pub.Sync = consumer;
return pub;
}
internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent();
internal void SetupSendCapabilities()
{
_ = _outq;
_signalWake.Writer.TryWrite(true);
}
internal string AccName() => Account.Name;
internal string NameLocked() => Name;
internal void InternalLoop(CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (!_internalWake.Reader.TryRead(out _))
{
Thread.Sleep(1);
continue;
}
var messages = _sigQueue.Pop();
if (messages == null)
continue;
foreach (var msg in messages)
{
SignalConsumers(msg.Subject, msg.Seq);
msg.ReturnToPool();
}
}
}
internal void ResetAndWaitOnConsumers()
{
List<NatsConsumer> snapshot;
lock (_consumersSync)
{
snapshot = [.. _consumerList];
}
foreach (var consumer in snapshot)
consumer.Stop();
}
internal StoredMsg? GetMsg(ulong seq)
{
StoredMsg? result = null;
if (Store == null)
return result;
var loaded = Store.LoadMsg(seq, new StoreMsg());
if (loaded == null)
return result;
result = new StoredMsg
{
Subject = loaded.Subject,
Sequence = loaded.Seq,
Header = loaded.Hdr,
Data = loaded.Msg,
Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime,
};
return result;
}
internal List<NatsConsumer> GetConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList];
}
}
internal int NumPublicConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count(c => !c.Config.Direct);
}
}
internal List<NatsConsumer> GetPublicConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList.Where(c => !c.Config.Direct)];
}
}
internal List<NatsConsumer> GetDirectConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList.Where(c => c.Config.Direct)];
}
}
internal void CheckInterestState()
{
if (!IsInterestRetention() || Store == null)
return;
var consumers = GetConsumers();
if (consumers.Count == 0)
return;
ulong floor = ulong.MaxValue;
foreach (var consumer in consumers)
{
var ack = Interlocked.Read(ref consumer.AckFloor);
if (ack > 0 && (ulong)ack < floor)
floor = (ulong)ack;
}
if (floor != ulong.MaxValue)
Store.Compact(floor);
}
internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy;
internal int NumConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count;
}
}
internal void SetConsumer(NatsConsumer consumer)
{
ArgumentNullException.ThrowIfNull(consumer);
lock (_consumersSync)
{
_consumers[consumer.Name] = consumer;
if (_consumerList.All(c => !ReferenceEquals(c, consumer)))
_consumerList.Add(consumer);
}
}
internal void RemoveConsumer(NatsConsumer consumer)
{
ArgumentNullException.ThrowIfNull(consumer);
lock (_consumersSync)
{
_consumers.Remove(consumer.Name);
_consumerList.RemoveAll(c => ReferenceEquals(c, consumer));
}
}
internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters)
{
_ = consumer;
_ = newFilters;
}
internal NatsConsumer? LookupConsumer(string name)
{
if (string.IsNullOrWhiteSpace(name))
return _consumers.GetValueOrDefault(string.Empty);
lock (_consumersSync)
{
return _consumers.GetValueOrDefault(name);
}
}
internal int NumDirectConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count(c => c.Config.Direct);
}
}
internal StreamState StateWithDetail(bool details)
{
_ = details;
return State();
}
internal bool PartitionUnique(string name, string[] partitions)
{
lock (_consumersSync)
{
foreach (var partition in partitions)
{
foreach (var existing in _consumerList)
{
if (existing.Name == name)
continue;
var filters = existing.Config.FilterSubjects ??
(string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]);
foreach (var filter in filters)
{
if (SubscriptionIndex.SubjectsCollide(partition, filter))
return false;
}
}
}
}
return true;
}
internal bool PotentialFilteredConsumers()
{
var subjects = Config.Subjects ?? [];
if (subjects.Length == 0)
return false;
lock (_consumersSync)
{
if (_consumerList.Count == 0)
return false;
}
if (subjects.Length > 1)
return true;
return SubscriptionIndex.SubjectHasWildcard(subjects[0]);
}
internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer)
{
_ = seq;
lock (_consumersSync)
{
return _consumerList.All(c => ReferenceEquals(c, observingConsumer));
}
}
}

View File

@@ -0,0 +1,246 @@
using System.Text;
using System.Text.Json;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal static (string Ttl, bool Ok) GetMessageScheduleTTL(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (string.Empty, true);
var ttl = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTtl, hdr);
if (ttl == null || ttl.Length == 0)
return (string.Empty, true);
var ttlValue = Encoding.ASCII.GetString(ttl);
var (_, err) = ParseMessageTTL(ttlValue);
return err == null ? (ttlValue, true) : (string.Empty, false);
}
internal static string GetMessageScheduleTarget(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTarget, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetMessageScheduleSource(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleSource, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetBatchId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsBatchId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (ulong Seq, bool Exists) GetBatchSequence(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsBatchSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var parsed = ServerUtilities.ParseInt64(value.Value.Span);
return parsed < 0 ? (0, false) : ((ulong)parsed, true);
}
public bool IsClustered()
{
_mu.EnterReadLock();
try
{
return IsClusteredInternal();
}
finally
{
_mu.ExitReadLock();
}
}
internal bool IsClusteredInternal() => _node is IRaftNode;
internal void QueueInbound(IpQueue<InMsg> inbound, string subject, string? reply, byte[]? hdr, byte[]? msg, object? sourceInfo, object? trace)
{
_ = sourceInfo;
_ = trace;
var inboundMsg = InMsg.Rent();
inboundMsg.Subject = subject;
inboundMsg.Reply = reply;
inboundMsg.Hdr = hdr;
inboundMsg.Msg = msg;
var (_, error) = inbound.Push(inboundMsg);
if (error != null)
inboundMsg.ReturnToPool();
}
internal JsApiMsgGetResponse ProcessDirectGetRequest(string reply, byte[]? hdr, byte[]? msg)
{
var response = new JsApiMsgGetResponse();
if (string.IsNullOrWhiteSpace(reply))
return response;
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
return response;
}
if (msg == null || msg.Length == 0)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
JsApiMsgGetRequest? request;
try
{
request = JsonSerializer.Deserialize<JsApiMsgGetRequest>(msg);
}
catch (JsonException)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
if (request == null)
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
return GetDirectRequest(request, reply);
}
internal JsApiMsgGetResponse ProcessDirectGetLastBySubjectRequest(string subject, string reply, byte[]? hdr, byte[]? msg)
{
var response = new JsApiMsgGetResponse();
if (string.IsNullOrWhiteSpace(reply))
return response;
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
return response;
}
var request = msg == null || msg.Length == 0
? new JsApiMsgGetRequest()
: JsonSerializer.Deserialize<JsApiMsgGetRequest>(msg) ?? new JsApiMsgGetRequest();
var key = ExtractDirectGetLastBySubjectKey(subject);
if (string.IsNullOrEmpty(key))
{
response.Error = JsApiErrors.NewJSBadRequestError();
return response;
}
request.LastFor = key;
return GetDirectRequest(request, reply);
}
internal JsApiMsgGetResponse GetDirectMulti(JsApiMsgGetRequest request, string reply)
{
_ = reply;
if (Store == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var filters = request.MultiLastFor ?? [];
if (filters.Length == 0)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var (seqs, error) = Store.MultiLastSeqs(filters, request.UpToSeq, 1024);
if (error != null || seqs.Length == 0)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
var firstSeq = seqs[0];
var loaded = Store.LoadMsg(firstSeq, new StoreMsg());
if (loaded == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) };
}
internal JsApiMsgGetResponse GetDirectRequest(JsApiMsgGetRequest request, string reply)
{
_ = reply;
if (request.MultiLastFor is { Length: > 0 })
return GetDirectMulti(request, reply);
if (Store == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
StoreMsg? loaded = null;
if (request.Seq > 0)
{
loaded = Store.LoadMsg(request.Seq, new StoreMsg());
}
else if (!string.IsNullOrWhiteSpace(request.LastFor))
{
loaded = Store.LoadLastMsg(request.LastFor!, new StoreMsg());
}
else if (!string.IsNullOrWhiteSpace(request.NextFor))
{
var (sm, _) = Store.LoadNextMsg(request.NextFor!, SubscriptionIndex.SubjectHasWildcard(request.NextFor!), request.Seq, new StoreMsg());
loaded = sm;
}
else if (request.StartTime.HasValue)
{
var seq = Store.GetSeqFromTime(request.StartTime.Value);
if (seq > 0)
loaded = Store.LoadMsg(seq, new StoreMsg());
}
if (loaded == null)
return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() };
return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) };
}
private static StoredMsg ToStoredMsg(StoreMsg loaded) => new()
{
Subject = loaded.Subject,
Sequence = loaded.Seq,
Header = loaded.Hdr,
Data = loaded.Msg,
Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime,
};
private static string? GetRequiredApiLevelHeader(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return null;
var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr);
return value == null || value.Length == 0 ? null : Encoding.ASCII.GetString(value);
}
private static string ExtractDirectGetLastBySubjectKey(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return string.Empty;
var parts = subject.Split('.');
return parts.Length <= 5 ? string.Empty : string.Join('.', parts.Skip(5));
}
}

View File

@@ -0,0 +1,298 @@
using System.Numerics;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal sealed class DedupeEntry
{
public required string Id { get; init; }
public ulong Seq { get; set; }
public long TimestampNanos { get; set; }
}
private readonly object _ddLock = new();
private Dictionary<string, DedupeEntry>? _ddMap;
private List<DedupeEntry>? _ddArr;
private int _ddIndex;
private Timer? _ddTimer;
internal void Unsubscribe(Subscription? sub)
{
if (sub == null)
return;
_mu.EnterReadLock();
try
{
if (_closed)
return;
}
finally
{
_mu.ExitReadLock();
}
sub.Close();
}
internal Exception? SetupStore(FileStoreConfig? fileStoreConfig)
{
_mu.EnterWriteLock();
try
{
if (Store != null)
{
RegisterStoreCallbacks(Store);
return null;
}
var streamConfig = Config.Clone();
IStreamStore store = streamConfig.Storage switch
{
StorageType.MemoryStorage => new JetStreamMemStore(streamConfig),
StorageType.FileStorage => BuildFileStore(fileStoreConfig, streamConfig),
_ => throw new InvalidOperationException($"unsupported storage type: {streamConfig.Storage}"),
};
Store = store;
RegisterStoreCallbacks(store);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitWriteLock();
}
}
private IStreamStore BuildFileStore(FileStoreConfig? fileStoreConfig, StreamConfig streamConfig)
{
var cfg = fileStoreConfig ?? FileStoreConfig();
if (string.IsNullOrWhiteSpace(cfg.StoreDir))
cfg.StoreDir = FileStoreConfig().StoreDir;
var fsi = new FileStreamInfo
{
Created = Created,
Config = streamConfig,
};
return new JetStreamFileStore(cfg, fsi);
}
private void RegisterStoreCallbacks(IStreamStore store)
{
store.RegisterStorageUpdates(StoreUpdates);
store.RegisterStorageRemoveMsg(_ => { });
store.RegisterProcessJetStreamMsg(_ => { });
}
internal void StoreUpdates(long msgs, long bytes, ulong seq, string subj)
{
_ = seq;
_ = subj;
Interlocked.Add(ref Msgs, msgs);
Interlocked.Add(ref Bytes, bytes);
}
internal int NumMsgIds()
{
lock (_ddLock)
{
return _ddMap?.Count ?? 0;
}
}
internal DedupeEntry? CheckMsgId(string id)
{
if (string.IsNullOrEmpty(id))
return null;
if (_ddMap == null || _ddMap.Count == 0)
return null;
return _ddMap.GetValueOrDefault(id);
}
internal void PurgeMsgIds()
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
var window = (long)Config.Duplicates.TotalMilliseconds * 1_000_000L;
if (window <= 0)
{
lock (_ddLock)
{
_ddMap = null;
_ddArr = null;
_ddIndex = 0;
_ddTimer?.Dispose();
_ddTimer = null;
}
return;
}
lock (_ddLock)
{
if (_ddArr == null || _ddMap == null)
return;
for (var i = _ddIndex; i < _ddArr.Count; i++)
{
var entry = _ddArr[i];
if (now - entry.TimestampNanos >= window)
_ddMap.Remove(entry.Id);
else
{
_ddIndex = i;
break;
}
}
if (_ddMap.Count == 0)
{
_ddMap = null;
_ddArr = null;
_ddIndex = 0;
_ddTimer?.Dispose();
_ddTimer = null;
}
else
{
_ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan);
_ddTimer.Change(Config.Duplicates, Timeout.InfiniteTimeSpan);
}
}
}
internal void StoreMsgId(DedupeEntry entry)
{
ArgumentNullException.ThrowIfNull(entry);
lock (_ddLock)
{
StoreMsgIdLocked(entry);
}
}
internal void StoreMsgIdLocked(DedupeEntry entry)
{
ArgumentNullException.ThrowIfNull(entry);
if (Config.Duplicates <= TimeSpan.Zero)
return;
_ddMap ??= new Dictionary<string, DedupeEntry>(StringComparer.Ordinal);
_ddArr ??= new List<DedupeEntry>();
_ddMap[entry.Id] = entry;
_ddArr.Add(entry);
_ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan);
}
internal static string GetMsgId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetExpectedLastMsgId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastMsgId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetExpectedStream(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedStream, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (ulong Seq, bool Exists) GetExpectedLastSeq(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var seq = ServerUtilities.ParseInt64(value.Value.Span);
return seq < 0 ? (0, false) : ((ulong)seq, true);
}
internal static string GetRollup(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgRollup, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value).ToLowerInvariant();
}
internal static (ulong Seq, bool Exists) GetExpectedLastSeqPerSubject(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSubjSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var seq = ServerUtilities.ParseInt64(value.Value.Span);
return seq < 0 ? (0, false) : ((ulong)seq, true);
}
internal static string GetExpectedLastSeqPerSubjectForSubject(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastSubjSeqSubj, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (long Ttl, Exception? Error) ParseMessageTTL(string ttl) =>
JetStreamHeaderHelpers.ParseMessageTtl(ttl);
internal static (BigInteger? Value, bool Ok) GetMessageIncr(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (null, true);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsMessageIncr, hdr);
if (value is null || value.Value.Length == 0)
return (null, true);
if (BigInteger.TryParse(Encoding.ASCII.GetString(value.Value.Span), out var parsed))
return (parsed, true);
return (null, false);
}
internal static (DateTime Schedule, bool Ok) GetMessageSchedule(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (default, true);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr);
if (value is null || value.Value.Length == 0)
return (default, true);
var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
var pattern = Encoding.ASCII.GetString(value.Value.Span);
var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(pattern, ts);
return (schedule, ok);
}
}

View File

@@ -0,0 +1,98 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal Exception? ProcessInboundJetStreamMsg(InMsg? msg)
{
if (msg == null)
return new ArgumentNullException(nameof(msg));
try
{
return ProcessJetStreamMsg(
msg.Subject,
msg.Reply ?? string.Empty,
msg.Hdr,
msg.Msg,
lseq: 0,
ts: 0,
msgTrace: null,
sourced: false,
canRespond: true);
}
finally
{
msg.ReturnToPool();
}
}
internal Exception? ProcessJetStreamMsg(
string subject,
string reply,
byte[]? hdr,
byte[]? msg,
ulong lseq,
long ts,
object? msgTrace,
bool sourced,
bool canRespond)
{
_ = reply;
_ = msgTrace;
_ = sourced;
_ = canRespond;
if (string.IsNullOrWhiteSpace(subject))
return new ArgumentException("subject is required", nameof(subject));
if (Store == null)
return new InvalidOperationException("store not initialized");
var batchId = GetBatchId(hdr);
if (!string.IsNullOrEmpty(batchId))
return ProcessJetStreamBatchMsg(batchId, subject, reply, hdr, msg, msgTrace);
try
{
var (seq, _) = Store.StoreMsg(subject, hdr, msg, ttl: 0);
if (lseq > 0)
seq = lseq;
if (ts == 0)
ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
Interlocked.Exchange(ref LastSeq, (long)seq);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal Exception? ProcessJetStreamBatchMsg(string batchId, string subject, string reply, byte[]? hdr, byte[]? msg, object? msgTrace)
{
_ = reply;
_ = msgTrace;
if (string.IsNullOrWhiteSpace(batchId))
return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishInvalidBatchIDError().ToString());
var (_, exists) = GetBatchSequence(hdr);
if (!exists)
return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishMissingSeqError().ToString());
if (Store == null)
return new InvalidOperationException("store not initialized");
try
{
Store.StoreMsg(subject, hdr, msg, ttl: 0);
return null;
}
catch (Exception ex)
{
return ex;
}
}
}

View File

@@ -0,0 +1,259 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
private readonly object _preAcksSync = new();
private readonly Dictionary<ulong, HashSet<NatsConsumer>> _preAcks = new();
private bool _inMonitor;
private long _replicationOutMsgs;
private long _replicationOutBytes;
internal bool NoInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) =>
!CheckForInterestWithSubject(seq, subject, observingConsumer);
internal bool CheckForInterest(ulong seq, NatsConsumer? observingConsumer)
{
var subject = string.Empty;
if (PotentialFilteredConsumers() && Store != null)
{
var loaded = Store.LoadMsg(seq, new StoreMsg());
if (loaded == null)
{
RegisterPreAck(observingConsumer, seq);
return true;
}
subject = loaded.Subject;
}
return CheckForInterestWithSubject(seq, subject, observingConsumer);
}
internal bool CheckForInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer)
{
_ = subject;
lock (_consumersSync)
{
foreach (var consumer in _consumerList)
{
if (ReferenceEquals(consumer, observingConsumer))
continue;
if (!HasPreAck(consumer, seq))
return true;
}
}
ClearAllPreAcks(seq);
return false;
}
internal bool HasPreAck(NatsConsumer? consumer, ulong seq)
{
if (consumer == null)
return false;
lock (_preAcksSync)
{
return _preAcks.TryGetValue(seq, out var consumers) && consumers.Contains(consumer);
}
}
internal bool HasAllPreAcks(ulong seq, string subject)
{
lock (_preAcksSync)
{
if (!_preAcks.TryGetValue(seq, out var consumers) || consumers.Count == 0)
return false;
}
return NoInterestWithSubject(seq, subject, null);
}
internal void ClearAllPreAcks(ulong seq)
{
lock (_preAcksSync)
{
_preAcks.Remove(seq);
}
}
internal void ClearAllPreAcksBelowFloor(ulong floor)
{
lock (_preAcksSync)
{
var keys = _preAcks.Keys.Where(k => k < floor).ToArray();
foreach (var key in keys)
_preAcks.Remove(key);
}
}
internal void RegisterPreAckLock(NatsConsumer? consumer, ulong seq)
{
_mu.EnterWriteLock();
try
{
RegisterPreAck(consumer, seq);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RegisterPreAck(NatsConsumer? consumer, ulong seq)
{
if (consumer == null)
return;
lock (_preAcksSync)
{
if (!_preAcks.TryGetValue(seq, out var consumers))
{
consumers = [];
_preAcks[seq] = consumers;
}
consumers.Add(consumer);
}
}
internal void ClearPreAck(NatsConsumer? consumer, ulong seq)
{
if (consumer == null)
return;
lock (_preAcksSync)
{
if (!_preAcks.TryGetValue(seq, out var consumers))
return;
consumers.Remove(consumer);
if (consumers.Count == 0)
_preAcks.Remove(seq);
}
}
internal bool AckMsg(NatsConsumer? consumer, ulong seq)
{
if (seq == 0 || Store == null)
return false;
if (Config.Retention == RetentionPolicy.LimitsPolicy)
return false;
var state = new StreamState();
Store.FastState(state);
if (seq > state.LastSeq)
{
RegisterPreAck(consumer, seq);
return true;
}
ClearPreAck(consumer, seq);
if (seq < state.FirstSeq)
return false;
if (!NoInterest(seq, null))
return false;
if (!IsClustered())
{
var (removed, _) = Store.RemoveMsg(seq);
return removed;
}
return true;
}
internal (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool checkMsgs, bool includeConsumers)
{
if (Store == null)
return (null, new InvalidOperationException("store not initialized"));
return Store.Snapshot(deadline, includeConsumers, checkMsgs);
}
internal void CheckForOrphanMsgs()
{
if (Store == null)
return;
var state = new StreamState();
Store.FastState(state);
ClearAllPreAcksBelowFloor(state.FirstSeq);
}
internal void CheckConsumerReplication()
{
if (Config.Retention != RetentionPolicy.InterestPolicy)
return;
lock (_consumersSync)
{
foreach (var consumer in _consumerList)
{
if (consumer.Config.Replicas == 0)
continue;
if (consumer.Config.Replicas != Config.Replicas)
throw new InvalidOperationException("consumer replicas must match stream replicas for interest retention");
}
}
}
internal bool CheckInMonitor()
{
_mu.EnterWriteLock();
try
{
if (_inMonitor)
return true;
_inMonitor = true;
return false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ClearMonitorRunning()
{
_mu.EnterWriteLock();
try
{
_inMonitor = false;
DeleteBatchApplyState();
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsMonitorRunning()
{
_mu.EnterReadLock();
try
{
return _inMonitor;
}
finally
{
_mu.ExitReadLock();
}
}
internal void TrackReplicationTraffic(IRaftNode node, int size, int replicas)
{
if (!node.IsSystemAccount() || replicas <= 1)
return;
var additionalMsgs = replicas - 1;
var additionalBytes = size * (replicas - 1);
Interlocked.Add(ref _replicationOutMsgs, additionalMsgs);
Interlocked.Add(ref _replicationOutBytes, additionalBytes);
}
}

View File

@@ -0,0 +1,93 @@
using System.Collections.Concurrent;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class InMsg
{
private static readonly ConcurrentBag<InMsg> Pool = new();
internal static InMsg Rent() => Pool.TryTake(out var msg) ? msg : new InMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Reply = null;
Hdr = null;
Msg = null;
Client = null;
Pool.Add(this);
}
}
public sealed partial class CMsg
{
private static readonly ConcurrentBag<CMsg> Pool = new();
internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Msg = null;
Seq = 0;
Pool.Add(this);
}
}
public sealed partial class JsPubMsg
{
private static readonly ConcurrentBag<JsPubMsg> Pool = new();
internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Reply = null;
Hdr = null;
Msg = null;
Pa = null;
Sync = null;
Pool.Add(this);
}
internal int Size() =>
(Subject?.Length ?? 0) +
(Reply?.Length ?? 0) +
(Hdr?.Length ?? 0) +
(Msg?.Length ?? 0);
}
public sealed class JsOutQ
{
private readonly IpQueue<JsPubMsg> _queue = new("js-outq");
private bool _registered = true;
public (int Len, Exception? Error) SendMsg(string reply, byte[] payload)
{
if (string.IsNullOrWhiteSpace(reply))
return (0, new ArgumentException("reply is required", nameof(reply)));
var msg = JsPubMsg.Rent();
msg.Subject = reply;
msg.Msg = payload;
return Send(msg);
}
public (int Len, Exception? Error) Send(JsPubMsg msg)
{
if (!_registered)
return (0, new InvalidOperationException("queue is unregistered"));
return _queue.Push(msg);
}
public void Unregister()
{
_registered = false;
_queue.Unregister();
}
internal JsPubMsg[]? Pop() => _queue.Pop();
}

View File

@@ -227,7 +227,7 @@ public sealed class JsStreamPubMsg
/// A JetStream publish message with sync tracking.
/// Mirrors <c>jsPubMsg</c> in server/stream.go.
/// </summary>
public sealed class JsPubMsg
public sealed partial class JsPubMsg
{
public string Subject { get; set; } = string.Empty;
public string? Reply { get; set; }
@@ -245,7 +245,7 @@ public sealed class JsPubMsg
/// An inbound message to be processed by the JetStream layer.
/// Mirrors <c>inMsg</c> in server/stream.go.
/// </summary>
public sealed class InMsg
public sealed partial class InMsg
{
public string Subject { get; set; } = string.Empty;
public string? Reply { get; set; }
@@ -277,7 +277,7 @@ public sealed class InMsg
/// A cached/clustered message for replication.
/// Mirrors <c>cMsg</c> in server/stream.go.
/// </summary>
public sealed class CMsg
public sealed partial class CMsg
{
public string Subject { get; set; } = string.Empty;
public byte[]? Msg { get; set; }