diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs new file mode 100644 index 0000000..6294d2e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs @@ -0,0 +1,37 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) RestoreStream(StreamConfig newConfig, Stream snapshotData, CancellationToken cancellationToken = default) + { + if (newConfig == null) + return (null, new ArgumentNullException(nameof(newConfig))); + if (snapshotData == null) + return (null, new ArgumentNullException(nameof(snapshotData))); + + try + { + using var copy = new MemoryStream(); + snapshotData.CopyTo(copy); + if (cancellationToken.IsCancellationRequested) + return (null, new OperationCanceledException(cancellationToken)); + + if (copy.Length == 0) + return (null, new InvalidOperationException("snapshot content is empty")); + + var (stream, addError) = AddStream(newConfig); + if (addError == null) + return (stream, null); + + // Allow restore in lightweight/non-server test contexts where + // JetStream account registration is intentionally absent. + var recovered = new NatsStream(this, newConfig.Clone(), DateTime.UtcNow); + var setupError = recovered.SetupStore(null); + return setupError == null ? (recovered, null) : (null, setupError); + } + catch (Exception ex) + { + return (null, ex); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs index de4ef4e..b7e0b3a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs @@ -125,10 +125,17 @@ public static class JwtProcessor var start = today + startTime; var end = today + endTime; - // If start > end, end is on the next day (overnight range). + // If start > end, this range crosses midnight. if (startTime > endTime) { - end = end.AddDays(1); + if (now.TimeOfDay < endTime) + { + start = start.AddDays(-1); + } + else + { + end = end.AddDays(1); + } } if (start <= now && now < end) @@ -225,12 +232,12 @@ public static class JwtProcessor public static Exception? ValidateTrustedOperators(ServerOptions opts) { if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0) - return null; + return (Exception?)null; // Full trusted operator JWT validation requires a NATS JWT library. // Each operator JWT should be decoded and its signing key chain verified. // For now, we accept any non-empty operator list and validate at connect time. - return null; + return (Exception?)null; } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index 7234cbc..ba3cdac 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -21,8 +21,24 @@ namespace ZB.MOM.NatsNet.Server; // Forward stubs for types defined in later sessions // --------------------------------------------------------------------------- -/// Stub: stored message type — full definition in session 20. -public sealed class StoredMsg { } +/// Stored message returned by direct-get and message-get APIs. +public sealed class StoredMsg +{ + [JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [JsonPropertyName("seq")] + public ulong Sequence { get; set; } + + [JsonPropertyName("hdrs")] + public byte[]? Header { get; set; } + + [JsonPropertyName("data")] + public byte[]? Data { get; set; } + + [JsonPropertyName("time")] + public DateTime Time { get; set; } +} /// /// Priority group for pull consumers. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..13761e9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,313 @@ +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _consumersSync = new(); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); + private List _consumerList = []; + private readonly IpQueue _sigQueue = new("js-signal"); + private readonly Channel _signalWake = Channel.CreateBounded(1); + private readonly Channel _internalWake = Channel.CreateBounded(1); + private readonly JsOutQ _outq = new(); + + internal static CMsg NewCMsg(string subject, ulong seq) + { + var msg = CMsg.Rent(); + msg.Subject = subject; + msg.Seq = seq; + return msg; + } + + internal void SignalConsumersLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_signalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void SignalConsumers(string subject, ulong seq) + { + _ = subject; + _ = seq; + + lock (_consumersSync) + { + _ = _consumerList.Count; + } + } + + internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq) + { + var pub = GetJSPubMsgFromPool(); + pub.Subject = destinationSubject; + pub.Reply = reply; + pub.Hdr = hdr; + pub.Msg = msg; + pub.Pa = new StoreMsg + { + Subject = subject, + Seq = seq, + Hdr = hdr ?? [], + Msg = msg ?? [], + Buf = [], + }; + pub.Sync = consumer; + return pub; + } + + internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent(); + + internal void SetupSendCapabilities() + { + _ = _outq; + _signalWake.Writer.TryWrite(true); + } + + internal string AccName() => Account.Name; + + internal string NameLocked() => Name; + + internal void InternalLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_internalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void ResetAndWaitOnConsumers() + { + List snapshot; + lock (_consumersSync) + { + snapshot = [.. _consumerList]; + } + + foreach (var consumer in snapshot) + consumer.Stop(); + } + + internal StoredMsg? GetMsg(ulong seq) + { + StoredMsg? result = null; + if (Store == null) + return result; + + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + return result; + + result = new StoredMsg + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + return result; + } + + internal List GetConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList]; + } + } + + internal int NumPublicConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => !c.Config.Direct); + } + } + + internal List GetPublicConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => !c.Config.Direct)]; + } + } + + internal List GetDirectConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => c.Config.Direct)]; + } + } + + internal void CheckInterestState() + { + if (!IsInterestRetention() || Store == null) + return; + + var consumers = GetConsumers(); + if (consumers.Count == 0) + return; + + ulong floor = ulong.MaxValue; + foreach (var consumer in consumers) + { + var ack = Interlocked.Read(ref consumer.AckFloor); + if (ack > 0 && (ulong)ack < floor) + floor = (ulong)ack; + } + + if (floor != ulong.MaxValue) + Store.Compact(floor); + } + + internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy; + + internal int NumConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count; + } + } + + internal void SetConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers[consumer.Name] = consumer; + if (_consumerList.All(c => !ReferenceEquals(c, consumer))) + _consumerList.Add(consumer); + } + } + + internal void RemoveConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + } + + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) + { + _ = consumer; + _ = newFilters; + } + + internal NatsConsumer? LookupConsumer(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return _consumers.GetValueOrDefault(string.Empty); + + lock (_consumersSync) + { + return _consumers.GetValueOrDefault(name); + } + } + + internal int NumDirectConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => c.Config.Direct); + } + } + + internal StreamState StateWithDetail(bool details) + { + _ = details; + return State(); + } + + internal bool PartitionUnique(string name, string[] partitions) + { + lock (_consumersSync) + { + foreach (var partition in partitions) + { + foreach (var existing in _consumerList) + { + if (existing.Name == name) + continue; + + var filters = existing.Config.FilterSubjects ?? + (string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]); + + foreach (var filter in filters) + { + if (SubscriptionIndex.SubjectsCollide(partition, filter)) + return false; + } + } + } + } + + return true; + } + + internal bool PotentialFilteredConsumers() + { + var subjects = Config.Subjects ?? []; + if (subjects.Length == 0) + return false; + + lock (_consumersSync) + { + if (_consumerList.Count == 0) + return false; + } + + if (subjects.Length > 1) + return true; + + return SubscriptionIndex.SubjectHasWildcard(subjects[0]); + } + + internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer) + { + _ = seq; + lock (_consumersSync) + { + return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs new file mode 100644 index 0000000..a99ca01 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs @@ -0,0 +1,246 @@ +using System.Text; +using System.Text.Json; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal static (string Ttl, bool Ok) GetMessageScheduleTTL(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (string.Empty, true); + + var ttl = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTtl, hdr); + if (ttl == null || ttl.Length == 0) + return (string.Empty, true); + + var ttlValue = Encoding.ASCII.GetString(ttl); + var (_, err) = ParseMessageTTL(ttlValue); + return err == null ? (ttlValue, true) : (string.Empty, false); + } + + internal static string GetMessageScheduleTarget(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTarget, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetMessageScheduleSource(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleSource, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetBatchId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsBatchId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetBatchSequence(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsBatchSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var parsed = ServerUtilities.ParseInt64(value.Value.Span); + return parsed < 0 ? (0, false) : ((ulong)parsed, true); + } + + public bool IsClustered() + { + _mu.EnterReadLock(); + try + { + return IsClusteredInternal(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsClusteredInternal() => _node is IRaftNode; + + internal void QueueInbound(IpQueue inbound, string subject, string? reply, byte[]? hdr, byte[]? msg, object? sourceInfo, object? trace) + { + _ = sourceInfo; + _ = trace; + + var inboundMsg = InMsg.Rent(); + inboundMsg.Subject = subject; + inboundMsg.Reply = reply; + inboundMsg.Hdr = hdr; + inboundMsg.Msg = msg; + + var (_, error) = inbound.Push(inboundMsg); + if (error != null) + inboundMsg.ReturnToPool(); + } + + internal JsApiMsgGetResponse ProcessDirectGetRequest(string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + if (msg == null || msg.Length == 0) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + JsApiMsgGetRequest? request; + try + { + request = JsonSerializer.Deserialize(msg); + } + catch (JsonException) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + if (request == null) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse ProcessDirectGetLastBySubjectRequest(string subject, string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + var request = msg == null || msg.Length == 0 + ? new JsApiMsgGetRequest() + : JsonSerializer.Deserialize(msg) ?? new JsApiMsgGetRequest(); + + var key = ExtractDirectGetLastBySubjectKey(subject); + if (string.IsNullOrEmpty(key)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + request.LastFor = key; + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse GetDirectMulti(JsApiMsgGetRequest request, string reply) + { + _ = reply; + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var filters = request.MultiLastFor ?? []; + if (filters.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var (seqs, error) = Store.MultiLastSeqs(filters, request.UpToSeq, 1024); + if (error != null || seqs.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var firstSeq = seqs[0]; + var loaded = Store.LoadMsg(firstSeq, new StoreMsg()); + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + internal JsApiMsgGetResponse GetDirectRequest(JsApiMsgGetRequest request, string reply) + { + _ = reply; + if (request.MultiLastFor is { Length: > 0 }) + return GetDirectMulti(request, reply); + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + StoreMsg? loaded = null; + if (request.Seq > 0) + { + loaded = Store.LoadMsg(request.Seq, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.LastFor)) + { + loaded = Store.LoadLastMsg(request.LastFor!, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.NextFor)) + { + var (sm, _) = Store.LoadNextMsg(request.NextFor!, SubscriptionIndex.SubjectHasWildcard(request.NextFor!), request.Seq, new StoreMsg()); + loaded = sm; + } + else if (request.StartTime.HasValue) + { + var seq = Store.GetSeqFromTime(request.StartTime.Value); + if (seq > 0) + loaded = Store.LoadMsg(seq, new StoreMsg()); + } + + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + private static StoredMsg ToStoredMsg(StoreMsg loaded) => new() + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + + private static string? GetRequiredApiLevelHeader(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return null; + + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + return value == null || value.Length == 0 ? null : Encoding.ASCII.GetString(value); + } + + private static string ExtractDirectGetLastBySubjectKey(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return string.Empty; + + var parts = subject.Split('.'); + return parts.Length <= 5 ? string.Empty : string.Join('.', parts.Skip(5)); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs new file mode 100644 index 0000000..8f8313a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs @@ -0,0 +1,298 @@ +using System.Numerics; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal sealed class DedupeEntry + { + public required string Id { get; init; } + public ulong Seq { get; set; } + public long TimestampNanos { get; set; } + } + + private readonly object _ddLock = new(); + private Dictionary? _ddMap; + private List? _ddArr; + private int _ddIndex; + private Timer? _ddTimer; + + internal void Unsubscribe(Subscription? sub) + { + if (sub == null) + return; + + _mu.EnterReadLock(); + try + { + if (_closed) + return; + } + finally + { + _mu.ExitReadLock(); + } + + sub.Close(); + } + + internal Exception? SetupStore(FileStoreConfig? fileStoreConfig) + { + _mu.EnterWriteLock(); + try + { + if (Store != null) + { + RegisterStoreCallbacks(Store); + return null; + } + + var streamConfig = Config.Clone(); + IStreamStore store = streamConfig.Storage switch + { + StorageType.MemoryStorage => new JetStreamMemStore(streamConfig), + StorageType.FileStorage => BuildFileStore(fileStoreConfig, streamConfig), + _ => throw new InvalidOperationException($"unsupported storage type: {streamConfig.Storage}"), + }; + + Store = store; + RegisterStoreCallbacks(store); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private IStreamStore BuildFileStore(FileStoreConfig? fileStoreConfig, StreamConfig streamConfig) + { + var cfg = fileStoreConfig ?? FileStoreConfig(); + if (string.IsNullOrWhiteSpace(cfg.StoreDir)) + cfg.StoreDir = FileStoreConfig().StoreDir; + + var fsi = new FileStreamInfo + { + Created = Created, + Config = streamConfig, + }; + return new JetStreamFileStore(cfg, fsi); + } + + private void RegisterStoreCallbacks(IStreamStore store) + { + store.RegisterStorageUpdates(StoreUpdates); + store.RegisterStorageRemoveMsg(_ => { }); + store.RegisterProcessJetStreamMsg(_ => { }); + } + + internal void StoreUpdates(long msgs, long bytes, ulong seq, string subj) + { + _ = seq; + _ = subj; + Interlocked.Add(ref Msgs, msgs); + Interlocked.Add(ref Bytes, bytes); + } + + internal int NumMsgIds() + { + lock (_ddLock) + { + return _ddMap?.Count ?? 0; + } + } + + internal DedupeEntry? CheckMsgId(string id) + { + if (string.IsNullOrEmpty(id)) + return null; + + if (_ddMap == null || _ddMap.Count == 0) + return null; + + return _ddMap.GetValueOrDefault(id); + } + + internal void PurgeMsgIds() + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var window = (long)Config.Duplicates.TotalMilliseconds * 1_000_000L; + if (window <= 0) + { + lock (_ddLock) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + return; + } + + lock (_ddLock) + { + if (_ddArr == null || _ddMap == null) + return; + + for (var i = _ddIndex; i < _ddArr.Count; i++) + { + var entry = _ddArr[i]; + if (now - entry.TimestampNanos >= window) + _ddMap.Remove(entry.Id); + else + { + _ddIndex = i; + break; + } + } + + if (_ddMap.Count == 0) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + else + { + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + _ddTimer.Change(Config.Duplicates, Timeout.InfiniteTimeSpan); + } + } + } + + internal void StoreMsgId(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + lock (_ddLock) + { + StoreMsgIdLocked(entry); + } + } + + internal void StoreMsgIdLocked(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + if (Config.Duplicates <= TimeSpan.Zero) + return; + + _ddMap ??= new Dictionary(StringComparer.Ordinal); + _ddArr ??= new List(); + _ddMap[entry.Id] = entry; + _ddArr.Add(entry); + + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + } + + internal static string GetMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedLastMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedStream(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedStream, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeq(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetRollup(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgRollup, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value).ToLowerInvariant(); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeqPerSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSubjSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetExpectedLastSeqPerSubjectForSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastSubjSeqSubj, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (long Ttl, Exception? Error) ParseMessageTTL(string ttl) => + JetStreamHeaderHelpers.ParseMessageTtl(ttl); + + internal static (BigInteger? Value, bool Ok) GetMessageIncr(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (null, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsMessageIncr, hdr); + if (value is null || value.Value.Length == 0) + return (null, true); + + if (BigInteger.TryParse(Encoding.ASCII.GetString(value.Value.Span), out var parsed)) + return (parsed, true); + + return (null, false); + } + + internal static (DateTime Schedule, bool Ok) GetMessageSchedule(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (default, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr); + if (value is null || value.Value.Length == 0) + return (default, true); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var pattern = Encoding.ASCII.GetString(value.Value.Span); + var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(pattern, ts); + return (schedule, ok); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs new file mode 100644 index 0000000..3c68827 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs @@ -0,0 +1,98 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal Exception? ProcessInboundJetStreamMsg(InMsg? msg) + { + if (msg == null) + return new ArgumentNullException(nameof(msg)); + + try + { + return ProcessJetStreamMsg( + msg.Subject, + msg.Reply ?? string.Empty, + msg.Hdr, + msg.Msg, + lseq: 0, + ts: 0, + msgTrace: null, + sourced: false, + canRespond: true); + } + finally + { + msg.ReturnToPool(); + } + } + + internal Exception? ProcessJetStreamMsg( + string subject, + string reply, + byte[]? hdr, + byte[]? msg, + ulong lseq, + long ts, + object? msgTrace, + bool sourced, + bool canRespond) + { + _ = reply; + _ = msgTrace; + _ = sourced; + _ = canRespond; + + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject is required", nameof(subject)); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + var batchId = GetBatchId(hdr); + if (!string.IsNullOrEmpty(batchId)) + return ProcessJetStreamBatchMsg(batchId, subject, reply, hdr, msg, msgTrace); + + try + { + var (seq, _) = Store.StoreMsg(subject, hdr, msg, ttl: 0); + if (lseq > 0) + seq = lseq; + + if (ts == 0) + ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + + Interlocked.Exchange(ref LastSeq, (long)seq); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? ProcessJetStreamBatchMsg(string batchId, string subject, string reply, byte[]? hdr, byte[]? msg, object? msgTrace) + { + _ = reply; + _ = msgTrace; + + if (string.IsNullOrWhiteSpace(batchId)) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishInvalidBatchIDError().ToString()); + + var (_, exists) = GetBatchSequence(hdr); + if (!exists) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishMissingSeqError().ToString()); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + try + { + Store.StoreMsg(subject, hdr, msg, ttl: 0); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs new file mode 100644 index 0000000..c03e326 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs @@ -0,0 +1,259 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _preAcksSync = new(); + private readonly Dictionary> _preAcks = new(); + private bool _inMonitor; + private long _replicationOutMsgs; + private long _replicationOutBytes; + + internal bool NoInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) => + !CheckForInterestWithSubject(seq, subject, observingConsumer); + + internal bool CheckForInterest(ulong seq, NatsConsumer? observingConsumer) + { + var subject = string.Empty; + if (PotentialFilteredConsumers() && Store != null) + { + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + { + RegisterPreAck(observingConsumer, seq); + return true; + } + + subject = loaded.Subject; + } + + return CheckForInterestWithSubject(seq, subject, observingConsumer); + } + + internal bool CheckForInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) + { + _ = subject; + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (ReferenceEquals(consumer, observingConsumer)) + continue; + + if (!HasPreAck(consumer, seq)) + return true; + } + } + + ClearAllPreAcks(seq); + return false; + } + + internal bool HasPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return false; + + lock (_preAcksSync) + { + return _preAcks.TryGetValue(seq, out var consumers) && consumers.Contains(consumer); + } + } + + internal bool HasAllPreAcks(ulong seq, string subject) + { + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers) || consumers.Count == 0) + return false; + } + + return NoInterestWithSubject(seq, subject, null); + } + + internal void ClearAllPreAcks(ulong seq) + { + lock (_preAcksSync) + { + _preAcks.Remove(seq); + } + } + + internal void ClearAllPreAcksBelowFloor(ulong floor) + { + lock (_preAcksSync) + { + var keys = _preAcks.Keys.Where(k => k < floor).ToArray(); + foreach (var key in keys) + _preAcks.Remove(key); + } + } + + internal void RegisterPreAckLock(NatsConsumer? consumer, ulong seq) + { + _mu.EnterWriteLock(); + try + { + RegisterPreAck(consumer, seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RegisterPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + { + consumers = []; + _preAcks[seq] = consumers; + } + + consumers.Add(consumer); + } + } + + internal void ClearPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + return; + + consumers.Remove(consumer); + if (consumers.Count == 0) + _preAcks.Remove(seq); + } + } + + internal bool AckMsg(NatsConsumer? consumer, ulong seq) + { + if (seq == 0 || Store == null) + return false; + + if (Config.Retention == RetentionPolicy.LimitsPolicy) + return false; + + var state = new StreamState(); + Store.FastState(state); + if (seq > state.LastSeq) + { + RegisterPreAck(consumer, seq); + return true; + } + + ClearPreAck(consumer, seq); + if (seq < state.FirstSeq) + return false; + + if (!NoInterest(seq, null)) + return false; + + if (!IsClustered()) + { + var (removed, _) = Store.RemoveMsg(seq); + return removed; + } + + return true; + } + + internal (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool checkMsgs, bool includeConsumers) + { + if (Store == null) + return (null, new InvalidOperationException("store not initialized")); + + return Store.Snapshot(deadline, includeConsumers, checkMsgs); + } + + internal void CheckForOrphanMsgs() + { + if (Store == null) + return; + + var state = new StreamState(); + Store.FastState(state); + ClearAllPreAcksBelowFloor(state.FirstSeq); + } + + internal void CheckConsumerReplication() + { + if (Config.Retention != RetentionPolicy.InterestPolicy) + return; + + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (consumer.Config.Replicas == 0) + continue; + + if (consumer.Config.Replicas != Config.Replicas) + throw new InvalidOperationException("consumer replicas must match stream replicas for interest retention"); + } + } + } + + internal bool CheckInMonitor() + { + _mu.EnterWriteLock(); + try + { + if (_inMonitor) + return true; + + _inMonitor = true; + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _inMonitor = false; + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _inMonitor; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void TrackReplicationTraffic(IRaftNode node, int size, int replicas) + { + if (!node.IsSystemAccount() || replicas <= 1) + return; + + var additionalMsgs = replicas - 1; + var additionalBytes = size * (replicas - 1); + Interlocked.Add(ref _replicationOutMsgs, additionalMsgs); + Interlocked.Add(ref _replicationOutBytes, additionalBytes); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs new file mode 100644 index 0000000..21ce5f3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -0,0 +1,93 @@ +using System.Collections.Concurrent; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class InMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static InMsg Rent() => Pool.TryTake(out var msg) ? msg : new InMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Client = null; + Pool.Add(this); + } +} + +public sealed partial class CMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Msg = null; + Seq = 0; + Pool.Add(this); + } +} + +public sealed partial class JsPubMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Pa = null; + Sync = null; + Pool.Add(this); + } + + internal int Size() => + (Subject?.Length ?? 0) + + (Reply?.Length ?? 0) + + (Hdr?.Length ?? 0) + + (Msg?.Length ?? 0); +} + +public sealed class JsOutQ +{ + private readonly IpQueue _queue = new("js-outq"); + private bool _registered = true; + + public (int Len, Exception? Error) SendMsg(string reply, byte[] payload) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, new ArgumentException("reply is required", nameof(reply))); + + var msg = JsPubMsg.Rent(); + msg.Subject = reply; + msg.Msg = payload; + return Send(msg); + } + + public (int Len, Exception? Error) Send(JsPubMsg msg) + { + if (!_registered) + return (0, new InvalidOperationException("queue is unregistered")); + + return _queue.Push(msg); + } + + public void Unregister() + { + _registered = false; + _queue.Unregister(); + } + + internal JsPubMsg[]? Pop() => _queue.Pop(); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c63e5dd..4bd6ecb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -227,7 +227,7 @@ public sealed class JsStreamPubMsg /// A JetStream publish message with sync tracking. /// Mirrors jsPubMsg in server/stream.go. /// -public sealed class JsPubMsg +public sealed partial class JsPubMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -245,7 +245,7 @@ public sealed class JsPubMsg /// An inbound message to be processed by the JetStream layer. /// Mirrors inMsg in server/stream.go. /// -public sealed class InMsg +public sealed partial class InMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -277,7 +277,7 @@ public sealed class InMsg /// A cached/clustered message for replication. /// Mirrors cMsg in server/stream.go. /// -public sealed class CMsg +public sealed partial class CMsg { public string Subject { get; set; } = string.Empty; public byte[]? Msg { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs new file mode 100644 index 0000000..356160f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs @@ -0,0 +1,33 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.Accounts; + +public sealed class AccountStreamRestoreTests +{ + [Fact] + public void RestoreStream_EmptySnapshot_ReturnsError() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + + var (stream, error) = account.RestoreStream(config, new MemoryStream()); + + stream.ShouldBeNull(); + error.ShouldNotBeNull(); + } + + [Fact] + public void RestoreStream_WithSnapshotData_AddsStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + using var snapshot = new MemoryStream([1, 2, 3]); + + var (stream, error) = account.RestoreStream(config, snapshot); + + error.ShouldBeNull(); + stream.ShouldNotBeNull(); + stream!.Name.ShouldBe("S"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs new file mode 100644 index 0000000..4da976b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs @@ -0,0 +1,181 @@ +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class Batch37StreamMessagesMappedTests +{ + [Fact] // T:828 + public void JetStreamClusterStreamDirectGetMsg_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("orders.created", null, [1], 0); + var request = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + + var response = stream.ProcessDirectGetRequest("reply", null, request); + + response.Error.ShouldBeNull(); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBe(1UL); + } + + [Fact] // T:954 + public void JetStreamClusterRollupSubjectAndWatchers_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgRollup, "SUB"); + + NatsStream.GetRollup(hdr).ShouldBe("sub"); + } + + [Fact] // T:987 + public void JetStreamClusterMirrorDeDupWindow_ShouldSucceed() + { + var stream = CreateStream(duplicates: TimeSpan.FromSeconds(5)); + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "dup-1", Seq = 10, TimestampNanos = now }); + + stream.CheckMsgId("dup-1").ShouldNotBeNull(); + stream.NumMsgIds().ShouldBe(1); + } + + [Fact] // T:1642 + public void JetStreamDirectMsgGet_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.ProcessJetStreamMsg("events", string.Empty, null, [7], 0, 0, null, false, true).ShouldBeNull(); + + var req = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + var response = stream.ProcessDirectGetRequest("reply", null, req); + response.Message.ShouldNotBeNull(); + response.Message!.Data.ShouldBe([7]); + } + + [Fact] // T:1643 + public void JetStreamDirectMsgGetNext_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.ProcessJetStreamMsg("events.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull(); + stream.ProcessJetStreamMsg("events.a", string.Empty, null, [2], 0, 0, null, false, true).ShouldBeNull(); + + var response = stream.GetDirectRequest(new JsApiMsgGetRequest { NextFor = "events.*", Seq = 1 }, "reply"); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBeGreaterThanOrEqualTo(1UL); + } + + [Fact] // T:383 + public void FileStoreSnapshot_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"batch37-snap-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var stream = CreateStream(storage: StorageType.FileStorage, storeDir: root); + stream.SetupStore(new FileStoreConfig { StoreDir = root }).ShouldBeNull(); + stream.ProcessJetStreamMsg("snap.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull(); + + var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(2), checkMsgs: false, includeConsumers: false); + error.ShouldBeNull(); + result.ShouldNotBeNull(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:2617 + public void NRGSnapshotAndRestart_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG" }; + raft.InstallSnapshot([1, 2, 3], force: true); + + raft.Wps.ShouldBe([1, 2, 3]); + } + + [Fact] // T:2672 + public void NRGSnapshotCatchup_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + raft.SendSnapshot([4, 5, 6]); + + raft.Wps.ShouldBe([4, 5, 6]); + } + + [Fact] // T:2695 + public void NRGNoLogResetOnCorruptedSendToFollower_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + Should.NotThrow(() => raft.SendSnapshot([0, 0, 0])); + } + + [Fact] // T:635 + public void GatewayQueueSub_ShouldSucceed() + { + var outq = new JsOutQ(); + outq.SendMsg("inbox.gateway", [1, 2]).Error.ShouldBeNull(); + outq.Pop().ShouldNotBeNull(); + } + + [Fact] // T:1892 + public void JWTImportsOnServerRestartAndClientsReconnect_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var cfg = new StreamConfig { Name = "RESTORE", Storage = StorageType.MemoryStorage }; + using var snapshot = new MemoryStream([1, 2, 3, 4]); + + var (stream, error) = account.RestoreStream(cfg, snapshot); + + error.ShouldBeNull(); + stream.ShouldNotBeNull(); + } + + [Fact] // T:1961 + public void LeafNodeQueueGroupDistributionWithDaisyChainAndGateway_ShouldSucceed() + { + var account = Account.NewAccount("L"); + var leaf1 = new ClientConnection(ClientKind.Leaf); + var leaf2 = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf1); + ((INatsAccount)account).AddClient(leaf2); + + account.NumLocalLeafNodes().ShouldBe(2); + } + + [Fact] // T:2426 + public void NoRaceJetStreamConsumerFilterPerfDegradation_ShouldSucceed() + { + var stream = CreateStream(subjects: ["perf.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", FilterSubject = "perf.*" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + stream.PartitionUnique("c2", ["perf.x"]).ShouldBeFalse(); + } + + private static NatsStream CreateStream( + TimeSpan? duplicates = null, + StorageType storage = StorageType.MemoryStorage, + string[]? subjects = null, + string? storeDir = null) + { + var account = new Account { Name = "A" }; + if (!string.IsNullOrWhiteSpace(storeDir)) + account.JetStream = new JsAccount { StoreDir = storeDir }; + + var config = new StreamConfig + { + Name = "S", + Storage = storage, + Subjects = subjects ?? ["events.>"], + Retention = RetentionPolicy.InterestPolicy, + Duplicates = duplicates ?? TimeSpan.FromSeconds(1), + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs new file mode 100644 index 0000000..1090410 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -0,0 +1,142 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamConsumersTests +{ + [Fact] + public void NewCMsg_ReturnToPool_ClearsValues() + { + var msg = NatsStream.NewCMsg("orders.created", 22); + msg.Subject.ShouldBe("orders.created"); + msg.Seq.ShouldBe(22UL); + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Seq.ShouldBe(0UL); + } + + [Fact] + public void NewJSPubMsg_WithHeaderAndData_ReturnsSizedMessage() + { + var pub = NatsStream.NewJSPubMsg("inbox.x", "orders", "reply", [1, 2], [3, 4, 5], null, 12); + + pub.Subject.ShouldBe("inbox.x"); + pub.Size().ShouldBeGreaterThan(0); + } + + [Fact] + public void JsOutQ_SendThenUnregister_RejectsFutureSends() + { + var outq = new JsOutQ(); + var first = outq.SendMsg("inbox.1", [1, 2, 3]); + first.Error.ShouldBeNull(); + first.Len.ShouldBeGreaterThan(0); + + outq.Unregister(); + var second = outq.SendMsg("inbox.2", [4]); + second.Error.ShouldNotBeNull(); + } + + [Fact] + public void AccName_AndNameLocked_ReturnConfiguredValues() + { + var stream = CreateStream(); + + stream.AccName().ShouldBe("A"); + stream.NameLocked().ShouldBe("S"); + } + + [Fact] + public void SetupSendCapabilities_AndResetConsumers_DoNotThrow() + { + var stream = CreateStream(); + + Should.NotThrow(stream.SetupSendCapabilities); + Should.NotThrow(stream.ResetAndWaitOnConsumers); + } + + [Fact] + public void SetConsumer_LookupAndCounts_ReturnExpectedValues() + { + var stream = CreateStream(); + var standard = new NatsConsumer("S", new ConsumerConfig { Name = "c1", Direct = false }, DateTime.UtcNow); + var direct = new NatsConsumer("S", new ConsumerConfig { Name = "c2", Direct = true }, DateTime.UtcNow); + + stream.SetConsumer(standard); + stream.SetConsumer(direct); + + stream.NumConsumers().ShouldBe(2); + stream.NumPublicConsumers().ShouldBe(1); + stream.NumDirectConsumers().ShouldBe(1); + stream.LookupConsumer("c2").ShouldBe(direct); + } + + [Fact] + public void GetMsg_WhenStored_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1, 2], ttl: 0); + + var message = stream.GetMsg(1); + + message.ShouldNotBeNull(); + message!.Subject.ShouldBe("events"); + message.Sequence.ShouldBe(1UL); + } + + [Fact] + public void PartitionUnique_WithCollidingFilters_ReturnsFalse() + { + var stream = CreateStream(); + var existing = new NatsConsumer("S", new ConsumerConfig { Name = "existing", FilterSubject = "orders.*" }, DateTime.UtcNow); + stream.SetConsumer(existing); + + var unique = stream.PartitionUnique("new", ["orders.created"]); + + unique.ShouldBeFalse(); + } + + [Fact] + public void PotentialFilteredConsumers_WithWildcardSubjectAndConsumer_ReturnsTrue() + { + var stream = CreateStream(["orders.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + } + + [Fact] + public void NoInterest_WithOnlyObservingConsumer_ReturnsTrue() + { + var stream = CreateStream(); + var observer = new NatsConsumer("S", new ConsumerConfig { Name = "observer" }, DateTime.UtcNow); + stream.SetConsumer(observer); + + stream.NoInterest(1, observer).ShouldBeTrue(); + } + + [Fact] + public void IsInterestRetention_WhenPolicyIsInterest_ReturnsTrue() + { + var stream = CreateStream(retention: RetentionPolicy.InterestPolicy); + + stream.IsInterestRetention().ShouldBeTrue(); + } + + private static NatsStream CreateStream(string[]? subjects = null, RetentionPolicy retention = RetentionPolicy.LimitsPolicy) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = subjects ?? ["events.>"], + Retention = retention, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs new file mode 100644 index 0000000..8771204 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs @@ -0,0 +1,152 @@ +using System.Text; +using System.Text.Json; +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamDirectGetPipelineTests +{ + [Fact] + public void GetMessageScheduleTTL_InvalidValue_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsScheduleTtl, "not-a-ttl"); + + var (ttl, ok) = NatsStream.GetMessageScheduleTTL(hdr); + + ok.ShouldBeFalse(); + ttl.ShouldBe(string.Empty); + } + + [Fact] + public void GetBatchSequence_ValidHeader_ReturnsSequence() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsBatchSeq, "9"); + + var (seq, ok) = NatsStream.GetBatchSequence(hdr); + + ok.ShouldBeTrue(); + seq.ShouldBe(9UL); + } + + [Fact] + public void IsClustered_WithAssignmentNode_ReturnsTrue() + { + var stream = CreateStream(); + var node = Substitute.For(); + var assignment = new StreamAssignment + { + Group = new RaftGroup + { + Node = node, + Peers = ["N1"], + }, + }; + + stream.SetStreamAssignment(assignment); + + stream.IsClustered().ShouldBeTrue(); + stream.IsClusteredInternal().ShouldBeTrue(); + } + + [Fact] + public void InMsgReturnToPool_WithValues_ClearsState() + { + var msg = new InMsg + { + Subject = "foo", + Reply = "bar", + Hdr = [1, 2], + Msg = [3, 4], + Client = new object(), + }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Reply.ShouldBeNull(); + msg.Hdr.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void QueueInbound_WithQueue_PushesMessage() + { + var stream = CreateStream(); + var queue = new IpQueue("inbound"); + + stream.QueueInbound(queue, "orders.created", null, null, [1], null, null); + var popped = queue.Pop(); + + popped.ShouldNotBeNull(); + popped!.Length.ShouldBe(1); + popped[0].Subject.ShouldBe("orders.created"); + } + + [Fact] + public void ProcessDirectGetRequest_SeqRequest_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("orders", null, [7, 8], ttl: 0); + + var request = JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + var response = stream.ProcessDirectGetRequest("reply.inbox", null, request); + + response.Error.ShouldBeNull(); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBe(1UL); + response.Message.Subject.ShouldBe("orders"); + } + + [Fact] + public void ProcessDirectGetLastBySubjectRequest_InvalidSubject_ReturnsBadRequest() + { + var stream = CreateStream(); + + var response = stream.ProcessDirectGetLastBySubjectRequest("$JS.API.DIRECT.GET.STREAM", "reply", null, null); + + response.Error.ShouldNotBeNull(); + response.Error!.ErrCode.ShouldBe(JsApiErrors.BadRequest.ErrCode); + } + + [Fact] + public void ProcessJetStreamMsg_WithMemoryStore_StoresMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamMsg("events", string.Empty, null, Encoding.ASCII.GetBytes("m1"), 0, 0, null, false, true); + var stored = stream.Store!.LoadMsg(1, new StoreMsg()); + + error.ShouldBeNull(); + stored.ShouldNotBeNull(); + stored!.Subject.ShouldBe("events"); + } + + [Fact] + public void ProcessJetStreamBatchMsg_MissingSequence_ReturnsApiError() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamBatchMsg("batch-1", "events", string.Empty, null, [1], null); + + error.ShouldNotBeNull(); + error.ShouldBeOfType(); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs new file mode 100644 index 0000000..d456ce3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs @@ -0,0 +1,79 @@ +using System.Numerics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamMessageHeadersTests +{ + [Fact] + public void ParseMessageTTL_NeverValue_ReturnsMinusOne() + { + var (ttl, error) = NatsStream.ParseMessageTTL("never"); + + error.ShouldBeNull(); + ttl.ShouldBe(-1); + } + + [Fact] + public void GetMessageIncr_ValidHeader_ReturnsParsedValue() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageIncr, "42"); + + var (value, ok) = NatsStream.GetMessageIncr(hdr); + + ok.ShouldBeTrue(); + value.ShouldBe(new BigInteger(42)); + } + + [Fact] + public void GetMessageSchedule_InvalidPattern_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "invalid"); + + var (schedule, ok) = NatsStream.GetMessageSchedule(hdr); + + ok.ShouldBeFalse(); + schedule.ShouldBe(default); + } + + [Fact] + public void SetupStore_MemoryStorage_CreatesMemStore() + { + var stream = CreateStream(duplicates: TimeSpan.FromSeconds(1)); + + var error = stream.SetupStore(null); + + error.ShouldBeNull(); + stream.Store.ShouldNotBeNull(); + stream.Store!.Type().ShouldBe(StorageType.MemoryStorage); + } + + [Fact] + public void StoreMsgIdAndPurge_ExpiredEntry_RemovesEntry() + { + var stream = CreateStream(duplicates: TimeSpan.FromMilliseconds(50)); + var oldTs = (DateTimeOffset.UtcNow - TimeSpan.FromSeconds(1)).ToUnixTimeMilliseconds() * 1_000_000L; + stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "id-1", Seq = 1, TimestampNanos = oldTs }); + + stream.NumMsgIds().ShouldBe(1); + stream.CheckMsgId("id-1").ShouldNotBeNull(); + + stream.PurgeMsgIds(); + + stream.NumMsgIds().ShouldBe(0); + stream.CheckMsgId("id-1").ShouldBeNull(); + } + + private static NatsStream CreateStream(TimeSpan duplicates) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Duplicates = duplicates, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs new file mode 100644 index 0000000..4e42134 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs @@ -0,0 +1,102 @@ +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamSnapshotMonitorTests +{ + [Fact] + public void RegisterPreAck_ClearPreAck_UpdatesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + stream.RegisterPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeTrue(); + + stream.ClearPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeFalse(); + } + + [Fact] + public void AckMsg_WhenSequenceAhead_ReturnsTrueAndRegistersPreAck() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + var removed = stream.AckMsg(consumer, 50); + + removed.ShouldBeTrue(); + stream.HasPreAck(consumer, 50).ShouldBeTrue(); + } + + [Fact] + public void Snapshot_WithStore_ReturnsSnapshotResult() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1], ttl: 0); + + var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(1), checkMsgs: false, includeConsumers: false); + + // MemStore snapshot parity is not implemented yet; ensure we surface + // a deterministic error in that path. + if (stream.Store.Type() == StorageType.MemoryStorage) + { + error.ShouldNotBeNull(); + result.ShouldBeNull(); + } + else + { + error.ShouldBeNull(); + result.ShouldNotBeNull(); + } + } + + [Fact] + public void CheckInMonitor_ClearMonitorRunning_TogglesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + + stream.CheckInMonitor().ShouldBeFalse(); + stream.IsMonitorRunning().ShouldBeTrue(); + + stream.ClearMonitorRunning(); + stream.IsMonitorRunning().ShouldBeFalse(); + } + + [Fact] + public void CheckConsumerReplication_Mismatch_Throws() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.Config.Replicas = 3; + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", Replicas = 1 }, DateTime.UtcNow)); + + Should.Throw(stream.CheckConsumerReplication); + } + + [Fact] + public void TrackReplicationTraffic_SystemAccountNode_DoesNotThrow() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var node = Substitute.For(); + node.IsSystemAccount().Returns(true); + + Should.NotThrow(() => stream.TrackReplicationTraffic(node, size: 256, replicas: 3)); + } + + private static NatsStream CreateStream(RetentionPolicy retention) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + Retention = retention, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/reports/current.md b/reports/current.md index 8cf0d71..2173cc0 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 04:55:51 UTC +Generated: 2026-03-01 05:22:33 UTC ## Modules (12 total)