diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs new file mode 100644 index 0000000..8f8313a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs @@ -0,0 +1,298 @@ +using System.Numerics; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal sealed class DedupeEntry + { + public required string Id { get; init; } + public ulong Seq { get; set; } + public long TimestampNanos { get; set; } + } + + private readonly object _ddLock = new(); + private Dictionary? _ddMap; + private List? _ddArr; + private int _ddIndex; + private Timer? _ddTimer; + + internal void Unsubscribe(Subscription? sub) + { + if (sub == null) + return; + + _mu.EnterReadLock(); + try + { + if (_closed) + return; + } + finally + { + _mu.ExitReadLock(); + } + + sub.Close(); + } + + internal Exception? SetupStore(FileStoreConfig? fileStoreConfig) + { + _mu.EnterWriteLock(); + try + { + if (Store != null) + { + RegisterStoreCallbacks(Store); + return null; + } + + var streamConfig = Config.Clone(); + IStreamStore store = streamConfig.Storage switch + { + StorageType.MemoryStorage => new JetStreamMemStore(streamConfig), + StorageType.FileStorage => BuildFileStore(fileStoreConfig, streamConfig), + _ => throw new InvalidOperationException($"unsupported storage type: {streamConfig.Storage}"), + }; + + Store = store; + RegisterStoreCallbacks(store); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private IStreamStore BuildFileStore(FileStoreConfig? fileStoreConfig, StreamConfig streamConfig) + { + var cfg = fileStoreConfig ?? FileStoreConfig(); + if (string.IsNullOrWhiteSpace(cfg.StoreDir)) + cfg.StoreDir = FileStoreConfig().StoreDir; + + var fsi = new FileStreamInfo + { + Created = Created, + Config = streamConfig, + }; + return new JetStreamFileStore(cfg, fsi); + } + + private void RegisterStoreCallbacks(IStreamStore store) + { + store.RegisterStorageUpdates(StoreUpdates); + store.RegisterStorageRemoveMsg(_ => { }); + store.RegisterProcessJetStreamMsg(_ => { }); + } + + internal void StoreUpdates(long msgs, long bytes, ulong seq, string subj) + { + _ = seq; + _ = subj; + Interlocked.Add(ref Msgs, msgs); + Interlocked.Add(ref Bytes, bytes); + } + + internal int NumMsgIds() + { + lock (_ddLock) + { + return _ddMap?.Count ?? 0; + } + } + + internal DedupeEntry? CheckMsgId(string id) + { + if (string.IsNullOrEmpty(id)) + return null; + + if (_ddMap == null || _ddMap.Count == 0) + return null; + + return _ddMap.GetValueOrDefault(id); + } + + internal void PurgeMsgIds() + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var window = (long)Config.Duplicates.TotalMilliseconds * 1_000_000L; + if (window <= 0) + { + lock (_ddLock) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + return; + } + + lock (_ddLock) + { + if (_ddArr == null || _ddMap == null) + return; + + for (var i = _ddIndex; i < _ddArr.Count; i++) + { + var entry = _ddArr[i]; + if (now - entry.TimestampNanos >= window) + _ddMap.Remove(entry.Id); + else + { + _ddIndex = i; + break; + } + } + + if (_ddMap.Count == 0) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + else + { + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + _ddTimer.Change(Config.Duplicates, Timeout.InfiniteTimeSpan); + } + } + } + + internal void StoreMsgId(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + lock (_ddLock) + { + StoreMsgIdLocked(entry); + } + } + + internal void StoreMsgIdLocked(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + if (Config.Duplicates <= TimeSpan.Zero) + return; + + _ddMap ??= new Dictionary(StringComparer.Ordinal); + _ddArr ??= new List(); + _ddMap[entry.Id] = entry; + _ddArr.Add(entry); + + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + } + + internal static string GetMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedLastMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedStream(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedStream, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeq(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetRollup(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgRollup, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value).ToLowerInvariant(); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeqPerSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSubjSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetExpectedLastSeqPerSubjectForSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastSubjSeqSubj, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (long Ttl, Exception? Error) ParseMessageTTL(string ttl) => + JetStreamHeaderHelpers.ParseMessageTtl(ttl); + + internal static (BigInteger? Value, bool Ok) GetMessageIncr(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (null, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsMessageIncr, hdr); + if (value is null || value.Value.Length == 0) + return (null, true); + + if (BigInteger.TryParse(Encoding.ASCII.GetString(value.Value.Span), out var parsed)) + return (parsed, true); + + return (null, false); + } + + internal static (DateTime Schedule, bool Ok) GetMessageSchedule(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (default, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr); + if (value is null || value.Value.Length == 0) + return (default, true); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var pattern = Encoding.ASCII.GetString(value.Value.Span); + var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(pattern, ts); + return (schedule, ok); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs new file mode 100644 index 0000000..d456ce3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs @@ -0,0 +1,79 @@ +using System.Numerics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamMessageHeadersTests +{ + [Fact] + public void ParseMessageTTL_NeverValue_ReturnsMinusOne() + { + var (ttl, error) = NatsStream.ParseMessageTTL("never"); + + error.ShouldBeNull(); + ttl.ShouldBe(-1); + } + + [Fact] + public void GetMessageIncr_ValidHeader_ReturnsParsedValue() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageIncr, "42"); + + var (value, ok) = NatsStream.GetMessageIncr(hdr); + + ok.ShouldBeTrue(); + value.ShouldBe(new BigInteger(42)); + } + + [Fact] + public void GetMessageSchedule_InvalidPattern_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "invalid"); + + var (schedule, ok) = NatsStream.GetMessageSchedule(hdr); + + ok.ShouldBeFalse(); + schedule.ShouldBe(default); + } + + [Fact] + public void SetupStore_MemoryStorage_CreatesMemStore() + { + var stream = CreateStream(duplicates: TimeSpan.FromSeconds(1)); + + var error = stream.SetupStore(null); + + error.ShouldBeNull(); + stream.Store.ShouldNotBeNull(); + stream.Store!.Type().ShouldBe(StorageType.MemoryStorage); + } + + [Fact] + public void StoreMsgIdAndPurge_ExpiredEntry_RemovesEntry() + { + var stream = CreateStream(duplicates: TimeSpan.FromMilliseconds(50)); + var oldTs = (DateTimeOffset.UtcNow - TimeSpan.FromSeconds(1)).ToUnixTimeMilliseconds() * 1_000_000L; + stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "id-1", Seq = 1, TimestampNanos = oldTs }); + + stream.NumMsgIds().ShouldBe(1); + stream.CheckMsgId("id-1").ShouldNotBeNull(); + + stream.PurgeMsgIds(); + + stream.NumMsgIds().ShouldBe(0); + stream.CheckMsgId("id-1").ShouldBeNull(); + } + + private static NatsStream CreateStream(TimeSpan duplicates) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Duplicates = duplicates, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 5652a7f..3d4b277 100644 Binary files a/porting.db and b/porting.db differ